You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/09/27 15:42:18 UTC

[pulsar] branch master updated: Allow for topic deletions with regex consumers (#5230)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 07bb845  Allow for topic deletions with regex consumers (#5230)
07bb845 is described below

commit 07bb84532f3da80d1717c16f377fade16b8c4360
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Sep 27 08:42:11 2019 -0700

    Allow for topic deletions with regex consumers (#5230)
    
    * Allow for topic deletions with regex consumers
    
    * Fixed test compilation
    
    * One more compile fix
    
    * Fixed BrokerServiceAutoTopicCreationTest
---
 .../broker/service/BrokerServiceException.java     |   8 ++
 .../apache/pulsar/broker/service/ServerCnx.java    |  19 +++-
 .../apache/pulsar/client/impl/RawReaderImpl.java   |   3 +-
 .../BrokerServiceAutoTopicCreationTest.java        |   3 +
 .../client/impl/PatternTopicsConsumerImplTest.java |  49 ++++++++++
 .../pulsar/client/impl/TopicsConsumerImplTest.java |   2 +-
 .../pulsar/client/api/PulsarClientException.java   |  16 ++++
 .../org/apache/pulsar/client/cli/CmdConsume.java   |  44 ++++++---
 .../org/apache/pulsar/client/impl/ClientCnx.java   |   2 +
 .../apache/pulsar/client/impl/ConsumerImpl.java    |  46 +++++++---
 .../client/impl/MultiTopicsConsumerImpl.java       | 102 ++++++++++++++++-----
 .../impl/PatternMultiTopicsConsumerImpl.java       |   7 +-
 .../pulsar/client/impl/PulsarClientImpl.java       |   7 +-
 .../org/apache/pulsar/client/impl/ReaderImpl.java  |   6 +-
 .../pulsar/client/impl/ZeroQueueConsumerImpl.java  |  13 +--
 .../pulsar/client/impl/ConsumerImplTest.java       |   4 +-
 .../client/impl/MultiTopicsConsumerImplTest.java   |   2 +-
 .../apache/pulsar/common/api/proto/PulsarApi.java  |  57 ++++++++++++
 .../apache/pulsar/common/protocol/Commands.java    |   8 +-
 pulsar-common/src/main/proto/PulsarApi.proto       |   9 +-
 20 files changed, 326 insertions(+), 81 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
index 2e28700..3a7c542 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerServiceException.java
@@ -116,6 +116,12 @@ public class BrokerServiceException extends Exception {
         }
     }
 
+    public static class TopicNotFoundException extends BrokerServiceException {
+        public TopicNotFoundException(String msg) {
+            super(msg);
+        }
+    }
+
     public static class SubscriptionBusyException extends BrokerServiceException {
         public SubscriptionBusyException(String msg) {
             super(msg);
@@ -180,6 +186,8 @@ public class BrokerServiceException extends Exception {
         } else if (t instanceof ServiceUnitNotReadyException || t instanceof TopicFencedException
                 || t instanceof SubscriptionFencedException) {
             return PulsarApi.ServerError.ServiceNotReady;
+        } else if (t instanceof TopicNotFoundException) {
+            return PulsarApi.ServerError.TopicNotFound;
         } else if (t instanceof IncompatibleSchemaException
             || t instanceof InvalidSchemaDataException) {
             // for backward compatible with old clients, invalid schema data
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index 1cab8e6..7a3a3d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -58,6 +58,7 @@ import org.apache.pulsar.broker.authentication.AuthenticationState;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException;
+import org.apache.pulsar.broker.service.BrokerServiceException.TopicNotFoundException;
 import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
 import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
 import org.apache.pulsar.broker.web.RestException;
@@ -611,6 +612,7 @@ public class ServerCnx extends PulsarHandler {
         final InitialPosition initialPosition = subscribe.getInitialPosition();
         final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null;
         final boolean isReplicated = subscribe.hasReplicateSubscriptionState() && subscribe.getReplicateSubscriptionState();
+        final boolean forceTopicCreation = subscribe.getForceTopicCreation();
 
         CompletableFuture<Boolean> isProxyAuthorizedFuture;
         if (service.isAuthorizationEnabled() && originalPrincipal != null) {
@@ -676,8 +678,18 @@ public class ServerCnx extends PulsarHandler {
                             }
                         }
 
-                        service.getOrCreateTopic(topicName.toString())
-                                .thenCompose(topic -> {
+                        boolean createTopicIfDoesNotExist = forceTopicCreation
+                                && service.pulsar().getConfig().isAllowAutoTopicCreation();
+
+                        service.getTopic(topicName.toString(), createTopicIfDoesNotExist)
+                                .thenCompose(optTopic -> {
+                                    if (!optTopic.isPresent()) {
+                                        return FutureUtil
+                                                .failedFuture(new TopicNotFoundException("Topic does not exist"));
+                                    }
+
+                                    Topic topic = optTopic.get();
+
                                     if (schema != null) {
                                         return topic.addSchemaIfIdleOrCheckCompatible(schema)
                                             .thenCompose(isCompatible -> {
@@ -728,6 +740,9 @@ public class ServerCnx extends PulsarHandler {
                                                     remoteAddress, topicName, subscriptionName,
                                                     exception.getCause().getMessage());
                                         }
+                                    } else if (exception.getCause() instanceof BrokerServiceException) {
+                                        log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName,
+                                                subscriptionName, exception.getCause().getMessage());
                                     } else {
                                         log.warn("[{}][{}][{}] Failed to create consumer: {}", remoteAddress, topicName,
                                                 subscriptionName, exception.getCause().getMessage(), exception);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
index a59349d..2f9de09 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java
@@ -112,7 +112,8 @@ public class RawReaderImpl implements RawReader {
                 consumerFuture,
                 SubscriptionMode.Durable,
                 MessageId.earliest,
-                Schema.BYTES, null
+                Schema.BYTES, null,
+                true
             );
             incomingRawMessages = new GrowableArrayBlockingQueue<>();
             pendingRawReceives = new ConcurrentLinkedQueue<>();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
index cd1aec9..929f9c2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceAutoTopicCreationTest.java
@@ -22,6 +22,8 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -77,6 +79,7 @@ public class BrokerServiceAutoTopicCreationTest extends BrokerTestBase{
         final String subscriptionName = "test-topic-sub";
         try {
             pulsarClient.newConsumer().topic(topicName).subscriptionName(subscriptionName).subscribe();
+            fail("Subscribe operation should have failed");
         } catch (Exception e) {
             assertTrue(e instanceof PulsarClientException);
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
index 18cf12c..7413825 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PatternTopicsConsumerImplTest.java
@@ -27,6 +27,7 @@ import static org.testng.Assert.fail;
 import com.google.common.collect.Lists;
 
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
@@ -41,6 +42,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.NamespaceName;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -760,4 +762,51 @@ public class PatternTopicsConsumerImplTest extends ProducerConsumerBase {
         producer2.close();
         producer3.close();
     }
+
+    @Test()
+    public void testTopicDeletion() throws Exception {
+        String baseTopicName = "persistent://my-property/my-ns/pattern-topic-" + System.currentTimeMillis();
+        Pattern pattern = Pattern.compile(baseTopicName + ".*");
+
+        // Create 2 topics
+        Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
+                .topic(baseTopicName + "-1")
+                .create();
+        Producer<String> producer2 = pulsarClient.newProducer(Schema.STRING)
+                .topic(baseTopicName + "-2")
+                .create();
+
+        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
+            .topicsPattern(pattern)
+            .patternAutoDiscoveryPeriod(1)
+            .subscriptionName("sub")
+            .subscribe();
+
+        assertTrue(consumer instanceof PatternMultiTopicsConsumerImpl);
+        PatternMultiTopicsConsumerImpl<String> consumerImpl = (PatternMultiTopicsConsumerImpl<String>) consumer;
+
+        // Verify consumer get methods
+        assertSame(consumerImpl.getPattern(), pattern);
+        assertEquals(consumerImpl.getTopics().size(), 2);
+
+        producer1.send("msg-1");
+
+        producer1.close();
+
+        Message<String> message = consumer.receive();
+        assertEquals(message.getValue(), "msg-1");
+        consumer.acknowledge(message);
+
+        // Force delete the topic while the regex consumer is connected
+        admin.topics().delete(baseTopicName + "-1", true);
+
+        producer2.send("msg-2");
+
+        message = consumer.receive();
+        assertEquals(message.getValue(), "msg-2");
+        consumer.acknowledge(message);
+
+        assertEquals(pulsar.getBrokerService().getTopicIfExists(baseTopicName + "-1").join(), Optional.empty());
+        assertTrue(pulsar.getBrokerService().getTopicIfExists(baseTopicName + "-2").join().isPresent());
+    }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
index f2eaca9..bb70d7f 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicsConsumerImplTest.java
@@ -528,7 +528,7 @@ public class TopicsConsumerImplTest extends ProducerConsumerBase {
         assertEquals(((MultiTopicsConsumerImpl<byte[]>) consumer).getTopics().size(), 2);
 
         // 8. re-subscribe topic3
-        CompletableFuture<Void> subFuture = ((MultiTopicsConsumerImpl<byte[]>)consumer).subscribeAsync(topicName3);
+        CompletableFuture<Void> subFuture = ((MultiTopicsConsumerImpl<byte[]>)consumer).subscribeAsync(topicName3, true);
         subFuture.get();
 
         // 9. producer publish messages
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
index b125458..9f9c38f 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientException.java
@@ -180,6 +180,22 @@ public class PulsarClientException extends IOException {
     }
 
     /**
+     * Topic does not exist and cannot be created.
+     */
+    public static class TopicDoesNotExistException extends PulsarClientException {
+        /**
+         * Constructs a {@code TopicDoesNotExistException} with the specified detail message.
+         *
+         * @param msg
+         *        The detail message (which is saved for later retrieval
+         *        by the {@link #getMessage()} method)
+         */
+        public TopicDoesNotExistException(String msg) {
+            super(msg);
+        }
+    }
+
+    /**
      * Lookup exception thrown by Pulsar client.
      */
     public static class LookupException extends PulsarClientException {
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
index 4f86556..cb3463d 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/client/cli/CmdConsume.java
@@ -18,6 +18,14 @@
  */
 package org.apache.pulsar.client.cli;
 
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.ParameterException;
+import com.beust.jcommander.Parameters;
+import com.google.common.util.concurrent.RateLimiter;
+import com.google.gson.Gson;
+import com.google.gson.JsonObject;
+import com.google.gson.JsonPrimitive;
+
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.net.URI;
@@ -29,6 +37,7 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.regex.Pattern;
 
 import org.apache.commons.io.HexDump;
 import org.apache.commons.lang3.StringUtils;
@@ -36,6 +45,7 @@ import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.api.AuthenticationDataProvider;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.ConsumerBuilder;
 import org.apache.pulsar.client.api.Message;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
@@ -54,14 +64,6 @@ import org.eclipse.jetty.websocket.client.WebSocketClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.ParameterException;
-import com.beust.jcommander.Parameters;
-import com.google.common.util.concurrent.RateLimiter;
-import com.google.gson.Gson;
-import com.google.gson.JsonObject;
-import com.google.gson.JsonPrimitive;
-
 /**
  * pulsar-client consume command implementation.
  *
@@ -91,7 +93,10 @@ public class CmdConsume {
     @Parameter(names = { "-r", "--rate" }, description = "Rate (in msg/sec) at which to consume, "
             + "value 0 means to consume messages as fast as possible.")
     private double consumeRate = 0;
-    
+
+    @Parameter(names = { "--regex" }, description = "Indicate the topic name is a regex pattern")
+    private boolean isRegex = false;
+
     private ClientBuilder clientBuilder;
     private Authentication authentication;
     private String serviceURL;
@@ -144,7 +149,7 @@ public class CmdConsume {
             throw (new ParameterException("Number of messages should be zero or positive."));
 
         String topic = this.mainOptions.get(0);
-        
+
         if(this.serviceURL.startsWith("ws")) {
             return consumeFromWebSocket(topic);
         }else {
@@ -158,8 +163,17 @@ public class CmdConsume {
 
         try {
             PulsarClient client = clientBuilder.build();
-            Consumer<byte[]> consumer = client.newConsumer().topic(topic).subscriptionName(this.subscriptionName)
-                    .subscriptionType(subscriptionType).subscribe();
+            ConsumerBuilder<byte[]> builder = client.newConsumer()
+                    .subscriptionName(this.subscriptionName)
+                    .subscriptionType(subscriptionType);
+
+            if (isRegex) {
+                builder.topicsPattern(Pattern.compile(topic));
+            } else {
+                builder.topic(topic);
+            }
+
+            Consumer<byte[]> consumer = builder.subscribe();
 
             RateLimiter limiter = (this.consumeRate > 0) ? RateLimiter.create(this.consumeRate) : null;
             while (this.numMessagesToConsume == 0 || numMessagesConsumed < this.numMessagesToConsume) {
@@ -197,13 +211,13 @@ public class CmdConsume {
         int returnCode = 0;
 
         TopicName topicName = TopicName.get(topic);
-        
+
         String wsTopic = String.format(
                 "%s/%s/" + (StringUtils.isEmpty(topicName.getCluster()) ? "" : topicName.getCluster() + "/")
                         + "%s/%s/%s?subscriptionType=%s",
                 topicName.getDomain(), topicName.getTenant(), topicName.getNamespacePortion(), topicName.getLocalName(),
                 subscriptionName, subscriptionType.toString());
-        
+
         String consumerBaseUri = serviceURL + (serviceURL.endsWith("/") ? "" : "/") + "ws/consumer/" + wsTopic;
         URI consumerUri = URI.create(consumerBaseUri);
 
@@ -252,7 +266,7 @@ public class CmdConsume {
                     LOG.debug("No message to consume after waiting for 5 seconds.");
                 } else {
                     try {
-                        System.out.println(Base64.getDecoder().decode(msg));    
+                        System.out.println(Base64.getDecoder().decode(msg));
                     }catch(Exception e) {
                         System.out.println(msg);
                     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 32f73c9..c5e0d9d 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -911,6 +911,8 @@ public class ClientCnx extends PulsarHandler {
             return new PulsarClientException.TopicTerminatedException(errorMsg);
         case IncompatibleSchema:
             return new PulsarClientException.IncompatibleSchemaException(errorMsg);
+        case TopicNotFound:
+            return new PulsarClientException.TopicDoesNotExistException(errorMsg);
         case UnknownError:
         default:
             return new PulsarClientException(errorMsg);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index b13e2fa..7a1dc1f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -61,6 +61,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.PulsarClientException.TopicDoesNotExistException;
 import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
 import org.apache.pulsar.client.impl.transaction.TransactionImpl;
 import org.apache.pulsar.common.protocol.Commands;
@@ -141,6 +142,8 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
 
     protected volatile boolean paused;
 
+    private final boolean createTopicIfDoesNotExist;
+
     enum SubscriptionMode {
         // Make the subscription to be backed by a durable cursor that will retain messages and persist the current
         // position
@@ -151,20 +154,25 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
     }
 
     static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
-                                               ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
-                                               SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
+            ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
+            SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors,
+            boolean createTopicIfDoesNotExist) {
     	if (conf.getReceiverQueueSize() == 0) {
-            return new ZeroQueueConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture,
-                    subscriptionMode, startMessageId, schema, interceptors);
+            return new ZeroQueueConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer,
+                    subscribeFuture,
+                    subscriptionMode, startMessageId, schema, interceptors,
+                    createTopicIfDoesNotExist);
         } else {
-            return new ConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture,
-                    subscriptionMode, startMessageId, schema, interceptors);
+            return new ConsumerImpl<>(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer,
+                    subscribeFuture,
+                    subscriptionMode, startMessageId, schema, interceptors, createTopicIfDoesNotExist);
         }
     }
 
     protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
-                           ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
-                           SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors) {
+                 ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
+                 SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema, ConsumerInterceptors<T> interceptors,
+                 boolean createTopicIfDoesNotExist) {
         super(client, topic, conf, conf.getReceiverQueueSize(), listenerExecutor, subscribeFuture, schema, interceptors);
         this.consumerId = client.newConsumerId();
         this.subscriptionMode = subscriptionMode;
@@ -179,6 +187,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition();
         this.negativeAcksTracker = new NegativeAcksTracker(this, conf);
         this.resetIncludeHead = conf.isResetIncludeHead();
+        this.createTopicIfDoesNotExist = createTopicIfDoesNotExist;
 
         if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
             stats = new ConsumerStatsRecorderImpl(client, conf, this);
@@ -510,7 +519,7 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
         ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel,
                 consumerName, isDurable, startMessageIdData, metadata, readCompacted,
                 conf.isReplicateSubscriptionState(), InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
-                si);
+                si, createTopicIfDoesNotExist);
         if (startMessageIdData != null) {
             startMessageIdData.recycle();
         }
@@ -548,18 +557,27 @@ public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandle
                 return null;
             }
             log.warn("[{}][{}] Failed to subscribe to topic on {}", topic, subscription, cnx.channel().remoteAddress());
-            if (e.getCause() instanceof PulsarClientException && getConnectionHandler().isRetriableError((PulsarClientException) e.getCause())
+
+            if (e.getCause() instanceof PulsarClientException
+                    && getConnectionHandler().isRetriableError((PulsarClientException) e.getCause())
                     && System.currentTimeMillis() < subscribeTimeout) {
                 reconnectLater(e.getCause());
-                return null;
-            }
-
-            if (!subscribeFuture.isDone()) {
+            } else if (!subscribeFuture.isDone()) {
                 // unable to create new consumer, fail operation
                 setState(State.Failed);
                 closeConsumerTasks();
                 subscribeFuture.completeExceptionally(e);
                 client.cleanupConsumer(this);
+            } else if (e.getCause() instanceof TopicDoesNotExistException) {
+                // The topic was deleted after the consumer was created, and we're
+                // not allowed to recreate the topic. This can happen in a few cases:
+                //  * Regex consumer getting error after topic gets deleted
+                //  * Regular consumer after topic is manually deleted and with
+                //    auto-topic-creation set to false
+                // No more retries are needed in this case.
+                setState(State.Failed);
+                client.cleanupConsumer(this);
+                log.warn("[{}][{}] Closed consumer because topic does not exist anymore {}", topic, subscription, cnx.channel().remoteAddress());
             } else {
                 // consumer was subscribed and connected but we got some error, keep trying
                 reconnectLater(e.getCause());
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 3986285..f950f16 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -45,6 +45,8 @@ import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.ConsumerStats;
@@ -101,14 +103,14 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
     MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf,
             ExecutorService listenerExecutor, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
-            ConsumerInterceptors<T> interceptors) {
+            ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist) {
         this(client, DUMMY_TOPIC_NAME_PREFIX + ConsumerName.generateRandomName(), conf, listenerExecutor,
-                subscribeFuture, schema, interceptors);
+                subscribeFuture, schema, interceptors, createTopicIfDoesNotExist);
     }
 
     MultiTopicsConsumerImpl(PulsarClientImpl client, String singleTopic, ConsumerConfigurationData<T> conf,
             ExecutorService listenerExecutor, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
-            ConsumerInterceptors<T> interceptors) {
+            ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist) {
         super(client, singleTopic, conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, subscribeFuture,
                 schema, interceptors);
 
@@ -152,7 +154,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         this.namespaceName = conf.getTopicNames().stream().findFirst()
                 .flatMap(s -> Optional.of(TopicName.get(s).getNamespaceObject())).get();
 
-        List<CompletableFuture<Void>> futures = conf.getTopicNames().stream().map(this::subscribeAsync)
+        List<CompletableFuture<Void>> futures = conf.getTopicNames().stream().map(t -> subscribeAsync(t, createTopicIfDoesNotExist))
                 .collect(Collectors.toList());
         FutureUtil.waitForAll(futures)
             .thenAccept(finalFuture -> {
@@ -661,7 +663,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
     }
 
     // subscribe one more given topic
-    public CompletableFuture<Void> subscribeAsync(String topicName) {
+    public CompletableFuture<Void> subscribeAsync(String topicName, boolean createTopicIfDoesNotExist) {
         if (!topicNameValid(topicName)) {
             return FutureUtil.failedFuture(
                 new PulsarClientException.AlreadyClosedException("Topic name not valid"));
@@ -675,12 +677,13 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         CompletableFuture<Void> subscribeResult = new CompletableFuture<>();
 
         client.getPartitionedTopicMetadata(topicName)
-            .thenAccept(metadata -> subscribeTopicPartitions(subscribeResult, topicName, metadata.partitions))
-            .exceptionally(ex1 -> {
-                log.warn("[{}] Failed to get partitioned topic metadata: {}", topicName, ex1.getMessage());
-                subscribeResult.completeExceptionally(ex1);
-                return null;
-            });
+                .thenAccept(metadata -> subscribeTopicPartitions(subscribeResult, topicName, metadata.partitions,
+                        createTopicIfDoesNotExist))
+                .exceptionally(ex1 -> {
+                    log.warn("[{}] Failed to get partitioned topic metadata: {}", topicName, ex1.getMessage());
+                    subscribeResult.completeExceptionally(ex1);
+                    return null;
+                });
 
         return subscribeResult;
     }
@@ -702,7 +705,7 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
 
         CompletableFuture<Consumer> future = new CompletableFuture<>();
         MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, topicName, cloneConf, listenerExecutor,
-                future, schema, interceptors);
+                future, schema, interceptors, true /* createTopicIfDoesNotExist */);
 
         future.thenCompose(c -> ((MultiTopicsConsumerImpl)c).subscribeAsync(topicName, numPartitions))
             .thenRun(()-> subscribeFuture.complete(consumer))
@@ -728,22 +731,24 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         }
 
         CompletableFuture<Void> subscribeResult = new CompletableFuture<>();
-        subscribeTopicPartitions(subscribeResult, topicName, numberPartitions);
+        subscribeTopicPartitions(subscribeResult, topicName, numberPartitions, true /* createTopicIfDoesNotExist */);
 
         return subscribeResult;
     }
 
-    private void subscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int numPartitions) {
+    private void subscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int numPartitions,
+            boolean createIfDoesNotExist) {
         client.preProcessSchemaBeforeSubscribe(client, schema, topicName).whenComplete((ignored, cause) -> {
             if (null == cause) {
-                doSubscribeTopicPartitions(subscribeResult, topicName, numPartitions);
+                doSubscribeTopicPartitions(subscribeResult, topicName, numPartitions, createIfDoesNotExist);
             } else {
                 subscribeResult.completeExceptionally(cause);
             }
         });
     }
 
-    private void doSubscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int numPartitions) {
+    private void doSubscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int numPartitions,
+            boolean createIfDoesNotExist) {
         if (log.isDebugEnabled()) {
             log.debug("Subscribe to topic {} metadata.partitions: {}", topicName, numPartitions);
         }
@@ -767,8 +772,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                         ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, partitionName,
                                 configurationData, client.externalExecutorProvider().getExecutor(),
                                 partitionIndex, true, subFuture,
-                                SubscriptionMode.Durable, null, schema, interceptors
-                        );
+                                SubscriptionMode.Durable, null, schema, interceptors,
+                                createIfDoesNotExist);
                         consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
                         return subFuture;
                     })
@@ -780,8 +785,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
             CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
             ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
                     client.externalExecutorProvider().getExecutor(), -1, true, subFuture, SubscriptionMode.Durable, null,
-                    schema, interceptors
-            );
+                    schema, interceptors,
+                    createIfDoesNotExist);
             consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
 
             futureList = Collections.singletonList(subFuture);
@@ -913,6 +918,59 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
         return unsubscribeFuture;
     }
 
+    // Close and remove all internal consumers (every partition) of the given topic
+    public CompletableFuture<Void> removeConsumerAsync(String topicName) {
+        checkArgument(TopicName.isValid(topicName), "Invalid topic name:" + topicName);
+
+        if (getState() == State.Closing || getState() == State.Closed) {
+            return FutureUtil.failedFuture(
+                new PulsarClientException.AlreadyClosedException("Topics Consumer was already closed"));
+        }
+
+        CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
+        String topicPartName = TopicName.get(topicName).getPartitionedTopicName();
+
+
+        List<ConsumerImpl<T>> consumersToClose = consumers.values().stream()
+            .filter(consumer -> {
+                String consumerTopicName = consumer.getTopic();
+                if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals(topicPartName)) {
+                    return true;
+                } else {
+                    return false;
+                }
+            }).collect(Collectors.toList());
+
+        List<CompletableFuture<Void>> futureList = consumersToClose.stream()
+            .map(ConsumerImpl::closeAsync).collect(Collectors.toList());
+
+        FutureUtil.waitForAll(futureList)
+            .whenComplete((r, ex) -> {
+                if (ex == null) {
+                    consumersToClose.forEach(consumer1 -> {
+                        consumers.remove(consumer1.getTopic());
+                        pausedConsumers.remove(consumer1);
+                        allTopicPartitionsNumber.decrementAndGet();
+                    });
+
+                    topics.remove(topicName);
+                    ((UnAckedTopicMessageTracker) unAckedMessageTracker).removeTopicMessages(topicName);
+
+                    unsubscribeFuture.complete(null);
+                    log.info("[{}] [{}] [{}] Removed Topics Consumer, allTopicPartitionsNumber: {}",
+                        topicName, subscription, consumerName, allTopicPartitionsNumber);
+                } else {
+                    unsubscribeFuture.completeExceptionally(ex);
+                    setState(State.Failed);
+                    log.error("[{}] [{}] [{}] Could not remove Topics Consumer",
+                        topicName, subscription, consumerName, ex.getCause());
+                }
+            });
+
+        return unsubscribeFuture;
+    }
+
+
     // get topics name
     public List<String> getTopics() {
         return topics.keySet().stream().collect(Collectors.toList());
@@ -997,8 +1055,8 @@ public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
                         ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(
                             client, partitionName, configurationData,
                             client.externalExecutorProvider().getExecutor(),
-                            partitionIndex, true, subFuture, SubscriptionMode.Durable, null, schema, interceptors
-                        );
+                            partitionIndex, true, subFuture, SubscriptionMode.Durable, null, schema, interceptors,
+                            true /* createTopicIfDoesNotExist */);
                         consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
                         if (log.isDebugEnabled()) {
                             log.debug("[{}] create consumer {} for partitionName: {}",
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
index 47e3236..0941a58 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PatternMultiTopicsConsumerImpl.java
@@ -54,7 +54,8 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
                                           ExecutorService listenerExecutor,
                                           CompletableFuture<Consumer<T>> subscribeFuture,
                                           Schema<T> schema, Mode subscriptionMode, ConsumerInterceptors<T> interceptors) {
-        super(client, conf, listenerExecutor, subscribeFuture, schema, interceptors);
+        super(client, conf, listenerExecutor, subscribeFuture, schema, interceptors,
+                false /* createTopicIfDoesNotExist */);
         this.topicsPattern = topicsPattern;
         this.subscriptionMode = subscriptionMode;
 
@@ -129,7 +130,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
             }
 
             List<CompletableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(topics.size());
-            removedTopics.stream().forEach(topic -> futures.add(unsubscribeAsync(topic)));
+            removedTopics.stream().forEach(topic -> futures.add(removeConsumerAsync(topic)));
             FutureUtil.waitForAll(futures)
                 .thenAccept(finalFuture -> removeFuture.complete(null))
                 .exceptionally(ex -> {
@@ -150,7 +151,7 @@ public class PatternMultiTopicsConsumerImpl<T> extends MultiTopicsConsumerImpl<T
             }
 
             List<CompletableFuture<Void>> futures = Lists.newArrayListWithExpectedSize(topics.size());
-            addedTopics.stream().forEach(topic -> futures.add(subscribeAsync(topic)));
+            addedTopics.stream().forEach(topic -> futures.add(subscribeAsync(topic, false /* createTopicIfDoesNotExist */)));
             FutureUtil.waitForAll(futures)
                 .thenAccept(finalFuture -> addFuture.complete(null))
                 .exceptionally(ex -> {
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index ea23f46..0728c66 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -350,8 +350,8 @@ public class PulsarClientImpl implements PulsarClient {
             } else {
                 int partitionIndex = TopicName.getPartitionIndex(topic);
                 consumer = ConsumerImpl.newConsumerImpl(PulsarClientImpl.this, topic, conf, listenerThread, partitionIndex, false,
-                        consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors
-                );
+                        consumerSubscribedFuture, SubscriptionMode.Durable, null, schema, interceptors,
+                        true /* createTopicIfDoesNotExist */);
             }
 
             synchronized (consumers) {
@@ -370,7 +370,8 @@ public class PulsarClientImpl implements PulsarClient {
         CompletableFuture<Consumer<T>> consumerSubscribedFuture = new CompletableFuture<>();
 
         ConsumerBase<T> consumer = new MultiTopicsConsumerImpl<>(PulsarClientImpl.this, conf,
-                externalExecutorProvider.getExecutor(), consumerSubscribedFuture, schema, interceptors);
+                externalExecutorProvider.getExecutor(), consumerSubscribedFuture, schema, interceptors,
+                true /* createTopicIfDoesNotExist */);
 
         synchronized (consumers) {
             consumers.put(consumer, Boolean.TRUE);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
index fd9db1a..9788d84 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java
@@ -83,10 +83,8 @@ public class ReaderImpl<T> implements Reader<T> {
 
         final int partitionIdx = TopicName.getPartitionIndex(readerConfiguration.getTopicName());
         consumer = new ConsumerImpl<>(client, readerConfiguration.getTopicName(), consumerConfiguration, listenerExecutor,
-                partitionIdx, false, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema, null
-
-        );
-        
+                partitionIdx, false, consumerFuture, SubscriptionMode.NonDurable, readerConfiguration.getStartMessageId(), schema, null,
+                true /* createTopicIfDoesNotExist */);
     }
 
     @Override
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
index 3b0b257..3d669e7 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java
@@ -49,17 +49,10 @@ public class ZeroQueueConsumerImpl<T> extends ConsumerImpl<T> {
     public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
             ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
             SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema,
-            ConsumerInterceptors<T> interceptors) {
-    	this(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture, subscriptionMode, startMessageId,
-    		 schema, interceptors, Backoff.DEFAULT_INTERVAL_IN_NANOSECONDS, Backoff.MAX_BACKOFF_INTERVAL_NANOSECONDS);
-    }
-
-    public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
-            ExecutorService listenerExecutor, int partitionIndex, boolean hasParentConsumer, CompletableFuture<Consumer<T>> subscribeFuture,
-            SubscriptionMode subscriptionMode, MessageId startMessageId, Schema<T> schema,
-            ConsumerInterceptors<T> interceptors, long backoffIntervalNanos, long maxBackoffIntervalNanos) {
+            ConsumerInterceptors<T> interceptors,
+            boolean createTopicIfDoesNotExist) {
         super(client, topic, conf, listenerExecutor, partitionIndex, hasParentConsumer, subscribeFuture, subscriptionMode, startMessageId,
-              schema, interceptors);
+              schema, interceptors, createTopicIfDoesNotExist);
     }
 
     @Override
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
index 34112e5..6a96215 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -60,8 +60,8 @@ public class ConsumerImplTest {
 
         consumerConf.setSubscriptionName("test-sub");
         consumer = ConsumerImpl.newConsumerImpl(client, topic, consumerConf,
-                executorService, -1, false, subscribeFuture, SubscriptionMode.Durable, null, null, null
-        );
+                executorService, -1, false, subscribeFuture, SubscriptionMode.Durable, null, null, null,
+                true);
     }
 
     @Test(invocationTimeOut = 1000)
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
index 816dc85..bc7c264 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImplTest.java
@@ -57,7 +57,7 @@ public class MultiTopicsConsumerImplTest {
 
         MultiTopicsConsumerImpl impl = new MultiTopicsConsumerImpl(
             clientImpl, consumerConfData,
-            listenerExecutor, null, null, null);
+            listenerExecutor, null, null, null, true);
 
         impl.getStats();
     }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index d6a5bc6..5bd50c0 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -8804,6 +8804,10 @@ public final class PulsarApi {
     // optional bool replicate_subscription_state = 14;
     boolean hasReplicateSubscriptionState();
     boolean getReplicateSubscriptionState();
+    
+    // optional bool force_topic_creation = 15 [default = true];
+    boolean hasForceTopicCreation();
+    boolean getForceTopicCreation();
   }
   public static final class CommandSubscribe extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -9145,6 +9149,16 @@ public final class PulsarApi {
       return replicateSubscriptionState_;
     }
     
+    // optional bool force_topic_creation = 15 [default = true];
+    public static final int FORCE_TOPIC_CREATION_FIELD_NUMBER = 15;
+    private boolean forceTopicCreation_;
+    public boolean hasForceTopicCreation() {
+      return ((bitField0_ & 0x00002000) == 0x00002000);
+    }
+    public boolean getForceTopicCreation() {
+      return forceTopicCreation_;
+    }
+    
     private void initFields() {
       topic_ = "";
       subscription_ = "";
@@ -9160,6 +9174,7 @@ public final class PulsarApi {
       schema_ = org.apache.pulsar.common.api.proto.PulsarApi.Schema.getDefaultInstance();
       initialPosition_ = org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition.Latest;
       replicateSubscriptionState_ = false;
+      forceTopicCreation_ = true;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -9258,6 +9273,9 @@ public final class PulsarApi {
       if (((bitField0_ & 0x00001000) == 0x00001000)) {
         output.writeBool(14, replicateSubscriptionState_);
       }
+      if (((bitField0_ & 0x00002000) == 0x00002000)) {
+        output.writeBool(15, forceTopicCreation_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -9322,6 +9340,10 @@ public final class PulsarApi {
         size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeBoolSize(14, replicateSubscriptionState_);
       }
+      if (((bitField0_ & 0x00002000) == 0x00002000)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeBoolSize(15, forceTopicCreation_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -9463,6 +9485,8 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x00001000);
         replicateSubscriptionState_ = false;
         bitField0_ = (bitField0_ & ~0x00002000);
+        forceTopicCreation_ = true;
+        bitField0_ = (bitField0_ & ~0x00004000);
         return this;
       }
       
@@ -9553,6 +9577,10 @@ public final class PulsarApi {
           to_bitField0_ |= 0x00001000;
         }
         result.replicateSubscriptionState_ = replicateSubscriptionState_;
+        if (((from_bitField0_ & 0x00004000) == 0x00004000)) {
+          to_bitField0_ |= 0x00002000;
+        }
+        result.forceTopicCreation_ = forceTopicCreation_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -9608,6 +9636,9 @@ public final class PulsarApi {
         if (other.hasReplicateSubscriptionState()) {
           setReplicateSubscriptionState(other.getReplicateSubscriptionState());
         }
+        if (other.hasForceTopicCreation()) {
+          setForceTopicCreation(other.getForceTopicCreation());
+        }
         return this;
       }
       
@@ -9764,6 +9795,11 @@ public final class PulsarApi {
               replicateSubscriptionState_ = input.readBool();
               break;
             }
+            case 120: {
+              bitField0_ |= 0x00004000;
+              forceTopicCreation_ = input.readBool();
+              break;
+            }
           }
         }
       }
@@ -10227,6 +10263,27 @@ public final class PulsarApi {
         return this;
       }
       
+      // optional bool force_topic_creation = 15 [default = true];
+      private boolean forceTopicCreation_ = true;
+      public boolean hasForceTopicCreation() {
+        return ((bitField0_ & 0x00004000) == 0x00004000);
+      }
+      public boolean getForceTopicCreation() {
+        return forceTopicCreation_;
+      }
+      public Builder setForceTopicCreation(boolean value) {
+        bitField0_ |= 0x00004000;
+        forceTopicCreation_ = value;
+        
+        return this;
+      }
+      public Builder clearForceTopicCreation() {
+        bitField0_ = (bitField0_ & ~0x00004000);
+        forceTopicCreation_ = true;
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandSubscribe)
     }
     
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 2262225..e3ebde4 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -467,13 +467,15 @@ public class Commands {
             SubType subType, int priorityLevel, String consumerName) {
         return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName,
                 true /* isDurable */, null /* startMessageId */, Collections.emptyMap(), false,
-                false /* isReplicated */, InitialPosition.Earliest, null);
+                false /* isReplicated */, InitialPosition.Earliest, null,
+                true /* createTopicIfDoesNotExist */);
     }
 
     public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId,
             SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId,
             Map<String, String> metadata, boolean readCompacted, boolean isReplicated,
-            InitialPosition subscriptionInitialPosition, SchemaInfo schemaInfo) {
+            InitialPosition subscriptionInitialPosition, SchemaInfo schemaInfo,
+            boolean createTopicIfDoesNotExist) {
         CommandSubscribe.Builder subscribeBuilder = CommandSubscribe.newBuilder();
         subscribeBuilder.setTopic(topic);
         subscribeBuilder.setSubscription(subscription);
@@ -486,6 +488,8 @@ public class Commands {
         subscribeBuilder.setReadCompacted(readCompacted);
         subscribeBuilder.setInitialPosition(subscriptionInitialPosition);
         subscribeBuilder.setReplicateSubscriptionState(isReplicated);
+        subscribeBuilder.setForceTopicCreation(createTopicIfDoesNotExist);
+
         if (startMessageId != null) {
             subscribeBuilder.setStartMessageId(startMessageId);
         }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 9cb510d..f4bcec1 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -268,7 +268,7 @@ message CommandSubscribe {
     // Signal wether the subscription should be backed by a
     // durable cursor or not
     optional bool durable = 8 [default = true];
-
+    
     // If specified, the subscription will position the cursor
     // markd-delete position  on the particular message id and
     // will send messages from that point
@@ -292,6 +292,13 @@ message CommandSubscribe {
     // to periodically sync the state of replicated subscriptions
     // across different clusters (when using geo-replication).
     optional bool replicate_subscription_state = 14;
+    
+    // If true, the subscribe operation will cause a topic to be 
+    // created if it does not exist already (and if topic auto-creation
+    // is allowed by broker).
+    // If false, the subscribe operation will fail if the topic 
+    // does not exist.
+    optional bool force_topic_creation = 15 [default = true];
 }
 
 message CommandPartitionedTopicMetadata {