You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2023/11/17 01:29:07 UTC
(pulsar) branch branch-3.0 updated: [fix][broker] Fix create topic with different auto creation strategies causes race condition (#21545)
This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 48560b7850c [fix][broker] Fix create topic with different auto creation strategies causes race condition (#21545)
48560b7850c is described below
commit 48560b7850cd17bf867165955b7b18a742dd5100
Author: Qiang Zhao <ma...@apache.org>
AuthorDate: Fri Nov 10 13:30:05 2023 +0800
[fix][broker] Fix create topic with different auto creation strategies causes race condition (#21545)
(cherry picked from commit 3c067ce28025e116146977118312a1471ba284f5)
---
.../pulsar/broker/service/BrokerService.java | 14 +++++-
.../apache/pulsar/broker/admin/AdminApi2Test.java | 2 +-
.../pulsar/broker/admin/TopicAutoCreationTest.java | 57 ++++++++++++++++++++++
.../service/persistent/PersistentTopicTest.java | 3 +-
4 files changed, 72 insertions(+), 4 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 4d02e7e8773..c19c292ca5b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1074,10 +1074,22 @@ public class BrokerService implements Closeable {
return loadOrCreatePersistentTopic(tpName, createIfMissing,
properties, topicPolicies);
}
- return CompletableFuture.completedFuture(Optional.empty());
+ final String errorMsg =
+ String.format("Illegal topic partition name %s with max allowed "
+ + "%d partitions", topicName, metadata.partitions);
+ log.warn(errorMsg);
+ return FutureUtil
+ .failedFuture(new BrokerServiceException.NotAllowedException(errorMsg));
});
}
return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies);
+ }).thenCompose(optionalTopic -> {
+ if (!optionalTopic.isPresent() && createIfMissing) {
+ log.warn("[{}] Try to recreate the topic with createIfMissing=true "
+ + "but the returned topic is empty", topicName);
+ return getTopic(topicName, createIfMissing, properties);
+ }
+ return CompletableFuture.completedFuture(optionalTopic);
});
});
} else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index b4fcd21df37..f2bcea96fbc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -3198,7 +3198,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
admin.topics().createSubscription(partitionedTopicName + "-partition-" + startPartitions, subName1,
MessageId.earliest);
fail("Unexpected behaviour");
- } catch (PulsarAdminException.PreconditionFailedException ex) {
+ } catch (PulsarAdminException.ConflictException ex) {
// OK
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
index 590edc2d3f3..c9138beee52 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
@@ -27,7 +27,10 @@ import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -40,6 +43,7 @@ import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.TopicType;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -55,6 +59,7 @@ public class TopicAutoCreationTest extends ProducerConsumerBase {
conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
conf.setAllowAutoTopicCreation(true);
conf.setDefaultNumPartitions(3);
+ conf.setForceDeleteNamespaceAllowed(true);
super.internalSetup();
super.producerBaseSetup();
}
@@ -186,4 +191,56 @@ public class TopicAutoCreationTest extends ProducerConsumerBase {
}
}
+
+ @Test
+ public void testClientWithAutoCreationGotNotFoundException() throws PulsarAdminException, PulsarClientException {
+ final String namespace = "public/test_1";
+ final String topicName = "persistent://public/test_1/test_auto_creation_got_not_found"
+ + System.currentTimeMillis();
+ final int retryTimes = 30;
+ admin.namespaces().createNamespace(namespace);
+ admin.namespaces().setAutoTopicCreation(namespace, AutoTopicCreationOverride.builder()
+ .allowAutoTopicCreation(true)
+ .topicType("non-partitioned")
+ .build());
+
+ @Cleanup("shutdown")
+ final ExecutorService executor1 = Executors.newSingleThreadExecutor();
+
+ @Cleanup("shutdown")
+ final ExecutorService executor2 = Executors.newSingleThreadExecutor();
+
+ for (int i = 0; i < retryTimes; i++) {
+ final CompletableFuture<Void> adminListSub = CompletableFuture.runAsync(() -> {
+ try {
+ admin.topics().getSubscriptions(topicName);
+ } catch (PulsarAdminException e) {
+ throw new RuntimeException(e);
+ }
+ }, executor1);
+
+ final CompletableFuture<Consumer<byte[]>> consumerSub = CompletableFuture.supplyAsync(() -> {
+ try {
+ return pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionName("sub-1")
+ .subscribe();
+ } catch (PulsarClientException e) {
+ throw new RuntimeException(e);
+ }
+ }, executor2);
+
+ try {
+ adminListSub.join();
+ } catch (Throwable ex) {
+ // We don't care about this exception; only the consumer subscription below must succeed.
+ }
+
+ consumerSub.join().close();
+ admin.topics().delete(topicName, true);
+ }
+
+ admin.namespaces().deleteNamespace(namespace, true);
+ }
+
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index ac2727e33eb..5db17d7ecfa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -457,8 +457,7 @@ public class PersistentTopicTest extends BrokerTestBase {
.topic(partition.toString())
.create();
fail("unexpected behaviour");
- } catch (PulsarClientException.TopicDoesNotExistException ignored) {
-
+ } catch (PulsarClientException.NotAllowedException ex) {
}
Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions, 4);
}