You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/07/21 11:33:35 UTC
[inlong] branch master updated: [INLONG-5112][Manager] Creating Pulsar topic by using the lookups method (#5159)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 4c519860e [INLONG-5112][Manager] Creating Pulsar topic by using the lookups method (#5159)
4c519860e is described below
commit 4c519860e69803bcbd526cd7073fbd8c87e07f4d
Author: healchow <he...@gmail.com>
AuthorDate: Thu Jul 21 19:33:31 2022 +0800
[INLONG-5112][Manager] Creating Pulsar topic by using the lookups method (#5159)
---
.../manager/service/mq/util/PulsarOperator.java | 178 ++++++++++++++-------
1 file changed, 116 insertions(+), 62 deletions(-)
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOperator.java
index 00fcb5589..049ea33bd 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/mq/util/PulsarOperator.java
@@ -20,6 +20,7 @@ package org.apache.inlong.manager.service.mq.util;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.conversion.ConversionHandle;
+import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.common.pojo.pulsar.PulsarTopicBean;
import org.apache.inlong.manager.common.util.Preconditions;
@@ -37,6 +38,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.List;
+import java.util.Map;
/**
* Pulsar operator, supports creating topics and creating subscription.
@@ -46,14 +48,20 @@ public class PulsarOperator {
private static final Logger LOGGER = LoggerFactory.getLogger(InlongClusterServiceImpl.class);
private static final String PULSAR_QUEUE_TYPE_SERIAL = "SERIAL";
+ private static final int MAX_PARTITION = 100;
+ private static final int RETRY_TIMES = 3;
+ private static final int DELAY_SECONDS = 5;
@Autowired
private ConversionHandle conversionHandle;
+ /**
+ * Create Pulsar tenant
+ */
public void createTenant(PulsarAdmin pulsarAdmin, String tenant) throws PulsarAdminException {
LOGGER.info("begin to create pulsar tenant={}", tenant);
-
Preconditions.checkNotEmpty(tenant, "Tenant cannot be empty");
+
try {
List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
boolean exists = this.tenantIsExists(pulsarAdmin, tenant);
@@ -72,14 +80,16 @@ public class PulsarOperator {
}
}
- public void createNamespace(PulsarAdmin pulsarAdmin, InlongPulsarInfo pulsarInfo,
- String tenant, String namespace) throws PulsarAdminException {
+ /**
+ * Create Pulsar namespace
+ */
+ public void createNamespace(PulsarAdmin pulsarAdmin, InlongPulsarInfo pulsarInfo, String tenant, String namespace)
+ throws PulsarAdminException {
Preconditions.checkNotNull(tenant, "pulsar tenant cannot be empty during create namespace");
Preconditions.checkNotNull(namespace, "pulsar namespace cannot be empty during create namespace");
String namespaceName = tenant + "/" + namespace;
LOGGER.info("begin to create namespace={}", namespaceName);
-
try {
// Check whether the namespace exists, and create it if it does not exist
boolean isExists = this.namespacesIsExists(pulsarAdmin, tenant, namespace);
@@ -125,30 +135,9 @@ public class PulsarOperator {
}
}
- public void forceDeleteNamespace(PulsarAdmin pulsarAdmin, String tenant, String namespace)
- throws PulsarAdminException {
- Preconditions.checkNotNull(tenant, "pulsar tenant cannot be empty during create namespace");
- Preconditions.checkNotNull(namespace, "pulsar namespace cannot be empty during create namespace");
-
- String namespaceName = tenant + "/" + namespace;
- LOGGER.info("begin to delete namespace={}", namespaceName);
-
- try {
- // Check whether the namespace exists, and create it if it does not exist
- boolean isExists = this.namespacesIsExists(pulsarAdmin, tenant, namespace);
- if (!isExists) {
- LOGGER.warn("namespace={} already delete", namespaceName);
- return;
- }
- Namespaces namespaces = pulsarAdmin.namespaces();
- namespaces.deleteNamespace(namespaceName, true);
- LOGGER.info("success to delete namespace={}", namespaceName);
- } catch (PulsarAdminException e) {
- LOGGER.error("failed to delete namespace=" + namespaceName, e);
- throw e;
- }
- }
-
+ /**
+ * Create Pulsar topic
+ */
public void createTopic(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean) throws PulsarAdminException {
Preconditions.checkNotNull(topicBean, "pulsar topic info cannot be empty");
@@ -164,28 +153,47 @@ public class PulsarOperator {
}
try {
- String queueModule = topicBean.getQueueModule();
- // create partition topic
- if (PULSAR_QUEUE_TYPE_SERIAL.equalsIgnoreCase(queueModule)) {
- pulsarAdmin.topics().createPartitionedTopic(topicFullName, 1);
+ if (PULSAR_QUEUE_TYPE_SERIAL.equals(topicBean.getQueueModule())) {
+ pulsarAdmin.topics().createNonPartitionedTopic(topicFullName);
+ String res = pulsarAdmin.lookups().lookupTopic(topicFullName);
+ LOGGER.info("success to create topic={}, lookup result is {}", topicFullName, res);
} else {
- List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
// The number of brokers as the default value of topic partition
- List<String> brokers = pulsarAdmin.brokers().getActiveBrokers(clusters.get(0));
- Integer numPartitions = brokers.size();
- if (topicBean.getNumPartitions() > 0) {
- numPartitions = topicBean.getNumPartitions();
+ List<String> clusters = PulsarUtils.getPulsarClusters(pulsarAdmin);
+ Integer numPartitions = topicBean.getNumPartitions();
+ if (numPartitions < 0 || numPartitions <= MAX_PARTITION) {
+ List<String> brokers = pulsarAdmin.brokers().getActiveBrokers(clusters.get(0));
+ numPartitions = brokers.size();
}
+
pulsarAdmin.topics().createPartitionedTopic(topicFullName, numPartitions);
+ Map<String, String> res = pulsarAdmin.lookups().lookupPartitionedTopic(topicFullName);
+ // if lookup failed (res.size not equals the partition number)
+ if (res.keySet().size() != numPartitions) {
+ // look up partition failed, retry to get partition numbers
+ for (int i = 0; (i < RETRY_TIMES && res.keySet().size() != numPartitions); i++) {
+ res = pulsarAdmin.lookups().lookupPartitionedTopic(topicFullName);
+ try {
+ Thread.sleep(500);
+ } catch (InterruptedException e) {
+ LOGGER.error("Thread has been interrupted");
+ }
+ }
+ }
+ if (numPartitions != res.keySet().size()) {
+ throw new PulsarAdminException("The number of partitions not equal to lookupPartitionedTopic");
+ }
+ LOGGER.info("success to create topic={}", topicFullName);
}
-
- LOGGER.info("success to create topic={}", topicFullName);
- } catch (Exception e) {
+ } catch (PulsarAdminException e) {
LOGGER.error("failed to create topic=" + topicFullName, e);
throw e;
}
}
+ /**
+ * Force delete Pulsar topic
+ */
public void forceDeleteTopic(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean) throws PulsarAdminException {
Preconditions.checkNotNull(topicBean, "pulsar topic info cannot be empty");
@@ -209,6 +217,9 @@ public class PulsarOperator {
}
}
+ /**
+ * Create a Pulsar subscription for the given topic
+ */
public void createSubscription(PulsarAdmin pulsarAdmin, PulsarTopicBean topicBean, String subscription)
throws PulsarAdminException {
Preconditions.checkNotNull(topicBean, "can not find tenant information to create subscription");
@@ -216,21 +227,23 @@ public class PulsarOperator {
String topicName = topicBean.getTenant() + "/" + topicBean.getNamespace() + "/" + topicBean.getTopicName();
LOGGER.info("begin to create pulsar subscription={} for topic={}", subscription, topicName);
-
try {
boolean isExists = this.subscriptionIsExists(pulsarAdmin, topicName, subscription);
- if (!isExists) {
- pulsarAdmin.topics().createSubscription(topicName, subscription, MessageId.latest);
- LOGGER.info("success to create subscription={}", subscription);
- } else {
+ if (isExists) {
LOGGER.warn("pulsar subscription={} already exists, skip to create", subscription);
+ return;
}
- } catch (Exception e) {
+ pulsarAdmin.topics().createSubscription(topicName, subscription, MessageId.latest);
+ LOGGER.info("success to create subscription={}", subscription);
+ } catch (PulsarAdminException e) {
LOGGER.error("failed to create pulsar subscription=" + subscription, e);
throw e;
}
}
+ /**
+ * Create a Pulsar subscription for the specified topic list
+ */
public void createSubscriptions(PulsarAdmin pulsarAdmin, String subscription, PulsarTopicBean topicBean,
List<String> topicList) throws PulsarAdminException {
for (String topic : topicList) {
@@ -249,7 +262,7 @@ public class PulsarOperator {
}
/**
- * Check whether the specified Namespace exists under the specified Tenant
+ * Check whether the Pulsar namespace exists under the specified tenant
*/
private boolean namespacesIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace)
throws PulsarAdminException {
@@ -263,30 +276,71 @@ public class PulsarOperator {
* @apiNote cannot compare whether the string contains, otherwise it may be misjudged, such as:
* Topic "ab" does not exist, but if "abc" exists, "ab" will be mistakenly judged to exist
*/
- public boolean topicIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace, String topic)
- throws PulsarAdminException {
+ public boolean topicIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace, String topic) {
if (StringUtils.isBlank(topic)) {
return true;
}
// persistent://tenant/namespace/topic
- List<String> topicList = pulsarAdmin.topics().getPartitionedTopicList(tenant + "/" + namespace);
- for (String t : topicList) {
- t = t.substring(t.lastIndexOf("/") + 1); // Cannot contain /
- if (topic.equals(t)) {
- return true;
+ List<String> topicList;
+ boolean topicExists = false;
+ try {
+ topicList = pulsarAdmin.topics().getPartitionedTopicList(tenant + "/" + namespace);
+ for (String t : topicList) {
+ t = t.substring(t.lastIndexOf("/") + 1); // not contains /
+ if (topic.equals(t)) {
+ topicExists = true;
+ break;
+ }
+ }
+ } catch (PulsarAdminException pe) {
+ LOGGER.error("check if the pulsar topic={} exists error, begin retry", topic, pe);
+ int count = 0;
+ try {
+ while (!topicExists && ++count <= RETRY_TIMES) {
+ LOGGER.info("check whether the pulsar topic={} exists error, try count={}", topic, count);
+ Thread.sleep(DELAY_SECONDS);
+
+ topicList = pulsarAdmin.topics().getPartitionedTopicList(tenant + "/" + namespace);
+ for (String t : topicList) {
+ t = t.substring(t.lastIndexOf("/") + 1);
+ if (topic.equals(t)) {
+ topicExists = true;
+ break;
+ }
+ }
+ }
+ } catch (Exception e) {
+ LOGGER.error("after retry, check if the pulsar topic={} exists still error", topic, pe);
}
}
- return false;
+ return topicExists;
}
- public boolean subscriptionIsExists(PulsarAdmin pulsarAdmin, String topic, String subscription) {
- try {
- List<String> subscriptionList = pulsarAdmin.topics().getSubscriptions(topic);
- return subscriptionList.contains(subscription);
- } catch (PulsarAdminException e) {
- LOGGER.error("failed to check the subscription=" + subscription + " exists for topic=" + topic, e);
- return false;
+ private boolean subscriptionIsExists(PulsarAdmin pulsarAdmin, String topic, String subscription) {
+ int count = 0;
+ while (++count <= RETRY_TIMES) {
+ try {
+ LOGGER.info("check whether the subscription exists for topic={}, try count={}", topic, count);
+ Thread.sleep(DELAY_SECONDS);
+
+ // first lookup to load the topic, and then query whether the subscription exists
+ Map<String, String> topicMap = pulsarAdmin.lookups().lookupPartitionedTopic(topic);
+ if (topicMap.isEmpty()) {
+ LOGGER.error("result of lookups topic={} is empty, continue retry", topic);
+ continue;
+ }
+ List<String> subscriptionList = pulsarAdmin.topics().getSubscriptions(topic);
+ return subscriptionList.contains(subscription);
+ } catch (PulsarAdminException | InterruptedException e) {
+ LOGGER.error("check if the subscription exists for topic={} error, continue retry", topic, e);
+ if (count == RETRY_TIMES) {
+ LOGGER.error("after {} times retry, still check subscription exception for topic {}", count, topic);
+ throw new BusinessException("check if the subscription exists error");
+ }
+ }
}
+ return false;
}
+
}