Posted to commits@kafka.apache.org by ne...@apache.org on 2012/09/10 19:09:52 UTC

svn commit: r1382988 [1/2] - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/admin/ main/scala/kafka/api/ main/scala/kafka/cluster/ main/scala/kafka/common/ main/scala/kafka/consumer/ main/scala/kafka/server/ main/scala/kafka/utils/ test/sc...

Author: nehanarkhede
Date: Mon Sep 10 17:09:52 2012
New Revision: 1382988

URL: http://svn.apache.org/viewvc?rev=1382988&view=rev
Log:
KAFKA-498: Controller has race conditions and synchronization bugs; patched by Neha Narkhede; reviewed by Jun Rao

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/BrokerNotAvailableException.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/PartitionOfflineException.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/BrokerNotExistException.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala?rev=1382988&r1=1382987&r2=1382988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala Mon Sep 10 17:09:52 2012
@@ -24,7 +24,7 @@ import kafka.utils.{Logging, Utils, ZkUt
 import org.I0Itec.zkclient.ZkClient
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import scala.collection.mutable
-import kafka.common.{LeaderNotAvailableException, ReplicaNotAvailableException, ErrorMapping}
+import kafka.common.{BrokerNotAvailableException, LeaderNotAvailableException, ReplicaNotAvailableException, ErrorMapping}
 
 object AdminUtils extends Logging {
   val rand = new Random
@@ -148,13 +148,11 @@ object AdminUtils extends Logging {
       optionalBrokerInfo match {
         case Some(brokerInfo) => brokerInfo // return broker info from the cache
         case None => // fetch it from zookeeper
-          try {
-            val brokerInfo = ZkUtils.getBrokerInfoFromIds(zkClient, List(id)).head
-            cachedBrokerInfo += (id -> brokerInfo)
-            brokerInfo
-          }catch {
-            case e => error("Failed to fetch broker info for broker id " + id)
-            throw e
+          ZkUtils.getBrokerInfo(zkClient, id) match {
+            case Some(brokerInfo) =>
+              cachedBrokerInfo += (id -> brokerInfo)
+              brokerInfo
+            case None => throw new BrokerNotAvailableException("Failed to fetch broker info for broker " + id)
           }
       }
     }

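For context, the hunk above replaces the try/catch around ZkUtils.getBrokerInfoFromIds with an
Option-returning lookup that either serves the broker from a local cache, caches a freshly fetched
entry, or throws the new BrokerNotAvailableException. A minimal, self-contained sketch of that
cache-or-throw pattern (the stub names below are illustrative and not part of the commit):

import scala.collection.mutable

case class BrokerStub(id: Int, host: String, port: Int)
class BrokerNotAvailableStub(message: String) extends RuntimeException(message)

object BrokerCacheSketch {
  private val cachedBrokerInfo = mutable.Map.empty[Int, BrokerStub]

  // fetchFromZk stands in for ZkUtils.getBrokerInfo(zkClient, id)
  def getBrokerInfoFromCache(id: Int, fetchFromZk: Int => Option[BrokerStub]): BrokerStub =
    cachedBrokerInfo.get(id) match {
      case Some(brokerInfo) => brokerInfo              // served from the cache
      case None =>
        fetchFromZk(id) match {                        // otherwise ask zookeeper
          case Some(brokerInfo) =>
            cachedBrokerInfo += (id -> brokerInfo)
            brokerInfo
          case None =>
            throw new BrokerNotAvailableStub("Failed to fetch broker info for broker " + id)
        }
    }
}
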
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala?rev=1382988&r1=1382987&r2=1382988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndISRRequest.scala Mon Sep 10 17:09:52 2012
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * 
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package kafka.api
-
-import java.nio._
-import kafka.utils._
-import collection.mutable.Map
-import collection.mutable.HashMap
-
-
-object LeaderAndISR {
-  val initialLeaderEpoch: Int = 0
-  val initialZKVersion: Int = 0
-  def readFrom(buffer: ByteBuffer): LeaderAndISR = {
-    val leader = buffer.getInt
-    val leaderGenId = buffer.getInt
-    val ISRString = Utils.readShortString(buffer, "UTF-8")
-    val ISR = ISRString.split(",").map(_.toInt).toList
-    val zkVersion = buffer.getInt
-    new LeaderAndISR(leader, leaderGenId, ISR, zkVersion)
-  }
-}
-
-case class LeaderAndISR(var leader: Int, var leaderEpoch: Int, var ISR: List[Int], var zkVersion: Int){
-  def this(leader: Int, ISR: List[Int]) = this(leader, LeaderAndISR.initialLeaderEpoch, ISR, LeaderAndISR.initialZKVersion)
-
-  def writeTo(buffer: ByteBuffer) {
-    buffer.putInt(leader)
-    buffer.putInt(leaderEpoch)
-    Utils.writeShortString(buffer, ISR.mkString(","), "UTF-8")
-    buffer.putInt(zkVersion)
-  }
-
-  def sizeInBytes(): Int = {
-    val size = 4 + 4 + (2 + ISR.mkString(",").length) + 4
-    size
-  }
-
-  override def toString(): String = {
-    val jsonDataMap = new HashMap[String, String]
-    jsonDataMap.put("leader", leader.toString)
-    jsonDataMap.put("leaderEpoch", leaderEpoch.toString)
-    jsonDataMap.put("ISR", ISR.mkString(","))
-    Utils.stringMapToJsonString(jsonDataMap)
-  }
-}
-
-
-object LeaderAndISRRequest {
-  val CurrentVersion = 1.shortValue()
-  val DefaultClientId = ""
-  val IsInit: Boolean = true
-  val NotInit: Boolean = false
-  val DefaultAckTimeout: Int = 1000
-
-  def readFrom(buffer: ByteBuffer): LeaderAndISRRequest = {
-    val versionId = buffer.getShort
-    val clientId = Utils.readShortString(buffer)
-    val isInit = if(buffer.get() == 1.toByte) true else false
-    val ackTimeoutMs = buffer.getInt
-    val leaderAndISRRequestCount = buffer.getInt
-    val leaderAndISRInfos = new HashMap[(String, Int), LeaderAndISR]
-
-    for(i <- 0 until leaderAndISRRequestCount){
-      val topic = Utils.readShortString(buffer, "UTF-8")
-      val partition = buffer.getInt
-      val leaderAndISRRequest = LeaderAndISR.readFrom(buffer)
-
-      leaderAndISRInfos.put((topic, partition), leaderAndISRRequest)
-    }
-    new LeaderAndISRRequest(versionId, clientId, isInit, ackTimeoutMs, leaderAndISRInfos)
-  }
-}
-
-
-case class LeaderAndISRRequest (versionId: Short,
-                                clientId: String,
-                                isInit: Boolean,
-                                ackTimeoutMs: Int,
-                                leaderAndISRInfos: Map[(String, Int), LeaderAndISR])
-        extends RequestOrResponse(Some(RequestKeys.LeaderAndISRRequest)) {
-  def this(isInit: Boolean, leaderAndISRInfos: Map[(String, Int), LeaderAndISR]) = {
-    this(LeaderAndISRRequest.CurrentVersion, LeaderAndISRRequest.DefaultClientId, isInit, LeaderAndISRRequest.DefaultAckTimeout, leaderAndISRInfos)
-  }
-
-  def writeTo(buffer: ByteBuffer) {
-    buffer.putShort(versionId)
-    Utils.writeShortString(buffer, clientId)
-    buffer.put(if(isInit) 1.toByte else 0.toByte)
-    buffer.putInt(ackTimeoutMs)
-    buffer.putInt(leaderAndISRInfos.size)
-    for((key, value) <- leaderAndISRInfos){
-      Utils.writeShortString(buffer, key._1, "UTF-8")
-      buffer.putInt(key._2)
-      value.writeTo(buffer)
-    }
-  }
-
-  def sizeInBytes(): Int = {
-    var size = 1 + 2 + (2 + clientId.length) + 4 + 4
-    for((key, value) <- leaderAndISRInfos)
-      size += (2 + key._1.length) + 4 + value.sizeInBytes
-    size
-  }
-}
\ No newline at end of file

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala?rev=1382988&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala Mon Sep 10 17:09:52 2012
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package kafka.api
+
+import java.nio._
+import kafka.utils._
+import collection.mutable.Map
+import collection.mutable.HashMap
+
+
+object LeaderAndIsr {
+  val initialLeaderEpoch: Int = 0
+  val initialZKVersion: Int = 0
+  def readFrom(buffer: ByteBuffer): LeaderAndIsr = {
+    val leader = buffer.getInt
+    val leaderGenId = buffer.getInt
+    val ISRString = Utils.readShortString(buffer, "UTF-8")
+    val ISR = ISRString.split(",").map(_.toInt).toList
+    val zkVersion = buffer.getInt
+    new LeaderAndIsr(leader, leaderGenId, ISR, zkVersion)
+  }
+}
+
+case class LeaderAndIsr(var leader: Int, var leaderEpoch: Int, var isr: List[Int], var zkVersion: Int){
+  def this(leader: Int, ISR: List[Int]) = this(leader, LeaderAndIsr.initialLeaderEpoch, ISR, LeaderAndIsr.initialZKVersion)
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putInt(leader)
+    buffer.putInt(leaderEpoch)
+    Utils.writeShortString(buffer, isr.mkString(","), "UTF-8")
+    buffer.putInt(zkVersion)
+  }
+
+  def sizeInBytes(): Int = {
+    val size = 4 + 4 + (2 + isr.mkString(",").length) + 4
+    size
+  }
+
+  override def toString(): String = {
+    val jsonDataMap = new HashMap[String, String]
+    jsonDataMap.put("leader", leader.toString)
+    jsonDataMap.put("leaderEpoch", leaderEpoch.toString)
+    jsonDataMap.put("ISR", isr.mkString(","))
+    Utils.stringMapToJsonString(jsonDataMap)
+  }
+}
+
+
+object LeaderAndIsrRequest {
+  val CurrentVersion = 1.shortValue()
+  val DefaultClientId = ""
+  val IsInit: Boolean = true
+  val NotInit: Boolean = false
+  val DefaultAckTimeout: Int = 1000
+
+  def readFrom(buffer: ByteBuffer): LeaderAndIsrRequest = {
+    val versionId = buffer.getShort
+    val clientId = Utils.readShortString(buffer)
+    val isInit = if(buffer.get() == 1.toByte) true else false
+    val ackTimeoutMs = buffer.getInt
+    val leaderAndISRRequestCount = buffer.getInt
+    val leaderAndISRInfos = new HashMap[(String, Int), LeaderAndIsr]
+
+    for(i <- 0 until leaderAndISRRequestCount){
+      val topic = Utils.readShortString(buffer, "UTF-8")
+      val partition = buffer.getInt
+      val leaderAndISRRequest = LeaderAndIsr.readFrom(buffer)
+
+      leaderAndISRInfos.put((topic, partition), leaderAndISRRequest)
+    }
+    new LeaderAndIsrRequest(versionId, clientId, isInit, ackTimeoutMs, leaderAndISRInfos)
+  }
+}
+
+
+case class LeaderAndIsrRequest (versionId: Short,
+                                clientId: String,
+                                isInit: Boolean,
+                                ackTimeoutMs: Int,
+                                leaderAndISRInfos: Map[(String, Int), LeaderAndIsr])
+        extends RequestOrResponse(Some(RequestKeys.LeaderAndISRRequest)) {
+  def this(isInit: Boolean, leaderAndISRInfos: Map[(String, Int), LeaderAndIsr]) = {
+    this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, isInit, LeaderAndIsrRequest.DefaultAckTimeout, leaderAndISRInfos)
+  }
+
+  def writeTo(buffer: ByteBuffer) {
+    buffer.putShort(versionId)
+    Utils.writeShortString(buffer, clientId)
+    buffer.put(if(isInit) 1.toByte else 0.toByte)
+    buffer.putInt(ackTimeoutMs)
+    buffer.putInt(leaderAndISRInfos.size)
+    for((key, value) <- leaderAndISRInfos){
+      Utils.writeShortString(buffer, key._1, "UTF-8")
+      buffer.putInt(key._2)
+      value.writeTo(buffer)
+    }
+  }
+
+  def sizeInBytes(): Int = {
+    var size = 1 + 2 + (2 + clientId.length) + 4 + 4
+    for((key, value) <- leaderAndISRInfos)
+      size += (2 + key._1.length) + 4 + value.sizeInBytes
+    size
+  }
+}
\ No newline at end of file

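The LeaderAndIsr payload added above has a fixed layout: leader (4 bytes), leaderEpoch (4 bytes),
the ISR as a length-prefixed UTF-8 string (2-byte length), and zkVersion (4 bytes), which is exactly
what sizeInBytes() adds up. A small stand-alone round-trip of that layout; the local short-string
helpers stand in for kafka.utils.Utils.writeShortString/readShortString:

import java.nio.ByteBuffer

object LeaderAndIsrWireSketch {
  private def writeShortString(buf: ByteBuffer, s: String): Unit = {
    val bytes = s.getBytes("UTF-8")
    buf.putShort(bytes.length.toShort)
    buf.put(bytes)
  }

  private def readShortString(buf: ByteBuffer): String = {
    val len = buf.getShort.toInt
    val bytes = new Array[Byte](len)
    buf.get(bytes)
    new String(bytes, "UTF-8")
  }

  def main(args: Array[String]): Unit = {
    val (leader, leaderEpoch, isr, zkVersion) = (1, 0, List(1, 2, 3), 0)
    val isrString = isr.mkString(",")
    val buffer = ByteBuffer.allocate(4 + 4 + (2 + isrString.length) + 4)   // same as sizeInBytes()
    buffer.putInt(leader)
    buffer.putInt(leaderEpoch)
    writeShortString(buffer, isrString)
    buffer.putInt(zkVersion)
    buffer.flip()
    // read back in the same order LeaderAndIsr.readFrom uses
    println((buffer.getInt, buffer.getInt, readShortString(buffer).split(",").map(_.toInt).toList, buffer.getInt))
  }
}
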
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala?rev=1382988&r1=1382987&r2=1382988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Broker.scala Mon Sep 10 17:09:52 2012
@@ -19,7 +19,7 @@ package kafka.cluster
 
 import kafka.utils.Utils._
 import java.nio.ByteBuffer
-import kafka.common.BrokerNotExistException
+import kafka.common.BrokerNotAvailableException
 
 /**
  * A Kafka broker
@@ -28,7 +28,7 @@ private[kafka] object Broker {
 
   def createBroker(id: Int, brokerInfoString: String): Broker = {
     if(brokerInfoString == null)
-      throw new BrokerNotExistException("Broker id %s does not exist".format(id))
+      throw new BrokerNotAvailableException("Broker id %s does not exist".format(id))
     val brokerInfo = brokerInfoString.split(":")
     new Broker(id, brokerInfo(0), brokerInfo(1), brokerInfo(2).toInt)
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1382988&r1=1382987&r2=1382988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Mon Sep 10 17:09:52 2012
@@ -19,7 +19,7 @@ package kafka.cluster
 import scala.collection._
 import kafka.utils._
 import java.lang.Object
-import kafka.api.LeaderAndISR
+import kafka.api.LeaderAndIsr
 import kafka.server.ReplicaManager
 import kafka.common.ErrorMapping
 
@@ -39,8 +39,8 @@ class Partition(val topic: String,
   var inSyncReplicas: Set[Replica] = Set.empty[Replica]
   private val assignedReplicaMap = new Pool[Int,Replica]
   private val leaderISRUpdateLock = new Object
-  private var zkVersion: Int = LeaderAndISR.initialZKVersion
-  private var leaderEpoch: Int = LeaderAndISR.initialLeaderEpoch - 1
+  private var zkVersion: Int = LeaderAndIsr.initialZKVersion
+  private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
   this.logIdent = "Partition [%s, %d] on broker %d, ".format(topic, partitionId, localBrokerId)
 
   private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
@@ -53,7 +53,7 @@ class Partition(val topic: String,
         if (isReplicaLocal(replicaId)) {
           val log = logManager.getOrCreateLog(topic, partitionId)
           val localReplica = new Replica(replicaId, this, time,
-                                         highwaterMarkCheckpoint.read(topic, partitionId), Some(log))
+            highwaterMarkCheckpoint.read(topic, partitionId), Some(log))
           addReplicaIfNotExists(localReplica)
         }
         else {
@@ -97,7 +97,7 @@ class Partition(val topic: String,
   /**
   *  If the leaderEpoch of the incoming request is higher than the locally cached epoch, make the local replica the leader or a follower of the new leader.
    */
-  def makeLeaderOrFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR, isMakingLeader: Boolean): Boolean = {
+  def makeLeaderOrFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr, isMakingLeader: Boolean): Boolean = {
     leaderISRUpdateLock synchronized {
       if (leaderEpoch >= leaderAndISR.leaderEpoch){
         info("Current leaderEpoch [%d] is larger or equal to the requested leaderEpoch [%d], discard the become %s request"
@@ -119,20 +119,20 @@ class Partition(val topic: String,
    *  3. reset LogEndOffset for remote replicas (there could be old LogEndOffset from the time when this broker was the leader last time)
    *  4. set the new leader and ISR
    */
-  private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) {
-      trace("Started to become leader at the request %s".format(leaderAndISR.toString()))
-      // stop replica fetcher thread, if any
-      replicaFetcherManager.removeFetcher(topic, partitionId)
-
-      val newInSyncReplicas = leaderAndISR.ISR.map(r => getOrCreateReplica(r)).toSet
-      // reset LogEndOffset for remote replicas
-      assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset)
-      inSyncReplicas = newInSyncReplicas
-      leaderEpoch = leaderAndISR.leaderEpoch
-      zkVersion = leaderAndISR.zkVersion
-      leaderReplicaIdOpt = Some(localBrokerId)
-      // we may need to increment high watermark since ISR could be down to 1
-      maybeIncrementLeaderHW(getReplica().get)
+  private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr) {
+    trace("Started to become leader at the request %s".format(leaderAndISR.toString()))
+    // stop replica fetcher thread, if any
+    replicaFetcherManager.removeFetcher(topic, partitionId)
+
+    val newInSyncReplicas = leaderAndISR.isr.map(r => getOrCreateReplica(r)).toSet
+    // reset LogEndOffset for remote replicas
+    assignedReplicas.foreach(r => if (r.brokerId != localBrokerId) r.logEndOffset = ReplicaManager.UnknownLogEndOffset)
+    inSyncReplicas = newInSyncReplicas
+    leaderEpoch = leaderAndISR.leaderEpoch
+    zkVersion = leaderAndISR.zkVersion
+    leaderReplicaIdOpt = Some(localBrokerId)
+    // we may need to increment high watermark since ISR could be down to 1
+    maybeIncrementLeaderHW(getReplica().get)
   }
 
   /**
@@ -141,24 +141,28 @@ class Partition(val topic: String,
    *  3. set the leader and set ISR to empty
    *  4. start a fetcher to the new leader
    */
-  private def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) = {
-      trace("Started to become follower at the request %s".format(leaderAndISR.toString()))
-      val newLeaderBrokerId: Int = leaderAndISR.leader
-      info("Starting the follower state transition to follow leader %d for topic %s partition %d"
-                   .format(newLeaderBrokerId, topic, partitionId))
-      val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(newLeaderBrokerId)).head
-      // stop fetcher thread to previous leader
-      replicaFetcherManager.removeFetcher(topic, partitionId)
-
-      // make sure local replica exists
-      val localReplica = getOrCreateReplica()
-      localReplica.log.get.truncateTo(localReplica.highWatermark)
-      inSyncReplicas = Set.empty[Replica]
-      leaderEpoch = leaderAndISR.leaderEpoch
-      zkVersion = leaderAndISR.zkVersion
-      leaderReplicaIdOpt = Some(newLeaderBrokerId)
-      // start fetcher thread to current leader
-      replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
+  private def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr) = {
+    trace("Started to become follower at the request %s".format(leaderAndISR.toString()))
+    val newLeaderBrokerId: Int = leaderAndISR.leader
+    info("Starting the follower state transition to follow leader %d for topic %s partition %d"
+      .format(newLeaderBrokerId, topic, partitionId))
+    ZkUtils.getBrokerInfo(zkClient, newLeaderBrokerId) match {
+      case Some(leaderBroker) =>
+        // stop fetcher thread to previous leader
+        replicaFetcherManager.removeFetcher(topic, partitionId)
+        // make sure local replica exists
+        val localReplica = getOrCreateReplica()
+        localReplica.log.get.truncateTo(localReplica.highWatermark)
+        inSyncReplicas = Set.empty[Replica]
+        leaderEpoch = leaderAndISR.leaderEpoch
+        zkVersion = leaderAndISR.zkVersion
+        leaderReplicaIdOpt = Some(newLeaderBrokerId)
+        // start fetcher thread to current leader
+        replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
+      case None => // leader went down
+        warn("Aborting become follower state change on %d since leader %d for ".format(localBrokerId, newLeaderBrokerId) +
+        " topic %s partition %d became unavailble during the state change operation".format(topic, partitionId))
+    }
   }
 
   def updateLeaderHWAndMaybeExpandISR(replicaId: Int, offset: Long) {
@@ -197,7 +201,7 @@ class Partition(val topic: String,
           })
           trace("%d/%d acks satisfied for %s-%d".format(numAcks, requiredAcks, topic, partitionId))
           if ((requiredAcks < 0 && numAcks >= inSyncReplicas.size) ||
-              (requiredAcks > 0 && numAcks >= requiredAcks)) {
+            (requiredAcks > 0 && numAcks >= requiredAcks)) {
             /*
             * requiredAcks < 0 means acknowledge after all replicas in ISR
             * are fully caught up to the (local) leader's offset
@@ -211,7 +215,7 @@ class Partition(val topic: String,
       }
     }
   }
-  
+
   def maybeIncrementLeaderHW(leaderReplica: Replica) {
     val allLogEndOffsets = inSyncReplicas.map(_.logEndOffset)
     val newHighWatermark = allLogEndOffsets.min
@@ -220,7 +224,7 @@ class Partition(val topic: String,
       leaderReplica.highWatermark = newHighWatermark
     else
       debug("Old hw for topic %s partition %d is %d. New hw is %d. All leo's are %s"
-            .format(topic, partitionId, oldHighWatermark, newHighWatermark, allLogEndOffsets.mkString(",")))
+        .format(topic, partitionId, oldHighWatermark, newHighWatermark, allLogEndOffsets.mkString(",")))
   }
 
   def maybeShrinkISR(replicaMaxLagTimeMs: Long,  replicaMaxLagBytes: Long) {
@@ -249,7 +253,7 @@ class Partition(val topic: String,
      *                     for keepInSyncTimeMs ms, the follower is stuck and should be removed from the ISR
      * 2. Slow followers: If the leo of the slowest follower is behind the leo of the leader by keepInSyncBytes, the
      *                     follower is not catching up and should be removed from the ISR
-    **/
+     **/
     val leaderLogEndOffset = leaderReplica.logEndOffset
     val candidateReplicas = inSyncReplicas - leaderReplica
     // Case 1 above
@@ -266,9 +270,9 @@ class Partition(val topic: String,
 
   private def updateISR(newISR: Set[Replica]) {
     info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newISR.mkString(",")))
-    val newLeaderAndISR = new LeaderAndISR(localBrokerId, leaderEpoch, newISR.map(r => r.brokerId).toList, zkVersion)
+    val newLeaderAndISR = new LeaderAndIsr(localBrokerId, leaderEpoch, newISR.map(r => r.brokerId).toList, zkVersion)
     val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
-      ZkUtils.getTopicPartitionLeaderAndISRPath(topic, partitionId), newLeaderAndISR.toString, zkVersion)
+      ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId), newLeaderAndISR.toString, zkVersion)
     if (updateSucceeded){
       inSyncReplicas = newISR
       zkVersion = newVersion

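Two patterns in the Partition changes above carry the synchronization fix: a state change request is
applied only when it brings a strictly higher leaderEpoch, and the ISR is written to zookeeper with a
conditional (versioned) update. A reduced sketch of the epoch guard, assuming nothing beyond what the
hunk shows:

class EpochGuardSketch {
  private var leaderEpoch: Int = -1          // LeaderAndIsr.initialLeaderEpoch - 1
  private val leaderIsrUpdateLock = new Object

  /** Returns true if the state change was applied, false if it was discarded as stale. */
  def maybeApply(requestEpoch: Int)(applyStateChange: => Unit): Boolean = leaderIsrUpdateLock synchronized {
    if (leaderEpoch >= requestEpoch)
      false                                  // request from an older controller generation
    else {
      leaderEpoch = requestEpoch
      applyStateChange                       // e.g. become leader or follower for this partition
      true
    }
  }
}
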
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/BrokerNotAvailableException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/BrokerNotAvailableException.scala?rev=1382988&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/BrokerNotAvailableException.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/BrokerNotAvailableException.scala Mon Sep 10 17:09:52 2012
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class BrokerNotAvailableException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/BrokerNotExistException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/BrokerNotExistException.scala?rev=1382988&r1=1382987&r2=1382988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/BrokerNotExistException.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/BrokerNotExistException.scala Mon Sep 10 17:09:52 2012
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.common
-
-class BrokerNotExistException(message: String) extends RuntimeException(message) {
-  def this() = this(null)
-}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala?rev=1382988&r1=1382987&r2=1382988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala Mon Sep 10 17:09:52 2012
@@ -38,7 +38,7 @@ object ErrorMapping {
   val LeaderNotAvailableCode : Short = 6
   val NotLeaderForPartitionCode : Short = 7
   val RequestTimedOutCode: Short = 8
-  val BrokerNotExistInZookeeperCode: Short = 9
+  val BrokerNotAvailableCode: Short = 9
   val ReplicaNotAvailableCode: Short = 10
 
   private val exceptionToCode = 
@@ -51,7 +51,7 @@ object ErrorMapping {
       classOf[NotLeaderForPartitionException].asInstanceOf[Class[Throwable]] -> NotLeaderForPartitionCode,
       classOf[LeaderNotAvailableException].asInstanceOf[Class[Throwable]] -> LeaderNotAvailableCode,
       classOf[RequestTimedOutException].asInstanceOf[Class[Throwable]] -> RequestTimedOutCode,
-      classOf[BrokerNotExistException].asInstanceOf[Class[Throwable]] -> BrokerNotExistInZookeeperCode,
+      classOf[BrokerNotAvailableException].asInstanceOf[Class[Throwable]] -> BrokerNotAvailableCode,
       classOf[ReplicaNotAvailableException].asInstanceOf[Class[Throwable]] -> ReplicaNotAvailableCode
     ).withDefaultValue(UnknownCode)
   

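The ErrorMapping change above renames code 9 from BrokerNotExistInZookeeperCode to
BrokerNotAvailableCode; the table itself is a class-to-code map with an Unknown default. The same
pattern in a stand-alone form, with stub exception types in place of the kafka.common classes:

object ErrorMappingSketch {
  class BrokerNotAvailableStub(message: String) extends RuntimeException(message)
  class ReplicaNotAvailableStub(message: String) extends RuntimeException(message)

  val UnknownCode: Short = -1
  val BrokerNotAvailableCode: Short = 9
  val ReplicaNotAvailableCode: Short = 10

  private val exceptionToCode: Map[Class[_], Short] =
    Map[Class[_], Short](
      classOf[BrokerNotAvailableStub] -> BrokerNotAvailableCode,
      classOf[ReplicaNotAvailableStub] -> ReplicaNotAvailableCode
    ).withDefaultValue(UnknownCode)

  // any exception not in the table maps to UnknownCode
  def codeFor(t: Throwable): Short = exceptionToCode(t.getClass)
}
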
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/PartitionOfflineException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/PartitionOfflineException.scala?rev=1382988&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/PartitionOfflineException.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/PartitionOfflineException.scala Mon Sep 10 17:09:52 2012
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+/**
+ * This exception is thrown by the leader elector in the controller when leader election fails for a partition
+ * because all of the replicas for that partition are offline.
+ */
+class PartitionOfflineException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1382988&r1=1382987&r2=1382988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Mon Sep 10 17:09:52 2012
@@ -292,21 +292,21 @@ private[kafka] class ZookeeperConsumerCo
         return partitionInfo.getConsumeOffset
     }
 
-    //otherwise, try to get it from zookeeper
+    // otherwise, try to get it from zookeeper
     try {
       val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
       val znode = topicDirs.consumerOffsetDir + "/" + partitionId
       val offsetString = readDataMaybeNull(zkClient, znode)._1
-      if (offsetString != null)
-        return offsetString.toLong
-      else
-        return -1
+      offsetString match {
+        case Some(offset) => offset.toLong
+        case None => -1L
+      }
     }
     catch {
       case e =>
         error("error in getConsumedOffset JMX ", e)
+        -2L
     }
-    return -2
   }
 
   def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long =
@@ -649,18 +649,19 @@ private[kafka] class ZookeeperConsumerCo
       val znode = topicDirs.consumerOffsetDir + "/" + partition
       val offsetString = readDataMaybeNull(zkClient, znode)._1
       // If first time starting a consumer, set the initial offset based on the config
-      var offset : Long = 0L
-      if (offsetString == null)
-        offset = config.autoOffsetReset match {
+      val offset =
+        offsetString match {
+          case Some(offsetStr) => offsetStr.toLong
+          case None =>
+            config.autoOffsetReset match {
               case OffsetRequest.SmallestTimeString =>
-                  earliestOrLatestOffset(topic, leader, partition, OffsetRequest.EarliestTime)
+                earliestOrLatestOffset(topic, leader, partition, OffsetRequest.EarliestTime)
               case OffsetRequest.LargestTimeString =>
-                  earliestOrLatestOffset(topic, leader, partition, OffsetRequest.LatestTime)
+                earliestOrLatestOffset(topic, leader, partition, OffsetRequest.LatestTime)
               case _ =>
-                  throw new InvalidConfigException("Wrong value in autoOffsetReset in ConsumerConfig")
+                throw new InvalidConfigException("Wrong value in autoOffsetReset in ConsumerConfig")
+            }
         }
-      else
-        offset = offsetString.toLong
       val queue = topicThreadIdAndQueues.get((topic, consumerThreadId))
       val consumedOffset = new AtomicLong(offset)
       val fetchedOffset = new AtomicLong(offset)

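The consumer change above is the same null-to-Option conversion applied to offsets read from
zookeeper: a committed offset is resumed if present, otherwise auto.offset.reset decides between the
earliest and latest offset. A reduced sketch of that resolution, with stand-ins for
OffsetRequest.SmallestTimeString/LargestTimeString and earliestOrLatestOffset:

object OffsetResolutionSketch {
  def resolveStartingOffset(offsetInZk: Option[String],
                            autoOffsetReset: String,
                            earliestOffset: => Long,
                            latestOffset: => Long): Long =
    offsetInZk match {
      case Some(offsetStr) => offsetStr.toLong       // resume from the committed offset
      case None =>                                   // first start for this group/partition
        autoOffsetReset match {
          case "smallest" => earliestOffset
          case "largest"  => latestOffset
          case other      => throw new IllegalArgumentException("Wrong value in autoOffsetReset: " + other)
        }
    }

  // e.g. resolveStartingOffset(None, "smallest", 0L, 42L) returns 0L
}
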
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1382988&r1=1382987&r2=1382988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Mon Sep 10 17:09:52 2012
@@ -66,7 +66,7 @@ class KafkaApis(val requestChannel: Requ
   }
 
   def handleLeaderAndISRRequest(request: RequestChannel.Request){
-    val leaderAndISRRequest = LeaderAndISRRequest.readFrom(request.request.buffer)
+    val leaderAndISRRequest = LeaderAndIsrRequest.readFrom(request.request.buffer)
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Handling leader and isr request " + leaderAndISRRequest)
     trace("Handling leader and isr request " + leaderAndISRRequest)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala?rev=1382988&r1=1382987&r2=1382988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala Mon Sep 10 17:09:52 2012
@@ -22,20 +22,21 @@ import collection.immutable.Set
 import kafka.cluster.Broker
 import kafka.api._
 import kafka.network.{Receive, BlockingChannel}
-import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
 import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import org.I0Itec.zkclient.{IZkStateListener, ZkClient, IZkDataListener, IZkChildListener}
 import org.apache.zookeeper.Watcher.Event.KeeperState
 import collection.JavaConversions._
 import kafka.utils.{ShutdownableThread, ZkUtils, Logging}
 import java.lang.Object
+import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
+import kafka.common.{KafkaException, PartitionOfflineException}
 
 
 class RequestSendThread(val controllerId: Int,
                         val toBrokerId: Int,
                         val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
                         val channel: BlockingChannel)
-        extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBrokerId)){
+  extends ShutdownableThread("Controller-%d-to-broker-%d-send-thread".format(controllerId, toBrokerId)) {
   private val lock = new Object()
 
   override def doWork(): Unit = {
@@ -44,163 +45,157 @@ class RequestSendThread(val controllerId
     val callback = queueItem._2
 
     var receive: Receive = null
-    
-        try{
-          lock synchronized {
-            channel.send(request)
-            receive = channel.receive()
-            var response: RequestOrResponse = null
-            request.requestId.get match {
-              case RequestKeys.LeaderAndISRRequest =>
-                response = LeaderAndISRResponse.readFrom(receive.buffer)
-              case RequestKeys.StopReplicaRequest =>
-                response = StopReplicaResponse.readFrom(receive.buffer)
-            }
-            trace("got a response %s".format(controllerId, response, toBrokerId))
 
-            if(callback != null){
-              callback(response)
-            }
-          }
-        } catch {
-          case e =>
-            // log it and let it go. Let controller shut it down.
-            debug("Exception occurs", e)
+    try{
+      lock synchronized {
+        channel.send(request)
+        receive = channel.receive()
+        var response: RequestOrResponse = null
+        request.requestId.get match {
+          case RequestKeys.LeaderAndISRRequest =>
+            response = LeaderAndISRResponse.readFrom(receive.buffer)
+          case RequestKeys.StopReplicaRequest =>
+            response = StopReplicaResponse.readFrom(receive.buffer)
+        }
+        trace("got a response %s".format(controllerId, response, toBrokerId))
+
+        if(callback != null){
+          callback(response)
         }
       }
- 
+    } catch {
+      case e =>
+        // log it and let it go. Let controller shut it down.
+        debug("Exception occurs", e)
+    }
+  }
 }
 
-class ControllerChannelManager(allBrokers: Set[Broker], config : KafkaConfig) extends Logging{
-  private val brokers = new HashMap[Int, Broker]
-  private val messageChannels = new HashMap[Int, BlockingChannel]
-  private val messageQueues = new HashMap[Int, BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)]]
-  private val messageThreads = new HashMap[Int, RequestSendThread]
-  private val lock = new Object
+class ControllerChannelManager private (config: KafkaConfig) extends Logging {
+  private val brokerStateInfo = new HashMap[Int, ControllerBrokerStateInfo]
+  private val brokerLock = new Object
   this.logIdent = "[Channel manager on controller " + config.brokerId + "], "
-  for(broker <- allBrokers){
-    brokers.put(broker.id, broker)
-    info("channel to broker " + broker.id + " created" + " at host: " + broker.host + " and port: " + broker.port)
-    val channel = new BlockingChannel(broker.host, broker.port,
-                                      BlockingChannel.UseDefaultBufferSize,
-                                      BlockingChannel.UseDefaultBufferSize,
-                                      config.controllerSocketTimeoutMs)
-    channel.connect()
-    messageChannels.put(broker.id, channel)
-    messageQueues.put(broker.id, new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize))
+
+  def this(allBrokers: Set[Broker], config : KafkaConfig) {
+    this(config)
+    allBrokers.foreach(addNewBroker(_))
   }
 
   def startup() = {
-    for((brokerId, broker) <- brokers){
-      val thread = new RequestSendThread(config.brokerId, brokerId, messageQueues(brokerId), messageChannels(brokerId))
-      thread.setDaemon(false)
-      thread.start()
-      messageThreads.put(broker.id, thread)
+    brokerLock synchronized {
+      brokerStateInfo.foreach(brokerState => startRequestSendThread(brokerState._1))
     }
   }
 
   def shutdown() = {
-    lock synchronized {
-      for((brokerId, broker) <- brokers){
-        removeBroker(brokerId)
-      }
+    brokerLock synchronized {
+      brokerStateInfo.foreach(brokerState => removeExistingBroker(brokerState._1))
     }
   }
 
-  def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null){
-    messageQueues(brokerId).put((request, callback))
+  def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) {
+    brokerLock synchronized {
+      brokerStateInfo(brokerId).messageQueue.put((request, callback))
+    }
   }
 
-  def addBroker(broker: Broker){
-    lock synchronized {
-      brokers.put(broker.id, broker)
-      messageQueues.put(broker.id, new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize))
-      val channel = new BlockingChannel(broker.host, broker.port,
-                                        BlockingChannel.UseDefaultBufferSize,
-                                        BlockingChannel.UseDefaultBufferSize,
-                                        config.controllerSocketTimeoutMs)
-      channel.connect()
-      messageChannels.put(broker.id, channel)
-      val thread = new RequestSendThread(config.brokerId, broker.id, messageQueues(broker.id), messageChannels(broker.id))
-      thread.setDaemon(false)
-      thread.start()
-      messageThreads.put(broker.id, thread)
+  def addBroker(broker: Broker) {
+    brokerLock synchronized {
+      addNewBroker(broker)
+      startRequestSendThread(broker.id)
     }
   }
 
-  def removeBroker(brokerId: Int){
-    lock synchronized {
-      brokers.remove(brokerId)
-      try {
-        messageChannels(brokerId).disconnect()
-        messageChannels.remove(brokerId)
-        messageQueues.remove(brokerId)
-        messageThreads(brokerId).shutdown()
-        messageThreads.remove(brokerId)
-      }catch {
-        case e => error("Error while removing broker by the controller", e)
-      }
+  def removeBroker(brokerId: Int) {
+    brokerLock synchronized {
+      removeExistingBroker(brokerId)
+    }
+  }
+
+  private def addNewBroker(broker: Broker) {
+    val messageQueue = new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize)
+    val channel = new BlockingChannel(broker.host, broker.port,
+      BlockingChannel.UseDefaultBufferSize,
+      BlockingChannel.UseDefaultBufferSize,
+      config.controllerSocketTimeoutMs)
+    channel.connect()
+    val requestThread = new RequestSendThread(config.brokerId, broker.id, messageQueue, channel)
+    requestThread.setDaemon(false)
+    brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(channel, broker, messageQueue, requestThread))
+  }
+
+  private def removeExistingBroker(brokerId: Int) {
+    try {
+      brokerStateInfo(brokerId).channel.disconnect()
+      brokerStateInfo(brokerId).requestSendThread.shutdown()
+      brokerStateInfo.remove(brokerId)
+    }catch {
+      case e => error("Error while removing broker by the controller", e)
     }
   }
+
+  private def startRequestSendThread(brokerId: Int) {
+    brokerStateInfo(brokerId).requestSendThread.start()
+  }
 }
 
+case class ControllerBrokerStateInfo(channel: BlockingChannel,
+                                     broker: Broker,
+                                     messageQueue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
+                                     requestSendThread: RequestSendThread)
+
 class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging {
   this.logIdent = "[Controller " + config.brokerId + "], "
-  info("startup");
   private var isRunning = true
   private val controllerLock = new Object
   private var controllerChannelManager: ControllerChannelManager = null
-  private var allBrokers : Set[Broker] = null
-  private var allBrokerIds : Set[Int] = null
+  private var liveBrokers : Set[Broker] = null
+  private var liveBrokerIds : Set[Int] = null
   private var allTopics: Set[String] = null
   private var allPartitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]] = null
   private var allLeaders: mutable.Map[(String, Int), Int] = null
 
-  // Return true if this controller succeeds in the controller competition
+  // Return true if this controller succeeds in the controller leader election
   private def tryToBecomeController(): Boolean = {
-    try {
-      ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.ControllerPath, config.brokerId.toString)
-      // Only the broker successfully registering as the controller can execute following code, otherwise
-      // some exception will be thrown.
-      registerBrokerChangeListener()
-      registerTopicChangeListener()
-      allBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
-      allBrokerIds = allBrokers.map(_.id)
-      info("all brokers: %s".format(allBrokerIds))
-      allTopics = ZkUtils.getAllTopics(zkClient).toSet
-      info("all topics: %s".format(allTopics))
-      allPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, allTopics.iterator)
-      info("allPartitionReplicaAssignment: %s".format(allPartitionReplicaAssignment))
-      allLeaders = new mutable.HashMap[(String, Int), Int]
-      controllerChannelManager = new ControllerChannelManager(allBrokers, config)
-      controllerChannelManager.startup()
-      return true
-    } catch {
-      case e: ZkNodeExistsException =>
-        registerControllerExistListener()
-        info("broker didn't succeed registering as the controller since it's taken by someone else")
-        return false
-      case e2 => throw e2
-    }
-  }
-
-  private def controllerRegisterOrFailover(){
-    if(!isRunning){
-      info("controller has already been shut down, don't need to compete for lead controller any more")
-      return
-    }
-    info("try to become controller")
-    if(tryToBecomeController() == true){
-      info("won the controller competition and work on leader and isr recovery")
-      deliverLeaderAndISRFromZookeeper(allBrokerIds, allTopics)
-      debug("work on broker changes")
-      onBrokerChange()
-
-      // If there are some partition with leader not initialized, init the leader for them
-      val partitionReplicaAssignment = allPartitionReplicaAssignment.filter(m => !allLeaders.contains(m._1))
-      debug("work on init leaders: %s, current cache for all leader is: %s".format(partitionReplicaAssignment.toString(), allLeaders))
-      initLeaders(partitionReplicaAssignment)
-    }
+    val controllerStatus =
+      try {
+        ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.ControllerPath, config.brokerId.toString)
+        // Only the broker elected as the new controller can execute following code, otherwise
+        // some exception will be thrown.
+        registerBrokerChangeListener()
+        registerTopicChangeListener()
+        liveBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
+        liveBrokerIds = liveBrokers.map(_.id)
+        info("Currently active brokers in the cluster: %s".format(liveBrokerIds))
+        allTopics = ZkUtils.getAllTopics(zkClient).toSet
+        info("Current list of topics in the cluster: %s".format(allTopics))
+        allPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, allTopics.iterator)
+        info("Partition replica assignment: %s".format(allPartitionReplicaAssignment))
+        allLeaders = new mutable.HashMap[(String, Int), Int]
+        controllerChannelManager = new ControllerChannelManager(liveBrokers, config)
+        controllerChannelManager.startup()
+        true
+      } catch {
+        case e: ZkNodeExistsException =>
+          registerControllerExistsListener()
+          false
+        case e2 => throw e2
+      }
+    controllerStatus
+  }
+
+  private def controllerRegisterOrFailover() {
+    if(isRunning) {
+      if(tryToBecomeController()) {
+        readAndSendLeaderAndIsrFromZookeeper(liveBrokerIds, allTopics)
+        onBrokerChange()
+        // If there are some partition with leader not initialized, init the leader for them
+        val partitionReplicaAssignment = allPartitionReplicaAssignment.filter(m => !allLeaders.contains(m._1))
+        debug("work on init leaders: %s, current cache for all leader is: %s".format(partitionReplicaAssignment.toString(), allLeaders))
+        initLeaders(partitionReplicaAssignment)
+      }
+    }else
+      info("Controller has been shut down, aborting startup procedure")
   }
 
   def isActive(): Boolean = {
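The ControllerChannelManager rewrite in the hunk above is the core of the synchronization fix on the
controller side: the three parallel maps of channels, queues and sender threads become one per-broker
record (ControllerBrokerStateInfo) and every add/remove/send goes through a single brokerLock. A
reduced sketch of that shape; the request type and thread body here are placeholders, not the real
RequestOrResponse plumbing:

import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

object ChannelManagerSketch {
  // everything the controller keeps per broker lives in one record
  case class BrokerState(messageQueue: LinkedBlockingQueue[String], requestSendThread: Thread)

  private val brokerStateInfo = mutable.Map.empty[Int, BrokerState]
  private val brokerLock = new Object

  def addBroker(brokerId: Int): Unit = brokerLock synchronized {
    val queue = new LinkedBlockingQueue[String]()
    val sender = new Thread(new Runnable {
      def run(): Unit = { /* drain the queue and send each request to this broker */ }
    })
    brokerStateInfo.put(brokerId, BrokerState(queue, sender))
    sender.start()
  }

  def sendRequest(brokerId: Int, request: String): Unit = brokerLock synchronized {
    brokerStateInfo(brokerId).messageQueue.put(request)
  }

  def removeBroker(brokerId: Int): Unit = brokerLock synchronized {
    brokerStateInfo.remove(brokerId).foreach(_.requestSendThread.interrupt())
  }
}
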
@@ -209,20 +204,22 @@ class KafkaController(config : KafkaConf
 
   def startup() = {
     controllerLock synchronized {
+      info("Controller starting up");
       registerSessionExpirationListener()
-      registerControllerExistListener()
+      registerControllerExistsListener()
       isRunning = true
       controllerRegisterOrFailover()
+      info("Controller startup complete")
     }
   }
 
   def shutdown() = {
     controllerLock synchronized {
-      if(controllerChannelManager != null){
-        info("shut down")
+      if(controllerChannelManager != null) {
+        info("Controller shutting down")
         controllerChannelManager.shutdown()
         controllerChannelManager = null
-        info("shutted down completely")
+        info("Controller shutdown complete")
       }
       isRunning = false
     }
@@ -244,8 +241,8 @@ class KafkaController(config : KafkaConf
     zkClient.subscribeStateChanges(new SessionExpireListener())
   }
 
-  private def registerControllerExistListener(){
-    zkClient.subscribeDataChanges(ZkUtils.ControllerPath, new ControllerExistListener())
+  private def registerControllerExistsListener(){
+    zkClient.subscribeDataChanges(ZkUtils.ControllerPath, new ControllerExistsListener())
   }
 
   class SessionExpireListener() extends IZkStateListener with Logging {
@@ -265,199 +262,265 @@ class KafkaController(config : KafkaConf
     @throws(classOf[Exception])
     def handleNewSession() {
       controllerLock synchronized {
-        if(controllerChannelManager != null){
+        if(controllerChannelManager != null) {
           info("session expires, clean up the state")
           controllerChannelManager.shutdown()
           controllerChannelManager = null
         }
+        controllerRegisterOrFailover()
       }
-      controllerRegisterOrFailover()
     }
   }
 
   /**
-   * Used to populate the leaderAndISR from zookeeper to affected brokers when the brokers comes up
+   * @param brokerIds The set of currently active brokers in the cluster, as known to the controller
+   * @param topics The set of topics known to the controller by reading from zookeeper
+   * This API reads the list of partitions that exist for all the topics in the specified list of input topics.
+   * For each of those partitions, it reads the assigned replica list so that it can send the appropriate leader and
+   * isr state change request to all the brokers in the assigned replica list. It arranges the leader and isr state
+   * change requests by broker id. At the end, it circles through this map, sending the required INIT state change requests
+   * to each broker. This API is called when -
+   * 1. A new broker starts up
+   * 2. A new controller is elected
    */
-  private def deliverLeaderAndISRFromZookeeper(brokerIds: Set[Int], topics: Set[String]) = {
-    val leaderAndISRInfos = ZkUtils.getPartitionLeaderAndISRForTopics(zkClient, topics.iterator)
-    val brokerToLeaderAndISRInfosMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndISR]]
-    for((topicPartition, leaderAndISR) <- leaderAndISRInfos){
-      // If the leader specified in the leaderAndISR is no longer alive, there is no need to recover it
-      if(allBrokerIds.contains(leaderAndISR.leader)){
-        val brokersAssignedToThisPartitionOpt = allPartitionReplicaAssignment.get(topicPartition)
-        if(brokersAssignedToThisPartitionOpt == None){
-          warn("during leaderAndISR recovery, there's no replica assignment for partition [%s, %d] with allPartitionReplicaAssignment: %s".format(topicPartition._1, topicPartition._2, allPartitionReplicaAssignment))
-        } else{
-          val relatedBrokersAssignedToThisPartition = brokersAssignedToThisPartitionOpt.get.filter(brokerIds.contains(_))
-          relatedBrokersAssignedToThisPartition.foreach(b => {
-            if(!brokerToLeaderAndISRInfosMap.contains(b))
-              brokerToLeaderAndISRInfosMap.put(b, new mutable.HashMap[(String, Int), LeaderAndISR])
-            brokerToLeaderAndISRInfosMap(b).put(topicPartition, leaderAndISR)
-          })
-          allLeaders.put(topicPartition, leaderAndISR.leader)
-        }
-      } else
-        debug("during leaderAndISR recovery, the leader %d is not alive any more, just ignore it".format(leaderAndISR.leader))
+  private def readAndSendLeaderAndIsrFromZookeeper(brokerIds: Set[Int], topics: Set[String]) = {
+    val leaderAndIsrInfo = ZkUtils.getPartitionLeaderAndIsrForTopics(zkClient, topics.iterator)
+    val brokerToLeaderAndIsrInfoMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndIsr]]
+    for((topicPartition, leaderAndIsr) <- leaderAndIsrInfo) {
+      // If the leader specified in the leaderAndIsr is no longer alive, there is no need to recover it
+      liveBrokerIds.contains(leaderAndIsr.leader) match {
+        case true =>
+          val brokersAssignedToThisPartitionOpt = allPartitionReplicaAssignment.get(topicPartition)
+          brokersAssignedToThisPartitionOpt match {
+            case Some(brokersAssignedToThisPartition) =>
+              val relatedBrokersAssignedToThisPartition = brokersAssignedToThisPartitionOpt.get.filter(brokerIds.contains(_))
+              relatedBrokersAssignedToThisPartition.foreach(b => {
+                brokerToLeaderAndIsrInfoMap.getOrElseUpdate(b, new mutable.HashMap[(String, Int), LeaderAndIsr])
+                brokerToLeaderAndIsrInfoMap(b).put(topicPartition, leaderAndIsr)
+              })
+              allLeaders.put(topicPartition, leaderAndIsr.leader)
+            case None => warn(("While refreshing controller's leader and isr cache, no replica assignment was found " +
+              "for partition [%s, %d]. Rest of the partition replica assignment is %s").format(topicPartition._1,
+              topicPartition._2, allPartitionReplicaAssignment))
+          }
+        case false =>
+          debug("While refreshing controller's leader and isr cache, broker %d is not alive any more, just ignore it"
+            .format(leaderAndIsr.leader))
+      }
     }
-    info("during leaderAndISR recovery, the broker to request map is [%s]".format(brokerToLeaderAndISRInfosMap.toString()))
+    debug(("While refreshing controller's leader and isr cache, the state change requests for each broker is " +
+      "[%s]").format(brokerToLeaderAndIsrInfoMap.toString()))
 
-    brokerToLeaderAndISRInfosMap.foreach(m =>{
+    brokerToLeaderAndIsrInfoMap.foreach(m =>{
       val broker = m._1
-      val leaderAndISRs = m._2
-      val leaderAndISRRequest = new LeaderAndISRRequest(LeaderAndISRRequest.IsInit, leaderAndISRs)
-      info("during leaderAndISR recovery, the leaderAndISRRequest sent to new broker [%s] is [%s]".format(broker, leaderAndISRRequest.toString))
-      sendRequest(broker, leaderAndISRRequest)
+      val leaderAndIsrs = m._2
+      val leaderAndIsrRequest = new LeaderAndIsrRequest(LeaderAndIsrRequest.IsInit, leaderAndIsrs)
+      info("After refreshing controller's leader and isr cache, the leader and ISR change state change request sent to" +
+        " new broker [%s] is [%s]".format(broker, leaderAndIsrRequest.toString))
+      sendRequest(broker, leaderAndIsrRequest)
     })
-
-    info("after leaderAndISR recovery for brokers %s, the leaders assignment is %s".format(brokerIds, allLeaders))
+    info("After refreshing controller's leader and isr cache for brokers %s, the leaders assignment is %s"
+      .format(brokerIds, allLeaders))
   }
 
-
   private def initLeaders(partitionReplicaAssignment: collection.mutable.Map[(String, Int), Seq[Int]]) {
-    val brokerToLeaderAndISRInfosMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int),LeaderAndISR]]
+    val brokerToLeaderAndISRInfoMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int),LeaderAndIsr]]
     for((topicPartition, replicaAssignment) <- partitionReplicaAssignment) {
-      val liveAssignedReplicas = replicaAssignment.filter(r => allBrokerIds.contains(r))
+      val liveAssignedReplicas = replicaAssignment.filter(r => liveBrokerIds.contains(r))
       debug("for topic [%s], partition [%d], live assigned replicas are: [%s]"
-                    .format(topicPartition._1,
-                            topicPartition._2,
-                            liveAssignedReplicas))
-      if(!liveAssignedReplicas.isEmpty){
+        .format(topicPartition._1,
+        topicPartition._2,
+        liveAssignedReplicas))
+      if(!liveAssignedReplicas.isEmpty) {
         debug("live assigned replica is not empty, check zkClient: %s".format(zkClient))
         val leader = liveAssignedReplicas.head
-        var leaderAndISR: LeaderAndISR = null
+        var leaderAndISR: LeaderAndIsr = null
         var updateLeaderISRZKPathSucceeded: Boolean = false
-        while(!updateLeaderISRZKPathSucceeded){
-          val curLeaderAndISROpt = ZkUtils.getLeaderAndISRForPartition(zkClient, topicPartition._1, topicPartition._2)
+        while(!updateLeaderISRZKPathSucceeded) {
+          val curLeaderAndISROpt = ZkUtils.getLeaderAndIsrForPartition(zkClient, topicPartition._1, topicPartition._2)
           debug("curLeaderAndISROpt is %s, zkClient is %s ".format(curLeaderAndISROpt, zkClient))
           if(curLeaderAndISROpt == None){
             debug("during initializing leader of parition (%s, %d), the current leader and isr in zookeeper is empty".format(topicPartition._1, topicPartition._2))
-            leaderAndISR = new LeaderAndISR(leader, liveAssignedReplicas.toList)
-            ZkUtils.createPersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2), leaderAndISR.toString)
+            leaderAndISR = new LeaderAndIsr(leader, liveAssignedReplicas.toList)
+            ZkUtils.createPersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topicPartition._1, topicPartition._2), leaderAndISR.toString)
             updateLeaderISRZKPathSucceeded = true
-          } else{
-            debug("during initializing leader of parition (%s, %d), the current leader and isr in zookeeper is not empty".format(topicPartition._1, topicPartition._2))
+          } else {
+            debug("While initializing leader of partition (%s, %d),".format(topicPartition._1, topicPartition._2) +
+              " the current leader and isr in zookeeper is not empty")
             val curZkPathVersion = curLeaderAndISROpt.get.zkVersion
-            leaderAndISR = new LeaderAndISR(leader, curLeaderAndISROpt.get.leaderEpoch + 1,liveAssignedReplicas.toList,  curLeaderAndISROpt.get.zkVersion + 1)
-            val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2), leaderAndISR.toString, curZkPathVersion)
-            if(updateSucceeded){
+            leaderAndISR = new LeaderAndIsr(leader, curLeaderAndISROpt.get.leaderEpoch + 1,liveAssignedReplicas.toList,
+              curLeaderAndISROpt.get.zkVersion + 1)
+            val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
+              ZkUtils.getTopicPartitionLeaderAndIsrPath(topicPartition._1, topicPartition._2),
+              leaderAndISR.toString, curZkPathVersion)
+            if(updateSucceeded) {
               leaderAndISR.zkVersion = newVersion
             }
             updateLeaderISRZKPathSucceeded = updateSucceeded
           }
         }
         liveAssignedReplicas.foreach(b => {
-          if(!brokerToLeaderAndISRInfosMap.contains(b))
-            brokerToLeaderAndISRInfosMap.put(b, new mutable.HashMap[(String, Int), LeaderAndISR])
-          brokerToLeaderAndISRInfosMap(b).put(topicPartition, leaderAndISR)
+          if(!brokerToLeaderAndISRInfoMap.contains(b))
+            brokerToLeaderAndISRInfoMap.put(b, new mutable.HashMap[(String, Int), LeaderAndIsr])
+          brokerToLeaderAndISRInfoMap(b).put(topicPartition, leaderAndISR)
         }
         )
         allLeaders.put(topicPartition, leaderAndISR.leader)
       }
       else{
-        warn("during initializing leader of parition (%s, %d), assigned replicas are [%s], live brokers are [%s], no assigned replica is alive".format(topicPartition._1, topicPartition._2, replicaAssignment.mkString(","), allBrokerIds))
+        warn("While initializing leader of partition (%s, %d), assigned replicas are [%s], live brokers are [%s]; no assigned replica is alive".format(topicPartition._1, topicPartition._2, replicaAssignment.mkString(","), liveBrokerIds))
       }
     }
 
-    info("after leaders initialization for partition replica assignments %s, the cached leaders in controller is %s, and the broker to request map is: %s".format(partitionReplicaAssignment, allLeaders, brokerToLeaderAndISRInfosMap))
-    brokerToLeaderAndISRInfosMap.foreach(m =>{
+    info("after leaders initialization for partition replica assignments %s, the cached leaders in controller is %s, and the broker to request map is: %s".format(partitionReplicaAssignment, allLeaders, brokerToLeaderAndISRInfoMap))
+    brokerToLeaderAndISRInfoMap.foreach(m =>{
       val broker = m._1
       val leaderAndISRs = m._2
-      val leaderAndISRRequest = new LeaderAndISRRequest(LeaderAndISRRequest.NotInit, leaderAndISRs)
+      val leaderAndISRRequest = new LeaderAndIsrRequest(LeaderAndIsrRequest.NotInit, leaderAndISRs)
       info("at initializing leaders for new partitions, the leaderAndISR request sent to broker %d is %s".format(broker, leaderAndISRRequest))
       sendRequest(broker, leaderAndISRRequest)
     })
   }
 
-
-  private def onBrokerChange(newBrokers: Set[Int] = null){
+  /**
+   * @param newBrokers The list of brokers that are started up. This is an optional argument that can be empty when
+   * a new controller is being elected.
+   * The purpose of this API is to send the leader state change request to all live replicas of partitions that
+   * currently don't have an alive leader. It first finds the partitions with dead leaders, then looks up the live
+   * assigned replicas for those partitions. It reads the leader and isr info for those partitions
+   * from zookeeper.
+   * It can happen that while the controller is in the middle of updating the new leader info in zookeeper,
+   * the leader changes the ISR for the partition. Due to this, the zookeeper path's version will be different from
+   * what was known to the controller, so its new leader update will fail. The controller retries the leader election
+   * based on the new ISR until its leader update in zookeeper succeeds.
+   * Once the write to zookeeper succeeds, it sends the leader state change request to the live assigned replicas for
+   * each affected partition.
+   */
+  private def onBrokerChange(newBrokers: Set[Int] = Set.empty[Int]) {
     /** handle the new brokers, send request for them to initialize the local log **/
-    if(newBrokers != null && newBrokers.size != 0)
-      deliverLeaderAndISRFromZookeeper(newBrokers, allTopics)
+    if(newBrokers.size != 0)
+      readAndSendLeaderAndIsrFromZookeeper(newBrokers, allTopics)
 
     /** handle leader election for the partitions whose leader is no longer alive **/
-    val brokerToLeaderAndISRInfosMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndISR]]
-    allLeaders.foreach(m =>{
-      val topicPartition = m._1
-      val leader = m._2
-      // We only care about the partitions, whose leader is no longer alive
-      if(!allBrokerIds.contains(leader)){
-        var updateLeaderISRZKPathSucceeded: Boolean = false
-        while(!updateLeaderISRZKPathSucceeded){
-          val assignedReplicasOpt = allPartitionReplicaAssignment.get(topicPartition)
-          if(assignedReplicasOpt == None)
-            throw new IllegalStateException("On broker changes, the assigned replica for [%s, %d], shouldn't be None, the general assignment is %s".format(topicPartition._1, topicPartition._2, allPartitionReplicaAssignment))
-          val assignedReplicas = assignedReplicasOpt.get
-          val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => allBrokerIds.contains(r))
-          val curLeaderAndISROpt = ZkUtils.getLeaderAndISRForPartition(zkClient, topicPartition._1, topicPartition._2)
-          if(curLeaderAndISROpt == None){
-            throw new IllegalStateException("On broker change, there's no leaderAndISR information for partition (%s, %d) in zookeeper".format(topicPartition._1, topicPartition._2))
-          }
-          val curLeaderAndISR = curLeaderAndISROpt.get
-          val leader = curLeaderAndISR.leader
-          var newLeader: Int = -1
-          val leaderEpoch = curLeaderAndISR.leaderEpoch
-          val ISR = curLeaderAndISR.ISR
-          val curZkPathVersion = curLeaderAndISR.zkVersion
-          debug("leader, epoch, ISR and zkPathVersion for partition (%s, %d) are: [%d], [%d], [%s], [%d]".format(topicPartition._1, topicPartition._2, leader, leaderEpoch, ISR, curZkPathVersion))
-          // The leader is no longer alive, need reelection, we only care about the leader change here, the ISR change can be handled by the leader
-          var leaderAndISR: LeaderAndISR = null
-          // The ISR contains at least 1 broker in the live broker list
-          val liveBrokersInISR = ISR.filter(r => allBrokerIds.contains(r))
-          if(!liveBrokersInISR.isEmpty){
-            newLeader = liveBrokersInISR.head
-            leaderAndISR = new LeaderAndISR(newLeader, leaderEpoch +1, liveBrokersInISR.toList, curZkPathVersion + 1)
-            debug("some broker in ISR is alive, new leader and ISR is %s".format(leaderAndISR.toString()))
-          } else{
-            debug("live broker in ISR is empty, see live assigned replicas: %s".format(liveAssignedReplicasToThisPartition))
-            if (!liveAssignedReplicasToThisPartition.isEmpty){
-              newLeader = liveAssignedReplicasToThisPartition.head
-              leaderAndISR = new LeaderAndISR(newLeader, leaderEpoch + 1, List(newLeader), curZkPathVersion + 1)
-              warn("on broker change, no broker in ISR is alive, new leader elected is [%s], there's potential data loss".format(newLeader))
-            } else
-              error("on broker change, for partition ([%s, %d]), live brokers are: [%s], assigned replicas are: [%s]; no asigned replica is alive".format(topicPartition._1, topicPartition._2, allBrokerIds, assignedReplicas))
-          }
-          debug("the leader and ISR converted string: [%s]".format(leaderAndISR))
-          val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2), leaderAndISR.toString, curZkPathVersion)
-          if(updateSucceeded){
-            leaderAndISR.zkVersion = newVersion
-            liveAssignedReplicasToThisPartition.foreach(b => {
-              if(!brokerToLeaderAndISRInfosMap.contains(b))
-                brokerToLeaderAndISRInfosMap.put(b, new mutable.HashMap[(String, Int), LeaderAndISR])
-              brokerToLeaderAndISRInfosMap(b).put(topicPartition, leaderAndISR)
-            })
-            allLeaders.put(topicPartition, newLeader)
-            info("on broker changes, allLeader is updated to %s".format(allLeaders))
-          }
-          updateLeaderISRZKPathSucceeded = updateSucceeded
+    val brokerToLeaderAndIsrInfoMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndIsr]]
+    // retain only partitions whose leaders are not alive
+    val partitionsWithDeadLeaders = allLeaders.filter(partitionAndLeader => !liveBrokerIds.contains(partitionAndLeader._2))
+    partitionsWithDeadLeaders.foreach { partitionAndLeader =>
+      val topic = partitionAndLeader._1._1
+      val partition = partitionAndLeader._1._2
+
+      try {
+        allPartitionReplicaAssignment.get((topic, partition)) match {
+          case Some(assignedReplicas) =>
+            val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => liveBrokerIds.contains(r))
+            ZkUtils.getLeaderAndIsrForPartition(zkClient, topic, partition) match {
+              case Some(currentLeaderAndIsr) =>
+                try {
+                  // elect new leader or throw exception
+                  val newLeaderAndIsr = electLeaderForPartition(topic, partition, currentLeaderAndIsr, assignedReplicas)
+                  // store new leader and isr info in cache
+                  liveAssignedReplicasToThisPartition.foreach { b =>
+                    brokerToLeaderAndIsrInfoMap.getOrElseUpdate(b, new mutable.HashMap[(String, Int), LeaderAndIsr])
+                    brokerToLeaderAndIsrInfoMap(b).put((topic, partition), newLeaderAndIsr)
+                  }
+                } catch {
+                  case e: PartitionOfflineException => throw e
+                  case e => error("Error while electing leader for partition [%s, %d]".format(topic, partition), e)
+                }
+              case None => throw new KafkaException(("On broker changes, " +
+                "there's no leaderAndISR information for partition (%s, %d) in zookeeper").format(topic, partition))
+            }
+          case None => throw new KafkaException(("While handling broker changes, the " +
+            "partition [%s, %d] doesn't have assigned replicas. The replica assignment for other partitions is %s")
+            .format(topic, partition, allPartitionReplicaAssignment))
         }
+      } catch {
+        case e: PartitionOfflineException =>
+          error("All replicas for partition [%s, %d] are dead.".format(topic, partition) +
+            " Marking this partition offline")
       }
-    })
-    brokerToLeaderAndISRInfosMap.foreach(m => {
+    }
+    debug("After leader election, leader cache is updated to %s".format(allLeaders))
+    brokerToLeaderAndIsrInfoMap.foreach(m => {
       val broker = m._1
-      val leaderAndISRInfos = m._2
-      val leaderAndISRRequest = new LeaderAndISRRequest(LeaderAndISRRequest.NotInit, leaderAndISRInfos)
+      val leaderAndISRInfo = m._2
+      val leaderAndISRRequest = new LeaderAndIsrRequest(LeaderAndIsrRequest.NotInit, leaderAndISRInfo)
       sendRequest(broker, leaderAndISRRequest)
-      info("on broker change, the LeaderAndISRRequest send to brokers [%d] is [%s]".format(broker, leaderAndISRRequest))
+      info("On broker changes, the LeaderAndIsrRequest sent to broker [%d] is [%s]".format(broker, leaderAndISRRequest))
     })
   }
 
+  /**
+   * @param topic                      The topic of the partition whose leader needs to be elected
+   * @param partition                  The partition whose leader needs to be elected
+   * @param currentLeaderAndIsr        The leader and isr information stored for this partition in zookeeper
+   * @param assignedReplicas           The list of replicas assigned to the input partition
+   * @throws PartitionOfflineException If no replica in the assigned replicas list is alive
+   * This API selects a new leader for the input partition -
+   * 1. If at least one broker from the isr is alive, it picks a broker from the isr as the new leader
+   * 2. Else, it picks some alive broker from the assigned replica list as the new leader
+   * 3. If no broker in the assigned replica list is alive, it throws PartitionOfflineException
+   * Once the leader is successfully registered in zookeeper, it updates the allLeaders cache
+   * TODO: If a leader cannot be elected for a partition, it should be marked offline and exposed through some metric
+   */
+  private def electLeaderForPartition(topic: String, partition: Int, currentLeaderAndIsr: LeaderAndIsr,
+                                      assignedReplicas: Seq[Int]):LeaderAndIsr = {
+    var zookeeperPathUpdateSucceeded: Boolean = false
+    var newLeaderAndIsr: LeaderAndIsr = currentLeaderAndIsr
+    while(!zookeeperPathUpdateSucceeded) {
+      val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => liveBrokerIds.contains(r))
+      val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => liveBrokerIds.contains(r))
+      val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
+      val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
+      debug("Leader, epoch, ISR and zkPathVersion for partition (%s, %d) are: [%d], [%d], [%s], [%d]"
+        .format(topic, partition, currentLeaderAndIsr.leader, currentLeaderEpoch, currentLeaderAndIsr.isr,
+        currentLeaderIsrZkPathVersion))
+      newLeaderAndIsr = liveBrokersInIsr.isEmpty match {
+        case true =>
+          debug("No broker in ISR is alive, picking the leader from the alive assigned replicas: %s"
+            .format(liveAssignedReplicasToThisPartition.mkString(",")))
+          liveAssignedReplicasToThisPartition.isEmpty match {
+            case true => throw new PartitionOfflineException(("No replica for partition " +
+              "([%s, %d]) is alive. Live brokers are: [%s],".format(topic, partition, liveBrokerIds)) +
+              " Assigned replicas are: [%s]".format(assignedReplicas))
+            case false =>
+              val newLeader = liveAssignedReplicasToThisPartition.head
+              warn("No broker in ISR is alive, elected leader from the alive replicas is [%s]. ".format(newLeader) +
+                "There's potential data loss")
+              new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
+          }
+        case false =>
+          val newLeader = liveBrokersInIsr.head
+          debug("Some broker in ISR is alive, picking the leader from the ISR: " + newLeader)
+          new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr.toList, currentLeaderIsrZkPathVersion + 1)
+      }
+      info("New leader and ISR for partition [%s, %d] is %s".format(topic, partition, newLeaderAndIsr.toString()))
+      // update the new leadership decision in zookeeper or retry
+      val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
+        ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partition),
+        newLeaderAndIsr.toString, currentLeaderAndIsr.zkVersion)
+      newLeaderAndIsr.zkVersion = newVersion
+      zookeeperPathUpdateSucceeded = updateSucceeded
+    }
+    // update the leader cache
+    allLeaders.put((topic, partition), newLeaderAndIsr.leader)
+    newLeaderAndIsr
+  }
+
   class BrokerChangeListener() extends IZkChildListener with Logging {
     this.logIdent = "[Controller " + config.brokerId + "], "
-    def handleChildChange(parentPath : String, javaCurChildren : java.util.List[String]) {
+    def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
       controllerLock synchronized {
-        info("broker change listener triggered")
-        val curChildrenSeq: Seq[String] = javaCurChildren
-        val curBrokerIdsSeq = curChildrenSeq.map(_.toInt)
-        val curBrokerIds = curBrokerIdsSeq.toSet
-        val addedBrokerIds = curBrokerIds -- allBrokerIds
-        val addedBrokersSeq = ZkUtils.getBrokerInfoFromIds(zkClient, addedBrokerIds.toSeq)
-        val deletedBrokerIds = allBrokerIds -- curBrokerIds
-        allBrokers = ZkUtils.getBrokerInfoFromIds(zkClient, curBrokerIdsSeq).toSet
-        allBrokerIds = allBrokers.map(_.id)
-        info("added brokers: %s, deleted brokers: %s, all brokers: %s".format(addedBrokerIds, deletedBrokerIds, allBrokerIds))
-        addedBrokersSeq.foreach(controllerChannelManager.addBroker(_))
+        val curBrokerIds = currentBrokerList.map(_.toInt).toSet
+        val newBrokerIds = curBrokerIds -- liveBrokerIds
+        val newBrokers = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
+        val deletedBrokerIds = liveBrokerIds -- curBrokerIds
+        liveBrokers = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
+        liveBrokerIds = liveBrokers.map(_.id)
+        info("Newly added brokers: %s, deleted brokers: %s, all brokers: %s"
+          .format(newBrokerIds.mkString(","), deletedBrokerIds.mkString(","), liveBrokerIds.mkString(",")))
+        newBrokers.foreach(controllerChannelManager.addBroker(_))
         deletedBrokerIds.foreach(controllerChannelManager.removeBroker(_))
-        onBrokerChange(addedBrokerIds)
+        onBrokerChange(newBrokerIds)
       }
     }
   }
@@ -465,7 +528,7 @@ class KafkaController(config : KafkaConf
   private def handleNewTopics(topics: Set[String], partitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]]) {
     // get relevant partitions to this broker
     val partitionReplicaAssignment = allPartitionReplicaAssignment.filter(p => topics.contains(p._1._1))
-    trace("handling new topics, the partition replica assignment to be handled is %s".format(partitionReplicaAssignment))
+    debug("handling new topics, the partition replica assignment to be handled is %s".format(partitionReplicaAssignment))
     initLeaders(partitionReplicaAssignment)
   }
 
@@ -479,24 +542,24 @@ class KafkaController(config : KafkaConf
       }
       allLeaders.remove(topicPartition)
       info("after deleting topics %s, allLeader is updated to %s and the broker to stop replia request map is %s".format(topics, allLeaders, brokerToPartitionToStopReplicaMap))
-      ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2))
+      ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderAndIsrPath(topicPartition._1, topicPartition._2))
     }
     for((broker, partitionToStopReplica) <- brokerToPartitionToStopReplicaMap){
       val stopReplicaRequest = new StopReplicaRequest(partitionToStopReplica)
       info("handling deleted topics: [%s] the stopReplicaRequest sent to broker %d is [%s]".format(topics, broker, stopReplicaRequest))
       sendRequest(broker, stopReplicaRequest)
     }
-    /*TODO: kafka-330  remove the unneeded leaderAndISRPath that the previous controller didn't get a chance to remove*/
+    /* TODO: kafka-330 remove the unneeded leaderAndISRPath that the previous controller didn't get a chance to remove */
   }
 
   class TopicChangeListener extends IZkChildListener with Logging {
     this.logIdent = "[Controller " + config.brokerId + "], "
 
     @throws(classOf[Exception])
-    def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
+    def handleChildChange(parentPath : String, children : java.util.List[String]) {
       controllerLock synchronized {
         info("topic/partition change listener fired for path " + parentPath)
-        val currentChildren = JavaConversions.asBuffer(curChilds).toSet
+        val currentChildren = JavaConversions.asBuffer(children).toSet
         val newTopics = currentChildren -- allTopics
         val deletedTopics = allTopics -- currentChildren
         val deletedPartitionReplicaAssignment = allPartitionReplicaAssignment.filter(p => deletedTopics.contains(p._1._1))
@@ -512,7 +575,7 @@ class KafkaController(config : KafkaConf
     }
   }
 
-  class ControllerExistListener extends IZkDataListener with Logging {
+  class ControllerExistsListener extends IZkDataListener with Logging {
     this.logIdent = "[Controller " + config.brokerId + "], "
 
     @throws(classOf[Exception])
@@ -523,7 +586,7 @@ class KafkaController(config : KafkaConf
     @throws(classOf[Exception])
     def handleDataDeleted(dataPath: String) {
       controllerLock synchronized {
-        info("the current controller failed, competes to be new controller")
+        info("Current controller failed, participating in election for a new controller")
         controllerRegisterOrFailover()
       }
     }
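
For readers following the controller changes above: the leader election introduced by this patch is essentially a
compare-and-swap loop against the per-partition leader-and-isr path in zookeeper. The standalone Scala sketch below is
illustrative only and is not part of the patch; LeaderAndIsrSketch, read and conditionalWrite are hypothetical
stand-ins for LeaderAndIsr, ZkUtils.getLeaderAndIsrForPartition and ZkUtils.conditionalUpdatePersistentPath, and the
real controller additionally updates its allLeaders cache and sends LeaderAndIsrRequests to the live assigned replicas.

    // Illustrative sketch (not part of the patch) of leader election plus a version-conditioned zookeeper update.
    case class LeaderAndIsrSketch(leader: Int, leaderEpoch: Int, isr: List[Int], zkVersion: Int)

    def electLeaderSketch(assignedReplicas: Seq[Int],
                          liveBrokers: Set[Int],
                          read: () => LeaderAndIsrSketch,
                          conditionalWrite: (LeaderAndIsrSketch, Int) => Boolean): LeaderAndIsrSketch = {
      var elected: Option[LeaderAndIsrSketch] = None
      while(elected.isEmpty) {
        val current = read()                                  // re-read so a retry sees the latest ISR and version
        val liveIsr = current.isr.filter(liveBrokers.contains)
        val liveAssigned = assignedReplicas.filter(liveBrokers.contains)
        val candidate =
          if(!liveIsr.isEmpty)                                // preferred: a leader from the ISR, no data loss
            LeaderAndIsrSketch(liveIsr.head, current.leaderEpoch + 1, liveIsr, current.zkVersion + 1)
          else if(!liveAssigned.isEmpty)                      // fallback: any live assigned replica, potential data loss
            LeaderAndIsrSketch(liveAssigned.head, current.leaderEpoch + 1, List(liveAssigned.head), current.zkVersion + 1)
          else                                                // nothing alive: the partition is offline
            throw new IllegalStateException("no assigned replica is alive")
        // the write succeeds only if nobody changed the path since `current` was read
        if(conditionalWrite(candidate, current.zkVersion))
          elected = Some(candidate)
      }
      elected.get
    }

The design choice is optimistic concurrency: instead of taking a lock, the controller writes its decision conditioned
on the zookeeper version it read, and simply re-runs the election if the current leader shrank the ISR (and bumped the
version) in the meantime.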

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1382988&r1=1382987&r2=1382988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Mon Sep 10 17:09:52 2012
@@ -22,7 +22,7 @@ import org.I0Itec.zkclient.ZkClient
 import java.util.concurrent.atomic.AtomicBoolean
 import kafka.utils._
 import kafka.log.LogManager
-import kafka.api.{LeaderAndISRRequest, LeaderAndISR}
+import kafka.api.{LeaderAndIsrRequest, LeaderAndIsr}
 import kafka.common.{UnknownTopicOrPartitionException, LeaderNotAvailableException, ErrorMapping}
 
 object ReplicaManager {
@@ -114,7 +114,7 @@ class ReplicaManager(val config: KafkaCo
     }
   }
 
-  def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndISRRequest): collection.Map[(String, Int), Short] = {
+  def becomeLeaderOrFollower(leaderAndISRRequest: LeaderAndIsrRequest): collection.Map[(String, Int), Short] = {
     info("Handling leader and isr request %s".format(leaderAndISRRequest))
     val responseMap = new collection.mutable.HashMap[(String, Int), Short]
 
@@ -141,7 +141,7 @@ class ReplicaManager(val config: KafkaCo
      *  If IsInit flag is on, this means that the controller wants to treat topics not in the request
      *  as deleted.
      */
-    if(leaderAndISRRequest.isInit == LeaderAndISRRequest.IsInit){
+    if(leaderAndISRRequest.isInit == LeaderAndIsrRequest.IsInit){
       startHighWaterMarksCheckPointThread
       val partitionsToRemove = allPartitions.filter(p => !leaderAndISRRequest.leaderAndISRInfos.contains(p._1)).map(entry => entry._1)
       info("Init flag is set in leaderAndISR request, partitions to remove: %s".format(partitionsToRemove))
@@ -151,7 +151,7 @@ class ReplicaManager(val config: KafkaCo
     responseMap
   }
 
-  private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) = {
+  private def makeLeader(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr) = {
     info("Becoming Leader for topic [%s] partition [%d]".format(topic, partitionId))
     val partition = getOrCreatePartition(topic, partitionId)
     if (partition.makeLeaderOrFollower(topic, partitionId, leaderAndISR, true)) {
@@ -163,7 +163,7 @@ class ReplicaManager(val config: KafkaCo
     info("Completed the leader state transition for topic %s partition %d".format(topic, partitionId))
   }
 
-  private def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndISR) {
+  private def makeFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr) {
     val leaderBrokerId: Int = leaderAndISR.leader
     info("Starting the follower state transition to follow leader %d for topic %s partition %d"
                  .format(leaderBrokerId, topic, partitionId))

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala?rev=1382988&r1=1382987&r2=1382988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala Mon Sep 10 17:09:52 2012
@@ -62,18 +62,17 @@ object UpdateOffsetsInZK {
           "getOffsetsBefore request")
       }
 
-      val brokerInfos = ZkUtils.getBrokerInfoFromIds(zkClient, List(broker))
-      if(brokerInfos.size == 0)
-        throw new KafkaException("Broker information for broker id %d does not exist in ZK".format(broker))
+      ZkUtils.getBrokerInfo(zkClient, broker) match {
+        case Some(brokerInfo) =>
+          val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024)
+          val offsets = consumer.getOffsetsBefore(topic, partition, offsetOption, 1)
+          val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
 
-      val brokerInfo = brokerInfos.head
-      val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024)
-      val offsets = consumer.getOffsetsBefore(topic, partition, offsetOption, 1)
-      val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
-
-      println("updating partition " + partition + " with new offset: " + offsets(0))
-      ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offsets(0).toString)
-      numParts += 1
+          println("updating partition " + partition + " with new offset: " + offsets(0))
+          ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offsets(0).toString)
+          numParts += 1
+        case None => throw new KafkaException("Broker information for broker id %d does not exist in ZK".format(broker))
+      }
     }
     println("updated the offset for " + numParts + " partitions")
   }
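
Relatedly, this patch replaces the list-returning broker lookup with ZkUtils.getBrokerInfo, which returns an Option
that callers pattern match on. A minimal, hypothetical sketch of that calling convention follows; BrokerSketch and
findBroker are placeholders, not the real kafka.cluster.Broker or ZkUtils API.

    // Minimal sketch of the Option-based lookup pattern the patch adopts; names are placeholders.
    case class BrokerSketch(id: Int, host: String, port: Int)

    def findBroker(registry: Map[Int, BrokerSketch], brokerId: Int): Option[BrokerSketch] =
      registry.get(brokerId)

    def hostAndPortOf(registry: Map[Int, BrokerSketch], brokerId: Int): String =
      findBroker(registry, brokerId) match {
        case Some(broker) => broker.host + ":" + broker.port   // broker is registered, use it
        case None =>                                           // fail fast, as UpdateOffsetsInZK does above
          throw new RuntimeException("Broker information for broker id %d does not exist".format(brokerId))
      }

Returning an Option pushes the "broker may have de-registered" case onto every caller explicitly, rather than relying
on callers to remember to check the size of a returned list.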

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1382988&r1=1382987&r2=1382988&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Mon Sep 10 17:09:52 2012
@@ -26,7 +26,6 @@ import java.util.zip.CRC32
 import javax.management._
 import scala.collection._
 import scala.collection.mutable
-import kafka.message.{NoCompressionCodec, CompressionCodec}
 import org.I0Itec.zkclient.ZkClient
 import java.util.{Random, Properties}
 import joptsimple.{OptionSpec, OptionSet, OptionParser}