You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/02/28 17:01:48 UTC
[incubator-pulsar] branch master updated: PIP-13-3/3: auto
subscribe based on regex pattern topics changing (#1298)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 518120d PIP-13-3/3: auto subscribe based on regex pattern topics changing (#1298)
518120d is described below
commit 518120df6a202dfcfd40f8ea657632f6d5a94a61
Author: Jia Zhai <zh...@gmail.com>
AuthorDate: Wed Feb 28 09:01:46 2018 -0800
PIP-13-3/3: auto subscribe based on regex pattern topics changing (#1298)
* auto discovery of pattern subscribe topics changes; auto sub/unsub
* change following @Matteo's comments
* change to use timer
---
.../client/impl/PatternTopicsConsumerImplTest.java | 355 ++++++++++++++++++++-
.../apache/pulsar/client/api/ConsumerBuilder.java | 9 +
.../pulsar/client/impl/ConsumerBuilderImpl.java | 7 +
.../client/impl/PatternTopicsConsumerImpl.java | 139 +++++++-
.../pulsar/client/impl/PulsarClientImpl.java | 30 +-
.../pulsar/client/impl/TopicsConsumerImpl.java | 11 +-
.../impl/conf/ConsumerConfigurationData.java | 2 +
7 files changed, 535 insertions(+), 18 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 26f3277..c82c525 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.impl;
+import static org.mockito.Mockito.doReturn;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
@@ -28,14 +29,14 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import java.util.stream.IntStream;
+import org.apache.pulsar.broker.namespace.NamespaceService;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
import org.apache.pulsar.client.api.ProducerConsumerBase;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.policies.data.PropertyAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -136,8 +137,6 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
admin.persistentTopics().createPartitionedTopic(topicName3, 3);
// 2. create producer
- ProducerConfiguration producerConfiguration = new ProducerConfiguration();
- producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
String messagePredicate = "my-message-" + key + "-";
int totalMessages = 30;
@@ -152,6 +151,7 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
Consumer consumer = pulsarClient.newConsumer()
.topicsPattern(pattern)
+ .patternAutoDiscoveryPeriod(2)
.subscriptionName(subscriptionName)
.subscriptionType(SubscriptionType.Shared)
.ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
@@ -201,4 +201,351 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
producer3.close();
}
+ @Test(timeOut = testTimeout)
+ public void testTopicsPatternFilter() throws Exception {
+ String topicName1 = "persistent://prop/use/ns-abc/pattern-topic-1";
+ String topicName2 = "persistent://prop/use/ns-abc/pattern-topic-2";
+ String topicName3 = "persistent://prop/use/ns-abc/hello-3";
+
+ List<String> topicsNames = Lists.newArrayList(topicName1, topicName2, topicName3);
+
+ Pattern pattern1 = Pattern.compile("persistent://prop/use/ns-abc/pattern-topic.*");
+ List<String> result1 = PulsarClientImpl.topicsPatternFilter(topicsNames, pattern1);
+ assertTrue(result1.size() == 2 && result1.contains(topicName1) && result1.contains(topicName2));
+
+ Pattern pattern2 = Pattern.compile("persistent://prop/use/ns-abc/.*");
+ List<String> result2 = PulsarClientImpl.topicsPatternFilter(topicsNames, pattern2);
+ assertTrue(result2.size() == 3 &&
+ result2.contains(topicName1) &&
+ result2.contains(topicName2) &&
+ result2.contains(topicName3));
+ }
+
+ @Test(timeOut = testTimeout)
+ public void testTopicsListMinus() throws Exception {
+ String topicName1 = "persistent://prop/use/ns-abc/pattern-topic-1";
+ String topicName2 = "persistent://prop/use/ns-abc/pattern-topic-2";
+ String topicName3 = "persistent://prop/use/ns-abc/pattern-topic-3";
+ String topicName4 = "persistent://prop/use/ns-abc/pattern-topic-4";
+ String topicName5 = "persistent://prop/use/ns-abc/pattern-topic-5";
+ String topicName6 = "persistent://prop/use/ns-abc/pattern-topic-6";
+
+ List<String> oldNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4);
+ List<String> newNames = Lists.newArrayList(topicName3, topicName4, topicName5, topicName6);
+
+ List<String> addedNames = PatternTopicsConsumerImpl.topicsListsMinus(newNames, oldNames);
+ List<String> removedNames = PatternTopicsConsumerImpl.topicsListsMinus(oldNames, newNames);
+
+ assertTrue(addedNames.size() == 2 &&
+ addedNames.contains(topicName5) &&
+ addedNames.contains(topicName6));
+ assertTrue(removedNames.size() == 2 &&
+ removedNames.contains(topicName1) &&
+ removedNames.contains(topicName2));
+
+ // totally 2 different list, should return content of first lists.
+ List<String> addedNames2 = PatternTopicsConsumerImpl.topicsListsMinus(addedNames, removedNames);
+ assertTrue(addedNames2.size() == 2 &&
+ addedNames2.contains(topicName5) &&
+ addedNames2.contains(topicName6));
+
+ // 2 same list, should return empty list.
+ List<String> addedNames3 = PatternTopicsConsumerImpl.topicsListsMinus(addedNames, addedNames);
+ assertEquals(addedNames3.size(), 0);
+
+ // empty list minus: addedNames2.size = 2, addedNames3.size = 0
+ List<String> addedNames4 = PatternTopicsConsumerImpl.topicsListsMinus(addedNames2, addedNames3);
+ assertTrue(addedNames4.size() == addedNames2.size());
+ addedNames4.forEach(name -> assertTrue(addedNames2.contains(name)));
+
+ List<String> addedNames5 = PatternTopicsConsumerImpl.topicsListsMinus(addedNames3, addedNames2);
+ assertEquals(addedNames5.size(), 0);
+ }
+
+ // simulate subscribe a pattern which has no topics, but then matched topics added in.
+ @Test(timeOut = testTimeout)
+ public void testStartEmptyPatternConsumer() throws Exception {
+ String key = "StartEmptyPatternConsumerTest";
+ String subscriptionName = "my-ex-subscription-" + key;
+ String topicName1 = "persistent://prop/use/ns-abc/pattern-topic-1-" + key;
+ String topicName2 = "persistent://prop/use/ns-abc/pattern-topic-2-" + key;
+ String topicName3 = "persistent://prop/use/ns-abc/pattern-topic-3-" + key;
+ Pattern pattern = Pattern.compile("persistent://prop/use/ns-abc/pattern-topic.*");
+
+ // 1. create partition
+ admin.properties().createProperty("prop", new PropertyAdmin());
+ admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+ admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+ // 2. Create consumer, this should success, but with empty sub-consumser internal
+ Consumer consumer = pulsarClient.newConsumer()
+ .topicsPattern(pattern)
+ .patternAutoDiscoveryPeriod(2)
+ .subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+ .receiverQueueSize(4)
+ .subscribe();
+
+ // 3. verify consumer get methods, to get 0 number of partitions and topics.
+ assertSame(pattern, ((PatternTopicsConsumerImpl) consumer).getPattern());
+ assertEquals(((PatternTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 0);
+ assertEquals(((PatternTopicsConsumerImpl) consumer).getConsumers().size(), 0);
+ assertEquals(((PatternTopicsConsumerImpl) consumer).getTopics().size(), 0);
+
+ // 4. create producer
+ String messagePredicate = "my-message-" + key + "-";
+ int totalMessages = 30;
+
+ Producer producer1 = pulsarClient.newProducer().topic(topicName1)
+ .create();
+ Producer producer2 = pulsarClient.newProducer().topic(topicName2)
+ .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+ .create();
+ Producer producer3 = pulsarClient.newProducer().topic(topicName3)
+ .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+ .create();
+
+ // 5. call recheckTopics to subscribe each added topics above
+ log.debug("recheck topics change");
+ PatternTopicsConsumerImpl consumer1 = ((PatternTopicsConsumerImpl) consumer);
+ consumer1.run(consumer1.getRecheckPatternTimeout());
+ Thread.sleep(100);
+
+ // 6. verify consumer get methods, to get number of partitions and topics, value 6=1+2+3.
+ assertSame(pattern, ((PatternTopicsConsumerImpl) consumer).getPattern());
+ assertEquals(((PatternTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 6);
+ assertEquals(((PatternTopicsConsumerImpl) consumer).getConsumers().size(), 6);
+ assertEquals(((PatternTopicsConsumerImpl) consumer).getTopics().size(), 3);
+
+
+ // 7. produce data
+ for (int i = 0; i < totalMessages / 3; i++) {
+ producer1.send((messagePredicate + "producer1-" + i).getBytes());
+ producer2.send((messagePredicate + "producer2-" + i).getBytes());
+ producer3.send((messagePredicate + "producer3-" + i).getBytes());
+ }
+
+ // 8. should receive all the message
+ int messageSet = 0;
+ Message message = consumer.receive();
+ do {
+ assertTrue(message instanceof TopicMessageImpl);
+ messageSet ++;
+ consumer.acknowledge(message);
+ log.debug("Consumer acknowledged : " + new String(message.getData()));
+ message = consumer.receive(500, TimeUnit.MILLISECONDS);
+ } while (message != null);
+ assertEquals(messageSet, totalMessages);
+
+ consumer.unsubscribe();
+ consumer.close();
+ producer1.close();
+ producer2.close();
+ producer3.close();
+ }
+
+ // simulate subscribe a pattern which has 3 topics, but then matched topic added in.
+ @Test(timeOut = testTimeout)
+ public void testAutoSubscribePatternConsumer() throws Exception {
+ String key = "AutoSubscribePatternConsumer";
+ String subscriptionName = "my-ex-subscription-" + key;
+ String topicName1 = "persistent://prop/use/ns-abc/pattern-topic-1-" + key;
+ String topicName2 = "persistent://prop/use/ns-abc/pattern-topic-2-" + key;
+ String topicName3 = "persistent://prop/use/ns-abc/pattern-topic-3-" + key;
+ Pattern pattern = Pattern.compile("persistent://prop/use/ns-abc/pattern-topic.*");
+
+ // 1. create partition
+ admin.properties().createProperty("prop", new PropertyAdmin());
+ admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+ admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+ // 2. create producer
+ String messagePredicate = "my-message-" + key + "-";
+ int totalMessages = 30;
+
+ Producer producer1 = pulsarClient.newProducer().topic(topicName1)
+ .create();
+ Producer producer2 = pulsarClient.newProducer().topic(topicName2)
+ .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+ .create();
+ Producer producer3 = pulsarClient.newProducer().topic(topicName3)
+ .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+ .create();
+
+ Consumer consumer = pulsarClient.newConsumer()
+ .topicsPattern(pattern)
+ .patternAutoDiscoveryPeriod(2)
+ .subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+ .receiverQueueSize(4)
+ .subscribe();
+
+ assertTrue(consumer instanceof PatternTopicsConsumerImpl);
+
+ // 4. verify consumer get methods, to get 6 number of partitions and topics: 6=1+2+3
+ assertSame(pattern, ((PatternTopicsConsumerImpl) consumer).getPattern());
+ assertEquals(((PatternTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 6);
+ assertEquals(((PatternTopicsConsumerImpl) consumer).getConsumers().size(), 6);
+ assertEquals(((PatternTopicsConsumerImpl) consumer).getTopics().size(), 3);
+
+ // 5. produce data to topic 1,2,3; verify should receive all the message
+ for (int i = 0; i < totalMessages / 3; i++) {
+ producer1.send((messagePredicate + "producer1-" + i).getBytes());
+ producer2.send((messagePredicate + "producer2-" + i).getBytes());
+ producer3.send((messagePredicate + "producer3-" + i).getBytes());
+ }
+
+ int messageSet = 0;
+ Message message = consumer.receive();
+ do {
+ assertTrue(message instanceof TopicMessageImpl);
+ messageSet ++;
+ consumer.acknowledge(message);
+ log.debug("Consumer acknowledged : " + new String(message.getData()));
+ message = consumer.receive(500, TimeUnit.MILLISECONDS);
+ } while (message != null);
+ assertEquals(messageSet, totalMessages);
+
+ // 6. create another producer with 4 partitions
+ String topicName4 = "persistent://prop/use/ns-abc/pattern-topic-4-" + key;
+ admin.persistentTopics().createPartitionedTopic(topicName4, 4);
+ Producer producer4 = pulsarClient.newProducer().topic(topicName4)
+ .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+ .create();
+
+ // 7. call recheckTopics to subscribe each added topics above, verify topics number: 10=1+2+3+4
+ log.debug("recheck topics change");
+ PatternTopicsConsumerImpl consumer1 = ((PatternTopicsConsumerImpl) consumer);
+ consumer1.run(consumer1.getRecheckPatternTimeout());
+ Thread.sleep(100);
+ assertEquals(((PatternTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 10);
+ assertEquals(((PatternTopicsConsumerImpl) consumer).getConsumers().size(), 10);
+ assertEquals(((PatternTopicsConsumerImpl) consumer).getTopics().size(), 4);
+
+ // 8. produce data to topic3 and topic4, verify should receive all the message
+ for (int i = 0; i < totalMessages / 2; i++) {
+ producer3.send((messagePredicate + "round2-producer4-" + i).getBytes());
+ producer4.send((messagePredicate + "round2-producer4-" + i).getBytes());
+ }
+
+ messageSet = 0;
+ message = consumer.receive();
+ do {
+ assertTrue(message instanceof TopicMessageImpl);
+ messageSet ++;
+ consumer.acknowledge(message);
+ log.debug("Consumer acknowledged : " + new String(message.getData()));
+ message = consumer.receive(500, TimeUnit.MILLISECONDS);
+ } while (message != null);
+ assertEquals(messageSet, totalMessages);
+
+ consumer.unsubscribe();
+ consumer.close();
+ producer1.close();
+ producer2.close();
+ producer3.close();
+ producer4.close();
+ }
+
+ @Test(timeOut = testTimeout)
+ public void testAutoUnbubscribePatternConsumer() throws Exception {
+ String key = "AutoUnsubscribePatternConsumer";
+ String subscriptionName = "my-ex-subscription-" + key;
+ String topicName1 = "persistent://prop/use/ns-abc/pattern-topic-1-" + key;
+ String topicName2 = "persistent://prop/use/ns-abc/pattern-topic-2-" + key;
+ String topicName3 = "persistent://prop/use/ns-abc/pattern-topic-3-" + key;
+ Pattern pattern = Pattern.compile("persistent://prop/use/ns-abc/pattern-topic.*");
+
+ // 1. create partition
+ admin.properties().createProperty("prop", new PropertyAdmin());
+ admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+ admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+ // 2. create producer
+ String messagePredicate = "my-message-" + key + "-";
+ int totalMessages = 30;
+
+ Producer producer1 = pulsarClient.newProducer().topic(topicName1)
+ .create();
+ Producer producer2 = pulsarClient.newProducer().topic(topicName2)
+ .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+ .create();
+ Producer producer3 = pulsarClient.newProducer().topic(topicName3)
+ .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+ .create();
+
+ Consumer consumer = pulsarClient.newConsumer()
+ .topicsPattern(pattern)
+ .patternAutoDiscoveryPeriod(2)
+ .subscriptionName(subscriptionName)
+ .subscriptionType(SubscriptionType.Shared)
+ .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+ .receiverQueueSize(4)
+ .subscribe();
+
+ assertTrue(consumer instanceof PatternTopicsConsumerImpl);
+
+ // 4. verify consumer get methods, to get 0 number of partitions and topics: 6=1+2+3
+ assertSame(pattern, ((PatternTopicsConsumerImpl) consumer).getPattern());
+ assertEquals(((PatternTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 6);
+ assertEquals(((PatternTopicsConsumerImpl) consumer).getConsumers().size(), 6);
+ assertEquals(((PatternTopicsConsumerImpl) consumer).getTopics().size(), 3);
+
+ // 5. produce data to topic 1,2,3; verify should receive all the message
+ for (int i = 0; i < totalMessages / 3; i++) {
+ producer1.send((messagePredicate + "producer1-" + i).getBytes());
+ producer2.send((messagePredicate + "producer2-" + i).getBytes());
+ producer3.send((messagePredicate + "producer3-" + i).getBytes());
+ }
+
+ int messageSet = 0;
+ Message message = consumer.receive();
+ do {
+ assertTrue(message instanceof TopicMessageImpl);
+ messageSet ++;
+ consumer.acknowledge(message);
+ log.debug("Consumer acknowledged : " + new String(message.getData()));
+ message = consumer.receive(500, TimeUnit.MILLISECONDS);
+ } while (message != null);
+ assertEquals(messageSet, totalMessages);
+
+ // 6. remove producer 1,3; verify only consumer 2 left
+ // seems no direct way to verify auto-unsubscribe, because this patternConsumer also referenced the topic.
+ List<String> topicNames = Lists.newArrayList(topicName2);
+ NamespaceService nss = pulsar.getNamespaceService();
+ doReturn(topicNames).when(nss).getListOfTopics(NamespaceName.get("prop", "use", "ns-abc"));
+
+ // 7. call recheckTopics to unsubscribe topic 1,3 , verify topics number: 2=6-1-3
+ log.debug("recheck topics change");
+ PatternTopicsConsumerImpl consumer1 = ((PatternTopicsConsumerImpl) consumer);
+ consumer1.run(consumer1.getRecheckPatternTimeout());
+ Thread.sleep(100);
+ assertEquals(((PatternTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 2);
+ assertEquals(((PatternTopicsConsumerImpl) consumer).getConsumers().size(), 2);
+ assertEquals(((PatternTopicsConsumerImpl) consumer).getTopics().size(), 1);
+
+ // 8. produce data to topic2, verify should receive all the message
+ for (int i = 0; i < totalMessages; i++) {
+ producer2.send((messagePredicate + "round2-producer2-" + i).getBytes());
+ }
+
+ messageSet = 0;
+ message = consumer.receive();
+ do {
+ assertTrue(message instanceof TopicMessageImpl);
+ messageSet ++;
+ consumer.acknowledge(message);
+ log.debug("Consumer acknowledged : " + new String(message.getData()));
+ message = consumer.receive(500, TimeUnit.MILLISECONDS);
+ } while (message != null);
+ assertEquals(messageSet, totalMessages);
+
+ consumer.unsubscribe();
+ consumer.close();
+ producer1.close();
+ producer2.close();
+ producer3.close();
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index cf63320..c487425 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -232,6 +232,15 @@ public interface ConsumerBuilder extends Serializable, Cloneable {
ConsumerBuilder readCompacted(boolean readCompacted);
/**
+ * Set topics auto discovery period when using a pattern for topics consumer.
+ * The period is in minute, and default and minimum value is 1 minute.
+ *
+ * @param periodInMinutes
+ * whether to read from the compacted topic
+ */
+ ConsumerBuilder patternAutoDiscoveryPeriod(int periodInMinutes);
+
+ /**
* Sets priority level for the shared subscription consumers to which broker gives more priority while dispatching
* messages. Here, broker follows descending priorities. (eg: 0=max-priority, 1, 2,..) </br>
* In Shared subscription mode, broker will first dispatch messages to max priority-level consumers if they have
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index edaeb5b..4c9caa1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -201,4 +201,11 @@ public class ConsumerBuilderImpl implements ConsumerBuilder {
conf.setReadCompacted(readCompacted);
return this;
}
+
+ @Override
+ public ConsumerBuilder patternAutoDiscoveryPeriod(int periodInMinutes) {
+ conf.setPatternAutoDiscoveryPeriod(periodInMinutes);
+ return this;
+ }
+
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java
index b09ad48..04445ea 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java
@@ -18,16 +18,32 @@
*/
package org.apache.pulsar.client.impl;
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
+import java.util.stream.Collectors;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class PatternTopicsConsumerImpl extends TopicsConsumerImpl {
+public class PatternTopicsConsumerImpl extends TopicsConsumerImpl implements TimerTask {
private final Pattern topicsPattern;
+ private final TopicsChangedListener topicsChangeListener;
+ private volatile Timeout recheckPatternTimeout = null;
public PatternTopicsConsumerImpl(Pattern topicsPattern,
PulsarClientImpl client,
@@ -36,11 +52,132 @@ public class PatternTopicsConsumerImpl extends TopicsConsumerImpl {
CompletableFuture<Consumer> subscribeFuture) {
super(client, conf, listenerExecutor, subscribeFuture);
this.topicsPattern = topicsPattern;
+
+ if (this.namespaceName == null) {
+ this.namespaceName = getNameSpaceFromPattern(topicsPattern);
+ }
+ checkArgument(getNameSpaceFromPattern(topicsPattern).toString().equals(this.namespaceName.toString()));
+
+ this.topicsChangeListener = new PatternTopicsChangedListener();
+ recheckPatternTimeout = client.timer().newTimeout(this, Math.min(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.MINUTES);
+ }
+
+ public static NamespaceName getNameSpaceFromPattern(Pattern pattern) {
+ return TopicName.get(pattern.pattern()).getNamespaceObject();
+ }
+
+ // TimerTask to recheck topics change, and trigger subscribe/unsubscribe based on the change.
+ @Override
+ public void run(Timeout timeout) throws Exception {
+ if (timeout.isCancelled()) {
+ return;
+ }
+
+ CompletableFuture<Void> recheckFuture = new CompletableFuture<>();
+ List<CompletableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(2);
+
+ client.getLookup().getTopicsUnderNamespace(namespaceName).thenAccept(topics -> {
+ if (log.isDebugEnabled()) {
+ log.debug("Get topics under namespace {}, topics.size: {}", namespaceName.toString(), topics.size());
+ topics.forEach(topicName ->
+ log.debug("Get topics under namespace {}, topic: {}", namespaceName.toString(), topicName));
+ }
+
+ List<String> newTopics = PulsarClientImpl.topicsPatternFilter(topics, topicsPattern);
+ List<String> oldTopics = PatternTopicsConsumerImpl.this.getTopics();
+
+ futures.add(topicsChangeListener.onTopicsAdded(topicsListsMinus(newTopics, oldTopics)));
+ futures.add(topicsChangeListener.onTopicsRemoved(topicsListsMinus(oldTopics, newTopics)));
+ FutureUtil.waitForAll(futures)
+ .thenAccept(finalFuture -> recheckFuture.complete(null))
+ .exceptionally(ex -> {
+ log.warn("[{}] Failed to recheck topics change: {}", topic, ex.getMessage());
+ recheckFuture.completeExceptionally(ex);
+ return null;
+ });
+ });
+
+ // schedule the next re-check task
+ client.timer().newTimeout(PatternTopicsConsumerImpl.this,
+ Math.min(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.MINUTES);
}
public Pattern getPattern() {
return this.topicsPattern;
}
+ interface TopicsChangedListener {
+ // unsubscribe and delete ConsumerImpl in the `consumers` map in `TopicsConsumerImpl` based on added topics.
+ CompletableFuture<Void> onTopicsRemoved(Collection<String> removedTopics);
+ // subscribe and create a list of new ConsumerImpl, added them to the `consumers` map in `TopicsConsumerImpl`.
+ CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics);
+ }
+
+ private class PatternTopicsChangedListener implements TopicsChangedListener {
+ @Override
+ public CompletableFuture<Void> onTopicsRemoved(Collection<String> removedTopics) {
+ CompletableFuture<Void> removeFuture = new CompletableFuture<>();
+
+ if (removedTopics.isEmpty()) {
+ removeFuture.complete(null);
+ return removeFuture;
+ }
+
+ List<CompletableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(topics.size());
+ removedTopics.stream().forEach(topic -> futures.add(unsubscribeAsync(topic)));
+ FutureUtil.waitForAll(futures)
+ .thenAccept(finalFuture -> removeFuture.complete(null))
+ .exceptionally(ex -> {
+ log.warn("[{}] Failed to subscribe topics: {}", topic, ex.getMessage());
+ removeFuture.completeExceptionally(ex);
+ return null;
+ });
+ return removeFuture;
+ }
+
+ @Override
+ public CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics) {
+ CompletableFuture<Void> addFuture = new CompletableFuture<>();
+
+ if (addedTopics.isEmpty()) {
+ addFuture.complete(null);
+ return addFuture;
+ }
+
+ List<CompletableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(topics.size());
+ addedTopics.stream().forEach(topic -> futures.add(subscribeAsync(topic)));
+ FutureUtil.waitForAll(futures)
+ .thenAccept(finalFuture -> addFuture.complete(null))
+ .exceptionally(ex -> {
+ log.warn("[{}] Failed to unsubscribe topics: {}", topic, ex.getMessage());
+ addFuture.completeExceptionally(ex);
+ return null;
+ });
+ return addFuture;
+ }
+ }
+
+ // get topics, which are contained in list1, and not in list2
+ public static List<String> topicsListsMinus(List<String> list1, List<String> list2) {
+ HashSet<String> s1 = new HashSet<>(list1);
+ s1.removeAll(list2);
+ return s1.stream().collect(Collectors.toList());
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ Timeout timeout = recheckPatternTimeout;
+ if (timeout != null) {
+ timeout.cancel();
+ recheckPatternTimeout = null;
+ }
+ return super.closeAsync();
+ }
+
+ @VisibleForTesting
+ Timeout getRecheckPatternTimeout() {
+ return recheckPatternTimeout;
+ }
+
private static final Logger log = LoggerFactory.getLogger(PatternTopicsConsumerImpl.class);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index c07ec8b..fd3cada 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.Consumer;
@@ -402,13 +403,13 @@ public class PulsarClientImpl implements PulsarClient {
CompletableFuture<Consumer> consumerSubscribedFuture = new CompletableFuture<>();
lookup.getTopicsUnderNamespace(namespaceName)
.thenAccept(topics -> {
- List<String> topicsList = topics.stream()
- .filter(topic -> {
- TopicName destinationName = TopicName.get(topic);
- checkState(destinationName.getNamespaceObject().equals(namespaceName));
- return conf.getTopicsPattern().matcher(destinationName.toString()).matches();
- })
- .collect(Collectors.toList());
+ if (log.isDebugEnabled()) {
+ log.debug("Get topics under namespace {}, topics.size: {}", namespaceName.toString(), topics.size());
+ topics.forEach(topicName ->
+ log.debug("Get topics under namespace {}, topic: {}", namespaceName.toString(), topicName));
+ }
+
+ List<String> topicsList = topicsPatternFilter(topics, conf.getTopicsPattern());
conf.getTopicNames().addAll(topicsList);
ConsumerBase consumer = new PatternTopicsConsumerImpl(conf.getTopicsPattern(),
PulsarClientImpl.this,
@@ -429,6 +430,17 @@ public class PulsarClientImpl implements PulsarClient {
return consumerSubscribedFuture;
}
+ // get topics that match 'topicsPattern' from original topics list
+ // return result should contain only topic names, without partition part
+ public static List<String> topicsPatternFilter(List<String> original, Pattern topicsPattern) {
+ return original.stream()
+ .filter(topic -> {
+ TopicName destinationName = TopicName.get(topic);
+ return topicsPattern.matcher(destinationName.toString()).matches();
+ })
+ .collect(Collectors.toList());
+ }
+
@Override
public Reader createReader(String topic, MessageId startMessageId, ReaderConfiguration conf)
throws PulsarClientException {
@@ -619,6 +631,10 @@ public class PulsarClientImpl implements PulsarClient {
return eventLoopGroup;
}
+ public LookupService getLookup() {
+ return lookup;
+ }
+
public CompletableFuture<Integer> getNumberOfPartitions(String topic) {
return getPartitionedTopicMetadata(topic).thenApply(metadata -> metadata.partitions);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
index 492c014..4a8e663 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
@@ -38,7 +38,6 @@ import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
@@ -64,7 +63,7 @@ public class TopicsConsumerImpl extends ConsumerBase {
private final ConcurrentHashMap<String, ConsumerImpl> consumers;
// Map <topic, partitionNumber>, store partition number for each topic
- private final ConcurrentHashMap<String, Integer> topics;
+ protected final ConcurrentHashMap<String, Integer> topics;
// Queue of partition consumers on which we have stopped calling receiveAsync() because the
// shared incoming queue was full
@@ -146,8 +145,8 @@ public class TopicsConsumerImpl extends ConsumerBase {
// - every topic has same namespace,
// - topic names are unique.
private static boolean topicNamesValid(Collection<String> topics) {
- checkState(topics != null && topics.size() > 1,
- "topics should should contain more than 1 topics");
+ checkState(topics != null && topics.size() >= 1,
+ "topics should should contain more than 1 topic");
final String namespace = TopicName.get(topics.stream().findFirst().get()).getNamespace();
@@ -689,8 +688,8 @@ public class TopicsConsumerImpl extends ConsumerBase {
consumers.values().stream()
.filter(consumer1 -> {
String consumerTopicName = consumer1.getTopic();
- if (TopicName.get(consumerTopicName)
- .getPartitionedTopicName().equals(topicName)) {
+ if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals(
+ TopicName.get(topicName).getPartitionedTopicName().toString())) {
return true;
} else {
return false;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 5acfe9d..72df4ae 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -75,6 +75,8 @@ public class ConsumerConfigurationData implements Serializable, Cloneable {
private boolean readCompacted = false;
+ private int patternAutoDiscoveryPeriod = 1;
+
@JsonIgnore
public String getSingleTopic() {
checkArgument(topicNames.size() == 1);
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.