You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/05/17 03:10:39 UTC
[pulsar] branch master updated: Fix the current broken publish method (#4289)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 26e357f Fix the current broken publish method (#4289)
26e357f is described below
commit 26e357ffb716acafab32ce198c6f68b2552182da
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Thu May 16 20:10:33 2019 -0700
Fix the current broken publish method (#4289)
* Fix the current broken publish method
* Added for properties
---
pulsar-client-cpp/python/pulsar/functions/context.py | 13 +------------
pulsar-functions/instance/src/main/python/contextimpl.py | 11 +++++++----
2 files changed, 8 insertions(+), 16 deletions(-)
diff --git a/pulsar-client-cpp/python/pulsar/functions/context.py b/pulsar-client-cpp/python/pulsar/functions/context.py
index 14b277a..b1ee3a6 100644
--- a/pulsar-client-cpp/python/pulsar/functions/context.py
+++ b/pulsar-client-cpp/python/pulsar/functions/context.py
@@ -130,18 +130,7 @@ class Context(object):
pass
@abstractmethod
- def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None):
- """
-
- DEPRECATED
-
- Publishes message to topic_name by first serializing the message using serde_class_name serde
- The message will have properties specified if any
- """
- pass
-
- @abstractmethod
- def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", compression_type=None, callback=None, message_conf=None):
+ def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None, message_conf=None):
"""Publishes message to topic_name by first serializing the message using serde_class_name serde
The message will have properties specified if any
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py
index fbba3d5..146af2c 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -149,10 +149,7 @@ class ContextImpl(pulsar.Context):
if callback:
callback(result, msg)
- def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None):
- self.publish(topic_name, message, serde_class_name=serde_class_name, compression_type=compression_type, callback=callback, message_conf={"properties": properties})
-
- def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", compression_type=None, callback=None, message_conf=None):
+ def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None, callback=None, message_conf=None):
# Just make sure that user supplied values are properly typed
topic_name = str(topic_name)
serde_class_name = str(serde_class_name)
@@ -179,6 +176,12 @@ class ContextImpl(pulsar.Context):
output_bytes = bytes(self.publish_serializers[serde_class_name].serialize(message))
+ if properties:
+ # The deprecated properties args was passed. Need to merge into message_conf
+ if not message_conf:
+ message_conf = {}
+ message_conf['properties'] = properties
+
if message_conf:
self.publish_producers[topic_name].send_async(
output_bytes, partial(self.callback_wrapper, callback, topic_name, self.get_message_id()), **message_conf)