You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2024/03/31 15:05:06 UTC

(inlong) branch master updated: [INLONG-9902][Manager] Data preview supports pulsar multi cluster (#9903)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new c62bfff4ed [INLONG-9902][Manager] Data preview supports pulsar multi cluster (#9903)
c62bfff4ed is described below

commit c62bfff4ed40b8c057c9901a101002e3de6197b3
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Sun Mar 31 23:05:01 2024 +0800

    [INLONG-9902][Manager] Data preview supports pulsar multi cluster (#9903)
---
 .../queue/pulsar/PulsarQueueResourceOperator.java     | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
index e6e1d66ef5..13b859347a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
@@ -49,6 +49,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
+import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -304,11 +305,11 @@ public class PulsarQueueResourceOperator implements QueueResourceOperator {
             InlongStreamInfo streamInfo, Integer messageCount) throws Exception {
         String groupId = streamInfo.getInlongGroupId();
         InlongPulsarInfo inlongPulsarInfo = ((InlongPulsarInfo) groupInfo);
-        PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(groupInfo.getInlongClusterTag(),
-                null, ClusterType.PULSAR);
+        List<ClusterInfo> pulsarClusterList =
+                clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.PULSAR);
         String tenant = inlongPulsarInfo.getPulsarTenant();
-        if (StringUtils.isBlank(tenant)) {
-            tenant = pulsarCluster.getPulsarTenant();
+        if (StringUtils.isBlank(tenant) && CollectionUtils.isNotEmpty(pulsarClusterList)) {
+            tenant = ((PulsarClusterInfo) pulsarClusterList.get(0)).getPulsarTenant();
         }
 
         String namespace = groupInfo.getMqResource();
@@ -317,8 +318,14 @@ public class PulsarQueueResourceOperator implements QueueResourceOperator {
         String clusterTag = inlongPulsarInfo.getInlongClusterTag();
         String subs = String.format(PULSAR_SUBSCRIPTION_REALTIME_REVIEW, clusterTag, topicName);
         boolean serial = InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule());
-        List<BriefMQMessage> briefMQMessages = pulsarOperator.queryLatestMessage(pulsarCluster, fullTopicName, subs,
-                messageCount, streamInfo, serial);
+        List<BriefMQMessage> briefMQMessages = new ArrayList<>();
+        for (ClusterInfo clusterInfo : pulsarClusterList) {
+            briefMQMessages = pulsarOperator.queryLatestMessage((PulsarClusterInfo) clusterInfo, fullTopicName, subs,
+                    messageCount, streamInfo, serial);
+            if (CollectionUtils.isNotEmpty(briefMQMessages)) {
+                break;
+            }
+        }
 
         // insert the consumer group info into the inlong_consume table
         Integer id = consumeService.saveBySystem(groupInfo, topicName, subs);