You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/12/18 22:21:24 UTC

[pulsar] branch master updated: PIP-68: WaitForExclusive producer access mode (#8992)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 99476d3  PIP-68: WaitForExclusive producer access mode (#8992)
99476d3 is described below

commit 99476d3196ffe5968d52d571b0be59f8eeeb8fea
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Dec 18 14:20:50 2020 -0800

    PIP-68: WaitForExclusive producer access mode (#8992)
    
    * PIP-68: WaitForExclusive producer access mode
    
    * Fixed checkstyle issues
    
    * Fixed log level to info
    
    * Update pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
---
 .../pulsar/broker/service/AbstractTopic.java       | 102 +++++++++++++++++---
 .../pulsar/broker/service/PulsarCommandSender.java |   3 +-
 .../broker/service/PulsarCommandSenderImpl.java    |   5 +-
 .../apache/pulsar/broker/service/ServerCnx.java    |  16 ++-
 .../org/apache/pulsar/broker/service/Topic.java    |   4 +-
 .../service/nonpersistent/NonPersistentTopic.java  |   1 +
 .../broker/service/persistent/PersistentTopic.java |   8 +-
 .../broker/service/ExclusiveProducerTest.java      | 107 +++++++++++++++++++--
 .../pulsar/broker/service/PersistentTopicTest.java |  32 +++---
 .../pulsar/client/api/ProducerAccessMode.java      |   9 +-
 .../org/apache/pulsar/client/impl/ClientCnx.java   |  34 +++++--
 .../pulsar/client/util/TimedCompletableFuture.java |  29 +++---
 .../apache/pulsar/common/api/proto/PulsarApi.java  |  57 +++++++++++
 .../pulsar/common/policies/data/TopicStats.java    |   4 +
 .../apache/pulsar/common/protocol/Commands.java    |  20 ++--
 pulsar-common/src/main/proto/PulsarApi.proto       |   5 +
 16 files changed, 347 insertions(+), 89 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index bbbde6c..32e074e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -25,8 +25,10 @@ import com.google.common.base.MoreObjects;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
@@ -34,6 +36,7 @@ import java.util.concurrent.atomic.LongAdder;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.bookkeeper.mledger.util.StatsBuckets;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
@@ -111,6 +114,8 @@ public abstract class AbstractTopic implements Topic {
 
     protected volatile Optional<Long> topicEpoch = Optional.empty();
     private volatile boolean hasExclusiveProducer;
+    private final Queue<Pair<Producer, CompletableFuture<Optional<Long>>>> waitingExclusiveProducers =
+            new ConcurrentLinkedQueue<>();
 
     private static final AtomicLongFieldUpdater<AbstractTopic> USAGE_COUNT_UPDATER =
             AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, "usageCount");
@@ -337,14 +342,15 @@ public abstract class AbstractTopic implements Topic {
     }
 
     @Override
-    public CompletableFuture<Optional<Long>> addProducer(Producer producer) {
+    public CompletableFuture<Optional<Long>> addProducer(Producer producer,
+            CompletableFuture<Void> producerQueuedFuture) {
         checkArgument(producer.getTopic() == this);
 
         CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
 
-        incrementTopicEpochIfNeeded(producer)
-                .thenAccept(epoch -> {
-                    lock.readLock().lock();
+        incrementTopicEpochIfNeeded(producer, producerQueuedFuture)
+                .thenAccept(producerEpoch -> {
+                    lock.writeLock().lock();
                     try {
                         brokerService.checkTopicNsOwnership(getName());
                         checkTopicFenced();
@@ -360,11 +366,11 @@ public abstract class AbstractTopic implements Topic {
                                     USAGE_COUNT_UPDATER.get(this));
                         }
 
-                        future.complete(epoch);
+                        future.complete(producerEpoch);
                     } catch (Throwable e) {
                         future.completeExceptionally(e);
                     } finally {
-                        lock.readLock().unlock();
+                        lock.writeLock().unlock();
                     }
                 }).exceptionally(ex -> {
                     future.completeExceptionally(ex);
@@ -374,12 +380,13 @@ public abstract class AbstractTopic implements Topic {
         return future;
     }
 
-    protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer producer) {
+    protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer producer,
+            CompletableFuture<Void> producerQueuedFuture) {
         lock.writeLock().lock();
         try {
             switch (producer.getAccessMode()) {
             case Shared:
-                if (hasExclusiveProducer) {
+                if (hasExclusiveProducer || !waitingExclusiveProducers.isEmpty()) {
                     return FutureUtil.failedFuture(new ProducerBusyException(
                             "Topic has an existing exclusive producer: " + producers.keys().nextElement()));
                 } else {
@@ -388,7 +395,7 @@ public abstract class AbstractTopic implements Topic {
                 }
 
             case Exclusive:
-                if (hasExclusiveProducer) {
+                if (hasExclusiveProducer || !waitingExclusiveProducers.isEmpty()) {
                     return FutureUtil.failedFuture(new ProducerFencedException(
                             "Topic has an existing exclusive producer: " + producers.keys().nextElement()));
                 } else if (!producers.isEmpty()) {
@@ -410,17 +417,52 @@ public abstract class AbstractTopic implements Topic {
                     } else {
                         future = incrementTopicEpoch(topicEpoch);
                     }
+                    future.exceptionally(ex -> {
+                        hasExclusiveProducer = false;
+                        return null;
+                    });
+
                     return future.thenApply(epoch -> {
                         topicEpoch = Optional.of(epoch);
                         return topicEpoch;
-                    }).exceptionally(ex -> {
+                    });
+                }
+
+            case WaitForExclusive: {
+                if (hasExclusiveProducer || !producers.isEmpty()) {
+                    CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
+                    log.info("[{}] Queuing producer {} since there's already a producer", topic, producer);
+                    waitingExclusiveProducers.add(Pair.of(producer, future));
+                    producerQueuedFuture.complete(null);
+                    return future;
+                } else if (producer.getTopicEpoch().isPresent()
+                        && producer.getTopicEpoch().get() < topicEpoch.orElse(-1L)) {
+                    // If a producer reconnects, but all the topic epoch has already moved forward, this producer needs
+                    // to be fenced, because a new producer had been present in between.
+                    return FutureUtil.failedFuture(new ProducerFencedException(
+                            String.format("Topic epoch has already moved. Current epoch: %d, Producer epoch: %d",
+                                    topicEpoch.get(), producer.getTopicEpoch().get())));
+                } else {
+                    // There are currently no existing producers
+                    hasExclusiveProducer = true;
+
+                    CompletableFuture<Long> future;
+                    if (producer.getTopicEpoch().isPresent()) {
+                        future = setTopicEpoch(producer.getTopicEpoch().get());
+                    } else {
+                        future = incrementTopicEpoch(topicEpoch);
+                    }
+                    future.exceptionally(ex -> {
                         hasExclusiveProducer = false;
                         return null;
                     });
-                }
 
-                // case WaitForExclusive:
-                // TODO: Implementation
+                    return future.thenApply(epoch -> {
+                        topicEpoch = Optional.of(epoch);
+                        return topicEpoch;
+                    });
+                }
+            }
 
             default:
                 return FutureUtil.failedFuture(
@@ -584,7 +626,35 @@ public abstract class AbstractTopic implements Topic {
         // decrement usage only if this was a valid producer close
         long newCount = USAGE_COUNT_UPDATER.decrementAndGet(this);
         if (newCount == 0) {
-            hasExclusiveProducer = false;
+            lock.writeLock().lock();
+            try {
+                hasExclusiveProducer = false;
+                Pair<Producer, CompletableFuture<Optional<Long>>> nextWaitingProducer =
+                        waitingExclusiveProducers.poll();
+                if (nextWaitingProducer != null) {
+                    Producer nextProducer = nextWaitingProducer.getKey();
+                    CompletableFuture<Optional<Long>> producerFuture = nextWaitingProducer.getValue();
+                    hasExclusiveProducer = true;
+
+                    CompletableFuture<Long> future;
+                    if (nextProducer.getTopicEpoch().isPresent()) {
+                        future = setTopicEpoch(nextProducer.getTopicEpoch().get());
+                    } else {
+                        future = incrementTopicEpoch(topicEpoch);
+                    }
+
+                    future.thenAccept(epoch -> {
+                        topicEpoch = Optional.of(epoch);
+                        producerFuture.complete(topicEpoch);
+                    }).exceptionally(ex -> {
+                        hasExclusiveProducer = false;
+                        producerFuture.completeExceptionally(ex);
+                        return null;
+                    });
+                }
+            } finally {
+                lock.writeLock().unlock();
+            }
         }
         if (log.isDebugEnabled()) {
             log.debug("[{}] [{}] Removed producer -- count: {}", topic, producer.getProducerName(),
@@ -733,6 +803,10 @@ public abstract class AbstractTopic implements Topic {
         }
     }
 
+    protected int getWaitingProducersCount() {
+        return waitingExclusiveProducers.size();
+    }
+
     protected boolean isExceedMaximumMessageSize(int size) {
         Integer maxMessageSize = null;
         TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
index fd841ad..cece56b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSender.java
@@ -39,7 +39,8 @@ public interface PulsarCommandSender {
     void sendProducerSuccessResponse(long requestId, String producerName, SchemaVersion schemaVersion);
 
     void sendProducerSuccessResponse(long requestId, String producerName, long lastSequenceId,
-                                     SchemaVersion schemaVersion, Optional<Long> topicEpoch);
+                                     SchemaVersion schemaVersion, Optional<Long> topicEpoch,
+                                     boolean isProducerReady);
 
     void sendSendReceiptResponse(long producerId, long sequenceId, long highestId, long ledgerId,
                                  long entryId);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
index 2d199be..5d812c4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PulsarCommandSenderImpl.java
@@ -98,9 +98,10 @@ public class PulsarCommandSenderImpl implements PulsarCommandSender {
 
     @Override
     public void sendProducerSuccessResponse(long requestId, String producerName, long lastSequenceId,
-                                            SchemaVersion schemaVersion, Optional<Long> topicEpoch) {
+                                            SchemaVersion schemaVersion, Optional<Long> topicEpoch,
+                                            boolean isProducerReady) {
         PulsarApi.BaseCommand command = Commands.newProducerSuccessCommand(requestId, producerName, lastSequenceId,
-                schemaVersion, topicEpoch);
+                schemaVersion, topicEpoch, isProducerReady);
         safeIntercept(command, cnx);
         ByteBuf outBuf = Commands.serializeWithSize(command);
         command.getProducerSuccess().recycle();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index cd73845..9b69244 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1152,17 +1152,18 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                             });
 
                             schemaVersionFuture.thenAccept(schemaVersion -> {
+                                CompletableFuture<Void> producerQueuedFuture = new CompletableFuture<>();
                                 Producer producer = new Producer(topic, ServerCnx.this, producerId, producerName,
                                         getPrincipal(), isEncrypted, metadata, schemaVersion, epoch,
                                         userProvidedProducerName, producerAccessMode, topicEpoch);
 
-                                topic.addProducer(producer).thenAccept(newTopicEpoch -> {
+                                topic.addProducer(producer, producerQueuedFuture).thenAccept(newTopicEpoch -> {
                                     if (isActive()) {
                                         if (producerFuture.complete(producer)) {
                                             log.info("[{}] Created new producer: {}", remoteAddress, producer);
                                             commandSender.sendProducerSuccessResponse(requestId, producerName,
                                                     producer.getLastSequenceId(), producer.getSchemaVersion(),
-                                                    newTopicEpoch);
+                                                    newTopicEpoch, true /* producer is ready now */);
                                             return;
                                         } else {
                                             // The producer's future was completed before by
@@ -1192,6 +1193,17 @@ public class ServerCnx extends PulsarHandler implements TransportCnx {
                                     }
                                     return null;
                                 });
+
+                                producerQueuedFuture.thenRun(() -> {
+                                    // If the producer is queued waiting, we will get an immediate notification
+                                    // that we need to pass to client
+                                    if (isActive()) {
+                                        log.info("[{}] Producer is waiting in queue: {}", remoteAddress, producer);
+                                        commandSender.sendProducerSuccessResponse(requestId, producerName,
+                                                producer.getLastSequenceId(), producer.getSchemaVersion(),
+                                                Optional.empty(), false/* producer is not ready now */);
+                                    }
+                                });
                             });
                         }).exceptionally(exception -> {
                             Throwable cause = exception.getCause();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 9e50fcf..af88ce8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -98,9 +98,11 @@ public interface Topic {
      * Tries to add a producer to the topic. Several validations will be performed.
      *
      * @param producer
+     * @param producerQueuedFuture
+     *            a future that will be triggered if the producer is being queued up prior of getting established
      * @return the "topic epoch" if there is one or empty
      */
-    CompletableFuture<Optional<Long>> addProducer(Producer producer);
+    CompletableFuture<Optional<Long>> addProducer(Producer producer, CompletableFuture<Void> producerQueuedFuture);
 
     void removeProducer(Producer producer);
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 004e656..ac49a96 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -761,6 +761,7 @@ public class NonPersistentTopic extends AbstractTopic implements Topic {
         stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 : (stats.msgThroughputIn / stats.msgRateIn);
         stats.msgInCounter = getMsgInCounter();
         stats.bytesInCounter = getBytesInCounter();
+        stats.waitingPublishers = getWaitingProducersCount();
 
         subscriptions.forEach((name, subscription) -> {
             NonPersistentSubscriptionStats subStats = subscription.getStats();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 938dde5..676f98c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -496,13 +496,14 @@ public class PersistentTopic extends AbstractTopic
     }
 
     @Override
-    public CompletableFuture<Optional<Long>> addProducer(Producer producer) {
-        return super.addProducer(producer).thenApply(epoch -> {
+    public CompletableFuture<Optional<Long>> addProducer(Producer producer,
+            CompletableFuture<Void> producerQueuedFuture) {
+        return super.addProducer(producer, producerQueuedFuture).thenApply(topicEpoch -> {
             messageDeduplication.producerAdded(producer.getProducerName());
 
             // Start replication producers if not already
             startReplProducers();
-            return epoch;
+            return topicEpoch;
         });
     }
 
@@ -1642,6 +1643,7 @@ public class PersistentTopic extends AbstractTopic
         stats.msgInCounter = getMsgInCounter();
         stats.bytesInCounter = getBytesInCounter();
         stats.msgChunkPublished = this.msgChunkPublished;
+        stats.waitingPublishers = getWaitingProducersCount();
 
         subscriptions.forEach((name, subscription) -> {
             SubscriptionStats subStats = subscription.getStats(getPreciseBacklog);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
index 0ab3b70..75340c1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
@@ -18,15 +18,18 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.fail;
 
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 
 import lombok.Cleanup;
 
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerAccessMode;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException.ProducerBusyException;
 import org.apache.pulsar.client.api.PulsarClientException.ProducerFencedException;
 import org.apache.pulsar.client.api.Schema;
@@ -59,9 +62,15 @@ public class ExclusiveProducerTest extends BrokerTestBase {
         };
     }
 
-    @DataProvider(name = "partitioned")
-    public static Object[][] partitioned() {
-        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+    @DataProvider(name = "accessMode")
+    public static Object[][] accessMode() {
+        return new Object[][] {
+                // ProducerAccessMode, partitioned
+                { ProducerAccessMode.Exclusive, Boolean.TRUE},
+                { ProducerAccessMode.Exclusive, Boolean.FALSE },
+                { ProducerAccessMode.WaitForExclusive, Boolean.TRUE },
+                { ProducerAccessMode.WaitForExclusive, Boolean.FALSE },
+        };
     }
 
     @Test(dataProvider = "topics")
@@ -124,13 +133,13 @@ public class ExclusiveProducerTest extends BrokerTestBase {
         }
     }
 
-    @Test(dataProvider = "topics")
-    public void producerReconnection(String type, boolean partitioned) throws Exception {
-        String topic = newTopic(type, partitioned);
+    @Test(dataProvider = "accessMode")
+    public void producerReconnection(ProducerAccessMode accessMode, boolean partitioned) throws Exception {
+        String topic = newTopic("persistent", partitioned);
 
         Producer<String> p1 = pulsarClient.newProducer(Schema.STRING)
                 .topic(topic)
-                .accessMode(ProducerAccessMode.Exclusive)
+                .accessMode(accessMode)
                 .create();
 
         p1.send("msg-1");
@@ -140,13 +149,13 @@ public class ExclusiveProducerTest extends BrokerTestBase {
         p1.send("msg-2");
     }
 
-    @Test(dataProvider = "partitioned")
-    public void producerFenced(boolean partitioned) throws Exception {
+    @Test(dataProvider = "accessMode")
+    public void producerFenced(ProducerAccessMode accessMode, boolean partitioned) throws Exception {
         String topic = newTopic("persistent", partitioned);
 
         Producer<String> p1 = pulsarClient.newProducer(Schema.STRING)
                 .topic(topic)
-                .accessMode(ProducerAccessMode.Exclusive)
+                .accessMode(accessMode)
                 .create();
 
         p1.send("msg-1");
@@ -200,6 +209,84 @@ public class ExclusiveProducerTest extends BrokerTestBase {
         p1.send("msg-2");
     }
 
+    @Test(dataProvider = "topics")
+    public void waitForExclusiveTest(String type, boolean partitioned) throws Exception {
+        String topic = newTopic(type, partitioned);
+
+        Producer<String> p1 = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .producerName("p1")
+                .accessMode(ProducerAccessMode.WaitForExclusive)
+                .create();
+
+        CompletableFuture<Producer<String>> fp2 = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .producerName("p2")
+                .accessMode(ProducerAccessMode.WaitForExclusive)
+                .createAsync();
+
+        // This sleep is made to ensure P2 is enqueued before P3. Because of the lookups
+        // there's no strict guarantee they would be attempted in the same order otherwise
+        Thread.sleep(1000);
+
+        CompletableFuture<Producer<String>> fp3 = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .producerName("p3")
+                .accessMode(ProducerAccessMode.WaitForExclusive)
+                .createAsync();
+
+        Thread.sleep(1000);
+        // The second producer should get queued
+        assertFalse(fp2.isDone());
+        assertFalse(fp3.isDone());
+
+        p1.close();
+
+        // Now P2 should get created
+        Producer<String> p2 = fp2.get(1, TimeUnit.SECONDS);
+
+        assertFalse(fp3.isDone());
+
+        p2.close();
+
+        // Now P3 should get created
+        Producer<String> p3 = fp3.get(1, TimeUnit.SECONDS);
+        p3.close();
+    }
+
+    @Test(dataProvider = "topics")
+    public void waitForExclusiveWithClientTimeout(String type, boolean partitioned) throws Exception {
+        String topic = newTopic(type, partitioned);
+
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .operationTimeout(1, TimeUnit.SECONDS)
+                .build();
+
+        Producer<String> p1 = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .accessMode(ProducerAccessMode.WaitForExclusive)
+                .create();
+
+        CompletableFuture<Producer<String>> fp2 = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .accessMode(ProducerAccessMode.WaitForExclusive)
+                .createAsync();
+
+        // Wait enough time to have caused an operation timeout on p2
+        Thread.sleep(2000);
+
+        // There should be timeout error, since the broker should reply immediately
+        // with the instruction to wait
+        assertFalse(fp2.isDone());
+
+        p1.close();
+
+        // Now P2 should get created
+        fp2.get(1, TimeUnit.SECONDS);
+    }
+
     private String newTopic(String type, boolean isPartitioned) throws Exception {
         String topic = type + "://" + newTopicName();
         if (isPartitioned) {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
index 6117c8f..5ca7123 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicTest.java
@@ -374,12 +374,12 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
                 role, false, null, SchemaVersion.Latest, 0, false,
                 ProducerAccessMode.Shared, Optional.empty());
-        topic.addProducer(producer);
+        topic.addProducer(producer, new CompletableFuture<>());
         assertEquals(topic.getProducers().size(), 1);
 
         // 2. duplicate add
         try {
-            topic.addProducer(producer).join();
+            topic.addProducer(producer, new CompletableFuture<>()).join();
             fail("Should have failed with naming exception because producer 'null' is already connected to the topic");
         } catch (Exception e) {
             assertEquals(e.getCause().getClass(), BrokerServiceException.NamingException.class);
@@ -392,7 +392,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
                 role, false, null, SchemaVersion.Latest,0, false,
                 ProducerAccessMode.Shared, Optional.empty());
         try {
-            topic.addProducer(failProducer);
+            topic.addProducer(failProducer, new CompletableFuture<>());
             fail("should have failed");
         } catch (IllegalArgumentException e) {
             // OK
@@ -415,8 +415,8 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         Producer producer2 = new Producer(topic, serverCnx, 2 /* producer id */, "prod-name",
                 role, false, null, SchemaVersion.Latest, 0, true, ProducerAccessMode.Shared, Optional.empty());
         try {
-            topic.addProducer(producer1).join();
-            topic.addProducer(producer2).join();
+            topic.addProducer(producer1, new CompletableFuture<>()).join();
+            topic.addProducer(producer2, new CompletableFuture<>()).join();
             fail("should have failed");
         } catch (Exception e) {
             // OK
@@ -429,7 +429,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
                 role, false, null, SchemaVersion.Latest, 1, false, ProducerAccessMode.Shared, Optional.empty());
 
         try {
-            topic.addProducer(producer3).join();
+            topic.addProducer(producer3, new CompletableFuture<>()).join();
             fail("should have failed");
         } catch (Exception e) {
             // OK
@@ -444,8 +444,8 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         Producer producer4 = new Producer(topic, serverCnx, 2 /* producer id */, "prod-name",
                 role, false, null, SchemaVersion.Latest, 2, false, ProducerAccessMode.Shared, Optional.empty());
 
-        topic.addProducer(producer3);
-        topic.addProducer(producer4);
+        topic.addProducer(producer3, new CompletableFuture<>());
+        topic.addProducer(producer4, new CompletableFuture<>());
 
         Assert.assertEquals(topic.getProducers().size(), 1);
 
@@ -457,13 +457,13 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         Producer producer5 = new Producer(topic, serverCnx, 2 /* producer id */, "pulsar.repl.cluster1",
                 role, false, null, SchemaVersion.Latest, 1, false, ProducerAccessMode.Shared, Optional.empty());
 
-        topic.addProducer(producer5);
+        topic.addProducer(producer5, new CompletableFuture<>());
         Assert.assertEquals(topic.getProducers().size(), 1);
 
         Producer producer6 = new Producer(topic, serverCnx, 2 /* producer id */, "pulsar.repl.cluster1",
                 role, false, null, SchemaVersion.Latest, 2, false, ProducerAccessMode.Shared, Optional.empty());
 
-        topic.addProducer(producer6);
+        topic.addProducer(producer6, new CompletableFuture<>());
         Assert.assertEquals(topic.getProducers().size(), 1);
 
         topic.getProducers().values().forEach(producer -> Assert.assertEquals(producer.getEpoch(), 2));
@@ -471,7 +471,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         Producer producer7 = new Producer(topic, serverCnx, 2 /* producer id */, "pulsar.repl.cluster1",
                 role, false, null, SchemaVersion.Latest, 3, true, ProducerAccessMode.Shared, Optional.empty());
 
-        topic.addProducer(producer7);
+        topic.addProducer(producer7, new CompletableFuture<>());
         Assert.assertEquals(topic.getProducers().size(), 1);
         topic.getProducers().values().forEach(producer -> Assert.assertEquals(producer.getEpoch(), 3));
     }
@@ -482,20 +482,20 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         // 1. add producer1
         Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name1", role,
                 false, null, SchemaVersion.Latest,0, false, ProducerAccessMode.Shared, Optional.empty());
-        topic.addProducer(producer);
+        topic.addProducer(producer, new CompletableFuture<>());
         assertEquals(topic.getProducers().size(), 1);
 
         // 2. add producer2
         Producer producer2 = new Producer(topic, serverCnx, 2 /* producer id */, "prod-name2", role,
                 false, null, SchemaVersion.Latest,0, false, ProducerAccessMode.Shared, Optional.empty());
-        topic.addProducer(producer2);
+        topic.addProducer(producer2, new CompletableFuture<>());
         assertEquals(topic.getProducers().size(), 2);
 
         // 3. add producer3 but reached maxProducersPerTopic
         try {
             Producer producer3 = new Producer(topic, serverCnx, 3 /* producer id */, "prod-name3", role,
                     false, null, SchemaVersion.Latest,0, false, ProducerAccessMode.Shared, Optional.empty());
-            topic.addProducer(producer3).join();
+            topic.addProducer(producer3, new CompletableFuture<>()).join();
             fail("should have failed");
         } catch (Exception e) {
             assertEquals(e.getCause().getClass(), BrokerServiceException.ProducerBusyException.class);
@@ -953,7 +953,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
         topic = (PersistentTopic) brokerService.getOrCreateTopic(successTopicName).get();
         Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
                 role, false, null, SchemaVersion.Latest, 0, false, ProducerAccessMode.Shared, Optional.empty());
-        topic.addProducer(producer).join();
+        topic.addProducer(producer, new CompletableFuture<>()).join();
 
         assertTrue(topic.delete().isCompletedExceptionally());
         assertFalse((boolean) isFencedField.get(topic));
@@ -1116,7 +1116,7 @@ public class PersistentTopicTest extends MockedBookKeeperTestCase {
             Thread.sleep(10); /* delay to ensure that the delete gets executed first */
             Producer producer = new Producer(topic, serverCnx, 1 /* producer id */, "prod-name",
                     role, false, null, SchemaVersion.Latest, 0, false, ProducerAccessMode.Shared, Optional.empty());
-            topic.addProducer(producer).join();
+            topic.addProducer(producer, new CompletableFuture<>()).join();
             fail("Should have failed");
         } catch (Exception e) {
             assertEquals(e.getCause().getClass(), BrokerServiceException.TopicFencedException.class);
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerAccessMode.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerAccessMode.java
index 1a69da8..85199e7 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerAccessMode.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerAccessMode.java
@@ -33,9 +33,8 @@ public enum ProducerAccessMode {
      */
     Exclusive,
 
-// TODO
-//    /**
-//     * Producer creation is pending until it can acquire exclusive access.
-//     */
-//    WaitForExclusive,
+    /**
+     * Producer creation is pending until it can acquire exclusive access.
+     */
+    WaitForExclusive,
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
index 3c66065..4f5b3ed 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientCnx.java
@@ -91,6 +91,7 @@ import org.apache.pulsar.common.protocol.schema.SchemaInfoUtil;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap;
 import org.apache.pulsar.client.impl.transaction.TransactionBufferHandler;
+import org.apache.pulsar.client.util.TimedCompletableFuture;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -100,10 +101,10 @@ public class ClientCnx extends PulsarHandler {
     protected final Authentication authentication;
     private State state;
 
-    private final ConcurrentLongHashMap<CompletableFuture<? extends Object>> pendingRequests =
+    private final ConcurrentLongHashMap<TimedCompletableFuture<? extends Object>> pendingRequests =
         new ConcurrentLongHashMap<>(16, 1);
     // LookupRequests that waiting in client side.
-    private final Queue<Pair<Long, Pair<ByteBuf, CompletableFuture<LookupDataResult>>>> waitingLookupRequests;
+    private final Queue<Pair<Long, Pair<ByteBuf, TimedCompletableFuture<LookupDataResult>>>> waitingLookupRequests;
 
     private final ConcurrentLongHashMap<ProducerImpl<?>> producers = new ConcurrentLongHashMap<>(16, 1);
     private final ConcurrentLongHashMap<ConsumerImpl<?>> consumers = new ConcurrentLongHashMap<>(16, 1);
@@ -468,6 +469,19 @@ public class ClientCnx extends PulsarHandler {
                     success.getRequestId(), success.getProducerName());
         }
         long requestId = success.getRequestId();
+        if (!success.getProducerReady()) {
+            // We got a success operation but the producer is not ready. This means that the producer has been queued up
+            // in broker. We need to leave the future pending until we get the final confirmation. We just mark that
+            // we have received a response, in order to avoid the timeout.
+            TimedCompletableFuture<?> requestFuture = (TimedCompletableFuture<?>) pendingRequests.get(requestId);
+            if (requestFuture != null) {
+                log.info("{} Producer {} has been queued up at broker. request: {}", ctx.channel(),
+                        success.getProducerName(), requestId);
+                requestFuture.markAsResponded();
+            }
+            return;
+        }
+
         CompletableFuture<ProducerResponse> requestFuture = (CompletableFuture<ProducerResponse>) pendingRequests.remove(requestId);
         if (requestFuture != null) {
             ProducerResponse pr = new ProducerResponse(success.getProducerName(),
@@ -564,7 +578,7 @@ public class ClientCnx extends PulsarHandler {
     }
 
     // caller of this method needs to be protected under pendingLookupRequestSemaphore
-    private void addPendingLookupRequests(long requestId, CompletableFuture<LookupDataResult> future) {
+    private void addPendingLookupRequests(long requestId, TimedCompletableFuture<LookupDataResult> future) {
         pendingRequests.put(requestId, future);
         eventLoopGroup.schedule(() -> {
             if (!future.isDone()) {
@@ -577,13 +591,13 @@ public class ClientCnx extends PulsarHandler {
     private CompletableFuture<LookupDataResult> getAndRemovePendingLookupRequest(long requestId) {
         CompletableFuture<LookupDataResult> result = (CompletableFuture<LookupDataResult>) pendingRequests.remove(requestId);
         if (result != null) {
-            Pair<Long, Pair<ByteBuf, CompletableFuture<LookupDataResult>>> firstOneWaiting = waitingLookupRequests.poll();
+            Pair<Long, Pair<ByteBuf, TimedCompletableFuture<LookupDataResult>>> firstOneWaiting = waitingLookupRequests.poll();
             if (firstOneWaiting != null) {
                 maxLookupRequestSemaphore.release();
                 // schedule a new lookup in.
                 eventLoopGroup.submit(() -> {
                     long newId = firstOneWaiting.getLeft();
-                    CompletableFuture<LookupDataResult> newFuture = firstOneWaiting.getRight().getRight();
+                    TimedCompletableFuture<LookupDataResult> newFuture = firstOneWaiting.getRight().getRight();
                     addPendingLookupRequests(newId, newFuture);
                     ctx.writeAndFlush(firstOneWaiting.getRight().getLeft()).addListener(writeFuture -> {
                         if (!writeFuture.isSuccess()) {
@@ -680,7 +694,7 @@ public class ClientCnx extends PulsarHandler {
     }
 
     public CompletableFuture<LookupDataResult> newLookup(ByteBuf request, long requestId) {
-        CompletableFuture<LookupDataResult> future = new CompletableFuture<>();
+        TimedCompletableFuture<LookupDataResult> future = new TimedCompletableFuture<>();
 
         if (pendingLookupRequestSemaphore.tryAcquire()) {
             addPendingLookupRequests(requestId, future);
@@ -789,7 +803,7 @@ public class ClientCnx extends PulsarHandler {
     }
 
     private <T> CompletableFuture<T> sendRequestAndHandleTimeout(ByteBuf requestMessage, long requestId, RequestType requestType) {
-        CompletableFuture<T> future = new CompletableFuture<>();
+        TimedCompletableFuture<T> future = new TimedCompletableFuture<>();
         pendingRequests.put(requestId, future);
         ctx.writeAndFlush(requestMessage).addListener(writeFuture -> {
             if (!writeFuture.isSuccess()) {
@@ -1064,8 +1078,10 @@ public class ClientCnx extends PulsarHandler {
                 break;
             }
             request = requestTimeoutQueue.poll();
-            CompletableFuture<?> requestFuture = pendingRequests.remove(request.requestId);
-            if (requestFuture != null && !requestFuture.isDone()) {
+            TimedCompletableFuture<?> requestFuture = pendingRequests.remove(request.requestId);
+            if (requestFuture != null
+                    && !requestFuture.isDone()
+                    && !requestFuture.hasGotResponse()) {
                 String timeoutMessage = String.format("%d %s timedout after ms %d", request.requestId, request.requestType.getDescription(), operationTimeoutMs);
                 if (requestFuture.completeExceptionally(new TimeoutException(timeoutMessage))) {
                     log.warn("{} {}", ctx.channel(), timeoutMessage);
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerAccessMode.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/TimedCompletableFuture.java
similarity index 62%
copy from pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerAccessMode.java
copy to pulsar-client/src/main/java/org/apache/pulsar/client/util/TimedCompletableFuture.java
index 1a69da8..30c1eb3 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerAccessMode.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/TimedCompletableFuture.java
@@ -16,26 +16,19 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.pulsar.client.util;
 
-package org.apache.pulsar.client.api;
+import java.util.concurrent.CompletableFuture;
 
-/**
- * The type of access to the topic that the producer requires.
- */
-public enum ProducerAccessMode {
-    /**
-     * By default multiple producers can publish on a topic.
-     */
-    Shared,
+public class TimedCompletableFuture<T> extends CompletableFuture<T> {
+
+    private volatile boolean hasGotResponse = false;
 
-    /**
-     * Require exclusive access for producer. Fail immediately if there's already a producer connected.
-     */
-    Exclusive,
+    public void markAsResponded() {
+        this.hasGotResponse = true;
+    }
 
-// TODO
-//    /**
-//     * Producer creation is pending until it can acquire exclusive access.
-//     */
-//    WaitForExclusive,
+    public boolean hasGotResponse() {
+        return this.hasGotResponse;
+    }
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 3d5d422..aa9f2c1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -25116,6 +25116,10 @@ public final class PulsarApi {
     // optional uint64 topic_epoch = 5;
     boolean hasTopicEpoch();
     long getTopicEpoch();
+    
+    // optional bool producer_ready = 6 [default = true];
+    boolean hasProducerReady();
+    boolean getProducerReady();
   }
   public static final class CommandProducerSuccess extends
       org.apache.pulsar.shaded.com.google.protobuf.v241.GeneratedMessageLite
@@ -25224,12 +25228,23 @@ public final class PulsarApi {
       return topicEpoch_;
     }
     
+    // optional bool producer_ready = 6 [default = true];
+    public static final int PRODUCER_READY_FIELD_NUMBER = 6;
+    private boolean producerReady_;
+    public boolean hasProducerReady() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    public boolean getProducerReady() {
+      return producerReady_;
+    }
+    
     private void initFields() {
       requestId_ = 0L;
       producerName_ = "";
       lastSequenceId_ = -1L;
       schemaVersion_ = org.apache.pulsar.shaded.com.google.protobuf.v241.ByteString.EMPTY;
       topicEpoch_ = 0L;
+      producerReady_ = true;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -25271,6 +25286,9 @@ public final class PulsarApi {
       if (((bitField0_ & 0x00000010) == 0x00000010)) {
         output.writeUInt64(5, topicEpoch_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeBool(6, producerReady_);
+      }
     }
     
     private int memoizedSerializedSize = -1;
@@ -25299,6 +25317,10 @@ public final class PulsarApi {
         size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
           .computeUInt64Size(5, topicEpoch_);
       }
+      if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        size += org.apache.pulsar.shaded.com.google.protobuf.v241.CodedOutputStream
+          .computeBoolSize(6, producerReady_);
+      }
       memoizedSerializedSize = size;
       return size;
     }
@@ -25422,6 +25444,8 @@ public final class PulsarApi {
         bitField0_ = (bitField0_ & ~0x00000008);
         topicEpoch_ = 0L;
         bitField0_ = (bitField0_ & ~0x00000010);
+        producerReady_ = true;
+        bitField0_ = (bitField0_ & ~0x00000020);
         return this;
       }
       
@@ -25475,6 +25499,10 @@ public final class PulsarApi {
           to_bitField0_ |= 0x00000010;
         }
         result.topicEpoch_ = topicEpoch_;
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000020;
+        }
+        result.producerReady_ = producerReady_;
         result.bitField0_ = to_bitField0_;
         return result;
       }
@@ -25496,6 +25524,9 @@ public final class PulsarApi {
         if (other.hasTopicEpoch()) {
           setTopicEpoch(other.getTopicEpoch());
         }
+        if (other.hasProducerReady()) {
+          setProducerReady(other.getProducerReady());
+        }
         return this;
       }
       
@@ -25558,6 +25589,11 @@ public final class PulsarApi {
               topicEpoch_ = input.readUInt64();
               break;
             }
+            case 48: {
+              bitField0_ |= 0x00000020;
+              producerReady_ = input.readBool();
+              break;
+            }
           }
         }
       }
@@ -25687,6 +25723,27 @@ public final class PulsarApi {
         return this;
       }
       
+      // optional bool producer_ready = 6 [default = true];
+      private boolean producerReady_ = true;
+      public boolean hasProducerReady() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      public boolean getProducerReady() {
+        return producerReady_;
+      }
+      public Builder setProducerReady(boolean value) {
+        bitField0_ |= 0x00000020;
+        producerReady_ = value;
+        
+        return this;
+      }
+      public Builder clearProducerReady() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        producerReady_ = true;
+        
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:pulsar.proto.CommandProducerSuccess)
     }
     
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
index a287897..9633e0f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/TopicStats.java
@@ -69,6 +69,8 @@ public class TopicStats {
     /** List of connected publishers on this topic w/ their stats. */
     public List<PublisherStats> publishers;
 
+    public int waitingPublishers;
+
     /** Map of subscriptions with their individual statistics. */
     public Map<String, SubscriptionStats> subscriptions;
 
@@ -107,6 +109,7 @@ public class TopicStats {
         this.msgOutCounter = 0;
         this.publishers.clear();
         this.subscriptions.clear();
+        this.waitingPublishers = 0;
         this.replication.clear();
         this.deduplicationStatus = null;
         this.topicEpoch = null;
@@ -127,6 +130,7 @@ public class TopicStats {
         this.msgInCounter += stats.msgInCounter;
         this.bytesOutCounter += stats.bytesOutCounter;
         this.msgOutCounter += stats.msgOutCounter;
+        this.waitingPublishers += stats.waitingPublishers;
         double newAverageMsgSize = (this.averageMsgSize * (this.count - 1) + stats.averageMsgSize) / this.count;
         this.averageMsgSize = newAverageMsgSize;
         this.storageSize += stats.storageSize;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
index 4e8c058..7ec9132 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java
@@ -364,20 +364,22 @@ public class Commands {
 
     public static BaseCommand newProducerSuccessCommand(long requestId, String producerName,
             SchemaVersion schemaVersion) {
-        return newProducerSuccessCommand(requestId, producerName, -1, schemaVersion, Optional.empty());
+        return newProducerSuccessCommand(requestId, producerName, -1, schemaVersion, Optional.empty(), true);
     }
 
     public static ByteBuf newProducerSuccess(long requestId, String producerName, SchemaVersion schemaVersion) {
-        return newProducerSuccess(requestId, producerName, -1, schemaVersion, Optional.empty());
+        return newProducerSuccess(requestId, producerName, -1, schemaVersion, Optional.empty(), true);
     }
 
     public static BaseCommand newProducerSuccessCommand(long requestId, String producerName, long lastSequenceId,
-                                                 SchemaVersion schemaVersion, Optional<Long> topicEpoch) {
+                                                 SchemaVersion schemaVersion, Optional<Long> topicEpoch,
+                                                 boolean isProducerReady) {
         CommandProducerSuccess.Builder producerSuccessBuilder = CommandProducerSuccess.newBuilder();
         producerSuccessBuilder.setRequestId(requestId);
         producerSuccessBuilder.setProducerName(producerName);
         producerSuccessBuilder.setLastSequenceId(lastSequenceId);
         producerSuccessBuilder.setSchemaVersion(ByteString.copyFrom(schemaVersion.bytes()));
+        producerSuccessBuilder.setProducerReady(isProducerReady);
         topicEpoch.ifPresent(producerSuccessBuilder::setTopicEpoch);
         CommandProducerSuccess producerSuccess = producerSuccessBuilder.build();
         BaseCommand.Builder builder = BaseCommand.newBuilder();
@@ -388,12 +390,14 @@ public class Commands {
     }
 
     public static ByteBuf newProducerSuccess(long requestId, String producerName, long lastSequenceId,
-        SchemaVersion schemaVersion, Optional<Long> topicEpoch) {
+        SchemaVersion schemaVersion, Optional<Long> topicEpoch,
+        boolean isProducerReady) {
         CommandProducerSuccess.Builder producerSuccessBuilder = CommandProducerSuccess.newBuilder();
         producerSuccessBuilder.setRequestId(requestId);
         producerSuccessBuilder.setProducerName(producerName);
         producerSuccessBuilder.setLastSequenceId(lastSequenceId);
         producerSuccessBuilder.setSchemaVersion(ByteString.copyFrom(schemaVersion.bytes()));
+        producerSuccessBuilder.setProducerReady(isProducerReady);
         topicEpoch.ifPresent(producerSuccessBuilder::setTopicEpoch);
         CommandProducerSuccess producerSuccess = producerSuccessBuilder.build();
         ByteBuf res = serializeWithSize(
@@ -2251,8 +2255,8 @@ public class Commands {
             return PulsarApi.ProducerAccessMode.Exclusive;
         case Shared:
             return PulsarApi.ProducerAccessMode.Shared;
-//        case WaitForExclusive:
-//            return PulsarApi.ProducerAccessMode.WaitForExclusive;
+        case WaitForExclusive:
+            return PulsarApi.ProducerAccessMode.WaitForExclusive;
         default:
             throw new IllegalArgumentException("Unknonw access mode: " + accessMode);
         }
@@ -2264,8 +2268,8 @@ public class Commands {
             return ProducerAccessMode.Exclusive;
         case Shared:
             return ProducerAccessMode.Shared;
-//        case WaitForExclusive:
-//            return ProducerAccessMode.WaitForExclusive;
+        case WaitForExclusive:
+            return ProducerAccessMode.WaitForExclusive;
         default:
             throw new IllegalArgumentException("Unknonw access mode: " + accessMode);
         }
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index bc5f902..b998828 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -622,6 +622,11 @@ message CommandProducerSuccess {
     // The topic epoch assigned by the broker. This field will only be set if we
     // were requiring exclusive access when creating the producer.
     optional uint64 topic_epoch = 5;
+
+    // If producer is not "ready", the client will avoid to timeout the request
+    // for creating the producer. Instead it will wait indefinitely until it gets 
+    // a subsequent  `CommandProducerSuccess` with `producer_ready==true`.
+    optional bool producer_ready = 6 [default = true];
 }
 
 message CommandError {