You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/02/27 16:33:29 UTC

[incubator-pulsar] branch master updated: Add prefix setting for C++/Python reader (#1283)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 08b5b85  Add prefix setting for C++/Python reader (#1283)
08b5b85 is described below

commit 08b5b857c64e2cdba8cd3e7f09612ee29662c7f7
Author: yush1ga <y....@gmail.com>
AuthorDate: Wed Feb 28 01:33:27 2018 +0900

    Add prefix setting for C++/Python reader (#1283)
    
    * Add prefix setting for C++ reader
    
    * Ran "make format"
    
    * Fix Python reader for subscription role prefix
---
 pulsar-client-cpp/include/pulsar/ReaderConfiguration.h | 3 +++
 pulsar-client-cpp/lib/ReaderConfiguration.cc           | 8 ++++++++
 pulsar-client-cpp/lib/ReaderConfigurationImpl.h        | 4 +++-
 pulsar-client-cpp/lib/ReaderImpl.cc                    | 3 +++
 pulsar-client-cpp/python/pulsar.py                     | 8 +++++++-
 pulsar-client-cpp/python/src/config.cc                 | 2 ++
 6 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h b/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
index 7e47628..3102cf5 100644
--- a/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ReaderConfiguration.h
@@ -83,6 +83,9 @@ class ReaderConfiguration {
     void setReaderName(const std::string& readerName);
     const std::string& getReaderName() const;
 
+    void setSubscriptionRolePrefix(const std::string& subscriptionRolePrefix);
+    const std::string& getSubscriptionRolePrefix() const;
+
    private:
     boost::shared_ptr<ReaderConfigurationImpl> impl_;
 };
diff --git a/pulsar-client-cpp/lib/ReaderConfiguration.cc b/pulsar-client-cpp/lib/ReaderConfiguration.cc
index 984c8cc..e44714f 100644
--- a/pulsar-client-cpp/lib/ReaderConfiguration.cc
+++ b/pulsar-client-cpp/lib/ReaderConfiguration.cc
@@ -48,4 +48,12 @@ int ReaderConfiguration::getReceiverQueueSize() const { return impl_->receiverQu
 const std::string& ReaderConfiguration::getReaderName() const { return impl_->readerName; }
 
 void ReaderConfiguration::setReaderName(const std::string& readerName) { impl_->readerName = readerName; }
+
+const std::string& ReaderConfiguration::getSubscriptionRolePrefix() const {
+    return impl_->subscriptionRolePrefix;
+}
+
+void ReaderConfiguration::setSubscriptionRolePrefix(const std::string& subscriptionRolePrefix) {
+    impl_->subscriptionRolePrefix = subscriptionRolePrefix;
+}
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ReaderConfigurationImpl.h b/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
index bfcf6f0..67ddb3f 100644
--- a/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
+++ b/pulsar-client-cpp/lib/ReaderConfigurationImpl.h
@@ -28,7 +28,9 @@ struct ReaderConfigurationImpl {
     bool hasReaderListener;
     int receiverQueueSize;
     std::string readerName;
-    ReaderConfigurationImpl() : hasReaderListener(false), receiverQueueSize(1000), readerName() {}
+    std::string subscriptionRolePrefix;
+    ReaderConfigurationImpl()
+        : hasReaderListener(false), receiverQueueSize(1000), readerName(), subscriptionRolePrefix() {}
 };
 }  // namespace pulsar
 #endif /* LIB_READERCONFIGURATIONIMPL_H_ */
diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc
index 4feb453..308908d 100644
--- a/pulsar-client-cpp/lib/ReaderImpl.cc
+++ b/pulsar-client-cpp/lib/ReaderImpl.cc
@@ -45,6 +45,9 @@ void ReaderImpl::start(const BatchMessageId& startMessageId) {
     }
 
     std::string subscription = "reader-" + generateRandomName();
+    if (!readerConf_.getSubscriptionRolePrefix().empty()) {
+        subscription = readerConf_.getSubscriptionRolePrefix() + "-" + subscription;
+    }
 
     consumer_ = boost::make_shared<ConsumerImpl>(
         client_.lock(), topic_, subscription, consumerConf, ExecutorServicePtr(), NonPartitioned,
diff --git a/pulsar-client-cpp/python/pulsar.py b/pulsar-client-cpp/python/pulsar.py
index ac38b25..a4c6c87 100644
--- a/pulsar-client-cpp/python/pulsar.py
+++ b/pulsar-client-cpp/python/pulsar.py
@@ -432,7 +432,8 @@ class Client:
     def create_reader(self, topic, start_message_id,
                       reader_listener=None,
                       receiver_queue_size=1000,
-                      reader_name=None
+                      reader_name=None,
+                      subscription_role_prefix=None
                       ):
         """
         Create a reader on a particular topic
@@ -476,11 +477,14 @@ class Client:
           memory utilization.
         * `reader_name`:
           Sets the reader name.
+        * `subscription_role_prefix`:
+          Sets the subscription role prefix.
         """
         _check_type(str, topic, 'topic')
         _check_type(_pulsar.MessageId, start_message_id, 'start_message_id')
         _check_type(int, receiver_queue_size, 'receiver_queue_size')
         _check_type_or_none(str, reader_name, 'reader_name')
+        _check_type_or_none(str, subscription_role_prefix, 'subscription_role_prefix')
 
         conf = _pulsar.ReaderConfiguration()
         if reader_listener:
@@ -488,6 +492,8 @@ class Client:
         conf.receiver_queue_size(receiver_queue_size)
         if reader_name:
             conf.reader_name(reader_name)
+        if subscription_role_prefix:
+            conf.subscription_role_prefix(subscription_role_prefix)
         c = Reader()
         c._reader = self._client.create_reader(topic, start_message_id, conf)
         c._client = self
diff --git a/pulsar-client-cpp/python/src/config.cc b/pulsar-client-cpp/python/src/config.cc
index 7eceba6..f03c883 100644
--- a/pulsar-client-cpp/python/src/config.cc
+++ b/pulsar-client-cpp/python/src/config.cc
@@ -142,5 +142,7 @@ void export_config() {
             .def("receiver_queue_size", &ReaderConfiguration::setReceiverQueueSize)
             .def("reader_name", &ReaderConfiguration::getReaderName, return_value_policy<copy_const_reference>())
             .def("reader_name", &ReaderConfiguration::setReaderName)
+            .def("subscription_role_prefix", &ReaderConfiguration::getSubscriptionRolePrefix, return_value_policy<copy_const_reference>())
+            .def("subscription_role_prefix", &ReaderConfiguration::setSubscriptionRolePrefix)
             ;
 }

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.