You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/09/09 22:07:28 UTC
[pulsar] branch master updated: Revert "[fix] Combination of autocreate + forced delete of partitioned topic with active consumer leaves topic metadata inconsistent. (#17308)" (#17566)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8d9e3c22b90 Revert "[fix] Combination of autocreate + forced delete of partitioned topic with active consumer leaves topic metadata inconsistent. (#17308)" (#17566)
8d9e3c22b90 is described below
commit 8d9e3c22b904d979a901a0c19fd6707df512b13a
Author: Enrico Olivelli <eo...@apache.org>
AuthorDate: Sat Sep 10 00:07:20 2022 +0200
Revert "[fix] Combination of autocreate + forced delete of partitioned topic with active consumer leaves topic metadata inconsistent. (#17308)" (#17566)
This reverts commit 9529850bd48557dcba124a157a69b75b8f41da3b.
---
.../broker/service/persistent/PersistentTopic.java | 7 -
.../broker/admin/AdminApiMultiBrokersTest.java | 98 -----------
.../apache/pulsar/broker/admin/NamespacesTest.java | 1 -
.../broker/service/ExclusiveProducerTest.java | 8 +-
.../pulsar/client/impl/TopicsConsumerImplTest.java | 28 ++++
.../integration/topics/TestTopicDeletion.java | 183 ---------------------
6 files changed, 29 insertions(+), 296 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 31dda5539a3..4d48f9c627e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1138,13 +1138,6 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
.map(PersistentSubscription::getName).toList();
return FutureUtil.failedFuture(
new TopicBusyException("Topic has subscriptions did not catch up: " + backlogSubs));
- } else if (TopicName.get(topic).isPartitioned()
- && (getProducers().size() > 0 || getNumberOfConsumers() > 0)
- && getBrokerService().isAllowAutoTopicCreation(topic)) {
- // to avoid inconsistent metadata as a result
- return FutureUtil.failedFuture(
- new TopicBusyException("Partitioned topic has active consumers or producers and "
- + "auto-creation of topic is allowed"));
}
fenceTopicToCloseOrDelete(); // Avoid clients reconnections while deleting
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java
index 5a0bde6f913..2cbff955ecf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java
@@ -18,8 +18,6 @@
*/
package org.apache.pulsar.broker.admin;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.fail;
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertTrue;
@@ -28,8 +26,6 @@ import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.pulsar.broker.MultiBrokerBaseTest;
@@ -37,16 +33,9 @@ import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.loadbalance.LeaderBroker;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
-import org.apache.pulsar.client.admin.internal.TopicsImpl;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
-import org.apache.pulsar.common.policies.data.RetentionPolicies;
-import org.apache.pulsar.common.policies.data.TenantInfoImpl;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -65,7 +54,6 @@ public class AdminApiMultiBrokersTest extends MultiBrokerBaseTest {
@Override
protected void doInitConf() throws Exception {
super.doInitConf();
- this.conf.setManagedLedgerMaxEntriesPerLedger(10);
}
@Override
@@ -134,90 +122,4 @@ public class AdminApiMultiBrokersTest extends MultiBrokerBaseTest {
Assert.assertEquals(lookupResultSet.size(), 1);
}
- @Test
- public void testForceDeletePartitionedTopicWithSub() throws Exception {
- final int numPartitions = 10;
- TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
- admin.tenants().createTenant("tenant-xyz", tenantInfo);
- admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test"));
-
- admin.namespaces().setAutoTopicCreation("tenant-xyz/ns-abc",
- AutoTopicCreationOverride.builder()
- .allowAutoTopicCreation(true)
- .topicType("partitioned")
- .defaultNumPartitions(5)
- .build());
-
- RetentionPolicies retention = new RetentionPolicies(10, 10);
- admin.namespaces().setRetention("tenant-xyz/ns-abc", retention);
- final String topic = "persistent://tenant-xyz/ns-abc/topic-"
- + RandomStringUtils.randomAlphabetic(5)
- + "-testDeletePartitionedTopicWithSub";
- final String subscriptionName = "sub";
- ((TopicsImpl) admin.topics()).createPartitionedTopicAsync(topic, numPartitions, true, null).get();
-
- log.info("Creating producer and consumer");
- Consumer<byte[]> consumer = pulsarClient.newConsumer()
- .topic(topic)
- .subscriptionName(subscriptionName)
- .subscribe();
-
- Producer<String> producer = pulsarClient.newProducer(Schema.STRING).enableBatching(false).topic(topic).create();
-
- log.info("producing messages");
- for (int i = 0; i < numPartitions * 100; ++i) {
- producer.newMessage()
- .key("" + i)
- .value("value-" + i)
- .send();
- }
- producer.flush();
- producer.close();
-
- log.info("consuming some messages");
- for (int i = 0; i < numPartitions * 5; i++) {
- Message<byte[]> m = consumer.receive(1, TimeUnit.MINUTES);
- }
-
- log.info("trying to delete the topic");
- try {
- admin.topics().deletePartitionedTopic(topic, true);
- fail("expected PulsarAdminException.NotFoundException");
- } catch (PulsarAdminException e) {
- assertTrue(e.getMessage().contains("Partitioned topic has active consumers or producers"));
- }
-
- // check that metadata is still consistent
- assertEquals(numPartitions, admin.topics().getList("tenant-xyz/ns-abc")
- .stream().filter(t -> t.contains(topic)).count());
- assertEquals(numPartitions,
- pulsar.getPulsarResources().getTopicResources()
- .getExistingPartitions(TopicName.getPartitionedTopicName(topic))
- .get()
- .stream().filter(t -> t.contains(topic)).count());
- assertTrue(admin.topics()
- .getPartitionedTopicList("tenant-xyz/ns-abc")
- .contains(topic));
-
- log.info("closing producer and consumer");
- producer.close();
- consumer.close();
-
- log.info("trying to delete the topic again");
- admin.topics().deletePartitionedTopic(topic, true);
-
- assertEquals(0, admin.topics().getList("tenant-xyz/ns-abc")
- .stream().filter(t -> t.contains(topic)).count());
- assertEquals(0,
- pulsar.getPulsarResources().getTopicResources()
- .getExistingPartitions(TopicName.getPartitionedTopicName(topic))
- .get()
- .stream().filter(t -> t.contains(topic)).count());
- assertFalse(admin.topics()
- .getPartitionedTopicList("tenant-xyz/ns-abc")
- .contains(topic));
-
- log.info("trying to create the topic again");
- ((TopicsImpl) admin.topics()).createPartitionedTopicAsync(topic, numPartitions, true, null).get();
- }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 8caf3a47dd3..c23407bb447 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -1262,7 +1262,6 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
pulsarClient.updateServiceUrl(lookupUrl.toString());
Awaitility.await().untilAsserted(() -> assertTrue(consumer.isConnected()));
pulsar.getConfiguration().setAuthorizationEnabled(true);
- consumer.close();
admin.topics().deletePartitionedTopic(topicName, true);
admin.namespaces().deleteNamespace(namespace);
admin.tenants().deleteTenant("my-tenants");
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
index 338ffc01807..604abd8d709 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
@@ -30,7 +30,6 @@ import java.util.concurrent.TimeUnit;
import io.netty.util.HashedWheelTimer;
import lombok.Cleanup;
-import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerAccessMode;
import org.apache.pulsar.client.api.PulsarClient;
@@ -332,12 +331,7 @@ public class ExclusiveProducerTest extends BrokerTestBase {
p1.send("msg-1");
if (partitioned) {
- try {
- admin.topics().deletePartitionedTopic(topic, true);
- fail("expected error because partitioned topic has active producer");
- } catch (PulsarAdminException.ServerSideErrorException e) {
- // expected
- }
+ admin.topics().deletePartitionedTopic(topic, true);
} else {
admin.topics().delete(topic, true);
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index 35679711fde..8d068d65114 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -1171,6 +1171,34 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
}
}
+ @Test(timeOut = testTimeout)
+ public void testAutoDiscoverMultiTopicsPartitions() throws Exception {
+ final String topicName = "persistent://public/default/issue-9585";
+ admin.topics().createPartitionedTopic(topicName, 3);
+ PatternMultiTopicsConsumerImpl<String> consumer = (PatternMultiTopicsConsumerImpl<String>) pulsarClient.newConsumer(Schema.STRING)
+ .topicsPattern(topicName)
+ .subscriptionName("sub-issue-9585")
+ .subscribe();
+
+ Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 3);
+ Assert.assertEquals(consumer.getConsumers().size(), 3);
+
+ admin.topics().deletePartitionedTopic(topicName, true);
+ consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 0);
+ Assert.assertEquals(consumer.getConsumers().size(), 0);
+ });
+
+ admin.topics().createPartitionedTopic(topicName, 7);
+ consumer.getPartitionsAutoUpdateTimeout().task().run(consumer.getPartitionsAutoUpdateTimeout());
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertEquals(consumer.getPartitionsOfTheTopicMap(), 7);
+ Assert.assertEquals(consumer.getConsumers().size(), 7);
+ });
+ }
+
+
@Test(timeOut = testTimeout)
public void testPartitionsUpdatesForMultipleTopics() throws Exception {
final String topicName0 = "persistent://public/default/testPartitionsUpdatesForMultipleTopics-0";
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java
deleted file mode 100644
index 0adb414e8f4..00000000000
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/topics/TestTopicDeletion.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.tests.integration.topics;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.client.api.Consumer;
-import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.PulsarClient;
-import org.apache.pulsar.tests.integration.docker.ContainerExecException;
-import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
-import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
-import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec;
-import org.testng.annotations.Test;
-
-import java.util.concurrent.TimeUnit;
-import java.util.function.Supplier;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.fail;
-
-/**
- * Test cases for compaction.
- */
-@Slf4j
-public class TestTopicDeletion extends PulsarTestSuite {
-
- final private boolean unload = false;
- final private int numBrokers = 2;
-
- public void setupCluster() throws Exception {
- brokerEnvs.put("managedLedgerMaxEntriesPerLedger", "10");
- brokerEnvs.put("brokerDeleteInactivePartitionedTopicMetadataEnabled", "false");
- brokerEnvs.put("brokerDeleteInactiveTopicsEnabled", "false");
- this.setupCluster("");
- }
-
- protected PulsarClusterSpec.PulsarClusterSpecBuilder beforeSetupCluster(
- String clusterName,
- PulsarClusterSpec.PulsarClusterSpecBuilder specBuilder) {
- specBuilder.numBrokers(numBrokers);
- specBuilder.enableContainerLog(true);
- return specBuilder;
- }
-
- @Test(dataProvider = "ServiceUrls", timeOut=300_000)
- public void testPartitionedTopicForceDeletion(Supplier<String> serviceUrl) throws Exception {
-
- log.info("Creating tenant and namespace");
-
- final String tenant = "test-partitioned-topic-" + randomName(4);
- final String namespace = tenant + "/ns1";
- final String topic = "persistent://" + namespace + "/partitioned-topic";
- final int numPartitions = numBrokers * 3;
- final int numKeys = numPartitions * 50;
- final String subscriptionName = "sub1";
-
- this.createTenantName(tenant, pulsarCluster.getClusterName(), "admin");
-
- this.createNamespace(namespace);
-
- pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
- "set-clusters", "--clusters", pulsarCluster.getClusterName(), namespace);
-
- pulsarCluster.runAdminCommandOnAnyBroker("namespaces",
- "set-retention", "--size", "100M", "--time", "100m", namespace);
-
- this.createPartitionedTopic(topic, numPartitions);
-
- try (PulsarClient client = PulsarClient.builder().serviceUrl(serviceUrl.get()).build()) {
-
- log.info("Creating consumer");
- Consumer<byte[]> consumer = client.newConsumer()
- .topic(topic)
- .subscriptionName(subscriptionName)
- .subscribe();
-
- log.info("Producing messages");
- try(Producer<byte[]> producer = client.newProducer()
- .topic(topic)
- .create()
- ) {
- for (int i = 0; i < numKeys; i++) {
- producer.newMessage()
- .key("" + i)
- .value(("value-" + i).getBytes(UTF_8))
- .sendAsync();
- }
- producer.flush();
- log.info("Successfully wrote {} values", numKeys);
- }
-
- log.info("Consuming half of the messages");
- for (int i = 0; i < numKeys / 2; i++) {
- Message<byte[]> m = consumer.receive(1, TimeUnit.MINUTES);
- log.info("Read value {}", m.getKey());
- }
-
- if (unload) {
- log.info("Unloading topic");
- pulsarCluster.runAdminCommandOnAnyBroker("topics",
- "unload", topic);
- }
-
- ContainerExecResult res;
- log.info("Deleting the topic");
- try {
- res = pulsarCluster.runAdminCommandOnAnyBroker("topics",
- "delete-partitioned-topic", "--force", topic);
- assertNotEquals(0, res.getExitCode());
- } catch (ContainerExecException e) {
- log.info("Second delete failed with ContainerExecException, could be ok", e);
- if (!e.getMessage().contains("with error code 1")) {
- fail("Expected different error code");
- }
- }
-
- log.info("Close the consumer and delete the topic again");
- consumer.close();
-
- res = pulsarCluster.runAdminCommandOnAnyBroker("topics",
- "delete-partitioned-topic", "--force", topic);
- assertNotEquals(0, res.getExitCode());
-
- Thread.sleep(5000);
- // should succeed
- log.info("Creating the topic again");
- this.createPartitionedTopic(topic, numBrokers * 2);
- }
- }
-
-
- private ContainerExecResult createTenantName(final String tenantName,
- final String allowedClusterName,
- final String adminRoleName) throws Exception {
- ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
- "tenants", "create", "--allowed-clusters", allowedClusterName,
- "--admin-roles", adminRoleName, tenantName);
- assertEquals(0, result.getExitCode());
- return result;
- }
-
- private ContainerExecResult createNamespace(final String Ns) throws Exception {
- ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
- "namespaces",
- "create",
- "--clusters",
- pulsarCluster.getClusterName(), Ns);
- assertEquals(0, result.getExitCode());
- return result;
- }
-
- private ContainerExecResult createPartitionedTopic(final String partitionedTopicName, int numPartitions)
- throws Exception {
- ContainerExecResult result = pulsarCluster.runAdminCommandOnAnyBroker(
- "topics",
- "create-partitioned-topic",
- "--partitions", "" + numPartitions,
- partitionedTopicName);
- assertEquals(0, result.getExitCode());
- return result;
- }
-
-
-}