You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:55 UTC
[pulsar-client-go] 18/38: Added compression codecs and tests
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit f9fa72736270b99fb95476645d81cc3fd0704993
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Apr 11 17:50:11 2019 -0700
Added compression codecs and tests
---
pulsar/impl/compression/compression.go | 14 ++++++++
pulsar/impl/compression/compression_test.go | 52 +++++++++++++++++++++++++++++
pulsar/impl/compression/lz4.go | 28 ++++++++++++++++
pulsar/impl/compression/noop.go | 16 +++++++++
pulsar/impl/compression/zlib.go | 35 +++++++++++++++++++
pulsar/impl/compression/zstd.go | 20 +++++++++++
6 files changed, 165 insertions(+)
diff --git a/pulsar/impl/compression/compression.go b/pulsar/impl/compression/compression.go
new file mode 100644
index 0000000..107485b
--- /dev/null
+++ b/pulsar/impl/compression/compression.go
@@ -0,0 +1,14 @@
+package compression
+
+// Provider is the contract shared by the compression codecs in this package.
+type Provider interface {
+ // Compress takes a payload and returns its compressed form. The signature
+ // has no error result, so a failure cannot be reported by return value.
+ Compress(data []byte) []byte
+
+ // Decompress takes compressed bytes together with the size of the
+ // original payload and returns the decompressed bytes, or an error.
+ // Implementations differ in whether they consult originalSize.
+ Decompress(compressedData []byte, originalSize int) ([]byte, error)
+}
+
+// Package-level provider instances, one per codec, each obtained from the
+// corresponding New*Provider constructor.
+var (
+ NoopProvider = NewNoopProvider()
+ ZLibProvider = NewZLibProvider()
+ Lz4Provider = NewLz4Provider()
+ ZStdProvider = NewZStdProvider()
+)
diff --git a/pulsar/impl/compression/compression_test.go b/pulsar/impl/compression/compression_test.go
new file mode 100644
index 0000000..ef57402
--- /dev/null
+++ b/pulsar/impl/compression/compression_test.go
@@ -0,0 +1,52 @@
+package compression
+
+import (
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+// testProvider is one row of the table that drives the tests in this file.
+type testProvider struct {
+ // name is passed to t.Run as the sub-test name.
+ name string
+ // provider is the codec under test.
+ provider Provider
+
+ // Compressed data for "hello"; TestJavaCompatibility decompresses it and
+ // expects "hello" back. Presumably captured from the Java client — confirm.
+ compressedHello []byte
+}
+
+// providers lists every codec exercised by the tests, each paired with a
+// pre-compressed "hello" payload for that codec.
+var providers = []testProvider{
+	{
+		name:            "zlib",
+		provider:        ZLibProvider,
+		compressedHello: []byte{0x78, 0x9c, 0xca, 0x48, 0xcd, 0xc9, 0xc9, 0x07, 0x00, 0x00, 0x00, 0xff, 0xff},
+	},
+	{
+		name:            "lz4",
+		provider:        Lz4Provider,
+		compressedHello: []byte{0x50, 0x68, 0x65, 0x6c, 0x6c, 0x6f},
+	},
+	{
+		name:            "zstd",
+		provider:        ZStdProvider,
+		compressedHello: []byte{0x28, 0xb5, 0x2f, 0xfd, 0x20, 0x05, 0x29, 0x00, 0x00, 0x68, 0x65, 0x6c, 0x6c, 0x6f},
+	},
+}
+
+// TestCompression round-trips a fixed payload through every provider in the
+// table and checks that decompression restores it exactly.
+func TestCompression(t *testing.T) {
+	for _, p := range providers {
+		t.Run(p.name, func(t *testing.T) {
+			hello := []byte("test compression data")
+			compressed := p.provider.Compress(hello)
+			uncompressed, err := p.provider.Decompress(compressed, len(hello))
+			assert.NoError(t, err)
+			// Equal compares the slices position by position; ElementsMatch
+			// would also accept the right bytes in the wrong order.
+			assert.Equal(t, hello, uncompressed)
+		})
+	}
+}
+
+// TestJavaCompatibility decompresses each provider's stored compressedHello
+// payload and checks that the result is exactly "hello".
+func TestJavaCompatibility(t *testing.T) {
+	for _, p := range providers {
+		t.Run(p.name, func(t *testing.T) {
+			hello := []byte("hello")
+			uncompressed, err := p.provider.Decompress(p.compressedHello, len(hello))
+			assert.NoError(t, err)
+			// Equal is order-sensitive; ElementsMatch would let a scrambled
+			// result such as "olleh" pass.
+			assert.Equal(t, hello, uncompressed)
+		})
+	}
+}
+
+// TestDecompressionError hands each provider the single byte 0x05 with an
+// original size of zero and expects Decompress to report an error.
+func TestDecompressionError(t *testing.T) {
+	for i := range providers {
+		tc := providers[i]
+		t.Run(tc.name, func(t *testing.T) {
+			garbage := []byte{0x05}
+			_, decodeErr := tc.provider.Decompress(garbage, 0)
+			assert.NotNil(t, decodeErr)
+		})
+	}
+}
diff --git a/pulsar/impl/compression/lz4.go b/pulsar/impl/compression/lz4.go
new file mode 100644
index 0000000..cb725dd
--- /dev/null
+++ b/pulsar/impl/compression/lz4.go
@@ -0,0 +1,28 @@
+package compression
+
+import (
+ "github.com/cloudflare/golz4"
+)
+
+// lz4Provider is the LZ4 implementation of Provider. It has no fields.
+type lz4Provider struct{}
+
+// NewLz4Provider returns a Provider backed by the LZ4 codec.
+func NewLz4Provider() Provider {
+	return new(lz4Provider)
+}
+
+// Compress returns the LZ4-compressed form of data.
+//
+// The output buffer is sized with lz4.CompressBound and then trimmed to the
+// number of bytes lz4.Compress reports as written. The Provider interface
+// leaves no way to return an error, so a compression failure panics; the
+// panic message includes the underlying error so the cause is not lost.
+func (lz4Provider) Compress(data []byte) []byte {
+	compressed := make([]byte, lz4.CompressBound(data))
+	size, err := lz4.Compress(data, compressed)
+	if err != nil {
+		panic("Failed to compress: " + err.Error())
+	}
+	return compressed[:size]
+}
+
+// Decompress expands compressedData into a buffer of exactly originalSize
+// bytes using lz4.Uncompress.
+//
+// On failure it returns a nil slice together with the error, so callers
+// never receive a buffer whose contents are undefined.
+func (lz4Provider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
+	uncompressed := make([]byte, originalSize)
+	if err := lz4.Uncompress(compressedData, uncompressed); err != nil {
+		return nil, err
+	}
+	return uncompressed, nil
+}
diff --git a/pulsar/impl/compression/noop.go b/pulsar/impl/compression/noop.go
new file mode 100644
index 0000000..2267d66
--- /dev/null
+++ b/pulsar/impl/compression/noop.go
@@ -0,0 +1,16 @@
+package compression
+
+// noopProvider is the pass-through implementation of Provider. It has no
+// fields.
+type noopProvider struct{}
+
+// NewNoopProvider returns a Provider that leaves data untouched.
+func NewNoopProvider() Provider {
+	return new(noopProvider)
+}
+
+// Compress returns data as-is; the result is the same slice, not a copy.
+func (noopProvider) Compress(data []byte) []byte {
+	passthrough := data
+	return passthrough
+}
+
+// Decompress returns compressedData as-is with a nil error. originalSize is
+// not consulted.
+func (noopProvider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
+	passthrough := compressedData
+	return passthrough, nil
+}
diff --git a/pulsar/impl/compression/zlib.go b/pulsar/impl/compression/zlib.go
new file mode 100644
index 0000000..ea184e6
--- /dev/null
+++ b/pulsar/impl/compression/zlib.go
@@ -0,0 +1,35 @@
+package compression
+
+import (
+	"bytes"
+	"compress/zlib"
+	"io"
+)
+
+// zlibProvider is the zlib implementation of Provider. It has no fields.
+type zlibProvider struct{}
+
+// NewZLibProvider returns a Provider backed by the standard library's
+// compress/zlib package.
+func NewZLibProvider() Provider {
+	return new(zlibProvider)
+}
+
+// Compress returns data encoded as a zlib stream.
+//
+// The zlib writer targets an in-memory buffer and is closed before the
+// buffer's bytes are returned. The results of Write and Close are discarded:
+// the Provider interface gives Compress no error return.
+func (zlibProvider) Compress(data []byte) []byte {
+	out := new(bytes.Buffer)
+	zw := zlib.NewWriter(out)
+	_, _ = zw.Write(data)
+	_ = zw.Close()
+	return out.Bytes()
+}
+
+// Decompress inflates a zlib stream into a buffer of exactly originalSize
+// bytes.
+//
+// It returns an error if the stream header is invalid, if the stream is
+// corrupt, or if it ends before originalSize bytes have been produced. On
+// error the returned slice is nil.
+func (zlibProvider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
+	r, err := zlib.NewReader(bytes.NewReader(compressedData))
+	if err != nil {
+		return nil, err
+	}
+	defer r.Close()
+
+	uncompressed := make([]byte, originalSize)
+	// A single Read may return fewer bytes than requested; ReadFull keeps
+	// reading until the buffer is full and reports a short stream as an error.
+	if _, err := io.ReadFull(r, uncompressed); err != nil {
+		return nil, err
+	}
+	return uncompressed, nil
+}
diff --git a/pulsar/impl/compression/zstd.go b/pulsar/impl/compression/zstd.go
new file mode 100644
index 0000000..bf61be8
--- /dev/null
+++ b/pulsar/impl/compression/zstd.go
@@ -0,0 +1,20 @@
+package compression
+
+import (
+ zstd "github.com/valyala/gozstd"
+)
+
+// zstdProvider is the Zstandard implementation of Provider, delegating to
+// the gozstd package. It has no fields.
+type zstdProvider struct{}
+
+// NewZStdProvider returns a Provider backed by the Zstandard codec.
+func NewZStdProvider() Provider {
+	return new(zstdProvider)
+}
+
+// Compress returns the result of zstd.Compress for data, called with a nil
+// destination slice.
+func (zstdProvider) Compress(data []byte) []byte {
+	var dst []byte
+	return zstd.Compress(dst, data)
+}
+
+// Decompress returns the result of zstd.Decompress for compressedData, called
+// with a nil destination slice. originalSize is not consulted.
+func (zstdProvider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
+	var dst []byte
+	return zstd.Decompress(dst, compressedData)
+}