You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2021/05/17 21:31:40 UTC

[kafka] branch 2.8 updated: KAFKA-12751: Reset AlterIsr in-flight state for duplicate update requests (#10633)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new aebca9b  KAFKA-12751: Reset AlterIsr in-flight state for duplicate update requests (#10633)
aebca9b is described below

commit aebca9b69bec2b5e7d0744cf1acf732d6bc1d9b7
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Mon May 17 19:31:39 2021 +0100

    KAFKA-12751: Reset AlterIsr in-flight state for duplicate update requests (#10633)
    
    Reviewers: David Arthur <da...@confluent.io>
---
 core/src/main/scala/kafka/cluster/Partition.scala          | 7 ++++++-
 core/src/test/scala/unit/kafka/cluster/PartitionTest.scala | 7 +++++++
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 5442fc8..917ca50 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -1412,10 +1412,15 @@ class Partition(val topicPartition: TopicPartition,
           if (leaderAndIsr.leaderEpoch != leaderEpoch) {
             debug(s"Ignoring new ISR ${leaderAndIsr} since we have a stale leader epoch $leaderEpoch.")
             isrChangeListener.markFailed()
-          } else if (leaderAndIsr.zkVersion <= zkVersion) {
+          } else if (leaderAndIsr.zkVersion < zkVersion) {
             debug(s"Ignoring new ISR ${leaderAndIsr} since we have a newer version $zkVersion.")
             isrChangeListener.markFailed()
           } else {
+            // This is one of two states:
+            //   1) leaderAndIsr.zkVersion > zkVersion: Controller updated to new version with proposedIsrState.
+            //   2) leaderAndIsr.zkVersion == zkVersion: No update was performed since proposed and actual state are the same.
+            // In both cases, we want to move from Pending to Committed state to ensure new updates are processed.
+
             isrState = CommittedIsr(leaderAndIsr.isr.toSet)
             zkVersion = leaderAndIsr.zkVersion
             info(s"ISR updated to ${isrState.isr.mkString(",")} and version updated to [$zkVersion]")
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 9e66969..d569c89 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -1612,6 +1612,13 @@ class PartitionTest extends AbstractPartitionTest {
       case _ => fail("Expected a committed ISR following Zk expansion")
     }
 
+    // Verify duplicate request. In-flight state should be reset even though version hasn't changed.
+    doAnswer(_ => (true, 2))
+      .when(kafkaZkClient)
+      .conditionalUpdatePath(anyString(), any(), ArgumentMatchers.eq(2), any())
+    partition.expandIsr(follower3)
+    TestUtils.waitUntilTrue(() => !partition.isrState.isInflight, "Expected ISR state to be committed", 100)
+
     scheduler.shutdown()
   }