Posted to commits@kafka.apache.org by ju...@apache.org on 2014/01/15 17:35:11 UTC
git commit: kafka-1202; optimize ZK access in KafkaController;
also incorporating fixes in kafka-1020; patched by Jun Rao and Guozhang Wang;
reviewed by Neha Narkhede and Joel Koshy
Updated Branches:
refs/heads/trunk d401292ab -> 855340a2e
kafka-1202; optimize ZK access in KafkaController; also incorporating fixes in kafka-1020; patched by Jun Rao and Guozhang Wang; reviewed by Neha Narkhede and Joel Koshy
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/855340a2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/855340a2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/855340a2
Branch: refs/heads/trunk
Commit: 855340a2e65ffbb79520c49d0b9a231b94acd538
Parents: d401292
Author: Jun Rao <ju...@gmail.com>
Authored: Wed Jan 15 08:37:14 2014 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Jan 15 08:37:14 2014 -0800
----------------------------------------------------------------------
.../controller/ControllerChannelManager.scala | 4 -
.../kafka/controller/KafkaController.scala | 132 ++++++++++++-------
.../controller/PartitionLeaderSelector.scala | 40 +++---
.../controller/PartitionStateMachine.scala | 22 +++-
.../kafka/controller/ReplicaStateMachine.scala | 35 ++---
core/src/main/scala/kafka/utils/ZkUtils.scala | 43 ------
6 files changed, 146 insertions(+), 130 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/855340a2/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 33a84fb..ea8485b 100644
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -203,10 +203,6 @@ class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendReq
if(stopAndDeleteReplicaRequestMap.size > 0)
throw new IllegalStateException("Controller to broker state change requests batch is not empty while creating a " +
"new one. Some StopReplica with delete state changes %s might be lost ".format(stopAndDeleteReplicaRequestMap.toString()))
- leaderAndIsrRequestMap.clear()
- stopReplicaRequestMap.clear()
- updateMetadataRequestMap.clear()
- stopAndDeleteReplicaRequestMap.clear()
}
def addLeaderAndIsrRequestForBrokers(brokerIds: Seq[Int], topic: String, partition: Int,
http://git-wip-us.apache.org/repos/asf/kafka/blob/855340a2/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 03ef9cf..a0267ae 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -69,6 +69,26 @@ class ControllerContext(val zkClient: ZkClient,
def liveOrShuttingDownBrokerIds = liveBrokerIdsUnderlying
def liveOrShuttingDownBrokers = liveBrokersUnderlying
+
+ def partitionsOnBroker(brokerId: Int): Set[TopicAndPartition] = {
+ partitionReplicaAssignment
+ .filter { case(topicAndPartition, replicas) => replicas.contains(brokerId) }
+ .map { case(topicAndPartition, replicas) => topicAndPartition }
+ .toSet
+ }
+
+ def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] = {
+ brokerIds.map { brokerId =>
+ partitionReplicaAssignment
+ .filter { case(topicAndPartition, replicas) => replicas.contains(brokerId) }
+ .map { case(topicAndPartition, replicas) =>
+ new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, brokerId) }
+ }.flatten.toSet
+ }
+
+ def allLiveReplicas(): Set[PartitionAndReplica] = {
+ replicasOnBrokers(liveBrokerIds)
+ }
}
trait KafkaControllerMBean {
@@ -190,13 +210,11 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
debug("Live brokers: " + controllerContext.liveBrokerIds.mkString(","))
}
- val allPartitionsAndReplicationFactorOnBroker = controllerContext.controllerLock synchronized {
- getPartitionsAssignedToBroker(zkClient, controllerContext.allTopics.toSeq, id).map {
- case(topic, partition) =>
- val topicAndPartition = TopicAndPartition(topic, partition)
- (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size)
+ val allPartitionsAndReplicationFactorOnBroker: Set[(TopicAndPartition, Int)] =
+ controllerContext.controllerLock synchronized {
+ controllerContext.partitionsOnBroker(id)
+ .map(topicAndPartition => (topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size))
}
- }
allPartitionsAndReplicationFactorOnBroker.foreach {
case(topicAndPartition, replicationFactor) =>
@@ -328,7 +346,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
sendUpdateMetadataRequest(newBrokers)
// the very first thing to do when a new broker comes up is send it the entire list of partitions that it is
// supposed to host. Based on that the broker starts the high watermark threads for the input list of partitions
- replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, newBrokers), OnlineReplica)
+ replicaStateMachine.handleStateChanges(controllerContext.replicasOnBrokers(newBrokersSet), OnlineReplica)
// when a new broker comes up, the controller needs to trigger leader election for all new and offline partitions
// to see if these brokers can become leaders for some/all of those
partitionStateMachine.triggerOnlinePartitionStateChange()
@@ -366,12 +384,12 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
// trigger OnlinePartition state changes for offline or new partitions
partitionStateMachine.triggerOnlinePartitionStateChange()
// handle dead replicas
- replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, controllerContext.allTopics.toSeq, deadBrokers), OfflineReplica)
+ replicaStateMachine.handleStateChanges(controllerContext.replicasOnBrokers(deadBrokersSet), OfflineReplica)
}
/**
- * This callback is invoked by the partition state machine's topic change listener with the list of failed brokers
- * as input. It does the following -
+ * This callback is invoked by the partition state machine's topic change listener with the list of new topics
+ * and partitions as input. It does the following -
* 1. Registers partition change listener. This is not required until KAFKA-347
* 2. Invokes the new partition callback
*/
@@ -383,7 +401,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
}
/**
- * This callback is invoked by the topic change callback with the list of failed brokers as input.
+ * This callback is invoked by the partition state machine's partition change listener with the list of new partitions.
* It does the following -
* 1. Move the newly created partitions to the NewPartition state
* 2. Move the newly created partitions from NewPartition->OnlinePartition state
@@ -399,60 +417,84 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
/**
* This callback is invoked by the reassigned partitions listener. When an admin command initiates a partition
* reassignment, it creates the /admin/reassign_partitions path that triggers the zookeeper listener.
- * Reassigning replicas for a partition goes through a few stages -
+ * Reassigning replicas for a partition goes through a few steps listed in the code.
* RAR = Reassigned replicas
- * AR = Original list of replicas for partition
- * 1. Write new AR = AR + RAR. At this time, update the leader epoch in zookeeper and send a LeaderAndIsr request with
- * AR = AR + RAR to all replicas in (AR + RAR)
- * 2. Start new replicas RAR - AR.
- * 3. Wait until new replicas are in sync with the leader
- * 4. If the leader is not in RAR, elect a new leader from RAR. If new leader needs to be elected from RAR, a LeaderAndIsr
+ * OAR = Original list of replicas for partition
+ * AR = current assigned replicas
+ *
+ * 1. Update AR in ZK with OAR + RAR.
+ * 2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR). We do this by forcing an update
+ * of the leader epoch in zookeeper.
+ * 3. Start new replicas RAR - OAR by moving replicas in RAR - OAR to NewReplica state.
+ * 4. Wait until all replicas in RAR are in sync with the leader.
+ * 5. Move all replicas in RAR to OnlineReplica state.
+ * 6. Set AR to RAR in memory.
+ * 7. If the leader is not in RAR, elect a new leader from RAR. If new leader needs to be elected from RAR, a LeaderAndIsr
* will be sent. If not, then leader epoch will be incremented in zookeeper and a LeaderAndIsr request will be sent.
* In any case, the LeaderAndIsr request will have AR = RAR. This will prevent the leader from adding any replica in
- * RAR - AR back in the ISR
- * 5. Stop old replicas AR - RAR. As part of this, we make 2 state changes OfflineReplica and NonExistentReplica. As part
- * of OfflineReplica state change, we shrink the ISR to remove RAR - AR in zookeeper and sent a LeaderAndIsr ONLY to
- * the Leader to notify it of the shrunk ISR. After that, we send a StopReplica (delete = false) to the replicas in
- * RAR - AR. As part of the NonExistentReplica state change, we delete replicas in RAR - AR.
- * 6. Write new AR = RAR. As part of this, we finally change the AR in zookeeper to RAR.
- * 7. Remove partition from the /admin/reassign_partitions path
+ * RAR - OAR back in the isr.
+ * 8. Move all replicas in OAR - RAR to OfflineReplica state. As part of OfflineReplica state change, we shrink the
+ * isr to remove OAR - RAR in zookeeper and send a LeaderAndIsr ONLY to the Leader to notify it of the shrunk isr.
+ * After that, we send a StopReplica (delete = false) to the replicas in OAR - RAR.
+ * 9. Move all replicas in OAR - RAR to NonExistentReplica state. This will send a StopReplica (delete = true) to
+ * the replicas in OAR - RAR to physically delete the replicas on disk.
+ * 10. Update AR in ZK with RAR.
+ * 11. Update the /admin/reassign_partitions path in ZK to remove this partition.
+ * 12. After electing leader, the replicas and isr information changes. So resend the update metadata request to every broker.
+ *
+ * For example, if OAR = {1, 2, 3} and RAR = {4,5,6}, the values in the assigned replica (AR) and leader/isr path in ZK
+ * may go through the following transition.
+ * AR leader/isr
+ * {1,2,3} 1/{1,2,3} (initial state)
+ * {1,2,3,4,5,6} 1/{1,2,3} (step 2)
+ * {1,2,3,4,5,6} 1/{1,2,3,4,5,6} (step 4)
+ * {1,2,3,4,5,6} 4/{1,2,3,4,5,6} (step 7)
+ * {1,2,3,4,5,6} 4/{4,5,6} (step 8)
+ * {4,5,6} 4/{4,5,6} (step 10)
+ *
+ * Note that we have to update AR in ZK with RAR last since it's the only place where we store OAR persistently.
+ * This way, if the controller crashes before that step, we can still recover.
*/
def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
val reassignedReplicas = reassignedPartitionContext.newReplicas
areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas) match {
+ case false =>
+ info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
+ "reassigned not yet caught up with the leader")
+ val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
+ val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
+ //1. Update AR in ZK with OAR + RAR.
+ updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
+ //2. Send LeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR).
+ updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
+ newAndOldReplicas.toSeq)
+ //3. replicas in RAR - OAR -> NewReplica
+ startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
+ info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
+ "reassigned to catch up with the leader")
case true =>
+ //4. Wait until all replicas in RAR are in sync with the leader.
val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
- // mark the new replicas as online
+ //5. replicas in RAR -> OnlineReplica
reassignedReplicas.foreach { replica =>
replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
replica)), OnlineReplica)
}
- // check if current leader is in the new replicas list. If not, controller needs to trigger leader election
+ //6. Set AR to RAR in memory.
+ //7. Send LeaderAndIsr request with a potential new leader (if current leader not in RAR) and
+ // a new AR (using RAR) and same isr to every broker in RAR
moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
- // stop older replicas
+ //8. replicas in OAR - RAR -> Offline (force those replicas out of isr)
+ //9. replicas in OAR - RAR -> NonExistentReplica (force those replicas to be deleted)
stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
- // write the new list of replicas for this partition in zookeeper
+ //10. Update AR in ZK with RAR.
updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
- // update the /admin/reassign_partitions path to remove this partition
+ //11. Update the /admin/reassign_partitions path in ZK to remove this partition.
removePartitionFromReassignedPartitions(topicAndPartition)
info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
- // after electing leader, the replicas and isr information changes, so resend the update metadata request
+ //12. After electing leader, the replicas and isr information changes. So resend the update metadata request to every broker.
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
- case false =>
- info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
- "reassigned not yet caught up with the leader")
- val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
- val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
- // write the expanded list of replicas to zookeeper
- updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
- // update the leader epoch in zookeeper to use on the next LeaderAndIsrRequest
- updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
- newAndOldReplicas.toSeq)
- // start new replicas
- startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
- info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
- "reassigned to catch up with the leader")
}
}
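
The reassignment doc comment above is all set arithmetic over OAR and RAR. Below is a minimal, standalone Scala sketch of that arithmetic, reusing the OAR = {1,2,3} / RAR = {4,5,6} example from the comment (toy code for illustration only, not part of the controller):

object ReassignmentSetsExample extends App {
  // OAR = original assigned replicas, RAR = reassigned replicas (example values from the doc comment)
  val oar = Set(1, 2, 3)
  val rar = Set(4, 5, 6)

  // Steps 1-2: AR in ZK is expanded to OAR + RAR while the new replicas catch up
  val expandedAr = oar ++ rar

  // Step 3: replicas that must be created and moved to NewReplica state
  val replicasToStart = rar -- oar

  // Steps 8-9: replicas that are moved to OfflineReplica and then NonExistentReplica
  val replicasToStop = oar -- rar

  // Step 10: the AR finally written back to ZK is exactly RAR
  val finalAr = rar

  println("expanded AR       = " + expandedAr.toList.sorted)      // List(1, 2, 3, 4, 5, 6)
  println("replicas to start = " + replicasToStart.toList.sorted) // List(4, 5, 6)
  println("replicas to stop  = " + replicasToStop.toList.sorted)  // List(1, 2, 3)
  println("final AR          = " + finalAr.toList.sorted)         // List(4, 5, 6)
}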
http://git-wip-us.apache.org/repos/asf/kafka/blob/855340a2/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index a47b142..fd9200f 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -26,19 +26,20 @@ trait PartitionLeaderSelector {
* @param topicAndPartition The topic and partition whose leader needs to be elected
* @param currentLeaderAndIsr The current leader and isr of input partition read from zookeeper
* @throws NoReplicaOnlineException If no replica in the assigned replicas list is alive
- * @return The leader and isr request, with the newly selected leader info, to send to the brokers
- * Also, returns the list of replicas the returned leader and isr request should be sent to
- * This API selects a new leader for the input partition
+ * @return The leader and isr request, with the newly selected leader and isr, and the set of replicas to receive
+ * the LeaderAndIsrRequest.
*/
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
}
/**
- * This API selects a new leader for the input partition -
- * 1. If at least one broker from the isr is alive, it picks a broker from the isr as the new leader
- * 2. Else, it picks some alive broker from the assigned replica list as the new leader
+ * Select the new leader, new isr and receiving replicas (for the LeaderAndIsrRequest):
+ * 1. If at least one broker from the isr is alive, it picks a broker from the live isr as the new leader and the live
+ * isr as the new isr.
+ * 2. Else, it picks some alive broker from the assigned replica list as the new leader and the new isr.
* 3. If no broker in the assigned replica list is alive, it throws NoReplicaOnlineException
+ * Replicas to receive LeaderAndIsr request = live assigned replicas
* Once the leader is successfully registered in zookeeper, it updates the allLeaders cache
*/
class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
@@ -82,7 +83,9 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
}
/**
- * Picks one of the alive in-sync reassigned replicas as the new leader.
+ * New leader = a live in-sync reassigned replica
+ * New isr = current isr
+ * Replicas to receive LeaderAndIsr request = reassigned replicas
*/
class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
this.logIdent = "[ReassignedPartitionLeaderSelector]: "
@@ -94,7 +97,8 @@ class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) ex
val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
- val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
+ val aliveReassignedInSyncReplicas = reassignedInSyncReplicas.filter(r => controllerContext.liveBrokerIds.contains(r) &&
+ currentLeaderAndIsr.isr.contains(r))
val newLeaderOpt = aliveReassignedInSyncReplicas.headOption
newLeaderOpt match {
case Some(newLeader) => (new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr,
@@ -106,16 +110,16 @@ class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) ex
" %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
case _ =>
throw new StateChangeFailedException("None of the reassigned replicas for partition " +
- "%s are alive. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
+ "%s are in-sync with the leader. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
}
}
}
}
/**
- * Picks the preferred replica as the new leader if -
- * 1. It is already not the current leader
- * 2. It is alive
+ * New leader = preferred (first assigned) replica (if in isr and alive);
+ * New isr = current isr;
+ * Replicas to receive LeaderAndIsr request = assigned replicas
*/
class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector
with Logging {
@@ -145,8 +149,9 @@ with Logging {
}
/**
- * Picks one of the alive replicas (other than the current leader) in ISR as
- * new leader, fails if there are no other replicas in ISR.
+ * New leader = replica in isr that's not being shut down;
+ * New isr = current isr - shutdown replica;
+ * Replicas to receive LeaderAndIsr request = live assigned replicas
*/
class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
extends PartitionLeaderSelector
@@ -164,8 +169,7 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))
- val newIsr = currentLeaderAndIsr.isr.filter(brokerId => brokerId != currentLeader &&
- !controllerContext.shuttingDownBrokerIds.contains(brokerId))
+ val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
val newLeaderOpt = newIsr.headOption
newLeaderOpt match {
case Some(newLeader) =>
@@ -174,8 +178,8 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
(LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1),
liveAssignedReplicas)
case None =>
- throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides current leader %d and" +
- " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, currentLeader, controllerContext.shuttingDownBrokerIds.mkString(",")))
+ throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides" +
+ " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString(",")))
}
}
}
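
The rewritten selector comments above each reduce to a "new leader / new isr / receiving replicas" rule. The following standalone sketch illustrates only the OfflinePartitionLeaderSelector rule as described there, with simplified stand-in names and types (no leader epoch or ZK version handling, and not the real selector class):

object OfflineLeaderSelectionExample extends App {
  // Simplified stand-in for LeaderAndIsr (illustration only)
  case class SimpleLeaderAndIsr(leader: Int, isr: List[Int])

  // 1. prefer a broker from the live isr (the live isr becomes the new isr)
  // 2. else fall back to a live assigned replica (that broker alone becomes the new isr)
  // 3. else no replica is online and leader election fails
  def selectOfflineLeader(assignedReplicas: Seq[Int],
                          currentIsr: List[Int],
                          liveBrokerIds: Set[Int]): (SimpleLeaderAndIsr, Seq[Int]) = {
    val liveAssignedReplicas = assignedReplicas.filter(liveBrokerIds.contains)
    val liveIsr = currentIsr.filter(liveBrokerIds.contains)
    liveIsr.headOption match {
      case Some(leader) => (SimpleLeaderAndIsr(leader, liveIsr), liveAssignedReplicas)
      case None => liveAssignedReplicas.headOption match {
        case Some(leader) => (SimpleLeaderAndIsr(leader, List(leader)), liveAssignedReplicas)
        case None => throw new IllegalStateException("No replica online for this partition")
      }
    }
  }

  // broker 3 is the only live broker and is in the isr, so it becomes the leader
  println(selectOfflineLeader(Seq(1, 2, 3), List(2, 3), liveBrokerIds = Set(3)))
  // no isr broker is live, so the live assigned replica 1 is elected instead
  println(selectOfflineLeader(Seq(1, 2, 3), List(2), liveBrokerIds = Set(1)))
}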
http://git-wip-us.apache.org/repos/asf/kafka/blob/855340a2/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 5859ce7..ac4262a 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -119,7 +119,23 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
/**
* This API exercises the partition's state machine. It ensures that every state transition happens from a legal
- * previous state to the target state.
+ * previous state to the target state. Valid state transitions are:
+ * NonExistentPartition -> NewPartition:
+ * --load assigned replicas from ZK to controller cache
+ *
+ * NewPartition -> OnlinePartition
+ * --assign first live replica as the leader and all live replicas as the isr; write leader and isr to ZK for this partition
+ * --send LeaderAndIsr request to every live replica and UpdateMetadata request to every live broker
+ *
+ * OnlinePartition,OfflinePartition -> OnlinePartition
+ * --select new leader and isr for this partition and a set of replicas to receive the LeaderAndIsr request, and write leader and isr to ZK
+ * --for this partition, send LeaderAndIsr request to every receiving replica and UpdateMetadata request to every live broker
+ *
+ * NewPartition,OnlinePartition -> OfflinePartition
+ * --nothing other than marking partition state as Offline
+ *
+ * OfflinePartition -> NonExistentPartition
+ * --nothing other than marking the partition state as NonExistentPartition
* @param topic The topic of the partition for which the state transition is invoked
* @param partition The partition for which the state transition is invoked
* @param targetState The end state that the partition should be moved to
@@ -273,8 +289,8 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
}
/**
- * Invoked on the OfflinePartition->OnlinePartition state change. It invokes the leader election API to elect a leader
- * for the input offline partition
+ * Invoked on the OfflinePartition,OnlinePartition->OnlinePartition state change.
+ * It invokes the leader election API to elect a leader for the input offline partition
* @param topic The topic of the offline partition
* @param partition The offline partition
* @param leaderSelector Specific leader selector (e.g., offline/reassigned/etc.)
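
The new comment block above spells out which partition state transitions are legal. Written down as plain data, the same table looks like the sketch below (toy types for illustration, not the project's actual partition state implementation):

object PartitionStateTransitionsExample extends App {
  // Simplified stand-ins for the partition states (illustration only)
  sealed trait PartitionState
  case object NonExistentPartition extends PartitionState
  case object NewPartition extends PartitionState
  case object OnlinePartition extends PartitionState
  case object OfflinePartition extends PartitionState

  // Legal previous states for each target state, per the doc comment above
  val validPreviousStates: Map[PartitionState, Set[PartitionState]] = Map(
    NewPartition         -> Set(NonExistentPartition),
    OnlinePartition      -> Set(NewPartition, OnlinePartition, OfflinePartition),
    OfflinePartition     -> Set(NewPartition, OnlinePartition),
    NonExistentPartition -> Set(OfflinePartition)
  )

  def isLegalTransition(from: PartitionState, to: PartitionState): Boolean =
    validPreviousStates.getOrElse(to, Set.empty[PartitionState]).contains(from)

  println(isLegalTransition(NewPartition, OnlinePartition))         // true
  println(isLegalTransition(NonExistentPartition, OnlinePartition)) // false
}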
http://git-wip-us.apache.org/repos/asf/kafka/blob/855340a2/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index ad4ee53..483559a 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -58,8 +58,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
initializeReplicaState()
hasStarted.set(true)
// move all Online replicas to Online
- handleStateChanges(getAllReplicasOnBroker(controllerContext.allTopics.toSeq,
- controllerContext.liveBrokerIds.toSeq), OnlineReplica)
+ handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
info("Started replica state machine with initial state -> " + replicaState.toString())
}
@@ -95,7 +94,23 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
/**
* This API exercises the replica's state machine. It ensures that every state transition happens from a legal
- * previous state to the target state.
+ * previous state to the target state. Valid state transitions are:
+ * NonExistentReplica --> NewReplica
+ * --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the partition to every live broker
+ *
+ * NewReplica -> OnlineReplica
+ * --add the new replica to the assigned replica list if needed
+ *
+ * OnlineReplica,OfflineReplica -> OnlineReplica
+ * --send LeaderAndIsr request with current leader and isr to the new replica and UpdateMetadata request for the partition to every live broker
+ *
+ * NewReplica,OnlineReplica -> OfflineReplica
+ * --send StopReplicaRequest to the replica (w/o deletion)
+ * --remove this replica from the isr and send LeaderAndIsr request (with new isr) to the leader replica and UpdateMetadata request for the partition to every live broker.
+ *
+ * OfflineReplica -> NonExistentReplica
+ * --send StopReplicaRequest to the replica (with deletion)
+ *
* @param topic The topic of the replica for which the state transition is invoked
* @param partition The partition of the replica for which the state transition is invoked
* @param replicaId The replica for which the state transition is invoked
@@ -228,20 +243,6 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
}
}
- private def getAllReplicasOnBroker(topics: Seq[String], brokerIds: Seq[Int]): Set[PartitionAndReplica] = {
- brokerIds.map { brokerId =>
- val partitionsAssignedToThisBroker =
- controllerContext.partitionReplicaAssignment.filter(p => topics.contains(p._1.topic) && p._2.contains(brokerId))
- if(partitionsAssignedToThisBroker.size == 0)
- info("No state transitions triggered since no partitions are assigned to brokers %s".format(brokerIds.mkString(",")))
- partitionsAssignedToThisBroker.map(p => new PartitionAndReplica(p._1.topic, p._1.partition, brokerId))
- }.flatten.toSet
- }
-
- def getPartitionsAssignedToBroker(topics: Seq[String], brokerId: Int):Seq[TopicAndPartition] = {
- controllerContext.partitionReplicaAssignment.filter(_._2.contains(brokerId)).keySet.toSeq
- }
-
/**
* This is the zookeeper listener that triggers all the state transitions for a replica
*/
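
The helpers removed in this hunk (and the ZkUtils ones below) are superseded by the ControllerContext methods added earlier in the patch, so the controller answers "which replicas live on these brokers" from its in-memory cache instead of re-reading assignments from ZooKeeper. A toy, self-contained sketch of that in-memory computation follows (stand-in types and hypothetical example data, not the real ControllerContext):

object ReplicasOnBrokersExample extends App {
  // Simplified stand-ins for the controller's types (illustration only)
  case class TopicAndPartition(topic: String, partition: Int)
  case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

  // Replica assignment as cached in memory by the controller (hypothetical data)
  val partitionReplicaAssignment: Map[TopicAndPartition, Seq[Int]] = Map(
    TopicAndPartition("t1", 0) -> Seq(1, 2),
    TopicAndPartition("t1", 1) -> Seq(2, 3),
    TopicAndPartition("t2", 0) -> Seq(1, 3)
  )

  // Same shape of computation as the new ControllerContext.replicasOnBrokers: no ZK round trips
  def replicasOnBrokers(brokerIds: Set[Int]): Set[PartitionAndReplica] =
    brokerIds.flatMap { brokerId =>
      partitionReplicaAssignment
        .filter { case (_, replicas) => replicas.contains(brokerId) }
        .map { case (tp, _) => PartitionAndReplica(tp.topic, tp.partition, brokerId) }
    }

  println(replicasOnBrokers(Set(1)))    // replicas of t1-0 and t2-0 hosted on broker 1
  println(replicasOnBrokers(Set(2, 3))) // every replica hosted on brokers 2 or 3
}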
http://git-wip-us.apache.org/repos/asf/kafka/blob/855340a2/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 73902b2..b42e52b 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -185,12 +185,6 @@ object ZkUtils extends Logging {
}
}
- def isPartitionOnBroker(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = {
- val replicas = getReplicasForPartition(zkClient, topic, partition)
- debug("The list of replicas for partition [%s,%d] is %s".format(topic, partition, replicas))
- replicas.contains(brokerId.toString)
- }
-
def registerBrokerInZk(zkClient: ZkClient, id: Int, host: String, port: Int, timeout: Int, jmxPort: Int) {
val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + id
val timestamp = SystemTime.milliseconds.toString
@@ -500,8 +494,6 @@ object ZkUtils extends Logging {
client.exists(path)
}
- def getLastPart(path : String) : String = path.substring(path.lastIndexOf('/') + 1)
-
def getCluster(zkClient: ZkClient) : Cluster = {
val cluster = new Cluster
val nodes = getChildrenParentMayNotExist(zkClient, BrokerIdsPath)
@@ -571,17 +563,6 @@ object ZkUtils extends Logging {
ret
}
- def getReplicaAssignmentFromPartitionAssignment(topicPartitionAssignment: mutable.Map[String, collection.Map[Int, Seq[Int]]]):
- mutable.Map[(String, Int), Seq[Int]] = {
- val ret = new mutable.HashMap[(String, Int), Seq[Int]]
- for((topic, partitionAssignment) <- topicPartitionAssignment){
- for((partition, replicaAssignment) <- partitionAssignment){
- ret.put((topic, partition), replicaAssignment)
- }
- }
- ret
- }
-
def getPartitionsForTopics(zkClient: ZkClient, topics: Seq[String]): mutable.Map[String, Seq[Int]] = {
getPartitionAssignmentForTopics(zkClient, topics).map { topicAndPartitionMap =>
val topic = topicAndPartitionMap._1
@@ -591,19 +572,6 @@ object ZkUtils extends Logging {
}
}
- def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int): Seq[(String, Int)] = {
- val topicsAndPartitions = getPartitionAssignmentForTopics(zkClient, topics)
- topicsAndPartitions.map { topicAndPartitionMap =>
- val topic = topicAndPartitionMap._1
- val partitionMap = topicAndPartitionMap._2
- val relevantPartitionsMap = partitionMap.filter( m => m._2.contains(brokerId) )
- val relevantPartitions = relevantPartitionsMap.map(_._1)
- for(relevantPartition <- relevantPartitions) yield {
- (topic, relevantPartition)
- }
- }.flatten[(String, Int)].toSeq
- }
-
def getPartitionsBeingReassigned(zkClient: ZkClient): Map[TopicAndPartition, ReassignedPartitionsContext] = {
// read the partitions and their new replica list
val jsonPartitionMapOpt = readDataMaybeNull(zkClient, ReassignPartitionsPath)._1
@@ -677,17 +645,6 @@ object ZkUtils extends Logging {
}
}
- def getAllReplicasOnBroker(zkClient: ZkClient, topics: Seq[String], brokerIds: Seq[Int]): Set[PartitionAndReplica] = {
- Set.empty[PartitionAndReplica] ++ brokerIds.map { brokerId =>
- // read all the partitions and their assigned replicas into a map organized by
- // { replica id -> partition 1, partition 2...
- val partitionsAssignedToThisBroker = getPartitionsAssignedToBroker(zkClient, topics, brokerId)
- if(partitionsAssignedToThisBroker.size == 0)
- info("No state transitions triggered since no partitions are assigned to brokers %s".format(brokerIds.mkString(",")))
- partitionsAssignedToThisBroker.map(p => new PartitionAndReplica(p._1, p._2, brokerId))
- }.flatten
- }
-
def getPartitionsUndergoingPreferredReplicaElection(zkClient: ZkClient): Set[TopicAndPartition] = {
// read the partitions and their new replica list
val jsonPartitionListOpt = readDataMaybeNull(zkClient, PreferredReplicaLeaderElectionPath)._1