You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/06/26 21:04:56 UTC

[pulsar-client-go] branch master updated: Fixed pooled buffer lifecycle (#300)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new ba79ab0  Fixed pooled buffer lifecycle (#300)
ba79ab0 is described below

commit ba79ab06da013f8fc6850ce40cd7613aeddce9d1
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Jun 26 14:04:48 2020 -0700

    Fixed pooled buffer lifecycle (#300)
---
 pulsar/internal/batch_builder.go | 19 ++++++-------------
 pulsar/internal/connection.go    | 16 ----------------
 pulsar/producer_partition.go     | 30 ++++++++++++++++++++----------
 3 files changed, 26 insertions(+), 39 deletions(-)

diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 90e1b9a..a88bada 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -35,8 +35,8 @@ const (
 	DefaultMaxMessagesPerBatch = 1000
 )
 
-type ConnectionHolder interface {
-	GetConnection() Connection
+type BuffersPool interface {
+	GetBuffer() Buffer
 }
 
 // BatchBuilder wraps the objects needed to build a batch.
@@ -62,13 +62,13 @@ type BatchBuilder struct {
 	callbacks   []interface{}
 
 	compressionProvider compression.Provider
-	cnxHolder           ConnectionHolder
+	buffersPool         BuffersPool
 }
 
 // NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container.
 func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
 	compressionType pb.CompressionType, level compression.Level,
-	cnxHolder ConnectionHolder) (*BatchBuilder, error) {
+	bufferPool BuffersPool) (*BatchBuilder, error) {
 	if maxMessages == 0 {
 		maxMessages = DefaultMaxMessagesPerBatch
 	}
@@ -91,7 +91,7 @@ func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, p
 		},
 		callbacks:           []interface{}{},
 		compressionProvider: getCompressionProvider(compressionType, level),
-		cnxHolder:           cnxHolder,
+		buffersPool:         bufferPool,
 	}
 
 	if compressionType != pb.CompressionType_NONE {
@@ -169,14 +169,7 @@ func (bb *BatchBuilder) Flush() (batchData Buffer, sequenceID uint64, callbacks
 	uncompressedSize := bb.buffer.ReadableBytes()
 	bb.msgMetadata.UncompressedSize = &uncompressedSize
 
-	cnx := bb.cnxHolder.GetConnection()
-	var buffer Buffer
-	if cnx == nil {
-		buffer = NewBuffer(int(uncompressedSize))
-	} else {
-		buffer = cnx.GetBufferFromPool()
-	}
-
+	buffer := bb.buffersPool.GetBuffer()
 	serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, bb.buffer, bb.compressionProvider)
 
 	callbacks = bb.callbacks
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 071309a..06d1543 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -72,7 +72,6 @@ type Connection interface {
 	DeleteConsumeHandler(id uint64)
 	ID() string
 	GetMaxMessageSize() int32
-	GetBufferFromPool() Buffer
 	Close()
 }
 
@@ -161,8 +160,6 @@ type connection struct {
 	auth       auth.Provider
 
 	maxMessageSize int32
-
-	buffersPool sync.Pool
 }
 
 func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSOptions,
@@ -193,11 +190,6 @@ func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSO
 		writeRequestsCh:  make(chan Buffer, 256),
 		listeners:        make(map[uint64]ConnectionListener),
 		consumerHandlers: make(map[uint64]ConsumerHandler),
-		buffersPool: sync.Pool{
-			New: func() interface{} {
-				return NewBuffer(1024)
-			},
-		},
 	}
 	cnx.reader = newConnectionReader(cnx)
 	cnx.cond = sync.NewCond(cnx)
@@ -352,8 +344,6 @@ func (c *connection) run() {
 				return
 			}
 			c.internalWriteData(data)
-			// Return buffer to the pool since we're now done using it
-			c.buffersPool.Put(data)
 
 		case <-c.pingTicker.C:
 			c.sendPing()
@@ -778,9 +768,3 @@ func (c *connection) ID() string {
 func (c *connection) GetMaxMessageSize() int32 {
 	return c.maxMessageSize
 }
-
-func (c *connection) GetBufferFromPool() Buffer {
-	b := c.buffersPool.Get().(Buffer)
-	b.Clear()
-	return b
-}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 18bb9a3..368119f 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -62,7 +62,8 @@ type partitionProducer struct {
 	batchFlushTicker    *time.Ticker
 
 	// Channel where app is posting messages to be published
-	eventsChan chan interface{}
+	eventsChan  chan interface{}
+	buffersPool sync.Pool
 
 	publishSemaphore internal.Semaphore
 	pendingQueue     internal.BlockingQueue
@@ -88,13 +89,18 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 	}
 
 	p := &partitionProducer{
-		state:            producerInit,
-		log:              log.WithField("topic", topic),
-		client:           client,
-		topic:            topic,
-		options:          options,
-		producerID:       client.rpcClient.NewProducerID(),
-		eventsChan:       make(chan interface{}, maxPendingMessages),
+		state:      producerInit,
+		log:        log.WithField("topic", topic),
+		client:     client,
+		topic:      topic,
+		options:    options,
+		producerID: client.rpcClient.NewProducerID(),
+		eventsChan: make(chan interface{}, maxPendingMessages),
+		buffersPool: sync.Pool{
+			New: func() interface{} {
+				return internal.NewBuffer(1024)
+			},
+		},
 		batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
 		publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
 		pendingQueue:     internal.NewBlockingQueue(maxPendingMessages),
@@ -181,8 +187,10 @@ func (p *partitionProducer) grabCnx() error {
 
 type connectionClosed struct{}
 
-func (p *partitionProducer) GetConnection() internal.Connection {
-	return p.cnx
+func (p *partitionProducer) GetBuffer() internal.Buffer {
+	b := p.buffersPool.Get().(internal.Buffer)
+	b.Clear()
+	return b
 }
 
 func (p *partitionProducer) ConnectionClosed() {
@@ -442,6 +450,8 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 
 	// Mark this pending item as done
 	pi.completed = true
+	// Return buffer to the pool since we're now done using it
+	p.buffersPool.Put(pi.batchData)
 }
 
 func (p *partitionProducer) internalClose(req *closeProducer) {