Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/05/10 05:46:03 UTC

[pulsar] branch master updated: [C++] Auto update topic partitions (#6732)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 30934e1  [C++] Auto update topic partitions (#6732)
30934e1 is described below

commit 30934e16eacb83c34a6691db3d9ad294d260599e
Author: BewareMyPower <xy...@163.com>
AuthorDate: Sun May 10 13:45:46 2020 +0800

    [C++] Auto update topic partitions (#6732)
    
    ### Motivation
    
    We need to create more producers or consumers when a topic's partition count is increased.
    
    The Java client has already implemented this feature; see [#3513](https://github.com/apache/pulsar/pull/3513). This PR implements the same feature in the C++ client.
    
    ### Modifications
    
    - Add a `boost::asio::deadline_timer` to `PartitionedConsumerImpl` and `PartitionedProducerImpl` to register a lookup task that periodically detects partition changes;
    - Add an `unsigned int` configuration parameter that specifies the interval, in seconds, between two partition-change checks (default: 60 seconds); see the usage sketch below;
    - Unlock `mutex_` in `PartitionedConsumerImpl::receive` after `state_` has been checked.
    > Explanation: when new consumers are created, `handleSinglePartitionConsumerCreated` is eventually called, and it tries to lock `mutex_`. Without this unlock, `receive` may acquire the lock again and again, so `handleSinglePartitionConsumerCreated` can be blocked in `lock.lock()` for a long time.
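
    As a usage sketch (not part of this commit's diff; the service URL and topic name are placeholders), enabling a faster partition-update poll only requires the new configuration parameter:

    ```cpp
    #include <pulsar/Client.h>

    using namespace pulsar;

    int main() {
        ClientConfiguration clientConfig;
        // Poll the partitioned topic's metadata every 5 seconds instead of the
        // default 60; passing 0 disables the periodic lookup entirely.
        clientConfig.setPartititionsUpdateInterval(5);

        Client client("pulsar://localhost:6650", clientConfig);
        Producer producer;
        if (client.createProducer("persistent://public/default/my-partitioned-topic", producer) == ResultOk) {
            // Once the broker's partition count is increased, internal producers for
            // the new partitions are created within roughly one poll interval.
            producer.close();
        }
        client.close();
        return 0;
    }
    ```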
    
    
    
    * auto-update topic partitions for both consumer and producer in the C++ client
    
    * add c++ unit test for partitions update
    
    * format code with clang-format-5.0
    
    * stop partitions update timer after producer/consumer called closeAsync()
    
    * fix bugs when running gtest-parallel
    
    * fix bug: Producer::flush() may cause deadlock
    
    * use getters to read `numPartitions` with or without lock
---
 .../include/pulsar/ClientConfiguration.h           |  16 ++
 pulsar-client-cpp/lib/ClientConfiguration.cc       |   9 ++
 pulsar-client-cpp/lib/ClientConfigurationImpl.h    |   5 +-
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc   | 152 ++++++++++++++----
 pulsar-client-cpp/lib/PartitionedConsumerImpl.h    |  13 ++
 pulsar-client-cpp/lib/PartitionedProducerImpl.cc   | 125 ++++++++++++---
 pulsar-client-cpp/lib/PartitionedProducerImpl.h    |  17 ++
 pulsar-client-cpp/tests/HttpHelper.cc              |   6 +-
 pulsar-client-cpp/tests/HttpHelper.h               |   1 +
 pulsar-client-cpp/tests/PartitionsUpdateTest.cc    | 176 +++++++++++++++++++++
 10 files changed, 470 insertions(+), 50 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ClientConfiguration.h b/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
index fd21a3e..056f7a7 100644
--- a/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ClientConfiguration.h
@@ -149,6 +149,22 @@ class PULSAR_PUBLIC ClientConfiguration {
      */
     const unsigned int& getStatsIntervalInSeconds() const;
 
+    /**
+     * Set partitions update interval in seconds.
+     * If a producer or consumer is created for a partitioned topic and `intervalInSeconds` is not 0, the
+     * partition count will be retrieved every `intervalInSeconds` seconds by sending lookup requests. If
+     * the partition count has increased, producers/consumers for the new partitions will be created.
+     * Default is 60 seconds.
+     *
+     * @param intervalInSeconds the number of seconds between two lookup requests for the partitioned topic's metadata
+     */
+    ClientConfiguration& setPartititionsUpdateInterval(unsigned int intervalInSeconds);
+
+    /**
+     * Get partitions update interval in seconds.
+     */
+    unsigned int getPartitionsUpdateInterval() const;
+
     friend class ClientImpl;
     friend class PulsarWrapper;
 
diff --git a/pulsar-client-cpp/lib/ClientConfiguration.cc b/pulsar-client-cpp/lib/ClientConfiguration.cc
index aac7296..920b168 100644
--- a/pulsar-client-cpp/lib/ClientConfiguration.cc
+++ b/pulsar-client-cpp/lib/ClientConfiguration.cc
@@ -119,4 +119,13 @@ ClientConfiguration& ClientConfiguration::setStatsIntervalInSeconds(
 const unsigned int& ClientConfiguration::getStatsIntervalInSeconds() const {
     return impl_->statsIntervalInSeconds;
 }
+
+ClientConfiguration& ClientConfiguration::setPartititionsUpdateInterval(unsigned int intervalInSeconds) {
+    impl_->partitionsUpdateInterval = intervalInSeconds;
+    return *this;
+}
+
+unsigned int ClientConfiguration::getPartitionsUpdateInterval() const {
+    return impl_->partitionsUpdateInterval;
+}
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ClientConfigurationImpl.h b/pulsar-client-cpp/lib/ClientConfigurationImpl.h
index 60e4ae6..e2f72fb 100644
--- a/pulsar-client-cpp/lib/ClientConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ClientConfigurationImpl.h
@@ -36,6 +36,7 @@ struct ClientConfigurationImpl {
     unsigned int statsIntervalInSeconds;
     LoggerFactoryPtr loggerFactory;
     bool validateHostName;
+    unsigned int partitionsUpdateInterval;
 
     ClientConfigurationImpl()
         : authenticationPtr(AuthFactory::Disabled()),
@@ -48,7 +49,9 @@ struct ClientConfigurationImpl {
           tlsAllowInsecureConnection(false),
           statsIntervalInSeconds(600),  // 10 minutes
           loggerFactory(),
-          validateHostName(false) {}
+          validateHostName(false),
+          partitionsUpdateInterval(60)  // 1 minute
+    {}
 };
 }  // namespace pulsar
 
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 0241a54..89fe1b9 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -53,6 +53,12 @@ PartitionedConsumerImpl::PartitionedConsumerImpl(ClientImplPtr client, const std
     } else {
         unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
     }
+    auto partitionsUpdateInterval = static_cast<unsigned int>(client_->conf().getPartitionsUpdateInterval());
+    if (partitionsUpdateInterval > 0) {
+        partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer();
+        partitionsUpdateInterval_ = boost::posix_time::seconds(partitionsUpdateInterval);
+        lookupServicePtr_ = client_->getLookup();
+    }
 }
 
 PartitionedConsumerImpl::~PartitionedConsumerImpl() {}
@@ -70,6 +76,8 @@ Result PartitionedConsumerImpl::receive(Message& msg) {
         lock.unlock();
         return ResultAlreadyClosed;
     }
+    // See comments in `receive(Message&, int)`
+    lock.unlock();
 
     if (messageListener_) {
         LOG_ERROR("Can not receive when a listener has been set");
@@ -87,6 +95,10 @@ Result PartitionedConsumerImpl::receive(Message& msg, int timeout) {
         lock.unlock();
         return ResultAlreadyClosed;
     }
+    // We unlock `mutex_` here to avoid starving methods that are trying to acquire `mutex_`.
+    // In addition, `messageListener_` won't change once constructed, `BlockingQueue::pop` and
+    // `UnAckedMessageTracker::add` are thread-safe, so they don't need `mutex_` to achieve thread-safety.
+    lock.unlock();
 
     if (messageListener_) {
         LOG_ERROR("Can not receive when a listener has been set");
@@ -162,14 +174,15 @@ void PartitionedConsumerImpl::handleUnsubscribeAsync(Result result, unsigned int
         callback(ResultUnknownError);
         return;
     }
-    assert(unsubscribedSoFar_ <= numPartitions_);
-    assert(consumerIndex <= numPartitions_);
+    const auto numPartitions = getNumPartitionsWithLock();
+    assert(unsubscribedSoFar_ <= numPartitions);
+    assert(consumerIndex <= numPartitions);
     // this means we have successfully closed this partition consumer and no unsubscribe has failed so far
     LOG_INFO("Successfully Unsubscribed Consumer - " << consumerIndex << " for Subscription - "
                                                      << subscriptionName_ << " for Topic - "
                                                      << topicName_->toString());
     unsubscribedSoFar_++;
-    if (unsubscribedSoFar_ == numPartitions_) {
+    if (unsubscribedSoFar_ == numPartitions) {
         LOG_DEBUG("Unsubscribed all of the partition consumer for subscription - " << subscriptionName_);
         setState(Closed);
         callback(ResultOk);
@@ -179,7 +192,11 @@ void PartitionedConsumerImpl::handleUnsubscribeAsync(Result result, unsigned int
 
 void PartitionedConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
     int32_t partition = msgId.partition();
-    assert(partition < numPartitions_ && partition >= 0 && consumers_.size() > partition);
+#ifndef NDEBUG
+    Lock consumersLock(consumersMutex_);
+    assert(partition < getNumPartitions() && partition >= 0 && consumers_.size() > partition);
+    consumersLock.unlock();
+#endif
     unAckedMessageTrackerPtr_->remove(msgId);
     consumers_[partition]->acknowledgeAsync(msgId, callback);
 }
@@ -194,35 +211,62 @@ void PartitionedConsumerImpl::negativeAcknowledge(const MessageId& msgId) {
     consumers_[partition]->negativeAcknowledge(msgId);
 }
 
-void PartitionedConsumerImpl::start() {
-    ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get();
-    std::shared_ptr<ConsumerImpl> consumer;
+unsigned int PartitionedConsumerImpl::getNumPartitions() const { return numPartitions_; }
+
+unsigned int PartitionedConsumerImpl::getNumPartitionsWithLock() const {
+    Lock consumersLock(consumersMutex_);
+    return getNumPartitions();
+}
+
+ConsumerConfiguration PartitionedConsumerImpl::getSinglePartitionConsumerConfig() const {
+    using namespace std::placeholders;
+
     ConsumerConfiguration config = conf_.clone();
     // all the partitioned-consumer belonging to one partitioned topic should have same name
     config.setConsumerName(conf_.getConsumerName());
     config.setConsumerType(conf_.getConsumerType());
     config.setBrokerConsumerStatsCacheTimeInMs(conf_.getBrokerConsumerStatsCacheTimeInMs());
-    config.setMessageListener(std::bind(&PartitionedConsumerImpl::messageReceived, shared_from_this(),
-                                        std::placeholders::_1, std::placeholders::_2));
+
+    const auto shared_this = const_cast<PartitionedConsumerImpl*>(this)->shared_from_this();
+    config.setMessageListener(std::bind(&PartitionedConsumerImpl::messageReceived, shared_this, _1, _2));
 
     // Apply total limit of receiver queue size across partitions
+    // NOTE: if it's called by handleGetPartitions(), the queue size of new internal consumers may be
+    // smaller than that of previously created internal consumers.
     config.setReceiverQueueSize(
         std::min(conf_.getReceiverQueueSize(),
-                 (int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions_)));
+                 (int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / getNumPartitions())));
+
+    return config;
+}
+
+ConsumerImplPtr PartitionedConsumerImpl::newInternalConsumer(unsigned int partition,
+                                                             const ConsumerConfiguration& config) const {
+    using namespace std::placeholders;
+
+    std::string topicPartitionName = topicName_->getTopicPartitionName(partition);
+    auto consumer = std::make_shared<ConsumerImpl>(client_, topicPartitionName, subscriptionName_, config,
+                                                   internalListenerExecutor_, Partitioned);
+
+    const auto shared_this = const_cast<PartitionedConsumerImpl*>(this)->shared_from_this();
+    consumer->getConsumerCreatedFuture().addListener(std::bind(
+        &PartitionedConsumerImpl::handleSinglePartitionConsumerCreated, shared_this, _1, _2, partition));
+    consumer->setPartitionIndex(partition);
+
+    LOG_DEBUG("Creating Consumer for single Partition - " << topicPartitionName << "SubName - "
+                                                          << subscriptionName_);
+    return consumer;
+}
+
+void PartitionedConsumerImpl::start() {
+    internalListenerExecutor_ = client_->getPartitionListenerExecutorProvider()->get();
+    const auto config = getSinglePartitionConsumerConfig();
 
     // create consumer on each partition
-    for (unsigned int i = 0; i < numPartitions_; i++) {
-        std::string topicPartitionName = topicName_->getTopicPartitionName(i);
-        consumer = std::make_shared<ConsumerImpl>(client_, topicPartitionName, subscriptionName_, config,
-                                                  internalListenerExecutor, Partitioned);
-        consumer->getConsumerCreatedFuture().addListener(
-            std::bind(&PartitionedConsumerImpl::handleSinglePartitionConsumerCreated, shared_from_this(),
-                      std::placeholders::_1, std::placeholders::_2, i));
-        consumer->setPartitionIndex(i);
-        consumers_.push_back(consumer);
-
-        LOG_DEBUG("Creating Consumer for single Partition - " << topicPartitionName << "SubName - "
-                                                              << subscriptionName_);
+    // Here we don't need `consumersMutex_` to protect `consumers_`, because `consumers_` can only be increased
+    // when `state_` is Ready
+    for (unsigned int i = 0; i < getNumPartitions(); i++) {
+        consumers_.push_back(newInternalConsumer(i, config));
     }
     for (ConsumerList::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
          consumer++) {
@@ -238,7 +282,8 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerCreated(
         // one of the consumer creation failed, and we are cleaning up
         return;
     }
-    assert(numConsumersCreated_ < numPartitions_);
+    const auto numPartitions = getNumPartitionsWithLock();
+    assert(numConsumersCreated_ < numPartitions);
 
     if (result != ResultOk) {
         state_ = Failed;
@@ -250,13 +295,16 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerCreated(
         return;
     }
 
-    assert(partitionIndex < numPartitions_ && partitionIndex >= 0);
+    assert(partitionIndex < numPartitions && partitionIndex >= 0);
     numConsumersCreated_++;
-    if (numConsumersCreated_ == numPartitions_) {
+    if (numConsumersCreated_ == numPartitions) {
         LOG_INFO("Successfully Subscribed to Partitioned Topic - " << topicName_->toString() << " with - "
-                                                                   << numPartitions_ << " Partitions.");
+                                                                   << numPartitions << " Partitions.");
         state_ = Ready;
         lock.unlock();
+        if (partitionsUpdateTimer_) {
+            runPartitionUpdateTask();
+        }
         receiveMessages();
         partitionedConsumerCreatedPromise_.setValue(shared_from_this());
         return;
@@ -280,7 +328,7 @@ void PartitionedConsumerImpl::handleSinglePartitionConsumerClose(Result result,
         }
         return;
     }
-    assert(partitionIndex < numPartitions_ && partitionIndex >= 0);
+    assert(partitionIndex < getNumPartitionsWithLock() && partitionIndex >= 0);
     if (numConsumersCreated_ > 0) {
         numConsumersCreated_--;
     }
@@ -305,6 +353,8 @@ void PartitionedConsumerImpl::closeAsync(ResultCallback callback) {
     int consumerIndex = 0;
     unsigned int consumerAlreadyClosed = 0;
     // close successfully subscribed consumers
+    // Here we don't need `consumersMutex_` to protect `consumers_`, because `consumers_` can only be increased
+    // when `state_` is Ready
     for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
         ConsumerImplPtr consumer = *i;
         if (!consumer->isClosed()) {
@@ -459,9 +509,10 @@ void PartitionedConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCal
         callback(ResultConsumerNotInitialized, BrokerConsumerStats());
         return;
     }
+    const auto numPartitions = getNumPartitionsWithLock();
     PartitionedBrokerConsumerStatsPtr statsPtr =
-        std::make_shared<PartitionedBrokerConsumerStatsImpl>(numPartitions_);
-    LatchPtr latchPtr = std::make_shared<Latch>(numPartitions_);
+        std::make_shared<PartitionedBrokerConsumerStatsImpl>(numPartitions);
+    LatchPtr latchPtr = std::make_shared<Latch>(numPartitions);
     ConsumerList consumerList = consumers_;
     lock.unlock();
     for (int i = 0; i < consumerList.size(); i++) {
@@ -498,4 +549,47 @@ void PartitionedConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callb
     callback(ResultOperationNotSupported);
 }
 
+void PartitionedConsumerImpl::runPartitionUpdateTask() {
+    partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
+    partitionsUpdateTimer_->async_wait(
+        std::bind(&PartitionedConsumerImpl::getPartitionMetadata, shared_from_this()));
+}
+
+void PartitionedConsumerImpl::getPartitionMetadata() {
+    using namespace std::placeholders;
+    lookupServicePtr_->getPartitionMetadataAsync(topicName_)
+        .addListener(std::bind(&PartitionedConsumerImpl::handleGetPartitions, shared_from_this(), _1, _2));
+}
+
+void PartitionedConsumerImpl::handleGetPartitions(Result result,
+                                                  const LookupDataResultPtr& lookupDataResult) {
+    Lock stateLock(mutex_);
+    if (state_ != Ready) {
+        return;
+    }
+
+    if (!result) {
+        const auto newNumPartitions = static_cast<unsigned int>(lookupDataResult->getPartitions());
+        Lock consumersLock(consumersMutex_);
+        const auto currentNumPartitions = getNumPartitions();
+        assert(currentNumPartitions == consumers_.size());
+        if (newNumPartitions > currentNumPartitions) {
+            LOG_INFO("new partition count: " << newNumPartitions);
+            numPartitions_ = newNumPartitions;
+            const auto config = getSinglePartitionConsumerConfig();
+            for (unsigned int i = currentNumPartitions; i < newNumPartitions; i++) {
+                auto consumer = newInternalConsumer(i, config);
+                consumer->start();
+                consumers_.push_back(consumer);
+            }
+            // `runPartitionUpdateTask()` will be called in `handleSinglePartitionConsumerCreated()`
+            return;
+        }
+    } else {
+        LOG_WARN("Failed to getPartitionMetadata: " << strResult(result));
+    }
+
+    runPartitionUpdateTask();
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
index fb4b047..02de7bc 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
@@ -84,6 +84,8 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
     const ConsumerConfiguration conf_;
     typedef std::vector<ConsumerImplPtr> ConsumerList;
     ConsumerList consumers_;
+    // consumersMutex_ is used to share consumers_ and numPartitions_
+    mutable std::mutex consumersMutex_;
     std::mutex mutex_;
     std::mutex pendingReceiveMutex_;
     PartitionedConsumerState state_;
@@ -94,7 +96,15 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
     const std::string topic_;
     const std::string name_;
     const std::string partitionStr_;
+    ExecutorServicePtr internalListenerExecutor_;
+    DeadlineTimerPtr partitionsUpdateTimer_;
+    boost::posix_time::time_duration partitionsUpdateInterval_;
+    LookupServicePtr lookupServicePtr_;
     /* methods */
+    unsigned int getNumPartitions() const;
+    unsigned int getNumPartitionsWithLock() const;
+    ConsumerConfiguration getSinglePartitionConsumerConfig() const;
+    ConsumerImplPtr newInternalConsumer(unsigned int partition, const ConsumerConfiguration& config) const;
     void setState(PartitionedConsumerState state);
     void handleUnsubscribeAsync(Result result, unsigned int consumerIndex, ResultCallback callback);
     void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
@@ -109,6 +119,9 @@ class PartitionedConsumerImpl : public ConsumerImplBase,
     Promise<Result, ConsumerImplBaseWeakPtr> partitionedConsumerCreatedPromise_;
     UnAckedMessageTrackerScopedPtr unAckedMessageTrackerPtr_;
     std::queue<ReceiveCallback> pendingReceives_;
+    void runPartitionUpdateTask();
+    void getPartitionMetadata();
+    void handleGetPartitions(const Result result, const LookupDataResultPtr& lookupDataResult);
 };
 typedef std::weak_ptr<PartitionedConsumerImpl> PartitionedConsumerImplWeakPtr;
 typedef std::shared_ptr<PartitionedConsumerImpl> PartitionedConsumerImplPtr;
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index f79b5d3..0461ee3 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -50,6 +50,14 @@ PartitionedProducerImpl::PartitionedProducerImpl(ClientImplPtr client, const Top
         std::min(config.getMaxPendingMessages(),
                  (int)(config.getMaxPendingMessagesAcrossPartitions() / numPartitions));
     conf_.setMaxPendingMessages(maxPendingMessagesPerPartition);
+
+    auto partitionsUpdateInterval = static_cast<unsigned int>(client_->conf().getPartitionsUpdateInterval());
+    if (partitionsUpdateInterval > 0) {
+        listenerExecutor_ = client_->getListenerExecutorProvider()->get();
+        partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer();
+        partitionsUpdateInterval_ = boost::posix_time::seconds(partitionsUpdateInterval);
+        lookupServicePtr_ = client_->getLookup();
+    }
 }
 
 MessageRoutingPolicyPtr PartitionedProducerImpl::getMessageRouter() {
@@ -61,7 +69,7 @@ MessageRoutingPolicyPtr PartitionedProducerImpl::getMessageRouter() {
         case ProducerConfiguration::UseSinglePartition:
         default:
             unsigned int random = rand();
-            return std::make_shared<SinglePartitionMessageRouter>(random % topicMetadata_->getNumPartitions(),
+            return std::make_shared<SinglePartitionMessageRouter>(random % getNumPartitions(),
                                                                   conf_.getHashingScheme());
     }
 }
@@ -70,18 +78,34 @@ PartitionedProducerImpl::~PartitionedProducerImpl() {}
 // override
 const std::string& PartitionedProducerImpl::getTopic() const { return topic_; }
 
+unsigned int PartitionedProducerImpl::getNumPartitions() const {
+    return static_cast<unsigned int>(topicMetadata_->getNumPartitions());
+}
+
+unsigned int PartitionedProducerImpl::getNumPartitionsWithLock() const {
+    Lock lock(producersMutex_);
+    return getNumPartitions();
+}
+
+ProducerImplPtr PartitionedProducerImpl::newInternalProducer(unsigned int partition) const {
+    using namespace std::placeholders;
+    std::string topicPartitionName = topicName_->getTopicPartitionName(partition);
+    auto producer = std::make_shared<ProducerImpl>(client_, topicPartitionName, conf_);
+    producer->getProducerCreatedFuture().addListener(
+        std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated,
+                  const_cast<PartitionedProducerImpl*>(this)->shared_from_this(), _1, _2, partition));
+
+    LOG_DEBUG("Creating Producer for single Partition - " << topicPartitionName);
+    return producer;
+}
+
 // override
 void PartitionedProducerImpl::start() {
-    std::shared_ptr<ProducerImpl> producer;
     // create producer per partition
-    for (unsigned int i = 0; i < topicMetadata_->getNumPartitions(); i++) {
-        std::string topicPartitionName = topicName_->getTopicPartitionName(i);
-        producer = std::make_shared<ProducerImpl>(client_, topicPartitionName, conf_);
-        producer->getProducerCreatedFuture().addListener(
-            std::bind(&PartitionedProducerImpl::handleSinglePartitionProducerCreated, shared_from_this(),
-                      std::placeholders::_1, std::placeholders::_2, i));
-        producers_.push_back(producer);
-        LOG_DEBUG("Creating Producer for single Partition - " << topicPartitionName);
+    // Here we don't need `producersMutex_` to protect `producers_`, because `producers_` can only be increased
+    // when `state_` is Ready
+    for (unsigned int i = 0; i < getNumPartitions(); i++) {
+        producers_.push_back(newInternalProducer(i));
     }
 
     for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
@@ -100,7 +124,8 @@ void PartitionedProducerImpl::handleSinglePartitionProducerCreated(Result result
         // Ignore, we have already informed client that producer creation failed
         return;
     }
-    assert(numProducersCreated_ <= topicMetadata_->getNumPartitions());
+    const auto numPartitions = getNumPartitionsWithLock();
+    assert(numProducersCreated_ <= numPartitions);
     if (result != ResultOk) {
         state_ = Failed;
         lock.unlock();
@@ -110,10 +135,14 @@ void PartitionedProducerImpl::handleSinglePartitionProducerCreated(Result result
         return;
     }
 
-    assert(partitionIndex <= topicMetadata_->getNumPartitions());
+    assert(partitionIndex <= numPartitions);
     numProducersCreated_++;
-    if (numProducersCreated_ == topicMetadata_->getNumPartitions()) {
+    if (numProducersCreated_ == numPartitions) {
+        state_ = Ready;
         lock.unlock();
+        if (partitionsUpdateTimer_) {
+            runPartitionUpdateTask();
+        }
         partitionedProducerCreatedPromise_.setValue(shared_from_this());
     }
 }
@@ -121,8 +150,9 @@ void PartitionedProducerImpl::handleSinglePartitionProducerCreated(Result result
 // override
 void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callback) {
     // get partition for this message from router policy
+    Lock producersLock(producersMutex_);
     short partition = (short)(routerPolicy_->getPartition(msg, *topicMetadata_));
-    if (partition >= topicMetadata_->getNumPartitions() || partition >= producers_.size()) {
+    if (partition >= getNumPartitions() || partition >= producers_.size()) {
         LOG_ERROR("Got Invalid Partition for message from Router Policy, Partition - " << partition);
         // change me: abort or notify failure in callback?
         //          change to appropriate error if callback
@@ -130,7 +160,8 @@ void PartitionedProducerImpl::sendAsync(const Message& msg, SendCallback callbac
         return;
     }
     // find a producer for that partition, index should start from 0
-    ProducerImplPtr& producer = producers_[partition];
+    ProducerImplPtr producer = producers_[partition];
+    producersLock.unlock();
     // send message on that partition
     producer->sendAsync(msg, callback);
 }
@@ -145,10 +176,12 @@ void PartitionedProducerImpl::setState(const PartitionedProducerState state) {
 }
 
 const std::string& PartitionedProducerImpl::getProducerName() const {
+    Lock producersLock(producersMutex_);
     return producers_[0]->getProducerName();
 }
 
 const std::string& PartitionedProducerImpl::getSchemaVersion() const {
+    Lock producersLock(producersMutex_);
     // Since the schema is atomically assigned on the partitioned-topic,
     // it's guaranteed that all the partitions will have the same schema version.
     return producers_[0]->getSchemaVersion();
@@ -156,6 +189,7 @@ const std::string& PartitionedProducerImpl::getSchemaVersion() const {
 
 int64_t PartitionedProducerImpl::getLastSequenceId() const {
     int64_t currentMax = -1L;
+    Lock producersLock(producersMutex_);
     for (int i = 0; i < producers_.size(); i++) {
         currentMax = std::max(currentMax, producers_[i]->getLastSequenceId());
     }
@@ -168,9 +202,13 @@ int64_t PartitionedProducerImpl::getLastSequenceId() const {
  * create one or many producers for partitions. So, we have to notify with ERROR on createProducerFailure
  */
 void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) {
+    setState(Closing);
+
     int producerIndex = 0;
     unsigned int producerAlreadyClosed = 0;
 
+    // Here we don't need `producersMutex_` to protect `producers_`, because `producers_` can only be increased
+    // when `state_` is Ready
     for (ProducerList::const_iterator i = producers_.begin(); i != producers_.end(); i++) {
         ProducerImplPtr prod = *i;
         if (!prod->isClosed()) {
@@ -181,6 +219,7 @@ void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) {
             producerAlreadyClosed++;
         }
     }
+    const auto numProducers = producers_.size();
 
     /*
      * No need to set state since:-
@@ -191,7 +230,7 @@ void PartitionedProducerImpl::closeAsync(CloseCallback closeCallback) {
      * c. If closeAsync called due to failure in creating just one sub producer then state is set by
      * handleSinglePartitionProducerCreated
      */
-    if (producerAlreadyClosed == producers_.size() && closeCallback) {
+    if (producerAlreadyClosed == numProducers && closeCallback) {
         setState(Closed);
         closeCallback(ResultOk);
     }
@@ -214,7 +253,7 @@ void PartitionedProducerImpl::handleSinglePartitionProducerClose(Result result,
         }
         return;
     }
-    assert(partitionIndex < topicMetadata_->getNumPartitions());
+    assert(partitionIndex < getNumPartitionsWithLock());
     if (numProducersCreated_ > 0) {
         numProducersCreated_--;
     }
@@ -244,6 +283,7 @@ Future<Result, ProducerImplBaseWeakPtr> PartitionedProducerImpl::getProducerCrea
 bool PartitionedProducerImpl::isClosed() { return state_ == Closed; }
 
 void PartitionedProducerImpl::triggerFlush() {
+    Lock producersLock(producersMutex_);
     for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) {
         (*prod)->triggerFlush();
     }
@@ -267,9 +307,13 @@ void PartitionedProducerImpl::flushAsync(FlushCallback callback) {
         return;
     }
 
-    FlushCallback subFlushCallback = [this, callback](Result result) {
+    Lock producersLock(producersMutex_);
+    const int numProducers = static_cast<int>(producers_.size());
+    FlushCallback subFlushCallback = [this, callback, numProducers](Result result) {
+        // We shouldn't lock `producersMutex_` here because `subFlushCallback` may be called in
+        // `ProducerImpl::flushAsync`, and then deadlock occurs.
         int previous = flushedPartitions_.fetch_add(1);
-        if (previous == producers_.size() - 1) {
+        if (previous == numProducers - 1) {
             flushedPartitions_.store(0);
             flushPromise_->setValue(true);
             callback(result);
@@ -282,4 +326,47 @@ void PartitionedProducerImpl::flushAsync(FlushCallback callback) {
     }
 }
 
+void PartitionedProducerImpl::runPartitionUpdateTask() {
+    partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
+    partitionsUpdateTimer_->async_wait(
+        std::bind(&PartitionedProducerImpl::getPartitionMetadata, shared_from_this()));
+}
+
+void PartitionedProducerImpl::getPartitionMetadata() {
+    using namespace std::placeholders;
+    lookupServicePtr_->getPartitionMetadataAsync(topicName_)
+        .addListener(std::bind(&PartitionedProducerImpl::handleGetPartitions, shared_from_this(), _1, _2));
+}
+
+void PartitionedProducerImpl::handleGetPartitions(Result result,
+                                                  const LookupDataResultPtr& lookupDataResult) {
+    Lock stateLock(mutex_);
+    if (state_ != Ready) {
+        return;
+    }
+
+    if (!result) {
+        const auto newNumPartitions = static_cast<unsigned int>(lookupDataResult->getPartitions());
+        Lock producersLock(producersMutex_);
+        const auto currentNumPartitions = getNumPartitions();
+        assert(currentNumPartitions == producers_.size());
+        if (newNumPartitions > currentNumPartitions) {
+            LOG_INFO("new partition count: " << newNumPartitions);
+            topicMetadata_.reset(new TopicMetadataImpl(newNumPartitions));
+
+            for (unsigned int i = currentNumPartitions; i < newNumPartitions; i++) {
+                auto producer = newInternalProducer(i);
+                producer->start();
+                producers_.push_back(producer);
+            }
+            // `runPartitionUpdateTask()` will be called in `handleSinglePartitionProducerCreated()`
+            return;
+        }
+    } else {
+        LOG_WARN("Failed to getPartitionMetadata: " << strResult(result));
+    }
+
+    runPartitionUpdateTask();
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.h b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
index 72be5fe..e6b511d 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
@@ -107,6 +107,14 @@ class PartitionedProducerImpl : public ProducerImplBase,
 
     ProducerList producers_;
 
+    // producersMutex_ is used to share producers_ and topicMetadata_
+    mutable std::mutex producersMutex_;
+
+    unsigned int getNumPartitions() const;
+    unsigned int getNumPartitionsWithLock() const;
+
+    ProducerImplPtr newInternalProducer(unsigned int partition) const;
+
     MessageRoutingPolicyPtr routerPolicy_;
 
     // mutex_ is used to share state_, and numProducersCreated_
@@ -121,6 +129,15 @@ class PartitionedProducerImpl : public ProducerImplBase,
 
     std::atomic<int> flushedPartitions_;
     std::shared_ptr<Promise<Result, bool_type>> flushPromise_;
+
+    ExecutorServicePtr listenerExecutor_;
+    DeadlineTimerPtr partitionsUpdateTimer_;
+    boost::posix_time::time_duration partitionsUpdateInterval_;
+    LookupServicePtr lookupServicePtr_;
+
+    void runPartitionUpdateTask();
+    void getPartitionMetadata();
+    void handleGetPartitions(const Result result, const LookupDataResultPtr& partitionMetadata);
 };
 
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/tests/HttpHelper.cc b/pulsar-client-cpp/tests/HttpHelper.cc
index c3d7333..c4118e6 100644
--- a/pulsar-client-cpp/tests/HttpHelper.cc
+++ b/pulsar-client-cpp/tests/HttpHelper.cc
@@ -30,7 +30,9 @@ static int makeRequest(const std::string& method, const std::string& url, const
     curl_easy_setopt(curl, CURLOPT_HTTPHEADER, list);
     curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
     curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, method.c_str());
-    curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body.c_str());
+    if (!body.empty()) {
+        curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body.c_str());
+    }
     int res = curl_easy_perform(curl);
     curl_slist_free_all(list); /* free the list again */
 
@@ -49,3 +51,5 @@ int makePutRequest(const std::string& url, const std::string& body) { return mak
 int makePostRequest(const std::string& url, const std::string& body) {
     return makeRequest("POST", url, body);
 }
+
+int makeDeleteRequest(const std::string& url) { return makeRequest("DELETE", url, ""); }
diff --git a/pulsar-client-cpp/tests/HttpHelper.h b/pulsar-client-cpp/tests/HttpHelper.h
index 31abc2d..68119a7 100644
--- a/pulsar-client-cpp/tests/HttpHelper.h
+++ b/pulsar-client-cpp/tests/HttpHelper.h
@@ -23,5 +23,6 @@
 
 int makePutRequest(const std::string& url, const std::string& body);
 int makePostRequest(const std::string& url, const std::string& body);
+int makeDeleteRequest(const std::string& url);
 
 #endif /* end of include guard: HTTP_HELPER */
diff --git a/pulsar-client-cpp/tests/PartitionsUpdateTest.cc b/pulsar-client-cpp/tests/PartitionsUpdateTest.cc
new file mode 100644
index 0000000..e1bf68c
--- /dev/null
+++ b/pulsar-client-cpp/tests/PartitionsUpdateTest.cc
@@ -0,0 +1,176 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+#include <pulsar/Client.h>
+
+#include <set>
+#include <chrono>
+#include <thread>
+#include <memory>
+
+#include "HttpHelper.h"
+
+using namespace pulsar;
+
+static const std::string serviceUrl = "pulsar://localhost:6650";
+static const std::string adminUrl = "http://localhost:8080/";
+
+static const std::string topicNameSuffix = "public/default/partitions-update-test-topic";
+static const std::string topicName = "persistent://" + topicNameSuffix;
+static const std::string topicOperateUrl =
+    adminUrl + "admin/v2/persistent/" + topicNameSuffix + "/partitions";
+
+static ClientConfiguration newClientConfig(bool enablePartitionsUpdate) {
+    ClientConfiguration clientConfig;
+    if (enablePartitionsUpdate) {
+        clientConfig.setPartititionsUpdateInterval(1);  // 1s
+    } else {
+        clientConfig.setPartititionsUpdateInterval(0);  // disable
+    }
+    return clientConfig;
+}
+
+// In round robin routing mode, if N messages were sent to a topic with N partitions, each partition must
+// have received exactly 1 message. So we check whether producers/consumers have increased along with the
+// partitions by counting the distinct partition names among the received messages.
+// We use std::set because it doesn't allow repeated elements.
+class PartitionsSet {
+   public:
+    size_t size() const { return names_.size(); }
+
+    Result initProducer(bool enablePartitionsUpdate) {
+        clientForProducer_.reset(new Client(serviceUrl, newClientConfig(enablePartitionsUpdate)));
+        const auto producerConfig =
+            ProducerConfiguration().setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
+        return clientForProducer_->createProducer(topicName, producerConfig, producer_);
+    }
+
+    Result initConsumer(bool enablePartitionsUpdate) {
+        clientForConsumer_.reset(new Client(serviceUrl, newClientConfig(enablePartitionsUpdate)));
+        return clientForConsumer_->subscribe(topicName, "SubscriptionName", consumer_);
+    }
+
+    void close() {
+        producer_.close();
+        clientForProducer_->close();
+        consumer_.close();
+        clientForConsumer_->close();
+    }
+
+    void doSendAndReceive(int numMessagesSend, int numMessagesReceive) {
+        names_.clear();
+        for (int i = 0; i < numMessagesSend; i++) {
+            producer_.send(MessageBuilder().setContent("a").build());
+        }
+        while (numMessagesReceive > 0) {
+            Message msg;
+            if (consumer_.receive(msg, 100) == ResultOk) {
+                names_.emplace(msg.getTopicName());
+                consumer_.acknowledge(msg);
+                numMessagesReceive--;
+            }
+        }
+    }
+
+   private:
+    std::set<std::string> names_;
+
+    std::unique_ptr<Client> clientForProducer_;
+    Producer producer_;
+
+    std::unique_ptr<Client> clientForConsumer_;
+    Consumer consumer_;
+};
+
+static void waitForPartitionsUpdated() {
+    // Assume the producer and consumer have picked up the partition update within 3 seconds if enabled
+    std::this_thread::sleep_for(std::chrono::seconds(3));
+}
+
+TEST(PartitionsUpdateTest, testConfigPartitionsUpdateInterval) {
+    ClientConfiguration clientConfig;
+    ASSERT_EQ(60, clientConfig.getPartitionsUpdateInterval());
+
+    clientConfig.setPartititionsUpdateInterval(0);
+    ASSERT_EQ(0, clientConfig.getPartitionsUpdateInterval());
+
+    clientConfig.setPartititionsUpdateInterval(1);
+    ASSERT_EQ(1, clientConfig.getPartitionsUpdateInterval());
+
+    clientConfig.setPartititionsUpdateInterval(-1);
+    ASSERT_EQ(static_cast<unsigned int>(-1), clientConfig.getPartitionsUpdateInterval());
+}
+
+TEST(PartitionsUpdateTest, testPartitionsUpdate) {
+    // Ensure `topicName` doesn't exist before created
+    makeDeleteRequest(topicOperateUrl);
+    // Create a 2 partitions topic
+    int res = makePutRequest(topicOperateUrl, "2");
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    PartitionsSet partitionsSet;
+
+    // 1. Both producer and consumer enable partitions update
+    ASSERT_EQ(ResultOk, partitionsSet.initProducer(true));
+    ASSERT_EQ(ResultOk, partitionsSet.initConsumer(true));
+
+    res = makePostRequest(topicOperateUrl, "3");  // update partitions to 3
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+    waitForPartitionsUpdated();
+
+    partitionsSet.doSendAndReceive(3, 3);
+    ASSERT_EQ(3, partitionsSet.size());
+    partitionsSet.close();
+
+    // 2. Only producer enables partitions update
+    ASSERT_EQ(ResultOk, partitionsSet.initProducer(true));
+    ASSERT_EQ(ResultOk, partitionsSet.initConsumer(false));
+
+    res = makePostRequest(topicOperateUrl, "5");  // update partitions to 5
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+    waitForPartitionsUpdated();
+
+    partitionsSet.doSendAndReceive(5, 3);  // can't consume partition-3,4
+    ASSERT_EQ(3, partitionsSet.size());
+    partitionsSet.close();
+
+    // 3. Only consumer enables partitions update
+    ASSERT_EQ(ResultOk, partitionsSet.initProducer(false));
+    ASSERT_EQ(ResultOk, partitionsSet.initConsumer(true));
+
+    res = makePostRequest(topicOperateUrl, "7");  // update partitions to 7
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+    waitForPartitionsUpdated();
+
+    partitionsSet.doSendAndReceive(7, 7);
+    ASSERT_EQ(5, partitionsSet.size());
+    partitionsSet.close();
+
+    // 4. Both producer and consumer disable partitions update
+    ASSERT_EQ(ResultOk, partitionsSet.initProducer(false));
+    ASSERT_EQ(ResultOk, partitionsSet.initConsumer(false));
+
+    res = makePostRequest(topicOperateUrl, "10");  // update partitions to 10
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+    waitForPartitionsUpdated();
+
+    partitionsSet.doSendAndReceive(10, 10);
+    ASSERT_EQ(7, partitionsSet.size());
+    partitionsSet.close();
+}
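
For reference, a minimal sketch (not part of the commit above; it reuses the test's HTTP helpers, and the admin URL and topic name are placeholders) showing how a partition update is driven through the broker's admin REST endpoint, the same way PartitionsUpdateTest.cc does:

```cpp
#include <iostream>
#include <string>

#include "HttpHelper.h"

int main() {
    // Same REST path pattern as `topicOperateUrl` in PartitionsUpdateTest.cc.
    const std::string url =
        "http://localhost:8080/admin/v2/persistent/public/default/my-topic/partitions";

    int res = makePutRequest(url, "2");  // create the partitioned topic with 2 partitions
    std::cout << "create: " << res << std::endl;  // expect 204, or 409 if it already exists

    res = makePostRequest(url, "3");  // increase the partition count from 2 to 3
    std::cout << "update: " << res << std::endl;  // expect 204

    makeDeleteRequest(url);  // clean up the partitioned topic
    return 0;
}
```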