You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/03/20 15:37:58 UTC

[pulsar] branch branch-2.11 updated: [fix][broker] Fix delete system topic clean topic policy (#18823) (#19835)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.11 by this push:
     new 5eb0ce2ac5c [fix][broker] Fix delete system topic clean topic policy (#18823) (#19835)
5eb0ce2ac5c is described below

commit 5eb0ce2ac5c40ee13affebe9fc738e6b91ccc96e
Author: Xiangying Meng <55...@users.noreply.github.com>
AuthorDate: Mon Mar 20 23:37:23 2023 +0800

    [fix][broker] Fix delete system topic clean topic policy (#18823) (#19835)
    
    Co-authored-by: Jiwe Guo <te...@apache.org>
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 47 +++++++++++++++++-----
 .../broker/service/persistent/PersistentTopic.java |  2 +-
 .../apache/pulsar/broker/admin/NamespacesTest.java | 36 ++++++++++++++++-
 3 files changed, 74 insertions(+), 11 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index c2ce36d49ff..f76e8a02827 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -73,6 +73,7 @@ import org.apache.pulsar.common.naming.NamespaceBundleFactory;
 import org.apache.pulsar.common.naming.NamespaceBundleSplitAlgorithm;
 import org.apache.pulsar.common.naming.NamespaceBundles;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.SystemTopicNames;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.AuthAction;
@@ -310,10 +311,21 @@ public abstract class NamespacesBase extends AdminResource {
         // remove from owned namespace map and ephemeral node from ZK
         final List<CompletableFuture<Void>> futures = Lists.newArrayList();
         // remove system topics first.
+        Set<String> noPartitionedTopicPolicySystemTopic = new HashSet<>();
+        Set<String> partitionedTopicPolicySystemTopic = new HashSet<>();
         if (!topics.isEmpty()) {
             for (String topic : topics) {
                 try {
-                    futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true));
+                    if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
+                        TopicName topicName = TopicName.get(topic);
+                        if (topicName.isPartitioned()) {
+                            partitionedTopicPolicySystemTopic.add(topicName.getPartitionedTopicName());
+                        } else {
+                            noPartitionedTopicPolicySystemTopic.add(topic);
+                        }
+                    } else {
+                        futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true));
+                    }
                 } catch (Exception ex) {
                     log.error("[{}] Failed to delete system topic {}", clientAppId(), topic, ex);
                     asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, ex));
@@ -321,11 +333,14 @@ public abstract class NamespacesBase extends AdminResource {
                 }
             }
         }
-        FutureUtil.waitForAll(futures).thenCompose(__ -> {
-            List<CompletableFuture<Void>> deleteBundleFutures = Lists.newArrayList();
-            NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
-                            .getBundles(namespaceName);
-            for (NamespaceBundle bundle : bundles.getBundles()) {
+        FutureUtil.waitForAll(futures)
+                .thenCompose(ignore -> internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic))
+                .thenCompose(ignore -> internalDeletePartitionedTopicsAsync(partitionedTopicPolicySystemTopic))
+                .thenCompose(__ -> {
+                    List<CompletableFuture<Void>> deleteBundleFutures = Lists.newArrayList();
+                    NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
+                                    .getBundles(namespaceName);
+                    for (NamespaceBundle bundle : bundles.getBundles()) {
                         // check if the bundle is owned by any broker, if not then we do not need to delete the bundle
                 deleteBundleFutures.add(pulsar().getNamespaceService().getOwnerAsync(bundle).thenCompose(ownership -> {
                     if (ownership.isPresent()) {
@@ -475,13 +490,19 @@ public abstract class NamespacesBase extends AdminResource {
                 Set<String> nonPartitionedTopics = new HashSet<>();
                 Set<String> allSystemTopics = new HashSet<>();
                 Set<String> allPartitionedSystemTopics = new HashSet<>();
+                Set<String> noPartitionedTopicPolicySystemTopic = new HashSet<>();
+                Set<String> partitionedTopicPolicySystemTopic = new HashSet<>();
 
                 for (String topic : topics) {
                     try {
                         TopicName topicName = TopicName.get(topic);
                         if (topicName.isPartitioned()) {
                             if (pulsar().getBrokerService().isSystemTopic(topicName)) {
-                                allPartitionedSystemTopics.add(topicName.getPartitionedTopicName());
+                                if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
+                                    partitionedTopicPolicySystemTopic.add(topicName.getPartitionedTopicName());
+                                } else {
+                                    allPartitionedSystemTopics.add(topicName.getPartitionedTopicName());
+                                }
                                 continue;
                             }
                             String partitionedTopic = topicName.getPartitionedTopicName();
@@ -490,12 +511,17 @@ public abstract class NamespacesBase extends AdminResource {
                             }
                         } else {
                             if (pulsar().getBrokerService().isSystemTopic(topicName)) {
-                                allSystemTopics.add(topic);
+                                if (SystemTopicNames.isTopicPoliciesSystemTopic(topic)) {
+                                    noPartitionedTopicPolicySystemTopic.add(topic);
+                                } else {
+                                    allSystemTopics.add(topic);
+                                }
                                 continue;
                             }
                             nonPartitionedTopics.add(topic);
                         }
-                        topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true));
+                        topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(
+                                topic, true, true));
                     } catch (Exception e) {
                         String errorMessage = String.format("Failed to force delete topic %s, "
                                         + "but the previous deletion command of partitioned-topics:%s "
@@ -524,6 +550,9 @@ public abstract class NamespacesBase extends AdminResource {
                                 .thenCompose((ignore) -> internalDeleteTopicsAsync(allSystemTopics))
                                 .thenCompose((ignore) ->
                                         internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
+                                .thenCompose(ignore ->
+                                        internalDeletePartitionedTopicsAsync(partitionedTopicPolicySystemTopic))
+                                .thenCompose(ignore -> internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic))
                                 .handle((result, exception) -> {
                                     if (exception != null) {
                                         if (exception.getCause() instanceof PulsarAdminException) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 8e95f2c4313..d8f56d72960 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1186,7 +1186,7 @@ public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCal
                     deleteTopicAuthenticationFuture.thenCompose(ignore -> deleteSchema())
                                 .thenCompose(ignore -> {
                                     if (!SystemTopicNames.isTopicPoliciesSystemTopic(topic)
-                                            && brokerService.getPulsar().getConfiguration().isSystemTopicEnabled()) {
+                                        && brokerService.getPulsar().getConfiguration().isSystemTopicEnabled()) {
                                         return deleteTopicPolicies();
                                     } else {
                                         return CompletableFuture.completedFuture(null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index 16dfe5bc9a3..eaa54c1fd34 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -57,11 +58,13 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
 import javax.ws.rs.core.UriInfo;
+import lombok.Cleanup;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.util.ZkUtils;
 import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.broker.admin.v1.Namespaces;
 import org.apache.pulsar.broker.admin.v1.PersistentTopics;
 import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
@@ -81,6 +84,7 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.api.proto.CommandSubscribe;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -103,6 +107,7 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.common.policies.data.impl.DispatchRateImpl;
 import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
 import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
@@ -1938,7 +1943,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
     }
 
     @Test
-    public void testNotClearTopicPolicesWhenDeleteSystemTopic() throws Exception {
+    public void testNotClearTopicPolicesWhenDeleteTopicPolicyTopic() throws Exception {
         String namespace = this.testTenant + "/delete-systemTopic";
         String topic = TopicName.get(TopicDomain.persistent.toString(), this.testTenant, "delete-systemTopic",
                 "testNotClearTopicPolicesWhenDeleteSystemTopic").toString();
@@ -1958,4 +1963,33 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         // 4. delete the policies topic and the topic wil not to clear topic polices
         admin.topics().delete(namespace + "/" + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, true);
     }
+    @Test
+    public void testDeleteTopicPolicyWhenDeleteSystemTopic() throws Exception {
+        conf.setTopicLevelPoliciesEnabled(true);
+        conf.setSystemTopicEnabled(true);
+        Field field = PulsarService.class.getDeclaredField("topicPoliciesService");
+        field.setAccessible(true);
+        field.set(pulsar, new SystemTopicBasedTopicPoliciesService(pulsar));
+
+        String systemTopic = SYSTEM_NAMESPACE.toString() + "/" + "testDeleteTopicPolicyWhenDeleteSystemTopic";
+        admin.tenants().createTenant(SYSTEM_NAMESPACE.getTenant(),
+                new TenantInfoImpl(Set.of("role1", "role2"), Set.of("use", "usc", "usw")));
+
+        admin.namespaces().createNamespace(SYSTEM_NAMESPACE.toString());
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(systemTopic).create();
+        admin.topicPolicies().setMaxConsumers(systemTopic, 5);
+
+        Integer maxConsumerPerTopic = pulsar
+                .getTopicPoliciesService()
+                .getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get()
+                .getMaxConsumerPerTopic();
+
+        assertEquals(maxConsumerPerTopic, Integer.valueOf(5));
+        admin.topics().delete(systemTopic, true);
+        TopicPolicies topicPolicies = pulsar.getTopicPoliciesService()
+                .getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get(5, TimeUnit.SECONDS);
+        assertNull(topicPolicies);
+    }
 }