You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/01/09 15:30:11 UTC

[pulsar] branch master updated: add support get getTopicName for CPP/Python clients (#3326)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new aae3755  add support get getTopicName for CPP/Python clients (#3326)
aae3755 is described below

commit aae3755f6445b582700e75ace0e01c205c87fdfd
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Wed Jan 9 07:30:04 2019 -0800

    add support get getTopicName for CPP/Python clients (#3326)
    
    * add support get getTopicName for CPP/Python clients
    
    * set topic in ConsumerImpl and PartitionedConsumerImpl and add tests
    
    * cleaning up
    
    * clean up spacing
---
 pulsar-client-cpp/include/pulsar/Message.h       |  5 ++++
 pulsar-client-cpp/include/pulsar/MessageId.h     |  2 +-
 pulsar-client-cpp/lib/ConsumerImpl.cc            |  1 +
 pulsar-client-cpp/lib/Message.cc                 |  7 +++++
 pulsar-client-cpp/lib/MessageImpl.h              |  2 +-
 pulsar-client-cpp/lib/PartitionedConsumerImpl.cc |  2 ++
 pulsar-client-cpp/python/pulsar/__init__.py      |  6 ++++
 pulsar-client-cpp/python/pulsar_test.py          | 35 +++++++++++++++++++++++-
 pulsar-client-cpp/python/src/message.cc          |  7 +++++
 9 files changed, 64 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/Message.h b/pulsar-client-cpp/include/pulsar/Message.h
index a3b9af0..08d064a 100644
--- a/pulsar-client-cpp/include/pulsar/Message.h
+++ b/pulsar-client-cpp/include/pulsar/Message.h
@@ -122,6 +122,11 @@ class Message {
      */
     uint64_t getEventTimestamp() const;
 
+    /**
+     * Get the name of the topic from which this message originated
+     */
+    const std::string& getTopicName() const;
+
    private:
     typedef boost::shared_ptr<MessageImpl> MessageImplPtr;
     MessageImplPtr impl_;
diff --git a/pulsar-client-cpp/include/pulsar/MessageId.h b/pulsar-client-cpp/include/pulsar/MessageId.h
index e9ff133..4767bea 100644
--- a/pulsar-client-cpp/include/pulsar/MessageId.h
+++ b/pulsar-client-cpp/include/pulsar/MessageId.h
@@ -52,7 +52,7 @@ class MessageId {
     void serialize(std::string& result) const;
 
     /**
-     * Get the topic Name
+     * Get the name of the topic from which this message originated
      */
     const std::string& getTopicName() const;
 
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 2f19ba6..d2954a1 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -279,6 +279,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
 
     Message m(msg, metadata, payload, partitionIndex_);
     m.impl_->cnx_ = cnx.get();
+    m.impl_->topicName_ = &topic_;
 
     LOG_DEBUG(getName() << " metadata.num_messages_in_batch() = " << metadata.num_messages_in_batch());
     LOG_DEBUG(getName() << " metadata.has_num_messages_in_batch() = "
diff --git a/pulsar-client-cpp/lib/Message.cc b/pulsar-client-cpp/lib/Message.cc
index 683e718..f78bcb3 100644
--- a/pulsar-client-cpp/lib/Message.cc
+++ b/pulsar-client-cpp/lib/Message.cc
@@ -111,6 +111,13 @@ const std::string& Message::getPartitionKey() const {
     return impl_->getPartitionKey();
 }
 
+const std::string& Message::getTopicName() const {
+    if (!impl_) {
+        return emptyString;
+    }
+    return impl_->getTopicName();
+}
+
 uint64_t Message::getPublishTimestamp() const { return impl_ ? impl_->getPublishTimestamp() : 0ull; }
 
 uint64_t Message::getEventTimestamp() const { return impl_ ? impl_->getEventTimestamp() : 0ull; }
diff --git a/pulsar-client-cpp/lib/MessageImpl.h b/pulsar-client-cpp/lib/MessageImpl.h
index 0ef63e8..4fa96c0 100644
--- a/pulsar-client-cpp/lib/MessageImpl.h
+++ b/pulsar-client-cpp/lib/MessageImpl.h
@@ -52,7 +52,7 @@ class MessageImpl {
     uint64_t getEventTimestamp() const;
 
     /**
-     * Get a valid topicName
+     * Get the name of the topic from which this message originated
      */
     const std::string& getTopicName();
 
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 199edb2..e3f93c7 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -314,6 +314,8 @@ bool PartitionedConsumerImpl::isOpen() {
 
 void PartitionedConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
     LOG_DEBUG("Received Message from one of the partition - " << msg.impl_->messageId.partition());
+    const std::string& topicPartitionName = consumer.getTopic();
+    msg.impl_->setTopicName(topicPartitionName);
     messages_.push(msg);
     if (messageListener_) {
         listenerExecutor_->postWork(
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py
index 794c006..ab2cd3f 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -179,6 +179,12 @@ class Message:
         """
         return self._message.message_id()
 
+    def topic_name(self):
+        """
+        Get the name of the topic from which this message originated
+        """
+        return self._message.topic_name()
+
 
 class Authentication:
     """
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index cf95e3e..2df171b 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -832,7 +832,40 @@ class PulsarTest(TestCase):
         self.assertEqual(msg.data(), b'hello')
         client.close()
 
-    #####
+    def test_get_topic_name(self):
+        client = Client(self.serviceUrl)
+        consumer = client.subscribe('persistent://public/default/topic_name_test',
+                                    'topic_name_test_sub',
+                                    consumer_type=ConsumerType.Shared)
+        producer = client.create_producer('persistent://public/default/topic_name_test')
+        producer.send(b'hello')
+
+        msg = consumer.receive(1000)
+        self.assertEqual(msg.topic_name(), 'persistent://public/default/topic_name_test')
+        client.close()
+
+    def test_get_partitioned_topic_name(self):
+        client = Client(self.serviceUrl)
+        url1 = self.adminUrl + '/admin/v2/persistent/public/default/partitioned_topic_name_test/partitions'
+        doHttpPut(url1, '3')
+
+        partitions = ['persistent://public/default/partitioned_topic_name_test-partition-0',
+                      'persistent://public/default/partitioned_topic_name_test-partition-1',
+                      'persistent://public/default/partitioned_topic_name_test-partition-2']
+        self.assertEqual(client.get_topic_partitions('persistent://public/default/partitioned_topic_name_test'),
+                         partitions)
+
+        consumer = client.subscribe('persistent://public/default/partitioned_topic_name_test',
+                                    'partitioned_topic_name_test_sub',
+                                    consumer_type=ConsumerType.Shared)
+        producer = client.create_producer('persistent://public/default/partitioned_topic_name_test')
+        producer.send(b'hello')
+
+        msg = consumer.receive(1000)
+        self.assertTrue(msg.topic_name() in partitions)
+        client.close()
+
+#####
 
     def _check_value_error(self, fun):
         try:
diff --git a/pulsar-client-cpp/python/src/message.cc b/pulsar-client-cpp/python/src/message.cc
index 4232249..6cff712 100644
--- a/pulsar-client-cpp/python/src/message.cc
+++ b/pulsar-client-cpp/python/src/message.cc
@@ -66,6 +66,12 @@ boost::python::object Message_data(const Message& msg) {
     return boost::python::object(boost::python::handle<>(PyBytes_FromStringAndSize((const char*)msg.getData(), msg.getLength())));
 }
 
+std::string Topic_name_str(const Message& msg) {
+    std::stringstream ss;
+    ss << msg.getTopicName();
+    return ss.str();
+}
+
 const MessageId& Message_getMessageId(const Message& msg) {
     return msg.getMessageId();
 }
@@ -117,5 +123,6 @@ void export_message() {
             .def("event_timestamp", &Message::getEventTimestamp)
             .def("message_id", &Message_getMessageId, return_value_policy<copy_const_reference>())
             .def("__str__", &Message_str)
+            .def("topic_name", &Topic_name_str)
             ;
 }