You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/05/17 03:10:39 UTC

[pulsar] branch master updated: Fix the current broken publish method (#4289)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 26e357f  Fix the current broken publish method (#4289)
26e357f is described below

commit 26e357ffb716acafab32ce198c6f68b2552182da
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Thu May 16 20:10:33 2019 -0700

    Fix the current broken publish method (#4289)
    
    * Fix the current broken publish method
    
    * Added handling for the deprecated properties argument
---
 pulsar-client-cpp/python/pulsar/functions/context.py     | 13 +------------
 pulsar-functions/instance/src/main/python/contextimpl.py | 11 +++++++----
 2 files changed, 8 insertions(+), 16 deletions(-)

diff --git a/pulsar-client-cpp/python/pulsar/functions/context.py b/pulsar-client-cpp/python/pulsar/functions/context.py
index 14b277a..b1ee3a6 100644
--- a/pulsar-client-cpp/python/pulsar/functions/context.py
+++ b/pulsar-client-cpp/python/pulsar/functions/context.py
@@ -130,18 +130,7 @@ class Context(object):
     pass
 
   @abstractmethod
-  def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None):
-    """
-
-    DEPRECATED
-
-    Publishes message to topic_name by first serializing the message using serde_class_name serde
-    The message will have properties specified if any
-    """
-    pass
-
-  @abstractmethod
-  def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", compression_type=None, callback=None, message_conf=None):
+  def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None, message_conf=None):
     """Publishes message to topic_name by first serializing the message using serde_class_name serde
     The message will have properties specified if any
 
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py
index fbba3d5..146af2c 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -149,10 +149,7 @@ class ContextImpl(pulsar.Context):
     if callback:
       callback(result, msg)
 
-  def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None):
-    self.publish(topic_name, message, serde_class_name=serde_class_name, compression_type=compression_type, callback=callback, message_conf={"properties": properties})
-
-  def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", compression_type=None, callback=None, message_conf=None):
+  def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None, message_conf=None):
     # Just make sure that user supplied values are properly typed
     topic_name = str(topic_name)
     serde_class_name = str(serde_class_name)
@@ -179,6 +176,12 @@ class ContextImpl(pulsar.Context):
 
     output_bytes = bytes(self.publish_serializers[serde_class_name].serialize(message))
 
+    if properties:
+      # The deprecated properties argument was passed; merge it into message_conf.
+      if not message_conf:
+        message_conf = {}
+      message_conf['properties'] = properties
+
     if message_conf:
       self.publish_producers[topic_name].send_async(
         output_bytes, partial(self.callback_wrapper, callback, topic_name, self.get_message_id()), **message_conf)