You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/09 09:57:10 UTC

[pulsar] branch branch-2.8 updated: Fix ConsumerBuilderImpl#subscribeAsync blocks calling thread. (#14614)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new d2256c8  Fix ConsumerBuilderImpl#subscribeAsync blocks calling thread. (#14614)
d2256c8 is described below

commit d2256c82c22e2bf3b69c3e8e8072827bc6135ee3
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Mar 9 17:55:16 2022 +0800

    Fix ConsumerBuilderImpl#subscribeAsync blocks calling thread. (#14614)
---
 .../pulsar/client/impl/ConsumerBuilderImpl.java    | 75 +++++++++++++---------
 1 file changed, 44 insertions(+), 31 deletions(-)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index dba18d9..f902d76 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -24,9 +24,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import lombok.AccessLevel;
@@ -55,6 +53,7 @@ import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.util.RetryMessageUtil;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.util.FutureUtil;
 
 @Getter(AccessLevel.PUBLIC)
@@ -116,48 +115,62 @@ public class ConsumerBuilderImpl<T> implements ConsumerBuilder<T> {
             return FutureUtil.failedFuture(
                     new InvalidConfigurationException("KeySharedPolicy must set with KeyShared subscription"));
         }
+        CompletableFuture<Void> applyDLQConfig;
         if(conf.isRetryEnable() && conf.getTopicNames().size() > 0 ) {
             TopicName topicFirst = TopicName.get(conf.getTopicNames().iterator().next());
-            String retryLetterTopic = topicFirst + "-" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
-            String deadLetterTopic = topicFirst + "-" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
 
             //Issue 9327: do compatibility check in case of the default retry and dead letter topic name changed
             String oldRetryLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
             String oldDeadLetterTopic = topicFirst.getNamespace() + "/" + conf.getSubscriptionName() + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
-            try {
-                if (client.getPartitionedTopicMetadata(oldRetryLetterTopic)
-                        .get(client.conf.getOperationTimeoutMs(), TimeUnit.MILLISECONDS).partitions > 0) {
-                    retryLetterTopic = oldRetryLetterTopic;
-                }
-                if (client.getPartitionedTopicMetadata(oldDeadLetterTopic)
-                        .get(client.conf.getOperationTimeoutMs(), TimeUnit.MILLISECONDS).partitions > 0) {
-                    deadLetterTopic = oldDeadLetterTopic;
-                }
-            } catch (InterruptedException | TimeoutException e) {
-                return FutureUtil.failedFuture(e);
-            } catch (ExecutionException e) {
-                return FutureUtil.failedFuture(e.getCause());
-            }
-
-            if(conf.getDeadLetterPolicy() == null) {
-                conf.setDeadLetterPolicy(DeadLetterPolicy.builder()
+            DeadLetterPolicy deadLetterPolicy = conf.getDeadLetterPolicy();
+            if (deadLetterPolicy == null || StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())
+                    || StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) {
+                CompletableFuture<PartitionedTopicMetadata> retryLetterTopicMetadata =
+                        client.getPartitionedTopicMetadata(oldRetryLetterTopic);
+                CompletableFuture<PartitionedTopicMetadata> deadLetterTopicMetadata =
+                        client.getPartitionedTopicMetadata(oldDeadLetterTopic);
+                applyDLQConfig = CompletableFuture.allOf(retryLetterTopicMetadata, deadLetterTopicMetadata)
+                        .thenAccept(__ -> {
+                            String retryLetterTopic = topicFirst + "-" + conf.getSubscriptionName()
+                                    + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX;
+                            String deadLetterTopic = topicFirst + "-" + conf.getSubscriptionName()
+                                    + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX;
+                            if (retryLetterTopicMetadata.join().partitions > 0) {
+                                retryLetterTopic = oldRetryLetterTopic;
+                            }
+                            if (deadLetterTopicMetadata.join().partitions > 0) {
+                                deadLetterTopic = oldDeadLetterTopic;
+                            }
+                            if (deadLetterPolicy == null) {
+                                conf.setDeadLetterPolicy(DeadLetterPolicy.builder()
                                         .maxRedeliverCount(RetryMessageUtil.MAX_RECONSUMETIMES)
                                         .retryLetterTopic(retryLetterTopic)
                                         .deadLetterTopic(deadLetterTopic)
                                         .build());
+                            } else {
+                                if (StringUtils.isBlank(deadLetterPolicy.getRetryLetterTopic())) {
+                                    conf.getDeadLetterPolicy().setRetryLetterTopic(retryLetterTopic);
+                                }
+                                if (StringUtils.isBlank(deadLetterPolicy.getDeadLetterTopic())) {
+                                    conf.getDeadLetterPolicy().setDeadLetterTopic(deadLetterTopic);
+                                }
+                            }
+                            conf.getTopicNames().add(conf.getDeadLetterPolicy().getRetryLetterTopic());
+                        });
             } else {
-                if (StringUtils.isBlank(conf.getDeadLetterPolicy().getRetryLetterTopic())) {
-                    conf.getDeadLetterPolicy().setRetryLetterTopic(retryLetterTopic);
-                }
-                if (StringUtils.isBlank(conf.getDeadLetterPolicy().getDeadLetterTopic())) {
-                    conf.getDeadLetterPolicy().setDeadLetterTopic(deadLetterTopic);
-                }
+                conf.getTopicNames().add(conf.getDeadLetterPolicy().getRetryLetterTopic());
+                applyDLQConfig = CompletableFuture.completedFuture(null);
             }
-            conf.getTopicNames().add(conf.getDeadLetterPolicy().getRetryLetterTopic());
+        } else {
+            applyDLQConfig = CompletableFuture.completedFuture(null);
         }
-        return interceptorList == null || interceptorList.size() == 0 ?
-                client.subscribeAsync(conf, schema, null) :
-                client.subscribeAsync(conf, schema, new ConsumerInterceptors<>(interceptorList));
+        return applyDLQConfig.thenCompose(__ -> {
+            if (interceptorList == null || interceptorList.size() == 0) {
+                return client.subscribeAsync(conf, schema, null);
+            } else {
+                return client.subscribeAsync(conf, schema, new ConsumerInterceptors<>(interceptorList));
+            }
+        });
     }
 
     @Override