You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/01 00:51:01 UTC

svn commit: r1367811 [2/4] - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/admin/ main/scala/kafka/api/ main/scala/kafka/cluster/ main/scala/kafka/common/ main/scala/kafka/consumer/ main/scala/kafka/log/ main/scala/kafka/network/ main/sca...

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala Tue Jul 31 22:50:59 2012
@@ -16,11 +16,12 @@
  */
 package kafka.server
 
+import kafka.common.KafkaZookeeperClient
 import collection.mutable.HashMap
-import collection._
 import collection.immutable.Set
 import kafka.cluster.Broker
 import kafka.api._
+import java.lang.Object
 import kafka.network.{Receive, BlockingChannel}
 import kafka.utils.{ZkUtils, Logging}
 import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, BlockingQueue}
@@ -28,46 +29,36 @@ import org.I0Itec.zkclient.exception.ZkN
 import java.util.concurrent.atomic.AtomicBoolean
 import org.I0Itec.zkclient.{IZkStateListener, ZkClient, IZkDataListener, IZkChildListener}
 import org.apache.zookeeper.Watcher.Event.KeeperState
-import collection.JavaConversions._
-import java.lang.Object
-import java.nio.channels.AsynchronousCloseException
 
 
-class RequestSendThread(val controllerId: Int,
-                        val toBrokerId: Int,
+class RequestSendThread(val brokerId: Int,
                         val queue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
                         val channel: BlockingChannel)
-        extends Thread("requestSendThread-" + toBrokerId) with Logging {
-  this.logIdent = "Controller %d, request send thread to broker %d, ".format(controllerId, toBrokerId)
+        extends Thread("requestSendThread-" + brokerId) with Logging {
   val isRunning: AtomicBoolean = new AtomicBoolean(true)
   private val shutDownLatch = new CountDownLatch(1)
-  private val lock = new Object()
+  private val lock = new Object
 
   def shutDown(): Unit = {
-    info("shutting down")
+    info("Shutting down controller request send thread to broker %d".format(brokerId))
     isRunning.set(false)
     interrupt()
     shutDownLatch.await()
-    info("shutted down completed")
+    info("Controller request send thread to broker %d shutting down completed".format(brokerId))
   }
 
   override def run(): Unit = {
     try{
+      info("In controller, thread for broker: " + brokerId + " started running")
       while(isRunning.get()){
         val queueItem = queue.take()
         val request = queueItem._1
         val callback = queueItem._2
 
         var receive: Receive = null
-        try{
-          lock synchronized {
-            channel.send(request)
-            receive = channel.receive()
-          }
-        } catch {
-          case e =>
-            // log it and let it go. Let controller shut it down.
-            debug("Exception occurs", e)
+        lock synchronized {
+          channel.send(request)
+          receive = channel.receive()
         }
 
         var response: RequestOrResponse = null
@@ -77,15 +68,13 @@ class RequestSendThread(val controllerId
           case RequestKeys.StopReplicaRequest =>
             response = StopReplicaResponse.readFrom(receive.buffer)
         }
-        trace("got a response %s".format(controllerId, response, toBrokerId))
-
         if(callback != null){
           callback(response)
         }
       }
     } catch{
-      case e: InterruptedException => warn("intterrupted. Shutting down")
-      case e1 => error("Error due to ", e1)
+      case e: InterruptedException => warn("Controller request send thread to broker %d is intterrupted. Shutting down".format(brokerId))
+      case e1 => error("Error in controller request send thread to broker %d down due to ".format(brokerId), e1)
     }
     shutDownLatch.countDown()
   }
@@ -96,10 +85,9 @@ class ControllerChannelManager(allBroker
   private val messageChannels = new HashMap[Int, BlockingChannel]
   private val messageQueues = new HashMap[Int, BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)]]
   private val messageThreads = new HashMap[Int, RequestSendThread]
-  this.logIdent = "Channel manager on controller " + config.brokerId + ", "
   for(broker <- allBrokers){
     brokers.put(broker.id, broker)
-    info("channel to broker " + broker.id + " created" + " at host: " + broker.host + " and port: " + broker.port)
+    info("channel for broker " + broker.id + " created" + " at host: " + broker.host + " and port: " + broker.port)
     val channel = new BlockingChannel(broker.host, broker.port,
                                       BlockingChannel.UseDefaultBufferSize,
                                       BlockingChannel.UseDefaultBufferSize,
@@ -111,7 +99,7 @@ class ControllerChannelManager(allBroker
 
   def startUp() = {
     for((brokerId, broker) <- brokers){
-      val thread = new RequestSendThread(config.brokerId, brokerId, messageQueues(brokerId), messageChannels(brokerId))
+      val thread = new RequestSendThread(brokerId, messageQueues(brokerId), messageChannels(brokerId))
       thread.setDaemon(false)
       thread.start()
       messageThreads.put(broker.id, thread)
@@ -131,13 +119,14 @@ class ControllerChannelManager(allBroker
   def addBroker(broker: Broker){
     brokers.put(broker.id, broker)
     messageQueues.put(broker.id, new LinkedBlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)](config.controllerMessageQueueSize))
+    info("channel for broker " + broker.id + " created" + " at host: " + broker.host + " and port: " + broker.port)
     val channel = new BlockingChannel(broker.host, broker.port,
                                       BlockingChannel.UseDefaultBufferSize,
                                       BlockingChannel.UseDefaultBufferSize,
                                       config.controllerSocketTimeoutMs)
     channel.connect()
     messageChannels.put(broker.id, channel)
-    val thread = new RequestSendThread(config.brokerId, broker.id, messageQueues(broker.id), messageChannels(broker.id))
+    val thread = new RequestSendThread(broker.id, messageQueues(broker.id), messageChannels(broker.id))
     thread.setDaemon(false)
     thread.start()
     messageThreads.put(broker.id, thread)
@@ -157,62 +146,38 @@ class ControllerChannelManager(allBroker
   }
 }
 
-class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging {
-  this.logIdent = "Controller " + config.brokerId + ", "
-  info("startup");
-  private val controllerLock = new Object
+class KafkaController(config : KafkaConfig) extends Logging {
+  info("controller startup");
+  private val lock = new Object
+
+  private var zkClient: ZkClient = null
   private var controllerChannelManager: ControllerChannelManager = null
   private var allBrokers : Set[Broker] = null
-  private var allBrokerIds : Set[Int] = null
   private var allTopics: Set[String] = null
-  private var allPartitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]] = null
-  private var allLeaders: mutable.Map[(String, Int), Int] = null
 
-  // Return true if this controller succeeds in the controller competition
-  private def tryToBecomeController(): Boolean = {
-    try {
-      ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.ControllerPath, config.brokerId.toString)
-      // Only the broker successfully registering as the controller can execute following code, otherwise
-      // some exception will be thrown.
-      registerBrokerChangeListener()
-      registerTopicChangeListener()
-      allBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
-      allBrokerIds = allBrokers.map(_.id)
-      info("all brokers: %s".format(allBrokerIds))
-      allTopics = ZkUtils.getAllTopics(zkClient).toSet
-      info("all topics: %s".format(allTopics))
-      allPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, allTopics.iterator)
-      info("allPartitionReplicaAssignment: %s".format(allPartitionReplicaAssignment))
-      allLeaders = new mutable.HashMap[(String, Int), Int]
-      controllerChannelManager = new ControllerChannelManager(allBrokers, config)
-      controllerChannelManager.startUp()
-      return true
-    } catch {
-      case e: ZkNodeExistsException =>
-        registerControllerExistListener()
-        info("broker didn't succeed registering as the controller since it's taken by someone else")
-        return false
-      case e2 => throw e2
-    }
-  }
-
-  private def controllerRegisterOrFailover(){
-    info("try to become controller")
-    if(tryToBecomeController() == true){
-      info("won the controller competition and work on leader and isr recovery")
-      deliverLeaderAndISRFromZookeeper(allBrokerIds, allTopics)
-      debug("work on broker changes")
-      onBrokerChange()
-
-      // If there are some partition with leader not initialized, init the leader for them
-      val partitionReplicaAssignment = allPartitionReplicaAssignment.clone()
-      for((topicPartition, replicas) <- partitionReplicaAssignment){
-        if (allLeaders.contains(topicPartition)){
-          partitionReplicaAssignment.remove(topicPartition)
+  private def tryToBecomeController() = {
+    lock synchronized {
+      val curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)
+      if (curController == null){
+        try {
+          ZkUtils.createEphemeralPathExpectConflict(zkClient, ZkUtils.ControllerPath, config.brokerId.toString())
+
+          // Only the broker successfully registering as the controller can execute following code, otherwise
+          // some exception will be thrown.
+          registerBrokerChangeListener()
+          registerTopicChangeListener()
+          allBrokers = ZkUtils.getAllBrokersInCluster(zkClient).toSet
+          allTopics = ZkUtils.getAllTopics(zkClient).toSet
+          controllerChannelManager = new ControllerChannelManager(allBrokers, config)
+          controllerChannelManager.startUp()
+        } catch {
+          case e: ZkNodeExistsException =>
+            registerControllerExistListener()
+            info("Broker " + config.brokerId + " didn't succeed registering as the controller since it's taken by someone else")
+          case e2 => throw e2
         }
       }
-      debug("work on init leaders: %s, current cache for all leader is: %s".format(partitionReplicaAssignment.toString(), allLeaders))
-      initLeaders(partitionReplicaAssignment)
+      else info("Broker " + config.brokerId + " see not null skip " + " current controller " + curController)
     }
   }
 
@@ -221,22 +186,17 @@ class KafkaController(config : KafkaConf
   }
 
   def startup() = {
-    controllerLock synchronized {
-      registerSessionExpirationListener()
-      registerControllerExistListener()
-      controllerRegisterOrFailover()
-    }
+    zkClient = KafkaZookeeperClient.getZookeeperClient(config)
+    registerSessionExpirationListener()
+    registerControllerExistListener()
+    tryToBecomeController()
   }
 
   def shutDown() = {
-    controllerLock synchronized {
-      if(controllerChannelManager != null){
-        info("shut down")
-        controllerChannelManager.shutDown()
-        controllerChannelManager = null
-        info("shutted down completely")
-      }
-    }
+    if(controllerChannelManager != null)
+      controllerChannelManager.shutDown()
+    if(zkClient != null)
+      zkClient.close()
   }
 
   def sendRequest(brokerId : Int, request : RequestOrResponse, callback: (RequestOrResponse) => Unit = null) = {
@@ -259,8 +219,7 @@ class KafkaController(config : KafkaConf
     zkClient.subscribeDataChanges(ZkUtils.ControllerPath, new ControllerExistListener())
   }
 
-  class SessionExpireListener() extends IZkStateListener with Logging {
-    this.logIdent = "Controller " + config.brokerId + ", "
+  class SessionExpireListener() extends IZkStateListener {
     @throws(classOf[Exception])
     def handleStateChanged(state: KeeperState) {
       // do nothing, since zkclient will do reconnect for us.
@@ -275,256 +234,50 @@ class KafkaController(config : KafkaConf
      */
     @throws(classOf[Exception])
     def handleNewSession() {
-      controllerLock synchronized {
-        info("session expires, clean up the state")
-        controllerChannelManager.shutDown()
-        controllerChannelManager = null
-        controllerRegisterOrFailover()
-      }
-    }
-  }
-
-  /**
-   * Used to populate the leaderAndISR from zookeeper to affected brokers when the brokers comes up
-   */
-  private def deliverLeaderAndISRFromZookeeper(brokerIds: Set[Int], topics: Set[String]) = {
-    val leaderAndISRInfos = ZkUtils.getPartitionLeaderAndISRForTopics(zkClient, topics.iterator)
-    val brokerToLeaderAndISRInfosMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndISR]]
-    for((topicPartition, leaderAndISR) <- leaderAndISRInfos){
-      // If the leader specified in the leaderAndISR is no longer alive, there is no need to recover it
-      if(allBrokerIds.contains(leaderAndISR.leader)){
-        val brokersAssignedToThisPartitionOpt = allPartitionReplicaAssignment.get(topicPartition)
-        if(brokersAssignedToThisPartitionOpt == None){
-          warn("during leaderAndISR recovery, there's no replica assignment for partition [%s, %d] with allPartitionReplicaAssignment: %s".format(topicPartition._1, topicPartition._2, allPartitionReplicaAssignment))
-        } else{
-          val relatedBrokersAssignedToThisPartition = brokersAssignedToThisPartitionOpt.get.filter(brokerIds.contains(_))
-          relatedBrokersAssignedToThisPartition.foreach(b => {
-            if(!brokerToLeaderAndISRInfosMap.contains(b))
-              brokerToLeaderAndISRInfosMap.put(b, new mutable.HashMap[(String, Int), LeaderAndISR])
-            brokerToLeaderAndISRInfosMap(b).put(topicPartition, leaderAndISR)
-          })
-          allLeaders.put(topicPartition, leaderAndISR.leader)
-        }
-      } else
-        debug("during leaderAndISR recovery, the leader %d is not alive any more, just ignore it".format(leaderAndISR.leader))
-    }
-    info("during leaderAndISR recovery, the broker to request map is [%s]".format(brokerToLeaderAndISRInfosMap.toString()))
-
-    brokerToLeaderAndISRInfosMap.foreach(m =>{
-      val broker = m._1
-      val leaderAndISRs = m._2
-      val leaderAndISRRequest = new LeaderAndISRRequest(LeaderAndISRRequest.IsInit, leaderAndISRs)
-      info("during leaderAndISR recovery, the leaderAndISRRequest sent to new broker [%s] is [%s]".format(broker, leaderAndISRRequest.toString))
-      sendRequest(broker, leaderAndISRRequest)
-    })
-
-    info("after leaderAndISR recovery for brokers %s, the leaders assignment is %s".format(brokerIds, allLeaders))
-  }
-
-
-  private def initLeaders(partitionReplicaAssignment: collection.mutable.Map[(String, Int), Seq[Int]]) {
-    val brokerToLeaderAndISRInfosMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int),LeaderAndISR]]
-    for((topicPartition, replicaAssignment) <- partitionReplicaAssignment) {
-      val liveAssignedReplicas = replicaAssignment.filter(r => allBrokerIds.contains(r))
-      debug("for topic [%s], partition [%d], live assigned replicas are: [%s]"
-                    .format(topicPartition._1,
-                            topicPartition._2,
-                            liveAssignedReplicas))
-      if(!liveAssignedReplicas.isEmpty){
-        debug("live assigned replica is not empty, check zkClient: %s".format(zkClient))
-        val leader = liveAssignedReplicas.head
-        var leaderAndISR: LeaderAndISR = null
-        var updateLeaderISRZKPathSucceeded: Boolean = false
-        while(!updateLeaderISRZKPathSucceeded){
-          val curLeaderAndISROpt = ZkUtils.getLeaderAndISRForPartition(zkClient, topicPartition._1, topicPartition._2)
-          debug("curLeaderAndISROpt is %s, zkClient is %s ".format(curLeaderAndISROpt, zkClient))
-          if(curLeaderAndISROpt == None){
-            debug("during initializing leader of parition (%s, %d), the current leader and isr in zookeeper is empty".format(topicPartition._1, topicPartition._2))
-            leaderAndISR = new LeaderAndISR(leader, liveAssignedReplicas.toList)
-            ZkUtils.createPersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2), leaderAndISR.toString)
-            updateLeaderISRZKPathSucceeded = true
-          } else{
-            debug("during initializing leader of parition (%s, %d), the current leader and isr in zookeeper is not empty".format(topicPartition._1, topicPartition._2))
-            val curZkPathVersion = curLeaderAndISROpt.get.zkVersion
-            leaderAndISR = new LeaderAndISR(leader, curLeaderAndISROpt.get.leaderEpoch + 1,liveAssignedReplicas.toList,  curLeaderAndISROpt.get.zkVersion + 1)
-            val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2), leaderAndISR.toString, curZkPathVersion)
-            if(updateSucceeded){
-              leaderAndISR.zkVersion = newVersion
-            }
-            updateLeaderISRZKPathSucceeded = updateSucceeded
-          }
-        }
-        liveAssignedReplicas.foreach(b => {
-          if(!brokerToLeaderAndISRInfosMap.contains(b))
-            brokerToLeaderAndISRInfosMap.put(b, new mutable.HashMap[(String, Int), LeaderAndISR])
-          brokerToLeaderAndISRInfosMap(b).put(topicPartition, leaderAndISR)
-        }
-        )
-        allLeaders.put(topicPartition, leaderAndISR.leader)
-      }
-      else{
-        warn("during initializing leader of parition (%s, %d), assigned replicas are [%s], live brokers are [%s], no assigned replica is alive".format(topicPartition._1, topicPartition._2, replicaAssignment, allBrokerIds))
-      }
+      info("Controller session expires, clean up the state, current controller: " + config.brokerId)
+      controllerChannelManager.shutDown()
+      controllerChannelManager = null
+      info("Controller session expires, the channel manager shut downr: " + config.brokerId)
+      tryToBecomeController()
     }
-
-    info("after leaders initialization for partition replica assignments %s, the cached leaders in controller is %s, and the broker to request map is: %s".format(partitionReplicaAssignment, allLeaders, brokerToLeaderAndISRInfosMap))
-    brokerToLeaderAndISRInfosMap.foreach(m =>{
-      val broker = m._1
-      val leaderAndISRs = m._2
-      val leaderAndISRRequest = new LeaderAndISRRequest(LeaderAndISRRequest.NotInit, leaderAndISRs)
-      info("at initializing leaders for new partitions, the leaderAndISR request sent to broker %d is %s".format(broker, leaderAndISRRequest))
-      sendRequest(broker, leaderAndISRRequest)
-    })
-  }
-
-
-  private def onBrokerChange(newBrokers: Set[Int] = null){
-    /** handle the new brokers, send request for them to initialize the local log **/
-    if(newBrokers != null)
-      deliverLeaderAndISRFromZookeeper(newBrokers, allTopics)
-
-    /** handle leader election for the partitions whose leader is no longer alive **/
-    val brokerToLeaderAndISRInfosMap = new mutable.HashMap[Int, mutable.HashMap[(String, Int), LeaderAndISR]]
-    allLeaders.foreach(m =>{
-      val topicPartition = m._1
-      val leader = m._2
-      // We only care about the partitions, whose leader is no longer alive
-      if(!allBrokerIds.contains(leader)){
-        var updateLeaderISRZKPathSucceeded: Boolean = false
-        while(!updateLeaderISRZKPathSucceeded){
-          val assignedReplicasOpt = allPartitionReplicaAssignment.get(topicPartition)
-          if(assignedReplicasOpt == None)
-            throw new IllegalStateException("On broker changes, the assigned replica for [%s, %d], shouldn't be None, the general assignment is %s".format(topicPartition._1, topicPartition._2, allPartitionReplicaAssignment))
-          val assignedReplicas = assignedReplicasOpt.get
-          val liveAssignedReplicasToThisPartition = assignedReplicas.filter(r => allBrokerIds.contains(r))
-          val curLeaderAndISROpt = ZkUtils.getLeaderAndISRForPartition(zkClient, topicPartition._1, topicPartition._2)
-          if(curLeaderAndISROpt == None){
-            throw new IllegalStateException("On broker change, there's no leaderAndISR information for partition (%s, %d) in zookeeper".format(topicPartition._1, topicPartition._2))
-          }
-          val curLeaderAndISR = curLeaderAndISROpt.get
-          val leader = curLeaderAndISR.leader
-          var newLeader: Int = -1
-          val leaderEpoch = curLeaderAndISR.leaderEpoch
-          val ISR = curLeaderAndISR.ISR
-          val curZkPathVersion = curLeaderAndISR.zkVersion
-          debug("leader, epoch, ISR and zkPathVersion for partition (%s, %d) are: [%d], [%d], [%s], [%d]".format(topicPartition._1, topicPartition._2, leader, leaderEpoch, ISR, curZkPathVersion))
-          // The leader is no longer alive, need reelection, we only care about the leader change here, the ISR change can be handled by the leader
-          var leaderAndISR: LeaderAndISR = null
-          // The ISR contains at least 1 broker in the live broker list
-          val liveBrokersInISR = ISR.filter(r => allBrokerIds.contains(r))
-          if(!liveBrokersInISR.isEmpty){
-            newLeader = liveBrokersInISR.head
-            leaderAndISR = new LeaderAndISR(newLeader, leaderEpoch +1, liveBrokersInISR.toList, curZkPathVersion + 1)
-            debug("some broker in ISR is alive, new leader and ISR is %s".format(leaderAndISR.toString()))
-          } else{
-            debug("live broker in ISR is empty, see live assigned replicas: %s".format(liveAssignedReplicasToThisPartition))
-            if (!liveAssignedReplicasToThisPartition.isEmpty){
-              newLeader = liveAssignedReplicasToThisPartition.head
-              leaderAndISR = new LeaderAndISR(newLeader, leaderEpoch + 1, List(newLeader), curZkPathVersion + 1)
-              warn("on broker change, no broker in ISR is alive, new leader elected is [%s], there's potential data loss".format(newLeader))
-            } else
-              error("on broker change, for partition ([%s, %d]), live brokers are: [%s], assigned replicas are: [%s]; no asigned replica is alive".format(topicPartition._1, topicPartition._2, allBrokerIds, assignedReplicas))
-          }
-          debug("the leader and ISR converted string: [%s]".format(leaderAndISR))
-          val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2), leaderAndISR.toString, curZkPathVersion)
-          if(updateSucceeded){
-            leaderAndISR.zkVersion = newVersion
-            liveAssignedReplicasToThisPartition.foreach(b => {
-              if(!brokerToLeaderAndISRInfosMap.contains(b))
-                brokerToLeaderAndISRInfosMap.put(b, new mutable.HashMap[(String, Int), LeaderAndISR])
-              brokerToLeaderAndISRInfosMap(b).put(topicPartition, leaderAndISR)
-            })
-            allLeaders.put(topicPartition, newLeader)
-            info("on broker changes, allLeader is updated to %s".format(allLeaders))
-          }
-          updateLeaderISRZKPathSucceeded = updateSucceeded
-        }
-      }
-    })
-    trace("after acting on broker change, the broker to leaderAndISR request map is".format(brokerToLeaderAndISRInfosMap))
-    brokerToLeaderAndISRInfosMap.foreach(m => {
-      val broker = m._1
-      val leaderAndISRInfos = m._2
-      val leaderAndISRRequest = new LeaderAndISRRequest(LeaderAndISRRequest.NotInit, leaderAndISRInfos)
-      sendRequest(broker, leaderAndISRRequest)
-      info("on broker change, the LeaderAndISRRequest send to brokers [%d] is [%s]".format(leaderAndISRRequest, broker))
-    })
   }
 
   class BrokerChangeListener() extends IZkChildListener with Logging {
-    this.logIdent = "Controller " + config.brokerId + ", "
     def handleChildChange(parentPath : String, javaCurChildren : java.util.List[String]) {
-      controllerLock synchronized {
-        info("broker change listener triggered")
+      import scala.collection.JavaConversions._
+      lock synchronized {
+        info("Broker change listener at controller triggerred")
+        val allBrokerIds = allBrokers.map(_.id)
         val curChildrenSeq: Seq[String] = javaCurChildren
         val curBrokerIdsSeq = curChildrenSeq.map(_.toInt)
         val curBrokerIds = curBrokerIdsSeq.toSet
         val addedBrokerIds = curBrokerIds -- allBrokerIds
         val addedBrokersSeq = ZkUtils.getBrokerInfoFromIds(zkClient, addedBrokerIds.toSeq)
+        info("Added brokers: " + addedBrokerIds.toString())
         val deletedBrokerIds = allBrokerIds -- curBrokerIds
-        allBrokers = ZkUtils.getBrokerInfoFromIds(zkClient, curBrokerIdsSeq).toSet
-        allBrokerIds = allBrokers.map(_.id)
-        info("added brokers: %s, deleted brokers: %s, all brokers: %s".format(addedBrokerIds, deletedBrokerIds, allBrokerIds))
-        addedBrokersSeq.foreach(controllerChannelManager.addBroker(_))
-        deletedBrokerIds.foreach(controllerChannelManager.removeBroker(_))
-        onBrokerChange(addedBrokerIds)
-      }
-    }
-  }
+        info("Deleted brokers: " + deletedBrokerIds.toString())
 
-  private def handleNewTopics(topics: Set[String], partitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]]) {
-    // get relevant partitions to this broker
-    val partitionReplicaAssignment = allPartitionReplicaAssignment.filter(p => topics.contains(p._1._1))
-    trace("handling new topics, the partition replica assignment to be handled is %s".format(partitionReplicaAssignment))
-    initLeaders(partitionReplicaAssignment)
-  }
+        allBrokers = ZkUtils.getBrokerInfoFromIds(zkClient, curBrokerIdsSeq).toSet
 
-  private def handleDeletedTopics(topics: Set[String], partitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]]) {
-    val brokerToPartitionToStopReplicaMap = new collection.mutable.HashMap[Int, collection.mutable.HashSet[(String, Int)]]
-    for((topicPartition, brokers) <- partitionReplicaAssignment){
-      for (broker <- brokers){
-        if (!brokerToPartitionToStopReplicaMap.contains(broker))
-          brokerToPartitionToStopReplicaMap.put(broker, new collection.mutable.HashSet[(String, Int)])
-        brokerToPartitionToStopReplicaMap(broker).add(topicPartition)
+        for(broker <- addedBrokersSeq){
+          controllerChannelManager.addBroker(broker)
+        }
+        for (brokerId <- deletedBrokerIds){
+          controllerChannelManager.removeBroker(brokerId)
+        }
+        /** TODO: add other broker change handler logic**/
       }
-      allLeaders.remove(topicPartition)
-      info("after deleting topics %s, allLeader is updated to %s and the broker to stop replia request map is %s".format(topics, allLeaders, brokerToPartitionToStopReplicaMap))
-      ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath(topicPartition._1, topicPartition._2))
-    }
-    for((broker, partitionToStopReplica) <- brokerToPartitionToStopReplicaMap){
-      val stopReplicaRequest = new StopReplicaRequest(partitionToStopReplica)
-      info("handling deleted topics: [%s] the stopReplicaRequest sent to broker %d is [%s]".format(topics, broker, stopReplicaRequest))
-      sendRequest(broker, stopReplicaRequest)
     }
-    /*TODO: kafka-330  remove the unneeded leaderAndISRPath that the previous controller didn't get a chance to remove*/
   }
 
   class TopicChangeListener extends IZkChildListener with Logging {
-    this.logIdent = "Controller " + config.brokerId + ", "
-
     @throws(classOf[Exception])
     def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
-      controllerLock synchronized {
-        info("topic/partition change listener fired for path " + parentPath)
-        val currentChildren = JavaConversions.asBuffer(curChilds).toSet
-        val newTopics = currentChildren -- allTopics
-        val deletedTopics = allTopics -- currentChildren
-        val deletedPartitionReplicaAssignment = allPartitionReplicaAssignment.filter(p => deletedTopics.contains(p._1._1))
-        allTopics = currentChildren
-
-        val addedPartitionReplicaAssignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, newTopics.iterator)
-        allPartitionReplicaAssignment = allPartitionReplicaAssignment.filter(p => !deletedTopics.contains(p._1._1))
-        allPartitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
-        info("new topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics, deletedTopics, allPartitionReplicaAssignment))
-        handleNewTopics(newTopics, addedPartitionReplicaAssignment)
-        handleDeletedTopics(deletedTopics, deletedPartitionReplicaAssignment)
-      }
+      // TODO: Incomplete, do not need to review this time
     }
   }
 
   class ControllerExistListener extends IZkDataListener with Logging {
-    this.logIdent = "Controller " + config.brokerId + ", "
-
     @throws(classOf[Exception])
     def handleDataChange(dataPath: String, data: Object) {
       // do nothing, since No logic is needed here
@@ -532,10 +285,8 @@ class KafkaController(config : KafkaConf
 
     @throws(classOf[Exception])
     def handleDataDeleted(dataPath: String) {
-      controllerLock synchronized {
-        info("the current controller failed, competes to be new controller")
-        controllerRegisterOrFailover()
-      }
+      info("Controller fail over, broker " + config.brokerId + " try to become controller")
+      tryToBecomeController()
     }
   }
 }
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala Tue Jul 31 22:50:59 2012
@@ -24,44 +24,41 @@ import java.util.concurrent.atomic.Atomi
 /**
  * A thread that answers kafka requests.
  */
-class KafkaRequestHandler(id: Int, brokerId: Int, val requestChannel: RequestChannel, apis: KafkaApis) extends Runnable with Logging {
-  this.logIdent = "Kafka Request Handler " + id + " on Broker " + brokerId + ", "
-
+class KafkaRequestHandler(val requestChannel: RequestChannel, apis: KafkaApis) extends Runnable with Logging { 
+     
   def run() { 
     while(true) { 
       val req = requestChannel.receiveRequest()
-      if(req == RequestChannel.AllDone){
-        trace("receives shut down command, shut down".format(brokerId, id))
+      trace("Processor " + Thread.currentThread.getName + " got request " + req)
+      if(req == RequestChannel.AllDone)
         return
-      }
-      debug("handles request " + req)
       apis.handle(req)
     }
   }
 
   def shutdown(): Unit = requestChannel.sendRequest(RequestChannel.AllDone)
+  
 }
 
-class KafkaRequestHandlerPool(val brokerId: Int,
-                              val requestChannel: RequestChannel,
-                              val apis: KafkaApis,
+class KafkaRequestHandlerPool(val requestChannel: RequestChannel, 
+                              val apis: KafkaApis, 
                               numThreads: Int) extends Logging {
-  this.logIdent = "Kafka Request Handler on Broker " + brokerId + ", "
+  
   val threads = new Array[Thread](numThreads)
   val runnables = new Array[KafkaRequestHandler](numThreads)
   for(i <- 0 until numThreads) { 
-    runnables(i) = new KafkaRequestHandler(i, brokerId, requestChannel, apis)
+    runnables(i) = new KafkaRequestHandler(requestChannel, apis)
     threads(i) = Utils.daemonThread("kafka-request-handler-" + i, runnables(i))
     threads(i).start()
   }
   
   def shutdown() {
-    info("shutting down")
+    info("Shutting down request handlers")
     for(handler <- runnables)
       handler.shutdown
     for(thread <- threads)
       thread.join
-    info("shutted down completely")
+    info("Request handlers shut down")
   }
   
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Tue Jul 31 22:50:59 2012
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
+ * 
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -24,9 +24,8 @@ import kafka.utils._
 import java.util.concurrent._
 import atomic.AtomicBoolean
 import kafka.cluster.Replica
-import kafka.api.LeaderAndISR
-import scala.collection._
 import org.I0Itec.zkclient.ZkClient
+import kafka.common.KafkaZookeeperClient
 
 
 /**
@@ -34,7 +33,7 @@ import org.I0Itec.zkclient.ZkClient
  * to start up and shutdown a single Kafka node.
  */
 class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging {
-  this.logIdent = "Kafka Server " + config.brokerId + ", "
+
   val CleanShutdownFile = ".kafka_cleanshutdown"
   private var isShuttingDown = new AtomicBoolean(false)
   private var shutdownLatch = new CountDownLatch(1)
@@ -45,7 +44,7 @@ class KafkaServer(val config: KafkaConfi
   var kafkaZookeeper: KafkaZooKeeper = null
   var replicaManager: ReplicaManager = null
   private var apis: KafkaApis = null
-  var kafkaController: KafkaController = null
+  var kafkaController: KafkaController = new KafkaController(config)
   val kafkaScheduler = new KafkaScheduler(4)
   var zkClient: ZkClient = null
 
@@ -54,7 +53,7 @@ class KafkaServer(val config: KafkaConfi
    * Instantiates the LogManager, the SocketServer and the request handlers - KafkaRequestHandlers
    */
   def startup() {
-    info("starting")
+    info("Starting Kafka server..." + config.brokerId)
     isShuttingDown = new AtomicBoolean(false)
     shutdownLatch = new CountDownLatch(1)
     var needRecovery = true
@@ -63,10 +62,11 @@ class KafkaServer(val config: KafkaConfi
       needRecovery = false
       cleanShutDownFile.delete
     }
-
+    /* start client */
+    info("Connecting to ZK: " + config.zkConnect)
+    zkClient = KafkaZookeeperClient.getZookeeperClient(config)
     /* start scheduler */
     kafkaScheduler.startUp
-
     /* start log manager */
     logManager = new LogManager(config,
                                 kafkaScheduler,
@@ -75,107 +75,88 @@ class KafkaServer(val config: KafkaConfi
                                 1000L * 60 * 60 * config.logRetentionHours,
                                 needRecovery)
     logManager.startup()
-
-    socketServer = new SocketServer(config.brokerId,
-                                    config.port,
+                                                
+    socketServer = new SocketServer(config.port,
                                     config.numNetworkThreads,
                                     config.monitoringPeriodSecs,
                                     config.numQueuedRequests,
                                     config.maxSocketRequestSize)
-
-    socketServer.startup
-
     Utils.registerMBean(socketServer.stats, statsMBeanName)
 
-    /* start client */
-    kafkaZookeeper = new KafkaZooKeeper(config)
-    // starting relevant replicas and leader election for partitions assigned to this broker
-    kafkaZookeeper.startup
+    kafkaZookeeper = new KafkaZooKeeper(config, zkClient, addReplica, getReplica, makeLeader, makeFollower)
 
-    info("Connecting to ZK: " + config.zkConnect)
+    replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler)
 
-    replicaManager = new ReplicaManager(config, time, kafkaZookeeper.getZookeeperClient, kafkaScheduler, deleteLog)
+    apis = new KafkaApis(socketServer.requestChannel, logManager, replicaManager, kafkaZookeeper)
+    requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel, apis, config.numIoThreads)
+    socketServer.startup()
 
-    kafkaController = new KafkaController(config, kafkaZookeeper.getZookeeperClient)
-    apis = new KafkaApis(socketServer.requestChannel, logManager, replicaManager, kafkaZookeeper,
-                         addReplica, stopReplica, makeLeader, makeFollower, config.brokerId)
-    requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, config.numIoThreads)
     Mx4jLoader.maybeLoad
 
-    /**
-     *  Registers this broker in ZK. After this, consumers can connect to broker.
-     *  So this should happen after socket server start.
-     */
+    // starting relevant replicas and leader election for partitions assigned to this broker
+    kafkaZookeeper.startup()
     // start the replica manager
     replicaManager.startup()
     // start the controller
     kafkaController.startup()
-    info("started")
-  }
-
 
+    info("Server started.")
+  }
+  
   /**
    * Shutdown API for shutting down a single instance of the Kafka server.
    * Shuts down the LogManager, the SocketServer and the log cleaner scheduler thread
    */
   def shutdown() {
-    info("shutting down")
     val canShutdown = isShuttingDown.compareAndSet(false, true);
     if (canShutdown) {
-      if(requestHandlerPool != null)
-        requestHandlerPool.shutdown()
+      info("Shutting down Kafka server with id " + config.brokerId)
       kafkaScheduler.shutdown()
       apis.close()
-      kafkaZookeeper.shutdown()
       if(replicaManager != null)
         replicaManager.shutdown()
       if (socketServer != null)
         socketServer.shutdown()
+      if(requestHandlerPool != null)
+        requestHandlerPool.shutdown()
       Utils.unregisterMBean(statsMBeanName)
       if(logManager != null)
         logManager.shutdown()
-
       if(kafkaController != null)
         kafkaController.shutDown()
-
+      kafkaZookeeper.shutdown()
+      zkClient.close()
       val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
-      debug("creating clean shutdown file " + cleanShutDownFile.getAbsolutePath())
+      debug("Creating clean shutdown file " + cleanShutDownFile.getAbsolutePath())
       cleanShutDownFile.createNewFile
       shutdownLatch.countDown()
-      info("shutted down completed")
+      info("Kafka server with id %d shut down completed".format(config.brokerId))
     }
   }
-
+  
   /**
    * After calling shutdown(), use this API to wait until the shutdown is complete
    */
   def awaitShutdown(): Unit = shutdownLatch.await()
 
   def addReplica(topic: String, partition: Int, assignedReplicas: Set[Int]): Replica = {
+    info("Added local replica for topic %s partition %d on broker %d".format(topic, partition, config.brokerId))
+    // get local log
     val log = logManager.getOrCreateLog(topic, partition)
     replicaManager.addLocalReplica(topic, partition, log, assignedReplicas)
   }
 
-  def makeLeader(replica: Replica, leaderAndISR: LeaderAndISR): Short = {
-    replicaManager.makeLeader(replica, leaderAndISR)
+  def makeLeader(replica: Replica, currentISRInZk: Seq[Int]) {
+    replicaManager.makeLeader(replica, currentISRInZk)
   }
 
-  def makeFollower(replica: Replica, leaderAndISR: LeaderAndISR): Short = {
-    replicaManager.makeFollower(replica, leaderAndISR)
+  def makeFollower(replica: Replica, leaderBrokerId: Int, zkClient: ZkClient) {
+    replicaManager.makeFollower(replica, leaderBrokerId, zkClient)
   }
 
   def getReplica(topic: String, partition: Int): Option[Replica] =
     replicaManager.getReplica(topic, partition)
 
-  def stopReplica(topic: String, partition: Int): Short = {
-    replicaManager.stopReplica(topic, partition)
-  }
-
-  def deleteLog(topic: String,  partition: Int): Unit = {
-    /* TODO: handle deleteLog in a better way */
-    //logManager.deleteLog(topic, partition)
-  }
-
   def getLogManager(): LogManager = logManager
 
   def getStats(): SocketServerStats = socketServer.stats

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Tue Jul 31 22:50:59 2012
@@ -18,29 +18,46 @@
 package kafka.server
 
 import java.net.InetAddress
+import kafka.cluster.Replica
 import kafka.utils._
 import org.apache.zookeeper.Watcher.Event.KeeperState
-import org.I0Itec.zkclient.{IZkStateListener, ZkClient}
+import org.I0Itec.zkclient.{IZkDataListener, IZkChildListener, IZkStateListener, ZkClient}
+import kafka.admin.AdminUtils
+import java.lang.Thread
+import collection.mutable.HashSet
 import kafka.common._
 
-
 /**
  * Handles the server's interaction with zookeeper. The server needs to register the following paths:
  *   /topics/[topic]/[node_id-partition_num]
  *   /brokers/[0...N] --> host:port
+ *
  */
-class KafkaZooKeeper(config: KafkaConfig) extends Logging {
+class KafkaZooKeeper(config: KafkaConfig,
+                     zkClient: ZkClient,
+                     addReplicaCbk: (String, Int, Set[Int]) => Replica,
+                     getReplicaCbk: (String, Int) => Option[Replica],
+                     becomeLeader: (Replica, Seq[Int]) => Unit,
+                     becomeFollower: (Replica, Int, ZkClient) => Unit) extends Logging {
 
   val brokerIdPath = ZkUtils.BrokerIdsPath + "/" + config.brokerId
-  private var zkClient: ZkClient = null
-
-   def startup() {
-     /* start client */
-     info("connecting to ZK: " + config.zkConnect)
-     zkClient = KafkaZookeeperClient.getZookeeperClient(config)
-     zkClient.subscribeStateChanges(new SessionExpireListener)
-     registerBrokerInZk()
-   }
+  private var leaderChangeListener: LeaderChangeListener = null
+  private var topicPartitionsChangeListener: TopicChangeListener = null
+  private var stateChangeHandler: StateChangeCommandHandler = null
+
+  private val topicListenerLock = new Object
+  private val leaderChangeLock = new Object
+
+  def startup() {
+    leaderChangeListener = new LeaderChangeListener
+    topicPartitionsChangeListener = new TopicChangeListener
+    leaderChangeListener = new LeaderChangeListener
+    topicPartitionsChangeListener = new TopicChangeListener
+    startStateChangeCommandHandler()
+    zkClient.subscribeStateChanges(new SessionExpireListener)
+    registerBrokerInZk()
+    subscribeToTopicAndPartitionsChanges(true)
+  }
 
   private def registerBrokerInZk() {
     info("Registering broker " + brokerIdPath)
@@ -49,6 +66,13 @@ class KafkaZooKeeper(config: KafkaConfig
     ZkUtils.registerBrokerInZk(zkClient, config.brokerId, hostName, creatorId, config.port)
   }
 
+  private def startStateChangeCommandHandler() {
+    val stateChangeQ = new ZkQueue(zkClient, ZkUtils.getBrokerStateChangePath(config.brokerId), config.stateChangeQSize)
+    stateChangeHandler = new StateChangeCommandHandler("StateChangeCommandHandler", config, stateChangeQ,
+      ensureStateChangeCommandValidityOnThisBroker, ensureEpochValidity)
+    stateChangeHandler.start()
+  }
+
   /**
    *  When we get a SessionExpired event, we lost all ephemeral nodes and zkclient has reestablished a
    *  connection for us. We need to re-register this broker in the broker registry.
@@ -72,24 +96,20 @@ class KafkaZooKeeper(config: KafkaConfig
       registerBrokerInZk()
       info("done re-registering broker")
       info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
+      zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicPartitionsChangeListener)
+      val topics = ZkUtils.getAllTopics(zkClient)
+      debug("Existing topics are %s".format(topics.mkString(",")))
+      topics.foreach(topic => zkClient.subscribeChildChanges(ZkUtils.getTopicPartitionsPath(topic), topicPartitionsChangeListener))
+      handleNewTopics(topics)
     }
   }
 
   def shutdown() {
-    if (zkClient != null) {
-      info("Closing zookeeper client...")
-      zkClient.close()
-    }
-  }
-
-  private def doesTopicExistInCluster(topic: String) : Boolean = {
-    val allTopics = ZkUtils.getAllTopics(zkClient)
-    trace("all topics, %s, topic %s".format(allTopics, topic))
-    allTopics.contains(topic)
+    stateChangeHandler.shutdown()
   }
 
   def ensurePartitionLeaderOnThisBroker(topic: String, partition: Int) {
-    if(!doesTopicExistInCluster(topic))
+    if(!topicPartitionsChangeListener.doesTopicExistInCluster(topic))
       throw new UnknownTopicException("Topic %s doesn't exist in the cluster".format(topic))
     // check if partition id is invalid
     if(partition < 0)
@@ -104,7 +124,256 @@ class KafkaZooKeeper(config: KafkaConfig
     }
   }
 
-  def getZookeeperClient = {
-    zkClient
+  def getZookeeperClient = zkClient
+
+  def handleNewTopics(topics: Seq[String]) {
+    // get relevant partitions to this broker
+    val topicsAndPartitionsOnThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId)
+    debug("Partitions assigned to broker %d are %s".format(config.brokerId, topicsAndPartitionsOnThisBroker.mkString(",")))
+    for( (topic, partitionsAssignedToThisBroker) <- topicsAndPartitionsOnThisBroker ) {
+      // subscribe to leader changes for these partitions
+      subscribeToLeaderForPartitions(topic, partitionsAssignedToThisBroker)
+      // start replicas for these partitions
+      startReplicasForPartitions(topic, partitionsAssignedToThisBroker)
+    }
+  }
+
+  def subscribeToTopicAndPartitionsChanges(startReplicas: Boolean) {
+    info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
+    zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicPartitionsChangeListener)
+    val topics = ZkUtils.getAllTopics(zkClient)
+    val topicsAndPartitionsOnThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId)
+    debug("Partitions assigned to broker %d are %s".format(config.brokerId, topicsAndPartitionsOnThisBroker.mkString(",")))
+    for( (topic, partitionsAssignedToThisBroker) <- topicsAndPartitionsOnThisBroker ) {
+      // subscribe to leader changes for these partitions
+      subscribeToLeaderForPartitions(topic, partitionsAssignedToThisBroker)
+
+      // start replicas for these partitions
+      if(startReplicas)
+        startReplicasForPartitions(topic, partitionsAssignedToThisBroker)
+    }
+  }
+
+  private def subscribeToLeaderForPartitions(topic: String, partitions: Seq[Int]) {
+    partitions.foreach { partition =>
+      info("Broker %d subscribing to leader changes for topic %s partition %d".format(config.brokerId, topic, partition))
+      // register leader change listener
+      zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderPath(topic, partition.toString), leaderChangeListener)
+    }
+  }
+
+  private def startReplicasForPartitions(topic: String, partitions: Seq[Int]) {
+    partitions.foreach { partition =>
+      val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partition).map(r => r.toInt)
+      info("Assigned replicas list for topic %s partition %d is %s".format(topic, partition, assignedReplicas.mkString(",")))
+      if(assignedReplicas.contains(config.brokerId)) {
+        val replica = addReplicaCbk(topic, partition, assignedReplicas.toSet)
+        startReplica(replica)
+      } else
+        warn("Ignoring partition %d of topic %s since broker %d doesn't host any replicas for it"
+          .format(partition, topic, config.brokerId))
+    }
+  }
+
+  private def startReplica(replica: Replica) {
+    info("Starting replica for topic %s partition %d on broker %d"
+      .format(replica.topic, replica.partition.partitionId, replica.brokerId))
+    ZkUtils.getLeaderForPartition(zkClient, replica.topic, replica.partition.partitionId) match {
+      case Some(leader) =>
+        info("Topic %s partition %d has leader %d".format(replica.topic, replica.partition.partitionId,leader))
+        // check if this broker is the leader, if not, then become follower
+        if(leader != config.brokerId)
+          becomeFollower(replica, leader, zkClient)
+      case None => // leader election
+        leaderElection(replica)
+    }
+  }
+
+  def leaderElection(replica: Replica) {
+    info("Broker %d electing leader for topic %s partition %d".format(config.brokerId, replica.topic, replica.partition.partitionId))
+    // read the AR list for replica.partition from ZK
+    val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, replica.topic, replica.partition.partitionId).map(_.toInt)
+    val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, replica.topic, replica.partition.partitionId)
+    val liveBrokers = ZkUtils.getSortedBrokerList(zkClient).map(_.toInt)
+    if(canBecomeLeader(config.brokerId, replica.topic, replica.partition.partitionId, assignedReplicas, inSyncReplicas, liveBrokers)) {
+      info("Broker %d will participate in leader election for topic %s partition %d"
+        .format(config.brokerId, replica.topic, replica.partition.partitionId))
+      // wait for some time if it is not the preferred replica
+      try {
+        if(replica.brokerId != assignedReplicas.head) {
+          // sleep only if the preferred replica is alive
+          if(liveBrokers.contains(assignedReplicas.head)) {
+            info("Preferred replica %d for topic %s ".format(assignedReplicas.head, replica.topic) +
+              "partition %d is alive. Waiting for %d ms to allow it to become leader"
+              .format(replica.partition.partitionId, config.preferredReplicaWaitTime))
+            Thread.sleep(config.preferredReplicaWaitTime)
+          }
+        }
+      } catch {
+        case e => // ignoring
+      }
+      val newLeaderEpochAndISR = ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic,
+        replica.partition.partitionId, replica.brokerId)
+      newLeaderEpochAndISR match {
+        case Some(epochAndISR) =>
+          info("Broker %d is leader for topic %s partition %d".format(replica.brokerId, replica.topic,
+            replica.partition.partitionId))
+          info("Current ISR for topic %s partition %d is %s".format(replica.topic, replica.partition.partitionId,
+                                                                    epochAndISR._2.mkString(",")))
+          becomeLeader(replica, epochAndISR._2)
+        case None =>
+          ZkUtils.getLeaderForPartition(zkClient, replica.topic, replica.partition.partitionId) match {
+            case Some(leader) =>
+              becomeFollower(replica, leader, zkClient)
+            case None =>
+              error("Lost leader for topic %s partition %d right after leader election".format(replica.topic,
+                replica.partition.partitionId))
+          }
+      }
+    }
+  }
+
+  private def canBecomeLeader(brokerId: Int, topic: String, partition: Int, assignedReplicas: Seq[Int],
+                              inSyncReplicas: Seq[Int], liveBrokers: Seq[Int]): Boolean = {
+    // TODO: raise alert, mark the partition offline if no broker in the assigned replicas list is alive
+    assert(assignedReplicas.size > 0, "There should be at least one replica in the assigned replicas list for topic " +
+      " %s partition %d".format(topic, partition))
+    inSyncReplicas.size > 0 match {
+      case true => // check if this broker is in the ISR. If yes, return true
+        inSyncReplicas.contains(brokerId) match {
+          case true =>
+            info("Broker %d can become leader since it is in the ISR %s".format(brokerId, inSyncReplicas.mkString(",")) +
+              " for topic %s partition %d".format(topic, partition))
+            true
+          case false =>
+            // check if any broker in the ISR is alive. If not, return true only if this broker is in the AR
+            val liveBrokersInISR = inSyncReplicas.filter(r => liveBrokers.contains(r))
+            liveBrokersInISR.isEmpty match {
+              case true =>
+                if(assignedReplicas.contains(brokerId)) {
+                  info("No broker in the ISR %s for topic %s".format(inSyncReplicas.mkString(","), topic) +
+                    " partition %d is alive. Broker %d can become leader since it is in the assigned replicas %s"
+                      .format(partition, brokerId, assignedReplicas.mkString(",")))
+                  true
+                } else {
+                  info("No broker in the ISR %s for topic %s".format(inSyncReplicas.mkString(","), topic) +
+                    " partition %d is alive. Broker %d can become leader since it is in the assigned replicas %s"
+                      .format(partition, brokerId, assignedReplicas.mkString(",")))
+                  false
+                }
+              case false =>
+                info("ISR for topic %s partition %d is %s. Out of these %s brokers are alive. Broker %d "
+                  .format(topic, partition, inSyncReplicas.mkString(",")) + "cannot become leader since it doesn't exist " +
+                  "in the ISR")
+                false  // let one of the live brokers in the ISR become the leader
+            }
+        }
+      case false =>
+        if(assignedReplicas.contains(brokerId)) {
+          info("ISR for topic %s partition %d is empty. Broker %d can become leader since it "
+            .format(topic, partition, brokerId) + "is part of the assigned replicas list")
+          true
+        } else {
+          info("ISR for topic %s partition %d is empty. Broker %d cannot become leader since it "
+            .format(topic, partition, brokerId) + "is not part of the assigned replicas list")
+          false
+        }
+    }
+  }
+
+  class TopicChangeListener extends IZkChildListener with Logging {
+    private val allTopics = new HashSet[String]()
+    // read existing topics, if any
+    allTopics ++= ZkUtils.getAllTopics(zkClient)
+
+    @throws(classOf[Exception])
+    def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
+      import collection.JavaConversions
+      topicListenerLock.synchronized {
+        debug("Topic/partition change listener fired for path " + parentPath)
+        val currentChildren = JavaConversions.asBuffer(curChilds).toSet
+        val newTopics = currentChildren -- allTopics
+        val deletedTopics = allTopics -- currentChildren
+        allTopics.clear()
+        allTopics ++= currentChildren
+
+        debug("New topics: [%s]. Deleted topics: [%s]".format(newTopics.mkString(","), deletedTopics.mkString(",")))
+        debug("Current topics in the cluster: [%s]".format(allTopics.mkString(",")))
+        handleNewTopics(newTopics.toSeq)
+        // TODO: Handle topic deletions
+        // handleDeletedTopics(deletedTopics.toSeq)
+      }
+    }
+
+    def doesTopicExistInCluster(topic: String): Boolean = {
+      topicListenerLock.synchronized {
+        allTopics.contains(topic)
+      }
+    }
+  }
+
+  private def ensureStateChangeCommandValidityOnThisBroker(stateChangeCommand: StateChangeCommand): Boolean = {
+    // check if this broker hosts a replica for this topic and partition
+    ZkUtils.isPartitionOnBroker(zkClient, stateChangeCommand.topic, stateChangeCommand.partition, config.brokerId)
+  }
+
+  private def ensureEpochValidity(stateChangeCommand: StateChangeCommand): Boolean = {
+    // get the topic and partition that this request is meant for
+    val topic = stateChangeCommand.topic
+    val partition = stateChangeCommand.partition
+    val epoch = stateChangeCommand.epoch
+
+    val currentLeaderEpoch = ZkUtils.getEpochForPartition(zkClient, topic, partition)
+    // check if the request's epoch matches the current leader's epoch OR the admin command's epoch
+    val validEpoch = (currentLeaderEpoch == epoch) || (epoch == AdminUtils.AdminEpoch)
+    if(epoch > currentLeaderEpoch)
+      throw new IllegalStateException(("Illegal epoch state. Request's epoch %d larger than registered epoch %d for " +
+        "topic %s partition %d").format(epoch, currentLeaderEpoch, topic, partition))
+    validEpoch
+  }
+
+  class LeaderChangeListener extends IZkDataListener with Logging {
+
+    @throws(classOf[Exception])
+    def handleDataChange(dataPath: String, data: Object) {
+      // handle leader change event for path
+      val newLeaderAndEpochInfo: String = data.asInstanceOf[String]
+      val newLeader = newLeaderAndEpochInfo.split(";").head.toInt
+      val newEpoch = newLeaderAndEpochInfo.split(";").last.toInt
+      debug("Leader change listener fired on broker %d for path %s. New leader is %d. New epoch is %d".format(config.brokerId,
+        dataPath, newLeader, newEpoch))
+      val topicPartitionInfo = dataPath.split("/")
+      val topic = topicPartitionInfo.takeRight(4).head
+      val partition = topicPartitionInfo.takeRight(2).head.toInt
+      info("Updating leader change information in replica for topic %s partition %d".format(topic, partition))
+      val replica = getReplicaCbk(topic, partition).getOrElse(null)
+      assert(replica != null, "Replica for topic %s partition %d should exist on broker %d"
+        .format(topic, partition, config.brokerId))
+      replica.partition.leaderId(Some(newLeader))
+      assert(getReplicaCbk(topic, partition).get.partition.leaderId().get == newLeader, "New leader should be set correctly")
+    }
+
+    @throws(classOf[Exception])
+    def handleDataDeleted(dataPath: String) {
+      leaderChangeLock.synchronized {
+        // leader is deleted for topic partition
+        val topic = dataPath.split("/").takeRight(4).head
+        val partitionId = dataPath.split("/").takeRight(2).head.toInt
+        debug("Leader deleted listener fired for topic %s partition %d on broker %d"
+          .format(topic, partitionId, config.brokerId))
+        val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partitionId).map(r => r.toInt)
+        if(assignedReplicas.contains(config.brokerId)) {
+          val replica = getReplicaCbk(topic, partitionId)
+          replica match {
+            case Some(r) => leaderElection(r)
+            case None =>  error("No replica exists for topic %s partition %s on broker %d"
+              .format(topic, partitionId, config.brokerId))
+          }
+        }
+      }
+    }
   }
 }
+
+
+

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala Tue Jul 31 22:50:59 2012
@@ -20,10 +20,10 @@ package kafka.server
 import kafka.cluster.Broker
 
 class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager)
-        extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId + ", ", brokerConfig.numReplicaFetchers) {
+        extends AbstractFetcherManager("ReplicaFetcherManager", brokerConfig.numReplicaFetchers) {
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
-    new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d on broker %d, ".format(sourceBroker.id, fetcherId, brokerConfig.brokerId), sourceBroker, brokerConfig, replicaMgr)
+    new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(sourceBroker.id, fetcherId), sourceBroker, brokerConfig, replicaMgr)
   }
 
   def shutdown() {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Tue Jul 31 22:50:59 2012
@@ -18,88 +18,64 @@ package kafka.server
 
 import kafka.log.Log
 import kafka.cluster.{Partition, Replica}
-import collection._
+import collection.mutable
 import mutable.ListBuffer
 import org.I0Itec.zkclient.ZkClient
 import java.util.concurrent.locks.ReentrantLock
 import kafka.utils.{KafkaScheduler, ZkUtils, Time, Logging}
-import kafka.api.LeaderAndISR
-import java.util.concurrent.atomic.AtomicBoolean
-import kafka.common.{BrokerNotExistException, KafkaException, ErrorMapping, InvalidPartitionException}
+import kafka.common.{KafkaException, InvalidPartitionException}
 
+class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient, kafkaScheduler: KafkaScheduler)
+  extends Logging {
 
-class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient, kafkaScheduler: KafkaScheduler, deleteLocalLog: (String, Int) => Unit) extends Logging {
-
-  var allPartitions = new mutable.HashMap[(String, Int), Partition]()
+  private var allReplicas = new mutable.HashMap[(String, Int), Partition]()
   private var leaderReplicas = new ListBuffer[Partition]()
   private val leaderReplicaLock = new ReentrantLock()
   private val replicaFetcherManager = new ReplicaFetcherManager(config, this)
-  this.logIdent = "Replica Manager on Broker " + config.brokerId + ", "
-
-  val hwCheckPointThreadStarted = new AtomicBoolean(false)
   private val highwaterMarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir)
-  info("Created highwatermark file %s".format(highwaterMarkCheckpoint.name))
-
-  def startHighWaterMarksCheckPointThread() = {
-    if(hwCheckPointThreadStarted.compareAndSet(false, true))
-      kafkaScheduler.scheduleWithRate(checkpointHighwaterMarks, "highwatermark-checkpoint-thread", 0, config.defaultFlushIntervalMs)
-  }
+  info("Created highwatermark file %s on broker %d".format(highwaterMarkCheckpoint.name, config.brokerId))
 
   def startup() {
     // start the highwatermark checkpoint thread
+    kafkaScheduler.scheduleWithRate(checkpointHighwaterMarks, "highwatermark-checkpoint-thread", 0,
+      config.defaultFlushIntervalMs)
     // start ISR expiration thread
     kafkaScheduler.scheduleWithRate(maybeShrinkISR, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs)
   }
 
   def addLocalReplica(topic: String, partitionId: Int, log: Log, assignedReplicaIds: Set[Int]): Replica = {
     val partition = getOrCreatePartition(topic, partitionId, assignedReplicaIds)
-    var retReplica : Replica = null
+    val localReplica = new Replica(config.brokerId, partition, topic, time,
+      Some(readCheckpointedHighWatermark(topic, partitionId)), Some(log))
+
     val replicaOpt = partition.getReplica(config.brokerId)
     replicaOpt match {
       case Some(replica) =>
-        info("changing remote replica %s into a local replica".format(replica.toString))
+        info("Changing remote replica %s into a local replica".format(replica.toString))
         replica.log match {
           case None =>
             replica.log = Some(log)
           case Some(log) => // nothing to do since log already exists
         }
-        retReplica = replica
       case None =>
-        val localReplica = new Replica(config.brokerId, partition, topic, time,
-                                       Some(readCheckpointedHighWatermark(topic, partitionId)), Some(log))
         partition.addReplica(localReplica)
-        info("adding local replica %d for topic %s partition %s on broker %d".format(localReplica.brokerId, localReplica.topic, localReplica.partition.partitionId, localReplica.brokerId))
-        retReplica = localReplica
     }
     val assignedReplicas = assignedReplicaIds.map(partition.getReplica(_).get)
     partition.assignedReplicas(Some(assignedReplicas))
     // get the replica objects for the assigned replicas for this partition
-    retReplica
+    info("Added local replica %d for topic %s partition %s on broker %d"
+      .format(localReplica.brokerId, localReplica.topic, localReplica.partition.partitionId, localReplica.brokerId))
+    localReplica
   }
 
-  def stopReplica(topic: String, partition: Int): Short  = {
-    trace("handling stop replica for partition [%s, %d]".format(topic, partition))
-    val errorCode = ErrorMapping.NoError
-    val replica = getReplica(topic, partition)
-    if(replica.isDefined){
-      replicaFetcherManager.removeFetcher(topic, partition)
-      deleteLocalLog(topic, partition)
-      allPartitions.remove((topic, partition))
-      info("after removing partition (%s, %d), the rest of allReplicas is: [%s]".format(topic, partition, allPartitions))
-    }
-    trace("finishes handling stop replica [%s, %d]".format(topic, partition))
-    errorCode
-  }
-
-
   def getOrCreatePartition(topic: String, partitionId: Int, assignedReplicaIds: Set[Int]): Partition = {
-    val newPartition = allPartitions.contains((topic, partitionId))
+    val newPartition = allReplicas.contains((topic, partitionId))
     newPartition match {
       case true => // partition exists, do nothing
-        allPartitions.get((topic, partitionId)).get
+        allReplicas.get((topic, partitionId)).get
       case false => // create remote replicas for each replica id in assignedReplicas
         val partition = new Partition(topic, partitionId, time)
-        allPartitions += (topic, partitionId) -> partition
+        allReplicas += (topic, partitionId) -> partition
         (assignedReplicaIds - config.brokerId).foreach(
           replicaId => addRemoteReplica(topic, partitionId, replicaId, partition))
         partition
@@ -107,11 +83,12 @@ class ReplicaManager(val config: KafkaCo
   }
 
   def ensurePartitionExists(topic: String, partitionId: Int): Partition = {
-    val partitionOpt = allPartitions.get((topic, partitionId))
+    val partitionOpt = allReplicas.get((topic, partitionId))
     partitionOpt match {
       case Some(partition) => partition
       case None =>
-        throw new InvalidPartitionException("Partition for topic %s partition %d doesn't exist in replica manager on %d".format(topic, partitionId, config.brokerId))
+        throw new InvalidPartitionException("Partition for topic %s partition %d doesn't exist in replica manager on %d"
+          .format(topic, partitionId, config.brokerId))
     }
   }
 
@@ -120,34 +97,32 @@ class ReplicaManager(val config: KafkaCo
 
     val replicaAdded = partition.addReplica(remoteReplica)
     if(replicaAdded)
-      info("added remote replica %d for topic %s partition %s".format(remoteReplica.brokerId, remoteReplica.topic, remoteReplica.partition.partitionId))
+      info("Added remote replica %d for topic %s partition %s on broker %d"
+        .format(remoteReplica.brokerId, remoteReplica.topic, remoteReplica.partition.partitionId, config.brokerId))
     remoteReplica
   }
 
   def getReplica(topic: String, partitionId: Int, replicaId: Int = config.brokerId): Option[Replica] = {
-    val partitionOpt = allPartitions.get((topic, partitionId))
-    partitionOpt match {
-      case Some(partition) =>
-        partition.getReplica(replicaId)
+    val replicasOpt = allReplicas.get((topic, partitionId))
+    replicasOpt match {
+      case Some(replicas) =>
+        replicas.getReplica(replicaId)
       case None =>
         None
     }
   }
 
   def getLeaderReplica(topic: String, partitionId: Int): Option[Replica] = {
-    val replicasOpt = allPartitions.get((topic, partitionId))
+    val replicasOpt = allReplicas.get((topic, partitionId))
     replicasOpt match {
       case Some(replicas) =>
         Some(replicas.leaderReplica())
       case None =>
         throw new KafkaException("Getting leader replica failed. Partition replica metadata for topic " +
-                                         "%s partition %d doesn't exist in Replica manager on %d".format(topic, partitionId, config.brokerId))
+          "%s partition %d doesn't exist in Replica manager on %d".format(topic, partitionId, config.brokerId))
     }
   }
 
-  def getPartition(topic: String, partitionId: Int): Option[Partition] =
-    allPartitions.get((topic, partitionId))
-
   private def updateReplicaLeo(replica: Replica, fetchOffset: Long) {
     // set the replica leo
     val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId)
@@ -162,41 +137,38 @@ class ReplicaManager(val config: KafkaCo
     val newHw = allLeos.min
     val oldHw = partition.leaderHW()
     if(newHw > oldHw) {
+      debug("Updating leader HW for topic %s partition %d to %d".format(replica.topic, replica.partition.partitionId, newHw))
       partition.leaderHW(Some(newHw))
     }else
       debug("Old hw for topic %s partition %d is %d. New hw is %d. All leo's are %s".format(replica.topic,
-                                                                                            replica.partition.partitionId, oldHw, newHw, allLeos.mkString(",")))
+        replica.partition.partitionId, oldHw, newHw, allLeos.mkString(",")))
   }
 
-  def makeLeader(replica: Replica, leaderAndISR: LeaderAndISR): Short = {
-    info("becoming Leader for topic [%s] partition [%d]".format(replica.topic, replica.partition.partitionId))
-    info("started the leader state transition for topic %s partition %d"
-                 .format(replica.topic, replica.partition.partitionId))
+  def makeLeader(replica: Replica, currentISRInZk: Seq[Int]) {
+    info("Broker %d started the leader state transition for topic %s partition %d"
+      .format(config.brokerId, replica.topic, replica.partition.partitionId))
     try {
       // read and cache the ISR
       replica.partition.leaderId(Some(replica.brokerId))
-      replica.partition.updateISR(leaderAndISR.ISR.toSet)
+      replica.partition.updateISR(currentISRInZk.toSet)
       // stop replica fetcher thread, if any
       replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
       // also add this partition to the list of partitions for which the leader is the current broker
       leaderReplicaLock.lock()
       leaderReplicas += replica.partition
-      info("completed the leader state transition for topic %s partition %d".format(replica.topic, replica.partition.partitionId))
-      ErrorMapping.NoError
+      info("Broker %d completed the leader state transition for topic %s partition %d"
+        .format(config.brokerId, replica.topic, replica.partition.partitionId))
     }catch {
-      case e => error("failed to complete the leader state transition for topic %s partition %d".format(replica.topic, replica.partition.partitionId), e)
-      ErrorMapping.UnknownCode
-      /* TODO: add specific error code */
+      case e => error("Broker %d failed to complete the leader state transition for topic %s partition %d"
+        .format(config.brokerId, replica.topic, replica.partition.partitionId), e)
     }finally {
       leaderReplicaLock.unlock()
     }
   }
 
-
-  def makeFollower(replica: Replica, leaderAndISR: LeaderAndISR): Short = {
-    val leaderBrokerId: Int = leaderAndISR.leader
-    info("starting the follower state transition to follow leader %d for topic %s partition %d"
-                 .format(leaderBrokerId, replica.topic, replica.partition.partitionId))
+  def makeFollower(replica: Replica, leaderBrokerId: Int, zkClient: ZkClient) {
+    info("Broker %d starting the follower state transition to follow leader %d for topic %s partition %d"
+      .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
     try {
       // set the leader for this partition correctly on this broker
       replica.partition.leaderId(Some(leaderBrokerId))
@@ -205,13 +177,13 @@ class ReplicaManager(val config: KafkaCo
           log.truncateTo(replica.highWatermark())
         case None =>
       }
-      debug("for partition [%s, %d], the leaderBroker is [%d]".format(replica.topic, replica.partition.partitionId, leaderAndISR.leader))
       // get leader for this replica
       val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(leaderBrokerId)).head
       val currentLeaderBroker = replicaFetcherManager.fetcherSourceBroker(replica.topic, replica.partition.partitionId)
       // become follower only if it is not already following the same leader
       if( currentLeaderBroker == None || currentLeaderBroker.get != leaderBroker.id) {
-        info("becoming follower to leader %d for topic %s partition %d".format(leaderBrokerId, replica.topic, replica.partition.partitionId))
+        info("broker %d becoming follower to leader %d for topic %s partition %d"
+          .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
         // stop fetcher thread to previous leader
         replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
         // start fetcher thread to current leader
@@ -220,15 +192,11 @@ class ReplicaManager(val config: KafkaCo
       // remove this replica's partition from the ISR expiration queue
       leaderReplicaLock.lock()
       leaderReplicas -= replica.partition
-      info("completed the follower state transition to follow leader %d for topic %s partition %d".format(leaderAndISR.leader, replica.topic, replica.partition.partitionId))
-      ErrorMapping.NoError
-    } catch {
-      case e: BrokerNotExistException =>
-        error("failed to complete the follower state transition to follow leader %d for topic %s partition %d because the leader broker does not exist in the cluster".format(leaderAndISR.leader, replica.topic, replica.partition.partitionId), e)
-        ErrorMapping.BrokerNotExistInZookeeperCode
-      case e =>
-        error("failed to complete the follower state transition to follow leader %d for topic %s partition %d".format(leaderAndISR.leader, replica.topic, replica.partition.partitionId), e)
-        ErrorMapping.UnknownCode
+      info("Broker %d completed the follower state transition to follow leader %d for topic %s partition %d"
+        .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
+    }catch {
+      case e => error("Broker %d failed to complete the follower state transition to follow leader %d for topic %s partition %d"
+        .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId), e)
     }finally {
       leaderReplicaLock.unlock()
     }
@@ -236,18 +204,21 @@ class ReplicaManager(val config: KafkaCo
 
   private def maybeShrinkISR(): Unit = {
     try {
-      info("evaluating ISR list of partitions to see which replicas can be removed from the ISR")
+      info("Evaluating ISR list of partitions to see which replicas can be removed from the ISR"
+        .format(config.replicaMaxLagTimeMs))
       leaderReplicaLock.lock()
-      leaderReplicas.foreach(partition => {
+      leaderReplicas.foreach { partition =>
+      // shrink ISR if a follower is slow or stuck
         val outOfSyncReplicas = partition.getOutOfSyncReplicas(config.replicaMaxLagTimeMs, config.replicaMaxLagBytes)
         if(outOfSyncReplicas.size > 0) {
           val newInSyncReplicas = partition.inSyncReplicas -- outOfSyncReplicas
           assert(newInSyncReplicas.size > 0)
-          info("Shrinking ISR for topic %s partition %d to %s".format(partition.topic, partition.partitionId, newInSyncReplicas.map(_.brokerId).mkString(",")))
+          info("Shrinking ISR for topic %s partition %d to %s".format(partition.topic, partition.partitionId,
+            newInSyncReplicas.map(_.brokerId).mkString(",")))
           // update ISR in zk and in memory
           partition.updateISR(newInSyncReplicas.map(_.brokerId), Some(zkClient))
         }
-      })
+      }
     }catch {
       case e1 => error("Error in ISR expiration thread. Shutting down due to ", e1)
     }finally {
@@ -262,7 +233,8 @@ class ReplicaManager(val config: KafkaCo
       val leaderHW = partition.leaderHW()
       replica.logEndOffset() >= leaderHW
     }
-    else throw new KafkaException("Replica %s is not in the assigned replicas list for ".format(replica.toString) + " topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId, config.brokerId))
+    else throw new KafkaException("Replica %s is not in the assigned replicas list for ".format(replica.toString) +
+      " topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId, config.brokerId))
   }
 
   def recordFollowerPosition(topic: String, partition: Int, replicaId: Int, offset: Long, zkClient: ZkClient) = {
@@ -296,21 +268,21 @@ class ReplicaManager(val config: KafkaCo
    * Flushes the highwatermark value for all partitions to the highwatermark file
    */
   private def checkpointHighwaterMarks() {
-    val highwaterMarksForAllPartitions = allPartitions.map
-            { partition =>
-              val topic = partition._1._1
-              val partitionId = partition._1._2
-              val localReplicaOpt = partition._2.getReplica(config.brokerId)
-              val hw = localReplicaOpt match {
-                case Some(localReplica) => localReplica.highWatermark()
-                case None =>
-                  error("Error while checkpointing highwatermark for topic %s partition %d.".format(topic, partitionId) + " Replica metadata doesn't exist")
-                  0L
-              }
-              (topic, partitionId) -> hw
-            }.toMap
+    val highwaterMarksForAllPartitions = allReplicas.map { partition =>
+      val topic = partition._1._1
+      val partitionId = partition._1._2
+      val localReplicaOpt = partition._2.getReplica(config.brokerId)
+      val hw = localReplicaOpt match {
+        case Some(localReplica) => localReplica.highWatermark()
+        case None =>
+          error("Error while checkpointing highwatermark for topic %s partition %d.".format(topic, partitionId) +
+            " Replica metadata doesn't exist in replica manager on broker " + config.brokerId)
+          0L
+      }
+      (topic, partitionId) -> hw
+    }.toMap
     highwaterMarkCheckpoint.write(highwaterMarksForAllPartitions)
-    info("Checkpointed high watermark data: %s".format(highwaterMarksForAllPartitions))
+    info("Checkpointed highwatermarks")
   }
 
   /**
@@ -320,9 +292,8 @@ class ReplicaManager(val config: KafkaCo
   def readCheckpointedHighWatermark(topic: String, partition: Int): Long = highwaterMarkCheckpoint.read(topic, partition)
 
   def shutdown() {
-    info("shut down")
     replicaFetcherManager.shutdown()
     checkpointHighwaterMarks()
-    info("shuttedd down completely")
+    info("Replica manager shutdown on broker " + config.brokerId)
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala Tue Jul 31 22:50:59 2012
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
+ * 
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -39,7 +39,7 @@ class DelayedRequest(val keys: Seq[Any],
  * request to be satisfied. For example it could be that we are waiting for user-specified number of acks on a given (topic, partition)
  * to be able to respond to a request or it could be that we are waiting for a given number of bytes to accumulate on a given request
  * to be able to respond to that request (in the simple case we might wait for at least one byte to avoid busy waiting).
- *
+ * 
  * For us the key is generally a (topic, partition) pair.
  * By calling 
  *   watch(delayedRequest) 
@@ -47,27 +47,27 @@ class DelayedRequest(val keys: Seq[Any],
  *   val satisfied = update(key, request) 
  * when a request relevant to the given key occurs. This triggers bookeeping logic and returns back any requests satisfied by this
  * new request.
- *
+ * 
  * An implementation provides extends two helper functions
  *   def checkSatisfied(request: R, delayed: T): Boolean
  * this function returns true if the given request (in combination with whatever previous requests have happened) satisfies the delayed
  * request delayed. This method will likely also need to do whatever bookkeeping is necessary.
- *
+ * 
  * The second function is
  *   def expire(delayed: T)
  * this function handles delayed requests that have hit their time limit without being satisfied.
- *
+ * 
  */
-abstract class RequestPurgatory[T <: DelayedRequest, R](logPrefix: String) extends  Logging{
-  this.logIdent = logPrefix
+abstract class RequestPurgatory[T <: DelayedRequest, R] {
+  
   /* a list of requests watching each key */
   private val watchersForKey = new ConcurrentHashMap[Any, Watchers]
-
+  
   /* background thread expiring requests that have been waiting too long */
-  private val expiredRequestReaper = new ExpiredRequestReaper(logPrefix)
+  private val expiredRequestReaper = new ExpiredRequestReaper
   private val expirationThread = Utils.daemonThread("request-expiration-task", expiredRequestReaper)
   expirationThread.start()
-
+  
   /**
    * Add a new delayed request watching the contained keys
    */
@@ -78,7 +78,7 @@ abstract class RequestPurgatory[T <: Del
     }
     expiredRequestReaper.enqueue(delayedRequest)
   }
-
+  
   /**
    * Update any watchers and return a list of newly satisfied requests.
    */
@@ -89,7 +89,7 @@ abstract class RequestPurgatory[T <: Del
     else
       w.collectSatisfiedRequests(request)
   }
-
+  
   private def watchersFor(key: Any): Watchers = {
     var lst = watchersForKey.get(key)
     if(lst == null) {
@@ -98,46 +98,46 @@ abstract class RequestPurgatory[T <: Del
     }
     lst
   }
-
+  
   /**
    * Check if this request satisfied this delayed request
    */
   protected def checkSatisfied(request: R, delayed: T): Boolean
-
+  
   /**
    * Handle an expired delayed request
    */
   protected def expire(delayed: T)
-
+  
   /**
    * Shutdown the expirey thread
    */
   def shutdown() {
     expiredRequestReaper.shutdown()
   }
-
+  
   /**
    * A linked list of DelayedRequests watching some key with some associated bookeeping logic
    */
   private class Watchers {
-
+    
     /* a few magic parameters to help do cleanup to avoid accumulating old watchers */
     private val CleanupThresholdSize = 100
     private val CleanupThresholdPrct = 0.5
-
+    
     private val requests = new LinkedList[T]
-
+    
     /* you can only change this if you have added something or marked something satisfied */
     var liveCount = 0.0
-
+  
     def add(t: T) {
       synchronized {
-                     requests.add(t)
-                     liveCount += 1
-                     maybePurge()
-                   }
+        requests.add(t)
+        liveCount += 1
+        maybePurge()
+      }
     }
-
+    
     private def maybePurge() {
       if(requests.size > CleanupThresholdSize && liveCount / requests.size < CleanupThresholdPrct) {
         val iter = requests.iterator()
@@ -148,56 +148,55 @@ abstract class RequestPurgatory[T <: Del
         }
       }
     }
-
+  
     def decLiveCount() {
       synchronized {
-                     liveCount -= 1
-                   }
+        liveCount -= 1
+      }
     }
-
+    
     def collectSatisfiedRequests(request: R): Seq[T] = {
       val response = new mutable.ArrayBuffer[T]
       synchronized {
-                     val iter = requests.iterator()
-                     while(iter.hasNext) {
-                       val curr = iter.next
-                       if(curr.satisfied.get) {
-                         // another thread has satisfied this request, remove it
-                         iter.remove()
-                       } else {
-                         if(checkSatisfied(request, curr)) {
-                           iter.remove()
-                           val updated = curr.satisfied.compareAndSet(false, true)
-                           if(updated == true) {
-                             response += curr
-                             liveCount -= 1
-                             expiredRequestReaper.satisfyRequest()
-                           }
-                         }
-                       }
-                     }
-                   }
+        val iter = requests.iterator()
+        while(iter.hasNext) {
+          val curr = iter.next
+          if(curr.satisfied.get) {
+            // another thread has satisfied this request, remove it
+            iter.remove()
+          } else {
+            if(checkSatisfied(request, curr)) {
+              iter.remove()
+              val updated = curr.satisfied.compareAndSet(false, true)
+              if(updated == true) {
+                response += curr
+                liveCount -= 1
+                expiredRequestReaper.satisfyRequest()
+              }
+            }
+          }
+        }
+      }
       response
     }
   }
-
+  
   /**
    * Runnable to expire requests that have sat unfullfilled past their deadline
    */
-  private class ExpiredRequestReaper(logPrefix: String) extends Runnable with Logging {
-    this.logIdent = "ExpiredRequestReaper for " + logPrefix
-
+  private class ExpiredRequestReaper extends Runnable with Logging {
+    
     /* a few magic parameters to help do cleanup to avoid accumulating old watchers */
     private val CleanupThresholdSize = 100
     private val CleanupThresholdPrct = 0.5
-
+    
     private val delayed = new DelayQueue[T]
     private val running = new AtomicBoolean(true)
     private val shutdownLatch = new CountDownLatch(1)
     private val needsPurge = new AtomicBoolean(false)
     /* The count of elements in the delay queue that are unsatisfied */
     private val unsatisfied = new AtomicInteger(0)
-
+    
     /** Main loop for the expiry thread */
     def run() {
       while(running.get) {
@@ -205,18 +204,18 @@ abstract class RequestPurgatory[T <: Del
           val curr = pollExpired()
           expire(curr)
         } catch {
-          case ie: InterruptedException =>
+          case ie: InterruptedException => 
             if(needsPurge.getAndSet(false)) {
               val purged = purgeSatisfied()
               debug("Forced purge of " + purged + " requests from delay queue.")
             }
-          case e: Exception =>
+          case e: Exception => 
             error("Error in long poll expiry thread: ", e)
         }
       }
       shutdownLatch.countDown()
     }
-
+    
     /** Add a request to be expired */
     def enqueue(t: T) {
       delayed.add(t)
@@ -224,24 +223,23 @@ abstract class RequestPurgatory[T <: Del
       if(unsatisfied.get > CleanupThresholdSize && unsatisfied.get / delayed.size.toDouble < CleanupThresholdPrct)
         forcePurge()
     }
-
+    
     private def forcePurge() {
       needsPurge.set(true)
       expirationThread.interrupt()
     }
-
+    
     /** Shutdown the expiry thread*/
     def shutdown() {
-      debug("shutting down")
+      debug("Shutting down request expiry thread")
       running.set(false)
       expirationThread.interrupt()
       shutdownLatch.await()
-      debug("shut down completely")
     }
-
+    
     /** Record the fact that we satisfied a request in the stats for the expiry queue */
     def satisfyRequest(): Unit = unsatisfied.getAndDecrement()
-
+    
     /**
      * Get the next expired event
      */
@@ -258,7 +256,7 @@ abstract class RequestPurgatory[T <: Del
       }
       throw new RuntimeException("This should not happen")
     }
-
+  
     /**
      * Delete all expired events from the delay queue
      */
@@ -275,5 +273,5 @@ abstract class RequestPurgatory[T <: Del
       purged
     }
   }
-
+  
 }
\ No newline at end of file