You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/24 09:57:10 UTC
[pulsar] branch branch-2.8 updated: [Cherry-pick #11597] Use get instead of join to avoid getting stuck (#12110)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 5de46f9 [Cherry-pick #11597] Use get instead of join to avoid getting stuck (#12110)
5de46f9 is described below
commit 5de46f93672d4eef3604754517eb55773b4320a1
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Fri Sep 24 17:56:19 2021 +0800
[Cherry-pick #11597] Use get instead of join to avoid getting stuck (#12110)
---
.../java/org/apache/pulsar/broker/PulsarService.java | 3 ++-
.../pulsar/broker/admin/impl/NamespacesBase.java | 6 ++++--
.../pulsar/broker/admin/v1/NonPersistentTopics.java | 12 ++++++++++--
.../pulsar/broker/admin/v2/NonPersistentTopics.java | 13 +++++++++++--
.../pulsar/broker/namespace/OwnershipCache.java | 11 ++++++++++-
.../pulsar/client/impl/ConsumerBuilderImpl.java | 20 +++++++++++++++-----
6 files changed, 52 insertions(+), 13 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 61ffef7..eb01fa4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1063,7 +1063,8 @@ public class PulsarService implements AutoCloseable {
List<CompletableFuture<Topic>> persistentTopics = Lists.newArrayList();
long topicLoadStart = System.nanoTime();
- for (String topic : getNamespaceService().getListOfPersistentTopics(nsName).join()) {
+ for (String topic : getNamespaceService().getListOfPersistentTopics(nsName).
+ get(config.getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS)) {
try {
TopicName topicName = TopicName.get(topic);
if (bundle.includes(topicName) && !isTransactionSystemTopic(topicName)) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 2ba0a09..3fd6c37 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -391,7 +391,8 @@ public abstract class NamespacesBase extends AdminResource {
List<String> topics;
try {
- topics = pulsar().getNamespaceService().getFullListOfTopics(namespaceName).join();
+ topics = pulsar().getNamespaceService().getFullListOfTopics(namespaceName).
+ get(config().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
return;
@@ -571,7 +572,8 @@ public abstract class NamespacesBase extends AdminResource {
try {
NamespaceBundle bundle = validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange,
authoritative, true);
- List<String> topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName).join();
+ List<String> topics = pulsar().getNamespaceService().getListOfPersistentTopics(namespaceName)
+ .get(config().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS);
for (String topic : topics) {
NamespaceBundle topicBundle = pulsar().getNamespaceService()
.getBundle(TopicName.get(topic));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
index daf6ea0..c5d5870 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/NonPersistentTopics.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
@@ -297,7 +298,14 @@ public class NonPersistentTopics extends PersistentTopics {
}
private Topic getTopicReference(TopicName topicName) {
- return pulsar().getBrokerService().getTopicIfExists(topicName.toString()).join()
- .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Topic not found"));
+ try {
+ return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
+ .get(config().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS)
+ .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Topic not found"));
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e.getCause());
+ } catch (InterruptedException | TimeoutException e) {
+ throw new RestException(e);
+ }
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
index 2cfb20f..d390049 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java
@@ -29,6 +29,8 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
@@ -530,7 +532,14 @@ public class NonPersistentTopics extends PersistentTopics {
}
private Topic getTopicReference(TopicName topicName) {
- return pulsar().getBrokerService().getTopicIfExists(topicName.toString()).join()
- .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Topic not found"));
+ try {
+ return pulsar().getBrokerService().getTopicIfExists(topicName.toString())
+ .get(config().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS)
+ .orElseThrow(() -> new RestException(Status.NOT_FOUND, "Topic not found"));
+ } catch (ExecutionException e) {
+ throw new RestException(e.getCause());
+ } catch (InterruptedException | TimeoutException e) {
+ throw new RestException(e);
+ }
}
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
index 116397a..c27575c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/OwnershipCache.java
@@ -30,7 +30,10 @@ import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -371,7 +374,13 @@ public class OwnershipCache {
public OwnedBundle getOwnedBundle(NamespaceBundle bundle) {
CompletableFuture<OwnedBundle> future = ownedBundlesCache.getIfPresent(ServiceUnitUtils.path(bundle));
if (future != null && future.isDone() && !future.isCompletedExceptionally()) {
- return future.join();
+ try {
+ return future.get(pulsar.getConfiguration().getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS);
+ } catch (InterruptedException | TimeoutException e) {
+ throw new RuntimeException(e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e.getCause());
+ }
} else {
return null;
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 47f9991..dba18d9 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -24,7 +24,9 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.AccessLevel;
@@ -122,11 +124,19 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
//Issue 9327: do compatibility check in case of the default retry and dead letter topic name changed
String oldRetryLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
String oldDeadLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
- if (client.getPartitionedTopicMetadata(oldRetryLetterTopic).join().partitions > 0) {
- retryLetterTopic = oldRetryLetterTopic;
- }
- if (client.getPartitionedTopicMetadata(oldDeadLetterTopic).join().partitions > 0) {
- deadLetterTopic = oldDeadLetterTopic;
+ try {
+ if (client.getPartitionedTopicMetadata(oldRetryLetterTopic)
+ .get(client.conf.getOperationTimeoutMs(), TimeUnit.MILLISECONDS).partitions > 0) {
+ retryLetterTopic = oldRetryLetterTopic;
+ }
+ if (client.getPartitionedTopicMetadata(oldDeadLetterTopic)
+ .get(client.conf.getOperationTimeoutMs(), TimeUnit.MILLISECONDS).partitions > 0) {
+ deadLetterTopic = oldDeadLetterTopic;
+ }
+ } catch (InterruptedException | TimeoutException e) {
+ return FutureUtil.failedFuture(e);
+ } catch (ExecutionException e) {
+ return FutureUtil.failedFuture(e.getCause());
}
if(conf.getDeadLetterPolicy() == null) {