You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2021/05/17 21:31:40 UTC
[kafka] branch 2.8 updated: KAFKA-12751: Reset AlterIsr in-flight state for duplicate update requests (#10633)
This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new aebca9b KAFKA-12751: Reset AlterIsr in-flight state for duplicate update requests (#10633)
aebca9b is described below
commit aebca9b69bec2b5e7d0744cf1acf732d6bc1d9b7
Author: Rajini Sivaram <ra...@googlemail.com>
AuthorDate: Mon May 17 19:31:39 2021 +0100
KAFKA-12751: Reset AlterIsr in-flight state for duplicate update requests (#10633)
Reviewers: David Arthur <da...@confluent.io>
---
core/src/main/scala/kafka/cluster/Partition.scala | 7 ++++++-
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala | 7 +++++++
2 files changed, 13 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 5442fc8..917ca50 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -1412,10 +1412,15 @@ class Partition(val topicPartition: TopicPartition,
if (leaderAndIsr.leaderEpoch != leaderEpoch) {
debug(s"Ignoring new ISR ${leaderAndIsr} since we have a stale leader epoch $leaderEpoch.")
isrChangeListener.markFailed()
- } else if (leaderAndIsr.zkVersion <= zkVersion) {
+ } else if (leaderAndIsr.zkVersion < zkVersion) {
debug(s"Ignoring new ISR ${leaderAndIsr} since we have a newer version $zkVersion.")
isrChangeListener.markFailed()
} else {
+ // This is one of two states:
+ // 1) leaderAndIsr.zkVersion > zkVersion: Controller updated to new version with proposedIsrState.
+ // 2) leaderAndIsr.zkVersion == zkVersion: No update was performed since proposed and actual state are the same.
+ // In both cases, we want to move from Pending to Committed state to ensure new updates are processed.
+
isrState = CommittedIsr(leaderAndIsr.isr.toSet)
zkVersion = leaderAndIsr.zkVersion
info(s"ISR updated to ${isrState.isr.mkString(",")} and version updated to [$zkVersion]")
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 9e66969..d569c89 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -1612,6 +1612,13 @@ class PartitionTest extends AbstractPartitionTest {
case _ => fail("Expected a committed ISR following Zk expansion")
}
+ // Verify duplicate request. In-flight state should be reset even though version hasn't changed.
+ doAnswer(_ => (true, 2))
+ .when(kafkaZkClient)
+ .conditionalUpdatePath(anyString(), any(), ArgumentMatchers.eq(2), any())
+ partition.expandIsr(follower3)
+ TestUtils.waitUntilTrue(() => !partition.isrState.isInflight, "Expected ISR state to be committed", 100)
+
scheduler.shutdown()
}