You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/02/27 06:00:29 UTC

[GitHub] merlimat closed pull request #1279: PIP-13-2/3: support regex based subscription

merlimat closed pull request #1279: PIP-13-2/3: support regex based subscription
URL: https://github.com/apache/incubator-pulsar/pull/1279
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 935cb1fad..a386ece9a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -26,6 +26,7 @@
 import static org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5;
 
 import java.net.SocketAddress;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -41,6 +42,7 @@
 import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
+import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
 import org.apache.pulsar.broker.web.RestException;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -58,6 +60,7 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadata;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandProducer;
@@ -73,6 +76,7 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.ServerError;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.naming.Metadata;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.ConsumerStats;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
@@ -1010,7 +1014,7 @@ protected void handleSeek(CommandSeek seek) {
             MessageIdData msgIdData = seek.getMessageId();
 
             Position position = new PositionImpl(msgIdData.getLedgerId(), msgIdData.getEntryId());
-            
+
 
             subscription.resetCursor(position).thenRun(() -> {
                 log.info("[{}] [{}][{}] Reset subscription to message id {}", remoteAddress,
@@ -1142,6 +1146,32 @@ protected void handleGetLastMessageId(CommandGetLastMessageId getLastMessageId)
         }
     }
 
+    @Override
+    protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
+        final long requestId = commandGetTopicsOfNamespace.getRequestId();
+        final String namespace = commandGetTopicsOfNamespace.getNamespace();
+
+        try {
+            List<String> topics = getBrokerService().pulsar()
+                .getNamespaceService()
+                .getListOfTopics(NamespaceName.get(namespace));
+
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Received CommandGetTopicsOfNamespace for namespace [//{}] by {}, size:{}",
+                    remoteAddress, namespace, requestId, topics.size());
+            }
+
+            ctx.writeAndFlush(Commands.newGetTopicsOfNamespaceResponse(topics, requestId));
+        } catch (Exception e) {
+            log.warn("[{]] Error GetTopicsOfNamespace for namespace [//{}] by {}",
+                remoteAddress, namespace, requestId);
+            ctx.writeAndFlush(
+                Commands.newError(requestId,
+                    BrokerServiceException.getClientErrorCode(new ServerMetadataException(e)),
+                    e.getMessage()));
+        }
+    }
+
     @Override
     protected boolean isHandshakeCompleted() {
         return state == State.Connected;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index 092dc3617..2083cc786 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -31,7 +31,7 @@
 import org.apache.pulsar.broker.service.Consumer;
 import org.apache.pulsar.broker.service.Subscription;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
-import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
 
 public final class NonPersistentDispatcherSingleActiveConsumer extends AbstractDispatcherSingleActiveConsumer implements NonPersistentDispatcher {
@@ -70,7 +70,7 @@ protected boolean isConsumersExceededOnTopic() {
         Policies policies;
         try {
             policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, DestinationName.get(topicName).getNamespace()))
+                    .get(AdminResource.path(POLICIES, TopicName.get(topicName).getNamespace()))
                     .orElseGet(() -> new Policies());
         } catch (Exception e) {
             policies = new Policies();
@@ -88,7 +88,7 @@ protected boolean isConsumersExceededOnSubscription() {
         Policies policies;
         try {
             policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, DestinationName.get(topicName).getNamespace()))
+                    .get(AdminResource.path(POLICIES, TopicName.get(topicName).getNamespace()))
                     .orElseGet(() -> new Policies());
         } catch (Exception e) {
             policies = new Policies();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index d8510df7e..7c06cb3ea 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -268,7 +268,7 @@ private boolean isProducersExceeded() {
         Policies policies;
         try {
             policies =  brokerService.pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, DestinationName.get(topic).getNamespace()))
+                    .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
                     .orElseGet(() -> new Policies());
         } catch (Exception e) {
             policies = new Policies();
@@ -591,7 +591,7 @@ protected boolean addReplicationCluster(String remoteCluster, NonPersistentTopic
         }
         return isReplicatorStarted.get();
     }
-    
+
     CompletableFuture<Void> removeReplicator(String remoteCluster) {
         log.info("[{}] Removing replicator to {}", topic, remoteCluster);
         final CompletableFuture<Void> future = new CompletableFuture<>();
@@ -976,7 +976,7 @@ public void markBatchMessagePublished() {
         this.hasBatchMessagePublished = true;
     }
 
-    
-    
+
+
     private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class);
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 2e59811b8..8b32d54aa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -46,7 +46,7 @@
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
-import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.util.Codec;
 import org.apache.pulsar.common.util.collections.ConcurrentLongPairSet;
@@ -139,7 +139,7 @@ private boolean isConsumersExceededOnTopic() {
         Policies policies;
         try {
             policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, DestinationName.get(topic.getName()).getNamespace()))
+                    .get(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()))
                     .orElseGet(() -> new Policies());
         } catch (Exception e) {
             policies = new Policies();
@@ -157,7 +157,7 @@ private boolean isConsumersExceededOnSubscription() {
         Policies policies;
         try {
             policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, DestinationName.get(topic.getName()).getNamespace()))
+                    .get(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()))
                     .orElseGet(() -> new Policies());
         } catch (Exception e) {
             policies = new Policies();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index bd197e84c..8d42a319f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -42,7 +42,7 @@
 import org.apache.pulsar.broker.service.Dispatcher;
 import org.apache.pulsar.client.impl.Backoff;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
-import org.apache.pulsar.common.naming.DestinationName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.util.Codec;
 import org.slf4j.Logger;
@@ -53,7 +53,7 @@
     private final PersistentTopic topic;
     private final ManagedCursor cursor;
     private final String name;
-    
+
     private boolean havePendingRead = false;
 
     private static final int MaxReadBatchSize = 100;
@@ -122,7 +122,7 @@ protected boolean isConsumersExceededOnTopic() {
         Policies policies;
         try {
             policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, DestinationName.get(topicName).getNamespace()))
+                    .get(AdminResource.path(POLICIES, TopicName.get(topicName).getNamespace()))
                     .orElseGet(() -> new Policies());
         } catch (Exception e) {
             policies = new Policies();
@@ -140,7 +140,7 @@ protected boolean isConsumersExceededOnSubscription() {
         Policies policies;
         try {
             policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, DestinationName.get(topicName).getNamespace()))
+                    .get(AdminResource.path(POLICIES, TopicName.get(topicName).getNamespace()))
                     .orElseGet(() -> new Policies());
         } catch (Exception e) {
             policies = new Policies();
@@ -201,7 +201,7 @@ public synchronized void readEntriesComplete(final List<Entry> entries, Object o
                 if (future.isSuccess()) {
                     // acquire message-dispatch permits for already delivered messages
                     if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
-                        topic.getDispatchRateLimiter().tryDispatchPermit(totalMessagesSent, totalBytesSent);    
+                        topic.getDispatchRateLimiter().tryDispatchPermit(totalMessagesSent, totalBytesSent);
                     }
                     // Schedule a new read batch operation only after the previous batch has been written to the socket
                     synchronized (PersistentDispatcherSingleActiveConsumer.this) {
@@ -303,7 +303,7 @@ protected void readMoreEntries(Consumer consumer) {
             }
 
             int messagesToRead = Math.min(availablePermits, readBatchSize);
-            
+
             // throttle only if: (1) cursor is not active (or flag for throttle-nonBacklogConsumer is enabled) bcz
             // active-cursor reads message from cache rather from bookkeeper (2) if topic has reached message-rate
             // threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 4d5d9746c..62017f0f6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -351,7 +351,7 @@ private boolean isProducersExceeded() {
         Policies policies;
         try {
             policies =  brokerService.pulsar().getConfigurationCache().policiesCache()
-                    .get(AdminResource.path(POLICIES, DestinationName.get(topic).getNamespace()))
+                    .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
                     .orElseGet(() -> new Policies());
         } catch (Exception e) {
             policies = new Policies();
@@ -911,7 +911,7 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) {
                 String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
                 boolean isReplicatorStarted = addReplicationCluster(remoteCluster, PersistentTopic.this, cursor, localCluster);
                 if (isReplicatorStarted) {
-                    future.complete(null);    
+                    future.complete(null);
                 } else {
                     future.completeExceptionally(new NamingException(
                             PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster));
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index d0277ef73..df60d65d3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -405,7 +405,7 @@ public void testMaxProducersForNamespace() throws Exception {
         Policies policies = new Policies();
         policies.max_producers_per_topic = 2;
         when(pulsar.getConfigurationCache().policiesCache()
-                .get(AdminResource.path(POLICIES, DestinationName.get(successTopicName).getNamespace())))
+                .get(AdminResource.path(POLICIES, TopicName.get(successTopicName).getNamespace())))
                 .thenReturn(Optional.of(policies));
         testMaxProducers();
     }
@@ -576,7 +576,7 @@ public void testMaxConsumersSharedForNamespace() throws Exception {
         policies.max_consumers_per_subscription = 2;
         policies.max_consumers_per_topic = 3;
         when(pulsar.getConfigurationCache().policiesCache()
-                .get(AdminResource.path(POLICIES, DestinationName.get(successTopicName).getNamespace())))
+                .get(AdminResource.path(POLICIES, TopicName.get(successTopicName).getNamespace())))
                 .thenReturn(Optional.of(policies));
 
         testMaxConsumersShared();
@@ -667,7 +667,7 @@ public void testMaxConsumersFailoverForNamespace() throws Exception {
         policies.max_consumers_per_subscription = 2;
         policies.max_consumers_per_topic = 3;
         when(pulsar.getConfigurationCache().policiesCache()
-                .get(AdminResource.path(POLICIES, DestinationName.get(successTopicName).getNamespace())))
+                .get(AdminResource.path(POLICIES, TopicName.get(successTopicName).getNamespace())))
                 .thenReturn(Optional.of(policies));
 
         testMaxConsumersFailover();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
new file mode 100644
index 000000000..26f3277b9
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertSame;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
+import java.util.stream.IntStream;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.ProducerConfiguration;
+import org.apache.pulsar.client.api.ProducerConfiguration.MessageRoutingMode;
+import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.PropertyAdmin;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
+    private static final long testTimeout = 90000; // 1.5 min
+    private static final Logger log = LoggerFactory.getLogger(PatternTopicsConsumerImplTest.class);
+    private final long ackTimeOutMillis = TimeUnit.SECONDS.toMillis(2);
+
+    @Override
+    @BeforeMethod
+    public void setup() throws Exception {
+        // set isTcpLookup = true, to use BinaryProtoLookupService to get topics for a pattern.
+        isTcpLookup = true;
+        super.internalSetup();
+    }
+
+    @Override
+    @AfterMethod
+    public void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+    @Test(timeOut = testTimeout)
+    public void testPatternTopicsSubscribeWithBuilderFail() throws Exception {
+        String key = "PatternTopicsSubscribeWithBuilderFail";
+        final String subscriptionName = "my-ex-subscription-" + key;
+
+        final String topicName1 = "persistent://prop/use/ns-abc/topic-1-" + key;
+        final String topicName2 = "persistent://prop/use/ns-abc/topic-2-" + key;
+        final String topicName3 = "persistent://prop/use/ns-abc/topic-3-" + key;
+        List<String> topicNames = Lists.newArrayList(topicName1, topicName2, topicName3);
+        final String patternString = "persistent://prop/use/ns-abc/pattern-topic.*";
+        Pattern pattern = Pattern.compile(patternString);
+
+        admin.properties().createProperty("prop", new PropertyAdmin());
+        admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+        admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+        // test failing builder with pattern and topic should fail
+        try {
+            Consumer consumer1 = pulsarClient.newConsumer()
+                .topicsPattern(pattern)
+                .topic(topicName1)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+                .subscribe();
+            fail("subscribe1 with pattern and topic should fail.");
+        } catch (PulsarClientException e) {
+            // expected
+        }
+
+        // test failing builder with pattern and topics should fail
+        try {
+            Consumer consumer2 = pulsarClient.newConsumer()
+                .topicsPattern(pattern)
+                .topics(topicNames)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+                .subscribe();
+            fail("subscribe2 with pattern and topics should fail.");
+        } catch (PulsarClientException e) {
+            // expected
+        }
+
+        // test failing builder with pattern and patternString should fail
+        try {
+            Consumer consumer3 = pulsarClient.newConsumer()
+                .topicsPattern(pattern)
+                .topicsPattern(patternString)
+                .subscriptionName(subscriptionName)
+                .subscriptionType(SubscriptionType.Shared)
+                .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+                .subscribe();
+            fail("subscribe3 with pattern and patternString should fail.");
+        } catch (IllegalArgumentException e) {
+            // expected
+        }
+    }
+
+    // verify consumer create success, and works well.
+    @Test(timeOut = testTimeout)
+    public void testBinaryProtoToGetTopicsOfNamespace() throws Exception {
+        String key = "BinaryProtoToGetTopics";
+        String subscriptionName = "my-ex-subscription-" + key;
+        String topicName1 = "persistent://prop/use/ns-abc/pattern-topic-1-" + key;
+        String topicName2 = "persistent://prop/use/ns-abc/pattern-topic-2-" + key;
+        String topicName3 = "persistent://prop/use/ns-abc/pattern-topic-3-" + key;
+        Pattern pattern = Pattern.compile("persistent://prop/use/ns-abc/pattern-topic.*");
+
+        // 1. create partition
+        admin.properties().createProperty("prop", new PropertyAdmin());
+        admin.persistentTopics().createPartitionedTopic(topicName2, 2);
+        admin.persistentTopics().createPartitionedTopic(topicName3, 3);
+
+        // 2. create producer
+        ProducerConfiguration producerConfiguration = new ProducerConfiguration();
+        producerConfiguration.setMessageRoutingMode(MessageRoutingMode.RoundRobinPartition);
+        String messagePredicate = "my-message-" + key + "-";
+        int totalMessages = 30;
+
+        Producer producer1 = pulsarClient.newProducer().topic(topicName1)
+            .create();
+        Producer producer2 = pulsarClient.newProducer().topic(topicName2)
+            .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+            .create();
+        Producer producer3 = pulsarClient.newProducer().topic(topicName3)
+            .messageRoutingMode(org.apache.pulsar.client.api.MessageRoutingMode.RoundRobinPartition)
+            .create();
+
+        Consumer consumer = pulsarClient.newConsumer()
+            .topicsPattern(pattern)
+            .subscriptionName(subscriptionName)
+            .subscriptionType(SubscriptionType.Shared)
+            .ackTimeout(ackTimeOutMillis, TimeUnit.MILLISECONDS)
+            .receiverQueueSize(4)
+            .subscribe();
+
+        // 4. verify consumer get methods, to get right number of partitions and topics.
+        assertSame(pattern, ((PatternTopicsConsumerImpl) consumer).getPattern());
+        List<String> topics = ((PatternTopicsConsumerImpl) consumer).getPartitionedTopics();
+        List<ConsumerImpl> consumers = ((PatternTopicsConsumerImpl) consumer).getConsumers();
+
+        assertEquals(topics.size(), 6);
+        assertEquals(consumers.size(), 6);
+        assertEquals(((PatternTopicsConsumerImpl) consumer).getTopics().size(), 3);
+
+        topics.forEach(topic -> log.debug("topic: {}", topic));
+        consumers.forEach(c -> log.debug("consumer: {}", c.getTopic()));
+
+        IntStream.range(0, topics.size()).forEach(index ->
+            assertTrue(topics.get(index).equals(consumers.get(index).getTopic())));
+
+        ((PatternTopicsConsumerImpl) consumer).getTopics().forEach(topic -> log.debug("getTopics topic: {}", topic));
+
+        // 5. produce data
+        for (int i = 0; i < totalMessages / 3; i++) {
+            producer1.send((messagePredicate + "producer1-" + i).getBytes());
+            producer2.send((messagePredicate + "producer2-" + i).getBytes());
+            producer3.send((messagePredicate + "producer3-" + i).getBytes());
+        }
+
+        // 6. should receive all the message
+        int messageSet = 0;
+        Message message = consumer.receive();
+        do {
+            assertTrue(message instanceof TopicMessageImpl);
+            messageSet ++;
+            consumer.acknowledge(message);
+            log.debug("Consumer acknowledged : " + new String(message.getData()));
+            message = consumer.receive(500, TimeUnit.MILLISECONDS);
+        } while (message != null);
+        assertEquals(messageSet, totalMessages);
+
+        consumer.unsubscribe();
+        consumer.close();
+        producer1.close();
+        producer2.close();
+        producer3.close();
+    }
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
index be97ab696..cf633209d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java
@@ -23,6 +23,7 @@
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 
 /**
  * {@link ConsumerBuilder} is used to configure and create instances of {@link Consumer}.
@@ -80,7 +81,6 @@
     /**
      * Specify the topics this consumer will subscribe on.
      * <p>
-     * This argument is required when constructing the consumer.
      *
      * @param topicNames
      */
@@ -89,12 +89,30 @@
     /**
      * Specify a list of topics that this consumer will subscribe on.
      * <p>
-     * This argument is required when constructing the consumer.
      *
      * @param topicNames
      */
     ConsumerBuilder topics(List<String> topicNames);
 
+    /**
+     * Specify a pattern for topics that this consumer will subscribe on.
+     * <p>
+     *
+     * @param topicsPattern
+     */
+    ConsumerBuilder topicsPattern(Pattern topicsPattern);
+
+    /**
+     * Specify a pattern for topics that this consumer will subscribe on.
+     * It accepts regular expression and will be compiled into a pattern internally.
+     * Eg. "persistent://prop/use/ns-abc/pattern-topic-.*"
+     * <p>
+     *
+     * @param topicsPattern
+     *            given regular expression for topics pattern
+     */
+    ConsumerBuilder topicsPattern(String topicsPattern);
+
     /**
      * Specify the subscription name for this consumer.
      * <p>
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
index feb0ddcfa..6d8f67850 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
@@ -20,17 +20,24 @@
 
 import static java.lang.String.format;
 
+import com.google.common.collect.Lists;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.nio.channels.ClosedChannelException;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.common.api.Commands;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,11 +49,13 @@
     private final PulsarClientImpl client;
     protected final InetSocketAddress serviceAddress;
     private final boolean useTls;
+    private final ExecutorService executor;
 
-    public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, boolean useTls)
+    public BinaryProtoLookupService(PulsarClientImpl client, String serviceUrl, boolean useTls, ExecutorService executor)
             throws PulsarClientException {
         this.client = client;
         this.useTls = useTls;
+        this.executor = executor;
         URI uri;
         try {
             uri = new URI(serviceUrl);
@@ -176,6 +185,66 @@ public String getServiceUrl() {
         return serviceAddress.toString();
     }
 
+    @Override
+    public CompletableFuture<List<String>> getTopicsUnderNamespace(NamespaceName namespace) {
+        CompletableFuture<List<String>> topicsFuture = new CompletableFuture<List<String>>();
+
+        AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
+        Backoff backoff = new Backoff(100, TimeUnit.MILLISECONDS,
+            opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS,
+            0 , TimeUnit.MILLISECONDS);
+        getTopicsUnderNamespace(serviceAddress, namespace, backoff, opTimeoutMs, topicsFuture);
+        return topicsFuture;
+    }
+
+    private void getTopicsUnderNamespace(InetSocketAddress socketAddress,
+                                         NamespaceName namespace,
+                                         Backoff backoff,
+                                         AtomicLong remainingTime,
+                                         CompletableFuture<List<String>> topicsFuture) {
+        client.getCnxPool().getConnection(socketAddress).thenAccept(clientCnx -> {
+            long requestId = client.newRequestId();
+            ByteBuf request = Commands.newGetTopicsOfNamespaceRequest(
+                namespace.toString(), requestId);
+
+            clientCnx.newGetTopicsOfNamespace(request, requestId).thenAccept(topicsList -> {
+                if (log.isDebugEnabled()) {
+                    log.debug("[namespace: {}] Success get topics list in request: {}", namespace.toString(), requestId);
+                }
+
+                // do not keep partition part of topic name
+                List<String> result = Lists.newArrayList();
+                topicsList.forEach(topic -> {
+                    String filtered = TopicName.get(topic).getPartitionedTopicName();
+                    if (!result.contains(filtered)) {
+                        result.add(filtered);
+                    }
+                });
+
+                topicsFuture.complete(result);
+            }).exceptionally((e) -> {
+                topicsFuture.completeExceptionally(e);
+                return null;
+            });
+        }).exceptionally((e) -> {
+            long nextDelay = Math.min(backoff.next(), remainingTime.get());
+            if (nextDelay <= 0) {
+                topicsFuture.completeExceptionally(new PulsarClientException
+                    .TimeoutException("Could not getTopicsUnderNamespace within configured timeout."));
+                return null;
+            }
+
+            ((ScheduledExecutorService) executor).schedule(() -> {
+                log.warn("[namespace: {}] Could not get connection while getTopicsUnderNamespace -- Will try again in {} ms",
+                    namespace, nextDelay);
+                remainingTime.addAndGet(-nextDelay);
+                getTopicsUnderNamespace(socketAddress, namespace, backoff, remainingTime, topicsFuture);
+            }, nextDelay, TimeUnit.MILLISECONDS);
+            return null;
+        });
+    }
+
+
     @Override
     public void close() throws Exception {
         // no-op
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 7505addb0..6c2d758c7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -24,6 +24,7 @@
 import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.nio.channels.ClosedChannelException;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
@@ -47,6 +48,7 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConnected;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandError;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandMessage;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandPartitionedTopicMetadataResponse;
@@ -81,6 +83,9 @@
             16, 1);
     private final ConcurrentLongHashMap<CompletableFuture<MessageIdData>> pendingGetLastMessageIdRequests = new ConcurrentLongHashMap<>(
         16, 1);
+    private final ConcurrentLongHashMap<CompletableFuture<List<String>>> pendingGetTopicsRequests = new ConcurrentLongHashMap<>(
+        16, 1);
+
     private final ConcurrentLongHashMap<ProducerImpl> producers = new ConcurrentLongHashMap<>(16, 1);
     private final ConcurrentLongHashMap<ConsumerImpl> consumers = new ConcurrentLongHashMap<>(16, 1);
 
@@ -164,6 +169,7 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
         pendingRequests.forEach((key, future) -> future.completeExceptionally(e));
         pendingLookupRequests.forEach((key, future) -> future.completeExceptionally(e));
         pendingGetLastMessageIdRequests.forEach((key, future) -> future.completeExceptionally(e));
+        pendingGetTopicsRequests.forEach((key, future) -> future.completeExceptionally(e));
 
         // Notify all attached producers/consumers so they have a chance to reconnect
         producers.forEach((id, producer) -> producer.connectionClosed(this));
@@ -513,6 +519,42 @@ protected boolean isHandshakeCompleted() {
         return future;
     }
 
+    public CompletableFuture<List<String>> newGetTopicsOfNamespace(ByteBuf request, long requestId) {
+        CompletableFuture<List<String>> future = new CompletableFuture<>();
+
+        pendingGetTopicsRequests.put(requestId, future);
+        ctx.writeAndFlush(request).addListener(writeFuture -> {
+            if (!writeFuture.isSuccess()) {
+                log.warn("{} Failed to send request {} to broker: {}", ctx.channel(), requestId,
+                    writeFuture.cause().getMessage());
+                pendingGetTopicsRequests.remove(requestId);
+                future.completeExceptionally(writeFuture.cause());
+            }
+        });
+
+        return future;
+    }
+
+    @Override
+    protected void handleGetTopicsOfNamespaceSuccess(CommandGetTopicsOfNamespaceResponse success) {
+        checkArgument(state == State.Ready);
+
+        long requestId = success.getRequestId();
+        List<String> topics = success.getTopicsList();
+
+        if (log.isDebugEnabled()) {
+            log.debug("{} Received get topics of namespace success response from server: {} - topics.size: {}",
+                ctx.channel(), success.getRequestId(), topics.size());
+        }
+
+        CompletableFuture<List<String>> requestFuture = pendingGetTopicsRequests.remove(requestId);
+        if (requestFuture != null) {
+            requestFuture.complete(topics);
+        } else {
+            log.warn("{} Received unknown request id from server: {}", ctx.channel(), success.getRequestId());
+        }
+    }
+
     Promise<Void> newPromise() {
         return ctx.newPromise();
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
index 6cac9b177..edaeb5b27 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java
@@ -26,6 +26,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import java.util.regex.Pattern;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
@@ -80,7 +81,7 @@ public Consumer subscribe() throws PulsarClientException {
 
     @Override
     public CompletableFuture<Consumer> subscribeAsync() {
-        if (conf.getTopicNames().isEmpty()) {
+        if (conf.getTopicNames().isEmpty() && conf.getTopicsPattern() == null) {
             return FutureUtil
                     .failedFuture(new IllegalArgumentException("Topic name must be set on the consumer builder"));
         }
@@ -104,7 +105,20 @@ public ConsumerBuilder topic(String... topicNames) {
     public ConsumerBuilder topics(List<String> topicNames) {
         checkArgument(topicNames != null && !topicNames.isEmpty(), "Passed in topicNames list should not be empty.");
         conf.getTopicNames().addAll(topicNames);
+        return this;
+    }
 
+    @Override
+    public ConsumerBuilder topicsPattern(Pattern topicsPattern) {
+        checkArgument(conf.getTopicsPattern() == null, "Pattern has already been set.");
+        conf.setTopicsPattern(topicsPattern);
+        return this;
+    }
+
+    @Override
+    public ConsumerBuilder topicsPattern(String topicsPattern) {
+        checkArgument(conf.getTopicsPattern() == null, "Pattern has already been set.");
+        conf.setTopicsPattern(Pattern.compile(topicsPattern));
         return this;
     }
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 954f18977..a8ec61137 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -18,8 +18,11 @@
  */
 package org.apache.pulsar.client.impl;
 
+import com.google.common.collect.Lists;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.commons.lang3.tuple.Pair;
@@ -27,6 +30,7 @@
 import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
 import org.apache.pulsar.common.lookup.data.LookupData;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.slf4j.Logger;
@@ -88,6 +92,29 @@ public String getServiceUrl() {
     	return httpClient.url.toString();
     }
 
+    @Override
+    public CompletableFuture<List<String>> getTopicsUnderNamespace(NamespaceName namespace) {
+        CompletableFuture<List<String>> future = new CompletableFuture<>();
+        httpClient
+            .get(String.format("admin/namespaces/%s/destinations", namespace), String[].class)
+            .thenAccept(topics -> {
+                List<String> result = Lists.newArrayList();
+                // do not keep partition part of topic name
+                Arrays.asList(topics).forEach(topic -> {
+                    String filtered = TopicName.get(topic).getPartitionedTopicName();
+                    if (!result.contains(filtered)) {
+                        result.add(filtered);
+                    }
+                });
+                future.complete(result);})
+            .exceptionally(ex -> {
+                log.warn("Failed to getTopicsUnderNamespace namespace: {}.", namespace, ex.getMessage());
+                future.completeExceptionally(ex);
+                return null;
+            });
+        return future;
+    }
+
     @Override
     public void close() throws Exception {
         httpClient.close();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
index bcdd758f4..aa00e54b3 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/LookupService.java
@@ -19,9 +19,11 @@
 package org.apache.pulsar.client.impl;
 
 import java.net.InetSocketAddress;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 
@@ -62,4 +64,13 @@
 	 * @return
 	 */
 	public String getServiceUrl();
+
+	/**
+	 * Returns all the topics name for a given namespace.
+	 *
+	 * @param namespace : namespace-name
+	 * @return
+	 */
+	public CompletableFuture<List<String>> getTopicsUnderNamespace(NamespaceName namespace);
+
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java
new file mode 100644
index 000000000..b09ad48af
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImpl.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.regex.Pattern;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PatternTopicsConsumerImpl extends TopicsConsumerImpl {
+    private final Pattern topicsPattern;
+
+    public PatternTopicsConsumerImpl(Pattern topicsPattern,
+                              PulsarClientImpl client,
+                              ConsumerConfigurationData conf,
+                              ExecutorService listenerExecutor,
+                              CompletableFuture<Consumer> subscribeFuture) {
+        super(client, conf, listenerExecutor, subscribeFuture);
+        this.topicsPattern = topicsPattern;
+    }
+
+    public Pattern getPattern() {
+        return this.topicsPattern;
+    }
+
+    private static final Logger log = LoggerFactory.getLogger(PatternTopicsConsumerImpl.class);
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index ea089a42a..a4b50ed12 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.client.impl;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
+import static com.google.common.base.Preconditions.checkState;
 
 import java.util.IdentityHashMap;
 import java.util.List;
@@ -30,6 +31,7 @@
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+import java.util.stream.Collectors;
 import org.apache.pulsar.client.api.ClientConfiguration;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerBuilder;
@@ -49,6 +51,7 @@
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.client.impl.conf.ReaderConfigurationData;
 import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.naming.TopicDomain;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
@@ -124,13 +127,13 @@ public PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup eventLoopGr
         this.conf = conf;
         conf.getAuthentication().start();
         this.cnxPool = cnxPool;
+        externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
         if (conf.getServiceUrl().startsWith("http")) {
             lookup = new HttpLookupService(conf, eventLoopGroup);
         } else {
-            lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.isUseTls());
+            lookup = new BinaryProtoLookupService(this, conf.getServiceUrl(), conf.isUseTls(), externalExecutorProvider.getExecutor());
         }
         timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
-        externalExecutorProvider = new ExecutorProvider(conf.getNumListenerThreads(), "pulsar-external-listener");
         producers = Maps.newIdentityHashMap();
         consumers = Maps.newIdentityHashMap();
         state.set(State.Open);
@@ -331,7 +334,14 @@ public Consumer subscribe(String topic, String subscription, ConsumerConfigurati
                     "Active consumer listener is only supported for failover subscription"));
         }
 
-        if (conf.getTopicNames().size() == 1) {
+        if (conf.getTopicsPattern() != null) {
+            // If use topicsPattern, we should not use topic(), and topics() method.
+            if (!conf.getTopicNames().isEmpty()){
+                return FutureUtil
+                    .failedFuture(new IllegalArgumentException("Topic names list must be null when use topicsPattern"));
+            }
+            return patternTopicSubscribeAsync(conf);
+        } else if (conf.getTopicNames().size() == 1) {
             return singleTopicSubscribeAsysnc(conf);
         } else {
             return multiTopicSubscribeAsync(conf);
@@ -384,6 +394,41 @@ public Consumer subscribe(String topic, String subscription, ConsumerConfigurati
         return consumerSubscribedFuture;
     }
 
+    public CompletableFuture<Consumer> patternTopicSubscribeAsync(ConsumerConfigurationData conf) {
+        String regex = conf.getTopicsPattern().pattern();
+        TopicName destination = TopicName.get(regex);
+        NamespaceName namespaceName = destination.getNamespaceObject();
+
+        CompletableFuture<Consumer> consumerSubscribedFuture = new CompletableFuture<>();
+        lookup.getTopicsUnderNamespace(namespaceName)
+            .thenAccept(topics -> {
+                List<String> topicsList = topics.stream()
+                    .filter(topic -> {
+                        TopicName destinationName = TopicName.get(topic);
+                        checkState(destinationName.getNamespaceObject().equals(namespaceName));
+                        return conf.getTopicsPattern().matcher(destinationName.toString()).matches();
+                    })
+                    .collect(Collectors.toList());
+                conf.getTopicNames().addAll(topicsList);
+                ConsumerBase consumer = new PatternTopicsConsumerImpl(conf.getTopicsPattern(),
+                    PulsarClientImpl.this,
+                    conf,
+                    externalExecutorProvider.getExecutor(),
+                    consumerSubscribedFuture);
+
+                synchronized (consumers) {
+                    consumers.put(consumer, Boolean.TRUE);
+                }
+            })
+            .exceptionally(ex -> {
+                log.warn("[{}] Failed to get topics under namespace", namespaceName);
+                consumerSubscribedFuture.completeExceptionally(ex);
+                return null;
+            });
+
+        return consumerSubscribedFuture;
+    }
+
     @Override
     public Reader createReader(String topic, MessageId startMessageId, ReaderConfiguration conf)
             throws PulsarClientException {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
index 89ab5d71a..5acfe9dec 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java
@@ -25,6 +25,7 @@
 import java.util.SortedMap;
 import java.util.TreeMap;
 
+import java.util.regex.Pattern;
 import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
 import org.apache.pulsar.client.api.ConsumerEventListener;
 import org.apache.pulsar.client.api.CryptoKeyReader;
@@ -43,6 +44,8 @@
 
     private Set<String> topicNames = Sets.newTreeSet();
 
+    private Pattern topicsPattern;
+
     private String subscriptionName;
 
     private SubscriptionType subscriptionType = SubscriptionType.Exclusive;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
index a395233cc..6fdda729a 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/Commands.java
@@ -45,6 +45,8 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandError;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse.LookupType;
@@ -619,6 +621,34 @@ public static ByteBuf newConsumerStatsResponse(CommandConsumerStatsResponse.Buil
         return res;
     }
 
+    public static ByteBuf newGetTopicsOfNamespaceRequest(String namespace, long requestId) {
+        CommandGetTopicsOfNamespace.Builder topicsBuilder = CommandGetTopicsOfNamespace.newBuilder();
+        topicsBuilder.setNamespace(namespace).setRequestId(requestId);
+
+        CommandGetTopicsOfNamespace topicsCommand = topicsBuilder.build();
+        ByteBuf res = serializeWithSize(
+            BaseCommand.newBuilder().setType(Type.GET_TOPICS_OF_NAMESPACE).setGetTopicsOfNamespace(topicsCommand));
+        topicsBuilder.recycle();
+        topicsCommand.recycle();
+        return res;
+    }
+
+    public static ByteBuf newGetTopicsOfNamespaceResponse(List<String> topics, long requestId) {
+        CommandGetTopicsOfNamespaceResponse.Builder topicsResponseBuilder =
+            CommandGetTopicsOfNamespaceResponse.newBuilder();
+
+        topicsResponseBuilder.setRequestId(requestId).addAllTopics(topics);
+
+        CommandGetTopicsOfNamespaceResponse topicsOfNamespaceResponse = topicsResponseBuilder.build();
+        ByteBuf res = serializeWithSize(BaseCommand.newBuilder()
+            .setType(Type.GET_TOPICS_OF_NAMESPACE_RESPONSE)
+            .setGetTopicsOfNamespaceResponse(topicsOfNamespaceResponse));
+
+        topicsResponseBuilder.recycle();
+        topicsOfNamespaceResponse.recycle();
+        return res;
+    }
+
     private final static ByteBuf cmdPing;
 
     static {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
index 205619983..7ccb12ba1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/PulsarDecoder.java
@@ -32,6 +32,8 @@
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandConsumerStatsResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandError;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandFlow;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace;
+import org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopic;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandLookupTopicResponse;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandMessage;
@@ -271,6 +273,18 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception
                 handleActiveConsumerChange(cmd.getActiveConsumerChange());
                 cmd.getActiveConsumerChange().recycle();
                 break;
+
+            case GET_TOPICS_OF_NAMESPACE:
+                checkArgument(cmd.hasGetTopicsOfNamespace());
+                handleGetTopicsOfNamespace(cmd.getGetTopicsOfNamespace());
+                cmd.getGetTopicsOfNamespace().recycle();
+                break;
+
+            case GET_TOPICS_OF_NAMESPACE_RESPONSE:
+                checkArgument(cmd.hasGetTopicsOfNamespaceResponse());
+                handleGetTopicsOfNamespaceSuccess(cmd.getGetTopicsOfNamespaceResponse());
+                cmd.getGetTopicsOfNamespaceResponse().recycle();
+                break;
             }
         } finally {
             if (cmdBuilder != null) {
@@ -406,5 +420,13 @@ protected void handleGetLastMessageIdSuccess(PulsarApi.CommandGetLastMessageIdRe
         throw new UnsupportedOperationException();
     }
 
+    protected void handleGetTopicsOfNamespace(CommandGetTopicsOfNamespace commandGetTopicsOfNamespace) {
+        throw new UnsupportedOperationException();
+    }
+
+    protected void handleGetTopicsOfNamespaceSuccess(CommandGetTopicsOfNamespaceResponse response) {
+        throw new UnsupportedOperationException();
+    }
+
     private static final Logger log = LoggerFactory.getLogger(PulsarDecoder.class);
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 80df9b821..96174feca 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -21282,6 +21282,879 @@ public Builder clearRequestId() {
     // @@protoc_insertion_point(class_scope:pulsar.proto.CommandGetLastMessageIdResponse)
   }
   
+  public interface CommandGetTopicsOfNamespaceOrBuilder
+      extends com.google.protobuf.MessageLiteOrBuilder {
+    
+    // required uint64 request_id = 1;
+    boolean hasRequestId();
+    long getRequestId();
+    
+    // required string namespace = 2;
+    boolean hasNamespace();
+    String getNamespace();
+  }
+  public static final class CommandGetTopicsOfNamespace extends
+      com.google.protobuf.GeneratedMessageLite
+      implements CommandGetTopicsOfNamespaceOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
+    // Use CommandGetTopicsOfNamespace.newBuilder() to construct.
+    private final io.netty.util.Recycler.Handle<CommandGetTopicsOfNamespace> handle;
+    private CommandGetTopicsOfNamespace(io.netty.util.Recycler.Handle<CommandGetTopicsOfNamespace> handle) {
+      this.handle = handle;
+    }
+    
+     private static final io.netty.util.Recycler<CommandGetTopicsOfNamespace> RECYCLER = new io.netty.util.Recycler<CommandGetTopicsOfNamespace>() {
+            protected CommandGetTopicsOfNamespace newObject(Handle<CommandGetTopicsOfNamespace> handle) {
+              return new CommandGetTopicsOfNamespace(handle);
+            }
+          };
+        
+        public void recycle() {
+            this.initFields();
+            this.memoizedIsInitialized = -1;
+            this.bitField0_ = 0;
+            this.memoizedSerializedSize = -1;
+            handle.recycle(this);
+        }
+         
+    private CommandGetTopicsOfNamespace(boolean noInit) {
+        this.handle = null;
+    }
+    
+    private static final CommandGetTopicsOfNamespace defaultInstance;
+    public static CommandGetTopicsOfNamespace getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public CommandGetTopicsOfNamespace getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    private int bitField0_;
+    // required uint64 request_id = 1;
+    public static final int REQUEST_ID_FIELD_NUMBER = 1;
+    private long requestId_;
+    public boolean hasRequestId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public long getRequestId() {
+      return requestId_;
+    }
+    
+    // required string namespace = 2;
+    public static final int NAMESPACE_FIELD_NUMBER = 2;
+    private java.lang.Object namespace_;
+    public boolean hasNamespace() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public String getNamespace() {
+      java.lang.Object ref = namespace_;
+      if (ref instanceof String) {
+        return (String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        String s = bs.toStringUtf8();
+        if (com.google.protobuf.Internal.isValidUtf8(bs)) {
+          namespace_ = s;
+        }
+        return s;
+      }
+    }
+    private com.google.protobuf.ByteString getNamespaceBytes() {
+      java.lang.Object ref = namespace_;
+      if (ref instanceof String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8((String) ref);
+        namespace_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+    
+    private void initFields() {
+      requestId_ = 0L;
+      namespace_ = "";
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      if (!hasRequestId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      if (!hasNamespace()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+        throw new RuntimeException("Cannot use CodedOutputStream");
+    }
+    
+    public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeUInt64(1, requestId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBytes(2, getNamespaceBytes());
+      }
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt64Size(1, requestId_);
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(2, getNamespaceBytes());
+      }
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessageLite.Builder<
+          org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace, Builder>
+        implements org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
+      // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.newBuilder()
+      private final io.netty.util.Recycler.Handle<Builder> handle;
+      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+        this.handle = handle;
+        maybeForceBuilderInitialization();
+      }
+      private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
+         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+               return new Builder(handle);
+             }
+            };
+      
+       public void recycle() {
+                clear();
+                handle.recycle(this);
+            }
+      
+      private void maybeForceBuilderInitialization() {
+      }
+      private static Builder create() {
+        return RECYCLER.get();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        requestId_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        namespace_ = "";
+        bitField0_ = (bitField0_ & ~0x00000002);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace getDefaultInstanceForType() {
+        return org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.getDefaultInstance();
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace build() {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace buildPartial() {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace result = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.RECYCLER.get();
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.requestId_ = requestId_;
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.namespace_ = namespace_;
+        result.bitField0_ = to_bitField0_;
+        return result;
+      }
+      
+      public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace other) {
+        if (other == org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.getDefaultInstance()) return this;
+        if (other.hasRequestId()) {
+          setRequestId(other.getRequestId());
+        }
+        if (other.hasNamespace()) {
+          setNamespace(other.getNamespace());
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        if (!hasRequestId()) {
+          
+          return false;
+        }
+        if (!hasNamespace()) {
+          
+          return false;
+        }
+        return true;
+      }
+      
+      public Builder mergeFrom(com.google.protobuf.CodedInputStream input,
+                              com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+                              throws java.io.IOException {
+         throw new java.io.IOException("Merge from CodedInputStream is disabled");
+                              }
+      public Builder mergeFrom(
+          org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              
+              return this;
+            default: {
+              if (!input.skipField(tag)) {
+                
+                return this;
+              }
+              break;
+            }
+            case 8: {
+              bitField0_ |= 0x00000001;
+              requestId_ = input.readUInt64();
+              break;
+            }
+            case 18: {
+              bitField0_ |= 0x00000002;
+              namespace_ = input.readBytes();
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // required uint64 request_id = 1;
+      private long requestId_ ;
+      public boolean hasRequestId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public long getRequestId() {
+        return requestId_;
+      }
+      public Builder setRequestId(long value) {
+        bitField0_ |= 0x00000001;
+        requestId_ = value;
+        
+        return this;
+      }
+      public Builder clearRequestId() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        requestId_ = 0L;
+        
+        return this;
+      }
+      
+      // required string namespace = 2;
+      private java.lang.Object namespace_ = "";
+      public boolean hasNamespace() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      public String getNamespace() {
+        java.lang.Object ref = namespace_;
+        if (!(ref instanceof String)) {
+          String s = ((com.google.protobuf.ByteString) ref).toStringUtf8();
+          namespace_ = s;
+          return s;
+        } else {
+          return (String) ref;
+        }
+      }
+      public Builder setNamespace(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        namespace_ = value;
+        
+        return this;
+      }
+      public Builder clearNamespace() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        namespace_ = getDefaultInstance().getNamespace();
+        
+        return this;
+      }
+      void setNamespace(com.google.protobuf.ByteString value) {
+        bitField0_ |= 0x00000002;
+        namespace_ = value;
+        
+      }
+      
+      // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandGetTopicsOfNamespace)
+    }
+    
+    static {
+      defaultInstance = new CommandGetTopicsOfNamespace(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:pulsar.proto.CommandGetTopicsOfNamespace)
+  }
+  
+  public interface CommandGetTopicsOfNamespaceResponseOrBuilder
+      extends com.google.protobuf.MessageLiteOrBuilder {
+    
+    // required uint64 request_id = 1;
+    boolean hasRequestId();
+    long getRequestId();
+    
+    // repeated string topics = 2;
+    java.util.List<String> getTopicsList();
+    int getTopicsCount();
+    String getTopics(int index);
+  }
+  public static final class CommandGetTopicsOfNamespaceResponse extends
+      com.google.protobuf.GeneratedMessageLite
+      implements CommandGetTopicsOfNamespaceResponseOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream.ByteBufGeneratedMessage  {
+    // Use CommandGetTopicsOfNamespaceResponse.newBuilder() to construct.
+    private final io.netty.util.Recycler.Handle<CommandGetTopicsOfNamespaceResponse> handle;
+    private CommandGetTopicsOfNamespaceResponse(io.netty.util.Recycler.Handle<CommandGetTopicsOfNamespaceResponse> handle) {
+      this.handle = handle;
+    }
+    
+     private static final io.netty.util.Recycler<CommandGetTopicsOfNamespaceResponse> RECYCLER = new io.netty.util.Recycler<CommandGetTopicsOfNamespaceResponse>() {
+            protected CommandGetTopicsOfNamespaceResponse newObject(Handle<CommandGetTopicsOfNamespaceResponse> handle) {
+              return new CommandGetTopicsOfNamespaceResponse(handle);
+            }
+          };
+        
+        public void recycle() {
+            this.initFields();
+            this.memoizedIsInitialized = -1;
+            this.bitField0_ = 0;
+            this.memoizedSerializedSize = -1;
+            handle.recycle(this);
+        }
+         
+    private CommandGetTopicsOfNamespaceResponse(boolean noInit) {
+        this.handle = null;
+    }
+    
+    private static final CommandGetTopicsOfNamespaceResponse defaultInstance;
+    public static CommandGetTopicsOfNamespaceResponse getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public CommandGetTopicsOfNamespaceResponse getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    private int bitField0_;
+    // required uint64 request_id = 1;
+    public static final int REQUEST_ID_FIELD_NUMBER = 1;
+    private long requestId_;
+    public boolean hasRequestId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public long getRequestId() {
+      return requestId_;
+    }
+    
+    // repeated string topics = 2;
+    public static final int TOPICS_FIELD_NUMBER = 2;
+    private com.google.protobuf.LazyStringList topics_;
+    public java.util.List<String>
+        getTopicsList() {
+      return topics_;
+    }
+    public int getTopicsCount() {
+      return topics_.size();
+    }
+    public String getTopics(int index) {
+      return topics_.get(index);
+    }
+    
+    private void initFields() {
+      requestId_ = 0L;
+      topics_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      if (!hasRequestId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+        throw new RuntimeException("Cannot use CodedOutputStream");
+    }
+    
+    public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeUInt64(1, requestId_);
+      }
+      for (int i = 0; i < topics_.size(); i++) {
+        output.writeBytes(2, topics_.getByteString(i));
+      }
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt64Size(1, requestId_);
+      }
+      {
+        int dataSize = 0;
+        for (int i = 0; i < topics_.size(); i++) {
+          dataSize += com.google.protobuf.CodedOutputStream
+            .computeBytesSizeNoTag(topics_.getByteString(i));
+        }
+        size += dataSize;
+        size += 1 * getTopicsList().size();
+      }
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+         throw new RuntimeException("Disabled");
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessageLite.Builder<
+          org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse, Builder>
+        implements org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponseOrBuilder, org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.ByteBufMessageBuilder  {
+      // Construct using org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.newBuilder()
+      private final io.netty.util.Recycler.Handle<Builder> handle;
+      private Builder(io.netty.util.Recycler.Handle<Builder> handle) {
+        this.handle = handle;
+        maybeForceBuilderInitialization();
+      }
+      private final static io.netty.util.Recycler<Builder> RECYCLER = new io.netty.util.Recycler<Builder>() {
+         protected Builder newObject(io.netty.util.Recycler.Handle<Builder> handle) {
+               return new Builder(handle);
+             }
+            };
+      
+       public void recycle() {
+                clear();
+                handle.recycle(this);
+            }
+      
+      private void maybeForceBuilderInitialization() {
+      }
+      private static Builder create() {
+        return RECYCLER.get();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        requestId_ = 0L;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        topics_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000002);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse getDefaultInstanceForType() {
+        return org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.getDefaultInstance();
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse build() {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse buildPartial() {
+        org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse result = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.RECYCLER.get();
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.requestId_ = requestId_;
+        if (((bitField0_ & 0x00000002) == 0x00000002)) {
+          topics_ = new com.google.protobuf.UnmodifiableLazyStringList(
+              topics_);
+          bitField0_ = (bitField0_ & ~0x00000002);
+        }
+        result.topics_ = topics_;
+        result.bitField0_ = to_bitField0_;
+        return result;
+      }
+      
+      public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse other) {
+        if (other == org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.getDefaultInstance()) return this;
+        if (other.hasRequestId()) {
+          setRequestId(other.getRequestId());
+        }
+        if (!other.topics_.isEmpty()) {
+          if (topics_.isEmpty()) {
+            topics_ = other.topics_;
+            bitField0_ = (bitField0_ & ~0x00000002);
+          } else {
+            ensureTopicsIsMutable();
+            topics_.addAll(other.topics_);
+          }
+          
+        }
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        if (!hasRequestId()) {
+          
+          return false;
+        }
+        return true;
+      }
+      
+      public Builder mergeFrom(com.google.protobuf.CodedInputStream input,
+                              com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+                              throws java.io.IOException {
+         throw new java.io.IOException("Merge from CodedInputStream is disabled");
+                              }
+      public Builder mergeFrom(
+          org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              
+              return this;
+            default: {
+              if (!input.skipField(tag)) {
+                
+                return this;
+              }
+              break;
+            }
+            case 8: {
+              bitField0_ |= 0x00000001;
+              requestId_ = input.readUInt64();
+              break;
+            }
+            case 18: {
+              ensureTopicsIsMutable();
+              topics_.add(input.readBytes());
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // required uint64 request_id = 1;
+      private long requestId_ ;
+      public boolean hasRequestId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public long getRequestId() {
+        return requestId_;
+      }
+      public Builder setRequestId(long value) {
+        bitField0_ |= 0x00000001;
+        requestId_ = value;
+        
+        return this;
+      }
+      public Builder clearRequestId() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        requestId_ = 0L;
+        
+        return this;
+      }
+      
+      // repeated string topics = 2;
+      private com.google.protobuf.LazyStringList topics_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+      private void ensureTopicsIsMutable() {
+        if (!((bitField0_ & 0x00000002) == 0x00000002)) {
+          topics_ = new com.google.protobuf.LazyStringArrayList(topics_);
+          bitField0_ |= 0x00000002;
+         }
+      }
+      public java.util.List<String>
+          getTopicsList() {
+        return java.util.Collections.unmodifiableList(topics_);
+      }
+      public int getTopicsCount() {
+        return topics_.size();
+      }
+      public String getTopics(int index) {
+        return topics_.get(index);
+      }
+      public Builder setTopics(
+          int index, String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  ensureTopicsIsMutable();
+        topics_.set(index, value);
+        
+        return this;
+      }
+      public Builder addTopics(String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  ensureTopicsIsMutable();
+        topics_.add(value);
+        
+        return this;
+      }
+      public Builder addAllTopics(
+          java.lang.Iterable<String> values) {
+        ensureTopicsIsMutable();
+        super.addAll(values, topics_);
+        
+        return this;
+      }
+      public Builder clearTopics() {
+        topics_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000002);
+        
+        return this;
+      }
+      void addTopics(com.google.protobuf.ByteString value) {
+        ensureTopicsIsMutable();
+        topics_.add(value);
+        
+      }
+      
+      // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandGetTopicsOfNamespaceResponse)
+    }
+    
+    static {
+      defaultInstance = new CommandGetTopicsOfNamespaceResponse(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:pulsar.proto.CommandGetTopicsOfNamespaceResponse)
+  }
+  
   public interface BaseCommandOrBuilder
       extends com.google.protobuf.MessageLiteOrBuilder {
     
@@ -21408,6 +22281,14 @@ public Builder clearRequestId() {
     // optional .pulsar.proto.CommandActiveConsumerChange active_consumer_change = 31;
     boolean hasActiveConsumerChange();
     org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange getActiveConsumerChange();
+    
+    // optional .pulsar.proto.CommandGetTopicsOfNamespace getTopicsOfNamespace = 32;
+    boolean hasGetTopicsOfNamespace();
+    org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace getGetTopicsOfNamespace();
+    
+    // optional .pulsar.proto.CommandGetTopicsOfNamespaceResponse getTopicsOfNamespaceResponse = 33;
+    boolean hasGetTopicsOfNamespaceResponse();
+    org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse getGetTopicsOfNamespaceResponse();
   }
   public static final class BaseCommand extends
       com.google.protobuf.GeneratedMessageLite
@@ -21428,6 +22309,7 @@ public void recycle() {
             this.initFields();
             this.memoizedIsInitialized = -1;
             this.bitField0_ = 0;
+            this.bitField1_ = 0;
             this.memoizedSerializedSize = -1;
             handle.recycle(this);
         }
@@ -21477,6 +22359,8 @@ public BaseCommand getDefaultInstanceForType() {
       GET_LAST_MESSAGE_ID(27, 29),
       GET_LAST_MESSAGE_ID_RESPONSE(28, 30),
       ACTIVE_CONSUMER_CHANGE(29, 31),
+      GET_TOPICS_OF_NAMESPACE(30, 32),
+      GET_TOPICS_OF_NAMESPACE_RESPONSE(31, 33),
       ;
       
       public static final int CONNECT_VALUE = 2;
@@ -21509,6 +22393,8 @@ public BaseCommand getDefaultInstanceForType() {
       public static final int GET_LAST_MESSAGE_ID_VALUE = 29;
       public static final int GET_LAST_MESSAGE_ID_RESPONSE_VALUE = 30;
       public static final int ACTIVE_CONSUMER_CHANGE_VALUE = 31;
+      public static final int GET_TOPICS_OF_NAMESPACE_VALUE = 32;
+      public static final int GET_TOPICS_OF_NAMESPACE_RESPONSE_VALUE = 33;
       
       
       public final int getNumber() { return value; }
@@ -21545,6 +22431,8 @@ public static Type valueOf(int value) {
           case 29: return GET_LAST_MESSAGE_ID;
           case 30: return GET_LAST_MESSAGE_ID_RESPONSE;
           case 31: return ACTIVE_CONSUMER_CHANGE;
+          case 32: return GET_TOPICS_OF_NAMESPACE;
+          case 33: return GET_TOPICS_OF_NAMESPACE_RESPONSE;
           default: return null;
         }
       }
@@ -21571,6 +22459,7 @@ private Type(int index, int value) {
     }
     
     private int bitField0_;
+    private int bitField1_;
     // required .pulsar.proto.BaseCommand.Type type = 1;
     public static final int TYPE_FIELD_NUMBER = 1;
     private org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand.Type type_;
@@ -21881,6 +22770,26 @@ public boolean hasActiveConsumerChange() {
       return activeConsumerChange_;
     }
     
+    // optional .pulsar.proto.CommandGetTopicsOfNamespace getTopicsOfNamespace = 32;
+    public static final int GETTOPICSOFNAMESPACE_FIELD_NUMBER = 32;
+    private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace getTopicsOfNamespace_;
+    public boolean hasGetTopicsOfNamespace() {
+      return ((bitField0_ & 0x80000000) == 0x80000000);
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace getGetTopicsOfNamespace() {
+      return getTopicsOfNamespace_;
+    }
+    
+    // optional .pulsar.proto.CommandGetTopicsOfNamespaceResponse getTopicsOfNamespaceResponse = 33;
+    public static final int GETTOPICSOFNAMESPACERESPONSE_FIELD_NUMBER = 33;
+    private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse getTopicsOfNamespaceResponse_;
+    public boolean hasGetTopicsOfNamespaceResponse() {
+      return ((bitField1_ & 0x00000001) == 0x00000001);
+    }
+    public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse getGetTopicsOfNamespaceResponse() {
+      return getTopicsOfNamespaceResponse_;
+    }
+    
     private void initFields() {
       type_ = org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand.Type.CONNECT;
       connect_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandConnect.getDefaultInstance();
@@ -21913,6 +22822,8 @@ private void initFields() {
       getLastMessageId_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageId.getDefaultInstance();
       getLastMessageIdResponse_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetLastMessageIdResponse.getDefaultInstance();
       activeConsumerChange_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance();
+      getTopicsOfNamespace_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.getDefaultInstance();
+      getTopicsOfNamespaceResponse_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.getDefaultInstance();
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -22091,6 +23002,18 @@ public final boolean isInitialized() {
           return false;
         }
       }
+      if (hasGetTopicsOfNamespace()) {
+        if (!getGetTopicsOfNamespace().isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
+      if (hasGetTopicsOfNamespaceResponse()) {
+        if (!getGetTopicsOfNamespaceResponse().isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -22196,6 +23119,12 @@ public void writeTo(org.apache.pulsar.common.util.protobuf.ByteBufCodedOutputStr
       if (((bitField0_ & 0x40000000) == 0x40000000)) {
         output.writeMessage(31, activeConsumerChange_);
       }
+      if (((bitField0_ & 0x80000000) == 0x80000000)) {
+        output.writeMessage(32, getTopicsOfNamespace_);
+      }
+      if (((bitField1_ & 0x00000001) == 0x00000001)) {
+        output.writeMessage(33, getTopicsOfNamespaceResponse_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -22328,6 +23257,14 @@ public int getSerializedSize() {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(31, activeConsumerChange_);
       }
+      if (((bitField0_ & 0x80000000) == 0x80000000)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(32, getTopicsOfNamespace_);
+      }
+      if (((bitField1_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(33, getTopicsOfNamespaceResponse_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -22503,6 +23440,10 @@ public Builder clear() {
         bitField0_ = (bitField0_ & ~0x20000000);
         activeConsumerChange_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandActiveConsumerChange.getDefaultInstance();
         bitField0_ = (bitField0_ & ~0x40000000);
+        getTopicsOfNamespace_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.getDefaultInstance();
+        bitField0_ = (bitField0_ & ~0x80000000);
+        getTopicsOfNamespaceResponse_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.getDefaultInstance();
+        bitField1_ = (bitField1_ & ~0x00000001);
         return this;
       }
       
@@ -22535,7 +23476,9 @@ public Builder clone() {
       public org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand buildPartial() {
         org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand result = org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand.RECYCLER.get();
         int from_bitField0_ = bitField0_;
+        int from_bitField1_ = bitField1_;
         int to_bitField0_ = 0;
+        int to_bitField1_ = 0;
         if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
           to_bitField0_ |= 0x00000001;
         }
@@ -22660,7 +23603,16 @@ public Builder clone() {
           to_bitField0_ |= 0x40000000;
         }
         result.activeConsumerChange_ = activeConsumerChange_;
+        if (((from_bitField0_ & 0x80000000) == 0x80000000)) {
+          to_bitField0_ |= 0x80000000;
+        }
+        result.getTopicsOfNamespace_ = getTopicsOfNamespace_;
+        if (((from_bitField1_ & 0x00000001) == 0x00000001)) {
+          to_bitField1_ |= 0x00000001;
+        }
+        result.getTopicsOfNamespaceResponse_ = getTopicsOfNamespaceResponse_;
         result.bitField0_ = to_bitField0_;
+        result.bitField1_ = to_bitField1_;
         return result;
       }
       
@@ -22759,6 +23711,12 @@ public Builder mergeFrom(org.apache.pulsar.common.api.proto.PulsarApi.BaseComman
         if (other.hasActiveConsumerChange()) {
           mergeActiveConsumerChange(other.getActiveConsumerChange());
         }
+        if (other.hasGetTopicsOfNamespace()) {
+          mergeGetTopicsOfNamespace(other.getGetTopicsOfNamespace());
+        }
+        if (other.hasGetTopicsOfNamespaceResponse()) {
+          mergeGetTopicsOfNamespaceResponse(other.getGetTopicsOfNamespaceResponse());
+        }
         return this;
       }
       
@@ -22935,6 +23893,18 @@ public final boolean isInitialized() {
             return false;
           }
         }
+        if (hasGetTopicsOfNamespace()) {
+          if (!getGetTopicsOfNamespace().isInitialized()) {
+            
+            return false;
+          }
+        }
+        if (hasGetTopicsOfNamespaceResponse()) {
+          if (!getGetTopicsOfNamespaceResponse().isInitialized()) {
+            
+            return false;
+          }
+        }
         return true;
       }
       
@@ -23269,11 +24239,32 @@ public Builder mergeFrom(
               subBuilder.recycle();
               break;
             }
+            case 258: {
+              org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Builder subBuilder = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.newBuilder();
+              if (hasGetTopicsOfNamespace()) {
+                subBuilder.mergeFrom(getGetTopicsOfNamespace());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setGetTopicsOfNamespace(subBuilder.buildPartial());
+              subBuilder.recycle();
+              break;
+            }
+            case 266: {
+              org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.Builder subBuilder = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.newBuilder();
+              if (hasGetTopicsOfNamespaceResponse()) {
+                subBuilder.mergeFrom(getGetTopicsOfNamespaceResponse());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setGetTopicsOfNamespaceResponse(subBuilder.buildPartial());
+              subBuilder.recycle();
+              break;
+            }
           }
         }
       }
       
       private int bitField0_;
+      private int bitField1_;
       
       // required .pulsar.proto.BaseCommand.Type type = 1;
       private org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand.Type type_ = org.apache.pulsar.common.api.proto.PulsarApi.BaseCommand.Type.CONNECT;
@@ -24589,6 +25580,92 @@ public Builder clearActiveConsumerChange() {
         return this;
       }
       
+      // optional .pulsar.proto.CommandGetTopicsOfNamespace getTopicsOfNamespace = 32;
+      private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace getTopicsOfNamespace_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.getDefaultInstance();
+      public boolean hasGetTopicsOfNamespace() {
+        return ((bitField0_ & 0x80000000) == 0x80000000);
+      }
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace getGetTopicsOfNamespace() {
+        return getTopicsOfNamespace_;
+      }
+      public Builder setGetTopicsOfNamespace(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        getTopicsOfNamespace_ = value;
+        
+        bitField0_ |= 0x80000000;
+        return this;
+      }
+      public Builder setGetTopicsOfNamespace(
+          org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.Builder builderForValue) {
+        getTopicsOfNamespace_ = builderForValue.build();
+        
+        bitField0_ |= 0x80000000;
+        return this;
+      }
+      public Builder mergeGetTopicsOfNamespace(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace value) {
+        if (((bitField0_ & 0x80000000) == 0x80000000) &&
+            getTopicsOfNamespace_ != org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.getDefaultInstance()) {
+          getTopicsOfNamespace_ =
+            org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.newBuilder(getTopicsOfNamespace_).mergeFrom(value).buildPartial();
+        } else {
+          getTopicsOfNamespace_ = value;
+        }
+        
+        bitField0_ |= 0x80000000;
+        return this;
+      }
+      public Builder clearGetTopicsOfNamespace() {
+        getTopicsOfNamespace_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespace.getDefaultInstance();
+        
+        bitField0_ = (bitField0_ & ~0x80000000);
+        return this;
+      }
+      
+      // optional .pulsar.proto.CommandGetTopicsOfNamespaceResponse getTopicsOfNamespaceResponse = 33;
+      private org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse getTopicsOfNamespaceResponse_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.getDefaultInstance();
+      public boolean hasGetTopicsOfNamespaceResponse() {
+        return ((bitField1_ & 0x00000001) == 0x00000001);
+      }
+      public org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse getGetTopicsOfNamespaceResponse() {
+        return getTopicsOfNamespaceResponse_;
+      }
+      public Builder setGetTopicsOfNamespaceResponse(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse value) {
+        if (value == null) {
+          throw new NullPointerException();
+        }
+        getTopicsOfNamespaceResponse_ = value;
+        
+        bitField1_ |= 0x00000001;
+        return this;
+      }
+      public Builder setGetTopicsOfNamespaceResponse(
+          org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.Builder builderForValue) {
+        getTopicsOfNamespaceResponse_ = builderForValue.build();
+        
+        bitField1_ |= 0x00000001;
+        return this;
+      }
+      public Builder mergeGetTopicsOfNamespaceResponse(org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse value) {
+        if (((bitField1_ & 0x00000001) == 0x00000001) &&
+            getTopicsOfNamespaceResponse_ != org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.getDefaultInstance()) {
+          getTopicsOfNamespaceResponse_ =
+            org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.newBuilder(getTopicsOfNamespaceResponse_).mergeFrom(value).buildPartial();
+        } else {
+          getTopicsOfNamespaceResponse_ = value;
+        }
+        
+        bitField1_ |= 0x00000001;
+        return this;
+      }
+      public Builder clearGetTopicsOfNamespaceResponse() {
+        getTopicsOfNamespaceResponse_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandGetTopicsOfNamespaceResponse.getDefaultInstance();
+        
+        bitField1_ = (bitField1_ & ~0x00000001);
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.BaseCommand)
     }
     
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java
index b77309c57..aba4d8ffa 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/naming/NamespaceName.java
@@ -206,5 +206,4 @@ public NamespaceName getNamespaceObject() {
     public boolean includes(TopicName topicName) {
         return this.equals(topicName.getNamespaceObject());
     }
-
 }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 4a1112d0a..826e55082 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -136,7 +136,8 @@ enum ProtocolVersion {
 	v10 = 10;// Added proxy to broker
 	v11 = 11;// C++ consumers before this version are not correctly handling the checksum field
 	v12 = 12;// Added get topic's last messageId from broker
-                 // Added CommandActiveConsumerChange
+			 // Added CommandActiveConsumerChange
+			 // Added CommandGetTopicsOfNamespace
 }
 
 message CommandConnect {
@@ -461,6 +462,16 @@ message CommandGetLastMessageIdResponse {
 	required uint64 request_id  = 2;
 }
 
+message CommandGetTopicsOfNamespace {
+	required uint64 request_id	= 1;
+	required string namespace	= 2;
+}
+
+message CommandGetTopicsOfNamespaceResponse {
+	required uint64 request_id	= 1;
+	repeated string topics 		= 2;
+}
+
 message BaseCommand {
 	enum Type {
 		CONNECT     = 2;
@@ -508,9 +519,14 @@ message BaseCommand {
 		GET_LAST_MESSAGE_ID = 29;
 		GET_LAST_MESSAGE_ID_RESPONSE = 30;
 
-                ACTIVE_CONSUMER_CHANGE = 31;
+		ACTIVE_CONSUMER_CHANGE = 31;
+
+
+		GET_TOPICS_OF_NAMESPACE 			= 32;
+		GET_TOPICS_OF_NAMESPACE_RESPONSE 	= 33;
 	}
 
+
 	required Type type = 1;
 
 	optional CommandConnect connect          = 2;
@@ -553,6 +569,9 @@ message BaseCommand {
 	optional CommandGetLastMessageId getLastMessageId = 29;
 	optional CommandGetLastMessageIdResponse getLastMessageIdResponse = 30;
 
-        optional CommandActiveConsumerChange active_consumer_change = 31;
+	optional CommandActiveConsumerChange active_consumer_change = 31;
+
+	optional CommandGetTopicsOfNamespace getTopicsOfNamespace = 32;
+	optional CommandGetTopicsOfNamespaceResponse getTopicsOfNamespaceResponse = 33;
 
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services