You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by bo...@apache.org on 2022/12/10 07:10:08 UTC

[pulsar] branch branch-2.9 updated: [cherry-pick][branch-2.9] Fix delete system topic clean topic policy (#18823) (#18831)

This is an automated email from the ASF dual-hosted git repository.

bogong pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.9 by this push:
     new f7dc68fcd15 [cherry-pick][branch-2.9] Fix delete system topic clean topic policy (#18823) (#18831)
f7dc68fcd15 is described below

commit f7dc68fcd15ab15cee4983dd719b055391b4e27f
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Sat Dec 10 15:10:01 2022 +0800

    [cherry-pick][branch-2.9] Fix delete system topic clean topic policy (#18823) (#18831)
    
    cherry-pick #18823
    ### Motivation
    If a user sets a topic policy on a system topic and then deletes that system topic, the topic policy should be deleted as well.
    
    ### Modification
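    Only the change_events topic (the topic-policies system topic itself) skips clearing topic policies when it is deleted; every other topic, including other system topics, now has its topic policies cleared.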
    
    ### Matching PR in forked repository
    
    PR in forked repository: https://github.com/liangyepianzhou/pulsar/pull/16
---
 .../pulsar/broker/admin/impl/NamespacesBase.java   | 72 ++++++++++++++--------
 .../pulsar/broker/service/BrokerService.java       |  8 +++
 .../broker/service/persistent/PersistentTopic.java |  4 +-
 .../apache/pulsar/broker/admin/NamespacesTest.java | 37 ++++++++++-
 4 files changed, 94 insertions(+), 27 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 7e2fa1ada7b..358085cb3e1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -20,6 +20,7 @@
 package org.apache.pulsar.broker.admin.impl;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.pulsar.broker.service.BrokerService.isTopicPoliciesSystemTopic;
 import static org.apache.pulsar.common.policies.data.PoliciesUtil.defaultBundle;
 import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
 import com.google.common.collect.Lists;
@@ -279,21 +280,46 @@ public abstract class NamespacesBase extends AdminResource {
             return;
         }
 
+        CompletableFuture<Void> deleteSystemTopicFuture = null;
         if (!isEmpty) {
             if (log.isDebugEnabled()) {
                 log.debug("Found topics on namespace {}", namespaceName);
             }
+            List<String> allSystemTopics = new ArrayList<>();
+            List<String> allPartitionedSystemTopics = new ArrayList<>();
+            List<String> topicPolicy = new ArrayList<>();
+            List<String> partitionedTopicPolicy = new ArrayList<>();
             boolean hasNonSystemTopic = false;
             for (String topic : topics) {
                 if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
                     hasNonSystemTopic = true;
                     break;
                 }
+                TopicName topicName = TopicName.get(topic);
+                if (topicName.isPartitioned()) {
+                    if (isTopicPoliciesSystemTopic(topic)) {
+                        partitionedTopicPolicy.add(topic);
+                    } else {
+                        allPartitionedSystemTopics.add(topic);
+                    }
+                } else {
+                    if (isTopicPoliciesSystemTopic(topic)) {
+                        topicPolicy.add(topic);
+                    } else {
+                        allSystemTopics.add(topic);
+                    }
+                }
             }
             if (hasNonSystemTopic) {
                 asyncResponse.resume(new RestException(Status.CONFLICT, "Cannot delete non empty namespace"));
                 return;
             }
+            deleteSystemTopicFuture = internalDeleteTopicsAsync(allSystemTopics)
+                    .thenCompose(ignore -> internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
+                    .thenCompose(ignore -> internalDeleteTopicsAsync(topicPolicy))
+                    .thenCompose(ignore__ -> internalDeletePartitionedTopicsAsync(partitionedTopicPolicy));
+        } else {
+            deleteSystemTopicFuture = CompletableFuture.completedFuture(null);
         }
 
         // set the policies to deleted so that somebody else cannot acquire this namespace
@@ -308,21 +334,7 @@ public abstract class NamespacesBase extends AdminResource {
             return;
         }
 
-        // remove from owned namespace map and ephemeral node from ZK
-        final List<CompletableFuture<Void>> futures = Lists.newArrayList();
-        // remove system topics first.
-        if (!topics.isEmpty()) {
-            for (String topic : topics) {
-                try {
-                    futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true));
-                } catch (Exception ex) {
-                    log.error("[{}] Failed to delete system topic {}", clientAppId(), topic, ex);
-                    asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, ex));
-                    return;
-                }
-            }
-        }
-        FutureUtil.waitForAll(futures).thenCompose(__ -> {
+        deleteSystemTopicFuture.thenCompose(__ -> {
             List<CompletableFuture<Void>> deleteBundleFutures = Lists.newArrayList();
             NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
                             .getBundles(namespaceName);
@@ -473,17 +485,23 @@ public abstract class NamespacesBase extends AdminResource {
         try {
             // firstly remove all topics including system topics
             if (!topics.isEmpty()) {
-                Set<String> partitionedTopics = new HashSet<>();
-                Set<String> nonPartitionedTopics = new HashSet<>();
-                Set<String> allSystemTopics = new HashSet<>();
-                Set<String> allPartitionedSystemTopics = new HashSet<>();
+                List<String> partitionedTopics = new ArrayList<>();
+                List<String> nonPartitionedTopics = new ArrayList<>();
+                List<String> allSystemTopics = new ArrayList<>();
+                List<String> allPartitionedSystemTopics = new ArrayList<>();
+                List<String> topicPolicy = new ArrayList<>();
+                List<String> partitionedTopicPolicy = new ArrayList<>();
 
                 for (String topic : topics) {
                     try {
                         TopicName topicName = TopicName.get(topic);
                         if (topicName.isPartitioned()) {
                             if (pulsar().getBrokerService().isSystemTopic(topicName)) {
-                                allPartitionedSystemTopics.add(topic);
+                                if (isTopicPoliciesSystemTopic(topic)) {
+                                    partitionedTopicPolicy.add(topic);
+                                } else {
+                                    allPartitionedSystemTopics.add(topic);
+                                }
                                 continue;
                             }
                             String partitionedTopic = topicName.getPartitionedTopicName();
@@ -495,7 +513,11 @@ public abstract class NamespacesBase extends AdminResource {
                             }
                         } else {
                             if (pulsar().getBrokerService().isSystemTopic(topicName)) {
-                                allSystemTopics.add(topic);
+                                if (isTopicPoliciesSystemTopic(topic)) {
+                                    topicPolicy.add(topic);
+                                } else {
+                                    allSystemTopics.add(topic);
+                                }
                                 continue;
                             }
                             topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(
@@ -522,8 +544,10 @@ public abstract class NamespacesBase extends AdminResource {
 
                 final CompletableFuture<Throwable> topicFutureEx =
                         FutureUtil.waitForAll(topicFutures)
-                                .thenCompose(ignore -> internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
                                 .thenCompose(ignore -> internalDeleteTopicsAsync(allSystemTopics))
+                                .thenCompose(ignore -> internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
+                                .thenCompose(ignore -> internalDeleteTopicsAsync(topicPolicy))
+                                .thenCompose(ignore__ -> internalDeletePartitionedTopicsAsync(partitionedTopicPolicy))
                                 .handle((result, exception) -> {
                                     if (exception != null) {
                                         if (exception.getCause() instanceof PulsarAdminException) {
@@ -582,7 +606,7 @@ public abstract class NamespacesBase extends AdminResource {
         });
     }
 
-    private CompletableFuture<Void> internalDeletePartitionedTopicsAsync(Set<String> topicNames) {
+    private CompletableFuture<Void> internalDeletePartitionedTopicsAsync(List<String> topicNames) {
         if (CollectionUtils.isEmpty(topicNames)) {
             return CompletableFuture.completedFuture(null);
         }
@@ -601,7 +625,7 @@ public abstract class NamespacesBase extends AdminResource {
         }
         return FutureUtil.waitForAll(futures);
     }
-    private CompletableFuture<Void> internalDeleteTopicsAsync(Set<String> topicNames) {
+    private CompletableFuture<Void> internalDeleteTopicsAsync(List<String> topicNames) {
         if (CollectionUtils.isEmpty(topicNames)) {
             return CompletableFuture.completedFuture(null);
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index b251b0d843b..10ae4fa118f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -24,6 +24,7 @@ import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 import static org.apache.commons.collections.CollectionUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.broker.PulsarService.isTransactionSystemTopic;
+import static org.apache.pulsar.common.events.EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -2846,6 +2847,13 @@ public class BrokerService implements Closeable {
         return false;
     }
 
+    public static boolean isTopicPoliciesSystemTopic(String topic) {
+        if (topic == null) {
+            return false;
+        }
+        return TopicName.get(topic).getLocalName().equals(NAMESPACE_EVENTS_LOCAL_NAME);
+    }
+
     /**
      * Get {@link TopicPolicies} for the parameterized topic.
      * @param topicName
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index bcf81fb4625..cc90770f423 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.broker.service.persistent;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.pulsar.broker.service.BrokerService.isTopicPoliciesSystemTopic;
 import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
 import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
 import com.carrotsearch.hppc.ObjectObjectHashMap;
@@ -1182,8 +1183,7 @@ public class PersistentTopic extends AbstractTopic
                         deleteTopicAuthenticationFuture.thenCompose(ignore -> deleteSchema ? deleteSchema() :
                                         CompletableFuture.completedFuture(null))
                                 .thenCompose(ignore -> {
-                                    if (!this.getBrokerService().getPulsar().getBrokerService()
-                                            .isSystemTopic(TopicName.get(topic))) {
+                                    if (!isTopicPoliciesSystemTopic(topic)) {
                                         return deleteTopicPolicies();
                                     } else {
                                         return CompletableFuture.completedFuture(null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index b5b57769e3b..55441d9529f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.admin;
 
+import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
@@ -33,6 +34,8 @@ import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
+import static org.testng.AssertJUnit.assertNull;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
 import java.lang.reflect.Field;
@@ -57,6 +60,7 @@ import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.UriBuilder;
 import javax.ws.rs.core.UriInfo;
+import lombok.Cleanup;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -80,6 +84,7 @@ import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.events.EventsTopicNames;
 import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -99,6 +104,7 @@ import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.SubscribeRate;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
 import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
 import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
 import org.apache.zookeeper.KeeperException.Code;
@@ -1816,7 +1822,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
     }
 
     @Test
-    public void testNotClearTopicPolicesWhenDeleteSystemTopic() throws Exception {
+    public void testNotClearTopicPolicesWhenDeleteTopicPolicyTopic() throws Exception {
         String namespace = this.testTenant + "/delete-systemTopic";
         String topic = TopicName.get(TopicDomain.persistent.toString(), this.testTenant, "delete-systemTopic",
                 "testNotClearTopicPolicesWhenDeleteSystemTopic").toString();
@@ -1836,4 +1842,33 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest {
         // 4. delete the policies topic and the topic wil not to clear topic polices
         admin.topics().delete(namespace + "/" + EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, true);
     }
+    @Test
+    public void testDeleteTopicPolicyWhenDeleteSystemTopic() throws Exception {
+        conf.setTopicLevelPoliciesEnabled(true);
+        conf.setSystemTopicEnabled(true);
+        Field field = PulsarService.class.getDeclaredField("topicPoliciesService");
+        field.setAccessible(true);
+        field.set(pulsar, new SystemTopicBasedTopicPoliciesService(pulsar));
+
+        String systemTopic = SYSTEM_NAMESPACE.toString() + "/" + "testDeleteTopicPolicyWhenDeleteSystemTopic";
+        admin.tenants().createTenant(SYSTEM_NAMESPACE.getTenant(),
+                new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("use", "usc", "usw")));
+
+        admin.namespaces().createNamespace(SYSTEM_NAMESPACE.toString());
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(systemTopic).create();
+        admin.topicPolicies().setMaxConsumers(systemTopic, 5);
+
+        Integer maxConsumerPerTopic = pulsar
+                .getTopicPoliciesService()
+                .getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get()
+                .getMaxConsumerPerTopic();
+
+        assertEquals(maxConsumerPerTopic.intValue(), 5);
+        admin.topics().delete(systemTopic, true);
+        TopicPolicies topicPolicies = pulsar.getTopicPoliciesService()
+                .getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get(5, TimeUnit.SECONDS);
+        assertNull(topicPolicies);
+    }
 }