You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2022/05/11 15:09:16 UTC

[pulsar-client-go] branch master updated: Fix sequenceID is not equal to cause the connection to be closed incorrectly (#774)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new d9b1083  Fix sequenceID is not equal to cause the connection to be closed incorrectly (#774)
d9b1083 is described below

commit d9b108351798569fd1c5b1c7a27a3da96fc2decd
Author: xiaolong ran <xi...@tencent.com>
AuthorDate: Wed May 11 23:09:10 2022 +0800

    Fix sequenceID is not equal to cause the connection to be closed incorrectly (#774)
    
    Signed-off-by: xiaolongran <xi...@tencent.com>
    
    ### Motivation
    
    When processing the sendReceipt command, if the sequenceID returned by the broker is greater than the sequenceID in the current pendingQueue, we need to close the current connection to fix the inconsistency between the broker and the client state.
    When the sequenceID returned by the broker is smaller than the sequenceID in the current pendingQueue, we do not need to close the current connection, and expect to increment the value of the returned sequenceID when the broker retries next time.
    
    The current code processing logic is just the opposite, resulting in the failure to recover after the first situation occurs, and a phenomenon similar to the following occurs:
---
 pulsar/producer_partition.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 87d1d69..5a2694c 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -785,15 +785,15 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 	}
 
 	if pi.sequenceID < response.GetSequenceId() {
-		// Ignoring the ack since it's referring to a message that has already timed out.
+		// Force connection closing so that messages can be re-transmitted in a new connection
 		p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, closing connection", response.GetMessageId(),
 			response.GetSequenceId(), pi.sequenceID)
+		p._getConn().Close()
 		return
 	} else if pi.sequenceID > response.GetSequenceId() {
-		// Force connection closing so that messages can be re-transmitted in a new connection
+		// Ignoring the ack since it's referring to a message that has already timed out.
 		p.log.Warnf("Received ack for %v on sequenceId %v - expected: %v, closing connection", response.GetMessageId(),
 			response.GetSequenceId(), pi.sequenceID)
-		p._getConn().Close()
 		return
 	} else {
 		// The ack was indeed for the expected item in the queue, we can remove it and trigger the callback