You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/10/12 13:03:34 UTC
[pulsar] branch master updated: Added ability to specify
compression while publishing (#2781)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 3a66808 Added ability to specify compression while publishing (#2781)
3a66808 is described below
commit 3a668084f74b9d4467ae22f38502ee1b9e3c76e9
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri Oct 12 06:03:29 2018 -0700
Added ability to specify compression while publishing (#2781)
---
pulsar-client-cpp/python/pulsar/functions/context.py | 2 +-
pulsar-functions/instance/src/main/python/contextimpl.py | 10 +++++++---
2 files changed, 8 insertions(+), 4 deletions(-)
diff --git a/pulsar-client-cpp/python/pulsar/functions/context.py b/pulsar-client-cpp/python/pulsar/functions/context.py
index 2510bad..47e86f9 100644
--- a/pulsar-client-cpp/python/pulsar/functions/context.py
+++ b/pulsar-client-cpp/python/pulsar/functions/context.py
@@ -104,7 +104,7 @@ class Context(object):
pass
@abstractmethod
- def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None):
+ def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None):
"""Publishes message to topic_name by first serializing the message using serde_class_name serde
The message will have properties specified if any"""
pass
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py
index b855ec0..a02724f 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -118,17 +118,21 @@ class ContextImpl(pulsar.Context):
def get_output_serde_class_name(self):
return self.instance_config.function_details.outputSerdeClassName
- def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None):
+ def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None):
# Just make sure that user supplied values are properly typed
topic_name = str(topic_name)
serde_class_name = str(serde_class_name)
+ pulsar_compression_type = pulsar._pulsar.CompressionType.NONE
+ if compression_type is not None:
+ pulsar_compression_type = compression_type
if topic_name not in self.publish_producers:
self.publish_producers[topic_name] = self.pulsar_client.create_producer(
topic_name,
block_if_queue_full=True,
batching_enabled=True,
batching_max_publish_delay_ms=1,
- max_pending_messages=100000
+ max_pending_messages=100000,
+ compression_type=compression_type
)
if serde_class_name not in self.publish_serializers:
@@ -164,4 +168,4 @@ class ContextImpl(pulsar.Context):
m.max = accumulated_metric.max
m.min = accumulated_metric.min
metrics.metrics[metric_name] = m
- return metrics
\ No newline at end of file
+ return metrics