You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:56 UTC

[pulsar-client-go] 19/38: Support compression in producer

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit 0eb04d510ed5817685b3e1f7280ff4eb4fd2856e
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri May 3 12:34:36 2019 -0700

    Support compression in producer
---
 pulsar/impl/batch_builder.go      | 37 ++++++++++++++++--
 pulsar/impl_partition_producer.go |  8 ++--
 pulsar/producer_test.go           | 80 +++++++++++++++++++++++++++++++++++++++
 pulsar/util_test.go               | 12 ++++++
 4 files changed, 129 insertions(+), 8 deletions(-)

diff --git a/pulsar/impl/batch_builder.go b/pulsar/impl/batch_builder.go
index e8c0af8..0619154 100644
--- a/pulsar/impl/batch_builder.go
+++ b/pulsar/impl/batch_builder.go
@@ -3,6 +3,7 @@ package impl
 import (
 	"github.com/golang/protobuf/proto"
 	log "github.com/sirupsen/logrus"
+	"pulsar-client-go-native/pulsar/impl/compression"
 	pb "pulsar-client-go-native/pulsar/pulsar_proto"
 	"time"
 )
@@ -28,13 +29,16 @@ type BatchBuilder struct {
 	cmdSend     *pb.BaseCommand
 	msgMetadata *pb.MessageMetadata
 	callbacks   []interface{}
+
+	compressionProvider compression.Provider
 }
 
-func NewBatchBuilder(maxMessages uint, producerName string, producerId uint64) *BatchBuilder {
+func NewBatchBuilder(maxMessages uint, producerName string, producerId uint64,
+	compressionType pb.CompressionType) *BatchBuilder {
 	if maxMessages == 0 {
 		maxMessages = DefaultMaxMessagesPerBatch
 	}
-	return &BatchBuilder{
+	bb := &BatchBuilder{
 		buffer:       NewBuffer(4096),
 		numMessages:  0,
 		maxMessages:  maxMessages,
@@ -47,8 +51,15 @@ func NewBatchBuilder(maxMessages uint, producerName string, producerId uint64) *
 		msgMetadata: &pb.MessageMetadata{
 			ProducerName: &producerName,
 		},
-		callbacks: []interface{}{},
+		callbacks:           []interface{}{},
+		compressionProvider: getCompressionProvider(compressionType),
+	}
+
+	if compressionType != pb.CompressionType_NONE {
+		bb.msgMetadata.Compression = &compressionType
 	}
+
+	return bb
 }
 
 func (bb *BatchBuilder) IsFull() bool {
@@ -108,11 +119,29 @@ func (bb *BatchBuilder) Flush() (batchData []byte, sequenceId uint64, callbacks
 	bb.msgMetadata.NumMessagesInBatch = proto.Int32(int32(bb.numMessages))
 	bb.cmdSend.Send.NumMessages = proto.Int32(int32(bb.numMessages))
 
+	compressed := bb.compressionProvider.Compress(bb.buffer.ReadableSlice())
+
 	buffer := NewBuffer(4096)
-	serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, bb.buffer.ReadableSlice())
+	serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, compressed)
 
 	callbacks = bb.callbacks
 	sequenceId = bb.cmdSend.Send.GetSequenceId()
 	bb.reset()
 	return buffer.ReadableSlice(), sequenceId, callbacks
 }
+
+// getCompressionProvider returns the compression.Provider that implements the
+// given protobuf compression type. For a type it does not recognise it calls
+// log.Panic, which logs the value and panics; the trailing return is never
+// reached and exists only to satisfy the compiler.
+func getCompressionProvider(compressionType pb.CompressionType) compression.Provider {
+	byType := map[pb.CompressionType]compression.Provider{
+		pb.CompressionType_NONE: compression.NoopProvider,
+		pb.CompressionType_LZ4:  compression.Lz4Provider,
+		pb.CompressionType_ZLIB: compression.ZLibProvider,
+		pb.CompressionType_ZSTD: compression.ZStdProvider,
+	}
+
+	if provider, found := byType[compressionType]; found {
+		return provider
+	}
+
+	log.Panic("Unsupported compression type: ", compressionType)
+	return nil
+}
diff --git a/pulsar/impl_partition_producer.go b/pulsar/impl_partition_producer.go
index 2557d1d..b87384d 100644
--- a/pulsar/impl_partition_producer.go
+++ b/pulsar/impl_partition_producer.go
@@ -115,7 +115,8 @@ func (p *partitionProducer) grabCnx() error {
 
 	p.producerName = res.Response.ProducerSuccess.ProducerName
 	if p.batchBuilder == nil {
-		p.batchBuilder = impl.NewBatchBuilder(p.options.BatchingMaxMessages, *p.producerName, p.producerId)
+		p.batchBuilder = impl.NewBatchBuilder(p.options.BatchingMaxMessages, *p.producerName,
+			p.producerId, pb.CompressionType(p.options.CompressionType))
 	}
 	if p.sequenceIdGenerator == nil {
 		nextSequenceId := uint64(res.Response.ProducerSuccess.GetLastSequenceId() + 1)
@@ -235,8 +236,8 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 }
 
+// pendingItem presumably holds one serialized batch (the batchData and
+// sequenceId returned by BatchBuilder.Flush) together with the send requests
+// it carries, until the broker acknowledges it. NOTE(review): confirm against
+// internalSend and the receipt handler, which are not shown here.
 type pendingItem struct {
-	batchData   []byte
-	sequenceId  uint64
+	batchData    []byte
+	sequenceId   uint64
 	sendRequests []interface{}
 }
 
@@ -393,4 +394,3 @@ type flushRequest struct {
 	waitGroup *sync.WaitGroup
 	err       error
 }
-
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
new file mode 100644
index 0000000..ab3d739
--- /dev/null
+++ b/pulsar/producer_test.go
@@ -0,0 +1,80 @@
+package pulsar
+
+import (
+	"context"
+	"github.com/stretchr/testify/assert"
+	"testing"
+)
+
+// TestSimpleProducer creates a client against serviceUrl, publishes ten
+// uncompressed "hello" messages to a fresh topic, and checks that every step
+// succeeds. NOTE(review): presumably needs a broker listening at serviceUrl,
+// so this is an integration test rather than a unit test.
+func TestSimpleProducer(t *testing.T) {
+	client, err := NewClient(ClientOptions{
+		URL: serviceUrl,
+	})
+	// assert does not abort the test; bail out explicitly so a failed
+	// connection is reported once instead of panicking on a nil client.
+	if !assert.NoError(t, err) {
+		return
+	}
+	// Close the client on every exit path, including early returns below.
+	defer func() {
+		assert.NoError(t, client.Close())
+	}()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: newTopicName(),
+	})
+	if !assert.NoError(t, err) || !assert.NotNil(t, producer) {
+		return
+	}
+
+	for i := 0; i < 10; i++ {
+		sendErr := producer.Send(context.Background(), &ProducerMessage{
+			Payload: []byte("hello"),
+		})
+		assert.NoError(t, sendErr)
+	}
+
+	// The producer is closed before the deferred client.Close runs.
+	assert.NoError(t, producer.Close())
+}
+
+// TestProducerCompression publishes ten "hello" messages once per compression
+// codec (ZLib, LZ4, ZSTD), each in its own subtest with a fresh client and
+// topic. It only checks that creating the producer, sending, and closing
+// succeed; it does not consume the messages, so decompression is not verified.
+func TestProducerCompression(t *testing.T) {
+	type testProvider struct {
+		name            string
+		compressionType CompressionType
+	}
+
+	var providers = []testProvider{
+		{"zlib", ZLib},
+		{"lz4", LZ4},
+		{"zstd", ZSTD},
+	}
+
+	for _, p := range providers {
+		t.Run(p.name, func(t *testing.T) {
+			client, err := NewClient(ClientOptions{
+				URL: serviceUrl,
+			})
+			// assert does not abort the subtest; return explicitly to avoid
+			// calling methods on a nil client.
+			if !assert.NoError(t, err) {
+				return
+			}
+			// Close the client on every exit path, including early returns.
+			defer func() {
+				assert.NoError(t, client.Close())
+			}()
+
+			producer, err := client.CreateProducer(ProducerOptions{
+				Topic:           newTopicName(),
+				CompressionType: p.compressionType,
+			})
+			if !assert.NoError(t, err) || !assert.NotNil(t, producer) {
+				return
+			}
+
+			for i := 0; i < 10; i++ {
+				sendErr := producer.Send(context.Background(), &ProducerMessage{
+					Payload: []byte("hello"),
+				})
+				assert.NoError(t, sendErr)
+			}
+
+			// The producer is closed before the deferred client.Close runs.
+			assert.NoError(t, producer.Close())
+		})
+	}
+}
diff --git a/pulsar/util_test.go b/pulsar/util_test.go
new file mode 100644
index 0000000..4bc67d3
--- /dev/null
+++ b/pulsar/util_test.go
@@ -0,0 +1,12 @@
+package pulsar
+
+import (
+	"fmt"
+	"time"
+)
+
+// serviceUrl is the broker address the tests in this package pass to NewClient.
+const serviceUrl = "pulsar://localhost:6650"
+
+// newTopicName returns a topic name of the form "my-topic-<n>", where n is
+// the current Unix time in nanoseconds.
+//
+// Time.Nanosecond() must not be used here. It is only the offset within the
+// current second (0-999,999,999), so names would repeat from one second to the
+// next. They could then collide with a topic created earlier, for example by a
+// previous test run against the same broker.
+func newTopicName() string {
+	return fmt.Sprintf("my-topic-%v", time.Now().UnixNano())
+}