You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rf...@apache.org on 2022/06/23 00:51:20 UTC

[pulsar-client-go] branch master updated: [issue 791] allow to add at least one message on batch builder (#792)

This is an automated email from the ASF dual-hosted git repository.

rfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new c558ddf  [issue 791] allow to add at least one message on batch builder (#792)
c558ddf is described below

commit c558ddfa10e04b88d33bc6daacc9aa067d9f28dc
Author: ming <it...@gmail.com>
AuthorDate: Wed Jun 22 20:51:14 2022 -0400

    [issue 791] allow to add at least one message on batch builder (#792)
    
    * allow to add at least one message on batch builder
    
    * add unit test for batching disabled
---
 pulsar/internal/batch_builder.go           |  4 ++++
 pulsar/internal/key_based_batch_builder.go |  5 ++++
 pulsar/producer_test.go                    | 38 ++++++++++++++++++++++++------
 3 files changed, 40 insertions(+), 7 deletions(-)

diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 9d18f26..fb7598e 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -157,6 +157,10 @@ func (bc *batchContainer) IsFull() bool {
 
 // hasSpace should return true if and only if the batch container can accommodate another message of length payload.
 func (bc *batchContainer) hasSpace(payload []byte) bool {
+	if bc.numMessages == 0 {
+		// always accept the first message into an empty batch, so a message can still be sent when batching is disabled
+		return true
+	}
 	msgSize := uint32(len(payload))
 	return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
 }
diff --git a/pulsar/internal/key_based_batch_builder.go b/pulsar/internal/key_based_batch_builder.go
index d09138c..667e855 100644
--- a/pulsar/internal/key_based_batch_builder.go
+++ b/pulsar/internal/key_based_batch_builder.go
@@ -117,6 +117,11 @@ func (bc *keyBasedBatchContainer) IsMultiBatches() bool {
 
 // hasSpace should return true if and only if the batch container can accommodate another message of length payload.
 func (bc *keyBasedBatchContainer) hasSpace(payload []byte) bool {
+	if bc.numMessages == 0 {
+		// always accept the first message into an empty batch;
+		// the max size of a single message is already checked in the producer partition, so the batch size need not be validated here
+		return true
+	}
 	msgSize := uint32(len(payload))
 	return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
 }
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 541c1fe..6b2b5d9 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -877,20 +877,44 @@ func TestMaxBatchSize(t *testing.T) {
 	assert.NotNil(t, producer)
 	defer producer.Close()
 
-	for bias := -1; bias <= 1; bias++ {
+	for bias := -1; bias <= 3; bias++ {
 		payload := make([]byte, batchMaxMessageSize+bias)
 		ID, err := producer.Send(context.Background(), &ProducerMessage{
 			Payload: payload,
 		})
-		if bias <= 0 {
-			assert.NoError(t, err)
-			assert.NotNil(t, ID)
-		} else {
-			assert.Equal(t, errFailAddToBatch, err)
-		}
+		// regardless of the max batch size, reaching the batch size limit triggers sending the batch, so every send is expected to succeed
+		assert.NoError(t, err)
+		assert.NotNil(t, ID)
 	}
 }
 
+func TestBatchingDisabled(t *testing.T) {
+	defaultMaxMessageSize := 128 * 1024
+	client, err := NewClient(ClientOptions{
+		URL: serviceURL,
+	})
+	assert.NoError(t, err)
+	defer client.Close()
+
+	// when batching is disabled, the max batch size has no effect
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           newTopicName(),
+		DisableBatching: true,
+		BatchingMaxSize: uint(defaultMaxBatchSize),
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+
+	payload := make([]byte, defaultMaxMessageSize+100)
+	ID, err := producer.Send(context.Background(), &ProducerMessage{
+		Payload: payload,
+	})
+	// with batching disabled, the configured max batch size must not prevent the message from being sent
+	assert.NoError(t, err)
+	assert.NotNil(t, ID)
+}
+
 func TestMaxMessageSize(t *testing.T) {
 	serverMaxMessageSize := 1024 * 1024