You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/01/07 02:19:20 UTC

[pulsar] 03/03: Getting the stats of a non-persistent topic that has been cleaned causes it to re-appear (#9029)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 56680581a1212f39b84a00717142b4062fa8de5d
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Tue Dec 29 09:43:20 2020 +0100

    Getting the stats of a non-persistent topic that has been cleaned causes it to re-appear (#9029)
    
    If a non-persistent topic is unused it is automatically deleted by Pulsar. If you then get the stats on that topic name using the REST API, it causes that topic to re-appear.
    
    For example, a non-persistent topic `public/bob/np` exists in a namespace. It is returned when using the `admin/v2/non-persistent/public/bob` endpoint:
    
    ```
    ["non-persistent://public/bob/np"]
    ```
    
    Since this topic is unused, it gets cleaned and no longer is returned by the endpoint:
    
    ```
    []
    ```
    
    However, if you request the stats for that topic using the CLI (which calls the REST API), like this, you actually get a response (not a 404):
    
    ```
    bin/pulsar-admin topics stats non-persistent://public/bob/np
    Warning: Nashorn engine is planned to be removed from a future JDK release
    {
      "msgRateIn" : 0.0,
      "msgThroughputIn" : 0.0,
      "msgRateOut" : 0.0,
      "msgThroughputOut" : 0.0,
      "bytesInCounter" : 0,
      "msgInCounter" : 0,
      "bytesOutCounter" : 0,
      "msgOutCounter" : 0,
      "averageMsgSize" : 0.0,
      "msgChunkPublished" : false,
      "storageSize" : 0,
      "backlogSize" : 0,
      "publishers" : [ ],
      "subscriptions" : { },
      "replication" : { }
    }
    ```
    
    And now the topic re-appears on the topic-list endpoint:
    
    ```
    ["non-persistent://public/bob/np"]
    ```
    
    ### Modifications
    When loading a temporary topic with createIfMissing = false do not try to create it, simply return an empty value.
    Add test case.
    
    This change added tests and can be verified as in the bug description.
    
    Run:
    pulsar-admin topics create non-persistent://public/default/tmp
    wait for the topic to be deleted
    run
    pulsar-admin topics stats non-persistent://public/default/tmp
    
    (cherry picked from commit b860c059eb4d13969469b23c9a3bffd6bf7e5a66)
---
 .../pulsar/broker/service/BrokerService.java       | 17 ++++--
 .../broker/service/NonPersistentTopicE2ETest.java  | 66 +++++++++++++++++++++-
 2 files changed, 77 insertions(+), 6 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 5348397..32bf07e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -759,10 +759,19 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
                 }
             }
             final boolean isPersistentTopic = TopicName.get(topic).getDomain().equals(TopicDomain.persistent);
-            return topics.computeIfAbsent(topic, (topicName) -> {
-                    return isPersistentTopic ? this.loadOrCreatePersistentTopic(topicName, createIfMissing)
-                        : createNonPersistentTopic(topicName);
-            });
+            if (isPersistentTopic) {
+                return topics.computeIfAbsent(topic, (topicName) -> {
+                    return this.loadOrCreatePersistentTopic(topicName, createIfMissing);
+                });
+            } else {
+                return topics.computeIfAbsent(topic, (topicName) -> {
+                    if (createIfMissing) {
+                        return createNonPersistentTopic(topicName);
+                    } else {
+                        return CompletableFuture.completedFuture(Optional.empty());
+                    }
+                });
+            }
         } catch (IllegalArgumentException e) {
             log.warn("[{}] Illegalargument exception when loading topic", topic, e);
             return failedFuture(e);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
index 1bbb20f..fab7a5e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
@@ -37,6 +37,8 @@ import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.schema.SchemaType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -48,6 +50,8 @@ import static org.testng.Assert.assertTrue;
 
 public class NonPersistentTopicE2ETest extends BrokerTestBase {
 
+    private static final Logger log = LoggerFactory.getLogger(NonPersistentTopicE2ETest.class);
+
     @BeforeMethod
     @Override
     protected void setup() throws Exception {
@@ -102,8 +106,8 @@ public class NonPersistentTopicE2ETest extends BrokerTestBase {
         assertFalse(topic.isPresent());
         assertFalse(topicHasSchema(topicName));
 
-        // 2. Topic is not GCed with live connection
-        topicName = "non-persistent://prop/ns-abc/topic-2";
+        // 1a. Topic that add/removes subscription can be GC'd
+        topicName = "non-persistent://prop/ns-abc/topic-1a";
         String subName = "sub1";
         Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
         topic = getTopic(topicName);
@@ -111,6 +115,23 @@ public class NonPersistentTopicE2ETest extends BrokerTestBase {
         topic.get().addSchema(schemaData).join();
         assertTrue(topicHasSchema(topicName));
 
+        admin.topics().deleteSubscription(topicName, subName);
+        consumer.close();
+
+        runGC();
+        topic = getTopic(topicName);
+        assertFalse(topic.isPresent());
+        assertFalse(topicHasSchema(topicName));
+
+        // 2. Topic is not GCed with live connection
+        topicName = "non-persistent://prop/ns-abc/topic-2";
+        subName = "sub1";
+        consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
+        topic = getTopic(topicName);
+        assertTrue(topic.isPresent());
+        topic.get().addSchema(schemaData).join();
+        assertTrue(topicHasSchema(topicName));
+
         runGC();
         topic = getTopic(topicName);
         assertTrue(topic.isPresent());
@@ -170,4 +191,45 @@ public class NonPersistentTopicE2ETest extends BrokerTestBase {
         producer2.close();
     }
 
+    @Test
+    public void testGC() throws Exception {
+        // 1. Simple successful GC
+        String topicName = "non-persistent://prop/ns-abc/topic-10";
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        producer.close();
+
+        assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+        runGC();
+        assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+
+        // 2. Topic is not GCed with live connection
+        String subName = "sub1";
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
+
+        runGC();
+        assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+
+        // 3. Topic with subscription is not GCed even with no connections
+        consumer.close();
+
+        runGC();
+        assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+
+        // 4. Topic can be GCed after unsubscribe
+        admin.topics().deleteSubscription(topicName, subName);
+
+        runGC();
+        assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+        // 5. Get the topic and make sure it doesn't come back
+        admin.lookups().lookupTopic(topicName);
+        Optional<Topic> topic = pulsar.getBrokerService().getTopicIfExists(topicName).join();
+        assertFalse(topic.isPresent());
+        assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+
+        // write again, the topic will be available
+        Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName).create();
+        producer2.close();
+
+        assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+    }
 }