You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/12/28 03:49:23 UTC

[pulsar] 01/01: [fix][broker] Reject create non existent partitions.

This is an automated email from the ASF dual-hosted git repository.

mattisonchao pushed a commit to branch reject_create_non_existent_partitions
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 73547fbd0295f0758919afde7a7e776eb7b7be6a
Author: mattisonchao <ma...@apache.org>
AuthorDate: Wed Dec 28 11:46:16 2022 +0800

    [fix][broker] Reject create non existent partitions.
---
 .../pulsar/broker/service/BrokerService.java       | 28 ++++++++---------
 .../service/persistent/PersistentTopicTest.java    | 36 +++++++++++++++++-----
 2 files changed, 43 insertions(+), 21 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bcec8351733..03f803e1278 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1016,27 +1016,27 @@ public class BrokerService implements Closeable {
                 }
             }
             final boolean isPersistentTopic = topicName.getDomain().equals(TopicDomain.persistent);
-            if (isPersistentTopic) {
-                return topics.computeIfAbsent(topicName.toString(), (k) -> {
-                    return this.loadOrCreatePersistentTopic(k, createIfMissing, properties);
-                });
-            } else {
-                return topics.computeIfAbsent(topicName.toString(), (name) -> {
+            return topics.computeIfAbsent(topicName.toString(), (name) -> {
+                    // partitioned topic
                     if (topicName.isPartitioned()) {
                         final TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());
-                        return this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata) -> {
+                        return fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata) -> {
                             if (topicName.getPartitionIndex() < metadata.partitions) {
-                                return createNonPersistentTopic(name);
+                                return isPersistentTopic ?
+                                        loadOrCreatePersistentTopic(name, createIfMissing, properties) :
+                                        createNonPersistentTopic(name);
                             }
                             return CompletableFuture.completedFuture(Optional.empty());
                         });
-                    } else if (createIfMissing) {
-                        return createNonPersistentTopic(name);
-                    } else {
-                        return CompletableFuture.completedFuture(Optional.empty());
                     }
-                });
-            }
+                    // non-partitioned topic
+                    return isPersistentTopic ?
+                            // persistent topic
+                            loadOrCreatePersistentTopic(name, createIfMissing, properties) :
+                            // non-persistent topic
+                            (createIfMissing ? createNonPersistentTopic(name) :
+                                    CompletableFuture.completedFuture(Optional.empty()));
+                    });
         } catch (IllegalArgumentException e) {
             log.warn("[{}] Illegalargument exception when loading topic", topicName, e);
             return FutureUtil.failedFuture(e);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index aa05624a5b0..4c6294308d9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -29,6 +29,7 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
+import static org.testng.Assert.fail;
 import java.io.ByteArrayOutputStream;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
@@ -38,6 +39,7 @@ import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import com.google.common.collect.Multimap;
 import com.google.common.collect.Sets;
 import lombok.Cleanup;
@@ -46,6 +48,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.pulsar.broker.service.BrokerTestBase;
 import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
 import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
+import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.TopicName;
@@ -128,8 +131,9 @@ public class PersistentTopicTest extends BrokerTestBase {
 
         PersistentDispatcherMultipleConsumers sharedDispatcher = (PersistentDispatcherMultipleConsumers) sharedSub
                 .getDispatcher();
-        PersistentDispatcherSingleActiveConsumer failOverDispatcher = (PersistentDispatcherSingleActiveConsumer) failOverSub
-                .getDispatcher();
+        PersistentDispatcherSingleActiveConsumer failOverDispatcher =
+                (PersistentDispatcherSingleActiveConsumer) failOverSub
+                        .getDispatcher();
 
         // build backlog
         consumer1.close();
@@ -279,14 +283,15 @@ public class PersistentTopicTest extends BrokerTestBase {
     @DataProvider(name = "topicAndMetricsLevel")
     public Object[][] indexPatternTestData() {
         return new Object[][]{
-                new Object[] {"persistent://prop/autoNs/test_delayed_message_metric", true},
-                new Object[] {"persistent://prop/autoNs/test_delayed_message_metric", false},
+                new Object[]{"persistent://prop/autoNs/test_delayed_message_metric", true},
+                new Object[]{"persistent://prop/autoNs/test_delayed_message_metric", false},
         };
     }
 
 
     @Test(dataProvider = "topicAndMetricsLevel")
-    public void testDelayedDeliveryTrackerMemoryUsageMetric(String topic, boolean exposeTopicLevelMetrics) throws Exception {
+    public void testDelayedDeliveryTrackerMemoryUsageMetric(String topic, boolean exposeTopicLevelMetrics)
+            throws Exception {
         PulsarClient client = pulsar.getClient();
         String namespace = TopicName.get(topic).getNamespace();
         admin.namespaces().createNamespace(namespace);
@@ -365,8 +370,8 @@ public class PersistentTopicTest extends BrokerTestBase {
         Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
 
         PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
-        PersistentSubscription persistentSubscription =  topic.getSubscription(sharedSubName);
-        PersistentSubscription persistentSubscription2 =  topic.getSubscription(failoverSubName);
+        PersistentSubscription persistentSubscription = topic.getSubscription(sharedSubName);
+        PersistentSubscription persistentSubscription2 = topic.getSubscription(failoverSubName);
 
         // `addConsumer` should update last active
         assertTrue(persistentSubscription.getCursor().getLastActive() > beforeAddConsumerTimestamp);
@@ -402,4 +407,21 @@ public class PersistentTopicTest extends BrokerTestBase {
         assertTrue(persistentSubscription.getCursor().getLastActive() > beforeRemoveConsumerTimestamp);
         assertTrue(persistentSubscription2.getCursor().getLastActive() > beforeRemoveConsumerTimestamp);
     }
+
+
+    // Partition 4 is out of range for a 4-partition topic (valid indexes 0-3); producing to it must fail.
+    @Test
+    public void testCreateNonExistentPartitions() throws PulsarAdminException {
+        // Persistent domain on purpose: persistent partitions must be validated against the partition count.
+        final String topicName = "persistent://prop/ns-abc/testCreateNonExistentPartitions";
+        admin.topics().createPartitionedTopic(topicName, 4);
+        TopicName partition = TopicName.get(topicName).getPartition(4);
+        try {
+            @Cleanup
+            Producer<byte[]> producer = pulsarClient.newProducer().topic(partition.toString()).create();
+            fail("unexpected behaviour");
+        } catch (PulsarClientException ex) {
+            Assert.assertTrue(ex instanceof PulsarClientException.TopicDoesNotExistException);
+        }
+    }
 }