Posted to jira@kafka.apache.org by "dajac (via GitHub)" <gi...@apache.org> on 2023/06/02 20:00:28 UTC

[GitHub] [kafka] dajac commented on a diff in pull request #13765: KAFKA-15021; Skip leader epoch bump on ISR shrink

dajac commented on code in PR #13765:
URL: https://github.com/apache/kafka/pull/13765#discussion_r1214750188


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1091,8 +1091,10 @@ class Partition(val topicPartition: TopicPartition,
       // Note here we are using the "maximal", see explanation above
       val replicaState = replica.stateSnapshot
       if (replicaState.logEndOffsetMetadata.messageOffset < newHighWatermark.messageOffset &&
-        (replicaState.isCaughtUp(leaderLogEndOffset.messageOffset, currentTimeMs, replicaLagTimeMaxMs)
-          || partitionState.maximalIsr.contains(replica.brokerId))) {
+          ((replicaState.isCaughtUp(leaderLogEndOffset.messageOffset, currentTimeMs, replicaLagTimeMaxMs) &&
+            isReplicaIsrEligible(replica.brokerId)) ||

Review Comment:
   Is it worth extracting this condition into a helper method (e.g. `isIsrEligibleAndCaughtUp`)? That would simplify the condition.
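
   A rough sketch of what such a helper could look like. This is not the PR's code: `ReplicaStateSketch` and `PartitionSketch` are hypothetical stand-ins for the real types, the body of `isCaughtUp` is an assumed approximation of the real check, and eligibility is reduced to a set lookup purely for illustration.

```scala
object IsrHelperSketch {
  // Hypothetical stand-in for the replica state snapshot; only the fields needed here.
  final case class ReplicaStateSketch(logEndOffset: Long, lastCaughtUpTimeMs: Long) {
    // Assumed semantics: caught up if at the leader's end offset,
    // or if the replica last caught up within the allowed lag window.
    def isCaughtUp(leaderEndOffset: Long, currentTimeMs: Long, replicaLagTimeMaxMs: Long): Boolean =
      leaderEndOffset == logEndOffset ||
        currentTimeMs - lastCaughtUpTimeMs <= replicaLagTimeMaxMs
  }

  // Hypothetical stand-in for Partition; eligibility is modelled as a plain set of broker ids.
  final class PartitionSketch(eligibleBrokers: Set[Int], replicaLagTimeMaxMs: Long) {
    private def isReplicaIsrEligible(brokerId: Int): Boolean =
      eligibleBrokers.contains(brokerId)

    // The extracted helper: both checks behind one descriptive name.
    def isIsrEligibleAndCaughtUp(brokerId: Int,
                                 state: ReplicaStateSketch,
                                 leaderEndOffset: Long,
                                 currentTimeMs: Long): Boolean =
      isReplicaIsrEligible(brokerId) &&
        state.isCaughtUp(leaderEndOffset, currentTimeMs, replicaLagTimeMaxMs)
  }
}
```

   With something like this, the first half of the `||` in the diff would collapse to a single call, which should make the remaining condition easier to read.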



##########
core/src/test/scala/unit/kafka/cluster/PartitionTest.scala:
##########
@@ -1456,6 +1456,105 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(alterPartitionListener.failures.get, 1)
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("fenced", "shutdown", "unfenced"))
+  def testHWMIncreasesWithFencedOrShutdownFollower(brokerState: String): Unit = {

Review Comment:
   nit: s/HWM/HighWatermark?



##########
core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala:
##########
@@ -357,6 +363,51 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness {
     }
   }
 
+  @ParameterizedTest
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSendToPartitionWithFollowerShutdown(quorum: String): Unit = {

Review Comment:
   nit: `*ShouldNotTimeout`? It would be great to capture the issue in the test name, or to add a comment describing it.



##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -260,11 +265,17 @@ private void tryElection(PartitionChangeRecord record) {
      * NO_LEADER_CHANGE, a leader epoch bump will automatically occur. That takes care of
      * case 1. In this function, we check for cases 2 and 3, and handle them by manually
      * setting record.leader to the current leader.
+     *
+     * In MV before 3.6 there was a bug (KAFKA-15021) in the brokers' replica manager
+     * that required that the leader epoch be bumped whenever the ISR shrank. In MV 3.6 this leader
+     * epoch bump is not required when the ISR shrinks.
      */
     void triggerLeaderEpochBumpIfNeeded(PartitionChangeRecord record) {

Review Comment:
   To confirm my understanding: do we bump the leader epoch when the ISR is expanded? I believe we don't.
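
   To make the question concrete, here is a minimal sketch of the ISR-only part of the decision as I read the doc comment. All names are hypothetical, `bumpOnIsrShrink` stands in for "MV before 3.6", and the other triggers mentioned in the comment (leader change and the remaining cases) are deliberately left out. That expansion never triggers a bump is my assumption, not something confirmed in this thread.

```scala
object EpochBumpSketch {
  // The ISR shrank if at least one previous member is missing from the new ISR.
  def isrShrank(oldIsr: Set[Int], newIsr: Set[Int]): Boolean =
    !oldIsr.subsetOf(newIsr)

  // bumpOnIsrShrink = true models the pre-3.6 behaviour described in the doc comment;
  // false models 3.6, where a shrink alone no longer requires a bump.
  def needsBumpForIsrChange(oldIsr: Set[Int],
                            newIsr: Set[Int],
                            bumpOnIsrShrink: Boolean): Boolean =
    bumpOnIsrShrink && isrShrank(oldIsr, newIsr)
}
```

   Under this reading, a pure expansion such as `Set(1, 2)` to `Set(1, 2, 3)` returns false in both modes.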


