You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/11/09 11:25:35 UTC
[inlong] 05/07: [INLONG-6463][Manager] Support create subscription and topic of multiple pulsar cluster (#6464)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit d4d737167d2f5df013cb457939f2def6961c04c4
Author: vernedeng <de...@pku.edu.cn>
AuthorDate: Wed Nov 9 17:48:51 2022 +0800
[INLONG-6463][Manager] Support create subscription and topic of multiple pulsar cluster (#6464)
---
.../queue/pulsar/PulsarResourceOperator.java | 163 ++++++++++++---------
1 file changed, 95 insertions(+), 68 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
index 5b698b856..23c13b740 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
@@ -27,7 +27,6 @@ import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.GroupStatus;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.util.Preconditions;
-import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo;
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
@@ -45,6 +44,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
+import java.util.stream.Collectors;
/**
* Operator for create Pulsar Tenant, Namespace, Topic and Subscription
@@ -84,42 +84,51 @@ public class PulsarResourceOperator implements QueueResourceOperator {
// get pulsar cluster via the inlong cluster tag from the inlong group
String clusterTag = groupInfo.getInlongClusterTag();
- PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(clusterTag, null,
- ClusterType.PULSAR);
- try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
- // create pulsar tenant and namespace
- String tenant = pulsarCluster.getTenant();
- if (StringUtils.isEmpty(tenant)) {
- tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
- }
- String namespace = groupInfo.getMqResource();
- InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
- // if the group was not successful, need create tenant and namespace
- if (!Objects.equal(GroupStatus.CONFIG_SUCCESSFUL.getCode(), groupInfo.getStatus())) {
- pulsarOperator.createTenant(pulsarAdmin, tenant);
- log.info("success to create pulsar tenant for groupId={}, tenant={}", groupId, tenant);
- pulsarOperator.createNamespace(pulsarAdmin, pulsarInfo, tenant, namespace);
- log.info("success to create pulsar namespace for groupId={}, namespace={}", groupId, namespace);
- }
+ List<PulsarClusterInfo> pulsarClusters =
+ clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR).stream()
+ .map(clusterInfo -> (PulsarClusterInfo) clusterInfo)
+ .collect(Collectors.toList());
+ for (PulsarClusterInfo pulsarCluster : pulsarClusters) {
+ try (PulsarAdmin pulsarAdmin = PulsarUtils.getPulsarAdmin(pulsarCluster)) {
+ String clusterName = pulsarCluster.getName();
+ // create pulsar tenant and namespace
+ String tenant = pulsarCluster.getTenant();
+ if (StringUtils.isEmpty(tenant)) {
+ tenant = InlongConstants.DEFAULT_PULSAR_TENANT;
+ }
+ String namespace = groupInfo.getMqResource();
+ InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
+ // if the group was not successful, need create tenant and namespace
+ if (!Objects.equal(GroupStatus.CONFIG_SUCCESSFUL.getCode(), groupInfo.getStatus())) {
+ pulsarOperator.createTenant(pulsarAdmin, tenant);
+ log.info("success to create pulsar tenant for groupId={}, tenant={}, cluster={}",
+ groupId, tenant, clusterName);
+ pulsarOperator.createNamespace(pulsarAdmin, pulsarInfo, tenant, namespace);
+ log.info("success to create pulsar namespace for groupId={}, namespace={}, cluster={}",
+ groupId, namespace, clusterName);
+ }
- // create pulsar topic - each Inlong Stream corresponds to a Pulsar topic
- List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
- if (streamInfoList == null || streamInfoList.isEmpty()) {
- log.warn("skip to create pulsar topic and subscription as no streams for groupId={}", groupId);
- return;
+ // create pulsar topic - each Inlong Stream corresponds to a Pulsar topic
+ List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
+ if (streamInfoList == null || streamInfoList.isEmpty()) {
+ log.warn("skip to create pulsar topic and subscription as no streams for groupId={}, cluster={}",
+ groupId, clusterName);
+ return;
+ }
+ // create pulsar topic and subscription
+ for (InlongStreamBriefInfo stream : streamInfoList) {
+ this.createTopic(pulsarInfo, pulsarCluster, stream.getMqResource());
+ this.createSubscription(pulsarInfo, pulsarCluster, stream.getMqResource(),
+ stream.getInlongStreamId());
+ }
+ } catch (Exception e) {
+ String msg = String.format("failed to create pulsar resource for groupId=%s, cluster=%s", groupId,
+ pulsarCluster.toString());
+ log.error(msg, e);
+ throw new WorkflowListenerException(msg + ": " + e.getMessage());
}
- // create pulsar topic and subscription
- for (InlongStreamBriefInfo stream : streamInfoList) {
- this.createTopic(pulsarInfo, pulsarCluster, stream.getMqResource());
- this.createSubscription(pulsarInfo, pulsarCluster, stream.getMqResource(), stream.getInlongStreamId());
- }
- } catch (Exception e) {
- String msg = String.format("failed to create pulsar resource for groupId=%s", groupId);
- log.error(msg, e);
- throw new WorkflowListenerException(msg + ": " + e.getMessage());
}
-
- log.info("success to create pulsar resource for groupId={}, cluster={}", groupId, pulsarCluster);
+ log.info("success to create pulsar resource for groupId={}", groupId);
}
@Override
@@ -129,22 +138,28 @@ public class PulsarResourceOperator implements QueueResourceOperator {
String groupId = groupInfo.getInlongGroupId();
log.info("begin to delete pulsar resource for groupId={}", groupId);
- ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.PULSAR);
- try {
- List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
- if (streamInfoList == null || streamInfoList.isEmpty()) {
- log.warn("skip to create pulsar topic and subscription as no streams for groupId={}", groupId);
- return;
- }
- for (InlongStreamBriefInfo streamInfo : streamInfoList) {
- this.deletePulsarTopic(groupInfo, (PulsarClusterInfo) clusterInfo, streamInfo.getMqResource());
+ List<PulsarClusterInfo> pulsarClusters =
+ clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.PULSAR).stream()
+ .map(clusterInfo -> (PulsarClusterInfo) clusterInfo)
+ .collect(Collectors.toList());
+ for (PulsarClusterInfo clusterInfo : pulsarClusters) {
+ String clusterName = clusterInfo.getName();
+ try {
+ List<InlongStreamBriefInfo> streamInfoList = streamService.getTopicList(groupId);
+ if (streamInfoList == null || streamInfoList.isEmpty()) {
+ log.warn("skip to create pulsar topic and subscription as no streams for groupId={}, cluster={}",
+ groupId, clusterName);
+ return;
+ }
+ for (InlongStreamBriefInfo streamInfo : streamInfoList) {
+ this.deletePulsarTopic(groupInfo, clusterInfo, streamInfo.getMqResource());
+ }
+ } catch (Exception e) {
+ log.error("failed to delete pulsar resource for groupId={}, cluster={}", groupId, clusterName, e);
+ throw new WorkflowListenerException("failed to delete pulsar resource: " + e.getMessage());
}
- } catch (Exception e) {
- log.error("failed to delete pulsar resource for groupId=" + groupId, e);
- throw new WorkflowListenerException("failed to delete pulsar resource: " + e.getMessage());
}
-
- log.info("success to delete pulsar resource for groupId={}, cluster={}", groupId, clusterInfo);
+ log.info("success to delete pulsar resource for groupId={}", groupId);
}
@Override
@@ -157,17 +172,22 @@ public class PulsarResourceOperator implements QueueResourceOperator {
String streamId = streamInfo.getInlongStreamId();
log.info("begin to create pulsar resource for groupId={}, streamId={}", groupId, streamId);
- try {
- // get pulsar cluster via the inlong cluster tag from the inlong group
- PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(
- groupInfo.getInlongClusterTag(), null, ClusterType.PULSAR);
- // create pulsar topic and subscription
- this.createTopic((InlongPulsarInfo) groupInfo, pulsarCluster, streamInfo.getMqResource());
- this.createSubscription((InlongPulsarInfo) groupInfo, pulsarCluster, streamInfo.getMqResource(), streamId);
- } catch (Exception e) {
- String msg = String.format("failed to create pulsar topic for groupId=%s, streamId=%s", groupId, streamId);
- log.error(msg, e);
- throw new WorkflowListenerException(msg + ": " + e.getMessage());
+ List<PulsarClusterInfo> pulsarClusters =
+ clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.PULSAR).stream()
+ .map(clusterInfo -> (PulsarClusterInfo) clusterInfo)
+ .collect(Collectors.toList());
+ for (PulsarClusterInfo pulsarCluster : pulsarClusters) {
+ try {
+ // create pulsar topic and subscription
+ this.createTopic((InlongPulsarInfo) groupInfo, pulsarCluster, streamInfo.getMqResource());
+ this.createSubscription((InlongPulsarInfo) groupInfo, pulsarCluster,
+ streamInfo.getMqResource(), streamId);
+ } catch (Exception e) {
+ String msg = String.format("failed to create pulsar topic for groupId=%s, streamId=%s, cluster=%s",
+ groupId, streamId,pulsarCluster.getName());
+ log.error(msg, e);
+ throw new WorkflowListenerException(msg + ": " + e.getMessage());
+ }
}
log.info("success to create pulsar resource for groupId={}, streamId={}", groupId, streamId);
@@ -182,16 +202,23 @@ public class PulsarResourceOperator implements QueueResourceOperator {
String streamId = streamInfo.getInlongStreamId();
log.info("begin to delete pulsar resource for groupId={} streamId={}", groupId, streamId);
- try {
- ClusterInfo clusterInfo = clusterService.getOne(groupInfo.getInlongClusterTag(), null, ClusterType.PULSAR);
- this.deletePulsarTopic(groupInfo, (PulsarClusterInfo) clusterInfo, streamInfo.getMqResource());
- log.info("success to delete pulsar topic for groupId={}, streamId={}", groupId, streamId);
- } catch (Exception e) {
- String msg = String.format("failed to delete pulsar topic for groupId=%s, streamId=%s", groupId, streamId);
- log.error(msg, e);
- throw new WorkflowListenerException(msg);
+ List<PulsarClusterInfo> pulsarClusters =
+ clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.PULSAR).stream()
+ .map(clusterInfo -> (PulsarClusterInfo) clusterInfo)
+ .collect(Collectors.toList());
+ for (PulsarClusterInfo clusterInfo : pulsarClusters) {
+ String clusterName = clusterInfo.getName();
+ try {
+ this.deletePulsarTopic(groupInfo, clusterInfo, streamInfo.getMqResource());
+ log.info("success to delete pulsar topic for groupId={}, streamId={}, cluster={}",
+ groupId, streamId, clusterName);
+ } catch (Exception e) {
+ String msg = String.format("failed to delete pulsar topic for groupId=%s, streamId=%s, cluster=%s",
+ groupId, streamId, clusterName);
+ log.error(msg, e);
+ throw new WorkflowListenerException(msg);
+ }
}
-
log.info("success to delete pulsar resource for groupId={}, streamId={}", groupId, streamId);
}