You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/03/28 10:45:36 UTC
[incubator-inlong] branch master updated: [INLONG-3389][Manager] Remove deprecated constants of MQ type (#3400)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new daa515d [INLONG-3389][Manager] Remove deprecated constants of MQ type (#3400)
daa515d is described below
commit daa515dab3044d713eb8a4fc08d67948b480b049
Author: kipshi <48...@users.noreply.github.com>
AuthorDate: Mon Mar 28 18:45:29 2022 +0800
[INLONG-3389][Manager] Remove deprecated constants of MQ type (#3400)
---
.../inlong/manager/client/api/InlongGroupConf.java | 2 +-
.../api/{MqBaseConf.java => MQBaseConf.java} | 12 ++++-----
.../inlong/manager/client/api/PulsarBaseConf.java | 6 ++---
.../manager/client/api/TdmqPulsarBaseConf.java | 4 +--
.../inlong/manager/client/api/TubeBaseConf.java | 6 ++---
.../client/api/util/InlongGroupTransfer.java | 18 ++++++-------
.../manager/client/api/util/InlongParser.java | 12 ++++-----
.../inlong/manager/common/enums/Constant.java | 5 ----
.../common/enums/{MqType.java => MQType.java} | 26 ++++++++++++------
.../pojo/consumption/ConsumptionMqExtBase.java | 8 +++---
.../pojo/consumption/ConsumptionPulsarInfo.java | 7 ++---
.../common/pojo/group/InlongGroupMqExtBase.java | 5 ++--
.../common/pojo/group/InlongGroupPulsarInfo.java | 4 +--
.../manager/service/CommonOperateService.java | 17 +++++++-----
.../service/core/impl/ConsumptionServiceImpl.java | 31 +++++++++++-----------
.../service/core/impl/InlongGroupServiceImpl.java | 29 ++++++++++----------
.../core/impl/ThirdPartyClusterServiceImpl.java | 12 +++++----
.../service/thirdparty/mq/PulsarEventSelector.java | 6 ++---
.../service/thirdparty/mq/TubeEventSelector.java | 4 +--
.../thirdparty/sort/ZkDisabledEventSelector.java | 6 ++---
.../thirdparty/sort/ZkEnabledEventSelector.java | 4 +--
.../thirdparty/sort/util/SourceInfoUtils.java | 10 +++----
.../ConsumptionCompleteProcessListener.java | 8 +++---
.../stream/CreateStreamWorkflowDefinition.java | 9 ++++---
.../service/core/impl/ConsumptionServiceTest.java | 6 ++---
.../core/impl/InlongGroupProcessOperationTest.java | 4 +--
.../service/core/impl/InlongGroupServiceTest.java | 11 ++++----
.../workflow/ServiceTaskListenerFactoryTest.java | 9 ++++---
.../service/workflow/WorkflowServiceImplTest.java | 14 +++++-----
29 files changed, 153 insertions(+), 142 deletions(-)
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupConf.java
index 4ab7ff8..bef4dcc 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/InlongGroupConf.java
@@ -39,7 +39,7 @@ public class InlongGroupConf {
private String description;
@ApiModelProperty("Message queue configuration")
- private MqBaseConf mqBaseConf;
+ private MQBaseConf mqBaseConf;
@ApiModelProperty("Sort configuration")
private SortBaseConf sortBaseConf;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MQBaseConf.java
similarity index 82%
rename from inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
rename to inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MQBaseConf.java
index d3993b2..950e2ee 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MqBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/MQBaseConf.java
@@ -20,18 +20,18 @@ package org.apache.inlong.manager.client.api;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
-import org.apache.inlong.manager.common.enums.MqType;
+import org.apache.inlong.manager.common.enums.MQType;
import java.io.Serializable;
@Data
@ApiModel("Base configuration for message queue")
-public abstract class MqBaseConf implements Serializable {
+public abstract class MQBaseConf implements Serializable {
- public static final MqBaseConf BLANK_MQ_CONF = new MqBaseConf() {
+ public static final MQBaseConf BLANK_MQ_CONF = new MQBaseConf() {
@Override
- public MqType getType() {
- return MqType.NONE;
+ public MQType getType() {
+ return MQType.NONE;
}
};
@@ -41,5 +41,5 @@ public abstract class MqBaseConf implements Serializable {
@ApiModelProperty("Is need create for mq resources")
private boolean enableCreateResource = true;
- public abstract MqType getType();
+ public abstract MQType getType();
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/PulsarBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/PulsarBaseConf.java
index dc05abc..bce79e3 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/PulsarBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/PulsarBaseConf.java
@@ -24,17 +24,17 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
import org.apache.inlong.manager.client.api.auth.Authentication;
-import org.apache.inlong.manager.common.enums.MqType;
+import org.apache.inlong.manager.common.enums.MQType;
@Data
@EqualsAndHashCode(callSuper = true)
@AllArgsConstructor
@NoArgsConstructor
@ApiModel("Base configuration for Pulsar")
-public class PulsarBaseConf extends MqBaseConf {
+public class PulsarBaseConf extends MQBaseConf {
@ApiModelProperty("Message queue type")
- private MqType type = MqType.PULSAR;
+ private MQType type = MQType.PULSAR;
@ApiModelProperty("Pulsar admin URL")
private String pulsarAdminUrl;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TdmqPulsarBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TdmqPulsarBaseConf.java
index ae8d231..151df13 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TdmqPulsarBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TdmqPulsarBaseConf.java
@@ -22,7 +22,7 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.NoArgsConstructor;
-import org.apache.inlong.manager.common.enums.MqType;
+import org.apache.inlong.manager.common.enums.MQType;
@Data
@EqualsAndHashCode(callSuper = true)
@@ -31,6 +31,6 @@ import org.apache.inlong.manager.common.enums.MqType;
public class TdmqPulsarBaseConf extends PulsarBaseConf {
@ApiModelProperty("Message queue type")
- private MqType type = MqType.TDMQ_PULSAR;
+ private MQType type = MQType.TDMQ_PULSAR;
}
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TubeBaseConf.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TubeBaseConf.java
index a16ff2b..7a8dccf 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TubeBaseConf.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/TubeBaseConf.java
@@ -22,16 +22,16 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
-import org.apache.inlong.manager.common.enums.MqType;
+import org.apache.inlong.manager.common.enums.MQType;
@Data
@AllArgsConstructor
@NoArgsConstructor
@ApiModel("Base configuration for Tube")
-public class TubeBaseConf extends MqBaseConf {
+public class TubeBaseConf extends MQBaseConf {
@ApiModelProperty("Message queue type")
- private MqType type = MqType.TUBE;
+ private MQType type = MQType.TUBE;
@ApiModelProperty("Tube manager URL")
private String tubeManagerUrl;
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
index 8057967..fec990e 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongGroupTransfer.java
@@ -24,7 +24,7 @@ import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.client.api.FlinkSortBaseConf;
import org.apache.inlong.manager.client.api.InlongGroupConf;
-import org.apache.inlong.manager.client.api.MqBaseConf;
+import org.apache.inlong.manager.client.api.MQBaseConf;
import org.apache.inlong.manager.client.api.PulsarBaseConf;
import org.apache.inlong.manager.client.api.SortBaseConf;
import org.apache.inlong.manager.client.api.SortBaseConf.SortType;
@@ -34,7 +34,7 @@ import org.apache.inlong.manager.client.api.auth.Authentication;
import org.apache.inlong.manager.client.api.auth.Authentication.AuthType;
import org.apache.inlong.manager.client.api.auth.SecretTokenAuthentication;
import org.apache.inlong.manager.client.api.auth.TokenAuthentication;
-import org.apache.inlong.manager.common.enums.MqType;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupMqExtBase;
@@ -63,16 +63,16 @@ public class InlongGroupTransfer {
return inlongGroupConf;
}
- public static MqBaseConf parseMqBaseConf(InlongGroupResponse inlongGroupResponse) {
+ public static MQBaseConf parseMqBaseConf(InlongGroupResponse inlongGroupResponse) {
InlongGroupMqExtBase mqExtBase = inlongGroupResponse.getMqExtInfo();
if (null == mqExtBase || StringUtils.isBlank(mqExtBase.getMiddlewareType())) {
return null;
}
String middleWare = mqExtBase.getMiddlewareType();
- MqType mqType = MqType.forType(middleWare);
+ MQType mqType = MQType.forType(middleWare);
switch (mqType) {
case NONE:
- return MqBaseConf.BLANK_MQ_CONF;
+ return MQBaseConf.BLANK_MQ_CONF;
case PULSAR:
case TDMQ_PULSAR:
return parsePulsarConf(inlongGroupResponse);
@@ -197,8 +197,8 @@ public class InlongGroupTransfer {
groupInfo.setPeakRecords(groupConf.getPeakRecords().intValue());
groupInfo.setMaxLength(groupConf.getMaxLength());
groupInfo.setProxyClusterId(groupConf.getProxyClusterId());
- MqBaseConf mqConf = groupConf.getMqBaseConf();
- MqType mqType = MqType.NONE;
+ MQBaseConf mqConf = groupConf.getMqBaseConf();
+ MQType mqType = MQType.NONE;
if (null != mqConf) {
mqType = mqConf.getType();
groupInfo.setMiddlewareType(mqType.name());
@@ -206,7 +206,7 @@ public class InlongGroupTransfer {
groupInfo.setInCharges(groupConf.getOperator());
groupInfo.setExtList(Lists.newArrayList());
groupInfo.setCreator(groupConf.getOperator());
- if (mqType == MqType.PULSAR || mqType == MqType.TDMQ_PULSAR) {
+ if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
PulsarBaseConf pulsarBaseConf = (PulsarBaseConf) mqConf;
groupInfo.setMqResourceObj(pulsarBaseConf.getNamespace());
InlongGroupPulsarInfo pulsarInfo = createPulsarInfo(pulsarBaseConf);
@@ -214,7 +214,7 @@ public class InlongGroupTransfer {
List<InlongGroupExtInfo> extInfos = createPulsarExtInfo(pulsarBaseConf);
groupInfo.getExtList().addAll(extInfos);
groupInfo.setTopicPartitionNum(pulsarBaseConf.getTopicPartitionNum());
- } else if (mqType == MqType.TUBE) {
+ } else if (mqType == MQType.TUBE) {
TubeBaseConf tubeBaseConf = (TubeBaseConf) mqConf;
List<InlongGroupExtInfo> extInfos = createTubeExtInfo(tubeBaseConf);
groupInfo.setMqResourceObj(tubeBaseConf.getGroupName());
diff --git a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
index 4a61ca7..4042fa1 100644
--- a/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
+++ b/inlong-manager/manager-client/src/main/java/org/apache/inlong/manager/client/api/util/InlongParser.java
@@ -24,7 +24,7 @@ import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.inlong.manager.common.beans.Response;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupApproveRequest;
@@ -85,9 +85,8 @@ public class InlongParser {
InlongGroupResponse inlongGroupResponse = GsonUtil.fromJson(GsonUtil.toJson(data), InlongGroupResponse.class);
JsonObject mqExtInfo = groupJson.getAsJsonObject(MQ_EXT_INFO);
if (mqExtInfo != null && mqExtInfo.get(MIDDLEWARE_TYPE) != null) {
- String middlewareType = mqExtInfo.get(MIDDLEWARE_TYPE).getAsString();
- if (Constant.MIDDLEWARE_PULSAR.equals(middlewareType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(
- middlewareType)) {
+ MQType mqType = MQType.forType(mqExtInfo.get(MIDDLEWARE_TYPE).getAsString());
+ if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
InlongGroupPulsarInfo pulsarInfo = GsonUtil.fromJson(mqExtInfo.toString(), InlongGroupPulsarInfo.class);
inlongGroupResponse.setMqExtInfo(pulsarInfo);
}
@@ -220,9 +219,8 @@ public class InlongParser {
InlongGroupApproveRequest.class);
JsonObject mqExtInfo = groupJson.getAsJsonObject(MQ_EXT_INFO);
if (mqExtInfo != null && mqExtInfo.get(MIDDLEWARE_TYPE) != null) {
- String middlewareType = mqExtInfo.get(MIDDLEWARE_TYPE).getAsString();
- if (Constant.MIDDLEWARE_PULSAR.equals(middlewareType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(
- middlewareType)) {
+ MQType mqType = MQType.forType(mqExtInfo.get(MIDDLEWARE_TYPE).getAsString());
+ if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
InlongGroupPulsarInfo pulsarInfo = GsonUtil.fromJson(mqExtInfo.toString(), InlongGroupPulsarInfo.class);
groupApproveInfo.setAckQuorum(pulsarInfo.getAckQuorum());
groupApproveInfo.setEnsemble(pulsarInfo.getEnsemble());
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
index 2701a5a..643a0cc 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/Constant.java
@@ -62,11 +62,6 @@ public class Constant {
public static final String FILE_FORMAT_PARQUET = "Parquet";
- public static final String MIDDLEWARE_TUBE = "TUBE";
- public static final String MIDDLEWARE_PULSAR = "PULSAR";
- public static final String MIDDLEWARE_TDMQ_PULSAR = "TDMQ_PULSAR";
- public static final String MIDDLEWARE_NONE = "NONE";
-
public static final String SCHEMA_M0_DAY = "m0_day";
public static final String CLUSTER_TUBE = "TUBE";
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MqType.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MQType.java
similarity index 73%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MqType.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MQType.java
index e1d27c1..855f740 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MqType.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/enums/MQType.java
@@ -17,15 +17,25 @@
package org.apache.inlong.manager.common.enums;
-public enum MqType {
- PULSAR,
- TUBE,
- TDMQ_PULSAR,
- NONE;
+import lombok.Getter;
- public static MqType forType(String type) {
- for (MqType mqType : values()) {
- if (mqType.name().equals(type)) {
+public enum MQType {
+
+ PULSAR("PULSAR"),
+ TUBE("TUBE"),
+ TDMQ_PULSAR("TDMQ_PULSAR"),
+ NONE("NONE");
+
+ @Getter
+ private String type;
+
+ MQType(String type) {
+ this.type = type;
+ }
+
+ public static MQType forType(String type) {
+ for (MQType mqType : values()) {
+ if (mqType.getType().equals(type)) {
return mqType;
}
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionMqExtBase.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionMqExtBase.java
index 0f0784f..a49a1e2 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionMqExtBase.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionMqExtBase.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
-import org.apache.inlong.manager.common.enums.Constant;
/**
* Extended consumption information of different MQs
@@ -32,8 +31,8 @@ import org.apache.inlong.manager.common.enums.Constant;
@ApiModel("Extended consumption information of different MQs")
@JsonTypeInfo(use = Id.NAME, visible = true, property = "middlewareType", defaultImpl = ConsumptionMqExtBase.class)
@JsonSubTypes({
- @JsonSubTypes.Type(value = ConsumptionPulsarInfo.class, name = Constant.MIDDLEWARE_PULSAR),
- @JsonSubTypes.Type(value = ConsumptionPulsarInfo.class, name = Constant.MIDDLEWARE_TDMQ_PULSAR)
+ @JsonSubTypes.Type(value = ConsumptionPulsarInfo.class, name = "PULSAR"),
+ @JsonSubTypes.Type(value = ConsumptionPulsarInfo.class, name = "TDMQ_PULSAR")
})
public class ConsumptionMqExtBase {
@@ -52,7 +51,6 @@ public class ConsumptionMqExtBase {
@ApiModelProperty("Whether to delete, 0: not deleted, 1: deleted")
private Integer isDeleted = 0;
- @ApiModelProperty(value = "Middleware type, high throughput: TUBE, high consistency: PULSAR")
+ @ApiModelProperty("The middleware type of MQ")
private String middlewareType;
-
}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionPulsarInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionPulsarInfo.java
index 22c2a98..3c49ba6 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionPulsarInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/consumption/ConsumptionPulsarInfo.java
@@ -22,7 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.MQType;
/**
* Pulsar consumer information
@@ -33,8 +33,9 @@ import org.apache.inlong.manager.common.enums.Constant;
@ApiModel("Pulsar consumer information")
public class ConsumptionPulsarInfo extends ConsumptionMqExtBase {
- @ApiModelProperty("The middleware type of MQ")
- private String middlewareType = Constant.MIDDLEWARE_PULSAR;
+ public ConsumptionPulsarInfo() {
+ this.setMiddlewareType(MQType.PULSAR.getType());
+ }
@ApiModelProperty("Whether to configure the dead letter queue, 0: do not configure, 1: configure")
private Integer isDlq;
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupMqExtBase.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupMqExtBase.java
index 8fdd587..3ad4b1c 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupMqExtBase.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupMqExtBase.java
@@ -23,7 +23,6 @@ import com.fasterxml.jackson.annotation.JsonTypeInfo.Id;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
-import org.apache.inlong.manager.common.enums.Constant;
/**
* Extended inlong group info of different MQs
@@ -32,8 +31,8 @@ import org.apache.inlong.manager.common.enums.Constant;
@ApiModel("Extended inlong group info of different MQs")
@JsonTypeInfo(use = Id.NAME, visible = true, property = "middlewareType", defaultImpl = InlongGroupMqExtBase.class)
@JsonSubTypes({
- @JsonSubTypes.Type(value = InlongGroupPulsarInfo.class, name = Constant.MIDDLEWARE_PULSAR),
- @JsonSubTypes.Type(value = InlongGroupPulsarInfo.class, name = Constant.MIDDLEWARE_TDMQ_PULSAR)
+ @JsonSubTypes.Type(value = InlongGroupPulsarInfo.class, name = "PULSAR"),
+ @JsonSubTypes.Type(value = InlongGroupPulsarInfo.class, name = "TDMQ_PULSAR")
})
public class InlongGroupMqExtBase {
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupPulsarInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupPulsarInfo.java
index 66dfb6f..7663815 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupPulsarInfo.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/group/InlongGroupPulsarInfo.java
@@ -22,7 +22,7 @@ import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.MQType;
/**
* Inlong group information for Pulsar
@@ -34,7 +34,7 @@ import org.apache.inlong.manager.common.enums.Constant;
public class InlongGroupPulsarInfo extends InlongGroupMqExtBase {
public InlongGroupPulsarInfo() {
- this.setMiddlewareType(Constant.MIDDLEWARE_PULSAR);
+ this.setMiddlewareType(MQType.PULSAR.getType());
}
@ApiModelProperty(value = "Tenant name of Inlong group")
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
index eaa314f..28c06d7 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
@@ -26,6 +26,7 @@ import org.apache.inlong.manager.common.beans.ClusterBean;
import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupState;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.enums.SinkType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
@@ -97,14 +98,14 @@ public class CommonOperateService {
switch (key) {
case Constant.PULSAR_SERVICEURL: {
- clusterEntity = getMQCluster(Constant.MIDDLEWARE_PULSAR);
+ clusterEntity = getMQCluster(MQType.PULSAR);
if (clusterEntity != null) {
result = clusterEntity.getUrl();
}
break;
}
case Constant.PULSAR_ADMINURL: {
- clusterEntity = getMQCluster(Constant.MIDDLEWARE_PULSAR);
+ clusterEntity = getMQCluster(MQType.PULSAR);
if (clusterEntity != null) {
params = gson.fromJson(clusterEntity.getExtParams(), Map.class);
result = params.get(key);
@@ -114,7 +115,7 @@ public class CommonOperateService {
case Constant.CLUSTER_TUBE_MANAGER:
case Constant.CLUSTER_TUBE_CLUSTER_ID:
case Constant.TUBE_MASTER_URL: {
- clusterEntity = getMQCluster(Constant.MIDDLEWARE_TUBE);
+ clusterEntity = getMQCluster(MQType.PULSAR);
if (clusterEntity != null) {
if (key.equals(Constant.TUBE_MASTER_URL)) {
result = clusterEntity.getUrl();
@@ -134,9 +135,10 @@ public class CommonOperateService {
*
* TODO Add data_proxy_cluster_name for query.
*
- * @param type Cluster type, such as TUBE, PULSAR, etc.
+ * @param type
+ * @return
*/
- private ThirdPartyClusterEntity getMQCluster(String type) {
+ private ThirdPartyClusterEntity getMQCluster(MQType type) {
List<ThirdPartyClusterEntity> clusterList = thirdPartyClusterMapper.selectByType(Constant.CLUSTER_DATA_PROXY);
if (CollectionUtils.isEmpty(clusterList)) {
LOGGER.warn("no data proxy cluster found");
@@ -144,7 +146,7 @@ public class CommonOperateService {
}
String mqSetName = clusterList.get(0).getMqSetName();
List<ThirdPartyClusterEntity> mqClusterList = thirdPartyClusterMapper.selectMQCluster(mqSetName,
- Collections.singletonList(type));
+ Collections.singletonList(type.getType()));
if (CollectionUtils.isEmpty(mqClusterList)) {
LOGGER.warn("no mq cluster found by type={} and mq set name={}", type, mqSetName);
return null;
@@ -159,7 +161,8 @@ public class CommonOperateService {
* @return Pulsar cluster info.
*/
public PulsarClusterInfo getPulsarClusterInfo(String type) {
- ThirdPartyClusterEntity clusterEntity = getMQCluster(type);
+ MQType mqType = MQType.forType(type);
+ ThirdPartyClusterEntity clusterEntity = getMQCluster(mqType);
if (clusterEntity == null || StringUtils.isBlank(clusterEntity.getExtParams())) {
throw new BusinessException("pulsar cluster or pulsar ext params is empty");
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
index 413fbe2..7e9d48f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
@@ -26,6 +26,7 @@ import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.ConsumptionStatus;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.common.CountInfo;
import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
@@ -120,8 +121,8 @@ public class ConsumptionServiceImpl implements ConsumptionService {
ConsumptionInfo info = CommonBeanUtils.copyProperties(entity, ConsumptionInfo::new);
- String mqType = info.getMiddlewareType();
- if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+ MQType mqType = MQType.forType(info.getMiddlewareType());
+ if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
ConsumptionPulsarEntity pulsarEntity = consumptionPulsarMapper.selectByConsumptionId(info.getId());
Preconditions.checkNotNull(pulsarEntity, "Pulsar consumption cannot be empty, as the middleware is Pulsar");
ConsumptionPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(pulsarEntity, ConsumptionPulsarInfo::new);
@@ -151,8 +152,8 @@ public class ConsumptionServiceImpl implements ConsumptionService {
fullConsumptionInfo(info);
Date now = new Date();
ConsumptionEntity entity = this.saveConsumption(info, operator, now);
- String mqType = entity.getMiddlewareType();
- if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+ MQType mqType = MQType.forType(entity.getMiddlewareType());
+ if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
savePulsarInfo(info.getMqExtInfo(), entity);
}
@@ -240,8 +241,8 @@ public class ConsumptionServiceImpl implements ConsumptionService {
entity.setModifyTime(now);
// Modify Pulsar consumption info
- String mqType = info.getMiddlewareType();
- if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+ MQType mqType = MQType.forType(info.getMiddlewareType());
+ if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
ConsumptionPulsarEntity pulsarEntity = consumptionPulsarMapper.selectByConsumptionId(consumptionId);
Preconditions.checkNotNull(pulsarEntity, "Pulsar consumption cannot be null");
pulsarEntity.setConsumerGroupId(info.getConsumerGroupId());
@@ -347,10 +348,10 @@ public class ConsumptionServiceImpl implements ConsumptionService {
}
log.debug("begin to save consumption, groupId={}, topic={}, consumer group={}", groupId, topic, consumerGroup);
- String mqType = bizInfo.getMiddlewareType();
+ MQType mqType = MQType.forType(bizInfo.getMiddlewareType());
ConsumptionEntity entity = new ConsumptionEntity();
entity.setInlongGroupId(groupId);
- entity.setMiddlewareType(mqType);
+ entity.setMiddlewareType(mqType.getType());
entity.setTopic(topic);
entity.setConsumerGroupId(consumerGroup);
entity.setConsumerGroupName(consumerGroup);
@@ -364,7 +365,7 @@ public class ConsumptionServiceImpl implements ConsumptionService {
consumptionMapper.insert(entity);
- if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+ if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
ConsumptionPulsarEntity pulsarEntity = new ConsumptionPulsarEntity();
pulsarEntity.setConsumptionId(entity.getId());
pulsarEntity.setConsumerGroupId(consumerGroup);
@@ -379,8 +380,8 @@ public class ConsumptionServiceImpl implements ConsumptionService {
private NewConsumptionProcessForm genNewConsumptionProcessForm(ConsumptionInfo consumptionInfo) {
NewConsumptionProcessForm form = new NewConsumptionProcessForm();
Integer id = consumptionInfo.getId();
- String mqType = consumptionInfo.getMiddlewareType();
- if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+ MQType mqType = MQType.forType(consumptionInfo.getMiddlewareType());
+ if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
ConsumptionPulsarEntity consumptionPulsarEntity = consumptionPulsarMapper.selectByConsumptionId(id);
ConsumptionPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(consumptionPulsarEntity,
ConsumptionPulsarInfo::new);
@@ -436,12 +437,12 @@ public class ConsumptionServiceImpl implements ConsumptionService {
Preconditions.checkNotNull(topicVO, "inlong group not exist: " + groupId);
// Tube’s topic is the inlong group level, one inlong group, one Tube topic
- String mqType = topicVO.getMiddlewareType();
- if (Constant.MIDDLEWARE_TUBE.equals(mqType)) {
+ MQType mqType = MQType.forType(topicVO.getMiddlewareType());
+ if (mqType == MQType.TUBE) {
String bizTopic = topicVO.getMqResourceObj();
Preconditions.checkTrue(bizTopic == null || bizTopic.equals(info.getTopic()),
"topic [" + info.getTopic() + "] not belong to inlong group " + groupId);
- } else if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+ } else if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
// Pulsar's topic is the inlong stream level.
// There will be multiple inlong streams under one inlong group, and there will be multiple topics
List<InlongStreamTopicResponse> dsTopicList = topicVO.getDsTopicList();
@@ -451,7 +452,7 @@ public class ConsumptionServiceImpl implements ConsumptionService {
Preconditions.checkEmpty(topicSet, "topic [" + topicSet + "] not belong to inlong group " + groupId);
}
}
- info.setMiddlewareType(mqType);
+ info.setMiddlewareType(mqType.getType());
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
index 8f7dca9..33db3f1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceImpl.java
@@ -29,6 +29,7 @@ import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupState;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupApproveRequest;
@@ -131,8 +132,8 @@ public class InlongGroupServiceImpl implements InlongGroupService {
groupMapper.insertSelective(entity);
this.saveOrUpdateExt(groupId, groupInfo.getExtList());
- String mqType = groupInfo.getMiddlewareType();
- if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+ MQType mqType = MQType.forType(groupInfo.getMiddlewareType());
+ if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
InlongGroupPulsarInfo pulsarInfo = (InlongGroupPulsarInfo) groupInfo.getMqExtInfo();
Preconditions.checkNotNull(pulsarInfo, "Pulsar info cannot be empty, as the middleware is Pulsar");
@@ -187,21 +188,21 @@ public class InlongGroupServiceImpl implements InlongGroupService {
groupInfo.setExtList(extInfoList);
// If the middleware is Pulsar, we need to encapsulate Pulsar related data
- String mqType = entity.getMiddlewareType();
- if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+ MQType mqType = MQType.forType(entity.getMiddlewareType());
+ if (MQType.PULSAR == mqType || MQType.TDMQ_PULSAR == mqType) {
InlongGroupPulsarEntity pulsarEntity = groupPulsarMapper.selectByGroupId(groupId);
Preconditions.checkNotNull(pulsarEntity, "Pulsar info not found by the groupId=" + groupId);
InlongGroupPulsarInfo pulsarInfo = CommonBeanUtils.copyProperties(pulsarEntity, InlongGroupPulsarInfo::new);
- pulsarInfo.setMiddlewareType(mqType);
+ pulsarInfo.setMiddlewareType(mqType.name());
groupInfo.setMqExtInfo(pulsarInfo);
}
// For approved inlong group, encapsulate the cluster address of the middleware
if (GroupState.CONFIG_SUCCESSFUL == GroupState.forCode(groupInfo.getStatus())) {
- if (Constant.MIDDLEWARE_TUBE.equalsIgnoreCase(mqType)) {
+ if (mqType == MQType.TUBE) {
groupInfo.setTubeMaster(commonOperateService.getSpecifiedParam(Constant.TUBE_MASTER_URL));
- } else if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
- PulsarClusterInfo pulsarCluster = commonOperateService.getPulsarClusterInfo(mqType);
+ } else if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
+ PulsarClusterInfo pulsarCluster = commonOperateService.getPulsarClusterInfo(mqType.name());
groupInfo.setPulsarAdminUrl(pulsarCluster.getAdminUrl());
groupInfo.setPulsarServiceUrl(pulsarCluster.getBrokerServiceUrl());
}
@@ -270,8 +271,8 @@ public class InlongGroupServiceImpl implements InlongGroupService {
this.saveOrUpdateExt(groupId, groupRequest.getExtList());
// Update the Pulsar info
- String mqType = groupRequest.getMiddlewareType();
- if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+ MQType mqType = MQType.forType(groupRequest.getMiddlewareType());
+ if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
InlongGroupPulsarInfo pulsarInfo = (InlongGroupPulsarInfo) groupRequest.getMqExtInfo();
Preconditions.checkNotNull(pulsarInfo, "Pulsar info cannot be empty, as the middleware is Pulsar");
Integer writeQuorum = pulsarInfo.getWriteQuorum();
@@ -433,14 +434,14 @@ public class InlongGroupServiceImpl implements InlongGroupService {
LOGGER.debug("begin to get topic by groupId={}", groupId);
InlongGroupInfo groupInfo = this.get(groupId);
- String mqType = groupInfo.getMiddlewareType();
+ MQType mqType = MQType.forType(groupInfo.getMiddlewareType());
InlongGroupTopicResponse topicVO = new InlongGroupTopicResponse();
- if (Constant.MIDDLEWARE_TUBE.equals(mqType)) {
+ if (mqType == MQType.TUBE) {
// Tube Topic corresponds to inlong group one-to-one
topicVO.setMqResourceObj(groupInfo.getMqResourceObj());
topicVO.setTubeMasterUrl(commonOperateService.getSpecifiedParam(Constant.TUBE_MASTER_URL));
- } else if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+ } else if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
// Pulsar's topic corresponds to the inlong stream one-to-one
topicVO.setDsTopicList(streamService.getTopicList(groupId));
topicVO.setPulsarAdminUrl(commonOperateService.getSpecifiedParam(Constant.PULSAR_ADMINURL));
@@ -451,7 +452,7 @@ public class InlongGroupServiceImpl implements InlongGroupService {
}
topicVO.setInlongGroupId(groupId);
- topicVO.setMiddlewareType(mqType);
+ topicVO.setMiddlewareType(mqType.name());
return topicVO;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java
index 78a6ecc..4782abb 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ThirdPartyClusterServiceImpl.java
@@ -31,6 +31,7 @@ import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.enums.GroupState;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.cluster.ClusterPageRequest;
import org.apache.inlong.manager.common.pojo.cluster.ClusterRequest;
@@ -253,11 +254,11 @@ public class ThirdPartyClusterServiceImpl implements ThirdPartyClusterService {
DataProxyConfig config = new DataProxyConfig();
config.setM(groupEntity.getSchemaName());
- String mqType = groupEntity.getMiddlewareType();
- if (Constant.MIDDLEWARE_TUBE.equals(mqType)) {
+ MQType mqType = MQType.forType(groupEntity.getMiddlewareType());
+ if (mqType == MQType.TUBE) {
config.setInlongGroupId(groupId);
config.setTopic(bizResource);
- } else if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+ } else if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
List<InlongStreamEntity> streamList = streamMapper.selectByGroupId(groupId);
for (InlongStreamEntity stream : streamList) {
String topic = stream.getMqResourceObj();
@@ -301,7 +302,8 @@ public class ThirdPartyClusterServiceImpl implements ThirdPartyClusterService {
for (InlongGroupEntity groupEntity : groupEntityList) {
final String groupId = groupEntity.getInlongGroupId();
final String mqResource = groupEntity.getMqResourceObj();
- if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+ MQType type = MQType.forType(mqType);
+ if (type == MQType.PULSAR || type == MQType.TDMQ_PULSAR) {
List<InlongStreamEntity> streamList = streamMapper.selectByGroupId(groupId);
for (InlongStreamEntity stream : streamList) {
DataProxyConfig topicConfig = new DataProxyConfig();
@@ -316,7 +318,7 @@ public class ThirdPartyClusterServiceImpl implements ThirdPartyClusterService {
topicConfig.setTopic("persistent://" + tenant + "/" + mqResource + "/" + topic);
topicList.add(topicConfig);
}
- } else if (Constant.MIDDLEWARE_TUBE.equals(mqType)) {
+ } else if (type == MQType.TUBE) {
DataProxyConfig topicConfig = new DataProxyConfig();
topicConfig.setInlongGroupId(groupId);
topicConfig.setTopic(mqResource);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/PulsarEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/PulsarEventSelector.java
index 0bf990c..5d4289b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/PulsarEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/PulsarEventSelector.java
@@ -18,7 +18,7 @@
package org.apache.inlong.manager.service.thirdparty.mq;
import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
@@ -35,8 +35,8 @@ public class PulsarEventSelector implements EventSelector {
return false;
}
GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
- String mqType = form.getGroupInfo().getMiddlewareType();
- if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+ MQType mqType = MQType.forType(form.getGroupInfo().getMiddlewareType());
+ if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
InlongGroupPulsarInfo pulsarInfo = (InlongGroupPulsarInfo) form.getGroupInfo().getMqExtInfo();
return pulsarInfo.getEnableCreateResource() == 1;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/TubeEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/TubeEventSelector.java
index 90ee4f3..3b5e713 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/TubeEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/mq/TubeEventSelector.java
@@ -18,7 +18,7 @@
package org.apache.inlong.manager.service.thirdparty.mq;
import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
@@ -36,7 +36,7 @@ public class TubeEventSelector implements EventSelector {
}
GroupResourceProcessForm form = (GroupResourceProcessForm) processForm;
InlongGroupInfo groupInfo = form.getGroupInfo();
- if (Constant.MIDDLEWARE_TUBE.equalsIgnoreCase(groupInfo.getMiddlewareType())) {
+ if (MQType.forType(groupInfo.getMiddlewareType()) == MQType.TUBE) {
return true;
}
log.warn("not need to create tube resource for groupId={}, as the middleware type is {}",
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java
index d6c529e..9686c8d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkDisabledEventSelector.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.thirdparty.sort;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
@@ -39,12 +39,12 @@ public class ZkDisabledEventSelector implements EventSelector {
GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
InlongGroupInfo groupInfo = groupResourceForm.getGroupInfo();
return groupInfo.getZookeeperEnabled() == 0
- && !groupInfo.getMiddlewareType().equals(Constant.MIDDLEWARE_NONE);
+ && MQType.forType(groupInfo.getMiddlewareType()) != MQType.NONE;
} else if (processForm instanceof UpdateGroupProcessForm) {
UpdateGroupProcessForm updateGroupProcessForm = (UpdateGroupProcessForm) processForm;
InlongGroupInfo groupInfo = updateGroupProcessForm.getGroupInfo();
return groupInfo.getZookeeperEnabled() == 0
- && !groupInfo.getMiddlewareType().equals(Constant.MIDDLEWARE_NONE);
+ && MQType.forType(groupInfo.getMiddlewareType()) != MQType.NONE;
} else {
return false;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java
index a8ee6e3..05e2b80 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/ZkEnabledEventSelector.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.thirdparty.sort;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.pojo.workflow.form.ProcessForm;
@@ -39,7 +39,7 @@ public class ZkEnabledEventSelector implements EventSelector {
}
GroupResourceProcessForm groupResourceForm = (GroupResourceProcessForm) processForm;
InlongGroupInfo groupInfo = groupResourceForm.getGroupInfo();
- return groupInfo.getZookeeperEnabled() == 1 && !groupInfo.getMiddlewareType().equals(Constant.MIDDLEWARE_NONE);
+ return groupInfo.getZookeeperEnabled() == 1 && MQType.forType(groupInfo.getMiddlewareType()) != MQType.NONE;
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
index 13a74c8..06d11c1 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/thirdparty/sort/util/SourceInfoUtils.java
@@ -20,7 +20,7 @@ package org.apache.inlong.manager.service.thirdparty.sort.util;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
import org.apache.inlong.manager.common.beans.ClusterBean;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
@@ -64,13 +64,13 @@ public class SourceInfoUtils {
ClusterBean clusterBean, InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
SourceResponse sourceResponse, List<FieldInfo> sourceFields) {
- String mqType = groupInfo.getMiddlewareType();
+ MQType mqType = MQType.forType(groupInfo.getMiddlewareType());
DeserializationInfo deserializationInfo = SerializationUtils.createDeserialInfo(sourceResponse, streamInfo);
SourceInfo sourceInfo;
- if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+ if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
sourceInfo = createPulsarSourceInfo(pulsarCluster, clusterBean, groupInfo, streamInfo, deserializationInfo,
sourceFields);
- } else if (Constant.MIDDLEWARE_TUBE.equals(mqType)) {
+ } else if (mqType == MQType.TUBE) {
// InlongGroupInfo groupInfo, String masterAddress,
sourceInfo = createTubeSourceInfo(groupInfo, masterAddress, clusterBean, deserializationInfo, sourceFields);
} else {
@@ -100,7 +100,7 @@ public class SourceInfoUtils {
FieldInfo[] fieldInfosArr = fieldInfos.toArray(new FieldInfo[0]);
String type = pulsarCluster.getType();
- if (StringUtils.isNotEmpty(type) && Constant.MIDDLEWARE_TDMQ_PULSAR.equals(type)) {
+ if (StringUtils.isNotEmpty(type) && MQType.forType(type) == MQType.TDMQ_PULSAR) {
return new TDMQPulsarSourceInfo(pulsarCluster.getBrokerServiceUrl(),
fullTopicName, consumerGroup, pulsarCluster.getToken(), deserializationInfo, fieldInfosArr);
} else {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
index 7f0b49d..d6d842d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
@@ -20,8 +20,8 @@ package org.apache.inlong.manager.service.workflow.consumption.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
import org.apache.inlong.manager.common.beans.ClusterBean;
-import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.ConsumptionStatus;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
@@ -84,11 +84,11 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
throw new WorkflowListenerException("consumption not exits for id=" + consumptionId);
}
- String mqType = entity.getMiddlewareType();
- if (Constant.MIDDLEWARE_TUBE.equals(mqType)) {
+ MQType mqType = MQType.forType(entity.getMiddlewareType());
+ if (mqType == MQType.TUBE) {
this.createTubeConsumerGroup(entity);
return ListenerResult.success("Create Tube consumer group successful");
- } else if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+ } else if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
this.createPulsarTopicMessage(entity);
} else {
throw new WorkflowListenerException("middleware type [" + mqType + "] not supported");
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
index 8e879ec..e5c106d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/stream/CreateStreamWorkflowDefinition.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.workflow.stream;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.thirdparty.hive.CreateHiveTableForStreamListener;
@@ -89,8 +90,8 @@ public class CreateStreamWorkflowDefinition implements WorkflowDefinition {
ServiceTask createPulsarTopicTask = new ServiceTask();
createPulsarTopicTask.setSkipResolver(c -> {
GroupResourceProcessForm form = (GroupResourceProcessForm) c.getProcessForm();
- String mqType = form.getGroupInfo().getMiddlewareType();
- if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+ MQType mqType = MQType.forType(form.getGroupInfo().getMiddlewareType());
+ if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
return false;
}
log.warn("no need to create pulsar topic for groupId={}, streamId={}, as the middlewareType={}",
@@ -105,8 +106,8 @@ public class CreateStreamWorkflowDefinition implements WorkflowDefinition {
ServiceTask createPulsarSubscriptionGroupTask = new ServiceTask();
createPulsarSubscriptionGroupTask.setSkipResolver(c -> {
GroupResourceProcessForm form = (GroupResourceProcessForm) c.getProcessForm();
- String mqType = form.getGroupInfo().getMiddlewareType();
- if (Constant.MIDDLEWARE_PULSAR.equals(mqType) || Constant.MIDDLEWARE_TDMQ_PULSAR.equals(mqType)) {
+ MQType mqType = MQType.forType(form.getGroupInfo().getMiddlewareType());
+ if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
return false;
}
log.warn("no need to create pulsar subscription for groupId={}, streamId={}, as the middlewareType={}",
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceTest.java
index 975c24e..a7f5832 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceTest.java
@@ -17,7 +17,7 @@
package org.apache.inlong.manager.service.core.impl;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.pojo.consumption.ConsumptionInfo;
import org.apache.inlong.manager.common.pojo.consumption.ConsumptionPulsarInfo;
import org.apache.inlong.manager.service.ServiceBaseTest;
@@ -41,11 +41,11 @@ public class ConsumptionServiceTest extends ServiceBaseTest {
consumptionInfo.setTopic(inlongGroup);
consumptionInfo.setConsumerGroupName(consumerGroup);
consumptionInfo.setInlongGroupId("b_" + inlongGroup);
- consumptionInfo.setMiddlewareType(Constant.MIDDLEWARE_PULSAR);
+ consumptionInfo.setMiddlewareType(MQType.PULSAR.getType());
consumptionInfo.setCreator(operator);
ConsumptionPulsarInfo pulsarInfo = new ConsumptionPulsarInfo();
- pulsarInfo.setMiddlewareType(Constant.MIDDLEWARE_PULSAR);
+ pulsarInfo.setMiddlewareType(MQType.PULSAR.getType());
pulsarInfo.setIsDlq(1);
pulsarInfo.setDeadLetterTopic("test_dlq");
pulsarInfo.setIsRlq(0);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperationTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperationTest.java
index d5ae35b..4e5cbda 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperationTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupProcessOperationTest.java
@@ -17,9 +17,9 @@
package org.apache.inlong.manager.service.core.impl;
-import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.EntityStatus;
import org.apache.inlong.manager.common.enums.GroupState;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.enums.ProcessStatus;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
@@ -64,7 +64,7 @@ public class InlongGroupProcessOperationTest extends ServiceBaseTest {
groupInfo.setInlongGroupId(GROUP_ID);
groupInfo.setName(GROUP_NAME);
groupInfo.setInCharges(OPERATOR);
- groupInfo.setMiddlewareType(Constant.MIDDLEWARE_PULSAR);
+ groupInfo.setMiddlewareType(MQType.PULSAR.getType());
InlongGroupPulsarInfo pulsarInfo = new InlongGroupPulsarInfo();
pulsarInfo.setInlongGroupId(GROUP_ID);
groupInfo.setMqExtInfo(pulsarInfo);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceTest.java
index 464e8ed..6a036e8 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/core/impl/InlongGroupServiceTest.java
@@ -17,10 +17,8 @@
package org.apache.inlong.manager.service.core.impl;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.EntityStatus;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
@@ -32,6 +30,9 @@ import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.TestComponent;
+import java.util.Arrays;
+import java.util.List;
+
/**
* Inlong group service test
*/
@@ -63,13 +64,13 @@ public class InlongGroupServiceTest {
groupInfo = new InlongGroupInfo();
groupInfo.setName(groupName);
- groupInfo.setMiddlewareType(Constant.MIDDLEWARE_PULSAR);
+ groupInfo.setMiddlewareType(MQType.PULSAR.getType());
groupInfo.setCreator(operator);
groupInfo.setInCharges(operator);
groupInfo.setStatus(EntityStatus.GROUP_CONFIG_SUCCESSFUL.getCode());
InlongGroupPulsarInfo pulsarInfo = new InlongGroupPulsarInfo();
- pulsarInfo.setMiddlewareType(Constant.MIDDLEWARE_PULSAR);
+ pulsarInfo.setMiddlewareType(MQType.PULSAR.getType());
pulsarInfo.setEnsemble(3);
pulsarInfo.setWriteQuorum(3);
pulsarInfo.setAckQuorum(2);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/ServiceTaskListenerFactoryTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/ServiceTaskListenerFactoryTest.java
index 2e32e1b..3c6454b 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/ServiceTaskListenerFactoryTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/ServiceTaskListenerFactoryTest.java
@@ -17,8 +17,7 @@
package org.apache.inlong.manager.service.workflow;
-import java.util.List;
-import org.apache.inlong.manager.common.enums.Constant;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.service.ServiceBaseTest;
@@ -32,6 +31,8 @@ import org.junit.Assert;
import org.junit.Test;
import org.springframework.beans.factory.annotation.Autowired;
+import java.util.List;
+
public class ServiceTaskListenerFactoryTest extends ServiceBaseTest {
@Autowired
@@ -43,7 +44,7 @@ public class ServiceTaskListenerFactoryTest extends ServiceBaseTest {
GroupResourceProcessForm processForm = new GroupResourceProcessForm();
InlongGroupInfo groupInfo = new InlongGroupInfo();
//check pulsar listener
- groupInfo.setMiddlewareType(Constant.MIDDLEWARE_PULSAR);
+ groupInfo.setMiddlewareType(MQType.PULSAR.getType());
processForm.setGroupInfo(groupInfo);
context.setProcessForm(processForm);
List<QueueOperateListener> queueOperateListeners = serviceTaskListenerFactory.getQueueOperateListener(context);
@@ -52,7 +53,7 @@ public class ServiceTaskListenerFactoryTest extends ServiceBaseTest {
Assert.assertTrue(queueOperateListeners.get(1) instanceof CreatePulsarGroupTaskListener);
// check tube listener
- groupInfo.setMiddlewareType(Constant.MIDDLEWARE_TUBE);
+ groupInfo.setMiddlewareType(MQType.TUBE.getType());
queueOperateListeners = serviceTaskListenerFactory.getQueueOperateListener(context);
Assert.assertEquals(2, queueOperateListeners.size());
Assert.assertTrue(queueOperateListeners.get(0) instanceof CreateTubeTopicTaskListener);
diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
index ebec742..a2347eb 100644
--- a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
+++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/workflow/WorkflowServiceImplTest.java
@@ -19,14 +19,14 @@ package org.apache.inlong.manager.service.workflow;
import com.github.pagehelper.PageInfo;
import com.google.common.collect.Lists;
-import org.apache.inlong.manager.common.enums.Constant;
import org.apache.inlong.manager.common.enums.GroupState;
+import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.enums.ProcessStatus;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupPulsarInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamFieldInfo;
-import org.apache.inlong.manager.common.pojo.stream.InlongStreamRequest;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamRequest;
import org.apache.inlong.manager.common.pojo.workflow.ProcessResponse;
import org.apache.inlong.manager.common.pojo.workflow.TaskExecuteLogQuery;
import org.apache.inlong.manager.common.pojo.workflow.WorkflowResult;
@@ -229,7 +229,7 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
@Test
public void testStartCreatePulsarWorkflow() {
- initGroupForm(Constant.MIDDLEWARE_PULSAR);
+ initGroupForm(MQType.PULSAR.getType());
mockTaskListenerFactory();
WorkflowContext context = workflowEngine.processService().start(processName.name(), applicant, form);
WorkflowResult result = WorkflowBeanUtils.result(context);
@@ -247,7 +247,7 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
@Test
public void testStartCreateTubeWorkflow() {
- initGroupForm(Constant.MIDDLEWARE_TUBE);
+ initGroupForm(MQType.TUBE.getType());
mockTaskListenerFactory();
WorkflowContext context = workflowEngine.processService().start(processName.name(), applicant, form);
WorkflowResult result = WorkflowBeanUtils.result(context);
@@ -266,7 +266,7 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
@Test
public void testSuspendProcess() {
- InlongGroupInfo groupInfo = initGroupForm(Constant.MIDDLEWARE_PULSAR);
+ InlongGroupInfo groupInfo = initGroupForm(MQType.PULSAR.getType());
groupInfo.setStatus(GroupState.CONFIG_SUCCESSFUL.getCode());
groupService.update(groupInfo.genRequest(), OPERATOR);
UpdateGroupProcessForm form = new UpdateGroupProcessForm();
@@ -293,7 +293,7 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
@Test
public void testRestartProcess() {
- InlongGroupInfo groupInfo = initGroupForm(Constant.MIDDLEWARE_PULSAR);
+ InlongGroupInfo groupInfo = initGroupForm(MQType.PULSAR.getType());
groupInfo.setStatus(GroupState.CONFIG_SUCCESSFUL.getCode());
groupService.update(groupInfo.genRequest(), OPERATOR);
groupInfo.setStatus(GroupState.SUSPENDED.getCode());
@@ -324,7 +324,7 @@ public class WorkflowServiceImplTest extends ServiceBaseTest {
@Test
public void testStopProcess() {
- InlongGroupInfo groupInfo = initGroupForm(Constant.MIDDLEWARE_PULSAR);
+ InlongGroupInfo groupInfo = initGroupForm(MQType.PULSAR.getType());
groupInfo.setStatus(GroupState.CONFIG_SUCCESSFUL.getCode());
groupService.update(groupInfo.genRequest(), OPERATOR);
groupInfo.setStatus(GroupState.SUSPENDED.getCode());