You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2023/02/07 03:53:20 UTC

[pulsar-client-cpp] branch main updated: [feat] Support partitioned topic reader. (#154)

This is an automated email from the ASF dual-hosted git repository.

xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 70ccd83  [feat] Support partitioned topic reader. (#154)
70ccd83 is described below

commit 70ccd83503138f5adea8b3551754c2095a5150f0
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Tue Feb 7 11:53:15 2023 +0800

    [feat] Support partitioned topic reader. (#154)
---
 lib/ClientImpl.cc                  |  11 +---
 lib/ConsumerImpl.h                 |   2 +-
 lib/ConsumerImplBase.h             |   2 +
 lib/MultiTopicsConsumerImpl.cc     |  57 ++++++++++++++---
 lib/MultiTopicsConsumerImpl.h      |  14 ++++-
 lib/ReaderImpl.cc                  |  29 ++++++---
 lib/ReaderImpl.h                   |  10 +--
 lib/UnAckedMessageTrackerEnabled.h |   1 +
 tests/ConsumerTest.cc              |  47 ++++++++++++++
 tests/PulsarFriend.h               |   2 +-
 tests/ReaderTest.cc                | 124 ++++++++++++++++++++++++++-----------
 11 files changed, 232 insertions(+), 67 deletions(-)

diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index ec9de8a..14a4047 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -255,15 +255,10 @@ void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDat
         return;
     }
 
-    if (partitionMetadata->getPartitions() > 0) {
-        LOG_ERROR("Topic reader cannot be created on a partitioned topic: " << topicName->toString());
-        callback(ResultOperationNotSupported, Reader());
-        return;
-    }
-
-    ReaderImplPtr reader = std::make_shared<ReaderImpl>(shared_from_this(), topicName->toString(), conf,
+    ReaderImplPtr reader = std::make_shared<ReaderImpl>(shared_from_this(), topicName->toString(),
+                                                        partitionMetadata->getPartitions(), conf,
                                                         getListenerExecutorProvider()->get(), callback);
-    ConsumerImplBasePtr consumer = reader->getConsumer().lock();
+    ConsumerImplBasePtr consumer = reader->getConsumer();
     auto self = shared_from_this();
     reader->start(startMessageId, [this, self](const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
         auto consumer = weakConsumerPtr.lock();
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 6437472..b9292c0 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -124,6 +124,7 @@ class ConsumerImpl : public ConsumerImplBase {
     void negativeAcknowledge(const MessageId& msgId) override;
     bool isConnected() const override;
     uint64_t getNumberOfConnectedConsumer() override;
+    void hasMessageAvailableAsync(HasMessageAvailableCallback callback) override;
 
     virtual void disconnectConsumer();
     Result fetchSingleMessageFromBroker(Message& msg);
@@ -133,7 +134,6 @@ class ConsumerImpl : public ConsumerImplBase {
     virtual void redeliverMessages(const std::set<MessageId>& messageIds);
 
     virtual bool isReadCompacted();
-    virtual void hasMessageAvailableAsync(HasMessageAvailableCallback callback);
     void beforeConnectionChange(ClientConnection& cnx) override;
 
    protected:
diff --git a/lib/ConsumerImplBase.h b/lib/ConsumerImplBase.h
index 9cf63a3..73c9161 100644
--- a/lib/ConsumerImplBase.h
+++ b/lib/ConsumerImplBase.h
@@ -29,6 +29,7 @@
 #include "HandlerBase.h"
 
 namespace pulsar {
+typedef std::function<void(Result result, bool hasMessageAvailable)> HasMessageAvailableCallback;
 class ConsumerImplBase;
 using ConsumerImplBaseWeakPtr = std::weak_ptr<ConsumerImplBase>;
 class OpBatchReceive {
@@ -76,6 +77,7 @@ class ConsumerImplBase : public HandlerBase, public std::enable_shared_from_this
     virtual uint64_t getNumberOfConnectedConsumer() = 0;
     // overrided methods from HandlerBase
     virtual const std::string& getName() const override = 0;
+    virtual void hasMessageAvailableAsync(HasMessageAvailableCallback callback) = 0;
 
    protected:
     // overrided methods from HandlerBase
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 7410b74..39da4a6 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -40,16 +40,20 @@ using namespace pulsar;
 MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName,
                                                  int numPartitions, const std::string& subscriptionName,
                                                  const ConsumerConfiguration& conf,
-                                                 LookupServicePtr lookupServicePtr)
+                                                 LookupServicePtr lookupServicePtr,
+                                                 const Commands::SubscriptionMode subscriptionMode,
+                                                 boost::optional<MessageId> startMessageId)
     : MultiTopicsConsumerImpl(client, {topicName->toString()}, subscriptionName, topicName, conf,
-                              lookupServicePtr) {
+                              lookupServicePtr, subscriptionMode, startMessageId) {
     topicsPartitions_[topicName->toString()] = numPartitions;
 }
 
 MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector<std::string>& topics,
                                                  const std::string& subscriptionName, TopicNamePtr topicName,
                                                  const ConsumerConfiguration& conf,
-                                                 LookupServicePtr lookupServicePtr)
+                                                 LookupServicePtr lookupServicePtr,
+                                                 const Commands::SubscriptionMode subscriptionMode,
+                                                 boost::optional<MessageId> startMessageId)
     : ConsumerImplBase(client, topicName ? topicName->toString() : "EmptyTopics",
                        Backoff(milliseconds(100), seconds(60), milliseconds(0)), conf,
                        client->getListenerExecutorProvider()->get()),
@@ -60,7 +64,9 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
       messageListener_(conf.getMessageListener()),
       lookupServicePtr_(lookupServicePtr),
       numberTopicPartitions_(std::make_shared<std::atomic<int>>(0)),
-      topics_(topics) {
+      topics_(topics),
+      subscriptionMode_(subscriptionMode),
+      startMessageId_(startMessageId) {
     std::stringstream consumerStrStream;
     consumerStrStream << "[Muti Topics Consumer: "
                       << "TopicName - " << topic_ << " - Subscription - " << subscriptionName << "]";
@@ -226,7 +232,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
         // We don't have to add partition-n suffix
         consumer = std::make_shared<ConsumerImpl>(client, topicName->toString(), subscriptionName_, config,
                                                   topicName->isPersistent(), internalListenerExecutor, true,
-                                                  NonPartitioned);
+                                                  NonPartitioned, subscriptionMode_, startMessageId_);
         consumer->getConsumerCreatedFuture().addListener(std::bind(
             &MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
             std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
@@ -239,7 +245,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
             std::string topicPartitionName = topicName->getTopicPartitionName(i);
             consumer = std::make_shared<ConsumerImpl>(client, topicPartitionName, subscriptionName_, config,
                                                       topicName->isPersistent(), internalListenerExecutor,
-                                                      true, Partitioned);
+                                                      true, Partitioned, subscriptionMode_, startMessageId_);
             consumer->getConsumerCreatedFuture().addListener(std::bind(
                 &MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
                 std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
@@ -686,7 +692,12 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdLis
 }
 
 void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) {
-    callback(ResultOperationNotSupported);
+    // Forward to the consumer owning the message's topic; an unknown topic still completes the
+    // callback (with ResultOperationNotSupported) instead of silently dropping it.
+    auto optConsumer = consumers_.find(msgId.getTopicName());
+    if (!optConsumer) return callback(ResultOperationNotSupported);
+    unAckedMessageTrackerPtr_->removeMessagesTill(msgId);
+    optConsumer.value()->acknowledgeCumulativeAsync(msgId, callback);
 }
 
 void MultiTopicsConsumerImpl::negativeAcknowledge(const MessageId& msgId) {
@@ -1047,3 +1058,35 @@ void MultiTopicsConsumerImpl::cancelTimers() noexcept {
         partitionsUpdateTimer_->cancel(ec);
     }
 }
+
+void MultiTopicsConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback) {
+    // Fast path: a message is already buffered locally, so the partitions need not be asked.
+    if (incomingMessagesSize_ > 0) {
+        callback(ResultOk, true);
+        return;
+    }
+    // Otherwise query every internal consumer; `callback` fires once all replied or on first error.
+    auto hasMessageAvailable = std::make_shared<std::atomic<bool>>(false);
+    auto needCallBack = std::make_shared<std::atomic<int>>(consumers_.size());
+    auto self = get_shared_this_ptr();
+    consumers_.forEachValue([self, needCallBack, callback, hasMessageAvailable](ConsumerImplPtr consumer) {
+        consumer->hasMessageAvailableAsync(
+            [self, needCallBack, callback, hasMessageAvailable](Result result, bool hasMsg) {
+                if (result != ResultOk) {
+                    LOG_ERROR("Failed to check whether a message is available: " << result);
+                    // The counter is positive until the first failure and negative ever after, so
+                    // exchange() lets exactly one failing reply report the error.
+                    if (needCallBack->exchange(-1) > 0) {
+                        callback(result, false);
+                    }
+                    return;
+                }
+                if (hasMsg) {
+                    hasMessageAvailable->store(hasMsg);
+                }
+                if (--(*needCallBack) == 0) {
+                    callback(result, hasMessageAvailable->load() || self->incomingMessagesSize_ > 0);
+                }
+            });
+    });
+}
diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h
index 50cdecf..a9ee160 100644
--- a/lib/MultiTopicsConsumerImpl.h
+++ b/lib/MultiTopicsConsumerImpl.h
@@ -25,6 +25,7 @@
 #include <vector>
 
 #include "BlockingQueue.h"
+#include "Commands.h"
 #include "ConsumerImplBase.h"
 #include "Future.h"
 #include "Latch.h"
@@ -53,10 +54,15 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
    public:
     MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName, int numPartitions,
                             const std::string& subscriptionName, const ConsumerConfiguration& conf,
-                            LookupServicePtr lookupServicePtr);
+                            LookupServicePtr lookupServicePtr,
+                            Commands::SubscriptionMode = Commands::SubscriptionModeDurable,
+                            boost::optional<MessageId> startMessageId = boost::none);
+
     MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector<std::string>& topics,
                             const std::string& subscriptionName, TopicNamePtr topicName,
-                            const ConsumerConfiguration& conf, LookupServicePtr lookupServicePtr_);
+                            const ConsumerConfiguration& conf, LookupServicePtr lookupServicePtr_,
+                            Commands::SubscriptionMode = Commands::SubscriptionModeDurable,
+                            boost::optional<MessageId> startMessageId = boost::none);
 
     ~MultiTopicsConsumerImpl();
     // overrided methods from ConsumerImplBase
@@ -88,6 +94,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
     void negativeAcknowledge(const MessageId& msgId) override;
     bool isConnected() const override;
     uint64_t getNumberOfConnectedConsumer() override;
+    void hasMessageAvailableAsync(HasMessageAvailableCallback callback) override;
 
     void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, MultiTopicsBrokerConsumerStatsPtr,
                                 size_t, BrokerConsumerStatsCallback);
@@ -118,6 +125,8 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
     UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
     const std::vector<std::string> topics_;
     std::queue<ReceiveCallback> pendingReceives_;
+    const Commands::SubscriptionMode subscriptionMode_;
+    boost::optional<MessageId> startMessageId_;
 
     /* methods */
     void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
@@ -167,6 +176,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
 
     FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
     FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
+    FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition);
 };
 
 typedef std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImplPtr;
diff --git a/lib/ReaderImpl.cc b/lib/ReaderImpl.cc
index da1d95e..a06c0ec 100644
--- a/lib/ReaderImpl.cc
+++ b/lib/ReaderImpl.cc
@@ -23,6 +23,7 @@
 #include "ConsumerImpl.h"
 #include "ExecutorService.h"
 #include "GetLastMessageIdResponse.h"
+#include "MultiTopicsConsumerImpl.h"
 #include "TopicName.h"
 
 namespace pulsar {
@@ -35,9 +36,14 @@ ConsumerConfiguration consumerConfigOfReader;
 
 static ResultCallback emptyCallback;
 
-ReaderImpl::ReaderImpl(const ClientImplPtr client, const std::string& topic, const ReaderConfiguration& conf,
-                       const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback)
-    : topic_(topic), client_(client), readerConf_(conf), readerCreatedCallback_(readerCreatedCallback) {}
+ReaderImpl::ReaderImpl(const ClientImplPtr client, const std::string& topic, int partitions,
+                       const ReaderConfiguration& conf, const ExecutorServicePtr listenerExecutor,
+                       ReaderCallback readerCreatedCallback)
+    : topic_(topic),
+      partitions_(partitions),
+      client_(client),
+      readerConf_(conf),
+      readerCreatedCallback_(readerCreatedCallback) {}
 
 void ReaderImpl::start(const MessageId& startMessageId,
                        std::function<void(const ConsumerImplBaseWeakPtr&)> callback) {
@@ -80,10 +86,19 @@ void ReaderImpl::start(const MessageId& startMessageId,
         test::consumerConfigOfReader = consumerConf.clone();
     }
 
-    consumer_ = std::make_shared<ConsumerImpl>(
-        client_.lock(), topic_, subscription, consumerConf, TopicName::get(topic_)->isPersistent(),
-        ExecutorServicePtr(), false, NonPartitioned, Commands::SubscriptionModeNonDurable, startMessageId);
-    consumer_->setPartitionIndex(TopicName::getPartitionIndex(topic_));
+    if (partitions_ > 0) {
+        auto consumerImpl = std::make_shared<MultiTopicsConsumerImpl>(
+            client_.lock(), TopicName::get(topic_), partitions_, subscription, consumerConf,
+            client_.lock()->getLookup(), Commands::SubscriptionModeNonDurable, startMessageId);
+        consumer_ = consumerImpl;
+    } else {
+        auto consumerImpl = std::make_shared<ConsumerImpl>(
+            client_.lock(), topic_, subscription, consumerConf, TopicName::get(topic_)->isPersistent(),
+            ExecutorServicePtr(), false, NonPartitioned, Commands::SubscriptionModeNonDurable,
+            startMessageId);
+        consumerImpl->setPartitionIndex(TopicName::getPartitionIndex(topic_));
+        consumer_ = consumerImpl;
+    }
     auto self = shared_from_this();
     consumer_->getConsumerCreatedFuture().addListener(
         [this, self, callback](Result result, const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
diff --git a/lib/ReaderImpl.h b/lib/ReaderImpl.h
index e216241..3731990 100644
--- a/lib/ReaderImpl.h
+++ b/lib/ReaderImpl.h
@@ -58,8 +58,9 @@ extern PULSAR_PUBLIC ConsumerConfiguration consumerConfigOfReader;
 
 class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl> {
    public:
-    ReaderImpl(const ClientImplPtr client, const std::string& topic, const ReaderConfiguration& conf,
-               const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback);
+    ReaderImpl(const ClientImplPtr client, const std::string& topic, int partitions,
+               const ReaderConfiguration& conf, const ExecutorServicePtr listenerExecutor,
+               ReaderCallback readerCreatedCallback);
 
     void start(const MessageId& startMessageId, std::function<void(const ConsumerImplBaseWeakPtr&)> callback);
 
@@ -73,7 +74,7 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>
 
     Future<Result, ReaderImplWeakPtr> getReaderCreatedFuture();
 
-    ConsumerImplWeakPtr getConsumer() const noexcept { return consumer_; }
+    ConsumerImplBasePtr getConsumer() const noexcept { return consumer_; }
 
     void hasMessageAvailableAsync(HasMessageAvailableCallback callback);
 
@@ -90,9 +91,10 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>
     void acknowledgeIfNecessary(Result result, const Message& msg);
 
     std::string topic_;
+    int partitions_;
     ClientImplWeakPtr client_;
     ReaderConfiguration readerConf_;
-    ConsumerImplPtr consumer_;
+    ConsumerImplBasePtr consumer_;
     ReaderCallback readerCreatedCallback_;
     ReaderListener readerListener_;
 };
diff --git a/lib/UnAckedMessageTrackerEnabled.h b/lib/UnAckedMessageTrackerEnabled.h
index 0bdcc85..1453460 100644
--- a/lib/UnAckedMessageTrackerEnabled.h
+++ b/lib/UnAckedMessageTrackerEnabled.h
@@ -64,6 +64,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
     FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
     FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
     FRIEND_TEST(ConsumerTest, testBatchUnAckedMessageTracker);
+    FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition);
 };
 }  // namespace pulsar
 
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index f3c8abb..ac971b3 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -261,6 +261,53 @@ TEST(ConsumerTest, testConsumerEventWithPartition) {
     ASSERT_EQ(0, result.size());
 }
 
+TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition) {
+    // Cumulative acks on a partitioned topic must empty the unacked-message tracker.
+    Client client(lookupUrl);
+    const std::string topic = "testAcknowledgeCumulativeWithPartition-" + std::to_string(time(nullptr));
+
+    // Ask the admin REST endpoint to create the topic with 2 partitions.
+    int res = makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + topic + "/partitions",
+                             std::to_string(2));
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfiguration;
+    consumerConfiguration.setUnAckedMessagesTimeoutMs(10000);
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "t-sub", consumerConfiguration, consumer));
+
+    Producer producer;
+    ProducerConfiguration producerConfiguration;
+    producerConfiguration.setBatchingEnabled(false);
+    producerConfiguration.setPartitionsRoutingMode(
+        ProducerConfiguration::PartitionsRoutingMode::RoundRobinDistribution);
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfiguration, producer));
+
+    const int numMessages = 100;
+    for (int i = 0; i < numMessages; ++i) {
+        Message msg = MessageBuilder().setContent(std::to_string(i)).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+
+    Message msg;
+    for (int i = 0; i < numMessages; i++) {
+        ASSERT_EQ(ResultOk, consumer.receive(msg));
+        // Cumulatively ack the last two received messages (expected: the last one of each partition).
+        if (i >= numMessages - 2) {
+            ASSERT_EQ(ResultOk, consumer.acknowledgeCumulative(msg.getMessageId()));
+        }
+    }
+    ASSERT_EQ(ResultTimeout, consumer.receive(msg, 2000));
+
+    // Assert that there is no message left in the tracker.
+    auto multiConsumerImpl = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer);
+    auto tracker =
+        static_cast<UnAckedMessageTrackerEnabled*>(multiConsumerImpl->unAckedMessageTrackerPtr_.get());
+    ASSERT_EQ(0, tracker->size());
+
+    client.close();
+}
+
 TEST(ConsumerTest, consumerNotInitialized) {
     Consumer consumer;
 
diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h
index aa7737d..1ef1ee7 100644
--- a/tests/PulsarFriend.h
+++ b/tests/PulsarFriend.h
@@ -99,7 +99,7 @@ class PulsarFriend {
     }
 
     static ConsumerImplPtr getConsumer(Reader reader) {
-        return std::static_pointer_cast<ConsumerImpl>(reader.impl_->getConsumer().lock());
+        return std::static_pointer_cast<ConsumerImpl>(reader.impl_->getConsumer());
     }
 
     static ReaderImplWeakPtr getReaderImplWeakPtr(Reader reader) { return reader.impl_; }
diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc
index eefe1bc..afef384 100644
--- a/tests/ReaderTest.cc
+++ b/tests/ReaderTest.cc
@@ -37,10 +37,28 @@ using namespace pulsar;
 static std::string serviceUrl = "pulsar://localhost:6650";
 static const std::string adminUrl = "http://localhost:8080/";
 
-TEST(ReaderTest, testSimpleReader) {
+class ReaderTest : public ::testing::TestWithParam<bool> {
+   public:
+    // When the suite runs in multi-topic mode, makes `topicName` a partitioned topic (5 partitions).
+    void initTopic(const std::string& topicName) {
+        if (isMultiTopic_) {
+            std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions";
+            int res = makePutRequest(url, "5");
+            LOG_INFO("res = " << res);
+            ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+        }
+    }
+
+   protected:
+    bool isMultiTopic_ = GetParam();  // test parameter: true => run against a partitioned topic
+};
+
+TEST_P(ReaderTest, testSimpleReader) {
     Client client(serviceUrl);
 
-    std::string topicName = "persistent://public/default/test-simple-reader";
+    std::string topicName =
+        "test-simple-reader" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
+    initTopic(topicName);
 
     ReaderConfiguration readerConf;
     Reader reader;
@@ -69,10 +87,11 @@ TEST(ReaderTest, testSimpleReader) {
     client.close();
 }
 
-TEST(ReaderTest, testAsyncRead) {
+TEST_P(ReaderTest, testAsyncRead) {
     Client client(serviceUrl);
 
-    std::string topicName = "persistent://public/default/test-simple-reader" + std::to_string(time(nullptr));
+    std::string topicName = "testAsyncRead" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
+    initTopic(topicName);
 
     ReaderConfiguration readerConf;
     Reader reader;
@@ -113,10 +132,12 @@ TEST(ReaderTest, testAsyncRead) {
     client.close();
 }
 
-TEST(ReaderTest, testReaderAfterMessagesWerePublished) {
+TEST_P(ReaderTest, testReaderAfterMessagesWerePublished) {
     Client client(serviceUrl);
 
-    std::string topicName = "persistent://public/default/testReaderAfterMessagesWerePublished";
+    std::string topicName = "testReaderAfterMessagesWerePublished" + std::to_string(time(nullptr)) +
+                            std::to_string(isMultiTopic_);
+    initTopic(topicName);
 
     Producer producer;
     ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
@@ -145,10 +166,12 @@ TEST(ReaderTest, testReaderAfterMessagesWerePublished) {
     client.close();
 }
 
-TEST(ReaderTest, testMultipleReaders) {
+TEST_P(ReaderTest, testMultipleReaders) {
     Client client(serviceUrl);
 
-    std::string topicName = "persistent://public/default/testMultipleReaders";
+    std::string topicName =
+        "testMultipleReaders" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
+    initTopic(topicName);
 
     Producer producer;
     ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
@@ -190,10 +213,12 @@ TEST(ReaderTest, testMultipleReaders) {
     client.close();
 }
 
-TEST(ReaderTest, testReaderOnLastMessage) {
+TEST_P(ReaderTest, testReaderOnLastMessage) {
     Client client(serviceUrl);
 
-    std::string topicName = "persistent://public/default/testReaderOnLastMessage";
+    std::string topicName =
+        "testReaderOnLastMessage" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
+    initTopic(topicName);
 
     Producer producer;
     ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
@@ -228,10 +253,12 @@ TEST(ReaderTest, testReaderOnLastMessage) {
     client.close();
 }
 
-TEST(ReaderTest, testReaderOnSpecificMessage) {
+TEST_P(ReaderTest, testReaderOnSpecificMessage) {
     Client client(serviceUrl);
 
-    std::string topicName = "persistent://public/default/testReaderOnSpecificMessage";
+    std::string topicName =
+        "testReaderOnSpecificMessage" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
+    initTopic(topicName);
 
     Producer producer;
     ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
@@ -277,12 +304,15 @@ TEST(ReaderTest, testReaderOnSpecificMessage) {
 }
 
 /**
- * Test that we can position on a particular message even within a batch
+ * Test that we can position on a particular message even within a
+ * batch
  */
-TEST(ReaderTest, testReaderOnSpecificMessageWithBatches) {
+TEST_P(ReaderTest, testReaderOnSpecificMessageWithBatches) {
     Client client(serviceUrl);
 
-    std::string topicName = "persistent://public/default/testReaderOnSpecificMessageWithBatches";
+    std::string topicName = "testReaderOnSpecificMessageWithBatches" + std::to_string(time(nullptr)) +
+                            std::to_string(isMultiTopic_);
+    initTopic(topicName);
 
     Producer producer;
     // Enable batching
@@ -339,10 +369,12 @@ TEST(ReaderTest, testReaderOnSpecificMessageWithBatches) {
     client.close();
 }
 
-TEST(ReaderTest, testReaderReachEndOfTopic) {
+TEST_P(ReaderTest, testReaderReachEndOfTopic) {
     Client client(serviceUrl);
 
-    std::string topicName = "persistent://public/default/testReaderReachEndOfTopic";
+    std::string topicName =
+        "testReaderReachEndOfTopic" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
+    initTopic(topicName);
 
     // 1. create producer
     Producer producer;
@@ -414,10 +446,12 @@ TEST(ReaderTest, testReaderReachEndOfTopic) {
     client.close();
 }
 
-TEST(ReaderTest, testReaderReachEndOfTopicMessageWithoutBatches) {
+TEST_P(ReaderTest, testReaderReachEndOfTopicMessageWithoutBatches) {
     Client client(serviceUrl);
 
-    std::string topicName = "persistent://public/default/testReaderReachEndOfTopicMessageWithBatches";
+    std::string topicName = "testReaderReachEndOfTopicMessageWithoutBatches" + std::to_string(time(nullptr)) +
+                            std::to_string(isMultiTopic_);
+    initTopic(topicName);
 
     // 1. create producer
     Producer producer;
@@ -510,10 +544,12 @@ TEST(ReaderTest, testPartitionIndex) {
     client.close();
 }
 
-TEST(ReaderTest, testSubscriptionNameSetting) {
+TEST_P(ReaderTest, testSubscriptionNameSetting) {
     Client client(serviceUrl);
 
-    std::string topicName = "persistent://public/default/test-subscription-name-setting";
+    std::string topicName =
+        "testSubscriptionNameSetting" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
+    initTopic(topicName);
     std::string subName = "test-sub";
 
     ReaderConfiguration readerConf;
@@ -527,10 +563,12 @@ TEST(ReaderTest, testSubscriptionNameSetting) {
     client.close();
 }
 
-TEST(ReaderTest, testSetSubscriptionNameAndPrefix) {
+TEST_P(ReaderTest, testSetSubscriptionNameAndPrefix) {
     Client client(serviceUrl);
 
-    std::string topicName = "persistent://public/default/testSetSubscriptionNameAndPrefix";
+    std::string topicName =
+        "testSetSubscriptionNameAndPrefix" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
+    initTopic(topicName);
     std::string subName = "test-sub";
 
     ReaderConfiguration readerConf;
@@ -545,10 +583,12 @@ TEST(ReaderTest, testSetSubscriptionNameAndPrefix) {
     client.close();
 }
 
-TEST(ReaderTest, testMultiSameSubscriptionNameReaderShouldFail) {
+TEST_P(ReaderTest, testMultiSameSubscriptionNameReaderShouldFail) {
     Client client(serviceUrl);
 
-    std::string topicName = "persistent://public/default/testMultiSameSubscriptionNameReaderShouldFail";
+    std::string topicName = "testMultiSameSubscriptionNameReaderShouldFail" + std::to_string(time(nullptr)) +
+                            std::to_string(isMultiTopic_);
+    initTopic(topicName);
     std::string subscriptionName = "test-sub";
 
     ReaderConfiguration readerConf1;
@@ -567,28 +607,33 @@ TEST(ReaderTest, testMultiSameSubscriptionNameReaderShouldFail) {
     client.close();
 }
 
-TEST(ReaderTest, testIsConnected) {
-    const std::string topic = "testReaderIsConnected-" + std::to_string(time(nullptr));
+TEST_P(ReaderTest, testIsConnected) {
     Client client(serviceUrl);
 
+    std::string topicName = "testIsConnected" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
+    initTopic(topicName);
+
     Reader reader;
     ASSERT_FALSE(reader.isConnected());
 
-    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {}, reader));
+    ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), {}, reader));
     ASSERT_TRUE(reader.isConnected());
 
     ASSERT_EQ(ResultOk, reader.close());
     ASSERT_FALSE(reader.isConnected());
 }
 
-TEST(ReaderTest, testHasMessageAvailableWhenCreated) {
-    const std::string topic = "testHasMessageAvailableWhenCreated-" + std::to_string(time(nullptr));
+TEST_P(ReaderTest, testHasMessageAvailableWhenCreated) {
     Client client(serviceUrl);
 
+    std::string topicName =
+        "testHasMessageAvailableWhenCreated" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
+    initTopic(topicName);
+
     ProducerConfiguration producerConf;
     producerConf.setBatchingMaxMessages(3);
     Producer producer;
-    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf, producer));
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));
 
     std::vector<MessageId> messageIds;
     constexpr int numMessages = 7;
@@ -612,25 +657,28 @@ TEST(ReaderTest, testHasMessageAvailableWhenCreated) {
     bool hasMessageAvailable;
 
     for (size_t i = 0; i < messageIds.size() - 1; i++) {
-        ASSERT_EQ(ResultOk, client.createReader(topic, messageIds[i], {}, reader));
+        ASSERT_EQ(ResultOk, client.createReader(topicName, messageIds[i], {}, reader));
         ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
         EXPECT_TRUE(hasMessageAvailable);
     }
 
     // The start message ID is exclusive by default, so when we start at the last message, there should be no
     // message available.
-    ASSERT_EQ(ResultOk, client.createReader(topic, messageIds.back(), {}, reader));
+    ASSERT_EQ(ResultOk, client.createReader(topicName, messageIds.back(), {}, reader));
     ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
     EXPECT_FALSE(hasMessageAvailable);
     client.close();
 }
 
-TEST(ReaderTest, testReceiveAfterSeek) {
+TEST_P(ReaderTest, testReceiveAfterSeek) {
     Client client(serviceUrl);
-    const std::string topic = "reader-test-receive-after-seek-" + std::to_string(time(nullptr));
+
+    std::string topicName =
+        "testReceiveAfterSeek" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
+    initTopic(topicName);
 
     Producer producer;
-    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+    ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
 
     MessageId seekMessageId;
     for (int i = 0; i < 5; i++) {
@@ -642,7 +690,7 @@ TEST(ReaderTest, testReceiveAfterSeek) {
     }
 
     Reader reader;
-    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::latest(), {}, reader));
+    ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::latest(), {}, reader));
 
     reader.seek(seekMessageId);
 
@@ -651,3 +699,5 @@ TEST(ReaderTest, testReceiveAfterSeek) {
 
     client.close();
 }
+
+INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));
\ No newline at end of file