You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/26 07:04:36 UTC
[6/28] git commit: Revert "draft patch"
Revert "draft patch"
This reverts commit aa1546b0907c959a4df90a7e3d48bad0890d1f2f.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5490884e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5490884e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5490884e
Branch: refs/heads/trunk
Commit: 5490884e9cbd28ac57e4b73bec72bcab81ce6523
Parents: 426ef16
Author: Neha Narkhede <ne...@gmail.com>
Authored: Mon Jan 21 09:53:41 2013 -0800
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Mon Jan 21 09:53:41 2013 -0800
----------------------------------------------------------------------
.../kafka/controller/PartitionLeaderSelector.scala | 3 +--
.../kafka/controller/PartitionStateMachine.scala | 9 ---------
core/src/main/scala/kafka/utils/ZkUtils.scala | 11 +++++------
3 files changed, 6 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5490884e/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
index 5f748b8..3eb23cd 100644
--- a/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
+++ b/core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala
@@ -177,8 +177,7 @@ class ControlledShutdownLeaderSelector(controllerContext: ControllerContext)
(LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1),
liveAssignedReplicas)
case None =>
- throw new StateChangeFailedException(("No other replicas in ISR %s for [%s,%d] besides current leader %d and" +
- " shutting down brokers %s").format(currentLeaderAndIsr.isr.mkString(","), topic, partition, currentLeader, controllerContext.shuttingDownBrokerIds.mkString(",")))
+ throw new StateChangeFailedException("No other replicas in ISR for %s-%s.".format(topic, partition))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5490884e/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
index 94f27ae..372793b 100644
--- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala
@@ -151,15 +151,6 @@ class PartitionStateMachine(controller: KafkaController) extends Logging {
case OfflinePartition =>
// pre: partition should be in Online state
assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition), OfflinePartition)
- // mark the partition offline by setting the leader to -1
- // read the current leader and isr path
- val leaderIsrAndControllerEpoch = controller.controllerContext.allLeaders(topicAndPartition)
- leaderIsrAndControllerEpoch.leaderAndIsr.leader = -1
- leaderIsrAndControllerEpoch.leaderAndIsr.leaderEpoch += 1
- leaderIsrAndControllerEpoch.leaderAndIsr.zkVersion += 1
- ZkUtils.updatePersistentPath(zkClient,
- ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
- ZkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))
// should be called when the leader for a partition is no longer alive
info("Partition [%s, %d] state changed from Online to Offline".format(topic, partition))
partitionState.put(topicAndPartition, OfflinePartition)
http://git-wip-us.apache.org/repos/asf/kafka/blob/5490884e/core/src/main/scala/kafka/utils/ZkUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 113ad37..f594404 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -95,11 +95,10 @@ object ZkUtils extends Logging {
: Option[LeaderIsrAndControllerEpoch] = {
Json.parseFull(leaderAndIsrStr) match {
case Some(m) =>
- val leaderIsrAndEpochInfo = m.asInstanceOf[Map[String, String]]
- val leader = leaderIsrAndEpochInfo.get("leader").get.toInt
- val epoch = leaderIsrAndEpochInfo.get("leaderEpoch").get.toInt
- val isrString = leaderIsrAndEpochInfo.get("ISR").get
- val controllerEpoch = leaderIsrAndEpochInfo.get("controllerEpoch").get.toInt
+ val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
+ val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
+ val isrString = m.asInstanceOf[Map[String, String]].get("ISR").get
+ val controllerEpoch = m.asInstanceOf[Map[String, String]].get("controllerEpoch").get.toInt
val isr = Utils.parseCsvList(isrString).map(r => r.toInt)
val zkPathVersion = stat.getVersion
debug("Leader %d, Epoch %d, Isr %s, Zk path version %d for topic %s and partition %d".format(leader, epoch,
@@ -202,7 +201,7 @@ object ZkUtils extends Logging {
val jsonDataMap = new HashMap[String, String]
jsonDataMap.put("leader", leaderAndIsr.leader.toString)
jsonDataMap.put("leaderEpoch", leaderAndIsr.leaderEpoch.toString)
- jsonDataMap.put("ISR", if(leaderAndIsr.isr.isEmpty) "" else leaderAndIsr.isr.mkString(","))
+ jsonDataMap.put("ISR", leaderAndIsr.isr.mkString(","))
jsonDataMap.put("controllerEpoch", controllerEpoch.toString)
Utils.stringMapToJson(jsonDataMap)
}