You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/21 11:33:35 UTC

[inlong] branch master updated: [INLONG-5112][Manager] Creating Pulsar topic by using the lookups method (#5159)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 4c519860e [INLONG-5112][Manager] Creating Pulsar topic by using the lookups method (#5159)
4c519860e is described below

commit 4c519860e69803bcbd526cd7073fbd8c87e07f4d
Author: healchow <he...@gmail.com>
AuthorDate: Thu Jul 21 19:33:31 2022 +0800

    [INLONG-5112][Manager] Creating Pulsar topic by using the lookups method (#5159)
---
 .../manager/service/mq/util/PulsarOperator.java    | 178 ++++++++++++++-------
 1 file changed, 116 insertions(+), 62 deletions(-)

diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOperator.java
index 00fcb5589..049ea33bd 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOperator.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.mq.util;
 import com.google.common.collect.Sets;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.inlong.manager.common.conversion.ConversionHandle;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
 import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
 import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
 import org.apache.inlong.manager.common.util.Preconditions;
@@ -37,6 +38,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 
 import java.util.List;
+import java.util.Map;
 
 /**
  * Pulsar operator, supports creating topics and creating subscription.
@@ -46,14 +48,20 @@ public class PulsarOperator {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(InlongClusterServiceImpl.class);
     private static final String PULSAR_QUEUE_TYPE_SERIAL = "SERIAL";
+    private static final int MAX_PARTITION = 100;
+    private static final int RETRY_TIMES = 3;
+    private static final int DELAY_SECONDS = 5;
 
     @Autowired
     private ConversionHandle conversionHandle;
 
+    /**
+     * Create Pulsar tenant
+     */
     public void createTenant(PulsarAdmin pulsarAdmin, String tenant) throws PulsarAdminException {
         LOGGER.info("begin to create pulsar tenant={}", tenant);
-
         Preconditions.checkNotEmpty(tenant, "Tenant cannot be empty");
+
         try {
             List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
             boolean exists = this.tenantIsExists(pulsarAdmin, tenant);
@@ -72,14 +80,16 @@ public class PulsarOperator {
         }
     }
 
-    public void createNamespace(PulsarAdmin pulsarAdmin, InlongPulsarInfo pulsarInfo,
-            String tenant, String namespace) throws PulsarAdminException {
+    /**
+     * Create Pulsar namespace
+     */
+    public void createNamespace(PulsarAdmin pulsarAdmin, InlongPulsarInfo pulsarInfo, String tenant, String namespace)
+            throws PulsarAdminException {
         Preconditions.checkNotNull(tenant, "pulsar tenant cannot be empty during create namespace");
         Preconditions.checkNotNull(namespace, "pulsar namespace cannot be empty during create namespace");
 
         String namespaceName = tenant + "/" + namespace;
         LOGGER.info("begin to create namespace={}", namespaceName);
-
         try {
             // Check whether the namespace exists, and create it if it does not exist
             boolean isExists = this.namespacesIsExists(pulsarAdmin, tenant, namespace);
@@ -125,30 +135,9 @@ public class PulsarOperator {
         }
     }
 
-    public void forceDeleteNamespace(PulsarAdmin pulsarAdmin, String tenant, String namespace)
-            throws PulsarAdminException {
-        Preconditions.checkNotNull(tenant, "pulsar tenant cannot be empty during create namespace");
-        Preconditions.checkNotNull(namespace, "pulsar namespace cannot be empty during create namespace");
-
-        String namespaceName = tenant + "/" + namespace;
-        LOGGER.info("begin to delete namespace={}", namespaceName);
-
-        try {
-            // Check whether the namespace exists, and create it if it does not exist
-            boolean isExists = this.namespacesIsExists(pulsarAdmin, tenant, namespace);
-            if (!isExists) {
-                LOGGER.warn("namespace={} already delete", namespaceName);
-                return;
-            }
-            Namespaces namespaces = pulsarAdmin.namespaces();
-            namespaces.deleteNamespace(namespaceName, true);
-            LOGGER.info("success to delete namespace={}", namespaceName);
-        } catch (PulsarAdminException e) {
-            LOGGER.error("failed to delete namespace=" + namespaceName, e);
-            throw e;
-        }
-    }
-
+    /**
+     * Create Pulsar topic
+     */
     public void createTopic(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean) throws PulsarAdminException {
         Preconditions.checkNotNull(topicBean, "pulsar topic info cannot be empty");
 
@@ -164,28 +153,47 @@ public class PulsarOperator {
         }
 
         try {
-            String queueModule = topicBean.getQueueModule();
-            // create partition topic
-            if (PULSAR_QUEUE_TYPE_SERIAL.equalsIgnoreCase(queueModule)) {
-                pulsarAdmin.topics().createPartitionedTopic(topicFullName, 1);
+            if (PULSAR_QUEUE_TYPE_SERIAL.equals(topicBean.getQueueModule())) {
+                pulsarAdmin.topics().createNonPartitionedTopic(topicFullName);
+                String res = pulsarAdmin.lookups().lookupTopic(topicFullName);
+                LOGGER.info("success to create topic={}, lookup result is {}", topicFullName, res);
             } else {
-                List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
                 // The number of brokers as the default value of topic partition
-                List<String> brokers = pulsarAdmin.brokers().getActiveBrokers(clusters.get(0));
-                Integer numPartitions = brokers.size();
-                if (topicBean.getNumPartitions() > 0) {
-                    numPartitions = topicBean.getNumPartitions();
+                List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
+                Integer numPartitions = topicBean.getNumPartitions();
+                // keep a requested count within [1, MAX_PARTITION]; otherwise fall back to the active broker count
+                if (numPartitions == null || numPartitions <= 0 || numPartitions > MAX_PARTITION) {
+                    List<String> brokers = pulsarAdmin.brokers().getActiveBrokers(clusters.get(0));
+                    numPartitions = brokers.size();
                 }
+
                 pulsarAdmin.topics().createPartitionedTopic(topicFullName, numPartitions);
+                Map<String, String> res = pulsarAdmin.lookups().lookupPartitionedTopic(topicFullName);
+                // if lookup failed (res.size not equals the partition number)
+                if (res.keySet().size() != numPartitions) {
+                    // look up partition failed, retry to get partition numbers
+                    for (int i = 0; (i < RETRY_TIMES && res.keySet().size() != numPartitions); i++) {
+                        res = pulsarAdmin.lookups().lookupPartitionedTopic(topicFullName);
+                        try {
+                            Thread.sleep(500);
+                        } catch (InterruptedException e) {
+                            LOGGER.error("Thread has been interrupted");
+                        }
+                    }
+                }
+                if (numPartitions != res.keySet().size()) {
+                    throw new PulsarAdminException("The number of partitions not equal to lookupPartitionedTopic");
+                }
+                LOGGER.info("success to create topic={}", topicFullName);
             }
-
-            LOGGER.info("success to create topic={}", topicFullName);
-        } catch (Exception e) {
+        } catch (PulsarAdminException e) {
             LOGGER.error("failed to create topic=" + topicFullName, e);
             throw e;
         }
     }
 
+    /**
+     * Force delete Pulsar topic
+     */
     public void forceDeleteTopic(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean) throws PulsarAdminException {
         Preconditions.checkNotNull(topicBean, "pulsar topic info cannot be empty");
 
@@ -209,6 +217,9 @@ public class PulsarOperator {
         }
     }
 
+    /**
+     * Create a Pulsar subscription for the given topic
+     */
     public void createSubscription(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean, String subscription)
             throws PulsarAdminException {
         Preconditions.checkNotNull(topicBean, "can not find tenant information to create subscription");
@@ -216,21 +227,23 @@ public class PulsarOperator {
 
         String topicName = topicBean.getTenant() + "/" + topicBean.getNamespace() + "/" + topicBean.getTopicName();
         LOGGER.info("begin to create pulsar subscription={} for topic={}", subscription, topicName);
-
         try {
             boolean isExists = this.subscriptionIsExists(pulsarAdmin, topicName, subscription);
-            if (!isExists) {
-                pulsarAdmin.topics().createSubscription(topicName, subscription, MessageId.latest);
-                LOGGER.info("success to create subscription={}", subscription);
-            } else {
+            if (isExists) {
                 LOGGER.warn("pulsar subscription={} already exists, skip to create", subscription);
+                return;
             }
-        } catch (Exception e) {
+            pulsarAdmin.topics().createSubscription(topicName, subscription, MessageId.latest);
+            LOGGER.info("success to create subscription={}", subscription);
+        } catch (PulsarAdminException e) {
             LOGGER.error("failed to create pulsar subscription=" + subscription, e);
             throw e;
         }
     }
 
+    /**
+     * Create a Pulsar subscription for the specified topic list
+     */
     public void createSubscriptions(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicBean topicBean,
             List<String> topicList) throws PulsarAdminException {
         for (String topic : topicList) {
@@ -249,7 +262,7 @@ public class PulsarOperator {
     }
 
     /**
-     * Check whether the specified Namespace exists under the specified Tenant
+     * Check whether the Pulsar namespace exists under the specified tenant
      */
     private boolean namespacesIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace)
             throws PulsarAdminException {
@@ -263,30 +276,71 @@ public class PulsarOperator {
      * @apiNote cannot compare whether the string contains, otherwise it may be misjudged, such as:
      *         Topic "ab" does not exist, but if "abc" exists, "ab" will be mistakenly judged to exist
      */
-    public boolean topicIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace, String topic)
-            throws PulsarAdminException {
+    public boolean topicIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace, String topic) {
         if (StringUtils.isBlank(topic)) {
             return true;
         }
 
         // persistent://tenant/namespace/topic
-        List<String> topicList = pulsarAdmin.topics().getPartitionedTopicList(tenant + "/" + namespace);
-        for (String t : topicList) {
-            t = t.substring(t.lastIndexOf("/") + 1); // Cannot contain /
-            if (topic.equals(t)) {
-                return true;
+        List<String> topicList;
+        boolean topicExists = false;
+        try {
+            topicList = pulsarAdmin.topics().getPartitionedTopicList(tenant + "/" + namespace);
+            for (String t : topicList) {
+                t = t.substring(t.lastIndexOf("/") + 1); // not contains /
+                if (topic.equals(t)) {
+                    topicExists = true;
+                    break;
+                }
+            }
+        } catch (PulsarAdminException pe) {
+            LOGGER.error("check if the pulsar topic={} exists error, begin retry", topic, pe);
+            int count = 0;
+            try {
+                while (!topicExists && ++count <= RETRY_TIMES) {
+                    LOGGER.info("check whether the pulsar topic={} exists error, try count={}", topic, count);
+                    Thread.sleep(DELAY_SECONDS * 1000L); // sleep() takes milliseconds; the constant is in seconds
+
+                    topicList = pulsarAdmin.topics().getPartitionedTopicList(tenant + "/" + namespace);
+                    for (String t : topicList) {
+                        t = t.substring(t.lastIndexOf("/") + 1);
+                        if (topic.equals(t)) {
+                            topicExists = true;
+                            break;
+                        }
+                    }
+                }
+            } catch (Exception e) { // first failure during retry ends the loop; result stays false
+                LOGGER.error("after retry, check if the pulsar topic={} exists still error", topic, e);
             }
         }
-        return false;
+        return topicExists;
     }
 
-    public boolean subscriptionIsExists(PulsarAdmin pulsarAdmin, String topic, String subscription) {
-        try {
-            List<String> subscriptionList = pulsarAdmin.topics().getSubscriptions(topic);
-            return subscriptionList.contains(subscription);
-        } catch (PulsarAdminException e) {
-            LOGGER.error("failed to check the subscription=" + subscription + " exists for topic=" + topic, e);
-            return false;
+    private boolean subscriptionIsExists(PulsarAdmin pulsarAdmin, String topic, String subscription) {
+        int count = 0;
+        while (++count <= RETRY_TIMES) { // at most RETRY_TIMES attempts
+            try {
+                LOGGER.info("check whether the subscription exists for topic={}, try count={}", topic, count);
+                Thread.sleep(DELAY_SECONDS); // NOTE(review): sleep() takes millis, so this waits 5 ms - confirm intent
+
+                // first lookup to load the topic, and then query whether the subscription exists
+                Map<String, String> topicMap = pulsarAdmin.lookups().lookupPartitionedTopic(topic);
+                if (topicMap.isEmpty()) {
+                    LOGGER.error("result of lookups topic={} is empty, continue retry", topic);
+                    continue; // empty lookup result: next attempt, no exception raised
+                }
+                List<String> subscriptionList = pulsarAdmin.topics().getSubscriptions(topic);
+                return subscriptionList.contains(subscription);
+            } catch (PulsarAdminException | InterruptedException e) { // NOTE(review): interrupt flag is not restored
+                LOGGER.error("check if the subscription exists for topic={} error, continue retry", topic, e);
+                if (count == RETRY_TIMES) { // last attempt failed with an exception: give up
+                    LOGGER.error("after {} times retry, still check subscription exception for topic {}", count, topic);
+                    throw new BusinessException("check if the subscription exists error");
+                }
+            }
         }
+        return false; // reached only when the final attempt's lookup result was empty
     }
+
 }