You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/06/13 15:40:22 UTC
[pulsar-client-go] branch master updated: Expose BatchingMaxSize
from ProducerOptions (#280)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 62203d7 Expose BatchingMaxSize from ProducerOptions (#280)
62203d7 is described below
commit 62203d7bf6e0ac82ffff0d4fed66520094b66406
Author: dferstay <df...@users.noreply.github.com>
AuthorDate: Sat Jun 13 08:40:15 2020 -0700
Expose BatchingMaxSize from ProducerOptions (#280)
Previously, the producer maximum batch size was hard-coded to 128 KB.
Now, the producer maximum batch size is exposed via ProducerOptions
and defaults to 128 KB.
Co-authored-by: Daniel Ferstay <df...@splunk.com>
---
pulsar/internal/batch_builder.go | 22 +++++++++++++++-------
pulsar/producer.go | 7 ++++++-
pulsar/producer_partition.go | 4 ++--
pulsar/producer_test.go | 2 +-
4 files changed, 24 insertions(+), 11 deletions(-)
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 3b54eba..35cdf71 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -29,10 +29,9 @@ import (
)
const (
- // MaxBatchSize will be the largest size for a batch sent from this particular producer.
- // This is used as a baseline to allocate a new buffer that can hold the entire batch
- // without needing costly re-allocations.
- MaxBatchSize = 128 * 1024
+ // DefaultMaxBatchSize init default for maximum number of bytes per batch
+ DefaultMaxBatchSize = 128 * 1024
+
// DefaultMaxMessagesPerBatch init default num of entries in per batch.
DefaultMaxMessagesPerBatch = 1000
)
@@ -47,6 +46,11 @@ type BatchBuilder struct {
// Max number of message allowed in the batch
maxMessages uint
+ // The largest size for a batch sent from this particular producer.
+ // This is used as a baseline to allocate a new buffer that can hold the entire batch
+ // without needing costly re-allocations.
+ maxBatchSize uint
+
producerName string
producerID uint64
@@ -58,15 +62,19 @@ type BatchBuilder struct {
}
// NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container.
-func NewBatchBuilder(maxMessages uint, producerName string, producerID uint64,
+func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
compressionType pb.CompressionType) (*BatchBuilder, error) {
if maxMessages == 0 {
maxMessages = DefaultMaxMessagesPerBatch
}
+ if maxBatchSize == 0 {
+ maxBatchSize = DefaultMaxBatchSize
+ }
bb := &BatchBuilder{
buffer: NewBuffer(4096),
numMessages: 0,
maxMessages: maxMessages,
+ maxBatchSize: maxBatchSize,
producerName: producerName,
producerID: producerID,
cmdSend: baseCommand(pb.BaseCommand_SEND,
@@ -93,12 +101,12 @@ func NewBatchBuilder(maxMessages uint, producerName string, producerID uint64,
// IsFull check if the size in the current batch exceeds the maximum size allowed by the batch
func (bb *BatchBuilder) IsFull() bool {
- return bb.numMessages >= bb.maxMessages || bb.buffer.ReadableBytes() > MaxBatchSize
+ return bb.numMessages >= bb.maxMessages || bb.buffer.ReadableBytes() > uint32(bb.maxBatchSize)
}
func (bb *BatchBuilder) hasSpace(payload []byte) bool {
msgSize := uint32(len(payload))
- return bb.numMessages > 0 && (bb.buffer.ReadableBytes()+msgSize) > MaxBatchSize
+ return bb.numMessages > 0 && (bb.buffer.ReadableBytes()+msgSize) > uint32(bb.maxBatchSize)
}
// Add will add single message to batch.
diff --git a/pulsar/producer.go b/pulsar/producer.go
index a951426..2d630f1 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -109,8 +109,13 @@ type ProducerOptions struct {
// BatchingMaxMessages set the maximum number of messages permitted in a batch. (default: 1000)
// If set to a value greater than 1, messages will be queued until this threshold is reached or
- // batch interval has elapsed.
+ // BatchingMaxSize (see below) has been reached or the batch interval has elapsed.
BatchingMaxMessages uint
+
+ // BatchingMaxSize sets the maximum number of bytes permitted in a batch. (default 128 KB)
+ // If set to a value greater than 1, messages will be queued until this threshold is reached or
+ // BatchingMaxMessages (see above) has been reached or the batch interval has elapsed.
+ BatchingMaxSize uint
}
// Producer is used to publish messages on a topic
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 2eb3696..0f3d198 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -151,8 +151,8 @@ func (p *partitionProducer) grabCnx() error {
p.producerName = res.Response.ProducerSuccess.GetProducerName()
if p.batchBuilder == nil {
- p.batchBuilder, err = internal.NewBatchBuilder(p.options.BatchingMaxMessages, p.producerName,
- p.producerID, pb.CompressionType(p.options.CompressionType))
+ p.batchBuilder, err = internal.NewBatchBuilder(p.options.BatchingMaxMessages, p.options.BatchingMaxSize,
+ p.producerName, p.producerID, pb.CompressionType(p.options.CompressionType))
if err != nil {
return err
}
diff --git a/pulsar/producer_test.go b/pulsar/producer_test.go
index 9f76873..2c028c2 100644
--- a/pulsar/producer_test.go
+++ b/pulsar/producer_test.go
@@ -661,7 +661,7 @@ func TestBatchMessageFlushing(t *testing.T) {
}
defer producer.Close()
- maxBytes := internal.MaxBatchSize
+ maxBytes := internal.DefaultMaxBatchSize
genbytes := func(n int) []byte {
c := []byte("a")[0]
bytes := make([]byte, n)