You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/09 09:57:10 UTC
[pulsar] branch branch-2.8 updated: Fix ConsumerBuilderImpl#subscribeAsync blocks calling thread. (#14614)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new d2256c8 Fix ConsumerBuilderImpl#subscribeAsync blocks calling thread. (#14614)
d2256c8 is described below
commit d2256c82c22e2bf3b69c3e8e8072827bc6135ee3
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Mar 9 17:55:16 2022 +0800
Fix ConsumerBuilderImpl#subscribeAsync blocks calling thread. (#14614)
---
.../pulsar/client/impl/ConsumerBuilderImpl.java | 75 +++++++++++++---------
1 file changed, 44 insertions(+), 31 deletions(-)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index dba18d9..f902d76 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -24,9 +24,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import lombok.AccessLevel;
@@ -55,6 +53,7 @@ import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.RetryMessageUtil;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.util.FutureUtil;
@Getter(AccessLevel.PUBLIC)
@@ -116,48 +115,62 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
return FutureUtil.failedFuture(
new InvalidConfigurationException("KeySharedPolicy must set with KeyShared subscription"));
}
+ CompletableFuture<Void> applyDLQConfig;
if(conf.isRetryEnable() && conf.getTopicNames().size() > 0 ) {
TopicName topicFirst = TopicName.get(conf.getTopicNames().iterator().next());
- String retryLetterTopic = topicFirst + "-" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
- String deadLetterTopic = topicFirst + "-" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
//Issue 9327: do compatibility check in case of the default retry and dead letter topic name changed
String oldRetryLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
String oldDeadLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
- try {
- if (client.getPartitionedTopicMetadata(oldRetryLetterTopic)
- .get(client.conf.getOperationTimeoutMs(), TimeUnit.MILLISECONDS).partitions > 0) {
- retryLetterTopic = oldRetryLetterTopic;
- }
- if (client.getPartitionedTopicMetadata(oldDeadLetterTopic)
- .get(client.conf.getOperationTimeoutMs(), TimeUnit.MILLISECONDS).partitions > 0) {
- deadLetterTopic = oldDeadLetterTopic;
- }
- } catch (InterruptedException | TimeoutException e) {
- return FutureUtil.failedFuture(e);
- } catch (ExecutionException e) {
- return FutureUtil.failedFuture(e.getCause());
- }
-
- if(conf.getDeadLetterPolicy() == null) {
- conf.setDeadLetterPolicy(DeadLetterPolicy.builder()
+ DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy();
+ if (deadLetterPolicy == null || StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())
+ || StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) {
+ CompletableFuture<PartitionedTopicMetadata> retryLetterTopicMetadata =
+ client.getPartitionedTopicMetadata(oldRetryLetterTopic);
+ CompletableFuture<PartitionedTopicMetadata> deadLetterTopicMetadata =
+ client.getPartitionedTopicMetadata(oldDeadLetterTopic);
+ applyDLQConfig = CompletableFuture.allOf(retryLetterTopicMetadata, deadLetterTopicMetadata)
+ .thenAccept(__ -> {
+ String retryLetterTopic = topicFirst + "-" + conf.getSubscriptionName()
+ + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
+ String deadLetterTopic = topicFirst + "-" + conf.getSubscriptionName()
+ + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
+ if (retryLetterTopicMetadata.join().partitions > 0) {
+ retryLetterTopic = oldRetryLetterTopic;
+ }
+ if (deadLetterTopicMetadata.join().partitions > 0) {
+ deadLetterTopic = oldDeadLetterTopic;
+ }
+ if (deadLetterPolicy == null) {
+ conf.setDeadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(RetryMessageUtil.MAX_RECONSUMETIMES)
.retryLetterTopic(retryLetterTopic)
.deadLetterTopic(deadLetterTopic)
.build());
+ } else {
+ if (StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())) {
+ conf.getDeadLetterPolicy().setRetryLetterTopic(retryLetterTopic);
+ }
+ if (StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) {
+ conf.getDeadLetterPolicy().setDeadLetterTopic(deadLetterTopic);
+ }
+ }
+ conf.getTopicNames().add(conf.getDeadLetterPolicy().getRetryLetterTopic());
+ });
} else {
- if (StringUtils.isBlank(conf.getDeadLetterPolicy().getRetryLetterTopic())) {
- conf.getDeadLetterPolicy().setRetryLetterTopic(retryLetterTopic);
- }
- if (StringUtils.isBlank(conf.getDeadLetterPolicy().getDeadLetterTopic())) {
- conf.getDeadLetterPolicy().setDeadLetterTopic(deadLetterTopic);
- }
+ conf.getTopicNames().add(conf.getDeadLetterPolicy().getRetryLetterTopic());
+ applyDLQConfig = CompletableFuture.completedFuture(null);
}
- conf.getTopicNames().add(conf.getDeadLetterPolicy().getRetryLetterTopic());
+ } else {
+ applyDLQConfig = CompletableFuture.completedFuture(null);
}
- return interceptorList == null || interceptorList.size() == 0 ?
- client.subscribeAsync(conf, schema, null) :
- client.subscribeAsync(conf, schema, new ConsumerInterceptors<>(interceptorList));
+ return applyDLQConfig.thenCompose(__ -> {
+ if (interceptorList == null || interceptorList.size() == 0) {
+ return client.subscribeAsync(conf, schema, null);
+ } else {
+ return client.subscribeAsync(conf, schema, new ConsumerInterceptors<>(interceptorList));
+ }
+ });
}
@Override