You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 22:00:32 UTC

[pulsar-client-go] branch master updated: Use pure Go LZ4 compression

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new a913fc6  Use pure Go LZ4 compression
     new b7700c0  Merge pull request #2 from merlimat/lz4-go
a913fc6 is described below

commit a913fc686980249ffac5ab92a2f02cbdd1039bf8
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sun May 12 09:09:10 2019 -0700

    Use pure Go LZ4 compression
---
 pulsar/internal/compression/lz4.go | 40 +++++++++++++++++++++++++++++++++-----
 1 file changed, 35 insertions(+), 5 deletions(-)

diff --git a/pulsar/internal/compression/lz4.go b/pulsar/internal/compression/lz4.go
index da1868c..f5a6d06 100644
--- a/pulsar/internal/compression/lz4.go
+++ b/pulsar/internal/compression/lz4.go
@@ -20,7 +20,7 @@
 package compression
 
 import (
-	"github.com/cloudflare/golz4"
+	"github.com/pierrec/lz4"
 )
 
 type lz4Provider struct {
@@ -31,17 +31,47 @@ func NewLz4Provider() Provider {
 }
 
 func (lz4Provider) Compress(data []byte) []byte {
-	maxSize := lz4.CompressBound(data)
+	const tableSize = 1 << 16
+	hashTable := make([]int, tableSize)
+
+	maxSize := lz4.CompressBlockBound(len(data))
 	compressed := make([]byte, maxSize)
-	size, err := lz4.Compress(data, compressed)
+	size, err := lz4.CompressBlock(data, compressed, hashTable)
 	if err != nil {
 		panic("Failed to compress")
 	}
-	return compressed[:size]
+
+	if size == 0 {
+		// The data block was not compressed. Just repeat it with
+		// the block header flag to signal it's not compressed
+		headerSize := writeSize(len(data), compressed)
+		copy(compressed[headerSize:], data)
+		return compressed[:len(data)+headerSize]
+	} else {
+		return compressed[:size]
+	}
+}
+
+// Write the encoded size for the uncompressed payload
+func writeSize(size int, dst []byte) int {
+	if size < 0xF {
+		dst[0] |= byte(size << 4)
+		return 1
+	} else {
+		dst[0] |= 0xF0
+		l := size - 0xF
+		i := 1
+		for ; l >= 0xFF; l -= 0xFF {
+			dst[i] = 0xFF
+			i++
+		}
+		dst[i] = byte(l)
+		return i + 1
+	}
 }
 
 func (lz4Provider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
 	uncompressed := make([]byte, originalSize)
-	err := lz4.Uncompress(compressedData, uncompressed)
+	_, err := lz4.UncompressBlock(compressedData, uncompressed)
 	return uncompressed, err
 }