You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2022/07/29 03:25:14 UTC
[pulsar] branch branch-2.8 updated: Revert "[improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043)"
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new 3b164f5ac0c Revert "[improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043)"
3b164f5ac0c is described below
commit 3b164f5ac0c900d6c162596892ec7e702e48f937
Author: Yunze Xu <xy...@163.com>
AuthorDate: Fri Jul 29 11:25:00 2022 +0800
Revert "[improve][broker] Avoid reconnection when a partitioned topic was created concurrently (#16043)"
This reverts commit b01d5d2da269c5f072167e8f21d569e25a35306f.
---
.../pulsar/broker/service/BrokerService.java | 48 +++++-----------------
1 file changed, 10 insertions(+), 38 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index d943734e0a8..bdaded637b6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2412,44 +2412,16 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
.thenCompose(topicExists -> {
return fetchPartitionedTopicMetadataAsync(topicName)
.thenCompose(metadata -> {
- CompletableFuture<PartitionedTopicMetadata> future = new CompletableFuture<>();
-
- // There are a couple of potentially blocking calls, which we cannot make from the
- // MetadataStore callback thread.
- pulsar.getExecutor().execute(() -> {
- // If topic is already exist, creating partitioned topic is not allowed.
-
- if (metadata.partitions == 0
- && !topicExists
- && !topicName.isPartitioned()
- && pulsar.getBrokerService().isAllowAutoTopicCreation(topicName)
- && pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) {
-
- pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName)
- .thenAccept(md -> future.complete(md))
- .exceptionally(ex -> {
- if (ex.getCause()
- instanceof MetadataStoreException.AlreadyExistsException) {
- // The partitioned topic might be created concurrently
- fetchPartitionedTopicMetadataAsync(topicName)
- .whenComplete((metadata2, ex2) -> {
- if (ex2 == null) {
- future.complete(metadata2);
- } else {
- future.completeExceptionally(ex2);
- }
- });
- } else {
- future.completeExceptionally(ex);
- }
- return null;
- });
- } else {
- future.complete(metadata);
- }
- });
-
- return future;
+ // If the topic already exists, creating a partitioned topic is not allowed.
+ if (metadata.partitions == 0
+ && !topicExists
+ && !topicName.isPartitioned()
+ && pulsar.getBrokerService().isAllowAutoTopicCreation(topicName)
+ && pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) {
+ return pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName);
+ } else {
+ return CompletableFuture.completedFuture(metadata);
+ }
});
});
}