You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2024/03/31 15:05:06 UTC
(inlong) branch master updated: [INLONG-9902][Manager] Data preview supports pulsar multi cluster (#9903)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new c62bfff4ed [INLONG-9902][Manager] Data preview supports pulsar multi cluster (#9903)
c62bfff4ed is described below
commit c62bfff4ed40b8c057c9901a101002e3de6197b3
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Sun Mar 31 23:05:01 2024 +0800
[INLONG-9902][Manager] Data preview supports pulsar multi cluster (#9903)
---
.../queue/pulsar/PulsarQueueResourceOperator.java | 19 +++++++++++++------
1 file changed, 13 insertions(+), 6 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
index e6e1d66ef5..13b859347a 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java
@@ -49,6 +49,7 @@ import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
+import java.util.ArrayList;
import java.util.List;
/**
@@ -304,11 +305,11 @@ public class PulsarQueueResourceOperator implements QueueResourceOperator {
InlongStreamInfo streamInfo, Integer messageCount) throws Exception {
String groupId = streamInfo.getInlongGroupId();
InlongPulsarInfo inlongPulsarInfo = ((InlongPulsarInfo) groupInfo);
- PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterService.getOne(groupInfo.getInlongClusterTag(),
- null, ClusterType.PULSAR);
+ List<ClusterInfo> pulsarClusterList =
+ clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), ClusterType.PULSAR);
String tenant = inlongPulsarInfo.getPulsarTenant();
- if (StringUtils.isBlank(tenant)) {
- tenant = pulsarCluster.getPulsarTenant();
+ if (StringUtils.isBlank(tenant) && CollectionUtils.isNotEmpty(pulsarClusterList)) {
+ tenant = ((PulsarClusterInfo) pulsarClusterList.get(0)).getPulsarTenant();
}
String namespace = groupInfo.getMqResource();
@@ -317,8 +318,14 @@ public class PulsarQueueResourceOperator implements QueueResourceOperator {
String clusterTag = inlongPulsarInfo.getInlongClusterTag();
String subs = String.format(PULSAR_SUBSCRIPTION_REALTIME_REVIEW, clusterTag, topicName);
boolean serial = InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule());
- List<BriefMQMessage> briefMQMessages = pulsarOperator.queryLatestMessage(pulsarCluster, fullTopicName, subs,
- messageCount, streamInfo, serial);
+ List<BriefMQMessage> briefMQMessages = new ArrayList<>();
+ for (ClusterInfo clusterInfo : pulsarClusterList) {
+ briefMQMessages = pulsarOperator.queryLatestMessage((PulsarClusterInfo) clusterInfo, fullTopicName, subs,
+ messageCount, streamInfo, serial);
+ if (CollectionUtils.isNotEmpty(briefMQMessages)) {
+ break;
+ }
+ }
// insert the consumer group info into the inlong_consume table
Integer id = consumeService.saveBySystem(groupInfo, topicName, subs);