Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/11/01 06:47:36 UTC

[GitHub] [pulsar-client-go] wolfstudy commented on a change in pull request #651: fix issue 650,different handle sequence value

wolfstudy commented on a change in pull request #651:
URL: https://github.com/apache/pulsar-client-go/pull/651#discussion_r739988750



##########
File path: pulsar/producer_partition.go
##########
@@ -784,57 +784,57 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
 		return
 	}
 
-	if pi.sequenceID != response.GetSequenceId() {
+	if pi.sequenceID < response.GetSequenceId() {

Review comment:
       @baomingyu The Java client logic is as follows:
   
   ```
   if (sequenceId > op.sequenceId) {
       log.warn("[{}] [{}] Got ack for msg. expecting: {} - {} - got: {} - {} - queue-size: {}", topic, producerName,
               op.sequenceId, op.highestSequenceId, sequenceId, highestSequenceId, pendingMessages.size());
       // Force connection closing so that messages can be re-transmitted in a new connection
       cnx.channel().close();
       return;
   } else if (sequenceId < op.sequenceId) {
       // Ignoring the ack since it's referring to a message that has already timed out.
       if (log.isDebugEnabled()) {
           log.debug("[{}] [{}] Got ack for timed out msg. expecting: {} - {} - got: {} - {}", topic, producerName,
                   op.sequenceId, op.highestSequenceId, sequenceId, highestSequenceId);
       }
       return;
   } else {
       // Add check `sequenceId >= highestSequenceId` for backward compatibility.
       if (sequenceId >= highestSequenceId || highestSequenceId == op.highestSequenceId) {
           // Message was persisted correctly
           if (log.isDebugEnabled()) {
               log.debug("[{}] [{}] Received ack for msg {} ", topic, producerName, sequenceId);
           }
           pendingMessages.remove();
           releaseSemaphoreForSendOp(op);
       } else {
           log.warn("[{}] [{}] Got ack for batch msg error. expecting: {} - {} - got: {} - {} - queue-size: {}", topic, producerName,
                   op.sequenceId, op.highestSequenceId, sequenceId, highestSequenceId, pendingMessages.size());
           // Force connection closing so that messages can be re-transmitted in a new connection
           cnx.channel().close();
           return;
       }
   }
   ```
   
   In the previous implementation (`pi.sequenceID != response.GetSequenceId()`), we also closed the current connection when the pending item's sequence ID was greater than the sequence ID in the response, expecting the internal retry to recover from it.
   
   The current implementation in this PR ignores the case of `pi.sequenceID > response.GetSequenceId()`. By the Java logic above, such a receipt refers to a message that has already timed out, so when `pi.sequenceID > response.GetSequenceId()` we should return directly here.
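
To make the suggestion concrete, here is a minimal Go sketch of the three-way comparison from the Java snippet. It is not the actual `pulsar-client-go` code: `receiptAction`, `classifyReceipt`, and the constant names are hypothetical, and the batch check on `highestSequenceId` is left out for brevity.

```go
package main

import "fmt"

// receiptAction is a hypothetical type describing what the producer
// should do with a send receipt. It does not exist in pulsar-client-go.
type receiptAction int

const (
	completePending receiptAction = iota // IDs match: the message was persisted
	ignoreReceipt                        // receipt refers to an older, timed-out message
	closeConnection                      // receipt is ahead of the pending item
)

// classifyReceipt mirrors the three branches of the Java logic:
// pendingSeq plays the role of op.sequenceId (pi.sequenceID in Go) and
// receiptSeq plays the role of sequenceId (response.GetSequenceId() in Go).
func classifyReceipt(pendingSeq, receiptSeq uint64) receiptAction {
	switch {
	case receiptSeq > pendingSeq:
		// Force a reconnect so pending messages are re-transmitted.
		return closeConnection
	case receiptSeq < pendingSeq:
		// The acked message already timed out: return without side effects.
		return ignoreReceipt
	default:
		return completePending
	}
}

func main() {
	fmt.Println(classifyReceipt(10, 12) == closeConnection)
	fmt.Println(classifyReceipt(10, 8) == ignoreReceipt)
	fmt.Println(classifyReceipt(10, 10) == completePending)
}
```

In `ReceivedSendReceipt`, the `ignoreReceipt` branch would correspond to a plain `return`, and only the `closeConnection` branch would close the connection.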



