You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/09 11:25:35 UTC

[inlong] 05/07: [INLONG-6463][Manager] Support create subscription and topic of multiple pulsar cluster (#6464)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit d4d737167d2f5df013cb457939f2def6961c04c4
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Wed Nov 9 17:48:51 2022 +0800

    [INLONG-6463][Manager] Support create subscription and topic of multiple pulsar cluster (#6464)
---
 .../queue/pulsar/PulsarResourceOperator.java       | 163 ++++++++++++---------
 1 file changed, 95 insertions(+), 68 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
index 5b698b856..23c13b740 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
@@ -27,7 +27,6 @@ import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.GroupStatus;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
 import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
 import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
@@ -45,6 +44,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * Operator for create Pulsar Tenant, Namespace, Topic and Subscription
@@ -84,42 +84,51 @@ public class PulsarResourceOperator implements QueueResourceOperator {
 
         // get pulsar cluster via the inlong cluster tag from the inlong group
         String clusterTag = groupInfo.getInlongClusterTag();
-        PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(clusterTag, null,
-                ClusterType.PULSAR);
-        try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
-            // create pulsar tenant and namespace
-            String tenant = pulsarCluster.getTenant();
-            if (StringUtils.isEmpty(tenant)) {
-                tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
-            }
-            String namespace = groupInfo.getMqResource();
-            InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
-            // if the group was not successful, need create tenant and namespace
-            if (!Objects.equal(GroupStatus.CONFIG_SUCCESSFUL.getCode(), groupInfo.getStatus())) {
-                pulsarOperator.createTenant(pulsarAdmin, tenant);
-                log.info("success to create pulsar tenant for groupId={}, tenant={}", groupId, tenant);
-                pulsarOperator.createNamespace(pulsarAdmin, pulsarInfo, tenant, namespace);
-                log.info("success to create pulsar namespace for groupId={}, namespace={}", groupId, namespace);
-            }
+        List<PulsarClusterInfo> pulsarClusters =
+                clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR).stream()
+                        .map(clusterInfo -> (PulsarClusterInfo) clusterInfo)
+                        .collect(Collectors.toList());
+        for (PulsarClusterInfo pulsarCluster : pulsarClusters) {
+            try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
+                String clusterName = pulsarCluster.getName();
+                // create pulsar tenant and namespace
+                String tenant = pulsarCluster.getTenant();
+                if (StringUtils.isEmpty(tenant)) {
+                    tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
+                }
+                String namespace = groupInfo.getMqResource();
+                InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
+                // if the group was not successful, need create tenant and namespace
+                if (!Objects.equal(GroupStatus.CONFIG_SUCCESSFUL.getCode(), groupInfo.getStatus())) {
+                    pulsarOperator.createTenant(pulsarAdmin, tenant);
+                    log.info("success to create pulsar tenant for groupId={}, tenant={}, cluster={}",
+                            groupId, tenant, clusterName);
+                    pulsarOperator.createNamespace(pulsarAdmin, pulsarInfo, tenant, namespace);
+                    log.info("success to create pulsar namespace for groupId={}, namespace={}, cluster={}",
+                            groupId, namespace, clusterName);
+                }
 
-            // create pulsar topic - each Inlong Stream corresponds to a Pulsar topic
-            List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
-            if (streamInfoList == null || streamInfoList.isEmpty()) {
-                log.warn("skip to create pulsar topic and subscription as no streams for groupId={}", groupId);
-                return;
+                // create pulsar topic - each Inlong Stream corresponds to a Pulsar topic
+                List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
+                if (streamInfoList == null || streamInfoList.isEmpty()) {
+                    log.warn("skip to create pulsar topic and subscription as no streams for groupId={}, cluster={}",
+                            groupId, clusterName);
+                    return;
+                }
+                // create pulsar topic and subscription
+                for (InlongStreamBriefInfo stream : streamInfoList) {
+                    this.createTopic(pulsarInfo, pulsarCluster, stream.getMqResource());
+                    this.createSubscription(pulsarInfo, pulsarCluster, stream.getMqResource(),
+                            stream.getInlongStreamId());
+                }
+            } catch (Exception e) {
+                String msg = String.format("failed to create pulsar resource for groupId=%s, cluster=%s", groupId,
+                        pulsarCluster.toString());
+                log.error(msg, e);
+                throw new WorkflowListenerException(msg + ": " + e.getMessage());
             }
-            // create pulsar topic and subscription
-            for (InlongStreamBriefInfo stream : streamInfoList) {
-                this.createTopic(pulsarInfo, pulsarCluster, stream.getMqResource());
-                this.createSubscription(pulsarInfo, pulsarCluster, stream.getMqResource(), stream.getInlongStreamId());
-            }
-        } catch (Exception e) {
-            String msg = String.format("failed to create pulsar resource for groupId=%s", groupId);
-            log.error(msg, e);
-            throw new WorkflowListenerException(msg + ": " + e.getMessage());
         }
-
-        log.info("success to create pulsar resource for groupId={}, cluster={}", groupId, pulsarCluster);
+        log.info("success to create pulsar resource for groupId={}", groupId);
     }
 
     @Override
@@ -129,22 +138,28 @@ public class PulsarResourceOperator implements QueueResourceOperator {
         String groupId = groupInfo.getInlongGroupId();
         log.info("begin to delete pulsar resource for groupId={}", groupId);
 
-        ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.PULSAR);
-        try {
-            List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
-            if (streamInfoList == null || streamInfoList.isEmpty()) {
-                log.warn("skip to create pulsar topic and subscription as no streams for groupId={}", groupId);
-                return;
-            }
-            for (InlongStreamBriefInfo streamInfo : streamInfoList) {
-                this.deletePulsarTopic(groupInfo, (PulsarClusterInfo) clusterInfo, streamInfo.getMqResource());
+        List<PulsarClusterInfo> pulsarClusters =
+                clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.PULSAR).stream()
+                        .map(clusterInfo -> (PulsarClusterInfo) clusterInfo)
+                        .collect(Collectors.toList());
+        for (PulsarClusterInfo clusterInfo : pulsarClusters) {
+            String clusterName = clusterInfo.getName();
+            try {
+                List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
+                if (streamInfoList == null || streamInfoList.isEmpty()) {
+                    log.warn("skip to create pulsar topic and subscription as no streams for groupId={}, cluster={}",
+                            groupId, clusterName);
+                    return;
+                }
+                for (InlongStreamBriefInfo streamInfo : streamInfoList) {
+                    this.deletePulsarTopic(groupInfo, clusterInfo, streamInfo.getMqResource());
+                }
+            } catch (Exception e) {
+                log.error("failed to delete pulsar resource for groupId={}, cluster={}", groupId, clusterName, e);
+                throw new WorkflowListenerException("failed to delete pulsar resource: " + e.getMessage());
             }
-        } catch (Exception e) {
-            log.error("failed to delete pulsar resource for groupId=" + groupId, e);
-            throw new WorkflowListenerException("failed to delete pulsar resource: " + e.getMessage());
         }
-
-        log.info("success to delete pulsar resource for groupId={}, cluster={}", groupId, clusterInfo);
+        log.info("success to delete pulsar resource for groupId={}", groupId);
     }
 
     @Override
@@ -157,17 +172,22 @@ public class PulsarResourceOperator implements QueueResourceOperator {
         String streamId = streamInfo.getInlongStreamId();
         log.info("begin to create pulsar resource for groupId={}, streamId={}", groupId, streamId);
 
-        try {
-            // get pulsar cluster via the inlong cluster tag from the inlong group
-            PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(
-                    groupInfo.getInlongClusterTag(), null, ClusterType.PULSAR);
-            // create pulsar topic and subscription
-            this.createTopic((InlongPulsarInfo) groupInfo, pulsarCluster, streamInfo.getMqResource());
-            this.createSubscription((InlongPulsarInfo) groupInfo, pulsarCluster, streamInfo.getMqResource(), streamId);
-        } catch (Exception e) {
-            String msg = String.format("failed to create pulsar topic for groupId=%s, streamId=%s", groupId, streamId);
-            log.error(msg, e);
-            throw new WorkflowListenerException(msg + ": " + e.getMessage());
+        List<PulsarClusterInfo> pulsarClusters =
+                clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.PULSAR).stream()
+                        .map(clusterInfo -> (PulsarClusterInfo) clusterInfo)
+                        .collect(Collectors.toList());
+        for (PulsarClusterInfo pulsarCluster : pulsarClusters) {
+            try {
+                // create pulsar topic and subscription
+                this.createTopic((InlongPulsarInfo) groupInfo, pulsarCluster, streamInfo.getMqResource());
+                this.createSubscription((InlongPulsarInfo) groupInfo, pulsarCluster,
+                        streamInfo.getMqResource(), streamId);
+            } catch (Exception e) {
+                String msg = String.format("failed to create pulsar topic for groupId=%s, streamId=%s, cluster=%s",
+                        groupId, streamId,pulsarCluster.getName());
+                log.error(msg, e);
+                throw new WorkflowListenerException(msg + ": " + e.getMessage());
+            }
         }
 
         log.info("success to create pulsar resource for groupId={}, streamId={}", groupId, streamId);
@@ -182,16 +202,23 @@ public class PulsarResourceOperator implements QueueResourceOperator {
         String streamId = streamInfo.getInlongStreamId();
         log.info("begin to delete pulsar resource for groupId={} streamId={}", groupId, streamId);
 
-        try {
-            ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.PULSAR);
-            this.deletePulsarTopic(groupInfo, (PulsarClusterInfo) clusterInfo, streamInfo.getMqResource());
-            log.info("success to delete pulsar topic for groupId={}, streamId={}", groupId, streamId);
-        } catch (Exception e) {
-            String msg = String.format("failed to delete pulsar topic for groupId=%s, streamId=%s", groupId, streamId);
-            log.error(msg, e);
-            throw new WorkflowListenerException(msg);
+        List<PulsarClusterInfo> pulsarClusters =
+                clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.PULSAR).stream()
+                        .map(clusterInfo -> (PulsarClusterInfo) clusterInfo)
+                        .collect(Collectors.toList());
+        for (PulsarClusterInfo clusterInfo : pulsarClusters) {
+            String clusterName = clusterInfo.getName();
+            try {
+                this.deletePulsarTopic(groupInfo, clusterInfo, streamInfo.getMqResource());
+                log.info("success to delete pulsar topic for groupId={}, streamId={}, cluster={}",
+                        groupId, streamId, clusterName);
+            } catch (Exception e) {
+                String msg = String.format("failed to delete pulsar topic for groupId=%s, streamId=%s, cluster=%s",
+                        groupId, streamId, clusterName);
+                log.error(msg, e);
+                throw new WorkflowListenerException(msg);
+            }
         }
-
         log.info("success to delete pulsar resource for groupId={}, streamId={}", groupId, streamId);
     }