You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2021/05/17 21:55:29 UTC

[kafka] branch 2.7 updated: KAFKA-12751: Reset AlterIsr in-flight state for duplicate update requests (#10633)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new aee9edc  KAFKA-12751: Reset AlterIsr in-flight state for duplicate update requests (#10633)
aee9edc is described below

commit aee9edce6a39138d3a5ce87d4511040ccd40fa8f
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Mon May 17 19:31:39 2021 +0100

    KAFKA-12751: Reset AlterIsr in-flight state for duplicate update requests (#10633)
    
    Reviewers: David Arthur <da...@confluent.io>
---
 core/src/main/scala/kafka/cluster/Partition.scala | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 4e06198..586afbf 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -1442,10 +1442,15 @@ class Partition(val topicPartition: TopicPartition,
           if (leaderAndIsr.leaderEpoch != leaderEpoch) {
             debug(s"Ignoring ISR from AlterIsr with ${leaderAndIsr} since we have a stale leader epoch $leaderEpoch.")
             isrChangeListener.markFailed()
-          } else if (leaderAndIsr.zkVersion <= zkVersion) {
+          } else if (leaderAndIsr.zkVersion < zkVersion) {
             debug(s"Ignoring ISR from AlterIsr with ${leaderAndIsr} since we have a newer version $zkVersion.")
             isrChangeListener.markFailed()
           } else {
+            // We reach this branch in one of two cases:
+            //   1) leaderAndIsr.zkVersion > zkVersion: the controller updated to a new version with proposedIsrState.
+            //   2) leaderAndIsr.zkVersion == zkVersion: no update was performed since the proposed and actual state are the same.
+            // In both cases, we want to move from Pending to Committed state to ensure new updates are processed.
+
             isrState = CommittedIsr(leaderAndIsr.isr.toSet)
             zkVersion = leaderAndIsr.zkVersion
             info(s"ISR updated from AlterIsr to ${isrState.isr.mkString(",")} and version updated to [$zkVersion]")