You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/12/29 08:43:44 UTC
[pulsar] branch master updated: Getting the stats of a
non-persistent topic that has been cleaned causes it to re-appear (#9029)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new b860c05 Getting the stats of a non-persistent topic that has been cleaned causes it to re-appear (#9029)
b860c05 is described below
commit b860c059eb4d13969469b23c9a3bffd6bf7e5a66
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Tue Dec 29 09:43:20 2020 +0100
Getting the stats of a non-persistent topic that has been cleaned causes it to re-appear (#9029)
If a non-persistent topic is unused it is automatically deleted by Pulsar. If you then get the stats on that topic name using the REST API, it causes that topic to re-appear.
For example, a non-persistent topic `public/bob/np` exists in a namespace. It is returned when using the `admin/v2/non-persistent/public/bob` endpoint:
```
["non-persistent://public/bob/np"]
```
Since this topic is unused, it gets cleaned up and is no longer returned by the endpoint:
```
[]
```
However, if you request the stats for that topic using the CLI (which calls the REST API), you actually get a response instead of a 404:
```
bin/pulsar-admin topics stats non-persistent://public/bob/np
Warning: Nashorn engine is planned to be removed from a future JDK release
{
"msgRateIn" : 0.0,
"msgThroughputIn" : 0.0,
"msgRateOut" : 0.0,
"msgThroughputOut" : 0.0,
"bytesInCounter" : 0,
"msgInCounter" : 0,
"bytesOutCounter" : 0,
"msgOutCounter" : 0,
"averageMsgSize" : 0.0,
"msgChunkPublished" : false,
"storageSize" : 0,
"backlogSize" : 0,
"publishers" : [ ],
"subscriptions" : { },
"replication" : { }
}
```
And now the topic re-appears on the topic-list endpoint:
```
["non-persistent://public/bob/np"]
```
### Modifications
When loading a non-persistent topic with `createIfMissing = false`, do not try to create it; simply return an empty value.
Add test case.
This change adds tests and can also be verified manually, as in the bug description.
Run:
pulsar-admin topics create non-persistent://public/default/tmp
Wait for the unused topic to be deleted automatically, then run:
pulsar-admin topics stats non-persistent://public/default/tmp
---
.../pulsar/broker/service/BrokerService.java | 17 ++++--
.../broker/service/NonPersistentTopicE2ETest.java | 66 +++++++++++++++++++++-
2 files changed, 77 insertions(+), 6 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index daf90b5..a438e11 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -786,10 +786,19 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
}
}
final boolean isPersistentTopic = TopicName.get(topic).getDomain().equals(TopicDomain.persistent);
- return topics.computeIfAbsent(topic, (topicName) -> {
- return isPersistentTopic ? this.loadOrCreatePersistentTopic(topicName, createIfMissing)
- : createNonPersistentTopic(topicName);
- });
+ if (isPersistentTopic) {
+ return topics.computeIfAbsent(topic, (topicName) -> {
+ return this.loadOrCreatePersistentTopic(topicName, createIfMissing);
+ });
+ } else {
+ return topics.computeIfAbsent(topic, (topicName) -> {
+ if (createIfMissing) {
+ return createNonPersistentTopic(topicName);
+ } else {
+ return CompletableFuture.completedFuture(Optional.empty());
+ }
+ });
+ }
} catch (IllegalArgumentException e) {
log.warn("[{}] Illegalargument exception when loading topic", topic, e);
return failedFuture(e);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
index 1bbb20f..fab7a5e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
@@ -37,6 +37,8 @@ import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -48,6 +50,8 @@ import static org.testng.Assert.assertTrue;
public class NonPersistentTopicE2ETest extends BrokerTestBase {
+ private static final Logger log = LoggerFactory.getLogger(NonPersistentTopicE2ETest.class);
+
@BeforeMethod
@Override
protected void setup() throws Exception {
@@ -102,8 +106,8 @@ public class NonPersistentTopicE2ETest extends BrokerTestBase {
assertFalse(topic.isPresent());
assertFalse(topicHasSchema(topicName));
- // 2. Topic is not GCed with live connection
- topicName = "non-persistent://prop/ns-abc/topic-2";
+ // 1a. Topic that add/removes subscription can be GC'd
+ topicName = "non-persistent://prop/ns-abc/topic-1a";
String subName = "sub1";
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
topic = getTopic(topicName);
@@ -111,6 +115,23 @@ public class NonPersistentTopicE2ETest extends BrokerTestBase {
topic.get().addSchema(schemaData).join();
assertTrue(topicHasSchema(topicName));
+ admin.topics().deleteSubscription(topicName, subName);
+ consumer.close();
+
+ runGC();
+ topic = getTopic(topicName);
+ assertFalse(topic.isPresent());
+ assertFalse(topicHasSchema(topicName));
+
+ // 2. Topic is not GCed with live connection
+ topicName = "non-persistent://prop/ns-abc/topic-2";
+ subName = "sub1";
+ consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
+ topic = getTopic(topicName);
+ assertTrue(topic.isPresent());
+ topic.get().addSchema(schemaData).join();
+ assertTrue(topicHasSchema(topicName));
+
runGC();
topic = getTopic(topicName);
assertTrue(topic.isPresent());
@@ -170,4 +191,45 @@ public class NonPersistentTopicE2ETest extends BrokerTestBase {
producer2.close();
}
+ @Test
+ public void testGC() throws Exception {
+ // 1. Simple successful GC
+ String topicName = "non-persistent://prop/ns-abc/topic-10";
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+ producer.close();
+
+ assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+ runGC();
+ assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+
+ // 2. Topic is not GCed with live connection
+ String subName = "sub1";
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName).subscribe();
+
+ runGC();
+ assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+
+ // 3. Topic with subscription is not GCed even with no connections
+ consumer.close();
+
+ runGC();
+ assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+
+ // 4. Topic can be GCed after unsubscribe
+ admin.topics().deleteSubscription(topicName, subName);
+
+ runGC();
+ assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+ // 5. Get the topic and make sure it doesn't come back
+ admin.lookups().lookupTopic(topicName);
+ Optional<Topic> topic = pulsar.getBrokerService().getTopicIfExists(topicName).join();
+ assertFalse(topic.isPresent());
+ assertFalse(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+
+ // write again, the topic will be available
+ Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName).create();
+ producer2.close();
+
+ assertTrue(pulsar.getBrokerService().getTopicReference(topicName).isPresent());
+ }
}