Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/28 15:13:12 UTC

[pulsar] 07/29: [improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit cc9ff5965a191d4fa727de7bab925d0c522b415f
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Jun 14 17:03:37 2022 +0800

    [improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043)
    
    * [improve][broker] Avoid reconnection when a partitioned topic was created concurrently
    
    ### Motivation
    
    Creating a partitioned topic concurrently, especially when it is
    auto-created by many producers, can make the metadata lookup fail and
    force clients to reconnect. This case can be reproduced easily by
    configuring `allowAutoTopicCreationType=partitioned` and starting a
    Pulsar standalone. Then, run the following code:
    
    ```java
    try (PulsarClient client = PulsarClient.builder()
            .serviceUrl("pulsar://localhost:6650").build()) {
        for (int i = 0; i < 10; i++) {
            client.newProducer().topic("topic").createAsync();
        }
        Thread.sleep(1000);
    }
    ```
    
    We can see a lot of "Could not get connection while
    getPartitionedTopicMetadata" warning logs at client side, while there
    were more warning logs with full stack traces at broker side:
    
    ```
    2022-06-14T02:04:20,522+0800 [metadata-store-22-1] WARN  org.apache.pulsar.broker.service.ServerCnx - Failed to get Partitioned Metadata [/127.0.0.1:64846] persistent://public/default/topic: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /admin/partitioned-topics/public/default/persistent/topic
    org.apache.pulsar.metadata.api.MetadataStoreException$AlreadyExistsException: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /admin/partitioned-topics/public/default/persistent/topic
    ```
    
    This happens because, when the broker handles the partitioned metadata
    command, it calls `fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync`,
    which tries to create the partitioned topic if it doesn't exist. If
    many connections are established within a short interval, the requests
    race: one of them creates the topic successfully and the rest fail
    with `AlreadyExistsException`.
    
    ### Modifications
    
    Handle the `MetadataStoreException.AlreadyExistsException` in
    `unsafeGetPartitionedTopicMetadataAsync`. In this case, invoke
    `fetchPartitionedTopicMetadataAsync` to get the partitioned metadata
    again.
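    
    The create-then-fall-back-to-fetch pattern can be sketched in isolation
    as follows. This is only an illustration under stated assumptions: the
    class, the in-memory map, and the method names `createAsync`,
    `fetchAsync` and `createOrFetchAsync` are hypothetical stand-ins for
    the metadata store and are not Pulsar APIs.
    
    ```java
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionException;
    import java.util.concurrent.ConcurrentHashMap;
    
    // Hypothetical sketch: an in-memory map stands in for the metadata store.
    public class CreateOrFetchSketch {
    
        // Hypothetical exception, playing the role of AlreadyExistsException.
        static class AlreadyExistsException extends RuntimeException {
            AlreadyExistsException(String path) {
                super("Already exists: " + path);
            }
        }
    
        // topic name -> number of partitions
        private final Map<String, Integer> store = new ConcurrentHashMap<>();
    
        // Fails with AlreadyExistsException if another caller created the entry first.
        CompletableFuture<Integer> createAsync(String topic, int partitions) {
            return CompletableFuture.supplyAsync(() -> {
                if (store.putIfAbsent(topic, partitions) != null) {
                    throw new AlreadyExistsException(topic);
                }
                return partitions;
            });
        }
    
        CompletableFuture<Integer> fetchAsync(String topic) {
            return CompletableFuture.supplyAsync(() -> store.get(topic));
        }
    
        // Try to create; if a concurrent caller won the race, read its result
        // instead of propagating the failure.
        CompletableFuture<Integer> createOrFetchAsync(String topic, int partitions) {
            CompletableFuture<Integer> future = new CompletableFuture<>();
            createAsync(topic, partitions)
                    .thenAccept(future::complete)
                    .exceptionally(ex -> {
                        // Dependent stages wrap the failure in CompletionException.
                        Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
                        if (cause instanceof AlreadyExistsException) {
                            fetchAsync(topic).whenComplete((md, ex2) -> {
                                if (ex2 == null) {
                                    future.complete(md);
                                } else {
                                    future.completeExceptionally(ex2);
                                }
                            });
                        } else {
                            future.completeExceptionally(ex);
                        }
                        return null;
                    });
            return future;
        }
    
        public static void main(String[] args) {
            CreateOrFetchSketch sketch = new CreateOrFetchSketch();
            CompletableFuture<?>[] futures = new CompletableFuture<?>[10];
            for (int i = 0; i < futures.length; i++) {
                futures[i] = sketch.createOrFetchAsync("topic", 4);
            }
            // join() would throw if any of the ten requests had failed.
            CompletableFuture.allOf(futures).join();
            System.out.println("all 10 requests completed without error");
        }
    }
    ```
    
    In this sketch a caller that loses the race observes the metadata
    written by the winner, so none of the concurrent requests surfaces an
    error to its caller.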
    
    ### Verifying this change
    
    Even without this patch, producer creation eventually succeeds: the
    broker returns a `ServiceNotReady` error in this case, so the client
    reconnects to the broker after 100 ms. The only way to verify this fix
    is to reproduce the scenario with the patch applied and confirm from
    the logs that no reconnection happens.
    
    * Revert "[improve][broker] Avoid reconnection when a partitioned topic was created concurrently"
    
    This reverts commit c259c0fdcfb299e6ed861796f7e2ab50632f9087.
    
    * Handle AlreadyExistsException in fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync
    
    (cherry picked from commit 2a7a8555c0b0296bcaa6a757a8646b8f65185ac6)
---
 .../org/apache/pulsar/broker/service/BrokerService.java   | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index a4840d9f52f..1287076c10f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2540,7 +2540,20 @@ public class BrokerService implements Closeable {
                                         pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName)
                                                 .thenAccept(md -> future.complete(md))
                                                 .exceptionally(ex -> {
-                                                    future.completeExceptionally(ex);
+                                                    if (ex.getCause()
+                                                            instanceof MetadataStoreException.AlreadyExistsException) {
+                                                        // The partitioned topic might be created concurrently
+                                                        fetchPartitionedTopicMetadataAsync(topicName)
+                                                                .whenComplete((metadata2, ex2) -> {
+                                                                    if (ex2 == null) {
+                                                                        future.complete(metadata2);
+                                                                    } else {
+                                                                        future.completeExceptionally(ex2);
+                                                                    }
+                                                                });
+                                                    } else {
+                                                        future.completeExceptionally(ex);
+                                                    }
                                                     return null;
                                                 });
                                     } else {