You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2021/02/20 01:07:57 UTC

[kafka] branch trunk updated: KAFKA-9672: Leader with ISR as a superset of replicas (#9631)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d3612eb  KAFKA-9672: Leader with ISR as a superset of replicas (#9631)
d3612eb is described below

commit d3612ebc775ea401e6170b3248d146b228d85d1f
Author: José Armando García Sancio <js...@users.noreply.github.com>
AuthorDate: Fri Feb 19 17:07:01 2021 -0800

    KAFKA-9672: Leader with ISR as a superset of replicas (#9631)
    
    It is possible for the the controller to send LeaderAndIsr requests with
    an ISR that contains ids not in the replica set. This is used during
    reassignment so that the partition leader doesn't add replicas back to
    the ISR. This is needed because the controller updates ZK and the
    replicas through two rounds:
    
    1. The first round of ZK updates and LeaderAndIsr requests shrinks the ISR.
    
    2. The second round of ZK updates and LeaderAndIsr requests shrinks the replica
    set.
    
    This could be avoided by doing 1. and 2. in one round. Unfortunately the
    current controller implementation makes that non-trivial.
    
    This commit changes the leader to allow the state where the ISR contains
    ids that are not in the replica set and to remove such ids from the ISR
    if required.
    
    Reviewers: Jun Rao <ju...@gmail.com>
---
 core/src/main/scala/kafka/cluster/Partition.scala | 21 +++++++++++----------
 1 file changed, 11 insertions(+), 10 deletions(-)

diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index cfd029b..99ae4ab 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -352,10 +352,6 @@ class Partition(val topicPartition: TopicPartition,
 
   def getReplica(replicaId: Int): Option[Replica] = Option(remoteReplicasMap.get(replicaId))
 
-  private def getReplicaOrException(replicaId: Int): Replica = getReplica(replicaId).getOrElse{
-    throw new NotLeaderOrFollowerException(s"Replica with id $replicaId is not available on broker $localBrokerId")
-  }
-
   private def checkCurrentLeaderEpoch(remoteLeaderEpochOpt: Optional[Integer]): Errors = {
     if (!remoteLeaderEpochOpt.isPresent) {
       Errors.NONE
@@ -826,7 +822,7 @@ class Partition(val topicPartition: TopicPartition,
             case (brokerId, logEndOffset) => s"broker $brokerId: $logEndOffset"
           }
 
-          val curInSyncReplicaObjects = (curMaximalIsr - localBrokerId).map(getReplicaOrException)
+          val curInSyncReplicaObjects = (curMaximalIsr - localBrokerId).flatMap(getReplica)
           val replicaInfo = curInSyncReplicaObjects.map(replica => (replica.brokerId, replica.logEndOffset))
           val localLogInfo = (localBrokerId, localLogOrException.logEndOffset)
           val (ackedReplicas, awaitingReplicas) = (replicaInfo + localLogInfo).partition { _._2 >= requiredOffset}
@@ -948,11 +944,15 @@ class Partition(val topicPartition: TopicPartition,
         val outOfSyncReplicaIds = getOutOfSyncReplicas(replicaLagTimeMaxMs)
         if (outOfSyncReplicaIds.nonEmpty) {
           val outOfSyncReplicaLog = outOfSyncReplicaIds.map { replicaId =>
-            s"(brokerId: $replicaId, endOffset: ${getReplicaOrException(replicaId).logEndOffset})"
+            val logEndOffsetMessage = getReplica(replicaId)
+              .map(_.logEndOffset.toString)
+              .getOrElse("unknown")
+            s"(brokerId: $replicaId, endOffset: $logEndOffsetMessage)"
           }.mkString(" ")
           val newIsrLog = (isrState.isr -- outOfSyncReplicaIds).mkString(",")
           info(s"Shrinking ISR from ${isrState.isr.mkString(",")} to $newIsrLog. " +
-               s"Leader: (highWatermark: ${leaderLog.highWatermark}, endOffset: ${leaderLog.logEndOffset}). " +
+               s"Leader: (highWatermark: ${leaderLog.highWatermark}, " +
+               s"endOffset: ${leaderLog.logEndOffset}). " +
                s"Out of sync replicas: $outOfSyncReplicaLog.")
 
           shrinkIsr(outOfSyncReplicaIds)
@@ -978,9 +978,10 @@ class Partition(val topicPartition: TopicPartition,
                                   leaderEndOffset: Long,
                                   currentTimeMs: Long,
                                   maxLagMs: Long): Boolean = {
-    val followerReplica = getReplicaOrException(replicaId)
-    followerReplica.logEndOffset != leaderEndOffset &&
-      (currentTimeMs - followerReplica.lastCaughtUpTimeMs) > maxLagMs
+    getReplica(replicaId).fold(true) { followerReplica =>
+      followerReplica.logEndOffset != leaderEndOffset &&
+        (currentTimeMs - followerReplica.lastCaughtUpTimeMs) > maxLagMs
+    }
   }
 
   /**