You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/07/27 23:22:27 UTC

[pulsar-client-go] branch master updated: Fix producer state by reconnecting when receiving unexpected receipts (#335) (#336)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c7fd4a  Fix producer state by reconnecting when receiving unexpected receipts (#335) (#336)
0c7fd4a is described below

commit 0c7fd4a417edc45c40db59a4ff2d0371cacf6c8b
Author: Denis Vergnes <de...@gmail.com>
AuthorDate: Mon Jul 27 16:22:18 2020 -0700

    Fix producer state by reconnecting when receiving unexpected receipts (#335) (#336)
---
 pulsar/producer_partition.go | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 18f22f6..a16193f 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -458,13 +458,21 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 	pi, ok := p.pendingQueue.Peek().(*pendingItem)
 
 	if !ok {
-		p.log.Warnf("Received ack for %v although the pending queue is empty", response.GetMessageId())
+		// If we receive a receipt although the pending queue is empty, the state of the broker and the producer differs.
+		// At that point, it is better to close the connection to the broker and reconnect, hoping that it resolves
+		// the state discrepancy.
+		p.log.Warnf("Received ack for %v although the pending queue is empty, closing connection", response.GetMessageId())
+		p.cnx.Close()
 		return
 	}
 
 	if pi.sequenceID != response.GetSequenceId() {
-		p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v", response.GetMessageId(),
+		// If we receive a receipt that is not the one expected, the state of the broker and the producer differs.
+		// At that point, it is better to close the connection to the broker and reconnect, hoping that it resolves
+		// the state discrepancy.
+		p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, closing connection", response.GetMessageId(),
 			response.GetSequenceId(), pi.sequenceID)
+		p.cnx.Close()
 		return
 	}