You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/26 07:04:36 UTC
[15/28] git commit: KAFKA-708 ISR becomes empty while marking a partition offline; reviewed by Jun Rao
KAFKA-708 ISR becomes empty while marking a partition offline; reviewed by Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eb0d5a7f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eb0d5a7f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eb0d5a7f
Branch: refs/heads/trunk
Commit: eb0d5a7f815ac51e3bdcf7bb2c1d3af9f33f2d83
Parents: 22cd8bf
Author: Neha Narkhede <ne...@gmail.com>
Authored: Wed Jan 23 13:47:27 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Wed Jan 23 13:47:27 2013 -0800
----------------------------------------------------------------------
.../main/scala/kafka/api/LeaderAndIsrRequest.scala | 2 +-
.../scala/kafka/controller/KafkaController.scala | 26 +++---
.../kafka/controller/PartitionLeaderSelector.scala | 47 +++++-----
.../kafka/controller/PartitionStateMachine.scala | 26 +++---
.../kafka/controller/ReplicaStateMachine.scala | 71 ++++++++-------
core/src/main/scala/kafka/utils/Logging.scala | 2 +-
core/src/main/scala/kafka/utils/Utils.scala | 2 +-
core/src/main/scala/kafka/utils/ZkUtils.scala | 11 ++-
.../test/scala/unit/kafka/utils/UtilsTest.scala | 12 +++
9 files changed, 105 insertions(+), 94 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/eb0d5a7f/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
index 99af002..6955433 100644
--- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
+++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
@@ -48,7 +48,7 @@ object PartitionStateInfo {
val leader = buffer.getInt
val leaderEpoch = buffer.getInt
val isrString = readShortString(buffer)
- val isr = isrString.split(",").map(_.toInt).toList
+ val isr = Utils.parseCsvList(isrString).map(_.toInt).toList
val zkVersion = buffer.getInt
val replicationFactor = buffer.getInt
PartitionStateInfo(LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, leaderEpoch, isr, zkVersion), controllerEpoch),
http://git-wip-us.apache.org/repos/asf/kafka/blob/eb0d5a7f/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 565c53a..f1a12c0 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -671,7 +671,9 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
"means the current controller with epoch %d went through a soft failure and another ".format(epoch) +
"controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
if (leaderAndIsr.isr.contains(replicaId)) {
- val newLeaderAndIsr = new LeaderAndIsr(leaderAndIsr.leader, leaderAndIsr.leaderEpoch + 1,
+ // if the replica to be removed from the ISR is also the leader, set the new leader value to -1
+ val newLeader = if(replicaId == leaderAndIsr.leader) -1 else leaderAndIsr.leader
+ val newLeaderAndIsr = new LeaderAndIsr(newLeader, leaderAndIsr.leaderEpoch + 1,
leaderAndIsr.isr.filter(b => b != replicaId), leaderAndIsr.zkVersion + 1)
// update the new leadership decision in zookeeper or retry
val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(
@@ -683,8 +685,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
finalLeaderIsrAndControllerEpoch = Some(LeaderIsrAndControllerEpoch(newLeaderAndIsr, epoch))
if (updateSucceeded)
- info("New leader and ISR for partition [%s, %d] is %s"
- .format(topic, partition, newLeaderAndIsr.toString()))
+ info("New leader and ISR for partition %s is %s".format(topicAndPartition, newLeaderAndIsr.toString()))
updateSucceeded
} else {
warn("Cannot remove replica %d from ISR of %s. Leader = %d ; ISR = %s"
@@ -721,7 +722,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
partitionStateMachine.shutdown()
replicaStateMachine.shutdown()
if(controllerContext.controllerChannelManager != null) {
- info("session expires, clean up the state")
controllerContext.controllerChannelManager.shutdown()
controllerContext.controllerChannelManager = null
}
@@ -766,13 +766,11 @@ class PartitionsReassignedListener(controller: KafkaController) extends IZkDataL
assignedReplicasOpt match {
case Some(assignedReplicas) =>
if(assignedReplicas == newReplicas) {
- throw new KafkaException("Partition %s to be reassigned is already assigned to replicas"
- .format(topicAndPartition) +
+ throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) +
" %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
} else {
if(aliveNewReplicas == newReplicas) {
- info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition,
- newReplicas.mkString(",")))
+ info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
val context = createReassignmentContextForPartition(topic, partition, newReplicas)
controllerContext.partitionsBeingReassigned.put(topicAndPartition, context)
controller.onPartitionReassignment(topicAndPartition, context)
@@ -851,18 +849,18 @@ class ReassignedPartitionsIsrChangeListener(controller: KafkaController, topic:
val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
if(caughtUpReplicas == reassignedReplicas) {
// resume the partition reassignment process
- info("%d/%d replicas have caught up with the leader for partition [%s, %d] being reassigned."
- .format(caughtUpReplicas.size, reassignedReplicas.size, topic, partition) +
+ info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
+ .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
"Resuming partition reassignment")
controller.onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
}
else {
- info("%d/%d replicas have caught up with the leader for partition [%s, %d] being reassigned."
- .format(caughtUpReplicas.size, reassignedReplicas.size, topic, partition) +
+ info("%d/%d replicas have caught up with the leader for partition %s being reassigned."
+ .format(caughtUpReplicas.size, reassignedReplicas.size, topicAndPartition) +
"Replica(s) %s still need to catch up".format((reassignedReplicas -- leaderAndIsr.isr.toSet).mkString(",")))
}
- case None => error("Error handling reassignment of partition [%s, %d] to replicas %s as it was never created"
- .format(topic, partition, reassignedReplicas.mkString(",")))
+ case None => error("Error handling reassignment of partition %s to replicas %s as it was never created"
+ .format(topicAndPartition, reassignedReplicas.mkString(",")))
}
case None =>
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/eb0d5a7f/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 3eb23cd..3ed9b7e 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -23,15 +23,14 @@ import kafka.common.{TopicAndPartition, StateChangeFailedException, PartitionOff
trait PartitionLeaderSelector {
/**
- * @param topic The topic of the partition whose leader needs to be elected
- * @param partition The partition whose leader needs to be elected
+ * @param topicAndPartition The topic and partition whose leader needs to be elected
* @param currentLeaderAndIsr The current leader and isr of input partition read from zookeeper
* @throws PartitionOfflineException If no replica in the assigned replicas list is alive
* @return The leader and isr request, with the newly selected leader info, to send to the brokers
* Also, returns the list of replicas the returned leader and isr request should be sent to
* This API selects a new leader for the input partition
*/
- def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
+ def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])
}
@@ -45,8 +44,8 @@ trait PartitionLeaderSelector {
class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
this.logIdent = "[OfflinePartitionLeaderSelector]: "
- def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
- controllerContext.partitionReplicaAssignment.get(TopicAndPartition(topic, partition)) match {
+ def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
+ controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {
case Some(assignedReplicas) =>
val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
@@ -60,7 +59,7 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
case true =>
ControllerStats.offlinePartitionRate.mark()
throw new PartitionOfflineException(("No replica for partition " +
- "([%s, %d]) is alive. Live brokers are: [%s],".format(topic, partition, controllerContext.liveBrokerIds)) +
+ "%s is alive. Live brokers are: [%s],".format(topicAndPartition, controllerContext.liveBrokerIds)) +
" Assigned replicas are: [%s]".format(assignedReplicas))
case false =>
ControllerStats.uncleanLeaderElectionRate.mark()
@@ -74,13 +73,11 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
debug("Some broker in ISR is alive, selecting the leader from the ISR: " + newLeader)
new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
}
- info("Selected new leader and ISR %s for offline partition [%s, %d]".format(newLeaderAndIsr.toString(), topic,
- partition))
+ info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))
(newLeaderAndIsr, liveAssignedReplicasToThisPartition)
case None =>
ControllerStats.offlinePartitionRate.mark()
- throw new PartitionOfflineException("Partition [%s, %d] doesn't have".format(topic, partition) +
- "replicas assigned to it")
+ throw new PartitionOfflineException("Partition %s doesn't have".format(topicAndPartition) + "replicas assigned to it")
}
}
}
@@ -91,8 +88,8 @@ class OfflinePartitionLeaderSelector(controllerContext: ControllerContext) exten
class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) extends PartitionLeaderSelector with Logging {
this.logIdent = "[ReassignedPartitionLeaderSelector]: "
- def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
- val reassignedReplicas = controllerContext.partitionsBeingReassigned(TopicAndPartition(topic, partition)).newReplicas
+ def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
+ val reassignedReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
// pick any replica from the newly assigned replicas list that is in the ISR
@@ -105,10 +102,10 @@ class ReassignedPartitionLeaderSelector(controllerContext: ControllerContext) ex
reassignedReplicas.size match {
case 0 =>
throw new StateChangeFailedException("List of reassigned replicas for partition " +
- "([%s, %d]) is empty. Current leader and ISR: [%s]".format(topic, partition, currentLeaderAndIsr))
+ " %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
case _ =>
throw new StateChangeFailedException("None of the reassigned replicas for partition " +
- "([%s, %d]) are alive. Current leader and ISR: [%s]".format(topic, partition, currentLeaderAndIsr))
+ "%s are alive. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
}
}
}
@@ -123,17 +120,16 @@ class PreferredReplicaPartitionLeaderSelector(controllerContext: ControllerConte
with Logging {
this.logIdent = "[PreferredReplicaPartitionLeaderSelector]: "
- def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
- val topicAndPartition = TopicAndPartition(topic, partition)
+ def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
val preferredReplica = assignedReplicas.head
// check if preferred replica is the current leader
val currentLeader = controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader
if(currentLeader == preferredReplica) {
- throw new StateChangeFailedException("Preferred replica %d is already the current leader for partition [%s,%d]"
- .format(preferredReplica, topic, partition))
+ throw new StateChangeFailedException("Preferred replica %d is already the current leader for partition %s"
+ .format(preferredReplica, topicAndPartition))
} else {
- info("Current leader %d for partition [%s,%d] is not the preferred replica.".format(currentLeader, topic, partition) +
+ info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) +
" Trigerring preferred replica leader election")
// check if preferred replica is not the current leader and is alive and in the isr
if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) {
@@ -141,7 +137,7 @@ with Logging {
currentLeaderAndIsr.zkVersion + 1), assignedReplicas)
} else {
throw new StateChangeFailedException("Preferred replica %d for partition ".format(preferredReplica) +
- "[%s,%d] is either not alive or not in the isr. Current leader and ISR: [%s]".format(topic, partition, currentLeaderAndIsr))
+ "%s is either not alive or not in the isr. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
}
}
}
@@ -157,13 +153,13 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
this.logIdent = "[ControlledShutdownLeaderSelector]: "
- def selectLeader(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
+ def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
val currentLeader = currentLeaderAndIsr.leader
- val assignedReplicas = controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition))
+ val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))
@@ -172,12 +168,13 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
val newLeaderOpt = newIsr.headOption
newLeaderOpt match {
case Some(newLeader) =>
- debug("Partition [%s,%d] : current leader = %d, new leader = %d"
- .format(topic, partition, currentLeader, newLeader))
+ debug("Partition %s : current leader = %d, new leader = %d"
+ .format(topicAndPartition, currentLeader, newLeader))
(LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1),
liveAssignedReplicas)
case None =>
- throw new StateChangeFailedException("No other replicas in ISR for %s-%s.".format(topic, partition))
+ throw new StateChangeFailedException(("No other replicas in ISR %s for %s besides current leader %d and" +
+ " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, currentLeader, controllerContext.shuttingDownBrokerIds.mkString(",")))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/eb0d5a7f/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 372793b..9bb318c 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -130,7 +130,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
assignReplicasToPartitions(topic, partition)
partitionState.put(topicAndPartition, NewPartition)
- info("Partition [%s, %d] state changed from NotExists to New with assigned replicas ".format(topic, partition) +
+ info("Partition %s state changed from NotExists to New with assigned replicas ".format(topicAndPartition) +
"%s".format(controllerContext.partitionReplicaAssignment(topicAndPartition).mkString(",")))
case OnlinePartition =>
assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
@@ -144,7 +144,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
electLeaderForPartition(topic, partition, leaderSelector)
case _ => // should never come here since illegal previous states are checked above
}
- info("Partition [%s, %d] state changed from %s to OnlinePartition with leader %d".format(topic, partition,
+ info("Partition %s state changed from %s to OnlinePartition with leader %d".format(topicAndPartition,
partitionState(topicAndPartition), controllerContext.allLeaders(topicAndPartition).leaderAndIsr.leader))
partitionState.put(topicAndPartition, OnlinePartition)
// post: partition has a leader
@@ -152,18 +152,18 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
// pre: partition should be in Online state
assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition), OfflinePartition)
// should be called when the leader for a partition is no longer alive
- info("Partition [%s, %d] state changed from Online to Offline".format(topic, partition))
+ info("Partition %s state changed from Online to Offline".format(topicAndPartition))
partitionState.put(topicAndPartition, OfflinePartition)
// post: partition has no alive leader
case NonExistentPartition =>
// pre: partition could be in either of the above states
assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
- info("Partition [%s, %d] state changed from Offline to NotExists".format(topic, partition))
+ info("Partition %s state changed from Offline to NotExists".format(topicAndPartition))
partitionState.put(topicAndPartition, NonExistentPartition)
// post: partition state is deleted from all brokers and zookeeper
}
} catch {
- case t: Throwable => error("State change for partition [%s, %d] ".format(topic, partition) +
+ case t: Throwable => error("State change for partition %s ".format(topicAndPartition) +
"from %s to %s failed".format(currState, targetState), t)
}
}
@@ -266,8 +266,9 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
* @param leaderSelector Specific leader selector (e.g., offline/reassigned/etc.)
*/
def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) {
+ val topicAndPartition = TopicAndPartition(topic, partition)
// handle leader election for the partitions whose leader is no longer alive
- info("Electing leader for partition [%s, %d]".format(topic, partition))
+ info("Electing leader for partition %s".format(topicAndPartition))
try {
var zookeeperPathUpdateSucceeded: Boolean = false
var newLeaderAndIsr: LeaderAndIsr = null
@@ -281,7 +282,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
"means the current controller with epoch %d went through a soft failure and another ".format(controller.epoch) +
"controller was elected with epoch %d. Aborting state change by this controller".format(controllerEpoch))
// elect new leader or throw exception
- val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topic, partition, currentLeaderAndIsr)
+ val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
ZkUtils.leaderAndIsrZkData(leaderAndIsr, controller.epoch), currentLeaderAndIsr.zkVersion)
@@ -293,15 +294,15 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
val newLeaderIsrAndControllerEpoch = new LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
// update the leader cache
controllerContext.allLeaders.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
- info("Elected leader %d for Offline partition [%s, %d]".format(newLeaderAndIsr.leader, topic, partition))
+ info("Elected leader %d for Offline partition %s".format(newLeaderAndIsr.leader, topicAndPartition))
// store new leader and isr info in cache
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition,
newLeaderIsrAndControllerEpoch, controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition)).size)
} catch {
- case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas for partition [%s, %d] are dead."
- .format(topic, partition) + " Marking this partition offline", poe)
+ case poe: PartitionOfflineException => throw new PartitionOfflineException("All replicas for partition %s are dead."
+ .format(topicAndPartition) + " Marking this partition offline", poe)
case sce => throw new StateChangeFailedException(("Error while electing leader for partition " +
- " [%s, %d] due to: %s.").format(topic, partition, sce.getMessage), sce)
+ " %s due to: %s.").format(topicAndPartition, sce.getMessage), sce)
}
debug("After leader election, leader cache is updated to %s".format(controllerContext.allLeaders.map(l => (l._1, l._2))))
}
@@ -315,11 +316,12 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
}
private def getLeaderIsrAndEpochOrThrowException(topic: String, partition: Int): LeaderIsrAndControllerEpoch = {
+ val topicAndPartition = TopicAndPartition(topic, partition)
ZkUtils.getLeaderIsrAndEpochForPartition(zkClient, topic, partition) match {
case Some(currentLeaderIsrAndEpoch) => currentLeaderIsrAndEpoch
case None =>
throw new StateChangeFailedException("Leader and ISR information doesn't exist for partition " +
- "[%s, %d] in %s state".format(topic, partition, partitionState(TopicAndPartition(topic, partition))))
+ "%s in %s state".format(topicAndPartition, partitionState(topicAndPartition)))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/eb0d5a7f/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 264285c..20d9c4f 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -110,14 +110,14 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
leaderIsrAndControllerEpochOpt match {
case Some(leaderIsrAndControllerEpoch) =>
if(leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
- throw new StateChangeFailedException("Replica %d for partition [%s, %d] cannot be moved to NewReplica"
- .format(replicaId, topic, partition) + "state as it is being requested to become leader")
+ throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica"
+ .format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
topic, partition, leaderIsrAndControllerEpoch, replicaAssignment.size)
case None => // new leader request will be sent to this replica when one gets elected
}
replicaState.put((topic, partition, replicaId), NewReplica)
- info("Replica %d for partition [%s, %d] state changed to NewReplica".format(replicaId, topic, partition))
+ info("Replica %d for partition %s state changed to NewReplica".format(replicaId, topicAndPartition))
case NonExistentReplica =>
assertValidPreviousStates(topic, partition, replicaId, List(OfflineReplica), targetState)
// send stop replica command
@@ -126,7 +126,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
controllerContext.partitionReplicaAssignment.put(topicAndPartition,
currentAssignedReplicas.filterNot(_ == replicaId))
- info("Replica %d for partition [%s, %d] state changed to NonExistentReplica".format(replicaId, topic, partition))
+ info("Replica %d for partition %s state changed to NonExistentReplica".format(replicaId, topicAndPartition))
replicaState.remove((topic, partition, replicaId))
case OnlineReplica =>
assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica, OfflineReplica), targetState)
@@ -135,19 +135,19 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
// add this replica to the assigned replicas list for its partition
val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
- info("Replica %d for partition [%s, %d] state changed to OnlineReplica".format(replicaId, topic, partition))
+ info("Replica %d for partition %s state changed to OnlineReplica".format(replicaId, topicAndPartition))
case _ =>
// check if the leader for this partition is alive or even exists
controllerContext.allLeaders.get(topicAndPartition) match {
case Some(leaderIsrAndControllerEpoch) =>
- val leader = leaderIsrAndControllerEpoch.leaderAndIsr.leader
- if (controllerContext.liveOrShuttingDownBrokerIds.contains(leader)) {
- brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
- topic, partition, leaderIsrAndControllerEpoch,
- replicaAssignment.size)
- replicaState.put((topic, partition, replicaId), OnlineReplica)
- info("Replica %d for partition [%s, %d] state changed to OnlineReplica"
- .format(replicaId, topic, partition))
+ controllerContext.liveBrokerIds.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader) match {
+ case true => // leader is alive
+ brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
+ topic, partition, leaderIsrAndControllerEpoch,
+ replicaAssignment.size)
+ replicaState.put((topic, partition, replicaId), OnlineReplica)
+ info("Replica %d for partition %s state changed to OnlineReplica".format(replicaId, topicAndPartition))
+ case false => // ignore partitions whose leader is not alive
}
case None => // ignore partitions who don't have a leader yet
}
@@ -156,30 +156,31 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
case OfflineReplica =>
assertValidPreviousStates(topic, partition, replicaId, List(NewReplica, OnlineReplica), targetState)
// As an optimization, the controller removes dead replicas from the ISR
- val leaderAndIsrIsEmpty: Boolean = controllerContext.allLeaders.get(topicAndPartition) match {
- case Some(currLeaderIsrAndControllerEpoch) =>
- if (currLeaderIsrAndControllerEpoch.leaderAndIsr.isr.contains(replicaId))
- controller.removeReplicaFromIsr(topic, partition, replicaId) match {
- case Some(updatedLeaderIsrAndControllerEpoch) =>
- // send the shrunk ISR state change request only to the leader
- brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader),
- topic, partition, updatedLeaderIsrAndControllerEpoch,
- replicaAssignment.size)
- replicaState.put((topic, partition, replicaId), OfflineReplica)
- info("Replica %d for partition [%s, %d] state changed to OfflineReplica".format(replicaId, topic, partition))
- info("Removed offline replica %d from ISR for partition [%s, %d]".format(replicaId, topic, partition))
- false
- case None =>
- true
- }
- else false
- case None =>
- true
- }
+ val leaderAndIsrIsEmpty: Boolean =
+ controllerContext.allLeaders.get(topicAndPartition) match {
+ case Some(currLeaderIsrAndControllerEpoch) =>
+ if (currLeaderIsrAndControllerEpoch.leaderAndIsr.isr.contains(replicaId))
+ controller.removeReplicaFromIsr(topic, partition, replicaId) match {
+ case Some(updatedLeaderIsrAndControllerEpoch) =>
+ // send the shrunk ISR state change request only to the leader
+ brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(updatedLeaderIsrAndControllerEpoch.leaderAndIsr.leader),
+ topic, partition, updatedLeaderIsrAndControllerEpoch,
+ replicaAssignment.size)
+ replicaState.put((topic, partition, replicaId), OfflineReplica)
+ info("Replica %d for partition %s state changed to OfflineReplica".format(replicaId, topicAndPartition))
+ info("Removed offline replica %d from ISR for partition %s".format(replicaId, topicAndPartition))
+ false
+ case None =>
+ true
+ }
+ else false
+ case None =>
+ true
+ }
if (leaderAndIsrIsEmpty)
throw new StateChangeFailedException(
- "Failed to change state of replica %d for partition [%s, %d] since the leader and isr path in zookeeper is empty"
- .format(replicaId, topic, partition))
+ "Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty"
+ .format(replicaId, topicAndPartition))
}
}
catch {
http://git-wip-us.apache.org/repos/asf/kafka/blob/eb0d5a7f/core/src/main/scala/kafka/utils/Logging.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Logging.scala b/core/src/main/scala/kafka/utils/Logging.scala
index d36fe23..d9f010b 100644
--- a/core/src/main/scala/kafka/utils/Logging.scala
+++ b/core/src/main/scala/kafka/utils/Logging.scala
@@ -28,7 +28,7 @@ trait Logging {
// Force initialization to register Log4jControllerMBean
private val log4jController = Log4jController
- private def msgWithLogIdent(msg: String) = "%s%s".format(logIdent, msg)
+ private def msgWithLogIdent(msg: String) = logIdent + msg
def trace(msg: => String): Unit = {
if (logger.isTraceEnabled())
http://git-wip-us.apache.org/repos/asf/kafka/blob/eb0d5a7f/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index c83752f..0185c14 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -442,7 +442,7 @@ object Utils extends Logging {
* Whitespace surrounding the comma will be removed.
*/
def parseCsvList(csvList: String): Seq[String] = {
- if(csvList == null)
+ if(csvList == null || csvList.isEmpty)
Seq.empty[String]
else {
csvList.split("\\s*,\\s*").filter(v => !v.equals(""))
http://git-wip-us.apache.org/repos/asf/kafka/blob/eb0d5a7f/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index f594404..113ad37 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -95,10 +95,11 @@ object ZkUtils extends Logging {
: Option[LeaderIsrAndControllerEpoch] = {
Json.parseFull(leaderAndIsrStr) match {
case Some(m) =>
- val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
- val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
- val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get
- val controllerEpoch = m.asInstanceOf[Map[String, String]].get("controllerEpoch").get.toInt
+ val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, String]]
+ val leader = leaderIsrAndEpochInfo.get("leader").get.toInt
+ val epoch = leaderIsrAndEpochInfo.get("leaderEpoch").get.toInt
+ val isrString = leaderIsrAndEpochInfo.get("ISR").get
+ val controllerEpoch = leaderIsrAndEpochInfo.get("controllerEpoch").get.toInt
val isr = Utils.parseCsvList(isrString).map(r => r.toInt)
val zkPathVersion = stat.getVersion
debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch,
@@ -201,7 +202,7 @@ object ZkUtils extends Logging {
val jsonDataMap = new HashMap[String, String]
jsonDataMap.put("leader", leaderAndIsr.leader.toString)
jsonDataMap.put("leaderEpoch", leaderAndIsr.leaderEpoch.toString)
- jsonDataMap.put("ISR", leaderAndIsr.isr.mkString(","))
+ jsonDataMap.put("ISR", if(leaderAndIsr.isr.isEmpty) "" else leaderAndIsr.isr.mkString(","))
jsonDataMap.put("controllerEpoch", controllerEpoch.toString)
Utils.stringMapToJson(jsonDataMap)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/eb0d5a7f/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
index a4d3a27..cce6c8e 100644
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
@@ -62,4 +62,16 @@ class UtilsTest extends JUnitSuite {
}
}
+ @Test
+ def testCsvList() {
+ val emptyString:String = ""
+ val nullString:String = null
+ val emptyList = Utils.parseCsvList(emptyString)
+ val emptyListFromNullString = Utils.parseCsvList(nullString)
+ val emptyStringList = Seq.empty[String]
+ assertTrue(emptyList!=null)
+ assertTrue(emptyListFromNullString!=null)
+ assertTrue(emptyStringList.equals(emptyListFromNullString))
+ assertTrue(emptyStringList.equals(emptyList))
+ }
}