You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/15 18:36:52 UTC

[pulsar-client-go] branch master updated: Use pure ZStd decompression is CGo is disabled (#3)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new f562db9  Use pure ZStd decompression is CGo is disabled (#3)
f562db9 is described below

commit f562db95dd03cf74c49f0ebe4260d6c2a5a0fca9
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed May 15 11:36:48 2019 -0700

    Use pure ZStd decompression is CGo is disabled (#3)
---
 pkg/compression/compression.go              |  2 ++
 pkg/compression/compression_test.go         |  6 ++++-
 pkg/compression/lz4.go                      |  4 ++++
 pkg/compression/noop.go                     |  4 ++++
 pkg/compression/zlib.go                     |  4 ++++
 pkg/compression/zstd.go                     | 15 +++++++++++-
 pkg/compression/{zstd.go => zstd_native.go} | 36 +++++++++++++++++++++++++----
 pulsar/impl_partition_producer.go           |  5 +++-
 pulsar/internal/batch_builder.go            | 10 ++++++--
 9 files changed, 77 insertions(+), 9 deletions(-)

diff --git a/pkg/compression/compression.go b/pkg/compression/compression.go
index a4aa652..e9eac6d 100644
--- a/pkg/compression/compression.go
+++ b/pkg/compression/compression.go
@@ -20,6 +20,8 @@
 package compression
 
 type Provider interface {
+	CanCompress() bool
+
 	Compress(data []byte) []byte
 
 	Decompress(compressedData []byte, originalSize int) ([]byte, error)
diff --git a/pkg/compression/compression_test.go b/pkg/compression/compression_test.go
index b21f0fc..84e95bc 100644
--- a/pkg/compression/compression_test.go
+++ b/pkg/compression/compression_test.go
@@ -41,6 +41,10 @@ var providers = []testProvider{
 func TestCompression(t *testing.T) {
 	for _, p := range providers {
 		t.Run(p.name, func(t *testing.T) {
+			if !p.provider.CanCompress() {
+				return
+			}
+
 			hello := []byte("test compression data")
 			compressed := p.provider.Compress(hello)
 			uncompressed, err := p.provider.Decompress(compressed, len(hello))
@@ -64,7 +68,7 @@ func TestJavaCompatibility(t *testing.T) {
 func TestDecompressionError(t *testing.T) {
 	for _, p := range providers {
 		t.Run(p.name, func(t *testing.T) {
-			_, err := p.provider.Decompress([]byte{0x05}, 0)
+			_, err := p.provider.Decompress([]byte{0x05}, 10)
 			assert.NotNil(t, err)
 		})
 	}
diff --git a/pkg/compression/lz4.go b/pkg/compression/lz4.go
index f5a6d06..383c283 100644
--- a/pkg/compression/lz4.go
+++ b/pkg/compression/lz4.go
@@ -30,6 +30,10 @@ func NewLz4Provider() Provider {
 	return &lz4Provider{}
 }
 
+func (lz4Provider) CanCompress() bool {
+	return true
+}
+
 func (lz4Provider) Compress(data []byte) []byte {
 	const tableSize = 1 << 16
 	hashTable := make([]int, tableSize)
diff --git a/pkg/compression/noop.go b/pkg/compression/noop.go
index c806862..dc9c8b3 100644
--- a/pkg/compression/noop.go
+++ b/pkg/compression/noop.go
@@ -25,6 +25,10 @@ func NewNoopProvider() Provider {
 	return &noopProvider{}
 }
 
+func (noopProvider) CanCompress() bool {
+	return true
+}
+
 func (noopProvider) Compress(data []byte) []byte {
 	return data
 }
diff --git a/pkg/compression/zlib.go b/pkg/compression/zlib.go
index 692a29a..0e4f107 100644
--- a/pkg/compression/zlib.go
+++ b/pkg/compression/zlib.go
@@ -30,6 +30,10 @@ func NewZLibProvider() Provider {
 	return &zlibProvider{}
 }
 
+func (zlibProvider) CanCompress() bool {
+	return true
+}
+
 func (zlibProvider) Compress(data []byte) []byte {
 	var b bytes.Buffer
 	w := zlib.NewWriter(&b)
diff --git a/pkg/compression/zstd.go b/pkg/compression/zstd.go
index 8d52e9e..e9bdc65 100644
--- a/pkg/compression/zstd.go
+++ b/pkg/compression/zstd.go
@@ -17,18 +17,31 @@
 // under the License.
 //
 
+// +build cgo
+
+// If CGO is enabled, use ZSTD library that links with official
+// C based zstd. This can perform both compression and decompression.
+
 package compression
 
 import (
 	zstd "github.com/valyala/gozstd"
 )
 
+func NewZStdProvider() Provider {
+	return newCGoZStdProvider()
+}
+
 type zstdProvider struct {}
 
-func NewZStdProvider() Provider {
+func newCGoZStdProvider() Provider {
 	return &zstdProvider{}
 }
 
+func (zstdProvider) CanCompress() bool {
+	return true
+}
+
 func (zstdProvider) Compress(data []byte) []byte {
 	return zstd.Compress(nil, data)
 }
diff --git a/pkg/compression/zstd.go b/pkg/compression/zstd_native.go
similarity index 58%
copy from pkg/compression/zstd.go
copy to pkg/compression/zstd_native.go
index 8d52e9e..9cf1124 100644
--- a/pkg/compression/zstd.go
+++ b/pkg/compression/zstd_native.go
@@ -17,22 +17,50 @@
 // under the License.
 //
 
+// +build !cgo
+
+// Pure GO ZStd library only supports decompression
+
 package compression
 
 import (
-	zstd "github.com/valyala/gozstd"
+	"bytes"
+	"github.com/klauspost/compress/zstd"
+	"github.com/pkg/errors"
+	log "github.com/sirupsen/logrus"
 )
 
-type zstdProvider struct {}
+// Pure GO ZStd library only supports decompression
+
+type zstdProvider struct {
+}
 
 func NewZStdProvider() Provider {
 	return &zstdProvider{}
 }
 
+func (zstdProvider) CanCompress() bool {
+	return false
+}
+
 func (zstdProvider) Compress(data []byte) []byte {
-	return zstd.Compress(nil, data)
+	log.Panic("ZSTD compression is not supported if CGo is disabled")
+	return nil
 }
 
 func (zstdProvider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
-	return zstd.Decompress(nil, compressedData)
+	d, err := zstd.NewReader(bytes.NewReader(compressedData))
+	if err != nil {
+		return nil, err
+	}
+
+	uncompressed := make([]byte, originalSize)
+	size, err := d.Read(uncompressed)
+	if err != nil {
+		return nil, err
+	} else if size != originalSize {
+		return nil, errors.New("Invalid uncompressed size")
+	} else {
+		return uncompressed, nil
+	}
 }
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index 215916d..c173ac6 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -140,8 +140,11 @@ func (p *partitionProducer) grabCnx() error {
 
 	p.producerName = res.Response.ProducerSuccess.ProducerName
 	if p.batchBuilder == nil {
-		p.batchBuilder = internal.NewBatchBuilder(p.options.BatchingMaxMessages, *p.producerName,
+		p.batchBuilder, err = internal.NewBatchBuilder(p.options.BatchingMaxMessages, *p.producerName,
 			p.producerId, pb.CompressionType(p.options.CompressionType))
+		if err != nil {
+			return err
+		}
 	}
 	if p.sequenceIdGenerator == nil {
 		nextSequenceId := uint64(res.Response.ProducerSuccess.GetLastSequenceId() + 1)
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 56cbb2e..9c0c731 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -20,6 +20,8 @@
 package internal
 
 import (
+	"errors"
+	"fmt"
 	"github.com/golang/protobuf/proto"
 	log "github.com/sirupsen/logrus"
 	"github.com/apache/pulsar-client-go/pkg/compression"
@@ -53,7 +55,7 @@ type BatchBuilder struct {
 }
 
 func NewBatchBuilder(maxMessages uint, producerName string, producerId uint64,
-	compressionType pb.CompressionType) *BatchBuilder {
+	compressionType pb.CompressionType) (*BatchBuilder, error) {
 	if maxMessages == 0 {
 		maxMessages = DefaultMaxMessagesPerBatch
 	}
@@ -78,7 +80,11 @@ func NewBatchBuilder(maxMessages uint, producerName string, producerId uint64,
 		bb.msgMetadata.Compression = &compressionType
 	}
 
-	return bb
+	if !bb.compressionProvider.CanCompress() {
+		return nil, errors.New(fmt.Sprintf("Compression provider %v can only decompress data", compressionType))
+	}
+
+	return bb, nil
 }
 
 func (bb *BatchBuilder) IsFull() bool {