You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2020/12/08 17:39:14 UTC
[pulsar] branch master updated: [Issue 8787][C++] Add reader
internal subscription name setter. (#8823)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 408f9e6 [Issue 8787][C++] Add reader internal subscription name setter. (#8823)
408f9e6 is described below
commit 408f9e61c07b436b425f9ef65ccbada022a4e7fd
Author: Zike Yang <Ro...@outlook.com>
AuthorDate: Wed Dec 9 01:38:47 2020 +0800
[Issue 8787][C++] Add reader internal subscription name setter. (#8823)
Master Issue: #8787
### Motivation
Currently, the C++ client always generates the reader's subscription name internally at random; there is no way for the user to specify it.
The corresponding Java client change is in #8801.
### Modifications
Add a setter for the reader's internal subscription name.
### Verifying this change
This change is covered by the tests added to ReaderTest.cc in this commit: *testSubscriptionNameSetting*, *testSetSubscriptionNameAndPrefix* and *testMultiSameSubscriptionNameReaderShouldFail*.
---
.../include/pulsar/ReaderConfiguration.h | 7 +++
pulsar-client-cpp/lib/ReaderConfiguration.cc | 9 ++++
pulsar-client-cpp/lib/ReaderConfigurationImpl.h | 1 +
pulsar-client-cpp/lib/ReaderImpl.cc | 11 +++--
pulsar-client-cpp/tests/ReaderTest.cc | 57 ++++++++++++++++++++++
5 files changed, 82 insertions(+), 3 deletions(-)
diff --git a/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h b/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
index bf52c4f..25b0ba3 100644
--- a/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
@@ -105,6 +105,13 @@ class PULSAR_PUBLIC ReaderConfiguration {
void setReadCompacted(bool compacted);
bool isReadCompacted() const;
+ /**
+ * Set the internal subscription name.
+ * @param internal subscriptionName
+ */
+ void setInternalSubscriptionName(std::string internalSubscriptionName);
+ const std::string& getInternalSubscriptionName() const;
+
private:
std::shared_ptr<ReaderConfigurationImpl> impl_;
};
diff --git a/pulsar-client-cpp/lib/ReaderConfiguration.cc b/pulsar-client-cpp/lib/ReaderConfiguration.cc
index fec8eed..eb18d11 100644
--- a/pulsar-client-cpp/lib/ReaderConfiguration.cc
+++ b/pulsar-client-cpp/lib/ReaderConfiguration.cc
@@ -67,4 +67,13 @@ void ReaderConfiguration::setSubscriptionRolePrefix(const std::string& subscript
bool ReaderConfiguration::isReadCompacted() const { return impl_->readCompacted; }
void ReaderConfiguration::setReadCompacted(bool compacted) { impl_->readCompacted = compacted; }
+
+void ReaderConfiguration::setInternalSubscriptionName(std::string internalSubscriptionName) {
+ impl_->internalSubscriptionName = internalSubscriptionName;
+}
+
+const std::string& ReaderConfiguration::getInternalSubscriptionName() const {
+ return impl_->internalSubscriptionName;
+}
+
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ReaderConfigurationImpl.h b/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
index c74b217..3b5cc8a 100644
--- a/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
@@ -30,6 +30,7 @@ struct ReaderConfigurationImpl {
std::string readerName;
std::string subscriptionRolePrefix;
bool readCompacted;
+ std::string internalSubscriptionName;
ReaderConfigurationImpl()
: schemaInfo(),
hasReaderListener(false),
diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc
index 1e08977..2ea3430 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -47,9 +47,14 @@ void ReaderImpl::start(const MessageId& startMessageId) {
std::placeholders::_1, std::placeholders::_2));
}
- std::string subscription = "reader-" + generateRandomName();
- if (!readerConf_.getSubscriptionRolePrefix().empty()) {
- subscription = readerConf_.getSubscriptionRolePrefix() + "-" + subscription;
+ std::string subscription;
+ if (!readerConf_.getInternalSubscriptionName().empty()) {
+ subscription = readerConf_.getInternalSubscriptionName();
+ } else {
+ subscription = "reader-" + generateRandomName();
+ if (!readerConf_.getSubscriptionRolePrefix().empty()) {
+ subscription = readerConf_.getSubscriptionRolePrefix() + "-" + subscription;
+ }
}
consumer_ = std::make_shared<ConsumerImpl>(
diff --git a/pulsar-client-cpp/tests/ReaderTest.cc b/pulsar-client-cpp/tests/ReaderTest.cc
index db4475b..a485652 100644
--- a/pulsar-client-cpp/tests/ReaderTest.cc
+++ b/pulsar-client-cpp/tests/ReaderTest.cc
@@ -506,3 +506,60 @@ TEST(ReaderTest, testPartitionIndex) {
client.close();
}
+
+TEST(ReaderTest, testSubscriptionNameSetting) {
+ Client client(serviceUrl);
+
+ std::string topicName = "persistent://public/default/test-subscription-name-setting";
+ std::string subName = "test-sub";
+
+ ReaderConfiguration readerConf;
+ readerConf.setInternalSubscriptionName(subName);
+ Reader reader;
+ ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
+
+ ASSERT_EQ(subName, ReaderTest::getConsumer(reader)->getSubscriptionName());
+
+ reader.close();
+ client.close();
+}
+
+TEST(ReaderTest, testSetSubscriptionNameAndPrefix) {
+ Client client(serviceUrl);
+
+ std::string topicName = "persistent://public/default/testSetSubscriptionNameAndPrefix";
+ std::string subName = "test-sub";
+
+ ReaderConfiguration readerConf;
+ readerConf.setInternalSubscriptionName(subName);
+ readerConf.setSubscriptionRolePrefix("my-prefix");
+ Reader reader;
+ ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));
+
+ ASSERT_EQ(subName, ReaderTest::getConsumer(reader)->getSubscriptionName());
+
+ reader.close();
+ client.close();
+}
+
+TEST(ReaderTest, testMultiSameSubscriptionNameReaderShouldFail) {
+ Client client(serviceUrl);
+
+ std::string topicName = "persistent://public/default/testMultiSameSubscriptionNameReaderShouldFail";
+ std::string subscriptionName = "test-sub";
+
+ ReaderConfiguration readerConf1;
+ readerConf1.setInternalSubscriptionName(subscriptionName);
+ Reader reader1;
+ ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf1, reader1));
+
+ ReaderConfiguration readerConf2;
+ readerConf2.setInternalSubscriptionName(subscriptionName);
+ Reader reader2;
+ ASSERT_EQ(ResultConsumerBusy,
+ client.createReader(topicName, MessageId::earliest(), readerConf2, reader2));
+
+ reader1.close();
+ reader2.close();
+ client.close();
+}