You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/02/21 22:15:46 UTC

[GitHub] merlimat closed pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace

merlimat closed pull request #1103: PIP-13-1/3: Provide `TopicsConsumer` to consume from several topics under same namespace
URL: https://github.com/apache/incubator-pulsar/pull/1103
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
index 7a65cbc62..f311f4ce8 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
@@ -539,11 +539,11 @@ public void testUnackedCountWithRedeliveries() throws Exception {
             producer.send(("hello-" + i).getBytes());
         }
 
-        Set<MessageIdImpl> c1_receivedMessages = new HashSet<>();
+        Set<MessageId> c1_receivedMessages = new HashSet<>();
 
         // C-1 gets all messages but doesn't ack
         for (int i = 0; i < numMsgs; i++) {
-            c1_receivedMessages.add((MessageIdImpl) consumer1.receive().getMessageId());
+            c1_receivedMessages.add(consumer1.receive().getMessageId());
         }
 
         // C-2 will not get any message initially, since everything went to C-1 already
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
new file mode 100644
index 000000000..a2cb6d579
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -0,0 +1,522 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import com.google.common.collect.Lists;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TopicsConsumerImplTest extends ProducerConsumerBase {
+    private static final long testTimeout = 90000; // 1.5 min
+    private static final Logger log = LoggerFactory.getLogger(TopicsConsumerImplTest.class);
+    private final long ackTimeOutMillis = TimeUnit.SECONDS.toMillis(2);
+
+    @Override
+    @BeforeMethod
+    public void setup() throws Exception {
+        super.internalSetup();
+    }
+
+    @Override
+    @AfterMethod
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    // Verify subscribe topics from different namespace should return error.
+    @Test(timeOut = testTimeout)
+    public void testDifferentTopicsNameSubscribe() throws Exception {
+        String key = "TopicsFromDifferentNamespace";
+        final String subscriptionName = "my-ex-subscription-" + key;
+
+        final String topicName1 = "persistent://prop/use/ns-abc1/topic-1-" + key;
+        final String topicName2 = "persistent://prop/use/ns-abc2/topic-2-" + key;
+        final String topicName3 = "persistent://prop/use/ns-abc3/topic-3-" + key;
+        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);
+
+        admin.properties().createProperty("prop", new PropertyAdmin());
+        admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+        admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+        // 2. Create consumer
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setReceiverQueueSize(4);
+        conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
+        conf.setSubscriptionType(SubscriptionType.Shared);
+        try {
+            Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get();
+            fail("subscribe for topics from different namespace should fail.");
+        } catch (IllegalArgumentException e) {
+            // expected for have different namespace
+        }
+    }
+
+
+    @Test(timeOut = testTimeout)
+    public void testGetConsumersAndGetTopics() throws Exception {
+        String key = "TopicsConsumerGet";
+        final String subscriptionName = "my-ex-subscription-" + key;
+
+        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
+        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
+        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;
+        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);
+
+        admin.properties().createProperty("prop", new PropertyAdmin());
+        admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+        admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+        // 2. Create consumer
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setReceiverQueueSize(4);
+        conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
+        conf.setSubscriptionType(SubscriptionType.Shared);
+        Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get();
+        assertTrue(consumer instanceof TopicsConsumerImpl);
+
+        List<String> topics = ((TopicsConsumerImpl) consumer).getPartitionedTopics();
+        List<ConsumerImpl> consumers = ((TopicsConsumerImpl) consumer).getConsumers();
+
+        topics.forEach(topic -> log.info("topic: {}", topic));
+        consumers.forEach(c -> log.info("consumer: {}", c.getTopic()));
+
+        IntStream.range(0, 6).forEach(index ->
+            assertTrue(topics.get(index).equals(consumers.get(index).getTopic())));
+
+        assertTrue(((TopicsConsumerImpl) consumer).getTopics().size() == 3);
+
+        consumer.unsubscribe();
+        consumer.close();
+    }
+
+    @Test(timeOut = testTimeout)
+    public void testSyncProducerAndConsumer() throws Exception {
+        String key = "TopicsConsumerSyncTest";
+        final String subscriptionName = "my-ex-subscription-" + key;
+        final String messagePredicate = "my-message-" + key + "-";
+        final int totalMessages = 30;
+
+        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
+        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
+        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;
+        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);
+
+        admin.properties().createProperty("prop", new PropertyAdmin());
+        admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+        admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+        ProducerConfiguration producerConfiguration = new ProducerConfiguration();
+        producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+
+        // 1. producer connect
+        Producer producer1 = pulsarClient.createProducer(topicName1);
+        Producer producer2 = pulsarClient.createProducer(topicName2, producerConfiguration);
+        Producer producer3 = pulsarClient.createProducer(topicName3, producerConfiguration);
+
+        // 2. Create consumer
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setReceiverQueueSize(4);
+        conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
+        conf.setSubscriptionType(SubscriptionType.Shared);
+        Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get();
+        assertTrue(consumer instanceof TopicsConsumerImpl);
+
+        // 3. producer publish messages
+        for (int i = 0; i < totalMessages / 3; i++) {
+            producer1.send((messagePredicate + "producer1-" + i).getBytes());
+            producer2.send((messagePredicate + "producer2-" + i).getBytes());
+            producer3.send((messagePredicate + "producer3-" + i).getBytes());
+        }
+
+        int messageSet = 0;
+        Message message = consumer.receive();
+        do {
+            assertTrue(message instanceof TopicMessageImpl);
+            messageSet ++;
+            consumer.acknowledge(message);
+            log.debug("Consumer acknowledged : " + new String(message.getData()));
+            message = consumer.receive(500, TimeUnit.MILLISECONDS);
+        } while (message != null);
+        assertEquals(messageSet, totalMessages);
+
+        consumer.unsubscribe();
+        consumer.close();
+        producer1.close();
+        producer2.close();
+        producer3.close();
+    }
+
+    @Test(timeOut = testTimeout)
+    public void testAsyncConsumer() throws Exception {
+        String key = "TopicsConsumerAsyncTest";
+        final String subscriptionName = "my-ex-subscription-" + key;
+        final String messagePredicate = "my-message-" + key + "-";
+        final int totalMessages = 30;
+
+        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
+        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
+        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;
+        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);
+
+        admin.properties().createProperty("prop", new PropertyAdmin());
+        admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+        admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+        ProducerConfiguration producerConfiguration = new ProducerConfiguration();
+        producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+
+        // 1. producer connect
+        Producer producer1 = pulsarClient.createProducer(topicName1);
+        Producer producer2 = pulsarClient.createProducer(topicName2, producerConfiguration);
+        Producer producer3 = pulsarClient.createProducer(topicName3, producerConfiguration);
+
+        // 2. Create consumer
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setReceiverQueueSize(4);
+        conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
+        conf.setSubscriptionType(SubscriptionType.Shared);
+        Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get();
+        assertTrue(consumer instanceof TopicsConsumerImpl);
+
+        // Asynchronously produce messages
+        List<Future<MessageId>> futures = Lists.newArrayList();
+        for (int i = 0; i < totalMessages / 3; i++) {
+            futures.add(producer1.sendAsync((messagePredicate + "producer1-" + i).getBytes()));
+            futures.add(producer2.sendAsync((messagePredicate + "producer2-" + i).getBytes()));
+            futures.add(producer3.sendAsync((messagePredicate + "producer3-" + i).getBytes()));
+        }
+        log.info("Waiting for async publish to complete : {}", futures.size());
+        for (Future<MessageId> future : futures) {
+            future.get();
+        }
+
+        log.info("start async consume");
+        CountDownLatch latch = new CountDownLatch(totalMessages);
+        ExecutorService executor = Executors.newFixedThreadPool(1);
+        executor.execute(() -> IntStream.range(0, totalMessages).forEach(index ->
+            consumer.receiveAsync()
+                .thenAccept(msg -> {
+                    assertTrue(msg instanceof TopicMessageImpl);
+                    try {
+                        consumer.acknowledge(msg);
+                    } catch (PulsarClientException e1) {
+                        fail("message acknowledge failed", e1);
+                    }
+                    latch.countDown();
+                    log.info("receive index: {}, latch countDown: {}", index, latch.getCount());
+                })
+                .exceptionally(ex -> {
+                    log.warn("receive index: {}, failed receive message {}", index, ex.getMessage());
+                    ex.printStackTrace();
+                    return null;
+                })));
+
+        latch.await();
+        log.info("success latch wait");
+
+        consumer.unsubscribe();
+        consumer.close();
+        producer1.close();
+        producer2.close();
+        producer3.close();
+    }
+
+    @Test(timeOut = testTimeout)
+    public void testConsumerUnackedRedelivery() throws Exception {
+        String key = "TopicsConsumerRedeliveryTest";
+        final String subscriptionName = "my-ex-subscription-" + key;
+        final String messagePredicate = "my-message-" + key + "-";
+        final int totalMessages = 30;
+
+        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
+        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
+        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;
+        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);
+
+        admin.properties().createProperty("prop", new PropertyAdmin());
+        admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+        admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+        ProducerConfiguration producerConfiguration = new ProducerConfiguration();
+        producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+
+        // 1. producer connect
+        Producer producer1 = pulsarClient.createProducer(topicName1);
+        Producer producer2 = pulsarClient.createProducer(topicName2, producerConfiguration);
+        Producer producer3 = pulsarClient.createProducer(topicName3, producerConfiguration);
+
+        // 2. Create consumer
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setReceiverQueueSize(4);
+        conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
+        conf.setSubscriptionType(SubscriptionType.Shared);
+        Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get();
+        assertTrue(consumer instanceof TopicsConsumerImpl);
+
+        // 3. producer publish messages
+        for (int i = 0; i < totalMessages / 3; i++) {
+            producer1.send((messagePredicate + "producer1-" + i).getBytes());
+            producer2.send((messagePredicate + "producer2-" + i).getBytes());
+            producer3.send((messagePredicate + "producer3-" + i).getBytes());
+        }
+
+        // 4. Receiver receives the message, not ack, Unacked Message Tracker size should be totalMessages.
+        Message message = consumer.receive();
+        while (message != null) {
+            assertTrue(message instanceof TopicMessageImpl);
+            log.debug("Consumer received : " + new String(message.getData()));
+            message = consumer.receive(500, TimeUnit.MILLISECONDS);
+        }
+        long size = ((TopicsConsumerImpl) consumer).getUnAckedMessageTracker().size();
+        log.debug(key + " Unacked Message Tracker size is " + size);
+        assertEquals(size, totalMessages);
+
+        // 5. Blocking call, redeliver should kick in, after receive and ack, Unacked Message Tracker size should be 0.
+        message = consumer.receive();
+        HashSet<String> hSet = new HashSet<>();
+        do {
+            assertTrue(message instanceof TopicMessageImpl);
+            hSet.add(new String(message.getData()));
+            consumer.acknowledge(message);
+            log.debug("Consumer acknowledged : " + new String(message.getData()));
+            message = consumer.receive(500, TimeUnit.MILLISECONDS);
+        } while (message != null);
+
+        size = ((TopicsConsumerImpl) consumer).getUnAckedMessageTracker().size();
+        log.debug(key + " Unacked Message Tracker size is " + size);
+        assertEquals(size, 0);
+        assertEquals(hSet.size(), totalMessages);
+
+        // 6. producer publish more messages
+        for (int i = 0; i < totalMessages / 3; i++) {
+            producer1.send((messagePredicate + "producer1-round2" + i).getBytes());
+            producer2.send((messagePredicate + "producer2-round2" + i).getBytes());
+            producer3.send((messagePredicate + "producer3-round2" + i).getBytes());
+        }
+
+        // 7. Receiver receives the message, ack them
+        message = consumer.receive();
+        int received = 0;
+        while (message != null) {
+            assertTrue(message instanceof TopicMessageImpl);
+            received++;
+            String data = new String(message.getData());
+            log.debug("Consumer received : " + data);
+            consumer.acknowledge(message);
+            message = consumer.receive(100, TimeUnit.MILLISECONDS);
+        }
+        size = ((TopicsConsumerImpl) consumer).getUnAckedMessageTracker().size();
+        log.debug(key + " Unacked Message Tracker size is " + size);
+        assertEquals(size, 0);
+        assertEquals(received, totalMessages);
+
+        // 8. Simulate ackTimeout
+        ((TopicsConsumerImpl) consumer).getUnAckedMessageTracker().toggle();
+        ((TopicsConsumerImpl) consumer).getConsumers().forEach(c -> c.getUnAckedMessageTracker().toggle());
+
+        // 9. producer publish more messages
+        for (int i = 0; i < totalMessages / 3; i++) {
+            producer1.send((messagePredicate + "producer1-round3" + i).getBytes());
+            producer2.send((messagePredicate + "producer2-round3" + i).getBytes());
+            producer3.send((messagePredicate + "producer3-round3" + i).getBytes());
+        }
+
+        // 10. Receiver receives the message, doesn't ack
+        message = consumer.receive();
+        while (message != null) {
+            String data = new String(message.getData());
+            log.debug("Consumer received : " + data);
+            message = consumer.receive(100, TimeUnit.MILLISECONDS);
+        }
+        size = ((TopicsConsumerImpl) consumer).getUnAckedMessageTracker().size();
+        log.debug(key + " Unacked Message Tracker size is " + size);
+        assertEquals(size, 30);
+
+        Thread.sleep(ackTimeOutMillis);
+
+        // 11. Receiver receives redelivered messages
+        message = consumer.receive();
+        int redelivered = 0;
+        while (message != null) {
+            assertTrue(message instanceof TopicMessageImpl);
+            redelivered++;
+            String data = new String(message.getData());
+            log.debug("Consumer received : " + data);
+            consumer.acknowledge(message);
+            message = consumer.receive(100, TimeUnit.MILLISECONDS);
+        }
+        assertEquals(redelivered, 30);
+        size =  ((TopicsConsumerImpl) consumer).getUnAckedMessageTracker().size();
+        log.info(key + " Unacked Message Tracker size is " + size);
+        assertEquals(size, 0);
+
+        consumer.unsubscribe();
+        consumer.close();
+        producer1.close();
+        producer2.close();
+        producer3.close();
+    }
+
+    @Test
+    public void testSubscribeUnsubscribeSingleTopic() throws Exception {
+        String key = "TopicsConsumerSubscribeUnsubscribeSingleTopicTest";
+        final String subscriptionName = "my-ex-subscription-" + key;
+        final String messagePredicate = "my-message-" + key + "-";
+        final int totalMessages = 30;
+
+        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
+        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
+        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;
+        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);
+
+        admin.properties().createProperty("prop", new PropertyAdmin());
+        admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+        admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+        ProducerConfiguration producerConfiguration = new ProducerConfiguration();
+        producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+
+        // 1. producer connect
+        Producer producer1 = pulsarClient.createProducer(topicName1);
+        Producer producer2 = pulsarClient.createProducer(topicName2, producerConfiguration);
+        Producer producer3 = pulsarClient.createProducer(topicName3, producerConfiguration);
+
+        // 2. Create consumer
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setReceiverQueueSize(4);
+        conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
+        conf.setSubscriptionType(SubscriptionType.Shared);
+        Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get();
+        assertTrue(consumer instanceof TopicsConsumerImpl);
+
+        // 3. producer publish messages
+        for (int i = 0; i < totalMessages / 3; i++) {
+            producer1.send((messagePredicate + "producer1-" + i).getBytes());
+            producer2.send((messagePredicate + "producer2-" + i).getBytes());
+            producer3.send((messagePredicate + "producer3-" + i).getBytes());
+        }
+
+        int messageSet = 0;
+        Message message = consumer.receive();
+        do {
+            assertTrue(message instanceof TopicMessageImpl);
+            messageSet ++;
+            consumer.acknowledge(message);
+            log.debug("Consumer acknowledged : " + new String(message.getData()));
+            message = consumer.receive(500, TimeUnit.MILLISECONDS);
+        } while (message != null);
+        assertEquals(messageSet, totalMessages);
+
+        // 4, unsubscribe topic3
+        CompletableFuture<Void> unsubFuture = ((TopicsConsumerImpl)consumer).unsubscribeAsync(topicName3);
+        unsubFuture.get();
+
+        // 5. producer publish messages
+        for (int i = 0; i < totalMessages / 3; i++) {
+            producer1.send((messagePredicate + "producer1-round2" + i).getBytes());
+            producer2.send((messagePredicate + "producer2-round2" + i).getBytes());
+            producer3.send((messagePredicate + "producer3-round2" + i).getBytes());
+        }
+
+        // 6. should not receive messages from topic3, verify get 2/3 of all messages
+        messageSet = 0;
+        message = consumer.receive();
+        do {
+            assertTrue(message instanceof TopicMessageImpl);
+            messageSet ++;
+            consumer.acknowledge(message);
+            log.debug("Consumer acknowledged : " + new String(message.getData()));
+            message = consumer.receive(500, TimeUnit.MILLISECONDS);
+        } while (message != null);
+        assertEquals(messageSet, totalMessages * 2 / 3);
+
+        // 7. use getter to verify internal topics number after un-subscribe topic3
+        List<String> topics = ((TopicsConsumerImpl) consumer).getPartitionedTopics();
+        List<ConsumerImpl> consumers = ((TopicsConsumerImpl) consumer).getConsumers();
+
+        assertEquals(topics.size(), 3);
+        assertEquals(consumers.size(), 3);
+        assertTrue(((TopicsConsumerImpl) consumer).getTopics().size() == 2);
+
+        // 8. re-subscribe topic3
+        CompletableFuture<Void> subFuture = ((TopicsConsumerImpl)consumer).subscribeAsync(topicName3);
+        subFuture.get();
+
+        // 9. producer publish messages
+        for (int i = 0; i < totalMessages / 3; i++) {
+            producer1.send((messagePredicate + "producer1-round3" + i).getBytes());
+            producer2.send((messagePredicate + "producer2-round3" + i).getBytes());
+            producer3.send((messagePredicate + "producer3-round3" + i).getBytes());
+        }
+
+        // 10. should receive messages from all 3 topics
+        messageSet = 0;
+        message = consumer.receive();
+        do {
+            assertTrue(message instanceof TopicMessageImpl);
+            messageSet ++;
+            consumer.acknowledge(message);
+            log.debug("Consumer acknowledged : " + new String(message.getData()));
+            message = consumer.receive(500, TimeUnit.MILLISECONDS);
+        } while (message != null);
+        assertEquals(messageSet, totalMessages);
+
+        // 11. use getter to verify internal topics number after subscribe topic3
+        topics = ((TopicsConsumerImpl) consumer).getPartitionedTopics();
+        consumers = ((TopicsConsumerImpl) consumer).getConsumers();
+
+        assertEquals(topics.size(), 6);
+        assertEquals(consumers.size(), 6);
+        assertTrue(((TopicsConsumerImpl) consumer).getTopics().size() == 3);
+
+        consumer.unsubscribe();
+        consumer.close();
+        producer1.close();
+        producer2.close();
+        producer3.close();
+    }
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
index e15db40a6..af8c8498d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.api;
 
 import java.io.Closeable;
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.pulsar.client.impl.PulsarClientImpl;
@@ -245,4 +246,60 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t
      *             if the forceful shutdown fails
      */
     void shutdown() throws PulsarClientException;
+
+
+    /**
+     * Subscribe to the given topic and subscription combination with default {@code ConsumerConfiguration}
+     *
+     * @param topics
+     *            The collection of topic names, they should be under same namespace
+     * @param subscription
+     *            The name of the subscription
+     * @return The {@code Consumer} object
+     * @throws PulsarClientException
+     */
+    Consumer subscribe(Collection<String> topics, String subscription) throws PulsarClientException;
+
+    /**
+     * Asynchronously subscribe to the given topics and subscription combination with
+     * default {@code ConsumerConfiguration}
+     *
+     * @param topics
+     *            The collection of topic names, they should be under same namespace
+     * @param subscription
+     *            The name of the subscription
+     * @return Future of the {@code Consumer} object
+     */
+    CompletableFuture<Consumer> subscribeAsync(Collection<String> topics, String subscription);
+
+    /**
+     * Subscribe to the given topics and subscription combination using given {@code ConsumerConfiguration}
+     *
+     * @param topics
+     *            The collection of topic names, they should be under same namespace
+     * @param subscription
+     *            The name of the subscription
+     * @param conf
+     *            The {@code ConsumerConfiguration} object
+     * @return Future of the {@code Consumer} object
+     */
+    Consumer subscribe(Collection<String> topics, String subscription, ConsumerConfiguration conf)
+        throws PulsarClientException;
+
+    /**
+     * Asynchronously subscribe to the given topics and subscription combination using given
+     * {@code ConsumerConfiguration}
+     *
+     * @param topics
+     *            The collection of topic names, they should be under same namespace
+     * @param subscription
+     *            The name of the subscription
+     * @param conf
+     *            The {@code ConsumerConfiguration} object
+     * @return Future of the {@code Consumer} object
+     */
+    CompletableFuture<Consumer> subscribeAsync(Collection<String> topics,
+                                               String subscription,
+                                               ConsumerConfiguration conf);
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index be5e6584b..a076fa799 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import com.google.common.collect.Queues;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
@@ -27,7 +28,6 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.Message;
@@ -41,8 +41,6 @@
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 
-import com.google.common.collect.Queues;
-
 public abstract class ConsumerBase extends HandlerBase implements Consumer {
 
     enum ConsumerType {
@@ -57,7 +55,7 @@
     protected final ExecutorService listenerExecutor;
     final BlockingQueue<Message> incomingMessages;
     protected final ConcurrentLinkedQueue<CompletableFuture<Message>> pendingReceives;
-    protected final int maxReceiverQueueSize;
+    protected int maxReceiverQueueSize;
 
     protected ConsumerBase(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf,
             int receiverQueueSize, ExecutorService listenerExecutor, CompletableFuture<Consumer> subscribeFuture) {
@@ -330,7 +328,7 @@ public String getSubscription() {
      * the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection
      * breaks, the messages are redelivered after reconnect.
      */
-    protected abstract void redeliverUnacknowledgedMessages(Set<MessageIdImpl> messageIds);
+    protected abstract void redeliverUnacknowledgedMessages(Set<MessageId> messageIds);
 
     @Override
     public String toString() {
@@ -340,4 +338,9 @@ public String toString() {
                 ", topic='" + topic + '\'' +
                 '}';
     }
+
+    protected void setMaxReceiverQueueSize(int newSize) {
+        this.maxReceiverQueueSize = newSize;
+    }
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index a43689971..47d88bb87 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -1174,7 +1174,9 @@ public void redeliverUnacknowledgedMessages() {
     }
 
     @Override
-    public void redeliverUnacknowledgedMessages(Set<MessageIdImpl> messageIds) {
+    public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
+        checkArgument(messageIds.stream().findFirst().get() instanceof MessageIdImpl);
+
         if (conf.getSubscriptionType() != SubscriptionType.Shared) {
             // We cannot redeliver single messages if subscription type is not Shared
             redeliverUnacknowledgedMessages();
@@ -1183,7 +1185,10 @@ public void redeliverUnacknowledgedMessages(Set<MessageIdImpl> messageIds) {
         ClientCnx cnx = cnx();
         if (isConnected() && cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v2.getNumber()) {
             int messagesFromQueue = removeExpiredMessagesFromQueue(messageIds);
-            Iterable<List<MessageIdImpl>> batches = Iterables.partition(messageIds, MAX_REDELIVER_UNACKNOWLEDGED);
+            Iterable<List<MessageIdImpl>> batches = Iterables.partition(
+                messageIds.stream()
+                    .map(messageId -> (MessageIdImpl)messageId)
+                    .collect(Collectors.toSet()), MAX_REDELIVER_UNACKNOWLEDGED);
             MessageIdData.Builder builder = MessageIdData.newBuilder();
             batches.forEach(ids -> {
                 List<MessageIdData> messageIdDatas = ids.stream().map(messageId -> {
@@ -1360,7 +1365,7 @@ private MessageIdImpl getMessageIdImpl(Message msg) {
         return messageId;
     }
 
-    private int removeExpiredMessagesFromQueue(Set<MessageIdImpl> messageIds) {
+    private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
         int messagesFromQueue = 0;
         Message peek = incomingMessages.peek();
         if (peek != null) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
index 60e392804..7a4368517 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
@@ -468,7 +468,8 @@ public void redeliverUnacknowledgedMessages() {
     }
 
     @Override
-    public void redeliverUnacknowledgedMessages(Set<MessageIdImpl> messageIds) {
+    public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
+        checkArgument(messageIds.stream().findFirst().get() instanceof MessageIdImpl);
         if (conf.getSubscriptionType() != SubscriptionType.Shared) {
             // We cannot redeliver single messages if subscription type is not Shared
             redeliverUnacknowledgedMessages();
@@ -476,9 +477,11 @@ public void redeliverUnacknowledgedMessages(Set<MessageIdImpl> messageIds) {
         }
         removeExpiredMessagesFromQueue(messageIds);
         messageIds.stream()
-                .collect(Collectors.groupingBy(MessageIdImpl::getPartitionIndex, Collectors.toSet()))
-                .forEach((partitionIndex, messageIds1) ->
-                        consumers.get(partitionIndex).redeliverUnacknowledgedMessages(messageIds1));
+            .map(messageId -> (MessageIdImpl)messageId)
+            .collect(Collectors.groupingBy(MessageIdImpl::getPartitionIndex, Collectors.toSet()))
+            .forEach((partitionIndex, messageIds1) ->
+                consumers.get(partitionIndex).redeliverUnacknowledgedMessages(
+                    messageIds1.stream().map(mid -> (MessageId)mid).collect(Collectors.toSet())));
         resumeReceivingFromPausedConsumersIfNeeded();
     }
 
@@ -546,10 +549,10 @@ public UnAckedMessageTracker getUnAckedMessageTracker() {
         return unAckedMessageTracker;
     }
 
-    private void removeExpiredMessagesFromQueue(Set<MessageIdImpl> messageIds) {
+    private void removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
         Message peek = incomingMessages.peek();
         if (peek != null) {
-            if (!messageIds.contains((MessageIdImpl) peek.getMessageId())) {
+            if (!messageIds.contains(peek.getMessageId())) {
                 // first message is not expired, then no message is expired in queue.
                 return;
             }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index f6bd759bb..4b2f7f86c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -20,6 +20,7 @@
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
 
+import java.util.Collection;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -281,6 +282,70 @@ public Consumer subscribe(String topic, String subscription, ConsumerConfigurati
         return consumerSubscribedFuture;
     }
 
+
+    @Override
+    public Consumer subscribe(Collection<String> topics, final String subscription) throws PulsarClientException {
+        return subscribe(topics, subscription, new ConsumerConfiguration());
+    }
+
+    @Override
+    public Consumer subscribe(Collection<String> topics,
+                              String subscription,
+                              ConsumerConfiguration conf)
+        throws PulsarClientException {
+        try {
+            return subscribeAsync(topics, subscription, conf).get();
+        } catch (ExecutionException e) {
+            Throwable t = e.getCause();
+            if (t instanceof PulsarClientException) {
+                throw (PulsarClientException) t;
+            } else {
+                throw new PulsarClientException(t);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Consumer> subscribeAsync(Collection<String> topics, String subscription) {
+        return subscribeAsync(topics, subscription, new ConsumerConfiguration());
+    }
+
+    @Override
+    public CompletableFuture<Consumer> subscribeAsync(Collection<String> topics,
+                                                      String subscription,
+                                                      ConsumerConfiguration conf) {
+        if (topics == null || topics.isEmpty()) {
+            return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Empty topics name"));
+        }
+
+        if (state.get() != State.Open) {
+            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
+        }
+
+        if (isBlank(subscription)) {
+            return FutureUtil
+                .failedFuture(new PulsarClientException.InvalidConfigurationException("Empty subscription name"));
+        }
+        if (conf == null) {
+            return FutureUtil.failedFuture(
+                new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined"));
+        }
+
+        CompletableFuture<Consumer> consumerSubscribedFuture = new CompletableFuture<>();
+
+        ConsumerBase consumer = new TopicsConsumerImpl(PulsarClientImpl.this, topics, subscription,
+            conf, externalExecutorProvider.getExecutor(),
+            consumerSubscribedFuture);
+        synchronized (consumers) {
+            consumers.put(consumer, Boolean.TRUE);
+        }
+
+        return consumerSubscribedFuture;
+    }
+
     @Override
     public Reader createReader(String topic, MessageId startMessageId, ReaderConfiguration conf)
             throws PulsarClientException {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
new file mode 100644
index 000000000..c7cc4536a
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.MessageId;
+
+public class TopicMessageIdImpl implements MessageId {
+    private final String topicName;
+    private final MessageId messageId;
+
+    TopicMessageIdImpl(String topicName, MessageId messageId) {
+        this.topicName = topicName;
+        this.messageId = messageId;
+    }
+
+    public String getTopicName() {
+        return topicName;
+    }
+
+    public MessageId getInnerMessageId() {
+        return messageId;
+    }
+
+    @Override
+    public byte[] toByteArray() {
+        return messageId.toByteArray();
+    }
+
+    @Override
+    public int compareTo(MessageId o) {
+        return messageId.compareTo(o);
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
new file mode 100644
index 000000000..619d15cde
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl;
+
+import java.util.Map;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+
+public class TopicMessageImpl implements Message {
+
+    private final String topicName;
+    private final Message msg;
+    private final MessageId msgId;
+
+    TopicMessageImpl(String topicName,
+                     Message msg) {
+        this.topicName = topicName;
+        this.msg = msg;
+        this.msgId = new TopicMessageIdImpl(topicName, msg.getMessageId());
+    }
+
+    /**
+     * Get the topic name of this message.
+     * @return the name of the topic on which this message was published
+     */
+    public String getTopicName() {
+        return topicName;
+    }
+
+    @Override
+    public MessageId getMessageId() {
+        return msgId;
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+        return msg.getProperties();
+    }
+
+    @Override
+    public boolean hasProperty(String name) {
+        return msg.hasProperty(name);
+    }
+
+    @Override
+    public String getProperty(String name) {
+        return msg.getProperty(name);
+    }
+
+    @Override
+    public byte[] getData() {
+        return msg.getData();
+    }
+
+    @Override
+    public long getPublishTime() {
+        return msg.getPublishTime();
+    }
+
+    @Override
+    public long getEventTime() {
+        return msg.getEventTime();
+    }
+
+    @Override
+    public long getSequenceId() {
+        return msg.getSequenceId();
+    }
+
+    @Override
+    public String getProducerName() {
+        return msg.getProducerName();
+    }
+
+    @Override
+    public boolean hasKey() {
+        return msg.hasKey();
+    }
+
+    @Override
+    public String getKey() {
+        return msg.getKey();
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
new file mode 100644
index 000000000..852c5d25e
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
@@ -0,0 +1,823 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.util.FutureUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicsConsumerImpl extends ConsumerBase {
+
+    // All topics should be in same namespace
+    protected NamespaceName namespaceName;
+
+    // Map <topic+partition, consumer>, when get do ACK, consumer will by find by topic name
+    private final ConcurrentHashMap<String, ConsumerImpl> consumers;
+
+    // Map <topic, partitionNumber>, store partition number for each topic
+    private final ConcurrentHashMap<String, Integer> topics;
+
+    // Queue of partition consumers on which we have stopped calling receiveAsync() because the
+    // shared incoming queue was full
+    private final ConcurrentLinkedQueue<ConsumerImpl> pausedConsumers;
+
+    // Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to
+    // resume receiving from the paused consumer partitions
+    private final int sharedQueueResumeThreshold;
+
+    // sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number.
+    AtomicInteger numberTopicPartitions;
+
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+    private final ConsumerStats stats;
+    private final UnAckedMessageTracker unAckedMessageTracker;
+    private final ConsumerConfiguration internalConfig;
+
+    TopicsConsumerImpl(PulsarClientImpl client, Collection<String> topics, String subscription,
+                       ConsumerConfiguration conf, ExecutorService listenerExecutor,
+                       CompletableFuture<Consumer> subscribeFuture) {
+        super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), subscription,
+            conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor,
+            subscribeFuture);
+
+        checkArgument(conf.getReceiverQueueSize() > 0,
+            "Receiver queue size needs to be greater than 0 for Topics Consumer");
+
+        this.topics = new ConcurrentHashMap<>();
+        this.consumers = new ConcurrentHashMap<>();
+        this.pausedConsumers = new ConcurrentLinkedQueue<>();
+        this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
+        this.numberTopicPartitions = new AtomicInteger(0);
+
+        if (conf.getAckTimeoutMillis() != 0) {
+            this.unAckedMessageTracker = new UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis());
+        } else {
+            this.unAckedMessageTracker = UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
+        }
+
+        this.internalConfig = getInternalConsumerConfig();
+        this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStats() : null;
+
+        if (topics.isEmpty()) {
+            this.namespaceName = null;
+            setState(State.Ready);
+            subscribeFuture().complete(TopicsConsumerImpl.this);
+            return;
+        }
+
+        checkArgument(topics.isEmpty() || topicNamesValid(topics), "Topics should have same namespace.");
+        this.namespaceName = topics.stream().findFirst().flatMap(
+            new Function<String, Optional<NamespaceName>>() {
+                @Override
+                public Optional<NamespaceName> apply(String s) {
+                    return Optional.of(DestinationName.get(s).getNamespaceObject());
+                }
+            }).get();
+
+        List<CompletableFuture<Void>> futures = topics.stream().map(t -> subscribeAsync(t)).collect(Collectors.toList());
+        FutureUtil.waitForAll(futures)
+            .thenAccept(finalFuture -> {
+                try {
+                    if (numberTopicPartitions.get() > maxReceiverQueueSize) {
+                        setMaxReceiverQueueSize(numberTopicPartitions.get());
+                    }
+                    setState(State.Ready);
+                    // We have successfully created N consumers, so we can start receiving messages now
+                    startReceivingMessages(consumers.values().stream().collect(Collectors.toList()));
+                    subscribeFuture().complete(TopicsConsumerImpl.this);
+                    log.info("[{}] [{}] Created topics consumer with {} sub-consumers",
+                        topic, subscription, numberTopicPartitions.get());
+                } catch (PulsarClientException e) {
+                    log.warn("[{}] Failed startReceivingMessages while subscribe topics: {}", topic, e.getMessage());
+                    subscribeFuture.completeExceptionally(e);
+                }})
+            .exceptionally(ex -> {
+                log.warn("[{}] Failed to subscribe topics: {}", topic, ex.getMessage());
+                subscribeFuture.completeExceptionally(ex);
+                return null;
+            });
+    }
+
+    // Check topics are valid.
+    // - each topic is valid,
+    // - every topic has same namespace,
+    // - topic names are unique.
+    private static boolean topicNamesValid(Collection<String> topics) {
+        checkState(topics != null && topics.size() > 1,
+            "topics should should contain more than 1 topics");
+
+        final String namespace = DestinationName.get(topics.stream().findFirst().get()).getNamespace();
+
+        Optional<String> result = topics.stream()
+            .filter(topic -> {
+                boolean topicInvalid = !DestinationName.isValid(topic);
+                if (topicInvalid) {
+                    return true;
+                }
+
+                String newNamespace =  DestinationName.get(topic).getNamespace();
+                if (!namespace.equals(newNamespace)) {
+                    return true;
+                } else {
+                    return false;
+                }
+            }).findFirst();
+
+        if (result.isPresent()) {
+            log.warn("[{}] Received invalid topic name.  {}/{}", result.get());
+            return false;
+        }
+
+        // check topic names are unique
+        HashSet<String> set = new HashSet<>(topics);
+        if (set.size() == topics.size()) {
+            return true;
+        } else {
+            log.warn("Topic names not unique. unique/all : {}/{}", set.size(), topics.size());
+            return false;
+        }
+    }
+
+    private void startReceivingMessages(List<ConsumerImpl> newConsumers) throws PulsarClientException {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] startReceivingMessages for {} new consumers in topics consumer, state: {}",
+                topic, newConsumers.size(), getState());
+        }
+        if (getState() == State.Ready) {
+            newConsumers.forEach(consumer -> {
+                consumer.sendFlowPermitsToBroker(consumer.cnx(), conf.getReceiverQueueSize());
+                receiveMessageFromConsumer(consumer);
+            });
+        }
+    }
+
+    private void receiveMessageFromConsumer(ConsumerImpl consumer) {
+        consumer.receiveAsync().thenAccept(message -> {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] [{}] Receive message from sub consumer:{}",
+                    topic, subscription, consumer.getTopic());
+            }
+            // Process the message, add to the queue and trigger listener or async callback
+            messageReceived(consumer, message);
+
+            // we're modifying pausedConsumers
+            lock.writeLock().lock();
+            try {
+                int size = incomingMessages.size();
+                if (size >= maxReceiverQueueSize
+                        || (size > sharedQueueResumeThreshold && !pausedConsumers.isEmpty())) {
+                    // mark this consumer to be resumed later: if No more space left in shared queue,
+                    // or if any consumer is already paused (to create fair chance for already paused consumers)
+                    pausedConsumers.add(consumer);
+                } else {
+                    // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
+                    // recursion and stack overflow
+                    client.eventLoopGroup().execute(() -> {
+                        receiveMessageFromConsumer(consumer);
+                    });
+                }
+            } finally {
+                lock.writeLock().unlock();
+            }
+        });
+    }
+
+    private void messageReceived(ConsumerImpl consumer, Message message) {
+        checkArgument(message instanceof MessageImpl);
+        lock.writeLock().lock();
+        try {
+            TopicMessageImpl topicMessage = new TopicMessageImpl(consumer.getTopic(), message);
+            unAckedMessageTracker.add(topicMessage.getMessageId());
+
+            if (log.isDebugEnabled()) {
+                log.debug("[{}][{}] Received message from topics-consumer {}",
+                    topic, subscription, message.getMessageId());
+            }
+
+            // if asyncReceive is waiting : return message to callback without adding to incomingMessages queue
+            if (!pendingReceives.isEmpty()) {
+                CompletableFuture<Message> receivedFuture = pendingReceives.poll();
+                listenerExecutor.execute(() -> receivedFuture.complete(topicMessage));
+            } else {
+                // Enqueue the message so that it can be retrieved when application calls receive()
+                // Waits for the queue to have space for the message
+                // This should never block cause TopicsConsumerImpl should always use GrowableArrayBlockingQueue
+                incomingMessages.put(topicMessage);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        if (listener != null) {
+            // Trigger the notification on the message listener in a separate thread to avoid blocking the networking
+            // thread while the message processing happens
+            listenerExecutor.execute(() -> {
+                Message msg;
+                try {
+                    msg = internalReceive();
+                } catch (PulsarClientException e) {
+                    log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e);
+                    return;
+                }
+
+                try {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}][{}] Calling message listener for message {}",
+                            topic, subscription, message.getMessageId());
+                    }
+                    listener.received(TopicsConsumerImpl.this, msg);
+                } catch (Throwable t) {
+                    log.error("[{}][{}] Message listener error in processing message: {}",
+                        topic, subscription, message, t);
+                }
+            });
+        }
+    }
+
+    private void resumeReceivingFromPausedConsumersIfNeeded() {
+        lock.readLock().lock();
+        try {
+            if (incomingMessages.size() <= sharedQueueResumeThreshold && !pausedConsumers.isEmpty()) {
+                while (true) {
+                    ConsumerImpl consumer = pausedConsumers.poll();
+                    if (consumer == null) {
+                        break;
+                    }
+
+                    // if messages are readily available on consumer we will attempt to writeLock on the same thread
+                    client.eventLoopGroup().execute(() -> {
+                        receiveMessageFromConsumer(consumer);
+                    });
+                }
+            }
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    @Override
+    protected Message internalReceive() throws PulsarClientException {
+        Message message;
+        try {
+            message = incomingMessages.take();
+            checkState(message instanceof TopicMessageImpl);
+            unAckedMessageTracker.add(message.getMessageId());
+            resumeReceivingFromPausedConsumersIfNeeded();
+            return message;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException(e);
+        }
+    }
+
+    @Override
+    protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarClientException {
+        Message message;
+        try {
+            message = incomingMessages.poll(timeout, unit);
+            if (message != null) {
+                checkArgument(message instanceof TopicMessageImpl);
+                unAckedMessageTracker.add(message.getMessageId());
+            }
+            resumeReceivingFromPausedConsumersIfNeeded();
+            return message;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException(e);
+        }
+    }
+
+    @Override
+    protected CompletableFuture<Message> internalReceiveAsync() {
+        CompletableFuture<Message> result = new CompletableFuture<>();
+        Message message;
+        try {
+            lock.writeLock().lock();
+            message = incomingMessages.poll(0, TimeUnit.SECONDS);
+            if (message == null) {
+                pendingReceives.add(result);
+            } else {
+                checkState(message instanceof TopicMessageImpl);
+                unAckedMessageTracker.add(message.getMessageId());
+                resumeReceivingFromPausedConsumersIfNeeded();
+                result.complete(message);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            result.completeExceptionally(new PulsarClientException(e));
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        return result;
+    }
+
+    @Override
+    protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
+                                                    Map<String,Long> properties) {
+        checkArgument(messageId instanceof TopicMessageIdImpl);
+        TopicMessageIdImpl messageId1 = (TopicMessageIdImpl) messageId;
+
+        if (getState() != State.Ready) {
+            return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed"));
+        }
+
+        if (ackType == AckType.Cumulative) {
+            return FutureUtil.failedFuture(new PulsarClientException.NotSupportedException(
+                    "Cumulative acknowledge not supported for topics consumer"));
+        } else {
+            ConsumerImpl consumer = consumers.get(messageId1.getTopicName());
+
+            MessageId innerId = messageId1.getInnerMessageId();
+            return consumer.doAcknowledge(innerId, ackType, properties)
+                .thenRun(() ->
+                    unAckedMessageTracker.remove(messageId1));
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> unsubscribeAsync() {
+        if (getState() == State.Closing || getState() == State.Closed) {
+            return FutureUtil.failedFuture(
+                    new PulsarClientException.AlreadyClosedException("Topics Consumer was already closed"));
+        }
+        setState(State.Closing);
+
+        CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
+        List<CompletableFuture<Void>> futureList = consumers.values().stream()
+            .map(c -> c.unsubscribeAsync()).collect(Collectors.toList());
+
+        FutureUtil.waitForAll(futureList)
+            .whenComplete((r, ex) -> {
+                if (ex == null) {
+                    setState(State.Closed);
+                    unAckedMessageTracker.close();
+                    unsubscribeFuture.complete(null);
+                    log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer",
+                        topic, subscription, consumerName);
+                } else {
+                    setState(State.Failed);
+                    unsubscribeFuture.completeExceptionally(ex);
+                    log.error("[{}] [{}] [{}] Could not unsubscribe Topics Consumer",
+                        topic, subscription, consumerName, ex.getCause());
+                }
+            });
+
+        return unsubscribeFuture;
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        if (getState() == State.Closing || getState() == State.Closed) {
+            unAckedMessageTracker.close();
+            return CompletableFuture.completedFuture(null);
+        }
+        setState(State.Closing);
+
+        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+        List<CompletableFuture<Void>> futureList = consumers.values().stream()
+            .map(c -> c.closeAsync()).collect(Collectors.toList());
+
+        FutureUtil.waitForAll(futureList)
+            .whenComplete((r, ex) -> {
+                if (ex == null) {
+                    setState(State.Closed);
+                    unAckedMessageTracker.close();
+                    closeFuture.complete(null);
+                    log.info("[{}] [{}] Closed Topics Consumer", topic, subscription);
+                    client.cleanupConsumer(this);
+                    // fail all pending-receive futures to notify application
+                    failPendingReceive();
+                } else {
+                    setState(State.Failed);
+                    closeFuture.completeExceptionally(ex);
+                    log.error("[{}] [{}] Could not close Topics Consumer", topic, subscription,
+                        ex.getCause());
+                }
+            });
+
+        return closeFuture;
+    }
+
+    private void failPendingReceive() {
+        lock.readLock().lock();
+        try {
+            if (listenerExecutor != null && !listenerExecutor.isShutdown()) {
+                while (!pendingReceives.isEmpty()) {
+                    CompletableFuture<Message> receiveFuture = pendingReceives.poll();
+                    if (receiveFuture != null) {
+                        receiveFuture.completeExceptionally(
+                                new PulsarClientException.AlreadyClosedException("Consumer is already closed"));
+                    } else {
+                        break;
+                    }
+                }
+            }
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    @Override
+    public boolean isConnected() {
+        return consumers.values().stream().allMatch(consumer -> consumer.isConnected());
+    }
+
+    @Override
+    void connectionFailed(PulsarClientException exception) {
+        // noop
+
+    }
+
+    @Override
+    void connectionOpened(ClientCnx cnx) {
+        // noop
+
+    }
+
+    @Override
+    String getHandlerName() {
+        return subscription;
+    }
+
+    private ConsumerConfiguration getInternalConsumerConfig() {
+        ConsumerConfiguration internalConsumerConfig = new ConsumerConfiguration();
+        internalConsumerConfig.setReceiverQueueSize(conf.getReceiverQueueSize());
+        internalConsumerConfig.setSubscriptionType(conf.getSubscriptionType());
+        internalConsumerConfig.setConsumerName(consumerName);
+        if (conf.getCryptoKeyReader() != null) {
+            internalConsumerConfig.setCryptoKeyReader(conf.getCryptoKeyReader());
+            internalConsumerConfig.setCryptoFailureAction(conf.getCryptoFailureAction());
+        }
+        if (conf.getAckTimeoutMillis() != 0) {
+            internalConsumerConfig.setAckTimeout(conf.getAckTimeoutMillis(), TimeUnit.MILLISECONDS);
+        }
+
+        return internalConsumerConfig;
+    }
+
+    @Override
+    public void redeliverUnacknowledgedMessages() {
+        synchronized (this) {
+            consumers.values().stream().forEach(consumer -> consumer.redeliverUnacknowledgedMessages());
+            incomingMessages.clear();
+            unAckedMessageTracker.clear();
+            resumeReceivingFromPausedConsumersIfNeeded();
+        }
+    }
+
+    @Override
+    public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
+        checkArgument(messageIds.stream().findFirst().get() instanceof TopicMessageIdImpl);
+
+        if (conf.getSubscriptionType() != SubscriptionType.Shared) {
+            // We cannot redeliver single messages if subscription type is not Shared
+            redeliverUnacknowledgedMessages();
+            return;
+        }
+        removeExpiredMessagesFromQueue(messageIds);
+        messageIds.stream().map(messageId -> (TopicMessageIdImpl)messageId)
+            .collect(Collectors.groupingBy(TopicMessageIdImpl::getTopicName, Collectors.toSet()))
+            .forEach((topicName, messageIds1) ->
+                consumers.get(topicName)
+                    .redeliverUnacknowledgedMessages(messageIds1.stream()
+                        .map(mid -> mid.getInnerMessageId()).collect(Collectors.toSet())));
+        resumeReceivingFromPausedConsumersIfNeeded();
+    }
+
+    @Override
+    public void seek(MessageId messageId) throws PulsarClientException {
+        try {
+            seekAsync(messageId).get();
+        } catch (ExecutionException e) {
+            throw new PulsarClientException(e.getCause());
+        } catch (InterruptedException e) {
+            throw new PulsarClientException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> seekAsync(MessageId messageId) {
+        return FutureUtil.failedFuture(new PulsarClientException("Seek operation not supported on topics consumer"));
+    }
+
+    /**
+     * helper method that returns current state of data structure used to track acks for batch messages
+     *
+     * @return true if all batch messages have been acknowledged
+     */
+    public boolean isBatchingAckTrackerEmpty() {
+        return consumers.values().stream().allMatch(consumer -> consumer.isBatchingAckTrackerEmpty());
+    }
+
+
+    @Override
+    public int getAvailablePermits() {
+        return consumers.values().stream().mapToInt(ConsumerImpl::getAvailablePermits).sum();
+    }
+
+    @Override
+    public boolean hasReachedEndOfTopic() {
+        return consumers.values().stream().allMatch(Consumer::hasReachedEndOfTopic);
+    }
+
+    @Override
+    public int numMessagesInQueue() {
+        return incomingMessages.size() + consumers.values().stream().mapToInt(ConsumerImpl::numMessagesInQueue).sum();
+    }
+
+    @Override
+    public synchronized ConsumerStats getStats() {
+        if (stats == null) {
+            return null;
+        }
+        stats.reset();
+
+        consumers.values().stream().forEach(consumer -> stats.updateCumulativeStats(consumer.getStats()));
+        return stats;
+    }
+
+    public UnAckedMessageTracker getUnAckedMessageTracker() {
+        return unAckedMessageTracker;
+    }
+
+    private void removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
+        Message peek = incomingMessages.peek();
+        if (peek != null) {
+            if (!messageIds.contains(peek.getMessageId())) {
+                // first message is not expired, then no message is expired in queue.
+                return;
+            }
+
+            // try not to remove elements that are added while we remove
+            Message message = incomingMessages.poll();
+            checkState(message instanceof TopicMessageImpl);
+            while (message != null) {
+                MessageId messageId = message.getMessageId();
+                if (!messageIds.contains(messageId)) {
+                    messageIds.add(messageId);
+                    break;
+                }
+                message = incomingMessages.poll();
+            }
+        }
+    }
+
+    private boolean topicNameValid(String topicName) {
+        checkArgument(DestinationName.isValid(topicName), "Invalid topic name:" + topicName);
+        checkArgument(!topics.containsKey(topicName), "Topics already contains topic:" + topicName);
+
+        if (this.namespaceName != null) {
+            checkArgument(DestinationName.get(topicName).getNamespace().toString().equals(this.namespaceName.toString()),
+                "Topic " + topicName + " not in same namespace with Topics");
+        }
+
+        return true;
+    }
+
+    // subscribe one more given topic
+    public CompletableFuture<Void> subscribeAsync(String topicName) {
+        if (!topicNameValid(topicName)) {
+            return FutureUtil.failedFuture(
+                new PulsarClientException.AlreadyClosedException("Topic name not valid"));
+        }
+
+        if (getState() == State.Closing || getState() == State.Closed) {
+            return FutureUtil.failedFuture(
+                new PulsarClientException.AlreadyClosedException("Topics Consumer was already closed"));
+        }
+
+        CompletableFuture<Void> subscribeResult = new CompletableFuture<>();
+        final AtomicInteger partitionNumber = new AtomicInteger(0);
+
+        client.getPartitionedTopicMetadata(topicName).thenAccept(metadata -> {
+            if (log.isDebugEnabled()) {
+                log.debug("Received topic {} metadata.partitions: {}", topicName, metadata.partitions);
+            }
+
+            List<CompletableFuture<Consumer>> futureList;
+
+            if (metadata.partitions > 1) {
+                this.topics.putIfAbsent(topicName, metadata.partitions);
+                numberTopicPartitions.addAndGet(metadata.partitions);
+                partitionNumber.addAndGet(metadata.partitions);
+
+                futureList = IntStream
+                    .range(0, partitionNumber.get())
+                    .mapToObj(
+                        partitionIndex -> {
+                            String partitionName = DestinationName.get(topicName).getPartition(partitionIndex).toString();
+                            CompletableFuture<Consumer> subFuture = new CompletableFuture<Consumer>();
+                            ConsumerImpl newConsumer = new ConsumerImpl(client, partitionName, subscription, internalConfig,
+                                client.externalExecutorProvider().getExecutor(), partitionIndex,
+                                subFuture);
+                            consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
+                            return subFuture;
+                        })
+                    .collect(Collectors.toList());
+            } else {
+                this.topics.putIfAbsent(topicName, 1);
+                numberTopicPartitions.incrementAndGet();
+                partitionNumber.incrementAndGet();
+
+                CompletableFuture<Consumer> subFuture = new CompletableFuture<Consumer>();
+                ConsumerImpl newConsumer = new ConsumerImpl(client, topicName, subscription, internalConfig,
+                    client.externalExecutorProvider().getExecutor(), 0,
+                    subFuture);
+                consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
+
+                futureList = Lists.newArrayList(subFuture);
+            }
+
+            FutureUtil.waitForAll(futureList)
+                .thenAccept(finalFuture -> {
+                    try {
+                        if (numberTopicPartitions.get() > maxReceiverQueueSize) {
+                            setMaxReceiverQueueSize(numberTopicPartitions.get());
+                        }
+                        int numTopics = this.topics.values().stream().mapToInt(Integer::intValue).sum();
+                        checkState(numberTopicPartitions.get() == numTopics,
+                            "numberTopicPartitions " + numberTopicPartitions.get()
+                                + " not equals expected: " + numTopics);
+
+                        // We have successfully created new consumers, so we can start receiving messages for them
+                        startReceivingMessages(
+                            consumers.values().stream()
+                                .filter(consumer1 -> {
+                                    String consumerTopicName = consumer1.getTopic();
+                                    if (DestinationName.get(consumerTopicName)
+                                        .getPartitionedTopicName().equals(topicName)) {
+                                        return true;
+                                    } else {
+                                        return false;
+                                    }
+                                })
+                                .collect(Collectors.toList()));
+
+                        subscribeResult.complete(null);
+                        log.info("[{}] [{}] Success subscribe new topic {} in topics consumer, numberTopicPartitions {}",
+                            topic, subscription, topicName, numberTopicPartitions.get());
+                        if (this.namespaceName == null) {
+                            this.namespaceName = DestinationName.get(topicName).getNamespaceObject();
+                        }
+                        return;
+                    } catch (PulsarClientException e) {
+                        handleSubscribeOneTopicError(topicName, e);
+                        subscribeResult.completeExceptionally(e);
+                    }
+                })
+                .exceptionally(ex -> {
+                    handleSubscribeOneTopicError(topicName, ex);
+                    subscribeResult.completeExceptionally(ex);
+                    return null;
+                });
+        }).exceptionally(ex1 -> {
+            log.warn("[{}] Failed to get partitioned topic metadata: {}", topicName, ex1.getMessage());
+            subscribeResult.completeExceptionally(ex1);
+            return null;
+        });
+
+        return subscribeResult;
+    }
+
+    // handling failure during subscribe new topic, unsubscribe success created partitions
+    private void handleSubscribeOneTopicError(String topicName, Throwable error) {
+        log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer ", topic, topicName, error.getMessage());
+
+        consumers.values().stream().filter(consumer1 -> {
+            String consumerTopicName = consumer1.getTopic();
+            if (DestinationName.get(consumerTopicName).getPartitionedTopicName().equals(topicName)) {
+                return true;
+            } else {
+                return false;
+            }
+        }).forEach(consumer2 ->  {
+            consumer2.closeAsync().handle((ok, closeException) -> {
+                consumer2.subscribeFuture().completeExceptionally(error);
+                return null;
+            });
+            consumers.remove(consumer2.getTopic());
+        });
+
+        topics.remove(topicName);
+        checkState(numberTopicPartitions.get() == consumers.values().size());
+    }
+
+    // un-subscribe a given topic
+    public CompletableFuture<Void> unsubscribeAsync(String topicName) {
+        checkArgument(DestinationName.isValid(topicName), "Invalid topic name:" + topicName);
+
+        if (getState() == State.Closing || getState() == State.Closed) {
+            return FutureUtil.failedFuture(
+                new PulsarClientException.AlreadyClosedException("Topics Consumer was already closed"));
+        }
+
+        CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
+        String topicPartName = DestinationName.get(topicName).getPartitionedTopicName();
+
+        List<ConsumerImpl> consumersToUnsub = consumers.values().stream()
+            .filter(consumer -> {
+                String consumerTopicName = consumer.getTopic();
+                if (DestinationName.get(consumerTopicName).getPartitionedTopicName().equals(topicPartName)) {
+                    return true;
+                } else {
+                    return false;
+                }
+            }).collect(Collectors.toList());
+
+        List<CompletableFuture<Void>> futureList = consumersToUnsub.stream()
+            .map(c -> c.unsubscribeAsync()).collect(Collectors.toList());
+
+        FutureUtil.waitForAll(futureList)
+            .whenComplete((r, ex) -> {
+                if (ex == null) {
+                    consumersToUnsub.forEach(consumer1 -> {
+                        consumers.remove(consumer1.getTopic());
+                        pausedConsumers.remove(consumer1);
+                        numberTopicPartitions.decrementAndGet();
+                    });
+
+                    topics.remove(topicName);
+                    ((UnAckedTopicMessageTracker) unAckedMessageTracker).removeTopicMessages(topicName);
+
+                    unsubscribeFuture.complete(null);
+                    log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer, numberTopicPartitions: {}",
+                        topicName, subscription, consumerName, numberTopicPartitions);
+                } else {
+                    unsubscribeFuture.completeExceptionally(ex);
+                    setState(State.Failed);
+                    log.error("[{}] [{}] [{}] Could not unsubscribe Topics Consumer",
+                        topicName, subscription, consumerName, ex.getCause());
+                }
+            });
+
+        return unsubscribeFuture;
+    }
+
+    // get topics name
+    public List<String> getTopics() {
+        return topics.keySet().stream().collect(Collectors.toList());
+    }
+
+    // get partitioned topics name
+    public List<String> getPartitionedTopics() {
+        return consumers.keySet().stream().collect(Collectors.toList());
+    }
+
+    // get partitioned consumers
+    public List<ConsumerImpl> getConsumers() {
+        return consumers.values().stream().collect(Collectors.toList());
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(TopicsConsumerImpl.class);
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index 796abf65f..0066e72ae 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -18,28 +18,25 @@
  */
 package org.apache.pulsar.client.impl;
 
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
 import java.io.Closeable;
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-
 public class UnAckedMessageTracker implements Closeable {
     private static final Logger log = LoggerFactory.getLogger(UnAckedMessageTracker.class);
-    private ConcurrentOpenHashSet<MessageIdImpl> currentSet;
-    private ConcurrentOpenHashSet<MessageIdImpl> oldOpenSet;
+    protected ConcurrentOpenHashSet<MessageId> currentSet;
+    protected ConcurrentOpenHashSet<MessageId> oldOpenSet;
     private final ReentrantReadWriteLock readWriteLock;
-    private final Lock readLock;
+    protected final Lock readLock;
     private final Lock writeLock;
     private Timeout timeout;
 
@@ -51,17 +48,17 @@ public void clear() {
         }
 
         @Override
-        public boolean add(MessageIdImpl m) {
+        public boolean add(MessageId m) {
             return true;
         }
 
         @Override
-        public boolean remove(MessageIdImpl m) {
+        public boolean remove(MessageId m) {
             return true;
         }
 
         @Override
-        public int removeMessagesTill(MessageIdImpl msgId) {
+        public int removeMessagesTill(MessageId msgId) {
             return 0;
         }
 
@@ -77,8 +74,8 @@ public UnAckedMessageTracker() {
     }
 
     public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase consumerBase, long ackTimeoutMillis) {
-        currentSet = new ConcurrentOpenHashSet<MessageIdImpl>();
-        oldOpenSet = new ConcurrentOpenHashSet<MessageIdImpl>();
+        currentSet = new ConcurrentOpenHashSet<MessageId>();
+        oldOpenSet = new ConcurrentOpenHashSet<MessageId>();
         readWriteLock = new ReentrantReadWriteLock();
         readLock = readWriteLock.readLock();
         writeLock = readWriteLock.writeLock();
@@ -92,7 +89,7 @@ public void start(PulsarClientImpl client, ConsumerBase consumerBase, long ackTi
             public void run(Timeout t) throws Exception {
                 if (isAckTimeout()) {
                     log.warn("[{}] {} messages have timed-out", consumerBase, oldOpenSet.size());
-                    Set<MessageIdImpl> messageIds = new HashSet<>();
+                    Set<MessageId> messageIds = new HashSet<>();
                     oldOpenSet.forEach(messageIds::add);
                     oldOpenSet.clear();
                     consumerBase.redeliverUnacknowledgedMessages(messageIds);
@@ -106,7 +103,7 @@ public void run(Timeout t) throws Exception {
     void toggle() {
         writeLock.lock();
         try {
-            ConcurrentOpenHashSet<MessageIdImpl> temp = currentSet;
+            ConcurrentOpenHashSet<MessageId> temp = currentSet;
             currentSet = oldOpenSet;
             oldOpenSet = temp;
         } finally {
@@ -124,7 +121,7 @@ public void clear() {
         }
     }
 
-    public boolean add(MessageIdImpl m) {
+    public boolean add(MessageId m) {
         readLock.lock();
         try {
             oldOpenSet.remove(m);
@@ -144,7 +141,7 @@ boolean isEmpty() {
         }
     }
 
-    public boolean remove(MessageIdImpl m) {
+    public boolean remove(MessageId m) {
         readLock.lock();
         try {
             return currentSet.remove(m) || oldOpenSet.remove(m);
@@ -171,15 +168,12 @@ private boolean isAckTimeout() {
         }
     }
 
-    public int removeMessagesTill(MessageIdImpl msgId) {
+    public int removeMessagesTill(MessageId msgId) {
         readLock.lock();
         try {
-            int currentSetRemovedMsgCount = currentSet.removeIf(m -> ((m.getLedgerId() < msgId.getLedgerId()
-                    || (m.getLedgerId() == msgId.getLedgerId() && m.getEntryId() <= msgId.getEntryId()))
-                    && m.getPartitionIndex() == msgId.getPartitionIndex()));
-            int oldSetRemovedMsgCount = oldOpenSet.removeIf(m -> ((m.getLedgerId() < msgId.getLedgerId()
-                    || (m.getLedgerId() == msgId.getLedgerId() && m.getEntryId() <= msgId.getEntryId()))
-                    && m.getPartitionIndex() == msgId.getPartitionIndex()));
+            int currentSetRemovedMsgCount = currentSet.removeIf(m -> (m.compareTo(msgId) <= 0));
+            int oldSetRemovedMsgCount = oldOpenSet.removeIf(m -> (m.compareTo(msgId) <= 0));
+
             return currentSetRemovedMsgCount + oldSetRemovedMsgCount;
         } finally {
             readLock.unlock();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
new file mode 100644
index 000000000..eceedf650
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedTopicMessageTracker.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkState;
+
+public class UnAckedTopicMessageTracker extends UnAckedMessageTracker {
+
+    public UnAckedTopicMessageTracker(PulsarClientImpl client, ConsumerBase consumerBase, long ackTimeoutMillis) {
+        super(client, consumerBase, ackTimeoutMillis);
+    }
+
+    public int removeTopicMessages(String topicName) {
+        readLock.lock();
+        try {
+            int currentSetRemovedMsgCount = currentSet.removeIf(m -> {
+                checkState(m instanceof TopicMessageIdImpl,
+                    "message should be of type TopicMessageIdImpl");
+                return ((TopicMessageIdImpl)m).getTopicName().contains(topicName);
+            });
+            int oldSetRemovedMsgCount = oldOpenSet.removeIf(m -> {
+                checkState(m instanceof TopicMessageIdImpl,
+                    "message should be of type TopicMessageIdImpl");
+                return ((TopicMessageIdImpl)m).getTopicName().contains(topicName);
+            });
+
+            return currentSetRemovedMsgCount + oldSetRemovedMsgCount;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services