You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/06/14 00:44:10 UTC

[GitHub] [kafka] artemlivshits commented on a diff in pull request #12181: KAFKA-13916; Fenced replicas should not be allowed to join the ISR in KRaft (KIP-841, Part 2)

artemlivshits commented on code in PR #12181:
URL: https://github.com/apache/kafka/pull/12181#discussion_r896208216


##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1569,24 +1628,47 @@ class Partition(val topicPartition: TopicPartition,
   ): Boolean = {
     alterPartitionListener.markFailed()
     error match {
-      case Errors.OPERATION_NOT_ATTEMPTED =>
-        // Since the operation was not attempted, it is safe to reset back to the committed state.
-        partitionState = CommittedPartitionState(proposedIsrState.isr, LeaderRecoveryState.RECOVERED)
-        debug(s"Failed to alter partition to $proposedIsrState since there is a pending AlterPartition still inflight. " +
-          s"partition state has been reset to the latest committed state $partitionState")
+      case Errors.OPERATION_NOT_ATTEMPTED | Errors.INELIGIBLE_REPLICA =>
+        // Care must be taken when resetting to the last committed state since we may not
+        // know in general whether the request was applied or not taking into account retries
+        // and controller changes which might have occurred before we received the response.
+        // However, when the controller returns INELIGIBLE_REPLICA (or OPERATION_NOT_COMMITTED),

Review Comment:
   The error code named in the comment (OPERATION_NOT_COMMITTED) doesn't seem to match the code, which handles OPERATION_NOT_ATTEMPTED.



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1569,24 +1628,47 @@ class Partition(val topicPartition: TopicPartition,
   ): Boolean = {
     alterPartitionListener.markFailed()
     error match {
-      case Errors.OPERATION_NOT_ATTEMPTED =>
-        // Since the operation was not attempted, it is safe to reset back to the committed state.
-        partitionState = CommittedPartitionState(proposedIsrState.isr, LeaderRecoveryState.RECOVERED)
-        debug(s"Failed to alter partition to $proposedIsrState since there is a pending AlterPartition still inflight. " +
-          s"partition state has been reset to the latest committed state $partitionState")
+      case Errors.OPERATION_NOT_ATTEMPTED | Errors.INELIGIBLE_REPLICA =>
+        // Care must be taken when resetting to the last committed state since we may not
+        // know in general whether the request was applied or not taking into account retries
+        // and controller changes which might have occurred before we received the response.
+        // However, when the controller returns INELIGIBLE_REPLICA (or OPERATION_NOT_COMMITTED),
+        // the controller is explicitly telling us 1) that the current partition epoch is correct,
+        // and 2) that the request was not applied. Even if the controller that sent the response
+        // is stale, we are guaranteed from the monotonicity of the controller epoch that the
+        // request could not have been applied by any past or future controller.
+        partitionState = proposedIsrState.lastCommittedState

Review Comment:
   In KRaft mode, could the partition state be updated concurrently via a metadata update, such that resetting to the last committed state here would overwrite that newer state?



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -1569,24 +1628,47 @@ class Partition(val topicPartition: TopicPartition,
   ): Boolean = {
     alterPartitionListener.markFailed()
     error match {
-      case Errors.OPERATION_NOT_ATTEMPTED =>
-        // Since the operation was not attempted, it is safe to reset back to the committed state.
-        partitionState = CommittedPartitionState(proposedIsrState.isr, LeaderRecoveryState.RECOVERED)
-        debug(s"Failed to alter partition to $proposedIsrState since there is a pending AlterPartition still inflight. " +
-          s"partition state has been reset to the latest committed state $partitionState")
+      case Errors.OPERATION_NOT_ATTEMPTED | Errors.INELIGIBLE_REPLICA =>
+        // Care must be taken when resetting to the last committed state since we may not
+        // know in general whether the request was applied or not taking into account retries
+        // and controller changes which might have occurred before we received the response.
+        // However, when the controller returns INELIGIBLE_REPLICA (or OPERATION_NOT_COMMITTED),
+        // the controller is explicitly telling us 1) that the current partition epoch is correct,
+        // and 2) that the request was not applied. Even if the controller that sent the response
+        // is stale, we are guaranteed from the monotonicity of the controller epoch that the
+        // request could not have been applied by any past or future controller.
+        partitionState = proposedIsrState.lastCommittedState
+        info(s"Failed to alter partition to $proposedIsrState since the controller rejected the request with $error. " +
+          s"Partition state has been reset to the latest committed state $partitionState.")
         false
       case Errors.UNKNOWN_TOPIC_OR_PARTITION =>
         debug(s"Failed to alter partition to $proposedIsrState since the controller doesn't know about " +
-          "this topic or partition. Giving up.")
+          "this topic or partition. Partition state may be out of thing, awaiting new the latest metadata.")

Review Comment:
   Typo: should "out of thing" be "out of sync"?



##########
core/src/main/scala/kafka/cluster/Partition.scala:
##########
@@ -847,21 +861,35 @@ class Partition(val topicPartition: TopicPartition,
   }
 
   private def needsExpandIsr(followerReplica: Replica): Boolean = {
-    canAddReplicaToIsr(followerReplica.brokerId) && isFollowerAtHighwatermark(followerReplica)
+    canAddReplicaToIsr(followerReplica.brokerId) && isFollowerInSync(followerReplica)
   }
 
   private def canAddReplicaToIsr(followerReplicaId: Int): Boolean = {
     val current = partitionState
-    !current.isInflight && !current.isr.contains(followerReplicaId)
+    !current.isInflight &&
+      !current.isr.contains(followerReplicaId) &&
+      isReplicaIsrEligible(followerReplicaId)
   }
 
-  private def isFollowerAtHighwatermark(followerReplica: Replica): Boolean = {
+  private def isFollowerInSync(followerReplica: Replica): Boolean = {
     leaderLogIfLocal.exists { leaderLog =>
       val followerEndOffset = followerReplica.stateSnapshot.logEndOffset
       followerEndOffset >= leaderLog.highWatermark && leaderEpochStartOffsetOpt.exists(followerEndOffset >= _)
     }
   }
 
+  private def isReplicaIsrEligible(followerReplicaId: Int): Boolean = {
+    metadataCache match {
+      // In KRaft mode, only replicas which are not fenced nor in controlled shutdown are
+      // allowed to join the ISR. This does not apply to ZK mode.
+      case kRaftMetadataCache: KRaftMetadataCache =>
+        !kRaftMetadataCache.isBrokerFenced(followerReplicaId) &&
+          !kRaftMetadataCache.isBrokerShuttingDown(followerReplicaId)
+
+      case _ => true
+    }

Review Comment:
   Would it be better to encapsulate the KRaft / ZK behavior difference in the metadataCache? Then this function would just call the metadataCache without explicitly checking the kind of cache.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org