You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/07/29 03:25:14 UTC

[pulsar] branch branch-2.8 updated: Revert "[improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043)"

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 3b164f5ac0c Revert "[improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043)"
3b164f5ac0c is described below

commit 3b164f5ac0c900d6c162596892ec7e702e48f937
Author: Yunze Xu <xy...@163.com>
AuthorDate: Fri Jul 29 11:25:00 2022 +0800

    Revert "[improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043)"
    
    This reverts commit b01d5d2da269c5f072167e8f21d569e25a35306f.
---
 .../pulsar/broker/service/BrokerService.java       | 48 +++++-----------------
 1 file changed, 10 insertions(+), 38 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index d943734e0a8..bdaded637b6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2412,44 +2412,16 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                 .thenCompose(topicExists -> {
                     return fetchPartitionedTopicMetadataAsync(topicName)
                             .thenCompose(metadata -> {
-                                CompletableFuture<PartitionedTopicMetadata> future = new CompletableFuture<>();
-
-                                // There are a couple of potentially blocking calls, which we cannot make from the
-                                // MetadataStore callback thread.
-                                pulsar.getExecutor().execute(() -> {
-                                    // If topic is already exist, creating partitioned topic is not allowed.
-
-                                    if (metadata.partitions == 0
-                                            && !topicExists
-                                            && !topicName.isPartitioned()
-                                            && pulsar.getBrokerService().isAllowAutoTopicCreation(topicName)
-                                            && pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) {
-
-                                        pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName)
-                                                .thenAccept(md -> future.complete(md))
-                                                .exceptionally(ex -> {
-                                                    if (ex.getCause()
-                                                            instanceof MetadataStoreException.AlreadyExistsException) {
-                                                        // The partitioned topic might be created concurrently
-                                                        fetchPartitionedTopicMetadataAsync(topicName)
-                                                                .whenComplete((metadata2, ex2) -> {
-                                                                    if (ex2 == null) {
-                                                                        future.complete(metadata2);
-                                                                    } else {
-                                                                        future.completeExceptionally(ex2);
-                                                                    }
-                                                                });
-                                                    } else {
-                                                        future.completeExceptionally(ex);
-                                                    }
-                                                    return null;
-                                                });
-                                    } else {
-                                        future.complete(metadata);
-                                    }
-                                });
-
-                                return future;
+                                // If topic is already exist, creating partitioned topic is not allowed.
+                                if (metadata.partitions == 0
+                                        && !topicExists
+                                        && !topicName.isPartitioned()
+                                        && pulsar.getBrokerService().isAllowAutoTopicCreation(topicName)
+                                        && pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) {
+                                    return pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName);
+                                } else {
+                                    return CompletableFuture.completedFuture(metadata);
+                                }
                             });
                 });
     }