You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/06/19 04:21:24 UTC

[pulsar-client-go] branch master updated: Allow applications to configure the compression level (#290)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bc4949  Allow applications to configure the compression level (#290)
6bc4949 is described below

commit 6bc4949d120bb276e63e954ac16e24c08158f84c
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Jun 18 21:21:15 2020 -0700

    Allow applications to configure the compression level (#290)
---
 pulsar/consumer_partition.go                       |  2 +-
 pulsar/internal/batch_builder.go                   |  9 ++++---
 pulsar/internal/compression/compression.go         |  8 ++++++
 .../internal/compression/compression_bench_test.go | 12 ++++-----
 pulsar/internal/compression/compression_test.go    |  2 +-
 pulsar/internal/compression/zstd.go                | 10 +++----
 pulsar/internal/compression/zstd_cgo.go            | 31 +++++++++++++++-------
 pulsar/internal/compression/zstd_go.go             | 17 ++++++++----
 pulsar/producer.go                                 | 19 +++++++++++++
 pulsar/producer_partition.go                       |  5 +++-
 10 files changed, 81 insertions(+), 34 deletions(-)

diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index a74d88c..b7bf261 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -877,7 +877,7 @@ func (pc *partitionConsumer) initializeCompressionProvider(
 	case pb.CompressionType_LZ4:
 		return compression.NewLz4Provider(), nil
 	case pb.CompressionType_ZSTD:
-		return compression.NewZStdProvider(), nil
+		return compression.NewZStdProvider(compression.Default), nil
 	}
 
 	return nil, fmt.Errorf("unsupported compression type: %v", compressionType)
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 78be00e..61c5ced 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -62,7 +62,7 @@ type BatchBuilder struct {
 
 // NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container.
 func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
-	compressionType pb.CompressionType) (*BatchBuilder, error) {
+	compressionType pb.CompressionType, level compression.Level) (*BatchBuilder, error) {
 	if maxMessages == 0 {
 		maxMessages = DefaultMaxMessagesPerBatch
 	}
@@ -84,7 +84,7 @@ func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, p
 			ProducerName: &producerName,
 		},
 		callbacks:           []interface{}{},
-		compressionProvider: getCompressionProvider(compressionType),
+		compressionProvider: getCompressionProvider(compressionType, level),
 	}
 
 	if compressionType != pb.CompressionType_NONE {
@@ -176,7 +176,8 @@ func (bb *BatchBuilder) Close() error {
 	return bb.compressionProvider.Close()
 }
 
-func getCompressionProvider(compressionType pb.CompressionType) compression.Provider {
+func getCompressionProvider(compressionType pb.CompressionType,
+	level compression.Level) compression.Provider {
 	switch compressionType {
 	case pb.CompressionType_NONE:
 		return compression.NewNoopProvider()
@@ -185,7 +186,7 @@ func getCompressionProvider(compressionType pb.CompressionType) compression.Prov
 	case pb.CompressionType_ZLIB:
 		return compression.NewZLibProvider()
 	case pb.CompressionType_ZSTD:
-		return compression.NewZStdProvider()
+		return compression.NewZStdProvider(level)
 	default:
 		log.Panic("unsupported compression type")
 		return nil
diff --git a/pulsar/internal/compression/compression.go b/pulsar/internal/compression/compression.go
index e45d18f..90adeeb 100644
--- a/pulsar/internal/compression/compression.go
+++ b/pulsar/internal/compression/compression.go
@@ -19,6 +19,14 @@ package compression
 
 import "io"
 
+type Level int
+
+const (
+	Default Level = iota
+	Faster
+	Better
+)
+
 // Provider is a interface of compression providers
 type Provider interface {
 	// Compress a []byte, the param is a []byte with the uncompressed content.
diff --git a/pulsar/internal/compression/compression_bench_test.go b/pulsar/internal/compression/compression_bench_test.go
index fea2eb8..1f40321 100644
--- a/pulsar/internal/compression/compression_bench_test.go
+++ b/pulsar/internal/compression/compression_bench_test.go
@@ -64,12 +64,12 @@ func testDecompression(b *testing.B, provider Provider) {
 var benchmarkProviders = []testProvider{
 	{"zlib", NewZLibProvider(), nil},
 	{"lz4", NewLz4Provider(), nil},
-	{"zstd-pure-go-fastest", newPureGoZStdProvider(1), nil},
-	{"zstd-pure-go-default", newPureGoZStdProvider(2), nil},
-	{"zstd-pure-go-best", newPureGoZStdProvider(3), nil},
-	{"zstd-cgo-level-fastest", newCGoZStdProvider(1), nil},
-	{"zstd-cgo-level-default", newCGoZStdProvider(3), nil},
-	{"zstd-cgo-level-best", newCGoZStdProvider(9), nil},
+	{"zstd-pure-go-fastest", newPureGoZStdProvider(Faster), nil},
+	{"zstd-pure-go-default", newPureGoZStdProvider(Default), nil},
+	{"zstd-pure-go-best", newPureGoZStdProvider(Better), nil},
+	{"zstd-cgo-level-fastest", newCGoZStdProvider(Faster), nil},
+	{"zstd-cgo-level-default", newCGoZStdProvider(Default), nil},
+	{"zstd-cgo-level-best", newCGoZStdProvider(Better), nil},
 }
 
 func BenchmarkCompression(b *testing.B) {
diff --git a/pulsar/internal/compression/compression_test.go b/pulsar/internal/compression/compression_test.go
index 7df821f..cfb00b2 100644
--- a/pulsar/internal/compression/compression_test.go
+++ b/pulsar/internal/compression/compression_test.go
@@ -34,7 +34,7 @@ type testProvider struct {
 var providers = []testProvider{
 	{"zlib", NewZLibProvider(), []byte{0x78, 0x9c, 0xca, 0x48, 0xcd, 0xc9, 0xc9, 0x07, 0x00, 0x00, 0x00, 0xff, 0xff}},
 	{"lz4", NewLz4Provider(), []byte{0x50, 0x68, 0x65, 0x6c, 0x6c, 0x6f}},
-	{"zstd", NewZStdProvider(),
+	{"zstd", NewZStdProvider(Default),
 		[]byte{0x28, 0xb5, 0x2f, 0xfd, 0x20, 0x05, 0x29, 0x00, 0x00, 0x68, 0x65, 0x6c, 0x6c, 0x6f}},
 }
 
diff --git a/pulsar/internal/compression/zstd.go b/pulsar/internal/compression/zstd.go
index a912403..694f9e7 100644
--- a/pulsar/internal/compression/zstd.go
+++ b/pulsar/internal/compression/zstd.go
@@ -21,17 +21,15 @@ package compression
 
 import (
 	"fmt"
-
-	"github.com/klauspost/compress/zstd"
 )
 
-func NewZStdProvider() Provider {
-	return newPureGoZStdProvider(zstd.SpeedDefault)
+func NewZStdProvider(level Level) Provider {
+	return newPureGoZStdProvider(level)
 }
 
-func newCGoZStdProvider(compressionLevel int) Provider {
+func newCGoZStdProvider(level Level) Provider {
 	// This is kept to avoid compile errors in benchmark code when cgo is disabled.
 	// The warning is only shown when running the benchmark with CGO disabled.
 	fmt.Println("WARNING: CGO is disabled, using pure Go implementation of ZStd. Use CGO_ENABLED=1 when running benchmark.")
-	return newPureGoZStdProvider(zstd.SpeedDefault)
+	return newPureGoZStdProvider(level)
 }
diff --git a/pulsar/internal/compression/zstd_cgo.go b/pulsar/internal/compression/zstd_cgo.go
index f6f6c45..9c43cc4 100644
--- a/pulsar/internal/compression/zstd_cgo.go
+++ b/pulsar/internal/compression/zstd_cgo.go
@@ -29,23 +29,34 @@ import (
 )
 
 type zstdCGoProvider struct {
-	ctx              zstd.Ctx
-	compressionLevel int
+	ctx       zstd.Ctx
+	level     Level
+	zstdLevel int
 }
 
-func newCGoZStdProvider(compressionLevel int) Provider {
-	return &zstdCGoProvider{
-		compressionLevel: compressionLevel,
-		ctx:              zstd.NewCtx(),
+func newCGoZStdProvider(level Level) Provider {
+	z := &zstdCGoProvider{
+		ctx: zstd.NewCtx(),
 	}
+
+	switch level {
+	case Default:
+		z.zstdLevel = zstd.DefaultCompression
+	case Faster:
+		z.zstdLevel = zstd.BestSpeed
+	case Better:
+		z.zstdLevel = 9
+	}
+
+	return z
 }
 
-func NewZStdProvider() Provider {
-	return newCGoZStdProvider(zstd.DefaultCompression)
+func NewZStdProvider(level Level) Provider {
+	return newCGoZStdProvider(level)
 }
 
 func (z *zstdCGoProvider) Compress(data []byte) []byte {
-	out, err := z.ctx.CompressLevel(nil, data, z.compressionLevel)
+	out, err := z.ctx.CompressLevel(nil, data, z.zstdLevel)
 	if err != nil {
 		log.WithError(err).Fatal("Failed to compress")
 	}
@@ -62,5 +73,5 @@ func (z *zstdCGoProvider) Close() error {
 }
 
 func (z *zstdCGoProvider) Clone() Provider {
-	return newCGoZStdProvider(z.compressionLevel)
+	return newCGoZStdProvider(z.level)
 }
diff --git a/pulsar/internal/compression/zstd_go.go b/pulsar/internal/compression/zstd_go.go
index f9839c5..06f203e 100644
--- a/pulsar/internal/compression/zstd_go.go
+++ b/pulsar/internal/compression/zstd_go.go
@@ -23,16 +23,23 @@ import (
 )
 
 type zstdProvider struct {
-	compressionLevel zstd.EncoderLevel
+	compressionLevel Level
 	encoder          *zstd.Encoder
 	decoder          *zstd.Decoder
 }
 
-func newPureGoZStdProvider(compressionLevel zstd.EncoderLevel) Provider {
-	p := &zstdProvider{
-		compressionLevel: compressionLevel,
+func newPureGoZStdProvider(level Level) Provider {
+	var zstdLevel zstd.EncoderLevel
+	p := &zstdProvider{}
+	switch level {
+	case Default:
+		zstdLevel = zstd.SpeedDefault
+	case Faster:
+		zstdLevel = zstd.SpeedFastest
+	case Better:
+		zstdLevel = zstd.SpeedBetterCompression
 	}
-	p.encoder, _ = zstd.NewWriter(nil, zstd.WithEncoderLevel(compressionLevel))
+	p.encoder, _ = zstd.NewWriter(nil, zstd.WithEncoderLevel(zstdLevel))
 	p.decoder, _ = zstd.NewReader(nil)
 	return p
 }
diff --git a/pulsar/producer.go b/pulsar/producer.go
index 2d630f1..7d44a56 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -40,6 +40,19 @@ const (
 	ZSTD
 )
 
+type CompressionLevel int
+
+const (
+	// Default compression level
+	Default CompressionLevel = iota
+
+	// Faster compression, with lower compression ratio
+	Faster
+
+	// Higher compression ratio, but slower
+	Better
+)
+
 // TopicMetadata is a interface of topic metadata
 type TopicMetadata interface {
 	// NumPartitions get the number of partitions for the specific topic
@@ -87,6 +100,12 @@ type ProducerOptions struct {
 	// release in order to be able to receive messages compressed with ZSTD.
 	CompressionType
 
+	// Define the desired compression level. Options:
+	// - Default
+	// - Faster
+	// - Better
+	CompressionLevel
+
 	// MessageRouter set a custom message routing policy by passing an implementation of MessageRouter
 	// The router is a function that given a particular message and the topic metadata, returns the
 	// partition index where the message should be routed to
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 284e505..32b0da0 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -24,6 +24,8 @@ import (
 	"sync/atomic"
 	"time"
 
+	"github.com/apache/pulsar-client-go/pulsar/internal/compression"
+
 	"github.com/golang/protobuf/proto"
 
 	log "github.com/sirupsen/logrus"
@@ -152,7 +154,8 @@ func (p *partitionProducer) grabCnx() error {
 	p.producerName = res.Response.ProducerSuccess.GetProducerName()
 	if p.batchBuilder == nil {
 		p.batchBuilder, err = internal.NewBatchBuilder(p.options.BatchingMaxMessages, p.options.BatchingMaxSize,
-			p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType))
+			p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType),
+			compression.Level(p.options.CompressionLevel))
 		if err != nil {
 			return err
 		}