You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/10/28 08:51:20 UTC

[pulsar] branch master updated: [Issue 8154] [Python client] Expose schema version (of writerSchema) in Message (#8173)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 54d6811  [Issue 8154] [Python client] Expose schema version (of writerSchema) in Message (#8173)
54d6811 is described below

commit 54d6811bf163fae6cae569c77c2537e33012cfdc
Author: Shivji Kumar Jha <sh...@gmail.com>
AuthorDate: Wed Oct 28 14:20:54 2020 +0530

    [Issue 8154] [Python client] Expose schema version (of writerSchema) in Message (#8173)
    
    * [Issue 8154] Expose schema version (of writerSchema) in python client message
    
    * Adding formating suggestion on PR#8173 to fix tests
    
    * Fixing build issues
    
    * Added a test for python client returning schema version
    
    * Added one more test case for python client returning schema version
    
    * Fix test- move subscribe before send so the consumer offset is ahead of new data
    
    * Fix test to make it run on python 2 and 3 both
    
    Co-authored-by: Sijie Guo <si...@apache.org>
---
 pulsar-client-cpp/include/pulsar/Message.h   | 10 ++++++++++
 pulsar-client-cpp/include/pulsar/c/message.h |  6 ++++++
 pulsar-client-cpp/lib/ConsumerImpl.cc        |  4 ++++
 pulsar-client-cpp/lib/Message.cc             | 14 ++++++++++++++
 pulsar-client-cpp/lib/MessageImpl.cc         |  6 ++++++
 pulsar-client-cpp/lib/MessageImpl.h          |  6 ++++++
 pulsar-client-cpp/python/pulsar/__init__.py  |  6 ++++++
 pulsar-client-cpp/python/schema_test.py      | 26 ++++++++++++++++++++++++++
 pulsar-client-cpp/python/src/message.cc      |  7 +++++++
 9 files changed, 85 insertions(+)

diff --git a/pulsar-client-cpp/include/pulsar/Message.h b/pulsar-client-cpp/include/pulsar/Message.h
index d1775a6..0a750e3 100644
--- a/pulsar-client-cpp/include/pulsar/Message.h
+++ b/pulsar-client-cpp/include/pulsar/Message.h
@@ -152,6 +152,16 @@ class PULSAR_PUBLIC Message {
      */
     const int getRedeliveryCount() const;
 
+    /**
+     * Check if schema version exists
+     */
+    bool hasSchemaVersion() const;
+
+    /**
+     * Get the schema version
+     */
+    const std::string& getSchemaVersion() const;
+
     bool operator==(const Message& msg) const;
 
    private:
diff --git a/pulsar-client-cpp/include/pulsar/c/message.h b/pulsar-client-cpp/include/pulsar/c/message.h
index dcb9152..f54d025 100644
--- a/pulsar-client-cpp/include/pulsar/c/message.h
+++ b/pulsar-client-cpp/include/pulsar/c/message.h
@@ -200,6 +200,12 @@ PULSAR_PUBLIC const char *pulsar_message_get_topic_name(pulsar_message_t *messag
 
 PULSAR_PUBLIC int pulsar_message_get_redelivery_count(pulsar_message_t *message);
 
+PULSAR_PUBLIC int pulsar_message_has_schema_version(pulsar_message_t *message);
+
+PULSAR_PUBLIC const char *pulsar_message_get_schemaVersion(pulsar_message_t *message);
+
+PULSAR_PUBLIC void pulsar_message_set_schema_version(pulsar_message_t *message, const char *schemaVersion);
+
 #ifdef __cplusplus
 }
 #endif
diff --git a/pulsar-client-cpp/lib/ConsumerImpl.cc b/pulsar-client-cpp/lib/ConsumerImpl.cc
index b6a0c15..6f9cdbf 100644
--- a/pulsar-client-cpp/lib/ConsumerImpl.cc
+++ b/pulsar-client-cpp/lib/ConsumerImpl.cc
@@ -327,6 +327,10 @@ void ConsumerImpl::messageReceived(const ClientConnectionPtr& cnx, const proto::
     m.impl_->setTopicName(topic_);
     m.impl_->setRedeliveryCount(msg.redelivery_count());
 
+    if (metadata.has_schema_version()) {
+        m.impl_->setSchemaVersion(metadata.schema_version());
+    }
+
     LOG_DEBUG(getName() << " metadata.num_messages_in_batch() = " << metadata.num_messages_in_batch());
     LOG_DEBUG(getName() << " metadata.has_num_messages_in_batch() = "
                         << metadata.has_num_messages_in_batch());
diff --git a/pulsar-client-cpp/lib/Message.cc b/pulsar-client-cpp/lib/Message.cc
index 5b135bc..76e408f 100644
--- a/pulsar-client-cpp/lib/Message.cc
+++ b/pulsar-client-cpp/lib/Message.cc
@@ -145,6 +145,20 @@ const int Message::getRedeliveryCount() const {
     return impl_->getRedeliveryCount();
 }
 
+bool Message::hasSchemaVersion() const {
+    if (impl_) {
+        return impl_->hasSchemaVersion();
+    }
+    return false;
+}
+
+const std::string& Message::getSchemaVersion() const {
+    if (!impl_) {
+        return emptyString;
+    }
+    return impl_->getSchemaVersion();
+}
+
 uint64_t Message::getPublishTimestamp() const { return impl_ ? impl_->getPublishTimestamp() : 0ull; }
 
 uint64_t Message::getEventTimestamp() const { return impl_ ? impl_->getEventTimestamp() : 0ull; }
diff --git a/pulsar-client-cpp/lib/MessageImpl.cc b/pulsar-client-cpp/lib/MessageImpl.cc
index b41e7ae..5d1edbf 100644
--- a/pulsar-client-cpp/lib/MessageImpl.cc
+++ b/pulsar-client-cpp/lib/MessageImpl.cc
@@ -96,4 +96,10 @@ int MessageImpl::getRedeliveryCount() { return redeliveryCount_; }
 
 void MessageImpl::setRedeliveryCount(int count) { redeliveryCount_ = count; }
 
+bool MessageImpl::hasSchemaVersion() const { return metadata.has_schema_version(); }
+
+void MessageImpl::setSchemaVersion(const std::string& schemaVersion) { schemaVersion_ = &schemaVersion; }
+
+const std::string& MessageImpl::getSchemaVersion() const { return metadata.schema_version(); }
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/MessageImpl.h b/pulsar-client-cpp/lib/MessageImpl.h
index ff2ac97..c9a37f4 100644
--- a/pulsar-client-cpp/lib/MessageImpl.h
+++ b/pulsar-client-cpp/lib/MessageImpl.h
@@ -43,6 +43,8 @@ class MessageImpl {
     ClientConnection* cnx_;
     const std::string* topicName_;
     int redeliveryCount_;
+    bool hasSchemaVersion_;
+    const std::string* schemaVersion_;
 
     const std::string& getPartitionKey() const;
     bool hasPartitionKey() const;
@@ -66,6 +68,10 @@ class MessageImpl {
     int getRedeliveryCount();
     void setRedeliveryCount(int count);
 
+    bool hasSchemaVersion() const;
+    const std::string& getSchemaVersion() const;
+    void setSchemaVersion(const std::string& value);
+
     friend class PulsarWrapper;
     friend class MessageBuilder;
 
diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py
index 20e7afe..c3c610a 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -220,6 +220,12 @@ class Message:
         """
         return self._message.redelivery_count()
 
+    def schema_version(self):
+        """
+        Get the schema version for this message
+        """
+        return self._message.schema_version()
+
     @staticmethod
     def _wrap(_message):
         self = Message()
diff --git a/pulsar-client-cpp/python/schema_test.py b/pulsar-client-cpp/python/schema_test.py
index 09cfd71..2d03020 100755
--- a/pulsar-client-cpp/python/schema_test.py
+++ b/pulsar-client-cpp/python/schema_test.py
@@ -354,6 +354,32 @@ class SchemaTest(TestCase):
         self.assertEqual(r2.__class__.__name__, 'Example')
         self.assertEqual(r2, r)
 
+    def test_schema_version(self):
+        class Example(Record):
+            a = Integer()
+            b = Integer()
+
+        client = pulsar.Client(self.serviceUrl)
+        producer = client.create_producer(
+                        'my-avro-python-schema-version-topic',
+                        schema=AvroSchema(Example))
+
+        consumer = client.subscribe('my-avro-python-schema-version-topic', 'sub-1',
+                                    schema=AvroSchema(Example))
+        
+        r = Example(a=1, b=2)
+        producer.send(r)
+
+        msg = consumer.receive()
+
+        self.assertIsNotNone(msg.schema_version())
+
+        self.assertEquals(b'\x00\x00\x00\x00\x00\x00\x00\x00', msg.schema_version().encode())
+        
+        self.assertEqual(r, msg.value())
+
+        client.close()
+
     def test_serialize_wrong_types(self):
         class Example(Record):
             a = Integer()
diff --git a/pulsar-client-cpp/python/src/message.cc b/pulsar-client-cpp/python/src/message.cc
index 719e09e..460a0c7 100644
--- a/pulsar-client-cpp/python/src/message.cc
+++ b/pulsar-client-cpp/python/src/message.cc
@@ -82,6 +82,12 @@ std::string Topic_name_str(const Message& msg) {
     return ss.str();
 }
 
+std::string schema_version_str(const Message& msg) {
+    std::stringstream ss;
+    ss << msg.getSchemaVersion();
+    return ss.str();
+}
+
 const MessageId& Message_getMessageId(const Message& msg) {
     return msg.getMessageId();
 }
@@ -168,6 +174,7 @@ void export_message() {
             .def("__str__", &Message_str)
             .def("topic_name", &Topic_name_str)
             .def("redelivery_count", &Message::getRedeliveryCount)
+            .def("schema_version", &schema_version_str)
             ;
 
     MessageBatch& (MessageBatch::*MessageBatchParseFromString)(const std::string& payload, uint32_t batchSize) = &MessageBatch::parseFrom;