You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/28 10:45:36 UTC

[incubator-inlong] branch master updated: [INLONG-3389][Manager] Remove deprecated constants of MQ type (#3400)

This is an automated email from the ASF dual-hosted git repository.

healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new daa515d  [INLONG-3389][Manager] Remove deprecated constants of MQ type (#3400)
daa515d is described below

commit daa515dab3044d713eb8a4fc08d67948b480b049
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Mon Mar 28 18:45:29 2022 +0800

    [INLONG-3389][Manager] Remove deprecated constants of MQ type (#3400)
---
 .../inlong/manager/client/api/InlongGroupConf.java |  2 +-
 .../api/{MqBaseConf.java => MQBaseConf.java}       | 12 ++++-----
 .../inlong/manager/client/api/PulsarBaseConf.java  |  6 ++---
 .../manager/client/api/TdmqPulsarBaseConf.java     |  4 +--
 .../inlong/manager/client/api/TubeBaseConf.java    |  6 ++---
 .../client/api/util/InlongGroupTransfer.java       | 18 ++++++-------
 .../manager/client/api/util/InlongParser.java      | 12 ++++-----
 .../inlong/manager/common/enums/Constant.java      |  5 ----
 .../common/enums/{MqType.java => MQType.java}      | 26 ++++++++++++------
 .../pojo/consumption/ConsumptionMqExtBase.java     |  8 +++---
 .../pojo/consumption/ConsumptionPulsarInfo.java    |  7 ++---
 .../common/pojo/group/InlongGroupMqExtBase.java    |  5 ++--
 .../common/pojo/group/InlongGroupPulsarInfo.java   |  4 +--
 .../manager/service/CommonOperateService.java      | 17 +++++++-----
 .../service/core/impl/ConsumptionServiceImpl.java  | 31 +++++++++++-----------
 .../service/core/impl/InlongGroupServiceImpl.java  | 29 ++++++++++----------
 .../core/impl/ThirdPartyClusterServiceImpl.java    | 12 +++++----
 .../service/thirdparty/mq/PulsarEventSelector.java |  6 ++---
 .../service/thirdparty/mq/TubeEventSelector.java   |  4 +--
 .../thirdparty/sort/ZkDisabledEventSelector.java   |  6 ++---
 .../thirdparty/sort/ZkEnabledEventSelector.java    |  4 +--
 .../thirdparty/sort/util/SourceInfoUtils.java      | 10 +++----
 .../ConsumptionCompleteProcessListener.java        |  8 +++---
 .../stream/CreateStreamWorkflowDefinition.java     |  9 ++++---
 .../service/core/impl/ConsumptionServiceTest.java  |  6 ++---
 .../core/impl/InlongGroupProcessOperationTest.java |  4 +--
 .../service/core/impl/InlongGroupServiceTest.java  | 11 ++++----
 .../workflow/ServiceTaskListenerFactoryTest.java   |  9 ++++---
 .../service/workflow/WorkflowServiceImplTest.java  | 14 +++++-----
 29 files changed, 153 insertions(+), 142 deletions(-)

diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupConf.java
index 4ab7ff8..bef4dcc 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupConf.java
@@ -39,7 +39,7 @@ public class InlongGroupConf {
     private String description;
 
     @ApiModelProperty("Message queue configuration")
-    private MqBaseConf mqBaseConf;
+    private MQBaseConf mqBaseConf;
 
     @ApiModelProperty("Sort configuration")
     private SortBaseConf sortBaseConf;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MQBaseConf.java
similarity index 82%
rename from inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
rename to inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MQBaseConf.java
index d3993b2..950e2ee 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MQBaseConf.java
@@ -20,18 +20,18 @@ package org.apache.inlong.manager.client.api;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
-import org.apache.inlong.manager.common.enums.MqType;
+import org.apache.inlong.manager.common.enums.MQType;
 
 import java.io.Serializable;
 
 @Data
 @ApiModel("Base configuration for message queue")
-public abstract class MqBaseConf implements Serializable {
+public abstract class MQBaseConf implements Serializable {
 
-    public static final MqBaseConf BLANK_MQ_CONF = new MqBaseConf() {
+    public static final MQBaseConf BLANK_MQ_CONF = new MQBaseConf() {
         @Override
-        public MqType getType() {
-            return MqType.NONE;
+        public MQType getType() {
+            return MQType.NONE;
         }
     };
 
@@ -41,5 +41,5 @@ public abstract class MqBaseConf implements Serializable {
     @ApiModelProperty("Is need create for mq resources")
     private boolean enableCreateResource = true;
 
-    public abstract MqType getType();
+    public abstract MQType getType();
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/PulsarBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/PulsarBaseConf.java
index dc05abc..bce79e3 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/PulsarBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/PulsarBaseConf.java
@@ -24,17 +24,17 @@ import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
 import org.apache.inlong.manager.client.api.auth.Authentication;
-import org.apache.inlong.manager.common.enums.MqType;
+import org.apache.inlong.manager.common.enums.MQType;
 
 @Data
 @EqualsAndHashCode(callSuper = true)
 @AllArgsConstructor
 @NoArgsConstructor
 @ApiModel("Base configuration for Pulsar")
-public class PulsarBaseConf extends MqBaseConf {
+public class PulsarBaseConf extends MQBaseConf {
 
     @ApiModelProperty("Message queue type")
-    private MqType type = MqType.PULSAR;
+    private MQType type = MQType.PULSAR;
 
     @ApiModelProperty("Pulsar admin URL")
     private String pulsarAdminUrl;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TdmqPulsarBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TdmqPulsarBaseConf.java
index ae8d231..151df13 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TdmqPulsarBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TdmqPulsarBaseConf.java
@@ -22,7 +22,7 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.NoArgsConstructor;
-import org.apache.inlong.manager.common.enums.MqType;
+import org.apache.inlong.manager.common.enums.MQType;
 
 @Data
 @EqualsAndHashCode(callSuper = true)
@@ -31,6 +31,6 @@ import org.apache.inlong.manager.common.enums.MqType;
 public class TdmqPulsarBaseConf extends PulsarBaseConf {
 
     @ApiModelProperty("Message queue type")
-    private MqType type = MqType.TDMQ_PULSAR;
+    private MQType type = MQType.TDMQ_PULSAR;
 
 }
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TubeBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TubeBaseConf.java
index a16ff2b..7a8dccf 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TubeBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TubeBaseConf.java
@@ -22,16 +22,16 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
-import org.apache.inlong.manager.common.enums.MqType;
+import org.apache.inlong.manager.common.enums.MQType;
 
 @Data
 @AllArgsConstructor
 @NoArgsConstructor
 @ApiModel("Base configuration for Tube")
-public class TubeBaseConf extends MqBaseConf {
+public class TubeBaseConf extends MQBaseConf {
 
     @ApiModelProperty("Message queue type")
-    private MqType type = MqType.TUBE;
+    private MQType type = MQType.TUBE;
 
     @ApiModelProperty("Tube manager URL")
     private String tubeManagerUrl;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
index 8057967..fec990e 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
@@ -24,7 +24,7 @@ import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.client.api.FlinkSortBaseConf;
 import org.apache.inlong.manager.client.api.InlongGroupConf;
-import org.apache.inlong.manager.client.api.MqBaseConf;
+import org.apache.inlong.manager.client.api.MQBaseConf;
 import org.apache.inlong.manager.client.api.PulsarBaseConf;
 import org.apache.inlong.manager.client.api.SortBaseConf;
 import org.apache.inlong.manager.client.api.SortBaseConf.SortType;
@@ -34,7 +34,7 @@ import org.apache.inlong.manager.client.api.auth.Authentication;
 import org.apache.inlong.manager.client.api.auth.Authentication.AuthType;
 import org.apache.inlong.manager.client.api.auth.SecretTokenAuthentication;
 import org.apache.inlong.manager.client.api.auth.TokenAuthentication;
-import org.apache.inlong.manager.common.enums.MqType;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupMqExtBase;
@@ -63,16 +63,16 @@ public class InlongGroupTransfer {
         return inlongGroupConf;
     }
 
-    public static MqBaseConf parseMqBaseConf(InlongGroupResponse inlongGroupResponse) {
+    public static MQBaseConf parseMqBaseConf(InlongGroupResponse inlongGroupResponse) {
         InlongGroupMqExtBase mqExtBase = inlongGroupResponse.getMqExtInfo();
         if (null == mqExtBase || StringUtils.isBlank(mqExtBase.getMiddlewareType())) {
             return null;
         }
         String middleWare = mqExtBase.getMiddlewareType();
-        MqType mqType = MqType.forType(middleWare);
+        MQType mqType = MQType.forType(middleWare);
         switch (mqType) {
             case NONE:
-                return MqBaseConf.BLANK_MQ_CONF;
+                return MQBaseConf.BLANK_MQ_CONF;
             case PULSAR:
             case TDMQ_PULSAR:
                 return parsePulsarConf(inlongGroupResponse);
@@ -197,8 +197,8 @@ public class InlongGroupTransfer {
         groupInfo.setPeakRecords(groupConf.getPeakRecords().intValue());
         groupInfo.setMaxLength(groupConf.getMaxLength());
         groupInfo.setProxyClusterId(groupConf.getProxyClusterId());
-        MqBaseConf mqConf = groupConf.getMqBaseConf();
-        MqType mqType = MqType.NONE;
+        MQBaseConf mqConf = groupConf.getMqBaseConf();
+        MQType mqType = MQType.NONE;
         if (null != mqConf) {
             mqType = mqConf.getType();
             groupInfo.setMiddlewareType(mqType.name());
@@ -206,7 +206,7 @@ public class InlongGroupTransfer {
         groupInfo.setInCharges(groupConf.getOperator());
         groupInfo.setExtList(Lists.newArrayList());
         groupInfo.setCreator(groupConf.getOperator());
-        if (mqType == MqType.PULSAR || mqType == MqType.TDMQ_PULSAR) {
+        if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
             PulsarBaseConf pulsarBaseConf = (PulsarBaseConf) mqConf;
             groupInfo.setMqResourceObj(pulsarBaseConf.getNamespace());
             InlongGroupPulsarInfo pulsarInfo = createPulsarInfo(pulsarBaseConf);
@@ -214,7 +214,7 @@ public class InlongGroupTransfer {
             List<InlongGroupExtInfo> extInfos = createPulsarExtInfo(pulsarBaseConf);
             groupInfo.getExtList().addAll(extInfos);
             groupInfo.setTopicPartitionNum(pulsarBaseConf.getTopicPartitionNum());
-        } else if (mqType == MqType.TUBE) {
+        } else if (mqType == MQType.TUBE) {
             TubeBaseConf tubeBaseConf = (TubeBaseConf) mqConf;
             List<InlongGroupExtInfo> extInfos = createTubeExtInfo(tubeBaseConf);
             groupInfo.setMqResourceObj(tubeBaseConf.getGroupName());
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
index 4a61ca7..4042fa1 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
@@ -24,7 +24,7 @@ import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.inlong.manager.common.beans.Response;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupApproveRequest;
@@ -85,9 +85,8 @@ public class InlongParser {
         InlongGroupResponse inlongGroupResponse = GsonUtil.fromJson(GsonUtil.toJson(data), InlongGroupResponse.class);
         JsonObject mqExtInfo = groupJson.getAsJsonObject(MQ_EXT_INFO);
         if (mqExtInfo != null && mqExtInfo.get(MIDDLEWARE_TYPE) != null) {
-            String middlewareType = mqExtInfo.get(MIDDLEWARE_TYPE).getAsString();
-            if (Constant.MIDDLEWARE_PULSAR.equals(middlewareType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(
-                    middlewareType)) {
+            MQType mqType = MQType.forType(mqExtInfo.get(MIDDLEWARE_TYPE).getAsString());
+            if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
                 InlongGroupPulsarInfo pulsarInfo = GsonUtil.fromJson(mqExtInfo.toString(), InlongGroupPulsarInfo.class);
                 inlongGroupResponse.setMqExtInfo(pulsarInfo);
             }
@@ -220,9 +219,8 @@ public class InlongParser {
                 InlongGroupApproveRequest.class);
         JsonObject mqExtInfo = groupJson.getAsJsonObject(MQ_EXT_INFO);
         if (mqExtInfo != null && mqExtInfo.get(MIDDLEWARE_TYPE) != null) {
-            String middlewareType = mqExtInfo.get(MIDDLEWARE_TYPE).getAsString();
-            if (Constant.MIDDLEWARE_PULSAR.equals(middlewareType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(
-                    middlewareType)) {
+            MQType mqType = MQType.forType(mqExtInfo.get(MIDDLEWARE_TYPE).getAsString());
+            if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
                 InlongGroupPulsarInfo pulsarInfo = GsonUtil.fromJson(mqExtInfo.toString(), InlongGroupPulsarInfo.class);
                 groupApproveInfo.setAckQuorum(pulsarInfo.getAckQuorum());
                 groupApproveInfo.setEnsemble(pulsarInfo.getEnsemble());
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
index 2701a5a..643a0cc 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
@@ -62,11 +62,6 @@ public class Constant {
 
     public static final String FILE_FORMAT_PARQUET = "Parquet";
 
-    public static final String MIDDLEWARE_TUBE = "TUBE";
-    public static final String MIDDLEWARE_PULSAR = "PULSAR";
-    public static final String MIDDLEWARE_TDMQ_PULSAR = "TDMQ_PULSAR";
-    public static final String MIDDLEWARE_NONE = "NONE";
-
     public static final String SCHEMA_M0_DAY = "m0_day";
 
     public static final String CLUSTER_TUBE = "TUBE";
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MqType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MQType.java
similarity index 73%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MqType.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MQType.java
index e1d27c1..855f740 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MqType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MQType.java
@@ -17,15 +17,25 @@
 
 package org.apache.inlong.manager.common.enums;
 
-public enum MqType {
-    PULSAR,
-    TUBE,
-    TDMQ_PULSAR,
-    NONE;
+import lombok.Getter;
 
-    public static MqType forType(String type) {
-        for (MqType mqType : values()) {
-            if (mqType.name().equals(type)) {
+public enum MQType {
+
+    PULSAR("PULSAR"),
+    TUBE("TUBE"),
+    TDMQ_PULSAR("TDMQ_PULSAR"),
+    NONE("NONE");
+
+    @Getter
+    private final String type;
+
+    MQType(String type) {
+        this.type = type;
+    }
+
+    public static MQType forType(String type) {
+        for (MQType mqType : values()) {
+            if (mqType.getType().equals(type)) {
                 return mqType;
             }
         }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionMqExtBase.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionMqExtBase.java
index 0f0784f..a49a1e2 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionMqExtBase.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionMqExtBase.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
-import org.apache.inlong.manager.common.enums.Constant;
 
 /**
  * Extended consumption information of different MQs
@@ -32,8 +31,8 @@ import org.apache.inlong.manager.common.enums.Constant;
 @ApiModel("Extended consumption information of different MQs")
 @JsonTypeInfo(use = Id.NAME, visible = true, property = "middlewareType", defaultImpl = ConsumptionMqExtBase.class)
 @JsonSubTypes({
-        @JsonSubTypes.Type(value = ConsumptionPulsarInfo.class, name = Constant.MIDDLEWARE_PULSAR),
-        @JsonSubTypes.Type(value = ConsumptionPulsarInfo.class, name = Constant.MIDDLEWARE_TDMQ_PULSAR)
+        @JsonSubTypes.Type(value = ConsumptionPulsarInfo.class, name = "PULSAR"),
+        @JsonSubTypes.Type(value = ConsumptionPulsarInfo.class, name = "TDMQ_PULSAR")
 })
 public class ConsumptionMqExtBase {
 
@@ -52,7 +51,6 @@ public class ConsumptionMqExtBase {
     @ApiModelProperty("Whether to delete, 0: not deleted, 1: deleted")
     private Integer isDeleted = 0;
 
-    @ApiModelProperty(value = "Middleware type, high throughput: TUBE, high consistency: PULSAR")
+    @ApiModelProperty("The middleware type of MQ")
     private String middlewareType;
-
 }
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionPulsarInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionPulsarInfo.java
index 22c2a98..3c49ba6 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionPulsarInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionPulsarInfo.java
@@ -22,7 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.MQType;
 
 /**
  * Pulsar consumer information
@@ -33,8 +33,9 @@ import org.apache.inlong.manager.common.enums.Constant;
 @ApiModel("Pulsar consumer information")
 public class ConsumptionPulsarInfo extends ConsumptionMqExtBase {
 
-    @ApiModelProperty("The middleware type of MQ")
-    private String middlewareType = Constant.MIDDLEWARE_PULSAR;
+    public ConsumptionPulsarInfo() {
+        this.setMiddlewareType(MQType.PULSAR.getType());
+    }
 
     @ApiModelProperty("Whether to configure the dead letter queue, 0: do not configure, 1: configure")
     private Integer isDlq;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupMqExtBase.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupMqExtBase.java
index 8fdd587..3ad4b1c 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupMqExtBase.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupMqExtBase.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
-import org.apache.inlong.manager.common.enums.Constant;
 
 /**
  * Extended inlong group info of different MQs
@@ -32,8 +31,8 @@ import org.apache.inlong.manager.common.enums.Constant;
 @ApiModel("Extended inlong group info of different MQs")
 @JsonTypeInfo(use = Id.NAME, visible = true, property = "middlewareType", defaultImpl = InlongGroupMqExtBase.class)
 @JsonSubTypes({
-        @JsonSubTypes.Type(value = InlongGroupPulsarInfo.class, name = Constant.MIDDLEWARE_PULSAR),
-        @JsonSubTypes.Type(value = InlongGroupPulsarInfo.class, name = Constant.MIDDLEWARE_TDMQ_PULSAR)
+        @JsonSubTypes.Type(value = InlongGroupPulsarInfo.class, name = "PULSAR"),
+        @JsonSubTypes.Type(value = InlongGroupPulsarInfo.class, name = "TDMQ_PULSAR")
 })
 public class InlongGroupMqExtBase {
 
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupPulsarInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupPulsarInfo.java
index 66dfb6f..7663815 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupPulsarInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupPulsarInfo.java
@@ -22,7 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.MQType;
 
 /**
  * Inlong group information for Pulsar
@@ -34,7 +34,7 @@ import org.apache.inlong.manager.common.enums.Constant;
 public class InlongGroupPulsarInfo extends InlongGroupMqExtBase {
 
     public InlongGroupPulsarInfo() {
-        this.setMiddlewareType(Constant.MIDDLEWARE_PULSAR);
+        this.setMiddlewareType(MQType.PULSAR.getType());
     }
 
     @ApiModelProperty(value = "Tenant name of Inlong group")
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
index eaa314f..28c06d7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
@@ -26,6 +26,7 @@ import org.apache.inlong.manager.common.beans.ClusterBean;
 import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupState;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.enums.SinkType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
@@ -97,14 +98,14 @@ public class CommonOperateService {
 
         switch (key) {
             case Constant.PULSAR_SERVICEURL: {
-                clusterEntity = getMQCluster(Constant.MIDDLEWARE_PULSAR);
+                clusterEntity = getMQCluster(MQType.PULSAR);
                 if (clusterEntity != null) {
                     result = clusterEntity.getUrl();
                 }
                 break;
             }
             case Constant.PULSAR_ADMINURL: {
-                clusterEntity = getMQCluster(Constant.MIDDLEWARE_PULSAR);
+                clusterEntity = getMQCluster(MQType.PULSAR);
                 if (clusterEntity != null) {
                     params = gson.fromJson(clusterEntity.getExtParams(), Map.class);
                     result = params.get(key);
@@ -114,7 +115,7 @@ public class CommonOperateService {
             case Constant.CLUSTER_TUBE_MANAGER:
             case Constant.CLUSTER_TUBE_CLUSTER_ID:
             case Constant.TUBE_MASTER_URL: {
-                clusterEntity = getMQCluster(Constant.MIDDLEWARE_TUBE);
+                clusterEntity = getMQCluster(MQType.TUBE);
                 if (clusterEntity != null) {
                     if (key.equals(Constant.TUBE_MASTER_URL)) {
                         result = clusterEntity.getUrl();
@@ -134,9 +135,10 @@ public class CommonOperateService {
      *
      * TODO Add data_proxy_cluster_name for query.
      *
-     * @param type Cluster type, such as TUBE, PULSAR, etc.
+     * @param type MQ type of the cluster to query, such as TUBE, PULSAR, etc.
+     * @return the MQ cluster entity, or null if no MQ cluster is found for the type
      */
-    private ThirdPartyClusterEntity getMQCluster(String type) {
+    private ThirdPartyClusterEntity getMQCluster(MQType type) {
         List<ThirdPartyClusterEntity> clusterList = thirdPartyClusterMapper.selectByType(Constant.CLUSTER_DATA_PROXY);
         if (CollectionUtils.isEmpty(clusterList)) {
             LOGGER.warn("no data proxy cluster found");
@@ -144,7 +146,7 @@ public class CommonOperateService {
         }
         String mqSetName = clusterList.get(0).getMqSetName();
         List<ThirdPartyClusterEntity> mqClusterList = thirdPartyClusterMapper.selectMQCluster(mqSetName,
-                Collections.singletonList(type));
+                Collections.singletonList(type.getType()));
         if (CollectionUtils.isEmpty(mqClusterList)) {
             LOGGER.warn("no mq cluster found by type={} and mq set name={}", type, mqSetName);
             return null;
@@ -159,7 +161,8 @@ public class CommonOperateService {
      * @return Pulsar cluster info.
      */
     public PulsarClusterInfo getPulsarClusterInfo(String type) {
-        ThirdPartyClusterEntity clusterEntity = getMQCluster(type);
+        MQType mqType = MQType.forType(type);
+        ThirdPartyClusterEntity clusterEntity = getMQCluster(mqType);
         if (clusterEntity == null || StringUtils.isBlank(clusterEntity.getExtParams())) {
             throw new BusinessException("pulsar cluster or pulsar ext params is empty");
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
index 413fbe2..7e9d48f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
@@ -26,6 +26,7 @@ import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.ConsumptionStatus;
 import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.common.CountInfo;
 import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
@@ -120,8 +121,8 @@ public class ConsumptionServiceImpl implements ConsumptionService {
 
         ConsumptionInfo info = CommonBeanUtils.copyProperties(entity, ConsumptionInfo::new);
 
-        String mqType = info.getMiddlewareType();
-        if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+        MQType mqType = MQType.forType(info.getMiddlewareType());
+        if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
             ConsumptionPulsarEntity pulsarEntity = consumptionPulsarMapper.selectByConsumptionId(info.getId());
             Preconditions.checkNotNull(pulsarEntity, "Pulsar consumption cannot be empty, as the middleware is Pulsar");
             ConsumptionPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(pulsarEntity, ConsumptionPulsarInfo::new);
@@ -151,8 +152,8 @@ public class ConsumptionServiceImpl implements ConsumptionService {
         fullConsumptionInfo(info);
         Date now = new Date();
         ConsumptionEntity entity = this.saveConsumption(info, operator, now);
-        String mqType = entity.getMiddlewareType();
-        if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+        MQType mqType = MQType.forType(entity.getMiddlewareType());
+        if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
             savePulsarInfo(info.getMqExtInfo(), entity);
         }
 
@@ -240,8 +241,8 @@ public class ConsumptionServiceImpl implements ConsumptionService {
         entity.setModifyTime(now);
 
         // Modify Pulsar consumption info
-        String mqType = info.getMiddlewareType();
-        if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+        MQType mqType = MQType.forType(info.getMiddlewareType());
+        if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
             ConsumptionPulsarEntity pulsarEntity = consumptionPulsarMapper.selectByConsumptionId(consumptionId);
             Preconditions.checkNotNull(pulsarEntity, "Pulsar consumption cannot be null");
             pulsarEntity.setConsumerGroupId(info.getConsumerGroupId());
@@ -347,10 +348,10 @@ public class ConsumptionServiceImpl implements ConsumptionService {
         }
 
         log.debug("begin to save consumption, groupId={}, topic={}, consumer group={}", groupId, topic, consumerGroup);
-        String mqType = bizInfo.getMiddlewareType();
+        MQType mqType = MQType.forType(bizInfo.getMiddlewareType());
         ConsumptionEntity entity = new ConsumptionEntity();
         entity.setInlongGroupId(groupId);
-        entity.setMiddlewareType(mqType);
+        entity.setMiddlewareType(mqType.getType());
         entity.setTopic(topic);
         entity.setConsumerGroupId(consumerGroup);
         entity.setConsumerGroupName(consumerGroup);
@@ -364,7 +365,7 @@ public class ConsumptionServiceImpl implements ConsumptionService {
 
         consumptionMapper.insert(entity);
 
-        if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+        if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
             ConsumptionPulsarEntity pulsarEntity = new ConsumptionPulsarEntity();
             pulsarEntity.setConsumptionId(entity.getId());
             pulsarEntity.setConsumerGroupId(consumerGroup);
@@ -379,8 +380,8 @@ public class ConsumptionServiceImpl implements ConsumptionService {
     private NewConsumptionProcessForm genNewConsumptionProcessForm(ConsumptionInfo consumptionInfo) {
         NewConsumptionProcessForm form = new NewConsumptionProcessForm();
         Integer id = consumptionInfo.getId();
-        String mqType = consumptionInfo.getMiddlewareType();
-        if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+        MQType mqType = MQType.forType(consumptionInfo.getMiddlewareType());
+        if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
             ConsumptionPulsarEntity consumptionPulsarEntity = consumptionPulsarMapper.selectByConsumptionId(id);
             ConsumptionPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(consumptionPulsarEntity,
                     ConsumptionPulsarInfo::new);
@@ -436,12 +437,12 @@ public class ConsumptionServiceImpl implements ConsumptionService {
         Preconditions.checkNotNull(topicVO, "inlong group not exist: " + groupId);
 
         // Tube’s topic is the inlong group level, one inlong group, one Tube topic
-        String mqType = topicVO.getMiddlewareType();
-        if (Constant.MIDDLEWARE_TUBE.equals(mqType)) {
+        MQType mqType = MQType.forType(topicVO.getMiddlewareType());
+        if (mqType == MQType.TUBE) {
             String bizTopic = topicVO.getMqResourceObj();
             Preconditions.checkTrue(bizTopic == null || bizTopic.equals(info.getTopic()),
                     "topic [" + info.getTopic() + "] not belong to inlong group " + groupId);
-        } else if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+        } else if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
             // Pulsar's topic is the inlong stream level.
             // There will be multiple inlong streams under one inlong group, and there will be multiple topics
             List<InlongStreamTopicResponse> dsTopicList = topicVO.getDsTopicList();
@@ -451,7 +452,7 @@ public class ConsumptionServiceImpl implements ConsumptionService {
                 Preconditions.checkEmpty(topicSet, "topic [" + topicSet + "] not belong to inlong group " + groupId);
             }
         }
-        info.setMiddlewareType(mqType);
+        info.setMiddlewareType(mqType.getType());
     }
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
index 8f7dca9..33db3f1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
@@ -29,6 +29,7 @@ import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupState;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupApproveRequest;
@@ -131,8 +132,8 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         groupMapper.insertSelective(entity);
         this.saveOrUpdateExt(groupId, groupInfo.getExtList());
 
-        String mqType = groupInfo.getMiddlewareType();
-        if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+        MQType mqType = MQType.forType(groupInfo.getMiddlewareType());
+        if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
             InlongGroupPulsarInfo pulsarInfo = (InlongGroupPulsarInfo) groupInfo.getMqExtInfo();
             Preconditions.checkNotNull(pulsarInfo, "Pulsar info cannot be empty, as the middleware is Pulsar");
 
@@ -187,21 +188,21 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         groupInfo.setExtList(extInfoList);
 
         // If the middleware is Pulsar, we need to encapsulate Pulsar related data
-        String mqType = entity.getMiddlewareType();
-        if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+        MQType mqType = MQType.forType(entity.getMiddlewareType());
+        if (MQType.PULSAR == mqType || MQType.TDMQ_PULSAR == mqType) {
             InlongGroupPulsarEntity pulsarEntity = groupPulsarMapper.selectByGroupId(groupId);
             Preconditions.checkNotNull(pulsarEntity, "Pulsar info not found by the groupId=" + groupId);
             InlongGroupPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(pulsarEntity, InlongGroupPulsarInfo::new);
-            pulsarInfo.setMiddlewareType(mqType);
+            pulsarInfo.setMiddlewareType(mqType.getType()); // type string, not the enum constant name
             groupInfo.setMqExtInfo(pulsarInfo);
         }
 
         // For approved inlong group, encapsulate the cluster address of the middleware
         if (GroupState.CONFIG_SUCCESSFUL == GroupState.forCode(groupInfo.getStatus())) {
-            if (Constant.MIDDLEWARE_TUBE.equalsIgnoreCase(mqType)) {
+            if (mqType == MQType.TUBE) {
                 groupInfo.setTubeMaster(commonOperateService.getSpecifiedParam(Constant.TUBE_MASTER_URL));
-            } else if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
-                PulsarClusterInfo pulsarCluster = commonOperateService.getPulsarClusterInfo(mqType);
+            } else if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
+                PulsarClusterInfo pulsarCluster = commonOperateService.getPulsarClusterInfo(mqType.getType());
                 groupInfo.setPulsarAdminUrl(pulsarCluster.getAdminUrl());
                 groupInfo.setPulsarServiceUrl(pulsarCluster.getBrokerServiceUrl());
             }
@@ -270,8 +271,8 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         this.saveOrUpdateExt(groupId, groupRequest.getExtList());
 
         // Update the Pulsar info
-        String mqType = groupRequest.getMiddlewareType();
-        if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+        MQType mqType = MQType.forType(groupRequest.getMiddlewareType());
+        if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
             InlongGroupPulsarInfo pulsarInfo = (InlongGroupPulsarInfo) groupRequest.getMqExtInfo();
             Preconditions.checkNotNull(pulsarInfo, "Pulsar info cannot be empty, as the middleware is Pulsar");
             Integer writeQuorum = pulsarInfo.getWriteQuorum();
@@ -433,14 +434,14 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         LOGGER.debug("begin to get topic by groupId={}", groupId);
         InlongGroupInfo groupInfo = this.get(groupId);
 
-        String mqType = groupInfo.getMiddlewareType();
+        MQType mqType = MQType.forType(groupInfo.getMiddlewareType());
         InlongGroupTopicResponse topicVO = new InlongGroupTopicResponse();
 
-        if (Constant.MIDDLEWARE_TUBE.equals(mqType)) {
+        if (mqType == MQType.TUBE) {
             // Tube Topic corresponds to inlong group one-to-one
             topicVO.setMqResourceObj(groupInfo.getMqResourceObj());
             topicVO.setTubeMasterUrl(commonOperateService.getSpecifiedParam(Constant.TUBE_MASTER_URL));
-        } else if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+        } else if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
             // Pulsar's topic corresponds to the inlong stream one-to-one
             topicVO.setDsTopicList(streamService.getTopicList(groupId));
             topicVO.setPulsarAdminUrl(commonOperateService.getSpecifiedParam(Constant.PULSAR_ADMINURL));
@@ -451,7 +452,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
         }
 
         topicVO.setInlongGroupId(groupId);
-        topicVO.setMiddlewareType(mqType);
+        topicVO.setMiddlewareType(mqType.getType()); // type string, not the enum constant name
         return topicVO;
     }
 
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java
index 78a6ecc..4782abb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java
@@ -31,6 +31,7 @@ import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
 import org.apache.inlong.manager.common.enums.GroupState;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterPageRequest;
 import org.apache.inlong.manager.common.pojo.cluster.ClusterRequest;
@@ -253,11 +254,11 @@ public class ThirdPartyClusterServiceImpl implements ThirdPartyClusterService {
 
             DataProxyConfig config = new DataProxyConfig();
             config.setM(groupEntity.getSchemaName());
-            String mqType = groupEntity.getMiddlewareType();
-            if (Constant.MIDDLEWARE_TUBE.equals(mqType)) {
+            MQType mqType = MQType.forType(groupEntity.getMiddlewareType());
+            if (mqType == MQType.TUBE) {
                 config.setInlongGroupId(groupId);
                 config.setTopic(bizResource);
-            } else if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+            } else if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
                 List<InlongStreamEntity> streamList = streamMapper.selectByGroupId(groupId);
                 for (InlongStreamEntity stream : streamList) {
                     String topic = stream.getMqResourceObj();
@@ -301,7 +302,8 @@ public class ThirdPartyClusterServiceImpl implements ThirdPartyClusterService {
         for (InlongGroupEntity groupEntity : groupEntityList) {
             final String groupId = groupEntity.getInlongGroupId();
             final String mqResource = groupEntity.getMqResourceObj();
-            if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+            MQType type = MQType.forType(mqType);
+            if (type == MQType.PULSAR || type == MQType.TDMQ_PULSAR) {
                 List<InlongStreamEntity> streamList = streamMapper.selectByGroupId(groupId);
                 for (InlongStreamEntity stream : streamList) {
                     DataProxyConfig topicConfig = new DataProxyConfig();
@@ -316,7 +318,7 @@ public class ThirdPartyClusterServiceImpl implements ThirdPartyClusterService {
                     topicConfig.setTopic("persistent://" + tenant + "/" + mqResource + "/" + topic);
                     topicList.add(topicConfig);
                 }
-            } else if (Constant.MIDDLEWARE_TUBE.equals(mqType)) {
+            } else if (type == MQType.TUBE) {
                 DataProxyConfig topicConfig = new DataProxyConfig();
                 topicConfig.setInlongGroupId(groupId);
                 topicConfig.setTopic(mqResource);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/PulsarEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/PulsarEventSelector.java
index 0bf990c..5d4289b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/PulsarEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/PulsarEventSelector.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.manager.service.thirdparty.mq;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
@@ -35,8 +35,8 @@ public class PulsarEventSelector implements EventSelector {
             return false;
         }
         GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
-        String mqType = form.getGroupInfo().getMiddlewareType();
-        if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+        MQType mqType = MQType.forType(form.getGroupInfo().getMiddlewareType());
+        if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
             InlongGroupPulsarInfo pulsarInfo = (InlongGroupPulsarInfo) form.getGroupInfo().getMqExtInfo();
             return pulsarInfo.getEnableCreateResource() == 1;
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/TubeEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/TubeEventSelector.java
index 90ee4f3..3b5e713 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/TubeEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/TubeEventSelector.java
@@ -18,7 +18,7 @@
 package org.apache.inlong.manager.service.thirdparty.mq;
 
 import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
@@ -36,7 +36,7 @@ public class TubeEventSelector implements EventSelector {
         }
         GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
         InlongGroupInfo groupInfo = form.getGroupInfo();
-        if (Constant.MIDDLEWARE_TUBE.equalsIgnoreCase(groupInfo.getMiddlewareType())) {
+        if (MQType.forType(groupInfo.getMiddlewareType()) == MQType.TUBE) {
             return true;
         }
         log.warn("not need to create tube resource for groupId={}, as the middleware type is {}",
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java
index d6c529e..9686c8d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.thirdparty.sort;
 
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
@@ -39,12 +39,12 @@ public class ZkDisabledEventSelector implements EventSelector {
             GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
             InlongGroupInfo groupInfo = groupResourceForm.getGroupInfo();
             return groupInfo.getZookeeperEnabled() == 0
-                    && !groupInfo.getMiddlewareType().equals(Constant.MIDDLEWARE_NONE);
+                    && MQType.forType(groupInfo.getMiddlewareType()) != MQType.NONE;
         } else if (processForm instanceof UpdateGroupProcessForm) {
             UpdateGroupProcessForm updateGroupProcessForm = (UpdateGroupProcessForm) processForm;
             InlongGroupInfo groupInfo = updateGroupProcessForm.getGroupInfo();
             return groupInfo.getZookeeperEnabled() == 0
-                    && !groupInfo.getMiddlewareType().equals(Constant.MIDDLEWARE_NONE);
+                    && MQType.forType(groupInfo.getMiddlewareType()) != MQType.NONE;
         } else {
             return false;
         }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java
index a8ee6e3..05e2b80 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.thirdparty.sort;
 
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
@@ -39,7 +39,7 @@ public class ZkEnabledEventSelector implements EventSelector {
         }
         GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
         InlongGroupInfo groupInfo = groupResourceForm.getGroupInfo();
-        return groupInfo.getZookeeperEnabled() == 1 && !groupInfo.getMiddlewareType().equals(Constant.MIDDLEWARE_NONE);
+        return groupInfo.getZookeeperEnabled() == 1 && MQType.forType(groupInfo.getMiddlewareType()) != MQType.NONE;
     }
 
 }
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
index 13a74c8..06d11c1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
@@ -20,7 +20,7 @@ package org.apache.inlong.manager.service.thirdparty.sort.util;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
 import org.apache.inlong.manager.common.beans.ClusterBean;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.enums.SourceType;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
@@ -64,13 +64,13 @@ public class SourceInfoUtils {
             ClusterBean clusterBean, InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
             SourceResponse sourceResponse, List<FieldInfo> sourceFields) {
 
-        String mqType = groupInfo.getMiddlewareType();
+        MQType mqType = MQType.forType(groupInfo.getMiddlewareType());
         DeserializationInfo deserializationInfo = SerializationUtils.createDeserialInfo(sourceResponse, streamInfo);
         SourceInfo sourceInfo;
-        if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+        if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
             sourceInfo = createPulsarSourceInfo(pulsarCluster, clusterBean, groupInfo, streamInfo, deserializationInfo,
                     sourceFields);
-        } else if (Constant.MIDDLEWARE_TUBE.equals(mqType)) {
+        } else if (mqType == MQType.TUBE) {
             // InlongGroupInfo groupInfo, String masterAddress,
             sourceInfo = createTubeSourceInfo(groupInfo, masterAddress, clusterBean, deserializationInfo, sourceFields);
         } else {
@@ -100,7 +100,7 @@ public class SourceInfoUtils {
         FieldInfo[] fieldInfosArr = fieldInfos.toArray(new FieldInfo[0]);
 
         String type = pulsarCluster.getType();
-        if (StringUtils.isNotEmpty(type) && Constant.MIDDLEWARE_TDMQ_PULSAR.equals(type)) {
+        if (StringUtils.isNotEmpty(type) && MQType.forType(type) == MQType.TDMQ_PULSAR) {
             return new TDMQPulsarSourceInfo(pulsarCluster.getBrokerServiceUrl(),
                     fullTopicName, consumerGroup, pulsarCluster.getToken(), deserializationInfo, fieldInfosArr);
         } else {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
index 7f0b49d..d6d842d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
@@ -20,8 +20,8 @@ package org.apache.inlong.manager.service.workflow.consumption.listener;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
 import org.apache.inlong.manager.common.beans.ClusterBean;
-import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.ConsumptionStatus;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
@@ -84,11 +84,11 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
             throw new WorkflowListenerException("consumption not exits for id=" + consumptionId);
         }
 
-        String mqType = entity.getMiddlewareType();
-        if (Constant.MIDDLEWARE_TUBE.equals(mqType)) {
+        MQType mqType = MQType.forType(entity.getMiddlewareType());
+        if (mqType == MQType.TUBE) {
             this.createTubeConsumerGroup(entity);
             return ListenerResult.success("Create Tube consumer group successful");
-        } else if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+        } else if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
             this.createPulsarTopicMessage(entity);
         } else {
             throw new WorkflowListenerException("middleware type [" + mqType + "] not supported");
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
index 8e879ec..e5c106d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.workflow.stream;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.service.sink.StreamSinkService;
 import org.apache.inlong.manager.service.thirdparty.hive.CreateHiveTableForStreamListener;
@@ -89,8 +90,8 @@ public class CreateStreamWorkflowDefinition implements WorkflowDefinition {
         ServiceTask createPulsarTopicTask = new ServiceTask();
         createPulsarTopicTask.setSkipResolver(c -> {
             GroupResourceProcessForm form = (GroupResourceProcessForm) c.getProcessForm();
-            String mqType = form.getGroupInfo().getMiddlewareType();
-            if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+            MQType mqType = MQType.forType(form.getGroupInfo().getMiddlewareType());
+            if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
                 return false;
             }
             log.warn("no need to create pulsar topic for groupId={}, streamId={}, as the middlewareType={}",
@@ -105,8 +106,8 @@ public class CreateStreamWorkflowDefinition implements WorkflowDefinition {
         ServiceTask createPulsarSubscriptionGroupTask = new ServiceTask();
         createPulsarSubscriptionGroupTask.setSkipResolver(c -> {
             GroupResourceProcessForm form = (GroupResourceProcessForm) c.getProcessForm();
-            String mqType = form.getGroupInfo().getMiddlewareType();
-            if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+            MQType mqType = MQType.forType(form.getGroupInfo().getMiddlewareType());
+            if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
                 return false;
             }
             log.warn("no need to create pulsar subscription for groupId={}, streamId={}, as the middlewareType={}",
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceTest.java
index 975c24e..a7f5832 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceTest.java
@@ -17,7 +17,7 @@
 
 package org.apache.inlong.manager.service.core.impl;
 
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
 import org.apache.inlong.manager.common.pojo.consumption.ConsumptionPulsarInfo;
 import org.apache.inlong.manager.service.ServiceBaseTest;
@@ -41,11 +41,11 @@ public class ConsumptionServiceTest extends ServiceBaseTest {
         consumptionInfo.setTopic(inlongGroup);
         consumptionInfo.setConsumerGroupName(consumerGroup);
         consumptionInfo.setInlongGroupId("b_" + inlongGroup);
-        consumptionInfo.setMiddlewareType(Constant.MIDDLEWARE_PULSAR);
+        consumptionInfo.setMiddlewareType(MQType.PULSAR.getType());
         consumptionInfo.setCreator(operator);
 
         ConsumptionPulsarInfo pulsarInfo = new ConsumptionPulsarInfo();
-        pulsarInfo.setMiddlewareType(Constant.MIDDLEWARE_PULSAR);
+        pulsarInfo.setMiddlewareType(MQType.PULSAR.getType());
         pulsarInfo.setIsDlq(1);
         pulsarInfo.setDeadLetterTopic("test_dlq");
         pulsarInfo.setIsRlq(0);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperationTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperationTest.java
index d5ae35b..4e5cbda 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperationTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperationTest.java
@@ -17,9 +17,9 @@
 
 package org.apache.inlong.manager.service.core.impl;
 
-import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.EntityStatus;
 import org.apache.inlong.manager.common.enums.GroupState;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.enums.ProcessStatus;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
@@ -64,7 +64,7 @@ public class InlongGroupProcessOperationTest extends ServiceBaseTest {
         groupInfo.setInlongGroupId(GROUP_ID);
         groupInfo.setName(GROUP_NAME);
         groupInfo.setInCharges(OPERATOR);
-        groupInfo.setMiddlewareType(Constant.MIDDLEWARE_PULSAR);
+        groupInfo.setMiddlewareType(MQType.PULSAR.getType());
         InlongGroupPulsarInfo pulsarInfo = new InlongGroupPulsarInfo();
         pulsarInfo.setInlongGroupId(GROUP_ID);
         groupInfo.setMqExtInfo(pulsarInfo);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceTest.java
index 464e8ed..6a036e8 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceTest.java
@@ -17,10 +17,8 @@
 
 package org.apache.inlong.manager.service.core.impl;
 
-import java.util.Arrays;
-import java.util.List;
-import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.EntityStatus;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
@@ -32,6 +30,9 @@ import org.junit.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.TestComponent;
 
+import java.util.Arrays;
+import java.util.List;
+
 /**
  * Inlong group service test
  */
@@ -63,13 +64,13 @@ public class InlongGroupServiceTest {
 
         groupInfo = new InlongGroupInfo();
         groupInfo.setName(groupName);
-        groupInfo.setMiddlewareType(Constant.MIDDLEWARE_PULSAR);
+        groupInfo.setMiddlewareType(MQType.PULSAR.getType());
         groupInfo.setCreator(operator);
         groupInfo.setInCharges(operator);
         groupInfo.setStatus(EntityStatus.GROUP_CONFIG_SUCCESSFUL.getCode());
 
         InlongGroupPulsarInfo pulsarInfo = new InlongGroupPulsarInfo();
-        pulsarInfo.setMiddlewareType(Constant.MIDDLEWARE_PULSAR);
+        pulsarInfo.setMiddlewareType(MQType.PULSAR.getType());
         pulsarInfo.setEnsemble(3);
         pulsarInfo.setWriteQuorum(3);
         pulsarInfo.setAckQuorum(2);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/ServiceTaskListenerFactoryTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/ServiceTaskListenerFactoryTest.java
index 2e32e1b..3c6454b 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/ServiceTaskListenerFactoryTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/ServiceTaskListenerFactoryTest.java
@@ -17,8 +17,7 @@
 
 package org.apache.inlong.manager.service.workflow;
 
-import java.util.List;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
 import org.apache.inlong.manager.service.ServiceBaseTest;
@@ -32,6 +31,8 @@ import org.junit.Assert;
 import org.junit.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 
+import java.util.List;
+
 public class ServiceTaskListenerFactoryTest extends ServiceBaseTest {
 
     @Autowired
@@ -43,7 +44,7 @@ public class ServiceTaskListenerFactoryTest extends ServiceBaseTest {
         GroupResourceProcessForm processForm = new GroupResourceProcessForm();
         InlongGroupInfo groupInfo = new InlongGroupInfo();
         //check pulsar listener
-        groupInfo.setMiddlewareType(Constant.MIDDLEWARE_PULSAR);
+        groupInfo.setMiddlewareType(MQType.PULSAR.getType());
         processForm.setGroupInfo(groupInfo);
         context.setProcessForm(processForm);
         List<QueueOperateListener> queueOperateListeners = serviceTaskListenerFactory.getQueueOperateListener(context);
@@ -52,7 +53,7 @@ public class ServiceTaskListenerFactoryTest extends ServiceBaseTest {
         Assert.assertTrue(queueOperateListeners.get(1) instanceof CreatePulsarGroupTaskListener);
 
         // check tube listener
-        groupInfo.setMiddlewareType(Constant.MIDDLEWARE_TUBE);
+        groupInfo.setMiddlewareType(MQType.TUBE.getType());
         queueOperateListeners = serviceTaskListenerFactory.getQueueOperateListener(context);
         Assert.assertEquals(2, queueOperateListeners.size());
         Assert.assertTrue(queueOperateListeners.get(0) instanceof CreateTubeTopicTaskListener);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
index ebec742..a2347eb 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
@@ -19,14 +19,14 @@ package org.apache.inlong.manager.service.workflow;
 
 import com.github.pagehelper.PageInfo;
 import com.google.common.collect.Lists;
-import org.apache.inlong.manager.common.enums.Constant;
 import org.apache.inlong.manager.common.enums.GroupState;
+import org.apache.inlong.manager.common.enums.MQType;
 import org.apache.inlong.manager.common.enums.ProcessStatus;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamRequest;
 import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamRequest;
 import org.apache.inlong.manager.common.pojo.workflow.ProcessResponse;
 import org.apache.inlong.manager.common.pojo.workflow.TaskExecuteLogQuery;
 import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
@@ -229,7 +229,7 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
 
     @Test
     public void testStartCreatePulsarWorkflow() {
-        initGroupForm(Constant.MIDDLEWARE_PULSAR);
+        initGroupForm(MQType.PULSAR.getType());
         mockTaskListenerFactory();
         WorkflowContext context = workflowEngine.processService().start(processName.name(), applicant, form);
         WorkflowResult result = WorkflowBeanUtils.result(context);
@@ -247,7 +247,7 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
 
     @Test
     public void testStartCreateTubeWorkflow() {
-        initGroupForm(Constant.MIDDLEWARE_TUBE);
+        initGroupForm(MQType.TUBE.getType());
         mockTaskListenerFactory();
         WorkflowContext context = workflowEngine.processService().start(processName.name(), applicant, form);
         WorkflowResult result = WorkflowBeanUtils.result(context);
@@ -266,7 +266,7 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
 
     @Test
     public void testSuspendProcess() {
-        InlongGroupInfo groupInfo = initGroupForm(Constant.MIDDLEWARE_PULSAR);
+        InlongGroupInfo groupInfo = initGroupForm(MQType.PULSAR.getType());
         groupInfo.setStatus(GroupState.CONFIG_SUCCESSFUL.getCode());
         groupService.update(groupInfo.genRequest(), OPERATOR);
         UpdateGroupProcessForm form = new UpdateGroupProcessForm();
@@ -293,7 +293,7 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
 
     @Test
     public void testRestartProcess() {
-        InlongGroupInfo groupInfo = initGroupForm(Constant.MIDDLEWARE_PULSAR);
+        InlongGroupInfo groupInfo = initGroupForm(MQType.PULSAR.getType());
         groupInfo.setStatus(GroupState.CONFIG_SUCCESSFUL.getCode());
         groupService.update(groupInfo.genRequest(), OPERATOR);
         groupInfo.setStatus(GroupState.SUSPENDED.getCode());
@@ -324,7 +324,7 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
 
     @Test
     public void testStopProcess() {
-        InlongGroupInfo groupInfo = initGroupForm(Constant.MIDDLEWARE_PULSAR);
+        InlongGroupInfo groupInfo = initGroupForm(MQType.PULSAR.getType());
         groupInfo.setStatus(GroupState.CONFIG_SUCCESSFUL.getCode());
         groupService.update(groupInfo.genRequest(), OPERATOR);
         groupInfo.setStatus(GroupState.SUSPENDED.getCode());