You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/06/19 04:21:24 UTC
[pulsar-client-go] branch master updated: Allow applications to configure the compression level (#290)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 6bc4949 Allow applications to configure the compression level (#290)
6bc4949 is described below
commit 6bc4949d120bb276e63e954ac16e24c08158f84c
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Jun 18 21:21:15 2020 -0700
Allow applications to configure the compression level (#290)
---
pulsar/consumer_partition.go | 2 +-
pulsar/internal/batch_builder.go | 9 ++++---
pulsar/internal/compression/compression.go | 8 ++++++
.../internal/compression/compression_bench_test.go | 12 ++++-----
pulsar/internal/compression/compression_test.go | 2 +-
pulsar/internal/compression/zstd.go | 10 +++----
pulsar/internal/compression/zstd_cgo.go | 31 +++++++++++++++-------
pulsar/internal/compression/zstd_go.go | 17 ++++++++----
pulsar/producer.go | 19 +++++++++++++
pulsar/producer_partition.go | 5 +++-
10 files changed, 81 insertions(+), 34 deletions(-)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index a74d88c..b7bf261 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -877,7 +877,7 @@ func (pc *partitionConsumer) initializeCompressionProvider(
case pb.CompressionType_LZ4:
return compression.NewLz4Provider(), nil
case pb.CompressionType_ZSTD:
- return compression.NewZStdProvider(), nil
+ return compression.NewZStdProvider(compression.Default), nil
}
return nil, fmt.Errorf("unsupported compression type: %v", compressionType)
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 78be00e..61c5ced 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -62,7 +62,7 @@ type BatchBuilder struct {
// NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container.
func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
- compressionType pb.CompressionType) (*BatchBuilder, error) {
+ compressionType pb.CompressionType, level compression.Level) (*BatchBuilder, error) {
if maxMessages == 0 {
maxMessages = DefaultMaxMessagesPerBatch
}
@@ -84,7 +84,7 @@ func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, p
ProducerName: &producerName,
},
callbacks: []interface{}{},
- compressionProvider: getCompressionProvider(compressionType),
+ compressionProvider: getCompressionProvider(compressionType, level),
}
if compressionType != pb.CompressionType_NONE {
@@ -176,7 +176,8 @@ func (bb *BatchBuilder) Close() error {
return bb.compressionProvider.Close()
}
-func getCompressionProvider(compressionType pb.CompressionType) compression.Provider {
+func getCompressionProvider(compressionType pb.CompressionType,
+ level compression.Level) compression.Provider {
switch compressionType {
case pb.CompressionType_NONE:
return compression.NewNoopProvider()
@@ -185,7 +186,7 @@ func getCompressionProvider(compressionType pb.CompressionType) compression.Prov
case pb.CompressionType_ZLIB:
return compression.NewZLibProvider()
case pb.CompressionType_ZSTD:
- return compression.NewZStdProvider()
+ return compression.NewZStdProvider(level)
default:
log.Panic("unsupported compression type")
return nil
diff --git a/pulsar/internal/compression/compression.go b/pulsar/internal/compression/compression.go
index e45d18f..90adeeb 100644
--- a/pulsar/internal/compression/compression.go
+++ b/pulsar/internal/compression/compression.go
@@ -19,6 +19,14 @@ package compression
import "io"
+type Level int
+
+const (
+ Default Level = iota
+ Faster
+ Better
+)
+
// Provider is a interface of compression providers
type Provider interface {
// Compress a []byte, the param is a []byte with the uncompressed content.
diff --git a/pulsar/internal/compression/compression_bench_test.go b/pulsar/internal/compression/compression_bench_test.go
index fea2eb8..1f40321 100644
--- a/pulsar/internal/compression/compression_bench_test.go
+++ b/pulsar/internal/compression/compression_bench_test.go
@@ -64,12 +64,12 @@ func testDecompression(b *testing.B, provider Provider) {
var benchmarkProviders = []testProvider{
{"zlib", NewZLibProvider(), nil},
{"lz4", NewLz4Provider(), nil},
- {"zstd-pure-go-fastest", newPureGoZStdProvider(1), nil},
- {"zstd-pure-go-default", newPureGoZStdProvider(2), nil},
- {"zstd-pure-go-best", newPureGoZStdProvider(3), nil},
- {"zstd-cgo-level-fastest", newCGoZStdProvider(1), nil},
- {"zstd-cgo-level-default", newCGoZStdProvider(3), nil},
- {"zstd-cgo-level-best", newCGoZStdProvider(9), nil},
+ {"zstd-pure-go-fastest", newPureGoZStdProvider(Faster), nil},
+ {"zstd-pure-go-default", newPureGoZStdProvider(Default), nil},
+ {"zstd-pure-go-best", newPureGoZStdProvider(Better), nil},
+ {"zstd-cgo-level-fastest", newCGoZStdProvider(Faster), nil},
+ {"zstd-cgo-level-default", newCGoZStdProvider(Default), nil},
+ {"zstd-cgo-level-best", newCGoZStdProvider(Better), nil},
}
func BenchmarkCompression(b *testing.B) {
diff --git a/pulsar/internal/compression/compression_test.go b/pulsar/internal/compression/compression_test.go
index 7df821f..cfb00b2 100644
--- a/pulsar/internal/compression/compression_test.go
+++ b/pulsar/internal/compression/compression_test.go
@@ -34,7 +34,7 @@ type testProvider struct {
var providers = []testProvider{
{"zlib", NewZLibProvider(), []byte{0x78, 0x9c, 0xca, 0x48, 0xcd, 0xc9, 0xc9, 0x07, 0x00, 0x00, 0x00, 0xff, 0xff}},
{"lz4", NewLz4Provider(), []byte{0x50, 0x68, 0x65, 0x6c, 0x6c, 0x6f}},
- {"zstd", NewZStdProvider(),
+ {"zstd", NewZStdProvider(Default),
[]byte{0x28, 0xb5, 0x2f, 0xfd, 0x20, 0x05, 0x29, 0x00, 0x00, 0x68, 0x65, 0x6c, 0x6c, 0x6f}},
}
diff --git a/pulsar/internal/compression/zstd.go b/pulsar/internal/compression/zstd.go
index a912403..694f9e7 100644
--- a/pulsar/internal/compression/zstd.go
+++ b/pulsar/internal/compression/zstd.go
@@ -21,17 +21,15 @@ package compression
import (
"fmt"
-
- "github.com/klauspost/compress/zstd"
)
-func NewZStdProvider() Provider {
- return newPureGoZStdProvider(zstd.SpeedDefault)
+func NewZStdProvider(level Level) Provider {
+ return newPureGoZStdProvider(level)
}
-func newCGoZStdProvider(compressionLevel int) Provider {
+func newCGoZStdProvider(level Level) Provider {
// This is kept to avoid compile errors in benchmark code when cgo is disabled.
// The warning is only shown when running the benchmark with CGO disabled.
fmt.Println("WARNING: CGO is disabled, using pure Go implementation of ZStd. Use CGO_ENABLED=1 when running benchmark.")
- return newPureGoZStdProvider(zstd.SpeedDefault)
+ return newPureGoZStdProvider(level)
}
diff --git a/pulsar/internal/compression/zstd_cgo.go b/pulsar/internal/compression/zstd_cgo.go
index f6f6c45..9c43cc4 100644
--- a/pulsar/internal/compression/zstd_cgo.go
+++ b/pulsar/internal/compression/zstd_cgo.go
@@ -29,23 +29,34 @@ import (
)
type zstdCGoProvider struct {
- ctx zstd.Ctx
- compressionLevel int
+ ctx zstd.Ctx
+ level Level
+ zstdLevel int
}
-func newCGoZStdProvider(compressionLevel int) Provider {
- return &zstdCGoProvider{
- compressionLevel: compressionLevel,
- ctx: zstd.NewCtx(),
+func newCGoZStdProvider(level Level) Provider {
+ z := &zstdCGoProvider{
+ ctx: zstd.NewCtx(),
}
+
+ switch level {
+ case Default:
+ z.zstdLevel = zstd.DefaultCompression
+ case Faster:
+ z.zstdLevel = zstd.BestSpeed
+ case Better:
+ z.zstdLevel = 9
+ }
+
+ return z
}
-func NewZStdProvider() Provider {
- return newCGoZStdProvider(zstd.DefaultCompression)
+func NewZStdProvider(level Level) Provider {
+ return newCGoZStdProvider(level)
}
func (z *zstdCGoProvider) Compress(data []byte) []byte {
- out, err := z.ctx.CompressLevel(nil, data, z.compressionLevel)
+ out, err := z.ctx.CompressLevel(nil, data, z.zstdLevel)
if err != nil {
log.WithError(err).Fatal("Failed to compress")
}
@@ -62,5 +73,5 @@ func (z *zstdCGoProvider) Close() error {
}
func (z *zstdCGoProvider) Clone() Provider {
- return newCGoZStdProvider(z.compressionLevel)
+ return newCGoZStdProvider(z.level)
}
diff --git a/pulsar/internal/compression/zstd_go.go b/pulsar/internal/compression/zstd_go.go
index f9839c5..06f203e 100644
--- a/pulsar/internal/compression/zstd_go.go
+++ b/pulsar/internal/compression/zstd_go.go
@@ -23,16 +23,23 @@ import (
)
type zstdProvider struct {
- compressionLevel zstd.EncoderLevel
+ compressionLevel Level
encoder *zstd.Encoder
decoder *zstd.Decoder
}
-func newPureGoZStdProvider(compressionLevel zstd.EncoderLevel) Provider {
- p := &zstdProvider{
- compressionLevel: compressionLevel,
+func newPureGoZStdProvider(level Level) Provider {
+ var zstdLevel zstd.EncoderLevel
+ p := &zstdProvider{}
+ switch level {
+ case Default:
+ zstdLevel = zstd.SpeedDefault
+ case Faster:
+ zstdLevel = zstd.SpeedFastest
+ case Better:
+ zstdLevel = zstd.SpeedBetterCompression
}
- p.encoder, _ = zstd.NewWriter(nil, zstd.WithEncoderLevel(compressionLevel))
+ p.encoder, _ = zstd.NewWriter(nil, zstd.WithEncoderLevel(zstdLevel))
p.decoder, _ = zstd.NewReader(nil)
return p
}
diff --git a/pulsar/producer.go b/pulsar/producer.go
index 2d630f1..7d44a56 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -40,6 +40,19 @@ const (
ZSTD
)
+type CompressionLevel int
+
+const (
+ // Default compression level
+ Default CompressionLevel = iota
+
+ // Faster compression, with lower compression ration
+ Faster
+
+ // Higher compression rate, but slower
+ Better
+)
+
// TopicMetadata is a interface of topic metadata
type TopicMetadata interface {
// NumPartitions get the number of partitions for the specific topic
@@ -87,6 +100,12 @@ type ProducerOptions struct {
// release in order to be able to receive messages compressed with ZSTD.
CompressionType
+ // Define the desired compression level. Options:
+ // - Default
+ // - Faster
+ // - Better
+ CompressionLevel
+
// MessageRouter set a custom message routing policy by passing an implementation of MessageRouter
// The router is a function that given a particular message and the topic metadata, returns the
// partition index where the message should be routed to
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 284e505..32b0da0 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -24,6 +24,8 @@ import (
"sync/atomic"
"time"
+ "github.com/apache/pulsar-client-go/pulsar/internal/compression"
+
"github.com/golang/protobuf/proto"
log "github.com/sirupsen/logrus"
@@ -152,7 +154,8 @@ func (p *partitionProducer) grabCnx() error {
p.producerName = res.Response.ProducerSuccess.GetProducerName()
if p.batchBuilder == nil {
p.batchBuilder, err = internal.NewBatchBuilder(p.options.BatchingMaxMessages, p.options.BatchingMaxSize,
- p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType))
+ p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType),
+ compression.Level(p.options.CompressionLevel))
if err != nil {
return err
}