You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/04/20 16:59:46 UTC
[pulsar-client-go] branch master updated: Fixed race condition in
producer Flush() operation (#229)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new fcda9b5 Fixed race condition in producer Flush() operation (#229)
fcda9b5 is described below
commit fcda9b53af920601ae3e9da17d7c5e0bb508d82d
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Apr 20 09:59:38 2020 -0700
Fixed race condition in producer Flush() operation (#229)
---
pulsar/producer_partition.go | 21 +++++++++++++++++----
1 file changed, 17 insertions(+), 4 deletions(-)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index f5ffd45..e214afd 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -304,6 +304,7 @@ type pendingItem struct {
batchData []byte
sequenceID uint64
sendRequests []interface{}
+ completed bool
}
func (p *partitionProducer) internalFlushCurrentBatch() {
@@ -329,6 +330,19 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
return
}
+ // lock the pending request while adding requests
+ // since the ReceivedSendReceipt func iterates over this list
+ pi.Lock()
+ defer pi.Unlock()
+
+ if pi.completed {
+ // The last item in the queue has been completed while we were
+ // looking at it. It's safe at this point to assume that every
+ // message enqueued before Flush() was called are now persisted
+ fr.waitGroup.Done()
+ return
+ }
+
sendReq := &sendRequest{
msg: nil,
callback: func(id MessageID, message *ProducerMessage, e error) {
@@ -337,11 +351,7 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
},
}
- // lock the pending request while adding requests
- // since the ReceivedSendReceipt func iterates over this list
- pi.Lock()
pi.sendRequests = append(pi.sendRequests, sendReq)
- pi.Unlock()
}
func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
@@ -422,6 +432,9 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
sr.callback(msgID, sr.msg, nil)
}
}
+
+ // Mark this pending item as done
+ pi.completed = true
}
func (p *partitionProducer) internalClose(req *closeProducer) {