You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/26 07:04:36 UTC

[3/28] git commit: draft patch

draft patch


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aa1546b0
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aa1546b0
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aa1546b0

Branch: refs/heads/trunk
Commit: aa1546b0907c959a4df90a7e3d48bad0890d1f2f
Parents: ecd1f19
Author: Neha Narkhede <ne...@gmail.com>
Authored: Sat Jan 19 14:38:46 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Sat Jan 19 14:38:46 2013 -0800

----------------------------------------------------------------------
 .../kafka/controller/PartitionLeaderSelector.scala |    3 ++-
 .../kafka/controller/PartitionStateMachine.scala   |    9 +++++++++
 core/src/main/scala/kafka/utils/ZkUtils.scala      |   11 ++++++-----
 3 files changed, 17 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/aa1546b0/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 3eb23cd..5f748b8 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -177,7 +177,8 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
         (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1),
          liveAssignedReplicas)
       case None =>
-        throw new StateChangeFailedException("No other replicas in ISR for %s-%s.".format(topic, partition))
+        throw new StateChangeFailedException(("No other replicas in ISR %s for [%s,%d] besides current leader %d and" +
+          " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topic, partition, currentLeader, controllerContext.shuttingDownBrokerIds.mkString(",")))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/aa1546b0/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 372793b..94f27ae 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -151,6 +151,15 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         case OfflinePartition =>
           // pre: partition should be in Online state
           assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition), OfflinePartition)
+          // mark the partition offline by setting the leader to -1
+          // read the current leader and isr path
+          val leaderIsrAndControllerEpoch = controller.controllerContext.allLeaders(topicAndPartition)
+          leaderIsrAndControllerEpoch.leaderAndIsr.leader = -1
+          leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch += 1
+          leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion += 1
+          ZkUtils.updatePersistentPath(zkClient,
+            ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
+            ZkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))
           // should be called when the leader for a partition is no longer alive
           info("Partition [%s, %d] state changed from Online to Offline".format(topic, partition))
           partitionState.put(topicAndPartition, OfflinePartition)

http://git-wip-us.apache.org/repos/asf/kafka/blob/aa1546b0/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index f594404..113ad37 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -95,10 +95,11 @@ object ZkUtils extends Logging {
   : Option[LeaderIsrAndControllerEpoch] = {
     Json.parseFull(leaderAndIsrStr) match {
       case Some(m) =>
-        val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
-        val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
-        val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get
-        val controllerEpoch = m.asInstanceOf[Map[String, String]].get("controllerEpoch").get.toInt
+        val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, String]]
+        val leader = leaderIsrAndEpochInfo.get("leader").get.toInt
+        val epoch = leaderIsrAndEpochInfo.get("leaderEpoch").get.toInt
+        val isrString = leaderIsrAndEpochInfo.get("ISR").get
+        val controllerEpoch = leaderIsrAndEpochInfo.get("controllerEpoch").get.toInt
         val isr = Utils.parseCsvList(isrString).map(r => r.toInt)
         val zkPathVersion = stat.getVersion
         debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch,
@@ -201,7 +202,7 @@ object ZkUtils extends Logging {
     val jsonDataMap = new HashMap[String, String]
     jsonDataMap.put("leader", leaderAndIsr.leader.toString)
     jsonDataMap.put("leaderEpoch", leaderAndIsr.leaderEpoch.toString)
-    jsonDataMap.put("ISR", leaderAndIsr.isr.mkString(","))
+    jsonDataMap.put("ISR", if(leaderAndIsr.isr.isEmpty) "" else leaderAndIsr.isr.mkString(","))
     jsonDataMap.put("controllerEpoch", controllerEpoch.toString)
     Utils.stringMapToJson(jsonDataMap)
   }