You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/09/15 15:38:27 UTC

[pulsar] branch master updated: [bugfix] Prevent Automatic Topic Creation during namespace deletion (#17609)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new af983049ccd [bugfix] Prevent Automatic Topic Creation during namespace deletion (#17609)
af983049ccd is described below

commit af983049ccd52da1e795032d9a7ba674c6df4b04
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Thu Sep 15 17:38:19 2022 +0200

    [bugfix] Prevent Automatic Topic Creation during namespace deletion (#17609)
---
 .../pulsar/broker/service/BrokerService.java       | 44 +++++++---
 .../apache/pulsar/broker/admin/AdminApi2Test.java  | 34 ++++++++
 .../pulsar/broker/admin/TopicAutoCreationTest.java | 99 ++++++++++++++++++++++
 3 files changed, 163 insertions(+), 14 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 17ce9c1ee5a..77cc8f11ff5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2766,6 +2766,9 @@ public class BrokerService implements Closeable {
         if (pulsar.getNamespaceService() == null) {
             return FutureUtil.failedFuture(new NamingException("namespace service is not ready"));
         }
+        Optional<Policies> policies =
+                pulsar.getPulsarResources().getNamespaceResources()
+                        .getPoliciesIfCached(topicName.getNamespaceObject());
         return pulsar.getNamespaceService().checkTopicExists(topicName)
                 .thenCompose(topicExists -> {
                     return fetchPartitionedTopicMetadataAsync(topicName)
@@ -2780,10 +2783,12 @@ public class BrokerService implements Closeable {
                                     if (metadata.partitions == 0
                                             && !topicExists
                                             && !topicName.isPartitioned()
-                                            && pulsar.getBrokerService().isAllowAutoTopicCreation(topicName)
-                                            && pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) {
+                                            && pulsar.getBrokerService().isAllowAutoTopicCreation(topicName, policies)
+                                            && pulsar.getBrokerService()
+                                                            .isDefaultTopicTypePartitioned(topicName, policies)) {
 
-                                        pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName)
+                                        pulsar.getBrokerService()
+                                                .createDefaultPartitionedTopicAsync(topicName, policies)
                                                 .thenAccept(md -> future.complete(md))
                                                 .exceptionally(ex -> {
                                                     if (ex.getCause()
@@ -2813,8 +2818,9 @@ public class BrokerService implements Closeable {
     }
 
     @SuppressWarnings("deprecation")
-    private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync(TopicName topicName) {
-        final int defaultNumPartitions = pulsar.getBrokerService().getDefaultNumPartitions(topicName);
+    private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync(TopicName topicName,
+                                                                                        Optional<Policies> policies) {
+        final int defaultNumPartitions = pulsar.getBrokerService().getDefaultNumPartitions(topicName, policies);
         final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic();
         checkArgument(defaultNumPartitions > 0,
                 "Default number of partitions should be more than 0");
@@ -3000,11 +3006,23 @@ public class BrokerService implements Closeable {
     }
 
     public boolean isAllowAutoTopicCreation(final TopicName topicName) {
+        Optional<Policies> policies =
+                pulsar.getPulsarResources().getNamespaceResources()
+                        .getPoliciesIfCached(topicName.getNamespaceObject());
+        return isAllowAutoTopicCreation(topicName, policies);
+    }
+
+    public boolean isAllowAutoTopicCreation(final TopicName topicName, final Optional<Policies> policies) {
+        if (policies.isPresent() && policies.get().deleted) {
+            log.info("Preventing AutoTopicCreation on a namespace that is being deleted {}",
+                    topicName.getNamespaceObject());
+            return false;
+        }
         //System topic can always be created automatically
         if (pulsar.getConfiguration().isSystemTopicEnabled() && isSystemTopic(topicName)) {
             return true;
         }
-        AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName);
+        AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName, policies);
         if (autoTopicCreationOverride != null) {
             return autoTopicCreationOverride.isAllowAutoTopicCreation();
         } else {
@@ -3012,8 +3030,8 @@ public class BrokerService implements Closeable {
         }
     }
 
-    public boolean isDefaultTopicTypePartitioned(final TopicName topicName) {
-        AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName);
+    public boolean isDefaultTopicTypePartitioned(final TopicName topicName, final Optional<Policies> policies) {
+        AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName, policies);
         if (autoTopicCreationOverride != null) {
             return TopicType.PARTITIONED.toString().equals(autoTopicCreationOverride.getTopicType());
         } else {
@@ -3021,8 +3039,8 @@ public class BrokerService implements Closeable {
         }
     }
 
-    public int getDefaultNumPartitions(final TopicName topicName) {
-        AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName);
+    public int getDefaultNumPartitions(final TopicName topicName, final Optional<Policies> policies) {
+        AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName, policies);
         if (autoTopicCreationOverride != null) {
             return autoTopicCreationOverride.getDefaultNumPartitions();
         } else {
@@ -3030,10 +3048,8 @@ public class BrokerService implements Closeable {
         }
     }
 
-    private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName) {
-        Optional<Policies> policies =
-                pulsar.getPulsarResources().getNamespaceResources()
-                        .getPoliciesIfCached(topicName.getNamespaceObject());
+    private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName,
+                                                                   Optional<Policies> policies) {
         // If namespace policies have the field set, it will override the broker-level setting
         if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) {
             return policies.get().autoTopicCreationOverride;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index 4171abc00b7..92eac8342f4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -1846,6 +1846,40 @@ public class AdminApi2Test extends MockedPulsarServiceBaseTest {
         }
     }
 
+    @Test
+    public void testForceDeleteNamespaceWithAutomaticTopicCreation() throws Exception {
+        conf.setForceDeleteNamespaceAllowed(true);
+        final String namespaceName = "prop-xyz2/ns1";
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
+        admin.tenants().createTenant("prop-xyz2", tenantInfo);
+        admin.namespaces().createNamespace(namespaceName, 1);
+        admin.namespaces().setAutoTopicCreation(namespaceName,
+                AutoTopicCreationOverride.builder()
+                        .allowAutoTopicCreation(true)
+                        .topicType("partitioned")
+                        .defaultNumPartitions(20)
+                        .build());
+        final String topic = "persistent://" + namespaceName + "/test" + UUID.randomUUID();
+
+        // start a consumer, that creates the topic
+        try (Consumer<Double> consumer = pulsarClient.newConsumer(Schema.DOUBLE).topic(topic)
+                .subscriptionName("test").autoUpdatePartitions(true).subscribe()) {
+
+            // wait for the consumer to settle
+            Awaitility.await().ignoreExceptions().untilAsserted(() ->
+                    assertNotNull(admin.topics().getSubscriptions(topic).contains("test")));
+
+            // verify that the partitioned topic is created
+            assertEquals(20, admin.topics().getPartitionedTopicMetadata(topic).partitions);
+
+            // the consumer will race with the deletion
+            // the consumer will try to re-create the partitions
+            admin.namespaces().deleteNamespace(namespaceName, true);
+
+            assertFalse(admin.namespaces().getNamespaces("prop-xyz2").contains("ns1"));
+        }
+    }
+
     @Test
     public void testUpdateClusterWithProxyUrl() throws Exception {
         ClusterData cluster = ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
index 09335d43302..7bd15992f64 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
@@ -19,18 +19,34 @@
 
 package org.apache.pulsar.broker.admin;
 
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.client.api.ClientBuilder;
+import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.LookupService;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
+import org.powermock.reflect.Whitebox;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-admin")
+@Slf4j
 public class TopicAutoCreationTest extends ProducerConsumerBase {
 
     @Override
@@ -43,6 +59,11 @@ public class TopicAutoCreationTest extends ProducerConsumerBase {
         super.producerBaseSetup();
     }
 
+    @Override
+    protected void customizeNewPulsarClientBuilder(ClientBuilder clientBuilder) {
+        clientBuilder.operationTimeout(2, TimeUnit.SECONDS);
+    }
+
     @Override
     @AfterMethod(alwaysRun = true)
     protected void cleanup() throws Exception {
@@ -85,4 +106,82 @@ public class TopicAutoCreationTest extends ProducerConsumerBase {
 
         producer.close();
     }
+
+
+    @Test
+    public void testPartitionedTopicAutoCreationForbiddenDuringNamespaceDeletion()
+            throws Exception {
+        final String namespaceName = "my-property/my-ns";
+        final String topic = "persistent://" + namespaceName + "/test-partitioned-topi-auto-creation-"
+                + UUID.randomUUID().toString();
+
+        pulsar.getPulsarResources().getNamespaceResources()
+                .setPolicies(NamespaceName.get(namespaceName), old -> {
+            old.deleted = true;
+            return old;
+        });
+
+
+        LookupService original = Whitebox.getInternalState(pulsarClient, "lookup");
+        try {
+
+            // we want to skip the "lookup" phase, because it is blocked by the HTTP API
+            LookupService mockLookup = mock(LookupService.class);
+            Whitebox.setInternalState(pulsarClient, "lookup", mockLookup);
+            when(mockLookup.getPartitionedTopicMetadata(any())).thenAnswer(i -> {
+                return CompletableFuture.completedFuture(new PartitionedTopicMetadata(0));
+            });
+            when(mockLookup.getBroker(any())).thenAnswer(i -> {
+                InetSocketAddress brokerAddress =
+                        new InetSocketAddress(pulsar.getAdvertisedAddress(), pulsar.getBrokerListenPort().get());
+                return CompletableFuture.completedFuture(Pair.of(brokerAddress, brokerAddress));
+            });
+
+            // Creating a producer and creating a Consumer may trigger automatic topic
+            // creation, let's try to create a Producer and a Consumer
+            try (Producer<byte[]> producer = pulsarClient.newProducer()
+                    .sendTimeout(1, TimeUnit.SECONDS)
+                    .topic(topic)
+                    .create();) {
+            } catch (PulsarClientException.LookupException expected) {
+                String msg = "Namespace bundle for topic (%s) not served by this instance";
+                log.info("Expected error", expected);
+                assertTrue(expected.getMessage().contains(String.format(msg, topic)));
+            }
+
+            try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                    .topic(topic)
+                    .subscriptionName("test")
+                    .subscribe();) {
+            } catch (PulsarClientException.LookupException expected) {
+                String msg = "Namespace bundle for topic (%s) not served by this instance";
+                log.info("Expected error", expected);
+                assertTrue(expected.getMessage().contains(String.format(msg, topic)));
+            }
+
+
+            // verify that the topic does not exist
+            pulsar.getPulsarResources().getNamespaceResources()
+                    .setPolicies(NamespaceName.get(namespaceName), old -> {
+                        old.deleted = false;
+                        return old;
+                    });
+
+            admin.topics().getList(namespaceName).isEmpty();
+
+            // create now the topic using auto creation
+            Whitebox.setInternalState(pulsarClient, "lookup", original);
+
+            try (Consumer<byte[]> consumer = pulsarClient.newConsumer()
+                    .topic(topic)
+                    .subscriptionName("test")
+                    .subscribe();) {
+            }
+
+            admin.topics().getList(namespaceName).contains(topic);
+        } finally {
+            Whitebox.setInternalState(pulsarClient, "lookup", original);
+        }
+
+    }
 }