Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/09 02:30:38 UTC
[pulsar-client-go] branch master updated: Share buffer pool across all partitions (#310)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 9cbe36f Share buffer pool across all partitions (#310)
9cbe36f is described below
commit 9cbe36f0e8d9ee3547bd2a38eee6efa3b2ad04c5
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Jul 8 19:30:31 2020 -0700
Share buffer pool across all partitions (#310)
## Motivation
When a producer is publishing on many partitions, maintaining a per-partition buffer pool can add significant memory overhead. Using a single shared buffer pool instead has no significant performance impact.
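For illustration, here is a minimal, self-contained sketch of the pattern this patch adopts (the `buffer` type, names, and sizes are placeholders, not the pulsar-client-go API): a single package-level `sync.Pool` shared by all partitions, a getter that returns nil when the pool is empty so the caller can size its own fallback allocation, and a put back into the pool once the batch is acknowledged.

```go
package main

import (
	"fmt"
	"sync"
)

// buffer stands in for internal.Buffer in this sketch.
type buffer struct{ data []byte }

// One pool shared by every partition producer, instead of a
// sync.Pool field on each partitionProducer.
var buffersPool sync.Pool

// getBuffer returns a recycled buffer, or nil when the pool is empty;
// the caller decides how large the fallback allocation should be.
func getBuffer() *buffer {
	if b, ok := buffersPool.Get().(*buffer); ok {
		b.data = b.data[:0] // reset contents, like Buffer.Clear()
		return b
	}
	return nil
}

func main() {
	b := getBuffer()
	if b == nil {
		// Fallback allocation when nothing is pooled, analogous to
		// NewBuffer(int(uncompressedSize * 3 / 2)) in BatchBuilder.Flush.
		b = &buffer{data: make([]byte, 0, 1024)}
	}
	b.data = append(b.data, "serialized batch"...)
	fmt.Println(len(b.data))

	// After the broker acknowledges the send, the buffer is returned to
	// the shared pool and can be reused by any partition.
	buffersPool.Put(b)
}
```

The diff below follows the same split: producer_partition.go holds the package-level pool and the nil-tolerant GetBuffer, batch_builder.go performs the fallback allocation in Flush, and ReceivedSendReceipt puts the buffer back.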
---
perf/perf-producer.go | 4 ++++
pulsar/internal/batch_builder.go | 3 +++
pulsar/producer_partition.go | 32 +++++++++++++++-----------------
3 files changed, 22 insertions(+), 17 deletions(-)
diff --git a/perf/perf-producer.go b/perf/perf-producer.go
index 0085c07..ba6e197 100644
--- a/perf/perf-producer.go
+++ b/perf/perf-producer.go
@@ -36,6 +36,7 @@ type ProduceArgs struct {
Topic string
Rate int
BatchingTimeMillis int
+ BatchingMaxSize int
MessageSize int
ProducerQueueSize int
}
@@ -62,6 +63,8 @@ func newProducerCommand() *cobra.Command {
"Publish rate. Set to 0 to go unthrottled")
flags.IntVarP(&produceArgs.BatchingTimeMillis, "batching-time", "b", 1,
"Batching grouping time in millis")
+ flags.IntVarP(&produceArgs.BatchingMaxSize, "batching-max-size", "", 128,
+ "Max size of a batch (in KB)")
flags.IntVarP(&produceArgs.MessageSize, "size", "s", 1024,
"Message size")
flags.IntVarP(&produceArgs.ProducerQueueSize, "queue-size", "q", 1000,
@@ -86,6 +89,7 @@ func produce(produceArgs *ProduceArgs, stop <-chan struct{}) {
Topic: produceArgs.Topic,
MaxPendingMessages: produceArgs.ProducerQueueSize,
BatchingMaxPublishDelay: time.Millisecond * time.Duration(produceArgs.BatchingTimeMillis),
+ BatchingMaxSize: uint(produceArgs.BatchingMaxSize * 1024),
})
if err != nil {
log.Fatal(err)
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index a88bada..18ec3c4 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -170,6 +170,9 @@ func (bb *BatchBuilder) Flush() (batchData Buffer, sequenceID uint64, callbacks
bb.msgMetadata.UncompressedSize = &uncompressedSize
buffer := bb.buffersPool.GetBuffer()
+ if buffer == nil {
+ buffer = NewBuffer(int(uncompressedSize * 3 / 2))
+ }
serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, bb.buffer, bb.compressionProvider)
callbacks = bb.callbacks
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 239fcd6..9243804 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -45,6 +45,8 @@ const (
var (
errFailAddBatch = errors.New("message send failed")
errMessageTooLarge = errors.New("message size exceeds MaxMessageSize")
+
+ buffersPool sync.Pool
)
type partitionProducer struct {
@@ -62,8 +64,7 @@ type partitionProducer struct {
batchFlushTicker *time.Ticker
// Channel where app is posting messages to be published
- eventsChan chan interface{}
- buffersPool sync.Pool
+ eventsChan chan interface{}
publishSemaphore internal.Semaphore
pendingQueue internal.BlockingQueue
@@ -89,18 +90,13 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
}
p := &partitionProducer{
- state: producerInit,
- log: log.WithField("topic", topic),
- client: client,
- topic: topic,
- options: options,
- producerID: client.rpcClient.NewProducerID(),
- eventsChan: make(chan interface{}, maxPendingMessages),
- buffersPool: sync.Pool{
- New: func() interface{} {
- return internal.NewBuffer(1024)
- },
- },
+ state: producerInit,
+ log: log.WithField("topic", topic),
+ client: client,
+ topic: topic,
+ options: options,
+ producerID: client.rpcClient.NewProducerID(),
+ eventsChan: make(chan interface{}, maxPendingMessages),
batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
pendingQueue: internal.NewBlockingQueue(maxPendingMessages),
@@ -189,8 +185,10 @@ func (p *partitionProducer) grabCnx() error {
type connectionClosed struct{}
func (p *partitionProducer) GetBuffer() internal.Buffer {
- b := p.buffersPool.Get().(internal.Buffer)
- b.Clear()
+ b, ok := buffersPool.Get().(internal.Buffer)
+ if ok {
+ b.Clear()
+ }
return b
}
@@ -452,7 +450,7 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
// Mark this pending item as done
pi.completed = true
// Return buffer to the pool since we're now done using it
- p.buffersPool.Put(pi.batchData)
+ buffersPool.Put(pi.batchData)
}
func (p *partitionProducer) internalClose(req *closeProducer) {
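As a small worked example of the fallback sizing above (illustrative arithmetic only, assuming a batch that happens to fill the perf tool's new 128 KB default): with an empty pool, `NewBuffer(int(uncompressedSize * 3 / 2))` starts the buffer at about 192 KiB, leaving headroom beyond the raw payload.

```go
package main

import "fmt"

func main() {
	// Capacity chosen in Flush when the shared pool has no buffer to reuse.
	uncompressedSize := 128 * 1024       // hypothetical batch payload of 128 KiB
	capacity := uncompressedSize * 3 / 2 // same formula as the patch
	fmt.Println(capacity)                // 196608 bytes, i.e. 192 KiB
}
```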