You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/28 18:50:30 UTC

[pulsar] branch master updated: Added support for Snappy compression for Python/Go (#4319)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new dc0463c  Added support for Snappy compression for Python/Go (#4319)
dc0463c is described below

commit dc0463cdf63eec0c9e2773926c6f362b27b02b42
Author: Fangbin Sun <su...@gmail.com>
AuthorDate: Wed May 29 02:50:25 2019 +0800

    Added support for Snappy compression for Python/Go (#4319)
    
    * Added support for Snappy compression for Python/Go.
    
    * Fix python test
    
    * Fix python test
    
    * just rm python test
---
 pulsar-client-cpp/python/pulsar/__init__.py |  4 +++-
 pulsar-client-cpp/python/src/enums.cc       |  1 +
 pulsar-client-go/pulsar/producer.go         |  5 +++++
 pulsar-client-go/pulsar/producer_test.go    | 29 +++++++++++++++++++++++++++++
 site2/docs/client-libraries-go.md           |  2 +-
 5 files changed, 39 insertions(+), 2 deletions(-)

diff --git a/pulsar-client-cpp/python/pulsar/__init__.py b/pulsar-client-cpp/python/pulsar/__init__.py
index 9de1719..9c2125a 100644
--- a/pulsar-client-cpp/python/pulsar/__init__.py
+++ b/pulsar-client-cpp/python/pulsar/__init__.py
@@ -416,9 +416,11 @@ class Client:
         * `compression_type`:
           Set the compression type for the producer. By default, message
           payloads are not compressed. Supported compression types are
-          `CompressionType.LZ4`, `CompressionType.ZLib` and `CompressionType.ZSTD`.
+          `CompressionType.LZ4`, `CompressionType.ZLib`, `CompressionType.ZSTD` and `CompressionType.SNAPPY`.
           ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
           release in order to be able to receive messages compressed with ZSTD.
+          SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that
+          release in order to be able to receive messages compressed with SNAPPY.
         * `max_pending_messages`:
           Set the max size of the queue holding the messages pending to receive
           an acknowledgment from the broker.
diff --git a/pulsar-client-cpp/python/src/enums.cc b/pulsar-client-cpp/python/src/enums.cc
index 8e003a1..bba7b8f 100644
--- a/pulsar-client-cpp/python/src/enums.cc
+++ b/pulsar-client-cpp/python/src/enums.cc
@@ -33,6 +33,7 @@ void export_enums() {
             .value("LZ4", CompressionLZ4)
             .value("ZLib", CompressionZLib)
             .value("ZSTD", CompressionZSTD)
+            .value("SNAPPY", CompressionSNAPPY)
             ;
 
     enum_<ConsumerType>("ConsumerType")
diff --git a/pulsar-client-go/pulsar/producer.go b/pulsar-client-go/pulsar/producer.go
index 5e199f7..eb64e64 100644
--- a/pulsar-client-go/pulsar/producer.go
+++ b/pulsar-client-go/pulsar/producer.go
@@ -52,6 +52,7 @@ const (
 	LZ4
 	ZLib
 	ZSTD
+	SNAPPY
 )
 
 type TopicMetadata interface {
@@ -120,9 +121,13 @@ type ProducerOptions struct {
 	//  - LZ4
 	//  - ZLIB
 	//  - ZSTD
+	//  - SNAPPY
 	//
 	// Note: ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
 	// release in order to be able to receive messages compressed with ZSTD.
+	//
+	// Note: SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that
+	// release in order to be able to receive messages compressed with SNAPPY.
 	CompressionType
 
 	// Set a custom message routing policy by passing an implementation of MessageRouter
diff --git a/pulsar-client-go/pulsar/producer_test.go b/pulsar-client-go/pulsar/producer_test.go
index df033a4..8c8e4d5 100644
--- a/pulsar-client-go/pulsar/producer_test.go
+++ b/pulsar-client-go/pulsar/producer_test.go
@@ -206,6 +206,35 @@ func TestProducerZstd(t *testing.T) {
 	}
 }
 
+func TestProducerSnappy(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: "pulsar://localhost:6650",
+	})
+
+	assert.Nil(t, err)
+	defer client.Close()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           "my-topic",
+		CompressionType: SNAPPY,
+	})
+
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	assert.Equal(t, producer.Topic(), "persistent://public/default/my-topic")
+
+	ctx := context.Background()
+
+	for i := 0; i < 10; i++ {
+		if err := producer.Send(ctx, ProducerMessage{
+			Payload: []byte(fmt.Sprintf("hello-%d", i)),
+		}); err != nil {
+			t.Fatal(err)
+		}
+	}
+}
+
 func TestProducer_Flush(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL: "pulsar://localhost:6650",
diff --git a/site2/docs/client-libraries-go.md b/site2/docs/client-libraries-go.md
index 5e22ac6..5b9aa6d 100644
--- a/site2/docs/client-libraries-go.md
+++ b/site2/docs/client-libraries-go.md
@@ -211,7 +211,7 @@ Parameter | Description | Default
 `BlockIfQueueFull` | If set to `true`, the producer's `Send` and `SendAsync` methods will block when the outgoing message queue is full rather than failing and throwing an error (the size of that queue is dictated by the `MaxPendingMessages` parameter); if set to `false` (the default), `Send` and `SendAsync` operations will fail and throw a `ProducerQueueIsFullError` when the queue is full. | `false`
 `MessageRoutingMode` | The message routing logic (for producers on [partitioned topics](concepts-architecture-overview.md#partitioned-topics)). This logic is applied only when no key is set on messages. The available options are: round robin (`pulsar.RoundRobinDistribution`, the default), publishing all messages to a single partition (`pulsar.UseSinglePartition`), or a custom partitioning scheme (`pulsar.CustomPartition`). | `pulsar.RoundRobinDistribution`
 `HashingScheme` | The hashing function that determines the partition on which a particular message is published (partitioned topics only). The available options are: `pulsar.JavaStringHash` (the equivalent of `String.hashCode()` in Java), `pulsar.Murmur3_32Hash` (applies the [Murmur3](https://en.wikipedia.org/wiki/MurmurHash) hashing function), or `pulsar.BoostHash` (applies the hashing function from C++'s [Boost](https://www.boost.org/doc/libs/1_62_0/doc/html/hash.html) library) | `puls [...]
-`CompressionType` | The message data compression type used by the producer. The available options are [`LZ4`](https://github.com/lz4/lz4), [`ZLIB`](https://zlib.net/) and [`ZSTD`](https://facebook.github.io/zstd/). | No compression
+`CompressionType` | The message data compression type used by the producer. The available options are [`LZ4`](https://github.com/lz4/lz4), [`ZLIB`](https://zlib.net/), [`ZSTD`](https://facebook.github.io/zstd/) and [`SNAPPY`](https://google.github.io/snappy/). | No compression
 `MessageRouter` | By default, Pulsar uses a round-robin routing scheme for [partitioned topics](cookbooks-partitioned.md). The `MessageRouter` parameter enables you to specify custom routing logic via a function that takes the Pulsar message and topic metadata as an argument and returns an integer (where the ), i.e. a function signature of `func(Message, TopicMetadata) int`. |
 
 ## Consumers