You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:56 UTC
[pulsar-client-go] 19/38: Support compression in producer
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit 0eb04d510ed5817685b3e1f7280ff4eb4fd2856e
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri May 3 12:34:36 2019 -0700
Support compression in producer
---
pulsar/impl/batch_builder.go | 37 ++++++++++++++++--
pulsar/impl_partition_producer.go | 8 ++--
pulsar/producer_test.go | 80 +++++++++++++++++++++++++++++++++++++++
pulsar/util_test.go | 12 ++++++
4 files changed, 129 insertions(+), 8 deletions(-)
diff --git a/pulsar/impl/batch_builder.go b/pulsar/impl/batch_builder.go
index e8c0af8..0619154 100644
--- a/pulsar/impl/batch_builder.go
+++ b/pulsar/impl/batch_builder.go
@@ -3,6 +3,7 @@ package impl
import (
"github.com/golang/protobuf/proto"
log "github.com/sirupsen/logrus"
+ "pulsar-client-go-native/pulsar/impl/compression"
pb "pulsar-client-go-native/pulsar/pulsar_proto"
"time"
)
@@ -28,13 +29,16 @@ type BatchBuilder struct {
cmdSend *pb.BaseCommand
msgMetadata *pb.MessageMetadata
callbacks []interface{}
+
+ compressionProvider compression.Provider
}
-func NewBatchBuilder(maxMessages uint, producerName string, producerId uint64) *BatchBuilder {
+func NewBatchBuilder(maxMessages uint, producerName string, producerId uint64,
+ compressionType pb.CompressionType) *BatchBuilder {
if maxMessages == 0 {
maxMessages = DefaultMaxMessagesPerBatch
}
- return &BatchBuilder{
+ bb := &BatchBuilder{
buffer: NewBuffer(4096),
numMessages: 0,
maxMessages: maxMessages,
@@ -47,8 +51,15 @@ func NewBatchBuilder(maxMessages uint, producerName string, producerId uint64) *
msgMetadata: &pb.MessageMetadata{
ProducerName: &producerName,
},
- callbacks: []interface{}{},
+ callbacks: []interface{}{},
+ compressionProvider: getCompressionProvider(compressionType),
+ }
+
+ if compressionType != pb.CompressionType_NONE {
+ bb.msgMetadata.Compression = &compressionType
}
+
+ return bb
}
func (bb *BatchBuilder) IsFull() bool {
@@ -108,11 +119,29 @@ func (bb *BatchBuilder) Flush() (batchData []byte, sequenceId uint64, callbacks
bb.msgMetadata.NumMessagesInBatch = proto.Int32(int32(bb.numMessages))
bb.cmdSend.Send.NumMessages = proto.Int32(int32(bb.numMessages))
+ compressed := bb.compressionProvider.Compress(bb.buffer.ReadableSlice())
+
buffer := NewBuffer(4096)
- serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, bb.buffer.ReadableSlice())
+ serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, compressed)
callbacks = bb.callbacks
sequenceId = bb.cmdSend.Send.GetSequenceId()
bb.reset()
return buffer.ReadableSlice(), sequenceId, callbacks
}
+
+func getCompressionProvider(compressionType pb.CompressionType) compression.Provider {
+ switch compressionType {
+ case pb.CompressionType_NONE:
+ return compression.NoopProvider
+ case pb.CompressionType_LZ4:
+ return compression.Lz4Provider
+ case pb.CompressionType_ZLIB:
+ return compression.ZLibProvider
+ case pb.CompressionType_ZSTD:
+ return compression.ZStdProvider
+ default:
+ log.Panic("Unsupported compression type: ", compressionType)
+ return nil
+ }
+}
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index 2557d1d..b87384d 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -115,7 +115,8 @@ func (p *partitionProducer) grabCnx() error {
p.producerName = res.Response.ProducerSuccess.ProducerName
if p.batchBuilder == nil {
- p.batchBuilder = impl.NewBatchBuilder(p.options.BatchingMaxMessages, *p.producerName, p.producerId)
+ p.batchBuilder = impl.NewBatchBuilder(p.options.BatchingMaxMessages, *p.producerName,
+ p.producerId, pb.CompressionType(p.options.CompressionType))
}
if p.sequenceIdGenerator == nil {
nextSequenceId := uint64(res.Response.ProducerSuccess.GetLastSequenceId() + 1)
@@ -235,8 +236,8 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
}
type pendingItem struct {
- batchData []byte
- sequenceId uint64
+ batchData []byte
+ sequenceId uint64
sendRequests []interface{}
}
@@ -393,4 +394,3 @@ type flushRequest struct {
waitGroup *sync.WaitGroup
err error
}
-
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
new file mode 100644
index 0000000..ab3d739
--- /dev/null
+++ b/pulsar/producer_test.go
@@ -0,0 +1,80 @@
+package pulsar
+
+import (
+ "context"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+func TestSimpleProducer(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: serviceUrl,
+ })
+ assert.NoError(t, err)
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: newTopicName(),
+ })
+
+ assert.NoError(t, err)
+ assert.NotNil(t, producer)
+
+ for i := 0; i < 10; i++ {
+ err := producer.Send(context.Background(), &ProducerMessage{
+ Payload: []byte("hello"),
+ })
+
+ assert.NoError(t, err)
+ }
+
+ err = producer.Close()
+ assert.NoError(t, err)
+
+ err = client.Close()
+ assert.NoError(t, err)
+}
+
+func TestProducerCompression(t *testing.T) {
+
+ type testProvider struct {
+ name string
+ compressionType CompressionType
+ }
+
+ var providers = []testProvider{
+ {"zlib", ZLib},
+ {"lz4", LZ4},
+ {"zstd", ZSTD},
+ }
+
+ for _, p := range providers {
+ t.Run(p.name, func(t *testing.T) {
+ client, err := NewClient(ClientOptions{
+ URL: serviceUrl,
+ })
+ assert.NoError(t, err)
+
+ producer, err := client.CreateProducer(ProducerOptions{
+ Topic: newTopicName(),
+ CompressionType: p.compressionType,
+ })
+
+ assert.NoError(t, err)
+ assert.NotNil(t, producer)
+
+ for i := 0; i < 10; i++ {
+ err := producer.Send(context.Background(), &ProducerMessage{
+ Payload: []byte("hello"),
+ })
+
+ assert.NoError(t, err)
+ }
+
+ err = producer.Close()
+ assert.NoError(t, err)
+
+ err = client.Close()
+ assert.NoError(t, err)
+ })
+ }
+}
diff --git a/pulsar/util_test.go b/pulsar/util_test.go
new file mode 100644
index 0000000..4bc67d3
--- /dev/null
+++ b/pulsar/util_test.go
@@ -0,0 +1,12 @@
+package pulsar
+
+import (
+ "fmt"
+ "time"
+)
+
+const serviceUrl = "pulsar://localhost:6650"
+
+func newTopicName() string {
+ return fmt.Sprintf("my-topic-%v", time.Now().Nanosecond())
+}