You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/12/01 01:01:20 UTC
[pulsar] branch branch-2.10 updated: [fix][broker] Fix namespace can not be deleted by force (#18686)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new fd00634a126 [fix][broker] Fix namespace can not be deleted by force (#18686)
fd00634a126 is described below
commit fd00634a1266a9bba8f4eec4ba204eb3a589bd97
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Thu Dec 1 09:01:07 2022 +0800
[fix][broker] Fix namespace can not be deleted by force (#18686)
Cherry-pick #18307
---
.../pulsar/broker/admin/impl/NamespacesBase.java | 84 ++++++++++++++++++----
.../pulsar/broker/service/BrokerService.java | 15 +++-
.../broker/service/persistent/PersistentTopic.java | 9 ++-
.../apache/pulsar/broker/admin/NamespacesTest.java | 65 +++++++++++++++++
.../broker/transaction/TransactionProduceTest.java | 29 --------
5 files changed, 154 insertions(+), 48 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index cfb27a629f2..68ea5443536 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Sets;
import java.lang.reflect.Field;
import java.net.URI;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -47,6 +48,7 @@ import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.ListUtils;
import org.apache.commons.lang.mutable.MutableObject;
import org.apache.commons.lang3.StringUtils;
@@ -59,6 +61,7 @@ import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
@@ -472,11 +475,17 @@ public abstract class NamespacesBase extends AdminResource {
if (!topics.isEmpty()) {
Set<String> partitionedTopics = new HashSet<>();
Set<String> nonPartitionedTopics = new HashSet<>();
+ Set<String> allSystemTopics = new HashSet<>();
+ Set<String> allPartitionedSystemTopics = new HashSet<>();
for (String topic : topics) {
try {
TopicName topicName = TopicName.get(topic);
if (topicName.isPartitioned()) {
+ if (pulsar().getBrokerService().isSystemTopic(topicName)) {
+ allPartitionedSystemTopics.add(topicName.getPartitionedTopicName());
+ continue;
+ }
String partitionedTopic = topicName.getPartitionedTopicName();
if (!partitionedTopics.contains(partitionedTopic)) {
// Distinguish partitioned topic to avoid duplicate deletion of the same schema
@@ -485,6 +494,10 @@ public abstract class NamespacesBase extends AdminResource {
partitionedTopics.add(partitionedTopic);
}
} else {
+ if (pulsar().getBrokerService().isSystemTopic(topicName)) {
+ allSystemTopics.add(topic);
+ continue;
+ }
topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(
topic, true, true));
nonPartitionedTopics.add(topic);
@@ -508,21 +521,24 @@ public abstract class NamespacesBase extends AdminResource {
}
final CompletableFuture<Throwable> topicFutureEx =
- FutureUtil.waitForAll(topicFutures).handle((result, exception) -> {
- if (exception != null) {
- if (exception.getCause() instanceof PulsarAdminException) {
- asyncResponse
- .resume(new RestException((PulsarAdminException) exception.getCause()));
- } else {
- log.error("[{}] Failed to remove forcefully owned namespace {}",
- clientAppId(), namespaceName, exception);
- asyncResponse.resume(new RestException(exception.getCause()));
- }
- return exception;
- }
-
- return null;
- });
+ FutureUtil.waitForAll(topicFutures)
+ .thenCompose((ignore) -> internalDeleteTopicsAsync(allSystemTopics))
+ .thenCompose((ignore) ->
+ internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
+ .handle((result, exception) -> {
+ if (exception != null) {
+ if (exception.getCause() instanceof PulsarAdminException) {
+ asyncResponse.resume(
+ new RestException((PulsarAdminException) exception.getCause()));
+ } else {
+ log.error("[{}] Failed to remove forcefully owned namespace {}",
+ clientAppId(), namespaceName, exception);
+ asyncResponse.resume(new RestException(exception.getCause()));
+ }
+ return exception;
+ }
+ return null;
+ });
if (topicFutureEx.join() != null) {
return;
}
@@ -564,6 +580,44 @@ public abstract class NamespacesBase extends AdminResource {
});
}
+    /**
+     * Force-deletes the given partitioned topics, together with their schemas, through the broker's
+     * own admin client.
+     *
+     * @param topicNames names of the partitioned topics to delete; {@code null} or empty means no-op
+     * @return an already-completed future when there is nothing to delete, a failed future when the
+     *         admin client cannot be obtained, otherwise the {@code FutureUtil.waitForAll} future over
+     *         all per-topic delete requests
+     */
+    private CompletableFuture<Void> internalDeletePartitionedTopicsAsync(Set<String> topicNames) {
+        if (CollectionUtils.isEmpty(topicNames)) {
+            return CompletableFuture.completedFuture(null);
+        }
+        // Log only when there is real work, and say who asked for it and what is being removed.
+        log.info("[{}] Force deleting partitioned topics {}", clientAppId(), topicNames);
+        PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (Exception ex) {
+            log.error("[{}] Get admin client error when preparing to delete topics.", clientAppId(), ex);
+            return FutureUtil.failedFuture(ex);
+        }
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        for (String topicName : topicNames) {
+            // Arguments: force = true, deleteSchema = true.
+            futures.add(admin.topics().deletePartitionedTopicAsync(topicName, true, true));
+        }
+        return FutureUtil.waitForAll(futures);
+    }
+    /**
+     * Force-deletes the given non-partitioned topics, together with their schemas, through the
+     * broker's own admin client.
+     *
+     * @param topicNames names of the topics to delete; {@code null} or empty means no-op
+     * @return an already-completed future when there is nothing to delete, a failed future when the
+     *         admin client cannot be obtained, otherwise the {@code FutureUtil.waitForAll} future over
+     *         all per-topic delete requests
+     */
+    private CompletableFuture<Void> internalDeleteTopicsAsync(Set<String> topicNames) {
+        if (CollectionUtils.isEmpty(topicNames)) {
+            return CompletableFuture.completedFuture(null);
+        }
+        // Log only when there is real work, and say who asked for it and what is being removed.
+        log.info("[{}] Force deleting topics {}", clientAppId(), topicNames);
+        PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (Exception ex) {
+            log.error("[{}] Get admin client error when preparing to delete topics.", clientAppId(), ex);
+            return FutureUtil.failedFuture(ex);
+        }
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        for (String topicName : topicNames) {
+            // Arguments: force = true, deleteSchema = true.
+            futures.add(admin.topics().deleteAsync(topicName, true, true));
+        }
+        return FutureUtil.waitForAll(futures);
+    }
+
protected void internalDeleteNamespaceBundle(String bundleRange, boolean authoritative, boolean force) {
if (force) {
internalDeleteNamespaceBundleForcefully(bundleRange, authoritative);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index fb2bc3bbe1a..a6ce043c06e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2964,11 +2964,20 @@ public class BrokerService implements Closeable {
}
public CompletableFuture<Void> deleteTopicPolicies(TopicName topicName) {
- if (!pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
+ final PulsarService pulsarService = pulsar();
+ if (!pulsarService.getConfig().isTopicLevelPoliciesEnabled()) { // feature disabled: nothing to delete
return CompletableFuture.completedFuture(null);
}
- TopicName cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
- return pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(cloneTopicName);
+ return pulsarService.getPulsarResources().getNamespaceResources()
+ .getPoliciesAsync(topicName.getNamespaceObject())
+ .thenCompose(optPolicies -> {
+ if (optPolicies.isPresent() && optPolicies.get().deleted) {
+ // The namespace policies are already flagged as deleted, so skip the topic-policies
+ // cleanup and complete immediately.
+ return CompletableFuture.completedFuture(null);
+ }
+ TopicName cloneTopicName = TopicName.get(topicName.getPartitionedTopicName()); // keyed by partitioned-topic name
+ return pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(cloneTopicName);
+ });
}
private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName topicName, int numPartitions) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index ba5a11444a6..cd2324e2fe5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1183,7 +1183,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5);
deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema ? deleteSchema() :
CompletableFuture.completedFuture(null))
- .thenCompose(__ -> deleteTopicPolicies())
+ .thenCompose(ignore -> {
+ if (!this.getBrokerService().getPulsar().getBrokerService()
+ .isSystemTopic(TopicName.get(topic))) {
+ return deleteTopicPolicies();
+ } else {
+ return CompletableFuture.completedFuture(null);
+ }
+ })
.thenCompose(__ -> transactionBufferCleanupAndClose())
.whenComplete((v, ex) -> {
if (ex != null) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 50a9a1d554d..3a5e04ec1fd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -69,6 +69,7 @@ import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.broker.namespace.OwnershipCache;
import org.apache.pulsar.broker.service.AbstractTopic;
+import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.web.PulsarWebResource;
import org.apache.pulsar.broker.web.RestException;
@@ -81,9 +82,11 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.NamespaceBundles;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.AuthAction;
import org.apache.pulsar.common.policies.data.BundlesData;
@@ -107,6 +110,7 @@ import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
@@ -1773,4 +1777,65 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
BundlesData bundles = admin.namespaces().getBundles(namespace);
assertEquals(bundles.getNumBundles(), 14);
}
+
+    /**
+     * Force-deletes a namespace whose topic listing returns the namespace-events topic first, and
+     * expects the deletion to complete without throwing.
+     */
+    @Test
+    public void testFinallyDeleteSystemTopicWhenDeleteNamespace() throws Exception {
+        final String ns = this.testTenant + "/delete-namespace";
+        final String userTopic = TopicName.get(TopicDomain.persistent.toString(), this.testTenant,
+                "delete-namespace", "testFinallyDeleteSystemTopicWhenDeleteNamespace").toString();
+
+        // Step 0: enable topic-level policies, system topics and forced namespace deletion, then
+        // install a system-topic based policies service on the running broker.
+        pulsar.getConfig().setTopicLevelPoliciesEnabled(true);
+        pulsar.getConfig().setSystemTopicEnabled(true);
+        pulsar.getConfig().setForceDeleteNamespaceAllowed(true);
+        Field topicPoliciesServiceField = pulsar.getClass().getDeclaredField("topicPoliciesService");
+        topicPoliciesServiceField.setAccessible(true);
+        topicPoliciesServiceField.set(pulsar, new SystemTopicBasedTopicPoliciesService(pulsar));
+
+        // Step 1: create the namespace.
+        admin.namespaces().createNamespace(ns);
+        // Step 2: create a regular topic in it.
+        admin.topics().createNonPartitionedTopic(userTopic);
+        // Step 3: set a topic-level policy on that topic.
+        admin.topicPolicies().setMaxConsumers(userTopic, 5);
+
+        // Step 4: reorder the namespace's topic list so the namespace-events topic comes first.
+        List<String> orderedTopics =
+                pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(ns)).get();
+        Assert.assertTrue(orderedTopics.size() >= 2);
+        for (int idx = 0; idx < orderedTopics.size(); idx++) {
+            String candidate = orderedTopics.get(idx);
+            if (!candidate.contains(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)) {
+                continue;
+            }
+            // Swap the matching topic with whatever currently sits at the head of the list.
+            orderedTopics.set(idx, orderedTopics.get(0));
+            orderedTopics.set(0, candidate);
+        }
+
+        // Replace the broker's namespace service with a spy that always returns the reordered list.
+        NamespaceService spiedNamespaceService = spy(pulsar.getNamespaceService());
+        Field nsServiceField = pulsar.getClass().getDeclaredField("nsService");
+        nsServiceField.setAccessible(true);
+        nsServiceField.set(pulsar, spiedNamespaceService);
+        doReturn(CompletableFuture.completedFuture(orderedTopics))
+                .when(spiedNamespaceService).getFullListOfTopics(any());
+
+        // Step 5: force-delete the namespace.
+        admin.namespaces().deleteNamespace(ns, true);
+    }
+
+    /**
+     * Force-deletes the namespace-events topic of a namespace that already has a topic-level
+     * policy set, and expects the deletion to complete without throwing.
+     */
+    @Test
+    public void testNotClearTopicPolicesWhenDeleteSystemTopic() throws Exception {
+        final String ns = this.testTenant + "/delete-systemTopic";
+        final String userTopic = TopicName.get(TopicDomain.persistent.toString(), this.testTenant,
+                "delete-systemTopic", "testNotClearTopicPolicesWhenDeleteSystemTopic").toString();
+
+        // Step 0: enable topic-level policies and system topics, then install a system-topic based
+        // policies service on the running broker.
+        pulsar.getConfig().setTopicLevelPoliciesEnabled(true);
+        pulsar.getConfig().setSystemTopicEnabled(true);
+        Field topicPoliciesServiceField = pulsar.getClass().getDeclaredField("topicPoliciesService");
+        topicPoliciesServiceField.setAccessible(true);
+        topicPoliciesServiceField.set(pulsar, new SystemTopicBasedTopicPoliciesService(pulsar));
+
+        // Step 1: create the namespace.
+        admin.namespaces().createNamespace(ns);
+        // Step 2: create a regular topic in it.
+        admin.topics().createNonPartitionedTopic(userTopic);
+        // Step 3: set a topic-level policy on that topic.
+        admin.topicPolicies().setMaxConsumers(userTopic, 5);
+
+        // Step 4: force-delete the namespace-events topic itself.
+        final String eventsTopic = ns + "/" + EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
+        admin.topics().delete(eventsTopic, true);
+    }
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index 48015c50cc6..0d1bbda4568 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -89,35 +89,6 @@ public class TransactionProduceTest extends TransactionTestBase {
produceTest(true);
}
- @Test
- public void testDeleteNamespaceBeforeCommit() throws Exception {
- final String topic = NAMESPACE1 + "/testDeleteTopicBeforeCommit";
- PulsarClient pulsarClient = this.pulsarClient;
- Transaction tnx = pulsarClient.newTransaction()
- .withTransactionTimeout(60, TimeUnit.SECONDS)
- .build().get();
- long txnIdMostBits = ((TransactionImpl) tnx).getTxnIdMostBits();
- long txnIdLeastBits = ((TransactionImpl) tnx).getTxnIdLeastBits();
- Assert.assertTrue(txnIdMostBits > -1);
- Assert.assertTrue(txnIdLeastBits > -1);
-
- @Cleanup
- Producer<byte[]> outProducer = pulsarClient
- .newProducer()
- .topic(topic)
- .sendTimeout(0, TimeUnit.SECONDS)
- .enableBatching(false)
- .create();
-
- String content = "Hello Txn";
- outProducer.newMessage(tnx).value(content.getBytes(UTF_8)).send();
-
- try {
- admin.namespaces().deleteNamespace(NAMESPACE1, true);
- } catch (Exception ignore) {}
- tnx.commit().get();
- }
-
@Test
public void produceAndAbortTest() throws Exception {
produceTest(false);