You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/26 07:04:36 UTC
[3/28] git commit: draft patch
draft patch
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aa1546b0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aa1546b0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aa1546b0
Branch: refs/heads/trunk
Commit: aa1546b0907c959a4df90a7e3d48bad0890d1f2f
Parents: ecd1f19
Author: Neha Narkhede <ne...@gmail.com>
Authored: Sat Jan 19 14:38:46 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Sat Jan 19 14:38:46 2013 -0800
----------------------------------------------------------------------
.../kafka/controller/PartitionLeaderSelector.scala | 3 ++-
.../kafka/controller/PartitionStateMachine.scala | 9 +++++++++
core/src/main/scala/kafka/utils/ZkUtils.scala | 11 ++++++-----
3 files changed, 17 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/aa1546b0/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 3eb23cd..5f748b8 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -177,7 +177,8 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
(LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1),
liveAssignedReplicas)
case None =>
- throw new StateChangeFailedException("No other replicas in ISR for %s-%s.".format(topic, partition))
+ throw new StateChangeFailedException(("No other replicas in ISR %s for [%s,%d] besides current leader %d and" +
+ " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topic, partition, currentLeader, controllerContext.shuttingDownBrokerIds.mkString(",")))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/aa1546b0/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 372793b..94f27ae 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -151,6 +151,15 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
case OfflinePartition =>
// pre: partition should be in Online state
assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition), OfflinePartition)
+ // mark the partition offline by setting the leader to -1
+ // read the current leader and isr path
+ val leaderIsrAndControllerEpoch = controller.controllerContext.allLeaders(topicAndPartition)
+ leaderIsrAndControllerEpoch.leaderAndIsr.leader = -1
+ leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch += 1
+ leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion += 1
+ ZkUtils.updatePersistentPath(zkClient,
+ ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
+ ZkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))
// should be called when the leader for a partition is no longer alive
info("Partition [%s, %d] state changed from Online to Offline".format(topic, partition))
partitionState.put(topicAndPartition, OfflinePartition)
http://git-wip-us.apache.org/repos/asf/kafka/blob/aa1546b0/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index f594404..113ad37 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -95,10 +95,11 @@ object ZkUtils extends Logging {
: Option[LeaderIsrAndControllerEpoch] = {
Json.parseFull(leaderAndIsrStr) match {
case Some(m) =>
- val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
- val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
- val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get
- val controllerEpoch = m.asInstanceOf[Map[String, String]].get("controllerEpoch").get.toInt
+ val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, String]]
+ val leader = leaderIsrAndEpochInfo.get("leader").get.toInt
+ val epoch = leaderIsrAndEpochInfo.get("leaderEpoch").get.toInt
+ val isrString = leaderIsrAndEpochInfo.get("ISR").get
+ val controllerEpoch = leaderIsrAndEpochInfo.get("controllerEpoch").get.toInt
val isr = Utils.parseCsvList(isrString).map(r => r.toInt)
val zkPathVersion = stat.getVersion
debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch,
@@ -201,7 +202,7 @@ object ZkUtils extends Logging {
val jsonDataMap = new HashMap[String, String]
jsonDataMap.put("leader", leaderAndIsr.leader.toString)
jsonDataMap.put("leaderEpoch", leaderAndIsr.leaderEpoch.toString)
- jsonDataMap.put("ISR", leaderAndIsr.isr.mkString(","))
+ jsonDataMap.put("ISR", if(leaderAndIsr.isr.isEmpty) "" else leaderAndIsr.isr.mkString(","))
jsonDataMap.put("controllerEpoch", controllerEpoch.toString)
Utils.stringMapToJson(jsonDataMap)
}