You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/06/22 16:05:06 UTC

[pulsar-client-go] branch master updated: Used pooled buffering for compression and batch serialization (#292)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 2d66c7b  Used pooled buffering for compression and batch serialization (#292)
2d66c7b is described below

commit 2d66c7bf49c7a36ff8412d4b139cdd62dc5b26cc
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Jun 22 09:04:55 2020 -0700

    Used pooled buffering for compression and batch serialization (#292)
---
 pulsar/client_impl_test.go                         |  5 ++-
 pulsar/consumer_partition.go                       |  2 +-
 pulsar/internal/batch_builder.go                   | 25 ++++++++++---
 pulsar/internal/buffer.go                          | 13 ++++---
 pulsar/internal/commands.go                        | 31 ++++++++++------
 pulsar/internal/compression/compression.go         |  9 +++--
 .../internal/compression/compression_bench_test.go | 12 ++++--
 pulsar/internal/compression/compression_test.go    | 24 ++++++++++--
 pulsar/internal/compression/lz4.go                 | 43 ++++++++++++++++------
 pulsar/internal/compression/noop.go                | 28 ++++++++++++--
 pulsar/internal/compression/zlib.go                | 26 ++++++++-----
 pulsar/internal/compression/zstd_cgo.go            | 12 ++++--
 pulsar/internal/compression/zstd_go.go             | 24 +++++++-----
 pulsar/internal/connection.go                      | 35 +++++++++++++-----
 pulsar/producer_partition.go                       |  9 ++++-
 15 files changed, 212 insertions(+), 86 deletions(-)

diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go
index 8917c46..308c8e0 100644
--- a/pulsar/client_impl_test.go
+++ b/pulsar/client_impl_test.go
@@ -21,6 +21,7 @@ import (
 	"fmt"
 	"io/ioutil"
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 )
@@ -34,7 +35,8 @@ func TestClient(t *testing.T) {
 
 func TestTLSConnectionCAError(t *testing.T) {
 	client, err := NewClient(ClientOptions{
-		URL: serviceURLTLS,
+		URL:              serviceURLTLS,
+		OperationTimeout: 5 * time.Second,
 	})
 	assert.NoError(t, err)
 
@@ -105,6 +107,7 @@ func TestTLSConnectionHostNameVerification(t *testing.T) {
 func TestTLSConnectionHostNameVerificationError(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL:                   "pulsar+ssl://127.0.0.1:6651",
+		OperationTimeout:      5 * time.Second,
 		TLSTrustCertsFilePath: caCertsPath,
 		TLSValidateHostname:   true,
 	})
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index b7bf261..dc4607b 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -859,7 +859,7 @@ func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload int
 		pc.compressionProviders[msgMeta.GetCompression()] = provider
 	}
 
-	uncompressed, err := provider.Decompress(payload.ReadableSlice(), int(msgMeta.GetUncompressedSize()))
+	uncompressed, err := provider.Decompress(nil, payload.ReadableSlice(), int(msgMeta.GetUncompressedSize()))
 	if err != nil {
 		return nil, err
 	}
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 61c5ced..db6074e 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -35,6 +35,10 @@ const (
 	DefaultMaxMessagesPerBatch = 1000
 )
 
+type ConnectionHolder interface {
+	GetConnection() Connection
+}
+
 // BatchBuilder wraps the objects needed to build a batch.
 type BatchBuilder struct {
 	buffer Buffer
@@ -58,11 +62,13 @@ type BatchBuilder struct {
 	callbacks   []interface{}
 
 	compressionProvider compression.Provider
+	cnxHolder           ConnectionHolder
 }
 
 // NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container.
 func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
-	compressionType pb.CompressionType, level compression.Level) (*BatchBuilder, error) {
+	compressionType pb.CompressionType, level compression.Level,
+	cnxHolder ConnectionHolder) (*BatchBuilder, error) {
 	if maxMessages == 0 {
 		maxMessages = DefaultMaxMessagesPerBatch
 	}
@@ -85,6 +91,7 @@ func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, p
 		},
 		callbacks:           []interface{}{},
 		compressionProvider: getCompressionProvider(compressionType, level),
+		cnxHolder:           cnxHolder,
 	}
 
 	if compressionType != pb.CompressionType_NONE {
@@ -149,7 +156,7 @@ func (bb *BatchBuilder) reset() {
 }
 
 // Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
-func (bb *BatchBuilder) Flush() (batchData []byte, sequenceID uint64, callbacks []interface{}) {
+func (bb *BatchBuilder) Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{}) {
 	if bb.numMessages == 0 {
 		// No-Op for empty batch
 		return nil, 0, nil
@@ -160,16 +167,22 @@ func (bb *BatchBuilder) Flush() (batchData []byte, sequenceID uint64, callbacks
 	bb.cmdSend.Send.NumMessages = proto.Int32(int32(bb.numMessages))
 
 	uncompressedSize := bb.buffer.ReadableBytes()
-	compressed := bb.compressionProvider.Compress(bb.buffer.ReadableSlice())
 	bb.msgMetadata.UncompressedSize = &uncompressedSize
 
-	buffer := NewBuffer(4096)
-	serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, compressed)
+	cnx := bb.cnxHolder.GetConnection()
+	var buffer Buffer
+	if cnx == nil {
+		buffer = NewBuffer(int(uncompressedSize))
+	} else {
+		buffer = cnx.GetBufferFromPool()
+	}
+
+	serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, bb.buffer, bb.compressionProvider)
 
 	callbacks = bb.callbacks
 	sequenceID = bb.cmdSend.Send.GetSequenceId()
 	bb.reset()
-	return buffer.ReadableSlice(), sequenceID, callbacks
+	return buffer, sequenceID, callbacks
 }
 
 func (bb *BatchBuilder) Close() error {
diff --git a/pulsar/internal/buffer.go b/pulsar/internal/buffer.go
index e4abac9..c6d007d 100644
--- a/pulsar/internal/buffer.go
+++ b/pulsar/internal/buffer.go
@@ -63,6 +63,7 @@ type Buffer interface {
 	PutUint32(n uint32, writerIdx uint32)
 
 	Resize(newSize uint32)
+	ResizeIfNeeded(spaceNeeded uint32)
 
 	// Clear will clear the current buffer data.
 	Clear()
@@ -154,9 +155,9 @@ func (b *buffer) Resize(newSize uint32) {
 	b.writerIdx = size
 }
 
-func (b *buffer) resizeIfNeeded(spaceNeeded int) {
-	if b.WritableBytes() < uint32(spaceNeeded) {
-		capacityNeeded := uint32(cap(b.data) + spaceNeeded)
+func (b *buffer) ResizeIfNeeded(spaceNeeded uint32) {
+	if b.WritableBytes() < spaceNeeded {
+		capacityNeeded := uint32(cap(b.data)) + spaceNeeded
 		minCapacityIncrease := uint32(cap(b.data) * 3 / 2)
 		if capacityNeeded < minCapacityIncrease {
 			capacityNeeded = minCapacityIncrease
@@ -174,7 +175,7 @@ func (b *buffer) ReadUint16() uint16 {
 }
 
 func (b *buffer) WriteUint32(n uint32) {
-	b.resizeIfNeeded(4)
+	b.ResizeIfNeeded(4)
 	binary.BigEndian.PutUint32(b.WritableSlice(), n)
 	b.writerIdx += 4
 }
@@ -184,13 +185,13 @@ func (b *buffer) PutUint32(n uint32, idx uint32) {
 }
 
 func (b *buffer) WriteUint16(n uint16) {
-	b.resizeIfNeeded(2)
+	b.ResizeIfNeeded(2)
 	binary.BigEndian.PutUint16(b.WritableSlice(), n)
 	b.writerIdx += 2
 }
 
 func (b *buffer) Write(s []byte) {
-	b.resizeIfNeeded(len(s))
+	b.ResizeIfNeeded(uint32(len(s)))
 	copy(b.WritableSlice(), s)
 	b.writerIdx += uint32(len(s))
 }
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 1ee3517..0fb1125 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -21,6 +21,8 @@ import (
 	"errors"
 	"fmt"
 
+	"github.com/apache/pulsar-client-go/pulsar/internal/compression"
+
 	"github.com/golang/protobuf/proto"
 
 	log "github.com/sirupsen/logrus"
@@ -211,19 +213,17 @@ func addSingleMessageToBatch(wb Buffer, smm proto.Message, payload []byte) {
 	wb.Write(payload)
 }
 
-func serializeBatch(wb Buffer, cmdSend proto.Message, msgMetadata proto.Message, payload []byte) {
+func serializeBatch(wb Buffer, cmdSend proto.Message, msgMetadata proto.Message,
+	uncompressedPayload Buffer,
+	compressionProvider compression.Provider) {
 	// Wire format
 	// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
 	cmdSize := proto.Size(cmdSend)
 	msgMetadataSize := proto.Size(msgMetadata)
-	payloadSize := len(payload)
-
-	magicAndChecksumLength := 2 + 4 /* magic + checksumLength */
-	headerContentSize := 4 + cmdSize + magicAndChecksumLength + 4 + msgMetadataSize
-	// cmdLength + cmdSize + magicLength + checksumSize + msgMetadataLength + msgMetadataSize
-	totalSize := headerContentSize + payloadSize
 
-	wb.WriteUint32(uint32(totalSize)) // External frame
+	frameSizeIdx := wb.WriterIndex()
+	wb.WriteUint32(0) // Skip frame size until we know the size
+	frameStartIdx := wb.WriterIndex()
 
 	// Write cmd
 	wb.WriteUint32(uint32(cmdSize))
@@ -248,13 +248,20 @@ func serializeBatch(wb Buffer, cmdSend proto.Message, msgMetadata proto.Message,
 	}
 
 	wb.Write(serialized)
-	wb.Write(payload)
+
+	// Make sure the buffer has enough space to hold the compressed data
+	// and perform the compression in-place
+	maxSize := uint32(compressionProvider.CompressMaxSize(int(uncompressedPayload.ReadableBytes())))
+	wb.ResizeIfNeeded(maxSize)
+	b := compressionProvider.Compress(wb.WritableSlice()[:0], uncompressedPayload.ReadableSlice())
+	wb.WrittenBytes(uint32(len(b)))
 
 	// Write checksum at created checksum-placeholder
-	endIdx := wb.WriterIndex()
-	checksum := Crc32cCheckSum(wb.Get(metadataStartIdx, endIdx-metadataStartIdx))
+	frameEndIdx := wb.WriterIndex()
+	checksum := Crc32cCheckSum(wb.Get(metadataStartIdx, frameEndIdx-metadataStartIdx))
 
-	// set computed checksum
+	// Set Sizes and checksum in the fixed-size header
+	wb.PutUint32(frameEndIdx-frameStartIdx, frameSizeIdx) // External frame
 	wb.PutUint32(checksum, checksumIdx)
 }
 
diff --git a/pulsar/internal/compression/compression.go b/pulsar/internal/compression/compression.go
index 90adeeb..a4504bf 100644
--- a/pulsar/internal/compression/compression.go
+++ b/pulsar/internal/compression/compression.go
@@ -29,15 +29,18 @@ const (
 
 // Provider is an interface of compression providers
 type Provider interface {
+	// Return the max possible size for a compressed buffer given the uncompressed data size
+	CompressMaxSize(originalSize int) int
+
 	// Compress a []byte, the param is a []byte with the uncompressed content.
 	// The reader/writer indexes will not be modified. The return is a []byte
 	// with the compressed content.
-	Compress(data []byte) []byte
+	Compress(dst, src []byte) []byte
 
 	// Decompress a []byte. The buffer needs to have been compressed with the matching Encoder.
-	// The compressedData is compressed content, originalSize is the size of the original content.
+	// The src is compressed content. If dst is passed, the decompressed data will be written there.
 	// The return value holds the decompressed data; err is nil if the buffer was decompressed, non-nil otherwise.
-	Decompress(compressedData []byte, originalSize int) ([]byte, error)
+	Decompress(dst, src []byte, originalSize int) ([]byte, error)
 
 	// Returns a new instance of the same provider, with the same exact configuration
 	Clone() Provider
diff --git a/pulsar/internal/compression/compression_bench_test.go b/pulsar/internal/compression/compression_bench_test.go
index 1f40321..de0ca48 100644
--- a/pulsar/internal/compression/compression_bench_test.go
+++ b/pulsar/internal/compression/compression_bench_test.go
@@ -33,11 +33,12 @@ func testCompression(b *testing.B, provider Provider) {
 	}
 
 	dataLen := int64(len(data))
+	compressed := make([]byte, 1024*1024)
 
 	b.ResetTimer()
 
 	for i := 0; i < b.N; i++ {
-		provider.Compress(data)
+		provider.Compress(compressed[:0], data)
 		b.SetBytes(dataLen)
 	}
 }
@@ -49,14 +50,15 @@ func testDecompression(b *testing.B, provider Provider) {
 		b.Error(err)
 	}
 
-	dataCompressed := provider.Compress(data)
+	dataCompressed := provider.Compress(nil, data)
+	dataDecompressed := make([]byte, 1024*1024)
 
 	dataLen := int64(len(data))
 
 	b.ResetTimer()
 
 	for i := 0; i < b.N; i++ {
-		provider.Decompress(dataCompressed, int(dataLen))
+		provider.Decompress(dataDecompressed[:0], dataCompressed, int(dataLen))
 		b.SetBytes(dataLen)
 	}
 }
@@ -108,8 +110,10 @@ func BenchmarkCompressionParallel(b *testing.B) {
 		b.Run(p.name, func(b *testing.B) {
 			b.RunParallel(func(pb *testing.PB) {
 				localProvider := p.provider.Clone()
+				compressed := make([]byte, 1024*1024)
+
 				for pb.Next() {
-					localProvider.Compress(data)
+					localProvider.Compress(compressed[:0], data)
 					b.SetBytes(dataLen)
 				}
 			})
diff --git a/pulsar/internal/compression/compression_test.go b/pulsar/internal/compression/compression_test.go
index cfb00b2..7155e31 100644
--- a/pulsar/internal/compression/compression_test.go
+++ b/pulsar/internal/compression/compression_test.go
@@ -43,8 +43,24 @@ func TestCompression(t *testing.T) {
 		p := provider
 		t.Run(p.name, func(t *testing.T) {
 			hello := []byte("test compression data")
-			compressed := p.provider.Compress(hello)
-			uncompressed, err := p.provider.Decompress(compressed, len(hello))
+			compressed := make([]byte, 1024)
+			compressed = p.provider.Compress(compressed, hello)
+
+			uncompressed := make([]byte, 1024)
+			uncompressed, err := p.provider.Decompress(uncompressed, compressed, len(hello))
+			assert.Nil(t, err)
+			assert.ElementsMatch(t, hello, uncompressed)
+		})
+	}
+}
+
+func TestCompressionNoBuffers(t *testing.T) {
+	for _, provider := range providers {
+		p := provider
+		t.Run(p.name, func(t *testing.T) {
+			hello := []byte("test compression data")
+			compressed := p.provider.Compress(nil, hello)
+			uncompressed, err := p.provider.Decompress(nil, compressed, len(hello))
 			assert.Nil(t, err)
 			assert.ElementsMatch(t, hello, uncompressed)
 		})
@@ -56,7 +72,7 @@ func TestJavaCompatibility(t *testing.T) {
 		p := provider
 		t.Run(p.name, func(t *testing.T) {
 			hello := []byte("hello")
-			uncompressed, err := p.provider.Decompress(p.compressedHello, len(hello))
+			uncompressed, err := p.provider.Decompress(nil, p.compressedHello, len(hello))
 			assert.Nil(t, err)
 			assert.ElementsMatch(t, hello, uncompressed)
 		})
@@ -67,7 +83,7 @@ func TestDecompressionError(t *testing.T) {
 	for _, provider := range providers {
 		p := provider
 		t.Run(p.name, func(t *testing.T) {
-			_, err := p.provider.Decompress([]byte{0x05}, 10)
+			_, err := p.provider.Decompress(nil, []byte{0x05}, 10)
 			assert.NotNil(t, err)
 		})
 	}
diff --git a/pulsar/internal/compression/lz4.go b/pulsar/internal/compression/lz4.go
index 5a1f0f4..745ea83 100644
--- a/pulsar/internal/compression/lz4.go
+++ b/pulsar/internal/compression/lz4.go
@@ -21,6 +21,10 @@ import (
 	"github.com/pierrec/lz4"
 )
 
+const (
+	minLz4DestinationBufferSize = 1024 * 1024
+)
+
 type lz4Provider struct {
 	hashTable []int
 }
@@ -34,10 +38,23 @@ func NewLz4Provider() Provider {
 	}
 }
 
-func (l *lz4Provider) Compress(data []byte) []byte {
+func (l *lz4Provider) CompressMaxSize(originalSize int) int {
+	s := lz4.CompressBlockBound(originalSize)
+	if s < minLz4DestinationBufferSize {
+		return minLz4DestinationBufferSize
+	}
+
+	return s
+}
+
+func (l *lz4Provider) Compress(dst, data []byte) []byte {
 	maxSize := lz4.CompressBlockBound(len(data))
-	compressed := make([]byte, maxSize)
-	size, err := lz4.CompressBlock(data, compressed, l.hashTable)
+	if cap(dst) >= maxSize {
+		dst = dst[0:maxSize] // Reuse dst buffer
+	} else {
+		dst = make([]byte, maxSize)
+	}
+	size, err := lz4.CompressBlock(data, dst, l.hashTable)
 	if err != nil {
 		panic("Failed to compress")
 	}
@@ -45,11 +62,11 @@ func (l *lz4Provider) Compress(data []byte) []byte {
 	if size == 0 {
 		// The data block was not compressed. Just repeat it with
 		// the block header flag to signal it's not compressed
-		headerSize := writeSize(len(data), compressed)
-		copy(compressed[headerSize:], data)
-		return compressed[:len(data)+headerSize]
+		headerSize := writeSize(len(data), dst)
+		copy(dst[headerSize:], data)
+		return dst[:len(data)+headerSize]
 	}
-	return compressed[:size]
+	return dst[:size]
 }
 
 // Write the encoded size for the uncompressed payload
@@ -69,10 +86,14 @@ func writeSize(size int, dst []byte) int {
 	return i + 1
 }
 
-func (lz4Provider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
-	uncompressed := make([]byte, originalSize)
-	_, err := lz4.UncompressBlock(compressedData, uncompressed)
-	return uncompressed, err
+func (lz4Provider) Decompress(dst, src []byte, originalSize int) ([]byte, error) {
+	if cap(dst) >= originalSize {
+		dst = dst[0:originalSize] // Reuse dst buffer
+	} else {
+		dst = make([]byte, originalSize)
+	}
+	_, err := lz4.UncompressBlock(src, dst)
+	return dst, err
 }
 
 func (lz4Provider) Close() error {
diff --git a/pulsar/internal/compression/noop.go b/pulsar/internal/compression/noop.go
index 48318c5..78acb52 100644
--- a/pulsar/internal/compression/noop.go
+++ b/pulsar/internal/compression/noop.go
@@ -17,6 +17,10 @@
 
 package compression
 
+import (
+	"bytes"
+)
+
 type noopProvider struct{}
 
 // NewNoopProvider returns a Provider interface that does not compress the data
@@ -24,12 +28,28 @@ func NewNoopProvider() Provider {
 	return &noopProvider{}
 }
 
-func (noopProvider) Compress(data []byte) []byte {
-	return data
+func (noopProvider) CompressMaxSize(originalSize int) int {
+	return originalSize
 }
 
-func (noopProvider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
-	return compressedData, nil
+func (noopProvider) Compress(dst, src []byte) []byte {
+	if dst == nil {
+		dst = make([]byte, len(src))
+	}
+
+	b := bytes.NewBuffer(dst[:0])
+	b.Write(src)
+	return dst[:len(src)]
+}
+
+func (noopProvider) Decompress(dst, src []byte, originalSize int) ([]byte, error) {
+	if dst == nil {
+		dst = make([]byte, len(src))
+	}
+
+	b := bytes.NewBuffer(dst[:0])
+	b.Write(src)
+	return dst[:len(src)], nil
 }
 
 func (noopProvider) Close() error {
diff --git a/pulsar/internal/compression/zlib.go b/pulsar/internal/compression/zlib.go
index fe25206..44c4566 100644
--- a/pulsar/internal/compression/zlib.go
+++ b/pulsar/internal/compression/zlib.go
@@ -30,11 +30,15 @@ func NewZLibProvider() Provider {
 	return &zlibProvider{}
 }
 
-func (zlibProvider) Compress(data []byte) []byte {
-	var b bytes.Buffer
-	w := zlib.NewWriter(&b)
+func (zlibProvider) CompressMaxSize(originalSize int) int {
+	return int(float32(originalSize) * 1.10)
+}
+
+func (zlibProvider) Compress(dst, src []byte) []byte {
+	var b = bytes.NewBuffer(dst[:0])
+	w := zlib.NewWriter(b)
 
-	if _, err := w.Write(data); err != nil {
+	if _, err := w.Write(src); err != nil {
 		return nil
 	}
 	if err := w.Close(); err != nil {
@@ -44,14 +48,18 @@ func (zlibProvider) Compress(data []byte) []byte {
 	return b.Bytes()
 }
 
-func (zlibProvider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
-	r, err := zlib.NewReader(bytes.NewReader(compressedData))
+func (zlibProvider) Decompress(dst, src []byte, originalSize int) ([]byte, error) {
+	r, err := zlib.NewReader(bytes.NewReader(src))
 	if err != nil {
 		return nil, err
 	}
 
-	uncompressed := make([]byte, originalSize)
-	if _, err = io.ReadFull(r, uncompressed); err != nil {
+	if cap(dst) >= originalSize {
+		dst = dst[0:originalSize] // Reuse dst buffer
+	} else {
+		dst = make([]byte, originalSize)
+	}
+	if _, err = io.ReadFull(r, dst); err != nil {
 		return nil, err
 	}
 
@@ -59,7 +67,7 @@ func (zlibProvider) Decompress(compressedData []byte, originalSize int) ([]byte,
 		return nil, err
 	}
 
-	return uncompressed, nil
+	return dst, nil
 }
 
 func (zlibProvider) Clone() Provider {
diff --git a/pulsar/internal/compression/zstd_cgo.go b/pulsar/internal/compression/zstd_cgo.go
index 9c43cc4..9611aee 100644
--- a/pulsar/internal/compression/zstd_cgo.go
+++ b/pulsar/internal/compression/zstd_cgo.go
@@ -55,8 +55,12 @@ func NewZStdProvider(level Level) Provider {
 	return newCGoZStdProvider(level)
 }
 
-func (z *zstdCGoProvider) Compress(data []byte) []byte {
-	out, err := z.ctx.CompressLevel(nil, data, z.zstdLevel)
+func (z *zstdCGoProvider) CompressMaxSize(originalSize int) int {
+	return zstd.CompressBound(originalSize)
+}
+
+func (z *zstdCGoProvider) Compress(dst, src []byte) []byte {
+	out, err := z.ctx.CompressLevel(dst, src, z.zstdLevel)
 	if err != nil {
 		log.WithError(err).Fatal("Failed to compress")
 	}
@@ -64,8 +68,8 @@ func (z *zstdCGoProvider) Compress(data []byte) []byte {
 	return out
 }
 
-func (z *zstdCGoProvider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
-	return z.ctx.Decompress(nil, compressedData)
+func (z *zstdCGoProvider) Decompress(dst, src []byte, originalSize int) ([]byte, error) {
+	return z.ctx.Decompress(dst, src)
 }
 
 func (z *zstdCGoProvider) Close() error {
diff --git a/pulsar/internal/compression/zstd_go.go b/pulsar/internal/compression/zstd_go.go
index 06f203e..ae85078 100644
--- a/pulsar/internal/compression/zstd_go.go
+++ b/pulsar/internal/compression/zstd_go.go
@@ -19,7 +19,6 @@ package compression
 
 import (
 	"github.com/klauspost/compress/zstd"
-	"github.com/pkg/errors"
 )
 
 type zstdProvider struct {
@@ -44,16 +43,23 @@ func newPureGoZStdProvider(level Level) Provider {
 	return p
 }
 
-func (p *zstdProvider) Compress(data []byte) []byte {
-	return p.encoder.EncodeAll(data, []byte{})
+func (p *zstdProvider) CompressMaxSize(srcSize int) int {
+	// from zstd.h
+	// this formula ensures that bound(A) + bound(B) <= bound(A+B) as long as A and B >= 128 KB
+	lowLimit := 128 << 10 // 128 kB
+	var margin int
+	if srcSize < lowLimit {
+		margin = (lowLimit - srcSize) >> 11
+	}
+	return srcSize + (srcSize >> 8) + margin
 }
 
-func (p *zstdProvider) Decompress(compressedData []byte, originalSize int) (dst []byte, err error) {
-	dst, err = p.decoder.DecodeAll(compressedData, nil)
-	if err == nil && len(dst) != originalSize {
-		return nil, errors.New("Invalid uncompressed size")
-	}
-	return
+func (p *zstdProvider) Compress(dst, src []byte) []byte {
+	return p.encoder.EncodeAll(src, dst)
+}
+
+func (p *zstdProvider) Decompress(dst, src []byte, originalSize int) ([]byte, error) {
+	return p.decoder.DecodeAll(src, dst)
 }
 
 func (p *zstdProvider) Close() error {
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 573e607..9e6d79f 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -65,13 +65,14 @@ type ConnectionListener interface {
 type Connection interface {
 	SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error))
 	SendRequestNoWait(req *pb.BaseCommand)
-	WriteData(data []byte)
+	WriteData(data Buffer)
 	RegisterListener(id uint64, listener ConnectionListener)
 	UnregisterListener(id uint64)
 	AddConsumeHandler(id uint64, handler ConsumerHandler)
 	DeleteConsumeHandler(id uint64)
 	ID() string
 	GetMaxMessageSize() int32
+	GetBufferFromPool() Buffer
 	Close()
 }
 
@@ -148,7 +149,7 @@ type connection struct {
 	incomingRequestsCh chan *request
 	incomingCmdCh      chan *incomingCmd
 	closeCh            chan interface{}
-	writeRequestsCh    chan []byte
+	writeRequestsCh    chan Buffer
 
 	pendingReqs map[uint64]*request
 	listeners   map[uint64]ConnectionListener
@@ -160,6 +161,8 @@ type connection struct {
 	auth       auth.Provider
 
 	maxMessageSize int32
+
+	buffersPool sync.Pool
 }
 
 func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSOptions,
@@ -187,9 +190,14 @@ func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSO
 		// partition produces writing on a single connection. In general it's
 		// good to keep this above the number of partition producers assigned
 		// to a single connection.
-		writeRequestsCh:  make(chan []byte, 256),
+		writeRequestsCh:  make(chan Buffer, 256),
 		listeners:        make(map[uint64]ConnectionListener),
 		consumerHandlers: make(map[uint64]ConsumerHandler),
+		buffersPool: sync.Pool{
+			New: func() interface{} {
+				return NewBuffer(1024)
+			},
+		},
 	}
 	cnx.reader = newConnectionReader(cnx)
 	cnx.cond = sync.NewCond(cnx)
@@ -344,6 +352,8 @@ func (c *connection) run() {
 				return
 			}
 			c.internalWriteData(data)
+			// Return buffer to the pool since we're now done using it
+			c.buffersPool.Put(data)
 
 		case <-c.pingTicker.C:
 			c.sendPing()
@@ -368,13 +378,13 @@ func (c *connection) runPingCheck() {
 	}
 }
 
-func (c *connection) WriteData(data []byte) {
+func (c *connection) WriteData(data Buffer) {
 	c.writeRequestsCh <- data
 }
 
-func (c *connection) internalWriteData(data []byte) {
-	c.log.Debug("Write data: ", len(data))
-	if _, err := c.cnx.Write(data); err != nil {
+func (c *connection) internalWriteData(data Buffer) {
+	c.log.Debug("Write data: ", data.ReadableBytes())
+	if _, err := c.cnx.Write(data.ReadableSlice()); err != nil {
 		c.log.WithError(err).Warn("Failed to write on connection")
 		c.Close()
 	}
@@ -398,8 +408,7 @@ func (c *connection) writeCommand(cmd proto.Message) {
 	}
 
 	c.writeBuffer.Write(serialized)
-	data := c.writeBuffer.ReadableSlice()
-	c.internalWriteData(data)
+	c.internalWriteData(c.writeBuffer)
 }
 
 func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload Buffer) {
@@ -469,7 +478,7 @@ func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayl
 	}
 }
 
-func (c *connection) Write(data []byte) {
+func (c *connection) Write(data Buffer) {
 	c.writeRequestsCh <- data
 }
 
@@ -768,3 +777,9 @@ func (c *connection) ID() string {
 func (c *connection) GetMaxMessageSize() int32 {
 	return c.maxMessageSize
 }
+
+func (c *connection) GetBufferFromPool() Buffer {
+	b := c.buffersPool.Get().(Buffer)
+	b.Clear()
+	return b
+}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 32b0da0..b8bdd0d 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -155,7 +155,8 @@ func (p *partitionProducer) grabCnx() error {
 	if p.batchBuilder == nil {
 		p.batchBuilder, err = internal.NewBatchBuilder(p.options.BatchingMaxMessages, p.options.BatchingMaxSize,
 			p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType),
-			compression.Level(p.options.CompressionLevel))
+			compression.Level(p.options.CompressionLevel),
+			p)
 		if err != nil {
 			return err
 		}
@@ -180,6 +181,10 @@ func (p *partitionProducer) grabCnx() error {
 
 type connectionClosed struct{}
 
+func (p *partitionProducer) GetConnection() internal.Connection {
+	return p.cnx
+}
+
 func (p *partitionProducer) ConnectionClosed() {
 	// Trigger reconnection in the produce goroutine
 	p.log.WithField("cnx", p.cnx.ID()).Warn("Connection was closed")
@@ -310,7 +315,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 
 type pendingItem struct {
 	sync.Mutex
-	batchData    []byte
+	batchData    internal.Buffer
 	sequenceID   uint64
 	sendRequests []interface{}
 	completed    bool