You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/11/24 00:40:25 UTC
[pulsar] branch branch-2.8 updated: Clean up the metadata of the non-persistent partitioned topics. (#12910)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.8 by this push:
new b6b0b0e Clean up the metadata of the non-persistent partitioned topics. (#12910)
b6b0b0e is described below
commit b6b0b0ead75fbfa3d45b748ae2e8cbefa3d4efe9
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Wed Nov 24 08:38:49 2021 +0800
Clean up the metadata of the non-persistent partitioned topics. (#12910)
---
.../service/nonpersistent/NonPersistentTopic.java | 29 ++++++++++++++++++++++
.../broker/service/NonPersistentTopicE2ETest.java | 14 +++++++++++
2 files changed, 43 insertions(+)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 7ae87f3..0180f5d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -40,6 +40,8 @@ import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.broker.cache.ConfigurationCacheService;
+import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
@@ -870,6 +872,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
}
stopReplProducers().thenCompose(v -> delete(true, false, true))
+ .thenAccept(__ -> tryToDeletePartitionedMetadata())
.thenRun(() -> log.info("[{}] Topic deleted successfully due to inactivity", topic))
.exceptionally(e -> {
Throwable throwable = e.getCause();
@@ -891,6 +894,32 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
}
}
+ private CompletableFuture<Void> tryToDeletePartitionedMetadata() {
+ if (TopicName.get(topic).isPartitioned() && !deletePartitionedTopicMetadataWhileInactive()) {
+ return CompletableFuture.completedFuture(null);
+ }
+ TopicName topicName = TopicName.get(TopicName.get(topic).getPartitionedTopicName());
+ try {
+ NamespaceResources.PartitionedTopicResources partitionedTopicResources = brokerService.pulsar()
+ .getPulsarResources().getNamespaceResources().getPartitionedTopicResources();
+ String path = partitionedTopicPath(topicName);
+ if (!partitionedTopicResources.exists(path)) {
+ return CompletableFuture.completedFuture(null);
+ }
+ return partitionedTopicResources.deleteAsync(path);
+ } catch (Exception e) {
+ return FutureUtil.failedFuture(e);
+ }
+ }
+
+ private static String partitionedTopicPath(TopicName topicName) {
+ return String.format("%s/%s/%s/%s",
+ ConfigurationCacheService.PARTITIONED_TOPICS_ROOT,
+ topicName.getNamespace(),
+ topicName.getDomain(),
+ topicName.getEncodedLocalName());
+ }
+
@Override
public void checkInactiveSubscriptions() {
TopicName name = TopicName.get(topic);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
index 1543d3e..9525944 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
@@ -52,6 +52,8 @@ public class NonPersistentTopicE2ETest extends BrokerTestBase {
@BeforeMethod(alwaysRun = true)
@Override
protected void setup() throws Exception {
+ conf.setBrokerDeleteInactivePartitionedTopicMetadataEnabled(true);
+ conf.setBrokerDeleteInactiveTopicsFrequencySeconds(1);
super.baseSetup();
}
@@ -228,5 +230,17 @@ public class NonPersistentTopicE2ETest extends BrokerTestBase {
producer2.close();
assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+
+ // 6. Test for partitioned topic to delete the partitioned metadata
+ String topicGc = "non-persistent://prop/ns-abc/topic-gc";
+ int partitions = 5;
+ admin.topics().createPartitionedTopic(topicGc, partitions);
+ Producer<byte[]> producer3 = pulsarClient.newProducer().topic(topicGc).create();
+ producer3.close();
+ assertTrue(pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
+ TopicName.get(topicGc)).join().partitions == partitions);
+ runGC();
+ assertTrue(pulsar.getBrokerService().fetchPartitionedTopicMetadataAsync(
+ TopicName.get(topicGc)).join().partitions == 0);
}
}