You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/11/24 00:40:25 UTC

[pulsar] branch branch-2.8 updated: Clean up the metadata of the non-persistent partitioned topics. (#12910)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new b6b0b0e  Clean up the metadata of the non-persistent partitioned topics. (#12910)
b6b0b0e is described below

commit b6b0b0ead75fbfa3d45b748ae2e8cbefa3d4efe9
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Nov 24 08:38:49 2021 +0800

    Clean up the metadata of the non-persistent partitioned topics. (#12910)
---
 .../service/nonpersistent/NonPersistentTopic.java  | 29 ++++++++++++++++++++++
 .../broker/service/NonPersistentTopicE2ETest.java  | 14 +++++++++++
 2 files changed, 43 insertions(+)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 7ae87f3..0180f5d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -40,6 +40,8 @@ import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.broker.resources.NamespaceResources;
 import org.apache.pulsar.broker.service.AbstractTopic;
 import org.apache.pulsar.broker.service.BrokerService;
 import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -870,6 +872,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
                     }
 
                     stopReplProducers().thenCompose(v -> delete(true, false, true))
+                            .thenAccept(__ -> tryToDeletePartitionedMetadata())
                             .thenRun(() -> log.info("[{}] Topic deleted successfully due to inactivity", topic))
                             .exceptionally(e -> {
                                 Throwable throwable = e.getCause();
@@ -891,6 +894,32 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
         }
     }
 
+    private CompletableFuture<Void> tryToDeletePartitionedMetadata() { // Deletes this topic's partitioned-topic metadata path if it exists.
+        if (TopicName.get(topic).isPartitioned() && !deletePartitionedTopicMetadataWhileInactive()) { // NOTE(review): when isPartitioned() is false the flag is never consulted - confirm intended.
+            return CompletableFuture.completedFuture(null); // Nothing to do: complete immediately.
+        }
+        TopicName topicName = TopicName.get(TopicName.get(topic).getPartitionedTopicName()); // Re-parse the name returned by getPartitionedTopicName().
+        try {
+            NamespaceResources.PartitionedTopicResources partitionedTopicResources = brokerService.pulsar()
+                    .getPulsarResources().getNamespaceResources().getPartitionedTopicResources();
+            String path = partitionedTopicPath(topicName); // Metadata path built by the helper in this class.
+            if (!partitionedTopicResources.exists(path)) { // NOTE(review): exists() yields a boolean directly, not a future - presumably a blocking lookup; verify.
+                return CompletableFuture.completedFuture(null); // Path absent: nothing to delete.
+            }
+            return partitionedTopicResources.deleteAsync(path); // The delete's future is this method's result.
+        } catch (Exception e) {
+            return FutureUtil.failedFuture(e); // Report any exception from the try body as a failed future instead of throwing.
+        }
+    }
+
+    private static String partitionedTopicPath(TopicName topicName) { // Builds the metadata path string for the given topic name.
+        return String.format("%s/%s/%s/%s", // Four '/'-separated segments, in the order of the arguments below.
+                ConfigurationCacheService.PARTITIONED_TOPICS_ROOT,
+                topicName.getNamespace(),
+                topicName.getDomain(),
+                topicName.getEncodedLocalName()); // Uses the encoded form of the local name.
+    }
+
     @Override
     public void checkInactiveSubscriptions() {
         TopicName name = TopicName.get(topic);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
index 1543d3e..9525944 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
@@ -52,6 +52,8 @@ public class NonPersistentTopicE2ETest extends BrokerTestBase {
     @BeforeMethod(alwaysRun = true)
     @Override
     protected void setup() throws Exception {
+        conf.setBrokerDeleteInactivePartitionedTopicMetadataEnabled(true); // Set before baseSetup(); presumably required by the partitioned-metadata GC assertions in this class - confirm.
+        conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1); // Value 1; seconds per the setter name.
         super.baseSetup();
     }
 
@@ -228,5 +230,17 @@ public class NonPersistentTopicE2ETest extends BrokerTestBase {
         producer2.close();
 
         assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+
+        // 6. Test for partitioned topic to delete the partitioned metadata
+        String topicGc = "non-persistent://prop/ns-abc/topic-gc";
+        int partitions = 5;
+        admin.topics().createPartitionedTopic(topicGc, partitions);
+        Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicGc).create();
+        producer3.close();
+        assertTrue(pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
+                TopicName.get(topicGc)).join().partitions == partitions);
+        runGC();
+        assertTrue(pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
+                TopicName.get(topicGc)).join().partitions == 0);
     }
 }