You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/03 10:08:26 UTC
[inlong] branch master updated: [INLONG-4976][Manager] The Manager module supports the use of Kafka (#5389)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 790f797c6 [INLONG-4976][Manager] The Manager module supports the use of Kafka (#5389)
790f797c6 is described below
commit 790f797c62f764f763de9a6d251ef06253e27866
Author: Xieqijun <44...@users.noreply.github.com>
AuthorDate: Sat Sep 3 18:08:21 2022 +0800
[INLONG-4976][Manager] The Manager module supports the use of Kafka (#5389)
---
.../inlong/manager/common/enums/ClusterType.java | 1 +
.../inlong/manager/pojo/cluster/ClusterInfo.java | 2 +-
.../pojo/cluster/kafka/KafkaClusterDTO.java | 62 +++++++
.../pojo/cluster/kafka/KafkaClusterInfo.java | 50 ++++++
.../pojo/cluster/kafka/KafkaClusterRequest.java} | 27 ++-
.../manager/pojo/group/InlongGroupTopicInfo.java | 3 +
.../manager/pojo/group/kafka/InlongKafkaDTO.java | 69 ++++++++
.../manager/pojo/group/kafka/InlongKafkaInfo.java | 59 +++++++
.../pojo/group/kafka/InlongKafkaRequest.java | 51 ++++++
.../service/cluster/KafkaClusterOperator.java | 88 ++++++++++
.../manager/service/group/InlongKafkaOperator.java | 95 ++++++++++
.../ConsumptionCompleteProcessListener.java | 3 +
.../resource/queue/kafka/KafkaOperator.java | 101 +++++++++++
.../queue/kafka/KafkaResourceOperators.java | 191 +++++++++++++++++++++
.../service/resource/queue/kafka/KafkaUtils.java | 63 +++++++
15 files changed, 857 insertions(+), 8 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
index 8862bad9f..8b433888e 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
@@ -26,4 +26,5 @@ public class ClusterType {
public static final String TUBEMQ = "TUBEMQ";
public static final String PULSAR = "PULSAR";
public static final String DATAPROXY = "DATAPROXY";
+ public static final String KAFKA = "KAFKA";
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterInfo.java
index 92a766c29..0a2a31f92 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterInfo.java
@@ -45,7 +45,7 @@ public abstract class ClusterInfo {
@ApiModelProperty(value = "Cluster name")
private String name;
- @ApiModelProperty(value = "Cluster type, including TUBEMQ, PULSAR, DATAPROXY, etc.")
+ @ApiModelProperty(value = "Cluster type, including TUBEMQ, PULSAR, KAFKA, DATAPROXY, etc.")
private String type;
@ApiModelProperty(value = "Cluster url")
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java
new file mode 100644
index 000000000..79b4f9039
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.cluster.kafka;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModel;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Kafka cluster info
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@ApiModel("Kafka cluster info")
+public class KafkaClusterDTO {
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+ .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); // thread safe
+
+ /**
+ * Get the dto instance from the request
+ */
+ public static KafkaClusterDTO getFromRequest(KafkaClusterRequest request) {
+ return KafkaClusterDTO.builder()
+ .build();
+ }
+
+ /**
+ * Get the dto instance from the JSON string.
+ */
+ public static KafkaClusterDTO getFromJson(@NotNull String extParams) {
+ try {
+ return OBJECT_MAPPER.readValue(extParams, KafkaClusterDTO.class);
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+ }
+ }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterInfo.java
new file mode 100644
index 000000000..b27f092f8
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterInfo.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.cluster.kafka;
+
+import io.swagger.annotations.ApiModel;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
+
+/**
+ * Inlong cluster info for Kafka
+ */
+@Data
+@SuperBuilder
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = ClusterType.KAFKA)
+@ApiModel("Inlong cluster info for Kafka")
+public class KafkaClusterInfo extends ClusterInfo {
+
+ public KafkaClusterInfo() {
+ this.setType(ClusterType.KAFKA);
+ }
+
+ @Override
+ public KafkaClusterRequest genRequest() {
+ return CommonBeanUtils.copyProperties(this, KafkaClusterRequest::new);
+ }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterRequest.java
similarity index 53%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterRequest.java
index 8862bad9f..e89bf41fd 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterRequest.java
@@ -15,15 +15,28 @@
* limitations under the License.
*/
-package org.apache.inlong.manager.common.enums;
+package org.apache.inlong.manager.pojo.cluster.kafka;
+
+import io.swagger.annotations.ApiModel;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
/**
- * Constant of cluster type.
+ * Inlong cluster request for Kafka
*/
-public class ClusterType {
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = ClusterType.KAFKA)
+@ApiModel("Inlong cluster request for Kafka")
+public class KafkaClusterRequest extends ClusterRequest {
+
+ public KafkaClusterRequest() {
+ this.setType(ClusterType.KAFKA);
+ }
- public static final String AGENT = "AGENT";
- public static final String TUBEMQ = "TUBEMQ";
- public static final String PULSAR = "PULSAR";
- public static final String DATAPROXY = "DATAPROXY";
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupTopicInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupTopicInfo.java
index 9fe1563d1..e23a2ba78 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupTopicInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupTopicInfo.java
@@ -53,4 +53,7 @@ public class InlongGroupTopicInfo {
@ApiModelProperty(value = "Pulsar admin URL")
private String pulsarAdminUrl;
+ @ApiModelProperty(value = "Kafka admin URL")
+ private String kafkaBootstrapServers;
+
}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java
new file mode 100644
index 000000000..7ab8220e6
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.group.kafka;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModel;
+import lombok.Builder;
+import lombok.Data;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Inlong group info for Kafka
+ */
+@Data
+@Builder
+@ApiModel("Inlong group info for Kafka")
+public class InlongKafkaDTO {
+
+ // partition number
+ private Integer numPartitions;
+ // replicationFactor number
+ private Short replicationFactor = 1;
+ // consumer group id
+ private String groupId;
+ // autocommit interval
+ private String autoCommit;
+
+ private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+ .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); // thread safe
+
+ /**
+ * Get the dto instance from the request
+ */
+ public static InlongKafkaDTO getFromRequest(InlongKafkaRequest request) {
+ return InlongKafkaDTO.builder()
+ .build();
+ }
+
+ /**
+ * Get the dto instance from the JSON string.
+ */
+ public static InlongKafkaDTO getFromJson(@NotNull String extParams) {
+ try {
+ return OBJECT_MAPPER.readValue(extParams, InlongKafkaDTO.class);
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+ }
+ }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaInfo.java
new file mode 100644
index 000000000..cc9d36668
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaInfo.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.group.kafka;
+
+import io.swagger.annotations.ApiModel;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * Inlong group info for Kafka
+ */
+@Data
+@SuperBuilder
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = MQType.KAFKA)
+@ApiModel("Inlong group info for Kafka")
+public class InlongKafkaInfo extends InlongGroupInfo {
+
+ // partition number
+ private Integer numPartitions;
+ // replicationFactor number
+ private Short replicationFactor = 1;
+ // consumer group id
+ private String groupId;
+ // autocommit interval
+ private String autoCommit;
+
+ public InlongKafkaInfo() {
+ this.setMqType(MQType.KAFKA);
+ }
+
+ @Override
+ public InlongKafkaRequest genRequest() {
+ return CommonBeanUtils.copyProperties(this, InlongKafkaRequest::new);
+ }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaRequest.java
new file mode 100644
index 000000000..67cfe0550
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaRequest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.group.kafka;
+
+import io.swagger.annotations.ApiModel;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
+
+/**
+ * Inlong group request for Kafka
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel("Inlong group request for Kafka")
+@JsonTypeDefine(value = MQType.KAFKA)
+public class InlongKafkaRequest extends InlongGroupRequest {
+
+ // partition number
+ private Integer numPartitions;
+ // replicationFactor number
+ private Short replicationFactor = 1;
+ // consumer group id
+ private String groupId;
+ // autocommit interval
+ private String autoCommit;
+
+ public InlongKafkaRequest() {
+ this.setMqType(MQType.KAFKA);
+ }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
new file mode 100644
index 000000000..4578f9e39
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.cluster;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
+import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
+import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
+import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterDTO;
+import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
+import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Kafka cluster operator.
+ */
+@Service
+public class KafkaClusterOperator extends AbstractClusterOperator {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaClusterOperator.class);
+
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ @Override
+ public Boolean accept(String clusterType) {
+ return getClusterType().equals(clusterType);
+ }
+
+ @Override
+ public String getClusterType() {
+ return ClusterType.KAFKA;
+ }
+
+ @Override
+ public ClusterInfo getFromEntity(InlongClusterEntity entity) {
+ if (entity == null) {
+ throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
+ }
+
+ KafkaClusterInfo kafkaClusterInfo = new KafkaClusterInfo();
+ CommonBeanUtils.copyProperties(entity, kafkaClusterInfo);
+ if (StringUtils.isNotBlank(entity.getExtParams())) {
+ KafkaClusterDTO dto = KafkaClusterDTO.getFromJson(entity.getExtParams());
+ CommonBeanUtils.copyProperties(dto, kafkaClusterInfo);
+ }
+
+ LOGGER.info("success to get kafka cluster info from entity");
+ return kafkaClusterInfo;
+ }
+
+ @Override
+ protected void setTargetEntity(ClusterRequest request, InlongClusterEntity targetEntity) {
+ KafkaClusterRequest kafkaRequest = (KafkaClusterRequest) request;
+ CommonBeanUtils.copyProperties(kafkaRequest, targetEntity, true);
+ try {
+ KafkaClusterDTO dto = KafkaClusterDTO.getFromRequest(kafkaRequest);
+ targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+ LOGGER.info("success to set entity for kafka cluster");
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+ }
+ }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongKafkaOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongKafkaOperator.java
new file mode 100644
index 000000000..24807e5d2
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongKafkaOperator.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.group;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
+import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
+import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaDTO;
+import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
+import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Inlong group operator for Kafka.
+ */
+@Service
+public class InlongKafkaOperator extends AbstractGroupOperator {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(InlongKafkaOperator.class);
+
+ @Autowired
+ private ObjectMapper objectMapper;
+
+ @Override
+ public Boolean accept(String mqType) {
+ return getMQType().equals(mqType);
+ }
+
+ @Override
+ public String getMQType() {
+ return MQType.KAFKA;
+ }
+
+ @Override
+ public InlongGroupInfo getFromEntity(InlongGroupEntity entity) {
+ if (entity == null) {
+ throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
+ }
+
+ InlongKafkaInfo kafkaInfo = new InlongKafkaInfo();
+ CommonBeanUtils.copyProperties(entity, kafkaInfo);
+
+ if (StringUtils.isNotBlank(entity.getExtParams())) {
+ InlongKafkaDTO dto = InlongKafkaDTO.getFromJson(entity.getExtParams());
+ CommonBeanUtils.copyProperties(dto, kafkaInfo);
+ }
+
+ return kafkaInfo;
+ }
+
+ @Override
+ protected void setTargetEntity(InlongGroupRequest request, InlongGroupEntity targetEntity) {
+ InlongKafkaRequest kafkaRequest = (InlongKafkaRequest) request;
+ CommonBeanUtils.copyProperties(kafkaRequest, targetEntity, true);
+ try {
+ InlongKafkaDTO dto = InlongKafkaDTO.getFromRequest(kafkaRequest);
+ targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+ } catch (Exception e) {
+ throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+ }
+ LOGGER.info("success set entity for inlong group with Kafka");
+ }
+
+ @Override
+ public InlongGroupTopicInfo getTopic(InlongGroupInfo groupInfo) {
+ InlongGroupTopicInfo topicInfo = super.getTopic(groupInfo);
+ return topicInfo;
+ }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java
index e9d0795b0..3b7ac58ad 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java
@@ -91,6 +91,9 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
return ListenerResult.success("Create TubeMQ consumer group successful");
} else if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) {
this.createPulsarSubscription(entity);
+ } else if (MQType.KAFKA.equals(mqType)) {
+ // TODO add Kafka support
+
} else {
throw new WorkflowListenerException("Unsupported MQ type " + mqType);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
new file mode 100644
index 000000000..3d3a7e063
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.queue.kafka;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
+import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
+import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DeleteTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+/**
+ * Kafka operator, supports creating topics and creating subscriptions.
+ */
+@Service
+public class KafkaOperator {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(InlongClusterServiceImpl.class);
+
+ /**
+ * Create a Kafka topic using the partition count and replication factor from inlongKafkaInfo
+ */
+ public void createTopic(InlongKafkaInfo inlongKafkaInfo, KafkaClusterInfo kafkaClusterInfo, String topicName)
+ throws InterruptedException, ExecutionException {
+ AdminClient adminClient = KafkaUtils.getAdminClient(kafkaClusterInfo);
+ NewTopic topic = new NewTopic(topicName,
+ inlongKafkaInfo.getNumPartitions(),
+ inlongKafkaInfo.getReplicationFactor());
+ CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(topic));
+ // To prevent the client from disconnecting too quickly and causing the Topic to not be created successfully
+ Thread.sleep(500);
+ LOGGER.info("success to create kafka topic={}, with={} numPartitions",
+ topicName,
+ result.numPartitions(topicName).get());
+ }
+
+ /**
+ * Force delete Kafka topic
+ */
+ public void forceDeleteTopic(KafkaClusterInfo kafkaClusterInfo, String topicName) {
+ AdminClient adminClient = KafkaUtils.getAdminClient(kafkaClusterInfo);
+ DeleteTopicsResult result = adminClient.deleteTopics(Collections.singletonList(topicName));
+ LOGGER.info("success to delete topic={}", topicName);
+ }
+
+ public boolean topicIsExists(KafkaClusterInfo kafkaClusterInfo, String topic)
+ throws ExecutionException, InterruptedException {
+ AdminClient adminClient = KafkaUtils.getAdminClient(kafkaClusterInfo);
+ Set<String> topicList = adminClient.listTopics().names().get();
+ return topicList.contains(topic);
+ }
+
+ public void createSubscription(InlongKafkaInfo inlongKafkaInfo, KafkaClusterInfo kafkaClusterInfo,
+ String subscription) {
+
+ KafkaConsumer kafkaConsumer = KafkaUtils.createKafkaConsumer(inlongKafkaInfo, kafkaClusterInfo);
+ // subscription
+ kafkaConsumer.subscribe(Collections.singletonList(subscription));
+ }
+
+ public boolean subscriptionIsExists(InlongKafkaInfo inlongKafkaInfo, KafkaClusterInfo kafkaClusterInfo,
+ String topic) {
+ try (KafkaConsumer consumer = KafkaUtils.createKafkaConsumer(inlongKafkaInfo, kafkaClusterInfo)) {
+ Map<String, List<PartitionInfo>> topics = consumer.listTopics();
+ List<PartitionInfo> partitions = topics.get(topic);
+ if (partitions == null) {
+ LOGGER.info("subscription is not exist");
+ return false;
+ }
+ return true;
+ }
+ }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
new file mode 100644
index 000000000..7f06633bd
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.queue.kafka;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
+import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
+import org.apache.inlong.manager.service.core.ConsumptionService;
+import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+/**
+ * Operator for creating and deleting the Kafka topics and subscriptions
+ * of an inlong group or of a single inlong stream.
+ */
+@Slf4j
+@Service
+public class KafkaResourceOperators implements QueueResourceOperator {
+
+    @Autowired
+    private InlongClusterService clusterService;
+    @Autowired
+    private InlongStreamService streamService;
+    @Autowired
+    private KafkaOperator kafkaOperator;
+    @Autowired
+    private ConsumptionService consumptionService;
+
+    /**
+     * Accept only the Kafka MQ type.
+     */
+    @Override
+    public boolean accept(String mqType) {
+        return MQType.KAFKA.equals(mqType);
+    }
+
+    /**
+     * Create the Kafka resources for every stream of the given group.
+     * Nothing is created when the group has no streams.
+     *
+     * @param groupInfo inlong group info, must be an {@link InlongKafkaInfo}
+     * @param operator name of the operator
+     * @throws WorkflowListenerException if creating any of the resources fails
+     */
+    @Override
+    public void createQueueForGroup(@NotNull InlongGroupInfo groupInfo, @NotBlank String operator) {
+        Preconditions.checkNotNull(groupInfo, "inlong group info cannot be null");
+
+        String groupId = groupInfo.getInlongGroupId();
+        log.info("begin to create kafka resource for groupId={}", groupId);
+
+        InlongKafkaInfo inlongKafkaInfo = (InlongKafkaInfo) groupInfo;
+        try {
+            // 1. create kafka Topic - each Inlong Stream corresponds to a Kafka Topic
+            List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
+            if (streamInfoList == null || streamInfoList.isEmpty()) {
+                log.warn("skip to create kafka topic and subscription as no streams for groupId={}", groupId);
+                return;
+            }
+            for (InlongStreamBriefInfo streamInfo : streamInfoList) {
+                this.createKafkaTopic(inlongKafkaInfo, streamInfo.getInlongStreamId());
+            }
+        } catch (Exception e) {
+            String msg = String.format("failed to create kafka resource for groupId=%s", groupId);
+            log.error(msg, e);
+            throw new WorkflowListenerException(msg + ": " + e.getMessage());
+        }
+        log.info("success to create kafka resource for groupId={}, cluster={}", groupId, inlongKafkaInfo);
+    }
+
+    /**
+     * Delete the Kafka topics of every stream of the given group.
+     * Nothing is deleted when the group has no streams.
+     *
+     * @param groupInfo inlong group info
+     * @param operator name of the operator
+     * @throws WorkflowListenerException if deleting any of the topics fails
+     */
+    @Override
+    public void deleteQueueForGroup(InlongGroupInfo groupInfo, String operator) {
+        Preconditions.checkNotNull(groupInfo, "inlong group info cannot be null");
+
+        String groupId = groupInfo.getInlongGroupId();
+        log.info("begin to delete kafka resource for groupId={}", groupId);
+        ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
+        try {
+            List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
+            if (streamInfoList == null || streamInfoList.isEmpty()) {
+                log.warn("skip to delete kafka topic as no streams for groupId={}", groupId);
+                return;
+            }
+            for (InlongStreamBriefInfo streamInfo : streamInfoList) {
+                this.deleteKafkaTopic(groupInfo, streamInfo.getInlongStreamId());
+            }
+        } catch (Exception e) {
+            log.error("failed to delete kafka resource for groupId=" + groupId, e);
+            throw new WorkflowListenerException("failed to delete kafka resource: " + e.getMessage());
+        }
+        log.info("success to delete kafka resource for groupId={}, cluster={}", groupId, clusterInfo);
+    }
+
+    /**
+     * Create the Kafka resources for a single stream.
+     *
+     * @param groupInfo inlong group info, must be an {@link InlongKafkaInfo}
+     * @param streamInfo inlong stream info
+     * @param operator name of the operator
+     * @throws WorkflowListenerException if creating the resources fails
+     */
+    @Override
+    public void createQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
+            String operator) {
+        Preconditions.checkNotNull(groupInfo, "inlong group info cannot be null");
+        Preconditions.checkNotNull(streamInfo, "inlong stream info cannot be null");
+        Preconditions.checkNotNull(operator, "operator cannot be null");
+
+        String groupId = streamInfo.getInlongGroupId();
+        String streamId = streamInfo.getInlongStreamId();
+        log.info("begin to create kafka resource for groupId={}, streamId={}", groupId, streamId);
+
+        try {
+            InlongKafkaInfo inlongKafkaInfo = (InlongKafkaInfo) groupInfo;
+            // create kafka topic
+            this.createKafkaTopic(inlongKafkaInfo, streamId);
+        } catch (Exception e) {
+            String msg = String.format("failed to create kafka topic for groupId=%s, streamId=%s", groupId, streamId);
+            log.error(msg, e);
+            throw new WorkflowListenerException(msg + ": " + e.getMessage());
+        }
+        log.info("success to create kafka resource for groupId={}, streamId={}", groupId, streamId);
+    }
+
+    /**
+     * Delete the Kafka topic of a single stream.
+     *
+     * @param groupInfo inlong group info
+     * @param streamInfo inlong stream info
+     * @param operator name of the operator
+     * @throws WorkflowListenerException if deleting the topic fails
+     */
+    @Override
+    public void deleteQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
+            String operator) {
+        Preconditions.checkNotNull(groupInfo, "inlong group info cannot be null");
+        Preconditions.checkNotNull(streamInfo, "inlong stream info cannot be null");
+
+        String groupId = streamInfo.getInlongGroupId();
+        String streamId = streamInfo.getInlongStreamId();
+        log.info("begin to delete kafka resource for groupId={} streamId={}", groupId, streamId);
+
+        try {
+            // the topic name is built from the stream id on creation, so the same id must be used here
+            this.deleteKafkaTopic(groupInfo, streamId);
+        } catch (Exception e) {
+            String msg = String.format("failed to delete kafka topic for groupId=%s, streamId=%s", groupId, streamId);
+            log.error(msg, e);
+            throw new WorkflowListenerException(msg + ": " + e.getMessage());
+        }
+        log.info("success to delete kafka resource for groupId={}, streamId={}", groupId, streamId);
+    }
+
+    /**
+     * Create Kafka Topic and Subscription, and save the consumer group info.
+     * The topic is named {@code <groupId>_<streamId>} and the same name is used for the subscription.
+     */
+    private void createKafkaTopic(InlongKafkaInfo inlongKafkaInfo, String streamId)
+            throws Exception {
+        // 1. create kafka topic
+        ClusterInfo clusterInfo = clusterService.getOne(inlongKafkaInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
+        String groupId = inlongKafkaInfo.getInlongGroupId();
+        String topicName = groupId + "_" + streamId;
+        KafkaClusterInfo kafkaClusterInfo = (KafkaClusterInfo) clusterInfo;
+        kafkaOperator.createTopic(inlongKafkaInfo, kafkaClusterInfo, topicName);
+
+        // verify that the topic is really visible in the cluster before going on
+        boolean exist = kafkaOperator.topicIsExists(kafkaClusterInfo, topicName);
+        if (!exist) {
+            String bootStrapServers = kafkaClusterInfo.getUrl();
+            log.error("topic={} not exists in {}", topicName, bootStrapServers);
+            throw new WorkflowListenerException("topic=" + topicName + " not exists in " + bootStrapServers);
+        }
+
+        // 2. create a subscription for the kafka topic
+        kafkaOperator.createSubscription(inlongKafkaInfo, kafkaClusterInfo, topicName);
+        log.info("success to create kafka subscription for groupId={}, topic={}, subs={}",
+                groupId, topicName, topicName);
+
+        // 3. insert the consumer group info into the consumption table
+        consumptionService.saveSortConsumption(inlongKafkaInfo, topicName, topicName);
+        log.info("success to save consume for groupId={}, topic={}, subs={}", groupId, topicName, topicName);
+    }
+
+    /**
+     * Delete the Kafka Topic named {@code <groupId>_<streamId>}.
+     */
+    private void deleteKafkaTopic(InlongGroupInfo groupInfo, String streamId) {
+        ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
+        String topicName = groupInfo.getInlongGroupId() + "_" + streamId;
+        kafkaOperator.forceDeleteTopic((KafkaClusterInfo) clusterInfo, topicName);
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaUtils.java
new file mode 100644
index 000000000..d793b8ce4
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.queue.kafka;
+
+import java.util.Properties;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
+import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+/**
+ * Kafka connection utils: static factories for the admin client and the consumer.
+ */
+@Slf4j
+public class KafkaUtils {
+
+    private KafkaUtils() {
+        // static helpers only, no instances
+    }
+
+    /**
+     * Create an admin client for the given cluster.
+     * The caller is responsible for closing the returned client.
+     *
+     * @param kafkaClusterInfo cluster info whose URL is used as the bootstrap servers
+     * @return a new admin client
+     */
+    public static AdminClient getAdminClient(KafkaClusterInfo kafkaClusterInfo) {
+        Properties properties = new Properties();
+        // Configure the access address and port number of the Kafka service
+        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaClusterInfo.getUrl());
+        // Create AdminClient instance
+        return AdminClient.create(properties);
+    }
+
+    /**
+     * Create a consumer for the given group and cluster.
+     * The caller is responsible for closing the returned consumer.
+     *
+     * @param inlongKafkaInfo inlong group info whose group id is used as the consumer group
+     * @param kafkaClusterInfo cluster info whose URL is used as the bootstrap servers
+     * @return a new consumer with Integer keys and String values, matching the configured deserializers
+     */
+    public static KafkaConsumer<Integer, String> createKafkaConsumer(InlongKafkaInfo inlongKafkaInfo,
+            KafkaClusterInfo kafkaClusterInfo) {
+        Properties properties = new Properties();
+        // The connected kafka cluster address
+        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaClusterInfo.getUrl());
+        // consumer grouping
+        properties.put(ConsumerConfig.GROUP_ID_CONFIG, inlongKafkaInfo.getGroupId());
+        // Confirm Auto Commit
+        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        // autocommit interval
+        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+        // Deserialization of keys and values
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.IntegerDeserializer");
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringDeserializer");
+        // Start from the earliest offset when the group has no committed offset,
+        // so that messages produced before the group existed can still be consumed
+        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return new KafkaConsumer<>(properties);
+    }
+}