You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by he...@apache.org on 2022/06/07 03:20:37 UTC
[incubator-inlong] branch master updated: [INLONG-4543][Manager] Query MQ clusters from inlong cluster table (#4546)
This is an automated email from the ASF dual-hosted git repository.
healchow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 95f9dbcc0 [INLONG-4543][Manager] Query MQ clusters from inlong cluster table (#4546)
95f9dbcc0 is described below
commit 95f9dbcc000eac6f37670fe011b851f73da59545
Author: healzhou <he...@gmail.com>
AuthorDate: Tue Jun 7 11:20:31 2022 +0800
[INLONG-4543][Manager] Query MQ clusters from inlong cluster table (#4546)
---
.../common/pojo/dataproxy/PulsarClusterInfo.java | 39 -----
.../manager/service/CommonOperateService.java | 186 ---------------------
.../service/cluster/InlongClusterService.java | 11 ++
.../service/cluster/InlongClusterServiceImpl.java | 35 ++--
.../service/core/impl/ConsumptionServiceImpl.java | 1 +
.../manager/service/group/GroupCheckService.java | 69 ++++++++
.../service/mq/CreatePulsarGroupTaskListener.java | 50 +++---
.../mq/CreatePulsarResourceTaskListener.java | 55 +++---
.../mq/CreatePulsarSubscriptionTaskListener.java | 62 ++++---
.../service/mq/CreatePulsarTopicTaskListener.java | 61 ++++---
.../service/mq/CreateTubeGroupTaskListener.java | 38 +++--
.../service/mq/CreateTubeTopicTaskListener.java | 5 +-
.../service/mq/util/PulsarOptServiceImpl.java | 14 +-
.../manager/service/mq/util/PulsarUtils.java | 36 ++--
.../manager/service/mq/util/TubeMqOptService.java | 33 ++--
.../service/sink/StreamSinkServiceImpl.java | 14 +-
.../service/sort/CreateSortConfigListenerV2.java | 42 +++--
.../sort/CreateStreamSortConfigListener.java | 32 ++--
.../service/sort/PushSortConfigListener.java | 9 +-
.../manager/service/sort/util/DataFlowUtils.java | 7 +-
.../service/source/StreamSourceServiceImpl.java | 18 +-
.../transform/StreamTransformServiceImpl.java | 10 +-
.../ConsumptionCompleteProcessListener.java | 33 ++--
23 files changed, 377 insertions(+), 483 deletions(-)
diff --git a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/PulsarClusterInfo.java b/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/PulsarClusterInfo.java
deleted file mode 100644
index f03204ed0..000000000
--- a/inlong-common/src/main/java/org/apache/inlong/common/pojo/dataproxy/PulsarClusterInfo.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.common.pojo.dataproxy;
-
-import lombok.AllArgsConstructor;
-import lombok.Builder;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-import java.util.Map;
-
-@Data
-@Builder
-@AllArgsConstructor
-@NoArgsConstructor
-public class PulsarClusterInfo {
-
- private String type;
- private String adminUrl;
- private String token;
- private String brokerServiceUrl;
- private Map<String, String> ext;
-
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
deleted file mode 100644
index 6f95dbd79..000000000
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/CommonOperateService.java
+++ /dev/null
@@ -1,186 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.inlong.manager.service;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.gson.Gson;
-import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
-import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
-import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.enums.MQType;
-import org.apache.inlong.manager.common.exceptions.BusinessException;
-import org.apache.inlong.manager.common.pojo.cluster.InlongClusterPageRequest;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
-import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
-import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
-import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Service;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-/**
- * Common operation service
- */
-@Service
-public class CommonOperateService {
-
- private static final Logger LOGGER = LoggerFactory.getLogger(CommonOperateService.class);
-
- @Autowired
- public ObjectMapper objectMapper;
- @Autowired
- private InlongGroupEntityMapper groupMapper;
- @Autowired
- private InlongClusterEntityMapper clusterMapper;
-
- /**
- * query some third-party-cluster info according key, such as "pulsar_adminUrl", "cluster_tube_manager", etc.
- *
- * @param key Param name.
- * @return Value of key in database.
- */
- public String getSpecifiedParam(String key) {
- String result = "";
- InlongClusterEntity clusterEntity;
- Gson gson = new Gson();
- Map<String, String> params;
-
- switch (key) {
- case InlongGroupSettings.PULSAR_SERVICE_URL: {
- clusterEntity = getMQCluster(MQType.PULSAR);
- if (clusterEntity != null) {
- result = clusterEntity.getUrl();
- }
- break;
- }
- case InlongGroupSettings.PULSAR_ADMIN_URL: {
- clusterEntity = getMQCluster(MQType.PULSAR);
- if (clusterEntity != null) {
- params = gson.fromJson(clusterEntity.getExtParams(), Map.class);
- result = params.get(key);
- }
- break;
- }
- case InlongGroupSettings.TUBE_MANAGER_URL:
- case InlongGroupSettings.TUBE_CLUSTER_ID:
- case InlongGroupSettings.TUBE_MASTER_URL: {
- clusterEntity = getMQCluster(MQType.TUBE);
- if (clusterEntity != null) {
- if (key.equals(InlongGroupSettings.TUBE_MASTER_URL)) {
- result = clusterEntity.getUrl();
- } else {
- params = gson.fromJson(clusterEntity.getExtParams(), Map.class);
- result = params.get(key);
- }
- }
- break;
- }
- default:
- LOGGER.warn("case warn key {}", key);
- }
- return result;
- }
-
- /**
- * Get third party cluster by type.
- *
- * TODO Add data_proxy_cluster_name for query.
- */
- private InlongClusterEntity getMQCluster(MQType type) {
- List<InlongClusterEntity> clusterList = clusterMapper.selectByKey(null, null,
- InlongGroupSettings.CLUSTER_DATA_PROXY);
- if (CollectionUtils.isEmpty(clusterList)) {
- LOGGER.warn("no data proxy cluster found");
- return null;
- }
-
- String clusterTag = clusterList.get(0).getClusterTag();
- InlongClusterPageRequest request = new InlongClusterPageRequest();
- request.setClusterTag(clusterTag);
- request.setType(type.getType());
- List<InlongClusterEntity> mqClusterList = clusterMapper.selectByCondition(request);
- if (CollectionUtils.isEmpty(mqClusterList)) {
- LOGGER.warn("no mq cluster found by cluster tag={} and type={}", clusterTag, type);
- return null;
- }
-
- return mqClusterList.get(0);
- }
-
- /**
- * Get Pulsar cluster by the given type.
- *
- * @return Pulsar cluster info.
- */
- public PulsarClusterInfo getPulsarClusterInfo(String type) {
- MQType mqType = MQType.forType(type);
- InlongClusterEntity clusterEntity = getMQCluster(mqType);
- if (clusterEntity == null || StringUtils.isBlank(clusterEntity.getExtParams())) {
- throw new BusinessException("pulsar cluster or pulsar ext params is empty");
- }
-
- PulsarClusterInfo pulsarCluster = PulsarClusterInfo.builder()
- .brokerServiceUrl(clusterEntity.getUrl())
- .token(clusterEntity.getToken())
- .build();
- try {
- Map<String, String> configParams = objectMapper.readValue(clusterEntity.getExtParams(), Map.class);
- String adminUrl = configParams.get(InlongGroupSettings.PULSAR_ADMIN_URL);
- pulsarCluster.setAdminUrl(adminUrl);
- } catch (Exception e) {
- LOGGER.error("parse pulsar cluster info error: ", e);
- }
-
- Preconditions.checkNotNull(pulsarCluster.getAdminUrl(), "adminUrl is empty, check third party cluster table");
- pulsarCluster.setType(clusterEntity.getType());
- return pulsarCluster;
- }
-
- /**
- * Check whether the inlong group status is temporary
- *
- * @param groupId Inlong group id
- * @return Inlong group entity, for caller reuse
- */
- public InlongGroupEntity checkGroupStatus(String groupId, String operator) {
- InlongGroupEntity inlongGroupEntity = groupMapper.selectByGroupId(groupId);
- Preconditions.checkNotNull(inlongGroupEntity, "groupId is invalid");
-
- List<String> managers = Arrays.asList(inlongGroupEntity.getInCharges().split(","));
- Preconditions.checkTrue(managers.contains(operator),
- String.format(ErrorCodeEnum.USER_IS_NOT_MANAGER.getMessage(), operator, managers));
-
- GroupStatus state = GroupStatus.forCode(inlongGroupEntity.getStatus());
- // Add/modify/delete is not allowed under certain group state
- if (GroupStatus.notAllowedUpdate(state)) {
- LOGGER.error("inlong group status was not allowed to add/update/delete related info");
- throw new BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS);
- }
-
- return inlongGroupEntity;
- }
-}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
index 11645441d..49d65c39d 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterService.java
@@ -50,6 +50,17 @@ public interface InlongClusterService {
*/
InlongClusterInfo get(Integer id);
+ /**
+ * Get one cluster by the cluster tag, cluster name and cluster type.
+ *
+ * @param clusterTag cluster tag
+ * @param clusterName cluster name
+ * @param clusterType cluster type
+ * @return cluster info
+ * @apiNote No matter how many clusters there are, only one cluster is returned.
+ */
+ InlongClusterInfo getOne(String clusterTag, String clusterName, String clusterType);
+
/**
* Paging query clusters according to conditions.
*
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
index 712f43f8e..2adcd2b2a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/cluster/InlongClusterServiceImpl.java
@@ -77,6 +77,10 @@ public class InlongClusterServiceImpl implements InlongClusterService {
private static final Logger LOGGER = LoggerFactory.getLogger(InlongClusterServiceImpl.class);
private static final Gson GSON = new Gson();
+ @Autowired
+ private InlongGroupEntityMapper groupMapper;
+ @Autowired
+ private InlongStreamEntityMapper streamMapper;
@Autowired
private InlongClusterOperatorFactory clusterOperatorFactory;
@Autowired
@@ -84,10 +88,6 @@ public class InlongClusterServiceImpl implements InlongClusterService {
@Autowired
private InlongClusterNodeEntityMapper clusterNodeMapper;
@Autowired
- private InlongGroupEntityMapper groupMapper;
- @Autowired
- private InlongStreamEntityMapper streamMapper;
- @Autowired
private DataProxyConfigRepository proxyRepository;
@Override
@@ -133,19 +133,32 @@ public class InlongClusterServiceImpl implements InlongClusterService {
PageHelper.startPage(request.getPageNum(), request.getPageSize());
Page<InlongClusterEntity> entityPage = (Page<InlongClusterEntity>) clusterMapper.selectByCondition(request);
- List<InlongClusterInfo> list = new ArrayList<>(entityPage.getResult().size());
- for (InlongClusterEntity entity : entityPage) {
- InlongClusterOperator instance = clusterOperatorFactory.getInstance(entity.getType());
- InlongClusterInfo clusterInfo = instance.getFromEntity(entity);
- list.add(clusterInfo);
- }
-
+ List<InlongClusterInfo> list = entityPage.stream()
+ .map(entity -> {
+ InlongClusterOperator instance = clusterOperatorFactory.getInstance(entity.getType());
+ return instance.getFromEntity(entity);
+ }).collect(Collectors.toList());
PageInfo<InlongClusterInfo> page = new PageInfo<>(list);
page.setTotal(list.size());
LOGGER.debug("success to list inlong cluster by {}", request);
return page;
}
+ @Override
+ public InlongClusterInfo getOne(String clusterTag, String name, String type) {
+ List<InlongClusterEntity> entityList = clusterMapper.selectByKey(clusterTag, name, type);
+ if (CollectionUtils.isEmpty(entityList)) {
+ throw new BusinessException(String.format("cluster not found by tag=%s, name=%s, type=%s",
+ clusterTag, name, type));
+ }
+
+ InlongClusterEntity entity = entityList.get(0);
+ InlongClusterOperator instance = clusterOperatorFactory.getInstance(entity.getType());
+ InlongClusterInfo result = instance.getFromEntity(entity);
+ LOGGER.debug("success to get inlong cluster by tag={}, name={}, type={}", clusterTag, name, type);
+ return result;
+ }
+
@Override
public Boolean update(InlongClusterRequest request, String operator) {
LOGGER.debug("begin to update inlong cluster={}", request);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
index fd8a25137..899777a42 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/ConsumptionServiceImpl.java
@@ -301,6 +301,7 @@ public class ConsumptionServiceImpl implements ConsumptionService {
/**
* According to groupId and topic, stitch the full path of Pulsar Topic
+ * TODO: save full topic of Pulsar in consumption info
*/
private String getFullPulsarTopic(String groupId, String topic) {
InlongGroupEntity inlongGroupEntity = groupMapper.selectByGroupId(groupId);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/GroupCheckService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/GroupCheckService.java
new file mode 100644
index 000000000..c3062c047
--- /dev/null
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/group/GroupCheckService.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.manager.service.group;
+
+import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
+import org.apache.inlong.manager.common.enums.GroupStatus;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.Preconditions;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * Service of inlong group check.
+ */
+@Service
+public class GroupCheckService {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(GroupCheckService.class);
+
+ @Autowired
+ private InlongGroupEntityMapper groupMapper;
+
+ /**
+ * Check whether the inlong group status is temporary
+ *
+ * @param groupId Inlong group id
+ * @return Inlong group entity, for caller reuse
+ */
+ public InlongGroupEntity checkGroupStatus(String groupId, String operator) {
+ InlongGroupEntity inlongGroupEntity = groupMapper.selectByGroupId(groupId);
+ Preconditions.checkNotNull(inlongGroupEntity, "groupId is invalid");
+
+ List<String> managers = Arrays.asList(inlongGroupEntity.getInCharges().split(","));
+ Preconditions.checkTrue(managers.contains(operator),
+ String.format(ErrorCodeEnum.USER_IS_NOT_MANAGER.getMessage(), operator, managers));
+
+ GroupStatus state = GroupStatus.forCode(inlongGroupEntity.getStatus());
+ // Add/modify/delete is not allowed under certain group state
+ if (GroupStatus.notAllowedUpdate(state)) {
+ LOGGER.error("inlong group status was not allowed to add/update/delete related info");
+ throw new BusinessException(ErrorCodeEnum.OPT_NOT_ALLOWED_BY_STATUS);
+ }
+
+ return inlongGroupEntity;
+ }
+
+}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java
index 96763a4ad..39c211ff6 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarGroupTaskListener.java
@@ -18,16 +18,19 @@
package org.apache.inlong.manager.service.mq;
import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
-import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
+import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.dao.entity.InlongStreamEntity;
-import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
-import org.apache.inlong.manager.service.CommonOperateService;
+import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.core.ConsumptionService;
+import org.apache.inlong.manager.service.core.InlongStreamService;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.mq.util.PulsarOptService;
import org.apache.inlong.manager.service.mq.util.PulsarUtils;
@@ -49,16 +52,14 @@ import java.util.List;
public class CreatePulsarGroupTaskListener implements QueueOperateListener {
@Autowired
- private CommonOperateService commonOperateService;
+ private InlongGroupService groupService;
@Autowired
- private ClusterBean clusterBean;
+ private InlongStreamService streamService;
@Autowired
- private InlongGroupService groupService;
+ private InlongClusterService clusterService;
@Autowired
private ConsumptionService consumptionService;
@Autowired
- private InlongStreamEntityMapper streamMapper;
- @Autowired
private PulsarOptService pulsarOptService;
@Override
@@ -69,8 +70,7 @@ public class CreatePulsarGroupTaskListener implements QueueOperateListener {
@Override
public ListenerResult listen(WorkflowContext context) throws WorkflowListenerException {
GroupResourceProcessForm form = (GroupResourceProcessForm) context.getProcessForm();
-
- String groupId = form.getInlongGroupId();
+ final String groupId = form.getInlongGroupId();
InlongGroupInfo groupInfo = groupService.get(groupId);
if (groupInfo == null) {
log.error("inlong group not found with groupId={}", groupId);
@@ -78,21 +78,27 @@ public class CreatePulsarGroupTaskListener implements QueueOperateListener {
}
// For Pulsar, each Stream corresponds to a Topic
- List<InlongStreamEntity> streamEntities = streamMapper.selectByGroupId(groupId);
- if (streamEntities == null || streamEntities.isEmpty()) {
+ List<InlongStreamBriefInfo> streamInfo = streamService.getTopicList(groupId);
+ if (streamInfo == null || streamInfo.isEmpty()) {
log.warn("inlong stream is empty for groupId={}, skip to create pulsar subscription", groupId);
return ListenerResult.success();
}
- PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo(groupInfo.getMqType());
- try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
- String tenant = clusterBean.getDefaultTenant();
+
+ String clusterTag = groupInfo.getInlongClusterTag();
+ InlongClusterInfo clusterInfo = clusterService.getOne(clusterTag, null, ClusterType.CLS_PULSAR);
+ PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
+ try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
+ String tenant = pulsarCluster.getTenant();
+ if (StringUtils.isEmpty(tenant)) {
+ tenant = InlongGroupSettings.DEFAULT_PULSAR_TENANT;
+ }
String namespace = groupInfo.getMqResource();
- for (InlongStreamEntity streamEntity : streamEntities) {
+ for (InlongStreamBriefInfo stream : streamInfo) {
PulsarTopicBean topicBean = new PulsarTopicBean();
topicBean.setTenant(tenant);
topicBean.setNamespace(namespace);
- String topic = streamEntity.getMqResource();
+ String topic = stream.getMqResource();
topicBean.setTopicName(topic);
// Create a subscription in the Pulsar cluster you need to ensure that the Topic exists
@@ -100,13 +106,13 @@ public class CreatePulsarGroupTaskListener implements QueueOperateListener {
boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic);
if (!exist) {
String topicFull = tenant + "/" + namespace + "/" + topic;
- String serviceUrl = pulsarClusterInfo.getAdminUrl();
+ String serviceUrl = pulsarCluster.getAdminUrl();
log.error("topic={} not exists in {}", topicFull, serviceUrl);
throw new WorkflowListenerException("topic=" + topicFull + " not exists in " + serviceUrl);
}
- // Consumer naming rules: sortAppName_topicName_consumer_group
- String subscription = clusterBean.getAppName() + "_" + topic + "_consumer_group";
+ // Consumer naming rules: clusterTag_topicName_consumer_group
+ String subscription = clusterTag + "_" + topic + "_consumer_group";
pulsarOptService.createSubscription(pulsarAdmin, topicBean, subscription);
// Insert the consumption data into the consumption table
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java
index e2cedd393..b58e9b921 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarResourceTaskListener.java
@@ -18,17 +18,20 @@
package org.apache.inlong.manager.service.mq;
import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
-import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamBriefInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
+import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.dao.mapper.InlongStreamEntityMapper;
-import org.apache.inlong.manager.service.CommonOperateService;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
+import org.apache.inlong.manager.service.core.InlongStreamService;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.mq.util.PulsarOptService;
import org.apache.inlong.manager.service.mq.util.PulsarUtils;
@@ -50,15 +53,13 @@ import java.util.List;
public class CreatePulsarResourceTaskListener implements QueueOperateListener {
@Autowired
- PulsarOptService pulsarOptService;
- @Autowired
- private ClusterBean clusterBean;
+ private InlongGroupService groupService;
@Autowired
- private CommonOperateService commonOperateService;
+ private InlongStreamService streamService;
@Autowired
- private InlongGroupService groupService;
+ private InlongClusterService clusterService;
@Autowired
- private InlongStreamEntityMapper streamMapper;
+ private PulsarOptService pulsarOptService;
@Override
public TaskEvent event() {
@@ -75,10 +76,12 @@ public class CreatePulsarResourceTaskListener implements QueueOperateListener {
if (groupInfo == null) {
throw new WorkflowListenerException("inlong group or pulsar cluster not found for groupId=" + groupId);
}
- InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
- PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo(pulsarInfo.getMqType());
+
try {
- this.createPulsarProcess(pulsarInfo, pulsarClusterInfo);
+ InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
+ InlongClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null,
+ ClusterType.CLS_PULSAR);
+ this.createPulsarProcess(pulsarInfo, (PulsarClusterInfo) clusterInfo);
} catch (Exception e) {
log.error("create pulsar resource error for groupId={}", groupId, e);
throw new WorkflowListenerException("create pulsar resource error for groupId=" + groupId);
@@ -91,26 +94,29 @@ public class CreatePulsarResourceTaskListener implements QueueOperateListener {
/**
* Create Pulsar tenant, namespace and topic
*/
- private void createPulsarProcess(InlongPulsarInfo groupInfo, PulsarClusterInfo pulsarClusterInfo) throws Exception {
- String groupId = groupInfo.getInlongGroupId();
- log.info("begin to create pulsar resource for groupId={} in cluster={}", groupId, pulsarClusterInfo);
+ private void createPulsarProcess(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pulsarCluster) throws Exception {
+ String groupId = pulsarInfo.getInlongGroupId();
+ log.info("begin to create pulsar resource for groupId={} in cluster={}", groupId, pulsarCluster);
- String namespace = groupInfo.getMqResource();
+ String namespace = pulsarInfo.getMqResource();
Preconditions.checkNotNull(namespace, "pulsar namespace cannot be empty for groupId=" + groupId);
- String queueModule = groupInfo.getQueueModule();
+ String queueModule = pulsarInfo.getQueueModule();
Preconditions.checkNotNull(queueModule, "queue module cannot be empty for groupId=" + groupId);
- String tenant = clusterBean.getDefaultTenant();
- try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
+ try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
// create pulsar tenant
+ String tenant = pulsarCluster.getTenant();
+ if (StringUtils.isEmpty(tenant)) {
+ tenant = InlongGroupSettings.DEFAULT_PULSAR_TENANT;
+ }
pulsarOptService.createTenant(pulsarAdmin, tenant);
// create pulsar namespace
- pulsarOptService.createNamespace(pulsarAdmin, groupInfo, tenant, namespace);
+ pulsarOptService.createNamespace(pulsarAdmin, pulsarInfo, tenant, namespace);
// create pulsar topic
- Integer partitionNum = groupInfo.getPartitionNum();
- List<InlongStreamBriefInfo> streamTopicList = streamMapper.selectBriefList(groupId);
+ Integer partitionNum = pulsarInfo.getPartitionNum();
+ List<InlongStreamBriefInfo> streamTopicList = streamService.getTopicList(groupId);
PulsarTopicBean topicBean = PulsarTopicBean.builder()
.tenant(tenant).namespace(namespace).numPartitions(partitionNum).queueModule(queueModule).build();
@@ -119,8 +125,7 @@ public class CreatePulsarResourceTaskListener implements QueueOperateListener {
pulsarOptService.createTopic(pulsarAdmin, topicBean);
}
}
- log.info("finish to create pulsar resource for groupId={}, service http url={}", groupId,
- pulsarClusterInfo.getAdminUrl());
+ log.info("finish to create pulsar resource for groupId={}, cluster={}", groupId, pulsarCluster);
}
@Override
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java
index f5c9c9dc7..b806aef74 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarSubscriptionTaskListener.java
@@ -18,19 +18,20 @@
package org.apache.inlong.manager.service.mq;
import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
-import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
-import org.apache.inlong.manager.service.CommonOperateService;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.core.ConsumptionService;
import org.apache.inlong.manager.service.mq.util.PulsarOptService;
-import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.mq.util.PulsarUtils;
+import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.task.QueueOperateListener;
@@ -49,9 +50,7 @@ import java.util.List;
public class CreatePulsarSubscriptionTaskListener implements QueueOperateListener {
@Autowired
- private CommonOperateService commonOperateService;
- @Autowired
- private ClusterBean clusterBean;
+ private InlongClusterService clusterService;
@Autowired
private PulsarOptService pulsarOptService;
@Autowired
@@ -71,10 +70,11 @@ public class CreatePulsarSubscriptionTaskListener implements QueueOperateListene
InlongStreamInfo streamInfo = form.getStreamInfo();
final String groupId = streamInfo.getInlongGroupId();
final String streamId = streamInfo.getInlongStreamId();
- final String namespace = groupInfo.getMqResource();
- final String topic = streamInfo.getMqResource();
- PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo(groupInfo.getMqType());
- try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
+
+ String clusterTag = groupInfo.getInlongClusterTag();
+ InlongClusterInfo clusterInfo = clusterService.getOne(clusterTag, null, ClusterType.CLS_PULSAR);
+ PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
+ try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
// Query data sink info based on groupId and streamId
List<String> sinkTypeList = sinkService.getSinkTypeList(groupId, streamId);
if (sinkTypeList == null || sinkTypeList.size() == 0) {
@@ -82,38 +82,36 @@ public class CreatePulsarSubscriptionTaskListener implements QueueOperateListene
groupId, streamId);
return ListenerResult.success();
}
- String tenant = clusterBean.getDefaultTenant();
+
+ String tenant = pulsarCluster.getTenant();
+ String namespace = groupInfo.getMqResource();
+ String topic = streamInfo.getMqResource();
PulsarTopicBean topicBean = new PulsarTopicBean();
topicBean.setTenant(tenant);
topicBean.setNamespace(namespace);
topicBean.setTopicName(topic);
// Create a subscription in the Pulsar cluster, you need to ensure that the Topic exists
- try {
- boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic);
- if (!exist) {
- String fullTopic = tenant + "/" + namespace + "/" + topic;
- String serviceUrl = pulsarClusterInfo.getAdminUrl();
- log.error("topic={} not exists in {}", fullTopic, serviceUrl);
- throw new BusinessException("topic=" + fullTopic + " not exists in " + serviceUrl);
- }
+ boolean exist = pulsarOptService.topicIsExists(pulsarAdmin, tenant, namespace, topic);
+ if (!exist) {
+ String fullTopic = tenant + "/" + namespace + "/" + topic;
+ String msg = String.format("topic=%s not exists in %s", fullTopic, pulsarCluster.getAdminUrl());
+ log.error(msg);
+ throw new BusinessException(msg);
+ }
- // Consumer naming rules: sortAppName_topicName_consumer_group
- String subscription = clusterBean.getAppName() + "_" + topic + "_consumer_group";
- pulsarOptService.createSubscription(pulsarAdmin, topicBean, subscription);
+ // Consumer naming rules: clusterTag_topicName_consumer_group
+ String subscription = clusterTag + "_" + topic + "_consumer_group";
+ pulsarOptService.createSubscription(pulsarAdmin, topicBean, subscription);
- // Insert the consumption data into the consumption table
- consumptionService.saveSortConsumption(groupInfo, topic, subscription);
- } catch (Exception e) {
- log.error("create pulsar subscription error for groupId={}, streamId={}", groupId, streamId, e);
- throw new WorkflowListenerException("create pulsar subscription error, reason: " + e.getMessage());
- }
+ // Insert the consumption data into the consumption table
+ consumptionService.saveSortConsumption(groupInfo, topic, subscription);
} catch (Exception e) {
- log.error("create pulsar subscription error for groupId={}, streamId={}", groupId, streamId, e);
- throw new WorkflowListenerException("create pulsar subscription error, reason: " + e.getMessage());
+ log.error("failed to create pulsar subscription for groupId=" + groupId + " streamId=" + streamId, e);
+ throw new WorkflowListenerException("failed to create pulsar subscription: " + e.getMessage());
}
- log.info("finish to create single pulsar subscription for groupId={}, streamId={}", groupId, streamId);
+ log.info("success to create pulsar subscription for groupId={}, streamId={}", groupId, streamId);
return ListenerResult.success();
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java
index 213127961..8e39f9b48 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreatePulsarTopicTaskListener.java
@@ -18,15 +18,18 @@
package org.apache.inlong.manager.service.mq;
import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
-import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
-import org.apache.inlong.manager.service.CommonOperateService;
+import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.mq.util.PulsarOptService;
import org.apache.inlong.manager.service.mq.util.PulsarUtils;
import org.apache.inlong.manager.workflow.WorkflowContext;
@@ -45,9 +48,7 @@ import org.springframework.stereotype.Component;
public class CreatePulsarTopicTaskListener implements QueueOperateListener {
@Autowired
- private CommonOperateService commonOperateService;
- @Autowired
- private ClusterBean clusterBean;
+ private InlongClusterService clusterService;
@Autowired
private PulsarOptService pulsarOptService;
@@ -65,41 +66,37 @@ public class CreatePulsarTopicTaskListener implements QueueOperateListener {
final String streamId = streamInfo.getInlongStreamId();
log.info("begin to create pulsar topic for groupId={}, streamId={}", groupId, streamId);
- InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
- PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo(pulsarInfo.getMqType());
try {
+ InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
String pulsarTopic = streamInfo.getMqResource();
- this.createTopic(pulsarInfo, pulsarTopic, pulsarClusterInfo);
+ String clusterTag = pulsarInfo.getInlongClusterTag();
+ InlongClusterInfo clusterInfo = clusterService.getOne(clusterTag, null, ClusterType.CLS_PULSAR);
+ PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
+ String tenant = pulsarCluster.getTenant();
+ if (StringUtils.isEmpty(tenant)) {
+ tenant = InlongGroupSettings.DEFAULT_PULSAR_TENANT;
+ }
+
+ try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
+ PulsarTopicBean topicBean = PulsarTopicBean.builder()
+ .tenant(tenant)
+ .namespace(pulsarInfo.getMqResource())
+ .topicName(pulsarTopic)
+ .queueModule(pulsarInfo.getQueueModule())
+ .numPartitions(pulsarInfo.getPartitionNum())
+ .build();
+ pulsarOptService.createTopic(pulsarAdmin, topicBean);
+ }
} catch (Exception e) {
- log.error("create pulsar topic error for groupId={}, streamId={}", groupId, streamId, e);
- throw new WorkflowListenerException(
- "create pulsar topic error for groupId=" + groupId + ", streamId=" + streamId);
+ String msg = String.format("failed to create pulsar topic for groupId=%s, streamId=%s", groupId, streamId);
+ log.error(msg, e);
+ throw new WorkflowListenerException(msg);
}
log.info("success to create pulsar topic for groupId={}, streamId={}", groupId, streamId);
return ListenerResult.success();
}
- private void createTopic(InlongPulsarInfo groupInfo, String pulsarTopic, PulsarClusterInfo pulsarClusterInfo)
- throws Exception {
- Integer partitionNum = groupInfo.getPartitionNum();
- int partition = 0;
- if (partitionNum != null && partitionNum > 0) {
- partition = partitionNum;
- }
-
- try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
- PulsarTopicBean topicBean = PulsarTopicBean.builder()
- .tenant(clusterBean.getDefaultTenant())
- .namespace(groupInfo.getMqResource())
- .topicName(pulsarTopic)
- .numPartitions(partition)
- .queueModule(groupInfo.getQueueModule())
- .build();
- pulsarOptService.createTopic(pulsarAdmin, topicBean);
- }
- }
-
@Override
public boolean async() {
return false;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeGroupTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeGroupTaskListener.java
index 3e27e18f6..0acf8fac8 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeGroupTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeGroupTaskListener.java
@@ -19,15 +19,16 @@ package org.apache.inlong.manager.service.mq;
import lombok.extern.slf4j.Slf4j;
import org.apache.inlong.manager.common.beans.ReTryConfigBean;
+import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
-import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
+import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest.GroupNameJsonSetBean;
import org.apache.inlong.manager.common.pojo.tubemq.QueryTubeTopicRequest;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.service.CommonOperateService;
-import org.apache.inlong.manager.service.group.InlongGroupService;
+import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
+import org.apache.inlong.manager.dao.mapper.InlongGroupEntityMapper;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.mq.util.TubeMqOptService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
@@ -46,14 +47,13 @@ import java.util.Collections;
public class CreateTubeGroupTaskListener implements QueueOperateListener {
@Autowired
- InlongGroupService groupService;
-
+ private InlongGroupEntityMapper groupMapper;
@Autowired
- TubeMqOptService tubeMqOptService;
+ private InlongClusterService clusterService;
@Autowired
- ReTryConfigBean reTryConfigBean;
+ private TubeMqOptService tubeMqOptService;
@Autowired
- private CommonOperateService commonOperateService;
+ private ReTryConfigBean reTryConfigBean;
@Override
public TaskEvent event() {
@@ -66,12 +66,18 @@ public class CreateTubeGroupTaskListener implements QueueOperateListener {
String groupId = form.getInlongGroupId();
log.info("try to create consumer group for groupId {}", groupId);
- InlongGroupInfo groupInfo = groupService.get(groupId);
- String topicName = groupInfo.getMqResource();
- int clusterId = Integer.parseInt(commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_CLUSTER_ID));
+ InlongGroupEntity groupEntity = groupMapper.selectByGroupId(groupId);
+ InlongClusterInfo tubeCluster = clusterService.getOne(groupEntity.getInlongClusterTag(),
+ null, ClusterType.CLS_TUBE);
+
+ // TODO use the original method of TubeMQ to create group
+ // TubeClusterDTO clusterDTO = TubeClusterDTO.getFromJson(clusters.get(0).getExtParams());
+ // int clusterId = clusterDTO.getClusterId();
+
+ String topicName = groupEntity.getMqResource();
QueryTubeTopicRequest queryTubeTopicRequest = QueryTubeTopicRequest.builder()
- .topicName(topicName).clusterId(clusterId)
- .user(groupInfo.getCreator()).build();
+ .topicName(topicName).clusterId(1)
+ .user(groupEntity.getCreator()).build();
// Query whether the tube topic exists
boolean topicExist = tubeMqOptService.queryTopicIsExist(queryTubeTopicRequest);
@@ -89,8 +95,8 @@ public class CreateTubeGroupTaskListener implements QueueOperateListener {
}
AddTubeConsumeGroupRequest addTubeConsumeGroupRequest = new AddTubeConsumeGroupRequest();
- addTubeConsumeGroupRequest.setClusterId(clusterId);
- addTubeConsumeGroupRequest.setCreateUser(groupInfo.getCreator());
+ addTubeConsumeGroupRequest.setClusterId(1);
+ addTubeConsumeGroupRequest.setCreateUser(groupEntity.getCreator());
GroupNameJsonSetBean groupNameJsonSetBean = new GroupNameJsonSetBean();
groupNameJsonSetBean.setTopicName(topicName);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java
index b6bce4869..d0f459351 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/CreateTubeTopicTaskListener.java
@@ -65,9 +65,8 @@ public class CreateTubeTopicTaskListener implements QueueOperateListener {
AddTubeMqTopicRequest.AddTopicTasksBean tasksBean = new AddTubeMqTopicRequest.AddTopicTasksBean();
tasksBean.setTopicName(topicName);
request.setAddTopicTasks(Collections.singletonList(tasksBean));
- tubeMqOptService.createNewTopic(request);
-
- log.info("finish to create tube topic for groupId={}", groupId);
+ String result = tubeMqOptService.createNewTopic(request);
+ log.info("finish to create tube topic for groupId={}, result={}", groupId, result);
} catch (Exception e) {
log.error("create tube topic for groupId={} error, exception {} ", groupId, e.getMessage(), e);
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOptServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOptServiceImpl.java
index e01516903..45eba4d6b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOptServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOptServiceImpl.java
@@ -50,7 +50,7 @@ public class PulsarOptServiceImpl implements PulsarOptService {
@Override
public void createTenant(PulsarAdmin pulsarAdmin, String tenant) throws PulsarAdminException {
- log.info("begin to create tenant={}", tenant);
+ log.info("begin to create pulsar tenant={}", tenant);
Preconditions.checkNotEmpty(tenant, "Tenant cannot be empty");
try {
@@ -66,7 +66,7 @@ public class PulsarOptServiceImpl implements PulsarOptService {
pulsarAdmin.tenants().createTenant(tenant, tenantInfo);
log.info("success to create pulsar tenant={}", tenant);
} catch (PulsarAdminException e) {
- log.error("create pulsar tenant={} failed", tenant, e);
+ log.error("failed to create pulsar tenant=" + tenant, e);
throw e;
}
}
@@ -119,9 +119,9 @@ public class PulsarOptServiceImpl implements PulsarOptService {
PersistencePolicies persistencePolicies = new PersistencePolicies(pulsarInfo.getEnsemble(),
pulsarInfo.getWriteQuorum(), pulsarInfo.getAckQuorum(), 0);
namespaces.setPersistence(namespaceName, persistencePolicies);
- log.info("success to create namespace={}", tenant);
+ log.info("success to create namespace={}", namespaceName);
} catch (PulsarAdminException e) {
- log.error("create namespace={} error", tenant, e);
+ log.error("failed to create namespace=" + namespaceName, e);
throw e;
}
}
@@ -159,7 +159,7 @@ public class PulsarOptServiceImpl implements PulsarOptService {
log.info("success to create topic={}", topicFullName);
} catch (Exception e) {
- log.error("create topic={} failed", topicFullName, e);
+ log.error("failed to create topic=" + topicFullName, e);
throw e;
}
}
@@ -182,7 +182,7 @@ public class PulsarOptServiceImpl implements PulsarOptService {
log.warn("pulsar subscription={} already exists, skip to create", subscription);
}
} catch (Exception e) {
- log.error("create pulsar subscription={} failed", subscription, e);
+ log.error("failed to create pulsar subscription=" + subscription, e);
throw e;
}
}
@@ -243,7 +243,7 @@ public class PulsarOptServiceImpl implements PulsarOptService {
List<String> subscriptionList = pulsarAdmin.topics().getSubscriptions(topic);
return subscriptionList.contains(subscription);
} catch (PulsarAdminException e) {
- log.error("check if the topic={} is exists error,", topic, e);
+ log.error("failed to check the subscription=" + subscription + " exists for topic=" + topic, e);
return false;
}
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarUtils.java
index ddb11af17..be8853837 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarUtils.java
@@ -19,8 +19,7 @@ package org.apache.inlong.manager.service.mq.util;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
-import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
+import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -41,37 +40,36 @@ public class PulsarUtils {
/**
* Get pulsar admin info
*/
- public static PulsarAdmin getPulsarAdmin(PulsarClusterInfo pulsarClusterInfo) throws PulsarClientException {
- Preconditions.checkNotNull(pulsarClusterInfo.getAdminUrl(), "Pulsar adminUrl cannot be empty");
+ public static PulsarAdmin getPulsarAdmin(PulsarClusterInfo pulsarCluster) throws PulsarClientException {
+ Preconditions.checkNotNull(pulsarCluster.getAdminUrl(), "Pulsar adminUrl cannot be empty");
PulsarAdmin pulsarAdmin;
- if (StringUtils.isEmpty(pulsarClusterInfo.getToken())) {
- pulsarAdmin = getPulsarAdmin(pulsarClusterInfo.getAdminUrl());
+ if (StringUtils.isEmpty(pulsarCluster.getToken())) {
+ pulsarAdmin = getPulsarAdmin(pulsarCluster.getAdminUrl());
} else {
- pulsarAdmin = getPulsarAdmin(pulsarClusterInfo.getAdminUrl(), pulsarClusterInfo.getToken(),
- InlongGroupSettings.DEFAULT_PULSAR_AUTHENTICATION_TYPE);
+ pulsarAdmin = getPulsarAdmin(pulsarCluster.getAdminUrl(), pulsarCluster.getToken());
}
return pulsarAdmin;
}
/**
- * Obtain the PulsarAdmin client according to the service URL, and it must be closed after use
+ * Get the pulsar admin from the given service URL.
+ *
+ * @apiNote It must be closed after use.
*/
public static PulsarAdmin getPulsarAdmin(String serviceHttpUrl) throws PulsarClientException {
return PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl).build();
}
/**
- * Get pulsar admin info.
+ * Get the pulsar admin from the given service URL and token.
+ * <p/>
+ * Currently only token is supported as an authentication type.
+ *
+ * @apiNote It must be closed after use.
*/
- private static PulsarAdmin getPulsarAdmin(String serviceHttpUrl, String authentication, String authenticationType)
- throws PulsarClientException {
- if (InlongGroupSettings.DEFAULT_PULSAR_AUTHENTICATION_TYPE.equals(authenticationType)) {
- return PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl)
- .authentication(AuthenticationFactory.token(authentication)).build();
- } else {
- throw new IllegalArgumentException(
- String.format("illegal authentication type for pulsar : %s", authenticationType));
- }
+ private static PulsarAdmin getPulsarAdmin(String serviceHttpUrl, String token) throws PulsarClientException {
+ return PulsarAdmin.builder().serviceHttpUrl(serviceHttpUrl)
+ .authentication(AuthenticationFactory.token(token)).build();
}
/**
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMqOptService.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMqOptService.java
index 57d0bd053..3774aba4a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMqOptService.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/TubeMqOptService.java
@@ -26,9 +26,8 @@ import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
import org.apache.inlong.manager.common.pojo.tubemq.AddTubeMqTopicRequest;
import org.apache.inlong.manager.common.pojo.tubemq.QueryTubeTopicRequest;
import org.apache.inlong.manager.common.pojo.tubemq.TubeManagerResponse;
-import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.common.util.HttpUtils;
-import org.apache.inlong.manager.service.CommonOperateService;
+import org.apache.inlong.manager.dao.mapper.InlongClusterEntityMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
@@ -45,7 +44,7 @@ public class TubeMqOptService {
private static final Gson GSON = new GsonBuilder().create(); // thread safe
@Autowired
- private CommonOperateService commonOperateService;
+ private InlongClusterEntityMapper clusterMapper;
@Autowired
private HttpUtils httpUtils;
@@ -59,14 +58,14 @@ public class TubeMqOptService {
if (CollectionUtils.isEmpty(request.getAddTopicTasks())) {
throw new Exception("topic cannot be empty");
}
+
+ // TODO use the original method of TubeMQ to create group
AddTubeMqTopicRequest.AddTopicTasksBean addTopicTasksBean = request.getAddTopicTasks().get(0);
- String clusterIdStr = commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_CLUSTER_ID);
- int clusterId = Integer.parseInt(clusterIdStr);
QueryTubeTopicRequest topicRequest = QueryTubeTopicRequest.builder()
- .topicName(addTopicTasksBean.getTopicName()).clusterId(clusterId)
+ .topicName(addTopicTasksBean.getTopicName()).clusterId(1)
.user(request.getUser()).build();
- String tubeManager = commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_MANAGER_URL);
+ String tubeManager = "InlongGroupSettings.TUBE_MANAGER_URL";
TubeManagerResponse response = httpUtils
.request(tubeManager + "/v1/topic?method=queryCanWrite", HttpMethod.POST,
GSON.toJson(topicRequest), httpHeaders, TubeManagerResponse.class);
@@ -74,15 +73,16 @@ public class TubeMqOptService {
log.info(" create tube topic {} on {} ", GSON.toJson(request),
tubeManager + "/v1/task?method=addTopicTask");
- request.setClusterId(clusterId);
+ request.setClusterId(1);
TubeManagerResponse createRsp = httpUtils
.request(tubeManager + "/v1/task?method=addTopicTask", HttpMethod.POST,
GSON.toJson(request), httpHeaders, TubeManagerResponse.class);
+ log.info("create tube topic success, result: {}", createRsp);
} else {
- log.warn("topic {} exists in {} ", addTopicTasksBean.getTopicName(), tubeManager);
+ log.warn("tube topic {} exists in {} ", addTopicTasksBean.getTopicName(), tubeManager);
}
} catch (Exception e) {
- log.error("fail to create tube topic " + request.getAddTopicTasks().get(0).getTopicName(), e);
+ log.error("failed to create tube topic: " + request.getAddTopicTasks().get(0).getTopicName(), e);
}
return "";
}
@@ -94,16 +94,17 @@ public class TubeMqOptService {
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.add("Content-Type", "application/json");
try {
- String tubeManager = commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_MANAGER_URL);
+ String tubeManager = "InlongGroupSettings.TUBE_MANAGER_URL";
log.info("create tube consumer group {} on {} ", GSON.toJson(request),
tubeManager + "/v1/task?method=addTopicTask");
- TubeManagerResponse rsp = httpUtils.request(tubeManager + "/v1/group?method=add",
+ TubeManagerResponse response = httpUtils.request(tubeManager + "/v1/group?method=add",
HttpMethod.POST, GSON.toJson(request), httpHeaders, TubeManagerResponse.class);
- if (rsp.getErrCode() == -1) { // Creation failed
- throw new BusinessException(ErrorCodeEnum.CONSUMER_GROUP_CREATE_FAILED, rsp.getErrMsg());
+ if (response.getErrCode() == -1) { // Creation failed
+ throw new BusinessException(ErrorCodeEnum.CONSUMER_GROUP_CREATE_FAILED, response.getErrMsg());
}
+ log.info("create tube consumer group success, result: {}", response);
} catch (BusinessException e) {
- log.error(" fail to create tube consumer group " + GSON.toJson(request), e);
+ log.error("failed to create tube consumer group: " + GSON.toJson(request), e);
throw e;
}
return "";
@@ -116,7 +117,7 @@ public class TubeMqOptService {
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.add("Content-Type", "application/json");
try {
- String tubeManager = commonOperateService.getSpecifiedParam(InlongGroupSettings.TUBE_MANAGER_URL);
+ String tubeManager = "InlongGroupSettings.TUBE_MANAGER_URL";
TubeManagerResponse response = httpUtils.request(tubeManager + "/v1/topic?method=queryCanWrite",
HttpMethod.POST, GSON.toJson(queryTubeTopicRequest), httpHeaders, TubeManagerResponse.class);
if (response.getErrCode() == 0) { // topic already exists
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
index 026461985..455b87845 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sink/StreamSinkServiceImpl.java
@@ -41,7 +41,7 @@ import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.StreamSinkEntity;
import org.apache.inlong.manager.dao.mapper.StreamSinkEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSinkFieldEntityMapper;
-import org.apache.inlong.manager.service.CommonOperateService;
+import org.apache.inlong.manager.service.group.GroupCheckService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -66,7 +66,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
@Autowired
private SinkOperationFactory operationFactory;
@Autowired
- private CommonOperateService commonOperateService;
+ private GroupCheckService groupCheckService;
@Autowired
private StreamSinkEntityMapper sinkMapper;
@Autowired
@@ -80,7 +80,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
// Check if it can be added
String groupId = request.getInlongGroupId();
- commonOperateService.checkGroupStatus(groupId, operator);
+ groupCheckService.checkGroupStatus(groupId, operator);
// Make sure that there is no sink info with the current groupId and streamId
String streamId = request.getInlongStreamId();
@@ -192,7 +192,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
String streamId = request.getInlongStreamId();
String sinkName = request.getSinkName();
String sinkType = request.getSinkType();
- InlongGroupEntity groupEntity = commonOperateService.checkGroupStatus(groupId, operator);
+ InlongGroupEntity groupEntity = groupCheckService.checkGroupStatus(groupId, operator);
// Check whether the sink name exists with the same groupId and streamId
List<StreamSinkEntity> sinkList = sinkMapper.selectByRelatedId(groupId, streamId, sinkName);
@@ -239,7 +239,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
Preconditions.checkNotNull(id, ErrorCodeEnum.ID_IS_EMPTY.getMessage());
StreamSinkEntity entity = sinkMapper.selectByPrimaryKey(id);
Preconditions.checkNotNull(entity, ErrorCodeEnum.SINK_INFO_NOT_FOUND.getMessage());
- commonOperateService.checkGroupStatus(entity.getInlongGroupId(), operator);
+ groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
entity.setPreviousStatus(entity.getStatus());
entity.setStatus(GlobalConstants.DELETED_STATUS);
@@ -261,7 +261,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
// Check if it can be deleted
- commonOperateService.checkGroupStatus(groupId, operator);
+ groupCheckService.checkGroupStatus(groupId, operator);
Date now = new Date();
List<StreamSinkEntity> entityList = sinkMapper.selectByRelatedId(groupId, streamId, null);
@@ -290,7 +290,7 @@ public class StreamSinkServiceImpl implements StreamSinkService {
Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
// Check if it can be deleted
- commonOperateService.checkGroupStatus(groupId, operator);
+ groupCheckService.checkGroupStatus(groupId, operator);
List<StreamSinkEntity> entityList = sinkMapper.selectByRelatedId(groupId, streamId, null);
if (CollectionUtils.isNotEmpty(entityList)) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
index 80678d19e..6f0f5c28a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateSortConfigListenerV2.java
@@ -23,11 +23,13 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.inlong.common.enums.DataTypeEnum;
-import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
+import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupExtInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
@@ -37,7 +39,7 @@ import org.apache.inlong.manager.common.pojo.source.pulsar.PulsarSource;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.GroupResourceProcessForm;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.service.CommonOperateService;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.sort.util.ExtractNodeUtils;
import org.apache.inlong.manager.service.sort.util.LoadNodeUtils;
@@ -68,12 +70,10 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
@Autowired
private StreamSourceService sourceService;
-
@Autowired
private StreamSinkService sinkService;
-
@Autowired
- private CommonOperateService commonOperateService;
+ private InlongClusterService clusterService;
@Override
public TaskEvent event() {
@@ -125,26 +125,33 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
return new GroupInfo(groupId, streamInfos);
}
- private Map<String, List<StreamSource>> createPulsarSources(InlongGroupInfo groupInfo,
- List<InlongStreamInfo> streamInfoList) {
- MQType mqType = MQType.forType(groupInfo.getMqType());
- if (mqType != MQType.PULSAR) {
- String errMsg = String.format("Unsupported mqType={%s}", mqType);
+ private Map<String, List<StreamSource>> createPulsarSources(
+ InlongGroupInfo groupInfo, List<InlongStreamInfo> streamInfoList) {
+
+ if (!MQType.MQ_PULSAR.equals(groupInfo.getMqType())) {
+ String errMsg = String.format("Unsupported mqType={%s}", groupInfo.getMqType());
log.error(errMsg);
throw new WorkflowListenerException(errMsg);
}
+
Map<String, List<StreamSource>> sourceMap = Maps.newHashMap();
- PulsarClusterInfo pulsarCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMqType());
+ InlongClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null,
+ ClusterType.CLS_PULSAR);
+
+ PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
+ String adminUrl = pulsarCluster.getAdminUrl();
+ String serviceUrl = pulsarCluster.getUrl();
streamInfoList.forEach(streamInfo -> {
PulsarSource pulsarSource = new PulsarSource();
- pulsarSource.setSourceName(streamInfo.getInlongStreamId());
+ String streamId = streamInfo.getInlongStreamId();
+ pulsarSource.setSourceName(streamId);
pulsarSource.setNamespace(groupInfo.getMqResource());
pulsarSource.setTopic(streamInfo.getMqResource());
- pulsarSource.setAdminUrl(pulsarCluster.getAdminUrl());
- pulsarSource.setServiceUrl(pulsarCluster.getBrokerServiceUrl());
+ pulsarSource.setAdminUrl(adminUrl);
+ pulsarSource.setServiceUrl(serviceUrl);
pulsarSource.setInlongComponent(true);
- List<StreamSource> sourceInfos = sourceService.listSource(groupInfo.getInlongGroupId(),
- streamInfo.getInlongStreamId());
+
+ List<StreamSource> sourceInfos = sourceService.listSource(groupInfo.getInlongGroupId(), streamId);
for (StreamSource sourceInfo : sourceInfos) {
if (StringUtils.isEmpty(pulsarSource.getSerializationType())
&& StringUtils.isNotEmpty(sourceInfo.getSerializationType())) {
@@ -159,8 +166,7 @@ public class CreateSortConfigListenerV2 implements SortOperateListener {
}
pulsarSource.setScanStartupMode(PulsarScanStartupMode.EARLIEST.getValue());
pulsarSource.setFieldList(streamInfo.getFieldList());
- sourceMap.computeIfAbsent(streamInfo.getInlongStreamId(), key -> Lists.newArrayList())
- .add(pulsarSource);
+ sourceMap.computeIfAbsent(streamId, key -> Lists.newArrayList()).add(pulsarSource);
});
return sourceMap;
}
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
index 02e848de2..daaf55050 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/CreateStreamSortConfigListener.java
@@ -22,11 +22,13 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
+import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.GroupOperateType;
import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.enums.SourceType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
import org.apache.inlong.manager.common.pojo.source.StreamSource;
@@ -36,7 +38,7 @@ import org.apache.inlong.manager.common.pojo.stream.InlongStreamExtInfo;
import org.apache.inlong.manager.common.pojo.stream.InlongStreamInfo;
import org.apache.inlong.manager.common.pojo.workflow.form.StreamResourceProcessForm;
import org.apache.inlong.manager.common.settings.InlongGroupSettings;
-import org.apache.inlong.manager.service.CommonOperateService;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.sink.StreamSinkService;
import org.apache.inlong.manager.service.sort.util.ExtractNodeUtils;
import org.apache.inlong.manager.service.sort.util.LoadNodeUtils;
@@ -70,7 +72,7 @@ public class CreateStreamSortConfigListener implements SortOperateListener {
@Autowired
private StreamSinkService streamSinkService;
@Autowired
- private CommonOperateService commonOperateService;
+ private InlongClusterService clusterService;
@Override
public TaskEvent event() {
@@ -119,22 +121,28 @@ public class CreateStreamSortConfigListener implements SortOperateListener {
}
private List<StreamSource> createPulsarSources(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo) {
- MQType mqType = MQType.forType(groupInfo.getMqType());
- if (mqType != MQType.PULSAR) {
- String errMsg = String.format("Unsupported mqType={%s}", mqType);
+ if (!MQType.MQ_PULSAR.equals(groupInfo.getMqType())) {
+ String errMsg = String.format("Unsupported mqType={%s}", groupInfo.getMqType());
log.error(errMsg);
throw new WorkflowListenerException(errMsg);
}
- PulsarClusterInfo pulsarCluster = commonOperateService.getPulsarClusterInfo(groupInfo.getMqType());
+
PulsarSource pulsarSource = new PulsarSource();
- pulsarSource.setSourceName(streamInfo.getInlongStreamId());
+ String streamId = streamInfo.getInlongStreamId();
+ pulsarSource.setSourceName(streamId);
pulsarSource.setNamespace(groupInfo.getMqResource());
pulsarSource.setTopic(streamInfo.getMqResource());
- pulsarSource.setAdminUrl(pulsarCluster.getAdminUrl());
- pulsarSource.setServiceUrl(pulsarCluster.getBrokerServiceUrl());
+
+ InlongClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null,
+ ClusterType.CLS_PULSAR);
+ PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
+ String adminUrl = pulsarCluster.getAdminUrl();
+ String serviceUrl = pulsarCluster.getUrl();
+
+ pulsarSource.setAdminUrl(adminUrl);
+ pulsarSource.setServiceUrl(serviceUrl);
pulsarSource.setInlongComponent(true);
- List<StreamSource> sources = streamSourceService.listSource(groupInfo.getInlongGroupId(),
- streamInfo.getInlongStreamId());
+ List<StreamSource> sources = streamSourceService.listSource(groupInfo.getInlongGroupId(), streamId);
for (StreamSource source : sources) {
if (StringUtils.isEmpty(pulsarSource.getSerializationType())
&& StringUtils.isNotEmpty(source.getSerializationType())) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/PushSortConfigListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/PushSortConfigListener.java
index b113370f7..a24a9264f 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/PushSortConfigListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/PushSortConfigListener.java
@@ -19,7 +19,6 @@ package org.apache.inlong.manager.service.sort;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.inlong.manager.common.beans.ClusterBean;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.sink.StreamSink;
@@ -47,8 +46,6 @@ public class PushSortConfigListener implements SortOperateListener {
private static final Logger LOGGER = LoggerFactory.getLogger(PushSortConfigListener.class);
- @Autowired
- private ClusterBean clusterBean;
@Autowired
private InlongGroupService groupService;
@Autowired
@@ -89,10 +86,10 @@ public class PushSortConfigListener implements SortOperateListener {
Integer sinkId = streamSink.getId();
try {
// DataFlowInfo dataFlowInfo = dataFlowUtils.createDataFlow(groupInfo, streamSink);
- String zkUrl = clusterBean.getZkUrl();
- String zkRoot = clusterBean.getZkRoot();
+ // String zkUrl = clusterBean.getZkUrl();
+ // String zkRoot = clusterBean.getZkRoot();
// push data flow info to zk
- String sortClusterName = clusterBean.getAppName();
+ // String sortClusterName = clusterBean.getAppName();
// ZkTools.updateDataFlowInfo(dataFlowInfo, sortClusterName, sinkId, zkUrl, zkRoot);
// add sink id to zk
// ZkTools.addDataFlowToCluster(sortClusterName, sinkId, zkUrl, zkRoot);
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/DataFlowUtils.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/DataFlowUtils.java
index 2f54a94ab..51767692b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/DataFlowUtils.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/sort/util/DataFlowUtils.java
@@ -17,9 +17,8 @@
package org.apache.inlong.manager.service.sort.util;
-import org.apache.inlong.manager.common.beans.ClusterBean;
-import org.apache.inlong.manager.service.CommonOperateService;
import org.apache.inlong.manager.service.core.InlongStreamService;
+import org.apache.inlong.manager.service.group.GroupCheckService;
import org.apache.inlong.manager.service.source.StreamSourceService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@@ -32,9 +31,7 @@ import org.springframework.stereotype.Service;
public class DataFlowUtils {
@Autowired
- private ClusterBean clusterBean;
- @Autowired
- private CommonOperateService commonOperateService;
+ private GroupCheckService groupCheckService;
@Autowired
private StreamSourceService streamSourceService;
@Autowired
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
index b3993d071..0a375f909 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/source/StreamSourceServiceImpl.java
@@ -40,7 +40,7 @@ import org.apache.inlong.manager.dao.entity.InlongGroupEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.dao.mapper.StreamSourceEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamSourceFieldEntityMapper;
-import org.apache.inlong.manager.service.CommonOperateService;
+import org.apache.inlong.manager.service.group.GroupCheckService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -70,7 +70,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
@Autowired
private StreamSourceFieldEntityMapper sourceFieldMapper;
@Autowired
- private CommonOperateService commonOperateService;
+ private GroupCheckService groupCheckService;
@Override
@Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
@@ -80,7 +80,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
// Check if it can be added
String groupId = request.getInlongGroupId();
- InlongGroupEntity groupEntity = commonOperateService.checkGroupStatus(groupId, operator);
+ InlongGroupEntity groupEntity = groupCheckService.checkGroupStatus(groupId, operator);
// According to the source type, save source information
String sourceType = request.getSourceType();
@@ -169,7 +169,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
// Check if it can be modified
String groupId = request.getInlongGroupId();
- InlongGroupEntity groupEntity = commonOperateService.checkGroupStatus(groupId, operator);
+ InlongGroupEntity groupEntity = groupCheckService.checkGroupStatus(groupId, operator);
String sourceType = request.getSourceType();
StreamSourceOperation operation = operationFactory.getInstance(SourceType.forType(sourceType));
@@ -203,7 +203,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
- commonOperateService.checkGroupStatus(entity.getInlongGroupId(), operator);
+ groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
SourceStatus curStatus = SourceStatus.forCode(entity.getStatus());
SourceStatus nextStatus = SourceStatus.TO_BE_ISSUED_DELETE;
@@ -235,7 +235,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
LOGGER.info("begin to restart source by id={}", id);
StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
- commonOperateService.checkGroupStatus(entity.getInlongGroupId(), operator);
+ groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
StreamSourceOperation operation = operationFactory.getInstance(SourceType.forType(entity.getSourceType()));
SourceRequest sourceRequest = new SourceRequest();
@@ -253,7 +253,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
LOGGER.info("begin to stop source by id={}", id);
StreamSourceEntity entity = sourceMapper.selectByIdForUpdate(id);
Preconditions.checkNotNull(entity, ErrorCodeEnum.SOURCE_INFO_NOT_FOUND.getMessage());
- commonOperateService.checkGroupStatus(entity.getInlongGroupId(), operator);
+ groupCheckService.checkGroupStatus(entity.getInlongGroupId(), operator);
StreamSourceOperation operation = operationFactory.getInstance(SourceType.forType(entity.getSourceType()));
SourceRequest sourceRequest = new SourceRequest();
@@ -272,7 +272,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
// Check if it can be deleted
- InlongGroupEntity groupEntity = commonOperateService.checkGroupStatus(groupId, operator);
+ InlongGroupEntity groupEntity = groupCheckService.checkGroupStatus(groupId, operator);
Integer nextStatus;
if (GroupStatus.CONFIG_SUCCESSFUL.getCode().equals(groupEntity.getStatus())) {
nextStatus = SourceStatus.TO_BE_ISSUED_DELETE.getCode();
@@ -307,7 +307,7 @@ public class StreamSourceServiceImpl implements StreamSourceService {
Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
// Check if it can be deleted
- commonOperateService.checkGroupStatus(groupId, operator);
+ groupCheckService.checkGroupStatus(groupId, operator);
sourceMapper.deleteByRelatedId(groupId, streamId);
LOGGER.info("success to delete all source by groupId={}, streamId={}", groupId, streamId);
return true;
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
index 7aacda685..9865d9660 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/transform/StreamTransformServiceImpl.java
@@ -33,7 +33,7 @@ import org.apache.inlong.manager.dao.entity.StreamTransformEntity;
import org.apache.inlong.manager.dao.entity.StreamTransformFieldEntity;
import org.apache.inlong.manager.dao.mapper.StreamTransformEntityMapper;
import org.apache.inlong.manager.dao.mapper.StreamTransformFieldEntityMapper;
-import org.apache.inlong.manager.service.CommonOperateService;
+import org.apache.inlong.manager.service.group.GroupCheckService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
@@ -58,7 +58,7 @@ public class StreamTransformServiceImpl implements StreamTransformService {
@Autowired
protected StreamTransformFieldEntityMapper transformFieldMapper;
@Autowired
- protected CommonOperateService commonOperateService;
+ protected GroupCheckService groupCheckService;
@Override
@Transactional(rollbackFor = Throwable.class, propagation = Propagation.REQUIRES_NEW)
@@ -70,7 +70,7 @@ public class StreamTransformServiceImpl implements StreamTransformService {
final String groupId = transformRequest.getInlongGroupId();
final String streamId = transformRequest.getInlongStreamId();
final String transformName = transformRequest.getTransformName();
- commonOperateService.checkGroupStatus(groupId, operator);
+ groupCheckService.checkGroupStatus(groupId, operator);
List<StreamTransformEntity> transformEntities = transformMapper.selectByRelatedId(groupId,
streamId, transformName);
@@ -131,7 +131,7 @@ public class StreamTransformServiceImpl implements StreamTransformService {
this.checkParams(transformRequest);
// Check whether the transform can be modified
String groupId = transformRequest.getInlongGroupId();
- commonOperateService.checkGroupStatus(groupId, operator);
+ groupCheckService.checkGroupStatus(groupId, operator);
Preconditions.checkNotNull(transformRequest.getId(), ErrorCodeEnum.ID_IS_EMPTY.getMessage());
StreamTransformEntity transformEntity = CommonBeanUtils.copyProperties(transformRequest,
StreamTransformEntity::new);
@@ -151,7 +151,7 @@ public class StreamTransformServiceImpl implements StreamTransformService {
transformName);
Preconditions.checkNotNull(groupId, ErrorCodeEnum.GROUP_ID_IS_EMPTY.getMessage());
Preconditions.checkNotNull(streamId, ErrorCodeEnum.STREAM_ID_IS_EMPTY.getMessage());
- commonOperateService.checkGroupStatus(groupId, operator);
+ groupCheckService.checkGroupStatus(groupId, operator);
Date now = new Date();
List<StreamTransformEntity> entityList = transformMapper.selectByRelatedId(groupId, streamId, transformName);
if (CollectionUtils.isNotEmpty(entityList)) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
index 3ae77c41e..df8ddad08 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/workflow/consumption/listener/ConsumptionCompleteProcessListener.java
@@ -18,23 +18,26 @@
package org.apache.inlong.manager.service.workflow.consumption.listener;
import lombok.extern.slf4j.Slf4j;
-import org.apache.inlong.common.pojo.dataproxy.PulsarClusterInfo;
-import org.apache.inlong.manager.common.beans.ClusterBean;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ConsumptionStatus;
import org.apache.inlong.manager.common.enums.MQType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
+import org.apache.inlong.manager.common.pojo.cluster.InlongClusterInfo;
+import org.apache.inlong.manager.common.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.common.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.common.pojo.tubemq.AddTubeConsumeGroupRequest;
import org.apache.inlong.manager.common.pojo.workflow.form.NewConsumptionProcessForm;
+import org.apache.inlong.manager.common.settings.InlongGroupSettings;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.ConsumptionEntity;
import org.apache.inlong.manager.dao.mapper.ConsumptionEntityMapper;
-import org.apache.inlong.manager.service.CommonOperateService;
+import org.apache.inlong.manager.service.cluster.InlongClusterService;
import org.apache.inlong.manager.service.group.InlongGroupService;
import org.apache.inlong.manager.service.mq.util.PulsarOptService;
-import org.apache.inlong.manager.service.mq.util.TubeMqOptService;
import org.apache.inlong.manager.service.mq.util.PulsarUtils;
+import org.apache.inlong.manager.service.mq.util.TubeMqOptService;
import org.apache.inlong.manager.workflow.WorkflowContext;
import org.apache.inlong.manager.workflow.event.ListenerResult;
import org.apache.inlong.manager.workflow.event.process.ProcessEvent;
@@ -58,9 +61,7 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
@Autowired
private PulsarOptService pulsarMqOptService;
@Autowired
- private ClusterBean clusterBean;
- @Autowired
- private CommonOperateService commonOperateService;
+ private InlongClusterService clusterService;
@Autowired
private InlongGroupService groupService;
@Autowired
@@ -89,7 +90,7 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
this.createTubeConsumerGroup(entity);
return ListenerResult.success("Create Tube consumer group successful");
} else if (mqType == MQType.PULSAR || mqType == MQType.TDMQ_PULSAR) {
- this.createPulsarTopicMessage(entity);
+ this.createPulsarSubscription(entity);
} else {
throw new WorkflowListenerException("Unsupported MQ type [" + mqType + "]");
}
@@ -111,18 +112,24 @@ public class ConsumptionCompleteProcessListener implements ProcessEventListener
}
/**
- * Create Pulsar consumption information
+ * Create Pulsar subscription
*/
- private void createPulsarTopicMessage(ConsumptionEntity entity) {
+ private void createPulsarSubscription(ConsumptionEntity entity) {
String groupId = entity.getInlongGroupId();
InlongGroupInfo groupInfo = groupService.get(groupId);
Preconditions.checkNotNull(groupInfo, "inlong group not found for groupId=" + groupId);
String mqResource = groupInfo.getMqResource();
Preconditions.checkNotNull(mqResource, "mq resource cannot empty for groupId=" + groupId);
- PulsarClusterInfo pulsarClusterInfo = commonOperateService.getPulsarClusterInfo(entity.getMqType());
- try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarClusterInfo)) {
+
+ String clusterTag = groupInfo.getInlongClusterTag();
+ InlongClusterInfo clusterInfo = clusterService.getOne(clusterTag, null, ClusterType.CLS_PULSAR);
+ PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
+ try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
PulsarTopicBean topicMessage = new PulsarTopicBean();
- String tenant = clusterBean.getDefaultTenant();
+ String tenant = pulsarCluster.getTenant();
+ if (StringUtils.isEmpty(tenant)) {
+ tenant = InlongGroupSettings.DEFAULT_PULSAR_TENANT;
+ }
topicMessage.setTenant(tenant);
topicMessage.setNamespace(mqResource);