You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/21 12:31:38 UTC

[pulsar] 02/02: [Issue 8787][C++] Add reader internal subscription name setter. (#8823)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 2f9f639daed910758b3015ee9883b156f7065a4a
Author: Zike Yang <Ro...@outlook.com>
AuthorDate: Wed Dec 9 01:38:47 2020 +0800

    [Issue 8787][C++] Add reader internal subscription name setter. (#8823)
    
    Master Issue: #8787
    
    ### Motivation
    
    Currently, the C++ client can only generate the reader's subscription name internally, at random; there is no way to set it explicitly.
    The Java client part is in #8801.
    
    ### Modifications
    
    Add a setter for the reader's internal subscription name.
    
    ### Verifying this change
    
    This change is covered by the tests added in this commit: *testSubscriptionNameSetting*, *testSetSubscriptionNameAndPrefix* and *testMultiSameSubscriptionNameReaderShouldFail*.
    
    (cherry picked from commit 408f9e61c07b436b425f9ef65ccbada022a4e7fd)
---
 .../include/pulsar/ReaderConfiguration.h           |  7 +++
 pulsar-client-cpp/lib/ReaderConfiguration.cc       |  9 ++++
 pulsar-client-cpp/lib/ReaderConfigurationImpl.h    |  1 +
 pulsar-client-cpp/lib/ReaderImpl.cc                | 11 +++--
 pulsar-client-cpp/tests/ReaderTest.cc              | 57 ++++++++++++++++++++++
 5 files changed, 82 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h b/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
index bf52c4f..25b0ba3 100644
--- a/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
@@ -105,6 +105,13 @@ class PULSAR_PUBLIC ReaderConfiguration {
     void setReadCompacted(bool compacted);
     bool isReadCompacted() const;
 
+    /**
+     * Set the internal subscription name that the reader uses instead of a generated one.
+     * @param internalSubscriptionName the subscription name; an empty value keeps the generated name
+     */
+    void setInternalSubscriptionName(std::string internalSubscriptionName);
+    const std::string& getInternalSubscriptionName() const;
+
    private:
     std::shared_ptr<ReaderConfigurationImpl> impl_;
 };
diff --git a/pulsar-client-cpp/lib/ReaderConfiguration.cc b/pulsar-client-cpp/lib/ReaderConfiguration.cc
index fec8eed..eb18d11 100644
--- a/pulsar-client-cpp/lib/ReaderConfiguration.cc
+++ b/pulsar-client-cpp/lib/ReaderConfiguration.cc
@@ -67,4 +67,13 @@ void ReaderConfiguration::setSubscriptionRolePrefix(const std::string& subscript
 bool ReaderConfiguration::isReadCompacted() const { return impl_->readCompacted; }
 
 void ReaderConfiguration::setReadCompacted(bool compacted) { impl_->readCompacted = compacted; }
+
+void ReaderConfiguration::setInternalSubscriptionName(std::string internalSubscriptionName) {
+    impl_->internalSubscriptionName = std::move(internalSubscriptionName);  // sink param: move, not copy
+}
+
+const std::string& ReaderConfiguration::getInternalSubscriptionName() const {
+    return impl_->internalSubscriptionName;  // reference to the string stored in impl_; no copy is made
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ReaderConfigurationImpl.h b/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
index c74b217..3b5cc8a 100644
--- a/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
@@ -30,6 +30,7 @@ struct ReaderConfigurationImpl {
     std::string readerName;
     std::string subscriptionRolePrefix;
     bool readCompacted;
+    std::string internalSubscriptionName;
     ReaderConfigurationImpl()
         : schemaInfo(),
           hasReaderListener(false),
diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc
index 1e08977..2ea3430 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -47,9 +47,14 @@ void ReaderImpl::start(const MessageId& startMessageId) {
                                                   std::placeholders::_1, std::placeholders::_2));
     }
 
-    std::string subscription = "reader-" + generateRandomName();
-    if (!readerConf_.getSubscriptionRolePrefix().empty()) {
-        subscription = readerConf_.getSubscriptionRolePrefix() + "-" + subscription;
+    std::string subscription;
+    if (!readerConf_.getInternalSubscriptionName().empty()) {
+        subscription = readerConf_.getInternalSubscriptionName();
+    } else {
+        subscription = "reader-" + generateRandomName();
+        if (!readerConf_.getSubscriptionRolePrefix().empty()) {
+            subscription = readerConf_.getSubscriptionRolePrefix() + "-" + subscription;
+        }
     }
 
     consumer_ = std::make_shared<ConsumerImpl>(
diff --git a/pulsar-client-cpp/tests/ReaderTest.cc b/pulsar-client-cpp/tests/ReaderTest.cc
index db4475b..a485652 100644
--- a/pulsar-client-cpp/tests/ReaderTest.cc
+++ b/pulsar-client-cpp/tests/ReaderTest.cc
@@ -506,3 +506,60 @@ TEST(ReaderTest, testPartitionIndex) {
 
     client.close();
 }
+
+TEST(ReaderTest, testSubscriptionNameSetting) {
+    // An explicitly configured subscription name must be the one reported by the reader's consumer.
+    Client client(serviceUrl);
+    const std::string topic = "persistent://public/default/test-subscription-name-setting";
+    const std::string expectedSubscription = "test-sub";
+
+    ReaderConfiguration conf;
+    conf.setInternalSubscriptionName(expectedSubscription);
+
+    Reader reader;
+    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), conf, reader));
+    ASSERT_EQ(expectedSubscription, ReaderTest::getConsumer(reader)->getSubscriptionName());
+
+    reader.close();
+    client.close();
+}
+
+TEST(ReaderTest, testSetSubscriptionNameAndPrefix) {
+    // When both a name and a role prefix are configured, the explicit name is expected as-is.
+    Client client(serviceUrl);
+    const std::string topic = "persistent://public/default/testSetSubscriptionNameAndPrefix";
+    const std::string expectedSubscription = "test-sub";
+
+    ReaderConfiguration conf;
+    conf.setInternalSubscriptionName(expectedSubscription);
+    conf.setSubscriptionRolePrefix("my-prefix");
+
+    Reader reader;
+    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), conf, reader));
+    ASSERT_EQ(expectedSubscription, ReaderTest::getConsumer(reader)->getSubscriptionName());
+
+    reader.close();
+    client.close();
+}
+
+TEST(ReaderTest, testMultiSameSubscriptionNameReaderShouldFail) {
+    // A second reader that reuses the first reader's subscription name is expected to be rejected.
+    Client client(serviceUrl);
+    const std::string topic = "persistent://public/default/testMultiSameSubscriptionNameReaderShouldFail";
+    const std::string sharedSubscription = "test-sub";
+
+    ReaderConfiguration firstConf;
+    firstConf.setInternalSubscriptionName(sharedSubscription);
+    Reader firstReader;
+    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), firstConf, firstReader));
+
+    ReaderConfiguration secondConf;
+    secondConf.setInternalSubscriptionName(sharedSubscription);
+    Reader secondReader;
+    const auto secondResult = client.createReader(topic, MessageId::earliest(), secondConf, secondReader);
+    ASSERT_EQ(ResultConsumerBusy, secondResult);
+
+    firstReader.close();
+    secondReader.close();
+    client.close();
+}