You are viewing a plain text version of this content. The canonical (hyperlinked) version is available in the original mailing-list archive.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/10/12 02:51:39 UTC

[pulsar] branch master updated: Enhance publish method to allow properties (#2777)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new ec51612  Enhance publish method to allow properties (#2777)
ec51612 is described below

commit ec516124cf38f12fc98474d8ca9ec0649fb1185a
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Thu Oct 11 19:51:34 2018 -0700

    Enhance publish method to allow properties (#2777)
---
 pulsar-client-cpp/python/pulsar/functions/context.py     | 5 +++--
 pulsar-functions/instance/src/main/python/contextimpl.py | 4 ++--
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/pulsar-client-cpp/python/pulsar/functions/context.py b/pulsar-client-cpp/python/pulsar/functions/context.py
index ed1164a..2510bad 100644
--- a/pulsar-client-cpp/python/pulsar/functions/context.py
+++ b/pulsar-client-cpp/python/pulsar/functions/context.py
@@ -104,8 +104,9 @@ class Context(object):
     pass
 
   @abstractmethod
-  def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe"):
-    """Publishes message to topic_name by first serializing the message using serde_class_name serde"""
+  def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None):
+    """Publishes message to topic_name by first serializing the message using serde_class_name serde
+    The message will have properties specified if any"""
     pass
 
   @abstractmethod
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py
index f0f9d0f..dcfec42 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -118,7 +118,7 @@ class ContextImpl(pulsar.Context):
   def get_output_serde_class_name(self):
     return self.instance_config.function_details.outputSerdeClassName
 
-  def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe"):
+  def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None):
     # Just make sure that user supplied values are properly typed
     topic_name = str(topic_name)
     serde_class_name = str(serde_class_name)
@@ -136,7 +136,7 @@ class ContextImpl(pulsar.Context):
       self.publish_serializers[serde_class_name] = serde_klass()
 
     output_bytes = bytes(self.publish_serializers[serde_class_name].serialize(message))
-    self.publish_producers[topic_name].send_async(output_bytes, None)
+    self.publish_producers[topic_name].send_async(output_bytes, None, properties=properties)
 
   def ack(self, msgid, topic):
     if topic not in self.consumers: