You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/02/28 17:01:48 UTC

[incubator-pulsar] branch master updated: PIP-13-3/3: auto subscribe based on regex pattern topics changing (#1298)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 518120d  PIP-13-3/3: auto subscribe based on regex pattern topics changing (#1298)
518120d is described below

commit 518120df6a202dfcfd40f8ea657632f6d5a94a61
Author: Jia Zhai <zh...@gmail.com>
AuthorDate: Wed Feb 28 09:01:46 2018 -0800

    PIP-13-3/3: auto subscribe based on regex pattern topics changing (#1298)
    
    * auto discovery of pattern subscribe topics changes; auto sub/unsub
    
    * change following @Matteo's comments
    
    * change to use timer
---
 .../client/impl/PatternTopicsConsumerImplTest.java | 355 ++++++++++++++++++++-
 .../apache/pulsar/client/api/ConsumerBuilder.java  |   9 +
 .../pulsar/client/impl/ConsumerBuilderImpl.java    |   7 +
 .../client/impl/PatternTopicsConsumerImpl.java     | 139 +++++++-
 .../pulsar/client/impl/PulsarClientImpl.java       |  30 +-
 .../pulsar/client/impl/TopicsConsumerImpl.java     |  11 +-
 .../impl/conf/ConsumerConfigurationData.java       |   2 +
 7 files changed, 535 insertions(+), 18 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 26f3277..c82c525 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.mockito.Mockito.doReturn;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
@@ -28,14 +29,14 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.stream.IntStream;
+import org.apache.pulsar.broker.namespace.NamespaceService;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.Producer;
-import org.apache.pulsar.client.api.ProducerConfiguration;
-import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.PropertyAdmin;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -136,8 +137,6 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
         admin.persistentTopics().createPartitionedTopic(topicName3, 3);
 
         // 2. create producer
-        ProducerConfiguration producerConfiguration = new ProducerConfiguration();
-        producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
         String messagePredicate = "my-message-" + key + "-";
         int totalMessages = 30;
 
@@ -152,6 +151,7 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
 
         Consumer consumer = pulsarClient.newConsumer()
             .topicsPattern(pattern)
+            .patternAutoDiscoveryPeriod(2)
             .subscriptionName(subscriptionName)
             .subscriptionType(SubscriptionType.Shared)
             .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
@@ -201,4 +201,351 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
         producer3.close();
     }
 
+    @Test(timeOut = testTimeout)
+    public void testTopicsPatternFilter() throws Exception {
+        String topicName1 = "persistent://prop/use/ns-abc/pattern-topic-1";
+        String topicName2 = "persistent://prop/use/ns-abc/pattern-topic-2";
+        String topicName3 = "persistent://prop/use/ns-abc/hello-3";
+
+        List<String> topicsNames = Lists.newArrayList(topicName1, topicName2, topicName3);
+
+        Pattern pattern1 = Pattern.compile("persistent://prop/use/ns-abc/pattern-topic.*");
+        List<String> result1 = PulsarClientImpl.topicsPatternFilter(topicsNames, pattern1);
+        assertTrue(result1.size() == 2 && result1.contains(topicName1) && result1.contains(topicName2));
+
+        Pattern pattern2 = Pattern.compile("persistent://prop/use/ns-abc/.*");
+        List<String> result2 = PulsarClientImpl.topicsPatternFilter(topicsNames, pattern2);
+        assertTrue(result2.size() == 3 &&
+            result2.contains(topicName1) &&
+            result2.contains(topicName2) &&
+            result2.contains(topicName3));
+    }
+
+    @Test(timeOut = testTimeout)
+    public void testTopicsListMinus() throws Exception {
+        String topicName1 = "persistent://prop/use/ns-abc/pattern-topic-1";
+        String topicName2 = "persistent://prop/use/ns-abc/pattern-topic-2";
+        String topicName3 = "persistent://prop/use/ns-abc/pattern-topic-3";
+        String topicName4 = "persistent://prop/use/ns-abc/pattern-topic-4";
+        String topicName5 = "persistent://prop/use/ns-abc/pattern-topic-5";
+        String topicName6 = "persistent://prop/use/ns-abc/pattern-topic-6";
+
+        List<String> oldNames = Lists.newArrayList(topicName1, topicName2, topicName3, topicName4);
+        List<String> newNames = Lists.newArrayList(topicName3, topicName4, topicName5, topicName6);
+
+        List<String> addedNames = PatternTopicsConsumerImpl.topicsListsMinus(newNames, oldNames);
+        List<String> removedNames = PatternTopicsConsumerImpl.topicsListsMinus(oldNames, newNames);
+
+        assertTrue(addedNames.size() == 2 &&
+            addedNames.contains(topicName5) &&
+            addedNames.contains(topicName6));
+        assertTrue(removedNames.size() == 2 &&
+            removedNames.contains(topicName1) &&
+            removedNames.contains(topicName2));
+
+        // totally 2 different list, should return content of first lists.
+        List<String> addedNames2 = PatternTopicsConsumerImpl.topicsListsMinus(addedNames, removedNames);
+        assertTrue(addedNames2.size() == 2 &&
+            addedNames2.contains(topicName5) &&
+            addedNames2.contains(topicName6));
+
+        // 2 same list, should return empty list.
+        List<String> addedNames3 = PatternTopicsConsumerImpl.topicsListsMinus(addedNames, addedNames);
+        assertEquals(addedNames3.size(), 0);
+
+        // empty list minus: addedNames2.size = 2, addedNames3.size = 0
+        List<String> addedNames4 = PatternTopicsConsumerImpl.topicsListsMinus(addedNames2, addedNames3);
+        assertTrue(addedNames4.size() == addedNames2.size());
+        addedNames4.forEach(name -> assertTrue(addedNames2.contains(name)));
+
+        List<String> addedNames5 = PatternTopicsConsumerImpl.topicsListsMinus(addedNames3, addedNames2);
+        assertEquals(addedNames5.size(), 0);
+    }
+
+    // simulate subscribe a pattern which has no topics, but then matched topics added in.
+    @Test(timeOut = testTimeout)
+    public void testStartEmptyPatternConsumer() throws Exception {
+        String key = "StartEmptyPatternConsumerTest";
+        String subscriptionName = "my-ex-subscription-" + key;
+        String topicName1 = "persistent://prop/use/ns-abc/pattern-topic-1-" + key;
+        String topicName2 = "persistent://prop/use/ns-abc/pattern-topic-2-" + key;
+        String topicName3 = "persistent://prop/use/ns-abc/pattern-topic-3-" + key;
+        Pattern pattern = Pattern.compile("persistent://prop/use/ns-abc/pattern-topic.*");
+
+        // 1. create partition
+        admin.properties().createProperty("prop", new PropertyAdmin());
+        admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+        admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+        // 2. Create consumer, this should success, but with empty sub-consumser internal
+        Consumer consumer = pulsarClient.newConsumer()
+            .topicsPattern(pattern)
+            .patternAutoDiscoveryPeriod(2)
+            .subscriptionName(subscriptionName)
+            .subscriptionType(SubscriptionType.Shared)
+            .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+            .receiverQueueSize(4)
+            .subscribe();
+
+        // 3. verify consumer get methods, to get 0 number of partitions and topics.
+        assertSame(pattern, ((PatternTopicsConsumerImpl) consumer).getPattern());
+        assertEquals(((PatternTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 0);
+        assertEquals(((PatternTopicsConsumerImpl) consumer).getConsumers().size(), 0);
+        assertEquals(((PatternTopicsConsumerImpl) consumer).getTopics().size(), 0);
+
+        // 4. create producer
+        String messagePredicate = "my-message-" + key + "-";
+        int totalMessages = 30;
+
+        Producer producer1 = pulsarClient.newProducer().topic(topicName1)
+            .create();
+        Producer producer2 = pulsarClient.newProducer().topic(topicName2)
+            .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+            .create();
+        Producer producer3 = pulsarClient.newProducer().topic(topicName3)
+            .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+            .create();
+
+        // 5. call recheckTopics to subscribe each added topics above
+        log.debug("recheck topics change");
+        PatternTopicsConsumerImpl consumer1 = ((PatternTopicsConsumerImpl) consumer);
+        consumer1.run(consumer1.getRecheckPatternTimeout());
+        Thread.sleep(100);
+
+        // 6. verify consumer get methods, to get number of partitions and topics, value 6=1+2+3.
+        assertSame(pattern, ((PatternTopicsConsumerImpl) consumer).getPattern());
+        assertEquals(((PatternTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 6);
+        assertEquals(((PatternTopicsConsumerImpl) consumer).getConsumers().size(), 6);
+        assertEquals(((PatternTopicsConsumerImpl) consumer).getTopics().size(), 3);
+
+
+        // 7. produce data
+        for (int i = 0; i < totalMessages / 3; i++) {
+            producer1.send((messagePredicate + "producer1-" + i).getBytes());
+            producer2.send((messagePredicate + "producer2-" + i).getBytes());
+            producer3.send((messagePredicate + "producer3-" + i).getBytes());
+        }
+
+        // 8. should receive all the message
+        int messageSet = 0;
+        Message message = consumer.receive();
+        do {
+            assertTrue(message instanceof TopicMessageImpl);
+            messageSet ++;
+            consumer.acknowledge(message);
+            log.debug("Consumer acknowledged : " + new String(message.getData()));
+            message = consumer.receive(500, TimeUnit.MILLISECONDS);
+        } while (message != null);
+        assertEquals(messageSet, totalMessages);
+
+        consumer.unsubscribe();
+        consumer.close();
+        producer1.close();
+        producer2.close();
+        producer3.close();
+    }
+
+    // simulate subscribe a pattern which has 3 topics, but then matched topic added in.
+    @Test(timeOut = testTimeout)
+    public void testAutoSubscribePatternConsumer() throws Exception {
+        String key = "AutoSubscribePatternConsumer";
+        String subscriptionName = "my-ex-subscription-" + key;
+        String topicName1 = "persistent://prop/use/ns-abc/pattern-topic-1-" + key;
+        String topicName2 = "persistent://prop/use/ns-abc/pattern-topic-2-" + key;
+        String topicName3 = "persistent://prop/use/ns-abc/pattern-topic-3-" + key;
+        Pattern pattern = Pattern.compile("persistent://prop/use/ns-abc/pattern-topic.*");
+
+        // 1. create partition
+        admin.properties().createProperty("prop", new PropertyAdmin());
+        admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+        admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+        // 2. create producer
+        String messagePredicate = "my-message-" + key + "-";
+        int totalMessages = 30;
+
+        Producer producer1 = pulsarClient.newProducer().topic(topicName1)
+            .create();
+        Producer producer2 = pulsarClient.newProducer().topic(topicName2)
+            .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+            .create();
+        Producer producer3 = pulsarClient.newProducer().topic(topicName3)
+            .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+            .create();
+
+        Consumer consumer = pulsarClient.newConsumer()
+            .topicsPattern(pattern)
+            .patternAutoDiscoveryPeriod(2)
+            .subscriptionName(subscriptionName)
+            .subscriptionType(SubscriptionType.Shared)
+            .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+            .receiverQueueSize(4)
+            .subscribe();
+
+        assertTrue(consumer instanceof PatternTopicsConsumerImpl);
+
+        // 4. verify consumer get methods, to get 6 number of partitions and topics: 6=1+2+3
+        assertSame(pattern, ((PatternTopicsConsumerImpl) consumer).getPattern());
+        assertEquals(((PatternTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 6);
+        assertEquals(((PatternTopicsConsumerImpl) consumer).getConsumers().size(), 6);
+        assertEquals(((PatternTopicsConsumerImpl) consumer).getTopics().size(), 3);
+
+        // 5. produce data to topic 1,2,3; verify should receive all the message
+        for (int i = 0; i < totalMessages / 3; i++) {
+            producer1.send((messagePredicate + "producer1-" + i).getBytes());
+            producer2.send((messagePredicate + "producer2-" + i).getBytes());
+            producer3.send((messagePredicate + "producer3-" + i).getBytes());
+        }
+
+        int messageSet = 0;
+        Message message = consumer.receive();
+        do {
+            assertTrue(message instanceof TopicMessageImpl);
+            messageSet ++;
+            consumer.acknowledge(message);
+            log.debug("Consumer acknowledged : " + new String(message.getData()));
+            message = consumer.receive(500, TimeUnit.MILLISECONDS);
+        } while (message != null);
+        assertEquals(messageSet, totalMessages);
+
+        // 6. create another producer with 4 partitions
+        String topicName4 = "persistent://prop/use/ns-abc/pattern-topic-4-" + key;
+        admin.persistentTopics().createPartitionedTopic(topicName4, 4);
+        Producer producer4 = pulsarClient.newProducer().topic(topicName4)
+            .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+            .create();
+
+        // 7. call recheckTopics to subscribe each added topics above, verify topics number: 10=1+2+3+4
+        log.debug("recheck topics change");
+        PatternTopicsConsumerImpl consumer1 = ((PatternTopicsConsumerImpl) consumer);
+        consumer1.run(consumer1.getRecheckPatternTimeout());
+        Thread.sleep(100);
+        assertEquals(((PatternTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 10);
+        assertEquals(((PatternTopicsConsumerImpl) consumer).getConsumers().size(), 10);
+        assertEquals(((PatternTopicsConsumerImpl) consumer).getTopics().size(), 4);
+
+        // 8. produce data to topic3 and topic4, verify should receive all the message
+        for (int i = 0; i < totalMessages / 2; i++) {
+            producer3.send((messagePredicate + "round2-producer4-" + i).getBytes());
+            producer4.send((messagePredicate + "round2-producer4-" + i).getBytes());
+        }
+
+        messageSet = 0;
+        message = consumer.receive();
+        do {
+            assertTrue(message instanceof TopicMessageImpl);
+            messageSet ++;
+            consumer.acknowledge(message);
+            log.debug("Consumer acknowledged : " + new String(message.getData()));
+            message = consumer.receive(500, TimeUnit.MILLISECONDS);
+        } while (message != null);
+        assertEquals(messageSet, totalMessages);
+
+        consumer.unsubscribe();
+        consumer.close();
+        producer1.close();
+        producer2.close();
+        producer3.close();
+        producer4.close();
+    }
+
+    @Test(timeOut = testTimeout)
+    public void testAutoUnbubscribePatternConsumer() throws Exception {
+        String key = "AutoUnsubscribePatternConsumer";
+        String subscriptionName = "my-ex-subscription-" + key;
+        String topicName1 = "persistent://prop/use/ns-abc/pattern-topic-1-" + key;
+        String topicName2 = "persistent://prop/use/ns-abc/pattern-topic-2-" + key;
+        String topicName3 = "persistent://prop/use/ns-abc/pattern-topic-3-" + key;
+        Pattern pattern = Pattern.compile("persistent://prop/use/ns-abc/pattern-topic.*");
+
+        // 1. create partition
+        admin.properties().createProperty("prop", new PropertyAdmin());
+        admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+        admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+        // 2. create producer
+        String messagePredicate = "my-message-" + key + "-";
+        int totalMessages = 30;
+
+        Producer producer1 = pulsarClient.newProducer().topic(topicName1)
+            .create();
+        Producer producer2 = pulsarClient.newProducer().topic(topicName2)
+            .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+            .create();
+        Producer producer3 = pulsarClient.newProducer().topic(topicName3)
+            .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+            .create();
+
+        Consumer consumer = pulsarClient.newConsumer()
+            .topicsPattern(pattern)
+            .patternAutoDiscoveryPeriod(2)
+            .subscriptionName(subscriptionName)
+            .subscriptionType(SubscriptionType.Shared)
+            .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+            .receiverQueueSize(4)
+            .subscribe();
+
+        assertTrue(consumer instanceof PatternTopicsConsumerImpl);
+
+        // 4. verify consumer get methods, to get 0 number of partitions and topics: 6=1+2+3
+        assertSame(pattern, ((PatternTopicsConsumerImpl) consumer).getPattern());
+        assertEquals(((PatternTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 6);
+        assertEquals(((PatternTopicsConsumerImpl) consumer).getConsumers().size(), 6);
+        assertEquals(((PatternTopicsConsumerImpl) consumer).getTopics().size(), 3);
+
+        // 5. produce data to topic 1,2,3; verify should receive all the message
+        for (int i = 0; i < totalMessages / 3; i++) {
+            producer1.send((messagePredicate + "producer1-" + i).getBytes());
+            producer2.send((messagePredicate + "producer2-" + i).getBytes());
+            producer3.send((messagePredicate + "producer3-" + i).getBytes());
+        }
+
+        int messageSet = 0;
+        Message message = consumer.receive();
+        do {
+            assertTrue(message instanceof TopicMessageImpl);
+            messageSet ++;
+            consumer.acknowledge(message);
+            log.debug("Consumer acknowledged : " + new String(message.getData()));
+            message = consumer.receive(500, TimeUnit.MILLISECONDS);
+        } while (message != null);
+        assertEquals(messageSet, totalMessages);
+
+        // 6. remove producer 1,3; verify only consumer 2 left
+        // seems no direct way to verify auto-unsubscribe, because this patternConsumer also referenced the topic.
+        List<String> topicNames = Lists.newArrayList(topicName2);
+        NamespaceService nss = pulsar.getNamespaceService();
+        doReturn(topicNames).when(nss).getListOfTopics(NamespaceName.get("prop", "use", "ns-abc"));
+
+        // 7. call recheckTopics to unsubscribe topic 1,3 , verify topics number: 2=6-1-3
+        log.debug("recheck topics change");
+        PatternTopicsConsumerImpl consumer1 = ((PatternTopicsConsumerImpl) consumer);
+        consumer1.run(consumer1.getRecheckPatternTimeout());
+        Thread.sleep(100);
+        assertEquals(((PatternTopicsConsumerImpl) consumer).getPartitionedTopics().size(), 2);
+        assertEquals(((PatternTopicsConsumerImpl) consumer).getConsumers().size(), 2);
+        assertEquals(((PatternTopicsConsumerImpl) consumer).getTopics().size(), 1);
+
+        // 8. produce data to topic2, verify should receive all the message
+        for (int i = 0; i < totalMessages; i++) {
+            producer2.send((messagePredicate + "round2-producer2-" + i).getBytes());
+        }
+
+        messageSet = 0;
+        message = consumer.receive();
+        do {
+            assertTrue(message instanceof TopicMessageImpl);
+            messageSet ++;
+            consumer.acknowledge(message);
+            log.debug("Consumer acknowledged : " + new String(message.getData()));
+            message = consumer.receive(500, TimeUnit.MILLISECONDS);
+        } while (message != null);
+        assertEquals(messageSet, totalMessages);
+
+        consumer.unsubscribe();
+        consumer.close();
+        producer1.close();
+        producer2.close();
+        producer3.close();
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index cf63320..c487425 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -232,6 +232,15 @@ public interface ConsumerBuilder extends Serializable, Cloneable {
     ConsumerBuilder readCompacted(boolean readCompacted);
 
     /**
+     * Set topics auto discovery period when using a pattern for topics consumer.
+     * The period is in minute, and default and minimum value is 1 minute.
+     *
+     * @param periodInMinutes
+     *            whether to read from the compacted topic
+     */
+    ConsumerBuilder patternAutoDiscoveryPeriod(int periodInMinutes);
+
+    /**
      * Sets priority level for the shared subscription consumers to which broker gives more priority while dispatching
      * messages. Here, broker follows descending priorities. (eg: 0=max-priority, 1, 2,..) </br>
      * In Shared subscription mode, broker will first dispatch messages to max priority-level consumers if they have
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index edaeb5b..4c9caa1 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -201,4 +201,11 @@ public class ConsumerBuilderImpl implements ConsumerBuilder {
         conf.setReadCompacted(readCompacted);
         return this;
     }
+
+    @Override
+    public ConsumerBuilder patternAutoDiscoveryPeriod(int periodInMinutes) {
+        conf.setPatternAutoDiscoveryPeriod(periodInMinutes);
+        return this;
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java
index b09ad48..04445ea 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java
@@ -18,16 +18,32 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static com.google.common.base.Preconditions.checkArgument;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class PatternTopicsConsumerImpl extends TopicsConsumerImpl {
+public class PatternTopicsConsumerImpl extends TopicsConsumerImpl implements TimerTask {
     private final Pattern topicsPattern;
+    private final TopicsChangedListener topicsChangeListener;
+    private volatile Timeout recheckPatternTimeout = null;
 
     public PatternTopicsConsumerImpl(Pattern topicsPattern,
                               PulsarClientImpl client,
@@ -36,11 +52,132 @@ public class PatternTopicsConsumerImpl extends TopicsConsumerImpl {
                               CompletableFuture<Consumer> subscribeFuture) {
         super(client, conf, listenerExecutor, subscribeFuture);
         this.topicsPattern = topicsPattern;
+
+        if (this.namespaceName == null) {
+            this.namespaceName = getNameSpaceFromPattern(topicsPattern);
+        }
+        checkArgument(getNameSpaceFromPattern(topicsPattern).toString().equals(this.namespaceName.toString()));
+
+        this.topicsChangeListener = new PatternTopicsChangedListener();
+        recheckPatternTimeout = client.timer().newTimeout(this, Math.min(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.MINUTES);
+    }
+
+    public static NamespaceName getNameSpaceFromPattern(Pattern pattern) {
+        return TopicName.get(pattern.pattern()).getNamespaceObject();
+    }
+
+    // TimerTask to recheck topics change, and trigger subscribe/unsubscribe based on the change.
+    @Override
+    public void run(Timeout timeout) throws Exception {
+        if (timeout.isCancelled()) {
+            return;
+        }
+
+        CompletableFuture<Void> recheckFuture = new CompletableFuture<>();
+        List<CompletableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(2);
+
+        client.getLookup().getTopicsUnderNamespace(namespaceName).thenAccept(topics -> {
+            if (log.isDebugEnabled()) {
+                log.debug("Get topics under namespace {}, topics.size: {}", namespaceName.toString(), topics.size());
+                topics.forEach(topicName ->
+                    log.debug("Get topics under namespace {}, topic: {}", namespaceName.toString(), topicName));
+            }
+
+            List<String> newTopics = PulsarClientImpl.topicsPatternFilter(topics, topicsPattern);
+            List<String> oldTopics = PatternTopicsConsumerImpl.this.getTopics();
+
+            futures.add(topicsChangeListener.onTopicsAdded(topicsListsMinus(newTopics, oldTopics)));
+            futures.add(topicsChangeListener.onTopicsRemoved(topicsListsMinus(oldTopics, newTopics)));
+            FutureUtil.waitForAll(futures)
+                .thenAccept(finalFuture -> recheckFuture.complete(null))
+                .exceptionally(ex -> {
+                    log.warn("[{}] Failed to recheck topics change: {}", topic, ex.getMessage());
+                    recheckFuture.completeExceptionally(ex);
+                    return null;
+                });
+        });
+
+        // schedule the next re-check task
+        client.timer().newTimeout(PatternTopicsConsumerImpl.this,
+            Math.min(1, conf.getPatternAutoDiscoveryPeriod()), TimeUnit.MINUTES);
     }
 
     public Pattern getPattern() {
         return this.topicsPattern;
     }
 
+    interface TopicsChangedListener {
+        // unsubscribe and delete ConsumerImpl in the `consumers` map in `TopicsConsumerImpl` based on added topics.
+        CompletableFuture<Void> onTopicsRemoved(Collection<String> removedTopics);
+        // subscribe and create a list of new ConsumerImpl, added them to the `consumers` map in `TopicsConsumerImpl`.
+        CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics);
+    }
+
+    private class PatternTopicsChangedListener implements TopicsChangedListener {
+        @Override
+        public CompletableFuture<Void> onTopicsRemoved(Collection<String> removedTopics) {
+            CompletableFuture<Void> removeFuture = new CompletableFuture<>();
+
+            if (removedTopics.isEmpty()) {
+                removeFuture.complete(null);
+                return removeFuture;
+            }
+
+            List<CompletableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(topics.size());
+            removedTopics.stream().forEach(topic -> futures.add(unsubscribeAsync(topic)));
+            FutureUtil.waitForAll(futures)
+                .thenAccept(finalFuture -> removeFuture.complete(null))
+                .exceptionally(ex -> {
+                    log.warn("[{}] Failed to subscribe topics: {}", topic, ex.getMessage());
+                    removeFuture.completeExceptionally(ex);
+                return null;
+            });
+            return removeFuture;
+        }
+
+        @Override
+        public CompletableFuture<Void> onTopicsAdded(Collection<String> addedTopics) {
+            CompletableFuture<Void> addFuture = new CompletableFuture<>();
+
+            if (addedTopics.isEmpty()) {
+                addFuture.complete(null);
+                return addFuture;
+            }
+
+            List<CompletableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(topics.size());
+            addedTopics.stream().forEach(topic -> futures.add(subscribeAsync(topic)));
+            FutureUtil.waitForAll(futures)
+                .thenAccept(finalFuture -> addFuture.complete(null))
+                .exceptionally(ex -> {
+                    log.warn("[{}] Failed to unsubscribe topics: {}", topic, ex.getMessage());
+                    addFuture.completeExceptionally(ex);
+                    return null;
+                });
+            return addFuture;
+        }
+    }
+
+    // get topics, which are contained in list1, and not in list2
+    public static List<String> topicsListsMinus(List<String> list1, List<String> list2) {
+        HashSet<String> s1 = new HashSet<>(list1);
+        s1.removeAll(list2);
+        return s1.stream().collect(Collectors.toList());
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        Timeout timeout = recheckPatternTimeout;
+        if (timeout != null) {
+            timeout.cancel();
+            recheckPatternTimeout = null;
+        }
+        return super.closeAsync();
+    }
+
+    @VisibleForTesting
+    Timeout getRecheckPatternTimeout() {
+        return recheckPatternTimeout;
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PatternTopicsConsumerImpl.class);
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index c07ec8b..fd3cada 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -31,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.Consumer;
@@ -402,13 +403,13 @@ public class PulsarClientImpl implements PulsarClient {
         CompletableFuture<Consumer> consumerSubscribedFuture = new CompletableFuture<>();
         lookup.getTopicsUnderNamespace(namespaceName)
             .thenAccept(topics -> {
-                List<String> topicsList = topics.stream()
-                    .filter(topic -> {
-                        TopicName destinationName = TopicName.get(topic);
-                        checkState(destinationName.getNamespaceObject().equals(namespaceName));
-                        return conf.getTopicsPattern().matcher(destinationName.toString()).matches();
-                    })
-                    .collect(Collectors.toList());
+                if (log.isDebugEnabled()) {
+                    log.debug("Get topics under namespace {}, topics.size: {}", namespaceName.toString(), topics.size());
+                    topics.forEach(topicName ->
+                        log.debug("Get topics under namespace {}, topic: {}", namespaceName.toString(), topicName));
+                }
+
+                List<String> topicsList = topicsPatternFilter(topics, conf.getTopicsPattern());
                 conf.getTopicNames().addAll(topicsList);
                 ConsumerBase consumer = new PatternTopicsConsumerImpl(conf.getTopicsPattern(),
                     PulsarClientImpl.this,
@@ -429,6 +430,17 @@ public class PulsarClientImpl implements PulsarClient {
         return consumerSubscribedFuture;
     }
 
+    // get topics that match 'topicsPattern' from original topics list
+    // return result should contain only topic names, without partition part
+    public static List<String> topicsPatternFilter(List<String> original, Pattern topicsPattern) {
+        return original.stream()
+            .filter(topic -> {
+                TopicName destinationName = TopicName.get(topic);
+                return topicsPattern.matcher(destinationName.toString()).matches();
+            })
+            .collect(Collectors.toList());
+    }
+
     @Override
     public Reader createReader(String topic, MessageId startMessageId, ReaderConfiguration conf)
             throws PulsarClientException {
@@ -619,6 +631,10 @@ public class PulsarClientImpl implements PulsarClient {
         return eventLoopGroup;
     }
 
+    public LookupService getLookup() {
+        return lookup;
+    }
+
     public CompletableFuture<Integer> getNumberOfPartitions(String topic) {
         return getPartitionedTopicMetadata(topic).thenApply(metadata -> metadata.partitions);
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
index 492c014..4a8e663 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
@@ -38,7 +38,6 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.MessageId;
@@ -64,7 +63,7 @@ public class TopicsConsumerImpl extends ConsumerBase {
     private final ConcurrentHashMap<String, ConsumerImpl> consumers;
 
     // Map <topic, partitionNumber>, store partition number for each topic
-    private final ConcurrentHashMap<String, Integer> topics;
+    protected final ConcurrentHashMap<String, Integer> topics;
 
     // Queue of partition consumers on which we have stopped calling receiveAsync() because the
     // shared incoming queue was full
@@ -146,8 +145,8 @@ public class TopicsConsumerImpl extends ConsumerBase {
     // - every topic has same namespace,
     // - topic names are unique.
     private static boolean topicNamesValid(Collection<String> topics) {
-        checkState(topics != null && topics.size() > 1,
-            "topics should should contain more than 1 topics");
+        checkState(topics != null && topics.size() >= 1,
+            "topics should should contain more than 1 topic");
 
         final String namespace = TopicName.get(topics.stream().findFirst().get()).getNamespace();
 
@@ -689,8 +688,8 @@ public class TopicsConsumerImpl extends ConsumerBase {
                             consumers.values().stream()
                                 .filter(consumer1 -> {
                                     String consumerTopicName = consumer1.getTopic();
-                                    if (TopicName.get(consumerTopicName)
-                                        .getPartitionedTopicName().equals(topicName)) {
+                                    if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals(
+                                        TopicName.get(topicName).getPartitionedTopicName().toString())) {
                                         return true;
                                     } else {
                                         return false;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 5acfe9d..72df4ae 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -75,6 +75,8 @@ public class ConsumerConfigurationData implements Serializable, Cloneable {
 
     private boolean readCompacted = false;
 
+    private int patternAutoDiscoveryPeriod = 1;
+
     @JsonIgnore
     public String getSingleTopic() {
         checkArgument(topicNames.size() == 1);

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.