You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/09 02:30:38 UTC

[pulsar-client-go] branch master updated: Share buffer pool across all partitions (#310)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 9cbe36f  Share buffer pool across all partitions (#310)
9cbe36f is described below

commit 9cbe36f0e8d9ee3547bd2a38eee6efa3b2ad04c5
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Jul 8 19:30:31 2020 -0700

    Share buffer pool across all partitions (#310)
    
    ## Motivation
    
    When a producer is publishing on many partitions, there can be significant memory overhead in maintaining a per-partition pool. Instead, there's not significant perf impact in using a single shared buffer pool.
---
 perf/perf-producer.go            |  4 ++++
 pulsar/internal/batch_builder.go |  3 +++
 pulsar/producer_partition.go     | 32 +++++++++++++++-----------------
 3 files changed, 22 insertions(+), 17 deletions(-)

diff --git a/perf/perf-producer.go b/perf/perf-producer.go
index 0085c07..ba6e197 100644
--- a/perf/perf-producer.go
+++ b/perf/perf-producer.go
@@ -36,6 +36,7 @@ type ProduceArgs struct {
 	Topic              string
 	Rate               int
 	BatchingTimeMillis int
+	BatchingMaxSize    int
 	MessageSize        int
 	ProducerQueueSize  int
 }
@@ -62,6 +63,8 @@ func newProducerCommand() *cobra.Command {
 		"Publish rate. Set to 0 to go unthrottled")
 	flags.IntVarP(&produceArgs.BatchingTimeMillis, "batching-time", "b", 1,
 		"Batching grouping time in millis")
+	flags.IntVarP(&produceArgs.BatchingMaxSize, "batching-max-size", "", 128,
+		"Max size of a batch (in KB)")
 	flags.IntVarP(&produceArgs.MessageSize, "size", "s", 1024,
 		"Message size")
 	flags.IntVarP(&produceArgs.ProducerQueueSize, "queue-size", "q", 1000,
@@ -86,6 +89,7 @@ func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {
 		Topic:                   produceArgs.Topic,
 		MaxPendingMessages:      produceArgs.ProducerQueueSize,
 		BatchingMaxPublishDelay: time.Millisecond * time.Duration(produceArgs.BatchingTimeMillis),
+		BatchingMaxSize:         uint(produceArgs.BatchingMaxSize * 1024),
 	})
 	if err != nil {
 		log.Fatal(err)
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index a88bada..18ec3c4 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -170,6 +170,9 @@ func (bb *BatchBuilder) Flush() (batchData Buffer, sequenceID uint64, callbacks
 	bb.msgMetadata.UncompressedSize = &uncompressedSize
 
 	buffer := bb.buffersPool.GetBuffer()
+	if buffer == nil {
+		buffer = NewBuffer(int(uncompressedSize * 3 / 2))
+	}
 	serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, bb.buffer, bb.compressionProvider)
 
 	callbacks = bb.callbacks
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 239fcd6..9243804 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -45,6 +45,8 @@ const (
 var (
 	errFailAddBatch    = errors.New("message send failed")
 	errMessageTooLarge = errors.New("message size exceeds MaxMessageSize")
+
+	buffersPool sync.Pool
 )
 
 type partitionProducer struct {
@@ -62,8 +64,7 @@ type partitionProducer struct {
 	batchFlushTicker    *time.Ticker
 
 	// Channel where app is posting messages to be published
-	eventsChan  chan interface{}
-	buffersPool sync.Pool
+	eventsChan chan interface{}
 
 	publishSemaphore internal.Semaphore
 	pendingQueue     internal.BlockingQueue
@@ -89,18 +90,13 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 	}
 
 	p := &partitionProducer{
-		state:      producerInit,
-		log:        log.WithField("topic", topic),
-		client:     client,
-		topic:      topic,
-		options:    options,
-		producerID: client.rpcClient.NewProducerID(),
-		eventsChan: make(chan interface{}, maxPendingMessages),
-		buffersPool: sync.Pool{
-			New: func() interface{} {
-				return internal.NewBuffer(1024)
-			},
-		},
+		state:            producerInit,
+		log:              log.WithField("topic", topic),
+		client:           client,
+		topic:            topic,
+		options:          options,
+		producerID:       client.rpcClient.NewProducerID(),
+		eventsChan:       make(chan interface{}, maxPendingMessages),
 		batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
 		publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
 		pendingQueue:     internal.NewBlockingQueue(maxPendingMessages),
@@ -189,8 +185,10 @@ func (p *partitionProducer) grabCnx() error {
 type connectionClosed struct{}
 
 func (p *partitionProducer) GetBuffer() internal.Buffer {
-	b := p.buffersPool.Get().(internal.Buffer)
-	b.Clear()
+	b, ok := buffersPool.Get().(internal.Buffer)
+	if ok {
+		b.Clear()
+	}
 	return b
 }
 
@@ -452,7 +450,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 	// Mark this pending item as done
 	pi.completed = true
 	// Return buffer to the pool since we're now done using it
-	p.buffersPool.Put(pi.batchData)
+	buffersPool.Put(pi.batchData)
 }
 
 func (p *partitionProducer) internalClose(req *closeProducer) {