You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/10/28 08:51:20 UTC
[pulsar] branch master updated: [Issue 8154] [Python client] Expose
schema version (of writerSchema) in Message (#8173)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 54d6811 [Issue 8154] [Python client] Expose schema version (of writerSchema) in Message (#8173)
54d6811 is described below
commit 54d6811bf163fae6cae569c77c2537e33012cfdc
Author: Shivji Kumar Jha <sh...@gmail.com>
AuthorDate: Wed Oct 28 14:20:54 2020 +0530
[Issue 8154] [Python client] Expose schema version (of writerSchema) in Message (#8173)
* [Issue 8154] Expose schema version (of writerSchema) in python client message
* Adding formating suggestion on PR#8173 to fix tests
* Fixing build issues
* Added a test for python client returning schema version
* Added one more test case for python client returning schema version
* Fix test- move subscribe before send so the consumer offset is ahead of new data
* Fix test to make it run on python 2 and 3 both
Co-authored-by: Sijie Guo <si...@apache.org>
---
pulsar-client-cpp/include/pulsar/Message.h | 10 ++++++++++
pulsar-client-cpp/include/pulsar/c/message.h | 6 ++++++
pulsar-client-cpp/lib/ConsumerImpl.cc | 4 ++++
pulsar-client-cpp/lib/Message.cc | 14 ++++++++++++++
pulsar-client-cpp/lib/MessageImpl.cc | 6 ++++++
pulsar-client-cpp/lib/MessageImpl.h | 6 ++++++
pulsar-client-cpp/python/pulsar/__init__.py | 6 ++++++
pulsar-client-cpp/python/schema_test.py | 26 ++++++++++++++++++++++++++
pulsar-client-cpp/python/src/message.cc | 7 +++++++
9 files changed, 85 insertions(+)
diff --git a/pulsar-client-cpp/include/pulsar/Message.h b/pulsar-client-cpp/include/pulsar/Message.h
index d1775a6..0a750e3 100644
--- a/pulsar-client-cpp/include/pulsar/Message.h
+++ b/pulsar-client-cpp/include/pulsar/Message.h
@@ -152,6 +152,16 @@ class PULSAR_PUBLIC Message {
*/
const int getRedeliveryCount() const;
+ /**
+ * Check if schema version exists
+ */
+ bool hasSchemaVersion() const;
+
+ /**
+ * Get the schema version
+ */
+ const std::string& getSchemaVersion() const;
+
bool operator==(const Message& msg) const;
private:
diff --git a/pulsar-client-cpp/include/pulsar/c/message.h b/pulsar-client-cpp/include/pulsar/c/message.h
index dcb9152..f54d025 100644
--- a/pulsar-client-cpp/include/pulsar/c/message.h
+++ b/pulsar-client-cpp/include/pulsar/c/message.h
@@ -200,6 +200,12 @@ PULSAR_PUBLIC const char *pulsar_message_get_topic_name(pulsar_message_t *messag
PULSAR_PUBLIC int pulsar_message_get_redelivery_count(pulsar_message_t *message);
+PULSAR_PUBLIC int pulsar_message_has_schema_version(pulsar_message_t *message);
+
+PULSAR_PUBLIC const char *pulsar_message_get_schemaVersion(pulsar_message_t *message);
+
+PULSAR_PUBLIC void pulsar_message_set_schema_version(pulsar_message_t *message, const char *schemaVersion);
+
#ifdef __cplusplus
}
#endif
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index b6a0c15..6f9cdbf 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -327,6 +327,10 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
m.impl_->setTopicName(topic_);
m.impl_->setRedeliveryCount(msg.redelivery_count());
+ if (metadata.has_schema_version()) {
+ m.impl_->setSchemaVersion(metadata.schema_version());
+ }
+
LOG_DEBUG(getName() << " metadata.num_messages_in_batch() = " << metadata.num_messages_in_batch());
LOG_DEBUG(getName() << " metadata.has_num_messages_in_batch() = "
<< metadata.has_num_messages_in_batch());
diff --git a/pulsar-client-cpp/lib/Message.cc b/pulsar-client-cpp/lib/Message.cc
index 5b135bc..76e408f 100644
--- a/pulsar-client-cpp/lib/Message.cc
+++ b/pulsar-client-cpp/lib/Message.cc
@@ -145,6 +145,20 @@ const int Message::getRedeliveryCount() const {
return impl_->getRedeliveryCount();
}
+bool Message::hasSchemaVersion() const {
+ if (impl_) {
+ return impl_->hasSchemaVersion();
+ }
+ return false;
+}
+
+const std::string& Message::getSchemaVersion() const {
+ if (!impl_) {
+ return emptyString;
+ }
+ return impl_->getSchemaVersion();
+}
+
uint64_t Message::getPublishTimestamp() const { return impl_ ? impl_->getPublishTimestamp() : 0ull; }
uint64_t Message::getEventTimestamp() const { return impl_ ? impl_->getEventTimestamp() : 0ull; }
diff --git a/pulsar-client-cpp/lib/MessageImpl.cc b/pulsar-client-cpp/lib/MessageImpl.cc
index b41e7ae..5d1edbf 100644
--- a/pulsar-client-cpp/lib/MessageImpl.cc
+++ b/pulsar-client-cpp/lib/MessageImpl.cc
@@ -96,4 +96,10 @@ int MessageImpl::getRedeliveryCount() { return redeliveryCount_; }
void MessageImpl::setRedeliveryCount(int count) { redeliveryCount_ = count; }
+bool MessageImpl::hasSchemaVersion() const { return metadata.has_schema_version(); }
+
+void MessageImpl::setSchemaVersion(const std::string& schemaVersion) { schemaVersion_ = &schemaVersion; }
+
+const std::string& MessageImpl::getSchemaVersion() const { return metadata.schema_version(); }
+
} // namespace pulsar
diff --git a/pulsar-client-cpp/lib/MessageImpl.h b/pulsar-client-cpp/lib/MessageImpl.h
index ff2ac97..c9a37f4 100644
--- a/pulsar-client-cpp/lib/MessageImpl.h
+++ b/pulsar-client-cpp/lib/MessageImpl.h
@@ -43,6 +43,8 @@ class MessageImpl {
ClientConnection* cnx_;
const std::string* topicName_;
int redeliveryCount_;
+ bool hasSchemaVersion_;
+ const std::string* schemaVersion_;
const std::string& getPartitionKey() const;
bool hasPartitionKey() const;
@@ -66,6 +68,10 @@ class MessageImpl {
int getRedeliveryCount();
void setRedeliveryCount(int count);
+ bool hasSchemaVersion() const;
+ const std::string& getSchemaVersion() const;
+ void setSchemaVersion(const std::string& value);
+
friend class PulsarWrapper;
friend class MessageBuilder;
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py
index 20e7afe..c3c610a 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -220,6 +220,12 @@ class Message:
"""
return self._message.redelivery_count()
+ def schema_version(self):
+ """
+ Get the schema version for this message
+ """
+ return self._message.schema_version()
+
@staticmethod
def _wrap(_message):
self = Message()
diff --git a/pulsar-client-cpp/python/schema_test.py b/pulsar-client-cpp/python/schema_test.py
index 09cfd71..2d03020 100755
--- a/pulsar-client-cpp/python/schema_test.py
+++ b/pulsar-client-cpp/python/schema_test.py
@@ -354,6 +354,32 @@ class SchemaTest(TestCase):
self.assertEqual(r2.__class__.__name__, 'Example')
self.assertEqual(r2, r)
+ def test_schema_version(self):
+ class Example(Record):
+ a = Integer()
+ b = Integer()
+
+ client = pulsar.Client(self.serviceUrl)
+ producer = client.create_producer(
+ 'my-avro-python-schema-version-topic',
+ schema=AvroSchema(Example))
+
+ consumer = client.subscribe('my-avro-python-schema-version-topic', 'sub-1',
+ schema=AvroSchema(Example))
+
+ r = Example(a=1, b=2)
+ producer.send(r)
+
+ msg = consumer.receive()
+
+ self.assertIsNotNone(msg.schema_version())
+
+ self.assertEquals(b'\x00\x00\x00\x00\x00\x00\x00\x00', msg.schema_version().encode())
+
+ self.assertEqual(r, msg.value())
+
+ client.close()
+
def test_serialize_wrong_types(self):
class Example(Record):
a = Integer()
diff --git a/pulsar-client-cpp/python/src/message.cc b/pulsar-client-cpp/python/src/message.cc
index 719e09e..460a0c7 100644
--- a/pulsar-client-cpp/python/src/message.cc
+++ b/pulsar-client-cpp/python/src/message.cc
@@ -82,6 +82,12 @@ std::string Topic_name_str(const Message& msg) {
return ss.str();
}
+std::string schema_version_str(const Message& msg) {
+ std::stringstream ss;
+ ss << msg.getSchemaVersion();
+ return ss.str();
+}
+
const MessageId& Message_getMessageId(const Message& msg) {
return msg.getMessageId();
}
@@ -168,6 +174,7 @@ void export_message() {
.def("__str__", &Message_str)
.def("topic_name", &Topic_name_str)
.def("redelivery_count", &Message::getRedeliveryCount)
+ .def("schema_version", &schema_version_str)
;
MessageBatch& (MessageBatch::*MessageBatchParseFromString)(const std::string& payload, uint32_t batchSize) = &MessageBatch::parseFrom;