You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by xy...@apache.org on 2023/02/07 03:53:20 UTC
[pulsar-client-cpp] branch main updated: [feat] Support partitioned topic reader. (#154)
This is an automated email from the ASF dual-hosted git repository.
xyz pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 70ccd83 [feat] Support partitioned topic reader. (#154)
70ccd83 is described below
commit 70ccd83503138f5adea8b3551754c2095a5150f0
Author: Baodi Shi <wu...@icloud.com>
AuthorDate: Tue Feb 7 11:53:15 2023 +0800
[feat] Support partitioned topic reader. (#154)
---
lib/ClientImpl.cc | 11 +---
lib/ConsumerImpl.h | 2 +-
lib/ConsumerImplBase.h | 2 +
lib/MultiTopicsConsumerImpl.cc | 57 ++++++++++++++---
lib/MultiTopicsConsumerImpl.h | 14 ++++-
lib/ReaderImpl.cc | 29 ++++++---
lib/ReaderImpl.h | 10 +--
lib/UnAckedMessageTrackerEnabled.h | 1 +
tests/ConsumerTest.cc | 47 ++++++++++++++
tests/PulsarFriend.h | 2 +-
tests/ReaderTest.cc | 124 ++++++++++++++++++++++++++-----------
11 files changed, 232 insertions(+), 67 deletions(-)
diff --git a/lib/ClientImpl.cc b/lib/ClientImpl.cc
index ec9de8a..14a4047 100644
--- a/lib/ClientImpl.cc
+++ b/lib/ClientImpl.cc
@@ -255,15 +255,10 @@ void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDat
return;
}
- if (partitionMetadata->getPartitions() > 0) {
- LOG_ERROR("Topic reader cannot be created on a partitioned topic: " << topicName->toString());
- callback(ResultOperationNotSupported, Reader());
- return;
- }
-
- ReaderImplPtr reader = std::make_shared<ReaderImpl>(shared_from_this(), topicName->toString(), conf,
+ ReaderImplPtr reader = std::make_shared<ReaderImpl>(shared_from_this(), topicName->toString(),
+ partitionMetadata->getPartitions(), conf,
getListenerExecutorProvider()->get(), callback);
- ConsumerImplBasePtr consumer = reader->getConsumer().lock();
+ ConsumerImplBasePtr consumer = reader->getConsumer();
auto self = shared_from_this();
reader->start(startMessageId, [this, self](const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
auto consumer = weakConsumerPtr.lock();
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 6437472..b9292c0 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -124,6 +124,7 @@ class ConsumerImpl : public ConsumerImplBase {
void negativeAcknowledge(const MessageId& msgId) override;
bool isConnected() const override;
uint64_t getNumberOfConnectedConsumer() override;
+ void hasMessageAvailableAsync(HasMessageAvailableCallback callback) override;
virtual void disconnectConsumer();
Result fetchSingleMessageFromBroker(Message& msg);
@@ -133,7 +134,6 @@ class ConsumerImpl : public ConsumerImplBase {
virtual void redeliverMessages(const std::set<MessageId>& messageIds);
virtual bool isReadCompacted();
- virtual void hasMessageAvailableAsync(HasMessageAvailableCallback callback);
void beforeConnectionChange(ClientConnection& cnx) override;
protected:
diff --git a/lib/ConsumerImplBase.h b/lib/ConsumerImplBase.h
index 9cf63a3..73c9161 100644
--- a/lib/ConsumerImplBase.h
+++ b/lib/ConsumerImplBase.h
@@ -29,6 +29,7 @@
#include "HandlerBase.h"
namespace pulsar {
+typedef std::function<void(Result result, bool hasMessageAvailable)> HasMessageAvailableCallback;
class ConsumerImplBase;
using ConsumerImplBaseWeakPtr = std::weak_ptr<ConsumerImplBase>;
class OpBatchReceive {
@@ -76,6 +77,7 @@ class ConsumerImplBase : public HandlerBase, public std::enable_shared_from_this
virtual uint64_t getNumberOfConnectedConsumer() = 0;
// overrided methods from HandlerBase
virtual const std::string& getName() const override = 0;
+ virtual void hasMessageAvailableAsync(HasMessageAvailableCallback callback) = 0;
protected:
// overrided methods from HandlerBase
diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc
index 7410b74..39da4a6 100644
--- a/lib/MultiTopicsConsumerImpl.cc
+++ b/lib/MultiTopicsConsumerImpl.cc
@@ -40,16 +40,20 @@ using namespace pulsar;
MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName,
int numPartitions, const std::string& subscriptionName,
const ConsumerConfiguration& conf,
- LookupServicePtr lookupServicePtr)
+ LookupServicePtr lookupServicePtr,
+ const Commands::SubscriptionMode subscriptionMode,
+ boost::optional<MessageId> startMessageId)
: MultiTopicsConsumerImpl(client, {topicName->toString()}, subscriptionName, topicName, conf,
- lookupServicePtr) {
+ lookupServicePtr, subscriptionMode, startMessageId) {
topicsPartitions_[topicName->toString()] = numPartitions;
}
MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector<std::string>& topics,
const std::string& subscriptionName, TopicNamePtr topicName,
const ConsumerConfiguration& conf,
- LookupServicePtr lookupServicePtr)
+ LookupServicePtr lookupServicePtr,
+ const Commands::SubscriptionMode subscriptionMode,
+ boost::optional<MessageId> startMessageId)
: ConsumerImplBase(client, topicName ? topicName->toString() : "EmptyTopics",
Backoff(milliseconds(100), seconds(60), milliseconds(0)), conf,
client->getListenerExecutorProvider()->get()),
@@ -60,7 +64,9 @@ MultiTopicsConsumerImpl::MultiTopicsConsumerImpl(ClientImplPtr client, const std
messageListener_(conf.getMessageListener()),
lookupServicePtr_(lookupServicePtr),
numberTopicPartitions_(std::make_shared<std::atomic<int>>(0)),
- topics_(topics) {
+ topics_(topics),
+ subscriptionMode_(subscriptionMode),
+ startMessageId_(startMessageId) {
std::stringstream consumerStrStream;
consumerStrStream << "[Muti Topics Consumer: "
<< "TopicName - " << topic_ << " - Subscription - " << subscriptionName << "]";
@@ -226,7 +232,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
// We don't have to add partition-n suffix
consumer = std::make_shared<ConsumerImpl>(client, topicName->toString(), subscriptionName_, config,
topicName->isPersistent(), internalListenerExecutor, true,
- NonPartitioned);
+ NonPartitioned, subscriptionMode_, startMessageId_);
consumer->getConsumerCreatedFuture().addListener(std::bind(
&MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
@@ -239,7 +245,7 @@ void MultiTopicsConsumerImpl::subscribeTopicPartitions(int numPartitions, TopicN
std::string topicPartitionName = topicName->getTopicPartitionName(i);
consumer = std::make_shared<ConsumerImpl>(client, topicPartitionName, subscriptionName_, config,
topicName->isPersistent(), internalListenerExecutor,
- true, Partitioned);
+ true, Partitioned, subscriptionMode_, startMessageId_);
consumer->getConsumerCreatedFuture().addListener(std::bind(
&MultiTopicsConsumerImpl::handleSingleConsumerCreated, get_shared_this_ptr(),
std::placeholders::_1, std::placeholders::_2, partitionsNeedCreate, topicSubResultPromise));
@@ -686,7 +692,12 @@ void MultiTopicsConsumerImpl::acknowledgeAsync(const MessageIdList& messageIdLis
}
void MultiTopicsConsumerImpl::acknowledgeCumulativeAsync(const MessageId& msgId, ResultCallback callback) {
-    callback(ResultOperationNotSupported);
+    // Route the cumulative ack to the sub-consumer registered under the topic name carried by msgId.
+    auto optConsumer = consumers_.find(msgId.getTopicName());
+    if (optConsumer) {
+        unAckedMessageTrackerPtr_->removeMessagesTill(msgId);
+        optConsumer.value()->acknowledgeCumulativeAsync(msgId, callback);
+    } else {
+        // No sub-consumer owns this topic: still complete the callback so that a caller waiting on
+        // the result is not left hanging.
+        LOG_ERROR("Message of topic: " << msgId.getTopicName() << " not in consumers");
+        callback(ResultOperationNotSupported);
+    }
}
void MultiTopicsConsumerImpl::negativeAcknowledge(const MessageId& msgId) {
@@ -1047,3 +1058,35 @@ void MultiTopicsConsumerImpl::cancelTimers() noexcept {
partitionsUpdateTimer_->cancel(ec);
}
}
+
+void MultiTopicsConsumerImpl::hasMessageAvailableAsync(HasMessageAvailableCallback callback) {
+    // Messages already buffered locally answer the question without asking any sub-consumer.
+    if (incomingMessagesSize_ > 0) {
+        callback(ResultOk, true);
+        return;
+    }
+
+    const auto numConsumers = consumers_.size();
+    if (numConsumers == 0) {
+        // With no sub-consumer, nothing below would ever invoke the callback; complete it here.
+        // NOTE(review): reporting (ResultOk, false) for an empty consumer set is an assumption - confirm.
+        callback(ResultOk, false);
+        return;
+    }
+
+    auto hasMessageAvailable = std::make_shared<std::atomic<bool>>(false);
+    // Replies still outstanding; forced negative once a failure has been reported.
+    auto needCallBack = std::make_shared<std::atomic<int>>(static_cast<int>(numConsumers));
+    auto self = get_shared_this_ptr();
+
+    consumers_.forEachValue([self, needCallBack, callback, hasMessageAvailable](ConsumerImplPtr consumer) {
+        consumer->hasMessageAvailableAsync(
+            [self, needCallBack, callback, hasMessageAvailable](Result result, bool hasMsg) {
+                if (result != ResultOk) {
+                    LOG_ERROR("Failed when hasMessageAvailableAsync: " << result);
+                    // exchange() returns the previous value: only the first failure sees a positive
+                    // count, so the user callback is invoked at most once even if several fail.
+                    if (needCallBack->exchange(-1) > 0) {
+                        callback(result, false);
+                    }
+                    return;
+                }
+
+                if (hasMsg) {
+                    hasMessageAvailable->store(true);
+                }
+
+                // The last successful reply reports the aggregate; after a failure the counter is
+                // negative and can never reach zero.
+                if (--(*needCallBack) == 0) {
+                    callback(result, hasMessageAvailable->load() || self->incomingMessagesSize_ > 0);
+                }
+            });
+    });
+}
diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h
index 50cdecf..a9ee160 100644
--- a/lib/MultiTopicsConsumerImpl.h
+++ b/lib/MultiTopicsConsumerImpl.h
@@ -25,6 +25,7 @@
#include <vector>
#include "BlockingQueue.h"
+#include "Commands.h"
#include "ConsumerImplBase.h"
#include "Future.h"
#include "Latch.h"
@@ -53,10 +54,15 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
public:
MultiTopicsConsumerImpl(ClientImplPtr client, TopicNamePtr topicName, int numPartitions,
const std::string& subscriptionName, const ConsumerConfiguration& conf,
- LookupServicePtr lookupServicePtr);
+ LookupServicePtr lookupServicePtr,
+ Commands::SubscriptionMode = Commands::SubscriptionModeDurable,
+ boost::optional<MessageId> startMessageId = boost::none);
+
MultiTopicsConsumerImpl(ClientImplPtr client, const std::vector<std::string>& topics,
const std::string& subscriptionName, TopicNamePtr topicName,
- const ConsumerConfiguration& conf, LookupServicePtr lookupServicePtr_);
+ const ConsumerConfiguration& conf, LookupServicePtr lookupServicePtr_,
+ Commands::SubscriptionMode = Commands::SubscriptionModeDurable,
+ boost::optional<MessageId> startMessageId = boost::none);
~MultiTopicsConsumerImpl();
// overrided methods from ConsumerImplBase
@@ -88,6 +94,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
void negativeAcknowledge(const MessageId& msgId) override;
bool isConnected() const override;
uint64_t getNumberOfConnectedConsumer() override;
+ void hasMessageAvailableAsync(HasMessageAvailableCallback callback) override;
void handleGetConsumerStats(Result, BrokerConsumerStats, LatchPtr, MultiTopicsBrokerConsumerStatsPtr,
size_t, BrokerConsumerStatsCallback);
@@ -118,6 +125,8 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
UnAckedMessageTrackerPtr unAckedMessageTrackerPtr_;
const std::vector<std::string> topics_;
std::queue<ReceiveCallback> pendingReceives_;
+ const Commands::SubscriptionMode subscriptionMode_;
+ boost::optional<MessageId> startMessageId_;
/* methods */
void handleSinglePartitionConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
@@ -167,6 +176,7 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase {
FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
+ FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition);
};
typedef std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImplPtr;
diff --git a/lib/ReaderImpl.cc b/lib/ReaderImpl.cc
index da1d95e..a06c0ec 100644
--- a/lib/ReaderImpl.cc
+++ b/lib/ReaderImpl.cc
@@ -23,6 +23,7 @@
#include "ConsumerImpl.h"
#include "ExecutorService.h"
#include "GetLastMessageIdResponse.h"
+#include "MultiTopicsConsumerImpl.h"
#include "TopicName.h"
namespace pulsar {
@@ -35,9 +36,14 @@ ConsumerConfiguration consumerConfigOfReader;
static ResultCallback emptyCallback;
-ReaderImpl::ReaderImpl(const ClientImplPtr client, const std::string& topic, const ReaderConfiguration& conf,
- const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback)
- : topic_(topic), client_(client), readerConf_(conf), readerCreatedCallback_(readerCreatedCallback) {}
+ReaderImpl::ReaderImpl(const ClientImplPtr client, const std::string& topic, int partitions,
+ const ReaderConfiguration& conf, const ExecutorServicePtr listenerExecutor,
+ ReaderCallback readerCreatedCallback)
+ : topic_(topic),
+ partitions_(partitions),
+ client_(client),
+ readerConf_(conf),
+ readerCreatedCallback_(readerCreatedCallback) {}
void ReaderImpl::start(const MessageId& startMessageId,
std::function<void(const ConsumerImplBaseWeakPtr&)> callback) {
@@ -80,10 +86,19 @@ void ReaderImpl::start(const MessageId& startMessageId,
test::consumerConfigOfReader = consumerConf.clone();
}
- consumer_ = std::make_shared<ConsumerImpl>(
- client_.lock(), topic_, subscription, consumerConf, TopicName::get(topic_)->isPersistent(),
- ExecutorServicePtr(), false, NonPartitioned, Commands::SubscriptionModeNonDurable, startMessageId);
- consumer_->setPartitionIndex(TopicName::getPartitionIndex(topic_));
+ if (partitions_ > 0) {
+ auto consumerImpl = std::make_shared<MultiTopicsConsumerImpl>(
+ client_.lock(), TopicName::get(topic_), partitions_, subscription, consumerConf,
+ client_.lock()->getLookup(), Commands::SubscriptionModeNonDurable, startMessageId);
+ consumer_ = consumerImpl;
+ } else {
+ auto consumerImpl = std::make_shared<ConsumerImpl>(
+ client_.lock(), topic_, subscription, consumerConf, TopicName::get(topic_)->isPersistent(),
+ ExecutorServicePtr(), false, NonPartitioned, Commands::SubscriptionModeNonDurable,
+ startMessageId);
+ consumerImpl->setPartitionIndex(TopicName::getPartitionIndex(topic_));
+ consumer_ = consumerImpl;
+ }
auto self = shared_from_this();
consumer_->getConsumerCreatedFuture().addListener(
[this, self, callback](Result result, const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
diff --git a/lib/ReaderImpl.h b/lib/ReaderImpl.h
index e216241..3731990 100644
--- a/lib/ReaderImpl.h
+++ b/lib/ReaderImpl.h
@@ -58,8 +58,9 @@ extern PULSAR_PUBLIC ConsumerConfiguration consumerConfigOfReader;
class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl> {
public:
- ReaderImpl(const ClientImplPtr client, const std::string& topic, const ReaderConfiguration& conf,
- const ExecutorServicePtr listenerExecutor, ReaderCallback readerCreatedCallback);
+ ReaderImpl(const ClientImplPtr client, const std::string& topic, int partitions,
+ const ReaderConfiguration& conf, const ExecutorServicePtr listenerExecutor,
+ ReaderCallback readerCreatedCallback);
void start(const MessageId& startMessageId, std::function<void(const ConsumerImplBaseWeakPtr&)> callback);
@@ -73,7 +74,7 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>
Future<Result, ReaderImplWeakPtr> getReaderCreatedFuture();
- ConsumerImplWeakPtr getConsumer() const noexcept { return consumer_; }
+ ConsumerImplBasePtr getConsumer() const noexcept { return consumer_; }
void hasMessageAvailableAsync(HasMessageAvailableCallback callback);
@@ -90,9 +91,10 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl>
void acknowledgeIfNecessary(Result result, const Message& msg);
std::string topic_;
+ int partitions_;
ClientImplWeakPtr client_;
ReaderConfiguration readerConf_;
- ConsumerImplPtr consumer_;
+ ConsumerImplBasePtr consumer_;
ReaderCallback readerCreatedCallback_;
ReaderListener readerListener_;
};
diff --git a/lib/UnAckedMessageTrackerEnabled.h b/lib/UnAckedMessageTrackerEnabled.h
index 0bdcc85..1453460 100644
--- a/lib/UnAckedMessageTrackerEnabled.h
+++ b/lib/UnAckedMessageTrackerEnabled.h
@@ -64,6 +64,7 @@ class UnAckedMessageTrackerEnabled : public UnAckedMessageTrackerInterface {
FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery);
FRIEND_TEST(ConsumerTest, testBatchUnAckedMessageTracker);
+ FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition);
};
} // namespace pulsar
diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc
index f3c8abb..ac971b3 100644
--- a/tests/ConsumerTest.cc
+++ b/tests/ConsumerTest.cc
@@ -261,6 +261,53 @@ TEST(ConsumerTest, testConsumerEventWithPartition) {
ASSERT_EQ(0, result.size());
}
+TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition) {
+    Client client(lookupUrl);
+
+    const std::string topic = "testAcknowledgeCumulativeWithPartition-" + std::to_string(time(nullptr));
+
+    // Ask the admin endpoint for a 2-partition topic; 204 and 409 are both accepted.
+    int res = makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + topic + "/partitions",
+                             std::to_string(2));
+    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfiguration;
+    consumerConfiguration.setUnAckedMessagesTimeoutMs(10000);
+    ASSERT_EQ(ResultOk, client.subscribe(topic, "t-sub", consumerConfiguration, consumer));
+
+    Producer producer;
+    ProducerConfiguration producerConfiguration;
+    producerConfiguration.setBatchingEnabled(false);
+    producerConfiguration.setPartitionsRoutingMode(
+        ProducerConfiguration::PartitionsRoutingMode::RoundRobinDistribution);
+    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConfiguration, producer));
+
+    const int numMessages = 100;
+    for (int i = 0; i < numMessages; ++i) {
+        Message msg = MessageBuilder().setContent(std::to_string(i)).build();
+        ASSERT_EQ(ResultOk, producer.send(msg));
+    }
+
+    Message msg;
+    for (int i = 0; i < numMessages; i++) {
+        ASSERT_EQ(ResultOk, consumer.receive(msg));
+        // Cumulatively ack the last two received messages.
+        // NOTE(review): this assumes they are the final message of each of the two partitions.
+        if (i >= numMessages - 2) {
+            ASSERT_EQ(ResultOk, consumer.acknowledgeCumulative(msg.getMessageId()));
+        }
+    }
+    ASSERT_EQ(ResultTimeout, consumer.receive(msg, 2000));
+
+    // Assert that there is no message in the tracker.
+    auto multiConsumerImpl = PulsarFriend::getMultiTopicsConsumerImplPtr(consumer);
+    auto tracker =
+        static_cast<UnAckedMessageTrackerEnabled*>(multiConsumerImpl->unAckedMessageTrackerPtr_.get());
+    ASSERT_EQ(0, tracker->size());
+
+    client.close();
+}
+
TEST(ConsumerTest, consumerNotInitialized) {
Consumer consumer;
diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h
index aa7737d..1ef1ee7 100644
--- a/tests/PulsarFriend.h
+++ b/tests/PulsarFriend.h
@@ -99,7 +99,7 @@ class PulsarFriend {
}
static ConsumerImplPtr getConsumer(Reader reader) {
-    return std::static_pointer_cast<ConsumerImpl>(reader.impl_->getConsumer().lock());
+    // ReaderImpl::getConsumer() now returns a ConsumerImplBasePtr, which a partitioned reader backs
+    // with a MultiTopicsConsumerImpl. A checked cast yields nullptr in that case instead of an
+    // invalid ConsumerImpl pointer.
+    return std::dynamic_pointer_cast<ConsumerImpl>(reader.impl_->getConsumer());
}
static ReaderImplWeakPtr getReaderImplWeakPtr(Reader reader) { return reader.impl_; }
diff --git a/tests/ReaderTest.cc b/tests/ReaderTest.cc
index eefe1bc..afef384 100644
--- a/tests/ReaderTest.cc
+++ b/tests/ReaderTest.cc
@@ -37,10 +37,28 @@ using namespace pulsar;
static std::string serviceUrl = "pulsar://localhost:6650";
static const std::string adminUrl = "http://localhost:8080/";
-TEST(ReaderTest, testSimpleReader) {
+/**
+ * Value-parameterized fixture: the bool parameter selects whether each test case runs against a
+ * partitioned topic (true) or uses the topic name as-is (false).
+ */
+class ReaderTest : public ::testing::TestWithParam<bool> {
+   public:
+    /**
+     * In partitioned mode, asks the admin REST endpoint to create `topicName` with 5 partitions.
+     * Does nothing otherwise.
+     *
+     * Note: a failed ASSERT here only returns from this helper, not from the calling test body.
+     */
+    void initTopic(const std::string& topicName) {
+        if (isMultiTopic_) {
+            // call admin api to make it partitioned
+            std::string url = adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions";
+            int res = makePutRequest(url, "5");
+            LOG_INFO("res = " << res);
+            // 204 and 409 are both accepted (409 presumably means the topic already exists).
+            ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
+        }
+    }
+
+   protected:
+    bool isMultiTopic_ = GetParam();
+};
+
+TEST_P(ReaderTest, testSimpleReader) {
Client client(serviceUrl);
- std::string topicName = "persistent://public/default/test-simple-reader";
+ std::string topicName =
+ "test-simple-reader" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
+ initTopic(topicName);
ReaderConfiguration readerConf;
Reader reader;
@@ -69,10 +87,11 @@ TEST(ReaderTest, testSimpleReader) {
client.close();
}
-TEST(ReaderTest, testAsyncRead) {
+TEST_P(ReaderTest, testAsyncRead) {
Client client(serviceUrl);
- std::string topicName = "persistent://public/default/test-simple-reader" + std::to_string(time(nullptr));
+ std::string topicName = "testAsyncRead" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
+ initTopic(topicName);
ReaderConfiguration readerConf;
Reader reader;
@@ -113,10 +132,12 @@ TEST(ReaderTest, testAsyncRead) {
client.close();
}
-TEST(ReaderTest, testReaderAfterMessagesWerePublished) {
+TEST_P(ReaderTest, testReaderAfterMessagesWerePublished) {
Client client(serviceUrl);
- std::string topicName = "persistent://public/default/testReaderAfterMessagesWerePublished";
+ std::string topicName = "testReaderAfterMessagesWerePublished" + std::to_string(time(nullptr)) +
+ std::to_string(isMultiTopic_);
+ initTopic(topicName);
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
@@ -145,10 +166,12 @@ TEST(ReaderTest, testReaderAfterMessagesWerePublished) {
client.close();
}
-TEST(ReaderTest, testMultipleReaders) {
+TEST_P(ReaderTest, testMultipleReaders) {
Client client(serviceUrl);
- std::string topicName = "persistent://public/default/testMultipleReaders";
+ std::string topicName =
+ "testMultipleReaders" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
+ initTopic(topicName);
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
@@ -190,10 +213,12 @@ TEST(ReaderTest, testMultipleReaders) {
client.close();
}
-TEST(ReaderTest, testReaderOnLastMessage) {
+TEST_P(ReaderTest, testReaderOnLastMessage) {
Client client(serviceUrl);
- std::string topicName = "persistent://public/default/testReaderOnLastMessage";
+ std::string topicName =
+ "testReaderOnLastMessage" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
+ initTopic(topicName);
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
@@ -228,10 +253,12 @@ TEST(ReaderTest, testReaderOnLastMessage) {
client.close();
}
-TEST(ReaderTest, testReaderOnSpecificMessage) {
+TEST_P(ReaderTest, testReaderOnSpecificMessage) {
Client client(serviceUrl);
- std::string topicName = "persistent://public/default/testReaderOnSpecificMessage";
+ std::string topicName =
+ "testReaderOnSpecificMessage" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
+ initTopic(topicName);
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
@@ -277,12 +304,15 @@ TEST(ReaderTest, testReaderOnSpecificMessage) {
}
/**
- * Test that we can position on a particular message even within a batch
+ * Test that we can position on a particular message even within a batch
*/
-TEST(ReaderTest, testReaderOnSpecificMessageWithBatches) {
+TEST_P(ReaderTest, testReaderOnSpecificMessageWithBatches) {
Client client(serviceUrl);
- std::string topicName = "persistent://public/default/testReaderOnSpecificMessageWithBatches";
+ std::string topicName = "testReaderOnSpecificMessageWithBatches" + std::to_string(time(nullptr)) +
+ std::to_string(isMultiTopic_);
+ initTopic(topicName);
Producer producer;
// Enable batching
@@ -339,10 +369,12 @@ TEST(ReaderTest, testReaderOnSpecificMessageWithBatches) {
client.close();
}
-TEST(ReaderTest, testReaderReachEndOfTopic) {
+TEST_P(ReaderTest, testReaderReachEndOfTopic) {
Client client(serviceUrl);
- std::string topicName = "persistent://public/default/testReaderReachEndOfTopic";
+ std::string topicName =
+ "testReaderReachEndOfTopic" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
+ initTopic(topicName);
// 1. create producer
Producer producer;
@@ -414,10 +446,12 @@ TEST(ReaderTest, testReaderReachEndOfTopic) {
client.close();
}
-TEST(ReaderTest, testReaderReachEndOfTopicMessageWithoutBatches) {
+TEST_P(ReaderTest, testReaderReachEndOfTopicMessageWithoutBatches) {
Client client(serviceUrl);
- std::string topicName = "persistent://public/default/testReaderReachEndOfTopicMessageWithBatches";
+ std::string topicName = "testReaderReachEndOfTopicMessageWithoutBatches" + std::to_string(time(nullptr)) +
+ std::to_string(isMultiTopic_);
+ initTopic(topicName);
// 1. create producer
Producer producer;
@@ -510,10 +544,12 @@ TEST(ReaderTest, testPartitionIndex) {
client.close();
}
-TEST(ReaderTest, testSubscriptionNameSetting) {
+TEST_P(ReaderTest, testSubscriptionNameSetting) {
Client client(serviceUrl);
- std::string topicName = "persistent://public/default/test-subscription-name-setting";
+ std::string topicName =
+ "testSubscriptionNameSetting" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
+ initTopic(topicName);
std::string subName = "test-sub";
ReaderConfiguration readerConf;
@@ -527,10 +563,12 @@ TEST(ReaderTest, testSubscriptionNameSetting) {
client.close();
}
-TEST(ReaderTest, testSetSubscriptionNameAndPrefix) {
+TEST_P(ReaderTest, testSetSubscriptionNameAndPrefix) {
Client client(serviceUrl);
- std::string topicName = "persistent://public/default/testSetSubscriptionNameAndPrefix";
+ std::string topicName =
+ "testSetSubscriptionNameAndPrefix" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
+ initTopic(topicName);
std::string subName = "test-sub";
ReaderConfiguration readerConf;
@@ -545,10 +583,12 @@ TEST(ReaderTest, testSetSubscriptionNameAndPrefix) {
client.close();
}
-TEST(ReaderTest, testMultiSameSubscriptionNameReaderShouldFail) {
+TEST_P(ReaderTest, testMultiSameSubscriptionNameReaderShouldFail) {
Client client(serviceUrl);
- std::string topicName = "persistent://public/default/testMultiSameSubscriptionNameReaderShouldFail";
+ std::string topicName = "testMultiSameSubscriptionNameReaderShouldFail" + std::to_string(time(nullptr)) +
+ std::to_string(isMultiTopic_);
+ initTopic(topicName);
std::string subscriptionName = "test-sub";
ReaderConfiguration readerConf1;
@@ -567,28 +607,33 @@ TEST(ReaderTest, testMultiSameSubscriptionNameReaderShouldFail) {
client.close();
}
-TEST(ReaderTest, testIsConnected) {
- const std::string topic = "testReaderIsConnected-" + std::to_string(time(nullptr));
+TEST_P(ReaderTest, testIsConnected) {
Client client(serviceUrl);
+ std::string topicName = "testIsConnected" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
+ initTopic(topicName);
+
Reader reader;
ASSERT_FALSE(reader.isConnected());
- ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), {}, reader));
+ ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), {}, reader));
ASSERT_TRUE(reader.isConnected());
ASSERT_EQ(ResultOk, reader.close());
ASSERT_FALSE(reader.isConnected());
}
-TEST(ReaderTest, testHasMessageAvailableWhenCreated) {
- const std::string topic = "testHasMessageAvailableWhenCreated-" + std::to_string(time(nullptr));
+TEST_P(ReaderTest, testHasMessageAvailableWhenCreated) {
Client client(serviceUrl);
+ std::string topicName =
+ "testHasMessageAvailableWhenCreated" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
+ initTopic(topicName);
+
ProducerConfiguration producerConf;
producerConf.setBatchingMaxMessages(3);
Producer producer;
- ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf, producer));
+ ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));
std::vector<MessageId> messageIds;
constexpr int numMessages = 7;
@@ -612,25 +657,28 @@ TEST(ReaderTest, testHasMessageAvailableWhenCreated) {
bool hasMessageAvailable;
for (size_t i = 0; i < messageIds.size() - 1; i++) {
- ASSERT_EQ(ResultOk, client.createReader(topic, messageIds[i], {}, reader));
+ ASSERT_EQ(ResultOk, client.createReader(topicName, messageIds[i], {}, reader));
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
EXPECT_TRUE(hasMessageAvailable);
}
// The start message ID is exclusive by default, so when we start at the last message, there should be no
// message available.
- ASSERT_EQ(ResultOk, client.createReader(topic, messageIds.back(), {}, reader));
+ ASSERT_EQ(ResultOk, client.createReader(topicName, messageIds.back(), {}, reader));
ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
EXPECT_FALSE(hasMessageAvailable);
client.close();
}
-TEST(ReaderTest, testReceiveAfterSeek) {
+TEST_P(ReaderTest, testReceiveAfterSeek) {
Client client(serviceUrl);
- const std::string topic = "reader-test-receive-after-seek-" + std::to_string(time(nullptr));
+
+ std::string topicName =
+ "testReceiveAfterSeek" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
+ initTopic(topicName);
Producer producer;
- ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
+ ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
MessageId seekMessageId;
for (int i = 0; i < 5; i++) {
@@ -642,7 +690,7 @@ TEST(ReaderTest, testReceiveAfterSeek) {
}
Reader reader;
- ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::latest(), {}, reader));
+ ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::latest(), {}, reader));
reader.seek(seekMessageId);
@@ -651,3 +699,5 @@ TEST(ReaderTest, testReceiveAfterSeek) {
client.close();
}
+
+INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));
\ No newline at end of file