You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/06/12 13:20:08 UTC
[pulsar] branch master updated: Add tests and documentation for
subscribing to non-persistent with topic pattern (#7240)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3ff532d Add tests and documentation for subscribing to non-persistent with topic pattern (#7240)
3ff532d is described below
commit 3ff532d5fe5ae44fb13f72ed780f2ffd000ba6cd
Author: lipenghui <pe...@apache.org>
AuthorDate: Fri Jun 12 21:19:49 2020 +0800
Add tests and documentation for subscribing to non-persistent with topic pattern (#7240)
Fixes #7205
Motivation
Add tests and documentation for subscribing to non-persistent with topic pattern
---
.../broker/service/NonPersistentTopicE2ETest.java | 45 ++++++++++++++++++++++
site2/docs/client-libraries-java.md | 19 ++++++++-
2 files changed, 62 insertions(+), 2 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
index 9b429d3..c5a94db 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/NonPersistentTopicE2ETest.java
@@ -19,17 +19,25 @@
package org.apache.pulsar.broker.service;
import java.util.Optional;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
import lombok.Data;
import org.apache.pulsar.broker.service.schema.SchemaRegistry;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaType;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
@@ -125,4 +133,41 @@ public class NonPersistentTopicE2ETest extends BrokerTestBase {
assertFalse(topicHasSchema(topicName));
}
+ @Test
+ public void testPatternTopic() throws PulsarClientException, InterruptedException {
+ final String topic1 = "non-persistent://prop/ns-abc/testPatternTopic1-" + UUID.randomUUID().toString();
+ final String topic2 = "non-persistent://prop/ns-abc/testPatternTopic2-" + UUID.randomUUID().toString();
+ Pattern pattern = Pattern.compile("prop/ns-abc/test.*");
+ Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+ .topicsPattern(pattern)
+ .subscriptionName("my-sub")
+ .patternAutoDiscoveryPeriod(1, TimeUnit.SECONDS)
+ .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
+ .subscribe();
+
+ Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic1)
+ .create();
+
+ Producer<String> producer2 = pulsarClient.newProducer(Schema.STRING)
+ .topic(topic2)
+ .create();
+
+ Thread.sleep(2000);
+ final int messages = 10;
+ for (int i = 0; i < messages; i++) {
+ producer1.send("Message sent by producer-1 -> " + i);
+ producer2.send("Message sent by producer-2 -> " + i);
+ }
+
+ for (int i = 0; i < messages * 2; i++) {
+ Message<String> received = consumer.receive(3, TimeUnit.SECONDS);
+ Assert.assertNotNull(received);
+ }
+
+ consumer.close();
+ producer1.close();
+ producer2.close();
+ }
+
}
diff --git a/site2/docs/client-libraries-java.md b/site2/docs/client-libraries-java.md
index 68a9256..516042e 100644
--- a/site2/docs/client-libraries-java.md
+++ b/site2/docs/client-libraries-java.md
@@ -379,18 +379,33 @@ ConsumerBuilder consumerBuilder = pulsarClient.newConsumer()
.subscriptionName(subscription);
// Subscribe to all topics in a namespace
-Pattern allTopicsInNamespace = Pattern.compile("persistent://public/default/.*");
+Pattern allTopicsInNamespace = Pattern.compile("public/default/.*");
Consumer allTopicsConsumer = consumerBuilder
.topicsPattern(allTopicsInNamespace)
.subscribe();
// Subscribe to a subsets of topics in a namespace, based on regex
-Pattern someTopicsInNamespace = Pattern.compile("persistent://public/default/foo.*");
+Pattern someTopicsInNamespace = Pattern.compile("public/default/foo.*");
Consumer allTopicsConsumer = consumerBuilder
.topicsPattern(someTopicsInNamespace)
.subscribe();
```
+In the above example, the consumer subscribes to the `persistent` topics that can match the topic name pattern. If you want the consumer subscribes to all `persistent` and `non-persistent` topics that can match the topic name pattern, set `subscriptionTopicsMode` to `RegexSubscriptionMode.AllTopics`.
+
+```java
+Pattern pattern = Pattern.compile("public/default/.*");
+pulsarClient.newConsumer()
+ .subscriptionName("my-sub")
+ .topicsPattern(pattern)
+ .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
+ .subscribe();
+```
+
+> #### Note
+>
+> By default, the `subscriptionTopicsMode` of the consumer is `PersistentOnly`. Available options of `subscriptionTopicsMode` are `PersistentOnly`, `NonPersistentOnly`, and `AllTopics`.
+
You can also subscribe to an explicit list of topics (across namespaces if you wish):
```java