You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/09/09 22:07:28 UTC

[pulsar] branch master updated: Revert "[fix] Combination of autocreate + forced delete of partitioned topic with active consumer leaves topic metadata inconsistent. (#17308)" (#17566)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d9e3c22b90 Revert "[fix] Combination of autocreate + forced delete of partitioned topic with active consumer leaves topic metadata inconsistent. (#17308)" (#17566)
8d9e3c22b90 is described below

commit 8d9e3c22b904d979a901a0c19fd6707df512b13a
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Sat Sep 10 00:07:20 2022 +0200

    Revert "[fix] Combination of autocreate + forced delete of partitioned topic with active consumer leaves topic metadata inconsistent. (#17308)" (#17566)
    
    This reverts commit 9529850bd48557dcba124a157a69b75b8f41da3b.
---
 .../broker/service/persistent/PersistentTopic.java |   7 -
 .../broker/admin/AdminApiMultiBrokersTest.java     |  98 -----------
 .../apache/pulsar/broker/admin/NamespacesTest.java |   1 -
 .../broker/service/ExclusiveProducerTest.java      |   8 +-
 .../pulsar/client/impl/TopicsConsumerImplTest.java |  28 ++++
 .../integration/topics/TestTopicDeletion.java      | 183 ---------------------
 6 files changed, 29 insertions(+), 296 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 31dda5539a3..4d48f9c627e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1138,13 +1138,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                                 .map(PersistentSubscription::getName).toList();
                 return FutureUtil.failedFuture(
                         new TopicBusyException("Topic has subscriptions did not catch up: " + backlogSubs));
-            } else if (TopicName.get(topic).isPartitioned()
-                    && (getProducers().size() > 0 || getNumberOfConsumers() > 0)
-                    && getBrokerService().isAllowAutoTopicCreation(topic)) {
-                // to avoid inconsistent metadata as a result
-                return FutureUtil.failedFuture(
-                        new TopicBusyException("Partitioned topic has active consumers or producers and "
-                                + "auto-creation of topic is allowed"));
             }
 
             fenceTopicToCloseOrDelete(); // Avoid clients reconnections while deleting
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java
index 5a0bde6f913..2cbff955ecf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pulsar.broker.admin;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
 import static org.testng.AssertJUnit.assertEquals;
 import static org.testng.AssertJUnit.assertTrue;
 
@@ -28,8 +26,6 @@ import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.pulsar.broker.MultiBrokerBaseTest;
@@ -37,16 +33,9 @@ import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.loadbalance.LeaderBroker;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.admin.internal.TopicsImpl;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
-import org.apache.pulsar.common.policies.data.RetentionPolicies;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
 import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -65,7 +54,6 @@ public class AdminApiMultiBrokersTest extends MultiBrokerBaseTest {
     @Override
     protected void doInitConf() throws Exception {
         super.doInitConf();
-        this.conf.setManagedLedgerMaxEntriesPerLedger(10);
     }
 
     @Override
@@ -134,90 +122,4 @@ public class AdminApiMultiBrokersTest extends MultiBrokerBaseTest {
         Assert.assertEquals(lookupResultSet.size(), 1);
     }
 
-    @Test
-    public void testForceDeletePartitionedTopicWithSub() throws Exception {
-        final int numPartitions = 10;
-        TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
-        admin.tenants().createTenant("tenant-xyz", tenantInfo);
-        admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test"));
-
-        admin.namespaces().setAutoTopicCreation("tenant-xyz/ns-abc",
-                AutoTopicCreationOverride.builder()
-                        .allowAutoTopicCreation(true)
-                        .topicType("partitioned")
-                        .defaultNumPartitions(5)
-                        .build());
-
-        RetentionPolicies retention = new RetentionPolicies(10, 10);
-        admin.namespaces().setRetention("tenant-xyz/ns-abc", retention);
-        final String topic = "persistent://tenant-xyz/ns-abc/topic-"
-                + RandomStringUtils.randomAlphabetic(5)
-                + "-testDeletePartitionedTopicWithSub";
-        final String subscriptionName = "sub";
-        ((TopicsImpl) admin.topics()).createPartitionedTopicAsync(topic, numPartitions, true, null).get();
-
-        log.info("Creating producer and consumer");
-        Consumer<byte[]> consumer = pulsarClient.newConsumer()
-                .topic(topic)
-                .subscriptionName(subscriptionName)
-                .subscribe();
-
-        Producer<String> producer = pulsarClient.newProducer(Schema.STRING).enableBatching(false).topic(topic).create();
-
-        log.info("producing messages");
-        for (int i = 0; i < numPartitions * 100; ++i) {
-            producer.newMessage()
-                    .key("" + i)
-                    .value("value-" + i)
-                    .send();
-        }
-        producer.flush();
-        producer.close();
-
-        log.info("consuming some messages");
-        for (int i = 0; i < numPartitions * 5; i++) {
-            Message<byte[]> m = consumer.receive(1, TimeUnit.MINUTES);
-        }
-
-        log.info("trying to delete the topic");
-        try {
-            admin.topics().deletePartitionedTopic(topic, true);
-            fail("expected PulsarAdminException.NotFoundException");
-        } catch (PulsarAdminException e) {
-            assertTrue(e.getMessage().contains("Partitioned topic has active consumers or producers"));
-        }
-
-        // check that metadata is still consistent
-        assertEquals(numPartitions, admin.topics().getList("tenant-xyz/ns-abc")
-                .stream().filter(t -> t.contains(topic)).count());
-        assertEquals(numPartitions,
-                pulsar.getPulsarResources().getTopicResources()
-                        .getExistingPartitions(TopicName.getPartitionedTopicName(topic))
-                        .get()
-                        .stream().filter(t -> t.contains(topic)).count());
-        assertTrue(admin.topics()
-                .getPartitionedTopicList("tenant-xyz/ns-abc")
-                .contains(topic));
-
-        log.info("closing producer and consumer");
-        producer.close();
-        consumer.close();
-
-        log.info("trying to delete the topic again");
-        admin.topics().deletePartitionedTopic(topic, true);
-
-        assertEquals(0, admin.topics().getList("tenant-xyz/ns-abc")
-                .stream().filter(t -> t.contains(topic)).count());
-        assertEquals(0,
-                pulsar.getPulsarResources().getTopicResources()
-                        .getExistingPartitions(TopicName.getPartitionedTopicName(topic))
-                        .get()
-                        .stream().filter(t -> t.contains(topic)).count());
-        assertFalse(admin.topics()
-                .getPartitionedTopicList("tenant-xyz/ns-abc")
-                .contains(topic));
-
-        log.info("trying to create the topic again");
-        ((TopicsImpl) admin.topics()).createPartitionedTopicAsync(topic, numPartitions, true, null).get();
-    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 8caf3a47dd3..c23407bb447 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -1262,7 +1262,6 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         pulsarClient.updateServiceUrl(lookupUrl.toString());
         Awaitility.await().untilAsserted(() -> assertTrue(consumer.isConnected()));
         pulsar.getConfiguration().setAuthorizationEnabled(true);
-        consumer.close();
         admin.topics().deletePartitionedTopic(topicName, true);
         admin.namespaces().deleteNamespace(namespace);
         admin.tenants().deleteTenant("my-tenants");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
index 338ffc01807..604abd8d709 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
@@ -30,7 +30,6 @@ import java.util.concurrent.TimeUnit;
 import io.netty.util.HashedWheelTimer;
 import lombok.Cleanup;
 
-import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerAccessMode;
 import org.apache.pulsar.client.api.PulsarClient;
@@ -332,12 +331,7 @@ public class ExclusiveProducerTest extends BrokerTestBase {
         p1.send("msg-1");
 
         if (partitioned) {
-            try {
-                admin.topics().deletePartitionedTopic(topic, true);
-                fail("expected error because partitioned topic has active producer");
-            } catch (PulsarAdminException.ServerSideErrorException e) {
-                // expected
-            }
+            admin.topics().deletePartitionedTopic(topic, true);
         } else {
             admin.topics().delete(topic, true);
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 35679711fde..8d068d65114 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -1171,6 +1171,34 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
         }
     }
 
+    @Test(timeOut = testTimeout)
+    public void testAutoDiscoverMultiTopicsPartitions() throws Exception {
+        final String topicName = "persistent://public/default/issue-9585";
+        admin.topics().createPartitionedTopic(topicName, 3);
+        PatternMultiTopicsConsumerImpl<String> consumer = (PatternMultiTopicsConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
+                .topicsPattern(topicName)
+                .subscriptionName("sub-issue-9585")
+                .subscribe();
+
+        Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 3);
+        Assert.assertEquals(consumer.getConsumers().size(), 3);
+
+        admin.topics().deletePartitionedTopic(topicName, true);
+        consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 0);
+            Assert.assertEquals(consumer.getConsumers().size(), 0);
+        });
+
+        admin.topics().createPartitionedTopic(topicName, 7);
+        consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 7);
+            Assert.assertEquals(consumer.getConsumers().size(), 7);
+        });
+    }
+
+
     @Test(timeOut = testTimeout)
     public void testPartitionsUpdatesForMultipleTopics() throws Exception {
         final String topicName0 = "persistent://public/default/testPartitionsUpdatesForMultipleTopics-0";
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java
deleted file mode 100644
index 0adb414e8f4..00000000000
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.topics;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.tests.integration.docker.ContainerExecException;
-import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
-import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
-import org.testng.annotations.Test;
-
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.fail;
-
-/**
- * Test cases for compaction.
- */
-@Slf4j
-public class TestTopicDeletion extends PulsarTestSuite {
-
-    final private boolean unload = false;
-    final private int numBrokers = 2;
-
-    public void setupCluster() throws Exception {
-        brokerEnvs.put("managedLedgerMaxEntriesPerLedger", "10");
-        brokerEnvs.put("brokerDeleteInactivePartitionedTopicMetadataEnabled", "false");
-        brokerEnvs.put("brokerDeleteInactiveTopicsEnabled", "false");
-        this.setupCluster("");
-    }
-
-    protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
-            String clusterName,
-            PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
-        specBuilder.numBrokers(numBrokers);
-        specBuilder.enableContainerLog(true);
-        return specBuilder;
-    }
-
-    @Test(dataProvider = "ServiceUrls", timeOut=300_000)
-    public void testPartitionedTopicForceDeletion(Supplier<String> serviceUrl) throws Exception {
-
-        log.info("Creating tenant and namespace");
-
-        final String tenant = "test-partitioned-topic-" + randomName(4);
-        final String namespace = tenant + "/ns1";
-        final String topic = "persistent://" + namespace + "/partitioned-topic";
-        final int numPartitions = numBrokers * 3;
-        final int numKeys = numPartitions * 50;
-        final String subscriptionName = "sub1";
-
-        this.createTenantName(tenant, pulsarCluster.getClusterName(), "admin");
-
-        this.createNamespace(namespace);
-
-        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
-                "set-clusters", "--clusters", pulsarCluster.getClusterName(), namespace);
-
-        pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
-                "set-retention", "--size", "100M", "--time", "100m", namespace);
-
-        this.createPartitionedTopic(topic, numPartitions);
-
-        try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl.get()).build()) {
-
-            log.info("Creating consumer");
-            Consumer<byte[]> consumer = client.newConsumer()
-                    .topic(topic)
-                    .subscriptionName(subscriptionName)
-                    .subscribe();
-
-            log.info("Producing messages");
-            try(Producer<byte[]> producer = client.newProducer()
-                .topic(topic)
-                .create()
-            ) {
-                for (int i = 0; i < numKeys; i++) {
-                    producer.newMessage()
-                        .key("" + i)
-                        .value(("value-" + i).getBytes(UTF_8))
-                        .sendAsync();
-                }
-                producer.flush();
-                log.info("Successfully wrote {} values", numKeys);
-            }
-
-            log.info("Consuming half of the messages");
-            for (int i = 0; i < numKeys / 2; i++) {
-                Message<byte[]> m = consumer.receive(1, TimeUnit.MINUTES);
-                log.info("Read value {}", m.getKey());
-            }
-
-            if (unload) {
-                log.info("Unloading topic");
-                pulsarCluster.runAdminCommandOnAnyBroker("topics",
-                        "unload", topic);
-            }
-
-            ContainerExecResult res;
-            log.info("Deleting the topic");
-            try {
-                res = pulsarCluster.runAdminCommandOnAnyBroker("topics",
-                        "delete-partitioned-topic", "--force", topic);
-                assertNotEquals(0, res.getExitCode());
-            } catch (ContainerExecException e) {
-                log.info("Second delete failed with ContainerExecException, could be ok", e);
-                if (!e.getMessage().contains("with error code 1")) {
-                    fail("Expected different error code");
-                }
-            }
-
-            log.info("Close the consumer and delete the topic again");
-            consumer.close();
-
-            res = pulsarCluster.runAdminCommandOnAnyBroker("topics",
-                    "delete-partitioned-topic", "--force", topic);
-            assertNotEquals(0, res.getExitCode());
-
-            Thread.sleep(5000);
-            // should succeed
-            log.info("Creating the topic again");
-            this.createPartitionedTopic(topic, numBrokers * 2);
-        }
-    }
-
-
-    private ContainerExecResult createTenantName(final String tenantName,
-                                                 final String allowedClusterName,
-                                                 final String adminRoleName) throws Exception {
-        ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
-            "tenants", "create", "--allowed-clusters", allowedClusterName,
-            "--admin-roles", adminRoleName, tenantName);
-        assertEquals(0, result.getExitCode());
-        return result;
-    }
-
-    private ContainerExecResult createNamespace(final String Ns) throws Exception {
-        ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
-                "namespaces",
-                "create",
-                "--clusters",
-                pulsarCluster.getClusterName(), Ns);
-        assertEquals(0, result.getExitCode());
-        return result;
-    }
-
-    private ContainerExecResult createPartitionedTopic(final String partitionedTopicName, int numPartitions)
-            throws Exception {
-        ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
-            "topics",
-            "create-partitioned-topic",
-            "--partitions", "" + numPartitions,
-            partitionedTopicName);
-        assertEquals(0, result.getExitCode());
-        return result;
-    }
-
-
-}