You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/10/12 13:03:34 UTC

[pulsar] branch master updated: Added ability to specify compression while publishing (#2781)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a66808  Added ability to specify compression while publishing (#2781)
3a66808 is described below

commit 3a668084f74b9d4467ae22f38502ee1b9e3c76e9
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri Oct 12 06:03:29 2018 -0700

    Added ability to specify compression while publishing (#2781)
---
 pulsar-client-cpp/python/pulsar/functions/context.py     |  2 +-
 pulsar-functions/instance/src/main/python/contextimpl.py | 10 +++++++---
 2 files changed, 8 insertions(+), 4 deletions(-)

diff --git a/pulsar-client-cpp/python/pulsar/functions/context.py b/pulsar-client-cpp/python/pulsar/functions/context.py
index 2510bad..47e86f9 100644
--- a/pulsar-client-cpp/python/pulsar/functions/context.py
+++ b/pulsar-client-cpp/python/pulsar/functions/context.py
@@ -104,7 +104,7 @@ class Context(object):
     pass
 
   @abstractmethod
-  def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None):
+  def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None):
     """Publishes message to topic_name by first serializing the message using serde_class_name serde
     The message will have properties specified if any"""
     pass
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py
index b855ec0..a02724f 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -118,17 +118,21 @@ class ContextImpl(pulsar.Context):
   def get_output_serde_class_name(self):
     return self.instance_config.function_details.outputSerdeClassName
 
-  def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None):
+  def publish(self, topic_name, message, serde_class_name="serde.IdentitySerDe", properties=None, compression_type=None):
     # Just make sure that user supplied values are properly typed
     topic_name = str(topic_name)
     serde_class_name = str(serde_class_name)
+    pulsar_compression_type = pulsar._pulsar.CompressionType.NONE
+    if compression_type is not None:
+      pulsar_compression_type = compression_type
     if topic_name not in self.publish_producers:
       self.publish_producers[topic_name] = self.pulsar_client.create_producer(
         topic_name,
         block_if_queue_full=True,
         batching_enabled=True,
         batching_max_publish_delay_ms=1,
-        max_pending_messages=100000
+        max_pending_messages=100000,
+        compression_type=compression_type
       )
 
     if serde_class_name not in self.publish_serializers:
@@ -164,4 +168,4 @@ class ContextImpl(pulsar.Context):
       m.max = accumulated_metric.max
       m.min = accumulated_metric.min
       metrics.metrics[metric_name] = m
-    return metrics
\ No newline at end of file
+    return metrics