You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/01/21 19:11:11 UTC

git commit: Revert "draft patch"

Updated Branches:
  refs/heads/0.8 426ef1662 -> 5490884e9


Revert "draft patch"

This reverts commit aa1546b0907c959a4df90a7e3d48bad0890d1f2f.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5490884e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5490884e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5490884e

Branch: refs/heads/0.8
Commit: 5490884e9cbd28ac57e4b73bec72bcab81ce6523
Parents: 426ef16
Author: Neha Narkhede <ne...@gmail.com>
Authored: Mon Jan 21 09:53:41 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Mon Jan 21 09:53:41 2013 -0800

----------------------------------------------------------------------
 .../kafka/controller/PartitionLeaderSelector.scala |    3 +--
 .../kafka/controller/PartitionStateMachine.scala   |    9 ---------
 core/src/main/scala/kafka/utils/ZkUtils.scala      |   11 +++++------
 3 files changed, 6 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5490884e/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 5f748b8..3eb23cd 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -177,8 +177,7 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
         (LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1),
          liveAssignedReplicas)
       case None =>
-        throw new StateChangeFailedException(("No other replicas in ISR %s for [%s,%d] besides current leader %d and" +
-          " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topic, partition, currentLeader, controllerContext.shuttingDownBrokerIds.mkString(",")))
+        throw new StateChangeFailedException("No other replicas in ISR for %s-%s.".format(topic, partition))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/5490884e/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 94f27ae..372793b 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -151,15 +151,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
         case OfflinePartition =>
           // pre: partition should be in Online state
           assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition), OfflinePartition)
-          // mark the partition offline by setting the leader to -1
-          // read the current leader and isr path
-          val leaderIsrAndControllerEpoch = controller.controllerContext.allLeaders(topicAndPartition)
-          leaderIsrAndControllerEpoch.leaderAndIsr.leader = -1
-          leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch += 1
-          leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion += 1
-          ZkUtils.updatePersistentPath(zkClient,
-            ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
-            ZkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))
           // should be called when the leader for a partition is no longer alive
           info("Partition [%s, %d] state changed from Online to Offline".format(topic, partition))
           partitionState.put(topicAndPartition, OfflinePartition)

http://git-wip-us.apache.org/repos/asf/kafka/blob/5490884e/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 113ad37..f594404 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -95,11 +95,10 @@ object ZkUtils extends Logging {
   : Option[LeaderIsrAndControllerEpoch] = {
     Json.parseFull(leaderAndIsrStr) match {
       case Some(m) =>
-        val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, String]]
-        val leader = leaderIsrAndEpochInfo.get("leader").get.toInt
-        val epoch = leaderIsrAndEpochInfo.get("leaderEpoch").get.toInt
-        val isrString = leaderIsrAndEpochInfo.get("ISR").get
-        val controllerEpoch = leaderIsrAndEpochInfo.get("controllerEpoch").get.toInt
+        val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
+        val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
+        val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get
+        val controllerEpoch = m.asInstanceOf[Map[String, String]].get("controllerEpoch").get.toInt
         val isr = Utils.parseCsvList(isrString).map(r => r.toInt)
         val zkPathVersion = stat.getVersion
         debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch,
@@ -202,7 +201,7 @@ object ZkUtils extends Logging {
     val jsonDataMap = new HashMap[String, String]
     jsonDataMap.put("leader", leaderAndIsr.leader.toString)
     jsonDataMap.put("leaderEpoch", leaderAndIsr.leaderEpoch.toString)
-    jsonDataMap.put("ISR", if(leaderAndIsr.isr.isEmpty) "" else leaderAndIsr.isr.mkString(","))
+    jsonDataMap.put("ISR", leaderAndIsr.isr.mkString(","))
     jsonDataMap.put("controllerEpoch", controllerEpoch.toString)
     Utils.stringMapToJson(jsonDataMap)
   }