You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/12 08:47:21 UTC
[inlong] branch master updated: [INLONG-5491][Manager] Fix the error of configuring the Pulsar nonpartitioned topic (#5492)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 5ccec72d1 [INLONG-5491][Manager] Fix the error of configuring the Pulsar nonpartitioned topic (#5492)
5ccec72d1 is described below
commit 5ccec72d16648e9296fea7ee67321c818f0c62a5
Author: fuweng11 <76...@users.noreply.github.com>
AuthorDate: Fri Aug 12 16:47:17 2022 +0800
[INLONG-5491][Manager] Fix the error of configuring the Pulsar nonpartitioned topic (#5492)
---
.../manager/common/consts/InlongConstants.java | 4 ++
.../resource/queue/pulsar/PulsarOperator.java | 44 +++++++++++++++-------
.../queue/pulsar/PulsarResourceOperator.java | 5 ++-
3 files changed, 38 insertions(+), 15 deletions(-)
diff --git a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
index c2c584d75..28c2435d2 100644
--- a/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
+++ b/inlong-manager/manager-common/src/main/java/org/apache/inlong/manager/common/consts/InlongConstants.java
@@ -62,6 +62,10 @@ public class InlongConstants {
public static final String DEFAULT_PULSAR_AUTHENTICATION_TYPE = "token";
+ public static final String PULSAR_QUEUE_TYPE_SERIAL = "SERIAL";
+
+ public static final String PULSAR_QUEUE_TYPE_PARALLEL = "PARALLEL";
+
/**
* Format of the Pulsar topic: "persistent://tenant/namespace/topic"
*/
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
index 98fc3f689..9bfe2605b 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarOperator.java
@@ -19,11 +19,12 @@ package org.apache.inlong.manager.service.resource.queue.pulsar;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
+import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.conversion.ConversionHandle;
import org.apache.inlong.manager.common.exceptions.BusinessException;
+import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo;
import org.apache.inlong.manager.pojo.queue.pulsar.PulsarTopicBean;
-import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.service.cluster.InlongClusterServiceImpl;
import org.apache.pulsar.client.admin.Namespaces;
import org.apache.pulsar.client.admin.PulsarAdmin;
@@ -47,7 +48,6 @@ import java.util.Map;
public class PulsarOperator {
private static final Logger LOGGER = LoggerFactory.getLogger(InlongClusterServiceImpl.class);
- private static final String PULSAR_QUEUE_TYPE_SERIAL = "SERIAL";
private static final int MAX_PARTITION = 100;
private static final int RETRY_TIMES = 3;
private static final int DELAY_SECONDS = 5;
@@ -146,13 +146,14 @@ public class PulsarOperator {
String topicFullName = tenant + "/" + namespace + "/" + topic;
// Topic will be returned if it exists, and created if it does not exist
- if (topicIsExists(pulsarAdmin, tenant, namespace, topic)) {
+ if (topicIsExists(pulsarAdmin, tenant, namespace, topic,
+ InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicBean.getQueueModule()))) {
LOGGER.warn("pulsar topic={} already exists in {}", topicFullName, pulsarAdmin.getServiceUrl());
return;
}
try {
- if (PULSAR_QUEUE_TYPE_SERIAL.equals(topicBean.getQueueModule())) {
+ if (InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(topicBean.getQueueModule())) {
pulsarAdmin.topics().createNonPartitionedTopic(topicFullName);
String res = pulsarAdmin.lookups().lookupTopic(topicFullName);
LOGGER.info("success to create topic={}, lookup result is {}", topicFullName, res);
@@ -202,7 +203,8 @@ public class PulsarOperator {
String topicFullName = tenant + "/" + namespace + "/" + topic;
// Topic will be returned if it does not exist
- if (topicIsExists(pulsarAdmin, tenant, namespace, topic)) {
+ if (topicIsExists(pulsarAdmin, tenant, namespace, topic,
+ InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicBean.getQueueModule()))) {
LOGGER.warn("pulsar topic={} already delete", topicFullName);
return;
}
@@ -227,7 +229,8 @@ public class PulsarOperator {
String topicName = topicBean.getTenant() + "/" + topicBean.getNamespace() + "/" + topicBean.getTopicName();
LOGGER.info("begin to create pulsar subscription={} for topic={}", subscription, topicName);
try {
- boolean isExists = this.subscriptionIsExists(pulsarAdmin, topicName, subscription);
+ boolean isExists = this.subscriptionIsExists(pulsarAdmin, topicName, subscription,
+ InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicBean.getQueueModule()));
if (isExists) {
LOGGER.warn("pulsar subscription={} already exists, skip to create", subscription);
return;
@@ -275,7 +278,8 @@ public class PulsarOperator {
* @apiNote cannot compare whether the string contains, otherwise it may be misjudged, such as:
* Topic "ab" does not exist, but if "abc" exists, "ab" will be mistakenly judged to exist
*/
- public boolean topicIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace, String topic) {
+ public boolean topicIsExists(PulsarAdmin pulsarAdmin, String tenant, String namespace, String topic,
+ boolean isPartitioned) {
if (StringUtils.isBlank(topic)) {
return true;
}
@@ -284,7 +288,11 @@ public class PulsarOperator {
List<String> topicList;
boolean topicExists = false;
try {
- topicList = pulsarAdmin.topics().getPartitionedTopicList(tenant + "/" + namespace);
+ if (isPartitioned) {
+ topicList = pulsarAdmin.topics().getPartitionedTopicList(tenant + "/" + namespace);
+ } else {
+ topicList = pulsarAdmin.topics().getList(tenant + "/" + namespace);
+ }
for (String t : topicList) {
t = t.substring(t.lastIndexOf("/") + 1); // not contains /
if (topic.equals(t)) {
@@ -316,7 +324,8 @@ public class PulsarOperator {
return topicExists;
}
- private boolean subscriptionIsExists(PulsarAdmin pulsarAdmin, String topic, String subscription) {
+ private boolean subscriptionIsExists(PulsarAdmin pulsarAdmin, String topic, String subscription,
+ boolean isPartitioned) {
int count = 0;
while (++count <= RETRY_TIMES) {
try {
@@ -324,11 +333,20 @@ public class PulsarOperator {
Thread.sleep(DELAY_SECONDS);
// first lookup to load the topic, and then query whether the subscription exists
- Map<String, String> topicMap = pulsarAdmin.lookups().lookupPartitionedTopic(topic);
- if (topicMap.isEmpty()) {
- LOGGER.error("result of lookups topic={} is empty, continue retry", topic);
- continue;
+ if (isPartitioned) {
+ Map<String, String> topicMap = pulsarAdmin.lookups().lookupPartitionedTopic(topic);
+ if (topicMap.isEmpty()) {
+ LOGGER.error("result of lookups topic={} is empty, continue retry", topic);
+ continue;
+ }
+ } else {
+ String lookupTopic = pulsarAdmin.lookups().lookupTopic(topic);
+ if (StringUtils.isBlank(lookupTopic)) {
+ LOGGER.error("result of lookups topic={} is empty, continue retry", topic);
+ continue;
+ }
}
+
List<String> subscriptionList = pulsarAdmin.topics().getSubscriptions(topic);
return subscriptionList.contains(subscription);
} catch (PulsarAdminException | InterruptedException e) {
diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
index e4f2b46c4..1b179c597 100644
--- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
+++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarResourceOperator.java
@@ -21,9 +21,9 @@ import com.google.common.base.Objects;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.manager.common.consts.InlongConstants;
+import org.apache.inlong.manager.common.consts.MQType;
import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.GroupStatus;
-import org.apache.inlong.manager.common.consts.MQType;
import org.apache.inlong.manager.common.exceptions.WorkflowListenerException;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
@@ -205,7 +205,8 @@ public class PulsarResourceOperator implements QueueResourceOperator {
pulsarOperator.createTopic(pulsarAdmin, topicBean);
// 2. create a subscription for the pulsar topic
- boolean exist = pulsarOperator.topicIsExists(pulsarAdmin, tenant, namespace, topicName);
+ boolean exist = pulsarOperator.topicIsExists(pulsarAdmin, tenant, namespace, topicName,
+ InlongConstants.PULSAR_QUEUE_TYPE_PARALLEL.equals(topicBean.getQueueModule()));
if (!exist) {
String topicFullName = tenant + "/" + namespace + "/" + topicName;
String serviceUrl = pulsarCluster.getAdminUrl();