You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/06/13 15:40:22 UTC

[pulsar-client-go] branch master updated: Expose BatchingMaxSize from ProducerOptions (#280)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 62203d7  Expose BatchingMaxSize from ProducerOptions (#280)
62203d7 is described below

commit 62203d7bf6e0ac82ffff0d4fed66520094b66406
Author: dferstay <df...@users.noreply.github.com>
AuthorDate: Sat Jun 13 08:40:15 2020 -0700

    Expose BatchingMaxSize from ProducerOptions (#280)
    
    Previously, the producer maximum batch size was hard-coded to 128 KB.
    
    Now, the producer maximum batch size is exposed via ProducerOptions
    and defaults to 128 KB.
    
    Co-authored-by: Daniel Ferstay <df...@splunk.com>
---
 pulsar/internal/batch_builder.go | 22 +++++++++++++++-------
 pulsar/producer.go               |  7 ++++++-
 pulsar/producer_partition.go     |  4 ++--
 pulsar/producer_test.go          |  2 +-
 4 files changed, 24 insertions(+), 11 deletions(-)

diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 3b54eba..35cdf71 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -29,10 +29,9 @@ import (
 )
 
 const (
-	// MaxBatchSize will be the largest size for a batch sent from this particular producer.
-	// This is used as a baseline to allocate a new buffer that can hold the entire batch
-	// without needing costly re-allocations.
-	MaxBatchSize = 128 * 1024
+	// DefaultMaxBatchSize is the default maximum number of bytes per batch.
+	DefaultMaxBatchSize = 128 * 1024
+
 	// DefaultMaxMessagesPerBatch init default num of entries in per batch.
 	DefaultMaxMessagesPerBatch = 1000
 )
@@ -47,6 +46,11 @@ type BatchBuilder struct {
 	// Max number of message allowed in the batch
 	maxMessages uint
 
+	// The largest size for a batch sent from this particular producer.
+	// This is used as a baseline to allocate a new buffer that can hold the entire batch
+	// without needing costly re-allocations.
+	maxBatchSize uint
+
 	producerName string
 	producerID   uint64
 
@@ -58,15 +62,19 @@ type BatchBuilder struct {
 }
 
 // NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container.
-func NewBatchBuilder(maxMessages uint, producerName string, producerID uint64,
+func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
 	compressionType pb.CompressionType) (*BatchBuilder, error) {
 	if maxMessages == 0 {
 		maxMessages = DefaultMaxMessagesPerBatch
 	}
+	if maxBatchSize == 0 {
+		maxBatchSize = DefaultMaxBatchSize
+	}
 	bb := &BatchBuilder{
 		buffer:       NewBuffer(4096),
 		numMessages:  0,
 		maxMessages:  maxMessages,
+		maxBatchSize: maxBatchSize,
 		producerName: producerName,
 		producerID:   producerID,
 		cmdSend: baseCommand(pb.BaseCommand_SEND,
@@ -93,12 +101,12 @@ func NewBatchBuilder(maxMessages uint, producerName string, producerID uint64,
 
 // IsFull check if the size in the current batch exceeds the maximum size allowed by the batch
 func (bb *BatchBuilder) IsFull() bool {
-	return bb.numMessages >= bb.maxMessages || bb.buffer.ReadableBytes() > MaxBatchSize
+	return bb.numMessages >= bb.maxMessages || bb.buffer.ReadableBytes() > uint32(bb.maxBatchSize)
 }
 
 func (bb *BatchBuilder) hasSpace(payload []byte) bool {
 	msgSize := uint32(len(payload))
-	return bb.numMessages > 0 && (bb.buffer.ReadableBytes()+msgSize) > MaxBatchSize
+	return bb.numMessages > 0 && (bb.buffer.ReadableBytes()+msgSize) > uint32(bb.maxBatchSize)
 }
 
 // Add will add single message to batch.
diff --git a/pulsar/producer.go b/pulsar/producer.go
index a951426..2d630f1 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -109,8 +109,13 @@ type ProducerOptions struct {
 
 	// BatchingMaxMessages set the maximum number of messages permitted in a batch. (default: 1000)
 	// If set to a value greater than 1, messages will be queued until this threshold is reached or
-	// batch interval has elapsed.
+	// BatchingMaxSize (see below) has been reached or the batch interval has elapsed.
 	BatchingMaxMessages uint
+
+	// BatchingMaxSize sets the maximum number of bytes permitted in a batch. (default: 128 KB)
+	// A value of 0 selects the default. Messages will be queued until this threshold is reached,
+	// BatchingMaxMessages (see above) has been reached, or the batch interval has elapsed.
+	BatchingMaxSize uint
 }
 
 // Producer is used to publish messages on a topic
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 2eb3696..0f3d198 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -151,8 +151,8 @@ func (p *partitionProducer) grabCnx() error {
 
 	p.producerName = res.Response.ProducerSuccess.GetProducerName()
 	if p.batchBuilder == nil {
-		p.batchBuilder, err = internal.NewBatchBuilder(p.options.BatchingMaxMessages, p.producerName,
-			p.producerID, pb.CompressionType(p.options.CompressionType))
+		p.batchBuilder, err = internal.NewBatchBuilder(p.options.BatchingMaxMessages, p.options.BatchingMaxSize,
+			p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType))
 		if err != nil {
 			return err
 		}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 9f76873..2c028c2 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -661,7 +661,7 @@ func TestBatchMessageFlushing(t *testing.T) {
 	}
 	defer producer.Close()
 
-	maxBytes := internal.MaxBatchSize
+	maxBytes := internal.DefaultMaxBatchSize
 	genbytes := func(n int) []byte {
 		c := []byte("a")[0]
 		bytes := make([]byte, n)