You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/07 15:47:32 UTC
[incubator-inlong] branch master updated: [INLONG-4563][Manager] Support create TubeMQ resources by its origin APIs (#4564)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 099305c2e [INLONG-4563][Manager] Support create TubeMQ resources by its origin APIs (#4564)
099305c2e is described below
commit 099305c2ed96096e688fa6092e0a9c8473e46de1
Author: healzhou <he...@gmail.com>
AuthorDate: Tue Jun 7 23:47:27 2022 +0800
[INLONG-4563][Manager] Support create TubeMQ resources by its origin APIs (#4564)
---
...gerResponse.java => ConsumerGroupResponse.java} | 28 ++-
...TubeManagerResponse.java => TopicResponse.java} | 26 ++-
.../manager/common/pojo/tubemq/TubeBrokerInfo.java | 168 ++++++++++++++
.../common/pojo/tubemq/TubeClusterResponse.java | 81 -------
...eManagerResponse.java => TubeHttpResponse.java} | 7 +-
.../service/mq/CreatePulsarGroupTaskListener.java | 8 +-
.../mq/CreatePulsarResourceTaskListener.java | 10 +-
.../mq/CreatePulsarSubscriptionTaskListener.java | 8 +-
.../service/mq/CreatePulsarTopicTaskListener.java | 6 +-
.../service/mq/CreateTubeGroupTaskListener.java | 61 +----
.../service/mq/CreateTubeTopicTaskListener.java | 30 ++-
...lsarOptServiceImpl.java => PulsarOperator.java} | 50 ++---
.../manager/service/mq/util/PulsarOptService.java | 48 ----
.../manager/service/mq/util/TubeMQOperator.java | 248 +++++++++++++++++++++
.../manager/service/mq/util/TubeMqOptService.java | 132 -----------
.../service/sort/CreateSortConfigListenerV2.java | 5 +-
.../sort/CreateStreamSortConfigListener.java | 2 +-
.../ConsumptionCompleteProcessListener.java | 56 ++---
18 files changed, 566 insertions(+), 408 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeManagerResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/ConsumerGroupResponse.java
similarity index 64%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeManagerResponse.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/ConsumerGroupResponse.java
index 34ebb63c0..9a2f39b65 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeManagerResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/ConsumerGroupResponse.java
@@ -19,14 +19,36 @@ package org.apache.inlong.manager.common.pojo.tubemq;
import lombok.Data;
+import java.util.List;
+
/**
- * The response info of tube manager.
+ * Consumer group view of TubeMQ
*/
@Data
-public class TubeManagerResponse {
+public class ConsumerGroupResponse {
+ // true, or false
private boolean result;
+
+ // 0 is success, other is failed
private int errCode;
+
+ // OK, or err msg
private String errMsg;
-}
+ private List<ConsumerGroupInfo> data;
+
+ private int count;
+
+ @Data
+ public static class ConsumerGroupInfo {
+
+ private String topicName;
+ private String groupName;
+ private String createUser;
+ private String modifyUser;
+ private String createDate; // 20150619115100
+ private String modifyDate;
+ }
+
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeManagerResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TopicResponse.java
similarity index 64%
copy from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeManagerResponse.java
copy to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TopicResponse.java
index 34ebb63c0..66496fa44 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeManagerResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TopicResponse.java
@@ -19,14 +19,32 @@ package org.apache.inlong.manager.common.pojo.tubemq;
import lombok.Data;
+import java.util.List;
+
/**
- * The response info of tube manager.
+ * Topic view of TubeMQ
*/
@Data
-public class TubeManagerResponse {
+public class TopicResponse {
- private boolean result;
private int errCode;
private String errMsg;
-}
+ // total topic info list
+ private List<TopicInfo> data;
+
+ private int dataCount;
+
+ @Data
+ public static class TopicInfo {
+
+ private String topicName;
+ private int totalCfgBrokerCnt;
+ private int totalCfgNumPart;
+ private int totalRunNumPartCount;
+ private boolean isSrvAcceptPublish;
+ private boolean isSrvAcceptSubscribe;
+ private boolean enableAuthControl;
+ }
+
+}
\ No newline at end of file
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeBrokerInfo.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeBrokerInfo.java
new file mode 100644
index 000000000..a95d17a98
--- /dev/null
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeBrokerInfo.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.common.pojo.tubemq;
+
+import lombok.Data;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Broker view of TubeMQ: the full broker list plus sub-lists grouped by broker status.
+ */
+@Data
+public class TubeBrokerInfo {
+
+ private static final String IDLE = "idle"; // matched against BrokerInfo.subStatus
+ private static final String RUNNING = "running"; // matched against BrokerInfo.runStatus
+ private static final String ONLINE = "online"; // matched against BrokerInfo.manageStatus
+ private static final String ONLY_READ = "only-read"; // matched against BrokerInfo.manageStatus
+ private static final String ONLY_WRITE = "only-write"; // matched against BrokerInfo.manageStatus
+
+ private int errCode;
+ private String errMsg;
+
+ // all brokers; this is the input of divideBrokerListByStatus()
+ private List<BrokerInfo> data;
+ // brokers for which BrokerInfo#isConfigurable is true
+ private List<BrokerInfo> configurableList;
+ // brokers for which BrokerInfo#isWorking is true
+ private List<BrokerInfo> workingList;
+ // brokers for which BrokerInfo#isIdle is true
+ private List<BrokerInfo> idleList;
+ // ids of brokers whose isConfChanged flag is set
+ private List<Integer> needReloadList;
+
+ /**
+ * Rebuild the four status lists from data; does nothing when data is null. A broker may land in several lists.
+ */
+ public void divideBrokerListByStatus() {
+ if (data != null) {
+ configurableList = new ArrayList<>();
+ workingList = new ArrayList<>();
+ idleList = new ArrayList<>();
+ needReloadList = new ArrayList<>();
+ for (BrokerInfo brokerInfo : data) {
+ if (brokerInfo.isConfigurable()) {
+ configurableList.add(brokerInfo);
+ }
+ if (brokerInfo.isWorking()) {
+ workingList.add(brokerInfo);
+ }
+ if (brokerInfo.isIdle()) {
+ idleList.add(brokerInfo);
+ }
+ if (brokerInfo.isConfChanged) {
+ needReloadList.add(brokerInfo.getBrokerId());
+ }
+ }
+ }
+ }
+
+ /**
+ * Get the ids of all brokers in configurableList; returns an empty list when configurableList is null.
+ */
+ public List<Integer> getConfigurableBrokerIdList() {
+ List<Integer> tmpBrokerIdList = new ArrayList<>();
+ if (configurableList != null) {
+ for (BrokerInfo brokerInfo : configurableList) {
+ tmpBrokerIdList.add(brokerInfo.getBrokerId());
+ }
+ }
+ return tmpBrokerIdList;
+ }
+
+ /**
+ * Get the ids of all brokers in workingList; returns an empty list when workingList is null.
+ */
+ public List<Integer> getWorkingBrokerIdList() {
+ List<Integer> tmpBrokerIdList = new ArrayList<>();
+ if (workingList != null) {
+ for (BrokerInfo brokerInfo : workingList) {
+ tmpBrokerIdList.add(brokerInfo.getBrokerId());
+ }
+ }
+ return tmpBrokerIdList;
+ }
+
+ /**
+ * Get the ids of all brokers in data; returns an empty list when data is null.
+ */
+ public List<Integer> getAllBrokerIdList() {
+ List<Integer> allIdList = new ArrayList<>();
+ if (data != null) {
+ for (BrokerInfo brokerInfo : data) {
+ allIdList.add(brokerInfo.getBrokerId());
+ }
+ }
+ return allIdList;
+ }
+
+ /**
+ * Info of a single broker; equals and hashCode are based on brokerId only.
+ */
+ @Data
+ public static class BrokerInfo {
+
+ private int brokerId;
+ private String brokerIp;
+ private int brokerPort;
+ private String manageStatus;
+ private String runStatus;
+ private String subStatus;
+ private int stepOp;
+ private boolean isConfChanged;
+ private boolean isConfLoaded;
+ private boolean isBrokerOnline;
+ private String brokerVersion;
+ private boolean acceptPublish;
+ private boolean acceptSubscribe;
+
+ private boolean isIdle() {
+ return IDLE.equals(subStatus);
+ }
+
+ private boolean isWorking() {
+ return RUNNING.equals(runStatus) && (
+ ONLINE.equals(manageStatus) || ONLY_READ.equals(manageStatus) || ONLY_WRITE.equals(manageStatus));
+ }
+
+ private boolean isConfigurable() {
+ return stepOp == 0 || stepOp == -2 || stepOp == 31 || stepOp == 32; // NOTE(review): meaning of these stepOp codes is not shown here - confirm
+ }
+
+ @Override
+ public int hashCode() {
+ return brokerId; // consistent with equals(), which compares brokerId only
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == this) {
+ return true;
+ }
+ if (!(o instanceof BrokerInfo)) {
+ return false;
+ }
+
+ BrokerInfo brokerInfo = (BrokerInfo) o;
+ return brokerId == brokerInfo.brokerId;
+ }
+ }
+
+}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeClusterResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeClusterResponse.java
deleted file mode 100644
index 4ee5ac577..000000000
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeClusterResponse.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.common.pojo.tubemq;
-
-import java.util.List;
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-
-/**
- * The response info of tube cluster.
- */
-@Data
-@EqualsAndHashCode(callSuper = true)
-public class TubeClusterResponse extends TubeManagerResponse {
-
- private List<DataBean> data;
-
- public static class DataBean {
-
- private int clusterId;
- private String clusterName;
- private String createTime;
- private Object modifyTime;
- private String createUser;
-
- public int getClusterId() {
- return clusterId;
- }
-
- public void setClusterId(int clusterId) {
- this.clusterId = clusterId;
- }
-
- public String getClusterName() {
- return clusterName;
- }
-
- public void setClusterName(String clusterName) {
- this.clusterName = clusterName;
- }
-
- public String getCreateTime() {
- return createTime;
- }
-
- public void setCreateTime(String createTime) {
- this.createTime = createTime;
- }
-
- public Object getModifyTime() {
- return modifyTime;
- }
-
- public void setModifyTime(Object modifyTime) {
- this.modifyTime = modifyTime;
- }
-
- public String getCreateUser() {
- return createUser;
- }
-
- public void setCreateUser(String createUser) {
- this.createUser = createUser;
- }
- }
-}
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeManagerResponse.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeHttpResponse.java
similarity index 89%
rename from inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeManagerResponse.java
rename to inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeHttpResponse.java
index 34ebb63c0..9466305b8 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeManagerResponse.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/pojo/tubemq/TubeHttpResponse.java
@@ -23,10 +23,15 @@ import lombok.Data;
* The response info of tube manager.
*/
@Data
-public class TubeManagerResponse {
+public class TubeHttpResponse {
+ // true, or false
private boolean result;
+
+ // 0 is success, other is failed
private int errCode;
+
+ // OK, or err msg
private String errMsg;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java
index 39c211ff6..e50e14f37 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java
@@ -32,7 +32,7 @@ import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.core.ConsumptionService;
import org.apache.inlong.manager.service.core.InlongStreamService;
import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.service.mq.util.PulsarOptService;
+import org.apache.inlong.manager.service.mq.util.PulsarOperator;
import org.apache.inlong.manager.service.mq.util.PulsarUtils;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -60,7 +60,7 @@ public class CreatePulsarGroupTaskListener implements QueueOperateListener {
@Autowired
private ConsumptionService consumptionService;
@Autowired
- private PulsarOptService pulsarOptService;
+ private PulsarOperator pulsarOperator;
@Override
public TaskEvent event() {
@@ -103,7 +103,7 @@ public class CreatePulsarGroupTaskListener implements QueueOperateListener {
// Create a subscription in the Pulsar cluster you need to ensure that the Topic exists
try {
- boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic);
+ boolean exist = pulsarOperator.topicIsExists(pulsarAdmin, tenant, namespace, topic);
if (!exist) {
String topicFull = tenant + "/" + namespace + "/" + topic;
String serviceUrl = pulsarCluster.getAdminUrl();
@@ -113,7 +113,7 @@ public class CreatePulsarGroupTaskListener implements QueueOperateListener {
// Consumer naming rules: clusterTag_topicName_consumer_group
String subscription = clusterTag + "_" + topic + "_consumer_group";
- pulsarOptService.createSubscription(pulsarAdmin, topicBean, subscription);
+ pulsarOperator.createSubscription(pulsarAdmin, topicBean, subscription);
// Insert the consumption data into the consumption table
consumptionService.saveSortConsumption(groupInfo, topic, subscription);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java
index b58e9b921..146669fad 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java
@@ -33,7 +33,7 @@ import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.core.InlongStreamService;
import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.service.mq.util.PulsarOptService;
+import org.apache.inlong.manager.service.mq.util.PulsarOperator;
import org.apache.inlong.manager.service.mq.util.PulsarUtils;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -59,7 +59,7 @@ public class CreatePulsarResourceTaskListener implements QueueOperateListener {
@Autowired
private InlongClusterService clusterService;
@Autowired
- private PulsarOptService pulsarOptService;
+ private PulsarOperator pulsarOperator;
@Override
public TaskEvent event() {
@@ -109,10 +109,10 @@ public class CreatePulsarResourceTaskListener implements QueueOperateListener {
if (StringUtils.isEmpty(tenant)) {
tenant = InlongGroupSettings.DEFAULT_PULSAR_TENANT;
}
- pulsarOptService.createTenant(pulsarAdmin, tenant);
+ pulsarOperator.createTenant(pulsarAdmin, tenant);
// create pulsar namespace
- pulsarOptService.createNamespace(pulsarAdmin, pulsarInfo, tenant, namespace);
+ pulsarOperator.createNamespace(pulsarAdmin, pulsarInfo, tenant, namespace);
// create pulsar topic
Integer partitionNum = pulsarInfo.getPartitionNum();
@@ -122,7 +122,7 @@ public class CreatePulsarResourceTaskListener implements QueueOperateListener {
for (InlongStreamBriefInfo streamInfo : streamTopicList) {
topicBean.setTopicName(streamInfo.getMqResource());
- pulsarOptService.createTopic(pulsarAdmin, topicBean);
+ pulsarOperator.createTopic(pulsarAdmin, topicBean);
}
}
log.info("finish to create pulsar resource for groupId={}, cluster={}", groupId, pulsarCluster);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java
index b806aef74..c06dd42aa 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java
@@ -29,7 +29,7 @@ import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.core.ConsumptionService;
-import org.apache.inlong.manager.service.mq.util.PulsarOptService;
+import org.apache.inlong.manager.service.mq.util.PulsarOperator;
import org.apache.inlong.manager.service.mq.util.PulsarUtils;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -52,7 +52,7 @@ public class CreatePulsarSubscriptionTaskListener implements QueueOperateListene
@Autowired
private InlongClusterService clusterService;
@Autowired
- private PulsarOptService pulsarOptService;
+ private PulsarOperator pulsarOperator;
@Autowired
private StreamSinkService sinkService;
@Autowired
@@ -92,7 +92,7 @@ public class CreatePulsarSubscriptionTaskListener implements QueueOperateListene
topicBean.setTopicName(topic);
// Create a subscription in the Pulsar cluster, you need to ensure that the Topic exists
- boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic);
+ boolean exist = pulsarOperator.topicIsExists(pulsarAdmin, tenant, namespace, topic);
if (!exist) {
String fullTopic = tenant + "/" + namespace + "/" + topic;
String msg = String.format("topic=%s not exists in %s", fullTopic, pulsarCluster.getAdminUrl());
@@ -102,7 +102,7 @@ public class CreatePulsarSubscriptionTaskListener implements QueueOperateListene
// Consumer naming rules: clusterTag_topicName_consumer_group
String subscription = clusterTag + "_" + topic + "_consumer_group";
- pulsarOptService.createSubscription(pulsarAdmin, topicBean, subscription);
+ pulsarOperator.createSubscription(pulsarAdmin, topicBean, subscription);
// Insert the consumption data into the consumption table
consumptionService.saveSortConsumption(groupInfo, topic, subscription);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java
index 8e39f9b48..5b7e63725 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java
@@ -30,7 +30,7 @@ import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
-import org.apache.inlong.manager.service.mq.util.PulsarOptService;
+import org.apache.inlong.manager.service.mq.util.PulsarOperator;
import org.apache.inlong.manager.service.mq.util.PulsarUtils;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -50,7 +50,7 @@ public class CreatePulsarTopicTaskListener implements QueueOperateListener {
@Autowired
private InlongClusterService clusterService;
@Autowired
- private PulsarOptService pulsarOptService;
+ private PulsarOperator pulsarOperator;
@Override
public TaskEvent event() {
@@ -85,7 +85,7 @@ public class CreatePulsarTopicTaskListener implements QueueOperateListener {
.queueModule(pulsarInfo.getQueueModule())
.numPartitions(pulsarInfo.getPartitionNum())
.build();
- pulsarOptService.createTopic(pulsarAdmin, topicBean);
+ pulsarOperator.createTopic(pulsarAdmin, topicBean);
}
} catch (Exception e) {
String msg = String.format("failed to create pulsar topic for groupId=%s, streamId=%s", groupId, streamId);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeGroupTaskListener.java
index 0acf8fac8..3ed670a9a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeGroupTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeGroupTaskListener.java
@@ -18,18 +18,14 @@
package org.apache.inlong.manager.service.mq;
import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.beans.ReTryConfigBean;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
-import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
-import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest.GroupNameJsonSetBean;
-import org.apache.inlong.manager.common.pojo.tubemq.QueryTubeTopicRequest;
+import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
-import org.apache.inlong.manager.service.mq.util.TubeMqOptService;
+import org.apache.inlong.manager.service.mq.util.TubeMQOperator;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
@@ -37,8 +33,6 @@ import org.apache.inlong.manager.workflow.event.task.TaskEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import java.util.Collections;
-
/**
* Event listener of create tube consumer group.
*/
@@ -51,9 +45,7 @@ public class CreateTubeGroupTaskListener implements QueueOperateListener {
@Autowired
private InlongClusterService clusterService;
@Autowired
- private TubeMqOptService tubeMqOptService;
- @Autowired
- private ReTryConfigBean reTryConfigBean;
+ private TubeMQOperator tubeMQOperator;
@Override
public TaskEvent event() {
@@ -64,58 +56,27 @@ public class CreateTubeGroupTaskListener implements QueueOperateListener {
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
String groupId = form.getInlongGroupId();
- log.info("try to create consumer group for groupId {}", groupId);
+ log.info("begin to create tube consumer group for groupId {}", groupId);
InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
- InlongClusterInfo tubeCluster = clusterService.getOne(groupEntity.getInlongClusterTag(),
- null, ClusterType.CLS_TUBE);
-
- // TODO use the original method of TubeMQ to create group
- // TubeClusterDTO clusterDTO = TubeClusterDTO.getFromJson(clusters.get(0).getExtParams());
- // int clusterId = clusterDTO.getClusterId();
+ String clusterTag = groupEntity.getInlongClusterTag();
+ TubeClusterInfo tubeCluster = (TubeClusterInfo) clusterService.getOne(clusterTag, null, ClusterType.CLS_TUBE);
String topicName = groupEntity.getMqResource();
- QueryTubeTopicRequest queryTubeTopicRequest = QueryTubeTopicRequest.builder()
- .topicName(topicName).clusterId(1)
- .user(groupEntity.getCreator()).build();
- // Query whether the tube topic exists
- boolean topicExist = tubeMqOptService.queryTopicIsExist(queryTubeTopicRequest);
-
- Integer tryNumber = reTryConfigBean.getMaxAttempts();
- Long delay = reTryConfigBean.getDelay();
- while (!topicExist && --tryNumber > 0) {
- log.info("check whether the tube topic exists, try count={}", tryNumber);
- try {
- Thread.sleep(delay);
- delay *= reTryConfigBean.getMultiplier();
- topicExist = tubeMqOptService.queryTopicIsExist(queryTubeTopicRequest);
- } catch (InterruptedException e) {
- log.error("check the tube topic exists error", e);
- }
- }
-
- AddTubeConsumeGroupRequest addTubeConsumeGroupRequest = new AddTubeConsumeGroupRequest();
- addTubeConsumeGroupRequest.setClusterId(1);
- addTubeConsumeGroupRequest.setCreateUser(groupEntity.getCreator());
-
- GroupNameJsonSetBean groupNameJsonSetBean = new GroupNameJsonSetBean();
- groupNameJsonSetBean.setTopicName(topicName);
- String consumeGroupName = "sort_" + topicName + "_group";
- groupNameJsonSetBean.setGroupName(consumeGroupName);
- addTubeConsumeGroupRequest.setGroupNameJsonSet(Collections.singletonList(groupNameJsonSetBean));
-
+ // Consumer naming rules: clusterTag_topicName_consumer_group
+ String consumeGroup = clusterTag + "_" + topicName + "_consumer_group";
try {
- tubeMqOptService.createNewConsumerGroup(addTubeConsumeGroupRequest);
+ tubeMQOperator.createConsumerGroup(tubeCluster, topicName, consumeGroup, context.getOperator());
} catch (Exception e) {
throw new WorkflowListenerException("create tube consumer group for groupId=" + groupId + " error", e);
}
- log.info("finish to create consumer group for {}", groupId);
+ log.info("finish to create tube consumer group for groupId={}", groupId);
return ListenerResult.success();
}
@Override
public boolean async() {
- return true;
+ return false;
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java
index d0f459351..50ace8a4a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java
@@ -18,12 +18,15 @@
package org.apache.inlong.manager.service.mq;
import lombok.extern.slf4j.Slf4j;
+import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
-import org.apache.inlong.manager.common.pojo.tubemq.AddTubeMqTopicRequest;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.service.mq.util.TubeMqOptService;
+import org.apache.inlong.manager.service.mq.util.TubeMQOperator;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
@@ -31,8 +34,6 @@ import org.apache.inlong.manager.workflow.event.task.TaskEvent;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
-import java.util.Collections;
-
/**
* Create a listener for MQ resource tasks
*/
@@ -41,7 +42,9 @@ import java.util.Collections;
public class CreateTubeTopicTaskListener implements QueueOperateListener {
@Autowired
- private TubeMqOptService tubeMqOptService;
+ private InlongClusterService clusterService;
+ @Autowired
+ private TubeMQOperator tubeMQOperator;
@Autowired
private InlongGroupService groupService;
@@ -53,20 +56,15 @@ public class CreateTubeTopicTaskListener implements QueueOperateListener {
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
-
log.info("begin create tube topic for groupId={}", form.getInlongGroupId());
- String groupId = form.getInlongGroupId();
+ String groupId = form.getInlongGroupId();
try {
- InlongGroupInfo groupInfo = groupService.get(groupId);
- String topicName = groupInfo.getMqResource();
- AddTubeMqTopicRequest request = new AddTubeMqTopicRequest();
- request.setUser("inlong-manager");
- AddTubeMqTopicRequest.AddTopicTasksBean tasksBean = new AddTubeMqTopicRequest.AddTopicTasksBean();
- tasksBean.setTopicName(topicName);
- request.setAddTopicTasks(Collections.singletonList(tasksBean));
- String result = tubeMqOptService.createNewTopic(request);
- log.info("finish to create tube topic for groupId={}, result={}", groupId, result);
+ InlongGroupInfo groupInfo = form.getGroupInfo();
+ String clusterTag = groupInfo.getInlongClusterTag();
+ InlongClusterInfo clusterInfo = clusterService.getOne(clusterTag, null, ClusterType.CLS_TUBE);
+ tubeMQOperator.createTopic((TubeClusterInfo) clusterInfo, groupInfo.getMqResource(), context.getOperator());
+ log.info("finish to create tube topic for groupId={}", groupId);
} catch (Exception e) {
log.error("create tube topic for groupId={} error, exception {} ", groupId, e.getMessage(), e);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOptServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOperator.java
similarity index 84%
rename from inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOptServiceImpl.java
rename to inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOperator.java
index 45eba4d6b..d8c66e5b9 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOptServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOperator.java
@@ -18,12 +18,12 @@
package org.apache.inlong.manager.service.mq.util;
import com.google.common.collect.Sets;
-import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.conversion.ConversionHandle;
import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -31,60 +31,60 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
/**
- * Operation interface of Pulsar
+ * Pulsar operator, supports creating tenants, namespaces, topics and subscriptions.
*/
@Service
-@Slf4j
-public class PulsarOptServiceImpl implements PulsarOptService {
+public class PulsarOperator {
+ // Bind the logger to this class so that its messages are not attributed to InlongClusterServiceImpl.
+ private static final Logger LOGGER = LoggerFactory.getLogger(PulsarOperator.class);
private static final String PULSAR_QUEUE_TYPE_SERIAL = "SERIAL";
@Autowired
private ConversionHandle conversionHandle;
- @Override
public void createTenant(PulsarAdmin pulsarAdmin, String tenant) throws PulsarAdminException {
- log.info("begin to create pulsar tenant={}", tenant);
+ LOGGER.info("begin to create pulsar tenant={}", tenant);
Preconditions.checkNotEmpty(tenant, "Tenant cannot be empty");
try {
List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
boolean exists = this.tenantIsExists(pulsarAdmin, tenant);
if (exists) {
- log.warn("pulsar tenant={} already exists, skip to create", tenant);
+ LOGGER.warn("pulsar tenant={} already exists, skip to create", tenant);
return;
}
TenantInfoImpl tenantInfo = new TenantInfoImpl();
tenantInfo.setAllowedClusters(Sets.newHashSet(clusters));
tenantInfo.setAdminRoles(Sets.newHashSet());
pulsarAdmin.tenants().createTenant(tenant, tenantInfo);
- log.info("success to create pulsar tenant={}", tenant);
+ LOGGER.info("success to create pulsar tenant={}", tenant);
} catch (PulsarAdminException e) {
- log.error("failed to create pulsar tenant=" + tenant, e);
+ LOGGER.error("failed to create pulsar tenant=" + tenant, e);
throw e;
}
}
- @Override
public void createNamespace(PulsarAdmin pulsarAdmin, InlongPulsarInfo pulsarInfo,
String tenant, String namespace) throws PulsarAdminException {
Preconditions.checkNotNull(tenant, "pulsar tenant cannot be empty during create namespace");
Preconditions.checkNotNull(namespace, "pulsar namespace cannot be empty during create namespace");
String namespaceName = tenant + "/" + namespace;
- log.info("begin to create namespace={}", namespaceName);
+ LOGGER.info("begin to create namespace={}", namespaceName);
try {
// Check whether the namespace exists, and create it if it does not exist
boolean isExists = this.namespacesIsExists(pulsarAdmin, tenant, namespace);
if (isExists) {
- log.warn("namespace={} already exists, skip to create", namespaceName);
+ LOGGER.warn("namespace={} already exists, skip to create", namespaceName);
return;
}
@@ -119,14 +119,13 @@ public class PulsarOptServiceImpl implements PulsarOptService {
PersistencePolicies persistencePolicies = new PersistencePolicies(pulsarInfo.getEnsemble(),
pulsarInfo.getWriteQuorum(), pulsarInfo.getAckQuorum(), 0);
namespaces.setPersistence(namespaceName, persistencePolicies);
- log.info("success to create namespace={}", namespaceName);
+ LOGGER.info("success to create namespace={}", namespaceName);
} catch (PulsarAdminException e) {
- log.error("failed to create namespace=" + namespaceName, e);
+ LOGGER.error("failed to create namespace=" + namespaceName, e);
throw e;
}
}
- @Override
public void createTopic(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean) throws PulsarAdminException {
Preconditions.checkNotNull(topicBean, "pulsar topic info cannot be empty");
@@ -137,7 +136,7 @@ public class PulsarOptServiceImpl implements PulsarOptService {
// Topic will be returned if it exists, and created if it does not exist
if (topicIsExists(pulsarAdmin, tenant, namespace, topic)) {
- log.warn("pulsar topic={} already exists in {}", topicFullName, pulsarAdmin.getServiceUrl());
+ LOGGER.warn("pulsar topic={} already exists in {}", topicFullName, pulsarAdmin.getServiceUrl());
return;
}
@@ -157,44 +156,42 @@ public class PulsarOptServiceImpl implements PulsarOptService {
pulsarAdmin.topics().createPartitionedTopic(topicFullName, numPartitions);
}
- log.info("success to create topic={}", topicFullName);
+ LOGGER.info("success to create topic={}", topicFullName);
} catch (Exception e) {
- log.error("failed to create topic=" + topicFullName, e);
+ LOGGER.error("failed to create topic=" + topicFullName, e);
throw e;
}
}
- @Override
public void createSubscription(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean, String subscription)
throws PulsarAdminException {
Preconditions.checkNotNull(topicBean, "can not find tenant information to create subscription");
Preconditions.checkNotNull(subscription, "subscription cannot be empty during creating subscription");
String topicName = topicBean.getTenant() + "/" + topicBean.getNamespace() + "/" + topicBean.getTopicName();
- log.info("begin to create pulsar subscription={} for topic={}", subscription, topicName);
+ LOGGER.info("begin to create pulsar subscription={} for topic={}", subscription, topicName);
try {
boolean isExists = this.subscriptionIsExists(pulsarAdmin, topicName, subscription);
if (!isExists) {
pulsarAdmin.topics().createSubscription(topicName, subscription, MessageId.latest);
- log.info("success to create subscription={}", subscription);
+ LOGGER.info("success to create subscription={}", subscription);
} else {
- log.warn("pulsar subscription={} already exists, skip to create", subscription);
+ LOGGER.warn("pulsar subscription={} already exists, skip to create", subscription);
}
} catch (Exception e) {
- log.error("failed to create pulsar subscription=" + subscription, e);
+ LOGGER.error("failed to create pulsar subscription=" + subscription, e);
throw e;
}
}
- @Override
public void createSubscriptions(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicBean topicBean,
List<String> topicList) throws PulsarAdminException {
for (String topic : topicList) {
topicBean.setTopicName(topic);
this.createSubscription(pulsarAdmin, topicBean, subscription);
}
- log.info("success to create subscription={} for multiple topics={}", subscription, topicList);
+ LOGGER.info("success to create subscription={} for multiple topics={}", subscription, topicList);
}
/**
@@ -220,7 +217,6 @@ public class PulsarOptServiceImpl implements PulsarOptService {
* @apiNote cannot compare whether the string contains, otherwise it may be misjudged, such as:
* Topic "ab" does not exist, but if "abc" exists, "ab" will be mistakenly judged to exist
*/
- @Override
public boolean topicIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace, String topic)
throws PulsarAdminException {
if (StringUtils.isBlank(topic)) {
@@ -243,7 +239,7 @@ public class PulsarOptServiceImpl implements PulsarOptService {
List<String> subscriptionList = pulsarAdmin.topics().getSubscriptions(topic);
return subscriptionList.contains(subscription);
} catch (PulsarAdminException e) {
- log.error("failed to check the subscription=" + subscription + " exists for topic=" + topic, e);
+ LOGGER.error("failed to check the subscription=" + subscription + " exists for topic=" + topic, e);
return false;
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOptService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOptService.java
deleted file mode 100644
index 9b519104e..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOptService.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.mq.util;
-
-import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
-import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
-import org.apache.pulsar.client.admin.PulsarAdmin;
-import org.apache.pulsar.client.admin.PulsarAdminException;
-
-import java.util.List;
-
-/**
- * Interface of Pulsar operation
- */
-public interface PulsarOptService {
-
- void createTenant(PulsarAdmin pulsarAdmin, String tenant) throws PulsarAdminException;
-
- void createNamespace(PulsarAdmin pulsarAdmin, InlongPulsarInfo groupInfo, String tenant,
- String namespace) throws PulsarAdminException;
-
- void createTopic(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean) throws PulsarAdminException;
-
- void createSubscription(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean, String subscription)
- throws PulsarAdminException;
-
- void createSubscriptions(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicBean topicBean,
- List<String> topics) throws PulsarAdminException;
-
- boolean topicIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace, String topic)
- throws PulsarAdminException;
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMQOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMQOperator.java
new file mode 100644
index 000000000..0f68ba995
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMQOperator.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.mq.util;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterInfo;
+import org.apache.inlong.manager.common.pojo.tubemq.ConsumerGroupResponse;
+import org.apache.inlong.manager.common.pojo.tubemq.TopicResponse;
+import org.apache.inlong.manager.common.pojo.tubemq.TubeBrokerInfo;
+import org.apache.inlong.manager.common.pojo.tubemq.TubeHttpResponse;
+import org.apache.inlong.manager.common.util.HttpUtils;
+import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Nonnull;
+import java.util.List;
+
+/**
+ * TubeMQ operator, supports creating topics and creating consumer groups
+ * by calling the web API ({@code /webapi.htm}) of the TubeMQ master directly.
+ */
+@Service
+public class TubeMQOperator {
+
+    // Bind the logger to this class so that its messages are not attributed to another service.
+    private static final Logger LOGGER = LoggerFactory.getLogger(TubeMQOperator.class);
+    private static final Integer SUCCESS_CODE = 0;
+
+    /**
+     * TubeMQ const for HTTP URL format
+     */
+    private static final String TOPIC_NAME = "&topicName=";
+    private static final String CONSUME_GROUP = "&consumeGroup=";
+    private static final String GROUP_NAME = "&groupName=";
+    private static final String BROKER_ID = "&brokerId=";
+    private static final String CREATE_USER = "&createUser=";
+    private static final String CONF_MOD_AUTH_TOKEN = "&confModAuthToken=";
+
+    private static final String QUERY_TOPIC_PATH = "/webapi.htm?method=admin_query_cluster_topic_view";
+    private static final String QUERY_BROKER_PATH = "/webapi.htm?method=admin_query_broker_run_status";
+    private static final String ADD_TOPIC_PATH = "/webapi.htm?method=admin_add_new_topic_record";
+    private static final String QUERY_CONSUMER_PATH = "/webapi.htm?method=admin_query_allowed_consumer_group_info";
+    private static final String ADD_CONSUMER_PATH = "/webapi.htm?method=admin_add_authorized_consumergroup_info";
+
+    @Autowired
+    private HttpUtils httpUtils;
+
+    /**
+     * Create topic for the given tube cluster, the creation is skipped if the topic already exists.
+     *
+     * @param tubeCluster tube cluster info, provides the master URL and the auth token
+     * @param topicName name of the topic to create
+     * @param operator name of the user who creates the topic
+     * @throws BusinessException if the master URL or the topic name is empty, or any request to the master fails
+     */
+    public void createTopic(@Nonnull TubeClusterInfo tubeCluster, String topicName, String operator) {
+        String masterUrl = tubeCluster.getUrl();
+        LOGGER.info("begin to create tube topic {} in master {}", topicName, masterUrl);
+        if (StringUtils.isEmpty(masterUrl) || StringUtils.isEmpty(topicName)) {
+            throw new BusinessException("tube master url or tube topic cannot be null");
+        }
+
+        if (this.isTopicExist(masterUrl, topicName)) {
+            LOGGER.warn("tube topic {} already exists in {}, skip to create", topicName, masterUrl);
+            return;
+        }
+
+        this.createTopicOpt(masterUrl, topicName, tubeCluster.getToken(), operator);
+        LOGGER.info("success to create tube topic {} in {}", topicName, masterUrl);
+    }
+
+    /**
+     * Create consumer group for the given tube topic and cluster.
+     * <p/>
+     * Nothing is created (only a warning is logged) if the topic does not exist,
+     * or if the consumer group already exists for the topic.
+     *
+     * @param tubeCluster tube cluster info, provides the master URL and the auth token
+     * @param topic name of the topic to be consumed
+     * @param consumerGroup name of the consumer group to create
+     * @param operator name of the user who creates the consumer group
+     * @throws BusinessException if the master URL, consumer group or topic is empty,
+     *         or any request to the master fails
+     */
+    public void createConsumerGroup(@Nonnull TubeClusterInfo tubeCluster, String topic, String consumerGroup,
+            String operator) {
+        String masterUrl = tubeCluster.getUrl();
+        LOGGER.info("begin to create consumer group {} for topic {} in master {}", consumerGroup, topic, masterUrl);
+        if (StringUtils.isEmpty(masterUrl) || StringUtils.isEmpty(consumerGroup) || StringUtils.isEmpty(topic)) {
+            throw new BusinessException("tube master url, consumer group, or tube topic cannot be null");
+        }
+
+        if (!this.isTopicExist(masterUrl, topic)) {
+            LOGGER.warn("cannot create tube consumer group {}, as the topic {} not exists in master {}",
+                    consumerGroup, topic, masterUrl);
+            return;
+        }
+
+        if (this.isConsumerGroupExist(masterUrl, topic, consumerGroup)) {
+            LOGGER.warn("tube consumer group {} already exists for topic {} in master {}, skip to create",
+                    consumerGroup, topic, masterUrl);
+            return;
+        }
+
+        this.createConsumerGroupOpt(masterUrl, topic, consumerGroup, tubeCluster.getToken(), operator);
+        LOGGER.info("success to create tube consumer group {} for topic {} in {}", consumerGroup, topic, masterUrl);
+    }
+
+    /**
+     * Check whether the topic exists in the TubeMQ cluster.
+     *
+     * @param masterUrl URL of the TubeMQ master
+     * @param topicName name of the topic to check
+     * @return true if the topic query returns a non-empty data list, false otherwise
+     * @throws BusinessException if the query request fails
+     */
+    public boolean isTopicExist(String masterUrl, String topicName) {
+        LOGGER.info("begin to check if the tube topic {} exists", topicName);
+        String url = masterUrl + QUERY_TOPIC_PATH + TOPIC_NAME + topicName;
+        try {
+            TopicResponse topicView = httpUtils.request(url, HttpMethod.GET, null, new HttpHeaders(),
+                    TopicResponse.class);
+            if (CollectionUtils.isEmpty(topicView.getData())) {
+                LOGGER.warn("tube topic {} not exists in {}", topicName, url);
+                return false;
+            }
+            LOGGER.info("tube topic {} exists in {}", topicName, url);
+            return true;
+        } catch (Exception e) {
+            String msg = String.format("failed to check if the topic %s exist in ", topicName);
+            LOGGER.error(msg + url, e);
+            throw new BusinessException(msg + masterUrl + ", error: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Check whether the consumer group exists for the given topic.
+     *
+     * @param masterUrl URL of the TubeMQ master
+     * @param topicName name of the topic the consumer group belongs to
+     * @param consumerGroup name of the consumer group to check
+     * @return true if the consumer group query returns a non-empty data list, false otherwise
+     * @throws BusinessException if the query request fails
+     */
+    public boolean isConsumerGroupExist(String masterUrl, String topicName, String consumerGroup) {
+        LOGGER.info("begin to check if the consumer group {} exists on topic {}", consumerGroup, topicName);
+        String url = masterUrl + QUERY_CONSUMER_PATH + TOPIC_NAME + topicName + CONSUME_GROUP + consumerGroup;
+        try {
+            ConsumerGroupResponse response = httpUtils.request(url, HttpMethod.GET, null, new HttpHeaders(),
+                    ConsumerGroupResponse.class);
+            if (CollectionUtils.isEmpty(response.getData())) {
+                LOGGER.warn("tube consumer group {} not exists for topic {} in {}", consumerGroup, topicName, url);
+                return false;
+            }
+            LOGGER.info("tube consumer group {} exists for topic {} in {}", consumerGroup, topicName, url);
+            return true;
+        } catch (Exception e) {
+            String msg = String.format("failed to check if the consumer group %s for topic %s exist in ",
+                    consumerGroup, topicName);
+            LOGGER.error(msg + url, e);
+            throw new BusinessException(msg + masterUrl + ", error: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Get the broker list by the given TubeMQ master URL.
+     *
+     * @param masterUrl URL of the TubeMQ master
+     * @return the broker info, with its broker list already divided by status
+     * @throws BusinessException if the request fails or the master returns a non-success error code
+     */
+    private TubeBrokerInfo getBrokerInfo(String masterUrl) {
+        String url = masterUrl + QUERY_BROKER_PATH;
+        try {
+            TubeBrokerInfo brokerInfo = httpUtils.request(url, HttpMethod.GET, null, new HttpHeaders(),
+                    TubeBrokerInfo.class);
+            // compare by value, so the check is correct whether the getter returns a primitive or a boxed type
+            if (brokerInfo.getErrCode() != SUCCESS_CODE.intValue()) {
+                String msg = "failed to query tube broker from %s, error: %s";
+                LOGGER.error(String.format(msg, url, brokerInfo.getErrMsg()));
+                throw new BusinessException(String.format(msg, masterUrl, brokerInfo.getErrMsg()));
+            }
+
+            // is success, divide the broker by status
+            brokerInfo.divideBrokerListByStatus();
+            if (LOGGER.isDebugEnabled()) {
+                LOGGER.debug("success to query tube broker from {}, result {}", url, brokerInfo.getData());
+            }
+            return brokerInfo;
+        } catch (BusinessException e) {
+            // already logged above, rethrow as-is instead of wrapping it into a second, duplicated message
+            throw e;
+        } catch (Exception e) {
+            String msg = "failed to query tube broker from %s";
+            LOGGER.error(String.format(msg, url), e);
+            throw new BusinessException(String.format(msg, masterUrl) + ", error: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Create topic operation, the topic is added to all brokers returned by the master.
+     *
+     * @param masterUrl URL of the TubeMQ master
+     * @param topicName name of the topic to create
+     * @param token auth token sent as confModAuthToken, it must never be written to the log
+     * @param operator name of the user who creates the topic
+     * @throws BusinessException if no broker is found, the request fails,
+     *         or the master returns a non-success error code
+     */
+    private void createTopicOpt(String masterUrl, String topicName, String token, String operator) {
+        LOGGER.info("begin to create tube topic {} in master {}", topicName, masterUrl);
+        TubeBrokerInfo brokerView = this.getBrokerInfo(masterUrl);
+        List<Integer> allBrokers = brokerView.getAllBrokerIdList();
+        if (CollectionUtils.isEmpty(allBrokers)) {
+            String msg = String.format("cannot create topic %s, as not any brokers found in %s", topicName, masterUrl);
+            LOGGER.error(msg);
+            throw new BusinessException(msg);
+        }
+
+        // create topic for all brokers
+        String url = masterUrl + ADD_TOPIC_PATH + TOPIC_NAME + topicName
+                + BROKER_ID + StringUtils.join(allBrokers, ",")
+                + CREATE_USER + operator + CONF_MOD_AUTH_TOKEN + token;
+        try {
+            TubeHttpResponse response = httpUtils.request(url, HttpMethod.GET, null, new HttpHeaders(),
+                    TubeHttpResponse.class);
+            // compare by value, so the check is correct whether the getter returns a primitive or a boxed type
+            if (response.getErrCode() != SUCCESS_CODE.intValue()) {
+                String msg = String.format("failed to create tube topic %s, error: %s",
+                        topicName, response.getErrMsg());
+                // pass the message as an argument, as the server error text must not be used as a log pattern
+                LOGGER.error("{} in {} for brokers {}", msg, masterUrl, allBrokers);
+                throw new BusinessException(msg);
+            }
+
+            // log the master URL only, as the request URL contains the auth token
+            LOGGER.info("success to create tube topic {} in {}", topicName, masterUrl);
+        } catch (BusinessException e) {
+            // already logged above, rethrow as-is instead of wrapping it into a second, duplicated message
+            throw e;
+        } catch (Exception e) {
+            String msg = String.format("failed to create tube topic %s in %s", topicName, masterUrl);
+            LOGGER.error(msg, e);
+            throw new BusinessException(msg + ", error: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Create consumer group operation.
+     *
+     * @param masterUrl URL of the TubeMQ master
+     * @param topicName name of the topic to be consumed
+     * @param consumerGroup name of the consumer group to create
+     * @param token auth token sent as confModAuthToken, it must never be written to the log
+     * @param operator name of the user who creates the consumer group
+     * @throws BusinessException if the request fails or the master returns a non-success error code
+     */
+    private void createConsumerGroupOpt(String masterUrl, String topicName, String consumerGroup, String token,
+            String operator) {
+        LOGGER.info("begin to create consumer group {} for topic {} in master {}",
+                consumerGroup, topicName, masterUrl);
+
+        String url = masterUrl + ADD_CONSUMER_PATH + TOPIC_NAME + topicName
+                + GROUP_NAME + consumerGroup
+                + CREATE_USER + operator + CONF_MOD_AUTH_TOKEN + token;
+        try {
+            TubeHttpResponse response = httpUtils.request(url, HttpMethod.GET, null, new HttpHeaders(),
+                    TubeHttpResponse.class);
+            // compare by value, so the check is correct whether the getter returns a primitive or a boxed type
+            if (response.getErrCode() != SUCCESS_CODE.intValue()) {
+                String msg = String.format("failed to create tube consumer group %s for topic %s, error: %s",
+                        consumerGroup, topicName, response.getErrMsg());
+                // log the master URL only, as the request URL contains the auth token
+                LOGGER.error("{}, master {}", msg, masterUrl);
+                throw new BusinessException(msg);
+            }
+            LOGGER.info("success to create tube consumer group {} for topic {} in {}",
+                    consumerGroup, topicName, masterUrl);
+        } catch (BusinessException e) {
+            // already logged above, rethrow as-is instead of wrapping it into a second, duplicated message
+            throw e;
+        } catch (Exception e) {
+            String msg = String.format("failed to create tube consumer group %s for topic %s in %s",
+                    consumerGroup, topicName, masterUrl);
+            LOGGER.error(msg, e);
+            throw new BusinessException(msg + ", error: " + e.getMessage());
+        }
+    }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMqOptService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMqOptService.java
deleted file mode 100644
index 3774aba4a..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMqOptService.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service.mq.util;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
-import org.apache.inlong.manager.common.pojo.tubemq.AddTubeMqTopicRequest;
-import org.apache.inlong.manager.common.pojo.tubemq.QueryTubeTopicRequest;
-import org.apache.inlong.manager.common.pojo.tubemq.TubeManagerResponse;
-import org.apache.inlong.manager.common.util.HttpUtils;
-import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.HttpHeaders;
-import org.springframework.http.HttpMethod;
-import org.springframework.stereotype.Service;
-import org.springframework.util.CollectionUtils;
-
-/**
- * TubeMq operation service, such as create new topic, consumer group, etc.
- */
-@Slf4j
-@Service
-public class TubeMqOptService {
-
- private static final Gson GSON = new GsonBuilder().create(); // thread safe
-
- @Autowired
- private InlongClusterEntityMapper clusterMapper;
- @Autowired
- private HttpUtils httpUtils;
-
- /**
- * Create new topic
- */
- public String createNewTopic(AddTubeMqTopicRequest request) {
- HttpHeaders httpHeaders = new HttpHeaders();
- httpHeaders.add("Content-Type", "application/json");
- try {
- if (CollectionUtils.isEmpty(request.getAddTopicTasks())) {
- throw new Exception("topic cannot be empty");
- }
-
- // TODO use the original method of TubeMQ to create group
- AddTubeMqTopicRequest.AddTopicTasksBean addTopicTasksBean = request.getAddTopicTasks().get(0);
- QueryTubeTopicRequest topicRequest = QueryTubeTopicRequest.builder()
- .topicName(addTopicTasksBean.getTopicName()).clusterId(1)
- .user(request.getUser()).build();
-
- String tubeManager = "InlongGroupSettings.TUBE_MANAGER_URL";
- TubeManagerResponse response = httpUtils
- .request(tubeManager + "/v1/topic?method=queryCanWrite", HttpMethod.POST,
- GSON.toJson(topicRequest), httpHeaders, TubeManagerResponse.class);
- if (response.getErrCode() == 101) { // topic already exists
- log.info(" create tube topic {} on {} ", GSON.toJson(request),
- tubeManager + "/v1/task?method=addTopicTask");
-
- request.setClusterId(1);
- TubeManagerResponse createRsp = httpUtils
- .request(tubeManager + "/v1/task?method=addTopicTask", HttpMethod.POST,
- GSON.toJson(request), httpHeaders, TubeManagerResponse.class);
- log.info("create tube topic success, result: {}", createRsp);
- } else {
- log.warn("tube topic {} exists in {} ", addTopicTasksBean.getTopicName(), tubeManager);
- }
- } catch (Exception e) {
- log.error("failed to create tube topic: " + request.getAddTopicTasks().get(0).getTopicName(), e);
- }
- return "";
- }
-
- /**
- * Create new consumer group
- */
- public String createNewConsumerGroup(AddTubeConsumeGroupRequest request) throws Exception {
- HttpHeaders httpHeaders = new HttpHeaders();
- httpHeaders.add("Content-Type", "application/json");
- try {
- String tubeManager = "InlongGroupSettings.TUBE_MANAGER_URL";
- log.info("create tube consumer group {} on {} ", GSON.toJson(request),
- tubeManager + "/v1/task?method=addTopicTask");
- TubeManagerResponse response = httpUtils.request(tubeManager + "/v1/group?method=add",
- HttpMethod.POST, GSON.toJson(request), httpHeaders, TubeManagerResponse.class);
- if (response.getErrCode() == -1) { // Creation failed
- throw new BusinessException(ErrorCodeEnum.CONSUMER_GROUP_CREATE_FAILED, response.getErrMsg());
- }
- log.info("create tube consumer group success, result: {}", response);
- } catch (BusinessException e) {
- log.error("failed to create tube consumer group: " + GSON.toJson(request), e);
- throw e;
- }
- return "";
- }
-
- /**
- * Check if the topic is exists
- */
- public boolean queryTopicIsExist(QueryTubeTopicRequest queryTubeTopicRequest) {
- HttpHeaders httpHeaders = new HttpHeaders();
- httpHeaders.add("Content-Type", "application/json");
- try {
- String tubeManager = "InlongGroupSettings.TUBE_MANAGER_URL";
- TubeManagerResponse response = httpUtils.request(tubeManager + "/v1/topic?method=queryCanWrite",
- HttpMethod.POST, GSON.toJson(queryTubeTopicRequest), httpHeaders, TubeManagerResponse.class);
- if (response.getErrCode() == 0) { // topic already exists
- log.error("topic {} exists in {} ", queryTubeTopicRequest.getTopicName(), tubeManager);
- return true;
- }
- } catch (Exception e) {
- log.error("fail to query tube topic {}", queryTubeTopicRequest.getTopicName(), e);
- }
- return false;
- }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
index 6f0f5c28a..b107998bf 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
@@ -105,6 +105,9 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
return ListenerResult.success();
}
+ /**
+ * TODO: add support for TubeMQ, only Pulsar is handled for now
+ */
private GroupInfo createGroupInfo(InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
String groupId = groupInfo.getInlongGroupId();
List<StreamSink> streamSinks = sinkService.listSink(groupId, null);
@@ -129,7 +132,7 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
if (!MQType.MQ_PULSAR.equals(groupInfo.getMqType())) {
- String errMsg = String.format("Unsupported mqType={%s}", groupInfo.getMqType());
+ String errMsg = String.format("Unsupported MQ type %s", groupInfo.getMqType());
log.error(errMsg);
throw new WorkflowListenerException(errMsg);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
index daaf55050..22723787c 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
@@ -122,7 +122,7 @@ public class CreateStreamSortConfigListener implements SortOperateListener {
private List<StreamSource> createPulsarSources(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo) {
if (!MQType.MQ_PULSAR.equals(groupInfo.getMqType())) {
- String errMsg = String.format("Unsupported mqType={%s}", groupInfo.getMqType());
+ String errMsg = String.format("Unsupported MQ type %s", groupInfo.getMqType());
log.error(errMsg);
throw new WorkflowListenerException(errMsg);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
index df8ddad08..2baff6770 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
@@ -25,19 +25,19 @@ import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.cluster.tube.TubeClusterInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
-import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
import org.apache.inlong.manager.common.pojo.workflow.form.NewConsumptionProcessForm;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
import org.apache.inlong.manager.service.cluster.InlongClusterService;
-import org.apache.inlong.manager.service.group.InlongGroupService;
-import org.apache.inlong.manager.service.mq.util.PulsarOptService;
+import org.apache.inlong.manager.service.mq.util.PulsarOperator;
import org.apache.inlong.manager.service.mq.util.PulsarUtils;
-import org.apache.inlong.manager.service.mq.util.TubeMqOptService;
+import org.apache.inlong.manager.service.mq.util.TubeMQOperator;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
@@ -47,7 +47,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.Arrays;
-import java.util.Collections;
import java.util.Date;
import java.util.List;
@@ -59,15 +58,15 @@ import java.util.List;
public class ConsumptionCompleteProcessListener implements ProcessEventListener {
@Autowired
- private PulsarOptService pulsarMqOptService;
+ private InlongGroupEntityMapper groupMapper;
@Autowired
- private InlongClusterService clusterService;
+ private ConsumptionEntityMapper consumptionMapper;
@Autowired
- private InlongGroupService groupService;
+ private InlongClusterService clusterService;
@Autowired
- private ConsumptionEntityMapper consumptionMapper;
+ private PulsarOperator pulsarOperator;
@Autowired
- private TubeMqOptService tubeMqOptService;
+ private TubeMQOperator tubeMQOperator;
@Override
public ProcessEvent event() {
@@ -87,12 +86,12 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
MQType mqType = MQType.forType(entity.getMqType());
if (mqType == MQType.TUBE) {
- this.createTubeConsumerGroup(entity);
+ this.createTubeConsumerGroup(entity, context.getOperator());
return ListenerResult.success("Create Tube consumer group successful");
} else if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
this.createPulsarSubscription(entity);
} else {
- throw new WorkflowListenerException("Unsupported MQ type [" + mqType + "]");
+ throw new WorkflowListenerException("Unsupported MQ type " + mqType);
}
this.updateConsumerInfo(consumptionId, entity.getConsumerGroup());
@@ -116,12 +115,12 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
*/
private void createPulsarSubscription(ConsumptionEntity entity) {
String groupId = entity.getInlongGroupId();
- InlongGroupInfo groupInfo = groupService.get(groupId);
- Preconditions.checkNotNull(groupInfo, "inlong group not found for groupId=" + groupId);
- String mqResource = groupInfo.getMqResource();
+ InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+ Preconditions.checkNotNull(groupEntity, "inlong group not found for groupId=" + groupId);
+ String mqResource = groupEntity.getMqResource();
Preconditions.checkNotNull(mqResource, "mq resource cannot empty for groupId=" + groupId);
- String clusterTag = groupInfo.getInlongClusterTag();
+ String clusterTag = groupEntity.getInlongClusterTag();
InlongClusterInfo clusterInfo = clusterService.getOne(clusterTag, null, ClusterType.CLS_PULSAR);
PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
@@ -146,7 +145,7 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
private void createPulsarSubscription(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicBean topicBean,
List<String> topics) {
try {
- pulsarMqOptService.createSubscriptions(pulsarAdmin, subscription, topicBean, topics);
+ pulsarOperator.createSubscriptions(pulsarAdmin, subscription, topicBean, topics);
} catch (Exception e) {
log.error("create pulsar consumer group failed", e);
throw new WorkflowListenerException("failed to create pulsar consumer group");
@@ -156,19 +155,20 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
/**
* Create tube consumer group
*/
- private void createTubeConsumerGroup(ConsumptionEntity consumption) {
- AddTubeConsumeGroupRequest addTubeConsumeGroupRequest = new AddTubeConsumeGroupRequest();
- addTubeConsumeGroupRequest.setClusterId(1); // TODO is cluster id needed?
- addTubeConsumeGroupRequest.setCreateUser(consumption.getCreator());
- AddTubeConsumeGroupRequest.GroupNameJsonSetBean bean = new AddTubeConsumeGroupRequest.GroupNameJsonSetBean();
- bean.setTopicName(consumption.getTopic());
- bean.setGroupName(consumption.getConsumerGroup());
- addTubeConsumeGroupRequest.setGroupNameJsonSet(Collections.singletonList(bean));
+ private void createTubeConsumerGroup(ConsumptionEntity entity, String operator) {
+ String groupId = entity.getInlongGroupId();
+ InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+ Preconditions.checkNotNull(groupEntity, "inlong group not found for groupId=" + groupId);
+ String mqResource = groupEntity.getMqResource();
+ Preconditions.checkNotNull(mqResource, "mq resource cannot empty for groupId=" + groupId);
+ String clusterTag = groupEntity.getInlongClusterTag();
+ TubeClusterInfo clusterInfo = (TubeClusterInfo) clusterService.getOne(clusterTag, null, ClusterType.CLS_TUBE);
try {
- tubeMqOptService.createNewConsumerGroup(addTubeConsumeGroupRequest);
+ tubeMQOperator.createConsumerGroup(clusterInfo, entity.getTopic(), entity.getConsumerGroup(), operator);
} catch (Exception e) {
- throw new WorkflowListenerException("failed to create tube consumer group: " + addTubeConsumeGroupRequest);
+ log.error("failed to create tube consumer group: ", e);
+ throw new WorkflowListenerException("failed to create tube consumer group: " + e.getMessage());
}
}