You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ma...@apache.org on 2022/12/28 03:49:23 UTC
[pulsar] 01/01: [fix][broker] Reject create non existent partitions.
This is an automated email from the ASF dual-hosted git repository.
mattisonchao pushed a commit to branch reject_create_non_existent_partitions
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 73547fbd0295f0758919afde7a7e776eb7b7be6a
Author: mattisonchao <ma...@apache.org>
AuthorDate: Wed Dec 28 11:46:16 2022 +0800
[fix][broker] Reject create non existent partitions.
---
.../pulsar/broker/service/BrokerService.java | 28 ++++++++---------
.../service/persistent/PersistentTopicTest.java | 36 +++++++++++++++++-----
2 files changed, 43 insertions(+), 21 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index bcec8351733..03f803e1278 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1016,27 +1016,27 @@ public class BrokerService implements Closeable {
}
}
final boolean isPersistentTopic = topicName.getDomain().equals(TopicDomain.persistent);
- if (isPersistentTopic) {
- return topics.computeIfAbsent(topicName.toString(), (k) -> {
- return this.loadOrCreatePersistentTopic(k, createIfMissing, properties);
- });
- } else {
- return topics.computeIfAbsent(topicName.toString(), (name) -> {
+ return topics.computeIfAbsent(topicName.toString(), (name) -> {
+ // partitioned topic
if (topicName.isPartitioned()) {
final TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName());
- return this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata) -> {
+ return fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata) -> {
if (topicName.getPartitionIndex() < metadata.partitions) {
- return createNonPersistentTopic(name);
+ return isPersistentTopic ?
+ loadOrCreatePersistentTopic(name, createIfMissing, properties) :
+ createNonPersistentTopic(name);
}
return CompletableFuture.completedFuture(Optional.empty());
});
- } else if (createIfMissing) {
- return createNonPersistentTopic(name);
- } else {
- return CompletableFuture.completedFuture(Optional.empty());
}
- });
- }
+ // non-partitioned topic
+ return isPersistentTopic ?
+ // persistent topic
+ loadOrCreatePersistentTopic(name, createIfMissing, properties) :
+ // non-persistent topic
+ (createIfMissing ? createNonPersistentTopic(name) :
+ CompletableFuture.completedFuture(Optional.empty()));
+ });
} catch (IllegalArgumentException e) {
log.warn("[{}] Illegalargument exception when loading topic", topicName, e);
return FutureUtil.failedFuture(e);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
index aa05624a5b0..4c6294308d9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -29,6 +29,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
+import static org.testng.Assert.fail;
import java.io.ByteArrayOutputStream;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
@@ -38,6 +39,7 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import com.google.common.collect.Multimap;
import com.google.common.collect.Sets;
import lombok.Cleanup;
@@ -46,6 +48,7 @@ import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.pulsar.broker.service.BrokerTestBase;
import org.apache.pulsar.broker.stats.PrometheusMetricsTest;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsGenerator;
+import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
@@ -128,8 +131,9 @@ public class PersistentTopicTest extends BrokerTestBase {
PersistentDispatcherMultipleConsumers sharedDispatcher = (PersistentDispatcherMultipleConsumers) sharedSub
.getDispatcher();
- PersistentDispatcherSingleActiveConsumer failOverDispatcher = (PersistentDispatcherSingleActiveConsumer) failOverSub
- .getDispatcher();
+ PersistentDispatcherSingleActiveConsumer failOverDispatcher =
+ (PersistentDispatcherSingleActiveConsumer) failOverSub
+ .getDispatcher();
// build backlog
consumer1.close();
@@ -279,14 +283,15 @@ public class PersistentTopicTest extends BrokerTestBase {
@DataProvider(name = "topicAndMetricsLevel")
public Object[][] indexPatternTestData() {
return new Object[][]{
- new Object[] {"persistent://prop/autoNs/test_delayed_message_metric", true},
- new Object[] {"persistent://prop/autoNs/test_delayed_message_metric", false},
+ new Object[]{"persistent://prop/autoNs/test_delayed_message_metric", true},
+ new Object[]{"persistent://prop/autoNs/test_delayed_message_metric", false},
};
}
@Test(dataProvider = "topicAndMetricsLevel")
- public void testDelayedDeliveryTrackerMemoryUsageMetric(String topic, boolean exposeTopicLevelMetrics) throws Exception {
+ public void testDelayedDeliveryTrackerMemoryUsageMetric(String topic, boolean exposeTopicLevelMetrics)
+ throws Exception {
PulsarClient client = pulsar.getClient();
String namespace = TopicName.get(topic).getNamespace();
admin.namespaces().createNamespace(namespace);
@@ -365,8 +370,8 @@ public class PersistentTopicTest extends BrokerTestBase {
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topicName).get();
- PersistentSubscription persistentSubscription = topic.getSubscription(sharedSubName);
- PersistentSubscription persistentSubscription2 = topic.getSubscription(failoverSubName);
+ PersistentSubscription persistentSubscription = topic.getSubscription(sharedSubName);
+ PersistentSubscription persistentSubscription2 = topic.getSubscription(failoverSubName);
// `addConsumer` should update last active
assertTrue(persistentSubscription.getCursor().getLastActive() > beforeAddConsumerTimestamp);
@@ -402,4 +407,21 @@ public class PersistentTopicTest extends BrokerTestBase {
assertTrue(persistentSubscription.getCursor().getLastActive() > beforeRemoveConsumerTimestamp);
assertTrue(persistentSubscription2.getCursor().getLastActive() > beforeRemoveConsumerTimestamp);
}
+
+
+ @Test // Regression test: producing to a partition index outside the partitioned topic's metadata must fail.
+ public void testCreateNonExistentPartitions() throws PulsarAdminException {
+ final String topicName = "non-persistent://prop/ns-abc/testCreateNonExistentPartitions";
+ admin.topics().createPartitionedTopic(topicName, 4); // second argument is presumably the partition count (4) — confirm against the admin API
+ TopicName partition = TopicName.get(topicName).getPartition(4); // index 4 equals the count above; the broker only accepts index < metadata.partitions
+ try {
+ @Cleanup // lombok.Cleanup: only matters if create() unexpectedly succeeds and a producer actually exists
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(partition.toString())
+ .create();
+ fail("unexpected behaviour"); // reaching this line means the producer was created, which this test treats as a failure
+ } catch (PulsarClientException ex) {
+ Assert.assertTrue(ex instanceof PulsarClientException.TopicDoesNotExistException); // any other client exception type fails the test
+ }
+ }
}