You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/06/18 21:57:51 UTC
[pulsar-client-go] branch master updated: Avoid contention on
producer mutex on critical path (#286)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 861d7af Avoid contention on producer mutex on critical path (#286)
861d7af is described below
commit 861d7af1fbc7d9b252fc430fccb9db7dec9e7924
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Thu Jun 18 14:57:45 2020 -0700
Avoid contention on producer mutex on critical path (#286)
* Avoid contention on producer mutex on critical path
* Circumvent the race detector
* Removed space
---
pulsar/producer_impl.go | 47 +++++++++++++++++++++++++++--------------------
1 file changed, 27 insertions(+), 20 deletions(-)
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index fc9019a..35dae28 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -22,6 +22,7 @@ import (
"sync"
"sync/atomic"
"time"
+ "unsafe"
log "github.com/sirupsen/logrus"
@@ -29,11 +30,12 @@ import (
)
type producer struct {
- sync.Mutex
+ sync.RWMutex
client *client
options *ProducerOptions
topic string
producers []Producer
+ producersPtr unsafe.Pointer
numPartitions uint32
messageRouter func(*ProducerMessage, TopicMetadata) int
ticker *time.Ticker
@@ -115,6 +117,7 @@ func (p *producer) internalCreatePartitionsProducers() error {
p.Lock()
defer p.Unlock()
+
oldProducers := p.producers
if oldProducers != nil {
@@ -179,6 +182,7 @@ func (p *producer) internalCreatePartitionsProducers() error {
return err
}
+ atomic.StorePointer(&p.producersPtr, unsafe.Pointer(&p.producers))
atomic.StoreUint32(&p.numPartitions, uint32(len(p.producers)))
return nil
}
@@ -188,8 +192,8 @@ func (p *producer) Topic() string {
}
func (p *producer) Name() string {
- p.Lock()
- defer p.Unlock()
+ p.RLock()
+ defer p.RUnlock()
return p.producers[0].Name()
}
@@ -199,27 +203,30 @@ func (p *producer) NumPartitions() uint32 {
}
func (p *producer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
- p.Lock()
- partition := p.messageRouter(msg, p)
- pp := p.producers[partition]
- p.Unlock()
-
- return pp.Send(ctx, msg)
+ return p.getPartition(msg).Send(ctx, msg)
}
func (p *producer) SendAsync(ctx context.Context, msg *ProducerMessage,
callback func(MessageID, *ProducerMessage, error)) {
- p.Lock()
- partition := p.messageRouter(msg, p)
- pp := p.producers[partition]
- p.Unlock()
+ p.getPartition(msg).SendAsync(ctx, msg, callback)
+}
- pp.SendAsync(ctx, msg, callback)
+func (p *producer) getPartition(msg *ProducerMessage) Producer {
+ // Since partitions can only increase, it's ok if the producers list
+ // is updated in between. The numPartition is updated only after the list.
+ partition := p.messageRouter(msg, p)
+ producers := *(*[]Producer)(atomic.LoadPointer(&p.producersPtr))
+ if partition >= len(producers) {
+ // We read the old producers list while the count was already
+ // updated
+ partition %= len(producers)
+ }
+ return producers[partition]
}
func (p *producer) LastSequenceID() int64 {
- p.Lock()
- defer p.Unlock()
+ p.RLock()
+ defer p.RUnlock()
var maxSeq int64 = -1
for _, pp := range p.producers {
@@ -232,8 +239,8 @@ func (p *producer) LastSequenceID() int64 {
}
func (p *producer) Flush() error {
- p.Lock()
- defer p.Unlock()
+ p.RLock()
+ defer p.RUnlock()
for _, pp := range p.producers {
if err := pp.Flush(); err != nil {
@@ -245,8 +252,8 @@ func (p *producer) Flush() error {
}
func (p *producer) Close() {
- p.Lock()
- defer p.Unlock()
+ p.RLock()
+ defer p.RUnlock()
if p.ticker != nil {
p.ticker.Stop()
}