Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/09/10 12:37:03 UTC

[pulsar] branch branch-2.8 updated: [Issue 11496][C++] Allow partitioned producers to start lazily (#11570)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 649e15a  [Issue 11496][C++] Allow partitioned producers to start lazily (#11570)
649e15a is described below

commit 649e15a4b7193ee9d6deabd5c26c3566cf82a21d
Author: Jack Vanlightly <va...@gmail.com>
AuthorDate: Mon Aug 16 04:12:24 2021 +0200

    [Issue 11496][C++] Allow partitioned producers to start lazily (#11570)
    
    Fixes #11496 and also matches part of PIP 79.
    
    C++ implementation that closely matches the proposed Java client changes for reducing partitioned producer connections and lookups: PR 10279.
    
    ### Motivation
    
    Producers that send messages to partitioned topics start a producer per partition, even when using single partition routing. For topics that combine a large number of producers with a large number of partitions, this can put strain on the brokers. With, say, 1000 partitions and single partition routing of non-keyed messages, 999 topic owner lookups and producer registrations are performed that could be avoided.
    
    PIP 79 also describes this (this change was written before realising that PIP 79 covers it). This implementation can be reviewed and contrasted with the Java client implementation in https://github.com/apache/pulsar/pull/10279.
    
    ### Modifications
    
    Allows partitioned producers to start the producer for each individual partition lazily. Starting a producer involves a topic
    owner lookup to find out which broker owns the partition, then registering the producer for that partition with the owner
    broker. For topics with many partitions, and when using SinglePartition routing without keyed messages, all of these
    lookups and producer registrations are wasted except for the single chosen partition.
    
    This change lets the user control, via a new config in ProducerConfiguration, whether a producer on a partitioned topic
    uses this lazy start. When ProducerConfiguration::setLazyStartPartitionedProducers(true) is set, PartitionedProducerImpl::start()
    becomes a synchronous operation that only does housekeeping (no network operations). The producer for any given partition is
    started (which includes a topic owner lookup and registration) upon sending the first message to that partition. While that
    producer starts, messages are buffered.
    
    The sendTimeout timer is only activated once a producer has been fully started, which should give any buffered messages
    enough time to be sent. Very short send timeouts could still cause timeouts during the start phase, but the default of 30s
    should not cause this issue.
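    
    As a usage sketch (the service URL and topic name below are illustrative, not part of this change), enabling the new
    config from the C++ client looks roughly like this:
    
    ```c++
    #include <pulsar/Client.h>
    
    using namespace pulsar;
    
    int main() {
        Client client("pulsar://localhost:6650");
    
        ProducerConfiguration conf;
        // Only the partition chosen by the router is started eagerly;
        // the remaining partition producers start on their first message.
        conf.setLazyStartPartitionedProducers(true);
        conf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
        conf.setSendTimeout(30000);  // default; very short values may time out while a lazy producer starts
    
        Producer producer;
        // assumes this partitioned topic already exists
        Result res = client.createProducer("persistent://public/default/my-partitioned-topic", conf, producer);
        if (res != ResultOk) {
            return -1;
        }
    
        // The first send to a partition triggers that partition's owner lookup and registration.
        producer.send(MessageBuilder().setContent("hello").build());
    
        producer.close();
        client.close();
        return 0;
    }
    ```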
    
    (cherry picked from commit 9577b841de2fa1d3597af39b3697567d8829ccdc)
---
 .../include/pulsar/ProducerConfiguration.h         |  21 ++
 .../include/pulsar/c/producer_configuration.h      |   6 +
 pulsar-client-cpp/lib/HandlerBase.cc               |  14 +-
 pulsar-client-cpp/lib/HandlerBase.h                |   1 +
 pulsar-client-cpp/lib/PartitionedProducerImpl.cc   |  91 ++++++--
 pulsar-client-cpp/lib/PartitionedProducerImpl.h    |   5 +-
 pulsar-client-cpp/lib/Producer.cc                  |   2 +-
 pulsar-client-cpp/lib/ProducerConfiguration.cc     |  10 +
 pulsar-client-cpp/lib/ProducerConfigurationImpl.h  |   1 +
 pulsar-client-cpp/lib/ProducerImpl.cc              | 117 ++++++++---
 pulsar-client-cpp/lib/ProducerImpl.h               |   5 +-
 pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc |  10 +
 pulsar-client-cpp/python/pulsar/__init__.py        |  14 ++
 pulsar-client-cpp/python/src/config.cc             |   2 +
 pulsar-client-cpp/tests/BasicEndToEndTest.cc       | 110 +++++++++-
 pulsar-client-cpp/tests/PartitionsUpdateTest.cc    |  44 ++--
 pulsar-client-cpp/tests/ProducerTest.cc            | 115 ++++++++++
 pulsar-client-cpp/tests/PulsarFriend.h             |   5 +
 site2/docs/client-libraries-cpp.md                 | 231 ++++++++++++++++++---
 19 files changed, 700 insertions(+), 104 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 3306b27..5c2792a 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -263,6 +263,27 @@ class PULSAR_PUBLIC ProducerConfiguration {
     HashingScheme getHashingScheme() const;
 
     /**
+     * This config affects producers of partitioned topics only. It controls whether
+     * producers register and connect immediately to the owner broker of each partition
+     * or start lazily on demand. The internal producer of one partition is always
+     * started eagerly, chosen by the routing policy, but the internal producers of
+     * any additional partitions are started on demand, upon receiving their first
+     * message.
+     * Using this mode can reduce the strain on brokers for topics with large numbers of
+     * partitions and when the SinglePartition routing policy is used without keyed messages.
+     * Because producer connection can be on demand, this can produce extra send latency
+     * for the first messages of a given partition.
+     * @param true/false as to whether to start partition producers lazily
+     * @return
+     */
+    ProducerConfiguration& setLazyStartPartitionedProducers(bool);
+
+    /**
+     * The getter associated with setLazyStartPartitionedProducers()
+     */
+    bool getLazyStartPartitionedProducers() const;
+
+    /**
      * The setter associated with getBlockIfQueueFull()
      */
     ProducerConfiguration& setBlockIfQueueFull(bool);
diff --git a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
index 17653d4..cf62baa 100644
--- a/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
+++ b/pulsar-client-cpp/include/pulsar/c/producer_configuration.h
@@ -144,6 +144,12 @@ PULSAR_PUBLIC void pulsar_producer_configuration_set_hashing_scheme(pulsar_produ
 PULSAR_PUBLIC pulsar_hashing_scheme
 pulsar_producer_configuration_get_hashing_scheme(pulsar_producer_configuration_t *conf);
 
+PULSAR_PUBLIC void pulsar_producer_configuration_set_lazy_start_partitioned_producers(
+    pulsar_producer_configuration_t *conf, int useLazyStartPartitionedProducers);
+
+PULSAR_PUBLIC int pulsar_producer_configuration_get_lazy_start_partitioned_producers(
+    pulsar_producer_configuration_t *conf);
+
 PULSAR_PUBLIC void pulsar_producer_configuration_set_block_if_queue_full(
     pulsar_producer_configuration_t *conf, int blockIfQueueFull);
 
diff --git a/pulsar-client-cpp/lib/HandlerBase.cc b/pulsar-client-cpp/lib/HandlerBase.cc
index de9929d..d7025ad 100644
--- a/pulsar-client-cpp/lib/HandlerBase.cc
+++ b/pulsar-client-cpp/lib/HandlerBase.cc
@@ -35,14 +35,23 @@ HandlerBase::HandlerBase(const ClientImplPtr& client, const std::string& topic,
       mutex_(),
       creationTimestamp_(TimeUtils::now()),
       operationTimeut_(seconds(client->conf().getOperationTimeoutSeconds())),
-      state_(Pending),
+      state_(NotStarted),
       backoff_(backoff),
       epoch_(0),
       timer_(executor_->createDeadlineTimer()) {}
 
 HandlerBase::~HandlerBase() { timer_->cancel(); }
 
-void HandlerBase::start() { grabCnx(); }
+void HandlerBase::start() {
+    Lock lock(mutex_);
+    // guard against concurrent state changes such as closing
+    if (state_ == NotStarted) {
+        state_ = Pending;
+        lock.unlock();
+
+        grabCnx();
+    }
+}
 
 void HandlerBase::grabCnx() {
     Lock lock(mutex_);
@@ -106,6 +115,7 @@ void HandlerBase::handleDisconnection(Result result, ClientConnectionWeakPtr con
             scheduleReconnection(handler);
             break;
 
+        case NotStarted:
         case Closing:
         case Closed:
         case Failed:
diff --git a/pulsar-client-cpp/lib/HandlerBase.h b/pulsar-client-cpp/lib/HandlerBase.h
index 93abba9..eeb8ebe 100644
--- a/pulsar-client-cpp/lib/HandlerBase.h
+++ b/pulsar-client-cpp/lib/HandlerBase.h
@@ -97,6 +97,7 @@ class HandlerBase {
 
     enum State
     {
+        NotStarted,
         Pending,
         Ready,
         Closing,
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index 4e01263..94bf353 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -87,13 +87,18 @@ unsigned int PartitionedProducerImpl::getNumPartitionsWithLock() const {
     return getNumPartitions();
 }
 
-ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition) const {
+ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition, bool lazy) {
     using namespace std::placeholders;
     std::string topicPartitionName = topicName_->getTopicPartitionName(partition);
     auto producer = std::make_shared<ProducerImpl>(client_, topicPartitionName, conf_, partition);
-    producer->getProducerCreatedFuture().addListener(
-        std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated,
-                  const_cast<PartitionedProducerImpl*>(this)->shared_from_this(), _1, _2, partition));
+
+    if (lazy) {
+        createLazyPartitionProducer(partition);
+    } else {
+        producer->getProducerCreatedFuture().addListener(
+            std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated,
+                      const_cast<PartitionedProducerImpl*>(this)->shared_from_this(), _1, _2, partition));
+    }
 
     LOG_DEBUG("Creating Producer for single Partition - " << topicPartitionName);
     return producer;
@@ -104,12 +109,28 @@ void PartitionedProducerImpl::start() {
     // create producer per partition
     // Here we don't need `producersMutex` to protect `producers_`, because `producers_` can only be increased
     // when `state_` is Ready
-    for (unsigned int i = 0; i < getNumPartitions(); i++) {
-        producers_.push_back(newInternalProducer(i));
-    }
 
-    for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
-        (*prod)->start();
+    if (conf_.getLazyStartPartitionedProducers()) {
+        // start one producer now, to ensure authz errors occur now
+        // if the SinglePartition router is used, then this producer will serve
+        // all non-keyed messages in the future
+        Message msg = MessageBuilder().setContent("x").build();
+        short partition = (short)(routerPolicy_->getPartition(msg, *topicMetadata_));
+
+        for (unsigned int i = 0; i < getNumPartitions(); i++) {
+            bool lazy = (short)i != partition;
+            producers_.push_back(newInternalProducer(i, lazy));
+        }
+
+        producers_[partition]->start();
+    } else {
+        for (unsigned int i = 0; i < getNumPartitions(); i++) {
+            producers_.push_back(newInternalProducer(i, false));
+        }
+
+        for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
+            (*prod)->start();
+        }
     }
 }
 
@@ -147,8 +168,27 @@ void PartitionedProducerImpl::handleSinglePartitionProducerCreated(Result result
     }
 }
 
+void PartitionedProducerImpl::createLazyPartitionProducer(unsigned int partitionIndex) {
+    const auto numPartitions = getNumPartitions();
+    assert(numProducersCreated_ <= numPartitions);
+    assert(partitionIndex <= numPartitions);
+    numProducersCreated_++;
+    if (numProducersCreated_ == numPartitions) {
+        state_ = Ready;
+        if (partitionsUpdateTimer_) {
+            runPartitionUpdateTask();
+        }
+        partitionedProducerCreatedPromise_.setValue(shared_from_this());
+    }
+}
+
 // override
 void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
+    if (!assertState(Ready)) {
+        callback(ResultAlreadyClosed, msg.getMessageId());
+        return;
+    }
+
     // get partition for this message from router policy
     Lock producersLock(producersMutex_);
     short partition = (short)(routerPolicy_->getPartition(msg, *topicMetadata_));
@@ -161,7 +201,14 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
     }
     // find a producer for that partition, index should start from 0
     ProducerImplPtr producer = producers_[partition];
+
+    // if the producer is not started (lazy producer), then kick-off the start process
+    if (!producer->isStarted()) {
+        producer->start();
+    }
+
     producersLock.unlock();
+
     // send message on that partition
     producer->sendAsync(msg, callback);
 }
@@ -175,6 +222,11 @@ void PartitionedProducerImpl::setState(const PartitionedProducerState state) {
     lock.unlock();
 }
 
+bool PartitionedProducerImpl::assertState(const PartitionedProducerState state) {
+    Lock lock(mutex_);
+    return state_ == state;
+}
+
 const std::string& PartitionedProducerImpl::getProducerName() const {
     Lock producersLock(producersMutex_);
     return producers_[0]->getProducerName();
@@ -285,7 +337,9 @@ bool PartitionedProducerImpl::isClosed() { return state_ == Closed; }
 void PartitionedProducerImpl::triggerFlush() {
     Lock producersLock(producersMutex_);
     for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
-        (*prod)->triggerFlush();
+        if ((*prod)->isStarted()) {
+            (*prod)->triggerFlush();
+        }
     }
 }
 
@@ -322,7 +376,11 @@ void PartitionedProducerImpl::flushAsync(FlushCallback callback) {
     };
 
     for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
-        (*prod)->flushAsync(subFlushCallback);
+        if ((*prod)->isStarted()) {
+            (*prod)->flushAsync(subFlushCallback);
+        } else {
+            subFlushCallback(ResultOk);
+        }
     }
 }
 
@@ -355,8 +413,11 @@ void PartitionedProducerImpl::handleGetPartitions(Result result,
             topicMetadata_.reset(new TopicMetadataImpl(newNumPartitions));
 
             for (unsigned int i = currentNumPartitions; i < newNumPartitions; i++) {
-                auto producer = newInternalProducer(i);
-                producer->start();
+                auto producer = newInternalProducer(i, conf_.getLazyStartPartitionedProducers());
+
+                if (!conf_.getLazyStartPartitionedProducers()) {
+                    producer->start();
+                }
                 producers_.push_back(producer);
             }
             // `runPartitionUpdateTask()` will be called in `handleSinglePartitionProducerCreated()`
@@ -379,8 +440,8 @@ bool PartitionedProducerImpl::isConnected() const {
     Lock producersLock(producersMutex_);
     const auto producers = producers_;
     producersLock.unlock();
-    for (const auto& producer : producers_) {
-        if (!producer->isConnected()) {
+    for (const auto& producer : producers) {
+        if (producer->isStarted() && !producer->isConnected()) {
             return false;
         }
     }
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.h b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
index c097190..60881f2 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
@@ -67,7 +67,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
     uint64_t getNumberOfConnectedProducer() override;
     void handleSinglePartitionProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr,
                                               const unsigned int partitionIndex);
-
+    void createLazyPartitionProducer(const unsigned int partitionIndex);
     void handleSinglePartitionProducerClose(Result result, const unsigned int partitionIndex,
                                             CloseCallback callback);
 
@@ -104,7 +104,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
     unsigned int getNumPartitions() const;
     unsigned int getNumPartitionsWithLock() const;
 
-    ProducerImplPtr newInternalProducer(unsigned int partition) const;
+    ProducerImplPtr newInternalProducer(unsigned int partition, bool lazy);
 
     MessageRoutingPolicyPtr routerPolicy_;
 
@@ -129,6 +129,7 @@ class PartitionedProducerImpl : public ProducerImplBase,
     void runPartitionUpdateTask();
     void getPartitionMetadata();
     void handleGetPartitions(const Result result, const LookupDataResultPtr& partitionMetadata);
+    bool assertState(const PartitionedProducerState state);
 };
 
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/Producer.cc b/pulsar-client-cpp/lib/Producer.cc
index 26f6ee8..acd021b 100644
--- a/pulsar-client-cpp/lib/Producer.cc
+++ b/pulsar-client-cpp/lib/Producer.cc
@@ -114,7 +114,7 @@ void Producer::flushAsync(FlushCallback callback) {
 void Producer::producerFailMessages(Result result) {
     if (impl_) {
         ProducerImpl* producerImpl = static_cast<ProducerImpl*>(impl_.get());
-        producerImpl->failPendingMessages(result);
+        producerImpl->failPendingMessages(result, true);
     }
 }
 
diff --git a/pulsar-client-cpp/lib/ProducerConfiguration.cc b/pulsar-client-cpp/lib/ProducerConfiguration.cc
index 61217f5..3e027ee 100644
--- a/pulsar-client-cpp/lib/ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/ProducerConfiguration.cc
@@ -204,6 +204,16 @@ ProducerConfiguration& ProducerConfiguration::addEncryptionKey(std::string key)
     return *this;
 }
 
+ProducerConfiguration& ProducerConfiguration::setLazyStartPartitionedProducers(
+    bool useLazyStartPartitionedProducers) {
+    impl_->useLazyStartPartitionedProducers = useLazyStartPartitionedProducers;
+    return *this;
+}
+
+bool ProducerConfiguration::getLazyStartPartitionedProducers() const {
+    return impl_->useLazyStartPartitionedProducers;
+}
+
 ProducerConfiguration& ProducerConfiguration::setSchema(const SchemaInfo& schemaInfo) {
     impl_->schemaInfo = schemaInfo;
     return *this;
diff --git a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
index fa6b755..a41b250 100644
--- a/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ProducerConfigurationImpl.h
@@ -37,6 +37,7 @@ struct ProducerConfigurationImpl {
     ProducerConfiguration::PartitionsRoutingMode routingMode{ProducerConfiguration::UseSinglePartition};
     MessageRoutingPolicyPtr messageRouter;
     ProducerConfiguration::HashingScheme hashingScheme{ProducerConfiguration::BoostHash};
+    bool useLazyStartPartitionedProducers{false};
     bool blockIfQueueFull{false};
     bool batchingEnabled{true};
     unsigned int batchingMaxMessages{1000};
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc
index c7a4551..f81e205 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -109,7 +109,7 @@ ProducerImpl::~ProducerImpl() {
     LOG_DEBUG(getName() << "~ProducerImpl");
     cancelTimers();
     printStats();
-    if (state_ == Ready) {
+    if (state_ == Ready || state_ == Pending) {
         LOG_WARN(getName() << "Destroyed producer which was not properly closed");
     }
 }
@@ -159,7 +159,11 @@ void ProducerImpl::connectionFailed(Result result) {
     // Keep a reference to ensure object is kept alive
     ProducerImplPtr ptr = shared_from_this();
 
-    if (producerCreatedPromise_.setFailed(result)) {
+    if (conf_.getLazyStartPartitionedProducers()) {
+        // if producers are lazy, then they should always try to restart
+        // so don't change the state and allow reconnections
+        return;
+    } else if (producerCreatedPromise_.setFailed(result)) {
         Lock lock(mutex_);
         state_ = Failed;
     }
@@ -169,12 +173,20 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
                                         const ResponseData& responseData) {
     LOG_DEBUG(getName() << "ProducerImpl::handleCreateProducer res: " << strResult(result));
 
+    // make sure we're still in the Pending/Ready state, closeAsync could have been invoked
+    // while waiting for this response if using lazy producers
+    Lock lock(mutex_);
+    if (state_ != Ready && state_ != Pending) {
+        LOG_DEBUG("Producer created response received but producer already closed");
+        failPendingMessages(ResultAlreadyClosed, false);
+        return;
+    }
+
     if (result == ResultOk) {
         // We are now reconnected to broker and clear to send messages. Re-send all pending messages and
         // set the cnx pointer so that new messages will be sent immediately
         LOG_INFO(getName() << "Created producer on broker " << cnx->cnxString());
 
-        Lock lock(mutex_);
         cnx->registerProducer(producerId_, shared_from_this());
         producerName_ = responseData.producerName;
         schemaVersion_ = responseData.schemaVersion;
@@ -197,19 +209,16 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
                                                    shared_from_this(), std::placeholders::_1));
         }
 
-        // Initialize the sendTimer only once per producer and only when producer timeout is
-        // configured. Set the timeout as configured value and asynchronously wait for the
-        // timeout to happen.
-        if (!sendTimer_ && conf_.getSendTimeout() > 0) {
-            sendTimer_ = executor_->createDeadlineTimer();
-            sendTimer_->expires_from_now(milliseconds(conf_.getSendTimeout()));
-            sendTimer_->async_wait(
-                std::bind(&ProducerImpl::handleSendTimeout, shared_from_this(), std::placeholders::_1));
+        // if the producer is lazy the send timeout timer is already running
+        if (!conf_.getLazyStartPartitionedProducers()) {
+            startSendTimeoutTimer();
         }
 
         producerCreatedPromise_.setValue(shared_from_this());
 
     } else {
+        lock.unlock();
+
         // Producer creation failed
         if (result == ResultTimeout) {
             // Creating the producer has timed out. We need to ensure the broker closes the producer
@@ -222,7 +231,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
         if (producerCreatedPromise_.isComplete()) {
             if (result == ResultProducerBlockedQuotaExceededException) {
                 LOG_WARN(getName() << "Backlog is exceeded on topic. Sending exception to producer");
-                failPendingMessages(ResultProducerBlockedQuotaExceededException);
+                failPendingMessages(ResultProducerBlockedQuotaExceededException, true);
             } else if (result == ResultProducerBlockedQuotaExceededError) {
                 LOG_WARN(getName() << "Producer is blocked on creation because backlog is exceeded on topic");
             }
@@ -237,6 +246,7 @@ void ProducerImpl::handleCreateProducer(const ClientConnectionPtr& cnx, Result r
                 scheduleReconnection(shared_from_this());
             } else {
                 LOG_ERROR(getName() << "Failed to create producer: " << strResult(result));
+                failPendingMessages(result, true);
                 producerCreatedPromise_.setFailed(result);
                 Lock lock(mutex_);
                 state_ = Failed;
@@ -276,8 +286,12 @@ std::shared_ptr<ProducerImpl::PendingCallbacks> ProducerImpl::getPendingCallback
     return getPendingCallbacksWhenFailed();
 }
 
-void ProducerImpl::failPendingMessages(Result result) {
-    getPendingCallbacksWhenFailedWithLock()->complete(result);
+void ProducerImpl::failPendingMessages(Result result, bool withLock) {
+    if (withLock) {
+        getPendingCallbacksWhenFailedWithLock()->complete(result);
+    } else {
+        getPendingCallbacksWhenFailed()->complete(result);
+    }
 }
 
 void ProducerImpl::resendMessages(ClientConnectionPtr cnx) {
@@ -320,9 +334,14 @@ void ProducerImpl::statsCallBackHandler(Result res, const MessageId& msgId, Send
 void ProducerImpl::flushAsync(FlushCallback callback) {
     if (batchMessageContainer_) {
         Lock lock(mutex_);
-        auto failures = batchMessageAndSend(callback);
-        lock.unlock();
-        failures.complete();
+
+        if (state_ == Ready) {
+            auto failures = batchMessageAndSend(callback);
+            lock.unlock();
+            failures.complete();
+        } else {
+            callback(ResultAlreadyClosed);
+        }
     } else {
         callback(ResultOk);
     }
@@ -331,9 +350,11 @@ void ProducerImpl::flushAsync(FlushCallback callback) {
 void ProducerImpl::triggerFlush() {
     if (batchMessageContainer_) {
         Lock lock(mutex_);
-        auto failures = batchMessageAndSend();
-        lock.unlock();
-        failures.complete();
+        if (state_ == Ready) {
+            auto failures = batchMessageAndSend();
+            lock.unlock();
+            failures.complete();
+        }
     }
 }
 
@@ -389,7 +410,8 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
     }
 
     Lock lock(mutex_);
-    if (state_ != Ready) {
+    // producers may be lazily starting and be in the pending state
+    if (state_ != Ready && state_ != Pending) {
         lock.unlock();
         releaseSemaphore(payloadSize);
         cb(ResultAlreadyClosed, msg.getMessageId());
@@ -550,10 +572,14 @@ void ProducerImpl::batchMessageTimeoutHandler(const boost::system::error_code& e
         return;
     }
     LOG_DEBUG(getName() << " - Batch Message Timer expired");
+
+    // ignore if the producer is already closing/closed
     Lock lock(mutex_);
-    auto failures = batchMessageAndSend();
-    lock.unlock();
-    failures.complete();
+    if (state_ == Pending || state_ == Ready) {
+        auto failures = batchMessageAndSend();
+        lock.unlock();
+        failures.complete();
+    }
 }
 
 void ProducerImpl::printStats() {
@@ -568,16 +594,28 @@ void ProducerImpl::printStats() {
 void ProducerImpl::closeAsync(CloseCallback callback) {
     Lock lock(mutex_);
 
+    // if the producer was never started then there is nothing to clean up
+    if (state_ == NotStarted) {
+        state_ = Closed;
+        callback(ResultOk);
+        return;
+    }
+
     // Keep a reference to ensure object is kept alive
     ProducerImplPtr ptr = shared_from_this();
 
     cancelTimers();
 
-    if (state_ != Ready) {
+    // ensure any remaining send callbacks are called before calling the close callback
+    failPendingMessages(ResultAlreadyClosed, false);
+
+    if (state_ != Ready && state_ != Pending) {
+        state_ = Closed;
         lock.unlock();
         if (callback) {
             callback(ResultAlreadyClosed);
         }
+
         return;
     }
     LOG_INFO(getName() << "Closing producer for topic " << topic_);
@@ -631,6 +669,7 @@ void ProducerImpl::handleClose(Result result, ResultCallback callback, ProducerI
     } else {
         LOG_ERROR(getName() << "Failed to close producer: " << strResult(result));
     }
+
     if (callback) {
         callback(result);
     }
@@ -644,7 +683,7 @@ uint64_t ProducerImpl::getProducerId() const { return producerId_; }
 
 void ProducerImpl::handleSendTimeout(const boost::system::error_code& err) {
     Lock lock(mutex_);
-    if (state_ != Ready) {
+    if (state_ != Pending && state_ != Ready) {
         return;
     }
 
@@ -786,7 +825,15 @@ void ProducerImpl::disconnectProducer() {
     scheduleReconnection(shared_from_this());
 }
 
-void ProducerImpl::start() { HandlerBase::start(); }
+void ProducerImpl::start() {
+    HandlerBase::start();
+
+    if (conf_.getLazyStartPartitionedProducers()) {
+        // we need to kick it off now as it is possible that the connection may take
+        // longer than sendTimeout to connect
+        startSendTimeoutTimer();
+    }
+}
 
 void ProducerImpl::shutdown() {
     Lock lock(mutex_);
@@ -828,5 +875,21 @@ bool ProducerImpl::isConnected() const {
 
 uint64_t ProducerImpl::getNumberOfConnectedProducer() { return isConnected() ? 1 : 0; }
 
+bool ProducerImpl::isStarted() const {
+    Lock lock(mutex_);
+    return state_ != NotStarted;
+}
+void ProducerImpl::startSendTimeoutTimer() {
+    // Initialize the sendTimer only once per producer and only when producer timeout is
+    // configured. Set the timeout as configured value and asynchronously wait for the
+    // timeout to happen.
+    if (!sendTimer_ && conf_.getSendTimeout() > 0) {
+        sendTimer_ = executor_->createDeadlineTimer();
+        sendTimer_->expires_from_now(milliseconds(conf_.getSendTimeout()));
+        sendTimer_->async_wait(
+            std::bind(&ProducerImpl::handleSendTimeout, shared_from_this(), std::placeholders::_1));
+    }
+}
+
 }  // namespace pulsar
 /* namespace pulsar */
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h
index 2c51d41..d29efed 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -71,6 +71,7 @@ class ProducerImpl : public HandlerBase,
     void flushAsync(FlushCallback callback) override;
     bool isConnected() const override;
     uint64_t getNumberOfConnectedProducer() override;
+    bool isStarted() const;
 
     bool removeCorruptMessage(uint64_t sequenceId);
 
@@ -93,6 +94,8 @@ class ProducerImpl : public HandlerBase,
 
     void batchMessageTimeoutHandler(const boost::system::error_code& ec);
 
+    void startSendTimeoutTimer();
+
     friend class PulsarFriend;
 
     friend class Producer;
@@ -159,7 +162,7 @@ class ProducerImpl : public HandlerBase,
     std::shared_ptr<PendingCallbacks> getPendingCallbacksWhenFailed();
     std::shared_ptr<PendingCallbacks> getPendingCallbacksWhenFailedWithLock();
 
-    void failPendingMessages(Result result);
+    void failPendingMessages(Result result, bool withLock);
 
     MessageCryptoPtr msgCrypto_;
     DeadlineTimerPtr dataKeyGenTImer_;
diff --git a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
index 7bc7915..f26f63a 100644
--- a/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
+++ b/pulsar-client-cpp/lib/c/c_ProducerConfiguration.cc
@@ -135,6 +135,16 @@ void pulsar_producer_configuration_set_message_router(pulsar_producer_configurat
     conf->conf.setMessageRouter(std::make_shared<MessageRoutingPolicy>(router, ctx));
 }
 
+void pulsar_producer_configuration_set_lazy_start_partitioned_producers(
+    pulsar_producer_configuration_t *conf, int useLazyStartPartitionedProducers) {
+    conf->conf.setLazyStartPartitionedProducers(useLazyStartPartitionedProducers);
+}
+
+int pulsar_producer_configuration_get_lazy_start_partitioned_producers(
+    pulsar_producer_configuration_t *conf) {
+    return conf->conf.getLazyStartPartitionedProducers();
+}
+
 void pulsar_producer_configuration_set_block_if_queue_full(pulsar_producer_configuration_t *conf,
                                                            int blockIfQueueFull) {
     conf->conf.setBlockIfQueueFull(blockIfQueueFull);
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py
index 9570cbe..18891c7 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -464,6 +464,7 @@ class Client:
                         batching_max_allowed_size_in_bytes=128*1024,
                         batching_max_publish_delay_ms=10,
                         message_routing_mode=PartitionsRoutingMode.RoundRobinDistribution,
+                        lazy_start_partitioned_producers=False,
                         properties=None,
                         batching_type=BatchingType.Default,
                         encryption_key=None,
@@ -518,6 +519,17 @@ class Client:
         * `message_routing_mode`:
           Set the message routing mode for the partitioned producer. Default is `PartitionsRoutingMode.RoundRobinDistribution`,
           other option is `PartitionsRoutingMode.UseSinglePartition`
+        * `lazy_start_partitioned_producers`:
+          This config affects producers of partitioned topics only. It controls whether
+          producers register and connect immediately to the owner broker of each partition
+          or start lazily on demand. The internal producer of one partition is always
+          started eagerly, chosen by the routing policy, but the internal producers of
+          any additional partitions are started on demand, upon receiving their first
+          message.
+          Using this mode can reduce the strain on brokers for topics with large numbers of
+          partitions and when the SinglePartition routing policy is used without keyed messages.
+          Because producer connection can be on demand, this can produce extra send latency
+          for the first messages of a given partition.
         * `properties`:
           Sets the properties for the producer. The properties associated with a producer
           can be used for identify a producer at broker side.
@@ -558,6 +570,7 @@ class Client:
         _check_type(BatchingType, batching_type, 'batching_type')
         _check_type_or_none(str, encryption_key, 'encryption_key')
         _check_type_or_none(CryptoKeyReader, crypto_key_reader, 'crypto_key_reader')
+        _check_type(bool, lazy_start_partitioned_producers, 'lazy_start_partitioned_producers')
 
         conf = _pulsar.ProducerConfiguration()
         conf.send_timeout_millis(send_timeout_millis)
@@ -571,6 +584,7 @@ class Client:
         conf.batching_max_publish_delay_ms(batching_max_publish_delay_ms)
         conf.partitions_routing_mode(message_routing_mode)
         conf.batching_type(batching_type)
+        conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers)
         if producer_name:
             conf.producer_name(producer_name)
         if initial_sequence_id:
diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc
index c34e818..f9b9c3c 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -245,6 +245,8 @@ void export_config() {
             .def("block_if_queue_full", &ProducerConfiguration::setBlockIfQueueFull, return_self<>())
             .def("partitions_routing_mode", &ProducerConfiguration::getPartitionsRoutingMode)
             .def("partitions_routing_mode", &ProducerConfiguration::setPartitionsRoutingMode, return_self<>())
+            .def("lazy_start_partitioned_producers", &ProducerConfiguration::getLazyStartPartitionedProducers)
+            .def("lazy_start_partitioned_producers", &ProducerConfiguration::setLazyStartPartitionedProducers, return_self<>())
             .def("batching_enabled", &ProducerConfiguration::getBatchingEnabled, return_value_policy<copy_const_reference>())
             .def("batching_enabled", &ProducerConfiguration::setBatchingEnabled, return_self<>())
             .def("batching_max_messages", &ProducerConfiguration::getBatchingMaxMessages, return_value_policy<copy_const_reference>())
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 8e7eb6b..f3ff78a 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -521,20 +521,21 @@ TEST(BasicEndToEndTest, testInvalidUrlPassed) {
     ASSERT_EQ(ResultConnectError, result);
 }
 
-TEST(BasicEndToEndTest, testPartitionedProducerConsumer) {
+void testPartitionedProducerConsumer(bool lazyStartPartitionedProducers, std::string topicName) {
     Client client(lookupUrl);
-    std::string topicName = "testPartitionedProducerConsumer";
 
     // call admin api to make it partitioned
-    std::string url =
-        adminUrl + "admin/v2/persistent/public/default/testPartitionedProducerConsumer/partitions";
+    std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions";
+    makeDeleteRequest(url);
     int res = makePutRequest(url, "3");
 
     LOG_INFO("res = " << res);
     ASSERT_FALSE(res != 204 && res != 409);
 
+    ProducerConfiguration conf;
+    conf.setLazyStartPartitionedProducers(lazyStartPartitionedProducers);
     Producer producer;
-    Result result = client.createProducer(topicName, producer);
+    Result result = client.createProducer(topicName, conf, producer);
     ASSERT_EQ(ResultOk, result);
 
     Consumer consumer;
@@ -561,6 +562,14 @@ TEST(BasicEndToEndTest, testPartitionedProducerConsumer) {
     client.shutdown();
 }
 
+TEST(BasicEndToEndTest, testPartitionedProducerConsumer) {
+    testPartitionedProducerConsumer(false, "testPartitionedProducerConsumer");
+}
+
+TEST(BasicEndToEndTest, testPartitionedLazyProducerConsumer) {
+    testPartitionedProducerConsumer(true, "testPartitionedProducerConsumerLazy");
+}
+
 TEST(BasicEndToEndTest, testPartitionedProducerConsumerSubscriptionName) {
     Client client(lookupUrl);
     std::string topicName = "testPartitionedProducerConsumerSubscriptionName" + unique_str();
@@ -1247,6 +1256,7 @@ TEST(BasicEndToEndTest, testHandlerReconnectionLogic) {
             oldConnections.push_back(clientConnectionPtr);
             clientConnectionPtr->close();
         }
+        LOG_INFO("checking message " << i);
         ASSERT_EQ(producer.send(msg), ResultOk);
     }
 
@@ -1280,6 +1290,61 @@ TEST(BasicEndToEndTest, testHandlerReconnectionLogic) {
     }
 }
 
+void testHandlerReconnectionPartitionProducers(bool lazyStartPartitionedProducers, bool batchingEnabled) {
+    Client client(adminUrl);
+    std::string uniqueChunk = unique_str();
+    std::string topicName = "testHandlerReconnectionLogicLazyProducers" + uniqueChunk;
+
+    std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions";
+    int res = makePutRequest(url, "1");
+
+    ProducerConfiguration producerConf;
+    producerConf.setSendTimeout(10000);
+    producerConf.setLazyStartPartitionedProducers(lazyStartPartitionedProducers);
+    producerConf.setBatchingEnabled(batchingEnabled);
+    Producer producer;
+
+    ASSERT_EQ(client.createProducer(topicName, producerConf, producer), ResultOk);
+
+    std::vector<ClientConnectionPtr> oldConnections;
+
+    int numOfMessages = 10;
+    std::string propertyName = "msgIndex";
+    for (int i = 0; i < numOfMessages; i++) {
+        std::string messageContent = "msg-" + std::to_string(i);
+        Message msg =
+            MessageBuilder().setContent(messageContent).setProperty(propertyName, std::to_string(i)).build();
+        if (i % 3 == 1) {
+            ProducerImpl &pImpl = PulsarFriend::getInternalProducerImpl(producer, 0);
+            ClientConnectionPtr clientConnectionPtr;
+            do {
+                ClientConnectionWeakPtr clientConnectionWeakPtr = PulsarFriend::getClientConnection(pImpl);
+                clientConnectionPtr = clientConnectionWeakPtr.lock();
+                std::this_thread::sleep_for(std::chrono::seconds(1));
+            } while (!clientConnectionPtr);
+            oldConnections.push_back(clientConnectionPtr);
+            clientConnectionPtr->close();
+        }
+        ASSERT_EQ(producer.send(msg), ResultOk);
+    }
+}
+
+TEST(BasicEndToEndTest, testHandlerReconnectionPartitionedProducersWithoutBatching) {
+    testHandlerReconnectionPartitionProducers(false, false);
+}
+
+TEST(BasicEndToEndTest, testHandlerReconnectionPartitionedProducersWithBatching) {
+    testHandlerReconnectionPartitionProducers(false, true);
+}
+
+TEST(BasicEndToEndTest, testHandlerReconnectionLazyPartitionedProducersWithoutBatching) {
+    testHandlerReconnectionPartitionProducers(true, false);
+}
+
+TEST(BasicEndToEndTest, testHandlerReconnectionLazyPartitionedProducersWithBatching) {
+    testHandlerReconnectionPartitionProducers(true, true);
+}
+
 TEST(BasicEndToEndTest, testRSAEncryption) {
     ClientConfiguration config;
     Client client(lookupUrl);
@@ -2027,12 +2092,16 @@ TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerPubSub) {
     std::string url4 =
         adminUrl + "admin/v2/persistent/public/default/patternMultiTopicsNotMatchPubSub4/partitions";
 
+    makeDeleteRequest(url1);
     int res = makePutRequest(url1, "2");
     ASSERT_FALSE(res != 204 && res != 409);
+    makeDeleteRequest(url2);
     res = makePutRequest(url2, "3");
     ASSERT_FALSE(res != 204 && res != 409);
+    makeDeleteRequest(url3);
     res = makePutRequest(url3, "4");
     ASSERT_FALSE(res != 204 && res != 409);
+    makeDeleteRequest(url4);
     res = makePutRequest(url4, "4");
     ASSERT_FALSE(res != 204 && res != 409);
 
@@ -2136,10 +2205,13 @@ TEST(BasicEndToEndTest, testpatternMultiTopicsHttpConsumerPubSub) {
     std::string url3 =
         adminUrl + "admin/v2/persistent/public/default/patternMultiTopicsHttpConsumerPubSub3/partitions";
 
+    makeDeleteRequest(url1);
     int res = makePutRequest(url1, "2");
     ASSERT_FALSE(res != 204 && res != 409);
+    makeDeleteRequest(url2);
     res = makePutRequest(url2, "3");
     ASSERT_FALSE(res != 204 && res != 409);
+    makeDeleteRequest(url3);
     res = makePutRequest(url3, "4");
     ASSERT_FALSE(res != 204 && res != 409);
 
@@ -2262,6 +2334,7 @@ TEST(BasicEndToEndTest, testPatternMultiTopicsConsumerAutoDiscovery) {
     auto createProducer = [&client](Producer &producer, const std::string &topic, int numPartitions) {
         if (numPartitions > 0) {
             const std::string url = adminUrl + "admin/v2/persistent/public/default/" + topic + "/partitions";
+            makeDeleteRequest(url);
             int res = makePutRequest(url, std::to_string(numPartitions));
             ASSERT_TRUE(res == 204 || res == 409);
         }
@@ -2446,7 +2519,7 @@ static void simpleCallback(Result code, const MessageId &msgId) {
     LOG_INFO("Received code: " << code << " -- MsgID: " << msgId);
 }
 
-TEST(BasicEndToEndTest, testSyncFlushBatchMessagesPartitionedTopic) {
+void testSyncFlushBatchMessagesPartitionedTopic(bool lazyStartPartitionedProducers) {
     Client client(lookupUrl);
     std::string uniqueChunk = unique_str();
     std::string topicName = "persistent://public/default/partition-testSyncFlushBatchMessages" + uniqueChunk;
@@ -2461,6 +2534,8 @@ TEST(BasicEndToEndTest, testSyncFlushBatchMessagesPartitionedTopic) {
 
     Producer producer;
     int numOfMessages = 20;
+    // lazy partitioned producers make a single call to the message router during createProducer
+    int initPart = lazyStartPartitionedProducers ? 1 : 0;
     ProducerConfiguration tempProducerConfiguration;
     tempProducerConfiguration.setMessageRouter(std::make_shared<SimpleRoundRobinRoutingPolicy>());
     ProducerConfiguration producerConfiguration = tempProducerConfiguration;
@@ -2468,6 +2543,7 @@ TEST(BasicEndToEndTest, testSyncFlushBatchMessagesPartitionedTopic) {
     // set batch message number numOfMessages, and max delay 60s
     producerConfiguration.setBatchingMaxMessages(numOfMessages / numberOfPartitions);
     producerConfiguration.setBatchingMaxPublishDelayMs(60000);
+    producerConfiguration.setLazyStartPartitionedProducers(lazyStartPartitionedProducers);
 
     Result result = client.createProducer(topicName, producerConfiguration, producer);
     ASSERT_EQ(ResultOk, result);
@@ -2509,7 +2585,7 @@ TEST(BasicEndToEndTest, testSyncFlushBatchMessagesPartitionedTopic) {
     LOG_INFO("sending first part messages in async, should timeout to receive");
 
     Message m;
-    ASSERT_EQ(ResultTimeout, consumer[0].receive(m, 5000));
+    ASSERT_EQ(ResultTimeout, consumer[initPart].receive(m, 5000));
 
     for (int i = numOfMessages / numberOfPartitions / 2; i < numOfMessages; i++) {
         std::string messageContent = prefix + std::to_string(i);
@@ -2533,16 +2609,25 @@ TEST(BasicEndToEndTest, testSyncFlushBatchMessagesPartitionedTopic) {
         std::string messageContent = prefix + std::to_string(i);
         Message msg =
             MessageBuilder().setContent(messageContent).setProperty("msgIndex", std::to_string(i)).build();
-        producer.send(msg);
+        ASSERT_EQ(ResultOk, producer.send(msg));
         LOG_DEBUG("sync sending message " << messageContent);
     }
+
     LOG_INFO("sending first part messages in sync, should not timeout to receive");
-    ASSERT_EQ(ResultOk, consumer[0].receive(m, 5000));
+    ASSERT_EQ(ResultOk, consumer[initPart].receive(m, 10000));
 
     producer.close();
     client.shutdown();
 }
 
+TEST(BasicEndToEndTest, testSyncFlushBatchMessagesPartitionedTopic) {
+    testSyncFlushBatchMessagesPartitionedTopic(false);
+}
+
+TEST(BasicEndToEndTest, testSyncFlushBatchMessagesPartitionedTopicLazyProducers) {
+    testSyncFlushBatchMessagesPartitionedTopic(true);
+}
+
 TEST(BasicEndToEndTest, testGetTopicPartitions) {
     Client client(lookupUrl);
     std::string topicName = "persistent://public/default/testGetPartitions";
@@ -2660,7 +2745,7 @@ TEST(BasicEndToEndTest, testFlushInProducer) {
     client.shutdown();
 }
 
-TEST(BasicEndToEndTest, testFlushInPartitionedProducer) {
+void testFlushInPartitionedProducer(bool lazyStartPartitionedProducers) {
     Client client(lookupUrl);
     std::string uniqueChunk = unique_str();
     std::string topicName =
@@ -2685,6 +2770,7 @@ TEST(BasicEndToEndTest, testFlushInPartitionedProducer) {
     producerConfiguration.setBatchingMaxMessages(numOfMessages / numberOfPartitions);
     producerConfiguration.setBatchingMaxPublishDelayMs(60000);
     producerConfiguration.setMessageRouter(std::make_shared<SimpleRoundRobinRoutingPolicy>());
+    producerConfiguration.setLazyStartPartitionedProducers(lazyStartPartitionedProducers);
 
     Result result = client.createProducer(topicName, producerConfiguration, producer);
     ASSERT_EQ(ResultOk, result);
@@ -2763,6 +2849,10 @@ TEST(BasicEndToEndTest, testFlushInPartitionedProducer) {
     client.shutdown();
 }
 
+TEST(BasicEndToEndTest, testFlushInPartitionedProducer) { testFlushInPartitionedProducer(false); }
+
+TEST(BasicEndToEndTest, testFlushInLazyPartitionedProducer) { testFlushInPartitionedProducer(true); }
+
 TEST(BasicEndToEndTest, testReceiveAsync) {
     ClientConfiguration config;
     Client client(lookupUrl);
diff --git a/pulsar-client-cpp/tests/PartitionsUpdateTest.cc b/pulsar-client-cpp/tests/PartitionsUpdateTest.cc
index af473a2..845e447 100644
--- a/pulsar-client-cpp/tests/PartitionsUpdateTest.cc
+++ b/pulsar-client-cpp/tests/PartitionsUpdateTest.cc
@@ -32,11 +32,6 @@ using namespace pulsar;
 static const std::string serviceUrl = "pulsar://localhost:6650";
 static const std::string adminUrl = "http://localhost:8080/";
 
-static const std::string topicNameSuffix = "public/default/partitions-update-test-topic";
-static const std::string topicName = "persistent://" + topicNameSuffix;
-static const std::string topicOperateUrl =
-    adminUrl + "admin/v2/persistent/" + topicNameSuffix + "/partitions";
-
 static ClientConfiguration newClientConfig(bool enablePartitionsUpdate) {
     ClientConfiguration clientConfig;
     if (enablePartitionsUpdate) {
@@ -55,14 +50,16 @@ class PartitionsSet {
    public:
     size_t size() const { return names_.size(); }
 
-    Result initProducer(bool enablePartitionsUpdate) {
+    Result initProducer(std::string topicName, bool enablePartitionsUpdate,
+                        bool lazyStartPartitionedProducers) {
         clientForProducer_.reset(new Client(serviceUrl, newClientConfig(enablePartitionsUpdate)));
-        const auto producerConfig =
-            ProducerConfiguration().setMessageRouter(std::make_shared<SimpleRoundRobinRoutingPolicy>());
+        const auto producerConfig = ProducerConfiguration()
+                                        .setMessageRouter(std::make_shared<SimpleRoundRobinRoutingPolicy>())
+                                        .setLazyStartPartitionedProducers(lazyStartPartitionedProducers);
         return clientForProducer_->createProducer(topicName, producerConfig, producer_);
     }
 
-    Result initConsumer(bool enablePartitionsUpdate) {
+    Result initConsumer(std::string topicName, bool enablePartitionsUpdate) {
         clientForConsumer_.reset(new Client(serviceUrl, newClientConfig(enablePartitionsUpdate)));
         return clientForConsumer_->subscribe(topicName, "SubscriptionName", consumer_);
     }
@@ -118,7 +115,10 @@ TEST(PartitionsUpdateTest, testConfigPartitionsUpdateInterval) {
     ASSERT_EQ(static_cast<unsigned int>(-1), clientConfig.getPartitionsUpdateInterval());
 }
 
-TEST(PartitionsUpdateTest, testPartitionsUpdate) {
+void testPartitionsUpdate(bool lazyStartPartitionedProducers, std::string topicNameSuffix) {
+    std::string topicName = "persistent://" + topicNameSuffix;
+    std::string topicOperateUrl = adminUrl + "admin/v2/persistent/" + topicNameSuffix + "/partitions";
+
     // Ensure `topicName` doesn't exist before created
     makeDeleteRequest(topicOperateUrl);
     // Create a 2 partitions topic
@@ -128,8 +128,8 @@ TEST(PartitionsUpdateTest, testPartitionsUpdate) {
     PartitionsSet partitionsSet;
 
     // 1. Both producer and consumer enable partitions update
-    ASSERT_EQ(ResultOk, partitionsSet.initProducer(true));
-    ASSERT_EQ(ResultOk, partitionsSet.initConsumer(true));
+    ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, true, lazyStartPartitionedProducers));
+    ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, true));
 
     res = makePostRequest(topicOperateUrl, "3");  // update partitions to 3
     ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
@@ -140,8 +140,8 @@ TEST(PartitionsUpdateTest, testPartitionsUpdate) {
     partitionsSet.close();
 
     // 2. Only producer enables partitions update
-    ASSERT_EQ(ResultOk, partitionsSet.initProducer(true));
-    ASSERT_EQ(ResultOk, partitionsSet.initConsumer(false));
+    ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, true, false));
+    ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, false));
 
     res = makePostRequest(topicOperateUrl, "5");  // update partitions to 5
     ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
@@ -152,8 +152,8 @@ TEST(PartitionsUpdateTest, testPartitionsUpdate) {
     partitionsSet.close();
 
     // 3. Only consumer enables partitions update
-    ASSERT_EQ(ResultOk, partitionsSet.initProducer(false));
-    ASSERT_EQ(ResultOk, partitionsSet.initConsumer(true));
+    ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, false, false));
+    ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, true));
 
     res = makePostRequest(topicOperateUrl, "7");  // update partitions to 7
     ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
@@ -164,8 +164,8 @@ TEST(PartitionsUpdateTest, testPartitionsUpdate) {
     partitionsSet.close();
 
     // 4. Both producer and consumer disables partitions update
-    ASSERT_EQ(ResultOk, partitionsSet.initProducer(false));
-    ASSERT_EQ(ResultOk, partitionsSet.initConsumer(false));
+    ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, false, false));
+    ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, false));
 
     res = makePostRequest(topicOperateUrl, "10");  // update partitions to 10
     ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
@@ -175,3 +175,11 @@ TEST(PartitionsUpdateTest, testPartitionsUpdate) {
     ASSERT_EQ(7, partitionsSet.size());
     partitionsSet.close();
 }
+
+TEST(PartitionsUpdateTest, testPartitionsUpdate) {
+    testPartitionsUpdate(false, "public/default/partitions-update-test-topic");
+}
+
+TEST(PartitionsUpdateTest, testPartitionsUpdateWithLazyProducers) {
+    testPartitionsUpdate(true, "public/default/partitions-update-test-topic-lazy");
+}
diff --git a/pulsar-client-cpp/tests/ProducerTest.cc b/pulsar-client-cpp/tests/ProducerTest.cc
index 61cac57..210f013 100644
--- a/pulsar-client-cpp/tests/ProducerTest.cc
+++ b/pulsar-client-cpp/tests/ProducerTest.cc
@@ -18,11 +18,13 @@
  */
 #include <pulsar/Client.h>
 #include <gtest/gtest.h>
+#include <thread>
 
 #include "HttpHelper.h"
 
 #include "lib/Future.h"
 #include "lib/Utils.h"
+#include "lib/Latch.h"
 #include "lib/LogUtils.h"
 DECLARE_LOG_OBJECT()
 
@@ -126,3 +128,116 @@ TEST(ProducerTest, testIsConnected) {
 
     client.close();
 }
+
+TEST(ProducerTest, testSendAsyncAfterCloseAsyncWithLazyProducers) {
+    Client client(serviceUrl);
+    const std::string partitionedTopic =
+        "testProducerIsConnectedPartitioned-" + std::to_string(time(nullptr));
+
+    int res = makePutRequest(
+        adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions", "10");
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    ProducerConfiguration producerConfiguration;
+    producerConfiguration.setLazyStartPartitionedProducers(true);
+    Producer producer;
+    ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic, producerConfiguration, producer));
+
+    Message msg = MessageBuilder().setContent("test").build();
+
+    Promise<bool, Result> promiseClose;
+    producer.closeAsync(WaitForCallback(promiseClose));
+
+    Promise<Result, MessageId> promise;
+    producer.sendAsync(msg, WaitForCallbackValue<MessageId>(promise));
+
+    MessageId mi;
+    ASSERT_EQ(ResultAlreadyClosed, promise.getFuture().get(mi));
+
+    Result result;
+    promiseClose.getFuture().get(result);
+    ASSERT_EQ(ResultOk, result);
+}
+
+TEST(ProducerTest, testSendAsyncCloseAsyncConcurrentlyWithLazyProducers) {
+    // run sendAsync and closeAsync concurrently and verify that all sendAsync callbacks are called
+    // and that messages sent after closeAsync is invoked receive ResultAlreadyClosed.
+    for (int run = 0; run < 20; run++) {
+        LOG_INFO("Start of run " << run);
+        Client client(serviceUrl);
+        const std::string partitionedTopic =
+            "testProducerIsConnectedPartitioned-" + std::to_string(time(nullptr));
+
+        int res = makePutRequest(
+            adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions", "10");
+        ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+        ProducerConfiguration producerConfiguration;
+        producerConfiguration.setLazyStartPartitionedProducers(true);
+        producerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
+        producerConfiguration.setBatchingEnabled(true);
+        Producer producer;
+        ASSERT_EQ(ResultOk, client.createProducer(partitionedTopic, producerConfiguration, producer));
+
+        int sendCount = 100;
+        std::vector<Promise<Result, MessageId>> promises(sendCount);
+        Promise<bool, Result> promiseClose;
+
+        // only call closeAsync once at least 10 messages have been sent
+        Latch sendStartLatch(10);
+        Latch closeLatch(1);
+        int closedAt = 0;
+
+        std::thread t1([&]() {
+            for (int i = 0; i < sendCount; i++) {
+                sendStartLatch.countdown();
+                Message msg = MessageBuilder().setContent("test").build();
+
+                if (closeLatch.getCount() == 0 && closedAt == 0) {
+                    closedAt = i;
+                    LOG_INFO("closedAt set to " << closedAt)
+                }
+
+                producer.sendAsync(msg, WaitForCallbackValue<MessageId>(promises[i]));
+                std::this_thread::sleep_for(std::chrono::milliseconds(1));
+            }
+        });
+
+        std::thread t2([&]() {
+            sendStartLatch.wait(std::chrono::milliseconds(1000));
+            LOG_INFO("Closing");
+            producer.closeAsync(WaitForCallback(promiseClose));
+            LOG_INFO("Close called");
+            closeLatch.countdown();
+            Result result;
+            promiseClose.getFuture().get(result);
+            ASSERT_EQ(ResultOk, result);
+            LOG_INFO("Closed");
+        });
+
+        t1.join();
+        t2.join();
+
+        // make sure that all messages after the moment when closeAsync was invoked
+        // return AlreadyClosed
+        for (int i = 0; i < sendCount; i++) {
+            LOG_DEBUG("Checking " << i)
+
+            // whether a message was sent successfully or not, it's callback
+            // must have been invoked
+            ASSERT_EQ(true, promises[i].isComplete());
+            MessageId mi;
+            Result res = promises[i].getFuture().get(mi);
+            LOG_DEBUG("Result is " << res);
+
+            // messages sent after closeAsync was invoked should all
+            // return ResultAlreadyClosed
+            if (i >= closedAt) {
+                ASSERT_EQ(ResultAlreadyClosed, res);
+            }
+        }
+
+        client.close();
+        LOG_INFO("End of run " << run);
+    }
+}
\ No newline at end of file
diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h
index ab507ac..aed7096 100644
--- a/pulsar-client-cpp/tests/PulsarFriend.h
+++ b/pulsar-client-cpp/tests/PulsarFriend.h
@@ -61,6 +61,11 @@ class PulsarFriend {
         return *producerImpl;
     }
 
+    static ProducerImpl& getInternalProducerImpl(Producer producer, int index) {
+        PartitionedProducerImpl* producerImpl = static_cast<PartitionedProducerImpl*>(producer.impl_.get());
+        return *(producerImpl->producers_[index]);
+    }
+
     static void producerFailMessages(Producer producer, Result result) {
         producer.producerFailMessages(result);
     }
diff --git a/site2/docs/client-libraries-cpp.md b/site2/docs/client-libraries-cpp.md
index d6192fe..11dd854 100644
--- a/site2/docs/client-libraries-cpp.md
+++ b/site2/docs/client-libraries-cpp.md
@@ -253,52 +253,227 @@ pulsar+ssl://pulsar.us-west.example.com:6651
 
 ## Create a consumer
 
-To use Pulsar as a consumer, you need to create a consumer on the C++ client. The following is an example. 
+To use Pulsar as a consumer, you need to create a consumer on the C++ client. There are two main ways of using the consumer:
+- Blocking style: synchronously calling `receive(msg)`.
+- Non-blocking (event-based) style: using a message listener.
 
-```c++
-Client client("pulsar://localhost:6650");
+### Blocking example
 
-Consumer consumer;
-Result result = client.subscribe("my-topic", "my-subscription-name", consumer);
-if (result != ResultOk) {
-    LOG_ERROR("Failed to subscribe: " << result);
-    return -1;
+The benefit of this approach is its simplicity: you simply keep calling `receive(msg)`, which blocks until a message is received.
+
+This example starts a subscription at the earliest offset and consumes 100 messages.
+
+```c++
+#include <pulsar/Client.h>
+
+using namespace pulsar;
+
+int main() {
+    Client client("pulsar://localhost:6650");
+
+    Consumer consumer;
+    ConsumerConfiguration config;
+    config.setSubscriptionInitialPosition(InitialPositionEarliest);
+    Result result = client.subscribe("persistent://public/default/my-topic", "consumer-1", config, consumer);
+    if (result != ResultOk) {
+        std::cout << "Failed to subscribe: " << result << std::endl;
+        return -1;
+    }
+
+    Message msg;
+    int ctr = 0;
+    // consume 100 messages
+    while (ctr < 100) {
+        consumer.receive(msg);
+        std::cout << "Received: " << msg
+            << "  with payload '" << msg.getDataAsString() << "'" << std::endl;
+
+        consumer.acknowledge(msg);
+        ctr++;
+    }
+
+    std::cout << "Finished consuming synchronously!" << std::endl;
+
+    client.close();
+    return 0;
 }
+```
+
+### Consumer with a message listener
+
+You can avoid running a loop of blocking calls by using an event-based style: register a message listener that is invoked for each message received.
+
+This example starts a subscription at the earliest offset and consumes 100 messages.
+
+```c++
+#include <pulsar/Client.h>
+#include <atomic>
+#include <thread>
+
+using namespace pulsar;
 
-Message msg;
+std::atomic<uint32_t> messagesReceived;
 
-while (true) {
-    consumer.receive(msg);
-    LOG_INFO("Received: " << msg
-            << "  with payload '" << msg.getDataAsString() << "'");
+void handleAckComplete(Result res) {
+    std::cout << "Ack res: " << res << std::endl;
+}
 
-    consumer.acknowledge(msg);
+void listener(Consumer consumer, const Message& msg) {
+    std::cout << "Got message " << msg << " with content '" << msg.getDataAsString() << "'" << std::endl;
+    messagesReceived++;
+    consumer.acknowledgeAsync(msg.getMessageId(), handleAckComplete);
 }
 
-client.close();
+int main() {
+    Client client("pulsar://localhost:6650");
+
+    Consumer consumer;
+    ConsumerConfiguration config;
+    config.setMessageListener(listener);
+    config.setSubscriptionInitialPosition(InitialPositionEarliest);
+    Result result = client.subscribe("persistent://public/default/my-topic", "consumer-1", config, consumer);
+    if (result != ResultOk) {
+        std::cout << "Failed to subscribe: " << result << std::endl;
+        return -1;
+    }
+
+    // wait for 100 messages to be consumed
+    while (messagesReceived < 100) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    }
+
+    std::cout << "Finished consuming asynchronously!" << std::endl;
+
+    client.close();
+    return 0;
+}
 ```
 
 ## Create a producer
 
-To use Pulsar as a producer, you need to create a producer on the C++ client. The following is an example. 
+To use Pulsar as a producer, you need to create a producer on the C++ client. There are two main ways of using a producer:
+- Blocking style: each call to `send` waits for an ack from the broker.
+- Non-blocking (asynchronous) style: call `sendAsync` instead of `send` and supply a callback that is invoked when the ack is received from the broker.
+
+### Simple blocking example
+
+This example sends 100 messages using the blocking style. While simple, it does not produce high throughput as it waits for each ack to come back before sending the next message.
 
 ```c++
-Client client("pulsar://localhost:6650");
+#include <pulsar/Client.h>
+#include <thread>
+
+using namespace pulsar;
+
+int main() {
+    Client client("pulsar://localhost:6650");
+
+    Producer producer;
+    Result result = client.createProducer("persistent://public/default/my-topic", producer);
+    if (result != ResultOk) {
+        std::cout << "Error creating producer: " << result << std::endl;
+        return -1;
+    }
+
+    // Send 100 messages synchronously
+    int ctr = 0;
+    while (ctr < 100) {
+        std::string content = "msg" + std::to_string(ctr);
+        Message msg = MessageBuilder().setContent(content).setProperty("x", "1").build();
+        Result result = producer.send(msg);
+        if (result != ResultOk) {
+            std::cout << "The message " << content << " could not be sent, received code: " << result << std::endl;
+        } else {
+            std::cout << "The message " << content << " sent successfully" << std::endl;
+        }
+
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        ctr++;
+    }
+
+    std::cout << "Finished producing synchronously!" << std::endl;
+
+    client.close();
+    return 0;
+}
+```
 
-Producer producer;
-Result result = client.createProducer("my-topic", producer);
-if (result != ResultOk) {
-    LOG_ERROR("Error creating producer: " << result);
-    return -1;
+### Non-blocking example
+
+This example sends 100 messages using the non-blocking style, calling `sendAsync` instead of `send`. This allows the producer to have multiple messages in flight at a time, which increases throughput.
+
+The producer configuration `blockIfQueueFull` is useful here: once the internal queue of outgoing send requests becomes full, `sendAsync` blocks instead of failing, which avoids `ResultProducerQueueIsFull` errors and keeps your code simple.
+
+Without this configuration, the result code `ResultProducerQueueIsFull` is passed to the callback and you must decide how to deal with it (retry, discard, and so on); a sketch of such a callback follows the example below.
+
+```c++
+#include <pulsar/Client.h>
+#include <atomic>
+#include <functional>
+#include <thread>
+
+using namespace pulsar;
+
+std::atomic<uint32_t> acksReceived;
+
+void callback(Result code, const MessageId& msgId, std::string msgContent) {
+    // handle the send acknowledgement here
+    std::cout << "Received ack for msg: " << msgContent << " with code: "
+        << code << " -- MsgID: " << msgId << std::endl;
+    acksReceived++;
 }
 
-// Publish 10 messages to the topic
-for (int i = 0; i < 10; i++){
-    Message msg = MessageBuilder().setContent("my-message").build();
-    Result res = producer.send(msg);
-    LOG_INFO("Message sent: " << res);
+int main() {
+    Client client("pulsar://localhost:6650");
+
+    ProducerConfiguration producerConf;
+    producerConf.setBlockIfQueueFull(true);
+    Producer producer;
+    Result result = client.createProducer("persistent://public/default/my-topic",
+                                          producerConf, producer);
+    if (result != ResultOk) {
+        std::cout << "Error creating producer: " << result << std::endl;
+        return -1;
+    }
+
+    // Send 100 messages asynchronously
+    int ctr = 0;
+    while (ctr < 100) {
+        std::string content = "msg" + std::to_string(ctr);
+        Message msg = MessageBuilder().setContent(content).setProperty("x", "1").build();
+        producer.sendAsync(msg, std::bind(callback,
+                                          std::placeholders::_1, std::placeholders::_2, content));
+
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+        ctr++;
+    }
+
+    // wait for 100 messages to be acked
+    while (acksReceived < 100) {
+        std::this_thread::sleep_for(std::chrono::milliseconds(100));
+    }
+
+    std::cout << "Finished producing asynchronously!" << std::endl;
+
+    client.close();
+    return 0;
 }
-client.close();
+```
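+
+If you do not set `blockIfQueueFull`, your send callback needs to handle `ResultProducerQueueIsFull` itself. The following is a minimal sketch; the handling policy and the callback name are illustrative only:
+
+```c++
+#include <pulsar/Client.h>
+#include <iostream>
+
+using namespace pulsar;
+
+// Send callback used with sendAsync when blockIfQueueFull is not enabled.
+void sendCallback(Result code, const MessageId& msgId) {
+    if (code == ResultProducerQueueIsFull) {
+        // The internal queue was full, so this message was rejected rather than enqueued.
+        // Decide how to handle it: retry later, drop it, or buffer it for re-sending.
+        std::cout << "Queue full, message was not enqueued" << std::endl;
+    } else if (code != ResultOk) {
+        std::cout << "Send failed with code: " << code << std::endl;
+    } else {
+        std::cout << "Acked message with id: " << msgId << std::endl;
+    }
+}
+```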
+
+### Partitioned topics and lazy producers
+
+When scaling out a Pulsar topic, you may configure a topic to have hundreds of partitions. Likewise, you may have also scaled out your producers so there are hundreds or even thousands of producer applications. This can put strain on the Pulsar brokers, because when you create a producer on a partitioned topic, the client internally creates one producer per partition, each of which requires a lookup and a registration with a broker. So for a topic with 1000 partitions and 1000 producer applications, 1,000,000 internal producers are created in total.
+
+You can reduce the load caused by this combination of a large number of partitions and many producers by doing the following:
+- use the SinglePartition routing mode (this ensures that each producer sends all of its messages to a single, randomly selected partition)
+- use non-keyed messages (when messages are keyed, routing is based on the hash of the key, so messages end up being sent to multiple partitions)
+- use lazy producers (this ensures that an internal producer is only created on demand, when a message needs to be routed to a partition)
+
+In the example above, this reduces the total number of internal producers across the 1000 producer applications from 1,000,000 to just 1000.
+
+Note that lazy producers add extra latency for the first message sent to each partition, because the internal producer for that partition is only created on demand. If you set a very low send timeout, this timeout could be reached while the initial lookup and connection handshake complete.
+
+```c++
+ProducerConfiguration producerConf;
+producerConf.setPartitionsRoutingMode(ProducerConfiguration::UseSinglePartition);
+producerConf.setLazyStartPartitionedProducers(true);
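+
+// If you rely on a short send timeout, you may want to raise it so that the extra
+// first-message latency from lazy producer creation does not trigger send timeouts.
+// The value below is illustrative only; the default send timeout is 30000 ms (30 seconds).
+producerConf.setSendTimeout(60000);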
 ```
 
 ## Enable authentication in connection URLs