You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2021/12/22 06:22:24 UTC

[pulsar-client-go] branch master updated: [Issue 513] Correct apparent logic error in batchContainer's hasSpace() method (#678)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bbfb8e  [Issue 513] Correct apparent logic error in batchContainer's hasSpace() method (#678)
2bbfb8e is described below

commit 2bbfb8e4a66c63843c935a50971474abfa25cdf7
Author: Ben Schofield <bs...@users.noreply.github.com>
AuthorDate: Wed Dec 22 06:22:18 2021 +0000

    [Issue 513] Correct apparent logic error in batchContainer's hasSpace() method (#678)
    
    * Correct apparent logic error in batchContainer's hasSpace() method.
    
    * Make the same change to keyBasedBatchContainer's hasSpace() method.
    
    * Fix comment length to pass style checks.
    
    * Allow TestMaxMessageSize() to run
    
    * Add TestMaxBatchSize() to validate that this limit is respected. Based on the results of the test, change the < in hasSpace() to be a <=.
    
    * Further correct logic in hasSpace() / IsFull()
    
    * Remember to make the change in both places!
    
    Co-authored-by: ben <be...@cyber.casa>
---
 pulsar/internal/batch_builder.go           |  7 ++---
 pulsar/internal/key_based_batch_builder.go |  7 ++---
 pulsar/producer_test.go                    | 42 ++++++++++++++++++++++++++++--
 3 files changed, 48 insertions(+), 8 deletions(-)

diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 7e47304..9d18f26 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -150,14 +150,15 @@ func NewBatchBuilder(
 	return &bc, nil
 }
 
-// IsFull check if the size in the current batch exceeds the maximum size allowed by the batch
+// IsFull checks if the size in the current batch meets or exceeds the maximum size allowed by the batch
 func (bc *batchContainer) IsFull() bool {
-	return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() > uint32(bc.maxBatchSize)
+	return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() >= uint32(bc.maxBatchSize)
 }
 
+// hasSpace reports whether the batch container can accommodate one more message with the given payload.
 func (bc *batchContainer) hasSpace(payload []byte) bool {
 	msgSize := uint32(len(payload))
-	return bc.numMessages+1 < bc.maxMessages || (bc.buffer.ReadableBytes()+msgSize) < uint32(bc.maxBatchSize)
+	return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
 }
 
 // Add will add single message to batch.
diff --git a/pulsar/internal/key_based_batch_builder.go b/pulsar/internal/key_based_batch_builder.go
index 24d564b..d09138c 100644
--- a/pulsar/internal/key_based_batch_builder.go
+++ b/pulsar/internal/key_based_batch_builder.go
@@ -106,18 +106,19 @@ func NewKeyBasedBatchBuilder(
 	return bb, nil
 }
 
-// IsFull check if the size in the current batch exceeds the maximum size allowed by the batch
+// IsFull checks if the size in the current batch meets or exceeds the maximum size allowed by the batch
 func (bc *keyBasedBatchContainer) IsFull() bool {
-	return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() > uint32(bc.maxBatchSize)
+	return bc.numMessages >= bc.maxMessages || bc.buffer.ReadableBytes() >= uint32(bc.maxBatchSize)
 }
 
 func (bc *keyBasedBatchContainer) IsMultiBatches() bool {
 	return true
 }
 
+// hasSpace reports whether the batch container can accommodate one more message with the given payload.
 func (bc *keyBasedBatchContainer) hasSpace(payload []byte) bool {
 	msgSize := uint32(len(payload))
-	return bc.numMessages+1 < bc.maxMessages || (bc.buffer.ReadableBytes()+msgSize) < uint32(bc.maxBatchSize)
+	return bc.numMessages+1 <= bc.maxMessages && bc.buffer.ReadableBytes()+msgSize <= uint32(bc.maxBatchSize)
 }
 
 // Add will add single message to key-based batch with message key.
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index f914017..124c828 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -857,19 +857,57 @@ func TestDelayAbsolute(t *testing.T) {
 	canc()
 }
 
-func TestMaxMessageSize(t *testing.T) {
+func TestMaxBatchSize(t *testing.T) {
+	// Set to be < serverMaxMessageSize
+	batchMaxMessageSize := 512 * 1024
+
 	client, err := NewClient(ClientOptions{
 		URL: serviceURL,
 	})
 	assert.NoError(t, err)
 	defer client.Close()
+
 	producer, err := client.CreateProducer(ProducerOptions{
-		Topic: newTopicName(),
+		Topic:           newTopicName(),
+		BatchingMaxSize: uint(batchMaxMessageSize),
 	})
 	assert.NoError(t, err)
 	assert.NotNil(t, producer)
 	defer producer.Close()
+
+	for bias := -1; bias <= 1; bias++ {
+		payload := make([]byte, batchMaxMessageSize+bias)
+		ID, err := producer.Send(context.Background(), &ProducerMessage{
+			Payload: payload,
+		})
+		if bias <= 0 {
+			assert.NoError(t, err)
+			assert.NotNil(t, ID)
+		} else {
+			assert.Equal(t, errFailAddToBatch, err)
+		}
+	}
+}
+
+func TestMaxMessageSize(t *testing.T) {
 	serverMaxMessageSize := 1024 * 1024
+
+	client, err := NewClient(ClientOptions{
+		URL: serviceURL,
+	})
+	assert.NoError(t, err)
+	defer client.Close()
+
+	// Need to set BatchingMaxSize > serverMaxMessageSize to avoid errMessageTooLarge
+	// being masked by an earlier errFailAddToBatch
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic:           newTopicName(),
+		BatchingMaxSize: uint(2 * serverMaxMessageSize),
+	})
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+	defer producer.Close()
+
 	for bias := -1; bias <= 1; bias++ {
 		payload := make([]byte, serverMaxMessageSize+bias)
 		ID, err := producer.Send(context.Background(), &ProducerMessage{