Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/11/01 06:47:36 UTC
[GitHub] [pulsar-client-go] wolfstudy commented on a change in pull request #651: fix issue 650,different handle sequence value
wolfstudy commented on a change in pull request #651:
URL: https://github.com/apache/pulsar-client-go/pull/651#discussion_r739988750
##########
File path: pulsar/producer_partition.go
##########
@@ -784,57 +784,57 @@ func (p *partitionProducer) ReceivedSendReceipt(response *pb.CommandSendReceipt)
return
}
- if pi.sequenceID != response.GetSequenceId() {
+ if pi.sequenceID < response.GetSequenceId() {
Review comment:
@baomingyu The Java logic is as follows:
```
if (sequenceId > op.sequenceId) {
    log.warn("[{}] [{}] Got ack for msg. expecting: {} - {} - got: {} - {} - queue-size: {}", topic, producerName,
            op.sequenceId, op.highestSequenceId, sequenceId, highestSequenceId, pendingMessages.size());
    // Force connection closing so that messages can be re-transmitted in a new connection
    cnx.channel().close();
    return;
} else if (sequenceId < op.sequenceId) {
    // Ignoring the ack since it's referring to a message that has already timed out.
    if (log.isDebugEnabled()) {
        log.debug("[{}] [{}] Got ack for timed out msg. expecting: {} - {} - got: {} - {}", topic, producerName,
                op.sequenceId, op.highestSequenceId, sequenceId, highestSequenceId);
    }
    return;
} else {
    // Add check `sequenceId >= highestSequenceId` for backward compatibility.
    if (sequenceId >= highestSequenceId || highestSequenceId == op.highestSequenceId) {
        // Message was persisted correctly
        if (log.isDebugEnabled()) {
            log.debug("[{}] [{}] Received ack for msg {} ", topic, producerName, sequenceId);
        }
        pendingMessages.remove();
        releaseSemaphoreForSendOp(op);
    } else {
        log.warn("[{}] [{}] Got ack for batch msg error. expecting: {} - {} - got: {} - {} - queue-size: {}", topic, producerName,
                op.sequenceId, op.highestSequenceId, sequenceId, highestSequenceId, pendingMessages.size());
        // Force connection closing so that messages can be re-transmitted in a new connection
        cnx.channel().close();
        return;
    }
}
```
Previously, when the pending sequence ID was greater than the sequence ID in the response, we also closed the current connection and relied on the internal retry to recover.
The current implementation in this PR does not handle the case `pi.sequenceID > response.GetSequenceId()`. In that case we should return directly here, as the Java client does when the ack refers to a message that has already timed out.
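
To make the three branches of the Java logic concrete, here is a minimal Go sketch of the same comparison. It is an illustration only, not the actual change to `ReceivedSendReceipt`: the names `receiptAction`, `classifyReceipt`, `pendingSeqID`, and `receiptSeqID` are hypothetical, the `uint64` type is an assumption, and the batch check on `highestSequenceId` is left out.

```go
package main

import "fmt"

// receiptAction is a hypothetical type naming what the producer
// should do after comparing the two sequence IDs.
type receiptAction int

const (
	actionComplete        receiptAction = iota // IDs match: message was persisted
	actionIgnore                               // receipt is behind the pending item: return directly
	actionCloseConnection                      // receipt is ahead of the pending item: force a reconnect
)

// classifyReceipt mirrors the three-way comparison in the Java snippet.
// pendingSeqID stands in for pi.sequenceID and receiptSeqID for
// response.GetSequenceId(); both names are assumptions for this sketch.
func classifyReceipt(pendingSeqID, receiptSeqID uint64) receiptAction {
	switch {
	case receiptSeqID > pendingSeqID:
		// Java: sequenceId > op.sequenceId, close the connection so that
		// messages can be re-transmitted on a new connection.
		return actionCloseConnection
	case receiptSeqID < pendingSeqID:
		// Java: sequenceId < op.sequenceId, the ack refers to a message
		// that has already timed out, so it is ignored.
		return actionIgnore
	default:
		return actionComplete
	}
}

func main() {
	fmt.Println(classifyReceipt(5, 7) == actionCloseConnection)
	fmt.Println(classifyReceipt(7, 5) == actionIgnore)
	fmt.Println(classifyReceipt(6, 6) == actionComplete)
}
```

Keeping the comparison in one small function makes it easy to see that every ordering of the two IDs is handled explicitly, which is the point of matching the Java behaviour.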