Posted to dev@kafka.apache.org by "xuexiaoyue (Jira)" <ji...@apache.org> on 2022/04/21 08:46:00 UTC

[jira] [Created] (KAFKA-13843) States tracked in TransactionManager should not be changed when receiving success response on an expired batch

xuexiaoyue created KAFKA-13843:
----------------------------------

             Summary: States tracked in TransactionManager should not be changed when receiving success response on an expired batch
                 Key: KAFKA-13843
                 URL: https://issues.apache.org/jira/browse/KAFKA-13843
             Project: Kafka
          Issue Type: Bug
            Reporter: xuexiaoyue


When a batch's delivery timeout has expired but the client later receives a success response from the server, Sender calls `transactionManager.handleCompletedBatch(batch, response)` without checking whether the batch has already been completed. As a result, `transactionManager.handleCompletedBatch` incorrectly updates some of the state tracked in `topicPartitionBookkeeper`.
{code:java}
// Sender: called for every success response, even if the batch has already expired.
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
    if (transactionManager != null) {
        // No check here for whether the batch was completed earlier.
        transactionManager.handleCompletedBatch(batch, response);
    }

    if (batch.complete(response.baseOffset, response.logAppendTime)) {
        maybeRemoveAndDeallocateBatch(batch);
    }
}

// TransactionManager: unconditionally updates the last acked sequence,
// the last acked offset, and the in-flight batches.
public synchronized void handleCompletedBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response) {
    int lastAckedSequence = maybeUpdateLastAckedSequence(batch.topicPartition, batch.lastSequence());
    log.debug("ProducerId: {}; Set last ack'd sequence number for topic-partition {} to {}",
            batch.producerId(),
            batch.topicPartition,
            lastAckedSequence);

    updateLastAckedOffset(response, batch);
    removeInFlightBatch(batch);
}{code}
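
One possible direction, shown only as a self-contained toy model and not as the actual Kafka code or a proposed patch: skip the transaction bookkeeping when the batch has already reached a final state. Every name below (`ToyBatch`, `ToyBookkeeper`, `finish`, the guarded `completeBatch`) is hypothetical and exists only for illustration; the real `ProducerBatch` and `TransactionManager` carry more state than this.

```java
import java.util.HashMap;
import java.util.Map;

public class ExpiredBatchGuardSketch {

    /** Toy stand-in for a producer batch; NOT Kafka's ProducerBatch. */
    static final class ToyBatch {
        final String topicPartition;
        final int lastSequence;
        private boolean done;

        ToyBatch(String topicPartition, int lastSequence) {
            this.topicPartition = topicPartition;
            this.lastSequence = lastSequence;
        }

        boolean isDone() {
            return done;
        }

        /** Returns true only for the first caller that finishes the batch. */
        boolean finish() {
            if (done) {
                return false;
            }
            done = true;
            return true;
        }
    }

    /** Toy stand-in for the per-partition state tracked for idempotence. */
    static final class ToyBookkeeper {
        final Map<String, Integer> lastAckedSequence = new HashMap<>();

        void handleCompletedBatch(ToyBatch batch) {
            // Keep the highest acknowledged sequence per partition.
            lastAckedSequence.merge(batch.topicPartition, batch.lastSequence, Integer::max);
        }
    }

    /** Guarded completion: leave the bookkeeping alone if the batch already finished. */
    static boolean completeBatch(ToyBookkeeper bookkeeper, ToyBatch batch) {
        if (batch.isDone()) {
            // Assumption: an expired batch was already failed, so a late
            // success response must not change the tracked state.
            return false;
        }
        bookkeeper.handleCompletedBatch(batch);
        return batch.finish();
    }

    public static void main(String[] args) {
        ToyBookkeeper bookkeeper = new ToyBookkeeper();

        ToyBatch expired = new ToyBatch("topic-0", 9);
        expired.finish(); // simulate expiry by delivery timeout
        System.out.println("late response applied: " + completeBatch(bookkeeper, expired));
        System.out.println("state: " + bookkeeper.lastAckedSequence);

        ToyBatch live = new ToyBatch("topic-0", 4);
        System.out.println("normal response applied: " + completeBatch(bookkeeper, live));
        System.out.println("state: " + bookkeeper.lastAckedSequence);
    }
}
```

In this model the late response for the expired batch leaves `lastAckedSequence` empty, while the response for the live batch records sequence 4. Whether the check belongs in Sender or inside `handleCompletedBatch` itself is left open here.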



--
This message was sent by Atlassian Jira
(v8.20.7#820007)