You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/10/12 02:51:39 UTC
[pulsar] branch master updated: Enhance publish method to allow
properties (#2777)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new ec51612 Enhance publish method to allow properties (#2777)
ec51612 is described below
commit ec516124cf38f12fc98474d8ca9ec0649fb1185a
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Thu Oct 11 19:51:34 2018 -0700
Enhance publish method to allow properties (#2777)
---
pulsar-client-cpp/python/pulsar/functions/context.py | 5 +++--
pulsar-functions/instance/src/main/python/contextimpl.py | 4 ++--
2 files changed, 5 insertions(+), 4 deletions(-)
diff --git a/pulsar-client-cpp/python/pulsar/functions/context.py b/pulsar-client-cpp/python/pulsar/functions/context.py
index ed1164a..2510bad 100644
--- a/pulsar-client-cpp/python/pulsar/functions/context.py
+++ b/pulsar-client-cpp/python/pulsar/functions/context.py
@@ -104,8 +104,9 @@ class Context(object):
pass
@abstractmethod
- def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe"):
- """Publishes message to topic_name by first serializing the message using serde_class_name serde"""
+ def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None):
+ """Publishes message to topic_name by first serializing the message using serde_class_name serde
+ The message will have properties specified if any"""
pass
@abstractmethod
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py
index f0f9d0f..dcfec42 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -118,7 +118,7 @@ class ContextImpl(pulsar.Context):
def get_output_serde_class_name(self):
return self.instance_config.function_details.outputSerdeClassName
- def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe"):
+ def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None):
# Just make sure that user supplied values are properly typed
topic_name = str(topic_name)
serde_class_name = str(serde_class_name)
@@ -136,7 +136,7 @@ class ContextImpl(pulsar.Context):
self.publish_serializers[serde_class_name] = serde_klass()
output_bytes = bytes(self.publish_serializers[serde_class_name].serialize(message))
- self.publish_producers[topic_name].send_async(output_bytes, None)
+ self.publish_producers[topic_name].send_async(output_bytes, None, properties=properties)
def ack(self, msgid, topic):
if topic not in self.consumers: