You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/09/03 10:08:26 UTC

[inlong] branch master updated: [INLONG-4976][Manager] The Manager module supports the use of Kafka (#5389)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 790f797c6 [INLONG-4976][Manager] The Manager module supports the use of Kafka (#5389)
790f797c6 is described below

commit 790f797c62f764f763de9a6d251ef06253e27866
Author: Xieqijun <44...@users.noreply.github.com>
AuthorDate: Sat Sep 3 18:08:21 2022 +0800

    [INLONG-4976][Manager] The Manager module supports the use of Kafka (#5389)
---
 .../inlong/manager/common/enums/ClusterType.java   |   1 +
 .../inlong/manager/pojo/cluster/ClusterInfo.java   |   2 +-
 .../pojo/cluster/kafka/KafkaClusterDTO.java        |  62 +++++++
 .../pojo/cluster/kafka/KafkaClusterInfo.java       |  50 ++++++
 .../pojo/cluster/kafka/KafkaClusterRequest.java}   |  27 ++-
 .../manager/pojo/group/InlongGroupTopicInfo.java   |   3 +
 .../manager/pojo/group/kafka/InlongKafkaDTO.java   |  69 ++++++++
 .../manager/pojo/group/kafka/InlongKafkaInfo.java  |  59 +++++++
 .../pojo/group/kafka/InlongKafkaRequest.java       |  51 ++++++
 .../service/cluster/KafkaClusterOperator.java      |  88 ++++++++++
 .../manager/service/group/InlongKafkaOperator.java |  95 ++++++++++
 .../ConsumptionCompleteProcessListener.java        |   3 +
 .../resource/queue/kafka/KafkaOperator.java        | 101 +++++++++++
 .../queue/kafka/KafkaResourceOperators.java        | 191 +++++++++++++++++++++
 .../service/resource/queue/kafka/KafkaUtils.java   |  63 +++++++
 15 files changed, 857 insertions(+), 8 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
index 8862bad9f..8b433888e 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
@@ -26,4 +26,5 @@ public class ClusterType {
     public static final String TUBEMQ = "TUBEMQ";
     public static final String PULSAR = "PULSAR";
     public static final String DATAPROXY = "DATAPROXY";
+    public static final String KAFKA = "KAFKA";
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterInfo.java
index 92a766c29..0a2a31f92 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/ClusterInfo.java
@@ -45,7 +45,7 @@ public abstract class ClusterInfo {
     @ApiModelProperty(value = "Cluster name")
     private String name;
 
-    @ApiModelProperty(value = "Cluster type, including TUBEMQ, PULSAR, DATAPROXY, etc.")
+    @ApiModelProperty(value = "Cluster type, including TUBEMQ, PULSAR, KAFKA, DATAPROXY, etc.")
     private String type;
 
     @ApiModelProperty(value = "Cluster url")
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java
new file mode 100644
index 000000000..79b4f9039
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterDTO.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.cluster.kafka;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModel;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Kafka cluster DTO, parsed from a JSON string by getFromJson; it declares no fields of its own yet.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@ApiModel("Kafka cluster info")
+public class KafkaClusterDTO {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); // thread safe
+
+    /**
+     * Build a DTO for the request; no builder setter is called, so no request value is copied.
+     */
+    public static KafkaClusterDTO getFromRequest(KafkaClusterRequest request) {
+        return KafkaClusterDTO.builder()
+                .build();
+    }
+
+    /**
+     * Parse the JSON string into a DTO (unknown properties ignored); failures become BusinessException.
+     */
+    public static KafkaClusterDTO getFromJson(@NotNull String extParams) {
+        try {
+            return OBJECT_MAPPER.readValue(extParams, KafkaClusterDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterInfo.java
new file mode 100644
index 000000000..b27f092f8
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterInfo.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.cluster.kafka;
+
+import io.swagger.annotations.ApiModel;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
+
+/**
+ * Inlong cluster info for Kafka; the no-args constructor sets the type to ClusterType.KAFKA.
+ */
+@Data
+@SuperBuilder
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = ClusterType.KAFKA)
+@ApiModel("Inlong cluster info for Kafka")
+public class KafkaClusterInfo extends ClusterInfo {
+
+    public KafkaClusterInfo() {
+        this.setType(ClusterType.KAFKA);
+    }
+
+    @Override
+    public KafkaClusterRequest genRequest() {
+        return CommonBeanUtils.copyProperties(this, KafkaClusterRequest::new);
+    }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterRequest.java
similarity index 53%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
copy to inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterRequest.java
index 8862bad9f..e89bf41fd 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/ClusterType.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/cluster/kafka/KafkaClusterRequest.java
@@ -15,15 +15,28 @@
  * limitations under the License.
  */
 
-package org.apache.inlong.manager.common.enums;
+package org.apache.inlong.manager.pojo.cluster.kafka;
+
+import io.swagger.annotations.ApiModel;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
 
 /**
- * Constant of cluster type.
+ * Inlong cluster request for Kafka
  */
-public class ClusterType {
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = ClusterType.KAFKA)
+@ApiModel("Inlong cluster request for Kafka")
+public class KafkaClusterRequest extends ClusterRequest {
+
+    public KafkaClusterRequest() {
+        this.setType(ClusterType.KAFKA);
+    }
 
-    public static final String AGENT = "AGENT";
-    public static final String TUBEMQ = "TUBEMQ";
-    public static final String PULSAR = "PULSAR";
-    public static final String DATAPROXY = "DATAPROXY";
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupTopicInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupTopicInfo.java
index 9fe1563d1..e23a2ba78 100644
--- a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupTopicInfo.java
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/InlongGroupTopicInfo.java
@@ -53,4 +53,7 @@ public class InlongGroupTopicInfo {
     @ApiModelProperty(value = "Pulsar admin URL")
     private String pulsarAdminUrl;
 
+    @ApiModelProperty(value = "Kafka admin URL")
+    private String kafkaBootstrapServers;
+
 }
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java
new file mode 100644
index 000000000..7ab8220e6
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaDTO.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.group.kafka;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import io.swagger.annotations.ApiModel;
+import lombok.Builder;
+import lombok.Data;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * Kafka group DTO. NOTE(review): no explicit no-args constructor here; verify getFromJson can instantiate it.
+ */
+@Data
+@Builder
+@ApiModel("Inlong group info for Kafka")
+public class InlongKafkaDTO {
+
+    // number of partitions
+    private Integer numPartitions;
+    // replication factor; NOTE(review): this initializer may be ignored by the Lombok builder - confirm
+    private Short replicationFactor = 1;
+    // consumer group id
+    private String groupId;
+    // auto-commit setting, kept as a String - TODO confirm whether it is a flag or an interval
+    private String autoCommit;
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper()
+            .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); // thread safe
+
+    /**
+     * Build a DTO for the request; no builder setter is called, so no request value is copied.
+     */
+    public static InlongKafkaDTO getFromRequest(InlongKafkaRequest request) {
+        return InlongKafkaDTO.builder()
+                .build();
+    }
+
+    /**
+     * Parse the JSON string into a DTO (unknown properties ignored); failures become BusinessException.
+     */
+    public static InlongKafkaDTO getFromJson(@NotNull String extParams) {
+        try {
+            return OBJECT_MAPPER.readValue(extParams, InlongKafkaDTO.class);
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.GROUP_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaInfo.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaInfo.java
new file mode 100644
index 000000000..cc9d36668
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaInfo.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.group.kafka;
+
+import io.swagger.annotations.ApiModel;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import lombok.experimental.SuperBuilder;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+
+/**
+ * Inlong group info for Kafka; the no-args constructor sets the MQ type to MQType.KAFKA.
+ */
+@Data
+@SuperBuilder
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@JsonTypeDefine(value = MQType.KAFKA)
+@ApiModel("Inlong group info for Kafka")
+public class InlongKafkaInfo extends InlongGroupInfo {
+
+    // number of partitions
+    private Integer numPartitions;
+    // replication factor; NOTE(review): this initializer may be ignored by the Lombok builder - confirm
+    private Short replicationFactor = 1;
+    // consumer group id
+    private String groupId;
+    // auto-commit setting, kept as a String - TODO confirm whether it is a flag or an interval
+    private String autoCommit;
+
+    public InlongKafkaInfo() {
+        this.setMqType(MQType.KAFKA);
+    }
+
+    @Override
+    public InlongKafkaRequest genRequest() {
+        return CommonBeanUtils.copyProperties(this, InlongKafkaRequest::new);
+    }
+
+}
diff --git a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaRequest.java b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaRequest.java
new file mode 100644
index 000000000..67cfe0550
--- /dev/null
+++ b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/group/kafka/InlongKafkaRequest.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.pojo.group.kafka;
+
+import io.swagger.annotations.ApiModel;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.util.JsonTypeDefine;
+import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
+
+/**
+ * Inlong group request for Kafka; the constructor sets the MQ type to MQType.KAFKA.
+ */
+@Data
+@ToString(callSuper = true)
+@EqualsAndHashCode(callSuper = true)
+@ApiModel("Inlong group request for Kafka")
+@JsonTypeDefine(value = MQType.KAFKA)
+public class InlongKafkaRequest extends InlongGroupRequest {
+
+    // number of partitions
+    private Integer numPartitions;
+    // replication factor, initialized to 1
+    private Short replicationFactor = 1;
+    // consumer group id
+    private String groupId;
+    // auto-commit setting, kept as a String - TODO confirm whether it is a flag or an interval
+    private String autoCommit;
+
+    public InlongKafkaRequest() {
+        this.setMqType(MQType.KAFKA);
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
new file mode 100644
index 000000000..4578f9e39
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/KafkaClusterOperator.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.cluster;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
+import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
+import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
+import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterDTO;
+import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
+import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Kafka cluster operator. NOTE(review): setTargetEntity fails with SOURCE_INFO_INCORRECT, not a cluster code.
+ */
+@Service
+public class KafkaClusterOperator extends AbstractClusterOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaClusterOperator.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String clusterType) {
+        return getClusterType().equals(clusterType);
+    }
+
+    @Override
+    public String getClusterType() {
+        return ClusterType.KAFKA;
+    }
+
+    @Override
+    public ClusterInfo getFromEntity(InlongClusterEntity entity) {
+        if (entity == null) {
+            throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
+        }
+
+        KafkaClusterInfo kafkaClusterInfo = new KafkaClusterInfo();
+        CommonBeanUtils.copyProperties(entity, kafkaClusterInfo);
+        if (StringUtils.isNotBlank(entity.getExtParams())) {
+            KafkaClusterDTO dto = KafkaClusterDTO.getFromJson(entity.getExtParams());
+            CommonBeanUtils.copyProperties(dto, kafkaClusterInfo);
+        }
+
+        LOGGER.info("success to get kafka cluster info from entity");
+        return kafkaClusterInfo;
+    }
+
+    @Override
+    protected void setTargetEntity(ClusterRequest request, InlongClusterEntity targetEntity) {
+        KafkaClusterRequest kafkaRequest = (KafkaClusterRequest) request;
+        CommonBeanUtils.copyProperties(kafkaRequest, targetEntity, true);
+        try {
+            KafkaClusterDTO dto = KafkaClusterDTO.getFromRequest(kafkaRequest);
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+            LOGGER.info("success to set entity for kafka cluster");
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongKafkaOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongKafkaOperator.java
new file mode 100644
index 000000000..24807e5d2
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/InlongKafkaOperator.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.group;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.CommonBeanUtils;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.group.InlongGroupRequest;
+import org.apache.inlong.manager.pojo.group.InlongGroupTopicInfo;
+import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaDTO;
+import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
+import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaRequest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+/**
+ * Inlong group operator for Kafka. NOTE(review): setTargetEntity fails with SOURCE_INFO_INCORRECT - confirm.
+ */
+@Service
+public class InlongKafkaOperator extends AbstractGroupOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(InlongKafkaOperator.class);
+
+    @Autowired
+    private ObjectMapper objectMapper;
+
+    @Override
+    public Boolean accept(String mqType) {
+        return getMQType().equals(mqType);
+    }
+
+    @Override
+    public String getMQType() {
+        return MQType.KAFKA;
+    }
+
+    @Override
+    public InlongGroupInfo getFromEntity(InlongGroupEntity entity) {
+        if (entity == null) {
+            throw new BusinessException(ErrorCodeEnum.GROUP_NOT_FOUND);
+        }
+
+        InlongKafkaInfo kafkaInfo = new InlongKafkaInfo();
+        CommonBeanUtils.copyProperties(entity, kafkaInfo);
+
+        if (StringUtils.isNotBlank(entity.getExtParams())) {
+            InlongKafkaDTO dto = InlongKafkaDTO.getFromJson(entity.getExtParams());
+            CommonBeanUtils.copyProperties(dto, kafkaInfo);
+        }
+
+        return kafkaInfo;
+    }
+
+    @Override
+    protected void setTargetEntity(InlongGroupRequest request, InlongGroupEntity targetEntity) {
+        InlongKafkaRequest kafkaRequest = (InlongKafkaRequest) request;
+        CommonBeanUtils.copyProperties(kafkaRequest, targetEntity, true);
+        try {
+            InlongKafkaDTO dto = InlongKafkaDTO.getFromRequest(kafkaRequest);
+            targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
+        } catch (Exception e) {
+            throw new BusinessException(ErrorCodeEnum.SOURCE_INFO_INCORRECT.getMessage() + ": " + e.getMessage());
+        }
+        LOGGER.info("success set entity for inlong group with Kafka");
+    }
+
+    @Override
+    public InlongGroupTopicInfo getTopic(InlongGroupInfo groupInfo) {
+        InlongGroupTopicInfo topicInfo = super.getTopic(groupInfo);
+        return topicInfo;
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java
index e9d0795b0..3b7ac58ad 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/listener/consumption/ConsumptionCompleteProcessListener.java
@@ -91,6 +91,9 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
             return ListenerResult.success("Create TubeMQ consumer group successful");
         } else if (MQType.PULSAR.equals(mqType) || MQType.TDMQ_PULSAR.equals(mqType)) {
             this.createPulsarSubscription(entity);
+        } else if (MQType.KAFKA.equals(mqType)) {
+            // TODO: add Kafka support (this branch is still empty)
+
         } else {
             throw new WorkflowListenerException("Unsupported MQ type " + mqType);
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
new file mode 100644
index 000000000..3d3a7e063
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaOperator.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.queue.kafka;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
+import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
+import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.CreateTopicsResult;
+import org.apache.kafka.clients.admin.DeleteTopicsResult;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Service;
+
+/**
+ * Kafka operator for topics and subscriptions. NOTE(review): LOGGER is bound to InlongClusterServiceImpl.
+ */
+@Service
+public class KafkaOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(InlongClusterServiceImpl.class);
+
+    /**
+     * Create the Kafka topic topicName using numPartitions and replicationFactor from inlongKafkaInfo.
+     */
+    public void createTopic(InlongKafkaInfo inlongKafkaInfo, KafkaClusterInfo kafkaClusterInfo, String topicName)
+            throws InterruptedException, ExecutionException {
+        AdminClient adminClient = KafkaUtils.getAdminClient(kafkaClusterInfo);
+        NewTopic topic = new NewTopic(topicName,
+                inlongKafkaInfo.getNumPartitions(),
+                inlongKafkaInfo.getReplicationFactor());
+        CreateTopicsResult result = adminClient.createTopics(Collections.singletonList(topic));
+        // Sleep so the client does not disconnect too early, which could keep the topic from being created
+        Thread.sleep(500);
+        LOGGER.info("success to create kafka topic={}, with={} numPartitions",
+                topicName,
+                result.numPartitions(topicName).get());
+    }
+
+    /**
+     * Request deletion of the topic; the DeleteTopicsResult is not checked before success is logged.
+     */
+    public void forceDeleteTopic(KafkaClusterInfo kafkaClusterInfo, String topicName) {
+        AdminClient adminClient = KafkaUtils.getAdminClient(kafkaClusterInfo);
+        DeleteTopicsResult result = adminClient.deleteTopics(Collections.singletonList(topicName));
+        LOGGER.info("success to delete topic={}", topicName);
+    }
+
+    public boolean topicIsExists(KafkaClusterInfo kafkaClusterInfo, String topic)
+            throws ExecutionException, InterruptedException {
+        AdminClient adminClient = KafkaUtils.getAdminClient(kafkaClusterInfo);
+        Set<String> topicList = adminClient.listTopics().names().get();
+        return topicList.contains(topic);
+    }
+
+    public void createSubscription(InlongKafkaInfo inlongKafkaInfo, KafkaClusterInfo kafkaClusterInfo,
+            String subscription) {
+
+        KafkaConsumer kafkaConsumer = KafkaUtils.createKafkaConsumer(inlongKafkaInfo, kafkaClusterInfo);
+        // subscribe the new consumer; NOTE(review): it is neither polled nor closed in this method
+        kafkaConsumer.subscribe(Collections.singletonList(subscription));
+    }
+
+    public boolean subscriptionIsExists(InlongKafkaInfo inlongKafkaInfo, KafkaClusterInfo kafkaClusterInfo,
+            String topic) {
+        try (KafkaConsumer consumer = KafkaUtils.createKafkaConsumer(inlongKafkaInfo, kafkaClusterInfo)) {
+            Map<String, List<PartitionInfo>> topics = consumer.listTopics();
+            List<PartitionInfo> partitions = topics.get(topic);
+            if (partitions == null) {
+                LOGGER.info("subscription is not exist");
+                return false;
+            }
+            return true;
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
new file mode 100644
index 000000000..7f06633bd
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaResourceOperators.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.queue.kafka;
+
+import javax.validation.constraints.NotBlank;
+import javax.validation.constraints.NotNull;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.consts.MQType;
+import org.apache.inlong.manager.common.enums.ClusterType;
+import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
+import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
+import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamBriefInfo;
+import org.apache.inlong.manager.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
+import org.apache.inlong.manager.service.core.ConsumptionService;
+import org.apache.inlong.manager.service.resource.queue.QueueResourceOperator;
+import org.apache.inlong.manager.service.stream.InlongStreamService;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+/**
+ * Operator for creating and deleting the Kafka topics (and the related subscriptions)
+ * of inlong groups and inlong streams.
+ */
+@Slf4j
+@Service
+public class KafkaResourceOperators implements QueueResourceOperator {
+
+    @Autowired
+    private InlongClusterService clusterService;
+    @Autowired
+    private InlongStreamService streamService;
+    @Autowired
+    private KafkaOperator kafkaOperator;
+    @Autowired
+    private ConsumptionService consumptionService;
+
+    /**
+     * Accepts only the Kafka MQ type.
+     *
+     * @param mqType MQ type to check
+     * @return true if the given type equals {@link MQType#KAFKA}
+     */
+    @Override
+    public boolean accept(String mqType) {
+        return MQType.KAFKA.equals(mqType);
+    }
+
+    /**
+     * Creates the Kafka topic, the subscription and the consumption record for every stream of the group.
+     * Only logs a warning and returns when the group has no streams.
+     *
+     * @param groupInfo group info, must be an {@link InlongKafkaInfo}
+     * @param operator name of the operator, not used by this method
+     * @throws WorkflowListenerException if listing the streams or creating any resource fails
+     */
+    @Override
+    public void createQueueForGroup(@NotNull InlongGroupInfo groupInfo, @NotBlank String operator) {
+        // Same explicit precondition as the sibling methods of this class
+        Preconditions.checkNotNull(groupInfo, "inlong group info cannot be null");
+
+        String groupId = groupInfo.getInlongGroupId();
+        log.info("begin to create kafka resource for groupId={}", groupId);
+
+        InlongKafkaInfo inlongKafkaInfo = (InlongKafkaInfo) groupInfo;
+        try {
+            // 1. create kafka Topic - each Inlong Stream corresponds to a Kafka Topic
+            List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
+            if (streamInfoList == null || streamInfoList.isEmpty()) {
+                log.warn("skip to create kafka topic and subscription as no streams for groupId={}", groupId);
+                return;
+            }
+            for (InlongStreamBriefInfo streamInfo : streamInfoList) {
+                this.createKafkaTopic(inlongKafkaInfo, streamInfo.getInlongStreamId());
+            }
+        } catch (Exception e) {
+            String msg = String.format("failed to create kafka resource for groupId=%s", groupId);
+            log.error(msg, e);
+            throw new WorkflowListenerException(msg + ": " + e.getMessage());
+        }
+        log.info("success to create kafka resource for groupId={}, cluster={}", groupId, inlongKafkaInfo);
+    }
+
+    /**
+     * Deletes the Kafka topic of every stream of the group.
+     * Only logs a warning and returns when the group has no streams.
+     *
+     * @param groupInfo group info, must not be null
+     * @param operator name of the operator, not used by this method
+     * @throws WorkflowListenerException if listing the streams or deleting any topic fails
+     */
+    @Override
+    public void deleteQueueForGroup(InlongGroupInfo groupInfo, String operator) {
+        Preconditions.checkNotNull(groupInfo, "inlong group info cannot be null");
+
+        String groupId = groupInfo.getInlongGroupId();
+        log.info("begin to delete kafka resource for groupId={}", groupId);
+        // Only used for the final log statement; the per-topic deletion looks the cluster up again
+        ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
+        try {
+            List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
+            if (streamInfoList == null || streamInfoList.isEmpty()) {
+                log.warn("skip to delete kafka topic as no streams for groupId={}", groupId);
+                return;
+            }
+            for (InlongStreamBriefInfo streamInfo : streamInfoList) {
+                this.deleteKafkaTopic(groupInfo, streamInfo.getInlongStreamId());
+            }
+        } catch (Exception e) {
+            log.error("failed to delete kafka resource for groupId=" + groupId, e);
+            throw new WorkflowListenerException("failed to delete kafka resource: " + e.getMessage());
+        }
+        log.info("success to delete kafka resource for groupId={}, cluster={}", groupId, clusterInfo);
+    }
+
+    /**
+     * Creates the Kafka topic, the subscription and the consumption record for a single stream.
+     *
+     * @param groupInfo group info, must be an {@link InlongKafkaInfo}
+     * @param streamInfo stream info that provides the group ID and the stream ID
+     * @param operator name of the operator, must not be null
+     * @throws WorkflowListenerException if creating any resource fails
+     */
+    @Override
+    public void createQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
+            String operator) {
+        Preconditions.checkNotNull(groupInfo, "inlong group info cannot be null");
+        Preconditions.checkNotNull(streamInfo, "inlong stream info cannot be null");
+        Preconditions.checkNotNull(operator, "operator cannot be null");
+
+        String groupId = streamInfo.getInlongGroupId();
+        String streamId = streamInfo.getInlongStreamId();
+        log.info("begin to create kafka resource for groupId={}, streamId={}", groupId, streamId);
+
+        try {
+            // The cast stays inside the try block so that a wrong group type is reported like any other failure
+            InlongKafkaInfo inlongKafkaInfo = (InlongKafkaInfo) groupInfo;
+            this.createKafkaTopic(inlongKafkaInfo, streamId);
+        } catch (Exception e) {
+            String msg = String.format("failed to create kafka topic for groupId=%s, streamId=%s", groupId, streamId);
+            log.error(msg, e);
+            throw new WorkflowListenerException(msg + ": " + e.getMessage());
+        }
+        log.info("success to create kafka resource for groupId={}, streamId={}", groupId, streamId);
+    }
+
+    /**
+     * Deletes the Kafka topic of a single stream.
+     *
+     * @param groupInfo group info, must not be null
+     * @param streamInfo stream info that provides the group ID and the stream ID
+     * @param operator name of the operator, not used by this method
+     * @throws WorkflowListenerException if deleting the topic fails
+     */
+    @Override
+    public void deleteQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
+            String operator) {
+        Preconditions.checkNotNull(groupInfo, "inlong group info cannot be null");
+        Preconditions.checkNotNull(streamInfo, "inlong stream info cannot be null");
+
+        String groupId = streamInfo.getInlongGroupId();
+        String streamId = streamInfo.getInlongStreamId();
+        log.info("begin to delete kafka resource for groupId={} streamId={}", groupId, streamId);
+
+        try {
+            // The topic name is derived from the stream ID on creation, so the same ID must be used for deletion
+            this.deleteKafkaTopic(groupInfo, streamId);
+            log.info("success to delete kafka topic for groupId={}, streamId={}", groupId, streamId);
+        } catch (Exception e) {
+            String msg = String.format("failed to delete kafka topic for groupId=%s, streamId=%s", groupId, streamId);
+            log.error(msg, e);
+            throw new WorkflowListenerException(msg + ": " + e.getMessage());
+        }
+        log.info("success to delete kafka resource for groupId={}, streamId={}", groupId, streamId);
+    }
+
+    /**
+     * Create Kafka Topic and Subscription, and save the consumer group info.
+     * The topic is named {@code <groupId>_<streamId>} and the same name is used for the subscription.
+     *
+     * @param inlongKafkaInfo group info that provides the group ID and the cluster tag
+     * @param streamId ID of the stream the topic is created for
+     * @throws Exception if the topic does not exist after creation, or if any called operation fails
+     */
+    private void createKafkaTopic(InlongKafkaInfo inlongKafkaInfo, String streamId)
+            throws Exception {
+        // 1. create kafka topic
+        ClusterInfo clusterInfo = clusterService.getOne(inlongKafkaInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
+        KafkaClusterInfo kafkaClusterInfo = (KafkaClusterInfo) clusterInfo;
+        String groupId = inlongKafkaInfo.getInlongGroupId();
+        String topicName = groupId + "_" + streamId;
+        kafkaOperator.createTopic(inlongKafkaInfo, kafkaClusterInfo, topicName);
+
+        // Verify the creation before building anything on top of the topic
+        boolean exist = kafkaOperator.topicIsExists(kafkaClusterInfo, topicName);
+        if (!exist) {
+            String bootStrapServers = clusterInfo.getUrl();
+            log.error("topic={} not exists in {}", topicName, bootStrapServers);
+            throw new WorkflowListenerException("topic=" + topicName + " not exists in " + bootStrapServers);
+        }
+
+        // 2. create a subscription for the kafka topic
+        kafkaOperator.createSubscription(inlongKafkaInfo, kafkaClusterInfo, topicName);
+        log.info("success to create kafka subscription for groupId={}, topic={}, subs={}",
+                groupId, topicName, topicName);
+
+        // 3. insert the consumer group info into the consumption table
+        consumptionService.saveSortConsumption(inlongKafkaInfo, topicName, topicName);
+        log.info("success to save consume for groupId={}, topic={}, subs={}", groupId, topicName, topicName);
+    }
+
+    /**
+     * Delete the Kafka Topic named {@code <groupId>_<streamId>} from the Kafka cluster of the group.
+     *
+     * @param groupInfo group info that provides the group ID and the cluster tag
+     * @param streamId ID of the stream whose topic is deleted
+     */
+    private void deleteKafkaTopic(InlongGroupInfo groupInfo, String streamId) {
+        ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.KAFKA);
+        String topicName = groupInfo.getInlongGroupId() + "_" + streamId;
+        kafkaOperator.forceDeleteTopic((KafkaClusterInfo) clusterInfo, topicName);
+    }
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaUtils.java
new file mode 100644
index 000000000..d793b8ce4
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/kafka/KafkaUtils.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.resource.queue.kafka;
+
+import java.util.Properties;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
+import org.apache.inlong.manager.pojo.group.kafka.InlongKafkaInfo;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+
+/**
+ * Kafka connection utils: factory methods for the Kafka admin client and the Kafka consumer.
+ */
+@Slf4j
+public class KafkaUtils {
+
+    /**
+     * Creates a new admin client for the given cluster.
+     * A new instance is created on every call and is not retained here, so the caller should close it.
+     *
+     * @param kafkaClusterInfo cluster info whose URL is used as the bootstrap servers
+     * @return a new admin client
+     */
+    public static AdminClient getAdminClient(KafkaClusterInfo kafkaClusterInfo) {
+        Properties properties = new Properties();
+        // Configure the access address and port number of the Kafka service
+        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaClusterInfo.getUrl());
+        // Create AdminClient instance
+        return AdminClient.create(properties);
+    }
+
+    /**
+     * Creates a new consumer for the given cluster with integer keys and string values.
+     * A new instance is created on every call and is not retained here, so the caller should close it.
+     *
+     * @param inlongKafkaInfo group info whose {@code getGroupId()} value is used as the consumer group ID
+     * @param kafkaClusterInfo cluster info whose URL is used as the bootstrap servers
+     * @return a new consumer with auto commit enabled that starts from the earliest offset
+     */
+    public static KafkaConsumer<Integer, String> createKafkaConsumer(InlongKafkaInfo inlongKafkaInfo,
+            KafkaClusterInfo kafkaClusterInfo) {
+        Properties properties = new Properties();
+        // The connected kafka cluster address
+        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaClusterInfo.getUrl());
+        // consumer grouping
+        properties.put(ConsumerConfig.GROUP_ID_CONFIG, inlongKafkaInfo.getGroupId());
+        // Confirm Auto Commit
+        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
+        // autocommit interval
+        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
+        // Deserialization - these two classes determine the key and value types of the returned consumer
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.IntegerDeserializer");
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                "org.apache.kafka.common.serialization.StringDeserializer");
+        // For different groupid to ensure that the previous message can be consumed, reset the offset
+        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        return new KafkaConsumer<>(properties);
+    }
+}