You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2023/11/17 01:29:07 UTC

(pulsar) branch branch-3.0 updated: [fix][broker] Fix create topic with different auto creation strategies causes race condition (#21545)

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 48560b7850c [fix][broker] Fix create topic with different auto creation strategies causes race condition (#21545)
48560b7850c is described below

commit 48560b7850cd17bf867165955b7b18a742dd5100
Author: Qiang Zhao <ma...@apache.org>
AuthorDate: Fri Nov 10 13:30:05 2023 +0800

    [fix][broker] Fix create topic with different auto creation strategies causes race condition (#21545)
    
    (cherry picked from commit 3c067ce28025e116146977118312a1471ba284f5)
---
 .../pulsar/broker/service/BrokerService.java       | 14 +++++-
 .../apache/pulsar/broker/admin/AdminApi2Test.java  |  2 +-
 .../pulsar/broker/admin/TopicAutoCreationTest.java | 57 ++++++++++++++++++++++
 .../service/persistent/PersistentTopicTest.java    |  3 +-
 4 files changed, 72 insertions(+), 4 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 4d02e7e8773..c19c292ca5b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1074,10 +1074,22 @@ public class BrokerService implements Closeable {
                                             return loadOrCreatePersistentTopic(tpName, createIfMissing,
                                                     properties, topicPolicies);
                                         }
-                                        return CompletableFuture.completedFuture(Optional.empty());
+                                        final String errorMsg =
+                                                String.format("Illegal topic partition name %s with max allowed "
+                                                        + "%d partitions", topicName, metadata.partitions);
+                                        log.warn(errorMsg);
+                                        return FutureUtil
+                                                .failedFuture(new BrokerServiceException.NotAllowedException(errorMsg));
                                     });
                         }
                         return loadOrCreatePersistentTopic(tpName, createIfMissing, properties, topicPolicies);
+                    }).thenCompose(optionalTopic -> {
+                        if (!optionalTopic.isPresent() && createIfMissing) {
+                            log.warn("[{}] Try to recreate the topic with createIfMissing=true "
+                                    + "but the returned topic is empty", topicName);
+                            return getTopic(topicName, createIfMissing, properties);
+                        }
+                        return CompletableFuture.completedFuture(optionalTopic);
                     });
                 });
             } else {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index b4fcd21df37..f2bcea96fbc 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -3198,7 +3198,7 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
             admin.topics().createSubscription(partitionedTopicName + "-partition-" + startPartitions, subName1,
                     MessageId.earliest);
             fail("Unexpected behaviour");
-        } catch (PulsarAdminException.PreconditionFailedException ex) {
+        } catch (PulsarAdminException.ConflictException ex) {
             // OK
         }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
index 590edc2d3f3..c9138beee52 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
@@ -27,7 +27,10 @@ import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import lombok.Cleanup;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -40,6 +43,7 @@ import org.apache.pulsar.client.impl.LookupService;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.TopicType;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -55,6 +59,7 @@ public class TopicAutoCreationTest extends ProducerConsumerBase {
         conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
         conf.setAllowAutoTopicCreation(true);
         conf.setDefaultNumPartitions(3);
+        conf.setForceDeleteNamespaceAllowed(true);
         super.internalSetup();
         super.producerBaseSetup();
     }
@@ -186,4 +191,56 @@ public class TopicAutoCreationTest extends ProducerConsumerBase {
         }
 
     }
+
+    @Test
+    public void testClientWithAutoCreationGotNotFoundException() throws PulsarAdminException, PulsarClientException {
+        final String namespace = "public/test_1";
+        final String topicName = "persistent://public/test_1/test_auto_creation_got_not_found"
+                + System.currentTimeMillis();
+        final int retryTimes = 30;
+        admin.namespaces().createNamespace(namespace);
+        admin.namespaces().setAutoTopicCreation(namespace, AutoTopicCreationOverride.builder()
+                .allowAutoTopicCreation(true)
+                .topicType("non-partitioned")
+                .build());
+
+        @Cleanup("shutdown")
+        final ExecutorService executor1 = Executors.newSingleThreadExecutor();
+
+        @Cleanup("shutdown")
+        final ExecutorService executor2 = Executors.newSingleThreadExecutor();
+
+        for (int i = 0; i < retryTimes; i++) {
+            final CompletableFuture<Void> adminListSub = CompletableFuture.runAsync(() -> {
+                try {
+                    admin.topics().getSubscriptions(topicName);
+                } catch (PulsarAdminException e) {
+                    throw new RuntimeException(e);
+                }
+            }, executor1);
+
+            final CompletableFuture<Consumer<byte[]>> consumerSub = CompletableFuture.supplyAsync(() -> {
+                try {
+                    return pulsarClient.newConsumer()
+                            .topic(topicName)
+                            .subscriptionName("sub-1")
+                            .subscribe();
+                } catch (PulsarClientException e) {
+                    throw new RuntimeException(e);
+                }
+            }, executor2);
+
+            try {
+                adminListSub.join();
+            } catch (Throwable ex) {
+                // The admin call may fail; we don't care about the exception.
+            }
+
+            consumerSub.join().close();
+            admin.topics().delete(topicName, true);
+        }
+
+        admin.namespaces().deleteNamespace(namespace, true);
+    }
+
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index ac2727e33eb..5db17d7ecfa 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -457,8 +457,7 @@ public class PersistentTopicTest extends BrokerTestBase {
                     .topic(partition.toString())
                     .create();
             fail("unexpected behaviour");
-        } catch (PulsarClientException.TopicDoesNotExistException ignored) {
-
+        } catch (PulsarClientException.NotAllowedException ex) {
         }
         Assert.assertEquals(admin.topics().getPartitionedTopicMetadata(topicName).partitions, 4);
     }