Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/07/27 08:36:31 UTC

[pulsar] 09/10: [improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b01d5d2da269c5f072167e8f21d569e25a35306f
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Jun 14 17:03:37 2022 +0800

    [improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043)
    
    * [improve][broker] Avoid reconnection when a partitioned topic was created concurrently
    
    ### Motivation
    
    When a partitioned topic is created concurrently, especially when it is
    automatically created by many producers, the clients reconnect to the
    broker unnecessarily. This case can be reproduced easily by configuring
    `allowAutoTopicCreationType=partitioned` and starting a Pulsar
    standalone. Then, run the following code:
    
    ```java
    try (PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar://localhost:6650").build()) {
        for (int i = 0; i < 10; i++) {
            client.newProducer().topic("topic").createAsync();
        }
        Thread.sleep(1000);
    }
    ```
    
    We can see a lot of "Could not get connection while
    getPartitionedTopicMetadata" warning logs on the client side, while
    there are more warning logs with full stack traces on the broker side:
    
    ```
    2022-06-14T02:04:20,522+0800 [metadata-store-22-1] WARN  org.apache.pulsar.broker.service.ServerCnx - Failed to get Partitioned Metadata [/127.0.0.1:64846] persistent://public/default/topic: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /admin/partitioned-topics/public/default/persistent/topic
    org.apache.pulsar.metadata.api.MetadataStoreException$AlreadyExistsException: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /admin/partitioned-topics/public/default/persistent/topic
    ```
    
    This happens because, when the broker handles the partitioned metadata
    command, it calls `fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync`,
    which tries to create the partitioned topic if it doesn't exist. This is
    a race condition: if many connections are established within a short
    time interval and one of them creates the topic successfully, the
    others fail with the `AlreadyExistsException`.
    
    ### Modifications
    
    Handle the `MetadataStoreException.AlreadyExistsException` in
    `unsafeGetPartitionedTopicMetadataAsync`. In this case, invoke
    `fetchPartitionedTopicMetadataAsync` to get the partitioned metadata
    again.
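    
    The create-then-fall-back-to-fetch pattern described above can be
    sketched in isolation as follows. This is a minimal illustration, not
    the broker code: the class `ConcurrentCreateSketch`, its in-memory
    `STORE`, the local `AlreadyExistsException`, and the `fetch`, `create`
    and `createOrFetch` helpers are all hypothetical stand-ins for the
    metadata store and the `BrokerService` methods.
    
    ```java
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionException;
    import java.util.concurrent.ConcurrentHashMap;

    class ConcurrentCreateSketch {
        // Hypothetical stand-in for MetadataStoreException.AlreadyExistsException.
        static class AlreadyExistsException extends RuntimeException {
            AlreadyExistsException(String path) {
                super(path + " already exists");
            }
        }

        // Hypothetical in-memory metadata store: topic name -> partition count.
        static final ConcurrentHashMap<String, Integer> STORE = new ConcurrentHashMap<>();

        // Returns the stored partition count, or 0 if the topic has no metadata.
        static CompletableFuture<Integer> fetch(String topic) {
            return CompletableFuture.completedFuture(STORE.getOrDefault(topic, 0));
        }

        // Fails with AlreadyExistsException if another caller created the topic first.
        static CompletableFuture<Integer> create(String topic, int partitions) {
            CompletableFuture<Integer> future = new CompletableFuture<>();
            if (STORE.putIfAbsent(topic, partitions) == null) {
                future.complete(partitions);
            } else {
                future.completeExceptionally(new AlreadyExistsException(topic));
            }
            return future;
        }

        // Try to create; if the topic was created concurrently, fetch its metadata instead.
        static CompletableFuture<Integer> createOrFetch(String topic, int partitions) {
            CompletableFuture<Integer> result = new CompletableFuture<>();
            create(topic, partitions).whenComplete((md, ex) -> {
                if (ex == null) {
                    result.complete(md);
                    return;
                }
                // Unwrap the CompletionException added by dependent stages, if any.
                Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                        ? ex.getCause() : ex;
                if (cause instanceof AlreadyExistsException) {
                    fetch(topic).whenComplete((md2, ex2) -> {
                        if (ex2 == null) {
                            result.complete(md2);
                        } else {
                            result.completeExceptionally(ex2);
                        }
                    });
                } else {
                    result.completeExceptionally(cause);
                }
            });
            return result;
        }
    }
    ```
    
    In this sketch, a second `createOrFetch` call for the same topic
    completes with the partition count stored by the first call instead of
    failing, so the caller has no error to react to.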
    
    ### Verifying this change
    
    Even without this patch, producer creation eventually succeeds: the
    broker returns a `ServiceNotReady` error in this case, and the client
    reconnects to the broker after 100 ms. The only way to verify this fix
    is to reproduce the scenario with this patch applied and confirm from
    the logs that no reconnection happens.
    
    * Revert "[improve][broker] Avoid reconnection when a partitioned topic was created concurrently"
    
    This reverts commit c259c0fdcfb299e6ed861796f7e2ab50632f9087.
    
    * Handle AlreadyExistsException in fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync
    
    (cherry picked from commit 2a7a8555c0b0296bcaa6a757a8646b8f65185ac6)
---
 .../pulsar/broker/service/BrokerService.java       | 48 +++++++++++++++++-----
 1 file changed, 38 insertions(+), 10 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bdaded637b6..d943734e0a8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2412,16 +2412,44 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                 .thenCompose(topicExists -> {
                     return fetchPartitionedTopicMetadataAsync(topicName)
                             .thenCompose(metadata -> {
-                                // If topic is already exist, creating partitioned topic is not allowed.
-                                if (metadata.partitions == 0
-                                        && !topicExists
-                                        && !topicName.isPartitioned()
-                                        && pulsar.getBrokerService().isAllowAutoTopicCreation(topicName)
-                                        && pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) {
-                                    return pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName);
-                                } else {
-                                    return CompletableFuture.completedFuture(metadata);
-                                }
+                                CompletableFuture<PartitionedTopicMetadata> future = new CompletableFuture<>();
+
+                                // There are a couple of potentially blocking calls, which we cannot make from the
+                                // MetadataStore callback thread.
+                                pulsar.getExecutor().execute(() -> {
+                                    // If topic is already exist, creating partitioned topic is not allowed.
+
+                                    if (metadata.partitions == 0
+                                            && !topicExists
+                                            && !topicName.isPartitioned()
+                                            && pulsar.getBrokerService().isAllowAutoTopicCreation(topicName)
+                                            && pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) {
+
+                                        pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName)
+                                                .thenAccept(md -> future.complete(md))
+                                                .exceptionally(ex -> {
+                                                    if (ex.getCause()
+                                                            instanceof MetadataStoreException.AlreadyExistsException) {
+                                                        // The partitioned topic might be created concurrently
+                                                        fetchPartitionedTopicMetadataAsync(topicName)
+                                                                .whenComplete((metadata2, ex2) -> {
+                                                                    if (ex2 == null) {
+                                                                        future.complete(metadata2);
+                                                                    } else {
+                                                                        future.completeExceptionally(ex2);
+                                                                    }
+                                                                });
+                                                    } else {
+                                                        future.completeExceptionally(ex);
+                                                    }
+                                                    return null;
+                                                });
+                                    } else {
+                                        future.complete(metadata);
+                                    }
+                                });
+
+                                return future;
                             });
                 });
     }