You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/12/01 01:01:20 UTC

[pulsar] branch branch-2.10 updated: [fix][broker] Fix namespace can not be deleted by force (#18686)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new fd00634a126 [fix][broker] Fix namespace can not be deleted by force (#18686)
fd00634a126 is described below

commit fd00634a1266a9bba8f4eec4ba204eb3a589bd97
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Thu Dec 1 09:01:07 2022 +0800

    [fix][broker] Fix namespace can not be deleted by force (#18686)
    
    Cherry-pick #18307
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 84 ++++++++++++++++++----
 .../pulsar/broker/service/BrokerService.java       | 15 +++-
 .../broker/service/persistent/PersistentTopic.java |  9 ++-
 .../apache/pulsar/broker/admin/NamespacesTest.java | 65 +++++++++++++++++
 .../broker/transaction/TransactionProduceTest.java | 29 --------
 5 files changed, 154 insertions(+), 48 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index cfb27a629f2..68ea5443536 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -27,6 +27,7 @@ import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
 import java.net.URI;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -47,6 +48,7 @@ import javax.ws.rs.container.AsyncResponse;
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.collections4.ListUtils;
 import org.apache.commons.lang.mutable.MutableObject;
 import org.apache.commons.lang3.StringUtils;
@@ -59,6 +61,7 @@ import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.service.persistent.PersistentReplicator;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace;
@@ -472,11 +475,17 @@ public abstract class NamespacesBase extends AdminResource {
             if (!topics.isEmpty()) {
                 Set<String> partitionedTopics = new HashSet<>();
                 Set<String> nonPartitionedTopics = new HashSet<>();
+                Set<String> allSystemTopics = new HashSet<>();
+                Set<String> allPartitionedSystemTopics = new HashSet<>();
 
                 for (String topic : topics) {
                     try {
                         TopicName topicName = TopicName.get(topic);
                         if (topicName.isPartitioned()) {
+                            if (pulsar().getBrokerService().isSystemTopic(topicName)) {
+                                allPartitionedSystemTopics.add(topicName.getPartitionedTopicName());
+                                continue;
+                            }
                             String partitionedTopic = topicName.getPartitionedTopicName();
                             if (!partitionedTopics.contains(partitionedTopic)) {
                                 // Distinguish partitioned topic to avoid duplicate deletion of the same schema
@@ -485,6 +494,10 @@ public abstract class NamespacesBase extends AdminResource {
                                 partitionedTopics.add(partitionedTopic);
                             }
                         } else {
+                            if (pulsar().getBrokerService().isSystemTopic(topicName)) {
+                                allSystemTopics.add(topic);
+                                continue;
+                            }
                             topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(
                                     topic, true, true));
                             nonPartitionedTopics.add(topic);
@@ -508,21 +521,24 @@ public abstract class NamespacesBase extends AdminResource {
                 }
 
                 final CompletableFuture<Throwable> topicFutureEx =
-                        FutureUtil.waitForAll(topicFutures).handle((result, exception) -> {
-                            if (exception != null) {
-                                if (exception.getCause() instanceof PulsarAdminException) {
-                                    asyncResponse
-                                            .resume(new RestException((PulsarAdminException) exception.getCause()));
-                                } else {
-                                    log.error("[{}] Failed to remove forcefully owned namespace {}",
-                                            clientAppId(), namespaceName, exception);
-                                    asyncResponse.resume(new RestException(exception.getCause()));
-                                }
-                                return exception;
-                            }
-
-                            return null;
-                        });
+                        FutureUtil.waitForAll(topicFutures)
+                                .thenCompose((ignore) -> internalDeleteTopicsAsync(allSystemTopics))
+                                .thenCompose((ignore) ->
+                                        internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
+                                .handle((result, exception) -> {
+                                    if (exception != null) {
+                                        if (exception.getCause() instanceof PulsarAdminException) {
+                                            asyncResponse.resume(
+                                                    new RestException((PulsarAdminException) exception.getCause()));
+                                        } else {
+                                            log.error("[{}] Failed to remove forcefully owned namespace {}",
+                                                    clientAppId(), namespaceName, exception);
+                                            asyncResponse.resume(new RestException(exception.getCause()));
+                                        }
+                                        return exception;
+                                    }
+                                    return null;
+                                });
                 if (topicFutureEx.join() != null) {
                     return;
                 }
@@ -564,6 +580,44 @@ public abstract class NamespacesBase extends AdminResource {
         });
     }
 
+    private CompletableFuture<Void> internalDeletePartitionedTopicsAsync(Set<String> topicNames) {
+        log.info("internalDeletePartitionedTopicsAsync");
+        if (CollectionUtils.isEmpty(topicNames)) {
+            return CompletableFuture.completedFuture(null);
+        }
+        PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (Exception ex) {
+            log.error("[{}] Get admin client error when preparing to delete topics.", clientAppId(), ex);
+            return FutureUtil.failedFuture(ex);
+        }
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        for (String topicName : topicNames) {
+            TopicName tn = TopicName.get(topicName);
+            futures.add(admin.topics().deletePartitionedTopicAsync(topicName, true, true));
+        }
+        return FutureUtil.waitForAll(futures);
+    }
+    private CompletableFuture<Void> internalDeleteTopicsAsync(Set<String> topicNames) {
+        log.info("internalDeleteTopicsAsync");
+        if (CollectionUtils.isEmpty(topicNames)) {
+            return CompletableFuture.completedFuture(null);
+        }
+        PulsarAdmin admin;
+        try {
+            admin = pulsar().getAdminClient();
+        } catch (Exception ex) {
+            log.error("[{}] Get admin client error when preparing to delete topics.", clientAppId(), ex);
+            return FutureUtil.failedFuture(ex);
+        }
+        List<CompletableFuture<Void>> futures = new ArrayList<>();
+        for (String topicName : topicNames) {
+            futures.add(admin.topics().deleteAsync(topicName, true, true));
+        }
+        return FutureUtil.waitForAll(futures);
+    }
+
     protected void internalDeleteNamespaceBundle(String bundleRange, boolean authoritative, boolean force) {
         if (force) {
             internalDeleteNamespaceBundleForcefully(bundleRange, authoritative);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index fb2bc3bbe1a..a6ce043c06e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -2964,11 +2964,20 @@ public class BrokerService implements Closeable {
     }
 
     public CompletableFuture<Void> deleteTopicPolicies(TopicName topicName) {
-        if (!pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
+        final PulsarService pulsarService = pulsar();
+        if (!pulsarService.getConfig().isTopicLevelPoliciesEnabled()) {
             return CompletableFuture.completedFuture(null);
         }
-        TopicName cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
-        return pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(cloneTopicName);
+        return pulsarService.getPulsarResources().getNamespaceResources()
+                .getPoliciesAsync(topicName.getNamespaceObject())
+                .thenCompose(optPolicies -> {
+                    if (optPolicies.isPresent() && optPolicies.get().deleted) {
+                        // We can return the completed future directly if the namespace is already deleted.
+                        return CompletableFuture.completedFuture(null);
+                    }
+                    TopicName cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
+                    return pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(cloneTopicName);
+                });
     }
 
     private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName topicName, int numPartitions) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index ba5a11444a6..cd2324e2fe5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1183,7 +1183,14 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                 brokerService.deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5);
                 deleteTopicAuthenticationFuture.thenCompose(__ -> deleteSchema ? deleteSchema() :
                                 CompletableFuture.completedFuture(null))
-                        .thenCompose(__ -> deleteTopicPolicies())
+                        .thenCompose(ignore -> {
+                            if (!this.getBrokerService().getPulsar().getBrokerService()
+                                    .isSystemTopic(TopicName.get(topic))) {
+                                return deleteTopicPolicies();
+                            } else {
+                                return CompletableFuture.completedFuture(null);
+                            }
+                        })
                         .thenCompose(__ -> transactionBufferCleanupAndClose())
                         .whenComplete((v, ex) -> {
                             if (ex != null) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 50a9a1d554d..3a5e04ec1fd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -69,6 +69,7 @@ import org.apache.pulsar.broker.namespace.NamespaceEphemeralData;
 import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.broker.namespace.OwnershipCache;
 import org.apache.pulsar.broker.service.AbstractTopic;
+import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService;
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
@@ -81,9 +82,11 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
+import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceBundle;
 import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.BundlesData;
@@ -107,6 +110,7 @@ import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.BeforeMethod;
@@ -1773,4 +1777,65 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         BundlesData bundles = admin.namespaces().getBundles(namespace);
         assertEquals(bundles.getNumBundles(), 14);
     }
+
+    @Test
+    public void testFinallyDeleteSystemTopicWhenDeleteNamespace() throws Exception {
+        String namespace = this.testTenant + "/delete-namespace";
+        String topic = TopicName.get(TopicDomain.persistent.toString(), this.testTenant, "delete-namespace",
+                "testFinallyDeleteSystemTopicWhenDeleteNamespace").toString();
+
+        // 0. enable topic level polices and system topic
+        pulsar.getConfig().setTopicLevelPoliciesEnabled(true);
+        pulsar.getConfig().setSystemTopicEnabled(true);
+        pulsar.getConfig().setForceDeleteNamespaceAllowed(true);
+        Field policesService = pulsar.getClass().getDeclaredField("topicPoliciesService");
+        policesService.setAccessible(true);
+        policesService.set(pulsar, new SystemTopicBasedTopicPoliciesService(pulsar));
+
+        // 1. create a test namespace.
+        admin.namespaces().createNamespace(namespace);
+        // 2. create a test topic.
+        admin.topics().createNonPartitionedTopic(topic);
+        // 3. change policy of the topic.
+        admin.topicPolicies().setMaxConsumers(topic, 5);
+        // 4. change the order of the topics in this namespace.
+        List<String> topics = pulsar.getNamespaceService().getFullListOfTopics(NamespaceName.get(namespace)).get();
+        Assert.assertTrue(topics.size() >= 2);
+        for (int i = 0; i < topics.size(); i++) {
+            if (topics.get(i).contains(EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME)) {
+                String systemTopic = topics.get(i);
+                topics.set(i, topics.get(0));
+                topics.set(0, systemTopic);
+            }
+        }
+        NamespaceService mockNamespaceService = spy(pulsar.getNamespaceService());
+        Field namespaceServiceField = pulsar.getClass().getDeclaredField("nsService");
+        namespaceServiceField.setAccessible(true);
+        namespaceServiceField.set(pulsar, mockNamespaceService);
+        doReturn(CompletableFuture.completedFuture(topics)).when(mockNamespaceService).getFullListOfTopics(any());
+        // 5. delete the namespace
+        admin.namespaces().deleteNamespace(namespace, true);
+    }
+
+    @Test
+    public void testNotClearTopicPolicesWhenDeleteSystemTopic() throws Exception {
+        String namespace = this.testTenant + "/delete-systemTopic";
+        String topic = TopicName.get(TopicDomain.persistent.toString(), this.testTenant, "delete-systemTopic",
+                "testNotClearTopicPolicesWhenDeleteSystemTopic").toString();
+
+        // 0. enable topic level polices and system topic
+        pulsar.getConfig().setTopicLevelPoliciesEnabled(true);
+        pulsar.getConfig().setSystemTopicEnabled(true);
+        Field policesService = pulsar.getClass().getDeclaredField("topicPoliciesService");
+        policesService.setAccessible(true);
+        policesService.set(pulsar, new SystemTopicBasedTopicPoliciesService(pulsar));
+        // 1. create a test namespace.
+        admin.namespaces().createNamespace(namespace);
+        // 2. create a test topic.
+        admin.topics().createNonPartitionedTopic(topic);
+        // 3. change policy of the topic.
+        admin.topicPolicies().setMaxConsumers(topic, 5);
+        // 4. delete the policies topic and the topic wil not to clear topic polices
+        admin.topics().delete(namespace + "/" + EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, true);
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
index 48015c50cc6..0d1bbda4568 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionProduceTest.java
@@ -89,35 +89,6 @@ public class TransactionProduceTest extends TransactionTestBase {
         produceTest(true);
     }
 
-    @Test
-    public void testDeleteNamespaceBeforeCommit() throws Exception {
-        final String topic = NAMESPACE1 + "/testDeleteTopicBeforeCommit";
-        PulsarClient pulsarClient = this.pulsarClient;
-        Transaction tnx = pulsarClient.newTransaction()
-                .withTransactionTimeout(60, TimeUnit.SECONDS)
-                .build().get();
-        long txnIdMostBits = ((TransactionImpl) tnx).getTxnIdMostBits();
-        long txnIdLeastBits = ((TransactionImpl) tnx).getTxnIdLeastBits();
-        Assert.assertTrue(txnIdMostBits > -1);
-        Assert.assertTrue(txnIdLeastBits > -1);
-
-        @Cleanup
-        Producer<byte[]> outProducer = pulsarClient
-                .newProducer()
-                .topic(topic)
-                .sendTimeout(0, TimeUnit.SECONDS)
-                .enableBatching(false)
-                .create();
-
-        String content = "Hello Txn";
-        outProducer.newMessage(tnx).value(content.getBytes(UTF_8)).send();
-
-        try {
-            admin.namespaces().deleteNamespace(NAMESPACE1, true);
-        } catch (Exception ignore) {}
-        tnx.commit().get();
-    }
-
     @Test
     public void produceAndAbortTest() throws Exception {
         produceTest(false);