Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/06/26 21:04:56 UTC
[pulsar-client-go] branch master updated: Fixed pooled buffer lifecycle (#300)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new ba79ab0 Fixed pooled buffer lifecycle (#300)
ba79ab0 is described below
commit ba79ab06da013f8fc6850ce40cd7613aeddce9d1
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Fri Jun 26 14:04:48 2020 -0700
    Fixed pooled buffer lifecycle (#300)
---
 pulsar/internal/batch_builder.go | 19 ++++++-------------
 pulsar/internal/connection.go    | 16 ----------------
 pulsar/producer_partition.go     | 30 ++++++++++++++++++++----------
 3 files changed, 26 insertions(+), 39 deletions(-)
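
For context: before this commit, the connection owned the buffer pool. The batch builder fetched buffers through cnx.GetBufferFromPool(), and the connection's run loop returned each buffer to the pool immediately after writing it to the socket. The producer, however, still held a reference to that same buffer in its pending queue (pi.batchData) until the broker acknowledged the send, so a buffer could be handed out again and cleared while its previous contents were still in flight. This commit moves pool ownership into partitionProducer, which takes a buffer out at flush time and puts it back only once the send receipt arrives. The sketch below shows that Get/Clear/Put ownership pattern in isolation; it uses a simplified byte-slice buffer rather than the client's internal.Buffer.

package main

import (
	"fmt"
	"sync"
)

type buffer struct{ data []byte }

func (b *buffer) clear() { b.data = b.data[:0] }

var pool = sync.Pool{
	New: func() interface{} { return &buffer{data: make([]byte, 0, 1024)} },
}

func send(b *buffer) { _ = b.data } // stand-in for the connection write

func main() {
	b := pool.Get().(*buffer)
	b.clear() // pooled objects keep old contents; reset before reuse
	b.data = append(b.data, "serialized batch"...)

	send(b) // the connection only borrows the buffer; ownership stays here

	// Return it only when nothing can reference it anymore
	// (in the real client: after the broker's send receipt).
	pool.Put(b)
	fmt.Println("buffer recycled")
}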
diff --git a/pulsar/internal/batch_builder.go b/pulsar/internal/batch_builder.go
index 90e1b9a..a88bada 100644
--- a/pulsar/internal/batch_builder.go
+++ b/pulsar/internal/batch_builder.go
@@ -35,8 +35,8 @@ const (
 	DefaultMaxMessagesPerBatch = 1000
 )
 
-type ConnectionHolder interface {
-	GetConnection() Connection
+type BuffersPool interface {
+	GetBuffer() Buffer
 }
 
 // BatchBuilder wraps the objects needed to build a batch.
@@ -62,13 +62,13 @@ type BatchBuilder struct {
 	callbacks []interface{}
 
 	compressionProvider compression.Provider
-	cnxHolder           ConnectionHolder
+	buffersPool         BuffersPool
 }
 
 // NewBatchBuilder init batch builder and return BatchBuilder pointer. Build a new batch message container.
 func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, producerID uint64,
 	compressionType pb.CompressionType, level compression.Level,
-	cnxHolder ConnectionHolder) (*BatchBuilder, error) {
+	bufferPool BuffersPool) (*BatchBuilder, error) {
 	if maxMessages == 0 {
 		maxMessages = DefaultMaxMessagesPerBatch
 	}
@@ -91,7 +91,7 @@ func NewBatchBuilder(maxMessages uint, maxBatchSize uint, producerName string, p
 		},
 		callbacks:           []interface{}{},
 		compressionProvider: getCompressionProvider(compressionType, level),
-		cnxHolder:           cnxHolder,
+		buffersPool:         bufferPool,
 	}
 
 	if compressionType != pb.CompressionType_NONE {
@@ -169,14 +169,7 @@ func (bb *BatchBuilder) Flush() (batchData Buffer, sequenceID uint64, callbacks
 	uncompressedSize := bb.buffer.ReadableBytes()
 	bb.msgMetadata.UncompressedSize = &uncompressedSize
 
-	cnx := bb.cnxHolder.GetConnection()
-	var buffer Buffer
-	if cnx == nil {
-		buffer = NewBuffer(int(uncompressedSize))
-	} else {
-		buffer = cnx.GetBufferFromPool()
-	}
-
+	buffer := bb.buffersPool.GetBuffer()
 	serializeBatch(buffer, bb.cmdSend, bb.msgMetadata, bb.buffer, bb.compressionProvider)
 
 	callbacks = bb.callbacks
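
With this file, BatchBuilder no longer needs a Connection at all to obtain a buffer, and the nil-connection fallback in Flush() disappears: the pool is always there to answer. Because Go interfaces are satisfied implicitly, any type with a GetBuffer() method is a valid BuffersPool, which is what lets partitionProducer step into that role in the last file of this diff. The following self-contained sketch illustrates the decoupling with hypothetical stand-in types (none of them exist in the client); a trivial pool that allocates on every call is enough to drive a builder-shaped consumer with no networking involved.

package main

import "fmt"

type Buffer interface{ Clear() }

type BuffersPool interface{ GetBuffer() Buffer }

type byteBuffer struct{ data []byte }

func (b *byteBuffer) Clear() { b.data = b.data[:0] }

// freshPool allocates on every call; a real pool would recycle.
type freshPool struct{}

func (freshPool) GetBuffer() Buffer { return &byteBuffer{} }

// batchBuilder mirrors the shape of the real BatchBuilder's dependency.
type batchBuilder struct{ buffersPool BuffersPool }

func (bb *batchBuilder) flush() Buffer {
	buf := bb.buffersPool.GetBuffer() // no nil-connection special case anymore
	// serialize the batch into buf here
	return buf
}

func main() {
	bb := &batchBuilder{buffersPool: freshPool{}}
	fmt.Printf("%T\n", bb.flush()) // *main.byteBuffer
}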
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 071309a..06d1543 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -72,7 +72,6 @@ type Connection interface {
 	DeleteConsumeHandler(id uint64)
 	ID() string
 	GetMaxMessageSize() int32
-	GetBufferFromPool() Buffer
 	Close()
 }
 
@@ -161,8 +160,6 @@ type connection struct {
 	auth auth.Provider
 
 	maxMessageSize int32
-
-	buffersPool sync.Pool
 }
 
 func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSOptions,
@@ -193,11 +190,6 @@ func newConnection(logicalAddr *url.URL, physicalAddr *url.URL, tlsOptions *TLSO
 		writeRequestsCh:  make(chan Buffer, 256),
 		listeners:        make(map[uint64]ConnectionListener),
 		consumerHandlers: make(map[uint64]ConsumerHandler),
-		buffersPool: sync.Pool{
-			New: func() interface{} {
-				return NewBuffer(1024)
-			},
-		},
 	}
 	cnx.reader = newConnectionReader(cnx)
 	cnx.cond = sync.NewCond(cnx)
@@ -352,8 +344,6 @@ func (c *connection) run() {
 				return
 			}
 			c.internalWriteData(data)
-			// Return buffer to the pool since we're now done using it
-			c.buffersPool.Put(data)
 
 		case <-c.pingTicker.C:
 			c.sendPing()
@@ -778,9 +768,3 @@ func (c *connection) ID() string {
 func (c *connection) GetMaxMessageSize() int32 {
 	return c.maxMessageSize
 }
-
-func (c *connection) GetBufferFromPool() Buffer {
-	b := c.buffersPool.Get().(Buffer)
-	b.Clear()
-	return b
-}
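
The removed Put in the run loop is the heart of the lifecycle bug named in the commit title: the connection recycled `data` as soon as it was written, while the producer still referenced the very same Buffer for receipt handling and possible resends. Below is a hypothetical, simplified repro of that hazard using a plain sync.Pool (note that Get frequently, though not deterministically, returns the most recently Put object):

package main

import (
	"fmt"
	"sync"
)

type buf struct{ data []byte }

var pool = sync.Pool{New: func() interface{} { return new(buf) }}

func main() {
	pending := pool.Get().(*buf)
	pending.data = append(pending.data[:0], "batch #1 (awaiting receipt)"...)

	pool.Put(pending) // premature: the pending item still references it

	next := pool.Get().(*buf) // often the very same object
	next.data = append(next.data[:0], "batch #2"...)

	fmt.Printf("same object: %v, pending now reads: %q\n",
		pending == next, pending.data)
}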
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 18bb9a3..368119f 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -62,7 +62,8 @@ type partitionProducer struct {
 	batchFlushTicker *time.Ticker
 
 	// Channel where app is posting messages to be published
-	eventsChan chan interface{}
+	eventsChan  chan interface{}
+	buffersPool sync.Pool
 
 	publishSemaphore internal.Semaphore
 	pendingQueue     internal.BlockingQueue
@@ -88,13 +89,18 @@ func newPartitionProducer(client *client, topic string, options *ProducerOptions
 	}
 
 	p := &partitionProducer{
-		state:      producerInit,
-		log:        log.WithField("topic", topic),
-		client:     client,
-		topic:      topic,
-		options:    options,
-		producerID: client.rpcClient.NewProducerID(),
-		eventsChan: make(chan interface{}, maxPendingMessages),
+		state:      producerInit,
+		log:        log.WithField("topic", topic),
+		client:     client,
+		topic:      topic,
+		options:    options,
+		producerID: client.rpcClient.NewProducerID(),
+		eventsChan: make(chan interface{}, maxPendingMessages),
+		buffersPool: sync.Pool{
+			New: func() interface{} {
+				return internal.NewBuffer(1024)
+			},
+		},
 		batchFlushTicker: time.NewTicker(batchingMaxPublishDelay),
 		publishSemaphore: internal.NewSemaphore(int32(maxPendingMessages)),
 		pendingQueue:     internal.NewBlockingQueue(maxPendingMessages),
@@ -181,8 +187,10 @@ func (p *partitionProducer) grabCnx() error {
 
 type connectionClosed struct{}
 
-func (p *partitionProducer) GetConnection() internal.Connection {
-	return p.cnx
+func (p *partitionProducer) GetBuffer() internal.Buffer {
+	b := p.buffersPool.Get().(internal.Buffer)
+	b.Clear()
+	return b
 }
 
 func (p *partitionProducer) ConnectionClosed() {
@@ -442,6 +450,8 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 
 	// Mark this pending item as done
 	pi.completed = true
+	// Return buffer to the pool since we're now done using it
+	p.buffersPool.Put(pi.batchData)
 }
 
 func (p *partitionProducer) internalClose(req *closeProducer) {
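
After this change, the buffer's entire lifecycle is visible in one type: partitionProducer hands a cleared buffer to the batch builder via GetBuffer(), keeps it referenced as pi.batchData in the pending queue, and returns it to the pool only when ReceivedSendReceipt fires. A condensed sketch of those touch points, with simplified stand-in types rather than the client's actual ones:

package main

import (
	"fmt"
	"sync"
)

type Buffer struct{ data []byte }

func (b *Buffer) Clear() { b.data = b.data[:0] }

type pendingItem struct{ batchData *Buffer }

type producer struct {
	buffersPool sync.Pool
	pending     []*pendingItem
}

// GetBuffer: the batch builder borrows a cleared buffer at flush time.
func (p *producer) GetBuffer() *Buffer {
	b := p.buffersPool.Get().(*Buffer)
	b.Clear()
	return b
}

// onSendReceipt: only now is the buffer safe to recycle.
func (p *producer) onSendReceipt(pi *pendingItem) {
	p.buffersPool.Put(pi.batchData)
}

func main() {
	p := &producer{buffersPool: sync.Pool{
		New: func() interface{} { return &Buffer{data: make([]byte, 0, 1024)} },
	}}

	b := p.GetBuffer()
	pi := &pendingItem{batchData: b}
	p.pending = append(p.pending, pi)

	// ... hand b to the connection for writing; the producer still owns it ...

	p.onSendReceipt(pi)
	fmt.Println("lifecycle complete")
}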