You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2022/08/31 02:15:10 UTC

[pulsar] branch master updated: [refactor][client c++] Delete PartitionedConsumerImpl, use MultiTopicsConsumerImpl instead (#16969)

This is an automated email from the ASF dual-hosted git repository.

zike pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a3ae23427b [refactor][client c++] Delete PartitionedConsumerImpl, use MultiTopicsConsumerImpl instead (#16969)
3a3ae23427b is described below

commit 3a3ae23427bc9c8642b3ec9a081ce01000339875
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Wed Aug 31 10:15:02 2022 +0800

    [refactor][client c++] Delete PartitionedConsumerImpl, use MultiTopicsConsumerImpl instead (#16969)
    
    ### Motivation
    
    The code of the C++ client is still relatively old. After #1365, the Java client used `MultiTopicsConsumerImpl` instead of `PartitionedConsumerImpl`.
    
    ### Modifications
    - Delete PartitionedConsumerImpl, use MultiTopicsConsumerImpl instead.
    - For MultiTopicsConsumerImpl, add support for the seek and topic partition listener features.
---
 pulsar-client-cpp/.gitignore                       |   1 +
 pulsar-client-cpp/include/pulsar/Consumer.h        |   1 -
 pulsar-client-cpp/include/pulsar/Message.h         |   1 -
 pulsar-client-cpp/include/pulsar/MessageId.h       |   1 -
 pulsar-client-cpp/lib/ClientImpl.cc                |   6 +-
 pulsar-client-cpp/lib/ConsumerImpl.h               |   1 -
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc   | 151 ++++-
 pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h    |  27 +-
 .../lib/PartitionedBrokerConsumerStatsImpl.cc      | 163 ------
 .../lib/PartitionedBrokerConsumerStatsImpl.h       |  90 ---
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc   | 615 ---------------------
 pulsar-client-cpp/lib/PartitionedConsumerImpl.h    | 137 -----
 pulsar-client-cpp/tests/BasicEndToEndTest.cc       |   2 +-
 pulsar-client-cpp/tests/ConsumerStatsTest.cc       |   7 +-
 pulsar-client-cpp/tests/ConsumerTest.cc            |  10 +-
 pulsar-client-cpp/tests/PulsarFriend.h             |   5 -
 16 files changed, 164 insertions(+), 1054 deletions(-)

diff --git a/pulsar-client-cpp/.gitignore b/pulsar-client-cpp/.gitignore
index 0d8d323e7ff..349be85a476 100644
--- a/pulsar-client-cpp/.gitignore
+++ b/pulsar-client-cpp/.gitignore
@@ -71,6 +71,7 @@ apidocs/
 generated/
 
 # CMAKE
+.cmake
 Makefile
 cmake_install.cmake
 CMakeFiles
diff --git a/pulsar-client-cpp/include/pulsar/Consumer.h b/pulsar-client-cpp/include/pulsar/Consumer.h
index f1e180aea2d..6c0ab27b06c 100644
--- a/pulsar-client-cpp/include/pulsar/Consumer.h
+++ b/pulsar-client-cpp/include/pulsar/Consumer.h
@@ -407,7 +407,6 @@ class PULSAR_PUBLIC Consumer {
 
     friend class PulsarFriend;
     friend class PulsarWrapper;
-    friend class PartitionedConsumerImpl;
     friend class MultiTopicsConsumerImpl;
     friend class ConsumerImpl;
     friend class ClientImpl;
diff --git a/pulsar-client-cpp/include/pulsar/Message.h b/pulsar-client-cpp/include/pulsar/Message.h
index 9cea48f2658..935236bd5bb 100644
--- a/pulsar-client-cpp/include/pulsar/Message.h
+++ b/pulsar-client-cpp/include/pulsar/Message.h
@@ -179,7 +179,6 @@ class PULSAR_PUBLIC Message {
     Message(const MessageId& messageId, proto::MessageMetadata& metadata, SharedBuffer& payload,
             proto::SingleMessageMetadata& singleMetadata, const std::string& topicName);
     friend class PartitionedProducerImpl;
-    friend class PartitionedConsumerImpl;
     friend class MultiTopicsConsumerImpl;
     friend class MessageBuilder;
     friend class ConsumerImpl;
diff --git a/pulsar-client-cpp/include/pulsar/MessageId.h b/pulsar-client-cpp/include/pulsar/MessageId.h
index de64d1d8224..06be790c1ea 100644
--- a/pulsar-client-cpp/include/pulsar/MessageId.h
+++ b/pulsar-client-cpp/include/pulsar/MessageId.h
@@ -94,7 +94,6 @@ class PULSAR_PUBLIC MessageId {
     friend class MessageImpl;
     friend class Commands;
     friend class PartitionedProducerImpl;
-    friend class PartitionedConsumerImpl;
     friend class MultiTopicsConsumerImpl;
     friend class UnAckedMessageTrackerEnabled;
     friend class BatchAcknowledgementTracker;
diff --git a/pulsar-client-cpp/lib/ClientImpl.cc b/pulsar-client-cpp/lib/ClientImpl.cc
index c68d6a951b4..2c983421e67 100644
--- a/pulsar-client-cpp/lib/ClientImpl.cc
+++ b/pulsar-client-cpp/lib/ClientImpl.cc
@@ -23,7 +23,6 @@
 #include "ProducerImpl.h"
 #include "ReaderImpl.h"
 #include "PartitionedProducerImpl.h"
-#include "PartitionedConsumerImpl.h"
 #include "MultiTopicsConsumerImpl.h"
 #include "PatternMultiTopicsConsumerImpl.h"
 #include "TimeUtils.h"
@@ -369,8 +368,9 @@ void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr
                 callback(ResultInvalidConfiguration, Consumer());
                 return;
             }
-            consumer = std::make_shared<PartitionedConsumerImpl>(
-                shared_from_this(), subscriptionName, topicName, partitionMetadata->getPartitions(), conf);
+            consumer = std::make_shared<MultiTopicsConsumerImpl>(shared_from_this(), topicName,
+                                                                 partitionMetadata->getPartitions(),
+                                                                 subscriptionName, conf, lookupServicePtr_);
         } else {
             auto consumerImpl = std::make_shared<ConsumerImpl>(shared_from_this(), topicName->toString(),
                                                                subscriptionName, conf);
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.h b/pulsar-client-cpp/lib/ConsumerImpl.h
index 78140f84b54..70fda0170cc 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.h
+++ b/pulsar-client-cpp/lib/ConsumerImpl.h
@@ -307,7 +307,6 @@ class ConsumerImpl : public ConsumerImplBase,
 
     // these two declared friend to access setNegativeAcknowledgeEnabledForTesting
     friend class MultiTopicsConsumerImpl;
-    friend class PartitionedConsumerImpl;
 
     FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
     FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
index ad30d5cca50..a9bdbea9082 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.cc
@@ -17,6 +17,7 @@
  * under the License.
  */
 #include "MultiTopicsConsumerImpl.h"
+#include "MultiResultCallback.h"
 
 DECLARE_LOG_OBJECT()
 
@@ -25,7 +26,7 @@ using namespace pulsar;
 MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector<std::string>& topics,
                                                  const std::string& subscriptionName, TopicNamePtr topicName,
                                                  const ConsumerConfiguration& conf,
-                                                 const LookupServicePtr lookupServicePtr)
+                                                 LookupServicePtr lookupServicePtr)
     : client_(client),
       subscriptionName_(subscriptionName),
       topic_(topicName ? topicName->toString() : "EmptyTopics"),
@@ -52,6 +53,12 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
     } else {
         unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
     }
+    auto partitionsUpdateInterval = static_cast<unsigned int>(client_->conf().getPartitionsUpdateInterval());
+    if (partitionsUpdateInterval > 0) {
+        partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer();
+        partitionsUpdateInterval_ = boost::posix_time::seconds(partitionsUpdateInterval);
+        lookupServicePtr_ = client_->getLookup();
+    }
 }
 
 void MultiTopicsConsumerImpl::start() {
@@ -125,25 +132,32 @@ Future<Result, Consumer> MultiTopicsConsumerImpl::subscribeOneTopicAsync(const s
     }
 
     // subscribe for each partition, when all partitions completed, complete promise
-    lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(std::bind(
-        &MultiTopicsConsumerImpl::subscribeTopicPartitions, shared_from_this(), std::placeholders::_1,
-        std::placeholders::_2, topicName, subscriptionName_, conf_, topicPromise));
+    Lock lock(mutex_);
+    auto entry = topicsPartitions_.find(topic);
+    if (entry == topicsPartitions_.end()) {
+        lock.unlock();
+        lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
+            [this, topicName, topicPromise](Result result, const LookupDataResultPtr& lookupDataResult) {
+                if (result != ResultOk) {
+                    LOG_ERROR("Error Checking/Getting Partition Metadata while MultiTopics Subscribing- "
+                              << consumerStr_ << " result: " << result)
+                    topicPromise->setFailed(result);
+                    return;
+                }
+                subscribeTopicPartitions(lookupDataResult->getPartitions(), topicName, subscriptionName_,
+                                         topicPromise);
+            });
+    } else {
+        auto numPartitions = entry->second;
+        lock.unlock();
+        subscribeTopicPartitions(numPartitions, topicName, subscriptionName_, topicPromise);
+    }
     return topicPromise->getFuture();
 }
 
-void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
-                                                       const LookupDataResultPtr partitionMetadata,
-                                                       TopicNamePtr topicName,
+void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicNamePtr topicName,
                                                        const std::string& consumerName,
-                                                       ConsumerConfiguration conf,
                                                        ConsumerSubResultPromisePtr topicSubResultPromise) {
-    if (result != ResultOk) {
-        LOG_ERROR("Error Checking/Getting Partition Metadata while MultiTopics Subscribing- "
-                  << consumerStr_ << " result: " << result)
-        topicSubResultPromise->setFailed(result);
-        return;
-    }
-
     std::shared_ptr<ConsumerImpl> consumer;
     ConsumerConfiguration config = conf_.clone();
     ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get();
@@ -151,7 +165,6 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
     config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, shared_from_this(),
                                         std::placeholders::_1, std::placeholders::_2));
 
-    int numPartitions = partitionMetadata->getPartitions();
     int partitions = numPartitions == 0 ? 1 : numPartitions;
 
     // Apply total limit of receiver queue size across partitions
@@ -160,7 +173,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(const Result result,
                  (int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / partitions)));
 
     Lock lock(mutex_);
-    topicsPartitions_.insert(std::make_pair(topicName->toString(), partitions));
+    topicsPartitions_[topicName->toString()] = partitions;
     lock.unlock();
     numberTopicPartitions_->fetch_add(partitions);
 
@@ -214,10 +227,13 @@ void MultiTopicsConsumerImpl::handleSingleConsumerCreated(
         return;
     }
 
-    LOG_DEBUG("Successfully Subscribed to a single partition of topic in TopicsConsumer. "
-              << "Partitions need to create - " << previous - 1);
+    LOG_INFO("Successfully Subscribed to a single partition of topic in TopicsConsumer. "
+             << "Partitions need to create : " << previous - 1);
 
     if (partitionsNeedCreate->load() == 0) {
+        if (partitionsUpdateTimer_) {
+            runPartitionUpdateTask();
+        }
         topicSubResultPromise->setValue(Consumer(shared_from_this()));
     }
 }
@@ -274,13 +290,17 @@ void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result,
 }
 
 void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic, ResultCallback callback) {
+    Lock lock(mutex_);
     std::map<std::string, int>::iterator it = topicsPartitions_.find(topic);
     if (it == topicsPartitions_.end()) {
+        lock.unlock();
         LOG_ERROR("TopicsConsumer does not subscribe topic : " << topic << " subscription - "
                                                                << subscriptionName_);
         callback(ResultTopicNotFound);
         return;
     }
+    int numberPartitions = it->second;
+    lock.unlock();
 
     const auto state = state_.load();
     if (state == Closing || state == Closed) {
@@ -295,7 +315,6 @@ void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic,
         LOG_ERROR("TopicName invalid: " << topic);
         callback(ResultUnknownError);
     }
-    int numberPartitions = it->second;
     std::shared_ptr<std::atomic<int>> consumerUnsubed = std::make_shared<std::atomic<int>>(0);
 
     for (int i = 0; i < numberPartitions; i++) {
@@ -683,7 +702,15 @@ void MultiTopicsConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback c
 }
 
 void MultiTopicsConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
-    callback(ResultOperationNotSupported);
+    if (state_ != Ready) {
+        callback(ResultAlreadyClosed);
+        return;
+    }
+
+    MultiResultCallback multiResultCallback(callback, consumers_.size());
+    consumers_.forEachValue([&timestamp, &multiResultCallback](ConsumerImplPtr consumer) {
+        consumer->seekAsync(timestamp, multiResultCallback);
+    });
 }
 
 void MultiTopicsConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabled) {
@@ -711,3 +738,85 @@ uint64_t MultiTopicsConsumerImpl::getNumberOfConnectedConsumer() {
     });
     return numberOfConnectedConsumer;
 }
+void MultiTopicsConsumerImpl::runPartitionUpdateTask() {
+    partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
+    auto self = shared_from_this();
+    partitionsUpdateTimer_->async_wait([self](const boost::system::error_code& ec) {
+        // If two requests call runPartitionUpdateTask at the same time, the timer will fail, and it
+        // cannot continue at this time, and the request needs to be ignored.
+        if (!ec) {
+            self->topicPartitionUpdate();
+        }
+    });
+}
+void MultiTopicsConsumerImpl::topicPartitionUpdate() {
+    using namespace std::placeholders;
+    Lock lock(mutex_);
+    auto topicsPartitions = topicsPartitions_;
+    lock.unlock();
+    for (const auto& item : topicsPartitions) {
+        auto topicName = TopicName::get(item.first);
+        auto currentNumPartitions = item.second;
+        lookupServicePtr_->getPartitionMetadataAsync(topicName).addListener(
+            std::bind(&MultiTopicsConsumerImpl::handleGetPartitions, shared_from_this(), topicName,
+                      std::placeholders::_1, std::placeholders::_2, currentNumPartitions));
+    }
+}
+void MultiTopicsConsumerImpl::handleGetPartitions(TopicNamePtr topicName, Result result,
+                                                  const LookupDataResultPtr& lookupDataResult,
+                                                  int currentNumPartitions) {
+    if (state_ != Ready) {
+        return;
+    }
+    if (!result) {
+        const auto newNumPartitions = static_cast<unsigned int>(lookupDataResult->getPartitions());
+        if (newNumPartitions > currentNumPartitions) {
+            LOG_INFO("new partition count: " << newNumPartitions
+                                             << " current partition count: " << currentNumPartitions);
+            auto partitionsNeedCreate =
+                std::make_shared<std::atomic<int>>(newNumPartitions - currentNumPartitions);
+            ConsumerSubResultPromisePtr topicPromise = std::make_shared<Promise<Result, Consumer>>();
+            Lock lock(mutex_);
+            topicsPartitions_[topicName->toString()] = newNumPartitions;
+            lock.unlock();
+            numberTopicPartitions_->fetch_add(newNumPartitions - currentNumPartitions);
+            for (unsigned int i = currentNumPartitions; i < newNumPartitions; i++) {
+                subscribeSingleNewConsumer(newNumPartitions, topicName, i, topicPromise,
+                                           partitionsNeedCreate);
+            }
+            // `runPartitionUpdateTask()` will be called in `handleSingleConsumerCreated()`
+            return;
+        }
+    } else {
+        LOG_WARN("Failed to getPartitionMetadata: " << strResult(result));
+    }
+    runPartitionUpdateTask();
+}
+
+void MultiTopicsConsumerImpl::subscribeSingleNewConsumer(
+    int numPartitions, TopicNamePtr topicName, int partitionIndex,
+    ConsumerSubResultPromisePtr topicSubResultPromise,
+    std::shared_ptr<std::atomic<int>> partitionsNeedCreate) {
+    ConsumerConfiguration config = conf_.clone();
+    ExecutorServicePtr internalListenerExecutor = client_->getPartitionListenerExecutorProvider()->get();
+    config.setMessageListener(std::bind(&MultiTopicsConsumerImpl::messageReceived, shared_from_this(),
+                                        std::placeholders::_1, std::placeholders::_2));
+
+    // Apply total limit of receiver queue size across partitions
+    config.setReceiverQueueSize(
+        std::min(conf_.getReceiverQueueSize(),
+                 (int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions)));
+
+    std::string topicPartitionName = topicName->getTopicPartitionName(partitionIndex);
+
+    auto consumer = std::make_shared<ConsumerImpl>(client_, topicPartitionName, subscriptionName_, config,
+                                                   internalListenerExecutor, true, Partitioned);
+    consumer->getConsumerCreatedFuture().addListener(
+        std::bind(&MultiTopicsConsumerImpl::handleSingleConsumerCreated, shared_from_this(),
+                  std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
+    consumer->setPartitionIndex(partitionIndex);
+    consumer->start();
+    consumers_.emplace(topicPartitionName, consumer);
+    LOG_INFO("Add Creating Consumer for - " << topicPartitionName << " - " << consumerStr_
+                                            << " consumerSize: " << consumers_.size());
+}
diff --git a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
index 0f111110c44..95c24f68c5b 100644
--- a/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
+++ b/pulsar-client-cpp/lib/MultiTopicsConsumerImpl.h
@@ -51,7 +51,14 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     };
     MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector<std::string>& topics,
                             const std::string& subscriptionName, TopicNamePtr topicName,
-                            const ConsumerConfiguration& conf, const LookupServicePtr lookupServicePtr_);
+                            const ConsumerConfiguration& conf, LookupServicePtr lookupServicePtr_);
+    MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName, int numPartitions,
+                            const std::string& subscriptionName, const ConsumerConfiguration& conf,
+                            LookupServicePtr lookupServicePtr)
+        : MultiTopicsConsumerImpl(client, {topicName->toString()}, subscriptionName, topicName, conf,
+                                  lookupServicePtr) {
+        topicsPartitions_[topicName->toString()] = numPartitions;
+    }
     ~MultiTopicsConsumerImpl();
     // overrided methods from ConsumerImplBase
     Future<Result, ConsumerImplBaseWeakPtr> getConsumerCreatedFuture() override;
@@ -101,14 +108,16 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     std::mutex pendingReceiveMutex_;
     std::atomic<MultiTopicsConsumerState> state_{Pending};
     BlockingQueue<Message> messages_;
-    ExecutorServicePtr listenerExecutor_;
+    const ExecutorServicePtr listenerExecutor_;
     MessageListener messageListener_;
+    DeadlineTimerPtr partitionsUpdateTimer_;
+    boost::posix_time::time_duration partitionsUpdateInterval_;
     LookupServicePtr lookupServicePtr_;
     std::shared_ptr<std::atomic<int>> numberTopicPartitions_;
     std::atomic<Result> failedResult{ResultOk};
     Promise<Result, ConsumerImplBaseWeakPtr> multiTopicsConsumerCreatedPromise_;
     UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
-    const std::vector<std::string>& topics_;
+    const std::vector<std::string> topics_;
     std::queue<ReceiveCallback> pendingReceives_;
 
     /* methods */
@@ -122,9 +131,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
 
     void handleOneTopicSubscribed(Result result, Consumer consumer, const std::string& topic,
                                   std::shared_ptr<std::atomic<int>> topicsNeedCreate);
-    void subscribeTopicPartitions(const Result result, const LookupDataResultPtr partitionMetadata,
-                                  TopicNamePtr topicName, const std::string& consumerName,
-                                  ConsumerConfiguration conf,
+    void subscribeTopicPartitions(int numPartitions, TopicNamePtr topicName, const std::string& consumerName,
                                   ConsumerSubResultPromisePtr topicSubResultPromise);
     void handleSingleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
                                      std::shared_ptr<std::atomic<int>> partitionsNeedCreate,
@@ -134,11 +141,19 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase,
     void handleOneTopicUnsubscribedAsync(Result result, std::shared_ptr<std::atomic<int>> consumerUnsubed,
                                          int numberPartitions, TopicNamePtr topicNamePtr,
                                          std::string& topicPartitionName, ResultCallback callback);
+    void runPartitionUpdateTask();
+    void topicPartitionUpdate();
+    void handleGetPartitions(TopicNamePtr topicName, Result result,
+                             const LookupDataResultPtr& lookupDataResult, int currentNumPartitions);
+    void subscribeSingleNewConsumer(int numPartitions, TopicNamePtr topicName, int partitionIndex,
+                                    ConsumerSubResultPromisePtr topicSubResultPromise,
+                                    std::shared_ptr<std::atomic<int>> partitionsNeedCreate);
 
    private:
     void setNegativeAcknowledgeEnabledForTesting(bool enabled) override;
 
     FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
+    FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
 };
 
 typedef std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImplPtr;
diff --git a/pulsar-client-cpp/lib/PartitionedBrokerConsumerStatsImpl.cc b/pulsar-client-cpp/lib/PartitionedBrokerConsumerStatsImpl.cc
deleted file mode 100644
index 9d5965b24bd..00000000000
--- a/pulsar-client-cpp/lib/PartitionedBrokerConsumerStatsImpl.cc
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#include <lib/PartitionedBrokerConsumerStatsImpl.h>
-#include <boost/date_time/local_time/local_time.hpp>
-#include <algorithm>
-#include <numeric>
-
-namespace pulsar {
-
-const std::string PartitionedBrokerConsumerStatsImpl::DELIMITER = ";";
-
-PartitionedBrokerConsumerStatsImpl::PartitionedBrokerConsumerStatsImpl(size_t size) {
-    statsList_.resize(size);
-}
-
-bool PartitionedBrokerConsumerStatsImpl::isValid() const {
-    bool isValid = true;
-    for (int i = 0; i < statsList_.size(); i++) {
-        isValid &= statsList_[i].isValid();
-    }
-    return isValid;
-}
-
-std::ostream& operator<<(std::ostream& os, const PartitionedBrokerConsumerStatsImpl& obj) {
-    os << "\nPartitionedBrokerConsumerStatsImpl ["
-       << "validTill_ = " << obj.isValid() << ", msgRateOut_ = " << obj.getMsgRateOut()
-       << ", msgThroughputOut_ = " << obj.getMsgThroughputOut()
-       << ", msgRateRedeliver_ = " << obj.getMsgRateRedeliver()
-       << ", consumerName_ = " << obj.getConsumerName()
-       << ", availablePermits_ = " << obj.getAvailablePermits()
-       << ", unackedMessages_ = " << obj.getUnackedMessages()
-       << ", blockedConsumerOnUnackedMsgs_ = " << obj.isBlockedConsumerOnUnackedMsgs()
-       << ", address_ = " << obj.getAddress() << ", connectedSince_ = " << obj.getConnectedSince()
-       << ", type_ = " << obj.getType() << ", msgRateExpired_ = " << obj.getMsgRateExpired()
-       << ", msgBacklog_ = " << obj.getMsgBacklog() << "]";
-    return os;
-}
-
-double PartitionedBrokerConsumerStatsImpl::getMsgRateOut() const {
-    double sum = 0;
-    for (int i = 0; i < statsList_.size(); i++) {
-        sum += statsList_[i].getMsgRateOut();
-    }
-    return sum;
-}
-
-double PartitionedBrokerConsumerStatsImpl::getMsgThroughputOut() const {
-    double sum = 0;
-    for (int i = 0; i < statsList_.size(); i++) {
-        sum += statsList_[i].getMsgThroughputOut();
-    }
-    return sum;
-}
-
-double PartitionedBrokerConsumerStatsImpl::getMsgRateRedeliver() const {
-    double sum = 0;
-    for (int i = 0; i < statsList_.size(); i++) {
-        sum += statsList_[i].getMsgRateRedeliver();
-    }
-    return sum;
-}
-
-const std::string PartitionedBrokerConsumerStatsImpl::getConsumerName() const {
-    std::string str;
-    for (int i = 0; i < statsList_.size(); i++) {
-        str += statsList_[i].getConsumerName() + DELIMITER;
-    }
-    return str;
-}
-
-uint64_t PartitionedBrokerConsumerStatsImpl::getAvailablePermits() const {
-    uint64_t sum = 0;
-    for (int i = 0; i < statsList_.size(); i++) {
-        sum += statsList_[i].getAvailablePermits();
-    }
-    return sum;
-}
-
-uint64_t PartitionedBrokerConsumerStatsImpl::getUnackedMessages() const {
-    uint64_t sum = 0;
-    for (int i = 0; i < statsList_.size(); i++) {
-        sum += statsList_[i].getUnackedMessages();
-    }
-    return sum;
-}
-
-bool PartitionedBrokerConsumerStatsImpl::isBlockedConsumerOnUnackedMsgs() const {
-    if (statsList_.size() == 0) {
-        return false;
-    }
-
-    bool isValid = true;
-    for (int i = 0; i < statsList_.size(); i++) {
-        isValid &= statsList_[i].isValid();
-    }
-    return isValid;
-}
-
-const std::string PartitionedBrokerConsumerStatsImpl::getAddress() const {
-    std::string str;
-    for (int i = 0; i < statsList_.size(); i++) {
-        str += statsList_[i].getAddress() + DELIMITER;
-    }
-    return str;
-}
-
-const std::string PartitionedBrokerConsumerStatsImpl::getConnectedSince() const {
-    std::string str;
-    for (int i = 0; i < statsList_.size(); i++) {
-        str += statsList_[i].getConnectedSince() + DELIMITER;
-    }
-    return str;
-}
-
-const ConsumerType PartitionedBrokerConsumerStatsImpl::getType() const {
-    if (!statsList_.size()) {
-        return ConsumerExclusive;
-    }
-    return statsList_[0].getType();
-}
-
-double PartitionedBrokerConsumerStatsImpl::getMsgRateExpired() const {
-    double sum = 0;
-    for (int i = 0; i < statsList_.size(); i++) {
-        sum += statsList_[i].getMsgRateExpired();
-    }
-    return sum;
-}
-
-uint64_t PartitionedBrokerConsumerStatsImpl::getMsgBacklog() const {
-    uint64_t sum = 0;
-    for (int i = 0; i < statsList_.size(); i++) {
-        sum += statsList_[i].getMsgBacklog();
-    }
-    return sum;
-}
-
-BrokerConsumerStats PartitionedBrokerConsumerStatsImpl::getBrokerConsumerStats(int index) {
-    return statsList_[index];
-}
-
-void PartitionedBrokerConsumerStatsImpl::add(BrokerConsumerStats stats, int index) {
-    statsList_[index] = stats;
-}
-
-void PartitionedBrokerConsumerStatsImpl::clear() { statsList_.clear(); }
-}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/PartitionedBrokerConsumerStatsImpl.h b/pulsar-client-cpp/lib/PartitionedBrokerConsumerStatsImpl.h
deleted file mode 100644
index 683f5245dbb..00000000000
--- a/pulsar-client-cpp/lib/PartitionedBrokerConsumerStatsImpl.h
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#ifndef PULSAR_CPP_PARTITIONEDBROKERCONSUMERSTATSIMPL_H
-#define PULSAR_CPP_PARTITIONEDBROKERCONSUMERSTATSIMPL_H
-
-#include <string.h>
-#include <iostream>
-#include <vector>
-#include <pulsar/defines.h>
-#include <pulsar/Result.h>
-#include <functional>
-#include <boost/date_time/microsec_time_clock.hpp>
-#include <lib/BrokerConsumerStatsImpl.h>
-
-namespace pulsar {
-class PULSAR_PUBLIC PartitionedBrokerConsumerStatsImpl : public BrokerConsumerStatsImplBase {
-   private:
-    std::vector<BrokerConsumerStats> statsList_;
-    static const std::string DELIMITER;
-
-   public:
-    PartitionedBrokerConsumerStatsImpl(size_t size);
-
-    /** Returns true if the Stats are still valid **/
-    virtual bool isValid() const;
-
-    /** Returns the rate of messages delivered to the consumer. msg/s */
-    virtual double getMsgRateOut() const;
-
-    /** Returns the throughput delivered to the consumer. bytes/s */
-    virtual double getMsgThroughputOut() const;
-
-    /** Returns the rate of messages redelivered by this consumer. msg/s */
-    virtual double getMsgRateRedeliver() const;
-
-    /** Returns the Name of the consumer */
-    virtual const std::string getConsumerName() const;
-
-    /** Returns the Number of available message permits for the consumer */
-    virtual uint64_t getAvailablePermits() const;
-
-    /** Returns the Number of unacknowledged messages for the consumer */
-    virtual uint64_t getUnackedMessages() const;
-
-    /** Returns true if the consumer is blocked due to unacked messages.  */
-    virtual bool isBlockedConsumerOnUnackedMsgs() const;
-
-    /** Returns the Address of this consumer */
-    virtual const std::string getAddress() const;
-
-    /** Returns the Timestamp of connection */
-    virtual const std::string getConnectedSince() const;
-
-    /** Returns Whether this subscription is Exclusive or Shared or Failover */
-    virtual const ConsumerType getType() const;
-
-    /** Returns the rate of messages expired on this subscription. msg/s */
-    virtual double getMsgRateExpired() const;
-
-    /** Returns the Number of messages in the subscription backlog */
-    virtual uint64_t getMsgBacklog() const;
-
-    /** Returns the BrokerConsumerStatsImpl at of ith partition */
-    BrokerConsumerStats getBrokerConsumerStats(int index);
-
-    void add(BrokerConsumerStats stats, int index);
-
-    void clear();
-
-    friend std::ostream &operator<<(std::ostream &os, const PartitionedBrokerConsumerStatsImpl &obj);
-};
-typedef std::shared_ptr<PartitionedBrokerConsumerStatsImpl> PartitionedBrokerConsumerStatsPtr;
-}  // namespace pulsar
-#endif  // PULSAR_CPP_BROKERCONSUMERSTATSIMPL_H
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
deleted file mode 100644
index 54bb4b72c90..00000000000
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ /dev/null
@@ -1,615 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#include "PartitionedConsumerImpl.h"
-#include "MultiResultCallback.h"
-
-DECLARE_LOG_OBJECT()
-
-namespace pulsar {
-
-PartitionedConsumerImpl::PartitionedConsumerImpl(ClientImplPtr client, const std::string& subscriptionName,
-                                                 const TopicNamePtr topicName,
-                                                 const unsigned int numPartitions,
-                                                 const ConsumerConfiguration& conf)
-    : client_(client),
-      subscriptionName_(subscriptionName),
-      topicName_(topicName),
-      numPartitions_(numPartitions),
-      conf_(conf),
-      messages_(1000),
-      listenerExecutor_(client->getListenerExecutorProvider()->get()),
-      messageListener_(conf.getMessageListener()),
-      topic_(topicName->toString()) {
-    std::stringstream consumerStrStream;
-    consumerStrStream << "[Partitioned Consumer: " << topic_ << "," << subscriptionName << ","
-                      << numPartitions << "]";
-    if (conf.getUnAckedMessagesTimeoutMs() != 0) {
-        if (conf.getTickDurationInMs() > 0) {
-            unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerEnabled(
-                conf.getUnAckedMessagesTimeoutMs(), conf.getTickDurationInMs(), client, *this));
-        } else {
-            unAckedMessageTrackerPtr_.reset(
-                new UnAckedMessageTrackerEnabled(conf.getUnAckedMessagesTimeoutMs(), client, *this));
-        }
-    } else {
-        unAckedMessageTrackerPtr_.reset(new UnAckedMessageTrackerDisabled());
-    }
-    auto partitionsUpdateInterval = static_cast<unsigned int>(client_->conf().getPartitionsUpdateInterval());
-    if (partitionsUpdateInterval > 0) {
-        partitionsUpdateTimer_ = listenerExecutor_->createDeadlineTimer();
-        partitionsUpdateInterval_ = boost::posix_time::seconds(partitionsUpdateInterval);
-        lookupServicePtr_ = client_->getLookup();
-    }
-}
-
-PartitionedConsumerImpl::~PartitionedConsumerImpl() {}
-
-Future<Result, ConsumerImplBaseWeakPtr> PartitionedConsumerImpl::getConsumerCreatedFuture() {
-    return partitionedConsumerCreatedPromise_.getFuture();
-}
-const std::string& PartitionedConsumerImpl::getSubscriptionName() const { return subscriptionName_; }
-
-const std::string& PartitionedConsumerImpl::getTopic() const { return topic_; }
-
-Result PartitionedConsumerImpl::receive(Message& msg) {
-    if (state_ != Ready) {
-        return ResultAlreadyClosed;
-    }
-    // See comments in `receive(Message&, int)`
-
-    if (messageListener_) {
-        LOG_ERROR("Can not receive when a listener has been set");
-        return ResultInvalidConfiguration;
-    }
-
-    messages_.pop(msg);
-    unAckedMessageTrackerPtr_->add(msg.getMessageId());
-    return ResultOk;
-}
-
-Result PartitionedConsumerImpl::receive(Message& msg, int timeout) {
-    if (state_ != Ready) {
-        return ResultAlreadyClosed;
-    }
-
-    if (messageListener_) {
-        LOG_ERROR("Can not receive when a listener has been set");
-        return ResultInvalidConfiguration;
-    }
-
-    if (messages_.pop(msg, std::chrono::milliseconds(timeout))) {
-        unAckedMessageTrackerPtr_->add(msg.getMessageId());
-        return ResultOk;
-    } else {
-        if (state_ != Ready) {
-            return ResultAlreadyClosed;
-        }
-        return ResultTimeout;
-    }
-}
-
-void PartitionedConsumerImpl::receiveAsync(ReceiveCallback& callback) {
-    Message msg;
-
-    // fail the callback if consumer is closing or closed
-    if (state_ != Ready) {
-        callback(ResultAlreadyClosed, msg);
-        return;
-    }
-
-    Lock lock(pendingReceiveMutex_);
-    if (messages_.pop(msg, std::chrono::milliseconds(0))) {
-        lock.unlock();
-        unAckedMessageTrackerPtr_->add(msg.getMessageId());
-        callback(ResultOk, msg);
-    } else {
-        pendingReceives_.push(callback);
-    }
-}
-
-void PartitionedConsumerImpl::unsubscribeAsync(ResultCallback callback) {
-    LOG_INFO("[" << topicName_->toString() << "," << subscriptionName_ << "] Unsubscribing");
-    // change state to Closing, so that no Ready state operation is permitted during unsubscribe
-    state_ = Closing;
-    // do not accept un subscribe until we have subscribe to all of the partitions of a topic
-    // it's a logical single topic so it should behave like a single topic, even if it's sharded
-    unsigned int index = 0;
-    for (ConsumerList::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
-         consumer++) {
-        LOG_DEBUG("Unsubcribing Consumer - " << index << " for Subscription - " << subscriptionName_
-                                             << " for Topic - " << topicName_->toString());
-        (*consumer)->unsubscribeAsync(std::bind(&PartitionedConsumerImpl::handleUnsubscribeAsync,
-                                                shared_from_this(), std::placeholders::_1, index++,
-                                                callback));
-    }
-}
-
-void PartitionedConsumerImpl::handleUnsubscribeAsync(Result result, unsigned int consumerIndex,
-                                                     ResultCallback callback) {
-    if (state_ == Failed) {
-        // we have already informed the client that unsubcribe has failed so, ignore this callbacks
-        // or do we still go ahead and check how many could we close successfully?
-        LOG_DEBUG("handleUnsubscribeAsync callback received in Failed State for consumerIndex - "
-                  << consumerIndex << "with Result - " << result << " for Subscription - "
-                  << subscriptionName_ << " for Topic - " << topicName_->toString());
-        return;
-    }
-    if (result != ResultOk) {
-        state_ = Failed;
-        LOG_ERROR("Error Closing one of the parition consumers, consumerIndex - " << consumerIndex);
-        callback(ResultUnknownError);
-        return;
-    }
-    const auto numPartitions = getNumPartitionsWithLock();
-    assert(unsubscribedSoFar_ <= numPartitions);
-    assert(consumerIndex <= numPartitions);
-    // this means we have successfully closed this partition consumer and no unsubscribe has failed so far
-    LOG_INFO("Successfully Unsubscribed Consumer - " << consumerIndex << " for Subscription - "
-                                                     << subscriptionName_ << " for Topic - "
-                                                     << topicName_->toString());
-    unsubscribedSoFar_++;
-    if (unsubscribedSoFar_ == numPartitions) {
-        LOG_DEBUG("Unsubscribed all of the partition consumer for subscription - " << subscriptionName_);
-        state_ = Closed;
-        callback(ResultOk);
-        return;
-    }
-}
-
-void PartitionedConsumerImpl::acknowledgeAsync(const MessageId& msgId, ResultCallback callback) {
-    int32_t partition = msgId.partition();
-#ifndef NDEBUG
-    Lock consumersLock(consumersMutex_);
-    assert(partition < getNumPartitions() && partition >= 0 && consumers_.size() > partition);
-    consumersLock.unlock();
-#endif
-    unAckedMessageTrackerPtr_->remove(msgId);
-    consumers_[partition]->acknowledgeAsync(msgId, callback);
-}
-
-void PartitionedConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) {
-    callback(ResultOperationNotSupported);
-}
-
-void PartitionedConsumerImpl::negativeAcknowledge(const MessageId& msgId) {
-    int32_t partition = msgId.partition();
-    unAckedMessageTrackerPtr_->remove(msgId);
-    consumers_[partition]->negativeAcknowledge(msgId);
-}
-
-unsigned int PartitionedConsumerImpl::getNumPartitions() const { return numPartitions_; }
-
-unsigned int PartitionedConsumerImpl::getNumPartitionsWithLock() const {
-    Lock consumersLock(consumersMutex_);
-    return getNumPartitions();
-}
-
-ConsumerConfiguration PartitionedConsumerImpl::getSinglePartitionConsumerConfig() const {
-    using namespace std::placeholders;
-
-    ConsumerConfiguration config = conf_.clone();
-    // all the partitioned-consumer belonging to one partitioned topic should have same name
-    config.setConsumerName(conf_.getConsumerName());
-    config.setConsumerType(conf_.getConsumerType());
-    config.setBrokerConsumerStatsCacheTimeInMs(conf_.getBrokerConsumerStatsCacheTimeInMs());
-
-    const auto shared_this = const_cast<PartitionedConsumerImpl*>(this)->shared_from_this();
-    config.setMessageListener(std::bind(&PartitionedConsumerImpl::messageReceived, shared_this,
-                                        std::placeholders::_1, std::placeholders::_2));
-
-    // Apply total limit of receiver queue size across partitions
-    // NOTE: if it's called by handleGetPartitions(), the queue size of new internal consumers may be smaller
-    // than previous created internal consumers.
-    config.setReceiverQueueSize(
-        std::min(conf_.getReceiverQueueSize(),
-                 (int)(conf_.getMaxTotalReceiverQueueSizeAcrossPartitions() / getNumPartitions())));
-
-    return config;
-}
-
-ConsumerImplPtr PartitionedConsumerImpl::newInternalConsumer(unsigned int partition,
-                                                             const ConsumerConfiguration& config) const {
-    using namespace std::placeholders;
-
-    std::string topicPartitionName = topicName_->getTopicPartitionName(partition);
-    auto consumer = std::make_shared<ConsumerImpl>(client_, topicPartitionName, subscriptionName_, config,
-                                                   internalListenerExecutor_, true, Partitioned);
-
-    const auto shared_this = const_cast<PartitionedConsumerImpl*>(this)->shared_from_this();
-    consumer->getConsumerCreatedFuture().addListener(
-        std::bind(&PartitionedConsumerImpl::handleSinglePartitionConsumerCreated, shared_this,
-                  std::placeholders::_1, std::placeholders::_2, partition));
-    consumer->setPartitionIndex(partition);
-
-    LOG_DEBUG("Creating Consumer for single Partition - " << topicPartitionName << "SubName - "
-                                                          << subscriptionName_);
-    return consumer;
-}
-
-void PartitionedConsumerImpl::start() {
-    internalListenerExecutor_ = client_->getPartitionListenerExecutorProvider()->get();
-    const auto config = getSinglePartitionConsumerConfig();
-
-    // create consumer on each partition
-    // Here we don't need `consumersMutex` to protect `consumers_`, because `consumers_` can only be increased
-    // when `state_` is Ready
-    for (unsigned int i = 0; i < getNumPartitions(); i++) {
-        consumers_.push_back(newInternalConsumer(i, config));
-    }
-    for (ConsumerList::const_iterator consumer = consumers_.begin(); consumer != consumers_.end();
-         consumer++) {
-        (*consumer)->start();
-    }
-}
-
-void PartitionedConsumerImpl::handleSinglePartitionConsumerCreated(
-    Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr, unsigned int partitionIndex) {
-    ResultCallback nullCallbackForCleanup = NULL;
-    if (state_ == Failed) {
-        // one of the consumer creation failed, and we are cleaning up
-        return;
-    }
-    const auto numPartitions = getNumPartitionsWithLock();
-    assert(numConsumersCreated_ < numPartitions);
-
-    if (result != ResultOk) {
-        state_ = Failed;
-        partitionedConsumerCreatedPromise_.setFailed(result);
-        // unsubscribed all of the successfully subscribed partitioned consumers
-        closeAsync(nullCallbackForCleanup);
-        LOG_ERROR("Unable to create Consumer for partition - " << partitionIndex << " Error - " << result);
-        return;
-    }
-
-    assert(partitionIndex < numPartitions && partitionIndex >= 0);
-    Lock lock(mutex_);
-    numConsumersCreated_++;
-    lock.unlock();
-    if (numConsumersCreated_ == numPartitions) {
-        LOG_INFO("Successfully Subscribed to Partitioned Topic - " << topicName_->toString() << " with - "
-                                                                   << numPartitions << " Partitions.");
-        state_ = Ready;
-        if (partitionsUpdateTimer_) {
-            runPartitionUpdateTask();
-        }
-        receiveMessages();
-        partitionedConsumerCreatedPromise_.setValue(shared_from_this());
-        return;
-    }
-}
-
-void PartitionedConsumerImpl::handleSinglePartitionConsumerClose(Result result, unsigned int partitionIndex,
-                                                                 CloseCallback callback) {
-    if (state_ == Failed) {
-        // we should have already notified the client by callback
-        return;
-    }
-    if (result != ResultOk) {
-        state_ = Failed;
-        LOG_ERROR("Closing the consumer failed for partition - " << partitionIndex);
-        partitionedConsumerCreatedPromise_.setFailed(result);
-        if (callback) {
-            callback(result);
-        }
-        return;
-    }
-    assert(partitionIndex < getNumPartitionsWithLock() && partitionIndex >= 0);
-    Lock lock(mutex_);
-    if (numConsumersCreated_ > 0) {
-        numConsumersCreated_--;
-    }
-    lock.unlock();
-    // closed all successfully
-    if (!numConsumersCreated_) {
-        state_ = Closed;
-        // set the producerCreatedPromise to failure
-        partitionedConsumerCreatedPromise_.setFailed(ResultUnknownError);
-        if (callback) {
-            callback(result);
-        }
-        return;
-    }
-}
-void PartitionedConsumerImpl::closeAsync(ResultCallback callback) {
-    Lock lock(consumersMutex_);
-    if (consumers_.empty()) {
-        notifyResult(callback);
-        return;
-    }
-    state_ = Closed;
-    unsigned int consumerAlreadyClosed = 0;
-    // close successfully subscribed consumers
-    // Here we don't need `consumersMutex` to protect `consumers_`, because `consumers_` can only be increased
-    // when `state_` is Ready
-    for (auto& consumer : consumers_) {
-        if (!consumer->isClosed()) {
-            auto self = shared_from_this();
-            const auto partition = consumer->getPartitionIndex();
-            consumer->closeAsync([this, self, partition, callback](Result result) {
-                handleSinglePartitionConsumerClose(result, partition, callback);
-            });
-        } else {
-            if (++consumerAlreadyClosed == consumers_.size()) {
-                // everything is closed already. so we are good.
-                notifyResult(callback);
-                return;
-            }
-        }
-    }
-
-    // fail pending recieve
-    failPendingReceiveCallback();
-}
-
-void PartitionedConsumerImpl::notifyResult(CloseCallback closeCallback) {
-    if (closeCallback) {
-        // this means client invoked the closeAsync with a valid callback
-        state_ = Closed;
-        closeCallback(ResultOk);
-    } else {
-        // consumer create failed, closeAsync called to cleanup the successfully created producers
-        state_ = Failed;
-        partitionedConsumerCreatedPromise_.setFailed(ResultUnknownError);
-    }
-}
-
-void PartitionedConsumerImpl::shutdown() {}
-
-bool PartitionedConsumerImpl::isClosed() { return state_ == Closed; }
-
-bool PartitionedConsumerImpl::isOpen() { return state_ == Ready; }
-
-void PartitionedConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
-    LOG_DEBUG("Received Message from one of the partition - " << msg.impl_->messageId.partition());
-    const std::string& topicPartitionName = consumer.getTopic();
-    msg.impl_->setTopicName(topicPartitionName);
-    // messages_ is a blocking queue: if queue is already full then no need of lock as receiveAsync already
-    // gets available-msg and no need to put request in pendingReceives_
-    Lock lock(pendingReceiveMutex_);
-    if (!pendingReceives_.empty()) {
-        ReceiveCallback callback = pendingReceives_.front();
-        pendingReceives_.pop();
-        lock.unlock();
-        unAckedMessageTrackerPtr_->add(msg.getMessageId());
-        listenerExecutor_->postWork(std::bind(callback, ResultOk, msg));
-    } else {
-        if (messages_.full()) {
-            lock.unlock();
-        }
-        if (messages_.push(msg) && messageListener_) {
-            unAckedMessageTrackerPtr_->add(msg.getMessageId());
-            listenerExecutor_->postWork(
-                std::bind(&PartitionedConsumerImpl::internalListener, shared_from_this(), consumer));
-        }
-    }
-}
-
-void PartitionedConsumerImpl::failPendingReceiveCallback() {
-    Message msg;
-
-    messages_.close();
-
-    Lock lock(pendingReceiveMutex_);
-    while (!pendingReceives_.empty()) {
-        ReceiveCallback callback = pendingReceives_.front();
-        pendingReceives_.pop();
-        listenerExecutor_->postWork(std::bind(callback, ResultAlreadyClosed, msg));
-    }
-    lock.unlock();
-}
-
-void PartitionedConsumerImpl::internalListener(Consumer consumer) {
-    Message m;
-    messages_.pop(m);
-    try {
-        messageListener_(Consumer(shared_from_this()), m);
-    } catch (const std::exception& e) {
-        LOG_ERROR("Exception thrown from listener of Partitioned Consumer" << e.what());
-    }
-}
-
-void PartitionedConsumerImpl::receiveMessages() {
-    for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
-        ConsumerImplPtr consumer = *i;
-        consumer->sendFlowPermitsToBroker(consumer->getCnx().lock(), conf_.getReceiverQueueSize());
-        LOG_DEBUG("Sending FLOW command for consumer - " << consumer->getConsumerId());
-    }
-}
-
-Result PartitionedConsumerImpl::pauseMessageListener() {
-    if (!messageListener_) {
-        return ResultInvalidConfiguration;
-    }
-    for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
-        (*i)->pauseMessageListener();
-    }
-    return ResultOk;
-}
-
-Result PartitionedConsumerImpl::resumeMessageListener() {
-    if (!messageListener_) {
-        return ResultInvalidConfiguration;
-    }
-    for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
-        (*i)->resumeMessageListener();
-    }
-    return ResultOk;
-}
-
-void PartitionedConsumerImpl::redeliverUnacknowledgedMessages() {
-    LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for partitioned consumer.");
-    for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
-        (*i)->redeliverUnacknowledgedMessages();
-    }
-    unAckedMessageTrackerPtr_->clear();
-}
-
-void PartitionedConsumerImpl::redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) {
-    if (messageIds.empty()) {
-        return;
-    }
-    if (conf_.getConsumerType() != ConsumerShared && conf_.getConsumerType() != ConsumerKeyShared) {
-        redeliverUnacknowledgedMessages();
-        return;
-    }
-    LOG_DEBUG("Sending RedeliverUnacknowledgedMessages command for partitioned consumer.");
-    for (ConsumerList::const_iterator i = consumers_.begin(); i != consumers_.end(); i++) {
-        (*i)->redeliverUnacknowledgedMessages(messageIds);
-    }
-}
-
-const std::string& PartitionedConsumerImpl::getName() const { return partitionStr_; }
-
-int PartitionedConsumerImpl::getNumOfPrefetchedMessages() const { return messages_.size(); }
-
-void PartitionedConsumerImpl::getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) {
-    if (state_ != Ready) {
-        callback(ResultConsumerNotInitialized, BrokerConsumerStats());
-        return;
-    }
-    const auto numPartitions = getNumPartitionsWithLock();
-    PartitionedBrokerConsumerStatsPtr statsPtr =
-        std::make_shared<PartitionedBrokerConsumerStatsImpl>(numPartitions);
-    LatchPtr latchPtr = std::make_shared<Latch>(numPartitions);
-    ConsumerList consumerList = consumers_;
-    for (int i = 0; i < consumerList.size(); i++) {
-        consumerList[i]->getBrokerConsumerStatsAsync(
-            std::bind(&PartitionedConsumerImpl::handleGetConsumerStats, shared_from_this(),
-                      std::placeholders::_1, std::placeholders::_2, latchPtr, statsPtr, i, callback));
-    }
-}
-
-void PartitionedConsumerImpl::handleGetConsumerStats(Result res, BrokerConsumerStats brokerConsumerStats,
-                                                     LatchPtr latchPtr,
-                                                     PartitionedBrokerConsumerStatsPtr statsPtr, size_t index,
-                                                     BrokerConsumerStatsCallback callback) {
-    Lock lock(mutex_);
-    if (res == ResultOk) {
-        latchPtr->countdown();
-        statsPtr->add(brokerConsumerStats, index);
-    } else {
-        lock.unlock();
-        callback(res, BrokerConsumerStats());
-        return;
-    }
-    if (latchPtr->getCount() == 0) {
-        lock.unlock();
-        callback(ResultOk, BrokerConsumerStats(statsPtr));
-    }
-}
-
-void PartitionedConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) {
-    callback(ResultOperationNotSupported);
-}
-
-void PartitionedConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) {
-    if (state_ != Ready) {
-        callback(ResultAlreadyClosed);
-        return;
-    }
-
-    // consumers_ could only be modified when state_ is Ready, so we needn't lock consumersMutex_ here
-    ConsumerList consumerList = consumers_;
-
-    MultiResultCallback multiResultCallback(callback, consumers_.size());
-    for (ConsumerList::const_iterator i = consumerList.begin(); i != consumerList.end(); i++) {
-        (*i)->seekAsync(timestamp, multiResultCallback);
-    }
-}
-
-void PartitionedConsumerImpl::runPartitionUpdateTask() {
-    partitionsUpdateTimer_->expires_from_now(partitionsUpdateInterval_);
-    partitionsUpdateTimer_->async_wait(
-        std::bind(&PartitionedConsumerImpl::getPartitionMetadata, shared_from_this()));
-}
-
-void PartitionedConsumerImpl::getPartitionMetadata() {
-    using namespace std::placeholders;
-    lookupServicePtr_->getPartitionMetadataAsync(topicName_)
-        .addListener(std::bind(&PartitionedConsumerImpl::handleGetPartitions, shared_from_this(),
-                               std::placeholders::_1, std::placeholders::_2));
-}
-
-void PartitionedConsumerImpl::handleGetPartitions(Result result,
-                                                  const LookupDataResultPtr& lookupDataResult) {
-    if (state_ != Ready) {
-        return;
-    }
-
-    if (!result) {
-        const auto newNumPartitions = static_cast<unsigned int>(lookupDataResult->getPartitions());
-        Lock consumersLock(consumersMutex_);
-        const auto currentNumPartitions = getNumPartitions();
-        assert(currentNumPartitions == consumers_.size());
-        if (newNumPartitions > currentNumPartitions) {
-            LOG_INFO("new partition count: " << newNumPartitions);
-            numPartitions_ = newNumPartitions;
-            const auto config = getSinglePartitionConsumerConfig();
-            for (unsigned int i = currentNumPartitions; i < newNumPartitions; i++) {
-                auto consumer = newInternalConsumer(i, config);
-                consumer->start();
-                consumers_.push_back(consumer);
-            }
-            // `runPartitionUpdateTask()` will be called in `handleSinglePartitionConsumerCreated()`
-            return;
-        }
-    } else {
-        LOG_WARN("Failed to getPartitionMetadata: " << strResult(result));
-    }
-
-    runPartitionUpdateTask();
-}
-
-void PartitionedConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabled) {
-    Lock lock(mutex_);
-    for (auto&& c : consumers_) {
-        c->setNegativeAcknowledgeEnabledForTesting(enabled);
-    }
-}
-
-bool PartitionedConsumerImpl::isConnected() const {
-    if (state_ != Ready) {
-        return false;
-    }
-
-    Lock consumersLock(consumersMutex_);
-    const auto consumers = consumers_;
-    consumersLock.unlock();
-    for (const auto& consumer : consumers_) {
-        if (!consumer->isConnected()) {
-            return false;
-        }
-    }
-    return true;
-}
-
-uint64_t PartitionedConsumerImpl::getNumberOfConnectedConsumer() {
-    uint64_t numberOfConnectedConsumer = 0;
-    Lock consumersLock(consumersMutex_);
-    const auto consumers = consumers_;
-    consumersLock.unlock();
-    for (const auto& consumer : consumers) {
-        if (consumer->isConnected()) {
-            numberOfConnectedConsumer++;
-        }
-    }
-    return numberOfConnectedConsumer;
-}
-
-}  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h b/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
deleted file mode 100644
index 8f4faf09954..00000000000
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.h
+++ /dev/null
@@ -1,137 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-#ifndef PULSAR_PARTITIONED_CONSUMER_HEADER
-#define PULSAR_PARTITIONED_CONSUMER_HEADER
-#include "lib/TestUtil.h"
-#include "ConsumerImpl.h"
-#include "ClientImpl.h"
-#include <vector>
-#include <queue>
-
-#include <mutex>
-#include "ConsumerImplBase.h"
-#include "lib/UnAckedMessageTrackerDisabled.h"
-#include <lib/Latch.h>
-#include <lib/PartitionedBrokerConsumerStatsImpl.h>
-#include <lib/TopicName.h>
-
-namespace pulsar {
-class PartitionedConsumerImpl;
-class PartitionedConsumerImpl : public ConsumerImplBase,
-                                public std::enable_shared_from_this<PartitionedConsumerImpl> {
-   public:
-    enum PartitionedConsumerState
-    {
-        Pending,
-        Ready,
-        Closing,
-        Closed,
-        Failed
-    };
-    PartitionedConsumerImpl(ClientImplPtr client, const std::string& subscriptionName,
-                            const TopicNamePtr topicName, const unsigned int numPartitions,
-                            const ConsumerConfiguration& conf);
-    ~PartitionedConsumerImpl();
-    // overrided methods from ConsumerImplBase
-    Future<Result, ConsumerImplBaseWeakPtr> getConsumerCreatedFuture() override;
-    const std::string& getSubscriptionName() const override;
-    const std::string& getTopic() const override;
-    Result receive(Message& msg) override;
-    Result receive(Message& msg, int timeout) override;
-    void receiveAsync(ReceiveCallback& callback) override;
-    void unsubscribeAsync(ResultCallback callback) override;
-    void acknowledgeAsync(const MessageId& msgId, ResultCallback callback) override;
-    void acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) override;
-    void closeAsync(ResultCallback callback) override;
-    void start() override;
-    void shutdown() override;
-    bool isClosed() override;
-    bool isOpen() override;
-    Result pauseMessageListener() override;
-    Result resumeMessageListener() override;
-    void redeliverUnacknowledgedMessages() override;
-    void redeliverUnacknowledgedMessages(const std::set<MessageId>& messageIds) override;
-    const std::string& getName() const override;
-    int getNumOfPrefetchedMessages() const override;
-    void getBrokerConsumerStatsAsync(BrokerConsumerStatsCallback callback) override;
-    void seekAsync(const MessageId& msgId, ResultCallback callback) override;
-    void seekAsync(uint64_t timestamp, ResultCallback callback) override;
-    void negativeAcknowledge(const MessageId& msgId) override;
-    bool isConnected() const override;
-    uint64_t getNumberOfConnectedConsumer() override;
-
-    void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, PartitionedBrokerConsumerStatsPtr,
-                                size_t, BrokerConsumerStatsCallback);
-
-   private:
-    const ClientImplPtr client_;
-    const std::string subscriptionName_;
-    const TopicNamePtr topicName_;
-    unsigned int numPartitions_;
-    unsigned int numConsumersCreated_ = 0;
-    const ConsumerConfiguration conf_;
-    typedef std::vector<ConsumerImplPtr> ConsumerList;
-    ConsumerList consumers_;
-    // consumersMutex_ is used to share consumers_ and numPartitions_
-    mutable std::mutex consumersMutex_;
-    mutable std::mutex mutex_;
-    std::mutex pendingReceiveMutex_;
-    std::atomic<PartitionedConsumerState> state_{Pending};
-    unsigned int unsubscribedSoFar_ = 0;
-    BlockingQueue<Message> messages_;
-    ExecutorServicePtr listenerExecutor_;
-    MessageListener messageListener_;
-    const std::string topic_;
-    const std::string name_;
-    const std::string partitionStr_;
-    ExecutorServicePtr internalListenerExecutor_;
-    DeadlineTimerPtr partitionsUpdateTimer_;
-    boost::posix_time::time_duration partitionsUpdateInterval_;
-    LookupServicePtr lookupServicePtr_;
-
-    unsigned int getNumPartitions() const;
-    unsigned int getNumPartitionsWithLock() const;
-    ConsumerConfiguration getSinglePartitionConsumerConfig() const;
-    ConsumerImplPtr newInternalConsumer(unsigned int partition, const ConsumerConfiguration& config) const;
-    void handleUnsubscribeAsync(Result result, unsigned int consumerIndex, ResultCallback callback);
-    void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
-                                              unsigned int partitionIndex);
-    void handleSinglePartitionConsumerClose(Result result, unsigned int partitionIndex,
-                                            CloseCallback callback);
-    void notifyResult(CloseCallback closeCallback);
-    void messageReceived(Consumer consumer, const Message& msg);
-    void internalListener(Consumer consumer);
-    void receiveMessages();
-    void failPendingReceiveCallback();
-    void setNegativeAcknowledgeEnabledForTesting(bool enabled) override;
-    Promise<Result, ConsumerImplBaseWeakPtr> partitionedConsumerCreatedPromise_;
-    UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
-    std::queue<ReceiveCallback> pendingReceives_;
-    void runPartitionUpdateTask();
-    void getPartitionMetadata();
-    void handleGetPartitions(const Result result, const LookupDataResultPtr& lookupDataResult);
-
-    friend class PulsarFriend;
-
-    FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
-};
-typedef std::weak_ptr<PartitionedConsumerImpl> PartitionedConsumerImplWeakPtr;
-typedef std::shared_ptr<PartitionedConsumerImpl> PartitionedConsumerImplPtr;
-}  // namespace pulsar
-#endif  // PULSAR_PARTITIONED_CONSUMER_HEADER
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index 6bb31d0490d..5c4f7d21623 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -1692,7 +1692,7 @@ TEST(BasicEndToEndTest, testSeekOnPartitionedTopic) {
     ASSERT_EQ(expected.str(), msgReceived.getDataAsString());
     ASSERT_EQ(ResultOk, consumer.acknowledge(msgReceived));
     ASSERT_EQ(ResultOk, consumer.unsubscribe());
-    ASSERT_EQ(ResultOk, consumer.close());
+    ASSERT_EQ(ResultAlreadyClosed, consumer.close());
     ASSERT_EQ(ResultOk, producer.close());
     ASSERT_EQ(ResultOk, client.close());
 }
diff --git a/pulsar-client-cpp/tests/ConsumerStatsTest.cc b/pulsar-client-cpp/tests/ConsumerStatsTest.cc
index ada86760a2a..c398a532e68 100644
--- a/pulsar-client-cpp/tests/ConsumerStatsTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerStatsTest.cc
@@ -20,15 +20,12 @@
 #include <pulsar/Client.h>
 #include <lib/LogUtils.h>
 #include <lib/Commands.h>
-#include "CustomRoutingPolicy.h"
 #include "lib/Future.h"
 #include "lib/Utils.h"
 #include "PulsarFriend.h"
 #include "ConsumerTest.h"
 #include "HttpHelper.h"
 #include <lib/Latch.h>
-#include <lib/PartitionedConsumerImpl.h>
-#include <lib/TopicName.h>
 
 #include <functional>
 #include <thread>
@@ -42,8 +39,8 @@ static std::string adminUrl = "http://localhost:8080/";
 void partitionedCallbackFunction(Result result, BrokerConsumerStats brokerConsumerStats, long expectedBacklog,
                                  Latch& latch, int index, bool accurate) {
     ASSERT_EQ(result, ResultOk);
-    PartitionedBrokerConsumerStatsImpl* statsPtr =
-        (PartitionedBrokerConsumerStatsImpl*)(brokerConsumerStats.getImpl().get());
+    MultiTopicsBrokerConsumerStatsImpl* statsPtr =
+        (MultiTopicsBrokerConsumerStatsImpl*)(brokerConsumerStats.getImpl().get());
     LOG_DEBUG(statsPtr);
     if (accurate) {
         ASSERT_EQ(expectedBacklog, statsPtr->getBrokerConsumerStats(index).getMsgBacklog());
diff --git a/pulsar-client-cpp/tests/ConsumerTest.cc b/pulsar-client-cpp/tests/ConsumerTest.cc
index 575abb69635..694a568eb15 100644
--- a/pulsar-client-cpp/tests/ConsumerTest.cc
+++ b/pulsar-client-cpp/tests/ConsumerTest.cc
@@ -30,7 +30,6 @@
 #include "lib/Future.h"
 #include "lib/Utils.h"
 #include "lib/LogUtils.h"
-#include "lib/PartitionedConsumerImpl.h"
 #include "lib/MultiTopicsConsumerImpl.h"
 #include "HttpHelper.h"
 
@@ -406,8 +405,9 @@ TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery) {
     consumerConfig.setUnAckedMessagesTimeoutMs(unAckedMessagesTimeoutMs);
     consumerConfig.setTickDurationInMs(tickDurationInMs);
     ASSERT_EQ(ResultOk, client.subscribe(partitionedTopic, subName, consumerConfig, consumer));
-    PartitionedConsumerImplPtr partitionedConsumerImplPtr =
-        PulsarFriend::getPartitionedConsumerImplPtr(consumer);
+
+    MultiTopicsConsumerImplPtr partitionedConsumerImplPtr =
+        PulsarFriend::getMultiTopicsConsumerImplPtr(consumer);
     ASSERT_EQ(numPartitions, partitionedConsumerImplPtr->consumers_.size());
 
     // send messages
@@ -442,8 +442,10 @@ TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery) {
     ASSERT_EQ(numOfMessages, partitionedTracker->size());
     ASSERT_FALSE(partitionedTracker->isEmpty());
     for (auto i = 0; i < numPartitions; i++) {
+        auto topicName =
+            "persistent://public/default/" + partitionedTopic + "-partition-" + std::to_string(i);
         ASSERT_EQ(numOfMessages / numPartitions, messageIds[i].size());
-        auto subConsumerPtr = partitionedConsumerImplPtr->consumers_[i];
+        auto subConsumerPtr = partitionedConsumerImplPtr->consumers_.find(topicName).value();
         auto tracker =
             static_cast<UnAckedMessageTrackerEnabled*>(subConsumerPtr->unAckedMessageTrackerPtr_.get());
         ASSERT_EQ(0, tracker->size());
diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h
index b74c0e1241b..b6fb219eabb 100644
--- a/pulsar-client-cpp/tests/PulsarFriend.h
+++ b/pulsar-client-cpp/tests/PulsarFriend.h
@@ -23,7 +23,6 @@
 #include "lib/ProducerImpl.h"
 #include "lib/PartitionedProducerImpl.h"
 #include "lib/ConsumerImpl.h"
-#include "lib/PartitionedConsumerImpl.h"
 #include "lib/MultiTopicsConsumerImpl.h"
 #include "lib/ReaderImpl.h"
 
@@ -92,10 +91,6 @@ class PulsarFriend {
         return consumerImpl->chunkedMessageCache_;
     }
 
-    static std::shared_ptr<PartitionedConsumerImpl> getPartitionedConsumerImplPtr(Consumer consumer) {
-        return std::static_pointer_cast<PartitionedConsumerImpl>(consumer.impl_);
-    }
-
     static std::shared_ptr<MultiTopicsConsumerImpl> getMultiTopicsConsumerImplPtr(Consumer consumer) {
         return std::static_pointer_cast<MultiTopicsConsumerImpl>(consumer.impl_);
     }