You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/02/22 22:21:08 UTC

[GitHub] zhaijack closed pull request #1165: PIP-13-2/3: support regex based subscription

zhaijack closed pull request #1165: PIP-13-2/3: support regex based subscription
URL: https://github.com/apache/incubator-pulsar/pull/1165
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 8888c97e6..d566825f3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -26,6 +26,7 @@
 import static org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5;
 
 import java.net.SocketAddress;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
@@ -40,6 +41,7 @@
 import org.apache.pulsar.broker.PulsarServerException;
 import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
+import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -56,6 +58,7 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStats;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandProducer;
@@ -967,6 +970,34 @@ protected void handleCloseConsumer(CommandCloseConsumer closeConsumer) {
         }
     }
 
+    /**
+     * Handles a {@code CommandGetTopicsOfNamespace} request: looks up the list of destinations of
+     * the requested namespace and writes the outcome back on this connection.
+     *
+     * <p>On success a GetTopicsOfNamespace response carrying the topic list and the request id is
+     * written. If the lookup throws, the failure is logged (including the exception) and an error
+     * command is written for the same request id, using the client error code derived from a
+     * {@link ServerMetadataException} wrapping the cause.
+     *
+     * @param commandGetTopicsOfNamespace the request; supplies the request id and the
+     *        property / cluster / local-name triple that identifies the namespace
+     */
+    @Override
+    protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
+        final long requestId = commandGetTopicsOfNamespace.getRequestId();
+        final String property = commandGetTopicsOfNamespace.getProperty();
+        final String cluster = commandGetTopicsOfNamespace.getCluster();
+        final String localName = commandGetTopicsOfNamespace.getLocalName();
+
+        try {
+            List<String> topics = getBrokerService().pulsar()
+                .getNamespaceService()
+                .getListOfDestinations(property, cluster, localName);
+
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Received CommandGetTopicsOfNamespace for namespace [//{}/{}/{}] by {}, size:{}",
+                    remoteAddress, property, cluster, localName, requestId, topics.size());
+            }
+
+            ctx.writeAndFlush(Commands.newGetTopicsOfNamespaceResponse(topics, requestId));
+        } catch (Exception e) {
+            // The exception is passed as the trailing argument (beyond the five placeholders)
+            // so that the logger records it together with its stack trace.
+            log.warn("[{}] Error GetTopicsOfNamespace for namespace [//{}/{}/{}] by {}",
+                remoteAddress, property, cluster, localName, requestId, e);
+            ctx.writeAndFlush(
+                Commands.newError(requestId,
+                    BrokerServiceException.getClientErrorCode(new ServerMetadataException(e)),
+                    e.getMessage()));
+        }
+    }
+
     @Override
     protected boolean isHandshakeCompleted() {
         return state == State.Connected;
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
index 64567d274..088b304a2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentQueueE2ETest.java
@@ -539,11 +539,11 @@ public void testUnackedCountWithRedeliveries() throws Exception {
             producer.send(("hello-" + i).getBytes());
         }
 
-        Set<MessageIdImpl> c1_receivedMessages = new HashSet<>();
+        Set<MessageId> c1_receivedMessages = new HashSet<>();
 
         // C-1 gets all messages but doesn't ack
         for (int i = 0; i < numMsgs; i++) {
-            c1_receivedMessages.add((MessageIdImpl) consumer1.receive().getMessageId());
+            c1_receivedMessages.add(consumer1.receive().getMessageId());
         }
 
         // C-2 will not get any message initially, since everything went to C-1 already
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
new file mode 100644
index 000000000..0445792a8
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import java.util.stream.IntStream;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * End-to-end test for subscribing with a topic-name regex while topic lookup goes through the
+ * binary protocol.
+ */
+public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
+    private static final long testTimeout = 90000; // 1.5 min
+    private static final Logger log = LoggerFactory.getLogger(PatternTopicsConsumerImplTest.class);
+    private final long ackTimeOutMillis = TimeUnit.SECONDS.toMillis(2);
+
+    /** Switches the test to TCP lookup and then runs the base-class setup before each test method. */
+    @Override
+    @BeforeMethod
+    public void setup() throws Exception {
+        // isTcpLookup = true makes the client use BinaryProtoLookupService to get topics for a pattern.
+        isTcpLookup = true;
+        super.internalSetup();
+    }
+
+    /** Runs the base-class cleanup after each test method. */
+    @Override
+    @AfterMethod
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /**
+     * Subscribes with a pattern covering three topics, checks the consumer's view of topics and
+     * per-partition consumers, then publishes to all three topics and expects every message back.
+     */
+    @Test(timeOut = testTimeout)
+    public void testBinaryProtoToGetTopicsOfNamespace() throws Exception {
+        final String key = "BinaryProtoToGetTopics";
+        final String subscriptionName = "my-ex-subscription-" + key;
+        final String topicName1 = "persistent://prop/use/ns-abc/pattern-topic-1-" + key;
+        final String topicName2 = "persistent://prop/use/ns-abc/pattern-topic-2-" + key;
+        final String topicName3 = "persistent://prop/use/ns-abc/pattern-topic-3-" + key;
+        final Pattern pattern = Pattern.compile("persistent://prop/use/ns-abc/pattern-topic.*");
+
+        // Step 1: create the property and the two partitioned topics.
+        admin.properties().createProperty("prop", new PropertyAdmin());
+        admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+        admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+        // Step 2: create one producer per topic; topics 2 and 3 use round-robin partition routing.
+        final ProducerConfiguration roundRobinConf = new ProducerConfiguration();
+        roundRobinConf.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+        final String messagePredicate = "my-message-" + key + "-";
+        final int totalMessages = 30;
+        final int messagesPerProducer = totalMessages / 3;
+
+        final Producer producer1 = pulsarClient.createProducer(topicName1);
+        final Producer producer2 = pulsarClient.createProducer(topicName2, roundRobinConf);
+        final Producer producer3 = pulsarClient.createProducer(topicName3, roundRobinConf);
+
+        // Step 3: subscribing by pattern is expected to succeed.
+        final ConsumerConfiguration consumerConf = new ConsumerConfiguration();
+        consumerConf.setReceiverQueueSize(4);
+        consumerConf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
+        consumerConf.setSubscriptionType(SubscriptionType.Shared);
+        final Consumer consumer = pulsarClient.subscribeAsync(pattern, subscriptionName, consumerConf).get();
+        assertTrue(consumer instanceof PatternTopicsConsumerImpl);
+        final PatternTopicsConsumerImpl patternConsumer = (PatternTopicsConsumerImpl) consumer;
+
+        // Step 4: the getters must report the right pattern, partitions and topics.
+        assertSame(pattern, patternConsumer.getPattern());
+        final List<String> topics = patternConsumer.getPartitionedTopics();
+        final List<ConsumerImpl> consumers = patternConsumer.getConsumers();
+
+        assertEquals(topics.size(), 6);
+        assertEquals(consumers.size(), 6);
+        assertEquals(patternConsumer.getTopics().size(), 3);
+
+        for (String topic : topics) {
+            log.info("topic: {}", topic);
+        }
+        for (ConsumerImpl partitionConsumer : consumers) {
+            log.info("consumer: {}", partitionConsumer.getTopic());
+        }
+
+        // The i-th partitioned topic must be served by the i-th internal consumer.
+        IntStream.range(0, topics.size()).forEach(index -> {
+            final String expectedTopic = topics.get(index);
+            assertTrue(expectedTopic.equals(consumers.get(index).getTopic()));
+        });
+
+        patternConsumer.getTopics().forEach(topic -> log.info("getTopics topic: {}", topic));
+
+        // Step 5: publish the same number of messages through each producer.
+        for (int i = 0; i < messagesPerProducer; i++) {
+            producer1.send((messagePredicate + "producer1-" + i).getBytes());
+            producer2.send((messagePredicate + "producer2-" + i).getBytes());
+            producer3.send((messagePredicate + "producer3-" + i).getBytes());
+        }
+
+        // Step 6: drain the consumer; every published message should arrive.
+        int receivedCount = 0;
+        Message received = consumer.receive();
+        do {
+            assertTrue(received instanceof TopicMessageImpl);
+            receivedCount++;
+            consumer.acknowledge(received);
+            log.debug("Consumer acknowledged : " + new String(received.getData()));
+            received = consumer.receive(500, TimeUnit.MILLISECONDS);
+        } while (received != null);
+        assertEquals(receivedCount, totalMessages);
+
+        consumer.unsubscribe();
+        consumer.close();
+        producer1.close();
+        producer2.close();
+        producer3.close();
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
new file mode 100644
index 000000000..a2cb6d579
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -0,0 +1,522 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import com.google.common.collect.Lists;
+import java.util.HashSet;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class TopicsConsumerImplTest extends ProducerConsumerBase {
+    // Value used as the TestNG timeOut of the @Test methods in this class (milliseconds).
+    private static final long testTimeout = 90000; // 1.5 min
+    private static final Logger log = LoggerFactory.getLogger(TopicsConsumerImplTest.class);
+    // Two seconds expressed in milliseconds; the tests pass it to ConsumerConfiguration.setAckTimeout.
+    private final long ackTimeOutMillis = TimeUnit.SECONDS.toMillis(2);
+
+    /** Runs before each test method and delegates to the base-class {@code internalSetup()}. */
+    @Override
+    @BeforeMethod
+    public void setup() throws Exception {
+        super.internalSetup();
+    }
+
+    /** Runs after each test method and delegates to the base-class {@code internalCleanup()}. */
+    @Override
+    @AfterMethod
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    /**
+     * Verifies that subscribing to a list of topics that belong to different namespaces is
+     * rejected with an {@link IllegalArgumentException}.
+     */
+    @Test(timeOut = testTimeout)
+    public void testDifferentTopicsNameSubscribe() throws Exception {
+        String key = "TopicsFromDifferentNamespace";
+        final String subscriptionName = "my-ex-subscription-" + key;
+
+        // Each topic deliberately lives in its own namespace (ns-abc1 / ns-abc2 / ns-abc3).
+        final String topicName1 = "persistent://prop/use/ns-abc1/topic-1-" + key;
+        final String topicName2 = "persistent://prop/use/ns-abc2/topic-2-" + key;
+        final String topicName3 = "persistent://prop/use/ns-abc3/topic-3-" + key;
+        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);
+
+        // 1. Create the property and the partitioned topics
+        admin.properties().createProperty("prop", new PropertyAdmin());
+        admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+        admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+        // 2. Try to create the consumer; this must fail
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setReceiverQueueSize(4);
+        conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
+        conf.setSubscriptionType(SubscriptionType.Shared);
+        try {
+            // The returned consumer is intentionally not kept: reaching the next line is a failure.
+            pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get();
+            fail("subscribe for topics from different namespace should fail.");
+        } catch (IllegalArgumentException e) {
+            // expected: the topics belong to different namespaces
+        }
+    }
+
+
+    /**
+     * Verifies the getters of a {@link TopicsConsumerImpl} subscribed to three topics: the list of
+     * partitioned topic names must line up, index by index, with the internal consumers, and
+     * {@code getTopics()} must report the three subscribed topics.
+     */
+    @Test(timeOut = testTimeout)
+    public void testGetConsumersAndGetTopics() throws Exception {
+        String key = "TopicsConsumerGet";
+        final String subscriptionName = "my-ex-subscription-" + key;
+
+        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
+        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
+        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;
+        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);
+
+        // 1. Create the property and the partitioned topics
+        admin.properties().createProperty("prop", new PropertyAdmin());
+        admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+        admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+        // 2. Create consumer
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setReceiverQueueSize(4);
+        conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
+        conf.setSubscriptionType(SubscriptionType.Shared);
+        Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get();
+        assertTrue(consumer instanceof TopicsConsumerImpl);
+
+        List<String> topics = ((TopicsConsumerImpl) consumer).getPartitionedTopics();
+        List<ConsumerImpl> consumers = ((TopicsConsumerImpl) consumer).getConsumers();
+
+        topics.forEach(topic -> log.info("topic: {}", topic));
+        consumers.forEach(c -> log.info("consumer: {}", c.getTopic()));
+
+        // Expected 1 (topic-1, not created as partitioned) + 2 + 3 = 6 entries, the same count that
+        // PatternTopicsConsumerImplTest asserts for this topic layout. Checking the sizes first
+        // turns a wrong count into a clear assertion failure instead of an index error below.
+        assertEquals(topics.size(), 6);
+        assertEquals(consumers.size(), 6);
+
+        IntStream.range(0, topics.size()).forEach(index ->
+            assertTrue(topics.get(index).equals(consumers.get(index).getTopic())));
+
+        assertEquals(((TopicsConsumerImpl) consumer).getTopics().size(), 3);
+
+        consumer.unsubscribe();
+        consumer.close();
+    }
+
+    @Test(timeOut = testTimeout)
+    public void testSyncProducerAndConsumer() throws Exception {
+        String key = "TopicsConsumerSyncTest";
+        final String subscriptionName = "my-ex-subscription-" + key;
+        final String messagePredicate = "my-message-" + key + "-";
+        final int totalMessages = 30;
+
+        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
+        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
+        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;
+        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);
+
+        admin.properties().createProperty("prop", new PropertyAdmin());
+        admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+        admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+        ProducerConfiguration producerConfiguration = new ProducerConfiguration();
+        producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+
+        // 1. producer connect
+        Producer producer1 = pulsarClient.createProducer(topicName1);
+        Producer producer2 = pulsarClient.createProducer(topicName2, producerConfiguration);
+        Producer producer3 = pulsarClient.createProducer(topicName3, producerConfiguration);
+
+        // 2. Create consumer
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setReceiverQueueSize(4);
+        conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
+        conf.setSubscriptionType(SubscriptionType.Shared);
+        Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get();
+        assertTrue(consumer instanceof TopicsConsumerImpl);
+
+        // 3. producer publish messages
+        for (int i = 0; i < totalMessages / 3; i++) {
+            producer1.send((messagePredicate + "producer1-" + i).getBytes());
+            producer2.send((messagePredicate + "producer2-" + i).getBytes());
+            producer3.send((messagePredicate + "producer3-" + i).getBytes());
+        }
+
+        int messageSet = 0;
+        Message message = consumer.receive();
+        do {
+            assertTrue(message instanceof TopicMessageImpl);
+            messageSet ++;
+            consumer.acknowledge(message);
+            log.debug("Consumer acknowledged : " + new String(message.getData()));
+            message = consumer.receive(500, TimeUnit.MILLISECONDS);
+        } while (message != null);
+        assertEquals(messageSet, totalMessages);
+
+        consumer.unsubscribe();
+        consumer.close();
+        producer1.close();
+        producer2.close();
+        producer3.close();
+    }
+
+    /**
+     * Publishes asynchronously to three topics, then issues {@code totalMessages} asynchronous
+     * receives from a worker thread and waits until every one of them has been received and
+     * acknowledged.
+     *
+     * <p>The worker executor is shut down in a {@code finally} block so that its thread does not
+     * outlive the test, including when waiting on the latch is interrupted.
+     */
+    @Test(timeOut = testTimeout)
+    public void testAsyncConsumer() throws Exception {
+        String key = "TopicsConsumerAsyncTest";
+        final String subscriptionName = "my-ex-subscription-" + key;
+        final String messagePredicate = "my-message-" + key + "-";
+        final int totalMessages = 30;
+
+        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
+        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
+        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;
+        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);
+
+        admin.properties().createProperty("prop", new PropertyAdmin());
+        admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+        admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+        ProducerConfiguration producerConfiguration = new ProducerConfiguration();
+        producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+
+        // 1. producer connect
+        Producer producer1 = pulsarClient.createProducer(topicName1);
+        Producer producer2 = pulsarClient.createProducer(topicName2, producerConfiguration);
+        Producer producer3 = pulsarClient.createProducer(topicName3, producerConfiguration);
+
+        // 2. Create consumer
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setReceiverQueueSize(4);
+        conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
+        conf.setSubscriptionType(SubscriptionType.Shared);
+        Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get();
+        assertTrue(consumer instanceof TopicsConsumerImpl);
+
+        // 3. Asynchronously produce messages and wait for all sends to complete
+        List<Future<MessageId>> futures = Lists.newArrayList();
+        for (int i = 0; i < totalMessages / 3; i++) {
+            futures.add(producer1.sendAsync((messagePredicate + "producer1-" + i).getBytes()));
+            futures.add(producer2.sendAsync((messagePredicate + "producer2-" + i).getBytes()));
+            futures.add(producer3.sendAsync((messagePredicate + "producer3-" + i).getBytes()));
+        }
+        log.info("Waiting for async publish to complete : {}", futures.size());
+        for (Future<MessageId> future : futures) {
+            future.get();
+        }
+
+        // 4. Issue one receiveAsync per expected message; each completion counts the latch down
+        log.info("start async consume");
+        CountDownLatch latch = new CountDownLatch(totalMessages);
+        ExecutorService executor = Executors.newFixedThreadPool(1);
+        try {
+            executor.execute(() -> IntStream.range(0, totalMessages).forEach(index ->
+                consumer.receiveAsync()
+                    .thenAccept(msg -> {
+                        assertTrue(msg instanceof TopicMessageImpl);
+                        try {
+                            consumer.acknowledge(msg);
+                        } catch (PulsarClientException e1) {
+                            fail("message acknowledge failed", e1);
+                        }
+                        latch.countDown();
+                        log.info("receive index: {}, latch countDown: {}", index, latch.getCount());
+                    })
+                    .exceptionally(ex -> {
+                        // Trailing throwable argument: the logger records the stack trace,
+                        // replacing the former printStackTrace() call.
+                        log.warn("receive index: {}, failed receive message {}", index, ex.getMessage(), ex);
+                        return null;
+                    })));
+
+            latch.await();
+            log.info("success latch wait");
+        } finally {
+            // Release the worker thread even if the wait above is interrupted.
+            executor.shutdown();
+        }
+
+        consumer.unsubscribe();
+        consumer.close();
+        producer1.close();
+        producer2.close();
+        producer3.close();
+    }
+
+    /**
+     * Exercises unacked-message tracking and redelivery on a Shared subscription that spans
+     * three topics (topic 2 and topic 3 are created with 2 and 3 partitions respectively).
+     *
+     * <p>Messages are published in three rounds of {@code totalMessages}; after each receive
+     * phase the size of the consumer's unacked message tracker is asserted. The numbered
+     * comments in the body describe the individual steps.
+     */
+    @Test(timeOut = testTimeout)
+    public void testConsumerUnackedRedelivery() throws Exception {
+        String key = "TopicsConsumerRedeliveryTest";
+        final String subscriptionName = "my-ex-subscription-" + key;
+        final String messagePredicate = "my-message-" + key + "-";
+        final int totalMessages = 30;
+
+        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
+        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
+        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;
+        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);
+
+        admin.properties().createProperty("prop", new PropertyAdmin());
+        admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+        admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+        ProducerConfiguration producerConfiguration = new ProducerConfiguration();
+        producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+
+        // 1. producer connect
+        Producer producer1 = pulsarClient.createProducer(topicName1);
+        Producer producer2 = pulsarClient.createProducer(topicName2, producerConfiguration);
+        Producer producer3 = pulsarClient.createProducer(topicName3, producerConfiguration);
+
+        // 2. Create consumer
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setReceiverQueueSize(4);
+        conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
+        conf.setSubscriptionType(SubscriptionType.Shared);
+        Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get();
+        assertTrue(consumer instanceof TopicsConsumerImpl);
+
+        // 3. producer publish messages
+        for (int i = 0; i < totalMessages / 3; i++) {
+            producer1.send((messagePredicate + "producer1-" + i).getBytes());
+            producer2.send((messagePredicate + "producer2-" + i).getBytes());
+            producer3.send((messagePredicate + "producer3-" + i).getBytes());
+        }
+
+        // 4. Receive every message without acknowledging; the unacked message tracker size
+        //    should then equal totalMessages.
+        Message message = consumer.receive();
+        while (message != null) {
+            assertTrue(message instanceof TopicMessageImpl);
+            log.debug("Consumer received : " + new String(message.getData()));
+            message = consumer.receive(500, TimeUnit.MILLISECONDS);
+        }
+        long size = ((TopicsConsumerImpl) consumer).getUnAckedMessageTracker().size();
+        log.debug(key + " Unacked Message Tracker size is " + size);
+        assertEquals(size, totalMessages);
+
+        // 5. Blocking call, redeliver should kick in, after receive and ack, Unacked Message Tracker size should be 0.
+        message = consumer.receive();
+        // Collects the distinct payloads seen, so duplicates do not inflate the count.
+        HashSet<String> hSet = new HashSet<>();
+        do {
+            assertTrue(message instanceof TopicMessageImpl);
+            hSet.add(new String(message.getData()));
+            consumer.acknowledge(message);
+            log.debug("Consumer acknowledged : " + new String(message.getData()));
+            message = consumer.receive(500, TimeUnit.MILLISECONDS);
+        } while (message != null);
+
+        size = ((TopicsConsumerImpl) consumer).getUnAckedMessageTracker().size();
+        log.debug(key + " Unacked Message Tracker size is " + size);
+        assertEquals(size, 0);
+        assertEquals(hSet.size(), totalMessages);
+
+        // 6. producer publish more messages
+        for (int i = 0; i < totalMessages / 3; i++) {
+            producer1.send((messagePredicate + "producer1-round2" + i).getBytes());
+            producer2.send((messagePredicate + "producer2-round2" + i).getBytes());
+            producer3.send((messagePredicate + "producer3-round2" + i).getBytes());
+        }
+
+        // 7. Receiver receives the message, ack them
+        message = consumer.receive();
+        int received = 0;
+        while (message != null) {
+            assertTrue(message instanceof TopicMessageImpl);
+            received++;
+            String data = new String(message.getData());
+            log.debug("Consumer received : " + data);
+            consumer.acknowledge(message);
+            message = consumer.receive(100, TimeUnit.MILLISECONDS);
+        }
+        size = ((TopicsConsumerImpl) consumer).getUnAckedMessageTracker().size();
+        log.debug(key + " Unacked Message Tracker size is " + size);
+        assertEquals(size, 0);
+        assertEquals(received, totalMessages);
+
+        // 8. Simulate ackTimeout by toggling the tracker of the topics consumer and of each
+        //    internal consumer. NOTE(review): toggle() is defined outside this view - confirm its effect.
+        ((TopicsConsumerImpl) consumer).getUnAckedMessageTracker().toggle();
+        ((TopicsConsumerImpl) consumer).getConsumers().forEach(c -> c.getUnAckedMessageTracker().toggle());
+
+        // 9. producer publish more messages
+        for (int i = 0; i < totalMessages / 3; i++) {
+            producer1.send((messagePredicate + "producer1-round3" + i).getBytes());
+            producer2.send((messagePredicate + "producer2-round3" + i).getBytes());
+            producer3.send((messagePredicate + "producer3-round3" + i).getBytes());
+        }
+
+        // 10. Receiver receives the message, doesn't ack
+        message = consumer.receive();
+        while (message != null) {
+            String data = new String(message.getData());
+            log.debug("Consumer received : " + data);
+            message = consumer.receive(100, TimeUnit.MILLISECONDS);
+        }
+        size = ((TopicsConsumerImpl) consumer).getUnAckedMessageTracker().size();
+        log.debug(key + " Unacked Message Tracker size is " + size);
+        assertEquals(size, 30);
+
+        // Sleep for the configured ack timeout before expecting the redelivery checked in step 11.
+        Thread.sleep(ackTimeOutMillis);
+
+        // 11. Receiver receives redelivered messages
+        message = consumer.receive();
+        int redelivered = 0;
+        while (message != null) {
+            assertTrue(message instanceof TopicMessageImpl);
+            redelivered++;
+            String data = new String(message.getData());
+            log.debug("Consumer received : " + data);
+            consumer.acknowledge(message);
+            message = consumer.receive(100, TimeUnit.MILLISECONDS);
+        }
+        assertEquals(redelivered, 30);
+        size =  ((TopicsConsumerImpl) consumer).getUnAckedMessageTracker().size();
+        log.info(key + " Unacked Message Tracker size is " + size);
+        assertEquals(size, 0);
+
+        consumer.unsubscribe();
+        consumer.close();
+        producer1.close();
+        producer2.close();
+        producer3.close();
+    }
+
+    @Test
+    public void testSubscribeUnsubscribeSingleTopic() throws Exception {
+        String key = "TopicsConsumerSubscribeUnsubscribeSingleTopicTest";
+        final String subscriptionName = "my-ex-subscription-" + key;
+        final String messagePredicate = "my-message-" + key + "-";
+        final int totalMessages = 30;
+
+        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
+        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
+        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;
+        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);
+
+        admin.properties().createProperty("prop", new PropertyAdmin());
+        admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+        admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+        ProducerConfiguration producerConfiguration = new ProducerConfiguration();
+        producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+
+        // 1. connect one producer per topic (topic2 and topic3 were created with 2 and 3 partitions)
+        Producer producer1 = pulsarClient.createProducer(topicName1);
+        Producer producer2 = pulsarClient.createProducer(topicName2, producerConfiguration);
+        Producer producer3 = pulsarClient.createProducer(topicName3, producerConfiguration);
+
+        // 2. create a single Shared consumer subscribed to all three topics
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        conf.setReceiverQueueSize(4);
+        conf.setAckTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS);
+        conf.setSubscriptionType(SubscriptionType.Shared);
+        Consumer consumer = pulsarClient.subscribeAsync(topicNames, subscriptionName, conf).get();
+        assertTrue(consumer instanceof TopicsConsumerImpl);
+
+        // 3. round 1: each producer publishes totalMessages / 3 messages
+        for (int i = 0; i < totalMessages / 3; i++) {
+            producer1.send((messagePredicate + "producer1-" + i).getBytes());
+            producer2.send((messagePredicate + "producer2-" + i).getBytes());
+            producer3.send((messagePredicate + "producer3-" + i).getBytes());
+        }
+
+        int messageSet = 0;
+        Message message = consumer.receive();
+        do {
+            assertTrue(message instanceof TopicMessageImpl);
+            messageSet ++;
+            consumer.acknowledge(message);
+            log.debug("Consumer acknowledged : " + new String(message.getData()));
+            message = consumer.receive(500, TimeUnit.MILLISECONDS);
+        } while (message != null);
+        assertEquals(messageSet, totalMessages);
+
+        // 4. unsubscribe topic3 only; topic1 and topic2 stay subscribed
+        CompletableFuture<Void> unsubFuture = ((TopicsConsumerImpl)consumer).unsubscribeAsync(topicName3);
+        unsubFuture.get();
+
+        // 5. round 2: all three producers publish again, topic3 included
+        for (int i = 0; i < totalMessages / 3; i++) {
+            producer1.send((messagePredicate + "producer1-round2" + i).getBytes());
+            producer2.send((messagePredicate + "producer2-round2" + i).getBytes());
+            producer3.send((messagePredicate + "producer3-round2" + i).getBytes());
+        }
+
+        // 6. topic3 messages must not be delivered: expect only 2/3 of the messages published in round 2
+        messageSet = 0;
+        message = consumer.receive();
+        do {
+            assertTrue(message instanceof TopicMessageImpl);
+            messageSet ++;
+            consumer.acknowledge(message);
+            log.debug("Consumer acknowledged : " + new String(message.getData()));
+            message = consumer.receive(500, TimeUnit.MILLISECONDS);
+        } while (message != null);
+        assertEquals(messageSet, totalMessages * 2 / 3);
+
+        // 7. after un-subscribing topic3: expect 3 internal topics/consumers left, serving the 2 remaining topics
+        List<String> topics = ((TopicsConsumerImpl) consumer).getPartitionedTopics();
+        List<ConsumerImpl> consumers = ((TopicsConsumerImpl) consumer).getConsumers();
+
+        assertEquals(topics.size(), 3);
+        assertEquals(consumers.size(), 3);
+        assertTrue(((TopicsConsumerImpl) consumer).getTopics().size() == 2);
+
+        // 8. re-subscribe topic3 on the same consumer
+        CompletableFuture<Void> subFuture = ((TopicsConsumerImpl)consumer).subscribeAsync(topicName3);
+        subFuture.get();
+
+        // 9. round 3: all three producers publish again
+        for (int i = 0; i < totalMessages / 3; i++) {
+            producer1.send((messagePredicate + "producer1-round3" + i).getBytes());
+            producer2.send((messagePredicate + "producer2-round3" + i).getBytes());
+            producer3.send((messagePredicate + "producer3-round3" + i).getBytes());
+        }
+
+        // 10. messages from all 3 topics must be delivered again
+        messageSet = 0;
+        message = consumer.receive();
+        do {
+            assertTrue(message instanceof TopicMessageImpl);
+            messageSet ++;
+            consumer.acknowledge(message);
+            log.debug("Consumer acknowledged : " + new String(message.getData()));
+            message = consumer.receive(500, TimeUnit.MILLISECONDS);
+        } while (message != null);
+        assertEquals(messageSet, totalMessages);
+
+        // 11. after re-subscribing topic3: expect 6 internal topics/consumers, serving all 3 topics
+        topics = ((TopicsConsumerImpl) consumer).getPartitionedTopics();
+        consumers = ((TopicsConsumerImpl) consumer).getConsumers();
+
+        assertEquals(topics.size(), 6);
+        assertEquals(consumers.size(), 6);
+        assertTrue(((TopicsConsumerImpl) consumer).getTopics().size() == 3);
+
+        consumer.unsubscribe();
+        consumer.close();
+        producer1.close();
+        producer2.close();
+        producer3.close();
+    }
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java
index 27e0ac27a..d62c5564d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Message.java
@@ -124,4 +124,14 @@
      * @return the key of the message
      */
     String getKey();
+
+    /**
+     * Get the name of the topic this message belongs to.
+     * Mainly used by TopicsConsumerImpl to tell which topic a message belongs to.
+     *
+     * @return the topic name; this default implementation returns {@code null}
+     */
+    default String getTopicName() {
+        return null;
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java
index f74c169dc..9104cede4 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/MessageId.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.client.api;
 
 import java.io.IOException;
-
 import org.apache.pulsar.client.impl.MessageIdImpl;
 
 /**
@@ -37,6 +36,16 @@
      */
     byte[] toByteArray();
 
+    /**
+     * Get the name of the topic this MessageId belongs to.
+     * Mainly used by TopicsConsumerImpl to tell which topic a message belongs to.
+     *
+     * @return the topic name; this default implementation returns {@code null}
+     */
+    default String getTopicName() {
+        return null;
+    }
+
     /**
      * De-serialize a message id from a byte array
      *
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
index e15db40a6..244fa9902 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -19,8 +19,10 @@
 package org.apache.pulsar.client.api;
 
 import java.io.Closeable;
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 
+import java.util.regex.Pattern;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 
 /**
@@ -245,4 +247,116 @@ public static PulsarClient create(String serviceUrl, ClientConfiguration conf) t
      *             if the forceful shutdown fails
      */
     void shutdown() throws PulsarClientException;
+
+
+    /**
+     * Subscribe to the given topics with a single subscription, using the default {@code ConsumerConfiguration}.
+     *
+     * @param topics
+     *            The collection of topic names; all topics should be under the same namespace
+     * @param subscription
+     *            The name of the subscription
+     * @return The {@code Consumer} object
+     * @throws PulsarClientException if the subscribe operation fails
+     */
+    Consumer subscribe(Collection<String> topics, String subscription) throws PulsarClientException;
+
+    /**
+     * Asynchronously subscribe to the given topics with a single subscription, using the
+     * default {@code ConsumerConfiguration}.
+     *
+     * @param topics
+     *            The collection of topic names; all topics should be under the same namespace
+     * @param subscription
+     *            The name of the subscription
+     * @return Future of the {@code Consumer} object
+     */
+    CompletableFuture<Consumer> subscribeAsync(Collection<String> topics, String subscription);
+
+    /**
+     * Subscribe to the given topics with a single subscription, using the given {@code ConsumerConfiguration}.
+     *
+     * @param topics
+     *            The collection of topic names; all topics should be under the same namespace
+     * @param subscription
+     *            The name of the subscription
+     * @param conf
+     *            The {@code ConsumerConfiguration} object
+     * @return The {@code Consumer} object (returned synchronously, not as a future)
+     */
+    Consumer subscribe(Collection<String> topics, String subscription, ConsumerConfiguration conf)
+        throws PulsarClientException;
+
+    /**
+     * Asynchronously subscribe to the given topics with a single subscription, using the given
+     * {@code ConsumerConfiguration}.
+     *
+     * @param topics
+     *            The collection of topic names; all topics should be under the same namespace
+     * @param subscription
+     *            The name of the subscription
+     * @param conf
+     *            The {@code ConsumerConfiguration} object
+     * @return Future of the {@code Consumer} object
+     */
+    CompletableFuture<Consumer> subscribeAsync(Collection<String> topics,
+                                               String subscription,
+                                               ConsumerConfiguration conf);
+
+    /**
+     * Subscribe to the topics matching the given regex pattern with a single subscription, using the default
+     * {@code ConsumerConfiguration}.
+     *
+     * @param topicsPattern
+     *            The regex pattern of the topics to subscribe to, e.g. "persistent://prop/cluster/ns/abc.*"
+     * @param subscription
+     *            The name of the subscription
+     * @return The {@code Consumer} object
+     * @throws PulsarClientException if the subscribe operation fails
+     */
+    Consumer subscribe(Pattern topicsPattern, String subscription) throws PulsarClientException;
+
+    /**
+     * Asynchronously subscribe to the topics matching the given regex pattern with a single subscription,
+     * using the default {@code ConsumerConfiguration}.
+     *
+     * @param topicsPattern
+     *            The regex pattern of the topics to subscribe to, e.g. "persistent://prop/cluster/ns/abc.*"
+     * @param subscription
+     *            The name of the subscription
+     * @return Future of the {@code Consumer} object
+     */
+    CompletableFuture<Consumer> subscribeAsync(Pattern topicsPattern, String subscription);
+
+    /**
+     * Subscribe to the topics matching the given regex pattern with a single subscription, using the given
+     * {@code ConsumerConfiguration}.
+     *
+     * @param topicsPattern
+     *            The regex pattern of the topics to subscribe to, e.g. "persistent://prop/cluster/ns/abc.*"
+     * @param subscription
+     *            The name of the subscription
+     * @param conf
+     *            The {@code ConsumerConfiguration} object
+     * @return The {@code Consumer} object (returned synchronously, not as a future)
+     */
+    Consumer subscribe(Pattern topicsPattern, String subscription, ConsumerConfiguration conf)
+        throws PulsarClientException;
+
+    /**
+     * Asynchronously subscribe to the topics matching the given regex pattern with a single subscription,
+     * using the given {@code ConsumerConfiguration}.
+     *
+     * @param topicsPattern
+     *            The regex pattern of the topics to subscribe to, e.g. "persistent://prop/cluster/ns/abc.*"
+     * @param subscription
+     *            The name of the subscription
+     * @param conf
+     *            The {@code ConsumerConfiguration} object
+     * @return Future of the {@code Consumer} object
+     */
+    CompletableFuture<Consumer> subscribeAsync(Pattern topicsPattern,
+                                               String subscription,
+                                               ConsumerConfiguration conf);
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index 9f5338d6a..aede9cfec 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -20,8 +20,11 @@
 
 import static java.lang.String.format;
 
+import com.google.common.collect.Lists;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.commons.lang3.tuple.Pair;
@@ -30,6 +33,7 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
 import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -176,6 +180,46 @@ public String getServiceUrl() {
         return serviceAddress.toString();
     }
 
+    @Override
+    public CompletableFuture<List<String>> getTopicsUnderNamespace(NamespaceName namespace) {
+        return getTopicsUnderNamespace(serviceAddress, namespace);
+    }
+
+    /**
+     * Asks the broker at {@code socketAddress} for the topics of {@code namespace}. The future yields
+     * the names with the partition part removed and de-duplicated; it fails if the connection or the
+     * request fails, so it is never left pending.
+     */
+    private CompletableFuture<List<String>> getTopicsUnderNamespace(InetSocketAddress socketAddress,
+                                                                    NamespaceName namespace) {
+        CompletableFuture<List<String>> topicsFuture = new CompletableFuture<List<String>>();
+        client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
+            long requestId = client.newRequestId();
+            ByteBuf request = Commands.newGetTopicsOfNamespaceRequest(
+                namespace.getProperty(), namespace.getCluster(), namespace.getLocalName(), requestId);
+            clientCnx.newGetTopicsOfNamespace(request, requestId).thenAccept(topicsList -> {
+                log.debug("[{}] Success get topics list in request: {}", namespace, requestId);
+                List<String> result = Lists.newArrayList();
+                topicsList.forEach(topic -> {
+                    String filtered = DestinationName.get(topic).getPartitionedTopicName();
+                    if (!result.contains(filtered)) {
+                        result.add(filtered);
+                    }
+                });
+                topicsFuture.complete(result);
+            }).exceptionally((e) -> {
+                log.warn("[{}] failed to get topics list: {}", namespace.toString(), e.getCause().getMessage(), e);
+                topicsFuture.completeExceptionally(e);
+                return null;
+            });
+        }).exceptionally(connectionException -> {
+            topicsFuture.completeExceptionally(connectionException);
+            return null;
+        });
+        return topicsFuture;
+    }
+
+
     @Override
     public void close() throws Exception {
         // no-op
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 85e286745..4d0e947ed 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -24,6 +24,7 @@
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.channels.ClosedChannelException;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -41,6 +42,7 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandCloseProducer;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandError;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandMessage;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadataResponse;
@@ -70,6 +72,9 @@
             16, 1);
     private final ConcurrentLongHashMap<CompletableFuture<LookupDataResult>> pendingLookupRequests = new ConcurrentLongHashMap<>(
             16, 1);
+    private final ConcurrentLongHashMap<CompletableFuture<List<String>>> pendingGetTopicsRequests = new ConcurrentLongHashMap<>(
+        16, 1);
+
     private final ConcurrentLongHashMap<ProducerImpl> producers = new ConcurrentLongHashMap<>(16, 1);
     private final ConcurrentLongHashMap<ConsumerImpl> consumers = new ConcurrentLongHashMap<>(16, 1);
 
@@ -144,6 +149,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         // Fail out all the pending ops
         pendingRequests.forEach((key, future) -> future.completeExceptionally(e));
         pendingLookupRequests.forEach((key, future) -> future.completeExceptionally(e));
+        pendingGetTopicsRequests.forEach((key, future) -> future.completeExceptionally(e));
 
         // Notify all attached producers/consumers so they have a chance to reconnect
         producers.forEach((id, producer) -> producer.connectionClosed(this));
@@ -438,6 +444,42 @@ protected boolean isHandshakeCompleted() {
         return future;
     }
 
+    /** Writes the request and tracks the returned future under {@code requestId} until it is resolved. */
+    public CompletableFuture<List<String>> newGetTopicsOfNamespace(ByteBuf request, long requestId) {
+        CompletableFuture<List<String>> pendingTopics = new CompletableFuture<>();
+        pendingGetTopicsRequests.put(requestId, pendingTopics);
+        ctx.writeAndFlush(request).addListener(sendResult -> {
+            if (sendResult.isSuccess()) {
+                return;
+            }
+            log.warn("{} Failed to send request {} to broker: {}", ctx.channel(), requestId,
+                sendResult.cause().getMessage());
+            pendingGetTopicsRequests.remove(requestId);
+            pendingTopics.completeExceptionally(sendResult.cause());
+        });
+        return pendingTopics;
+    }
+
+    @Override
+    protected void handleGetTopicsOfNamespaceSuccess(CommandGetTopicsOfNamespaceResponse success) {
+        checkArgument(state == State.Ready);
+
+        // Resolve the pending get-topics future that was registered under this response's request id.
+        final long requestId = success.getRequestId();
+        final List<String> topics = success.getTopicsList();
+        if (log.isDebugEnabled()) {
+            log.debug("{} Received get topics of namespace success response from server: {} - topics.size: {}",
+                ctx.channel(), success.getRequestId(), topics.size());
+        }
+
+        CompletableFuture<List<String>> requestFuture = pendingGetTopicsRequests.remove(requestId);
+        if (requestFuture == null) {
+            log.warn("{} Received unknown request id from server: {}", ctx.channel(), success.getRequestId());
+            return;
+        }
+        requestFuture.complete(topics);
+    }
+
     Promise<Void> newPromise() {
         return ctx.newPromise();
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
index 6ef58b26e..90363131d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.client.impl;
 
+import com.google.common.collect.Queues;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Set;
@@ -27,7 +28,6 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.Message;
@@ -41,8 +41,6 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
 import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
 
-import com.google.common.collect.Queues;
-
 public abstract class ConsumerBase extends HandlerBase implements Consumer {
 
     enum ConsumerType {
@@ -57,7 +55,7 @@
     protected final ExecutorService listenerExecutor;
     final BlockingQueue<Message> incomingMessages;
     protected final ConcurrentLinkedQueue<CompletableFuture<Message>> pendingReceives;
-    protected final int maxReceiverQueueSize;
+    protected int maxReceiverQueueSize;
 
     protected ConsumerBase(PulsarClientImpl client, String topic, String subscription, ConsumerConfiguration conf,
             int receiverQueueSize, ExecutorService listenerExecutor, CompletableFuture<Consumer> subscribeFuture) {
@@ -330,7 +328,7 @@ public String getSubscription() {
      * the connected consumers. This is a non blocking call and doesn't throw an exception. In case the connection
      * breaks, the messages are redelivered after reconnect.
      */
-    protected abstract void redeliverUnacknowledgedMessages(Set<MessageIdImpl> messageIds);
+    protected abstract void redeliverUnacknowledgedMessages(Set<MessageId> messageIds);
 
     @Override
     public String toString() {
@@ -340,4 +338,9 @@ public String toString() {
                 ", topic='" + topic + '\'' +
                 '}';
     }
+
+    /** Sets {@code maxReceiverQueueSize} to {@code newSize}. */
+    protected void setMaxReceiverQueueSize(int newSize) {
+        this.maxReceiverQueueSize = newSize;
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 845ad4467..2df682dc7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -25,8 +25,21 @@
 import static org.apache.pulsar.common.api.Commands.hasChecksum;
 import static org.apache.pulsar.common.api.Commands.readChecksum;
 
+import com.google.common.collect.Iterables;
+import io.netty.buffer.ByteBuf;
+import io.netty.util.Timeout;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -37,7 +50,6 @@
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
-
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
@@ -60,13 +72,6 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Iterables;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.util.Timeout;
-import io.netty.util.concurrent.Future;
-import io.netty.util.concurrent.GenericFutureListener;
-
 public class ConsumerImpl extends ConsumerBase {
     private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000;
 
@@ -1162,7 +1167,9 @@ public void redeliverUnacknowledgedMessages() {
     }
 
     @Override
-    public void redeliverUnacknowledgedMessages(Set<MessageIdImpl> messageIds) {
+    public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
+        checkArgument(messageIds.stream().findFirst().get() instanceof MessageIdImpl);
+
         if (conf.getSubscriptionType() != SubscriptionType.Shared) {
             // We cannot redeliver single messages if subscription type is not Shared
             redeliverUnacknowledgedMessages();
@@ -1171,7 +1178,10 @@ public void redeliverUnacknowledgedMessages(Set<MessageIdImpl> messageIds) {
         ClientCnx cnx = cnx();
         if (isConnected() && cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v2.getNumber()) {
             int messagesFromQueue = removeExpiredMessagesFromQueue(messageIds);
-            Iterable<List<MessageIdImpl>> batches = Iterables.partition(messageIds, MAX_REDELIVER_UNACKNOWLEDGED);
+            Iterable<List<MessageIdImpl>> batches = Iterables.partition(
+                messageIds.stream()
+                    .map(messageId -> (MessageIdImpl)messageId)
+                    .collect(Collectors.toSet()), MAX_REDELIVER_UNACKNOWLEDGED);
             MessageIdData.Builder builder = MessageIdData.newBuilder();
             batches.forEach(ids -> {
                 List<MessageIdData> messageIdDatas = ids.stream().map(messageId -> {
@@ -1253,7 +1263,7 @@ private MessageIdImpl getMessageIdImpl(Message msg) {
         return messageId;
     }
 
-    private int removeExpiredMessagesFromQueue(Set<MessageIdImpl> messageIds) {
+    private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
         int messagesFromQueue = 0;
         Message peek = incomingMessages.peek();
         if (peek != null) {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 8c12ab167..78ea63715 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -18,23 +18,24 @@
  */
 package org.apache.pulsar.client.impl;
 
-import java.net.InetAddress;
+import com.google.common.collect.Lists;
+import io.netty.channel.EventLoopGroup;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
-
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.channel.EventLoopGroup;
-
 class HttpLookupService implements LookupService {
 
     private final HttpClient httpClient;
@@ -89,6 +90,29 @@ public String getServiceUrl() {
     	return httpClient.url.toString();
     }
 
+    @Override
+    public CompletableFuture<List<String>> getTopicsUnderNamespace(NamespaceName namespace) {
+        CompletableFuture<List<String>> future = new CompletableFuture<>();
+        httpClient.get(String.format("admin/namespaces/%s/destinations", namespace.getLookupName()), String[].class)
+            .thenAccept(topics -> {
+                // strip the partition part of each name and keep each resulting topic only once
+                List<String> result = Lists.newArrayList();
+                Arrays.asList(topics).forEach(topic -> {
+                    String filtered = DestinationName.get(topic).getPartitionedTopicName();
+                    if (!result.contains(filtered)) {
+                        result.add(filtered);
+                    }
+                });
+                future.complete(result);
+            })
+            .exceptionally(ex -> {
+                log.warn("Failed to getTopicsUnderNamespace namespace: {}. {}", namespace, ex.getMessage());
+                future.completeExceptionally(ex);
+                return null;
+            });
+        return future;
+    }
+
     @Override
     public void close() throws Exception {
         httpClient.close();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
index 63e85a313..71f5ef731 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
@@ -19,10 +19,13 @@
 package org.apache.pulsar.client.impl;
 
 import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 
 /**
@@ -62,4 +65,13 @@
 	 * @return
 	 */
 	public String getServiceUrl();
+
+	/**
+	 * Returns the names of all the topics under the given namespace.
+	 *
+	 * @param namespace the namespace whose topics are listed
+	 * @return a future that completes with the list of topic names
+	 */
+	public CompletableFuture<List<String>> getTopicsUnderNamespace(NamespaceName namespace);
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
index 1dd3e4b69..ad18bcb4b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedConsumerImpl.java
@@ -468,7 +468,8 @@ public void redeliverUnacknowledgedMessages() {
     }
 
     @Override
-    public void redeliverUnacknowledgedMessages(Set<MessageIdImpl> messageIds) {
+    public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
+        checkArgument(messageIds.stream().findFirst().get() instanceof MessageIdImpl);
         if (conf.getSubscriptionType() != SubscriptionType.Shared) {
             // We cannot redeliver single messages if subscription type is not Shared
             redeliverUnacknowledgedMessages();
@@ -476,9 +477,11 @@ public void redeliverUnacknowledgedMessages(Set<MessageIdImpl> messageIds) {
         }
         removeExpiredMessagesFromQueue(messageIds);
         messageIds.stream()
-                .collect(Collectors.groupingBy(MessageIdImpl::getPartitionIndex, Collectors.toSet()))
-                .forEach((partitionIndex, messageIds1) ->
-                        consumers.get(partitionIndex).redeliverUnacknowledgedMessages(messageIds1));
+            .map(messageId -> (MessageIdImpl)messageId)
+            .collect(Collectors.groupingBy(MessageIdImpl::getPartitionIndex, Collectors.toSet()))
+            .forEach((partitionIndex, messageIds1) ->
+                consumers.get(partitionIndex).redeliverUnacknowledgedMessages(
+                    messageIds1.stream().map(mid -> (MessageId)mid).collect(Collectors.toSet())));
         resumeReceivingFromPausedConsumersIfNeeded();
     }
 
@@ -546,10 +549,10 @@ public UnAckedMessageTracker getUnAckedMessageTracker() {
         return unAckedMessageTracker;
     }
 
-    private void removeExpiredMessagesFromQueue(Set<MessageIdImpl> messageIds) {
+    private void removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
         Message peek = incomingMessages.peek();
         if (peek != null) {
-            if (!messageIds.contains((MessageIdImpl) peek.getMessageId())) {
+            if (!messageIds.contains(peek.getMessageId())) {
                 // first message is not expired, then no message is expired in queue.
                 return;
             }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java
new file mode 100644
index 000000000..43250a472
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.regex.Pattern;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link TopicsConsumerImpl} that additionally remembers the {@link Pattern} it was created with.
+ */
+public class PatternTopicsConsumerImpl extends TopicsConsumerImpl {
+    private static final Logger log = LoggerFactory.getLogger(PatternTopicsConsumerImpl.class);
+
+    // Pattern handed in by the creator; exposed unchanged through getPattern().
+    private final Pattern topicsPattern;
+
+    /**
+     * Forwards every argument except {@code topicsPattern} to the parent constructor, then stores the pattern.
+     */
+    public PatternTopicsConsumerImpl(Pattern topicsPattern,
+                                     PulsarClientImpl client,
+                                     Collection<String> topics,
+                                     String subscription,
+                                     ConsumerConfiguration conf,
+                                     ExecutorService listenerExecutor,
+                                     CompletableFuture<Consumer> subscribeFuture) {
+        super(client, topics, subscription, conf, listenerExecutor, subscribeFuture);
+        this.topicsPattern = topicsPattern;
+    }
+
+    /**
+     * @return the pattern supplied at construction time
+     */
+    public Pattern getPattern() {
+        return topicsPattern;
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 9f108211e..176e0e47b 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -19,7 +19,9 @@
 package org.apache.pulsar.client.impl;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
+import static com.google.common.base.Preconditions.checkState;
 
+import java.util.Collection;
 import java.util.IdentityHashMap;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
@@ -30,6 +32,8 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerConfiguration;
@@ -43,6 +47,7 @@
 import org.apache.pulsar.client.util.ExecutorProvider;
 import org.apache.pulsar.client.util.FutureUtil;
 import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.util.netty.EventLoopUtil;
 import org.slf4j.Logger;
@@ -271,6 +276,148 @@ public Consumer subscribe(String topic, String subscription, ConsumerConfigurati
         return consumerSubscribedFuture;
     }
 
+
+    /**
+     * Blocking subscribe to a collection of topics using a freshly created default {@link ConsumerConfiguration}.
+     */
+    @Override
+    public Consumer subscribe(Collection<String> topics, final String subscription) throws PulsarClientException {
+        final ConsumerConfiguration defaultConf = new ConsumerConfiguration();
+        return subscribe(topics, subscription, defaultConf);
+    }
+
+    /**
+     * Blocking variant of {@code subscribeAsync(Collection, String, ConsumerConfiguration)}: waits for the
+     * returned future and translates its failure into a {@link PulsarClientException}.
+     *
+     * @throws PulsarClientException the future's cause if it already is one, otherwise a wrapper around the cause;
+     *                               also thrown (after restoring the interrupt flag) when the wait is interrupted
+     */
+    @Override
+    public Consumer subscribe(Collection<String> topics,
+                              String subscription,
+                              ConsumerConfiguration conf)
+        throws PulsarClientException {
+        final CompletableFuture<Consumer> pending = subscribeAsync(topics, subscription, conf);
+        try {
+            return pending.get();
+        } catch (InterruptedException interrupted) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException(interrupted);
+        } catch (ExecutionException failed) {
+            final Throwable cause = failed.getCause();
+            if (cause instanceof PulsarClientException) {
+                throw (PulsarClientException) cause;
+            }
+            throw new PulsarClientException(cause);
+        }
+    }
+
+    /**
+     * Asynchronous subscribe to a collection of topics using a freshly created default
+     * {@link ConsumerConfiguration}.
+     */
+    @Override
+    public CompletableFuture<Consumer> subscribeAsync(Collection<String> topics, String subscription) {
+        final ConsumerConfiguration defaultConf = new ConsumerConfiguration();
+        return subscribeAsync(topics, subscription, defaultConf);
+    }
+
+    /**
+     * Validates the arguments and the client state, then creates a {@link TopicsConsumerImpl} for the given
+     * topics and registers it in {@code consumers}.
+     *
+     * @return the future handed to the new consumer; for invalid input an already-failed future is returned
+     *         instead (empty topics, closed client, blank subscription or null configuration, checked in that
+     *         order)
+     */
+    @Override
+    public CompletableFuture<Consumer> subscribeAsync(Collection<String> topics,
+                                                      String subscription,
+                                                      ConsumerConfiguration conf) {
+        final boolean noTopics = topics == null || topics.isEmpty();
+        if (noTopics) {
+            return FutureUtil.failedFuture(new PulsarClientException.InvalidTopicNameException("Empty topics name"));
+        }
+
+        final boolean clientOpen = state.get() == State.Open;
+        if (!clientOpen) {
+            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
+        }
+
+        if (isBlank(subscription)) {
+            return FutureUtil
+                .failedFuture(new PulsarClientException.InvalidConfigurationException("Empty subscription name"));
+        }
+
+        if (conf == null) {
+            return FutureUtil.failedFuture(
+                new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined"));
+        }
+
+        final CompletableFuture<Consumer> subscribed = new CompletableFuture<>();
+        final ExecutorService listenerExecutor = externalExecutorProvider.getExecutor();
+        final ConsumerBase topicsConsumer = new TopicsConsumerImpl(
+            PulsarClientImpl.this, topics, subscription, conf, listenerExecutor, subscribed);
+
+        // Register the new consumer with this client.
+        synchronized (consumers) {
+            consumers.put(topicsConsumer, Boolean.TRUE);
+        }
+
+        return subscribed;
+    }
+
+    /**
+     * Blocking pattern-based subscribe using a freshly created default {@link ConsumerConfiguration}.
+     */
+    @Override
+    public Consumer subscribe(Pattern topicsPattern, final String subscription) throws PulsarClientException {
+        final ConsumerConfiguration defaultConf = new ConsumerConfiguration();
+        return subscribe(topicsPattern, subscription, defaultConf);
+    }
+
+    /**
+     * Blocking variant of {@code subscribeAsync(Pattern, String, ConsumerConfiguration)}: waits for the returned
+     * future and translates its failure into a {@link PulsarClientException}.
+     *
+     * @throws PulsarClientException the future's cause if it already is one, otherwise a wrapper around the cause;
+     *                               also thrown (after restoring the interrupt flag) when the wait is interrupted
+     */
+    @Override
+    public Consumer subscribe(Pattern topicsPattern,
+                              String subscription,
+                              ConsumerConfiguration conf)
+        throws PulsarClientException {
+        final CompletableFuture<Consumer> pending = subscribeAsync(topicsPattern, subscription, conf);
+        try {
+            return pending.get();
+        } catch (InterruptedException interrupted) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException(interrupted);
+        } catch (ExecutionException failed) {
+            final Throwable cause = failed.getCause();
+            if (cause instanceof PulsarClientException) {
+                throw (PulsarClientException) cause;
+            }
+            throw new PulsarClientException(cause);
+        }
+    }
+
+    /**
+     * Asynchronous pattern-based subscribe using a freshly created default {@link ConsumerConfiguration}.
+     */
+    @Override
+    public CompletableFuture<Consumer> subscribeAsync(Pattern topicsPattern, String subscription) {
+        final ConsumerConfiguration defaultConf = new ConsumerConfiguration();
+        return subscribeAsync(topicsPattern, subscription, defaultConf);
+    }
+
+    /**
+     * Subscribes to every topic of a namespace whose full name matches {@code topicsPattern}.
+     *
+     * <p>The namespace is taken from the pattern string itself (parsed as a destination name). The topics under
+     * that namespace are looked up, filtered with the pattern, and a {@link PatternTopicsConsumerImpl} is created
+     * for the matches and registered in {@code consumers}.
+     *
+     * @param topicsPattern pattern whose text is also parseable as a destination name
+     * @param subscription  subscription name, must not be blank
+     * @param conf          consumer configuration, must not be null
+     * @return a future completed by the created consumer, or failed when validation or the namespace lookup fails;
+     *         argument problems are reported through the future rather than thrown
+     */
+    @Override
+    public CompletableFuture<Consumer> subscribeAsync(Pattern topicsPattern,
+                                                      String subscription,
+                                                      ConsumerConfiguration conf) {
+        if (state.get() != State.Open) {
+            return FutureUtil.failedFuture(new PulsarClientException.AlreadyClosedException("Client already closed"));
+        }
+        if (isBlank(subscription)) {
+            return FutureUtil
+                .failedFuture(new PulsarClientException.InvalidConfigurationException("Empty subscription name"));
+        }
+        if (conf == null) {
+            return FutureUtil.failedFuture(
+                new PulsarClientException.InvalidConfigurationException("Consumer configuration undefined"));
+        }
+        // Report a missing pattern the same way as the other invalid arguments instead of throwing an NPE.
+        if (topicsPattern == null) {
+            return FutureUtil.failedFuture(
+                new PulsarClientException.InvalidConfigurationException("Topics pattern undefined"));
+        }
+
+        String regex = topicsPattern.pattern();
+        final NamespaceName namespaceName;
+        try {
+            namespaceName = DestinationName.get(regex).getNamespaceObject();
+        } catch (IllegalArgumentException e) {
+            // NOTE(review): assumes DestinationName.get rejects malformed names with IllegalArgumentException;
+            // any other exception type still propagates to the caller as before.
+            return FutureUtil.failedFuture(
+                new PulsarClientException.InvalidTopicNameException("Invalid topics pattern: " + regex));
+        }
+
+        CompletableFuture<Consumer> consumerSubscribedFuture = new CompletableFuture<>();
+        lookup.getTopicsUnderNamespace(namespaceName)
+            .thenAccept(topics -> {
+                List<String> topicsList = topics.stream()
+                    .filter(topic -> {
+                        DestinationName destinationName = DestinationName.get(topic);
+                        // The lookup is expected to return only topics of the requested namespace.
+                        checkState(destinationName.getNamespaceObject().equals(namespaceName));
+                        return topicsPattern.matcher(destinationName.toString()).matches();
+                    })
+                    .collect(Collectors.toList());
+
+                ConsumerBase consumer = new PatternTopicsConsumerImpl(topicsPattern, PulsarClientImpl.this,
+                    topicsList, subscription, conf, externalExecutorProvider.getExecutor(),
+                    consumerSubscribedFuture);
+
+                synchronized (consumers) {
+                    consumers.put(consumer, Boolean.TRUE);
+                }
+            })
+            .exceptionally(ex -> {
+                // Runs for lookup failures and for anything thrown by the stage above; keep the cause in the log.
+                log.warn("[{}] Failed to get topics under namespace", namespaceName, ex);
+                consumerSubscribedFuture.completeExceptionally(ex);
+                return null;
+            });
+
+        return consumerSubscribedFuture;
+    }
+
     @Override
     public Reader createReader(String topic, MessageId startMessageId, ReaderConfiguration conf)
             throws PulsarClientException {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
new file mode 100644
index 000000000..26d91d02a
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageIdImpl.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.MessageId;
+
+public class TopicMessageIdImpl implements MessageId {
+    private final String topicName;
+    private final MessageId messageId;
+
+    TopicMessageIdImpl(String topicName, MessageId messageId) {
+        this.topicName = topicName;
+        this.messageId = messageId;
+    }
+
+    @Override
+    public String getTopicName() {
+        return topicName;
+    }
+
+    public MessageId getInnerMessageId() {
+        return messageId;
+    }
+
+    @Override
+    public byte[] toByteArray() {
+        return messageId.toByteArray();
+    }
+
+    @Override
+    public int compareTo(MessageId o) {
+        return messageId.compareTo(o);
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
new file mode 100644
index 000000000..00759fa97
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicMessageImpl.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl;
+
+import java.util.Map;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+
+/**
+ * A {@link Message} decorator that attaches a topic name to another message.
+ *
+ * <p>Every accessor is forwarded to the wrapped message, except {@link #getTopicName()} and
+ * {@link #getMessageId()}, which return the topic name and a {@link TopicMessageIdImpl} built at construction.
+ */
+public class TopicMessageImpl implements Message {
+
+    private final String topicName;
+    private final Message delegate;
+    private final MessageId topicMessageId;
+
+    TopicMessageImpl(String topicName, Message msg) {
+        this.topicName = topicName;
+        this.delegate = msg;
+        // Wrap the id once, so getMessageId() always returns the same instance.
+        this.topicMessageId = new TopicMessageIdImpl(topicName, msg.getMessageId());
+    }
+
+    /** @return the topic name given at construction time */
+    @Override
+    public String getTopicName() {
+        return this.topicName;
+    }
+
+    /** @return the topic-aware id wrapping the original message id */
+    @Override
+    public MessageId getMessageId() {
+        return this.topicMessageId;
+    }
+
+    // ---- everything below is forwarded unchanged to the wrapped message ----
+
+    @Override
+    public Map<String, String> getProperties() {
+        return this.delegate.getProperties();
+    }
+
+    @Override
+    public boolean hasProperty(String name) {
+        return this.delegate.hasProperty(name);
+    }
+
+    @Override
+    public String getProperty(String name) {
+        return this.delegate.getProperty(name);
+    }
+
+    @Override
+    public byte[] getData() {
+        return this.delegate.getData();
+    }
+
+    @Override
+    public long getPublishTime() {
+        return this.delegate.getPublishTime();
+    }
+
+    @Override
+    public long getEventTime() {
+        return this.delegate.getEventTime();
+    }
+
+    @Override
+    public long getSequenceId() {
+        return this.delegate.getSequenceId();
+    }
+
+    @Override
+    public String getProducerName() {
+        return this.delegate.getProducerName();
+    }
+
+    @Override
+    public boolean hasKey() {
+        return this.delegate.hasKey();
+    }
+
+    @Override
+    public String getKey() {
+        return this.delegate.getKey();
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
new file mode 100644
index 000000000..83e730d7a
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicsConsumerImpl.java
@@ -0,0 +1,881 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+
+import com.google.common.collect.Lists;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerConfiguration;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.util.ConsumerName;
+import org.apache.pulsar.client.util.FutureUtil;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
+import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.NamespaceName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TopicsConsumerImpl extends ConsumerBase {
+
+    // All topics should be in same namespace
+    protected NamespaceName namespaceName;
+
+    // Map <topic+partition, consumer>; when acknowledging, the consumer is looked up by topic name
+    private final ConcurrentHashMap<String, ConsumerImpl> consumers;
+
+    // Map <topic, partitionNumber>, store partition number for each topic
+    private final ConcurrentHashMap<String, Integer> topics;
+
+    // Queue of partition consumers on which we have stopped calling receiveAsync() because the
+    // shared incoming queue was full
+    private final ConcurrentLinkedQueue<ConsumerImpl> pausedConsumers;
+
+    // Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to
+    // resume receiving from the paused consumer partitions
+    private final int sharedQueueResumeThreshold;
+
+    // sum of topicPartitions, simple topic has 1, partitioned topic equals to partition number.
+    AtomicInteger numberTopicPartitions;
+
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+    private final ConsumerStats stats;
+    private final UnAckedMessageTracker unAckedMessageTracker;
+    private final ConsumerConfiguration internalConfig;
+
+    /**
+     * Creates a consumer that receives from every topic in {@code topics} under a single subscription.
+     *
+     * <p>With an empty {@code topics} collection the consumer is marked Ready and {@code subscribeFuture} is
+     * completed immediately. Otherwise the topics are validated, one subscribe is started per topic, and once
+     * all of them finish the consumer is marked Ready, message receiving is started and the subscribe future
+     * is completed; any failure completes the future exceptionally.
+     *
+     * @param client           owning client
+     * @param topics           topics to subscribe to; may be empty
+     * @param subscription     subscription name
+     * @param conf             consumer configuration; its receiver queue size must be greater than 0
+     * @param listenerExecutor executor passed on to the parent class
+     * @param subscribeFuture  future passed on to the parent class and completed as described above
+     * @throws IllegalArgumentException if the receiver queue size is not positive or the topics fail validation
+     */
+    TopicsConsumerImpl(PulsarClientImpl client, Collection<String> topics, String subscription,
+                       ConsumerConfiguration conf, ExecutorService listenerExecutor,
+                       CompletableFuture<Consumer> subscribeFuture) {
+        // The parent gets a generated placeholder topic name and a receiver queue size of at least 2.
+        super(client, "TopicsConsumerFakeTopicName" + ConsumerName.generateRandomName(), subscription,
+            conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor,
+            subscribeFuture);
+
+        checkArgument(conf.getReceiverQueueSize() > 0,
+            "Receiver queue size needs to be greater than 0 for Topics Consumer");
+
+        this.topics = new ConcurrentHashMap<>();
+        this.consumers = new ConcurrentHashMap<>();
+        this.pausedConsumers = new ConcurrentLinkedQueue<>();
+        // Paused sub-consumers are resumed once the shared queue drains to half of its maximum size.
+        this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
+        this.numberTopicPartitions = new AtomicInteger(0);
+
+        // An ack timeout of 0 disables unacked-message tracking.
+        if (conf.getAckTimeoutMillis() != 0) {
+            this.unAckedMessageTracker = new UnAckedMessageTracker(client, this, conf.getAckTimeoutMillis());
+        } else {
+            this.unAckedMessageTracker = UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
+        }
+
+        this.internalConfig = getInternalConsumerConfig();
+        // Stats are only collected when the client has a positive stats interval configured.
+        this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStats() : null;
+
+        // Note: inside this constructor the bare name `topics` is the parameter, not the field.
+        if (topics.isEmpty()) {
+            this.namespaceName = null;
+            setState(State.Ready);
+            // Nothing to subscribe to: the consumer is ready right away, with no sub-consumers
+            subscribeFuture().complete(TopicsConsumerImpl.this);
+            return;
+        }
+
+        // NOTE(review): topics.isEmpty() is always false here because of the early return above.
+        checkArgument(topics.isEmpty() || !topicsNameInvalid(topics), "Topics should have same namespace.");
+        this.namespaceName = DestinationName.get(topics.stream().findFirst().get()).getNamespaceObject();
+
+        // Start one subscribe per topic and wait for all of them.
+        List<CompletableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(topics.size());
+        topics.forEach(topic -> futures.add(subscribeAsync(topic)));
+        FutureUtil.waitForAll(futures)
+            .thenAccept(finalFuture -> {
+                try {
+                    // Grow the shared queue limit so it is at least the number of topic partitions.
+                    if (numberTopicPartitions.get() > maxReceiverQueueSize) {
+                        setMaxReceiverQueueSize(numberTopicPartitions.get());
+                    }
+                    setState(State.Ready);
+                    // We have successfully created N consumers, so we can start receiving messages now
+                    startReceivingMessages(consumers.values().stream().collect(Collectors.toList()));
+                    subscribeFuture().complete(TopicsConsumerImpl.this);
+                    log.info("[{}] [{}] Created topics consumer with {} sub-consumers",
+                        topic, subscription, numberTopicPartitions.get());
+                } catch (PulsarClientException e) {
+                    log.warn("[{}] Failed startReceivingMessages while subscribe topics: {}", topic, e.getMessage());
+                    subscribeFuture.completeExceptionally(e);
+                }})
+            .exceptionally(ex -> {
+                // Reached when any per-topic subscribe fails (or the stage above throws).
+                log.warn("[{}] Failed to subscribe topics: {}", topic, ex.getMessage());
+                subscribeFuture.completeExceptionally(ex);
+                return null;
+            });
+    }
+
+    /**
+     * Checks a collection of topic names.
+     *
+     * <p>The collection is considered invalid when any name is rejected by {@code DestinationName.isValid} or
+     * when the topics do not all share the namespace of the first topic. A single-topic collection is accepted.
+     *
+     * @param topics topic names to check; must be non-null and non-empty
+     * @return {@code true} if at least one topic is invalid or in a different namespace, {@code false} otherwise
+     * @throws IllegalStateException if {@code topics} is null or empty
+     */
+    private static boolean topicsNameInvalid(Collection<String> topics) {
+        checkState(topics != null && !topics.isEmpty(),
+            "topics should contain at least 1 topic");
+
+        String expectedNamespace = null;
+        for (String topic : topics) {
+            // Validate before parsing, so a malformed name is reported instead of throwing from the parser.
+            if (!DestinationName.isValid(topic)) {
+                log.warn("Received invalid topic name: {}", topic);
+                return true;
+            }
+
+            String namespace = DestinationName.get(topic).getNamespace();
+            if (expectedNamespace == null) {
+                // The first topic defines the namespace all the others must share.
+                expectedNamespace = namespace;
+            } else if (!expectedNamespace.equals(namespace)) {
+                log.warn("Received topic {} in namespace {}, expected namespace {}",
+                    topic, namespace, expectedNamespace);
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * For each given sub-consumer, sends flow permits to the broker and starts the receive loop.
+     * Does nothing (apart from the debug log) unless this consumer is in the Ready state.
+     */
+    private void startReceivingMessages(List<ConsumerImpl> newConsumers) throws PulsarClientException {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] startReceivingMessages for {} new consumers in topics consumer",
+                topic, newConsumers.size());
+        }
+        if (getState() != State.Ready) {
+            return;
+        }
+        for (ConsumerImpl subConsumer : newConsumers) {
+            subConsumer.sendFlowPermitsToBroker(subConsumer.cnx(), conf.getReceiverQueueSize());
+            receiveMessageFromConsumer(subConsumer);
+        }
+    }
+
+    private void receiveMessageFromConsumer(ConsumerImpl consumer) {
+        consumer.receiveAsync().thenAccept(message -> {
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] [{}] Receive message from sub consumer:{}",
+                    topic, subscription, consumer.getTopic());
+            }
+            // Process the message, add to the queue and trigger listener or async callback
+            messageReceived(consumer, message);
+
+            // we're modifying pausedConsumers
+            lock.writeLock().lock();
+            try {
+                int size = incomingMessages.size();
+                if (size >= maxReceiverQueueSize
+                        || (size > sharedQueueResumeThreshold && !pausedConsumers.isEmpty())) {
+                    // mark this consumer to be resumed later: if No more space left in shared queue,
+                    // or if any consumer is already paused (to create fair chance for already paused consumers)
+                    pausedConsumers.add(consumer);
+                } else {
+                    // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
+                    // recursion and stack overflow
+                    client.eventLoopGroup().execute(() -> {
+                        receiveMessageFromConsumer(consumer);
+                    });
+                }
+            } finally {
+                lock.writeLock().unlock();
+            }
+        });
+    }
+
+    /**
+     * Handles one message delivered by a sub-consumer.
+     *
+     * <p>The message is wrapped in a {@link TopicMessageImpl} carrying the sub-consumer's topic, registered with
+     * the unacked-message tracker, and then either used to complete the oldest pending async receive or put on
+     * the shared incoming queue. If a listener is configured, a task is submitted that dequeues one message and
+     * passes it to the listener.
+     *
+     * @param consumer the sub-consumer the message came from
+     * @param message  the received message; must be a {@code MessageImpl}
+     */
+    private void messageReceived(ConsumerImpl consumer, Message message) {
+        checkArgument(message instanceof MessageImpl);
+        lock.writeLock().lock();
+        try {
+            TopicMessageImpl topicMessage = new TopicMessageImpl(consumer.getTopic(), message);
+            unAckedMessageTracker.add(topicMessage.getMessageId());
+
+            if (log.isDebugEnabled()) {
+                log.debug("[{}][{}] Received message from topics-consumer {}",
+                    topic, subscription, message.getMessageId());
+            }
+
+            // if asyncReceive is waiting : return message to callback without adding to incomingMessages queue
+            if (!pendingReceives.isEmpty()) {
+                CompletableFuture<Message> receivedFuture = pendingReceives.poll();
+                // Complete the future on the listener executor rather than on the calling thread.
+                listenerExecutor.execute(() -> receivedFuture.complete(topicMessage));
+            } else {
+                // Enqueue the message so that it can be retrieved when application calls receive()
+                // Waits for the queue to have space for the message
+                // This should never block, presumably because the incoming queue is a GrowableArrayBlockingQueue
+                // (the queue type is not visible here)
+                incomingMessages.put(topicMessage);
+            }
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag; the message is not enqueued in this case.
+            Thread.currentThread().interrupt();
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        if (listener != null) {
+            // Trigger the notification on the message listener in a separate thread to avoid blocking the networking
+            // thread while the message processing happens
+            listenerExecutor.execute(() -> {
+                Message msg;
+                try {
+                    // Takes the head of the shared queue, which is not necessarily the message received above.
+                    msg = internalReceive();
+                } catch (PulsarClientException e) {
+                    log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e);
+                    return;
+                }
+
+                try {
+                    // NOTE(review): the two log statements below print `message` (the one that triggered this
+                    // task) while the listener is given `msg` (the one dequeued) - confirm this is intended.
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}][{}] Calling message listener for message {}",
+                            topic, subscription, message.getMessageId());
+                    }
+                    listener.received(TopicsConsumerImpl.this, msg);
+                } catch (Throwable t) {
+                    log.error("[{}][{}] Message listener error in processing message: {}",
+                        topic, subscription, message, t);
+                }
+            });
+        }
+    }
+
+    private void resumeReceivingFromPausedConsumersIfNeeded() {
+        lock.readLock().lock();
+        try {
+            if (incomingMessages.size() <= sharedQueueResumeThreshold && !pausedConsumers.isEmpty()) {
+                while (true) {
+                    ConsumerImpl consumer = pausedConsumers.poll();
+                    if (consumer == null) {
+                        break;
+                    }
+
+                    // if messages are readily available on consumer we will attempt to writeLock on the same thread
+                    client.eventLoopGroup().execute(() -> {
+                        receiveMessageFromConsumer(consumer);
+                    });
+                }
+            }
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    @Override
+    protected Message internalReceive() throws PulsarClientException {
+        Message message;
+        try {
+            message = incomingMessages.take();
+            checkState(message instanceof TopicMessageImpl);
+            unAckedMessageTracker.add(message.getMessageId());
+            resumeReceivingFromPausedConsumersIfNeeded();
+            return message;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException(e);
+        }
+    }
+
+    @Override
+    protected Message internalReceive(int timeout, TimeUnit unit) throws PulsarClientException {
+        Message message;
+        try {
+            message = incomingMessages.poll(timeout, unit);
+            if (message != null) {
+                checkArgument(message instanceof TopicMessageImpl);
+                unAckedMessageTracker.add(message.getMessageId());
+            }
+            resumeReceivingFromPausedConsumersIfNeeded();
+            return message;
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException(e);
+        }
+    }
+
+    /**
+     * Returns a future for the next message.
+     *
+     * <p>If a message is already queued, it is registered with the unacked-message tracker and the future is
+     * completed immediately; otherwise the future is added to {@code pendingReceives} to be completed when a
+     * message arrives.
+     *
+     * @return a future completed with the next message, or exceptionally with a {@link PulsarClientException}
+     *         if the thread is interrupted while polling
+     */
+    @Override
+    protected CompletableFuture<Message> internalReceiveAsync() {
+        CompletableFuture<Message> result = new CompletableFuture<>();
+        // Acquire the lock before entering try, so the finally block never unlocks a lock that was not taken.
+        lock.writeLock().lock();
+        try {
+            Message message = incomingMessages.poll(0, TimeUnit.SECONDS);
+            if (message == null) {
+                pendingReceives.add(result);
+            } else {
+                checkState(message instanceof TopicMessageImpl);
+                unAckedMessageTracker.add(message.getMessageId());
+                resumeReceivingFromPausedConsumersIfNeeded();
+                result.complete(message);
+            }
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            result.completeExceptionally(new PulsarClientException(e));
+        } finally {
+            lock.writeLock().unlock();
+        }
+
+        return result;
+    }
+
+    /**
+     * Acknowledges a single message by delegating to the sub-consumer registered under the message's topic
+     * name, then removes the id from the unacked-message tracker.
+     *
+     * @param messageId  id to acknowledge; must be a {@link TopicMessageIdImpl}
+     * @param ackType    only individual acknowledgement is supported; Cumulative yields a failed future
+     * @param properties passed through to the sub-consumer
+     * @return a future that completes when the sub-consumer has acknowledged; failed if this consumer is not
+     *         Ready, the ack type is Cumulative, or no sub-consumer is registered for the message's topic
+     * @throws IllegalArgumentException if {@code messageId} is not a {@link TopicMessageIdImpl}
+     */
+    @Override
+    protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
+                                                    Map<String,Long> properties) {
+        checkArgument(messageId instanceof TopicMessageIdImpl);
+
+        if (getState() != State.Ready) {
+            return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed"));
+        }
+
+        if (ackType == AckType.Cumulative) {
+            return FutureUtil.failedFuture(new PulsarClientException.NotSupportedException(
+                    "Cumulative acknowledge not supported for topics consumer"));
+        }
+
+        ConsumerImpl consumer = consumers.get(messageId.getTopicName());
+        if (consumer == null) {
+            // Fail the returned future instead of throwing a NullPointerException at the caller.
+            return FutureUtil.failedFuture(new PulsarClientException(
+                    "No sub-consumer found for topic " + messageId.getTopicName()));
+        }
+
+        MessageId innerId = ((TopicMessageIdImpl) messageId).getInnerMessageId();
+        return consumer.doAcknowledge(innerId, ackType, properties)
+            .thenRun(() -> unAckedMessageTracker.remove(messageId));
+    }
+
+    /**
+     * Unsubscribes every sub-consumer.
+     *
+     * <p>The state is set to Closing first. When all sub-consumers have answered, the state becomes Closed (and
+     * the unacked-message tracker is closed) if none of them failed, or Failed otherwise. With no sub-consumers
+     * at all, the consumer is closed and the future completed immediately.
+     *
+     * @return a future completed when all sub-consumers are unsubscribed, or failed with the first recorded
+     *         unsubscribe error; already failed if this consumer is Closing or Closed
+     */
+    @Override
+    public CompletableFuture<Void> unsubscribeAsync() {
+        if (getState() == State.Closing || getState() == State.Closed) {
+            return FutureUtil.failedFuture(
+                    new PulsarClientException.AlreadyClosedException("Topics Consumer was already closed"));
+        }
+        setState(State.Closing);
+
+        CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
+
+        // Snapshot the sub-consumers so the countdown below matches exactly the consumers being unsubscribed.
+        List<ConsumerImpl> subConsumers = consumers.values().stream().collect(Collectors.toList());
+        if (subConsumers.isEmpty()) {
+            // No callback would ever fire, so finish here instead of leaving the future pending forever.
+            setState(State.Closed);
+            unAckedMessageTracker.close();
+            unsubscribeFuture.complete(null);
+            log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer",
+                topic, subscription, consumerName);
+            return unsubscribeFuture;
+        }
+
+        AtomicReference<Throwable> unsubscribeFail = new AtomicReference<Throwable>();
+        AtomicInteger completed = new AtomicInteger(subConsumers.size());
+
+        subConsumers.forEach(consumer ->
+            consumer.unsubscribeAsync().handle((unsubscribed, ex) -> {
+                if (ex != null) {
+                    // Only the first failure is kept.
+                    unsubscribeFail.compareAndSet(null, ex);
+                }
+                // The last sub-consumer to answer decides the final outcome.
+                if (completed.decrementAndGet() == 0) {
+                    if (unsubscribeFail.get() == null) {
+                        setState(State.Closed);
+                        unAckedMessageTracker.close();
+                        unsubscribeFuture.complete(null);
+                        log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer",
+                            topic, subscription, consumerName);
+                    } else {
+                        setState(State.Failed);
+                        unsubscribeFuture.completeExceptionally(unsubscribeFail.get());
+                        log.error("[{}] [{}] [{}] Could not unsubscribe Topics Consumer",
+                            topic, subscription, consumerName, unsubscribeFail.get().getCause());
+                    }
+                }
+
+                return null;
+            }));
+
+        return unsubscribeFuture;
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        if (getState() == State.Closing || getState() == State.Closed) {
+            unAckedMessageTracker.close();
+            return CompletableFuture.completedFuture(null);
+        }
+        setState(State.Closing);
+
+        AtomicReference<Throwable> closeFail = new AtomicReference<Throwable>();
+        AtomicInteger completed = new AtomicInteger(numberTopicPartitions.get());
+        CompletableFuture<Void> closeFuture = new CompletableFuture<>();
+
+        consumers.values().stream().forEach(consumer -> {
+            if (consumer != null) {
+                consumer.closeAsync().handle((closed, ex) -> {
+                    if (ex != null) {
+                        closeFail.compareAndSet(null, ex);
+                    }
+
+                    if (log.isDebugEnabled()) {
+                        log.debug("[[{}] [{}] Closed Topics Consumer {}, want close {}, completed {}",
+                            topic, subscription, consumer, consumers.size(), completed.get());
+                    }
+
+                    if (completed.decrementAndGet() == 0) {
+                        if (closeFail.get() == null) {
+                            setState(State.Closed);
+                            unAckedMessageTracker.close();
+                            closeFuture.complete(null);
+                            log.info("[{}] [{}] Closed Topics Consumer", topic, subscription);
+                            client.cleanupConsumer(this);
+                            // fail all pending-receive futures to notify application
+                            failPendingReceive();
+                        } else {
+                            setState(State.Failed);
+                            closeFuture.completeExceptionally(closeFail.get());
+                            log.error("[{}] [{}] Could not close Topics Consumer", topic, subscription,
+                                closeFail.get().getCause());
+                        }
+                    }
+
+                    return null;
+                });
+            }
+        });
+
+        return closeFuture;
+    }
+
+    /**
+     * Drains the queue of pending receive futures and completes each of them exceptionally with
+     * an {@code AlreadyClosedException}. Nothing is drained when the listener executor is absent
+     * or already shut down. Runs while holding the read lock.
+     */
+    private void failPendingReceive() {
+        lock.readLock().lock();
+        try {
+            if (listenerExecutor == null || listenerExecutor.isShutdown()) {
+                return;
+            }
+
+            // poll() returning null means the queue has been emptied
+            CompletableFuture<Message> pending;
+            while ((pending = pendingReceives.poll()) != null) {
+                pending.completeExceptionally(
+                        new PulsarClientException.AlreadyClosedException("Consumer is already closed"));
+            }
+        } finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * @return true only when every internal per-topic consumer reports being connected
+     *         (also true when there are no internal consumers)
+     */
+    @Override
+    public boolean isConnected() {
+        for (ConsumerImpl consumer : consumers.values()) {
+            if (!consumer.isConnected()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    void connectionFailed(PulsarClientException exception) {
+        // Intentionally a no-op: the topics consumer does nothing with this callback.
+        // NOTE(review): presumably each internal per-topic consumer handles its own connection
+        // failures - confirm.
+
+    }
+
+    @Override
+    void connectionOpened(ClientCnx cnx) {
+        // Intentionally a no-op: the topics consumer does nothing with this callback.
+        // NOTE(review): presumably each internal per-topic consumer handles its own connection
+        // setup - confirm.
+
+    }
+
+    /**
+     * @return the handler name, which for the topics consumer is its subscription name
+     */
+    @Override
+    String getHandlerName() {
+        return subscription;
+    }
+
+    /**
+     * Builds the configuration used for the internal per-topic consumers by copying the relevant
+     * settings of this consumer's own configuration.
+     *
+     * @return a new configuration carrying the receiver queue size, subscription type and consumer
+     *         name, plus crypto settings and ack timeout when those are configured
+     */
+    private ConsumerConfiguration getInternalConsumerConfig() {
+        ConsumerConfiguration config = new ConsumerConfiguration();
+
+        config.setReceiverQueueSize(conf.getReceiverQueueSize());
+        config.setSubscriptionType(conf.getSubscriptionType());
+        config.setConsumerName(consumerName);
+
+        // crypto settings are only copied when a key reader has been configured
+        boolean cryptoConfigured = conf.getCryptoKeyReader() != null;
+        if (cryptoConfigured) {
+            config.setCryptoKeyReader(conf.getCryptoKeyReader());
+            config.setCryptoFailureAction(conf.getCryptoFailureAction());
+        }
+
+        // an ack timeout of 0 is left at the new configuration's default
+        boolean ackTimeoutConfigured = conf.getAckTimeoutMillis() != 0;
+        if (ackTimeoutConfigured) {
+            config.setAckTimeout(conf.getAckTimeoutMillis(), TimeUnit.MILLISECONDS);
+        }
+
+        return config;
+    }
+
+    /**
+     * Asks every internal per-topic consumer to redeliver its unacknowledged messages, then drops
+     * the locally queued messages and the un-acked tracking state and resumes paused consumers
+     * if needed. The whole sequence runs while holding this object's monitor.
+     */
+    @Override
+    public void redeliverUnacknowledgedMessages() {
+        synchronized (this) {
+            for (ConsumerImpl consumer : consumers.values()) {
+                consumer.redeliverUnacknowledgedMessages();
+            }
+            incomingMessages.clear();
+            unAckedMessageTracker.clear();
+            resumeReceivingFromPausedConsumersIfNeeded();
+        }
+    }
+
+    /**
+     * Redelivers the given unacknowledged messages.
+     *
+     * <p>For non-Shared subscriptions single messages cannot be redelivered, so all
+     * unacknowledged messages are redelivered instead. For Shared subscriptions the ids are
+     * grouped by topic and each group is handed to the matching internal consumer.
+     *
+     * @param messageIds ids to redeliver; elements are expected to be {@link TopicMessageIdImpl}.
+     *                   The set may be extended by this call with ids of messages that had to be
+     *                   removed from the local queue. An empty set is a no-op.
+     */
+    @Override
+    public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
+        if (messageIds.isEmpty()) {
+            // nothing to redeliver; also avoids NoSuchElementException from findFirst().get() below
+            return;
+        }
+        checkArgument(messageIds.stream().findFirst().get() instanceof TopicMessageIdImpl);
+
+        if (conf.getSubscriptionType() != SubscriptionType.Shared) {
+            // We cannot redeliver single messages if subscription type is not Shared
+            redeliverUnacknowledgedMessages();
+            return;
+        }
+        removeExpiredMessagesFromQueue(messageIds);
+        messageIds.stream().map(messageId -> (TopicMessageIdImpl)messageId)
+            .collect(Collectors.groupingBy(TopicMessageIdImpl::getTopicName, Collectors.toSet()))
+            .forEach((topicName, messageIds1) -> {
+                ConsumerImpl consumer = consumers.get(topicName);
+                if (consumer == null) {
+                    // the topic has been unsubscribed in the meantime: nothing to redeliver from
+                    log.warn("[{}] [{}] No consumer for topic {}, skip redelivery of {} messages",
+                        topic, subscription, topicName, messageIds1.size());
+                    return;
+                }
+                consumer.redeliverUnacknowledgedMessages(messageIds1.stream()
+                    .map(mid -> mid.getInnerMessageId()).collect(Collectors.toSet()));
+            });
+        resumeReceivingFromPausedConsumersIfNeeded();
+    }
+
+    /**
+     * Blocking variant of {@link #seekAsync(MessageId)}.
+     *
+     * @param messageId the message id to seek to
+     * @throws PulsarClientException wrapping the failure of the asynchronous operation, or the
+     *                               {@link InterruptedException} if the calling thread was
+     *                               interrupted while waiting (the interrupt flag is restored)
+     */
+    @Override
+    public void seek(MessageId messageId) throws PulsarClientException {
+        try {
+            seekAsync(messageId).get();
+        } catch (ExecutionException e) {
+            throw new PulsarClientException(e.getCause());
+        } catch (InterruptedException e) {
+            // restore the interrupt status so callers further up the stack can observe it
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> seekAsync(MessageId messageId) {
+        return FutureUtil.failedFuture(new PulsarClientException("Seek operation not supported on topics consumer"));
+    }
+
+    /**
+     * Helper that reports the state of the batch-message ack trackers of all internal consumers.
+     *
+     * @return true if every internal consumer reports an empty batching ack tracker
+     */
+    public boolean isBatchingAckTrackerEmpty() {
+        for (ConsumerImpl consumer : consumers.values()) {
+            if (!consumer.isBatchingAckTrackerEmpty()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+
+    @Override
+    public int getAvailablePermits() {
+        return consumers.values().stream().mapToInt(ConsumerImpl::getAvailablePermits).sum();
+    }
+
+    /**
+     * @return true only when every internal per-topic consumer has reached the end of its topic
+     *         (also true when there are no internal consumers)
+     */
+    @Override
+    public boolean hasReachedEndOfTopic() {
+        for (ConsumerImpl consumer : consumers.values()) {
+            if (!consumer.hasReachedEndOfTopic()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    @Override
+    public int numMessagesInQueue() {
+        return incomingMessages.size() + consumers.values().stream().mapToInt(ConsumerImpl::numMessagesInQueue).sum();
+    }
+
+    /**
+     * Recomputes the aggregated statistics from all internal per-topic consumers.
+     *
+     * @return the shared stats object after it has been reset and re-accumulated, or null when
+     *         no stats object exists
+     */
+    @Override
+    public synchronized ConsumerStats getStats() {
+        if (stats != null) {
+            // start from scratch, then fold in the stats of each internal consumer
+            stats.reset();
+            for (ConsumerImpl consumer : consumers.values()) {
+                stats.updateCumulativeStats(consumer.getStats());
+            }
+        }
+        return stats;
+    }
+
+    /**
+     * @return the tracker this topics consumer uses for messages that were handed out but not
+     *         yet acknowledged
+     */
+    public UnAckedMessageTracker getUnAckedMessageTracker() {
+        return unAckedMessageTracker;
+    }
+
+    /**
+     * Removes from the head of the incoming queue the messages whose ids are in the given set.
+     *
+     * <p>Polling stops at the first message whose id is not in the set. That message has already
+     * been taken out of the queue, so its id is added to {@code messageIds} to have it redelivered
+     * together with the others.
+     *
+     * @param messageIds ids of the messages to redeliver; may be extended by this call
+     */
+    private void removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
+        Message peek = incomingMessages.peek();
+        if (peek == null || !messageIds.contains(peek.getMessageId())) {
+            // empty queue, or first message is not expired, then no message is expired in queue.
+            return;
+        }
+
+        // try not to remove elements that are added while we remove
+        Message message = incomingMessages.poll();
+        while (message != null) {
+            // validate every polled message; checking inside the loop also keeps a concurrent
+            // drain between peek() and poll() from tripping the check on a null message
+            checkState(message instanceof TopicMessageImpl);
+            MessageId messageId = message.getMessageId();
+            if (!messageIds.contains(messageId)) {
+                messageIds.add(messageId);
+                break;
+            }
+            message = incomingMessages.poll();
+        }
+    }
+
+    /**
+     * Validates a topic name before it is subscribed.
+     *
+     * <p>The name must be a valid destination name, must not already be tracked by this consumer
+     * and, once a namespace has been recorded, must belong to that namespace.
+     *
+     * @param topicName the topic name to validate
+     * @return always true; every violation is reported by throwing instead
+     * @throws IllegalArgumentException if any of the checks fails
+     */
+    private boolean topicNameValid(String topicName) {
+        checkArgument(DestinationName.isValid(topicName), "Invalid topic name:" + topicName);
+
+        boolean alreadySubscribed = topics.containsKey(topicName);
+        checkArgument(!alreadySubscribed, "Topics already contains topic:" + topicName);
+
+        if (this.namespaceName == null) {
+            // no namespace recorded yet, so there is nothing to compare against
+            return true;
+        }
+
+        String topicNamespace = DestinationName.get(topicName).getNamespace().toString();
+        String expectedNamespace = this.namespaceName.toString();
+        checkArgument(topicNamespace.equals(expectedNamespace),
+            "Topic " + topicName + " not in same namespace with Topics");
+
+        return true;
+    }
+
+    /**
+     * Subscribes this topics consumer to one more topic.
+     *
+     * <p>The topic's partition metadata is looked up first. One internal {@code ConsumerImpl} is
+     * then created per partition (or a single one when the metadata reports at most one
+     * partition) and handed to {@code addConsumerForOneTopic}, which completes the returned
+     * future.
+     *
+     * @param topicName name of the topic to add
+     * @return a future completed when the topic has been added; completed exceptionally when the
+     *         consumer is closing/closed or the metadata lookup fails
+     * @throws IllegalArgumentException from {@code topicNameValid} when the name is rejected
+     */
+    public CompletableFuture<Void> subscribeAsync(String topicName) {
+        // NOTE(review): topicNameValid as written either throws or returns true, so this branch
+        // looks unreachable - confirm before relying on it.
+        if (!topicNameValid(topicName)) {
+            return FutureUtil.failedFuture(
+                new PulsarClientException.AlreadyClosedException("Topic name not valid"));
+        }
+
+        if (getState() == State.Closing || getState() == State.Closed) {
+            return FutureUtil.failedFuture(
+                new PulsarClientException.AlreadyClosedException("Topics Consumer was already closed"));
+        }
+
+        CompletableFuture<Void> subscribeResult = new CompletableFuture<>();
+
+        // first failure seen while subscribing the consumers of this topic
+        AtomicReference<Throwable> subscribeFailure = new AtomicReference<Throwable>();
+        // number of consumers of this topic whose subscribe has not finished yet;
+        // addConsumerForOneTopic decrements it once per consumer
+        final AtomicInteger partitionNumber = new AtomicInteger(0);
+
+        client.getPartitionedTopicMetadata(topicName).thenAccept(metadata -> {
+            if (log.isDebugEnabled()) {
+                log.debug("Received topic {} metadata.partitions: {}", topicName, metadata.partitions);
+            }
+
+            if (metadata.partitions > 1) {
+                // more than one partition: record the topic and create one consumer per partition
+                this.topics.putIfAbsent(topicName, metadata.partitions);
+                numberTopicPartitions.addAndGet(metadata.partitions);
+                partitionNumber.addAndGet(metadata.partitions);
+
+                IntStream.range(0, metadata.partitions).forEach(partitionIndex -> {
+                    String partitionName = DestinationName.get(topicName).getPartition(partitionIndex).toString();
+                    addConsumerForOneTopic(
+                        new ConsumerImpl(client, partitionName, subscription, internalConfig,
+                            client.externalExecutorProvider().getExecutor(), partitionIndex,
+                            new CompletableFuture<Consumer>()),
+                        topicName,
+                        partitionNumber,
+                        subscribeFailure,
+                        subscribeResult);
+                });
+            } else {
+                // at most one partition reported: a single consumer on the topic name itself,
+                // created with partition index 0
+                this.topics.putIfAbsent(topicName, 1);
+                numberTopicPartitions.incrementAndGet();
+                partitionNumber.incrementAndGet();
+
+                addConsumerForOneTopic(
+                    new ConsumerImpl(client, topicName, subscription, internalConfig,
+                        client.externalExecutorProvider().getExecutor(), 0,
+                        new CompletableFuture<Consumer>()),
+                    topicName,
+                    partitionNumber,
+                    subscribeFailure,
+                    subscribeResult);
+            }
+        }).exceptionally(ex -> {
+            // reached when the previous stage completed exceptionally; the failure is logged and
+            // propagated to the caller through the result future
+            log.warn("[{}] Failed to get partitioned topic metadata: {}", topicName, ex.getMessage());
+            subscribeResult.completeExceptionally(ex);
+            return null;
+        });
+
+        return subscribeResult;
+    }
+
+    /**
+     * Rolls back a failed subscription of one topic.
+     *
+     * <p>Every already-registered internal consumer belonging to the topic is closed, its
+     * subscribe future is failed with {@code error}, and it is removed from {@code consumers}.
+     * The topic is then removed from {@code topics} and its partitions are subtracted from
+     * {@code numberTopicPartitions}. The method is safe to call more than once for the same
+     * topic: the partition count is only subtracted by the call that actually removes the topic.
+     *
+     * @param topicName the topic whose subscription failed
+     * @param error     the failure that triggered the rollback
+     */
+    private void handleSubscribeOneTopicError(String topicName, Throwable error) {
+        // the third placeholder is required, otherwise the error message is silently dropped
+        log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer {}", topic, topicName, error.getMessage());
+
+        consumers.values().stream()
+            .filter(consumer1 ->
+                DestinationName.get(consumer1.getTopic()).getPartitionedTopicName().equals(topicName))
+            .forEach(consumer2 ->  {
+                consumer2.closeAsync().handle((ok, closeException) -> {
+                    consumer2.subscribeFuture().completeExceptionally(error);
+                    return null;
+                });
+                consumers.remove(consumer2.getTopic());
+            });
+
+        // undo the partition accounting done when the subscribe started, so the consistency
+        // check below compares like with like
+        Integer removedPartitions = topics.remove(topicName);
+        if (removedPartitions != null) {
+            numberTopicPartitions.addAndGet(-removedPartitions);
+        }
+        checkState(numberTopicPartitions.get() == consumers.values().size());
+    }
+
+    /**
+     * Add consumer for the given topic.
+     *
+     * <p>Registers a callback on the consumer's subscribe future. A successful consumer is added
+     * to {@code consumers}; a failed one triggers a rollback of the whole topic and fails
+     * {@code resultFuture}. When the last consumer of the topic has reported, the topic's
+     * consumers are started and {@code resultFuture} is completed.
+     *
+     * @param consumer the internal consumer (one topic or one partition) being subscribed
+     * @param topicName the topic the consumer belongs to
+     * @param numPartitionsToCreate the partition number for this topicName. once success add a consumer, it get a
+     *                              decrement. when this reach 0, it has success add all consumers.
+     * @param subscribeFailure this use to record the Throwable that meet during consumers subscribe.
+     * @param resultFuture the final result future for the subscribe of new topic.
+     */
+    private void addConsumerForOneTopic(ConsumerImpl consumer,
+                                        String topicName,
+                                        AtomicInteger numPartitionsToCreate,
+                                        AtomicReference<Throwable> subscribeFailure,
+                                        CompletableFuture<Void> resultFuture) {
+        if (log.isDebugEnabled()) {
+            log.debug("[{}] Put topic consumer-{} into topics consumer for {}", topic, consumer.getTopic(), topicName);
+        }
+        consumer.subscribeFuture().handle((cons, subscribeException) -> {
+            if (subscribeException != null) {
+                subscribeFailure.compareAndSet(null, subscribeException);
+                handleSubscribeOneTopicError(topicName, subscribeException);
+                resultFuture.completeExceptionally(subscribeException);
+            } else {
+                // success subscribe this consumer, add it in.
+                consumers.putIfAbsent(consumer.getTopic(), consumer);
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] topic consumer-{} subscribed success. numPartitionsToCreate: {}",
+                        topic, consumer.getTopic(), numPartitionsToCreate.get());
+                }
+            }
+
+            if (numPartitionsToCreate.decrementAndGet() != 0) {
+                // other consumers of this topic are still subscribing
+                return null;
+            }
+
+            // completed all consumer subscribe
+            if (subscribeFailure.get() == null) {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Success subscribed for topic {} in topics consumer",
+                        topic, topicName);
+                }
+
+                try {
+                    if (numberTopicPartitions.get() > maxReceiverQueueSize) {
+                        setMaxReceiverQueueSize(numberTopicPartitions.get());
+                    }
+
+                    int numTopics = this.topics.values().stream().mapToInt(Integer::intValue).sum();
+                    checkState(numberTopicPartitions.get() == numTopics,
+                        "numberTopicPartitions "+ numberTopicPartitions.get()
+                            + " not equals expected: " + numTopics);
+
+                    // We have successfully created new consumers, so we can start receiving messages for them
+                    startReceivingMessages(
+                        consumers.values().stream()
+                            .filter(consumer1 -> DestinationName.get(consumer1.getTopic())
+                                .getPartitionedTopicName().equals(topicName))
+                            .collect(Collectors.toList()));
+
+                    resultFuture.complete(null);
+                    log.info("[{}] [{}] Success subscribe new topic {} in topics consumer, numberTopicPartitions {}",
+                        topic, subscription, topicName, numberTopicPartitions.get());
+                    if (this.namespaceName == null) {
+                        this.namespaceName = DestinationName.get(topicName).getNamespaceObject();
+                    }
+                    return null;
+                } catch (PulsarClientException e) {
+                    // record the failure so the error log below has something to report;
+                    // without this, subscribeFailure.get() is null here and dereferencing it throws
+                    subscribeFailure.compareAndSet(null, e);
+                    handleSubscribeOneTopicError(topicName, e);
+                    resultFuture.completeExceptionally(e);
+                }
+            }
+
+            // pass the failure itself: its cause may be null, which would hide the error
+            log.error("[{}] [{}] Could not subscribe more topic {} in topics consumer.",
+                topic, subscription, topicName, subscribeFailure.get());
+            return null;
+        });
+    }
+
+    // un-subscribe a given topic
+    public CompletableFuture<Void> unsubscribeAsync(String topicName) {
+        checkArgument(DestinationName.isValid(topicName), "Invalid topic name:" + topicName);
+
+        if (getState() == State.Closing || getState() == State.Closed) {
+            return FutureUtil.failedFuture(
+                new PulsarClientException.AlreadyClosedException("Topics Consumer was already closed"));
+        }
+
+        AtomicReference<Throwable> unsubscribeFail = new AtomicReference<Throwable>();
+        AtomicInteger completed = new AtomicInteger(topics.get(topicName));
+        CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
+        ConcurrentLinkedQueue<ConsumerImpl> successConsumers = new ConcurrentLinkedQueue<>();
+
+        String topicPartName = DestinationName.get(topicName).getPartitionedTopicName();
+
+        consumers.values().stream().filter(consumer -> {
+            String consumerTopicName = consumer.getTopic();
+            if (DestinationName.get(consumerTopicName).getPartitionedTopicName().equals(topicPartName)) {
+                return true;
+            } else {
+                return false;
+            }
+        }).forEach(consumer -> {
+            if (consumer != null) {
+                consumer.unsubscribeAsync().handle((unsubscribed, ex) -> {
+                    if (ex != null) {
+                        unsubscribeFail.compareAndSet(null, ex);
+                    } else {
+                        successConsumers.add(consumer);
+                    }
+
+                    if (completed.decrementAndGet() == 0) {
+                        if (unsubscribeFail.get() == null) {
+                            successConsumers.forEach(consumer1 -> {
+                                consumers.remove(consumer1.getTopic());
+                                pausedConsumers.remove(consumer1);
+                                numberTopicPartitions.decrementAndGet();
+                            });
+
+                            topics.remove(topicName);
+                            unAckedMessageTracker.removeTopicMessages(topicName);
+
+                            unsubscribeFuture.complete(null);
+                            log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer, numberTopicPartitions: {}",
+                                topicName, subscription, consumerName, numberTopicPartitions);
+                        } else {
+                            unsubscribeFuture.completeExceptionally(unsubscribeFail.get());
+                            setState(State.Failed);
+                            log.error("[{}] [{}] [{}] Could not unsubscribe Topics Consumer",
+                                topicName, subscription, consumerName, unsubscribeFail.get().getCause());
+                        }
+                    }
+
+                    return null;
+                });
+            }
+        });
+
+        return unsubscribeFuture;
+    }
+
+    /**
+     * @return a new list holding the names of the topics currently tracked by this consumer
+     */
+    public List<String> getTopics() {
+        List<String> topicNames = topics.keySet().stream().collect(Collectors.toList());
+        return topicNames;
+    }
+
+    /**
+     * @return a new list holding the keys of the internal consumer map, i.e. the topic (or
+     *         partition) name each internal consumer was registered under
+     */
+    public List<String> getPartitionedTopics() {
+        List<String> partitionNames = consumers.keySet().stream().collect(Collectors.toList());
+        return partitionNames;
+    }
+
+    /**
+     * @return a new list holding the internal per-topic (or per-partition) consumers
+     */
+    public List<ConsumerImpl> getConsumers() {
+        List<ConsumerImpl> internalConsumers = consumers.values().stream().collect(Collectors.toList());
+        return internalConsumers;
+    }
+
+    // class-wide SLF4J logger
+    private static final Logger log = LoggerFactory.getLogger(TopicsConsumerImpl.class);
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
index 796abf65f..10bb8b89c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/UnAckedMessageTracker.java
@@ -18,26 +18,24 @@
  */
 package org.apache.pulsar.client.impl;
 
+import io.netty.util.Timeout;
+import io.netty.util.TimerTask;
 import java.io.Closeable;
-import java.util.ArrayList;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-
+import java.util.function.Predicate;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import io.netty.util.Timeout;
-import io.netty.util.TimerTask;
-
 public class UnAckedMessageTracker implements Closeable {
     private static final Logger log = LoggerFactory.getLogger(UnAckedMessageTracker.class);
-    private ConcurrentOpenHashSet<MessageIdImpl> currentSet;
-    private ConcurrentOpenHashSet<MessageIdImpl> oldOpenSet;
+    private ConcurrentOpenHashSet<MessageId> currentSet;
+    private ConcurrentOpenHashSet<MessageId> oldOpenSet;
     private final ReentrantReadWriteLock readWriteLock;
     private final Lock readLock;
     private final Lock writeLock;
@@ -51,17 +49,17 @@ public void clear() {
         }
 
         @Override
-        public boolean add(MessageIdImpl m) {
+        public boolean add(MessageId m) {
             return true;
         }
 
         @Override
-        public boolean remove(MessageIdImpl m) {
+        public boolean remove(MessageId m) {
             return true;
         }
 
         @Override
-        public int removeMessagesTill(MessageIdImpl msgId) {
+        public int removeMessagesTill(MessageId msgId) {
             return 0;
         }
 
@@ -77,8 +75,8 @@ public UnAckedMessageTracker() {
     }
 
     public UnAckedMessageTracker(PulsarClientImpl client, ConsumerBase consumerBase, long ackTimeoutMillis) {
-        currentSet = new ConcurrentOpenHashSet<MessageIdImpl>();
-        oldOpenSet = new ConcurrentOpenHashSet<MessageIdImpl>();
+        currentSet = new ConcurrentOpenHashSet<MessageId>();
+        oldOpenSet = new ConcurrentOpenHashSet<MessageId>();
         readWriteLock = new ReentrantReadWriteLock();
         readLock = readWriteLock.readLock();
         writeLock = readWriteLock.writeLock();
@@ -92,7 +90,7 @@ public void start(PulsarClientImpl client, ConsumerBase consumerBase, long ackTi
             public void run(Timeout t) throws Exception {
                 if (isAckTimeout()) {
                     log.warn("[{}] {} messages have timed-out", consumerBase, oldOpenSet.size());
-                    Set<MessageIdImpl> messageIds = new HashSet<>();
+                    Set<MessageId> messageIds = new HashSet<>();
                     oldOpenSet.forEach(messageIds::add);
                     oldOpenSet.clear();
                     consumerBase.redeliverUnacknowledgedMessages(messageIds);
@@ -106,7 +104,7 @@ public void run(Timeout t) throws Exception {
     void toggle() {
         writeLock.lock();
         try {
-            ConcurrentOpenHashSet<MessageIdImpl> temp = currentSet;
+            ConcurrentOpenHashSet<MessageId> temp = currentSet;
             currentSet = oldOpenSet;
             oldOpenSet = temp;
         } finally {
@@ -124,7 +122,7 @@ public void clear() {
         }
     }
 
-    public boolean add(MessageIdImpl m) {
+    public boolean add(MessageId m) {
         readLock.lock();
         try {
             oldOpenSet.remove(m);
@@ -144,7 +142,7 @@ boolean isEmpty() {
         }
     }
 
-    public boolean remove(MessageIdImpl m) {
+    public boolean remove(MessageId m) {
         readLock.lock();
         try {
             return currentSet.remove(m) || oldOpenSet.remove(m);
@@ -171,15 +169,36 @@ private boolean isAckTimeout() {
         }
     }
 
-    public int removeMessagesTill(MessageIdImpl msgId) {
+    /**
+     * Removes from both tracked sets every message id that compares less than or equal to the
+     * given id, while holding the read lock.
+     *
+     * NOTE(review): the ordering is whatever {@code MessageId.compareTo} defines; whether that
+     * keeps ids of different partitions apart is not visible here - confirm.
+     *
+     * @param msgId upper bound (inclusive) of the ids to remove
+     * @return the total number of ids removed from the current and the old set
+     */
+    public int removeMessagesTill(MessageId msgId) {
+        readLock.lock();
+        try {
+            int currentSetRemovedMsgCount = currentSet.removeIf(m -> (m.compareTo(msgId) <= 0));
+            int oldSetRemovedMsgCount = oldOpenSet.removeIf(m -> (m.compareTo(msgId) <= 0));
+
+            return currentSetRemovedMsgCount + oldSetRemovedMsgCount;
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public int removeTopicMessages(String topicName) {
         readLock.lock();
         try {
-            int currentSetRemovedMsgCount = currentSet.removeIf(m -> ((m.getLedgerId() < msgId.getLedgerId()
-                    || (m.getLedgerId() == msgId.getLedgerId() && m.getEntryId() <= msgId.getEntryId()))
-                    && m.getPartitionIndex() == msgId.getPartitionIndex()));
-            int oldSetRemovedMsgCount = oldOpenSet.removeIf(m -> ((m.getLedgerId() < msgId.getLedgerId()
-                    || (m.getLedgerId() == msgId.getLedgerId() && m.getEntryId() <= msgId.getEntryId()))
-                    && m.getPartitionIndex() == msgId.getPartitionIndex()));
+            int currentSetRemovedMsgCount = currentSet.removeIf(m -> {
+                if (m instanceof TopicMessageIdImpl) {
+                    return m.getTopicName().contains(topicName);
+                } else {
+                    return false;
+                }
+            });
+            int oldSetRemovedMsgCount = oldOpenSet.removeIf(m -> {
+                if (m instanceof TopicMessageIdImpl) {
+                    return m.getTopicName().contains(topicName);
+                } else {
+                    return false;
+                }
+            });
+
             return currentSetRemovedMsgCount + oldSetRemovedMsgCount;
         } finally {
             readLock.unlock();
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index 7d847ee92..52b850d14 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -40,6 +40,8 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandError;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
@@ -595,6 +597,37 @@ public static ByteBuf newConsumerStatsResponse(CommandConsumerStatsResponse.Buil
         return res;
     }
 
+    /**
+     * Serializes a {@code GET_TOPICS_OF_NAMESPACE} request.
+     *
+     * @param property  value set as the command's property field
+     * @param cluster   value set as the command's cluster field
+     * @param localName value set as the command's local name field
+     * @param requestId id used to correlate the response with this request
+     * @return the size-prefixed serialized command
+     */
+    public static ByteBuf newGetTopicsOfNamespaceRequest(String property,
+                                                         String cluster,
+                                                         String localName,
+                                                         long requestId) {
+        CommandGetTopicsOfNamespace.Builder requestBuilder = CommandGetTopicsOfNamespace.newBuilder();
+        requestBuilder.setProperty(property);
+        requestBuilder.setCluster(cluster);
+        requestBuilder.setLocalName(localName);
+        requestBuilder.setRequestId(requestId);
+        CommandGetTopicsOfNamespace request = requestBuilder.build();
+
+        BaseCommand.Builder baseCommand = BaseCommand.newBuilder()
+            .setType(Type.GET_TOPICS_OF_NAMESPACE)
+            .setGetTopicsOfNamespace(request);
+        ByteBuf serialized = serializeWithSize(baseCommand);
+
+        // builder and command are recycled only after serialization has finished
+        requestBuilder.recycle();
+        request.recycle();
+        return serialized;
+    }
+
+    /**
+     * Serializes a {@code GET_TOPICS_OF_NAMESPACE_RESPONSE} command.
+     *
+     * @param topics    topic names to put into the response
+     * @param requestId id of the request this response answers
+     * @return the size-prefixed serialized command
+     */
+    public static ByteBuf newGetTopicsOfNamespaceResponse(List<String> topics, long requestId) {
+        CommandGetTopicsOfNamespaceResponse.Builder responseBuilder =
+            CommandGetTopicsOfNamespaceResponse.newBuilder();
+        responseBuilder.setRequestId(requestId);
+        responseBuilder.addAllTopics(topics);
+        CommandGetTopicsOfNamespaceResponse response = responseBuilder.build();
+
+        BaseCommand.Builder baseCommand = BaseCommand.newBuilder()
+            .setType(Type.GET_TOPICS_OF_NAMESPACE_RESPONSE)
+            .setGetTopicsOfNamespaceResponse(response);
+        ByteBuf serialized = serializeWithSize(baseCommand);
+
+        // builder and command are recycled only after serialization has finished
+        responseBuilder.recycle();
+        response.recycle();
+        return serialized;
+    }
+
     private final static ByteBuf cmdPing;
 
     static {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
index e9e113a57..1996cfc4f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
@@ -30,6 +30,8 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandError;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandMessage;
@@ -253,6 +255,18 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
                 handleReachedEndOfTopic(cmd.getReachedEndOfTopic());
                 cmd.getReachedEndOfTopic().recycle();
                 break;
+
+            case GET_TOPICS_OF_NAMESPACE:
+                checkArgument(cmd.hasGetTopicsOfNamespace());
+                handleGetTopicsOfNamespace(cmd.getGetTopicsOfNamespace());
+                cmd.getGetTopicsOfNamespace().recycle();
+                break;
+
+            case GET_TOPICS_OF_NAMESPACE_RESPONSE:
+                checkArgument(cmd.hasGetTopicsOfNamespaceResponse());
+                handleGetTopicsOfNamespaceSuccess(cmd.getGetTopicsOfNamespaceResponse());
+                cmd.getGetTopicsOfNamespaceResponse().recycle();
+                break;
             }
         } finally {
             if (cmdBuilder != null) {
@@ -377,5 +391,13 @@ protected void handleReachedEndOfTopic(CommandReachedEndOfTopic commandReachedEn
         throw new UnsupportedOperationException();
     }
 
+    protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) { // called by channelRead for GET_TOPICS_OF_NAMESPACE, which recycles the command right after this returns
+        throw new UnsupportedOperationException(); // base implementation always throws; a subclass that expects this command has to override the method
+    }
+
+    protected void handleGetTopicsOfNamespaceSuccess(CommandGetTopicsOfNamespaceResponse response) { // called by channelRead for GET_TOPICS_OF_NAMESPACE_RESPONSE, which recycles the response right after this returns
+        throw new UnsupportedOperationException(); // base implementation always throws; a subclass that expects this response has to override the method
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PulsarDecoder.class);
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index eef365188..aa9de818a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -19437,6 +19437,1083 @@ public Builder clearMsgBacklog() {
     // @@protoc_insertion_point(class_scope:pulsar.proto.CommandConsumerStatsResponse)
   }
   
+  public interface CommandGetTopicsOfNamespaceOrBuilder // read-only accessors implemented by both CommandGetTopicsOfNamespace and its Builder
+      extends com.google.protobuf.MessageLiteOrBuilder {
+    
+    // required uint64 request_id = 1;
+    boolean hasRequestId();
+    long getRequestId();
+    
+    // required string property = 2;
+    boolean hasProperty();
+    String getProperty();
+    
+    // required string cluster = 3;
+    boolean hasCluster();
+    String getCluster();
+    
+    // required string local_name = 4;
+    boolean hasLocalName();
+    String getLocalName();
+  }
+  public static final class CommandGetTopicsOfNamespace extends // message pulsar.proto.CommandGetTopicsOfNamespace: request_id plus property / cluster / local_name strings
+      com.google.protobuf.GeneratedMessageLite
+      implements CommandGetTopicsOfNamespaceOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
+    // Use CommandGetTopicsOfNamespace.newBuilder() to construct.
+    private final io.netty.util.Recycler.Handle<CommandGetTopicsOfNamespace> handle;
+    private CommandGetTopicsOfNamespace(io.netty.util.Recycler.Handle<CommandGetTopicsOfNamespace> handle) { // invoked from RECYCLER.newObject
+      this.handle = handle;
+    }
+    
+     private static final io.netty.util.Recycler<CommandGetTopicsOfNamespace> RECYCLER = new io.netty.util.Recycler<CommandGetTopicsOfNamespace>() {
+            protected CommandGetTopicsOfNamespace newObject(Handle<CommandGetTopicsOfNamespace> handle) {
+              return new CommandGetTopicsOfNamespace(handle);
+            }
+          };
+        
+        public void recycle() { // resets every field and memoized value, then hands this instance back to RECYCLER
+            this.initFields();
+            this.memoizedIsInitialized = -1;
+            this.bitField0_ = 0;
+            this.memoizedSerializedSize = -1;
+            handle.recycle(this); // handle is null on defaultInstance (built through the noInit constructor), so calling recycle() on it would throw NullPointerException
+        }
+         
+    private CommandGetTopicsOfNamespace(boolean noInit) { // used by the static initializer for defaultInstance; such an instance has no Recycler handle
+        this.handle = null;
+    }
+    
+    private static final CommandGetTopicsOfNamespace defaultInstance;
+    public static CommandGetTopicsOfNamespace getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public CommandGetTopicsOfNamespace getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    private int bitField0_; // has-bits: 0x1 request_id, 0x2 property, 0x4 cluster, 0x8 local_name
+    // required uint64 request_id = 1;
+    public static final int REQUEST_ID_FIELD_NUMBER = 1;
+    private long requestId_;
+    public boolean hasRequestId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public long getRequestId() {
+      return requestId_;
+    }
+    
+    // required string property = 2;
+    public static final int PROPERTY_FIELD_NUMBER = 2;
+    private java.lang.Object property_; // holds either a String or a ByteString; the two accessors below convert and cache on demand
+    public boolean hasProperty() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public String getProperty() {
+      java.lang.Object ref = property_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) { // the decoded String is cached only when the bytes are valid UTF-8
+          property_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getPropertyBytes() {
+      java.lang.Object ref = property_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        property_ = b; // cache the encoded form in place of the String
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
+    // required string cluster = 3;
+    public static final int CLUSTER_FIELD_NUMBER = 3;
+    private java.lang.Object cluster_; // String or ByteString, same scheme as property_
+    public boolean hasCluster() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    public String getCluster() {
+      java.lang.Object ref = cluster_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          cluster_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getClusterBytes() {
+      java.lang.Object ref = cluster_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        cluster_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
+    // required string local_name = 4;
+    public static final int LOCAL_NAME_FIELD_NUMBER = 4;
+    private java.lang.Object localName_; // String or ByteString, same scheme as property_
+    public boolean hasLocalName() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    public String getLocalName() {
+      java.lang.Object ref = localName_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          localName_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getLocalNameBytes() {
+      java.lang.Object ref = localName_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        localName_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
+    private void initFields() { // also called from recycle() to wipe a pooled instance
+      requestId_ = 0L;
+      property_ = "";
+      cluster_ = "";
+      localName_ = "";
+    }
+    private byte memoizedIsInitialized = -1; // -1 = not computed yet, 0 = a required field is missing, 1 = all required fields present
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      if (!hasRequestId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasProperty()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasCluster()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasLocalName()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+        throw new RuntimeException("Cannot use CodedOutputStream"); // only the ByteBufCodedOutputStream overload of writeTo actually serializes
+    }
+    
+    public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize(); // return value unused; the call fills memoizedSerializedSize
+      if (((bitField0_ & 0x00000001) == 0x00000001)) { // each field is written only when its has-bit is set
+        output.writeUInt64(1, requestId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBytes(2, getPropertyBytes());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeBytes(3, getClusterBytes());
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeBytes(4, getLocalNameBytes());
+      }
+    }
+    
+    private int memoizedSerializedSize = -1; // -1 = not computed yet; reset by recycle()
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt64Size(1, requestId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(2, getPropertyBytes());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(3, getClusterBytes());
+      }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(4, getLocalNameBytes());
+      }
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled"); // both ByteString-based parseFrom overloads always throw
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed(); // NOTE(review): the inherited mergeFrom(byte[]) presumably ends up in the disabled CodedInputStream mergeFrom of Builder - confirm before relying on this overload
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry) // this Builder overload unconditionally throws IOException
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); } // create() obtains the Builder from Builder.RECYCLER
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends // Recycler-backed builder for CommandGetTopicsOfNamespace
+        com.google.protobuf.GeneratedMessageLite.Builder<
+          org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace, Builder>
+        implements org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
+      // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.newBuilder()
+      private final io.netty.util.Recycler.Handle<Builder> handle;
+      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+        this.handle = handle;
+        maybeForceBuilderInitialization();
+      }
+      private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
+         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+               return new Builder(handle);
+             }
+            };
+      
+       public void recycle() { // clears all fields, then hands this builder back to RECYCLER
+                clear();
+                handle.recycle(this);
+            }
+      
+      private void maybeForceBuilderInitialization() {
+      }
+      private static Builder create() {
+        return RECYCLER.get();
+      }
+      
+      public Builder clear() { // resets each field to its default and drops its has-bit
+        super.clear();
+        requestId_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        property_ = "";
+        bitField0_ = (bitField0_ & ~0x00000002);
+        cluster_ = "";
+        bitField0_ = (bitField0_ & ~0x00000004);
+        localName_ = "";
+        bitField0_ = (bitField0_ & ~0x00000008);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace getDefaultInstanceForType() {
+        return org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.getDefaultInstance();
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace build() { // like buildPartial(), but throws when a required field is missing
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace buildParsed() // same check as build(), reported as InvalidProtocolBufferException
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace buildPartial() {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace result = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.RECYCLER.get(); // the message instance is obtained from the message Recycler
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.requestId_ = requestId_; // field values are copied unconditionally; only the has-bits depend on what was set
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.property_ = property_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.cluster_ = cluster_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.localName_ = localName_;
+        result.bitField0_ = to_bitField0_;
+        return result;
+      }
+      
+      public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace other) { // copies only the fields that are set on other
+        if (other == org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.getDefaultInstance()) return this;
+        if (other.hasRequestId()) {
+          setRequestId(other.getRequestId());
+        }
+        if (other.hasProperty()) {
+          setProperty(other.getProperty());
+        }
+        if (other.hasCluster()) {
+          setCluster(other.getCluster());
+        }
+        if (other.hasLocalName()) {
+          setLocalName(other.getLocalName());
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() { // true only when all four required fields have been set
+        if (!hasRequestId()) {
+          
+          return false;
+        }
+        if (!hasProperty()) {
+          
+          return false;
+        }
+        if (!hasCluster()) {
+          
+          return false;
+        }
+        if (!hasLocalName()) {
+          
+          return false;
+        }
+        return true;
+      }
+      
+      public Builder mergeFrom(com.google.protobuf.CodedInputStream input,
+                              com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+                              throws java.io.IOException {
+         throw new java.io.IOException("Merge from CodedInputStream is disabled"); // only the ByteBufCodedInputStream overload below actually parses
+                              }
+      public Builder mergeFrom(
+          org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0: // tag 0 ends the merge
+              
+              return this;
+            default: { // tag not handled below: skip the field, or stop if skipField reports false
+              if (!input.skipField(tag)) {
+                
+                return this;
+              }
+              break;
+            }
+            case 8: { // request_id (field 1)
+              bitField0_ |= 0x00000001;
+              requestId_ = input.readUInt64();
+              break;
+            }
+            case 18: { // property (field 2); kept as the raw readBytes() value, getProperty() decodes it later
+              bitField0_ |= 0x00000002;
+              property_ = input.readBytes();
+              break;
+            }
+            case 26: { // cluster (field 3)
+              bitField0_ |= 0x00000004;
+              cluster_ = input.readBytes();
+              break;
+            }
+            case 34: { // local_name (field 4)
+              bitField0_ |= 0x00000008;
+              localName_ = input.readBytes();
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_; // has-bits, same layout as in the message class
+      
+      // required uint64 request_id = 1;
+      private long requestId_ ;
+      public boolean hasRequestId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public long getRequestId() {
+        return requestId_;
+      }
+      public Builder setRequestId(long value) {
+        bitField0_ |= 0x00000001;
+        requestId_ = value;
+        
+        return this;
+      }
+      public Builder clearRequestId() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        requestId_ = 0L;
+        
+        return this;
+      }
+      
+      // required string property = 2;
+      private java.lang.Object property_ = "";
+      public boolean hasProperty() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public String getProperty() {
+        java.lang.Object ref = property_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          property_ = s; // unlike the message getter, the builder caches the decoded String without a UTF-8 validity check
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setProperty(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        property_ = value;
+        
+        return this;
+      }
+      public Builder clearProperty() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        property_ = getDefaultInstance().getProperty();
+        
+        return this;
+      }
+      void setProperty(com.google.protobuf.ByteString value) { // package-private raw-bytes setter; no null check
+        bitField0_ |= 0x00000002;
+        property_ = value;
+        
+      }
+      
+      // required string cluster = 3;
+      private java.lang.Object cluster_ = "";
+      public boolean hasCluster() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      public String getCluster() {
+        java.lang.Object ref = cluster_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          cluster_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setCluster(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000004;
+        cluster_ = value;
+        
+        return this;
+      }
+      public Builder clearCluster() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        cluster_ = getDefaultInstance().getCluster();
+        
+        return this;
+      }
+      void setCluster(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000004;
+        cluster_ = value;
+        
+      }
+      
+      // required string local_name = 4;
+      private java.lang.Object localName_ = "";
+      public boolean hasLocalName() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      public String getLocalName() {
+        java.lang.Object ref = localName_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          localName_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setLocalName(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000008;
+        localName_ = value;
+        
+        return this;
+      }
+      public Builder clearLocalName() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        localName_ = getDefaultInstance().getLocalName();
+        
+        return this;
+      }
+      void setLocalName(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000008;
+        localName_ = value;
+        
+      }
+      
+      // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandGetTopicsOfNamespace)
+    }
+    
+    static {
+      defaultInstance = new CommandGetTopicsOfNamespace(true); // the one instance created outside the Recycler
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:pulsar.proto.CommandGetTopicsOfNamespace)
+  }
+  
+  public interface CommandGetTopicsOfNamespaceResponseOrBuilder // read-only accessors implemented by CommandGetTopicsOfNamespaceResponse
+      extends com.google.protobuf.MessageLiteOrBuilder {
+    
+    // required uint64 request_id = 1;
+    boolean hasRequestId();
+    long getRequestId();
+    
+    // repeated string topics = 2;
+    java.util.List<String> getTopicsList();
+    int getTopicsCount();
+    String getTopics(int index);
+  }
+  public static final class CommandGetTopicsOfNamespaceResponse extends
+      com.google.protobuf.GeneratedMessageLite
+      implements CommandGetTopicsOfNamespaceResponseOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
+    // Use CommandGetTopicsOfNamespaceResponse.newBuilder() to construct.
+    private final io.netty.util.Recycler.Handle<CommandGetTopicsOfNamespaceResponse> handle;
+    private CommandGetTopicsOfNamespaceResponse(io.netty.util.Recycler.Handle<CommandGetTopicsOfNamespaceResponse> handle) {
+      this.handle = handle;
+    }
+    
+     private static final io.netty.util.Recycler<CommandGetTopicsOfNamespaceResponse> RECYCLER = new io.netty.util.Recycler<CommandGetTopicsOfNamespaceResponse>() {
+            protected CommandGetTopicsOfNamespaceResponse newObject(Handle<CommandGetTopicsOfNamespaceResponse> handle) {
+              return new CommandGetTopicsOfNamespaceResponse(handle);
+            }
+          };
+        
+        public void recycle() {
+            this.initFields();
+            this.memoizedIsInitialized = -1;
+            this.bitField0_ = 0;
+            this.memoizedSerializedSize = -1;
+            handle.recycle(this);
+        }
+         
+    private CommandGetTopicsOfNamespaceResponse(boolean noInit) {
+        this.handle = null;
+    }
+    
+    private static final CommandGetTopicsOfNamespaceResponse defaultInstance;
+    public static CommandGetTopicsOfNamespaceResponse getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public CommandGetTopicsOfNamespaceResponse getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    private int bitField0_;
+    // required uint64 request_id = 1;
+    public static final int REQUEST_ID_FIELD_NUMBER = 1;
+    private long requestId_;
+    public boolean hasRequestId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public long getRequestId() {
+      return requestId_;
+    }
+    
+    // repeated string topics = 2;
+    public static final int TOPICS_FIELD_NUMBER = 2;
+    private com.google.protobuf.LazyStringList topics_;
+    public java.util.List<String>
+        getTopicsList() {
+      return topics_;
+    }
+    public int getTopicsCount() {
+      return topics_.size();
+    }
+    public String getTopics(int index) {
+      return topics_.get(index);
+    }
+    
+    private void initFields() {
+      requestId_ = 0L;
+      topics_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      if (!hasRequestId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+        throw new RuntimeException("Cannot use CodedOutputStream");
+    }
+    
+    public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeUInt64(1, requestId_);
+      }
+      for (int i = 0; i < topics_.size(); i++) {
+        output.writeBytes(2, topics_.getByteString(i));
+      }
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt64Size(1, requestId_);
+      }
+      {
+        int dataSize = 0;
+        for (int i = 0; i < topics_.size(); i++) {
+          dataSize += com.google.protobuf.CodedOutputStream
+            .computeBytesSizeNoTag(topics_.getByteString(i));
+        }
+        size += dataSize;
+        size += 1 * getTopicsList().size();
+      }
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessageLite.Builder<
+          org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse, Builder>
+        implements org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponseOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
+      // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.newBuilder()
+      private final io.netty.util.Recycler.Handle<Builder> handle;
+      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+        this.handle = handle;
+        maybeForceBuilderInitialization();
+      }
+      private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
+         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+               return new Builder(handle);
+             }
+            };
+      
+       public void recycle() {
+                clear(); // reset requestId_, topics_ and their bits so no state leaks into the next user
+                handle.recycle(this); // return this builder through the Recycler handle it was created with
+            }
+      
+      private void maybeForceBuilderInitialization() {
+      }
+      private static Builder create() {
+        return RECYCLER.get();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        requestId_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        topics_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000002);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse getDefaultInstanceForType() {
+        return org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.getDefaultInstance();
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse build() {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse buildPartial() {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse result = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.RECYCLER.get(); // message instance comes from the message's RECYCLER, not from a constructor call
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) { // request_id was set on the builder
+          to_bitField0_ |= 0x00000001;
+        }
+        result.requestId_ = requestId_; // copied regardless of the presence bit
+        if (((bitField0_ & 0x00000002) == 0x00000002)) { // builder holds its own mutable topics list
+          topics_ = new com.google.protobuf.UnmodifiableLazyStringList(
+              topics_); // wrap it read-only before sharing it with the message
+          bitField0_ = (bitField0_ & ~0x00000002); // drop the "mutable" bit so a later write goes through ensureTopicsIsMutable() and copies first
+        }
+        result.topics_ = topics_;
+        result.bitField0_ = to_bitField0_; // only the request_id presence bit is carried over to the message
+        return result;
+      }
+      
+      public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse other) {
+        if (other == org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.getDefaultInstance()) return this; // identity check: merging the default instance is a no-op
+        if (other.hasRequestId()) {
+          setRequestId(other.getRequestId());
+        }
+        if (!other.topics_.isEmpty()) {
+          if (topics_.isEmpty()) {
+            topics_ = other.topics_; // adopt the other message's list as-is instead of copying it
+            bitField0_ = (bitField0_ & ~0x00000002); // mark the list as not builder-owned, so ensureTopicsIsMutable() copies before any write
+          } else {
+            ensureTopicsIsMutable();
+            topics_.addAll(other.topics_); // append to this builder's own mutable list
+          }
+          
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        if (!hasRequestId()) {
+          
+          return false;
+        }
+        return true;
+      }
+      
+      public Builder mergeFrom(com.google.protobuf.CodedInputStream input,
+                              com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+                              throws java.io.IOException {
+         throw new java.io.IOException("Merge from CodedInputStream is disabled"); // always fails; parsing is implemented only by the ByteBufCodedInputStream overload
+                              }
+      public Builder mergeFrom(
+          org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        while (true) { // read fields until a terminating tag is seen
+          int tag = input.readTag(); // tag == (field_number << 3) | wire_type
+          switch (tag) {
+            case 0: // presumably end of input, as with protobuf's CodedInputStream.readTag() -- confirm for ByteBufCodedInputStream
+              
+              return this;
+            default: { // unrecognized tag: skip its payload
+              if (!input.skipField(tag)) { // skipField reported that parsing should stop here
+                
+                return this;
+              }
+              break;
+            }
+            case 8: { // field 1 (request_id), wire type 0 (varint): (1 << 3) | 0
+              bitField0_ |= 0x00000001; // record that request_id is present
+              requestId_ = input.readUInt64();
+              break;
+            }
+            case 18: { // field 2 (topics), wire type 2 (length-delimited): (2 << 3) | 2
+              ensureTopicsIsMutable();
+              topics_.add(input.readBytes()); // stored as raw bytes in the lazy string list; one entry per occurrence
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // required uint64 request_id = 1;
+      private long requestId_ ;
+      public boolean hasRequestId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public long getRequestId() {
+        return requestId_;
+      }
+      public Builder setRequestId(long value) {
+        bitField0_ |= 0x00000001;
+        requestId_ = value;
+        
+        return this;
+      }
+      public Builder clearRequestId() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        requestId_ = 0L;
+        
+        return this;
+      }
+      
+      // repeated string topics = 2;
+      private com.google.protobuf.LazyStringList topics_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+      private void ensureTopicsIsMutable() {
+        if (!((bitField0_ & 0x00000002) == 0x00000002)) { // bit 0x2 means "topics_ is a mutable list owned by this builder", not field presence
+          topics_ = new com.google.protobuf.LazyStringArrayList(topics_); // copy the shared/empty list before the first write
+          bitField0_ |= 0x00000002;
+         }
+      }
+      public java.util.List<String>
+          getTopicsList() {
+        return java.util.Collections.unmodifiableList(topics_);
+      }
+      public int getTopicsCount() {
+        return topics_.size();
+      }
+      public String getTopics(int index) {
+        return topics_.get(index);
+      }
+      public Builder setTopics(
+          int index, String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  ensureTopicsIsMutable();
+        topics_.set(index, value);
+        
+        return this;
+      }
+      public Builder addTopics(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  ensureTopicsIsMutable();
+        topics_.add(value);
+        
+        return this;
+      }
+      public Builder addAllTopics(
+          java.lang.Iterable<String> values) {
+        ensureTopicsIsMutable();
+        super.addAll(values, topics_);
+        
+        return this;
+      }
+      public Builder clearTopics() {
+        topics_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000002);
+        
+        return this;
+      }
+      void addTopics(com.google.protobuf.ByteString value) {
+        ensureTopicsIsMutable();
+        topics_.add(value);
+        
+      }
+      
+      // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandGetTopicsOfNamespaceResponse)
+    }
+    
+    static {
+      defaultInstance = new CommandGetTopicsOfNamespaceResponse(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:pulsar.proto.CommandGetTopicsOfNamespaceResponse)
+  }
+  
   public interface BaseCommandOrBuilder
       extends com.google.protobuf.MessageLiteOrBuilder {
     
@@ -19551,6 +20628,14 @@ public Builder clearMsgBacklog() {
     // optional .pulsar.proto.CommandSeek seek = 28;
     boolean hasSeek();
     org.apache.pulsar.common.api.proto.PulsarApi.CommandSeek getSeek();
+    
+    // optional .pulsar.proto.CommandGetTopicsOfNamespace getTopicsOfNamespace = 29;
+    boolean hasGetTopicsOfNamespace();
+    org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace getGetTopicsOfNamespace();
+    
+    // optional .pulsar.proto.CommandGetTopicsOfNamespaceResponse getTopicsOfNamespaceResponse = 30;
+    boolean hasGetTopicsOfNamespaceResponse();
+    org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse getGetTopicsOfNamespaceResponse();
   }
   public static final class BaseCommand extends
       com.google.protobuf.GeneratedMessageLite
@@ -19617,6 +20702,8 @@ public BaseCommand getDefaultInstanceForType() {
       CONSUMER_STATS_RESPONSE(24, 26),
       REACHED_END_OF_TOPIC(25, 27),
       SEEK(26, 28),
+      GET_TOPICS_OF_NAMESPACE(27, 29),
+      GET_TOPICS_OF_NAMESPACE_RESPONSE(28, 30),
       ;
       
       public static final int CONNECT_VALUE = 2;
@@ -19646,6 +20733,8 @@ public BaseCommand getDefaultInstanceForType() {
       public static final int CONSUMER_STATS_RESPONSE_VALUE = 26;
       public static final int REACHED_END_OF_TOPIC_VALUE = 27;
       public static final int SEEK_VALUE = 28;
+      public static final int GET_TOPICS_OF_NAMESPACE_VALUE = 29;
+      public static final int GET_TOPICS_OF_NAMESPACE_RESPONSE_VALUE = 30;
       
       
       public final int getNumber() { return value; }
@@ -19679,6 +20768,8 @@ public static Type valueOf(int value) {
           case 26: return CONSUMER_STATS_RESPONSE;
           case 27: return REACHED_END_OF_TOPIC;
           case 28: return SEEK;
+          case 29: return GET_TOPICS_OF_NAMESPACE;
+          case 30: return GET_TOPICS_OF_NAMESPACE_RESPONSE;
           default: return null;
         }
       }
@@ -19985,6 +21076,26 @@ public boolean hasSeek() {
       return seek_;
     }
     
+    // optional .pulsar.proto.CommandGetTopicsOfNamespace getTopicsOfNamespace = 29;
+    public static final int GETTOPICSOFNAMESPACE_FIELD_NUMBER = 29;
+    private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace getTopicsOfNamespace_;
+    public boolean hasGetTopicsOfNamespace() {
+      return ((bitField0_ & 0x10000000) == 0x10000000);
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace getGetTopicsOfNamespace() {
+      return getTopicsOfNamespace_;
+    }
+    
+    // optional .pulsar.proto.CommandGetTopicsOfNamespaceResponse getTopicsOfNamespaceResponse = 30;
+    public static final int GETTOPICSOFNAMESPACERESPONSE_FIELD_NUMBER = 30;
+    private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse getTopicsOfNamespaceResponse_;
+    public boolean hasGetTopicsOfNamespaceResponse() {
+      return ((bitField0_ & 0x20000000) == 0x20000000);
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse getGetTopicsOfNamespaceResponse() {
+      return getTopicsOfNamespaceResponse_;
+    }
+    
     private void initFields() {
       type_ = org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand.Type.CONNECT;
       connect_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect.getDefaultInstance();
@@ -20014,6 +21125,8 @@ private void initFields() {
       consumerStatsResponse_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse.getDefaultInstance();
       reachedEndOfTopic_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandReachedEndOfTopic.getDefaultInstance();
       seek_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandSeek.getDefaultInstance();
+      getTopicsOfNamespace_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.getDefaultInstance();
+      getTopicsOfNamespaceResponse_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.getDefaultInstance();
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -20174,6 +21287,18 @@ public final boolean isInitialized() {
           return false;
         }
       }
+      if (hasGetTopicsOfNamespace()) {
+        if (!getGetTopicsOfNamespace().isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
+      if (hasGetTopicsOfNamespaceResponse()) {
+        if (!getGetTopicsOfNamespaceResponse().isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -20270,6 +21395,12 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr
       if (((bitField0_ & 0x08000000) == 0x08000000)) {
         output.writeMessage(28, seek_);
       }
+      if (((bitField0_ & 0x10000000) == 0x10000000)) {
+        output.writeMessage(29, getTopicsOfNamespace_);
+      }
+      if (((bitField0_ & 0x20000000) == 0x20000000)) {
+        output.writeMessage(30, getTopicsOfNamespaceResponse_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -20390,6 +21521,14 @@ public int getSerializedSize() {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(28, seek_);
       }
+      if (((bitField0_ & 0x10000000) == 0x10000000)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(29, getTopicsOfNamespace_);
+      }
+      if (((bitField0_ & 0x20000000) == 0x20000000)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(30, getTopicsOfNamespaceResponse_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -20559,6 +21698,10 @@ public Builder clear() {
         bitField0_ = (bitField0_ & ~0x04000000);
         seek_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandSeek.getDefaultInstance();
         bitField0_ = (bitField0_ & ~0x08000000);
+        getTopicsOfNamespace_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.getDefaultInstance();
+        bitField0_ = (bitField0_ & ~0x10000000);
+        getTopicsOfNamespaceResponse_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.getDefaultInstance();
+        bitField0_ = (bitField0_ & ~0x20000000);
         return this;
       }
       
@@ -20704,6 +21847,14 @@ public Builder clone() {
           to_bitField0_ |= 0x08000000;
         }
         result.seek_ = seek_;
+        if (((from_bitField0_ & 0x10000000) == 0x10000000)) {
+          to_bitField0_ |= 0x10000000;
+        }
+        result.getTopicsOfNamespace_ = getTopicsOfNamespace_;
+        if (((from_bitField0_ & 0x20000000) == 0x20000000)) {
+          to_bitField0_ |= 0x20000000;
+        }
+        result.getTopicsOfNamespaceResponse_ = getTopicsOfNamespaceResponse_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -20794,6 +21945,12 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.BaseComman
         if (other.hasSeek()) {
           mergeSeek(other.getSeek());
         }
+        if (other.hasGetTopicsOfNamespace()) {
+          mergeGetTopicsOfNamespace(other.getGetTopicsOfNamespace());
+        }
+        if (other.hasGetTopicsOfNamespaceResponse()) {
+          mergeGetTopicsOfNamespaceResponse(other.getGetTopicsOfNamespaceResponse());
+        }
         return this;
       }
       
@@ -20952,6 +22109,18 @@ public final boolean isInitialized() {
             return false;
           }
         }
+        if (hasGetTopicsOfNamespace()) {
+          if (!getGetTopicsOfNamespace().isInitialized()) {
+            
+            return false;
+          }
+        }
+        if (hasGetTopicsOfNamespaceResponse()) {
+          if (!getGetTopicsOfNamespaceResponse().isInitialized()) {
+            
+            return false;
+          }
+        }
         return true;
       }
       
@@ -21256,6 +22425,26 @@ public Builder mergeFrom(
               subBuilder.recycle();
               break;
             }
+            case 234: {
+              org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Builder subBuilder = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.newBuilder();
+              if (hasGetTopicsOfNamespace()) {
+                subBuilder.mergeFrom(getGetTopicsOfNamespace());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setGetTopicsOfNamespace(subBuilder.buildPartial());
+              subBuilder.recycle();
+              break;
+            }
+            case 242: {
+              org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.Builder subBuilder = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.newBuilder();
+              if (hasGetTopicsOfNamespaceResponse()) {
+                subBuilder.mergeFrom(getGetTopicsOfNamespaceResponse());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setGetTopicsOfNamespaceResponse(subBuilder.buildPartial());
+              subBuilder.recycle();
+              break;
+            }
           }
         }
       }
@@ -22447,6 +23636,92 @@ public Builder clearSeek() {
         return this;
       }
       
+      // optional .pulsar.proto.CommandGetTopicsOfNamespace getTopicsOfNamespace = 29;
+      private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace getTopicsOfNamespace_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.getDefaultInstance();
+      public boolean hasGetTopicsOfNamespace() {
+        return ((bitField0_ & 0x10000000) == 0x10000000);
+      }
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace getGetTopicsOfNamespace() {
+        return getTopicsOfNamespace_;
+      }
+      public Builder setGetTopicsOfNamespace(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        getTopicsOfNamespace_ = value;
+        
+        bitField0_ |= 0x10000000;
+        return this;
+      }
+      public Builder setGetTopicsOfNamespace(
+          org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Builder builderForValue) {
+        getTopicsOfNamespace_ = builderForValue.build();
+        
+        bitField0_ |= 0x10000000;
+        return this;
+      }
+      public Builder mergeGetTopicsOfNamespace(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace value) {
+        if (((bitField0_ & 0x10000000) == 0x10000000) &&
+            getTopicsOfNamespace_ != org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.getDefaultInstance()) {
+          getTopicsOfNamespace_ =
+            org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.newBuilder(getTopicsOfNamespace_).mergeFrom(value).buildPartial();
+        } else {
+          getTopicsOfNamespace_ = value;
+        }
+        
+        bitField0_ |= 0x10000000;
+        return this;
+      }
+      public Builder clearGetTopicsOfNamespace() {
+        getTopicsOfNamespace_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.getDefaultInstance();
+        
+        bitField0_ = (bitField0_ & ~0x10000000);
+        return this;
+      }
+      
+      // optional .pulsar.proto.CommandGetTopicsOfNamespaceResponse getTopicsOfNamespaceResponse = 30;
+      private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse getTopicsOfNamespaceResponse_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.getDefaultInstance();
+      public boolean hasGetTopicsOfNamespaceResponse() {
+        return ((bitField0_ & 0x20000000) == 0x20000000);
+      }
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse getGetTopicsOfNamespaceResponse() {
+        return getTopicsOfNamespaceResponse_;
+      }
+      public Builder setGetTopicsOfNamespaceResponse(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        getTopicsOfNamespaceResponse_ = value;
+        
+        bitField0_ |= 0x20000000;
+        return this;
+      }
+      public Builder setGetTopicsOfNamespaceResponse(
+          org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.Builder builderForValue) {
+        getTopicsOfNamespaceResponse_ = builderForValue.build();
+        
+        bitField0_ |= 0x20000000;
+        return this;
+      }
+      public Builder mergeGetTopicsOfNamespaceResponse(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse value) {
+        if (((bitField0_ & 0x20000000) == 0x20000000) &&
+            getTopicsOfNamespaceResponse_ != org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.getDefaultInstance()) {
+          getTopicsOfNamespaceResponse_ =
+            org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.newBuilder(getTopicsOfNamespaceResponse_).mergeFrom(value).buildPartial();
+        } else {
+          getTopicsOfNamespaceResponse_ = value;
+        }
+        
+        bitField0_ |= 0x20000000;
+        return this;
+      }
+      public Builder clearGetTopicsOfNamespaceResponse() {
+        getTopicsOfNamespaceResponse_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.getDefaultInstance();
+        
+        bitField0_ = (bitField0_ & ~0x20000000);
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.BaseCommand)
     }
     
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java
index a179f9af5..f196f4995 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java
@@ -38,7 +38,7 @@
     private String property;
     private String cluster;
     private String localName;
-    
+
     private static final LoadingCache<String, NamespaceName> cache = CacheBuilder.newBuilder().maximumSize(100000)
             .expireAfterAccess(30, TimeUnit.MINUTES).build(new CacheLoader<String, NamespaceName>() {
                 @Override
@@ -189,4 +189,16 @@ public boolean includes(DestinationName dn) {
         return this.equals(dn.getNamespaceObject());
     }
 
+    /**
+     * Get a string suitable for namespace lookup.
+     * <p>
+     * The result is the property, cluster and local name joined with "/",
+     * with no domain prefix; for example:
+     * property "p", cluster "c", local name "ns" -> "p/c/ns"
+     *
+     * @return the lookup name in the form {@code property/cluster/localName}
+     */
+    public String getLookupName() {
+        return String.format("%s/%s/%s", property, cluster, localName);
+    }
 }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index faa894f8c..cc72b34fe 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -312,7 +312,7 @@ message CommandUnsubscribe {
 message CommandSeek {
 	required uint64 consumer_id = 1;
 	required uint64 request_id  = 2;
-	
+
 	optional MessageIdData message_id = 3;
 }
 
@@ -346,7 +346,7 @@ message CommandSuccess {
 message CommandProducerSuccess {
 	required uint64 request_id    = 1;
 	required string producer_name = 2;
-	
+
 	// The last sequence id that was stored by this producer in the previous session
 	// This will only be meaningful if deduplication has been enabled.
 	optional int64  last_sequence_id = 3 [default = -1];
@@ -415,6 +415,18 @@ message CommandConsumerStatsResponse {
         optional uint64 msgBacklog                  = 15;
 }
 
+message CommandGetTopicsOfNamespace {
+	required uint64 request_id	= 1;
+	required string property    = 2;
+	required string cluster     = 3;
+	required string local_name  = 4;
+}
+
+message CommandGetTopicsOfNamespaceResponse {
+	required uint64 request_id	= 1;
+	repeated string topics 		= 2;
+}
+
 message BaseCommand {
 	enum Type {
 		CONNECT     = 2;
@@ -458,6 +470,9 @@ message BaseCommand {
 		REACHED_END_OF_TOPIC = 27;
 
 		SEEK = 28;
+
+		GET_TOPICS_OF_NAMESPACE 			= 29;
+		GET_TOPICS_OF_NAMESPACE_RESPONSE 	= 30;
 	}
 
 	required Type type = 1;
@@ -496,6 +511,9 @@ message BaseCommand {
 	optional CommandConsumerStatsResponse consumerStatsResponse         = 26;
 
 	optional CommandReachedEndOfTopic reachedEndOfTopic  = 27;
-	
+
 	optional CommandSeek seek = 28;
+
+	optional CommandGetTopicsOfNamespace getTopicsOfNamespace = 29;
+	optional CommandGetTopicsOfNamespaceResponse getTopicsOfNamespaceResponse = 30;
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services