Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/28 15:13:12 UTC
[pulsar] 07/29: [improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit cc9ff5965a191d4fa727de7bab925d0c522b415f
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Jun 14 17:03:37 2022 +0800
[improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043)
* [improve][broker] Avoid reconnection when a partitioned topic was created concurrently
### Motivation
Clients are forced to reconnect when a partitioned topic is created
concurrently, especially when it is auto-created by many producers at
once. This case can be reproduced
easily by configuring `allowAutoTopicCreationType=partitioned` and
starting a Pulsar standalone. Then, run the following code:
```java
try (PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://localhost:6650").build()) {
    for (int i = 0; i < 10; i++) {
        client.newProducer().topic("topic").createAsync();
    }
    Thread.sleep(1000);
}
```
Many "Could not get connection while getPartitionedTopicMetadata"
warnings are logged on the client side, and even more warnings with
full stack traces are logged on the broker side:
```
2022-06-14T02:04:20,522+0800 [metadata-store-22-1] WARN org.apache.pulsar.broker.service.ServerCnx - Failed to get Partitioned Metadata [/127.0.0.1:64846] persistent://public/default/topic: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /admin/partitioned-topics/public/default/persistent/topic
org.apache.pulsar.metadata.api.MetadataStoreException$AlreadyExistsException: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /admin/partitioned-topics/public/default/persistent/topic
```
This happens because, when the broker handles the partitioned metadata
command, it calls `fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync`,
which tries to create the partitioned topic if it doesn't exist. This is
a race condition: if many connections are established within a short
time interval, only one of them creates the topic successfully, and the
others fail with `AlreadyExistsException`.
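The race can be illustrated outside the broker. The sketch below is a minimal model, assuming an in-memory `ConcurrentHashMap` as a stand-in for the metadata store; `CreateRaceDemo`, `create`, `runRace` and the local `AlreadyExistsException` class are hypothetical names, not Pulsar APIs. Ten handlers that have all already observed "no metadata" try to create the same path at once, and exactly one of them wins:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class CreateRaceDemo {
    // Hypothetical stand-in for MetadataStoreException.AlreadyExistsException.
    static class AlreadyExistsException extends RuntimeException {
        AlreadyExistsException(String path) {
            super("Already exists: " + path);
        }
    }

    // Create-if-absent on an in-memory map standing in for the metadata store.
    static void create(ConcurrentMap<String, Integer> store, String path, int partitions) {
        if (store.putIfAbsent(path, partitions) != null) {
            throw new AlreadyExistsException(path);
        }
    }

    // Simulates `clients` handlers that all saw "no metadata" and now try to create it.
    // Returns {successful creations, AlreadyExists failures}.
    static int[] runRace(int clients) {
        ConcurrentMap<String, Integer> store = new ConcurrentHashMap<>();
        String path = "/admin/partitioned-topics/public/default/persistent/topic";
        AtomicInteger created = new AtomicInteger();
        AtomicInteger alreadyExists = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(clients);
        for (int i = 0; i < clients; i++) {
            pool.execute(() -> {
                try {
                    start.await(); // release all handlers at the same moment
                    create(store, path, 4);
                    created.incrementAndGet();
                } catch (AlreadyExistsException e) {
                    alreadyExists.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] {created.get(), alreadyExists.get()};
    }

    public static void main(String[] args) {
        int[] r = runRace(10);
        System.out.println("created=" + r[0] + ", alreadyExists=" + r[1]);
    }
}
```

Because `putIfAbsent` is atomic, the run reports one successful creation and nine `AlreadyExistsException` failures regardless of thread scheduling; in the broker, each of those nine failures is what sends a client into a reconnect.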
### Modifications
Handles the `MetadataStoreException.AlreadyExistsException` in
`unsafeGetPartitionedTopicMetadataAsync`. In this case, invoke
`fetchPartitionedTopicMetadataAsync` to get the partitioned metadata
again.
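The create-then-fetch fallback can be sketched with plain `CompletableFuture`s. This is a hedged sketch, not the broker code: `createAsync`, `fetchAsync` and the in-memory `STORE` are hypothetical stand-ins for `createDefaultPartitionedTopicAsync`, `fetchPartitionedTopicMetadataAsync` and the metadata store, and the partitioned metadata is modelled as a plain `Integer` partition count. Because `exceptionally` is attached to the stage returned by `thenAccept`, it receives a `CompletionException` wrapping the original failure, so the cause is unwrapped before the `instanceof` check:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class CreateOrFetchSketch {
    // Hypothetical stand-in for MetadataStoreException.AlreadyExistsException.
    static class AlreadyExistsException extends RuntimeException {
        AlreadyExistsException(String path) {
            super("Already exists: " + path);
        }
    }

    // Hypothetical in-memory metadata store: path -> number of partitions.
    static final ConcurrentMap<String, Integer> STORE = new ConcurrentHashMap<>();

    // Stand-in for createDefaultPartitionedTopicAsync: fails if the path already exists.
    static CompletableFuture<Integer> createAsync(String path, int partitions) {
        CompletableFuture<Integer> f = new CompletableFuture<>();
        if (STORE.putIfAbsent(path, partitions) == null) {
            f.complete(partitions);
        } else {
            f.completeExceptionally(new AlreadyExistsException(path));
        }
        return f;
    }

    // Stand-in for fetchPartitionedTopicMetadataAsync: 0 means "no partitioned metadata".
    static CompletableFuture<Integer> fetchAsync(String path) {
        return CompletableFuture.completedFuture(STORE.getOrDefault(path, 0));
    }

    // Try to create; if a concurrent creator won the race, read back what it wrote.
    static CompletableFuture<Integer> createOrFetch(String path, int partitions) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        createAsync(path, partitions)
                .thenAccept(future::complete)
                .exceptionally(ex -> {
                    // The thenAccept stage wraps the failure in a CompletionException.
                    Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
                    if (cause instanceof AlreadyExistsException) {
                        fetchAsync(path).whenComplete((metadata, ex2) -> {
                            if (ex2 == null) {
                                future.complete(metadata);
                            } else {
                                future.completeExceptionally(ex2);
                            }
                        });
                    } else {
                        future.completeExceptionally(cause);
                    }
                    return null;
                });
        return future;
    }

    public static void main(String[] args) {
        String path = "/admin/partitioned-topics/public/default/persistent/topic";
        System.out.println(createOrFetch(path, 4).join()); // this call creates the metadata
        System.out.println(createOrFetch(path, 8).join()); // this call loses and reads back 4
    }
}
```

The second call loses the race, so instead of failing (which in the broker would surface as `ServiceNotReady` and a client reconnect) it completes with the 4 partitions written by the first call.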
### Verifying this change
Even without this patch, producer creation still succeeds eventually,
because the broker returns a `ServiceNotReady` error in this case and
the producers reconnect to the broker after 100 ms. The only way to
verify this fix is to reproduce the scenario with this patch applied
and check the logs: no reconnection should happen.
* Revert "[improve][broker] Avoid reconnection when a partitioned topic was created concurrently"
This reverts commit c259c0fdcfb299e6ed861796f7e2ab50632f9087.
* Handle AlreadyExistsException in fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync
(cherry picked from commit 2a7a8555c0b0296bcaa6a757a8646b8f65185ac6)
---
.../org/apache/pulsar/broker/service/BrokerService.java | 15 ++++++++++++++-
1 file changed, 14 insertions(+), 1 deletion(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index a4840d9f52f..1287076c10f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2540,7 +2540,20 @@ public class BrokerService implements Closeable {
pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName)
.thenAccept(md -> future.complete(md))
.exceptionally(ex -> {
- future.completeExceptionally(ex);
+ if (ex.getCause()
+ instanceof MetadataStoreException.AlreadyExistsException) {
+ // The partitioned topic might be created concurrently
+ fetchPartitionedTopicMetadataAsync(topicName)
+ .whenComplete((metadata2, ex2) -> {
+ if (ex2 == null) {
+ future.complete(metadata2);
+ } else {
+ future.completeExceptionally(ex2);
+ }
+ });
+ } else {
+ future.completeExceptionally(ex);
+ }
return null;
});
} else {