Posted to commits@kafka.apache.org by ne...@apache.org on 2014/02/13 23:46:59 UTC

git commit: KAFKA-1188 Stale LeaderAndIsr request could be handled by the broker on Controller failover; reviewed by Neha, Jun

Updated Branches:
  refs/heads/trunk 7e154a36f -> 3d830c9ef


KAFKA-1188 Stale LeaderAndIsr request could be handled by the broker on Controller failover; reviewed by Neha, Jun
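
For readers skimming the diff below: the patch makes the broker reject a LeaderAndIsr request whose controller epoch is older than the latest epoch it has seen, reject any partition entry whose leader epoch is not newer than the local one, and skip the fetcher/truncation steps of the become-follower transition when the leader replica id is unchanged. The following is a minimal, self-contained sketch of that decision logic; the names (LeaderAndIsrGuard, RequestInfo, LocalState, evaluate) are illustrative only and are not Kafka's actual classes.

// Illustrative sketch only -- not Kafka's real API. It mirrors the stale-controller-epoch
// and stale-leader-epoch checks from ReplicaManager plus the unchanged-leader
// short-circuit from Partition.makeFollower.
object LeaderAndIsrGuard {

  sealed trait Decision
  case object StaleControllerEpoch extends Decision  // reject the whole request
  case object StaleLeaderEpoch     extends Decision  // reject this partition's entry
  case object LeaderUnchanged      extends Decision  // record state, skip fetcher/truncation steps
  case object BecomeFollower       extends Decision  // run the full become-follower transition

  case class RequestInfo(controllerEpoch: Int, leaderEpoch: Int, leader: Int)
  case class LocalState(controllerEpoch: Int, leaderEpoch: Int, leaderOpt: Option[Int])

  def evaluate(req: RequestInfo, local: LocalState): Decision =
    if (req.controllerEpoch < local.controllerEpoch) StaleControllerEpoch
    else if (req.leaderEpoch <= local.leaderEpoch) StaleLeaderEpoch
    else if (local.leaderOpt.exists(_ == req.leader)) LeaderUnchanged
    else BecomeFollower

  def main(args: Array[String]): Unit = {
    // A request from the pre-failover controller (epoch 3) arrives after the broker has
    // already seen the new controller (epoch 4): the whole request is rejected as stale.
    val decision = evaluate(RequestInfo(controllerEpoch = 3, leaderEpoch = 7, leader = 1),
                            LocalState(controllerEpoch = 4, leaderEpoch = 6, leaderOpt = Some(2)))
    println(decision)  // prints StaleControllerEpoch
  }
}

In the actual patch these checks live in ReplicaManager and in Partition.makeFollower, as the diff below shows.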


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3d830c9e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3d830c9e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3d830c9e

Branch: refs/heads/trunk
Commit: 3d830c9ef1a186cd36e4f5c7bdc2dfef2d51edc7
Parents: 7e154a3
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Thu Feb 13 14:46:46 2014 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Feb 13 14:46:52 2014 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    |  36 +++---
 .../kafka/controller/ReplicaStateMachine.scala  |   5 +-
 .../scala/kafka/server/ReplicaManager.scala     | 117 +++++++++++--------
 3 files changed, 87 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3d830c9e/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 1087a2e..882b6da 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -28,8 +28,10 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.controller.KafkaController
 import org.apache.log4j.Logger
 import kafka.message.ByteBufferMessageSet
-import kafka.common.{NotAssignedReplicaException, TopicAndPartition, NotLeaderForPartitionException, ErrorMapping}
+import kafka.common.{NotAssignedReplicaException, NotLeaderForPartitionException, ErrorMapping}
 import java.io.IOException
+import scala.Some
+import kafka.common.TopicAndPartition
 
 
 /**
@@ -190,10 +192,11 @@ class Partition(val topic: String,
 
   /**
    *  Make the local replica the follower by setting the new leader and ISR to empty
+   *  If the leader replica id does not change, return false to indicate to the replica manager that the become-follower steps can be skipped
    */
   def makeFollower(controllerId: Int,
                    partitionStateInfo: PartitionStateInfo,
-                   leaders: Set[Broker], correlationId: Int): Boolean = {
+                   correlationId: Int): Boolean = {
     leaderIsrUpdateLock synchronized {
       val allReplicas = partitionStateInfo.allReplicas
       val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
@@ -202,23 +205,18 @@ class Partition(val topic: String,
       // record the epoch of the controller that made the leadership decision. This is useful while updating the isr
       // to maintain the decision maker controller's epoch in the zookeeper path
       controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
-      // TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1
-      leaders.find(_.id == newLeaderBrokerId) match {
-        case Some(leaderBroker) =>
-          // add replicas that are new
-          allReplicas.foreach(r => getOrCreateReplica(r))
-          // remove assigned replicas that have been removed by the controller
-          (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
-          inSyncReplicas = Set.empty[Replica]
-          leaderEpoch = leaderAndIsr.leaderEpoch
-          zkVersion = leaderAndIsr.zkVersion
-          leaderReplicaIdOpt = Some(newLeaderBrokerId)
-        case None => // we should not come here
-          stateChangeLogger.error(("Broker %d aborted the become-follower state change with correlation id %d from " +
-                                   "controller %d epoch %d for partition [%s,%d] new leader %d")
-                                    .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
-                                            topic, partitionId, newLeaderBrokerId))
-      }
+      // add replicas that are new
+      allReplicas.foreach(r => getOrCreateReplica(r))
+      // remove assigned replicas that have been removed by the controller
+      (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
+      inSyncReplicas = Set.empty[Replica]
+      leaderEpoch = leaderAndIsr.leaderEpoch
+      zkVersion = leaderAndIsr.zkVersion
+
+      if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId)
+        return false
+
+      leaderReplicaIdOpt = Some(newLeaderBrokerId)
       true
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d830c9e/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 5e016d5..37a4800 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -231,8 +231,9 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
               case Some(currLeaderIsrAndControllerEpoch) =>
                 controller.removeReplicaFromIsr(topic, partition, replicaId) match {
                   case Some(updatedLeaderIsrAndControllerEpoch) =>
-                    // send the shrunk ISR state change request only to the leader
-                    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader),
+                    // send the shrunk ISR state change request to all the remaining alive replicas of the partition.
+                    val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
+                    brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),
                       topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
                     replicaState.put(partitionAndReplica, OfflineReplica)
                     stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"

http://git-wip-us.apache.org/repos/asf/kafka/blob/3d830c9e/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 21bba48..3dd562c 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -16,21 +16,21 @@
  */
 package kafka.server
 
-import kafka.cluster.{Broker, Partition, Replica}
 import collection._
 import mutable.HashMap
-import org.I0Itec.zkclient.ZkClient
-import java.io.{File, IOException}
-import java.util.concurrent.atomic.AtomicBoolean
+import kafka.cluster.{Broker, Partition, Replica}
 import kafka.utils._
 import kafka.log.LogManager
 import kafka.metrics.KafkaMetricsGroup
-import com.yammer.metrics.core.Gauge
-import java.util.concurrent.TimeUnit
 import kafka.common._
 import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
 import kafka.controller.KafkaController
 import org.apache.log4j.Logger
+import org.I0Itec.zkclient.ZkClient
+import com.yammer.metrics.core.Gauge
+import java.util.concurrent.atomic.AtomicBoolean
+import java.io.{IOException, File}
+import java.util.concurrent.TimeUnit
 
 
 object ReplicaManager {
@@ -215,9 +215,9 @@ class ReplicaManager(val config: KafkaConfig,
       val responseMap = new collection.mutable.HashMap[(String, Int), Short]
       if(leaderAndISRRequest.controllerEpoch < controllerEpoch) {
         leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
-        stateChangeLogger.warn(("Broker %d received LeaderAndIsr request correlation id %d with an old controller epoch %d." +
-          " Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.correlationId,
-                                                         leaderAndISRRequest.controllerEpoch, controllerEpoch))
+        stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " +
+          "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
+          leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
         }
         (responseMap, ErrorMapping.StaleControllerEpochCode)
       } else {
@@ -236,17 +236,17 @@ class ReplicaManager(val config: KafkaConfig,
             if(partitionStateInfo.allReplicas.contains(config.brokerId))
               partitionState.put(partition, partitionStateInfo)
             else {
-              stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request with correlation id %d from " +
-                "controller %d epoch %d as broker is not in assigned replica list %s for partition [%s,%d]")
-                .format(localBrokerId, correlationId, controllerId, leaderAndISRRequest.controllerEpoch,
-                partitionStateInfo.allReplicas.mkString(","), topic, partition.partitionId))
+              stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
+                "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s")
+                .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
+                topic, partition.partitionId, partitionStateInfo.allReplicas.mkString(",")))
             }
           } else {
             // Otherwise record the error code in response
-            stateChangeLogger.warn(("Broker %d received invalid LeaderAndIsr request with correlation id %d from " +
-              "controller %d epoch %d with an older leader epoch %d for partition [%s,%d], current leader epoch is %d")
-              .format(localBrokerId, correlationId, controllerId, leaderAndISRRequest.controllerEpoch,
-              partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, topic, partition.partitionId, partitionLeaderEpoch))
+            stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
+              "epoch %d for partition [%s,%d] since its associated leader epoch %d is old. Current leader epoch is %d")
+              .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
+              topic, partition.partitionId, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, partitionLeaderEpoch))
             responseMap.put((topic, partitionId), ErrorMapping.StaleLeaderEpochCode)
           }
         }
@@ -345,10 +345,11 @@ class ReplicaManager(val config: KafkaConfig,
    */
   private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo],
                             leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) {
-    partitionState.foreach(state =>
+    partitionState.foreach { state =>
       stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
         "starting the become-follower transition for partition %s")
-        .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))))
+        .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
+    }
 
     for (partition <- partitionState.keys)
       responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
@@ -358,47 +359,63 @@ class ReplicaManager(val config: KafkaConfig,
         leaderPartitions --= partitionState.keySet
       }
 
-      partitionState.foreach{ case (partition, leaderIsrAndControllerEpoch) =>
-        partition.makeFollower(controllerId, leaderIsrAndControllerEpoch, leaders, correlationId)}
+      var partitionsToMakeFollower: Set[Partition] = Set()
 
-      replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))
-      partitionState.foreach { state =>
+      // TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1
+      partitionState.foreach{ case (partition, partitionStateInfo) =>
+        val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
+        val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader
+        leaders.find(_.id == newLeaderBrokerId) match {
+          case Some(leaderBroker) =>
+            if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
+              partitionsToMakeFollower += partition
+            else
+              stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " +
+                "controller %d epoch %d for partition [%s,%d] since the new leader %d is the same as the old leader")
+                .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
+                partition.topic, partition.partitionId, newLeaderBrokerId))
+          case None =>
+            // The leader broker should always be present in the leaderAndIsrRequest.
+            // If not, we should record the error message and abort the transition process for this partition
+            stateChangeLogger.error(("Broker %d aborted the become-follower state change with correlation id %d from " +
+              "controller %d epoch %d for partition [%s,%d] since new leader %d is not currently available")
+              .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
+              partition.topic, partition.partitionId, newLeaderBrokerId))
+        }
+      }
+
+      replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(new TopicAndPartition(_)))
+      partitionsToMakeFollower.foreach { partition =>
         stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-follower request from controller " +
           "%d epoch %d with correlation id %d for partition %s")
-          .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(state._1.topic, state._1.partitionId)))
+          .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(partition.topic, partition.partitionId)))
       }
 
-      logManager.truncateTo(partitionState.map{ case(partition, leaderISRAndControllerEpoch) =>
-        new TopicAndPartition(partition) -> partition.getOrCreateReplica().highWatermark
-      })
-      partitionState.foreach { state =>
-        stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition %s as part of " +
+      logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark)).toMap)
+
+      partitionsToMakeFollower.foreach { partition =>
+        stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition [%s,%d] as part of " +
           "become-follower request with correlation id %d from controller %d epoch %d").format(localBrokerId,
-          TopicAndPartition(state._1.topic, state._1.partitionId), correlationId, controllerId, epoch))
+          partition.topic, partition.partitionId, correlationId, controllerId, epoch))
       }
-      if (!isShuttingDown.get()) {
-        val partitionAndOffsets = mutable.Map[TopicAndPartition, BrokerAndInitialOffset]()
-        partitionState.foreach {
-          case (partition, partitionStateInfo) =>
-            val leader = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
-            leaders.find(_.id == leader) match {
-              case Some(leaderBroker) =>
-                partitionAndOffsets.put(new TopicAndPartition(partition), 
-                                        BrokerAndInitialOffset(leaderBroker, partition.getReplica().get.logEndOffset))
-              case None =>
-                stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d " +
-                                         "controller %d epoch %d for partition %s since the designated leader %d " +
-                                         "cannot be found in live or shutting down brokers %s").format(localBrokerId,
-                                         correlationId, controllerId, epoch, partition, leader, leaders.mkString(",")))
-            }
+
+      if (isShuttingDown.get()) {
+        partitionsToMakeFollower.foreach { partition =>
+          stateChangeLogger.trace(("Broker %d skipped the adding-fetcher step of the become-follower state change with correlation id %d from " +
+            "controller %d epoch %d for partition [%s,%d] since it is shutting down").format(localBrokerId, correlationId,
+            controllerId, epoch, partition.topic, partition.partitionId))
         }
-        replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets)
       }
       else {
-        partitionState.foreach { state =>
-          stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " +
-            "controller %d epoch %d for partition %s since it is shutting down").format(localBrokerId, correlationId,
-            controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
+        // we do not need to check if the leader exists again since this has been done at the beginning of this process
+        val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
+          new TopicAndPartition(partition) -> BrokerAndInitialOffset(leaders.find(_.id == partition.leaderReplicaIdOpt.get).get, partition.getReplica().get.logEndOffset)).toMap
+        replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
+
+        partitionsToMakeFollower.foreach { partition =>
+          stateChangeLogger.trace(("Broker %d started fetcher to new leader as part of become-follower request from controller " +
+            "%d epoch %d with correlation id %d for partition [%s,%d]")
+            .format(localBrokerId, controllerId, epoch, correlationId, partition.topic, partition.partitionId))
         }
       }
     } catch {