You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/06/18 21:57:51 UTC

[pulsar-client-go] branch master updated: Avoid contention on producer mutex on critical path (#286)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 861d7af  Avoid contention on producer mutex on critical path (#286)
861d7af is described below

commit 861d7af1fbc7d9b252fc430fccb9db7dec9e7924
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Jun 18 14:57:45 2020 -0700

    Avoid contention on producer mutex on critical path (#286)
    
    * Avoid contention on producer mutex on critical path
    
    * Circumvent the race detector
    
    * Removed space
---
 pulsar/producer_impl.go | 47 +++++++++++++++++++++++++++--------------------
 1 file changed, 27 insertions(+), 20 deletions(-)

diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index fc9019a..35dae28 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -22,6 +22,7 @@ import (
 	"sync"
 	"sync/atomic"
 	"time"
+	"unsafe"
 
 	log "github.com/sirupsen/logrus"
 
@@ -29,11 +30,12 @@ import (
 )
 
 type producer struct {
-	sync.Mutex
+	sync.RWMutex
 	client        *client
 	options       *ProducerOptions
 	topic         string
 	producers     []Producer
+	producersPtr  unsafe.Pointer
 	numPartitions uint32
 	messageRouter func(*ProducerMessage, TopicMetadata) int
 	ticker        *time.Ticker
@@ -115,6 +117,7 @@ func (p *producer) internalCreatePartitionsProducers() error {
 
 	p.Lock()
 	defer p.Unlock()
+
 	oldProducers := p.producers
 
 	if oldProducers != nil {
@@ -179,6 +182,7 @@ func (p *producer) internalCreatePartitionsProducers() error {
 		return err
 	}
 
+	atomic.StorePointer(&p.producersPtr, unsafe.Pointer(&p.producers))
 	atomic.StoreUint32(&p.numPartitions, uint32(len(p.producers)))
 	return nil
 }
@@ -188,8 +192,8 @@ func (p *producer) Topic() string {
 }
 
 func (p *producer) Name() string {
-	p.Lock()
-	defer p.Unlock()
+	p.RLock()
+	defer p.RUnlock()
 
 	return p.producers[0].Name()
 }
@@ -199,27 +203,30 @@ func (p *producer) NumPartitions() uint32 {
 }
 
 func (p *producer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
-	p.Lock()
-	partition := p.messageRouter(msg, p)
-	pp := p.producers[partition]
-	p.Unlock()
-
-	return pp.Send(ctx, msg)
+	return p.getPartition(msg).Send(ctx, msg)
 }
 
 func (p *producer) SendAsync(ctx context.Context, msg *ProducerMessage,
 	callback func(MessageID, *ProducerMessage, error)) {
-	p.Lock()
-	partition := p.messageRouter(msg, p)
-	pp := p.producers[partition]
-	p.Unlock()
+	p.getPartition(msg).SendAsync(ctx, msg, callback)
+}
 
-	pp.SendAsync(ctx, msg, callback)
+func (p *producer) getPartition(msg *ProducerMessage) Producer {
+	// Since partitions can only increase, it's ok if the producers list
+	// is updated in between. The numPartition is updated only after the list.
+	partition := p.messageRouter(msg, p)
+	producers := *(*[]Producer)(atomic.LoadPointer(&p.producersPtr))
+	if partition >= len(producers) {
+		// We read the old producers list while the count was already
+		// updated
+		partition %= len(producers)
+	}
+	return producers[partition]
 }
 
 func (p *producer) LastSequenceID() int64 {
-	p.Lock()
-	defer p.Unlock()
+	p.RLock()
+	defer p.RUnlock()
 
 	var maxSeq int64 = -1
 	for _, pp := range p.producers {
@@ -232,8 +239,8 @@ func (p *producer) LastSequenceID() int64 {
 }
 
 func (p *producer) Flush() error {
-	p.Lock()
-	defer p.Unlock()
+	p.RLock()
+	defer p.RUnlock()
 
 	for _, pp := range p.producers {
 		if err := pp.Flush(); err != nil {
@@ -245,8 +252,8 @@ func (p *producer) Flush() error {
 }
 
 func (p *producer) Close() {
-	p.Lock()
-	defer p.Unlock()
+	p.RLock()
+	defer p.RUnlock()
 	if p.ticker != nil {
 		p.ticker.Stop()
 	}