Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/07/27 08:36:31 UTC
[pulsar] 09/10: [improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b01d5d2da269c5f072167e8f21d569e25a35306f
Author: Yunze Xu <xy...@163.com>
AuthorDate: Tue Jun 14 17:03:37 2022 +0800
[improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043)
* [improve][broker] Avoid reconnection when a partitioned topic was created concurrently
### Motivation
When a partitioned topic is created concurrently, especially when it is
automatically created by many producers, some of the partitioned
metadata requests fail and the clients have to reconnect. This case can
be reproduced easily by configuring `allowAutoTopicCreationType=partitioned`
and starting Pulsar standalone. Then, run the following code:
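```java
import org.apache.pulsar.client.api.PulsarClient;
public class ConcurrentProducers {
    public static void main(String[] args) throws Exception {
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650").build()) {
            // Create 10 producers on the same topic without waiting, so that each
            // one may trigger auto-creation of the partitioned topic concurrently.
            for (int i = 0; i < 10; i++) {
                client.newProducer().topic("topic").createAsync();
            }
            // Give the asynchronous producer creations time to finish before closing.
            Thread.sleep(1000);
        }
    }
}
```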
Many "Could not get connection while getPartitionedTopicMetadata"
warnings appear in the client-side logs, and even more warnings with
full stack traces appear in the broker-side logs:
```
2022-06-14T02:04:20,522+0800 [metadata-store-22-1] WARN org.apache.pulsar.broker.service.ServerCnx - Failed to get Partitioned Metadata [/127.0.0.1:64846] persistent://public/default/topic: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /admin/partitioned-topics/public/default/persistent/topic
org.apache.pulsar.metadata.api.MetadataStoreException$AlreadyExistsException: org.apache.pulsar.metadata.api.MetadataStoreException$BadVersionException: org.apache.zookeeper.KeeperException$BadVersionException: KeeperErrorCode = BadVersion for /admin/partitioned-topics/public/default/persistent/topic
```
This happens because, when the broker handles the partitioned metadata
command, it calls `fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync`,
which tries to create the partitioned topic if it doesn't exist. This is
a race condition: if many connections are established within a short
time interval and one of them creates the topic successfully, the
subsequent creation attempts fail with `AlreadyExistsException`.
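The race can be illustrated with a minimal sketch. It assumes a
hypothetical in-memory `FakeMetadataStore` whose `create` fails when the
path already exists (standing in for the ZooKeeper `BadVersion` failure
in the log above); it is not Pulsar's `MetadataStore` API. To make the
interleaving deterministic, every simulated connection first checks for
the topic, and only then do they all try to create it:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class CheckThenCreateRace {

    // Hypothetical stand-in for MetadataStoreException.AlreadyExistsException.
    static class AlreadyExistsException extends RuntimeException {
        AlreadyExistsException(String path) {
            super("Already exists: " + path);
        }
    }

    // Hypothetical in-memory store: create() fails if the path is already present.
    static class FakeMetadataStore {
        private final ConcurrentMap<String, Integer> nodes = new ConcurrentHashMap<>();

        boolean exists(String path) {
            return nodes.containsKey(path);
        }

        void create(String path, int partitions) {
            if (nodes.putIfAbsent(path, partitions) != null) {
                throw new AlreadyExistsException(path);
            }
        }
    }

    // Runs "check, then create" for `connections` clients whose existence checks
    // all happen before any creation; returns how many creations failed.
    static int simulate(int connections) {
        FakeMetadataStore store = new FakeMetadataStore();
        String path = "/admin/partitioned-topics/public/default/persistent/topic";

        // Phase 1: every connection sees that the topic does not exist yet.
        List<Boolean> sawMissing = new ArrayList<>();
        for (int i = 0; i < connections; i++) {
            sawMissing.add(!store.exists(path));
        }

        // Phase 2: every connection that saw it missing tries to create it
        // (4 partitions is an arbitrary illustrative value).
        int failures = 0;
        for (boolean missing : sawMissing) {
            if (missing) {
                try {
                    store.create(path, 4);
                } catch (AlreadyExistsException e) {
                    failures++;
                }
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        // Only the first creation succeeds; the other 9 fail.
        System.out.println(simulate(10) + " of 10 creations failed");
    }
}
```

Running `main` prints `9 of 10 creations failed`: each losing connection
made a correct decision based on stale information, which is why the fix
below treats `AlreadyExistsException` as a signal to re-read rather than
as an error.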
### Modifications
Handle `MetadataStoreException.AlreadyExistsException` in
`unsafeGetPartitionedTopicMetadataAsync`: when it occurs, invoke
`fetchPartitionedTopicMetadataAsync` to fetch the partitioned metadata
again.
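The "create, or fetch if someone else created it first" fallback can be
sketched with plain `CompletableFuture`s. In this minimal sketch,
`createPartitionedTopicAsync`, `fetchMetadataAsync` and `Metadata` are
hypothetical stand-ins for `createDefaultPartitionedTopicAsync`,
`fetchPartitionedTopicMetadataAsync` and `PartitionedTopicMetadata`,
backed by an in-memory map. Only the error handling is the point: an
`AlreadyExistsException` (possibly wrapped in a `CompletionException`)
triggers a re-fetch, while any other failure is propagated.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class CreateOrFetch {

    // Hypothetical stand-in for MetadataStoreException.AlreadyExistsException.
    static class AlreadyExistsException extends RuntimeException {
        AlreadyExistsException(String topic) {
            super("Already exists: " + topic);
        }
    }

    // Hypothetical stand-in for PartitionedTopicMetadata.
    static final class Metadata {
        final int partitions;
        Metadata(int partitions) { this.partitions = partitions; }
    }

    private final ConcurrentMap<String, Metadata> store = new ConcurrentHashMap<>();

    CompletableFuture<Metadata> fetchMetadataAsync(String topic) {
        // A missing topic is reported as 0 partitions, like the check in the diff.
        return CompletableFuture.completedFuture(store.getOrDefault(topic, new Metadata(0)));
    }

    CompletableFuture<Metadata> createPartitionedTopicAsync(String topic, int partitions) {
        Metadata md = new Metadata(partitions);
        if (store.putIfAbsent(topic, md) != null) {
            CompletableFuture<Metadata> failed = new CompletableFuture<>();
            failed.completeExceptionally(new AlreadyExistsException(topic));
            return failed;
        }
        return CompletableFuture.completedFuture(md);
    }

    // Creates the topic; if a concurrent creator won the race, re-fetches instead of failing.
    CompletableFuture<Metadata> createOrFetchAsync(String topic, int partitions) {
        CompletableFuture<Metadata> result = new CompletableFuture<>();
        createPartitionedTopicAsync(topic, partitions).whenComplete((md, ex) -> {
            if (ex == null) {
                result.complete(md);
                return;
            }
            // Dependent stages wrap failures in CompletionException; unwrap before checking.
            Throwable cause = ex instanceof CompletionException ? ex.getCause() : ex;
            if (cause instanceof AlreadyExistsException) {
                fetchMetadataAsync(topic).whenComplete((md2, ex2) -> {
                    if (ex2 == null) {
                        result.complete(md2);
                    } else {
                        result.completeExceptionally(ex2);
                    }
                });
            } else {
                result.completeExceptionally(ex);
            }
        });
        return result;
    }

    public static void main(String[] args) {
        CreateOrFetch broker = new CreateOrFetch();
        // The first call creates the topic; the second hits AlreadyExistsException
        // and falls back to reading the existing metadata.
        Metadata first = broker.createOrFetchAsync("topic", 4).join();
        Metadata second = broker.createOrFetchAsync("topic", 4).join();
        System.out.println(first.partitions + " " + second.partitions); // prints "4 4"
    }
}
```

Unwrapping `CompletionException` explicitly keeps the check correct
whether the exception reaches the handler directly or through a
dependent stage, as it does in the diff's `thenAccept(...).exceptionally(...)`
chain, where `ex.getCause()` is inspected.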
### Verifying this change
Even without this patch, the producers are eventually created
successfully: the broker returns a `ServiceNotReady` error in this case,
and the producers reconnect to the broker after 100 ms. The only way to
verify this fix is to rerun the reproduction above with this patch
applied and check the logs: no reconnection should happen.
* Revert "[improve][broker] Avoid reconnection when a partitioned topic was created concurrently"
This reverts commit c259c0fdcfb299e6ed861796f7e2ab50632f9087.
* Handle AlreadyExistsException in fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync
(cherry picked from commit 2a7a8555c0b0296bcaa6a757a8646b8f65185ac6)
---
.../pulsar/broker/service/BrokerService.java | 48 +++++++++++++++++-----
1 file changed, 38 insertions(+), 10 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bdaded637b6..d943734e0a8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2412,16 +2412,44 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
.thenCompose(topicExists -> {
return fetchPartitionedTopicMetadataAsync(topicName)
.thenCompose(metadata -> {
- // If topic is already exist, creating partitioned topic is not allowed.
- if (metadata.partitions == 0
- && !topicExists
- && !topicName.isPartitioned()
- && pulsar.getBrokerService().isAllowAutoTopicCreation(topicName)
- && pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) {
- return pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName);
- } else {
- return CompletableFuture.completedFuture(metadata);
- }
+ CompletableFuture<PartitionedTopicMetadata> future = new CompletableFuture<>();
+
+ // There are a couple of potentially blocking calls, which we cannot make from the
+ // MetadataStore callback thread.
+ pulsar.getExecutor().execute(() -> {
+ // If topic is already exist, creating partitioned topic is not allowed.
+
+ if (metadata.partitions == 0
+ && !topicExists
+ && !topicName.isPartitioned()
+ && pulsar.getBrokerService().isAllowAutoTopicCreation(topicName)
+ && pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) {
+
+ pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName)
+ .thenAccept(md -> future.complete(md))
+ .exceptionally(ex -> {
+ if (ex.getCause()
+ instanceof MetadataStoreException.AlreadyExistsException) {
+ // The partitioned topic might be created concurrently
+ fetchPartitionedTopicMetadataAsync(topicName)
+ .whenComplete((metadata2, ex2) -> {
+ if (ex2 == null) {
+ future.complete(metadata2);
+ } else {
+ future.completeExceptionally(ex2);
+ }
+ });
+ } else {
+ future.completeExceptionally(ex);
+ }
+ return null;
+ });
+ } else {
+ future.complete(metadata);
+ }
+ });
+
+ return future;
});
});
}
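The patch runs the auto-creation checks on `pulsar.getExecutor()` rather
than on the MetadataStore callback thread, because some of those checks
may block. The same thread hop can also be written with
`thenComposeAsync(fn, executor)`. In the minimal sketch below, the two
single-thread executors and their thread names (`metadata-store`,
`broker-executor`) are assumptions for illustration; the sketch only
shows which thread runs the continuation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OffloadFromCallbackThread {

    // Creates a single-thread executor whose thread has the given name.
    static ExecutorService namedExecutor(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    // Returns the name of the thread that ran the (potentially blocking) continuation.
    static String continuationThread() {
        ExecutorService metadataStore = namedExecutor("metadata-store");
        ExecutorService brokerExecutor = namedExecutor("broker-executor");
        try {
            // Simulates a metadata read whose result is produced on the metadata-store thread.
            CompletableFuture<Integer> metadata =
                    CompletableFuture.supplyAsync(() -> 0, metadataStore);

            // The *Async variant with an explicit executor always runs the function on
            // that executor, never on the thread that completed `metadata`.
            return metadata.thenComposeAsync(
                    partitions -> CompletableFuture.completedFuture(Thread.currentThread().getName()),
                    brokerExecutor).join();
        } finally {
            metadataStore.shutdown();
            brokerExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(continuationThread()); // prints "broker-executor"
    }
}
```

Compared with manually completing a `CompletableFuture` inside
`execute(...)`, `thenComposeAsync` propagates failures of the composed
future automatically, at the cost of being less explicit about where
each branch completes the result.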