You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/12 08:47:21 UTC

[inlong] branch master updated: [INLONG-5491][Manager] Fix the error of configuring the Pulsar nonpartitioned topic (#5492)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ccec72d1 [INLONG-5491][Manager] Fix the error of configuring the Pulsar nonpartitioned topic (#5492)
5ccec72d1 is described below

commit 5ccec72d16648e9296fea7ee67321c818f0c62a5
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Fri Aug 12 16:47:17 2022 +0800

    [INLONG-5491][Manager] Fix the error of configuring the Pulsar nonpartitioned topic (#5492)
---
 .../manager/common/consts/InlongConstants.java     |  4 ++
 .../resource/queue/pulsar/PulsarOperator.java      | 44 +++++++++++++++-------
 .../queue/pulsar/PulsarResourceOperator.java       |  5 ++-
 3 files changed, 38 insertions(+), 15 deletions(-)

diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index c2c584d75..28c2435d2 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -62,6 +62,10 @@ public class InlongConstants {
 
     public static final String DEFAULT_PULSAR_AUTHENTICATION_TYPE = "token";
 
+    public static final String PULSAR_QUEUE_TYPE_SERIAL = "SERIAL";
+
+    public static final String PULSAR_QUEUE_TYPE_PARALLEL = "PARALLEL";
+
     /**
      * Format of the Pulsar topic: "persistent://tenant/namespace/topic"
      */
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index 98fc3f689..9bfe2605b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -19,11 +19,12 @@ package org.apache.inlong.manager.service.resource.queue.pulsar;
 
 import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
 import org.apache.inlong.manager.common.conversion.ConversionHandle;
 import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicBean;
-import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
 import org.apache.pulsar.client.admin.Namespaces;
 import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -47,7 +48,6 @@ import java.util.Map;
 public class PulsarOperator {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(InlongClusterServiceImpl.class);
-    private static final String PULSAR_QUEUE_TYPE_SERIAL = "SERIAL";
     private static final int MAX_PARTITION = 100;
     private static final int RETRY_TIMES = 3;
     private static final int DELAY_SECONDS = 5;
@@ -146,13 +146,14 @@ public class PulsarOperator {
         String topicFullName = tenant + "/" + namespace + "/" + topic;
 
         // Topic will be returned if it exists, and created if it does not exist
-        if (topicIsExists(pulsarAdmin, tenant, namespace, topic)) {
+        if (topicIsExists(pulsarAdmin, tenant, namespace, topic,
+                InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicBean.getQueueModule()))) {
             LOGGER.warn("pulsar topic={} already exists in {}", topicFullName, pulsarAdmin.getServiceUrl());
             return;
         }
 
         try {
-            if (PULSAR_QUEUE_TYPE_SERIAL.equals(topicBean.getQueueModule())) {
+            if (InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(topicBean.getQueueModule())) {
                 pulsarAdmin.topics().createNonPartitionedTopic(topicFullName);
                 String res = pulsarAdmin.lookups().lookupTopic(topicFullName);
                 LOGGER.info("success to create topic={}, lookup result is {}", topicFullName, res);
@@ -202,7 +203,8 @@ public class PulsarOperator {
         String topicFullName = tenant + "/" + namespace + "/" + topic;
 
         // Return early if the topic does not exist
-        if (topicIsExists(pulsarAdmin, tenant, namespace, topic)) {
+        if (topicIsExists(pulsarAdmin, tenant, namespace, topic,
+                InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicBean.getQueueModule()))) {
             LOGGER.warn("pulsar topic={} already delete", topicFullName);
             return;
         }
@@ -227,7 +229,8 @@ public class PulsarOperator {
         String topicName = topicBean.getTenant() + "/" + topicBean.getNamespace() + "/" + topicBean.getTopicName();
         LOGGER.info("begin to create pulsar subscription={} for topic={}", subscription, topicName);
         try {
-            boolean isExists = this.subscriptionIsExists(pulsarAdmin, topicName, subscription);
+            boolean isExists = this.subscriptionIsExists(pulsarAdmin, topicName, subscription,
+                    InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicBean.getQueueModule()));
             if (isExists) {
                 LOGGER.warn("pulsar subscription={} already exists, skip to create", subscription);
                 return;
@@ -275,7 +278,8 @@ public class PulsarOperator {
      * @apiNote cannot compare whether the string contains, otherwise it may be misjudged, such as:
      *         Topic "ab" does not exist, but if "abc" exists, "ab" will be mistakenly judged to exist
      */
-    public boolean topicIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace, String topic) {
+    public boolean topicIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace, String topic,
+            boolean isPartitioned) {
         if (StringUtils.isBlank(topic)) {
             return true;
         }
@@ -284,7 +288,11 @@ public class PulsarOperator {
         List<String> topicList;
         boolean topicExists = false;
         try {
-            topicList = pulsarAdmin.topics().getPartitionedTopicList(tenant + "/" + namespace);
+            if (isPartitioned) {
+                topicList = pulsarAdmin.topics().getPartitionedTopicList(tenant + "/" + namespace);
+            } else {
+                topicList = pulsarAdmin.topics().getList(tenant + "/" + namespace);
+            }
             for (String t : topicList) {
                 t = t.substring(t.lastIndexOf("/") + 1); // not contains /
                 if (topic.equals(t)) {
@@ -316,7 +324,8 @@ public class PulsarOperator {
         return topicExists;
     }
 
-    private boolean subscriptionIsExists(PulsarAdmin pulsarAdmin, String topic, String subscription) {
+    private boolean subscriptionIsExists(PulsarAdmin pulsarAdmin, String topic, String subscription,
+            boolean isPartitioned) {
         int count = 0;
         while (++count <= RETRY_TIMES) {
             try {
@@ -324,11 +333,20 @@ public class PulsarOperator {
                 Thread.sleep(DELAY_SECONDS);
 
                 // first lookup to load the topic, and then query whether the subscription exists
-                Map<String, String> topicMap = pulsarAdmin.lookups().lookupPartitionedTopic(topic);
-                if (topicMap.isEmpty()) {
-                    LOGGER.error("result of lookups topic={} is empty, continue retry", topic);
-                    continue;
+                if (isPartitioned) {
+                    Map<String, String> topicMap = pulsarAdmin.lookups().lookupPartitionedTopic(topic);
+                    if (topicMap.isEmpty()) {
+                        LOGGER.error("result of lookups topic={} is empty, continue retry", topic);
+                        continue;
+                    }
+                } else {
+                    String lookupTopic = pulsarAdmin.lookups().lookupTopic(topic);
+                    if (StringUtils.isBlank(lookupTopic)) {
+                        LOGGER.error("result of lookups topic={} is empty, continue retry", topic);
+                        continue;
+                    }
                 }
+
                 List<String> subscriptionList = pulsarAdmin.topics().getSubscriptions(topic);
                 return subscriptionList.contains(subscription);
             } catch (PulsarAdminException | InterruptedException e) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
index e4f2b46c4..1b179c597 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
@@ -21,9 +21,9 @@ import com.google.common.base.Objects;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.MQType;
 import org.apache.inlong.manager.common.enums.ClusterType;
 import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.consts.MQType;
 import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
 import org.apache.inlong.manager.common.util.Preconditions;
 import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
@@ -205,7 +205,8 @@ public class PulsarResourceOperator implements QueueResourceOperator {
             pulsarOperator.createTopic(pulsarAdmin, topicBean);
 
             // 2. create a subscription for the pulsar topic
-            boolean exist = pulsarOperator.topicIsExists(pulsarAdmin, tenant, namespace, topicName);
+            boolean exist = pulsarOperator.topicIsExists(pulsarAdmin, tenant, namespace, topicName,
+                    InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicBean.getQueueModule()));
             if (!exist) {
                 String topicFullName = tenant + "/" + namespace + "/" + topicName;
                 String serviceUrl = pulsarCluster.getAdminUrl();