You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/06/22 16:05:06 UTC
[pulsar-client-go] branch master updated: Used pooled buffering for
compression and batch serialization (#292)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 2d66c7b Used pooled buffering for compression and batch serialization (#292)
2d66c7b is described below
commit 2d66c7bf49c7a36ff8412d4b139cdd62dc5b26cc
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Jun 22 09:04:55 2020 -0700
Used pooled buffering for compression and batch serialization (#292)
---
pulsar/client_impl_test.go | 5 ++-
pulsar/consumer_partition.go | 2 +-
pulsar/internal/batch_builder.go | 25 ++++++++++---
pulsar/internal/buffer.go | 13 ++++---
pulsar/internal/commands.go | 31 ++++++++++------
pulsar/internal/compression/compression.go | 9 +++--
.../internal/compression/compression_bench_test.go | 12 ++++--
pulsar/internal/compression/compression_test.go | 24 ++++++++++--
pulsar/internal/compression/lz4.go | 43 ++++++++++++++++------
pulsar/internal/compression/noop.go | 28 ++++++++++++--
pulsar/internal/compression/zlib.go | 26 ++++++++-----
pulsar/internal/compression/zstd_cgo.go | 12 ++++--
pulsar/internal/compression/zstd_go.go | 24 +++++++-----
pulsar/internal/connection.go | 35 +++++++++++++-----
pulsar/producer_partition.go | 9 ++++-
15 files changed, 212 insertions(+), 86 deletions(-)
diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go
index 8917c46..308c8e0 100644
--- a/pulsar/client_impl_test.go
+++ b/pulsar/client_impl_test.go
@@ -21,6 +21,7 @@ import (
"fmt"
"io/ioutil"
"testing"
+ "time"
"github.com/stretchr/testify/assert"
)
@@ -34,7 +35,8 @@ func TestClient(t *testing.T) {
func TestTLSConnectionCAError(t *testing.T) {
client, err := NewClient(ClientOptions{
- URL: serviceURLTLS,
+ URL: serviceURLTLS,
+ OperationTimeout: 5 * time.Second,
})
assert.NoError(t, err)
@@ -105,6 +107,7 @@ func TestTLSConnectionHostNameVerification(t *testing.T) {
func TestTLSConnectionHostNameVerificationError(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar+ssl://127.0.0.1:6651",
+ OperationTimeout: 5 * time.Second,
TLSTrustCertsFilePath: caCertsPath,
TLSValidateHostname: true,
})
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index b7bf261..dc4607b 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -859,7 +859,7 @@ func (pc *partitionConsumer) Decompress(msgMeta *pb.MessageMetadata, payload int
pc.compressionProviders[msgMeta.GetCompression()] = provider
}
- uncompressed, err := provider.Decompress(payload.ReadableSlice(), int(msgMeta.GetUncompressedSize()))
+ uncompressed, err := provider.Decompress(nil, payload.ReadableSlice(), int(msgMeta.GetUncompressedSize()))
if err != nil {
return nil, err
}
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 61c5ced..db6074e 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -35,6 +35,10 @@ const (
DefaultMaxMessagesPerBatch = 1000
)
+type ConnectionHolder interface {
+ GetConnection() Connection
+}
+
// BatchBuilder wraps the objects needed to build a batch.
type BatchBuilder struct {
buffer Buffer
@@ -58,11 +62,13 @@ type BatchBuilder struct {
callbacks []interface{}
compressionProvider compression.Provider
+ cnxHolder ConnectionHolder
}
// NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container.
func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
- compressionType pb.CompressionType, level compression.Level) (*BatchBuilder, error) {
+ compressionType pb.CompressionType, level compression.Level,
+ cnxHolder ConnectionHolder) (*BatchBuilder, error) {
if maxMessages == 0 {
maxMessages = DefaultMaxMessagesPerBatch
}
@@ -85,6 +91,7 @@ func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, p
},
callbacks: []interface{}{},
compressionProvider: getCompressionProvider(compressionType, level),
+ cnxHolder: cnxHolder,
}
if compressionType != pb.CompressionType_NONE {
@@ -149,7 +156,7 @@ func (bb *BatchBuilder) reset() {
}
// Flush all the messages buffered in the client and wait until all messages have been successfully persisted.
-func (bb *BatchBuilder) Flush() (batchData []byte, sequenceID uint64, callbacks []interface{}) {
+func (bb *BatchBuilder) Flush() (batchData Buffer, sequenceID uint64, callbacks []interface{}) {
if bb.numMessages == 0 {
// No-Op for empty batch
return nil, 0, nil
@@ -160,16 +167,22 @@ func (bb *BatchBuilder) Flush() (batchData []byte, sequenceID uint64, callbacks
bb.cmdSend.Send.NumMessages = proto.Int32(int32(bb.numMessages))
uncompressedSize := bb.buffer.ReadableBytes()
- compressed := bb.compressionProvider.Compress(bb.buffer.ReadableSlice())
bb.msgMetadata.UncompressedSize = &uncompressedSize
- buffer := NewBuffer(4096)
- serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, compressed)
+ cnx := bb.cnxHolder.GetConnection()
+ var buffer Buffer
+ if cnx == nil {
+ buffer = NewBuffer(int(uncompressedSize))
+ } else {
+ buffer = cnx.GetBufferFromPool()
+ }
+
+ serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, bb.buffer, bb.compressionProvider)
callbacks = bb.callbacks
sequenceID = bb.cmdSend.Send.GetSequenceId()
bb.reset()
- return buffer.ReadableSlice(), sequenceID, callbacks
+ return buffer, sequenceID, callbacks
}
func (bb *BatchBuilder) Close() error {
diff --git a/pulsar/internal/buffer.go b/pulsar/internal/buffer.go
index e4abac9..c6d007d 100644
--- a/pulsar/internal/buffer.go
+++ b/pulsar/internal/buffer.go
@@ -63,6 +63,7 @@ type Buffer interface {
PutUint32(n uint32, writerIdx uint32)
Resize(newSize uint32)
+ ResizeIfNeeded(spaceNeeded uint32)
// Clear will clear the current buffer data.
Clear()
@@ -154,9 +155,9 @@ func (b *buffer) Resize(newSize uint32) {
b.writerIdx = size
}
-func (b *buffer) resizeIfNeeded(spaceNeeded int) {
- if b.WritableBytes() < uint32(spaceNeeded) {
- capacityNeeded := uint32(cap(b.data) + spaceNeeded)
+func (b *buffer) ResizeIfNeeded(spaceNeeded uint32) {
+ if b.WritableBytes() < spaceNeeded {
+ capacityNeeded := uint32(cap(b.data)) + spaceNeeded
minCapacityIncrease := uint32(cap(b.data) * 3 / 2)
if capacityNeeded < minCapacityIncrease {
capacityNeeded = minCapacityIncrease
@@ -174,7 +175,7 @@ func (b *buffer) ReadUint16() uint16 {
}
func (b *buffer) WriteUint32(n uint32) {
- b.resizeIfNeeded(4)
+ b.ResizeIfNeeded(4)
binary.BigEndian.PutUint32(b.WritableSlice(), n)
b.writerIdx += 4
}
@@ -184,13 +185,13 @@ func (b *buffer) PutUint32(n uint32, idx uint32) {
}
func (b *buffer) WriteUint16(n uint16) {
- b.resizeIfNeeded(2)
+ b.ResizeIfNeeded(2)
binary.BigEndian.PutUint16(b.WritableSlice(), n)
b.writerIdx += 2
}
func (b *buffer) Write(s []byte) {
- b.resizeIfNeeded(len(s))
+ b.ResizeIfNeeded(uint32(len(s)))
copy(b.WritableSlice(), s)
b.writerIdx += uint32(len(s))
}
diff --git a/pulsar/internal/commands.go b/pulsar/internal/commands.go
index 1ee3517..0fb1125 100644
--- a/pulsar/internal/commands.go
+++ b/pulsar/internal/commands.go
@@ -21,6 +21,8 @@ import (
"errors"
"fmt"
+ "github.com/apache/pulsar-client-go/pulsar/internal/compression"
+
"github.com/golang/protobuf/proto"
log "github.com/sirupsen/logrus"
@@ -211,19 +213,17 @@ func addSingleMessageToBatch(wb Buffer, smm proto.Message, payload []byte) {
wb.Write(payload)
}
-func serializeBatch(wb Buffer, cmdSend proto.Message, msgMetadata proto.Message, payload []byte) {
+func serializeBatch(wb Buffer, cmdSend proto.Message, msgMetadata proto.Message,
+ uncompressedPayload Buffer,
+ compressionProvider compression.Provider) {
// Wire format
// [TOTAL_SIZE] [CMD_SIZE][CMD] [MAGIC_NUMBER][CHECKSUM] [METADATA_SIZE][METADATA] [PAYLOAD]
cmdSize := proto.Size(cmdSend)
msgMetadataSize := proto.Size(msgMetadata)
- payloadSize := len(payload)
-
- magicAndChecksumLength := 2 + 4 /* magic + checksumLength */
- headerContentSize := 4 + cmdSize + magicAndChecksumLength + 4 + msgMetadataSize
- // cmdLength + cmdSize + magicLength + checksumSize + msgMetadataLength + msgMetadataSize
- totalSize := headerContentSize + payloadSize
- wb.WriteUint32(uint32(totalSize)) // External frame
+ frameSizeIdx := wb.WriterIndex()
+ wb.WriteUint32(0) // Skip frame size until we know the size
+ frameStartIdx := wb.WriterIndex()
// Write cmd
wb.WriteUint32(uint32(cmdSize))
@@ -248,13 +248,20 @@ func serializeBatch(wb Buffer, cmdSend proto.Message, msgMetadata proto.Message,
}
wb.Write(serialized)
- wb.Write(payload)
+
+ // Make sure the buffer has enough space to hold the compressed data
+ // and perform the compression in-place
+ maxSize := uint32(compressionProvider.CompressMaxSize(int(uncompressedPayload.ReadableBytes())))
+ wb.ResizeIfNeeded(maxSize)
+ b := compressionProvider.Compress(wb.WritableSlice()[:0], uncompressedPayload.ReadableSlice())
+ wb.WrittenBytes(uint32(len(b)))
// Write checksum at created checksum-placeholder
- endIdx := wb.WriterIndex()
- checksum := Crc32cCheckSum(wb.Get(metadataStartIdx, endIdx-metadataStartIdx))
+ frameEndIdx := wb.WriterIndex()
+ checksum := Crc32cCheckSum(wb.Get(metadataStartIdx, frameEndIdx-metadataStartIdx))
- // set computed checksum
+ // Set Sizes and checksum in the fixed-size header
+ wb.PutUint32(frameEndIdx-frameStartIdx, frameSizeIdx) // External frame
wb.PutUint32(checksum, checksumIdx)
}
diff --git a/pulsar/internal/compression/compression.go b/pulsar/internal/compression/compression.go
index 90adeeb..a4504bf 100644
--- a/pulsar/internal/compression/compression.go
+++ b/pulsar/internal/compression/compression.go
@@ -29,15 +29,18 @@ const (
// Provider is an interface for compression providers
type Provider interface {
+ // Return the max possible size for a compressed buffer given the uncompressed data size
+ CompressMaxSize(originalSize int) int
+
// Compress a []byte, the param is a []byte with the uncompressed content.
// The reader/writer indexes will not be modified. The return is a []byte
// with the compressed content.
- Compress(data []byte) []byte
+ Compress(dst, src []byte) []byte
// Decompress a []byte. The buffer needs to have been compressed with the matching Encoder.
- // The compressedData is compressed content, originalSize is the size of the original content.
+ // The src is compressed content. If dst is passed, the decompressed data will be written there
// Returns the decompressed result; err is nil if the buffer was decompressed, non-nil otherwise.
- Decompress(compressedData []byte, originalSize int) ([]byte, error)
+ Decompress(dst, src []byte, originalSize int) ([]byte, error)
// Returns a new instance of the same provider, with the same exact configuration
Clone() Provider
diff --git a/pulsar/internal/compression/compression_bench_test.go b/pulsar/internal/compression/compression_bench_test.go
index 1f40321..de0ca48 100644
--- a/pulsar/internal/compression/compression_bench_test.go
+++ b/pulsar/internal/compression/compression_bench_test.go
@@ -33,11 +33,12 @@ func testCompression(b *testing.B, provider Provider) {
}
dataLen := int64(len(data))
+ compressed := make([]byte, 1024*1024)
b.ResetTimer()
for i := 0; i < b.N; i++ {
- provider.Compress(data)
+ provider.Compress(compressed[:0], data)
b.SetBytes(dataLen)
}
}
@@ -49,14 +50,15 @@ func testDecompression(b *testing.B, provider Provider) {
b.Error(err)
}
- dataCompressed := provider.Compress(data)
+ dataCompressed := provider.Compress(nil, data)
+ dataDecompressed := make([]byte, 1024*1024)
dataLen := int64(len(data))
b.ResetTimer()
for i := 0; i < b.N; i++ {
- provider.Decompress(dataCompressed, int(dataLen))
+ provider.Decompress(dataDecompressed[:0], dataCompressed, int(dataLen))
b.SetBytes(dataLen)
}
}
@@ -108,8 +110,10 @@ func BenchmarkCompressionParallel(b *testing.B) {
b.Run(p.name, func(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
localProvider := p.provider.Clone()
+ compressed := make([]byte, 1024*1024)
+
for pb.Next() {
- localProvider.Compress(data)
+ localProvider.Compress(compressed[:0], data)
b.SetBytes(dataLen)
}
})
diff --git a/pulsar/internal/compression/compression_test.go b/pulsar/internal/compression/compression_test.go
index cfb00b2..7155e31 100644
--- a/pulsar/internal/compression/compression_test.go
+++ b/pulsar/internal/compression/compression_test.go
@@ -43,8 +43,24 @@ func TestCompression(t *testing.T) {
p := provider
t.Run(p.name, func(t *testing.T) {
hello := []byte("test compression data")
- compressed := p.provider.Compress(hello)
- uncompressed, err := p.provider.Decompress(compressed, len(hello))
+ compressed := make([]byte, 1024)
+ compressed = p.provider.Compress(compressed, hello)
+
+ uncompressed := make([]byte, 1024)
+ uncompressed, err := p.provider.Decompress(uncompressed, compressed, len(hello))
+ assert.Nil(t, err)
+ assert.ElementsMatch(t, hello, uncompressed)
+ })
+ }
+}
+
+func TestCompressionNoBuffers(t *testing.T) {
+ for _, provider := range providers {
+ p := provider
+ t.Run(p.name, func(t *testing.T) {
+ hello := []byte("test compression data")
+ compressed := p.provider.Compress(nil, hello)
+ uncompressed, err := p.provider.Decompress(nil, compressed, len(hello))
assert.Nil(t, err)
assert.ElementsMatch(t, hello, uncompressed)
})
@@ -56,7 +72,7 @@ func TestJavaCompatibility(t *testing.T) {
p := provider
t.Run(p.name, func(t *testing.T) {
hello := []byte("hello")
- uncompressed, err := p.provider.Decompress(p.compressedHello, len(hello))
+ uncompressed, err := p.provider.Decompress(nil, p.compressedHello, len(hello))
assert.Nil(t, err)
assert.ElementsMatch(t, hello, uncompressed)
})
@@ -67,7 +83,7 @@ func TestDecompressionError(t *testing.T) {
for _, provider := range providers {
p := provider
t.Run(p.name, func(t *testing.T) {
- _, err := p.provider.Decompress([]byte{0x05}, 10)
+ _, err := p.provider.Decompress(nil, []byte{0x05}, 10)
assert.NotNil(t, err)
})
}
diff --git a/pulsar/internal/compression/lz4.go b/pulsar/internal/compression/lz4.go
index 5a1f0f4..745ea83 100644
--- a/pulsar/internal/compression/lz4.go
+++ b/pulsar/internal/compression/lz4.go
@@ -21,6 +21,10 @@ import (
"github.com/pierrec/lz4"
)
+const (
+ minLz4DestinationBufferSize = 1024 * 1024
+)
+
type lz4Provider struct {
hashTable []int
}
@@ -34,10 +38,23 @@ func NewLz4Provider() Provider {
}
}
-func (l *lz4Provider) Compress(data []byte) []byte {
+func (l *lz4Provider) CompressMaxSize(originalSize int) int {
+ s := lz4.CompressBlockBound(originalSize)
+ if s < minLz4DestinationBufferSize {
+ return minLz4DestinationBufferSize
+ }
+
+ return s
+}
+
+func (l *lz4Provider) Compress(dst, data []byte) []byte {
maxSize := lz4.CompressBlockBound(len(data))
- compressed := make([]byte, maxSize)
- size, err := lz4.CompressBlock(data, compressed, l.hashTable)
+ if cap(dst) >= maxSize {
+ dst = dst[0:maxSize] // Reuse dst buffer
+ } else {
+ dst = make([]byte, maxSize)
+ }
+ size, err := lz4.CompressBlock(data, dst, l.hashTable)
if err != nil {
panic("Failed to compress")
}
@@ -45,11 +62,11 @@ func (l *lz4Provider) Compress(data []byte) []byte {
if size == 0 {
// The data block was not compressed. Just repeat it with
// the block header flag to signal it's not compressed
- headerSize := writeSize(len(data), compressed)
- copy(compressed[headerSize:], data)
- return compressed[:len(data)+headerSize]
+ headerSize := writeSize(len(data), dst)
+ copy(dst[headerSize:], data)
+ return dst[:len(data)+headerSize]
}
- return compressed[:size]
+ return dst[:size]
}
// Write the encoded size for the uncompressed payload
@@ -69,10 +86,14 @@ func writeSize(size int, dst []byte) int {
return i + 1
}
-func (lz4Provider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
- uncompressed := make([]byte, originalSize)
- _, err := lz4.UncompressBlock(compressedData, uncompressed)
- return uncompressed, err
+func (lz4Provider) Decompress(dst, src []byte, originalSize int) ([]byte, error) {
+ if cap(dst) >= originalSize {
+ dst = dst[0:originalSize] // Reuse dst buffer
+ } else {
+ dst = make([]byte, originalSize)
+ }
+ _, err := lz4.UncompressBlock(src, dst)
+ return dst, err
}
func (lz4Provider) Close() error {
diff --git a/pulsar/internal/compression/noop.go b/pulsar/internal/compression/noop.go
index 48318c5..78acb52 100644
--- a/pulsar/internal/compression/noop.go
+++ b/pulsar/internal/compression/noop.go
@@ -17,6 +17,10 @@
package compression
+import (
+ "bytes"
+)
+
type noopProvider struct{}
// NewNoopProvider returns a Provider interface that does not compress the data
@@ -24,12 +28,28 @@ func NewNoopProvider() Provider {
return &noopProvider{}
}
-func (noopProvider) Compress(data []byte) []byte {
- return data
+func (noopProvider) CompressMaxSize(originalSize int) int {
+ return originalSize
}
-func (noopProvider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
- return compressedData, nil
+func (noopProvider) Compress(dst, src []byte) []byte {
+ if dst == nil {
+ dst = make([]byte, len(src))
+ }
+
+ b := bytes.NewBuffer(dst[:0])
+ b.Write(src)
+ return dst[:len(src)]
+}
+
+func (noopProvider) Decompress(dst, src []byte, originalSize int) ([]byte, error) {
+ if dst == nil {
+ dst = make([]byte, len(src))
+ }
+
+ b := bytes.NewBuffer(dst[:0])
+ b.Write(src)
+ return dst[:len(src)], nil
}
func (noopProvider) Close() error {
diff --git a/pulsar/internal/compression/zlib.go b/pulsar/internal/compression/zlib.go
index fe25206..44c4566 100644
--- a/pulsar/internal/compression/zlib.go
+++ b/pulsar/internal/compression/zlib.go
@@ -30,11 +30,15 @@ func NewZLibProvider() Provider {
return &zlibProvider{}
}
-func (zlibProvider) Compress(data []byte) []byte {
- var b bytes.Buffer
- w := zlib.NewWriter(&b)
+func (zlibProvider) CompressMaxSize(originalSize int) int {
+ return int(float32(originalSize) * 1.10)
+}
+
+func (zlibProvider) Compress(dst, src []byte) []byte {
+ var b = bytes.NewBuffer(dst[:0])
+ w := zlib.NewWriter(b)
- if _, err := w.Write(data); err != nil {
+ if _, err := w.Write(src); err != nil {
return nil
}
if err := w.Close(); err != nil {
@@ -44,14 +48,18 @@ func (zlibProvider) Compress(data []byte) []byte {
return b.Bytes()
}
-func (zlibProvider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
- r, err := zlib.NewReader(bytes.NewReader(compressedData))
+func (zlibProvider) Decompress(dst, src []byte, originalSize int) ([]byte, error) {
+ r, err := zlib.NewReader(bytes.NewReader(src))
if err != nil {
return nil, err
}
- uncompressed := make([]byte, originalSize)
- if _, err = io.ReadFull(r, uncompressed); err != nil {
+ if cap(dst) >= originalSize {
+ dst = dst[0:originalSize] // Reuse dst buffer
+ } else {
+ dst = make([]byte, originalSize)
+ }
+ if _, err = io.ReadFull(r, dst); err != nil {
return nil, err
}
@@ -59,7 +67,7 @@ func (zlibProvider) Decompress(compressedData []byte, originalSize int) ([]byte,
return nil, err
}
- return uncompressed, nil
+ return dst, nil
}
func (zlibProvider) Clone() Provider {
diff --git a/pulsar/internal/compression/zstd_cgo.go b/pulsar/internal/compression/zstd_cgo.go
index 9c43cc4..9611aee 100644
--- a/pulsar/internal/compression/zstd_cgo.go
+++ b/pulsar/internal/compression/zstd_cgo.go
@@ -55,8 +55,12 @@ func NewZStdProvider(level Level) Provider {
return newCGoZStdProvider(level)
}
-func (z *zstdCGoProvider) Compress(data []byte) []byte {
- out, err := z.ctx.CompressLevel(nil, data, z.zstdLevel)
+func (z *zstdCGoProvider) CompressMaxSize(originalSize int) int {
+ return zstd.CompressBound(originalSize)
+}
+
+func (z *zstdCGoProvider) Compress(dst, src []byte) []byte {
+ out, err := z.ctx.CompressLevel(dst, src, z.zstdLevel)
if err != nil {
log.WithError(err).Fatal("Failed to compress")
}
@@ -64,8 +68,8 @@ func (z *zstdCGoProvider) Compress(data []byte) []byte {
return out
}
-func (z *zstdCGoProvider) Decompress(compressedData []byte, originalSize int) ([]byte, error) {
- return z.ctx.Decompress(nil, compressedData)
+func (z *zstdCGoProvider) Decompress(dst, src []byte, originalSize int) ([]byte, error) {
+ return z.ctx.Decompress(dst, src)
}
func (z *zstdCGoProvider) Close() error {
diff --git a/pulsar/internal/compression/zstd_go.go b/pulsar/internal/compression/zstd_go.go
index 06f203e..ae85078 100644
--- a/pulsar/internal/compression/zstd_go.go
+++ b/pulsar/internal/compression/zstd_go.go
@@ -19,7 +19,6 @@ package compression
import (
"github.com/klauspost/compress/zstd"
- "github.com/pkg/errors"
)
type zstdProvider struct {
@@ -44,16 +43,23 @@ func newPureGoZStdProvider(level Level) Provider {
return p
}
-func (p *zstdProvider) Compress(data []byte) []byte {
- return p.encoder.EncodeAll(data, []byte{})
+func (p *zstdProvider) CompressMaxSize(srcSize int) int {
+ // from zstd.h
+ // this formula ensures that bound(A) + bound(B) <= bound(A+B) as long as A and B >= 128 KB
+ lowLimit := 128 << 10 // 128 kB
+ var margin int
+ if srcSize < lowLimit {
+ margin = (lowLimit - srcSize) >> 11
+ }
+ return srcSize + (srcSize >> 8) + margin
}
-func (p *zstdProvider) Decompress(compressedData []byte, originalSize int) (dst []byte, err error) {
- dst, err = p.decoder.DecodeAll(compressedData, nil)
- if err == nil && len(dst) != originalSize {
- return nil, errors.New("Invalid uncompressed size")
- }
- return
+func (p *zstdProvider) Compress(dst, src []byte) []byte {
+ return p.encoder.EncodeAll(src, dst)
+}
+
+func (p *zstdProvider) Decompress(dst, src []byte, originalSize int) ([]byte, error) {
+ return p.decoder.DecodeAll(src, dst)
}
func (p *zstdProvider) Close() error {
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 573e607..9e6d79f 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -65,13 +65,14 @@ type ConnectionListener interface {
type Connection interface {
SendRequest(requestID uint64, req *pb.BaseCommand, callback func(*pb.BaseCommand, error))
SendRequestNoWait(req *pb.BaseCommand)
- WriteData(data []byte)
+ WriteData(data Buffer)
RegisterListener(id uint64, listener ConnectionListener)
UnregisterListener(id uint64)
AddConsumeHandler(id uint64, handler ConsumerHandler)
DeleteConsumeHandler(id uint64)
ID() string
GetMaxMessageSize() int32
+ GetBufferFromPool() Buffer
Close()
}
@@ -148,7 +149,7 @@ type connection struct {
incomingRequestsCh chan *request
incomingCmdCh chan *incomingCmd
closeCh chan interface{}
- writeRequestsCh chan []byte
+ writeRequestsCh chan Buffer
pendingReqs map[uint64]*request
listeners map[uint64]ConnectionListener
@@ -160,6 +161,8 @@ type connection struct {
auth auth.Provider
maxMessageSize int32
+
+ buffersPool sync.Pool
}
func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSOptions,
@@ -187,9 +190,14 @@ func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSO
// partition produces writing on a single connection. In general it's
// good to keep this above the number of partition producers assigned
// to a single connection.
- writeRequestsCh: make(chan []byte, 256),
+ writeRequestsCh: make(chan Buffer, 256),
listeners: make(map[uint64]ConnectionListener),
consumerHandlers: make(map[uint64]ConsumerHandler),
+ buffersPool: sync.Pool{
+ New: func() interface{} {
+ return NewBuffer(1024)
+ },
+ },
}
cnx.reader = newConnectionReader(cnx)
cnx.cond = sync.NewCond(cnx)
@@ -344,6 +352,8 @@ func (c *connection) run() {
return
}
c.internalWriteData(data)
+ // Return buffer to the pool since we're now done using it
+ c.buffersPool.Put(data)
case <-c.pingTicker.C:
c.sendPing()
@@ -368,13 +378,13 @@ func (c *connection) runPingCheck() {
}
}
-func (c *connection) WriteData(data []byte) {
+func (c *connection) WriteData(data Buffer) {
c.writeRequestsCh <- data
}
-func (c *connection) internalWriteData(data []byte) {
- c.log.Debug("Write data: ", len(data))
- if _, err := c.cnx.Write(data); err != nil {
+func (c *connection) internalWriteData(data Buffer) {
+ c.log.Debug("Write data: ", data.ReadableBytes())
+ if _, err := c.cnx.Write(data.ReadableSlice()); err != nil {
c.log.WithError(err).Warn("Failed to write on connection")
c.Close()
}
@@ -398,8 +408,7 @@ func (c *connection) writeCommand(cmd proto.Message) {
}
c.writeBuffer.Write(serialized)
- data := c.writeBuffer.ReadableSlice()
- c.internalWriteData(data)
+ c.internalWriteData(c.writeBuffer)
}
func (c *connection) receivedCommand(cmd *pb.BaseCommand, headersAndPayload Buffer) {
@@ -469,7 +478,7 @@ func (c *connection) internalReceivedCommand(cmd *pb.BaseCommand, headersAndPayl
}
}
-func (c *connection) Write(data []byte) {
+func (c *connection) Write(data Buffer) {
c.writeRequestsCh <- data
}
@@ -768,3 +777,9 @@ func (c *connection) ID() string {
func (c *connection) GetMaxMessageSize() int32 {
return c.maxMessageSize
}
+
+func (c *connection) GetBufferFromPool() Buffer {
+ b := c.buffersPool.Get().(Buffer)
+ b.Clear()
+ return b
+}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 32b0da0..b8bdd0d 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -155,7 +155,8 @@ func (p *partitionProducer) grabCnx() error {
if p.batchBuilder == nil {
p.batchBuilder, err = internal.NewBatchBuilder(p.options.BatchingMaxMessages, p.options.BatchingMaxSize,
p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType),
- compression.Level(p.options.CompressionLevel))
+ compression.Level(p.options.CompressionLevel),
+ p)
if err != nil {
return err
}
@@ -180,6 +181,10 @@ func (p *partitionProducer) grabCnx() error {
type connectionClosed struct{}
+func (p *partitionProducer) GetConnection() internal.Connection {
+ return p.cnx
+}
+
func (p *partitionProducer) ConnectionClosed() {
// Trigger reconnection in the produce goroutine
p.log.WithField("cnx", p.cnx.ID()).Warn("Connection was closed")
@@ -310,7 +315,7 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
type pendingItem struct {
sync.Mutex
- batchData []byte
+ batchData internal.Buffer
sequenceID uint64
sendRequests []interface{}
completed bool