You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/12 13:20:08 UTC

[pulsar] branch master updated: Add tests and documentation for subscribing to non-persistent with topic pattern (#7240)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ff532d  Add tests and documentation for subscribing to non-persistent with topic pattern (#7240)
3ff532d is described below

commit 3ff532d5fe5ae44fb13f72ed780f2ffd000ba6cd
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Jun 12 21:19:49 2020 +0800

    Add tests and documentation for subscribing to non-persistent with topic pattern (#7240)
    
    Fixes #7205
    
    Motivation
    Add tests and documentation for subscribing to non-persistent with topic pattern
---
 .../broker/service/NonPersistentTopicE2ETest.java  | 45 ++++++++++++++++++++++
 site2/docs/client-libraries-java.md                | 19 ++++++++-
 2 files changed, 62 insertions(+), 2 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
index 9b429d3..c5a94db 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
@@ -19,17 +19,25 @@
 package org.apache.pulsar.broker.service;
 
 import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 
 import lombok.Data;
 
 import org.apache.pulsar.broker.service.schema.SchemaRegistry;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.schema.SchemaDefinition;
 import org.apache.pulsar.client.impl.schema.JSONSchema;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.protocol.schema.SchemaData;
 import org.apache.pulsar.common.schema.SchemaType;
+import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -125,4 +133,41 @@ public class NonPersistentTopicE2ETest extends BrokerTestBase {
         assertFalse(topicHasSchema(topicName));
     }
 
+    @Test
+    public void testPatternTopic() throws PulsarClientException, InterruptedException { // one pattern consumer, two non-persistent topics
+        final String topic1 = "non-persistent://prop/ns-abc/testPatternTopic1-" + UUID.randomUUID().toString(); // random suffix gives a fresh topic name per run
+        final String topic2 = "non-persistent://prop/ns-abc/testPatternTopic2-" + UUID.randomUUID().toString();
+        Pattern pattern = Pattern.compile("prop/ns-abc/test.*"); // pattern carries no persistent:// or non-persistent:// scheme prefix
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+                .topicsPattern(pattern)
+                .subscriptionName("my-sub")
+                .patternAutoDiscoveryPeriod(1, TimeUnit.SECONDS)
+                .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics) // AllTopics: match non-persistent topics as well as persistent ones
+                .subscribe();
+
+        Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING) // producers are created after the consumer has subscribed
+                .topic(topic1)
+                .create();
+
+        Producer<String> producer2 = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic2)
+                .create();
+
+        Thread.sleep(2000); // NOTE(review): presumably lets the 1s pattern auto-discovery pick up both new topics before sending - confirm
+        final int messages = 10; // messages sent per producer
+        for (int i = 0; i < messages; i++) {
+            producer1.send("Message sent by producer-1 -> " + i);
+            producer2.send("Message sent by producer-2 -> " + i);
+        }
+
+        for (int i = 0; i < messages * 2; i++) { // expect every message from both producers on the single consumer
+            Message<String> received = consumer.receive(3, TimeUnit.SECONDS);
+            Assert.assertNotNull(received); // content and ordering are not checked, only that a message arrived
+        }
+
+        consumer.close();
+        producer1.close();
+        producer2.close();
+    }
+
 }
diff --git a/site2/docs/client-libraries-java.md b/site2/docs/client-libraries-java.md
index 68a9256..516042e 100644
--- a/site2/docs/client-libraries-java.md
+++ b/site2/docs/client-libraries-java.md
@@ -379,18 +379,33 @@ ConsumerBuilder consumerBuilder = pulsarClient.newConsumer()
         .subscriptionName(subscription);
 
 // Subscribe to all topics in a namespace
-Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
+Pattern allTopicsInNamespace = Pattern.compile("public/default/.*");
 Consumer allTopicsConsumer = consumerBuilder
         .topicsPattern(allTopicsInNamespace)
         .subscribe();
 
 // Subscribe to a subset of topics in a namespace, based on regex
-Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
+Pattern someTopicsInNamespace = Pattern.compile("public/default/foo.*");
 Consumer allTopicsConsumer = consumerBuilder
         .topicsPattern(someTopicsInNamespace)
         .subscribe();
 ```
 
+In the above example, the consumer subscribes only to the `persistent` topics that match the topic name pattern. If you want the consumer to subscribe to both `persistent` and `non-persistent` topics that match the topic name pattern, set `subscriptionTopicsMode` to `RegexSubscriptionMode.AllTopics`.
+
+```java
+Pattern pattern = Pattern.compile("public/default/.*");
+pulsarClient.newConsumer()
+        .subscriptionName("my-sub")
+        .topicsPattern(pattern)
+        .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
+        .subscribe();
+```
+
+> #### Note
+> 
+> By default, the `subscriptionTopicsMode` of the consumer is `PersistentOnly`. Available options of `subscriptionTopicsMode` are `PersistentOnly`, `NonPersistentOnly`, and `AllTopics`.
+
 You can also subscribe to an explicit list of topics (across namespaces if you wish):
 
 ```java