Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/05/24 00:58:49 UTC

[GitHub] [kafka] hachikuji commented on a diff in pull request #12181: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841)

hachikuji commented on code in PR #12181:
URL: https://github.com/apache/kafka/pull/12181#discussion_r879967287


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1574,6 +1586,13 @@ class Partition(val topicPartition: TopicPartition,
         debug(s"Failed to alter partition to $proposedIsrState since there is a pending AlterPartition still inflight. " +
           s"partition state has been reset to the latest committed state $partitionState")
         false
+      case Errors.INELIGIBLE_REPLICA =>
+        // Since the operation was rejected, it is safe to reset back to the committed state. This
+        // assumes that the current state was still the correct expected state.
+        partitionState = CommittedPartitionState(proposedIsrState.isr, LeaderRecoveryState.RECOVERED)

Review Comment:
   Would it be useful if `PendingPartitionChange` had a reference to the previous committed state?
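   The suggestion above could look roughly like the following sketch. It is in Java for illustration only; the thread's code is Scala. `PartitionState`, `CommittedPartitionState` and `PendingPartitionChange` mirror names from `Partition.scala`, but their fields here are hypothetical. That includes the assumed `lastCommittedState` reference and the `IsrRollback` helper.

```java
import java.util.Set;

// Hypothetical Java rendering of the Scala partition states; the names mirror
// Partition.scala, but the fields are illustrative assumptions.
interface PartitionState {
    Set<Integer> isr();
    boolean isInflight();
}

record CommittedPartitionState(Set<Integer> isr) implements PartitionState {
    public boolean isInflight() { return false; }
}

// Assumed extension: the pending change keeps a reference to the committed
// state it was derived from.
record PendingPartitionChange(Set<Integer> isr, CommittedPartitionState lastCommittedState)
        implements PartitionState {
    public boolean isInflight() { return true; }
}

final class IsrRollback {
    private IsrRollback() {}

    // On INELIGIBLE_REPLICA, restore the referenced committed state instead of
    // rebuilding one and assuming it is still the correct expected state.
    static PartitionState onIneligibleReplica(PendingPartitionChange pending) {
        return pending.lastCommittedState();
    }
}
```

   With the reference in place, the rollback no longer depends on the assumption stated in the diff's comment, because it returns exactly the state the proposal started from.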



##########
metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java:
##########
@@ -498,6 +498,23 @@ public UsableBroker next() {
         }
     }
 
+    /**
+     * Check if the given broker is active. A broker is active if it is unfenced,
+     * and not shutting down.

Review Comment:
   I'm still inclined to add another state to the broker's registration to represent the fact that it is shutting down. This seems useful for the cluster to know as a whole. At a minimum, it may be useful for debugging, but it would also prevent unnecessary AlterPartition requests from getting sent to the controller.
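   The following is a minimal sketch of the extra registration state suggested above. It assumes a single hypothetical `BrokerState` enum per broker, which is not the controller's actual registration model. `BrokerActivity` is likewise an illustrative helper.

```java
import java.util.Map;

// Hypothetical per-broker state; SHUTTING_DOWN is the assumed addition
// suggested in the review, not an existing registration field.
enum BrokerState {
    FENCED,
    UNFENCED,
    SHUTTING_DOWN
}

final class BrokerActivity {
    private final Map<Integer, BrokerState> states;

    BrokerActivity(Map<Integer, BrokerState> states) {
        this.states = Map.copyOf(states);
    }

    // Mirrors the javadoc in the diff: a broker is active only if it is
    // unfenced and not shutting down. Unknown broker ids are treated as inactive.
    boolean isActive(int brokerId) {
        return states.get(brokerId) == BrokerState.UNFENCED;
    }
}
```

   Folding "unfenced but shutting down" into one `SHUTTING_DOWN` value reduces the active check to a single comparison. The same state could also be consulted by a leader before it sends an AlterPartition request.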



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1574,6 +1586,13 @@ class Partition(val topicPartition: TopicPartition,
         debug(s"Failed to alter partition to $proposedIsrState since there is a pending AlterPartition still inflight. " +
           s"partition state has been reset to the latest committed state $partitionState")
         false
+      case Errors.INELIGIBLE_REPLICA =>

Review Comment:
   We don't know which replica was ineligible. I wonder if we can get into a tight retry loop while we are awaiting the fenced state. Should we have a minimum backoff before we attempt additional ISR changes?
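   Below is a minimal sketch of the minimum backoff asked about above. It assumes a hypothetical `IsrChangeBackoff` helper that the leader consults before proposing another ISR change. Time is passed in explicitly so the logic stays testable, and the backoff length is an assumed parameter.

```java
// Hypothetical guard: after an INELIGIBLE_REPLICA rejection the leader does not
// know which replica was refused, so it waits a fixed minimum interval before
// proposing another ISR change.
final class IsrChangeBackoff {
    private final long minBackoffMs;
    // Long.MIN_VALUE means no rejection has been seen, so changes are allowed.
    private long retryNotBeforeMs = Long.MIN_VALUE;

    IsrChangeBackoff(long minBackoffMs) {
        if (minBackoffMs < 0) {
            throw new IllegalArgumentException("minBackoffMs must be >= 0");
        }
        this.minBackoffMs = minBackoffMs;
    }

    // Record a rejection observed at nowMs and open the backoff window.
    void onIneligibleReplica(long nowMs) {
        retryNotBeforeMs = nowMs + minBackoffMs;
    }

    // An ISR change may be attempted only once the backoff window has passed.
    boolean canAttemptIsrChange(long nowMs) {
        return nowMs >= retryNotBeforeMs;
    }
}
```

   A fixed window is the simplest choice. An exponential backoff would also bound the retry rate if the replica stays fenced for a long time.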



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -846,21 +846,33 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   private def needsExpandIsr(followerReplica: Replica): Boolean = {
-    canAddReplicaToIsr(followerReplica.brokerId) && isFollowerAtHighwatermark(followerReplica)
+    canAddReplicaToIsr(followerReplica.brokerId) && isFollowerInSync(followerReplica)
   }
 
   private def canAddReplicaToIsr(followerReplicaId: Int): Boolean = {
     val current = partitionState
-    !current.isInflight && !current.isr.contains(followerReplicaId)
+    !current.isInflight &&
+      !current.isr.contains(followerReplicaId) &&
+      isBrokerIsrEligible(followerReplicaId)
   }
 
-  private def isFollowerAtHighwatermark(followerReplica: Replica): Boolean = {
+  private def isFollowerInSync(followerReplica: Replica): Boolean = {
     leaderLogIfLocal.exists { leaderLog =>
       val followerEndOffset = followerReplica.stateSnapshot.logEndOffset
       followerEndOffset >= leaderLog.highWatermark && leaderEpochStartOffsetOpt.exists(followerEndOffset >= _)
     }
   }
 
+  private def isBrokerIsrEligible(brokerId: Int): Boolean = {
+    // With KRaft, a broker is considered alive if it is unfenced. If it
+    // is fenced or not present, the leader prevents it from being added back
+    // to the ISR.
+    // With ZK, the leader only prevents dead brokers from being added back
+    // to the ISR. This is really unlikely given that replicas are

Review Comment:
   I didn't follow this comment. Can you clarify?
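   For reference, here is a sketch of the eligibility rule described in the diff's comment, under two assumptions. KRaft metadata is modelled as a hypothetical map from broker id to a fenced flag. The ZK path is modelled as a set of live broker ids. `IsrEligibility` and its constructor arguments are illustrative, not Kafka APIs. Only the shape of `canAddReplicaToIsr` and `isBrokerIsrEligible` follows the diff.

```java
import java.util.Map;
import java.util.Set;

// Hypothetical model of the checks in the diff: a follower can join the ISR only
// if no AlterPartition is inflight, it is not already in the ISR, and its broker
// is eligible. The metadata collections are illustrative stand-ins.
final class IsrEligibility {
    enum Mode { KRAFT, ZK }

    private final Mode mode;
    private final Map<Integer, Boolean> kraftFenced; // brokerId -> fenced; absent = not registered
    private final Set<Integer> zkAliveBrokers;

    IsrEligibility(Mode mode, Map<Integer, Boolean> kraftFenced, Set<Integer> zkAliveBrokers) {
        this.mode = mode;
        this.kraftFenced = Map.copyOf(kraftFenced);
        this.zkAliveBrokers = Set.copyOf(zkAliveBrokers);
    }

    boolean isBrokerIsrEligible(int brokerId) {
        if (mode == Mode.KRAFT) {
            // Eligible only if registered and unfenced; fenced or absent brokers are rejected.
            return Boolean.FALSE.equals(kraftFenced.get(brokerId));
        }
        // ZK: only dead brokers are kept out of the ISR.
        return zkAliveBrokers.contains(brokerId);
    }

    boolean canAddReplicaToIsr(boolean isInflight, Set<Integer> isr, int followerId) {
        return !isInflight && !isr.contains(followerId) && isBrokerIsrEligible(followerId);
    }
}
```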



-- 
This is an automated message from the Apache Git Service.