Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/05/06 15:47:58 UTC

[GitHub] [kafka] hachikuji commented on a diff in pull request #12085: KAFKA-13790; ReplicaManager should be robust to all partition updates from kraft metadata log

hachikuji commented on code in PR #12085:
URL: https://github.com/apache/kafka/pull/12085#discussion_r866948293


##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -2105,6 +2106,203 @@ class PartitionTest extends AbstractPartitionTest {
     verify(spyConfigRepository, times(2)).topicConfig(topicPartition.topic())
   }
 
+  @Test
+  def testDoNotResetReplicaStateIfLeaderEpochIsNotBumped(): Unit = {
+    val controllerEpoch = 3
+    val leaderId = brokerId
+    val followerId = brokerId + 1
+    val replicas = List(leaderId, followerId)
+    val leaderEpoch = 8
+    val topicId = Uuid.randomUuid()
+
+    val initialLeaderState = new LeaderAndIsrPartitionState()
+      .setControllerEpoch(controllerEpoch)
+      .setLeader(leaderId)
+      .setLeaderEpoch(leaderEpoch)
+      .setIsr(List(leaderId).map(Int.box).asJava)
+      .setPartitionEpoch(1)
+      .setReplicas(replicas.map(Int.box).asJava)
+      .setIsNew(true)
+
+    assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints, Some(topicId)))
+    assertEquals(1, partition.getPartitionEpoch)
+    assertEquals(leaderEpoch, partition.getLeaderEpoch)
+    assertEquals(Set(leaderId), partition.partitionState.isr)
+
+    // Follower's state is initialized with unknown offset because it is not
+    // in the ISR.
+    assertReplicaState(partition, followerId,
+      lastCaughtUpTimeMs = 0L,
+      logStartOffset = UnifiedLog.UnknownOffset,
+      logEndOffset = UnifiedLog.UnknownOffset
+    )
+
+    // Follower fetches and updates its replica state.
+    partition.updateFollowerFetchState(
+      followerId = followerId,
+      followerFetchOffsetMetadata = LogOffsetMetadata(0L),
+      followerStartOffset = 0L,
+      followerFetchTimeMs = time.milliseconds(),
+      leaderEndOffset = partition.localLogOrException.logEndOffset
+    )
+
+    assertReplicaState(partition, followerId,
+      lastCaughtUpTimeMs = time.milliseconds(),
+      logStartOffset = 0L,
+      logEndOffset = 0L
+    )
+
+    // makeLeader is called again with the same leader epoch but with
+    // a newer partition epoch.
+    val updatedLeaderState = new LeaderAndIsrPartitionState()

Review Comment:
   I think a legitimate case where this can happen in KRaft is a partition reassignment. The leader epoch is not bumped when we add replicas (only later when we remove them).
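
   Concretely, the update might look something like this (a hypothetical sketch: the epoch values are made up and the setter chain just follows the pattern used in the test above). Adding replica 3 bumps the partition epoch but leaves the leader epoch alone, so makeLeader runs again with an unchanged leader epoch:

      // Hypothetical KRaft reassignment step: replica 3 is being added.
      // The partition epoch advances (2 -> 3) while the leader epoch stays
      // at 8, so Partition.makeLeader sees the same leader epoch with a
      // newer partition epoch and must not reset follower state.
      val reassignmentState = new LeaderAndIsrPartitionState()
        .setControllerEpoch(3)
        .setLeader(1)
        .setLeaderEpoch(8)    // not bumped when replicas are added
        .setIsr(List(1, 2).map(Int.box).asJava)
        .setPartitionEpoch(3) // bumped by the reassignment
        .setReplicas(List(1, 2, 3).map(Int.box).asJava)
        .setAddingReplicas(List(3).map(Int.box).asJava)
        .setIsNew(false)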



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -2105,6 +2106,203 @@ class PartitionTest extends AbstractPartitionTest {
     verify(spyConfigRepository, times(2)).topicConfig(topicPartition.topic())
   }
 
+  @Test
+  def testDoNotResetReplicaStateIfLeaderEpochIsNotBumped(): Unit = {
+    val controllerEpoch = 3
+    val leaderId = brokerId
+    val followerId = brokerId + 1
+    val replicas = List(leaderId, followerId)
+    val leaderEpoch = 8
+    val topicId = Uuid.randomUuid()
+
+    val initialLeaderState = new LeaderAndIsrPartitionState()
+      .setControllerEpoch(controllerEpoch)
+      .setLeader(leaderId)
+      .setLeaderEpoch(leaderEpoch)
+      .setIsr(List(leaderId).map(Int.box).asJava)
+      .setPartitionEpoch(1)
+      .setReplicas(replicas.map(Int.box).asJava)
+      .setIsNew(true)
+
+    assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints, Some(topicId)))
+    assertEquals(1, partition.getPartitionEpoch)
+    assertEquals(leaderEpoch, partition.getLeaderEpoch)
+    assertEquals(Set(leaderId), partition.partitionState.isr)
+
+    // Follower's state is initialized with unknown offset because it is not
+    // in the ISR.
+    assertReplicaState(partition, followerId,
+      lastCaughtUpTimeMs = 0L,
+      logStartOffset = UnifiedLog.UnknownOffset,
+      logEndOffset = UnifiedLog.UnknownOffset
+    )
+
+    // Follower fetches and updates its replica state.
+    partition.updateFollowerFetchState(
+      followerId = followerId,
+      followerFetchOffsetMetadata = LogOffsetMetadata(0L),
+      followerStartOffset = 0L,
+      followerFetchTimeMs = time.milliseconds(),
+      leaderEndOffset = partition.localLogOrException.logEndOffset
+    )
+
+    assertReplicaState(partition, followerId,
+      lastCaughtUpTimeMs = time.milliseconds(),
+      logStartOffset = 0L,
+      logEndOffset = 0L
+    )
+
+    // makeLeader is called again with the same leader epoch but with
+    // a newer partition epoch.
+    val updatedLeaderState = new LeaderAndIsrPartitionState()
+      .setControllerEpoch(controllerEpoch)
+      .setLeader(leaderId)
+      .setLeaderEpoch(leaderEpoch)
+      .setIsr(List(leaderId).map(Int.box).asJava)
+      .setPartitionEpoch(2)
+      .setReplicas(replicas.map(Int.box).asJava)
+      .setIsNew(false)
+
+    assertFalse(partition.makeLeader(updatedLeaderState, offsetCheckpoints, Some(topicId)))
+    assertEquals(2, partition.getPartitionEpoch)
+    assertEquals(leaderEpoch, partition.getLeaderEpoch)
+    assertEquals(Set(leaderId), partition.partitionState.isr)
+
+    // Follower's state has not been reset.
+    assertReplicaState(partition, followerId,
+      lastCaughtUpTimeMs = time.milliseconds(),
+      logStartOffset = 0L,
+      logEndOffset = 0L
+    )
+  }
+
+  @Test
+  def testDoNotUpdateEpochStartOffsetIfLeaderEpochIsNotBumped(): Unit = {
+    val controllerEpoch = 3
+    val leaderId = brokerId
+    val followerId = brokerId + 1
+    val replicas = List(leaderId, followerId)
+    val leaderEpoch = 8
+    val topicId = Uuid.randomUuid()
+
+    val initialLeaderState = new LeaderAndIsrPartitionState()
+      .setControllerEpoch(controllerEpoch)
+      .setLeader(leaderId)
+      .setLeaderEpoch(leaderEpoch)
+      .setIsr(List(leaderId).map(Int.box).asJava)
+      .setPartitionEpoch(1)
+      .setReplicas(replicas.map(Int.box).asJava)
+      .setIsNew(true)
+
+    assertTrue(partition.makeLeader(initialLeaderState, offsetCheckpoints, Some(topicId)))
+    assertEquals(1, partition.getPartitionEpoch)
+    assertEquals(leaderEpoch, partition.getLeaderEpoch)
+    assertEquals(Set(leaderId), partition.partitionState.isr)
+    assertEquals(Some(0L), partition.leaderEpochStartOffsetOpt)
+
+    val leaderLog = partition.localLogOrException
+    assertEquals(Some(EpochEntry(leaderEpoch, 0L)), leaderLog.leaderEpochCache.flatMap(_.latestEntry))
+
+    // Write to the log to increment the log end offset.
+    leaderLog.appendAsLeader(MemoryRecords.withRecords(0L, CompressionType.NONE, 0,
+      new SimpleRecord("k1".getBytes, "v1".getBytes),
+      new SimpleRecord("k1".getBytes, "v1".getBytes)
+    ), leaderEpoch = leaderEpoch)
+
+    // makeLeader is called again with the same leader epoch but with
+    // a newer partition epoch.
+    val updatedLeaderState = new LeaderAndIsrPartitionState()
+      .setControllerEpoch(controllerEpoch)
+      .setLeader(leaderId)
+      .setLeaderEpoch(leaderEpoch)
+      .setIsr(List(leaderId).map(Int.box).asJava)
+      .setPartitionEpoch(2)
+      .setReplicas(replicas.map(Int.box).asJava)
+      .setIsNew(false)
+
+    assertFalse(partition.makeLeader(updatedLeaderState, offsetCheckpoints, Some(topicId)))
+    assertEquals(2, partition.getPartitionEpoch)
+    assertEquals(leaderEpoch, partition.getLeaderEpoch)
+    assertEquals(Set(leaderId), partition.partitionState.isr)
+    assertEquals(Some(0L), partition.leaderEpochStartOffsetOpt)
+    assertEquals(Some(EpochEntry(leaderEpoch, 0L)), leaderLog.leaderEpochCache.flatMap(_.latestEntry))
+  }
+
+  @Test
+  def testIgnoreLeaderPartitionStateChangeWithOlderEpoch(): Unit = {

Review Comment:
   nit: ....WithOlderPartitionEpoch?



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -548,21 +548,35 @@ class Partition(val topicPartition: TopicPartition,
                  highWatermarkCheckpoints: OffsetCheckpoints,
                  topicId: Option[Uuid]): Boolean = {
     val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
-      // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
-      // to maintain the decision maker controller's epoch in the zookeeper path
+      // Partition state changes are expected to have an partition epoch larger or equal
+      // to the current partition epoch. The later is allowed because the partition epoch

Review Comment:
   nit: latter



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -548,21 +548,35 @@ class Partition(val topicPartition: TopicPartition,
                  highWatermarkCheckpoints: OffsetCheckpoints,
                  topicId: Option[Uuid]): Boolean = {
     val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
-      // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
-      // to maintain the decision maker controller's epoch in the zookeeper path
+      // Partition state changes are expected to have an partition epoch larger or equal
+      // to the current partition epoch. The later is allowed because the partition epoch
+      // is also updated by the AlterPartition response so the new epoch might be known
+      // before a LeaderAndIsr request is received or before an update is received via
+      // the metadata log.
+      if (partitionState.partitionEpoch < partitionEpoch) {
+        stateChangeLogger.info(s"Skipped the become-leader state change for $topicPartition with topic id $topicId " +
+          s"and partition state $partitionState since the leader is already at a newer partition epoch $partitionEpoch.")
+        return false
+      }
+
+      // Record the epoch of the controller that made the leadership decision. This is useful while updating the isr
+      // to maintain the decision maker controller's epoch in the zookeeper path.
       controllerEpoch = partitionState.controllerEpoch
 
+      val currentTimeMs = time.milliseconds
+      val isNewLeader = !isLeader
+      val isNewLeaderEpoch = partitionState.leaderEpoch > leaderEpoch
       val isr = partitionState.isr.asScala.map(_.toInt).toSet
       val addingReplicas = partitionState.addingReplicas.asScala.map(_.toInt)
       val removingReplicas = partitionState.removingReplicas.asScala.map(_.toInt)
 
-      if (partitionState.leaderRecoveryState == LeaderRecoveryState.RECOVERING.value()) {
-        stateChangeLogger.info(
-          s"The topic partition $topicPartition was marked as RECOVERING. Leader log recovery is not implemented. " +
-          "Marking the topic partition as RECOVERED."
-        )
+      if (partitionState.leaderRecoveryState == LeaderRecoveryState.RECOVERING.value) {
+        stateChangeLogger.info(s"The topic partition $topicPartition was marked as RECOVERING. Leader log recovery " +

Review Comment:
   A little outside the scope here, but I think this message might cause confusion. Could we leave out the bit about recovery not being implemented?
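
   For example, a trimmed version might read (just a sketch of the suggestion, not final wording):

      stateChangeLogger.info(
        s"The topic partition $topicPartition was marked as RECOVERING. " +
        "Marking the topic partition as RECOVERED."
      )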



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -576,51 +590,56 @@ class Partition(val topicPartition: TopicPartition,
       } catch {
         case e: ZooKeeperClientException =>
           stateChangeLogger.error(s"A ZooKeeper client exception has occurred and makeLeader will be skipping the " +
-            s"state change for the partition $topicPartition with leader epoch: $leaderEpoch ", e)
-
+            s"state change for the partition $topicPartition with leader epoch: $leaderEpoch.", e)
           return false
       }
 
       val leaderLog = localLogOrException
-      val leaderEpochStartOffset = leaderLog.logEndOffset
-      stateChangeLogger.info(s"Leader $topicPartition starts at leader epoch ${partitionState.leaderEpoch} from " +
-        s"offset $leaderEpochStartOffset with high watermark ${leaderLog.highWatermark} " +
-        s"ISR ${isr.mkString("[", ",", "]")} addingReplicas ${addingReplicas.mkString("[", ",", "]")} " +
-        s"removingReplicas ${removingReplicas.mkString("[", ",", "]")}. Previous leader epoch was $leaderEpoch.")
 
-      // We cache the leader epoch here, persisting it only if it's local (hence having a log dir)
-      leaderEpoch = partitionState.leaderEpoch
-      leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
-      partitionEpoch = partitionState.partitionEpoch
-
-      // In the case of successive leader elections in a short time period, a follower may have
-      // entries in its log from a later epoch than any entry in the new leader's log. In order
-      // to ensure that these followers can truncate to the right offset, we must cache the new
-      // leader epoch and the start offset since it should be larger than any epoch that a follower
-      // would try to query.
-      leaderLog.maybeAssignEpochStartOffset(leaderEpoch, leaderEpochStartOffset)
-
-      val isNewLeader = !isLeader
-      val currentTimeMs = time.milliseconds
+      // We update the epoch start offset and the replicas' state only if the leader epoch
+      // has changed.
+      if (isNewLeaderEpoch) {
+        val leaderEpochStartOffset = leaderLog.logEndOffset
+        stateChangeLogger.info(s"Leader $topicPartition with topic id $topicId starts at leader epoch ${partitionState.leaderEpoch} from " +
+          s"offset $leaderEpochStartOffset with high watermark ${leaderLog.highWatermark} " +
+          s"ISR ${isr.mkString("[", ",", "]")} addingReplicas ${addingReplicas.mkString("[", ",", "]")} " +
+          s"removingReplicas ${removingReplicas.mkString("[", ",", "]")}. Previous leader epoch was $leaderEpoch.")
+
+        // In the case of successive leader elections in a short time period, a follower may have
+        // entries in its log from a later epoch than any entry in the new leader's log. In order
+        // to ensure that these followers can truncate to the right offset, we must cache the new
+        // leader epoch and the start offset since it should be larger than any epoch that a follower
+        // would try to query.
+        leaderLog.maybeAssignEpochStartOffset(partitionState.leaderEpoch, leaderEpochStartOffset)
+
+        // Initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and
+        // lastFetchLeaderLogEndOffset.
+        remoteReplicas.foreach { replica =>
+          replica.resetReplicaState(
+            currentTimeMs = currentTimeMs,
+            leaderEndOffset = leaderEpochStartOffset,
+            isNewLeader = isNewLeader,
+            isFollowerInSync = partitionState.isr.contains(replica.brokerId)
+          )
+        }
 
-      // Initialize lastCaughtUpTime of replicas as well as their lastFetchTimeMs and
-      // lastFetchLeaderLogEndOffset.
-      remoteReplicas.foreach { replica =>
-        replica.resetReplicaState(
-          currentTimeMs = currentTimeMs,
-          leaderEndOffset = leaderEpochStartOffset,
-          isNewLeader = isNewLeader,
-          isFollowerInSync = partitionState.isr.contains(replica.brokerId)
-        )
+        // We update the leader epoch and the leader epoch start offset iff the
+        // leader epoch changed.
+        leaderEpoch = partitionState.leaderEpoch
+        leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
+      } else {
+        stateChangeLogger.info(s"Skipped the become-leader state change for $topicPartition with topic id $topicId " +

Review Comment:
   As long as we're logging at info, I think it would be good to include a bit more information (such as ISR and partition epoch).
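
   For example, something along these lines (the field selection is only a suggestion; the variables are the ones already in scope in this method):

      stateChangeLogger.info(s"Skipped the become-leader state change for $topicPartition with topic id $topicId " +
        s"since the leader epoch $leaderEpoch is unchanged. New partition epoch is ${partitionState.partitionEpoch} " +
        s"and ISR is ${isr.mkString("[", ",", "]")}.")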



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -576,51 +590,56 @@ class Partition(val topicPartition: TopicPartition,
       } catch {
         case e: ZooKeeperClientException =>
           stateChangeLogger.error(s"A ZooKeeper client exception has occurred and makeLeader will be skipping the " +
-            s"state change for the partition $topicPartition with leader epoch: $leaderEpoch ", e)
-
+            s"state change for the partition $topicPartition with leader epoch: $leaderEpoch.", e)
           return false
       }
 
       val leaderLog = localLogOrException
-      val leaderEpochStartOffset = leaderLog.logEndOffset
-      stateChangeLogger.info(s"Leader $topicPartition starts at leader epoch ${partitionState.leaderEpoch} from " +
-        s"offset $leaderEpochStartOffset with high watermark ${leaderLog.highWatermark} " +
-        s"ISR ${isr.mkString("[", ",", "]")} addingReplicas ${addingReplicas.mkString("[", ",", "]")} " +
-        s"removingReplicas ${removingReplicas.mkString("[", ",", "]")}. Previous leader epoch was $leaderEpoch.")
 
-      // We cache the leader epoch here, persisting it only if it's local (hence having a log dir)
-      leaderEpoch = partitionState.leaderEpoch
-      leaderEpochStartOffsetOpt = Some(leaderEpochStartOffset)
-      partitionEpoch = partitionState.partitionEpoch
-
-      // In the case of successive leader elections in a short time period, a follower may have
-      // entries in its log from a later epoch than any entry in the new leader's log. In order
-      // to ensure that these followers can truncate to the right offset, we must cache the new
-      // leader epoch and the start offset since it should be larger than any epoch that a follower
-      // would try to query.
-      leaderLog.maybeAssignEpochStartOffset(leaderEpoch, leaderEpochStartOffset)
-
-      val isNewLeader = !isLeader
-      val currentTimeMs = time.milliseconds
+      // We update the epoch start offset and the replicas' state only if the leader epoch
+      // has changed.
+      if (isNewLeaderEpoch) {
+        val leaderEpochStartOffset = leaderLog.logEndOffset
+        stateChangeLogger.info(s"Leader $topicPartition with topic id $topicId starts at leader epoch ${partitionState.leaderEpoch} from " +

Review Comment:
   Could we add partition epoch to this message? Similarly in the follower message.
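
   For example (a sketch only, reusing the values already computed in this block):

      stateChangeLogger.info(s"Leader $topicPartition with topic id $topicId starts at leader epoch " +
        s"${partitionState.leaderEpoch} and partition epoch ${partitionState.partitionEpoch} from " +
        s"offset $leaderEpochStartOffset with high watermark ${leaderLog.highWatermark} " +
        s"ISR ${isr.mkString("[", ",", "]")} addingReplicas ${addingReplicas.mkString("[", ",", "]")} " +
        s"removingReplicas ${removingReplicas.mkString("[", ",", "]")}. Previous leader epoch was $leaderEpoch.")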


