You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/01 18:14:01 UTC

svn commit: r1368092 [2/4] - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/admin/ main/scala/kafka/api/ main/scala/kafka/cluster/ main/scala/kafka/common/ main/scala/kafka/consumer/ main/scala/kafka/log/ main/scala/kafka/network/ main/sca...

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala Wed Aug  1 16:13:59 2012
@@ -16,12 +16,11 @@
  */
 package kafka.server
 
-import kafka.common.KafkaZookeeperClient
 import collection.mutable.HashMap
+import collection._
 import collection.immutable.Set
 import kafka.cluster.Broker
 import kafka.api._
-import java.lang.Object
 import kafka.network.{Receive, BlockingChannel}
 import kafka.utils.{ZkUtils, Logging}
 import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, BlockingQueue}
@@ -29,36 +28,44 @@ import org.I0Itec.zkclient.exception.ZkN
 import java.util.concurrent.atomic.AtomicBoolean
 import org.I0Itec.zkclient.{IZkStateListener, ZkClient, IZkDataListener, IZkChildListener}
 import org.apache.zookeeper.Watcher.Event.KeeperState
+import collection.JavaConversions._
+import java.lang.Object
 
-
-class RequestSendThread(val brokerId: Int,
+class RequestSendThread(val controllerId: Int,
+                        val toBrokerId: Int,
                         val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
                         val channel: BlockingChannel)
-        extends Thread("requestSendThread-" + brokerId) with Logging {
+        extends Thread("requestSendThread-" + toBrokerId) with Logging {
+  this.logIdent = "Controller %d, request send thread to broker %d, ".format(controllerId, toBrokerId)
   val isRunning: AtomicBoolean = new AtomicBoolean(true)
   private val shutDownLatch = new CountDownLatch(1)
-  private val lock = new Object
+  private val lock = new Object()
 
   def shutDown(): Unit = {
-    info("Shutting down controller request send thread to broker %d".format(brokerId))
+    info("shutting down")
     isRunning.set(false)
     interrupt()
     shutDownLatch.await()
-    info("Controller request send thread to broker %d shutting down completed".format(brokerId))
+    info("shutted down completed")
   }
 
   override def run(): Unit = {
     try{
-      info("In controller, thread for broker: " + brokerId + " started running")
       while(isRunning.get()){
         val queueItem = queue.take()
         val request = queueItem._1
         val callback = queueItem._2
 
         var receive: Receive = null
-        lock synchronized {
-          channel.send(request)
-          receive = channel.receive()
+        try{
+          lock synchronized {
+            channel.send(request)
+            receive = channel.receive()
+          }
+        } catch {
+          case e =>
+            // log it and let it go. Let controller shut it down.
+            debug("Exception occurs", e)
         }
 
         var response: RequestOrResponse = null
@@ -68,13 +75,15 @@ class RequestSendThread(val brokerId: In
           case RequestKeys.StopReplicaRequest =>
             response = StopReplicaResponse.readFrom(receive.buffer)
         }
+        trace("got a response %s".format(controllerId, response, toBrokerId))
+
         if(callback != null){
           callback(response)
         }
       }
     } catch{
-      case e: InterruptedException => warn("Controller request send thread to broker %d is intterrupted. Shutting down".format(brokerId))
-      case e1 => error("Error in controller request send thread to broker %d down due to ".format(brokerId), e1)
+      case e: InterruptedException => warn("intterrupted. Shutting down")
+      case e1 => error("Error due to ", e1)
     }
     shutDownLatch.countDown()
   }
@@ -85,9 +94,10 @@ class ControllerChannelManager(allBroker
   private val messageChannels = new HashMap[Int, BlockingChannel]
   private val messageQueues = new HashMap[Int, BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)]]
   private val messageThreads = new HashMap[Int, RequestSendThread]
+  this.logIdent = "Channel manager on controller " + config.brokerId + ", "
   for(broker <- allBrokers){
     brokers.put(broker.id, broker)
-    info("channel for broker " + broker.id + " created" + " at host: " + broker.host + " and port: " + broker.port)
+    info("channel to broker " + broker.id + " created" + " at host: " + broker.host + " and port: " + broker.port)
     val channel = new BlockingChannel(broker.host, broker.port,
                                       BlockingChannel.UseDefaultBufferSize,
                                       BlockingChannel.UseDefaultBufferSize,
@@ -99,7 +109,7 @@ class ControllerChannelManager(allBroker
 
   def startUp() = {
     for((brokerId, broker) <- brokers){
-      val thread = new RequestSendThread(brokerId, messageQueues(brokerId), messageChannels(brokerId))
+      val thread = new RequestSendThread(config.brokerId, brokerId, messageQueues(brokerId), messageChannels(brokerId))
       thread.setDaemon(false)
       thread.start()
       messageThreads.put(broker.id, thread)
@@ -119,14 +129,13 @@ class ControllerChannelManager(allBroker
   def addBroker(broker: Broker){
     brokers.put(broker.id, broker)
     messageQueues.put(broker.id, new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize))
-    info("channel for broker " + broker.id + " created" + " at host: " + broker.host + " and port: " + broker.port)
     val channel = new BlockingChannel(broker.host, broker.port,
                                       BlockingChannel.UseDefaultBufferSize,
                                       BlockingChannel.UseDefaultBufferSize,
                                       config.controllerSocketTimeoutMs)
     channel.connect()
     messageChannels.put(broker.id, channel)
-    val thread = new RequestSendThread(broker.id, messageQueues(broker.id), messageChannels(broker.id))
+    val thread = new RequestSendThread(config.brokerId, broker.id, messageQueues(broker.id), messageChannels(broker.id))
     thread.setDaemon(false)
     thread.start()
     messageThreads.put(broker.id, thread)
@@ -146,38 +155,62 @@ class ControllerChannelManager(allBroker
   }
 }
 
-class KafkaController(config : KafkaConfig) extends Logging {
-  info("controller startup");
-  private val lock = new Object
-
-  private var zkClient: ZkClient = null
+class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging {
+  this.logIdent = "Controller " + config.brokerId + ", "
+  info("startup");
+  private val controllerLock = new Object
   private var controllerChannelManager: ControllerChannelManager = null
   private var allBrokers : Set[Broker] = null
+  private var allBrokerIds : Set[Int] = null
   private var allTopics: Set[String] = null
+  private var allPartitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]] = null
+  private var allLeaders: mutable.Map[(String, Int), Int] = null
 
-  private def tryToBecomeController() = {
-    lock synchronized {
-      val curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)
-      if (curController == null){
-        try {
-          ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.ControllerPath, config.brokerId.toString())
-
-          // Only the broker successfully registering as the controller can execute following code, otherwise
-          // some exception will be thrown.
-          registerBrokerChangeListener()
-          registerTopicChangeListener()
-          allBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
-          allTopics = ZkUtils.getAllTopics(zkClient).toSet
-          controllerChannelManager = new ControllerChannelManager(allBrokers, config)
-          controllerChannelManager.startUp()
-        } catch {
-          case e: ZkNodeExistsException =>
-            registerControllerExistListener()
-            info("Broker " + config.brokerId + " didn't succeed registering as the controller since it's taken by someone else")
-          case e2 => throw e2
+  // Return true if this controller succeeds in the controller competition
+  private def tryToBecomeController(): Boolean = {
+    try {
+      ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.ControllerPath, config.brokerId.toString)
+      // Only the broker successfully registering as the controller can execute following code, otherwise
+      // some exception will be thrown.
+      registerBrokerChangeListener()
+      registerTopicChangeListener()
+      allBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
+      allBrokerIds = allBrokers.map(_.id)
+      info("all brokers: %s".format(allBrokerIds))
+      allTopics = ZkUtils.getAllTopics(zkClient).toSet
+      info("all topics: %s".format(allTopics))
+      allPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, allTopics.iterator)
+      info("allPartitionReplicaAssignment: %s".format(allPartitionReplicaAssignment))
+      allLeaders = new mutable.HashMap[(String, Int), Int]
+      controllerChannelManager = new ControllerChannelManager(allBrokers, config)
+      controllerChannelManager.startUp()
+      return true
+    } catch {
+      case e: ZkNodeExistsException =>
+        registerControllerExistListener()
+        info("broker didn't succeed registering as the controller since it's taken by someone else")
+        return false
+      case e2 => throw e2
+    }
+  }
+
+  private def controllerRegisterOrFailover(){
+    info("try to become controller")
+    if(tryToBecomeController() == true){
+      info("won the controller competition and work on leader and isr recovery")
+      deliverLeaderAndISRFromZookeeper(allBrokerIds, allTopics)
+      debug("work on broker changes")
+      onBrokerChange()
+
+      // If there are some partition with leader not initialized, init the leader for them
+      val partitionReplicaAssignment = allPartitionReplicaAssignment.clone()
+      for((topicPartition, replicas) <- partitionReplicaAssignment){
+        if (allLeaders.contains(topicPartition)){
+          partitionReplicaAssignment.remove(topicPartition)
         }
       }
-      else info("Broker " + config.brokerId + " see not null skip " + " current controller " + curController)
+      debug("work on init leaders: %s, current cache for all leader is: %s".format(partitionReplicaAssignment.toString(), allLeaders))
+      initLeaders(partitionReplicaAssignment)
     }
   }
 
@@ -186,17 +219,22 @@ class KafkaController(config : KafkaConf
   }
 
   def startup() = {
-    zkClient = KafkaZookeeperClient.getZookeeperClient(config)
-    registerSessionExpirationListener()
-    registerControllerExistListener()
-    tryToBecomeController()
+    controllerLock synchronized {
+      registerSessionExpirationListener()
+      registerControllerExistListener()
+      controllerRegisterOrFailover()
+    }
   }
 
   def shutDown() = {
-    if(controllerChannelManager != null)
-      controllerChannelManager.shutDown()
-    if(zkClient != null)
-      zkClient.close()
+    controllerLock synchronized {
+      if(controllerChannelManager != null){
+        info("shut down")
+        controllerChannelManager.shutDown()
+        controllerChannelManager = null
+        info("shutted down completely")
+      }
+    }
   }
 
   def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) = {
@@ -219,7 +257,8 @@ class KafkaController(config : KafkaConf
     zkClient.subscribeDataChanges(ZkUtils.ControllerPath, new ControllerExistListener())
   }
 
-  class SessionExpireListener() extends IZkStateListener {
+  class SessionExpireListener() extends IZkStateListener with Logging {
+    this.logIdent = "Controller " + config.brokerId + ", "
     @throws(classOf[Exception])
     def handleStateChanged(state: KeeperState) {
       // do nothing, since zkclient will do reconnect for us.
@@ -234,50 +273,256 @@ class KafkaController(config : KafkaConf
      */
     @throws(classOf[Exception])
     def handleNewSession() {
-      info("Controller session expires, clean up the state, current controller: " + config.brokerId)
-      controllerChannelManager.shutDown()
-      controllerChannelManager = null
-      info("Controller session expires, the channel manager shut downr: " + config.brokerId)
-      tryToBecomeController()
+      controllerLock synchronized {
+        info("session expires, clean up the state")
+        controllerChannelManager.shutDown()
+        controllerChannelManager = null
+        controllerRegisterOrFailover()
+      }
     }
   }
 
+  /**
+   * Used to populate the leaderAndISR from zookeeper to affected brokers when the brokers come up
+   */
+  private def deliverLeaderAndISRFromZookeeper(brokerIds: Set[Int], topics: Set[String]) = {
+    val leaderAndISRInfos = ZkUtils.getPartitionLeaderAndISRForTopics(zkClient, topics.iterator)
+    val brokerToLeaderAndISRInfosMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndISR]]
+    for((topicPartition, leaderAndISR) <- leaderAndISRInfos){
+      // If the leader specified in the leaderAndISR is no longer alive, there is no need to recover it
+      if(allBrokerIds.contains(leaderAndISR.leader)){
+        val brokersAssignedToThisPartitionOpt = allPartitionReplicaAssignment.get(topicPartition)
+        if(brokersAssignedToThisPartitionOpt == None){
+          warn("during leaderAndISR recovery, there's no replica assignment for partition [%s, %d] with allPartitionReplicaAssignment: %s".format(topicPartition._1, topicPartition._2, allPartitionReplicaAssignment))
+        } else{
+          val relatedBrokersAssignedToThisPartition = brokersAssignedToThisPartitionOpt.get.filter(brokerIds.contains(_))
+          relatedBrokersAssignedToThisPartition.foreach(b => {
+            if(!brokerToLeaderAndISRInfosMap.contains(b))
+              brokerToLeaderAndISRInfosMap.put(b, new mutable.HashMap[(String, Int), LeaderAndISR])
+            brokerToLeaderAndISRInfosMap(b).put(topicPartition, leaderAndISR)
+          })
+          allLeaders.put(topicPartition, leaderAndISR.leader)
+        }
+      } else
+        debug("during leaderAndISR recovery, the leader %d is not alive any more, just ignore it".format(leaderAndISR.leader))
+    }
+    info("during leaderAndISR recovery, the broker to request map is [%s]".format(brokerToLeaderAndISRInfosMap.toString()))
+
+    brokerToLeaderAndISRInfosMap.foreach(m =>{
+      val broker = m._1
+      val leaderAndISRs = m._2
+      val leaderAndISRRequest = new LeaderAndISRRequest(LeaderAndISRRequest.IsInit, leaderAndISRs)
+      info("during leaderAndISR recovery, the leaderAndISRRequest sent to new broker [%s] is [%s]".format(broker, leaderAndISRRequest.toString))
+      sendRequest(broker, leaderAndISRRequest)
+    })
+
+    info("after leaderAndISR recovery for brokers %s, the leaders assignment is %s".format(brokerIds, allLeaders))
+  }
+
+
+  private def initLeaders(partitionReplicaAssignment: collection.mutable.Map[(String, Int), Seq[Int]]) {
+    val brokerToLeaderAndISRInfosMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int),LeaderAndISR]]
+    for((topicPartition, replicaAssignment) <- partitionReplicaAssignment) {
+      val liveAssignedReplicas = replicaAssignment.filter(r => allBrokerIds.contains(r))
+      debug("for topic [%s], partition [%d], live assigned replicas are: [%s]"
+                    .format(topicPartition._1,
+                            topicPartition._2,
+                            liveAssignedReplicas))
+      if(!liveAssignedReplicas.isEmpty){
+        debug("live assigned replica is not empty, check zkClient: %s".format(zkClient))
+        val leader = liveAssignedReplicas.head
+        var leaderAndISR: LeaderAndISR = null
+        var updateLeaderISRZKPathSucceeded: Boolean = false
+        while(!updateLeaderISRZKPathSucceeded){
+          val curLeaderAndISROpt = ZkUtils.getLeaderAndISRForPartition(zkClient, topicPartition._1, topicPartition._2)
+          debug("curLeaderAndISROpt is %s, zkClient is %s ".format(curLeaderAndISROpt, zkClient))
+          if(curLeaderAndISROpt == None){
+            debug("during initializing leader of parition (%s, %d), the current leader and isr in zookeeper is empty".format(topicPartition._1, topicPartition._2))
+            leaderAndISR = new LeaderAndISR(leader, liveAssignedReplicas.toList)
+            ZkUtils.createPersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2), leaderAndISR.toString)
+            updateLeaderISRZKPathSucceeded = true
+          } else{
+            debug("during initializing leader of parition (%s, %d), the current leader and isr in zookeeper is not empty".format(topicPartition._1, topicPartition._2))
+            val curZkPathVersion = curLeaderAndISROpt.get.zkVersion
+            leaderAndISR = new LeaderAndISR(leader, curLeaderAndISROpt.get.leaderEpoch + 1,liveAssignedReplicas.toList,  curLeaderAndISROpt.get.zkVersion + 1)
+            val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2), leaderAndISR.toString, curZkPathVersion)
+            if(updateSucceeded){
+              leaderAndISR.zkVersion = newVersion
+            }
+            updateLeaderISRZKPathSucceeded = updateSucceeded
+          }
+        }
+        liveAssignedReplicas.foreach(b => {
+          if(!brokerToLeaderAndISRInfosMap.contains(b))
+            brokerToLeaderAndISRInfosMap.put(b, new mutable.HashMap[(String, Int), LeaderAndISR])
+          brokerToLeaderAndISRInfosMap(b).put(topicPartition, leaderAndISR)
+        }
+        )
+        allLeaders.put(topicPartition, leaderAndISR.leader)
+      }
+      else{
+        warn("during initializing leader of parition (%s, %d), assigned replicas are [%s], live brokers are [%s], no assigned replica is alive".format(topicPartition._1, topicPartition._2, replicaAssignment, allBrokerIds))
+      }
+    }
+
+    info("after leaders initialization for partition replica assignments %s, the cached leaders in controller is %s, and the broker to request map is: %s".format(partitionReplicaAssignment, allLeaders, brokerToLeaderAndISRInfosMap))
+    brokerToLeaderAndISRInfosMap.foreach(m =>{
+      val broker = m._1
+      val leaderAndISRs = m._2
+      val leaderAndISRRequest = new LeaderAndISRRequest(LeaderAndISRRequest.NotInit, leaderAndISRs)
+      info("at initializing leaders for new partitions, the leaderAndISR request sent to broker %d is %s".format(broker, leaderAndISRRequest))
+      sendRequest(broker, leaderAndISRRequest)
+    })
+  }
+
+
+  private def onBrokerChange(newBrokers: Set[Int] = null){
+    /** handle the new brokers, send request for them to initialize the local log **/
+    if(newBrokers != null)
+      deliverLeaderAndISRFromZookeeper(newBrokers, allTopics)
+
+    /** handle leader election for the partitions whose leader is no longer alive **/
+    val brokerToLeaderAndISRInfosMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndISR]]
+    allLeaders.foreach(m =>{
+      val topicPartition = m._1
+      val leader = m._2
+      // We only care about the partitions, whose leader is no longer alive
+      if(!allBrokerIds.contains(leader)){
+        var updateLeaderISRZKPathSucceeded: Boolean = false
+        while(!updateLeaderISRZKPathSucceeded){
+          val assignedReplicasOpt = allPartitionReplicaAssignment.get(topicPartition)
+          if(assignedReplicasOpt == None)
+            throw new IllegalStateException("On broker changes, the assigned replica for [%s, %d], shouldn't be None, the general assignment is %s".format(topicPartition._1, topicPartition._2, allPartitionReplicaAssignment))
+          val assignedReplicas = assignedReplicasOpt.get
+          val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => allBrokerIds.contains(r))
+          val curLeaderAndISROpt = ZkUtils.getLeaderAndISRForPartition(zkClient, topicPartition._1, topicPartition._2)
+          if(curLeaderAndISROpt == None){
+            throw new IllegalStateException("On broker change, there's no leaderAndISR information for partition (%s, %d) in zookeeper".format(topicPartition._1, topicPartition._2))
+          }
+          val curLeaderAndISR = curLeaderAndISROpt.get
+          val leader = curLeaderAndISR.leader
+          var newLeader: Int = -1
+          val leaderEpoch = curLeaderAndISR.leaderEpoch
+          val ISR = curLeaderAndISR.ISR
+          val curZkPathVersion = curLeaderAndISR.zkVersion
+          debug("leader, epoch, ISR and zkPathVersion for partition (%s, %d) are: [%d], [%d], [%s], [%d]".format(topicPartition._1, topicPartition._2, leader, leaderEpoch, ISR, curZkPathVersion))
+          // The leader is no longer alive, need reelection, we only care about the leader change here, the ISR change can be handled by the leader
+          var leaderAndISR: LeaderAndISR = null
+          // The ISR contains at least 1 broker in the live broker list
+          val liveBrokersInISR = ISR.filter(r => allBrokerIds.contains(r))
+          if(!liveBrokersInISR.isEmpty){
+            newLeader = liveBrokersInISR.head
+            leaderAndISR = new LeaderAndISR(newLeader, leaderEpoch +1, liveBrokersInISR.toList, curZkPathVersion + 1)
+            debug("some broker in ISR is alive, new leader and ISR is %s".format(leaderAndISR.toString()))
+          } else{
+            debug("live broker in ISR is empty, see live assigned replicas: %s".format(liveAssignedReplicasToThisPartition))
+            if (!liveAssignedReplicasToThisPartition.isEmpty){
+              newLeader = liveAssignedReplicasToThisPartition.head
+              leaderAndISR = new LeaderAndISR(newLeader, leaderEpoch + 1, List(newLeader), curZkPathVersion + 1)
+              warn("on broker change, no broker in ISR is alive, new leader elected is [%s], there's potential data loss".format(newLeader))
+            } else
+              error("on broker change, for partition ([%s, %d]), live brokers are: [%s], assigned replicas are: [%s]; no asigned replica is alive".format(topicPartition._1, topicPartition._2, allBrokerIds, assignedReplicas))
+          }
+          debug("the leader and ISR converted string: [%s]".format(leaderAndISR))
+          val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2), leaderAndISR.toString, curZkPathVersion)
+          if(updateSucceeded){
+            leaderAndISR.zkVersion = newVersion
+            liveAssignedReplicasToThisPartition.foreach(b => {
+              if(!brokerToLeaderAndISRInfosMap.contains(b))
+                brokerToLeaderAndISRInfosMap.put(b, new mutable.HashMap[(String, Int), LeaderAndISR])
+              brokerToLeaderAndISRInfosMap(b).put(topicPartition, leaderAndISR)
+            })
+            allLeaders.put(topicPartition, newLeader)
+            info("on broker changes, allLeader is updated to %s".format(allLeaders))
+          }
+          updateLeaderISRZKPathSucceeded = updateSucceeded
+        }
+      }
+    })
+    trace("after acting on broker change, the broker to leaderAndISR request map is".format(brokerToLeaderAndISRInfosMap))
+    brokerToLeaderAndISRInfosMap.foreach(m => {
+      val broker = m._1
+      val leaderAndISRInfos = m._2
+      val leaderAndISRRequest = new LeaderAndISRRequest(LeaderAndISRRequest.NotInit, leaderAndISRInfos)
+      sendRequest(broker, leaderAndISRRequest)
+      info("on broker change, the LeaderAndISRRequest send to brokers [%d] is [%s]".format(leaderAndISRRequest, broker))
+    })
+  }
+
   class BrokerChangeListener() extends IZkChildListener with Logging {
+    this.logIdent = "Controller " + config.brokerId + ", "
     def handleChildChange(parentPath : String, javaCurChildren : java.util.List[String]) {
-      import scala.collection.JavaConversions._
-      lock synchronized {
-        info("Broker change listener at controller triggerred")
-        val allBrokerIds = allBrokers.map(_.id)
+      controllerLock synchronized {
+        info("broker change listener triggered")
         val curChildrenSeq: Seq[String] = javaCurChildren
         val curBrokerIdsSeq = curChildrenSeq.map(_.toInt)
         val curBrokerIds = curBrokerIdsSeq.toSet
         val addedBrokerIds = curBrokerIds -- allBrokerIds
         val addedBrokersSeq = ZkUtils.getBrokerInfoFromIds(zkClient, addedBrokerIds.toSeq)
-        info("Added brokers: " + addedBrokerIds.toString())
         val deletedBrokerIds = allBrokerIds -- curBrokerIds
-        info("Deleted brokers: " + deletedBrokerIds.toString())
-
         allBrokers = ZkUtils.getBrokerInfoFromIds(zkClient, curBrokerIdsSeq).toSet
+        allBrokerIds = allBrokers.map(_.id)
+        info("added brokers: %s, deleted brokers: %s, all brokers: %s".format(addedBrokerIds, deletedBrokerIds, allBrokerIds))
+        addedBrokersSeq.foreach(controllerChannelManager.addBroker(_))
+        deletedBrokerIds.foreach(controllerChannelManager.removeBroker(_))
+        onBrokerChange(addedBrokerIds)
+      }
+    }
+  }
 
-        for(broker <- addedBrokersSeq){
-          controllerChannelManager.addBroker(broker)
-        }
-        for (brokerId <- deletedBrokerIds){
-          controllerChannelManager.removeBroker(brokerId)
-        }
-        /** TODO: add other broker change handler logic**/
+  private def handleNewTopics(topics: Set[String], partitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]]) {
+    // get relevant partitions to this broker
+    val partitionReplicaAssignment = allPartitionReplicaAssignment.filter(p => topics.contains(p._1._1))
+    trace("handling new topics, the partition replica assignment to be handled is %s".format(partitionReplicaAssignment))
+    initLeaders(partitionReplicaAssignment)
+  }
+
+  private def handleDeletedTopics(topics: Set[String], partitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]]) {
+    val brokerToPartitionToStopReplicaMap = new collection.mutable.HashMap[Int, collection.mutable.HashSet[(String, Int)]]
+    for((topicPartition, brokers) <- partitionReplicaAssignment){
+      for (broker <- brokers){
+        if (!brokerToPartitionToStopReplicaMap.contains(broker))
+          brokerToPartitionToStopReplicaMap.put(broker, new collection.mutable.HashSet[(String, Int)])
+        brokerToPartitionToStopReplicaMap(broker).add(topicPartition)
       }
+      allLeaders.remove(topicPartition)
+      info("after deleting topics %s, allLeader is updated to %s and the broker to stop replia request map is %s".format(topics, allLeaders, brokerToPartitionToStopReplicaMap))
+      ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2))
+    }
+    for((broker, partitionToStopReplica) <- brokerToPartitionToStopReplicaMap){
+      val stopReplicaRequest = new StopReplicaRequest(partitionToStopReplica)
+      info("handling deleted topics: [%s] the stopReplicaRequest sent to broker %d is [%s]".format(topics, broker, stopReplicaRequest))
+      sendRequest(broker, stopReplicaRequest)
     }
+    /*TODO: kafka-330  remove the unneeded leaderAndISRPath that the previous controller didn't get a chance to remove*/
   }
 
   class TopicChangeListener extends IZkChildListener with Logging {
+    this.logIdent = "Controller " + config.brokerId + ", "
+
     @throws(classOf[Exception])
     def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
-      // TODO: Incomplete, do not need to review this time
+      controllerLock synchronized {
+        info("topic/partition change listener fired for path " + parentPath)
+        val currentChildren = JavaConversions.asBuffer(curChilds).toSet
+        val newTopics = currentChildren -- allTopics
+        val deletedTopics = allTopics -- currentChildren
+        val deletedPartitionReplicaAssignment = allPartitionReplicaAssignment.filter(p => deletedTopics.contains(p._1._1))
+        allTopics = currentChildren
+
+        val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.iterator)
+        allPartitionReplicaAssignment = allPartitionReplicaAssignment.filter(p => !deletedTopics.contains(p._1._1))
+        allPartitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
+        info("new topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics, deletedTopics, allPartitionReplicaAssignment))
+        handleNewTopics(newTopics, addedPartitionReplicaAssignment)
+        handleDeletedTopics(deletedTopics, deletedPartitionReplicaAssignment)
+      }
     }
   }
 
   class ControllerExistListener extends IZkDataListener with Logging {
+    this.logIdent = "Controller " + config.brokerId + ", "
+
     @throws(classOf[Exception])
     def handleDataChange(dataPath: String, data: Object) {
       // do nothing, since No logic is needed here
@@ -285,8 +530,10 @@ class KafkaController(config : KafkaConf
 
     @throws(classOf[Exception])
     def handleDataDeleted(dataPath: String) {
-      info("Controller fail over, broker " + config.brokerId + " try to become controller")
-      tryToBecomeController()
+      controllerLock synchronized {
+        info("the current controller failed, competes to be new controller")
+        controllerRegisterOrFailover()
+      }
     }
   }
 }
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala Wed Aug  1 16:13:59 2012
@@ -24,41 +24,44 @@ import java.util.concurrent.atomic.Atomi
 /**
  * A thread that answers kafka requests.
  */
-class KafkaRequestHandler(val requestChannel: RequestChannel, apis: KafkaApis) extends Runnable with Logging { 
-     
+class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestChannel, apis: KafkaApis) extends Runnable with Logging {
+  this.logIdent = "Kafka Request Handler " + id + " on Broker " + brokerId + ", "
+
   def run() { 
     while(true) { 
       val req = requestChannel.receiveRequest()
-      trace("Processor " + Thread.currentThread.getName + " got request " + req)
-      if(req == RequestChannel.AllDone)
+      if(req == RequestChannel.AllDone){
+        trace("receives shut down command, shut down".format(brokerId, id))
         return
+      }
+      debug("handles request " + req)
       apis.handle(req)
     }
   }
 
   def shutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone)
-  
 }
 
-class KafkaRequestHandlerPool(val requestChannel: RequestChannel, 
-                              val apis: KafkaApis, 
+class KafkaRequestHandlerPool(val brokerId: Int,
+                              val requestChannel: RequestChannel,
+                              val apis: KafkaApis,
                               numThreads: Int) extends Logging {
-  
+  this.logIdent = "Kafka Request Handler on Broker " + brokerId + ", "
   val threads = new Array[Thread](numThreads)
   val runnables = new Array[KafkaRequestHandler](numThreads)
   for(i <- 0 until numThreads) { 
-    runnables(i) = new KafkaRequestHandler(requestChannel, apis)
+    runnables(i) = new KafkaRequestHandler(i, brokerId, requestChannel, apis)
     threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
     threads(i).start()
   }
   
   def shutdown() {
-    info("Shutting down request handlers")
+    info("shutting down")
     for(handler <- runnables)
       handler.shutdown
     for(thread <- threads)
       thread.join
-    info("Request handlers shut down")
+    info("shutted down completely")
   }
   
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Wed Aug  1 16:13:59 2012
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -24,8 +24,9 @@ import kafka.utils._
 import java.util.concurrent._
 import atomic.AtomicBoolean
 import kafka.cluster.Replica
+import kafka.api.LeaderAndISR
+import scala.collection._
 import org.I0Itec.zkclient.ZkClient
-import kafka.common.KafkaZookeeperClient
 
 
 /**
@@ -33,7 +34,7 @@ import kafka.common.KafkaZookeeperClient
  * to start up and shutdown a single Kafka node.
  */
 class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging {
-
+  this.logIdent = "Kafka Server " + config.brokerId + ", "
   val CleanShutdownFile = ".kafka_cleanshutdown"
   private var isShuttingDown = new AtomicBoolean(false)
   private var shutdownLatch = new CountDownLatch(1)
@@ -44,7 +45,7 @@ class KafkaServer(val config: KafkaConfi
   var kafkaZookeeper: KafkaZooKeeper = null
   var replicaManager: ReplicaManager = null
   private var apis: KafkaApis = null
-  var kafkaController: KafkaController = new KafkaController(config)
+  var kafkaController: KafkaController = null
   val kafkaScheduler = new KafkaScheduler(4)
   var zkClient: ZkClient = null
 
@@ -53,7 +54,7 @@ class KafkaServer(val config: KafkaConfi
    * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
    */
   def startup() {
-    info("Starting Kafka server..." + config.brokerId)
+    info("starting")
     isShuttingDown = new AtomicBoolean(false)
     shutdownLatch = new CountDownLatch(1)
     var needRecovery = true
@@ -62,11 +63,10 @@ class KafkaServer(val config: KafkaConfi
       needRecovery = false
       cleanShutDownFile.delete
     }
-    /* start client */
-    info("Connecting to ZK: " + config.zkConnect)
-    zkClient = KafkaZookeeperClient.getZookeeperClient(config)
+
     /* start scheduler */
     kafkaScheduler.startUp
+
     /* start log manager */
     logManager = new LogManager(config,
                                 kafkaScheduler,
@@ -75,88 +75,107 @@ class KafkaServer(val config: KafkaConfi
                                 1000L * 60 * 60 * config.logRetentionHours,
                                 needRecovery)
     logManager.startup()
-                                                
-    socketServer = new SocketServer(config.port,
+
+    socketServer = new SocketServer(config.brokerId,
+                                    config.port,
                                     config.numNetworkThreads,
                                     config.monitoringPeriodSecs,
                                     config.numQueuedRequests,
                                     config.maxSocketRequestSize)
+
+    socketServer.startup
+
     Utils.registerMBean(socketServer.stats, statsMBeanName)
 
-    kafkaZookeeper = new KafkaZooKeeper(config, zkClient, addReplica, getReplica, makeLeader, makeFollower)
+    /* start client */
+    kafkaZookeeper = new KafkaZooKeeper(config)
+    // starting relevant replicas and leader election for partitions assigned to this broker
+    kafkaZookeeper.startup
 
-    replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler)
+    info("Connecting to ZK: " + config.zkConnect)
 
-    apis = new KafkaApis(socketServer.requestChannel, logManager, replicaManager, kafkaZookeeper)
-    requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel, apis, config.numIoThreads)
-    socketServer.startup()
+    replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient, kafkaScheduler, deleteLog)
 
+    kafkaController = new KafkaController(config, kafkaZookeeper.getZookeeperClient)
+    apis = new KafkaApis(socketServer.requestChannel, logManager, replicaManager, kafkaZookeeper,
+                         addReplica, stopReplica, makeLeader, makeFollower, config.brokerId)
+    requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
     Mx4jLoader.maybeLoad
 
-    // starting relevant replicas and leader election for partitions assigned to this broker
-    kafkaZookeeper.startup()
+    /**
+     *  Registers this broker in ZK. After this, consumers can connect to broker.
+     *  So this should happen after socket server start.
+     */
     // start the replica manager
     replicaManager.startup()
     // start the controller
     kafkaController.startup()
-
-    info("Server started.")
+    info("started")
   }
-  
+
+
   /**
    * Shutdown API for shutting down a single instance of the Kafka server.
    * Shuts down the LogManager, the SocketServer and the log cleaner scheduler thread
    */
   def shutdown() {
     val canShutdown = isShuttingDown.compareAndSet(false, true);
     if (canShutdown) {
-      info("Shutting down Kafka server with id " + config.brokerId)
+      info("shutting down") // logged only by the call that wins the compareAndSet, not by redundant callers
+      if(requestHandlerPool != null)
+        requestHandlerPool.shutdown()
       kafkaScheduler.shutdown()
-      apis.close()
+      if(apis != null) // apis is assigned part-way through startup(), so guard it like the other components
+        apis.close()
       if(replicaManager != null)
         replicaManager.shutdown()
       if (socketServer != null)
         socketServer.shutdown()
-      if(requestHandlerPool != null)
-        requestHandlerPool.shutdown()
       Utils.unregisterMBean(statsMBeanName)
       if(logManager != null)
         logManager.shutdown()
       if(kafkaController != null)
         kafkaController.shutDown()
-      kafkaZookeeper.shutdown()
-      zkClient.close()
+      if(kafkaZookeeper != null) // this closes the ZK client, so it runs last: replica manager and controller were built on that client
+        kafkaZookeeper.shutdown()
       val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
-      debug("Creating clean shutdown file " + cleanShutDownFile.getAbsolutePath())
+      debug("creating clean shutdown file " + cleanShutDownFile.getAbsolutePath())
       cleanShutDownFile.createNewFile
       shutdownLatch.countDown()
-      info("Kafka server with id %d shut down completed".format(config.brokerId))
+      info("shutted down completed")
     }
   }
-  
+
   /**
    * After calling shutdown(), use this API to wait until the shutdown is complete
    */
   def awaitShutdown(): Unit = shutdownLatch.await()
 
   def addReplica(topic: String, partition: Int, assignedReplicas: Set[Int]): Replica = {
-    info("Added local replica for topic %s partition %d on broker %d".format(topic, partition, config.brokerId))
-    // get local log
     val log = logManager.getOrCreateLog(topic, partition)
     replicaManager.addLocalReplica(topic, partition, log, assignedReplicas)
   }
 
-  def makeLeader(replica: Replica, currentISRInZk: Seq[Int]) {
-    replicaManager.makeLeader(replica, currentISRInZk)
+  def makeLeader(replica: Replica, leaderAndISR: LeaderAndISR): Short = {
+    replicaManager.makeLeader(replica, leaderAndISR)
   }
 
-  def makeFollower(replica: Replica, leaderBrokerId: Int, zkClient: ZkClient) {
-    replicaManager.makeFollower(replica, leaderBrokerId, zkClient)
+  def makeFollower(replica: Replica, leaderAndISR: LeaderAndISR): Short = {
+    replicaManager.makeFollower(replica, leaderAndISR)
   }
 
   def getReplica(topic: String, partition: Int): Option[Replica] =
     replicaManager.getReplica(topic, partition)
 
+  def stopReplica(topic: String, partition: Int): Short = {
+    replicaManager.stopReplica(topic, partition)
+  }
+
+  def deleteLog(topic: String,  partition: Int): Unit = {
+    /* TODO: handle deleteLog in a better way. Currently a no-op: the only statement is commented out, so nothing is deleted. */
+    //logManager.deleteLog(topic, partition)
+  }
+
   def getLogManager(): LogManager = logManager
 
   def getStats(): SocketServerStats = socketServer.stats

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Wed Aug  1 16:13:59 2012
@@ -18,46 +18,29 @@
 package kafka.server
 
 import java.net.InetAddress
-import kafka.cluster.Replica
 import kafka.utils._
 import org.apache.zookeeper.Watcher.Event.KeeperState
-import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener, IZkStateListener, ZkClient}
-import kafka.admin.AdminUtils
-import java.lang.Thread
-import collection.mutable.HashSet
+import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
 import kafka.common._
 
+
 /**
  * Handles the server's interaction with zookeeper. The server needs to register the following paths:
  *   /topics/[topic]/[node_id-partition_num]
  *   /brokers/[0...N] --> host:port
- *
  */
-class KafkaZooKeeper(config: KafkaConfig,
-                     zkClient: ZkClient,
-                     addReplicaCbk: (String, Int, Set[Int]) => Replica,
-                     getReplicaCbk: (String, Int) => Option[Replica],
-                     becomeLeader: (Replica, Seq[Int]) => Unit,
-                     becomeFollower: (Replica, Int, ZkClient) => Unit) extends Logging {
+class KafkaZooKeeper(config: KafkaConfig) extends Logging {
 
   val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId
-  private var leaderChangeListener: LeaderChangeListener = null
-  private var topicPartitionsChangeListener: TopicChangeListener = null
-  private var stateChangeHandler: StateChangeCommandHandler = null
-
-  private val topicListenerLock = new Object
-  private val leaderChangeLock = new Object
-
-  def startup() {
-    leaderChangeListener = new LeaderChangeListener
-    topicPartitionsChangeListener = new TopicChangeListener
-    leaderChangeListener = new LeaderChangeListener
-    topicPartitionsChangeListener = new TopicChangeListener
-    startStateChangeCommandHandler()
-    zkClient.subscribeStateChanges(new SessionExpireListener)
-    registerBrokerInZk()
-    subscribeToTopicAndPartitionsChanges(true)
-  }
+  private var zkClient: ZkClient = null
+
+   def startup() {
+     /* start client: zkClient is null until this method has run */
+     info("connecting to ZK: " + config.zkConnect)
+     zkClient = KafkaZookeeperClient.getZookeeperClient(config)
+     zkClient.subscribeStateChanges(new SessionExpireListener) // listener must be in place before the broker registers itself
+     registerBrokerInZk()
+   }
 
   private def registerBrokerInZk() {
     info("Registering broker " + brokerIdPath)
@@ -66,13 +49,6 @@ class KafkaZooKeeper(config: KafkaConfig
     ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, creatorId, config.port)
   }
 
-  private def startStateChangeCommandHandler() {
-    val stateChangeQ = new ZkQueue(zkClient, ZkUtils.getBrokerStateChangePath(config.brokerId), config.stateChangeQSize)
-    stateChangeHandler = new StateChangeCommandHandler("StateChangeCommandHandler", config, stateChangeQ,
-      ensureStateChangeCommandValidityOnThisBroker, ensureEpochValidity)
-    stateChangeHandler.start()
-  }
-
   /**
    *  When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a
    *  connection for us. We need to re-register this broker in the broker registry.
@@ -96,20 +72,24 @@ class KafkaZooKeeper(config: KafkaConfig
       registerBrokerInZk()
       info("done re-registering broker")
       info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
-      zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicPartitionsChangeListener)
-      val topics = ZkUtils.getAllTopics(zkClient)
-      debug("Existing topics are %s".format(topics.mkString(",")))
-      topics.foreach(topic => zkClient.subscribeChildChanges(ZkUtils.getTopicPartitionsPath(topic), topicPartitionsChangeListener))
-      handleNewTopics(topics)
     }
   }
 
   def shutdown() {
-    stateChangeHandler.shutdown()
+    if (zkClient != null) { // zkClient is only assigned in startup(), so shutdown() before startup() is a no-op
+      info("Closing zookeeper client...")
+      zkClient.close()
+    }
+  }
+
+  private def doesTopicExistInCluster(topic: String) : Boolean = {
+    val allTopics = ZkUtils.getAllTopics(zkClient)
+    trace("all topics, %s, topic %s".format(allTopics, topic))
+    allTopics.contains(topic)
   }
 
   def ensurePartitionLeaderOnThisBroker(topic: String, partition: Int) {
-    if(!topicPartitionsChangeListener.doesTopicExistInCluster(topic))
+    if(!doesTopicExistInCluster(topic))
       throw new UnknownTopicException("Topic %s doesn't exist in the cluster".format(topic))
     // check if partition id is invalid
     if(partition < 0)
@@ -124,256 +104,7 @@ class KafkaZooKeeper(config: KafkaConfig
     }
   }
 
-  def getZookeeperClient = zkClient
-
-  def handleNewTopics(topics: Seq[String]) {
-    // get relevant partitions to this broker
-    val topicsAndPartitionsOnThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId)
-    debug("Partitions assigned to broker %d are %s".format(config.brokerId, topicsAndPartitionsOnThisBroker.mkString(",")))
-    for( (topic, partitionsAssignedToThisBroker) <- topicsAndPartitionsOnThisBroker ) {
-      // subscribe to leader changes for these partitions
-      subscribeToLeaderForPartitions(topic, partitionsAssignedToThisBroker)
-      // start replicas for these partitions
-      startReplicasForPartitions(topic, partitionsAssignedToThisBroker)
-    }
-  }
-
-  def subscribeToTopicAndPartitionsChanges(startReplicas: Boolean) {
-    info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
-    zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicPartitionsChangeListener)
-    val topics = ZkUtils.getAllTopics(zkClient)
-    val topicsAndPartitionsOnThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId)
-    debug("Partitions assigned to broker %d are %s".format(config.brokerId, topicsAndPartitionsOnThisBroker.mkString(",")))
-    for( (topic, partitionsAssignedToThisBroker) <- topicsAndPartitionsOnThisBroker ) {
-      // subscribe to leader changes for these partitions
-      subscribeToLeaderForPartitions(topic, partitionsAssignedToThisBroker)
-
-      // start replicas for these partitions
-      if(startReplicas)
-        startReplicasForPartitions(topic, partitionsAssignedToThisBroker)
-    }
-  }
-
-  private def subscribeToLeaderForPartitions(topic: String, partitions: Seq[Int]) {
-    partitions.foreach { partition =>
-      info("Broker %d subscribing to leader changes for topic %s partition %d".format(config.brokerId, topic, partition))
-      // register leader change listener
-      zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderPath(topic, partition.toString), leaderChangeListener)
-    }
-  }
-
-  private def startReplicasForPartitions(topic: String, partitions: Seq[Int]) {
-    partitions.foreach { partition =>
-      val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partition).map(r => r.toInt)
-      info("Assigned replicas list for topic %s partition %d is %s".format(topic, partition, assignedReplicas.mkString(",")))
-      if(assignedReplicas.contains(config.brokerId)) {
-        val replica = addReplicaCbk(topic, partition, assignedReplicas.toSet)
-        startReplica(replica)
-      } else
-        warn("Ignoring partition %d of topic %s since broker %d doesn't host any replicas for it"
-          .format(partition, topic, config.brokerId))
-    }
-  }
-
-  private def startReplica(replica: Replica) {
-    info("Starting replica for topic %s partition %d on broker %d"
-      .format(replica.topic, replica.partition.partitionId, replica.brokerId))
-    ZkUtils.getLeaderForPartition(zkClient, replica.topic, replica.partition.partitionId) match {
-      case Some(leader) =>
-        info("Topic %s partition %d has leader %d".format(replica.topic, replica.partition.partitionId,leader))
-        // check if this broker is the leader, if not, then become follower
-        if(leader != config.brokerId)
-          becomeFollower(replica, leader, zkClient)
-      case None => // leader election
-        leaderElection(replica)
-    }
-  }
-
-  def leaderElection(replica: Replica) {
-    info("Broker %d electing leader for topic %s partition %d".format(config.brokerId, replica.topic, replica.partition.partitionId))
-    // read the AR list for replica.partition from ZK
-    val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, replica.topic, replica.partition.partitionId).map(_.toInt)
-    val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, replica.topic, replica.partition.partitionId)
-    val liveBrokers = ZkUtils.getSortedBrokerList(zkClient).map(_.toInt)
-    if(canBecomeLeader(config.brokerId, replica.topic, replica.partition.partitionId, assignedReplicas, inSyncReplicas, liveBrokers)) {
-      info("Broker %d will participate in leader election for topic %s partition %d"
-        .format(config.brokerId, replica.topic, replica.partition.partitionId))
-      // wait for some time if it is not the preferred replica
-      try {
-        if(replica.brokerId != assignedReplicas.head) {
-          // sleep only if the preferred replica is alive
-          if(liveBrokers.contains(assignedReplicas.head)) {
-            info("Preferred replica %d for topic %s ".format(assignedReplicas.head, replica.topic) +
-              "partition %d is alive. Waiting for %d ms to allow it to become leader"
-              .format(replica.partition.partitionId, config.preferredReplicaWaitTime))
-            Thread.sleep(config.preferredReplicaWaitTime)
-          }
-        }
-      } catch {
-        case e => // ignoring
-      }
-      val newLeaderEpochAndISR = ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic,
-        replica.partition.partitionId, replica.brokerId)
-      newLeaderEpochAndISR match {
-        case Some(epochAndISR) =>
-          info("Broker %d is leader for topic %s partition %d".format(replica.brokerId, replica.topic,
-            replica.partition.partitionId))
-          info("Current ISR for topic %s partition %d is %s".format(replica.topic, replica.partition.partitionId,
-                                                                    epochAndISR._2.mkString(",")))
-          becomeLeader(replica, epochAndISR._2)
-        case None =>
-          ZkUtils.getLeaderForPartition(zkClient, replica.topic, replica.partition.partitionId) match {
-            case Some(leader) =>
-              becomeFollower(replica, leader, zkClient)
-            case None =>
-              error("Lost leader for topic %s partition %d right after leader election".format(replica.topic,
-                replica.partition.partitionId))
-          }
-      }
-    }
-  }
-
-  private def canBecomeLeader(brokerId: Int, topic: String, partition: Int, assignedReplicas: Seq[Int],
-                              inSyncReplicas: Seq[Int], liveBrokers: Seq[Int]): Boolean = {
-    // TODO: raise alert, mark the partition offline if no broker in the assigned replicas list is alive
-    assert(assignedReplicas.size > 0, "There should be at least one replica in the assigned replicas list for topic " +
-      " %s partition %d".format(topic, partition))
-    inSyncReplicas.size > 0 match {
-      case true => // check if this broker is in the ISR. If yes, return true
-        inSyncReplicas.contains(brokerId) match {
-          case true =>
-            info("Broker %d can become leader since it is in the ISR %s".format(brokerId, inSyncReplicas.mkString(",")) +
-              " for topic %s partition %d".format(topic, partition))
-            true
-          case false =>
-            // check if any broker in the ISR is alive. If not, return true only if this broker is in the AR
-            val liveBrokersInISR = inSyncReplicas.filter(r => liveBrokers.contains(r))
-            liveBrokersInISR.isEmpty match {
-              case true =>
-                if(assignedReplicas.contains(brokerId)) {
-                  info("No broker in the ISR %s for topic %s".format(inSyncReplicas.mkString(","), topic) +
-                    " partition %d is alive. Broker %d can become leader since it is in the assigned replicas %s"
-                      .format(partition, brokerId, assignedReplicas.mkString(",")))
-                  true
-                } else {
-                  info("No broker in the ISR %s for topic %s".format(inSyncReplicas.mkString(","), topic) +
-                    " partition %d is alive. Broker %d can become leader since it is in the assigned replicas %s"
-                      .format(partition, brokerId, assignedReplicas.mkString(",")))
-                  false
-                }
-              case false =>
-                info("ISR for topic %s partition %d is %s. Out of these %s brokers are alive. Broker %d "
-                  .format(topic, partition, inSyncReplicas.mkString(",")) + "cannot become leader since it doesn't exist " +
-                  "in the ISR")
-                false  // let one of the live brokers in the ISR become the leader
-            }
-        }
-      case false =>
-        if(assignedReplicas.contains(brokerId)) {
-          info("ISR for topic %s partition %d is empty. Broker %d can become leader since it "
-            .format(topic, partition, brokerId) + "is part of the assigned replicas list")
-          true
-        } else {
-          info("ISR for topic %s partition %d is empty. Broker %d cannot become leader since it "
-            .format(topic, partition, brokerId) + "is not part of the assigned replicas list")
-          false
-        }
-    }
-  }
-
-  class TopicChangeListener extends IZkChildListener with Logging {
-    private val allTopics = new HashSet[String]()
-    // read existing topics, if any
-    allTopics ++= ZkUtils.getAllTopics(zkClient)
-
-    @throws(classOf[Exception])
-    def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
-      import collection.JavaConversions
-      topicListenerLock.synchronized {
-        debug("Topic/partition change listener fired for path " + parentPath)
-        val currentChildren = JavaConversions.asBuffer(curChilds).toSet
-        val newTopics = currentChildren -- allTopics
-        val deletedTopics = allTopics -- currentChildren
-        allTopics.clear()
-        allTopics ++= currentChildren
-
-        debug("New topics: [%s]. Deleted topics: [%s]".format(newTopics.mkString(","), deletedTopics.mkString(",")))
-        debug("Current topics in the cluster: [%s]".format(allTopics.mkString(",")))
-        handleNewTopics(newTopics.toSeq)
-        // TODO: Handle topic deletions
-        // handleDeletedTopics(deletedTopics.toSeq)
-      }
-    }
-
-    def doesTopicExistInCluster(topic: String): Boolean = {
-      topicListenerLock.synchronized {
-        allTopics.contains(topic)
-      }
-    }
-  }
-
-  private def ensureStateChangeCommandValidityOnThisBroker(stateChangeCommand: StateChangeCommand): Boolean = {
-    // check if this broker hosts a replica for this topic and partition
-    ZkUtils.isPartitionOnBroker(zkClient, stateChangeCommand.topic, stateChangeCommand.partition, config.brokerId)
-  }
-
-  private def ensureEpochValidity(stateChangeCommand: StateChangeCommand): Boolean = {
-    // get the topic and partition that this request is meant for
-    val topic = stateChangeCommand.topic
-    val partition = stateChangeCommand.partition
-    val epoch = stateChangeCommand.epoch
-
-    val currentLeaderEpoch = ZkUtils.getEpochForPartition(zkClient, topic, partition)
-    // check if the request's epoch matches the current leader's epoch OR the admin command's epoch
-    val validEpoch = (currentLeaderEpoch == epoch) || (epoch == AdminUtils.AdminEpoch)
-    if(epoch > currentLeaderEpoch)
-      throw new IllegalStateException(("Illegal epoch state. Request's epoch %d larger than registered epoch %d for " +
-        "topic %s partition %d").format(epoch, currentLeaderEpoch, topic, partition))
-    validEpoch
-  }
-
-  class LeaderChangeListener extends IZkDataListener with Logging {
-
-    @throws(classOf[Exception])
-    def handleDataChange(dataPath: String, data: Object) {
-      // handle leader change event for path
-      val newLeaderAndEpochInfo: String = data.asInstanceOf[String]
-      val newLeader = newLeaderAndEpochInfo.split(";").head.toInt
-      val newEpoch = newLeaderAndEpochInfo.split(";").last.toInt
-      debug("Leader change listener fired on broker %d for path %s. New leader is %d. New epoch is %d".format(config.brokerId,
-        dataPath, newLeader, newEpoch))
-      val topicPartitionInfo = dataPath.split("/")
-      val topic = topicPartitionInfo.takeRight(4).head
-      val partition = topicPartitionInfo.takeRight(2).head.toInt
-      info("Updating leader change information in replica for topic %s partition %d".format(topic, partition))
-      val replica = getReplicaCbk(topic, partition).getOrElse(null)
-      assert(replica != null, "Replica for topic %s partition %d should exist on broker %d"
-        .format(topic, partition, config.brokerId))
-      replica.partition.leaderId(Some(newLeader))
-      assert(getReplicaCbk(topic, partition).get.partition.leaderId().get == newLeader, "New leader should be set correctly")
-    }
-
-    @throws(classOf[Exception])
-    def handleDataDeleted(dataPath: String) {
-      leaderChangeLock.synchronized {
-        // leader is deleted for topic partition
-        val topic = dataPath.split("/").takeRight(4).head
-        val partitionId = dataPath.split("/").takeRight(2).head.toInt
-        debug("Leader deleted listener fired for topic %s partition %d on broker %d"
-          .format(topic, partitionId, config.brokerId))
-        val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionId).map(r => r.toInt)
-        if(assignedReplicas.contains(config.brokerId)) {
-          val replica = getReplicaCbk(topic, partitionId)
-          replica match {
-            case Some(r) => leaderElection(r)
-            case None =>  error("No replica exists for topic %s partition %s on broker %d"
-              .format(topic, partitionId, config.brokerId))
-          }
-        }
-      }
-    }
+  def getZookeeperClient = {
+    zkClient
   }
 }
-
-
-

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala Wed Aug  1 16:13:59 2012
@@ -20,10 +20,10 @@ package kafka.server
 import kafka.cluster.Broker
 
 class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager)
-        extends AbstractFetcherManager("ReplicaFetcherManager", brokerConfig.numReplicaFetchers) {
+        extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId + ", ", brokerConfig.numReplicaFetchers) {
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
-    new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(sourceBroker.id, fetcherId), sourceBroker, brokerConfig, replicaMgr)
+    new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d on broker %d, ".format(sourceBroker.id, fetcherId, brokerConfig.brokerId), sourceBroker, brokerConfig, replicaMgr)
   }
 
   def shutdown() {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Wed Aug  1 16:13:59 2012
@@ -18,64 +18,88 @@ package kafka.server
 
 import kafka.log.Log
 import kafka.cluster.{Partition, Replica}
-import collection.mutable
+import collection._
 import mutable.ListBuffer
 import org.I0Itec.zkclient.ZkClient
 import java.util.concurrent.locks.ReentrantLock
 import kafka.utils.{KafkaScheduler, ZkUtils, Time, Logging}
-import kafka.common.{KafkaException, InvalidPartitionException}
+import kafka.api.LeaderAndISR
+import java.util.concurrent.atomic.AtomicBoolean
+import kafka.common.{BrokerNotExistException, KafkaException, ErrorMapping, InvalidPartitionException}
 
-class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient, kafkaScheduler: KafkaScheduler)
-  extends Logging {
 
-  private var allReplicas = new mutable.HashMap[(String, Int), Partition]()
+class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient, kafkaScheduler: KafkaScheduler, deleteLocalLog: (String, Int) => Unit) extends Logging {
+
+  var allPartitions = new mutable.HashMap[(String, Int), Partition]()
   private var leaderReplicas = new ListBuffer[Partition]()
   private val leaderReplicaLock = new ReentrantLock()
   private val replicaFetcherManager = new ReplicaFetcherManager(config, this)
+  this.logIdent = "Replica Manager on Broker " + config.brokerId + ", "
+
+  val hwCheckPointThreadStarted = new AtomicBoolean(false)
   private val highwaterMarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir)
-  info("Created highwatermark file %s on broker %d".format(highwaterMarkCheckpoint.name, config.brokerId))
+  info("Created highwatermark file %s".format(highwaterMarkCheckpoint.name))
+
+  def startHighWaterMarksCheckPointThread() = {
+    if(hwCheckPointThreadStarted.compareAndSet(false, true)) // only the first caller flips the flag, so the task is scheduled at most once
+      kafkaScheduler.scheduleWithRate(checkpointHighwaterMarks, "highwatermark-checkpoint-thread", 0, config.defaultFlushIntervalMs)
+  }
 
   def startup() {
-    // start the highwatermark checkpoint thread
-    kafkaScheduler.scheduleWithRate(checkpointHighwaterMarks, "highwatermark-checkpoint-thread", 0,
-      config.defaultFlushIntervalMs)
+    // the highwatermark checkpoint thread is no longer started here; see startHighWaterMarksCheckPointThread()
     // start ISR expiration thread
     kafkaScheduler.scheduleWithRate(maybeShrinkISR, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs)
   }
 
   def addLocalReplica(topic: String, partitionId: Int, log: Log, assignedReplicaIds: Set[Int]): Replica = {
     val partition = getOrCreatePartition(topic, partitionId, assignedReplicaIds)
-    val localReplica = new Replica(config.brokerId, partition, topic, time,
-      Some(readCheckpointedHighWatermark(topic, partitionId)), Some(log))
-
+    var retReplica : Replica = null // assigned in exactly one arm of the match below and returned at the end
     val replicaOpt = partition.getReplica(config.brokerId)
     replicaOpt match {
       case Some(replica) =>
-        info("Changing remote replica %s into a local replica".format(replica.toString))
+        info("changing remote replica %s into a local replica".format(replica.toString))
         replica.log match {
           case None =>
             replica.log = Some(log)
           case Some(log) => // nothing to do since log already exists
         }
+        retReplica = replica // reuse the replica object the partition already holds for this broker
       case None =>
+        val localReplica = new Replica(config.brokerId, partition, topic, time, // built only when the partition has no replica for this broker yet
+                                       Some(readCheckpointedHighWatermark(topic, partitionId)), Some(log))
         partition.addReplica(localReplica)
+        info("adding local replica %d for topic %s partition %s on broker %d".format(localReplica.brokerId, localReplica.topic, localReplica.partition.partitionId, localReplica.brokerId))
+        retReplica = localReplica
     }
     val assignedReplicas = assignedReplicaIds.map(partition.getReplica(_).get)
     partition.assignedReplicas(Some(assignedReplicas))
     // get the replica objects for the assigned replicas for this partition
-    info("Added local replica %d for topic %s partition %s on broker %d"
-      .format(localReplica.brokerId, localReplica.topic, localReplica.partition.partitionId, localReplica.brokerId))
-    localReplica
+    retReplica // either the pre-existing replica or the one created above
   }
 
+  def stopReplica(topic: String, partition: Int): Short  = {
+    trace("handling stop replica for partition [%s, %d]".format(topic, partition))
+    // tear down local state only when this broker actually holds a replica of the partition
+    getReplica(topic, partition).foreach { _ =>
+      replicaFetcherManager.removeFetcher(topic, partition)
+      deleteLocalLog(topic, partition)
+      allPartitions.remove((topic, partition))
+      info("after removing partition (%s, %d), the rest of allReplicas is: [%s]".format(topic, partition, allPartitions))
+    }
+    trace("finishes handling stop replica [%s, %d]".format(topic, partition))
+    // this path never reports a failure: the result is NoError whether or not a replica was found
+    ErrorMapping.NoError
+  }
+
+
   def getOrCreatePartition(topic: String, partitionId: Int, assignedReplicaIds: Set[Int]): Partition = {
-    val newPartition = allReplicas.contains((topic, partitionId))
+    val newPartition = allPartitions.contains((topic, partitionId))
     newPartition match {
       case true => // partition exists, do nothing
-        allReplicas.get((topic, partitionId)).get
+        allPartitions.get((topic, partitionId)).get
       case false => // create remote replicas for each replica id in assignedReplicas
         val partition = new Partition(topic, partitionId, time)
-        allReplicas += (topic, partitionId) -> partition
+        allPartitions += (topic, partitionId) -> partition
         (assignedReplicaIds - config.brokerId).foreach(
           replicaId => addRemoteReplica(topic, partitionId, replicaId, partition))
         partition
@@ -83,12 +107,11 @@ class ReplicaManager(val config: KafkaCo
   }
 
   def ensurePartitionExists(topic: String, partitionId: Int): Partition = {
-    val partitionOpt = allReplicas.get((topic, partitionId))
+    val partitionOpt = allPartitions.get((topic, partitionId))
     partitionOpt match {
       case Some(partition) => partition
       case None =>
-        throw new InvalidPartitionException("Partition for topic %s partition %d doesn't exist in replica manager on %d"
-          .format(topic, partitionId, config.brokerId))
+        throw new InvalidPartitionException("Partition for topic %s partition %d doesn't exist in replica manager on %d".format(topic, partitionId, config.brokerId))
     }
   }
 
@@ -97,32 +120,34 @@ class ReplicaManager(val config: KafkaCo
 
     val replicaAdded = partition.addReplica(remoteReplica)
     if(replicaAdded)
-      info("Added remote replica %d for topic %s partition %s on broker %d"
-        .format(remoteReplica.brokerId, remoteReplica.topic, remoteReplica.partition.partitionId, config.brokerId))
+      info("added remote replica %d for topic %s partition %s".format(remoteReplica.brokerId, remoteReplica.topic, remoteReplica.partition.partitionId))
     remoteReplica
   }
 
   def getReplica(topic: String, partitionId: Int, replicaId: Int = config.brokerId): Option[Replica] = {
-    val replicasOpt = allReplicas.get((topic, partitionId))
-    replicasOpt match {
-      case Some(replicas) =>
-        replicas.getReplica(replicaId)
+    val partitionOpt = allPartitions.get((topic, partitionId))
+    partitionOpt match {
+      case Some(partition) =>
+        partition.getReplica(replicaId)
       case None =>
         None
     }
   }
 
   def getLeaderReplica(topic: String, partitionId: Int): Option[Replica] = {
-    val replicasOpt = allReplicas.get((topic, partitionId))
+    val replicasOpt = allPartitions.get((topic, partitionId))
     replicasOpt match {
       case Some(replicas) =>
         Some(replicas.leaderReplica())
       case None =>
         throw new KafkaException("Getting leader replica failed. Partition replica metadata for topic " +
-          "%s partition %d doesn't exist in Replica manager on %d".format(topic, partitionId, config.brokerId))
+                                         "%s partition %d doesn't exist in Replica manager on %d".format(topic, partitionId, config.brokerId))
     }
   }
 
+  def getPartition(topic: String, partitionId: Int): Option[Partition] =
+    allPartitions.get((topic, partitionId))
+
   private def updateReplicaLeo(replica: Replica, fetchOffset: Long) {
     // set the replica leo
     val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId)
@@ -137,38 +162,41 @@ class ReplicaManager(val config: KafkaCo
     val newHw = allLeos.min
     val oldHw = partition.leaderHW()
     if(newHw > oldHw) {
-      debug("Updating leader HW for topic %s partition %d to %d".format(replica.topic, replica.partition.partitionId, newHw))
       partition.leaderHW(Some(newHw))
     }else
       debug("Old hw for topic %s partition %d is %d. New hw is %d. All leo's are %s".format(replica.topic,
-        replica.partition.partitionId, oldHw, newHw, allLeos.mkString(",")))
+                                                                                            replica.partition.partitionId, oldHw, newHw, allLeos.mkString(",")))
   }
 
-  def makeLeader(replica: Replica, currentISRInZk: Seq[Int]) {
-    info("Broker %d started the leader state transition for topic %s partition %d"
-      .format(config.brokerId, replica.topic, replica.partition.partitionId))
+  def makeLeader(replica: Replica, leaderAndISR: LeaderAndISR): Short = {
+    info("becoming Leader for topic [%s] partition [%d]".format(replica.topic, replica.partition.partitionId))
+    info("started the leader state transition for topic %s partition %d"
+                 .format(replica.topic, replica.partition.partitionId))
     try {
       // read and cache the ISR
       replica.partition.leaderId(Some(replica.brokerId))
-      replica.partition.updateISR(currentISRInZk.toSet)
+      replica.partition.updateISR(leaderAndISR.ISR.toSet)
       // stop replica fetcher thread, if any
       replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
       // also add this partition to the list of partitions for which the leader is the current broker
       leaderReplicaLock.lock()
       leaderReplicas += replica.partition
-      info("Broker %d completed the leader state transition for topic %s partition %d"
-        .format(config.brokerId, replica.topic, replica.partition.partitionId))
+      info("completed the leader state transition for topic %s partition %d".format(replica.topic, replica.partition.partitionId))
+      ErrorMapping.NoError
     }catch {
-      case e => error("Broker %d failed to complete the leader state transition for topic %s partition %d"
-        .format(config.brokerId, replica.topic, replica.partition.partitionId), e)
+      case e => error("failed to complete the leader state transition for topic %s partition %d".format(replica.topic, replica.partition.partitionId), e)
+      ErrorMapping.UnknownCode
+      /* TODO: add specific error code */
     }finally {
       leaderReplicaLock.unlock()
     }
   }
 
-  def makeFollower(replica: Replica, leaderBrokerId: Int, zkClient: ZkClient) {
-    info("Broker %d starting the follower state transition to follow leader %d for topic %s partition %d"
-      .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
+
+  def makeFollower(replica: Replica, leaderAndISR: LeaderAndISR): Short = {
+    val leaderBrokerId: Int = leaderAndISR.leader
+    info("starting the follower state transition to follow leader %d for topic %s partition %d"
+                 .format(leaderBrokerId, replica.topic, replica.partition.partitionId))
     try {
       // set the leader for this partition correctly on this broker
       replica.partition.leaderId(Some(leaderBrokerId))
@@ -177,13 +205,13 @@ class ReplicaManager(val config: KafkaCo
           log.truncateTo(replica.highWatermark())
         case None =>
       }
+      debug("for partition [%s, %d], the leaderBroker is [%d]".format(replica.topic, replica.partition.partitionId, leaderAndISR.leader))
       // get leader for this replica
       val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(leaderBrokerId)).head
       val currentLeaderBroker = replicaFetcherManager.fetcherSourceBroker(replica.topic, replica.partition.partitionId)
       // become follower only if it is not already following the same leader
       if( currentLeaderBroker == None || currentLeaderBroker.get != leaderBroker.id) {
-        info("broker %d becoming follower to leader %d for topic %s partition %d"
-          .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
+        info("becoming follower to leader %d for topic %s partition %d".format(leaderBrokerId, replica.topic, replica.partition.partitionId))
         // stop fetcher thread to previous leader
         replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
         // start fetcher thread to current leader
@@ -192,11 +220,15 @@ class ReplicaManager(val config: KafkaCo
       // remove this replica's partition from the ISR expiration queue
       leaderReplicaLock.lock()
       leaderReplicas -= replica.partition
-      info("Broker %d completed the follower state transition to follow leader %d for topic %s partition %d"
-        .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
-    }catch {
-      case e => error("Broker %d failed to complete the follower state transition to follow leader %d for topic %s partition %d"
-        .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId), e)
+      info("completed the follower state transition to follow leader %d for topic %s partition %d".format(leaderAndISR.leader, replica.topic, replica.partition.partitionId))
+      ErrorMapping.NoError
+    } catch {
+      case e: BrokerNotExistException =>
+        error("failed to complete the follower state transition to follow leader %d for topic %s partition %d because the leader broker does not exist in the cluster".format(leaderAndISR.leader, replica.topic, replica.partition.partitionId), e)
+        ErrorMapping.BrokerNotExistInZookeeperCode
+      case e =>
+        error("failed to complete the follower state transition to follow leader %d for topic %s partition %d".format(leaderAndISR.leader, replica.topic, replica.partition.partitionId), e)
+        ErrorMapping.UnknownCode
     }finally {
       leaderReplicaLock.unlock()
     }
@@ -204,21 +236,18 @@ class ReplicaManager(val config: KafkaCo
 
   private def maybeShrinkISR(): Unit = {
     try {
-      info("Evaluating ISR list of partitions to see which replicas can be removed from the ISR"
-        .format(config.replicaMaxLagTimeMs))
+      info("evaluating ISR list of partitions to see which replicas can be removed from the ISR")
       leaderReplicaLock.lock()
-      leaderReplicas.foreach { partition =>
-      // shrink ISR if a follower is slow or stuck
+      leaderReplicas.foreach(partition => {
         val outOfSyncReplicas = partition.getOutOfSyncReplicas(config.replicaMaxLagTimeMs, config.replicaMaxLagBytes)
         if(outOfSyncReplicas.size > 0) {
           val newInSyncReplicas = partition.inSyncReplicas -- outOfSyncReplicas
           assert(newInSyncReplicas.size > 0)
-          info("Shrinking ISR for topic %s partition %d to %s".format(partition.topic, partition.partitionId,
-            newInSyncReplicas.map(_.brokerId).mkString(",")))
+          info("Shrinking ISR for topic %s partition %d to %s".format(partition.topic, partition.partitionId, newInSyncReplicas.map(_.brokerId).mkString(",")))
           // update ISR in zk and in memory
           partition.updateISR(newInSyncReplicas.map(_.brokerId), Some(zkClient))
         }
-      }
+      })
     }catch {
       case e1 => error("Error in ISR expiration thread. Shutting down due to ", e1)
     }finally {
@@ -233,8 +262,7 @@ class ReplicaManager(val config: KafkaCo
       val leaderHW = partition.leaderHW()
       replica.logEndOffset() >= leaderHW
     }
-    else throw new KafkaException("Replica %s is not in the assigned replicas list for ".format(replica.toString) +
-      " topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId, config.brokerId))
+    else throw new KafkaException("Replica %s is not in the assigned replicas list for ".format(replica.toString) + " topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId, config.brokerId))
   }
 
   def recordFollowerPosition(topic: String, partition: Int, replicaId: Int, offset: Long, zkClient: ZkClient) = {
@@ -268,21 +296,21 @@ class ReplicaManager(val config: KafkaCo
    * Flushes the highwatermark value for all partitions to the highwatermark file
    */
   def checkpointHighwaterMarks() {
-    val highwaterMarksForAllPartitions = allReplicas.map { partition =>
-      val topic = partition._1._1
-      val partitionId = partition._1._2
-      val localReplicaOpt = partition._2.getReplica(config.brokerId)
-      val hw = localReplicaOpt match {
-        case Some(localReplica) => localReplica.highWatermark()
-        case None =>
-          error("Error while checkpointing highwatermark for topic %s partition %d.".format(topic, partitionId) +
-            " Replica metadata doesn't exist in replica manager on broker " + config.brokerId)
-          0L
-      }
-      (topic, partitionId) -> hw
-    }.toMap
+    val highwaterMarksForAllPartitions = allPartitions.map
+            { partition =>
+              val topic = partition._1._1
+              val partitionId = partition._1._2
+              val localReplicaOpt = partition._2.getReplica(config.brokerId)
+              val hw = localReplicaOpt match {
+                case Some(localReplica) => localReplica.highWatermark()
+                case None =>
+                  error("Error while checkpointing highwatermark for topic %s partition %d.".format(topic, partitionId) + " Replica metadata doesn't exist")
+                  0L
+              }
+              (topic, partitionId) -> hw
+            }.toMap
     highwaterMarkCheckpoint.write(highwaterMarksForAllPartitions)
-    info("Checkpointed highwatermarks")
+    info("Checkpointed high watermark data: %s".format(highwaterMarksForAllPartitions))
   }
 
   /**
@@ -292,8 +320,9 @@ class ReplicaManager(val config: KafkaCo
   def readCheckpointedHighWatermark(topic: String, partition: Int): Long = highwaterMarkCheckpoint.read(topic, partition)
 
   def shutdown() {
+    info("shut down")
     replicaFetcherManager.shutdown()
     checkpointHighwaterMarks()
-    info("Replica manager shutdown on broker " + config.brokerId)
+    info("shuttedd down completely")
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala Wed Aug  1 16:13:59 2012
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -39,7 +39,7 @@ class DelayedRequest(val keys: Seq[Any],
  * request to be satisfied. For example it could be that we are waiting for user-specified number of acks on a given (topic, partition)
  * to be able to respond to a request or it could be that we are waiting for a given number of bytes to accumulate on a given request
  * to be able to respond to that request (in the simple case we might wait for at least one byte to avoid busy waiting).
- * 
+ *
  * For us the key is generally a (topic, partition) pair.
  * By calling 
  *   watch(delayedRequest) 
@@ -47,27 +47,27 @@ class DelayedRequest(val keys: Seq[Any],
  *   val satisfied = update(key, request) 
  * when a request relevant to the given key occurs. This triggers bookeeping logic and returns back any requests satisfied by this
  * new request.
- * 
+ *
  * An implementation provides extends two helper functions
  *   def checkSatisfied(request: R, delayed: T): Boolean
  * this function returns true if the given request (in combination with whatever previous requests have happened) satisfies the delayed
  * request delayed. This method will likely also need to do whatever bookkeeping is necessary.
- * 
+ *
  * The second function is
  *   def expire(delayed: T)
  * this function handles delayed requests that have hit their time limit without being satisfied.
- * 
+ *
  */
-abstract class RequestPurgatory[T <: DelayedRequest, R] {
-  
+abstract class RequestPurgatory[T <: DelayedRequest, R](logPrefix: String) extends  Logging{
+  this.logIdent = logPrefix
   /* a list of requests watching each key */
   private val watchersForKey = new ConcurrentHashMap[Any, Watchers]
-  
+
   /* background thread expiring requests that have been waiting too long */
-  private val expiredRequestReaper = new ExpiredRequestReaper
+  private val expiredRequestReaper = new ExpiredRequestReaper(logPrefix)
   private val expirationThread = Utils.daemonThread("request-expiration-task", expiredRequestReaper)
   expirationThread.start()
-  
+
   /**
    * Add a new delayed request watching the contained keys
    */
@@ -78,7 +78,7 @@ abstract class RequestPurgatory[T <: Del
     }
     expiredRequestReaper.enqueue(delayedRequest)
   }
-  
+
   /**
    * Update any watchers and return a list of newly satisfied requests.
    */
@@ -89,7 +89,7 @@ abstract class RequestPurgatory[T <: Del
     else
       w.collectSatisfiedRequests(request)
   }
-  
+
   private def watchersFor(key: Any): Watchers = {
     var lst = watchersForKey.get(key)
     if(lst == null) {
@@ -98,46 +98,46 @@ abstract class RequestPurgatory[T <: Del
     }
     lst
   }
-  
+
   /**
    * Check if this request satisfied this delayed request
    */
   protected def checkSatisfied(request: R, delayed: T): Boolean
-  
+
   /**
    * Handle an expired delayed request
    */
   protected def expire(delayed: T)
-  
+
   /**
    * Shutdown the expirey thread
    */
   def shutdown() {
     expiredRequestReaper.shutdown()
   }
-  
+
   /**
    * A linked list of DelayedRequests watching some key with some associated bookeeping logic
    */
   private class Watchers {
-    
+
     /* a few magic parameters to help do cleanup to avoid accumulating old watchers */
     private val CleanupThresholdSize = 100
     private val CleanupThresholdPrct = 0.5
-    
+
     private val requests = new LinkedList[T]
-    
+
     /* you can only change this if you have added something or marked something satisfied */
     var liveCount = 0.0
-  
+
     def add(t: T) {
       synchronized {
-        requests.add(t)
-        liveCount += 1
-        maybePurge()
-      }
+                     requests.add(t)
+                     liveCount += 1
+                     maybePurge()
+                   }
     }
-    
+
     private def maybePurge() {
       if(requests.size > CleanupThresholdSize && liveCount / requests.size < CleanupThresholdPrct) {
         val iter = requests.iterator()
@@ -148,55 +148,56 @@ abstract class RequestPurgatory[T <: Del
         }
       }
     }
-  
+
     def decLiveCount() {
       synchronized {
-        liveCount -= 1
-      }
+                     liveCount -= 1
+                   }
     }
-    
+
     def collectSatisfiedRequests(request: R): Seq[T] = {
       val response = new mutable.ArrayBuffer[T]
       synchronized {
-        val iter = requests.iterator()
-        while(iter.hasNext) {
-          val curr = iter.next
-          if(curr.satisfied.get) {
-            // another thread has satisfied this request, remove it
-            iter.remove()
-          } else {
-            if(checkSatisfied(request, curr)) {
-              iter.remove()
-              val updated = curr.satisfied.compareAndSet(false, true)
-              if(updated == true) {
-                response += curr
-                liveCount -= 1
-                expiredRequestReaper.satisfyRequest()
-              }
-            }
-          }
-        }
-      }
+                     val iter = requests.iterator()
+                     while(iter.hasNext) {
+                       val curr = iter.next
+                       if(curr.satisfied.get) {
+                         // another thread has satisfied this request, remove it
+                         iter.remove()
+                       } else {
+                         if(checkSatisfied(request, curr)) {
+                           iter.remove()
+                           val updated = curr.satisfied.compareAndSet(false, true)
+                           if(updated == true) {
+                             response += curr
+                             liveCount -= 1
+                             expiredRequestReaper.satisfyRequest()
+                           }
+                         }
+                       }
+                     }
+                   }
       response
     }
   }
-  
+
   /**
    * Runnable to expire requests that have sat unfullfilled past their deadline
    */
-  private class ExpiredRequestReaper extends Runnable with Logging {
-    
+  private class ExpiredRequestReaper(logPrefix: String) extends Runnable with Logging {
+    this.logIdent = "ExpiredRequestReaper for " + logPrefix
+
     /* a few magic parameters to help do cleanup to avoid accumulating old watchers */
     private val CleanupThresholdSize = 100
     private val CleanupThresholdPrct = 0.5
-    
+
     private val delayed = new DelayQueue[T]
     private val running = new AtomicBoolean(true)
     private val shutdownLatch = new CountDownLatch(1)
     private val needsPurge = new AtomicBoolean(false)
     /* The count of elements in the delay queue that are unsatisfied */
     private val unsatisfied = new AtomicInteger(0)
-    
+
     /** Main loop for the expiry thread */
     def run() {
       while(running.get) {
@@ -204,18 +205,18 @@ abstract class RequestPurgatory[T <: Del
           val curr = pollExpired()
           expire(curr)
         } catch {
-          case ie: InterruptedException => 
+          case ie: InterruptedException =>
             if(needsPurge.getAndSet(false)) {
               val purged = purgeSatisfied()
               debug("Forced purge of " + purged + " requests from delay queue.")
             }
-          case e: Exception => 
+          case e: Exception =>
             error("Error in long poll expiry thread: ", e)
         }
       }
       shutdownLatch.countDown()
     }
-    
+
     /** Add a request to be expired */
     def enqueue(t: T) {
       delayed.add(t)
@@ -223,23 +224,24 @@ abstract class RequestPurgatory[T <: Del
       if(unsatisfied.get > CleanupThresholdSize && unsatisfied.get / delayed.size.toDouble < CleanupThresholdPrct)
         forcePurge()
     }
-    
+
     private def forcePurge() {
       needsPurge.set(true)
       expirationThread.interrupt()
     }
-    
+
     /** Shutdown the expiry thread*/
     def shutdown() {
-      debug("Shutting down request expiry thread")
+      debug("shutting down")
       running.set(false)
       expirationThread.interrupt()
       shutdownLatch.await()
+      debug("shut down completely")
     }
-    
+
     /** Record the fact that we satisfied a request in the stats for the expiry queue */
     def satisfyRequest(): Unit = unsatisfied.getAndDecrement()
-    
+
     /**
      * Get the next expired event
      */
@@ -256,7 +258,7 @@ abstract class RequestPurgatory[T <: Del
       }
       throw new RuntimeException("This should not happen")
     }
-  
+
     /**
      * Delete all expired events from the delay queue
      */
@@ -273,5 +275,5 @@ abstract class RequestPurgatory[T <: Del
       purged
     }
   }
-  
+
 }
\ No newline at end of file