You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/09/15 15:38:27 UTC
[pulsar] branch master updated: [bugfix] Prevent Automatic Topic Creation during namespace deletion (#17609)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new af983049ccd [bugfix] Prevent Automatic Topic Creation during namespace deletion (#17609)
af983049ccd is described below
commit af983049ccd52da1e795032d9a7ba674c6df4b04
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Thu Sep 15 17:38:19 2022 +0200
[bugfix] Prevent Automatic Topic Creation during namespace deletion (#17609)
---
.../pulsar/broker/service/BrokerService.java | 44 +++++++---
.../apache/pulsar/broker/admin/AdminApi2Test.java | 34 ++++++++
.../pulsar/broker/admin/TopicAutoCreationTest.java | 99 ++++++++++++++++++++++
3 files changed, 163 insertions(+), 14 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 17ce9c1ee5a..77cc8f11ff5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2766,6 +2766,9 @@ public class BrokerService implements Closeable {
if (pulsar.getNamespaceService() == null) {
return FutureUtil.failedFuture(new NamingException("namespace service is not ready"));
}
+ Optional<Policies> policies =
+ pulsar.getPulsarResources().getNamespaceResources()
+ .getPoliciesIfCached(topicName.getNamespaceObject());
return pulsar.getNamespaceService().checkTopicExists(topicName)
.thenCompose(topicExists -> {
return fetchPartitionedTopicMetadataAsync(topicName)
@@ -2780,10 +2783,12 @@ public class BrokerService implements Closeable {
if (metadata.partitions == 0
&& !topicExists
&& !topicName.isPartitioned()
- && pulsar.getBrokerService().isAllowAutoTopicCreation(topicName)
- && pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) {
+ && pulsar.getBrokerService().isAllowAutoTopicCreation(topicName, policies)
+ && pulsar.getBrokerService()
+ .isDefaultTopicTypePartitioned(topicName, policies)) {
- pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName)
+ pulsar.getBrokerService()
+ .createDefaultPartitionedTopicAsync(topicName, policies)
.thenAccept(md -> future.complete(md))
.exceptionally(ex -> {
if (ex.getCause()
@@ -2813,8 +2818,9 @@ public class BrokerService implements Closeable {
}
@SuppressWarnings("deprecation")
- private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync(TopicName topicName) {
- final int defaultNumPartitions = pulsar.getBrokerService().getDefaultNumPartitions(topicName);
+ private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync(TopicName topicName,
+ Optional<Policies> policies) {
+ final int defaultNumPartitions = pulsar.getBrokerService().getDefaultNumPartitions(topicName, policies);
final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
checkArgument(defaultNumPartitions > 0,
"Default number of partitions should be more than 0");
@@ -3000,11 +3006,23 @@ public class BrokerService implements Closeable {
}
public boolean isAllowAutoTopicCreation(final TopicName topicName) {
+ Optional<Policies> policies =
+ pulsar.getPulsarResources().getNamespaceResources()
+ .getPoliciesIfCached(topicName.getNamespaceObject()); // namespace policies as returned by getPoliciesIfCached; the Optional may be empty
+ return isAllowAutoTopicCreation(topicName, policies); // delegate to the two-argument overload, passing the policies looked up above
+ }
+
+ public boolean isAllowAutoTopicCreation(final TopicName topicName, final Optional<Policies> policies) {
+ if (policies.isPresent() && policies.get().deleted) {
+ log.info("Preventing AutoTopicCreation on a namespace that is being deleted {}",
+ topicName.getNamespaceObject());
+ return false;
+ }
//System topic can always be created automatically
if (pulsar.getConfiguration().isSystemTopicEnabled() && isSystemTopic(topicName)) {
return true;
}
- AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName);
+ AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName, policies);
if (autoTopicCreationOverride != null) {
return autoTopicCreationOverride.isAllowAutoTopicCreation();
} else {
@@ -3012,8 +3030,8 @@ public class BrokerService implements Closeable {
}
}
- public boolean isDefaultTopicTypePartitioned(final TopicName topicName) {
- AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName);
+ public boolean isDefaultTopicTypePartitioned(final TopicName topicName, final Optional<Policies> policies) {
+ AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName, policies);
if (autoTopicCreationOverride != null) {
return TopicType.PARTITIONED.toString().equals(autoTopicCreationOverride.getTopicType());
} else {
@@ -3021,8 +3039,8 @@ public class BrokerService implements Closeable {
}
}
- public int getDefaultNumPartitions(final TopicName topicName) {
- AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName);
+ public int getDefaultNumPartitions(final TopicName topicName, final Optional<Policies> policies) {
+ AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName, policies);
if (autoTopicCreationOverride != null) {
return autoTopicCreationOverride.getDefaultNumPartitions();
} else {
@@ -3030,10 +3048,8 @@ public class BrokerService implements Closeable {
}
}
- private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName) {
- Optional<Policies> policies =
- pulsar.getPulsarResources().getNamespaceResources()
- .getPoliciesIfCached(topicName.getNamespaceObject());
+ private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName,
+ Optional<Policies> policies) {
// If namespace policies have the field set, it will override the broker-level setting
if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) {
return policies.get().autoTopicCreationOverride;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 4171abc00b7..92eac8342f4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -1846,6 +1846,40 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
}
}
+    /**
+     * Force-deletes a namespace while a consumer is still attached to an auto-created
+     * partitioned topic inside it, and verifies that the namespace is gone afterwards.
+     *
+     * <p>The still-connected consumer races with the deletion and tries to re-create the
+     * partitions; the namespace must nevertheless disappear from the tenant's namespace list.
+     *
+     * @throws Exception if any admin or client call fails unexpectedly
+     */
+    @Test
+    public void testForceDeleteNamespaceWithAutomaticTopicCreation() throws Exception {
+        conf.setForceDeleteNamespaceAllowed(true);
+        final String namespaceName = "prop-xyz2/ns1";
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
+        admin.tenants().createTenant("prop-xyz2", tenantInfo);
+        admin.namespaces().createNamespace(namespaceName, 1);
+        admin.namespaces().setAutoTopicCreation(namespaceName,
+                AutoTopicCreationOverride.builder()
+                        .allowAutoTopicCreation(true)
+                        .topicType("partitioned")
+                        .defaultNumPartitions(20)
+                        .build());
+        final String topic = "persistent://" + namespaceName + "/test" + UUID.randomUUID();
+
+        // start a consumer, that creates the topic
+        try (Consumer<Double> consumer = pulsarClient.newConsumer(Schema.DOUBLE).topic(topic)
+                .subscriptionName("test").autoUpdatePartitions(true).subscribe()) {
+
+            // Wait until the subscription is really listed. The previous check wrapped the
+            // boolean result of contains() in assertNotNull, which can never fail because the
+            // autoboxed Boolean is never null.
+            Awaitility.await().ignoreExceptions().until(() ->
+                    admin.topics().getSubscriptions(topic).contains("test"));
+
+            // verify that the partitioned topic is created
+            // (actual value first, expected value second, so a failure message reads correctly)
+            assertEquals(admin.topics().getPartitionedTopicMetadata(topic).partitions, 20);
+
+            // the consumer will race with the deletion
+            // the consumer will try to re-create the partitions
+            admin.namespaces().deleteNamespace(namespaceName, true);
+
+            assertFalse(admin.namespaces().getNamespaces("prop-xyz2").contains("ns1"));
+        }
+    }
+
@Test
public void testUpdateClusterWithProxyUrl() throws Exception {
ClusterData cluster = ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
index 09335d43302..7bd15992f64 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
@@ -19,18 +19,34 @@
package org.apache.pulsar.broker.admin;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.powermock.reflect.Whitebox;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@Test(groups = "broker-admin")
+@Slf4j
public class TopicAutoCreationTest extends ProducerConsumerBase {
@Override
@@ -43,6 +59,11 @@ public class TopicAutoCreationTest extends ProducerConsumerBase {
super.producerBaseSetup();
}
+ @Override
+ protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
+ clientBuilder.operationTimeout(2, TimeUnit.SECONDS); // 2-second operation timeout on the client builder; presumably so rejected producer/consumer creation fails fast - TODO confirm
+ }
+
@Override
@AfterMethod(alwaysRun = true)
protected void cleanup() throws Exception {
@@ -85,4 +106,82 @@ public class TopicAutoCreationTest extends ProducerConsumerBase {
producer.close();
}
+
+
+ @Test
+ public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion()
+ throws Exception {
+ final String namespaceName = "my-property/my-ns";
+ final String topic = "persistent://" + namespaceName + "/test-partitioned-topi-auto-creation-"
+ + UUID.randomUUID().toString();
+
+ pulsar.getPulsarResources().getNamespaceResources()
+ .setPolicies(NamespaceName.get(namespaceName), old -> {
+ old.deleted = true;
+ return old;
+ });
+
+
+ LookupService original = Whitebox.getInternalState(pulsarClient, "lookup");
+ try {
+
+ // we want to skip the "lookup" phase, because it is blocked by the HTTP API
+ LookupService mockLookup = mock(LookupService.class);
+ Whitebox.setInternalState(pulsarClient, "lookup", mockLookup);
+ when(mockLookup.getPartitionedTopicMetadata(any())).thenAnswer(i -> {
+ return CompletableFuture.completedFuture(new PartitionedTopicMetadata(0));
+ });
+ when(mockLookup.getBroker(any())).thenAnswer(i -> {
+ InetSocketAddress brokerAddress =
+ new InetSocketAddress(pulsar.getAdvertisedAddress(), pulsar.getBrokerListenPort().get());
+ return CompletableFuture.completedFuture(Pair.of(brokerAddress, brokerAddress));
+ });
+
+ // Creating a producer and creating a Consumer may trigger automatic topic
+ // creation, let's try to create a Producer and a Consumer
+ try (Producer<byte[]> producer = pulsarClient.newProducer()
+ .sendTimeout(1, TimeUnit.SECONDS)
+ .topic(topic)
+ .create();) {
+ } catch (PulsarClientException.LookupException expected) {
+ String msg = "Namespace bundle for topic (%s) not served by this instance";
+ log.info("Expected error", expected);
+ assertTrue(expected.getMessage().contains(String.format(msg, topic)));
+ }
+
+ try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName("test")
+ .subscribe();) {
+ } catch (PulsarClientException.LookupException expected) {
+ String msg = "Namespace bundle for topic (%s) not served by this instance";
+ log.info("Expected error", expected);
+ assertTrue(expected.getMessage().contains(String.format(msg, topic)));
+ }
+
+
+ // verify that the topic does not exist
+ pulsar.getPulsarResources().getNamespaceResources()
+ .setPolicies(NamespaceName.get(namespaceName), old -> {
+ old.deleted = false;
+ return old;
+ });
+
+ admin.topics().getList(namespaceName).isEmpty();
+
+ // create now the topic using auto creation
+ Whitebox.setInternalState(pulsarClient, "lookup", original);
+
+ try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
+ .topic(topic)
+ .subscriptionName("test")
+ .subscribe();) {
+ }
+
+ admin.topics().getList(namespaceName).contains(topic);
+ } finally {
+ Whitebox.setInternalState(pulsarClient, "lookup", original);
+ }
+
+ }
}