You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/01/09 15:30:11 UTC
[pulsar] branch master updated: add support get getTopicName for
CPP/Python clients (#3326)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new aae3755 add support get getTopicName for CPP/Python clients (#3326)
aae3755 is described below
commit aae3755f6445b582700e75ace0e01c205c87fdfd
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Wed Jan 9 07:30:04 2019 -0800
add support get getTopicName for CPP/Python clients (#3326)
* add support get getTopicName for CPP/Python clients
* set topic in ConsumerImpl and PartitionedConsumerImpl and add tests
* cleaning up
* clean up spacing
---
pulsar-client-cpp/include/pulsar/Message.h | 5 ++++
pulsar-client-cpp/include/pulsar/MessageId.h | 2 +-
pulsar-client-cpp/lib/ConsumerImpl.cc | 1 +
pulsar-client-cpp/lib/Message.cc | 7 +++++
pulsar-client-cpp/lib/MessageImpl.h | 2 +-
pulsar-client-cpp/lib/PartitionedConsumerImpl.cc | 2 ++
pulsar-client-cpp/python/pulsar/__init__.py | 6 ++++
pulsar-client-cpp/python/pulsar_test.py | 35 +++++++++++++++++++++++-
pulsar-client-cpp/python/src/message.cc | 7 +++++
9 files changed, 64 insertions(+), 3 deletions(-)
diff --git a/pulsar-client-cpp/include/pulsar/Message.h b/pulsar-client-cpp/include/pulsar/Message.h
index a3b9af0..08d064a 100644
--- a/pulsar-client-cpp/include/pulsar/Message.h
+++ b/pulsar-client-cpp/include/pulsar/Message.h
@@ -122,6 +122,11 @@ class Message {
*/
uint64_t getEventTimestamp() const;
+ /**
+ * Get the topic Name from which this message originated from
+ */
+ const std::string& getTopicName() const;
+
private:
typedef boost::shared_ptr<MessageImpl> MessageImplPtr;
MessageImplPtr impl_;
diff --git a/pulsar-client-cpp/include/pulsar/MessageId.h b/pulsar-client-cpp/include/pulsar/MessageId.h
index e9ff133..4767bea 100644
--- a/pulsar-client-cpp/include/pulsar/MessageId.h
+++ b/pulsar-client-cpp/include/pulsar/MessageId.h
@@ -52,7 +52,7 @@ class MessageId {
void serialize(std::string& result) const;
/**
- * Get the topic Name
+ * Get the topic Name from which this message originated from
*/
const std::string& getTopicName() const;
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index 2f19ba6..d2954a1 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -279,6 +279,7 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
Message m(msg, metadata, payload, partitionIndex_);
m.impl_->cnx_ = cnx.get();
+ m.impl_->topicName_ = &topic_;
LOG_DEBUG(getName() << " metadata.num_messages_in_batch() = " << metadata.num_messages_in_batch());
LOG_DEBUG(getName() << " metadata.has_num_messages_in_batch() = "
diff --git a/pulsar-client-cpp/lib/Message.cc b/pulsar-client-cpp/lib/Message.cc
index 683e718..f78bcb3 100644
--- a/pulsar-client-cpp/lib/Message.cc
+++ b/pulsar-client-cpp/lib/Message.cc
@@ -111,6 +111,13 @@ const std::string& Message::getPartitionKey() const {
return impl_->getPartitionKey();
}
+const std::string& Message::getTopicName() const {
+ if (!impl_) {
+ return emptyString;
+ }
+ return impl_->getTopicName();
+}
+
uint64_t Message::getPublishTimestamp() const { return impl_ ? impl_->getPublishTimestamp() : 0ull; }
uint64_t Message::getEventTimestamp() const { return impl_ ? impl_->getEventTimestamp() : 0ull; }
diff --git a/pulsar-client-cpp/lib/MessageImpl.h b/pulsar-client-cpp/lib/MessageImpl.h
index 0ef63e8..4fa96c0 100644
--- a/pulsar-client-cpp/lib/MessageImpl.h
+++ b/pulsar-client-cpp/lib/MessageImpl.h
@@ -52,7 +52,7 @@ class MessageImpl {
uint64_t getEventTimestamp() const;
/**
- * Get a valid topicName
+ * Get the topic Name from which this message originated from
*/
const std::string& getTopicName();
diff --git a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
index 199edb2..e3f93c7 100644
--- a/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedConsumerImpl.cc
@@ -314,6 +314,8 @@ bool PartitionedConsumerImpl::isOpen() {
void PartitionedConsumerImpl::messageReceived(Consumer consumer, const Message& msg) {
LOG_DEBUG("Received Message from one of the partition - " << msg.impl_->messageId.partition());
+ const std::string& topicPartitionName = consumer.getTopic();
+ msg.impl_->setTopicName(topicPartitionName);
messages_.push(msg);
if (messageListener_) {
listenerExecutor_->postWork(
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py
index 794c006..ab2cd3f 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -179,6 +179,12 @@ class Message:
"""
return self._message.message_id()
+ def topic_name(self):
+ """
+ Get the topic Name from which this message originated from
+ """
+ return self._message.topic_name()
+
class Authentication:
"""
diff --git a/pulsar-client-cpp/python/pulsar_test.py b/pulsar-client-cpp/python/pulsar_test.py
index cf95e3e..2df171b 100755
--- a/pulsar-client-cpp/python/pulsar_test.py
+++ b/pulsar-client-cpp/python/pulsar_test.py
@@ -832,7 +832,40 @@ class PulsarTest(TestCase):
self.assertEqual(msg.data(), b'hello')
client.close()
- #####
+ def test_get_topic_name(self):
+ client = Client(self.serviceUrl)
+ consumer = client.subscribe('persistent://public/default/topic_name_test',
+ 'topic_name_test_sub',
+ consumer_type=ConsumerType.Shared)
+ producer = client.create_producer('persistent://public/default/topic_name_test')
+ producer.send(b'hello')
+
+ msg = consumer.receive(1000)
+ self.assertEqual(msg.topic_name(), 'persistent://public/default/topic_name_test')
+ client.close()
+
+ def test_get_partitioned_topic_name(self):
+ client = Client(self.serviceUrl)
+ url1 = self.adminUrl + '/admin/v2/persistent/public/default/partitioned_topic_name_test/partitions'
+ doHttpPut(url1, '3')
+
+ partitions = ['persistent://public/default/partitioned_topic_name_test-partition-0',
+ 'persistent://public/default/partitioned_topic_name_test-partition-1',
+ 'persistent://public/default/partitioned_topic_name_test-partition-2']
+ self.assertEqual(client.get_topic_partitions('persistent://public/default/partitioned_topic_name_test'),
+ partitions)
+
+ consumer = client.subscribe('persistent://public/default/partitioned_topic_name_test',
+ 'partitioned_topic_name_test_sub',
+ consumer_type=ConsumerType.Shared)
+ producer = client.create_producer('persistent://public/default/partitioned_topic_name_test')
+ producer.send(b'hello')
+
+ msg = consumer.receive(1000)
+ self.assertTrue(msg.topic_name() in partitions)
+ client.close()
+
+#####
def _check_value_error(self, fun):
try:
diff --git a/pulsar-client-cpp/python/src/message.cc b/pulsar-client-cpp/python/src/message.cc
index 4232249..6cff712 100644
--- a/pulsar-client-cpp/python/src/message.cc
+++ b/pulsar-client-cpp/python/src/message.cc
@@ -66,6 +66,12 @@ boost::python::object Message_data(const Message& msg) {
return boost::python::object(boost::python::handle<>(PyBytes_FromStringAndSize((const char*)msg.getData(), msg.getLength())));
}
+std::string Topic_name_str(const Message& msg) {
+ std::stringstream ss;
+ ss << msg.getTopicName();
+ return ss.str();
+}
+
const MessageId& Message_getMessageId(const Message& msg) {
return msg.getMessageId();
}
@@ -117,5 +123,6 @@ void export_message() {
.def("event_timestamp", &Message::getEventTimestamp)
.def("message_id", &Message_getMessageId, return_value_policy<copy_const_reference>())
.def("__str__", &Message_str)
+ .def("topic_name", &Topic_name_str)
;
}