You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2014/02/13 23:46:59 UTC
git commit: KAFKA-1188 Stale LeaderAndIsr request could be handled by
the broker on Controller failover; reviewed by Neha, Jun
Updated Branches:
refs/heads/trunk 7e154a36f -> 3d830c9ef
KAFKA-1188 Stale LeaderAndIsr request could be handled by the broker on Controller failover; reviewed by Neha, Jun
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3d830c9e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3d830c9e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3d830c9e
Branch: refs/heads/trunk
Commit: 3d830c9ef1a186cd36e4f5c7bdc2dfef2d51edc7
Parents: 7e154a3
Author: Guozhang Wang <gu...@linkedin.com>
Authored: Thu Feb 13 14:46:46 2014 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Feb 13 14:46:52 2014 -0800
----------------------------------------------------------------------
.../main/scala/kafka/cluster/Partition.scala | 36 +++---
.../kafka/controller/ReplicaStateMachine.scala | 5 +-
.../scala/kafka/server/ReplicaManager.scala | 117 +++++++++++--------
3 files changed, 87 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/3d830c9e/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 1087a2e..882b6da 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -28,8 +28,10 @@ import kafka.metrics.KafkaMetricsGroup
import kafka.controller.KafkaController
import org.apache.log4j.Logger
import kafka.message.ByteBufferMessageSet
-import kafka.common.{NotAssignedReplicaException, TopicAndPartition, NotLeaderForPartitionException, ErrorMapping}
+import kafka.common.{NotAssignedReplicaException, NotLeaderForPartitionException, ErrorMapping}
import java.io.IOException
+import scala.Some
+import kafka.common.TopicAndPartition
/**
@@ -190,10 +192,11 @@ class Partition(val topic: String,
/**
* Make the local replica the follower by setting the new leader and ISR to empty
+ * If the leader replica id does not change, return false to indicate to the replica manager that the become-follower state change can be skipped
*/
def makeFollower(controllerId: Int,
partitionStateInfo: PartitionStateInfo,
- leaders: Set[Broker], correlationId: Int): Boolean = {
+ correlationId: Int): Boolean = {
leaderIsrUpdateLock synchronized {
val allReplicas = partitionStateInfo.allReplicas
val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
@@ -202,23 +205,18 @@ class Partition(val topic: String,
// record the epoch of the controller that made the leadership decision. This is useful while updating the isr
// to maintain the decision maker controller's epoch in the zookeeper path
controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch
- // TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1
- leaders.find(_.id == newLeaderBrokerId) match {
- case Some(leaderBroker) =>
- // add replicas that are new
- allReplicas.foreach(r => getOrCreateReplica(r))
- // remove assigned replicas that have been removed by the controller
- (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
- inSyncReplicas = Set.empty[Replica]
- leaderEpoch = leaderAndIsr.leaderEpoch
- zkVersion = leaderAndIsr.zkVersion
- leaderReplicaIdOpt = Some(newLeaderBrokerId)
- case None => // we should not come here
- stateChangeLogger.error(("Broker %d aborted the become-follower state change with correlation id %d from " +
- "controller %d epoch %d for partition [%s,%d] new leader %d")
- .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
- topic, partitionId, newLeaderBrokerId))
- }
+ // add replicas that are new
+ allReplicas.foreach(r => getOrCreateReplica(r))
+ // remove assigned replicas that have been removed by the controller
+ (assignedReplicas().map(_.brokerId) -- allReplicas).foreach(removeReplica(_))
+ inSyncReplicas = Set.empty[Replica]
+ leaderEpoch = leaderAndIsr.leaderEpoch
+ zkVersion = leaderAndIsr.zkVersion
+
+ if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId)
+ return false;
+
+ leaderReplicaIdOpt = Some(newLeaderBrokerId)
true
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/3d830c9e/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 5e016d5..37a4800 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -231,8 +231,9 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
case Some(currLeaderIsrAndControllerEpoch) =>
controller.removeReplicaFromIsr(topic, partition, replicaId) match {
case Some(updatedLeaderIsrAndControllerEpoch) =>
- // send the shrunk ISR state change request only to the leader
- brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader),
+ // send the shrunk ISR state change request to all the remaining alive replicas of the partition.
+ val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
+ brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),
topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
replicaState.put(partitionAndReplica, OfflineReplica)
stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s from %s to %s"
http://git-wip-us.apache.org/repos/asf/kafka/blob/3d830c9e/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 21bba48..3dd562c 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -16,21 +16,21 @@
*/
package kafka.server
-import kafka.cluster.{Broker, Partition, Replica}
import collection._
import mutable.HashMap
-import org.I0Itec.zkclient.ZkClient
-import java.io.{File, IOException}
-import java.util.concurrent.atomic.AtomicBoolean
+import kafka.cluster.{Broker, Partition, Replica}
import kafka.utils._
import kafka.log.LogManager
import kafka.metrics.KafkaMetricsGroup
-import com.yammer.metrics.core.Gauge
-import java.util.concurrent.TimeUnit
import kafka.common._
import kafka.api.{StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
import kafka.controller.KafkaController
import org.apache.log4j.Logger
+import org.I0Itec.zkclient.ZkClient
+import com.yammer.metrics.core.Gauge
+import java.util.concurrent.atomic.AtomicBoolean
+import java.io.{IOException, File}
+import java.util.concurrent.TimeUnit
object ReplicaManager {
@@ -215,9 +215,9 @@ class ReplicaManager(val config: KafkaConfig,
val responseMap = new collection.mutable.HashMap[(String, Int), Short]
if(leaderAndISRRequest.controllerEpoch < controllerEpoch) {
leaderAndISRRequest.partitionStateInfos.foreach { case ((topic, partition), stateInfo) =>
- stateChangeLogger.warn(("Broker %d received LeaderAndIsr request correlation id %d with an old controller epoch %d." +
- " Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.correlationId,
- leaderAndISRRequest.controllerEpoch, controllerEpoch))
+ stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d since " +
+ "its controller epoch %d is old. Latest known controller epoch is %d").format(localBrokerId, leaderAndISRRequest.controllerId,
+ leaderAndISRRequest.correlationId, leaderAndISRRequest.controllerEpoch, controllerEpoch))
}
(responseMap, ErrorMapping.StaleControllerEpochCode)
} else {
@@ -236,17 +236,17 @@ class ReplicaManager(val config: KafkaConfig,
if(partitionStateInfo.allReplicas.contains(config.brokerId))
partitionState.put(partition, partitionStateInfo)
else {
- stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request with correlation id %d from " +
- "controller %d epoch %d as broker is not in assigned replica list %s for partition [%s,%d]")
- .format(localBrokerId, correlationId, controllerId, leaderAndISRRequest.controllerEpoch,
- partitionStateInfo.allReplicas.mkString(","), topic, partition.partitionId))
+ stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
+ "epoch %d for partition [%s,%d] as itself is not in assigned replica list %s")
+ .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
+ topic, partition.partitionId, partitionStateInfo.allReplicas.mkString(",")))
}
} else {
// Otherwise record the error code in response
- stateChangeLogger.warn(("Broker %d received invalid LeaderAndIsr request with correlation id %d from " +
- "controller %d epoch %d with an older leader epoch %d for partition [%s,%d], current leader epoch is %d")
- .format(localBrokerId, correlationId, controllerId, leaderAndISRRequest.controllerEpoch,
- partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, topic, partition.partitionId, partitionLeaderEpoch))
+ stateChangeLogger.warn(("Broker %d ignoring LeaderAndIsr request from controller %d with correlation id %d " +
+ "epoch %d for partition [%s,%d] since its associated leader epoch %d is old. Current leader epoch is %d")
+ .format(localBrokerId, controllerId, correlationId, leaderAndISRRequest.controllerEpoch,
+ topic, partition.partitionId, partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch, partitionLeaderEpoch))
responseMap.put((topic, partitionId), ErrorMapping.StaleLeaderEpochCode)
}
}
@@ -345,10 +345,11 @@ class ReplicaManager(val config: KafkaConfig,
*/
private def makeFollowers(controllerId: Int, epoch: Int, partitionState: Map[Partition, PartitionStateInfo],
leaders: Set[Broker], correlationId: Int, responseMap: mutable.Map[(String, Int), Short]) {
- partitionState.foreach(state =>
+ partitionState.foreach { state =>
stateChangeLogger.trace(("Broker %d handling LeaderAndIsr request correlationId %d from controller %d epoch %d " +
"starting the become-follower transition for partition %s")
- .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId))))
+ .format(localBrokerId, correlationId, controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
+ }
for (partition <- partitionState.keys)
responseMap.put((partition.topic, partition.partitionId), ErrorMapping.NoError)
@@ -358,47 +359,63 @@ class ReplicaManager(val config: KafkaConfig,
leaderPartitions --= partitionState.keySet
}
- partitionState.foreach{ case (partition, leaderIsrAndControllerEpoch) =>
- partition.makeFollower(controllerId, leaderIsrAndControllerEpoch, leaders, correlationId)}
+ var partitionsToMakeFollower: Set[Partition] = Set()
- replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(new TopicAndPartition(_)))
- partitionState.foreach { state =>
+ // TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1
+ partitionState.foreach{ case (partition, partitionStateInfo) =>
+ val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch
+ val newLeaderBrokerId = leaderIsrAndControllerEpoch.leaderAndIsr.leader
+ leaders.find(_.id == newLeaderBrokerId) match {
+ case Some(leaderBroker) =>
+ if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
+ partitionsToMakeFollower += partition
+ else
+ stateChangeLogger.info(("Broker %d skipped the become-follower state change after marking its partition as follower with correlation id %d from " +
+ "controller %d epoch %d for partition [%s,%d] since the new leader %d is the same as the old leader")
+ .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
+ partition.topic, partition.partitionId, newLeaderBrokerId))
+ case None =>
+ // The leader broker should always be present in the leaderAndIsrRequest.
+ // If not, we should record the error message and abort the transition process for this partition
+ stateChangeLogger.error(("Broker %d aborted the become-follower state change with correlation id %d from " +
+ "controller %d epoch %d for partition [%s,%d] since new leader %d is not currently available")
+ .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
+ partition.topic, partition.partitionId, newLeaderBrokerId))
+ }
+ }
+
+ replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(new TopicAndPartition(_)))
+ partitionsToMakeFollower.foreach { partition =>
stateChangeLogger.trace(("Broker %d stopped fetchers as part of become-follower request from controller " +
"%d epoch %d with correlation id %d for partition %s")
- .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(state._1.topic, state._1.partitionId)))
+ .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(partition.topic, partition.partitionId)))
}
- logManager.truncateTo(partitionState.map{ case(partition, leaderISRAndControllerEpoch) =>
- new TopicAndPartition(partition) -> partition.getOrCreateReplica().highWatermark
- })
- partitionState.foreach { state =>
- stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition %s as part of " +
+ logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark)).toMap)
+
+ partitionsToMakeFollower.foreach { partition =>
+ stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition [%s,%d] as part of " +
"become-follower request with correlation id %d from controller %d epoch %d").format(localBrokerId,
- TopicAndPartition(state._1.topic, state._1.partitionId), correlationId, controllerId, epoch))
+ partition.topic, partition.partitionId, correlationId, controllerId, epoch))
}
- if (!isShuttingDown.get()) {
- val partitionAndOffsets = mutable.Map[TopicAndPartition, BrokerAndInitialOffset]()
- partitionState.foreach {
- case (partition, partitionStateInfo) =>
- val leader = partitionStateInfo.leaderIsrAndControllerEpoch.leaderAndIsr.leader
- leaders.find(_.id == leader) match {
- case Some(leaderBroker) =>
- partitionAndOffsets.put(new TopicAndPartition(partition),
- BrokerAndInitialOffset(leaderBroker, partition.getReplica().get.logEndOffset))
- case None =>
- stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d " +
- "controller %d epoch %d for partition %s since the designated leader %d " +
- "cannot be found in live or shutting down brokers %s").format(localBrokerId,
- correlationId, controllerId, epoch, partition, leader, leaders.mkString(",")))
- }
+
+ if (isShuttingDown.get()) {
+ partitionsToMakeFollower.foreach { partition =>
+ stateChangeLogger.trace(("Broker %d skipped the adding-fetcher step of the become-follower state change with correlation id %d from " +
+ "controller %d epoch %d for partition [%s,%d] since it is shutting down").format(localBrokerId, correlationId,
+ controllerId, epoch, partition.topic, partition.partitionId))
}
- replicaFetcherManager.addFetcherForPartitions(partitionAndOffsets)
}
else {
- partitionState.foreach { state =>
- stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " +
- "controller %d epoch %d for partition %s since it is shutting down").format(localBrokerId, correlationId,
- controllerId, epoch, TopicAndPartition(state._1.topic, state._1.partitionId)))
+ // we do not need to check if the leader exists again since this has been done at the beginning of this process
+ val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
+ new TopicAndPartition(partition) -> BrokerAndInitialOffset(leaders.find(_.id == partition.leaderReplicaIdOpt.get).get, partition.getReplica().get.logEndOffset)).toMap
+ replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
+
+ partitionsToMakeFollower.foreach { partition =>
+ stateChangeLogger.trace(("Broker %d started fetcher to new leader as part of become-follower request from controller " +
+ "%d epoch %d with correlation id %d for partition [%s,%d]")
+ .format(localBrokerId, controllerId, epoch, correlationId, partition.topic, partition.partitionId))
}
}
} catch {