You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:55 UTC

[pulsar-client-go] 18/38: Added compression codecs and tests

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit f9fa72736270b99fb95476645d81cc3fd0704993
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Apr 11 17:50:11 2019 -0700

    Added compression codecs and tests
---
 pulsar/impl/compression/compression.go      | 14 ++++++++
 pulsar/impl/compression/compression_test.go | 52 +++++++++++++++++++++++++++++
 pulsar/impl/compression/lz4.go              | 28 ++++++++++++++++
 pulsar/impl/compression/noop.go             | 16 +++++++++
 pulsar/impl/compression/zlib.go             | 35 +++++++++++++++++++
 pulsar/impl/compression/zstd.go             | 20 +++++++++++
 6 files changed, 165 insertions(+)

diff --git a/pulsar/impl/compression/compression.go b/pulsar/impl/compression/compression.go
new file mode 100644
index 0000000..107485b
--- /dev/null
+++ b/pulsar/impl/compression/compression.go
@@ -0,0 +1,14 @@
+package compression
+
+type Provider interface {
+	Compress(data []byte) []byte
+
+	Decompress(compressedData []byte, originalSize int) ([]byte, error)
+}
+
+var (
+	NoopProvider = NewNoopProvider()
+	ZLibProvider = NewZLibProvider()
+	Lz4Provider  = NewLz4Provider()
+	ZStdProvider = NewZStdProvider()
+)
diff --git a/pulsar/impl/compression/compression_test.go b/pulsar/impl/compression/compression_test.go
new file mode 100644
index 0000000..ef57402
--- /dev/null
+++ b/pulsar/impl/compression/compression_test.go
@@ -0,0 +1,52 @@
+package compression
+
+import (
+	"github.com/stretchr/testify/assert"
+	"testing"
+)
+
+type testProvider struct {
+	name     string
+	provider Provider
+
+	// Compressed data for "hello"
+	compressedHello []byte
+}
+
+var providers = []testProvider{
+	{"zlib", ZLibProvider, []byte{0x78, 0x9c, 0xca, 0x48, 0xcd, 0xc9, 0xc9, 0x07, 0x00, 0x00, 0x00, 0xff, 0xff}},
+	{"lz4", Lz4Provider, []byte{0x50, 0x68, 0x65, 0x6c, 0x6c, 0x6f}},
+	{"zstd", ZStdProvider, []byte{0x28, 0xb5, 0x2f, 0xfd, 0x20, 0x05, 0x29, 0x00, 0x00, 0x68, 0x65, 0x6c, 0x6c, 0x6f}},
+}
+
+func TestCompression(t *testing.T) {
+	for _, p := range providers {
+		t.Run(p.name, func(t *testing.T) {
+			hello := []byte("test compression data")
+			compressed := p.provider.Compress(hello)
+			uncompressed, err := p.provider.Decompress(compressed, len(hello))
+			assert.Nil(t, err)
+			assert.ElementsMatch(t, hello, uncompressed)
+		})
+	}
+}
+
+func TestJavaCompatibility(t *testing.T) {
+	for _, p := range providers {
+		t.Run(p.name, func(t *testing.T) {
+			hello := []byte("hello")
+			uncompressed, err := p.provider.Decompress(p.compressedHello, len(hello))
+			assert.Nil(t, err)
+			assert.ElementsMatch(t, hello, uncompressed)
+		})
+	}
+}
+
+func TestDecompressionError(t *testing.T) {
+	for _, p := range providers {
+		t.Run(p.name, func(t *testing.T) {
+			_, err := p.provider.Decompress([]byte{0x05}, 0)
+			assert.NotNil(t, err)
+		})
+	}
+}
diff --git a/pulsar/impl/compression/lz4.go b/pulsar/impl/compression/lz4.go
new file mode 100644
index 0000000..cb725dd
--- /dev/null
+++ b/pulsar/impl/compression/lz4.go
@@ -0,0 +1,28 @@
+package compression
+
+import (
+	"github.com/cloudflare/golz4"
+)
+
+type lz4Provider struct {
+}
+
+func NewLz4Provider() Provider {
+	return &lz4Provider{}
+}
+
+func (lz4Provider) Compress(data []byte) []byte {
+	maxSize := lz4.CompressBound(data)
+	compressed := make([]byte, maxSize)
+	size, err := lz4.Compress(data, compressed)
+	if err != nil {
+		panic("Failed to compress")
+	}
+	return compressed[:size]
+}
+
+func (lz4Provider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
+	uncompressed := make([]byte, originalSize)
+	err := lz4.Uncompress(compressedData, uncompressed)
+	return uncompressed, err
+}
diff --git a/pulsar/impl/compression/noop.go b/pulsar/impl/compression/noop.go
new file mode 100644
index 0000000..2267d66
--- /dev/null
+++ b/pulsar/impl/compression/noop.go
@@ -0,0 +1,16 @@
+package compression
+
+type noopProvider struct {
+}
+
+func NewNoopProvider() Provider {
+	return &noopProvider{}
+}
+
+func (noopProvider) Compress(data []byte) []byte {
+	return data
+}
+
+func (noopProvider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
+	return compressedData, nil
+}
diff --git a/pulsar/impl/compression/zlib.go b/pulsar/impl/compression/zlib.go
new file mode 100644
index 0000000..ea184e6
--- /dev/null
+++ b/pulsar/impl/compression/zlib.go
@@ -0,0 +1,35 @@
+package compression
+
+import (
+	"bytes"
+	"compress/zlib"
+)
+
+type zlibProvider struct {
+}
+
+func NewZLibProvider() Provider {
+	return &zlibProvider{}
+}
+
+func (zlibProvider) Compress(data []byte) []byte {
+	var b bytes.Buffer
+	w := zlib.NewWriter(&b)
+	w.Write(data)
+	w.Close()
+
+	return b.Bytes()
+}
+
+func (zlibProvider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
+	r, err := zlib.NewReader(bytes.NewBuffer(compressedData))
+	if err != nil {
+		return nil, err
+	}
+
+	uncompressed := make([]byte, originalSize)
+	r.Read(uncompressed)
+	r.Close()
+
+	return uncompressed, nil
+}
diff --git a/pulsar/impl/compression/zstd.go b/pulsar/impl/compression/zstd.go
new file mode 100644
index 0000000..bf61be8
--- /dev/null
+++ b/pulsar/impl/compression/zstd.go
@@ -0,0 +1,20 @@
+package compression
+
+import (
+	zstd "github.com/valyala/gozstd"
+)
+
+type zstdProvider struct {
+}
+
+func NewZStdProvider() Provider {
+	return &zstdProvider{}
+}
+
+func (zstdProvider) Compress(data []byte) []byte {
+	return zstd.Compress(nil, data)
+}
+
+func (zstdProvider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
+	return zstd.Decompress(nil, compressedData)
+}