You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zi...@apache.org on 2022/12/09 07:14:30 UTC

[pulsar-client-go] branch master updated: fix: fix 879 (#902)

This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 055b00b  fix: fix 879 (#902)
055b00b is described below

commit 055b00b83ccfef4ea12a593e3678cb4bdd18311c
Author: Jiaqi Shen <18...@163.com>
AuthorDate: Fri Dec 9 15:14:25 2022 +0800

    fix: fix 879 (#902)
---
 pulsar/consumer_test.go      | 35 ++++++++++++++++++++++++++++++-----
 pulsar/producer_partition.go |  6 ++++++
 2 files changed, 36 insertions(+), 5 deletions(-)

diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index 989b906..95462ba 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -804,12 +804,22 @@ func TestConsumerCompression(t *testing.T) {
 	topicName := newTopicName()
 	ctx := context.Background()
 
-	producer, err := client.CreateProducer(ProducerOptions{
+	// enable batching
+	p1, err := client.CreateProducer(ProducerOptions{
 		Topic:           topicName,
 		CompressionType: LZ4,
 	})
 	assert.Nil(t, err)
-	defer producer.Close()
+	defer p1.Close()
+
+	// disable batching
+	p2, err := client.CreateProducer(ProducerOptions{
+		Topic:           topicName,
+		CompressionType: LZ4,
+		DisableBatching: true,
+	})
+	assert.Nil(t, err)
+	defer p2.Close()
 
 	consumer, err := client.Subscribe(ConsumerOptions{
 		Topic:            topicName,
@@ -821,8 +831,16 @@ func TestConsumerCompression(t *testing.T) {
 	const N = 100
 
 	for i := 0; i < N; i++ {
-		if _, err := producer.Send(ctx, &ProducerMessage{
-			Payload: []byte(fmt.Sprintf("msg-content-%d", i)),
+		if _, err := p1.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("msg-content-%d-batching-enabled", i)),
+		}); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	for i := 0; i < N; i++ {
+		if _, err := p2.Send(ctx, &ProducerMessage{
+			Payload: []byte(fmt.Sprintf("msg-content-%d-batching-disabled", i)),
 		}); err != nil {
 			t.Fatal(err)
 		}
@@ -831,7 +849,14 @@ func TestConsumerCompression(t *testing.T) {
 	for i := 0; i < N; i++ {
 		msg, err := consumer.Receive(ctx)
 		assert.Nil(t, err)
-		assert.Equal(t, fmt.Sprintf("msg-content-%d", i), string(msg.Payload()))
+		assert.Equal(t, fmt.Sprintf("msg-content-%d-batching-enabled", i), string(msg.Payload()))
+		consumer.Ack(msg)
+	}
+
+	for i := 0; i < N; i++ {
+		msg, err := consumer.Receive(ctx)
+		assert.Nil(t, err)
+		assert.Equal(t, fmt.Sprintf("msg-content-%d-batching-disabled", i), string(msg.Payload()))
 		consumer.Ack(msg)
 	}
 }
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index fc564cb..a08ee04 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -574,6 +574,12 @@ func (p *partitionProducer) internalSend(request *sendRequest) {
 		compressedPayload = p.compressionProvider.Compress(nil, uncompressedPayload)
 		compressedSize = len(compressedPayload)
 		checkSize = compressedSize
+
+		// set the compression type in the message metadata
+		compressionType := pb.CompressionType(p.options.CompressionType)
+		if compressionType != pb.CompressionType_NONE {
+			mm.Compression = &compressionType
+		}
 	} else {
 		// final check for batching message is in serializeMessage
 		// this is a double check