You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/07 15:47:32 UTC

[incubator-inlong] branch master updated: [INLONG-4563][Manager] Support create TubeMQ resources by its origin APIs (#4564)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 099305c2e [INLONG-4563][Manager] Support create TubeMQ resources by its origin APIs (#4564)
099305c2e is described below

commit 099305c2ed96096e688fa6092e0a9c8473e46de1
Author: healzhou <he...@gmail.com>
AuthorDate: Tue Jun 7 23:47:27 2022 +0800

    [INLONG-4563][Manager] Support create TubeMQ resources by its origin APIs (#4564)
---
 ...gerResponse.java => ConsumerGroupResponse.java} |  28 ++-
 ...TubeManagerResponse.java => TopicResponse.java} |  26 ++-
 .../manager/common/pojo/tubemq/TubeBrokerInfo.java | 168 ++++++++++++++
 .../common/pojo/tubemq/TubeClusterResponse.java    |  81 -------
 ...eManagerResponse.java => TubeHttpResponse.java} |   7 +-
 .../service/mq/CreatePulsarGroupTaskListener.java  |   8 +-
 .../mq/CreatePulsarResourceTaskListener.java       |  10 +-
 .../mq/CreatePulsarSubscriptionTaskListener.java   |   8 +-
 .../service/mq/CreatePulsarTopicTaskListener.java  |   6 +-
 .../service/mq/CreateTubeGroupTaskListener.java    |  61 +----
 .../service/mq/CreateTubeTopicTaskListener.java    |  30 ++-
 ...lsarOptServiceImpl.java => PulsarOperator.java} |  50 ++---
 .../manager/service/mq/util/PulsarOptService.java  |  48 ----
 .../manager/service/mq/util/TubeMQOperator.java    | 248 +++++++++++++++++++++
 .../manager/service/mq/util/TubeMqOptService.java  | 132 -----------
 .../service/sort/CreateSortConfigListenerV2.java   |   5 +-
 .../sort/CreateStreamSortConfigListener.java       |   2 +-
 .../ConsumptionCompleteProcessListener.java        |  56 ++---
 18 files changed, 566 insertions(+), 408 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeManagerResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/ConsumerGroupResponse.java
similarity index 64%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeManagerResponse.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/ConsumerGroupResponse.java
index 34ebb63c0..9a2f39b65 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeManagerResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/ConsumerGroupResponse.java
@@ -19,14 +19,36 @@ package org.apache.inlong.manager.common.pojo.tubemq;
 
 import lombok.Data;
 
+import java.util.List;
+
 /**
- * The response info of tube manager.
+ * Consumer group view of TubeMQ
  */
 @Data
-public class TubeManagerResponse {
+public class ConsumerGroupResponse {
 
+    // true, or false
     private boolean result;
+
+    // 0 is success, other is failed
     private int errCode;
+
+    // OK, or err msg
     private String errMsg;
 
-}
+    private List<ConsumerGroupInfo> data;
+
+    private int count;
+
+    @Data
+    public static class ConsumerGroupInfo {
+
+        private String topicName;
+        private String groupName;
+        private String createUser;
+        private String modifyUser;
+        private String createDate; // 20150619115100
+        private String modifyDate;
+    }
+
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeManagerResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TopicResponse.java
similarity index 64%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeManagerResponse.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TopicResponse.java
index 34ebb63c0..66496fa44 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeManagerResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TopicResponse.java
@@ -19,14 +19,32 @@ package org.apache.inlong.manager.common.pojo.tubemq;
 
 import lombok.Data;
 
+import java.util.List;
+
 /**
- * The response info of tube manager.
+ * Topic view of TubeMQ
  */
 @Data
-public class TubeManagerResponse {
+public class TopicResponse {
 
-    private boolean result;
     private int errCode;
     private String errMsg;
 
-}
+    // total topic info list
+    private List<TopicInfo> data;
+
+    private int dataCount;
+
+    @Data
+    public static class TopicInfo {
+
+        private String topicName;
+        private int totalCfgBrokerCnt;
+        private int totalCfgNumPart;
+        private int totalRunNumPartCount;
+        private boolean isSrvAcceptPublish;
+        private boolean isSrvAcceptSubscribe;
+        private boolean enableAuthControl;
+    }
+
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeBrokerInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeBrokerInfo.java
new file mode 100644
index 000000000..a95d17a98
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeBrokerInfo.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.tubemq;
+
+import lombok.Data;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Broker view of TubeMQ, includes different status brokers list.
+ */
+@Data
+public class TubeBrokerInfo {
+
+    private static final String IDLE = "idle";
+    private static final String RUNNING = "running";
+    private static final String ONLINE = "online";
+    private static final String ONLY_READ = "only-read";
+    private static final String ONLY_WRITE = "only-write";
+
+    private int errCode;
+    private String errMsg;
+
+    // full list of brokers, regardless of status
+    private List<BrokerInfo> data;
+    // configurable list of brokers.
+    private List<BrokerInfo> configurableList;
+    // working state list of brokers
+    private List<BrokerInfo> workingList;
+    // idle broker list
+    private List<BrokerInfo> idleList;
+    // ids of brokers whose configuration changed and need a reload
+    private List<Integer> needReloadList;
+
+    /**
+     * Divide broker list into different list by broker status.
+     */
+    public void divideBrokerListByStatus() {
+        if (data != null) {
+            configurableList = new ArrayList<>();
+            workingList = new ArrayList<>();
+            idleList = new ArrayList<>();
+            needReloadList = new ArrayList<>();
+            for (BrokerInfo brokerInfo : data) {
+                if (brokerInfo.isConfigurable()) {
+                    configurableList.add(brokerInfo);
+                }
+                if (brokerInfo.isWorking()) {
+                    workingList.add(brokerInfo);
+                }
+                if (brokerInfo.isIdle()) {
+                    idleList.add(brokerInfo);
+                }
+                if (brokerInfo.isConfChanged) {
+                    needReloadList.add(brokerInfo.getBrokerId());
+                }
+            }
+        }
+    }
+
+    /**
+     * Get all configurable broker id list.
+     */
+    public List<Integer> getConfigurableBrokerIdList() {
+        List<Integer> tmpBrokerIdList = new ArrayList<>();
+        if (configurableList != null) {
+            for (BrokerInfo brokerInfo : configurableList) {
+                tmpBrokerIdList.add(brokerInfo.getBrokerId());
+            }
+        }
+        return tmpBrokerIdList;
+    }
+
+    /**
+     * Get all working broker id list.
+     */
+    public List<Integer> getWorkingBrokerIdList() {
+        List<Integer> tmpBrokerIdList = new ArrayList<>();
+        if (workingList != null) {
+            for (BrokerInfo brokerInfo : workingList) {
+                tmpBrokerIdList.add(brokerInfo.getBrokerId());
+            }
+        }
+        return tmpBrokerIdList;
+    }
+
+    /**
+     * Get all broker id list.
+     */
+    public List<Integer> getAllBrokerIdList() {
+        List<Integer> allIdList = new ArrayList<>();
+        if (data != null) {
+            for (BrokerInfo brokerInfo : data) {
+                allIdList.add(brokerInfo.getBrokerId());
+            }
+        }
+        return allIdList;
+    }
+
+    /**
+     * Broker info
+     */
+    @Data
+    public static class BrokerInfo {
+
+        private int brokerId;
+        private String brokerIp;
+        private int brokerPort;
+        private String manageStatus;
+        private String runStatus;
+        private String subStatus;
+        private int stepOp;
+        private boolean isConfChanged;
+        private boolean isConfLoaded;
+        private boolean isBrokerOnline;
+        private String brokerVersion;
+        private boolean acceptPublish;
+        private boolean acceptSubscribe;
+
+        private boolean isIdle() {
+            return IDLE.equals(subStatus);
+        }
+
+        private boolean isWorking() {
+            return RUNNING.equals(runStatus) && (
+                    ONLINE.equals(manageStatus) || ONLY_READ.equals(manageStatus) || ONLY_WRITE.equals(manageStatus));
+        }
+
+        private boolean isConfigurable() {
+            return stepOp == 0 || stepOp == -2 || stepOp == 31 || stepOp == 32;
+        }
+
+        @Override
+        public int hashCode() {
+            return brokerId;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (o == this) {
+                return true;
+            }
+            if (!(o instanceof BrokerInfo)) {
+                return false;
+            }
+
+            BrokerInfo brokerInfo = (BrokerInfo) o;
+            return brokerId == brokerInfo.brokerId;
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeClusterResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeClusterResponse.java
deleted file mode 100644
index 4ee5ac577..000000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeClusterResponse.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.pojo.tubemq;
-
-import java.util.List;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-
-/**
- * The response info of tube cluster.
- */
-@Data
-@EqualsAndHashCode(callSuper = true)
-public class TubeClusterResponse extends TubeManagerResponse {
-
-    private List<DataBean> data;
-
-    public static class DataBean {
-
-        private int clusterId;
-        private String clusterName;
-        private String createTime;
-        private Object modifyTime;
-        private String createUser;
-
-        public int getClusterId() {
-            return clusterId;
-        }
-
-        public void setClusterId(int clusterId) {
-            this.clusterId = clusterId;
-        }
-
-        public String getClusterName() {
-            return clusterName;
-        }
-
-        public void setClusterName(String clusterName) {
-            this.clusterName = clusterName;
-        }
-
-        public String getCreateTime() {
-            return createTime;
-        }
-
-        public void setCreateTime(String createTime) {
-            this.createTime = createTime;
-        }
-
-        public Object getModifyTime() {
-            return modifyTime;
-        }
-
-        public void setModifyTime(Object modifyTime) {
-            this.modifyTime = modifyTime;
-        }
-
-        public String getCreateUser() {
-            return createUser;
-        }
-
-        public void setCreateUser(String createUser) {
-            this.createUser = createUser;
-        }
-    }
-}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeManagerResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeHttpResponse.java
similarity index 89%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeManagerResponse.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeHttpResponse.java
index 34ebb63c0..9466305b8 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeManagerResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeHttpResponse.java
@@ -23,10 +23,15 @@ import lombok.Data;
  * The response info of tube manager.
  */
 @Data
-public class TubeManagerResponse {
+public class TubeHttpResponse {
 
+    // true, or false
     private boolean result;
+
+    // 0 is success, other is failed
     private int errCode;
+
+    // OK, or err msg
     private String errMsg;
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java
index 39c211ff6..e50e14f37 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java
@@ -32,7 +32,7 @@ import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.core.ConsumptionService;
 import org.apache.inlong.manager.service.core.InlongStreamService;
 import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.service.mq.util.PulsarOptService;
+import org.apache.inlong.manager.service.mq.util.PulsarOperator;
 import org.apache.inlong.manager.service.mq.util.PulsarUtils;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -60,7 +60,7 @@ public class CreatePulsarGroupTaskListener implements QueueOperateListener {
     @Autowired
     private ConsumptionService consumptionService;
     @Autowired
-    private PulsarOptService pulsarOptService;
+    private PulsarOperator pulsarOperator;
 
     @Override
     public TaskEvent event() {
@@ -103,7 +103,7 @@ public class CreatePulsarGroupTaskListener implements QueueOperateListener {
 
                 // Create a subscription in the Pulsar cluster you need to ensure that the Topic exists
                 try {
-                    boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic);
+                    boolean exist = pulsarOperator.topicIsExists(pulsarAdmin, tenant, namespace, topic);
                     if (!exist) {
                         String topicFull = tenant + "/" + namespace + "/" + topic;
                         String serviceUrl = pulsarCluster.getAdminUrl();
@@ -113,7 +113,7 @@ public class CreatePulsarGroupTaskListener implements QueueOperateListener {
 
                     // Consumer naming rules: clusterTag_topicName_consumer_group
                     String subscription = clusterTag + "_" + topic + "_consumer_group";
-                    pulsarOptService.createSubscription(pulsarAdmin, topicBean, subscription);
+                    pulsarOperator.createSubscription(pulsarAdmin, topicBean, subscription);
 
                     // Insert the consumption data into the consumption table
                     consumptionService.saveSortConsumption(groupInfo, topic, subscription);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java
index b58e9b921..146669fad 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java
@@ -33,7 +33,7 @@ import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.core.InlongStreamService;
 import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.service.mq.util.PulsarOptService;
+import org.apache.inlong.manager.service.mq.util.PulsarOperator;
 import org.apache.inlong.manager.service.mq.util.PulsarUtils;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -59,7 +59,7 @@ public class CreatePulsarResourceTaskListener implements QueueOperateListener {
     @Autowired
     private InlongClusterService clusterService;
     @Autowired
-    private PulsarOptService pulsarOptService;
+    private PulsarOperator pulsarOperator;
 
     @Override
     public TaskEvent event() {
@@ -109,10 +109,10 @@ public class CreatePulsarResourceTaskListener implements QueueOperateListener {
             if (StringUtils.isEmpty(tenant)) {
                 tenant = InlongGroupSettings.DEFAULT_PULSAR_TENANT;
             }
-            pulsarOptService.createTenant(pulsarAdmin, tenant);
+            pulsarOperator.createTenant(pulsarAdmin, tenant);
 
             // create pulsar namespace
-            pulsarOptService.createNamespace(pulsarAdmin, pulsarInfo, tenant, namespace);
+            pulsarOperator.createNamespace(pulsarAdmin, pulsarInfo, tenant, namespace);
 
             // create pulsar topic
             Integer partitionNum = pulsarInfo.getPartitionNum();
@@ -122,7 +122,7 @@ public class CreatePulsarResourceTaskListener implements QueueOperateListener {
 
             for (InlongStreamBriefInfo streamInfo : streamTopicList) {
                 topicBean.setTopicName(streamInfo.getMqResource());
-                pulsarOptService.createTopic(pulsarAdmin, topicBean);
+                pulsarOperator.createTopic(pulsarAdmin, topicBean);
             }
         }
         log.info("finish to create pulsar resource for groupId={}, cluster={}", groupId, pulsarCluster);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java
index b806aef74..c06dd42aa 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java
@@ -29,7 +29,7 @@ import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.core.ConsumptionService;
-import org.apache.inlong.manager.service.mq.util.PulsarOptService;
+import org.apache.inlong.manager.service.mq.util.PulsarOperator;
 import org.apache.inlong.manager.service.mq.util.PulsarUtils;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -52,7 +52,7 @@ public class CreatePulsarSubscriptionTaskListener implements QueueOperateListene
     @Autowired
     private InlongClusterService clusterService;
     @Autowired
-    private PulsarOptService pulsarOptService;
+    private PulsarOperator pulsarOperator;
     @Autowired
     private StreamSinkService sinkService;
     @Autowired
@@ -92,7 +92,7 @@ public class CreatePulsarSubscriptionTaskListener implements QueueOperateListene
             topicBean.setTopicName(topic);
 
             // Create a subscription in the Pulsar cluster, you need to ensure that the Topic exists
-            boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic);
+            boolean exist = pulsarOperator.topicIsExists(pulsarAdmin, tenant, namespace, topic);
             if (!exist) {
                 String fullTopic = tenant + "/" + namespace + "/" + topic;
                 String msg = String.format("topic=%s not exists in %s", fullTopic, pulsarCluster.getAdminUrl());
@@ -102,7 +102,7 @@ public class CreatePulsarSubscriptionTaskListener implements QueueOperateListene
 
             // Consumer naming rules: clusterTag_topicName_consumer_group
             String subscription = clusterTag + "_" + topic + "_consumer_group";
-            pulsarOptService.createSubscription(pulsarAdmin, topicBean, subscription);
+            pulsarOperator.createSubscription(pulsarAdmin, topicBean, subscription);
 
             // Insert the consumption data into the consumption table
             consumptionService.saveSortConsumption(groupInfo, topic, subscription);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java
index 8e39f9b48..5b7e63725 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java
@@ -30,7 +30,7 @@ import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
-import org.apache.inlong.manager.service.mq.util.PulsarOptService;
+import org.apache.inlong.manager.service.mq.util.PulsarOperator;
 import org.apache.inlong.manager.service.mq.util.PulsarUtils;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -50,7 +50,7 @@ public class CreatePulsarTopicTaskListener implements QueueOperateListener {
     @Autowired
     private InlongClusterService clusterService;
     @Autowired
-    private PulsarOptService pulsarOptService;
+    private PulsarOperator pulsarOperator;
 
     @Override
     public TaskEvent event() {
@@ -85,7 +85,7 @@ public class CreatePulsarTopicTaskListener implements QueueOperateListener {
                         .queueModule(pulsarInfo.getQueueModule())
                         .numPartitions(pulsarInfo.getPartitionNum())
                         .build();
-                pulsarOptService.createTopic(pulsarAdmin, topicBean);
+                pulsarOperator.createTopic(pulsarAdmin, topicBean);
             }
         } catch (Exception e) {
             String msg = String.format("failed to create pulsar topic for groupId=%s, streamId=%s", groupId, streamId);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeGroupTaskListener.java
index 0acf8fac8..3ed670a9a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeGroupTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeGroupTaskListener.java
@@ -18,18 +18,14 @@
 package org.apache.inlong.manager.service.mq;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.beans.ReTryConfigBean;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
-import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
-import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest.GroupNameJsonSetBean;
-import org.apache.inlong.manager.common.pojo.tubemq.QueryTubeTopicRequest;
+import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
-import org.apache.inlong.manager.service.mq.util.TubeMqOptService;
+import org.apache.inlong.manager.service.mq.util.TubeMQOperator;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
@@ -37,8 +33,6 @@ import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import java.util.Collections;
-
 /**
  * Event listener of create tube consumer group.
  */
@@ -51,9 +45,7 @@ public class CreateTubeGroupTaskListener implements QueueOperateListener {
     @Autowired
     private InlongClusterService clusterService;
     @Autowired
-    private TubeMqOptService tubeMqOptService;
-    @Autowired
-    private ReTryConfigBean reTryConfigBean;
+    private TubeMQOperator tubeMQOperator;
 
     @Override
     public TaskEvent event() {
@@ -64,58 +56,27 @@ public class CreateTubeGroupTaskListener implements QueueOperateListener {
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
         GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
         String groupId = form.getInlongGroupId();
-        log.info("try to create consumer group for groupId {}", groupId);
+        log.info("begin to create tube consumer group for groupId {}", groupId);
 
         InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
-        InlongClusterInfo tubeCluster = clusterService.getOne(groupEntity.getInlongClusterTag(),
-                null, ClusterType.CLS_TUBE);
-
-        // TODO use the original method of TubeMQ to create group
-        // TubeClusterDTO clusterDTO = TubeClusterDTO.getFromJson(clusters.get(0).getExtParams());
-        // int clusterId = clusterDTO.getClusterId();
+        String clusterTag = groupEntity.getInlongClusterTag();
+        TubeClusterInfo tubeCluster = (TubeClusterInfo) clusterService.getOne(clusterTag, null, ClusterType.CLS_TUBE);
 
         String topicName = groupEntity.getMqResource();
-        QueryTubeTopicRequest queryTubeTopicRequest = QueryTubeTopicRequest.builder()
-                .topicName(topicName).clusterId(1)
-                .user(groupEntity.getCreator()).build();
-        // Query whether the tube topic exists
-        boolean topicExist = tubeMqOptService.queryTopicIsExist(queryTubeTopicRequest);
-
-        Integer tryNumber = reTryConfigBean.getMaxAttempts();
-        Long delay = reTryConfigBean.getDelay();
-        while (!topicExist && --tryNumber > 0) {
-            log.info("check whether the tube topic exists, try count={}", tryNumber);
-            try {
-                Thread.sleep(delay);
-                delay *= reTryConfigBean.getMultiplier();
-                topicExist = tubeMqOptService.queryTopicIsExist(queryTubeTopicRequest);
-            } catch (InterruptedException e) {
-                log.error("check the tube topic exists error", e);
-            }
-        }
-
-        AddTubeConsumeGroupRequest addTubeConsumeGroupRequest = new AddTubeConsumeGroupRequest();
-        addTubeConsumeGroupRequest.setClusterId(1);
-        addTubeConsumeGroupRequest.setCreateUser(groupEntity.getCreator());
-
-        GroupNameJsonSetBean groupNameJsonSetBean = new GroupNameJsonSetBean();
-        groupNameJsonSetBean.setTopicName(topicName);
-        String consumeGroupName = "sort_" + topicName + "_group";
-        groupNameJsonSetBean.setGroupName(consumeGroupName);
-        addTubeConsumeGroupRequest.setGroupNameJsonSet(Collections.singletonList(groupNameJsonSetBean));
-
+        // Consumer naming rules: clusterTag_topicName_consumer_group
+        String consumeGroup = clusterTag + "_" + topicName + "_consumer_group";
         try {
-            tubeMqOptService.createNewConsumerGroup(addTubeConsumeGroupRequest);
+            tubeMQOperator.createConsumerGroup(tubeCluster, topicName, consumeGroup, context.getOperator());
         } catch (Exception e) {
             throw new WorkflowListenerException("create tube consumer group for groupId=" + groupId + " error", e);
         }
-        log.info("finish to create consumer group for {}", groupId);
+        log.info("finish to create tube consumer group for groupId={}", groupId);
         return ListenerResult.success();
     }
 
     @Override
     public boolean async() {
-        return true;
+        return false;
     }
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java
index d0f459351..50ace8a4a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java
@@ -18,12 +18,15 @@
 package org.apache.inlong.manager.service.mq;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.tubemq.AddTubeMqTopicRequest;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
 import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.service.mq.util.TubeMqOptService;
+import org.apache.inlong.manager.service.mq.util.TubeMQOperator;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
@@ -31,8 +34,6 @@ import org.apache.inlong.manager.workflow.event.task.TaskEvent;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
-import java.util.Collections;
-
 /**
  * Create a listener for MQ resource tasks
  */
@@ -41,7 +42,9 @@ import java.util.Collections;
 public class CreateTubeTopicTaskListener implements QueueOperateListener {
 
     @Autowired
-    private TubeMqOptService tubeMqOptService;
+    private InlongClusterService clusterService;
+    @Autowired
+    private TubeMQOperator tubeMQOperator;
     @Autowired
     private InlongGroupService groupService;
 
@@ -53,20 +56,15 @@ public class CreateTubeTopicTaskListener implements QueueOperateListener {
     @Override
     public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
         GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
-
         log.info("begin create tube topic for groupId={}", form.getInlongGroupId());
-        String groupId = form.getInlongGroupId();
 
+        String groupId = form.getInlongGroupId();
         try {
-            InlongGroupInfo groupInfo = groupService.get(groupId);
-            String topicName = groupInfo.getMqResource();
-            AddTubeMqTopicRequest request = new AddTubeMqTopicRequest();
-            request.setUser("inlong-manager");
-            AddTubeMqTopicRequest.AddTopicTasksBean tasksBean = new AddTubeMqTopicRequest.AddTopicTasksBean();
-            tasksBean.setTopicName(topicName);
-            request.setAddTopicTasks(Collections.singletonList(tasksBean));
-            String result = tubeMqOptService.createNewTopic(request);
-            log.info("finish to create tube topic for groupId={}, result={}", groupId, result);
+            InlongGroupInfo groupInfo = form.getGroupInfo();
+            String clusterTag = groupInfo.getInlongClusterTag();
+            InlongClusterInfo clusterInfo = clusterService.getOne(clusterTag, null, ClusterType.CLS_TUBE);
+            tubeMQOperator.createTopic((TubeClusterInfo) clusterInfo, groupInfo.getMqResource(), context.getOperator());
+            log.info("finish to create tube topic for groupId={}", groupId);
         } catch (Exception e) {
             log.error("create tube topic for groupId={} error, exception {} ", groupId, e.getMessage(), e);
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOptServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOperator.java
similarity index 84%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOptServiceImpl.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOperator.java
index 45eba4d6b..d8c66e5b9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOptServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOperator.java
@@ -18,12 +18,12 @@
 package org.apache.inlong.manager.service.mq.util;
 
 import com.google.common.collect.Sets;
-import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.conversion.ConversionHandle;
 import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
 import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
 import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -31,60 +31,60 @@ import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.util.List;
 
 /**
- * Operation interface of Pulsar
+ * Pulsar operator, supports creating tenants, namespaces, topics and subscriptions.
  */
 @Service
-@Slf4j
-public class PulsarOptServiceImpl implements PulsarOptService {
+public class PulsarOperator {
 
+    private static final Logger LOGGER = LoggerFactory.getLogger(PulsarOperator.class);
     private static final String PULSAR_QUEUE_TYPE_SERIAL = "SERIAL";
 
     @Autowired
     private ConversionHandle conversionHandle;
 
-    @Override
     public void createTenant(PulsarAdmin pulsarAdmin, String tenant) throws PulsarAdminException {
-        log.info("begin to create pulsar tenant={}", tenant);
+        LOGGER.info("begin to create pulsar tenant={}", tenant);
 
         Preconditions.checkNotEmpty(tenant, "Tenant cannot be empty");
         try {
             List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
             boolean exists = this.tenantIsExists(pulsarAdmin, tenant);
             if (exists) {
-                log.warn("pulsar tenant={} already exists, skip to create", tenant);
+                LOGGER.warn("pulsar tenant={} already exists, skip to create", tenant);
                 return;
             }
             TenantInfoImpl tenantInfo = new TenantInfoImpl();
             tenantInfo.setAllowedClusters(Sets.newHashSet(clusters));
             tenantInfo.setAdminRoles(Sets.newHashSet());
             pulsarAdmin.tenants().createTenant(tenant, tenantInfo);
-            log.info("success to create pulsar tenant={}", tenant);
+            LOGGER.info("success to create pulsar tenant={}", tenant);
         } catch (PulsarAdminException e) {
-            log.error("failed to create pulsar tenant=" + tenant, e);
+            LOGGER.error("failed to create pulsar tenant=" + tenant, e);
             throw e;
         }
     }
 
-    @Override
     public void createNamespace(PulsarAdmin pulsarAdmin, InlongPulsarInfo pulsarInfo,
             String tenant, String namespace) throws PulsarAdminException {
         Preconditions.checkNotNull(tenant, "pulsar tenant cannot be empty during create namespace");
         Preconditions.checkNotNull(namespace, "pulsar namespace cannot be empty during create namespace");
 
         String namespaceName = tenant + "/" + namespace;
-        log.info("begin to create namespace={}", namespaceName);
+        LOGGER.info("begin to create namespace={}", namespaceName);
 
         try {
             // Check whether the namespace exists, and create it if it does not exist
             boolean isExists = this.namespacesIsExists(pulsarAdmin, tenant, namespace);
             if (isExists) {
-                log.warn("namespace={} already exists, skip to create", namespaceName);
+                LOGGER.warn("namespace={} already exists, skip to create", namespaceName);
                 return;
             }
 
@@ -119,14 +119,13 @@ public class PulsarOptServiceImpl implements PulsarOptService {
             PersistencePolicies persistencePolicies = new PersistencePolicies(pulsarInfo.getEnsemble(),
                     pulsarInfo.getWriteQuorum(), pulsarInfo.getAckQuorum(), 0);
             namespaces.setPersistence(namespaceName, persistencePolicies);
-            log.info("success to create namespace={}", namespaceName);
+            LOGGER.info("success to create namespace={}", namespaceName);
         } catch (PulsarAdminException e) {
-            log.error("failed to create namespace=" + namespaceName, e);
+            LOGGER.error("failed to create namespace=" + namespaceName, e);
             throw e;
         }
     }
 
-    @Override
     public void createTopic(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean) throws PulsarAdminException {
         Preconditions.checkNotNull(topicBean, "pulsar topic info cannot be empty");
 
@@ -137,7 +136,7 @@ public class PulsarOptServiceImpl implements PulsarOptService {
 
         // Topic will be returned if it exists, and created if it does not exist
         if (topicIsExists(pulsarAdmin, tenant, namespace, topic)) {
-            log.warn("pulsar topic={} already exists in {}", topicFullName, pulsarAdmin.getServiceUrl());
+            LOGGER.warn("pulsar topic={} already exists in {}", topicFullName, pulsarAdmin.getServiceUrl());
             return;
         }
 
@@ -157,44 +156,42 @@ public class PulsarOptServiceImpl implements PulsarOptService {
                 pulsarAdmin.topics().createPartitionedTopic(topicFullName, numPartitions);
             }
 
-            log.info("success to create topic={}", topicFullName);
+            LOGGER.info("success to create topic={}", topicFullName);
         } catch (Exception e) {
-            log.error("failed to create topic=" + topicFullName, e);
+            LOGGER.error("failed to create topic=" + topicFullName, e);
             throw e;
         }
     }
 
-    @Override
     public void createSubscription(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean, String subscription)
             throws PulsarAdminException {
         Preconditions.checkNotNull(topicBean, "can not find tenant information to create subscription");
         Preconditions.checkNotNull(subscription, "subscription cannot be empty during creating subscription");
 
         String topicName = topicBean.getTenant() + "/" + topicBean.getNamespace() + "/" + topicBean.getTopicName();
-        log.info("begin to create pulsar subscription={} for topic={}", subscription, topicName);
+        LOGGER.info("begin to create pulsar subscription={} for topic={}", subscription, topicName);
 
         try {
             boolean isExists = this.subscriptionIsExists(pulsarAdmin, topicName, subscription);
             if (!isExists) {
                 pulsarAdmin.topics().createSubscription(topicName, subscription, MessageId.latest);
-                log.info("success to create subscription={}", subscription);
+                LOGGER.info("success to create subscription={}", subscription);
             } else {
-                log.warn("pulsar subscription={} already exists, skip to create", subscription);
+                LOGGER.warn("pulsar subscription={} already exists, skip to create", subscription);
             }
         } catch (Exception e) {
-            log.error("failed to create pulsar subscription=" + subscription, e);
+            LOGGER.error("failed to create pulsar subscription=" + subscription, e);
             throw e;
         }
     }
 
-    @Override
     public void createSubscriptions(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicBean topicBean,
             List<String> topicList) throws PulsarAdminException {
         for (String topic : topicList) {
             topicBean.setTopicName(topic);
             this.createSubscription(pulsarAdmin, topicBean, subscription);
         }
-        log.info("success to create subscription={} for multiple topics={}", subscription, topicList);
+        LOGGER.info("success to create subscription={} for multiple topics={}", subscription, topicList);
     }
 
     /**
@@ -220,7 +217,6 @@ public class PulsarOptServiceImpl implements PulsarOptService {
      * @apiNote cannot compare whether the string contains, otherwise it may be misjudged, such as:
      *         Topic "ab" does not exist, but if "abc" exists, "ab" will be mistakenly judged to exist
      */
-    @Override
     public boolean topicIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace, String topic)
             throws PulsarAdminException {
         if (StringUtils.isBlank(topic)) {
@@ -243,7 +239,7 @@ public class PulsarOptServiceImpl implements PulsarOptService {
             List<String> subscriptionList = pulsarAdmin.topics().getSubscriptions(topic);
             return subscriptionList.contains(subscription);
         } catch (PulsarAdminException e) {
-            log.error("failed to check the subscription=" + subscription + " exists for topic=" + topic, e);
+            LOGGER.error("failed to check the subscription=" + subscription + " exists for topic=" + topic, e);
             return false;
         }
     }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOptService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOptService.java
deleted file mode 100644
index 9b519104e..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOptService.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.mq.util;
-
-import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
-import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-
-import java.util.List;
-
-/**
- * Interface of Pulsar operation
- */
-public interface PulsarOptService {
-
-    void createTenant(PulsarAdmin pulsarAdmin, String tenant) throws PulsarAdminException;
-
-    void createNamespace(PulsarAdmin pulsarAdmin, InlongPulsarInfo groupInfo, String tenant,
-            String namespace) throws PulsarAdminException;
-
-    void createTopic(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean) throws PulsarAdminException;
-
-    void createSubscription(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean, String subscription)
-            throws PulsarAdminException;
-
-    void createSubscriptions(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicBean topicBean,
-            List<String> topics) throws PulsarAdminException;
-
-    boolean topicIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace, String topic)
-            throws PulsarAdminException;
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMQOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMQOperator.java
new file mode 100644
index 000000000..0f68ba995
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMQOperator.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.mq.util;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterInfo;
+import org.apache.inlong.manager.common.pojo.tubemq.ConsumerGroupResponse;
+import org.apache.inlong.manager.common.pojo.tubemq.TopicResponse;
+import org.apache.inlong.manager.common.pojo.tubemq.TubeBrokerInfo;
+import org.apache.inlong.manager.common.pojo.tubemq.TubeHttpResponse;
+import org.apache.inlong.manager.common.util.HttpUtils;
+import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Nonnull;
+import java.util.List;
+
+/**
+ * TubeMQ operator, supports creating topics and creating consumer groups.
+ */
+@Service
+public class TubeMQOperator {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(TubeMQOperator.class);
+    private static final Integer SUCCESS_CODE = 0;
+
+    /**
+     * TubeMQ const for HTTP URL format
+     */
+    private static final String TOPIC_NAME = "&topicName=";
+    private static final String CONSUME_GROUP = "&consumeGroup=";
+    private static final String GROUP_NAME = "&groupName=";
+    private static final String BROKER_ID = "&brokerId=";
+    private static final String CREATE_USER = "&createUser=";
+    private static final String CONF_MOD_AUTH_TOKEN = "&confModAuthToken=";
+
+    private static final String QUERY_TOPIC_PATH = "/webapi.htm?method=admin_query_cluster_topic_view";
+    private static final String QUERY_BROKER_PATH = "/webapi.htm?method=admin_query_broker_run_status";
+    private static final String ADD_TOPIC_PATH = "/webapi.htm?method=admin_add_new_topic_record";
+    private static final String QUERY_CONSUMER_PATH = "/webapi.htm?method=admin_query_allowed_consumer_group_info";
+    private static final String ADD_CONSUMER_PATH = "/webapi.htm?method=admin_add_authorized_consumergroup_info";
+
+    @Autowired
+    private HttpUtils httpUtils;
+
+    /**
+     * Create the topic in the given tube cluster; skips creation if the topic already exists in the master.
+     */
+    public void createTopic(@Nonnull TubeClusterInfo tubeCluster, String topicName, String operator) {
+        String masterUrl = tubeCluster.getUrl();
+        LOGGER.info("begin to create tube topic {} in master {}", topicName, masterUrl);
+        if (StringUtils.isEmpty(masterUrl) || StringUtils.isEmpty(topicName)) {
+            throw new BusinessException("tube master url or tube topic cannot be null");
+        }
+
+        if (this.isTopicExist(masterUrl, topicName)) {
+            LOGGER.warn("tube topic {} already exists in {}, skip to create", topicName, masterUrl);
+            return;
+        }
+
+        this.createTopicOpt(masterUrl, topicName, tubeCluster.getToken(), operator);
+        LOGGER.info("success to create tube topic {} in {}", topicName, masterUrl);
+    }
+
+    /**
+     * Create a consumer group for the given topic; does nothing if the topic is missing or the group exists.
+     */
+    public void createConsumerGroup(TubeClusterInfo tubeCluster, String topic, String consumerGroup, String operator) {
+        String masterUrl = tubeCluster.getUrl();
+        LOGGER.info("begin to create consumer group {} for topic {} in master {}", consumerGroup, topic, masterUrl);
+        if (StringUtils.isEmpty(masterUrl) || StringUtils.isEmpty(consumerGroup) || StringUtils.isEmpty(topic)) {
+            throw new BusinessException("tube master url, consumer group, or tube topic cannot be null");
+        }
+
+        if (!this.isTopicExist(masterUrl, topic)) {
+            LOGGER.warn("cannot create tube consumer group {}, as the topic {} not exists in master {}",
+                    consumerGroup, topic, masterUrl);
+            return;
+        }
+
+        if (this.isConsumerGroupExist(masterUrl, topic, consumerGroup)) {
+            LOGGER.warn("tube consumer group {} already exists for topic {} in master {}, skip to create",
+                    consumerGroup, topic, masterUrl);
+            return;
+        }
+
+        this.createConsumerGroupOpt(masterUrl, topic, consumerGroup, tubeCluster.getToken(), operator);
+        LOGGER.info("success to create tube consumer group {} for topic {} in {}", consumerGroup, topic, masterUrl);
+    }
+
+    /**
+     * Check whether the topic exists in TubeMQ, i.e. the topic view query returns non-empty data.
+     */
+    public boolean isTopicExist(String masterUrl, String topicName) {
+        LOGGER.info("begin to check if the tube topic {} exists", topicName);
+        String url = masterUrl + QUERY_TOPIC_PATH + TOPIC_NAME + topicName;
+        try {
+            TopicResponse topicView = httpUtils.request(url, HttpMethod.GET, null, new HttpHeaders(),
+                    TopicResponse.class);
+            if (CollectionUtils.isEmpty(topicView.getData())) {
+                LOGGER.warn("tube topic {} not exists in {}", topicName, url);
+                return false;
+            }
+            LOGGER.info("tube topic {} exists in {}", topicName, url);
+            return true;
+        } catch (Exception e) {
+            String msg = String.format("failed to check if the topic %s exist in ", topicName);
+            LOGGER.error(msg + url, e);
+            throw new BusinessException(msg + masterUrl + ", error: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Check whether the consumer group exists for the given topic, i.e. the query returns non-empty data.
+     */
+    public boolean isConsumerGroupExist(String masterUrl, String topicName, String consumerGroup) {
+        LOGGER.info("begin to check if the consumer group {} exists on topic {}", consumerGroup, topicName);
+        String url = masterUrl + QUERY_CONSUMER_PATH + TOPIC_NAME + topicName + CONSUME_GROUP + consumerGroup;
+        try {
+            ConsumerGroupResponse response = httpUtils.request(url, HttpMethod.GET, null, new HttpHeaders(),
+                    ConsumerGroupResponse.class);
+            if (CollectionUtils.isEmpty(response.getData())) {
+                LOGGER.warn("tube consumer group {} not exists for topic {} in {}", consumerGroup, topicName, url);
+                return false;
+            }
+            LOGGER.info("tube consumer group {} exists for topic {} in {}", consumerGroup, topicName, url);
+            return true;
+        } catch (Exception e) {
+            String msg = String.format("failed to check if the consumer group %s for topic %s exist in ",
+                    consumerGroup, topicName);
+            LOGGER.error(msg + url, e);
+            throw new BusinessException(msg + masterUrl + ", error: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Query the broker run status from the given TubeMQ master URL and divide the brokers by status.
+     */
+    private TubeBrokerInfo getBrokerInfo(String masterUrl) {
+        String url = masterUrl + QUERY_BROKER_PATH;
+        try {
+            TubeBrokerInfo brokerInfo = httpUtils.request(url, HttpMethod.GET, null, new HttpHeaders(),
+                    TubeBrokerInfo.class);
+            if (brokerInfo.getErrCode() != SUCCESS_CODE) {
+                String msg = "failed to query tube broker from %s, error: %s";
+                LOGGER.error(String.format(msg, url, brokerInfo.getErrMsg()));
+                throw new BusinessException(String.format(msg, masterUrl, brokerInfo.getErrMsg()));
+            }
+
+            // is success, divide the broker by status
+            brokerInfo.divideBrokerListByStatus();
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("success to query tube broker from {}, result {}", url, brokerInfo.getData());
+            }
+            return brokerInfo;
+        } catch (Exception e) {
+            String msg = "failed to query tube broker from %s";
+            LOGGER.error(String.format(msg, url), e);
+            throw new BusinessException(String.format(msg, masterUrl) + ", error: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Create the topic on all brokers found in the master; throws BusinessException on any failure.
+     */
+    private void createTopicOpt(String masterUrl, String topicName, String token, String operator) {
+        LOGGER.info(String.format("begin to create tube topic %s in master %s", topicName, masterUrl));
+        TubeBrokerInfo brokerView = this.getBrokerInfo(masterUrl);
+        List<Integer> allBrokers = brokerView.getAllBrokerIdList();
+        if (CollectionUtils.isEmpty(allBrokers)) {
+            String msg = String.format("cannot create topic %s, as not any brokers found in %s", topicName, masterUrl);
+            LOGGER.error(msg);
+            throw new BusinessException(msg);
+        }
+
+        // create topic for all brokers; the URL carries the auth token, so it must not be logged
+        String url = masterUrl + ADD_TOPIC_PATH + TOPIC_NAME + topicName
+                + BROKER_ID + StringUtils.join(allBrokers, ",")
+                + CREATE_USER + operator + CONF_MOD_AUTH_TOKEN + token;
+        try {
+            TubeHttpResponse response = httpUtils.request(url, HttpMethod.GET, null, new HttpHeaders(),
+                    TubeHttpResponse.class);
+            if (response.getErrCode() != SUCCESS_CODE) {
+                String msg = String.format("failed to create tube topic %s, error: %s",
+                        topicName, response.getErrMsg());
+                LOGGER.error(msg + " in {} for brokers {}", masterUrl, allBrokers);
+                throw new BusinessException(msg);
+            }
+
+            LOGGER.info("success to create tube topic {} in {}", topicName, masterUrl);
+        } catch (Exception e) {
+            String msg = String.format("failed to create tube topic %s in %s", topicName, masterUrl);
+            LOGGER.error(msg, e);
+            throw new BusinessException(msg + ", error: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Create the consumer group for the topic through the master web API; throws BusinessException on failure.
+     */
+    private void createConsumerGroupOpt(String masterUrl, String topicName, String consumerGroup, String token,
+            String operator) {
+        LOGGER.info(String.format("begin to create consumer group %s for topic %s in master %s",
+                consumerGroup, topicName, masterUrl));
+        // the URL carries confModAuthToken, so only the master URL (never the full URL) is logged below
+        String url = masterUrl + ADD_CONSUMER_PATH + TOPIC_NAME + topicName
+                + GROUP_NAME + consumerGroup
+                + CREATE_USER + operator + CONF_MOD_AUTH_TOKEN + token;
+        try {
+            TubeHttpResponse response = httpUtils.request(url, HttpMethod.GET, null, new HttpHeaders(),
+                    TubeHttpResponse.class);
+            if (response.getErrCode() != SUCCESS_CODE) {
+                String msg = String.format("failed to create tube consumer group %s for topic %s, error: %s",
+                        consumerGroup, topicName, response.getErrMsg());
+                LOGGER.error(msg + ", master {}", masterUrl);
+                throw new BusinessException(msg);
+            }
+            LOGGER.info("success to create tube consumer group {} in {}", consumerGroup, masterUrl);
+        } catch (Exception e) {
+            String msg = String.format("failed to create tube consumer group %s in %s", consumerGroup, masterUrl);
+            LOGGER.error(msg, e);
+            throw new BusinessException(msg + ", error: " + e.getMessage());
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMqOptService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMqOptService.java
deleted file mode 100644
index 3774aba4a..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMqOptService.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.mq.util;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
-import org.apache.inlong.manager.common.pojo.tubemq.AddTubeMqTopicRequest;
-import org.apache.inlong.manager.common.pojo.tubemq.QueryTubeTopicRequest;
-import org.apache.inlong.manager.common.pojo.tubemq.TubeManagerResponse;
-import org.apache.inlong.manager.common.util.HttpUtils;
-import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.stereotype.Service;
-import org.springframework.util.CollectionUtils;
-
-/**
- * TubeMq operation service, such as create new topic, consumer group, etc.
- */
-@Slf4j
-@Service
-public class TubeMqOptService {
-
-    private static final Gson GSON = new GsonBuilder().create(); // thread safe
-
-    @Autowired
-    private InlongClusterEntityMapper clusterMapper;
-    @Autowired
-    private HttpUtils httpUtils;
-
-    /**
-     * Create new topic
-     */
-    public String createNewTopic(AddTubeMqTopicRequest request) {
-        HttpHeaders httpHeaders = new HttpHeaders();
-        httpHeaders.add("Content-Type", "application/json");
-        try {
-            if (CollectionUtils.isEmpty(request.getAddTopicTasks())) {
-                throw new Exception("topic cannot be empty");
-            }
-
-            // TODO use the original method of TubeMQ to create group
-            AddTubeMqTopicRequest.AddTopicTasksBean addTopicTasksBean = request.getAddTopicTasks().get(0);
-            QueryTubeTopicRequest topicRequest = QueryTubeTopicRequest.builder()
-                    .topicName(addTopicTasksBean.getTopicName()).clusterId(1)
-                    .user(request.getUser()).build();
-
-            String tubeManager = "InlongGroupSettings.TUBE_MANAGER_URL";
-            TubeManagerResponse response = httpUtils
-                    .request(tubeManager + "/v1/topic?method=queryCanWrite", HttpMethod.POST,
-                            GSON.toJson(topicRequest), httpHeaders, TubeManagerResponse.class);
-            if (response.getErrCode() == 101) { // topic already exists
-                log.info(" create tube topic  {}  on {} ", GSON.toJson(request),
-                        tubeManager + "/v1/task?method=addTopicTask");
-
-                request.setClusterId(1);
-                TubeManagerResponse createRsp = httpUtils
-                        .request(tubeManager + "/v1/task?method=addTopicTask", HttpMethod.POST,
-                                GSON.toJson(request), httpHeaders, TubeManagerResponse.class);
-                log.info("create tube topic success, result: {}", createRsp);
-            } else {
-                log.warn("tube topic {} exists in {} ", addTopicTasksBean.getTopicName(), tubeManager);
-            }
-        } catch (Exception e) {
-            log.error("failed to create tube topic: " + request.getAddTopicTasks().get(0).getTopicName(), e);
-        }
-        return "";
-    }
-
-    /**
-     * Create new consumer group
-     */
-    public String createNewConsumerGroup(AddTubeConsumeGroupRequest request) throws Exception {
-        HttpHeaders httpHeaders = new HttpHeaders();
-        httpHeaders.add("Content-Type", "application/json");
-        try {
-            String tubeManager = "InlongGroupSettings.TUBE_MANAGER_URL";
-            log.info("create tube consumer group {}  on {} ", GSON.toJson(request),
-                    tubeManager + "/v1/task?method=addTopicTask");
-            TubeManagerResponse response = httpUtils.request(tubeManager + "/v1/group?method=add",
-                    HttpMethod.POST, GSON.toJson(request), httpHeaders, TubeManagerResponse.class);
-            if (response.getErrCode() == -1) { // Creation failed
-                throw new BusinessException(ErrorCodeEnum.CONSUMER_GROUP_CREATE_FAILED, response.getErrMsg());
-            }
-            log.info("create tube consumer group success, result: {}", response);
-        } catch (BusinessException e) {
-            log.error("failed to create tube consumer group: " + GSON.toJson(request), e);
-            throw e;
-        }
-        return "";
-    }
-
-    /**
-     * Check if the topic is exists
-     */
-    public boolean queryTopicIsExist(QueryTubeTopicRequest queryTubeTopicRequest) {
-        HttpHeaders httpHeaders = new HttpHeaders();
-        httpHeaders.add("Content-Type", "application/json");
-        try {
-            String tubeManager = "InlongGroupSettings.TUBE_MANAGER_URL";
-            TubeManagerResponse response = httpUtils.request(tubeManager + "/v1/topic?method=queryCanWrite",
-                    HttpMethod.POST, GSON.toJson(queryTubeTopicRequest), httpHeaders, TubeManagerResponse.class);
-            if (response.getErrCode() == 0) { // topic already exists
-                log.error("topic {} exists in {} ", queryTubeTopicRequest.getTopicName(), tubeManager);
-                return true;
-            }
-        } catch (Exception e) {
-            log.error("fail to query tube topic {}", queryTubeTopicRequest.getTopicName(), e);
-        }
-        return false;
-    }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
index 6f0f5c28a..b107998bf 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
@@ -105,6 +105,9 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
         return ListenerResult.success();
     }
 
+    /**
+     * TODO: add support for TubeMQ
+     */
     private GroupInfo createGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
         String groupId = groupInfo.getInlongGroupId();
         List<StreamSink> streamSinks = sinkService.listSink(groupId, null);
@@ -129,7 +132,7 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
             InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
 
         if (!MQType.MQ_PULSAR.equals(groupInfo.getMqType())) {
-            String errMsg = String.format("Unsupported mqType={%s}", groupInfo.getMqType());
+            String errMsg = String.format("Unsupported MQ type %s", groupInfo.getMqType());
             log.error(errMsg);
             throw new WorkflowListenerException(errMsg);
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
index daaf55050..22723787c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
@@ -122,7 +122,7 @@ public class CreateStreamSortConfigListener implements SortOperateListener {
 
     private List<StreamSource> createPulsarSources(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo) {
         if (!MQType.MQ_PULSAR.equals(groupInfo.getMqType())) {
-            String errMsg = String.format("Unsupported mqType={%s}", groupInfo.getMqType());
+            String errMsg = String.format("Unsupported MQ type %s", groupInfo.getMqType());
             log.error(errMsg);
             throw new WorkflowListenerException(errMsg);
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
index df8ddad08..2baff6770 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
@@ -25,19 +25,19 @@ import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
 import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterInfo;
 import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
-import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
 import org.apache.inlong.manager.common.pojo.workflow.form.NewConsumptionProcessForm;
 import org.apache.inlong.manager.common.settings.InlongGroupSettings;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
 import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
 import org.apache.inlong.manager.service.cluster.InlongClusterService;
-import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.service.mq.util.PulsarOptService;
+import org.apache.inlong.manager.service.mq.util.PulsarOperator;
 import org.apache.inlong.manager.service.mq.util.PulsarUtils;
-import org.apache.inlong.manager.service.mq.util.TubeMqOptService;
+import org.apache.inlong.manager.service.mq.util.TubeMQOperator;
 import org.apache.inlong.manager.workflow.WorkflowContext;
 import org.apache.inlong.manager.workflow.event.ListenerResult;
 import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
@@ -47,7 +47,6 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Component;
 
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 
@@ -59,15 +58,15 @@ import java.util.List;
 public class ConsumptionCompleteProcessListener implements ProcessEventListener {
 
     @Autowired
-    private PulsarOptService pulsarMqOptService;
+    private InlongGroupEntityMapper groupMapper;
     @Autowired
-    private InlongClusterService clusterService;
+    private ConsumptionEntityMapper consumptionMapper;
     @Autowired
-    private InlongGroupService groupService;
+    private InlongClusterService clusterService;
     @Autowired
-    private ConsumptionEntityMapper consumptionMapper;
+    private PulsarOperator pulsarOperator;
     @Autowired
-    private TubeMqOptService tubeMqOptService;
+    private TubeMQOperator tubeMQOperator;
 
     @Override
     public ProcessEvent event() {
@@ -87,12 +86,12 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
 
         MQType mqType = MQType.forType(entity.getMqType());
         if (mqType == MQType.TUBE) {
-            this.createTubeConsumerGroup(entity);
+            this.createTubeConsumerGroup(entity, context.getOperator());
             return ListenerResult.success("Create Tube consumer group successful");
         } else if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
             this.createPulsarSubscription(entity);
         } else {
-            throw new WorkflowListenerException("Unsupported MQ type [" + mqType + "]");
+            throw new WorkflowListenerException("Unsupported MQ type " + mqType);
         }
 
         this.updateConsumerInfo(consumptionId, entity.getConsumerGroup());
@@ -116,12 +115,12 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
      */
     private void createPulsarSubscription(ConsumptionEntity entity) {
         String groupId = entity.getInlongGroupId();
-        InlongGroupInfo groupInfo = groupService.get(groupId);
-        Preconditions.checkNotNull(groupInfo, "inlong group not found for groupId=" + groupId);
-        String mqResource = groupInfo.getMqResource();
+        InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+        Preconditions.checkNotNull(groupEntity, "inlong group not found for groupId=" + groupId);
+        String mqResource = groupEntity.getMqResource();
         Preconditions.checkNotNull(mqResource, "mq resource cannot empty for groupId=" + groupId);
 
-        String clusterTag = groupInfo.getInlongClusterTag();
+        String clusterTag = groupEntity.getInlongClusterTag();
         InlongClusterInfo clusterInfo = clusterService.getOne(clusterTag, null, ClusterType.CLS_PULSAR);
         PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
         try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
@@ -146,7 +145,7 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
     private void createPulsarSubscription(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicBean topicBean,
             List<String> topics) {
         try {
-            pulsarMqOptService.createSubscriptions(pulsarAdmin, subscription, topicBean, topics);
+            pulsarOperator.createSubscriptions(pulsarAdmin, subscription, topicBean, topics);
         } catch (Exception e) {
             log.error("create pulsar consumer group failed", e);
             throw new WorkflowListenerException("failed to create pulsar consumer group");
@@ -156,19 +155,20 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
     /**
      * Create tube consumer group
      */
-    private void createTubeConsumerGroup(ConsumptionEntity consumption) {
-        AddTubeConsumeGroupRequest addTubeConsumeGroupRequest = new AddTubeConsumeGroupRequest();
-        addTubeConsumeGroupRequest.setClusterId(1); // TODO is cluster id needed?
-        addTubeConsumeGroupRequest.setCreateUser(consumption.getCreator());
-        AddTubeConsumeGroupRequest.GroupNameJsonSetBean bean = new AddTubeConsumeGroupRequest.GroupNameJsonSetBean();
-        bean.setTopicName(consumption.getTopic());
-        bean.setGroupName(consumption.getConsumerGroup());
-        addTubeConsumeGroupRequest.setGroupNameJsonSet(Collections.singletonList(bean));
+    private void createTubeConsumerGroup(ConsumptionEntity entity, String operator) {
+        String groupId = entity.getInlongGroupId();
+        InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+        Preconditions.checkNotNull(groupEntity, "inlong group not found for groupId=" + groupId);
+        String mqResource = groupEntity.getMqResource();
+        Preconditions.checkNotNull(mqResource, "mq resource cannot empty for groupId=" + groupId);
 
+        String clusterTag = groupEntity.getInlongClusterTag();
+        TubeClusterInfo clusterInfo = (TubeClusterInfo) clusterService.getOne(clusterTag, null, ClusterType.CLS_TUBE);
         try {
-            tubeMqOptService.createNewConsumerGroup(addTubeConsumeGroupRequest);
+            tubeMQOperator.createConsumerGroup(clusterInfo, entity.getTopic(), entity.getConsumerGroup(), operator);
         } catch (Exception e) {
-            throw new WorkflowListenerException("failed to create tube consumer group: " + addTubeConsumeGroupRequest);
+            log.error("failed to create tube consumer group: ", e);
+            throw new WorkflowListenerException("failed to create tube consumer group: " + e.getMessage());
         }
     }