You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/15 18:36:52 UTC
[pulsar-client-go] branch master updated: Use pure ZStd
decompression if CGo is disabled (#3)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new f562db9 Use pure ZStd decompression if CGo is disabled (#3)
f562db9 is described below
commit f562db95dd03cf74c49f0ebe4260d6c2a5a0fca9
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed May 15 11:36:48 2019 -0700
Use pure ZStd decompression if CGo is disabled (#3)
---
pkg/compression/compression.go | 2 ++
pkg/compression/compression_test.go | 6 ++++-
pkg/compression/lz4.go | 4 ++++
pkg/compression/noop.go | 4 ++++
pkg/compression/zlib.go | 4 ++++
pkg/compression/zstd.go | 15 +++++++++++-
pkg/compression/{zstd.go => zstd_native.go} | 36 +++++++++++++++++++++++++----
pulsar/impl_partition_producer.go | 5 +++-
pulsar/internal/batch_builder.go | 10 ++++++--
9 files changed, 77 insertions(+), 9 deletions(-)
diff --git a/pkg/compression/compression.go b/pkg/compression/compression.go
index a4aa652..e9eac6d 100644
--- a/pkg/compression/compression.go
+++ b/pkg/compression/compression.go
@@ -20,6 +20,8 @@
package compression
type Provider interface {
+ CanCompress() bool
+
Compress(data []byte) []byte
Decompress(compressedData []byte, originalSize int) ([]byte, error)
diff --git a/pkg/compression/compression_test.go b/pkg/compression/compression_test.go
index b21f0fc..84e95bc 100644
--- a/pkg/compression/compression_test.go
+++ b/pkg/compression/compression_test.go
@@ -41,6 +41,10 @@ var providers = []testProvider{
func TestCompression(t *testing.T) {
for _, p := range providers {
t.Run(p.name, func(t *testing.T) {
+ if !p.provider.CanCompress() {
+ return
+ }
+
hello := []byte("test compression data")
compressed := p.provider.Compress(hello)
uncompressed, err := p.provider.Decompress(compressed, len(hello))
@@ -64,7 +68,7 @@ func TestJavaCompatibility(t *testing.T) {
func TestDecompressionError(t *testing.T) {
for _, p := range providers {
t.Run(p.name, func(t *testing.T) {
- _, err := p.provider.Decompress([]byte{0x05}, 0)
+ _, err := p.provider.Decompress([]byte{0x05}, 10)
assert.NotNil(t, err)
})
}
diff --git a/pkg/compression/lz4.go b/pkg/compression/lz4.go
index f5a6d06..383c283 100644
--- a/pkg/compression/lz4.go
+++ b/pkg/compression/lz4.go
@@ -30,6 +30,10 @@ func NewLz4Provider() Provider {
return &lz4Provider{}
}
+func (lz4Provider) CanCompress() bool {
+ return true
+}
+
func (lz4Provider) Compress(data []byte) []byte {
const tableSize = 1 << 16
hashTable := make([]int, tableSize)
diff --git a/pkg/compression/noop.go b/pkg/compression/noop.go
index c806862..dc9c8b3 100644
--- a/pkg/compression/noop.go
+++ b/pkg/compression/noop.go
@@ -25,6 +25,10 @@ func NewNoopProvider() Provider {
return &noopProvider{}
}
+func (noopProvider) CanCompress() bool {
+ return true
+}
+
func (noopProvider) Compress(data []byte) []byte {
return data
}
diff --git a/pkg/compression/zlib.go b/pkg/compression/zlib.go
index 692a29a..0e4f107 100644
--- a/pkg/compression/zlib.go
+++ b/pkg/compression/zlib.go
@@ -30,6 +30,10 @@ func NewZLibProvider() Provider {
return &zlibProvider{}
}
+func (zlibProvider) CanCompress() bool {
+ return true
+}
+
func (zlibProvider) Compress(data []byte) []byte {
var b bytes.Buffer
w := zlib.NewWriter(&b)
diff --git a/pkg/compression/zstd.go b/pkg/compression/zstd.go
index 8d52e9e..e9bdc65 100644
--- a/pkg/compression/zstd.go
+++ b/pkg/compression/zstd.go
@@ -17,18 +17,31 @@
// under the License.
//
+// +build cgo
+
+// If CGO is enabled, use ZSTD library that links with official
+// C based zstd. This can perform both compression and decompression.
+
package compression
import (
zstd "github.com/valyala/gozstd"
)
+func NewZStdProvider() Provider {
+ return newCGoZStdProvider()
+}
+
type zstdProvider struct {}
-func NewZStdProvider() Provider {
+func newCGoZStdProvider() Provider {
return &zstdProvider{}
}
+func (zstdProvider) CanCompress() bool {
+ return true
+}
+
func (zstdProvider) Compress(data []byte) []byte {
return zstd.Compress(nil, data)
}
diff --git a/pkg/compression/zstd.go b/pkg/compression/zstd_native.go
similarity index 58%
copy from pkg/compression/zstd.go
copy to pkg/compression/zstd_native.go
index 8d52e9e..9cf1124 100644
--- a/pkg/compression/zstd.go
+++ b/pkg/compression/zstd_native.go
@@ -17,22 +17,50 @@
// under the License.
//
+// +build !cgo
+
+// Pure GO ZStd library only supports decompression
+
package compression
import (
- zstd "github.com/valyala/gozstd"
+ "bytes"
+ "github.com/klauspost/compress/zstd"
+ "github.com/pkg/errors"
+ log "github.com/sirupsen/logrus"
)
-type zstdProvider struct {}
+// Pure GO ZStd library only supports decompression
+
+type zstdProvider struct {
+}
func NewZStdProvider() Provider {
return &zstdProvider{}
}
+func (zstdProvider) CanCompress() bool {
+ return false
+}
+
func (zstdProvider) Compress(data []byte) []byte {
- return zstd.Compress(nil, data)
+ log.Panic("ZSTD compression is not supported if CGo is disabled")
+ return nil
}
func (zstdProvider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
- return zstd.Decompress(nil, compressedData)
+ d, err := zstd.NewReader(bytes.NewReader(compressedData))
+ if err != nil {
+ return nil, err
+ }
+
+ uncompressed := make([]byte, originalSize)
+ size, err := d.Read(uncompressed)
+ if err != nil {
+ return nil, err
+ } else if size != originalSize {
+ return nil, errors.New("Invalid uncompressed size")
+ } else {
+ return uncompressed, nil
+ }
}
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index 215916d..c173ac6 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -140,8 +140,11 @@ func (p *partitionProducer) grabCnx() error {
p.producerName = res.Response.ProducerSuccess.ProducerName
if p.batchBuilder == nil {
- p.batchBuilder = internal.NewBatchBuilder(p.options.BatchingMaxMessages, *p.producerName,
+ p.batchBuilder, err = internal.NewBatchBuilder(p.options.BatchingMaxMessages, *p.producerName,
p.producerId, pb.CompressionType(p.options.CompressionType))
+ if err != nil {
+ return err
+ }
}
if p.sequenceIdGenerator == nil {
nextSequenceId := uint64(res.Response.ProducerSuccess.GetLastSequenceId() + 1)
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 56cbb2e..9c0c731 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -20,6 +20,8 @@
package internal
import (
+ "errors"
+ "fmt"
"github.com/golang/protobuf/proto"
log "github.com/sirupsen/logrus"
"github.com/apache/pulsar-client-go/pkg/compression"
@@ -53,7 +55,7 @@ type BatchBuilder struct {
}
func NewBatchBuilder(maxMessages uint, producerName string, producerId uint64,
- compressionType pb.CompressionType) *BatchBuilder {
+ compressionType pb.CompressionType) (*BatchBuilder, error) {
if maxMessages == 0 {
maxMessages = DefaultMaxMessagesPerBatch
}
@@ -78,7 +80,11 @@ func NewBatchBuilder(maxMessages uint, producerName string, producerId uint64,
bb.msgMetadata.Compression = &compressionType
}
- return bb
+ if !bb.compressionProvider.CanCompress() {
+ return nil, errors.New(fmt.Sprintf("Compression provider %v can only decompress data", compressionType))
+ }
+
+ return bb, nil
}
func (bb *BatchBuilder) IsFull() bool {