You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/06/19 03:48:13 UTC

[pulsar-client-go] branch master updated: Switched to DataDog zstd wrapper, reusing the compression ctx (#287)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 7912594  Switched to DataDog zstd wrapper, reusing the compression ctx (#287)
7912594 is described below

commit 7912594f37e6805c4496b4c3d6d803de59917e66
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Jun 18 20:48:04 2020 -0700

    Switched to DataDog zstd wrapper, reusing the compression ctx (#287)
    
    --- Before
    
    BenchmarkCompression/zstd-cgo-level-fastest-16       	    2272	    513861 ns/op	 195.78 MB/s
    BenchmarkCompression/zstd-cgo-level-default-16       	    1477	    772904 ns/op	 130.17 MB/s
    BenchmarkCompression/zstd-cgo-level-best-16          	     254	   4670399 ns/op	  21.54 MB/s
    BenchmarkDecompression/zstd-cgo-level-fastest-16     	    8382	    154040 ns/op	 653.11 MB/s
    BenchmarkDecompression/zstd-cgo-level-default-16     	    7524	    156934 ns/op	 641.07 MB/s
    BenchmarkDecompression/zstd-cgo-level-best-16        	    7748	    162531 ns/op	 619.00 MB/s
    BenchmarkCompressionParallel/zstd-cgo-level-fastest-16         	   22719	     54002 ns/op	1862.99 MB/s
    BenchmarkCompressionParallel/zstd-cgo-level-default-16         	   13485	     88328 ns/op	1139.01 MB/s
    BenchmarkCompressionParallel/zstd-cgo-level-best-16            	    2161	    561107 ns/op	 179.30 MB/s
    
    --- After
    
    BenchmarkCompression/zstd-cgo-level-fastest-16       	    2366	    472138 ns/op	 213.09 MB/s
    BenchmarkCompression/zstd-cgo-level-default-16       	    1576	    756111 ns/op	 133.06 MB/s
    BenchmarkCompression/zstd-cgo-level-best-16          	     271	   4452809 ns/op	  22.59 MB/s
    BenchmarkDecompression/zstd-cgo-level-fastest-16     	    9352	    132880 ns/op	 757.12 MB/s
    BenchmarkDecompression/zstd-cgo-level-default-16     	    8473	    142471 ns/op	 706.15 MB/s
    BenchmarkDecompression/zstd-cgo-level-best-16        	    7413	    147722 ns/op	 681.05 MB/s
    BenchmarkCompressionParallel/zstd-cgo-level-fastest-16         	   23857	     50493 ns/op	1992.49 MB/s
    BenchmarkCompressionParallel/zstd-cgo-level-default-16         	   13832	     87510 ns/op	1149.66 MB/s
    BenchmarkCompressionParallel/zstd-cgo-level-best-16            	    2220	    526170 ns/op	 191.20 MB/s
---
 go.mod                                  |  2 +-
 go.sum                                  |  4 ++--
 pulsar/internal/compression/zstd_cgo.go | 16 ++++++++++++----
 3 files changed, 15 insertions(+), 7 deletions(-)

diff --git a/go.mod b/go.mod
index f807d50..21c774d 100644
--- a/go.mod
+++ b/go.mod
@@ -3,6 +3,7 @@ module github.com/apache/pulsar-client-go
 go 1.12
 
 require (
+	github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32
 	github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6
 	github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b
 	github.com/golang/protobuf v1.3.1
@@ -15,6 +16,5 @@ require (
 	github.com/spf13/cobra v0.0.3
 	github.com/spf13/pflag v1.0.3 // indirect
 	github.com/stretchr/testify v1.4.0
-	github.com/valyala/gozstd v1.7.0
 	github.com/yahoo/athenz v1.8.55
 )
diff --git a/go.sum b/go.sum
index 393865f..333286d 100644
--- a/go.sum
+++ b/go.sum
@@ -1,4 +1,6 @@
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32 h1:/gZKpgSMydtrih81nvUhlkXpZIUfthKShSCVbRzBt9Y=
+github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
 github.com/ardielle/ardielle-go v1.5.2 h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4=
 github.com/ardielle/ardielle-go v1.5.2/go.mod h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI=
 github.com/ardielle/ardielle-tools v1.5.4/go.mod h1:oZN+JRMnqGiIhrzkRN9l26Cej9dEx4jeNG6A+AdkShk=
@@ -49,8 +51,6 @@ github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
-github.com/valyala/gozstd v1.7.0 h1:Ljh5c9zboqLhwTI33al32R72iCZfn0mCbVGcFWbGwRQ=
-github.com/valyala/gozstd v1.7.0/go.mod h1:y5Ew47GLlP37EkTB+B4s7r6A5rdaeB7ftbl9zoYiIPQ=
 github.com/yahoo/athenz v1.8.55 h1:xGhxN3yLq334APyn0Zvcc+aqu78Q7BBhYJevM3EtTW0=
 github.com/yahoo/athenz v1.8.55/go.mod h1:G7LLFUH7Z/r4QAB7FfudfuA7Am/eCzO1GlzBhDL6Kv0=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
diff --git a/pulsar/internal/compression/zstd_cgo.go b/pulsar/internal/compression/zstd_cgo.go
index bf396a5..f6f6c45 100644
--- a/pulsar/internal/compression/zstd_cgo.go
+++ b/pulsar/internal/compression/zstd_cgo.go
@@ -24,29 +24,37 @@
 package compression
 
 import (
-	zstd "github.com/valyala/gozstd"
+	"github.com/DataDog/zstd"
+	log "github.com/sirupsen/logrus"
 )
 
 type zstdCGoProvider struct {
+	ctx              zstd.Ctx
 	compressionLevel int
 }
 
 func newCGoZStdProvider(compressionLevel int) Provider {
 	return &zstdCGoProvider{
 		compressionLevel: compressionLevel,
+		ctx:              zstd.NewCtx(),
 	}
 }
 
 func NewZStdProvider() Provider {
-	return newCGoZStdProvider(zstd.DefaultCompressionLevel)
+	return newCGoZStdProvider(zstd.DefaultCompression)
 }
 
 func (z *zstdCGoProvider) Compress(data []byte) []byte {
-	return zstd.CompressLevel(nil, data, z.compressionLevel)
+	out, err := z.ctx.CompressLevel(nil, data, z.compressionLevel)
+	if err != nil {
+		log.WithError(err).Fatal("Failed to compress")
+	}
+
+	return out
 }
 
 func (z *zstdCGoProvider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
-	return zstd.Decompress(nil, compressedData)
+	return z.ctx.Decompress(nil, compressedData)
 }
 
 func (z *zstdCGoProvider) Close() error {