You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 22:00:32 UTC
[pulsar-client-go] branch master updated: Use pure Go LZ4 compression
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new a913fc6 Use pure Go LZ4 compression
new b7700c0 Merge pull request #2 from merlimat/lz4-go
a913fc6 is described below
commit a913fc686980249ffac5ab92a2f02cbdd1039bf8
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sun May 12 09:09:10 2019 -0700
Use pure Go LZ4 compression
---
pulsar/internal/compression/lz4.go | 40 +++++++++++++++++++++++++++++++++-----
1 file changed, 35 insertions(+), 5 deletions(-)
diff --git a/pulsar/internal/compression/lz4.go b/pulsar/internal/compression/lz4.go
index da1868c..f5a6d06 100644
--- a/pulsar/internal/compression/lz4.go
+++ b/pulsar/internal/compression/lz4.go
@@ -20,7 +20,7 @@
package compression
import (
- "github.com/cloudflare/golz4"
+ "github.com/pierrec/lz4"
)
type lz4Provider struct {
@@ -31,17 +31,47 @@ func NewLz4Provider() Provider {
}
func (lz4Provider) Compress(data []byte) []byte {
- maxSize := lz4.CompressBound(data)
+ const tableSize = 1 << 16
+ hashTable := make([]int, tableSize)
+
+ maxSize := lz4.CompressBlockBound(len(data))
compressed := make([]byte, maxSize)
- size, err := lz4.Compress(data, compressed)
+ size, err := lz4.CompressBlock(data, compressed, hashTable)
if err != nil {
panic("Failed to compress")
}
- return compressed[:size]
+
+ if size == 0 {
+ // The data block was not compressed. Just repeat it with
+ // the block header flag to signal it's not compressed
+ headerSize := writeSize(len(data), compressed)
+ copy(compressed[headerSize:], data)
+ return compressed[:len(data)+headerSize]
+ } else {
+ return compressed[:size]
+ }
+}
+
+// Write the encoded size for the uncompressed payload
+func writeSize(size int, dst []byte) int {
+ if size < 0xF {
+ dst[0] |= byte(size << 4)
+ return 1
+ } else {
+ dst[0] |= 0xF0
+ l := size - 0xF
+ i := 1
+ for ; l >= 0xFF; l -= 0xFF {
+ dst[i] = 0xFF
+ i++
+ }
+ dst[i] = byte(l)
+ return i + 1
+ }
}
func (lz4Provider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
uncompressed := make([]byte, originalSize)
- err := lz4.Uncompress(compressedData, uncompressed)
+ _, err := lz4.UncompressBlock(compressedData, uncompressed)
return uncompressed, err
}