You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/04/20 16:59:46 UTC

[pulsar-client-go] branch master updated: Fixed race condition in producer Flush() operation (#229)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new fcda9b5  Fixed race condition in producer Flush() operation (#229)
fcda9b5 is described below

commit fcda9b53af920601ae3e9da17d7c5e0bb508d82d
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Mon Apr 20 09:59:38 2020 -0700

    Fixed race condition in producer Flush() operation (#229)
---
 pulsar/producer_partition.go | 21 +++++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index f5ffd45..e214afd 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -304,6 +304,7 @@ type pendingItem struct {
 	batchData    []byte
 	sequenceID   uint64
 	sendRequests []interface{}
+	completed    bool
 }
 
 func (p *partitionProducer) internalFlushCurrentBatch() {
@@ -329,6 +330,19 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
 		return
 	}
 
+	// lock the pending request while adding requests
+	// since the ReceivedSendReceipt func iterates over this list
+	pi.Lock()
+	defer pi.Unlock()
+
+	if pi.completed {
+		// The last item in the queue has been completed while we were
+		// looking at it. It's safe at this point to assume that every
+		// message enqueued before Flush() was called is now persisted.
+		fr.waitGroup.Done()
+		return
+	}
+
 	sendReq := &sendRequest{
 		msg: nil,
 		callback: func(id MessageID, message *ProducerMessage, e error) {
@@ -337,11 +351,7 @@ func (p *partitionProducer) internalFlush(fr *flushRequest) {
 		},
 	}
 
-	// lock the pending request while adding requests
-	// since the ReceivedSendReceipt func iterates over this list
-	pi.Lock()
 	pi.sendRequests = append(pi.sendRequests, sendReq)
-	pi.Unlock()
 }
 
 func (p *partitionProducer) Send(ctx context.Context, msg *ProducerMessage) (MessageID, error) {
@@ -422,6 +432,9 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 			sr.callback(msgID, sr.msg, nil)
 		}
 	}
+
+	// Mark this pending item as done
+	pi.completed = true
 }
 
 func (p *partitionProducer) internalClose(req *closeProducer) {