You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/07/27 23:22:27 UTC
[pulsar-client-go] branch master updated: Fix producer state by
reconnecting when receiving unexpected receipts (#335) (#336)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 0c7fd4a Fix producer state by reconnecting when receiving unexpected receipts (#335) (#336)
0c7fd4a is described below
commit 0c7fd4a417edc45c40db59a4ff2d0371cacf6c8b
Author: Denis Vergnes <de...@gmail.com>
AuthorDate: Mon Jul 27 16:22:18 2020 -0700
Fix producer state by reconnecting when receiving unexpected receipts (#335) (#336)
---
pulsar/producer_partition.go | 12 ++++++++++--
1 file changed, 10 insertions(+), 2 deletions(-)
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 18f22f6..a16193f 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -458,13 +458,21 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
pi, ok := p.pendingQueue.Peek().(*pendingItem)
if !ok {
- p.log.Warnf("Received ack for %v although the pending queue is empty", response.GetMessageId())
+ // if we receive a receipt although the pending queue is empty, the state of the broker and the producer differs.
+ // At that point, it is better to close the connection to the broker to reconnect to a broker hopping it solves
+ // the state discrepancy.
+ p.log.Warnf("Received ack for %v although the pending queue is empty, closing connection", response.GetMessageId())
+ p.cnx.Close()
return
}
if pi.sequenceID != response.GetSequenceId() {
- p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v", response.GetMessageId(),
+ // if we receive a receipt that is not the one expected, the state of the broker and the producer differs.
+ // At that point, it is better to close the connection to the broker to reconnect to a broker hopping it solves
+ // the state discrepancy.
+ p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, closing connection", response.GetMessageId(),
response.GetSequenceId(), pi.sequenceID)
+ p.cnx.Close()
return
}