Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/01 00:51:01 UTC

svn commit: r1367811 [3/4] - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/admin/ main/scala/kafka/api/ main/scala/kafka/cluster/ main/scala/kafka/common/ main/scala/kafka/consumer/ main/scala/kafka/log/ main/scala/kafka/network/ main/sca...

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala Tue Jul 31 22:50:59 2012
@@ -33,7 +33,7 @@ object ConsumerOffsetChecker extends Log
   // e.g., 127.0.0.1-1315436360737:127.0.0.1:9092
 
   private def getConsumer(zkClient: ZkClient, bid: String): Option[SimpleConsumer] = {
-    val brokerInfo = ZkUtils.readDataMaybeNull(zkClient, "/brokers/ids/%s".format(bid))._1
+    val brokerInfo = ZkUtils.readDataMaybeNull(zkClient, "/brokers/ids/%s".format(bid))
     val consumer = brokerInfo match {
       case BrokerIpPattern(ip, port) =>
         Some(new SimpleConsumer(ip, port.toInt, 10000, 100000))
@@ -47,9 +47,9 @@ object ConsumerOffsetChecker extends Log
   private def processPartition(zkClient: ZkClient,
                                group: String, topic: String, bidPid: String) {
     val offset = ZkUtils.readData(zkClient, "/consumers/%s/offsets/%s/%s".
-            format(group, topic, bidPid))._1.toLong
+            format(group, topic, bidPid)).toLong
     val owner = ZkUtils.readDataMaybeNull(zkClient, "/consumers/%s/owners/%s/%s".
-            format(group, topic, bidPid))._1
+            format(group, topic, bidPid))
     println("%s,%s,%s (Group,Topic,BrokerId-PartitionId)".format(group, topic, bidPid))
     println("%20s%s".format("Owner = ", owner))
     println("%20s%d".format("Consumer offset = ", offset))
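
Note on this hunk: readDataMaybeNull no longer returns a (data, Stat) tuple, so the ._1 projections disappear here and throughout this commit. The brokerInfo string being matched follows the creatorId:host:port layout from the comment above (e.g. 127.0.0.1-1315436360737:127.0.0.1:9092). A minimal sketch of how a regex extractor pulls host and port out of such a string; the actual BrokerIpPattern is defined elsewhere in ConsumerOffsetChecker.scala and is not shown in this diff, so the pattern below is an assumption:

    object BrokerInfoParseSketch {
      // Hypothetical stand-in for BrokerIpPattern; the real regex may differ.
      val BrokerIpPattern = """.*:([^:]+):(\d+)""".r

      def main(args: Array[String]) {
        "127.0.0.1-1315436360737:127.0.0.1:9092" match {
          case BrokerIpPattern(ip, port) =>
            println("host=%s port=%d".format(ip, port.toInt))
          case _ =>
            println("unparseable broker registration string")
        }
      }
    }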

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ExportZkOffsets.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ExportZkOffsets.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ExportZkOffsets.scala Tue Jul 31 22:50:59 2012
@@ -100,7 +100,7 @@ object ExportZkOffsets extends Logging {
           for (bidPid <- bidPidList) {
             val zkGrpTpDir = new ZKGroupTopicDirs(consumerGrp,topic)
             val offsetPath = zkGrpTpDir.consumerOffsetDir + "/" + bidPid
-            val offsetVal  = ZkUtils.readDataMaybeNull(zkClient, offsetPath)._1
+            val offsetVal  = ZkUtils.readDataMaybeNull(zkClient, offsetPath)
             fileWriter.write(offsetPath + ":" + offsetVal + "\n")
             debug(offsetPath + " => " + offsetVal)
           }
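
Each exported line pairs the full consumer-offset znode path with its value, separated by a colon. A tiny sketch of the line format, with made-up group/topic/broker-partition names (real paths come from ZKGroupTopicDirs):

    object ExportLineSketch {
      def main(args: Array[String]) {
        // Illustrative values only.
        val offsetPath = "/consumers/group1/offsets/topic1/0-0"
        val offsetVal = "42"
        println(offsetPath + ":" + offsetVal)
        // => /consumers/group1/offsets/topic1/0-0:42
      }
    }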

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala Tue Jul 31 22:50:59 2012
@@ -104,7 +104,7 @@ object VerifyConsumerRebalance extends L
         }
         // try reading the partition owner path for see if a valid consumer id exists there
         val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition
-        val partitionOwner = ZkUtils.readDataMaybeNull(zkClient, partitionOwnerPath)._1
+        val partitionOwner = ZkUtils.readDataMaybeNull(zkClient, partitionOwnerPath)
         if(partitionOwner == null) {
           error("No owner for topic %s partition %s".format(topic, partition))
           rebalanceSucceeded = false

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala Tue Jul 31 22:50:59 2012
@@ -45,7 +45,7 @@ object UpdateOffsetsInZK {
   private def getAndSetOffsets(zkClient: ZkClient, offsetOption: Long, config: ConsumerConfig, topic: String): Unit = {
     val cluster = ZkUtils.getCluster(zkClient)
     val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, List(topic).iterator)
-    var partitions: Seq[Int] = Nil
+    var partitions: Seq[String] = Nil
 
     partitionsPerTopicMap.get(topic) match {
       case Some(l) =>  partitions = l.sortWith((s,t) => s < t)
@@ -54,7 +54,7 @@ object UpdateOffsetsInZK {
 
     var numParts = 0
     for (partition <- partitions) {
-      val brokerHostingPartition = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
+      val brokerHostingPartition = ZkUtils.getLeaderForPartition(zkClient, topic, partition.toInt)
 
       val broker = brokerHostingPartition match {
         case Some(b) => b
@@ -68,7 +68,7 @@ object UpdateOffsetsInZK {
 
       val brokerInfo = brokerInfos.head
       val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024)
-      val offsets = consumer.getOffsetsBefore(topic, partition, offsetOption, 1)
+      val offsets = consumer.getOffsetsBefore(topic, partition.toInt, offsetOption, 1)
       val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
 
       println("updating partition " + partition + " with new offset: " + offsets(0))
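
getPartitionsForTopics now hands back partition ids as strings (see the ZkUtils changes below), so this tool converts with .toInt only at the call sites that still take an Int. One side effect worth keeping in mind: the sortWith above now orders strings lexicographically, not numerically. A small sketch, assuming the ids are valid integers:

    object PartitionSortSketch {
      def main(args: Array[String]) {
        val partitions = List("10", "2", "1")
        println(partitions.sortWith((s, t) => s < t))
        // => List(1, 10, 2) -- string order, "10" sorts before "2"
        println(partitions.map(_.toInt).sortWith((s, t) => s < t))
        // => List(1, 2, 10) -- numeric order
      }
    }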

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Tue Jul 31 22:50:59 2012
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
+ * 
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -19,29 +19,22 @@ package kafka.utils
 
 import java.util.Properties
 import kafka.cluster.{Broker, Cluster}
+import kafka.common.NoEpochForPartitionException
 import kafka.consumer.TopicCount
 import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
 import org.I0Itec.zkclient.serialize.ZkSerializer
 import scala.collection._
 import util.parsing.json.JSON
-import kafka.api.LeaderAndISR
-import kafka.common.NoEpochForPartitionException
-import org.apache.zookeeper.data.Stat
 import java.util.concurrent.locks.{ReentrantLock, Condition}
-import scala.throws
-
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
   val BrokerIdsPath = "/brokers/ids"
   val BrokerTopicsPath = "/brokers/topics"
+  val BrokerStatePath = "/brokers/state"
   val ControllerPath = "/controller"
 
-  def getBrokerPath(brokerId: Int): String = {
-    BrokerIdsPath + "/" + brokerId
-  }
-
   def getTopicPath(topic: String): String ={
     BrokerTopicsPath + "/" + topic
   }
@@ -51,63 +44,53 @@ object ZkUtils extends Logging {
   }
 
   def getController(zkClient: ZkClient): Int= {
-    val controller = readDataMaybeNull(zkClient, ControllerPath)._1
+    val controller = readDataMaybeNull(zkClient, ControllerPath)
     controller.toInt
   }
 
-  def getTopicPartitionPath(topic: String, partitionId: Int): String ={
+  def getTopicPartitionPath(topic: String, partitionId: String): String ={
     getTopicPartitionsPath(topic) + "/" + partitionId
   }
 
-  def getTopicPartitionLeaderAndISRPath(topic: String, partitionId: Int): String ={
+  def getTopicPartitionLeaderAndISR(topic: String, partitionId: String): String ={
     getTopicPartitionPath(topic, partitionId) + "/" + "leaderAndISR"
   }
 
-  def getSortedBrokerList(zkClient: ZkClient): Seq[String] ={
-    ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
+  def getTopicVersion(zkClient: ZkClient, topic: String): String ={
+    readDataMaybeNull(zkClient, getTopicPath(topic))
   }
 
-  def getAllLiveBrokerIds(zkClient: ZkClient): Set[Int] = {
-    ZkUtils.getChildren(zkClient, BrokerIdsPath).map(_.toInt).toSet
+  def getTopicPartitionReplicasPath(topic: String, partitionId: String): String ={
+    getTopicPartitionPath(topic, partitionId) + "/" + "replicas"
   }
 
-  def getAllBrokersInCluster(zkClient: ZkClient): Seq[Broker] = {
-    val brokerIds = ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
-    getBrokerInfoFromIds(zkClient, brokerIds.map(_.toInt))
+  def getTopicPartitionInSyncPath(topic: String, partitionId: String): String ={
+    getTopicPartitionPath(topic, partitionId) + "/" + "isr"
   }
 
+  def getTopicPartitionLeaderPath(topic: String, partitionId: String): String ={
+    getTopicPartitionPath(topic, partitionId) + "/" + "leader"
+  }
 
-  def getLeaderAndISRForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndISR] = {
-    val leaderAndISRPath = getTopicPartitionLeaderAndISRPath(topic, partition)
-    val ret = readDataMaybeNull(zkClient, leaderAndISRPath)
-    val leaderAndISRStr: String = ret._1
-    val stat = ret._2
-    if(leaderAndISRStr == null) None
-    else {
-      JSON.parseFull(leaderAndISRStr) match {
-        case Some(m) =>
-          val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
-          val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
-          val ISRString = m.asInstanceOf[Map[String, String]].get("ISR").get
-          val ISR = Utils.getCSVList(ISRString).map(r => r.toInt)
-          val zkPathVersion = stat.getVersion
-          debug("Leader %d, Epoch %d, isr %s, zk path version %d for topic %s and partition %d".format(leader, epoch, ISR.toString(), zkPathVersion, topic, partition))
-          Some(LeaderAndISR(leader, epoch, ISR.toList, zkPathVersion))
-        case None => None
-      }
-    }
+  def getBrokerStateChangePath(brokerId: Int): String = {
+    BrokerStatePath + "/" + brokerId
+  }
+
+  def getSortedBrokerList(zkClient: ZkClient): Seq[String] ={
+      ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
   }
 
+  def getAllBrokersInCluster(zkClient: ZkClient): Seq[Broker] = {
+    val brokerIds = ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
+    getBrokerInfoFromIds(zkClient, brokerIds.map(b => b.toInt))
+  }
 
   def getLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int): Option[Int] = {
-    val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1
-    if(leaderAndISR == null) None
+    val leaderAndEpoch = readDataMaybeNull(zkClient, getTopicPartitionLeaderPath(topic, partition.toString))
+    if(leaderAndEpoch == null) None
     else {
-      JSON.parseFull(leaderAndISR) match {
-        case Some(m) =>
-          Some(m.asInstanceOf[Map[String, String]].get("leader").get.toInt)
-        case None => None
-      }
+      val leaderAndEpochInfo = leaderAndEpoch.split(";")
+      Some(leaderAndEpochInfo.head.toInt)
     }
   }
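
The leader znode no longer holds JSON: tryToBecomeLeaderForPartition (later in this file) writes it as "%d;%d".format(brokerId, newEpoch), and this reader splits on ';' and takes the head. A round-trip sketch of that encoding:

    object LeaderPathEncodingSketch {
      def main(args: Array[String]) {
        val brokerId = 2
        val epoch = 7
        val encoded = "%d;%d".format(brokerId, epoch)   // written to .../leader
        val fields = encoded.split(";")
        val leader = fields.head.toInt                  // read by getLeaderForPartition
        val leaderEpoch = fields.last.toInt
        println("leader=%d epoch=%d".format(leader, leaderEpoch))
      }
    }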
 
@@ -116,53 +99,51 @@ object ZkUtils extends Logging {
    * leader fails after updating epoch in the leader path and before updating epoch in the ISR path, effectively some
    * other broker will retry becoming leader with the same new epoch value.
    */
-  def getEpochForPartition(zkClient: ZkClient, topic: String, partition: Int): Int = {
-    val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1
-    if(leaderAndISR != null) {
-      val epoch = JSON.parseFull(leaderAndISR) match {
-        case None => throw new NoEpochForPartitionException("No epoch, leaderAndISR data for topic %s partition %d is invalid".format(topic, partition))
-        case Some(m) =>
-          m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
+  def getEpochForPartition(client: ZkClient, topic: String, partition: Int): Int = {
+    val lastKnownEpoch = try {
+      val isrAndEpoch = readData(client, getTopicPartitionInSyncPath(topic, partition.toString))
+      if(isrAndEpoch != null) {
+        val isrAndEpochInfo = isrAndEpoch.split(";")
+        if(isrAndEpochInfo.last.isEmpty)
+          throw new NoEpochForPartitionException("No epoch in ISR path for topic %s partition %d is empty".format(topic, partition))
+        else
+          isrAndEpochInfo.last.toInt
+      }else {
+        throw new NoEpochForPartitionException("ISR path for topic %s partition %d is empty".format(topic, partition))
       }
-      epoch
+    } catch {
+      case e: ZkNoNodeException =>
+        throw new NoEpochForPartitionException("No epoch since leader never existed for topic %s partition %d".format(topic, partition))
+      case e1 => throw e1
     }
-    else
-      throw new NoEpochForPartitionException("No epoch, ISR path for topic %s partition %d is empty".format(topic, partition))
+    lastKnownEpoch
   }
 
   /**
-   * Gets the in-sync replicas (ISR) for a specific topic and partition
+   * Gets the assigned replicas (AR) for a specific topic and partition
    */
-  def getInSyncReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[Int] = {
-    val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1
-    if(leaderAndISR == null) Seq.empty[Int]
-    else {
-      JSON.parseFull(leaderAndISR) match {
-        case Some(m) =>
-          val ISRString = m.asInstanceOf[Map[String, String]].get("ISR").get
-          Utils.getCSVList(ISRString).map(r => r.toInt)
-        case None => Seq.empty[Int]
+  def getReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[String] = {
+    val topicAndPartitionAssignment = getPartitionAssignmentForTopics(zkClient, List(topic).iterator)
+    topicAndPartitionAssignment.get(topic) match {
+      case Some(partitionAssignment) => partitionAssignment.get(partition.toString) match {
+        case Some(replicaList) => replicaList
+        case None => Seq.empty[String]
       }
+      case None => Seq.empty[String]
     }
   }
 
   /**
-   * Gets the assigned replicas (AR) for a specific topic and partition
+   * Gets the in-sync replicas (ISR) for a specific topic and partition
    */
-  def getReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[Int] = {
-    val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))._1
-    val assignedReplicas = if (jsonPartitionMap == null) {
+  def getInSyncReplicasForPartition(client: ZkClient, topic: String, partition: Int): Seq[Int] = {
+    val replicaListAndEpochString = readDataMaybeNull(client, getTopicPartitionInSyncPath(topic, partition.toString))
+    if(replicaListAndEpochString == null)
       Seq.empty[Int]
-    } else {
-      JSON.parseFull(jsonPartitionMap) match {
-        case Some(m) => m.asInstanceOf[Map[String, List[String]]].get(partition.toString) match {
-          case None => Seq.empty[Int]
-          case Some(seq) => seq.map(_.toInt)
-        }
-        case None => Seq.empty[Int]
-      }
+    else {
+      val replicasAndEpochInfo = replicaListAndEpochString.split(";")
+      Utils.getCSVList(replicasAndEpochInfo.head).map(r => r.toInt)
     }
-    assignedReplicas
   }
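
The isr znode gets the same ';' treatment: a comma-separated replica list, then the epoch ("%s;%d".format(updatedISR.mkString(","), newEpoch) in tryToBecomeLeaderForPartition below). getEpochForPartition takes the last field, getInSyncReplicasForPartition the first. A sketch of both directions, using split(",") as a stand-in for Utils.getCSVList:

    object IsrPathEncodingSketch {
      def main(args: Array[String]) {
        val isr = List(0, 1, 2)
        val epoch = 3
        val encoded = "%s;%d".format(isr.mkString(","), epoch)   // "0,1,2;3"
        val fields = encoded.split(";")
        val parsedEpoch = fields.last.toInt                      // getEpochForPartition
        val parsedIsr = fields.head.split(",").map(_.toInt).toList
        println("isr=%s epoch=%d".format(parsedIsr, parsedEpoch))
      }
    }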
 
   def isPartitionOnBroker(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = {
@@ -171,7 +152,25 @@ object ZkUtils extends Logging {
     replicas.contains(brokerId.toString)
   }
 
-  def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int, leader: Int): Int = {
+  def tryToBecomeLeaderForPartition(client: ZkClient, topic: String, partition: Int, brokerId: Int): Option[(Int, Seq[Int])] = {
+    try {
+      // NOTE: first increment epoch, then become leader
+      val newEpoch = incrementEpochForPartition(client, topic, partition, brokerId)
+      createEphemeralPathExpectConflict(client, getTopicPartitionLeaderPath(topic, partition.toString),
+        "%d;%d".format(brokerId, newEpoch))
+      val currentISR = getInSyncReplicasForPartition(client, topic, partition)
+      val updatedISR = if(currentISR.size == 0) List(brokerId) else currentISR
+      updatePersistentPath(client, getTopicPartitionInSyncPath(topic, partition.toString),
+        "%s;%d".format(updatedISR.mkString(","), newEpoch))
+      info("Elected broker %d with epoch %d to be leader for topic %s partition %d".format(brokerId, newEpoch, topic, partition))
+      Some(newEpoch, updatedISR)
+    } catch {
+      case e: ZkNodeExistsException => error("Leader exists for topic %s partition %d".format(topic, partition)); None
+      case oe => error("Error while electing leader for topic %s partition %d".format(topic, partition), oe); None
+    }
+  }
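
The NOTE above matters: the epoch is incremented before the ephemeral leader node is created, so (per the getEpochForPartition comment earlier in this file) a contender that fails partway through leaves the new epoch behind for the next contender to retry with, rather than a stale one. A hypothetical caller; only tryToBecomeLeaderForPartition and its Option[(Int, Seq[Int])] result come from this patch, the surrounding helper is illustrative:

    import org.I0Itec.zkclient.ZkClient
    import kafka.utils.ZkUtils

    object ElectionSketch {
      // Assumes a connected zkClient against a live ZooKeeper ensemble.
      def electOrBackOff(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int) {
        ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partition, brokerId) match {
          case Some((epoch, isr)) =>
            println("broker %d leads [%s, %d] with epoch %d, isr %s"
              .format(brokerId, topic, partition, epoch, isr.mkString(",")))
          case None =>
            // The ephemeral leader node already exists: another broker won this round.
            println("broker %d lost the election for [%s, %d]".format(brokerId, topic, partition))
        }
      }
    }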
+
+  def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int, leader: Int) = {
     // read previous epoch, increment it and write it to the leader path and the ISR path.
     val epoch = try {
       Some(getEpochForPartition(client, topic, partition))
@@ -199,12 +198,15 @@ object ZkUtils extends Logging {
       createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZKString)
     } catch {
       case e: ZkNodeExistsException =>
-        throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or " + "else you have shutdown this broker and restarted it faster than the zookeeper " + "timeout so it appears to be re-registering.")
+        throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " +
+                                   "indicates that you either have configured a brokerid that is already in use, or " +
+                                   "else you have shutdown this broker and restarted it faster than the zookeeper " +
+                                   "timeout so it appears to be re-registering.")
     }
     info("Registering broker " + brokerIdPath + " succeeded with " + broker)
   }
 
-  def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = {
+  def getConsumerPartitionOwnerPath(group: String, topic: String, partition: String): String = {
     val topicDirs = new ZKGroupTopicDirs(group, topic)
     topicDirs.consumerOwnerDir + "/" + partition
   }
@@ -252,7 +254,7 @@ object ZkUtils extends Logging {
         // this can happen when there is connection loss; make sure the data is what we intend to write
         var storedData: String = null
         try {
-          storedData = readData(client, path)._1
+          storedData = readData(client, path)
         } catch {
           case e1: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
           case e2 => throw e2
@@ -290,24 +292,17 @@ object ZkUtils extends Logging {
   /**
    * Update the value of a persistent node with the given path and data.
    * create parrent directory if necessary. Never throw NodeExistException.
-   * Return the updated path zkVersion
    */
-  def updatePersistentPath(client: ZkClient, path: String, data: String): Int = {
-    var stat: Stat = null
+  def updatePersistentPath(client: ZkClient, path: String, data: String): Unit = {
     try {
-      stat = client.writeData(path, data)
-      return stat.getVersion
+      client.writeData(path, data)
     } catch {
       case e: ZkNoNodeException => {
         createParentPath(client, path)
         try {
           client.createPersistent(path, data)
-          // When the new path is created, its zkVersion always starts from 0
-          return 0
         } catch {
-          case e: ZkNodeExistsException =>
-            stat = client.writeData(path, data)
-            return  stat.getVersion
+          case e: ZkNodeExistsException => client.writeData(path, data)
           case e2 => throw e2
         }
       }
@@ -316,22 +311,6 @@ object ZkUtils extends Logging {
   }
 
   /**
-   * Conditional update the persistent path data, return (true, newVersion) if it succeeds, otherwise (the path doesn't
-   * exist, the current version is not the expected version, etc.) return (false, -1)
-   */
-  def conditionalUpdatePersistentPath(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = {
-    try {
-      val stat = client.writeData(path, data, expectVersion)
-      info("Conditional update the zkPath %s with expected version %d succeed and return the new version: %d".format(path, expectVersion, stat.getVersion))
-      (true, stat.getVersion)
-    } catch {
-      case e: Exception =>
-        info("Conditional update the zkPath %s with expected version %d failed".format(path, expectVersion))
-        (false, -1)
-    }
-  }
-
-  /**
    * Update the value of a persistent node with the given path and data.
    * create parrent directory if necessary. Never throw NodeExistException.
    */
@@ -370,23 +349,12 @@ object ZkUtils extends Logging {
     }
   }
 
-  def readData(client: ZkClient, path: String): (String, Stat) = {
-    val stat: Stat = new Stat()
-    val dataStr: String = client.readData(path, stat)
-    (dataStr, stat)
+  def readData(client: ZkClient, path: String): String = {
+    client.readData(path)
   }
 
-  def readDataMaybeNull(client: ZkClient, path: String): (String, Stat) = {
-    val stat: Stat = new Stat()
-    var dataStr: String = null
-    try{
-      dataStr = client.readData(path, stat)
-      return (dataStr, stat)
-    } catch {
-      case e: ZkNoNodeException =>
-        return (null, stat)
-      case e2 => throw e2
-    }
+  def readDataMaybeNull(client: ZkClient, path: String): String = {
+    client.readData(path, true)
   }
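
This is the signature change that ripples through every caller in this commit: readData and readDataMaybeNull now return the plain String instead of a (String, Stat) tuple, and the zkVersion plumbing (conditionalUpdatePersistentPath, the Stat bookkeeping) goes with it. The boolean in client.readData(path, true) is zkclient's returnNullIfPathNotExists flag. A minimal sketch, assuming a ZooKeeper server on localhost:2181:

    import org.I0Itec.zkclient.ZkClient
    import kafka.utils.{ZkUtils, ZKStringSerializer}

    object ReadDataSketch {
      def main(args: Array[String]) {
        val zkClient = new ZkClient("localhost:2181", 6000, 6000, ZKStringSerializer)
        // readData throws ZkNoNodeException on a missing path;
        // readDataMaybeNull returns null instead.
        val data = ZkUtils.readDataMaybeNull(zkClient, "/no/such/path")
        println(if (data == null) "absent" else data)
        zkClient.close()
      }
    }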
 
   def getChildren(client: ZkClient, path: String): Seq[String] = {
@@ -398,6 +366,7 @@ object ZkUtils extends Logging {
   def getChildrenParentMayNotExist(client: ZkClient, path: String): Seq[String] = {
     import scala.collection.JavaConversions._
     // triggers implicit conversion from java list to scala Seq
+
     try {
       client.getChildren(path)
     } catch {
@@ -419,77 +388,32 @@ object ZkUtils extends Logging {
     val cluster = new Cluster
     val nodes = getChildrenParentMayNotExist(zkClient, BrokerIdsPath)
     for (node <- nodes) {
-      val brokerZKString = readData(zkClient, BrokerIdsPath + "/" + node)._1
+      val brokerZKString = readData(zkClient, BrokerIdsPath + "/" + node)
       cluster.add(Broker.createBroker(node.toInt, brokerZKString))
     }
     cluster
   }
 
-  def getReplicaAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[(String, Int), Seq[Int]] = {
-    val ret = new mutable.HashMap[(String, Int), Seq[Int]]
+  def getPartitionAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, Map[String, List[String]]] = {
+    val ret = new mutable.HashMap[String, Map[String, List[String]]]()
     topics.foreach{ topic =>
-      val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))._1
-      if (jsonPartitionMap != null) {
-        JSON.parseFull(jsonPartitionMap) match {
-          case Some(m) =>
-            val replicaMap = m.asInstanceOf[Map[String, Seq[String]]]
-            for((partition, replicas) <- replicaMap){
-              ret.put((topic, partition.toInt), replicas.map(_.toInt))
-              debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas))
-            }
-          case None =>
-        }
-      }
-                  }
-    ret
-  }
-
-  def getPartitionLeaderAndISRForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[(String, Int), LeaderAndISR] = {
-    val ret = new mutable.HashMap[(String, Int), LeaderAndISR]
-    val partitionsForTopics = getPartitionsForTopics(zkClient, topics)
-    for((topic, partitions) <- partitionsForTopics){
-      for(partition <- partitions){
-        val leaderAndISROpt = ZkUtils.getLeaderAndISRForPartition(zkClient, topic, partition.toInt)
-        if(leaderAndISROpt.isDefined)
-          ret.put((topic, partition.toInt), leaderAndISROpt.get)
-      }
-    }
-    ret
-  }
-
-  def getPartitionAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, collection.Map[Int, Seq[Int]]] = {
-    val ret = new mutable.HashMap[String, Map[Int, Seq[Int]]]()
-    topics.foreach{ topic =>
-      val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))._1
+      val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))
       val partitionMap = if (jsonPartitionMap == null) {
-        Map[Int, Seq[Int]]()
+        Map[String, List[String]]()
       } else {
         JSON.parseFull(jsonPartitionMap) match {
-          case Some(m) =>
-            val m1 = m.asInstanceOf[Map[String, Seq[String]]]
-            m1.map(p => (p._1.toInt, p._2.map(_.toInt)))
-          case None => Map[Int, Seq[Int]]()
+          case Some(m) => m.asInstanceOf[Map[String, List[String]]]
+          case None => Map[String, List[String]]()
         }
       }
       debug("partition map for /brokers/topics/%s is %s".format(topic, partitionMap))
       ret += (topic -> partitionMap)
-                  }
-    ret
-  }
-
-  def getReplicaAssignmentFromPartitionAssignment(topicPartitionAssignment: mutable.Map[String, collection.Map[Int, Seq[Int]]]): mutable.Map[(String, Int), Seq[Int]] = {
-    val ret = new mutable.HashMap[(String, Int), Seq[Int]]
-    for((topic, partitionAssignment) <- topicPartitionAssignment){
-      for((partition, replicaAssignment) <- partitionAssignment){
-        ret.put((topic, partition), replicaAssignment)
-      }
     }
     ret
   }
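
The znode at /brokers/topics/<topic> maps partition-id strings to replica broker-id lists, and the parse now keeps everything as strings instead of converting to Int up front. The JSON.parseFull round trip in isolation; the payload below is illustrative, since the exact JSON written at topic-creation time is not shown in this diff:

    import scala.util.parsing.json.JSON

    object PartitionMapParseSketch {
      def main(args: Array[String]) {
        // Illustrative payload: partition id -> replica broker ids.
        val jsonPartitionMap = """{ "0": ["0", "1"], "1": ["1", "2"] }"""
        val partitionMap = JSON.parseFull(jsonPartitionMap) match {
          case Some(m) => m.asInstanceOf[Map[String, List[String]]]
          case None => Map[String, List[String]]()
        }
        println(partitionMap)   // Map(0 -> List(0, 1), 1 -> List(1, 2))
      }
    }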
 
-  def getPartitionsForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, Seq[Int]] = {
-    getPartitionAssignmentForTopics(zkClient, topics).map
-    { topicAndPartitionMap =>
+  def getPartitionsForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, Seq[String]] = {
+    getPartitionAssignmentForTopics(zkClient, topics).map{ topicAndPartitionMap =>
       val topic = topicAndPartitionMap._1
       val partitionMap = topicAndPartitionMap._2
       debug("partition assignment of /brokers/topics/%s is %s".format(topic, partitionMap))
@@ -497,20 +421,14 @@ object ZkUtils extends Logging {
     }
   }
 
-  def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int): Map[(String, Int), Seq[Int]] = {
-    val ret = new mutable.HashMap[(String, Int), Seq[Int]]
+  def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int): Map[String, Seq[Int]] = {
     val topicsAndPartitions = getPartitionAssignmentForTopics(zkClient, topics.iterator)
-    topicsAndPartitions.map
-    {
-      topicAndPartitionMap =>
-        val topic = topicAndPartitionMap._1
-        val partitionMap = topicAndPartitionMap._2
-        val relevantPartitionsMap = partitionMap.filter( m => m._2.contains(brokerId) )
-        for((relevantPartition, replicaAssignment) <- relevantPartitionsMap){
-          ret.put((topic, relevantPartition), replicaAssignment)
-        }
+    topicsAndPartitions.map{ topicAndPartitionMap =>
+      val topic = topicAndPartitionMap._1
+      val partitionMap = topicAndPartitionMap._2
+      val relevantPartitions = partitionMap.filter( m => m._2.contains(brokerId.toString) )
+      (topic -> relevantPartitions.keySet.map(_.toInt).toSeq)
     }
-    ret
   }
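
The rewrite keeps only the partitions whose replica list names this broker, and returns topic -> partition ids rather than the old (topic, partition) -> replicas map. The filter step in isolation, with made-up assignments:

    object AssignedToBrokerSketch {
      def main(args: Array[String]) {
        val partitionMap = Map("0" -> List("0", "1"), "1" -> List("1", "2"))
        val brokerId = 1
        val relevant = partitionMap.filter(m => m._2.contains(brokerId.toString))
        println(relevant.keySet.map(_.toInt).toSeq)   // both partitions host replica 1
      }
    }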
 
   def deletePartition(zkClient : ZkClient, brokerId: Int, topic: String) {
@@ -528,7 +446,8 @@ object ZkUtils extends Logging {
   def getConsumerTopicMaps(zkClient: ZkClient, group: String): Map[String, TopicCount] = {
     val dirs = new ZKGroupDirs(group)
     val consumersInGroup = getConsumersInGroup(zkClient, group)
-    val topicCountMaps = consumersInGroup.map(consumerId => TopicCount.constructTopicCount(consumerId,ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)._1, zkClient))
+    val topicCountMaps = consumersInGroup.map(consumerId => TopicCount.constructTopicCount(consumerId,
+      ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId), zkClient))
     consumersInGroup.zip(topicCountMaps).toMap
   }
 
@@ -551,7 +470,8 @@ object ZkUtils extends Logging {
     consumersPerTopicMap
   }
 
-  def getBrokerInfoFromIds(zkClient: ZkClient, brokerIds: Seq[Int]): Seq[Broker] = brokerIds.map( bid => Broker.createBroker(bid, ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)._1))
+  def getBrokerInfoFromIds(zkClient: ZkClient, brokerIds: Seq[Int]): Seq[Broker] =
+    brokerIds.map( bid => Broker.createBroker(bid, ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)) )
 
   def getAllTopics(zkClient: ZkClient): Seq[String] = {
     val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, BrokerTopicsPath)
@@ -559,45 +479,17 @@ object ZkUtils extends Logging {
     else topics
   }
 
-  def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int) = {
-    // read previous epoch, increment it and write it to the leader path and the ISR path.
-    val epoch = try {
-      Some(getEpochForPartition(client, topic, partition))
-    }catch {
-      case e: NoEpochForPartitionException => None
-      case e1 => throw e1
-    }
-    val newEpoch = epoch match {
-      case Some(partitionEpoch) =>
-        debug("Existing epoch for topic %s partition %d is %d".format(topic, partition, partitionEpoch))
-        partitionEpoch + 1
-      case None =>
-        // this is the first time leader is elected for this partition. So set epoch to 1
-        debug("First epoch is 1 for topic %s partition %d".format(topic, partition))
-        LeaderAndISR.initialLeaderEpoch
-    }
-    newEpoch
-  }
 }
 
-class LeaderElectionListener(topic: String,
-                                    partition: Int,
-                                    leaderLock: ReentrantLock,
-                                    leaderExistsOrChanged: Condition,
-                                    zkClient: ZkClient = null) extends IZkDataListener with Logging {
+class LeaderExistsListener(topic: String, partition: Int, leaderLock: ReentrantLock, leaderExists: Condition) extends IZkDataListener {
   @throws(classOf[Exception])
   def handleDataChange(dataPath: String, data: Object) {
     val t = dataPath.split("/").takeRight(3).head
     val p = dataPath.split("/").takeRight(2).head.toInt
     leaderLock.lock()
     try {
-      if(t == topic && p == partition){
-        val newLeaderOpt = ZkUtils.getLeaderForPartition(zkClient, t, p)
-        if(newLeaderOpt.isDefined && ZkUtils.pathExists (zkClient, ZkUtils.getBrokerPath(newLeaderOpt.get))){
-          trace("In leader election listener on partition [%s, %d], live leader %d is elected".format(topic, partition, newLeaderOpt.get))
-          leaderExistsOrChanged.signal()
-        }
-      }
+      if(t == topic && p == partition)
+        leaderExists.signal()
     }
     finally {
       leaderLock.unlock()
@@ -605,9 +497,15 @@ class LeaderElectionListener(topic: Stri
   }
 
   @throws(classOf[Exception])
-  def handleDataDeleted(dataPath: String){
-    // Nothing
+  def handleDataDeleted(dataPath: String) {
+    leaderLock.lock()
+    try {
+      leaderExists.signal()
+    }finally {
+      leaderLock.unlock()
+    }
   }
+
 }
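
The listener only signals the condition; a caller is expected to hold the lock, subscribe, and block until the leader path changes. Presumably this is what TestUtils.waitUntilLeaderIsElected (used throughout the test changes below) builds on, but that helper is not in this diff, so the sketch is an assumption rather than its code:

    import java.util.concurrent.TimeUnit
    import java.util.concurrent.locks.ReentrantLock
    import org.I0Itec.zkclient.ZkClient
    import kafka.utils.{LeaderExistsListener, ZkUtils}

    object AwaitLeaderSketch {
      def awaitLeader(zkClient: ZkClient, topic: String, partition: Int, timeoutMs: Long): Option[Int] = {
        val leaderLock = new ReentrantLock()
        val leaderExists = leaderLock.newCondition()
        val path = ZkUtils.getTopicPartitionLeaderPath(topic, partition.toString)
        val listener = new LeaderExistsListener(topic, partition, leaderLock, leaderExists)
        zkClient.subscribeDataChanges(path, listener)
        leaderLock.lock()
        try {
          // A real helper would check whether the leader path already exists
          // before blocking; here we just wake on a change or on timeout.
          leaderExists.await(timeoutMs, TimeUnit.MILLISECONDS)
        } finally {
          leaderLock.unlock()
        }
        zkClient.unsubscribeDataChanges(path, listener)
        ZkUtils.getLeaderForPartition(zkClient, topic, partition)
      }
    }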
 
 object ZKStringSerializer extends ZkSerializer {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Tue Jul 31 22:50:59 2012
@@ -95,8 +95,8 @@ class ZookeeperConsumerConnectorTest ext
     val sentMessages1 = (sentMessages1_1 ++ sentMessages1_2).sortWith((s,t) => s.checksum < t.checksum)
 
     // wait to make sure the topic and partition have a leader for the successful case
-    waitUntilLiveLeaderIsElected(zkClient, topic, 0, 500)
-    waitUntilLiveLeaderIsElected(zkClient, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(
@@ -128,8 +128,8 @@ class ZookeeperConsumerConnectorTest ext
     val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
     val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
 
-    waitUntilLiveLeaderIsElected(zkClient, topic, 0, 500)
-    waitUntilLiveLeaderIsElected(zkClient, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
     val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
@@ -153,8 +153,8 @@ class ZookeeperConsumerConnectorTest ext
     val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
     val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
 
-    waitUntilLiveLeaderIsElected(zkClient, topic, 0, 500)
-    waitUntilLiveLeaderIsElected(zkClient, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
     val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
@@ -182,8 +182,8 @@ class ZookeeperConsumerConnectorTest ext
     val sentMessages1_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
     val sentMessages1 = (sentMessages1_1 ++ sentMessages1_2).sortWith((s,t) => s.checksum < t.checksum)
 
-    waitUntilLiveLeaderIsElected(zkClient, topic, 0, 500)
-    waitUntilLiveLeaderIsElected(zkClient, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(
@@ -214,8 +214,8 @@ class ZookeeperConsumerConnectorTest ext
     val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
     val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
 
-    waitUntilLiveLeaderIsElected(zkClient, topic, 0, 500)
-    waitUntilLiveLeaderIsElected(zkClient, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
     val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
@@ -239,8 +239,8 @@ class ZookeeperConsumerConnectorTest ext
     val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
     val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
 
-    waitUntilLiveLeaderIsElected(zkClient, topic, 0, 500)
-    waitUntilLiveLeaderIsElected(zkClient, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
     val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
@@ -320,8 +320,8 @@ class ZookeeperConsumerConnectorTest ext
 
     val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
 
-    waitUntilLiveLeaderIsElected(zkClient, topic, 0, 500)
-    waitUntilLiveLeaderIsElected(zkClient, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     val zkConsumerConnector =
       new ZookeeperConsumerConnector(consumerConfig, true)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala Tue Jul 31 22:50:59 2012
@@ -25,8 +25,7 @@ import kafka.server.{KafkaServer, KafkaC
 import kafka.api._
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicInteger
-import kafka.admin.CreateTopicCommand
-import kafka.utils.{ZkUtils, ControllerTestUtils, TestUtils}
+import kafka.utils.{ControllerTestUtils, ZkUtils, TestUtils}
 
 
 class ControllerBasicTest extends JUnit3Suite with ZooKeeperTestHarness  {
@@ -37,13 +36,11 @@ class ControllerBasicTest extends JUnit3
   override def setUp() {
     super.setUp()
     brokers = configs.map(config => TestUtils.createServer(config))
-    CreateTopicCommand.createTopic(zkClient, "test1", 1, 4, "0:1:2:3")
-    CreateTopicCommand.createTopic(zkClient, "test2", 1, 4, "0:1:2:3")
   }
 
   override def tearDown() {
-    brokers.foreach(_.shutdown())
     super.tearDown()
+    brokers.foreach(_.shutdown())
   }
 
   def testControllerFailOver(){
@@ -52,39 +49,35 @@ class ControllerBasicTest extends JUnit3
     brokers(3).shutdown()
     Thread.sleep(1000)
 
-    var curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)._1
-    info("cur controller " + curController)
+    var curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)
     assertEquals(curController, "2")
 
-
     brokers(1).startup()
     brokers(2).shutdown()
     Thread.sleep(1000)
-    curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)._1
-    info("cur controller " + curController)
-    assertEquals("Controller should be on broker 1", curController, "1")
+    curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)
+    assertEquals(curController, "1")
   }
 
   def testControllerCommandSend(){
     Thread.sleep(1000)
-
     for(broker <- brokers){
       if(broker.kafkaController.isActive){
-        val leaderAndISRRequest = ControllerTestUtils.createTestLeaderAndISRRequest()
-        val stopReplicaRequest = ControllerTestUtils.createTestStopReplicaRequest()
+        val leaderAndISRRequest = ControllerTestUtils.createSampleLeaderAndISRRequest()
+        val stopReplicaRequest = ControllerTestUtils.createSampleStopReplicaRequest()
 
         val successCount: AtomicInteger = new AtomicInteger(0)
         val countDownLatch: CountDownLatch = new CountDownLatch(8)
 
         def compareLeaderAndISRResponseWithExpectedOne(response: RequestOrResponse){
-          val expectedResponse = ControllerTestUtils.createTestLeaderAndISRResponse()
+          val expectedResponse = ControllerTestUtils.createSampleLeaderAndISRResponse()
           if(response.equals(expectedResponse))
             successCount.addAndGet(1)
           countDownLatch.countDown()
         }
 
         def compareStopReplicaResponseWithExpectedOne(response: RequestOrResponse){
-          val expectedResponse = ControllerTestUtils.createTestStopReplicaResponse()
+          val expectedResponse = ControllerTestUtils.createSampleStopReplicaResponse()
           if(response.equals(expectedResponse))
             successCount.addAndGet(1)
           countDownLatch.countDown()
@@ -94,10 +87,10 @@ class ControllerBasicTest extends JUnit3
         broker.kafkaController.sendRequest(1, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
         broker.kafkaController.sendRequest(2, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
         broker.kafkaController.sendRequest(3, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
-        broker.kafkaController.sendRequest(0, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
-        broker.kafkaController.sendRequest(1, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
-        broker.kafkaController.sendRequest(2, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
-        broker.kafkaController.sendRequest(3, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
+        broker.kafkaController.sendRequest(0, stopReplicaRequest, compareStopReplicaResponseWithExpectedOne)
+        broker.kafkaController.sendRequest(1, stopReplicaRequest, compareStopReplicaResponseWithExpectedOne)
+        broker.kafkaController.sendRequest(2, stopReplicaRequest, compareStopReplicaResponseWithExpectedOne)
+        broker.kafkaController.sendRequest(3, stopReplicaRequest, compareStopReplicaResponseWithExpectedOne)
         countDownLatch.await()
 
         assertEquals(successCount.get(), 8)
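
Besides the createSample* renames, the substantive fix in this test is that four of the eight sends now actually exercise stopReplicaRequest; previously all eight sent leaderAndISRRequest, so the stop-replica comparator never ran even though the latch still counted down to 8. The AtomicInteger-plus-CountDownLatch pattern in isolation:

    import java.util.concurrent.CountDownLatch
    import java.util.concurrent.atomic.AtomicInteger

    object LatchPatternSketch {
      def main(args: Array[String]) {
        val successCount = new AtomicInteger(0)
        val countDownLatch = new CountDownLatch(8)
        // Stand-in for the eight async controller callbacks: record success,
        // then count down so the test thread can block until all replies land.
        for (i <- 0 until 8) {
          new Thread(new Runnable {
            def run() {
              successCount.addAndGet(1)
              countDownLatch.countDown()
            }
          }).start()
        }
        countDownLatch.await()
        assert(successCount.get() == 8)
      }
    }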

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala Tue Jul 31 22:50:59 2012
@@ -57,7 +57,7 @@ class BackwardsCompatibilityTest extends
   // test for reading data with magic byte 0
   def testProtocolVersion0() {
     CreateTopicCommand.createTopic(zkClient, topic, 0, 1, configs.head.brokerId.toString)
-    TestUtils.waitUntilLiveLeaderIsElected(zkClient, topic, 0, 500)
+    TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 500)
     val lastOffset = simpleConsumer.getOffsetsBefore(topic, 0, OffsetRequest.LatestTime, 1)
     var fetchOffset: Long = 0L
     var messageCount: Int = 0

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala Tue Jul 31 22:50:59 2012
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
+ * 
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -35,28 +35,27 @@ import kafka.admin.CreateTopicCommand
 class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
 
   val numNodes = 1
-  val configs =
+  val configs = 
     for(props <- TestUtils.createBrokerConfigs(numNodes))
-    yield new KafkaConfig(props)
+      yield new KafkaConfig(props)
   val messages = new mutable.HashMap[Int, Seq[Message]]
   val topic = "topic"
   val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, c.brokerId.toString, "localhost", c.port)))
   val shutdown = ZookeeperConsumerConnector.shutdownCommand
   val queue = new LinkedBlockingQueue[FetchedDataChunk]
   val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
-                                                           c.brokerId,
-                                                           0,
-                                                           queue,
-                                                           new AtomicLong(0),
-                                                           new AtomicLong(0),
-                                                           new AtomicInteger(0)))
-
+                                                      c.brokerId,
+                                                      0,
+                                                      queue, 
+                                                      new AtomicLong(0), 
+                                                      new AtomicLong(0), 
+                                                      new AtomicInteger(0)))
+  
   var fetcher: ConsumerFetcherManager = null
 
   override def setUp() {
     super.setUp
     CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
-    waitUntilLiveLeaderIsElected(zkClient, topic, 0, 500)
     fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkClient)
     fetcher.stopAllConnections()
     fetcher.startConnections(topicInfos, cluster)
@@ -66,20 +65,20 @@ class FetcherTest extends JUnit3Suite wi
     fetcher.shutdown()
     super.tearDown
   }
-
+    
   def testFetcher() {
     val perNode = 2
     var count = sendMessages(perNode)
-
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
     fetch(count)
     assertQueueEmpty()
     count = sendMessages(perNode)
     fetch(count)
     assertQueueEmpty()
   }
-
+  
   def assertQueueEmpty(): Unit = assertEquals(0, queue.size)
-
+  
   def sendMessages(messagesPerNode: Int): Int = {
     var count = 0
     for(conf <- configs) {
@@ -92,7 +91,7 @@ class FetcherTest extends JUnit3Suite wi
     }
     count
   }
-
+  
   def fetch(expected: Int) {
     var count = 0
     while(true) {
@@ -104,5 +103,5 @@ class FetcherTest extends JUnit3Suite wi
         return
     }
   }
-
+    
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala Tue Jul 31 22:50:59 2012
@@ -134,7 +134,7 @@ class LazyInitProducerTest extends JUnit
       builder.addFetch(topic, 0, 0, 10000)
     }
     // wait until leader is elected
-    topics.foreach(topic => TestUtils.waitUntilLiveLeaderIsElected(zkClient, topic, 0, 500))
+    topics.foreach(topic => TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 500))
     producer.send(produceList: _*)
 
     // wait a bit for produced message to be available
@@ -160,7 +160,7 @@ class LazyInitProducerTest extends JUnit
       builder.addFetch(topic, 0, 0, 10000)
     }
     // wait until leader is elected
-    topics.foreach(topic => TestUtils.waitUntilLiveLeaderIsElected(zkClient, topic, 0, 1500))
+    topics.foreach(topic => TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 1500))
 
     producer.send(produceList: _*)
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Tue Jul 31 22:50:59 2012
@@ -352,7 +352,7 @@ class PrimitiveApiTest extends JUnit3Sui
   def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Seq[String], brokerId: Int) {
     for( topic <- topics ) {
       CreateTopicCommand.createTopic(zkClient, topic, 1, 1, brokerId.toString)
-      TestUtils.waitUntilLiveLeaderIsElected(zkClient, topic, 0, 500)
+      TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 500)
     }
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala Tue Jul 31 22:50:59 2012
@@ -117,8 +117,7 @@ class TopicMetadataTest extends JUnit3Su
 
     // create the kafka request handler
     val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, logManager, replicaManager, kafkaZookeeper, null,
-                             null, null, null, 1)
+    val apis = new KafkaApis(requestChannel, logManager, replicaManager, kafkaZookeeper)
 
     // mock the receive API to return the request buffer as created above
     val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala Tue Jul 31 22:50:59 2012
@@ -54,8 +54,8 @@ class ZookeeperConsumerConnectorTest ext
     // send some messages to each broker
     val sentMessages1 = sendMessages(nMessages, "batch1")
 
-    waitUntilLiveLeaderIsElected(zkClient, topic, 0, 500)
-    waitUntilLiveLeaderIsElected(zkClient, topic, 1, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
 
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zookeeperConnect, group, consumer1))

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala Tue Jul 31 22:50:59 2012
@@ -118,7 +118,7 @@ class LogOffsetTest extends JUnit3Suite 
 
     // setup brokers in zookeeper as owners of partitions for this test
     CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
-    TestUtils.waitUntilLiveLeaderIsElected(zkClient, topic, 0, 500)
+    TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 500)
 
     var offsetChanged = false
     for(i <- 1 to 14) {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala Tue Jul 31 22:50:59 2012
@@ -27,10 +27,9 @@ import junit.framework.Assert._
 
 class SocketServerTest extends JUnitSuite {
 
-  val server: SocketServer = new SocketServer(0,
-                                              port = TestUtils.choosePort,
-                                              numProcessorThreads = 1,
-                                              monitoringPeriodSecs = 30,
+  val server: SocketServer = new SocketServer(port = TestUtils.choosePort, 
+                                              numProcessorThreads = 1, 
+                                              monitoringPeriodSecs = 30, 
                                               maxQueuedRequests = 50,
                                               maxRequestSize = 50)
   server.startup()

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Tue Jul 31 22:50:59 2012
@@ -106,6 +106,13 @@ class AsyncProducerTest extends JUnit3Su
     }
   }
 
+  def getProduceData(nEvents: Int): Seq[ProducerData[String,String]] = {
+    val producerDataList = new ListBuffer[ProducerData[String,String]]
+    for (i <- 0 until nEvents)
+      producerDataList.append(new ProducerData[String,String]("topic1", null, List("msg" + i)))
+    producerDataList
+  }
+
   @Test
   def testBatchSize() {
     /**
@@ -523,13 +530,6 @@ class AsyncProducerTest extends JUnit3Su
     }
   }
 
-  def getProduceData(nEvents: Int): Seq[ProducerData[String,String]] = {
-    val producerDataList = new ListBuffer[ProducerData[String,String]]
-    for (i <- 0 until nEvents)
-      producerDataList.append(new ProducerData[String,String]("topic1", null, List("msg" + i)))
-    producerDataList
-  }
-
   private def messagesToSet(messages: Seq[String]): ByteBufferMessageSet = {
     val encoder = new StringEncoder
     new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => encoder.toMessage(m)): _*)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Tue Jul 31 22:50:59 2012
@@ -104,7 +104,7 @@ class ProducerTest extends JUnit3Suite w
 
     // create topic with 1 partition and await leadership
     CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
-    TestUtils.waitUntilLiveLeaderIsElected(zkClient, "new-topic", 0, 500)
+    TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 0, 500)
 
     val producer1 = new Producer[String, String](config1)
     val producer2 = new Producer[String, String](config2)
@@ -155,10 +155,10 @@ class ProducerTest extends JUnit3Suite w
 
     // create topic
     CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0")
-    TestUtils.waitUntilLiveLeaderIsElected(zkClient, "new-topic", 0, 500)
-    TestUtils.waitUntilLiveLeaderIsElected(zkClient, "new-topic", 1, 500)
-    TestUtils.waitUntilLiveLeaderIsElected(zkClient, "new-topic", 2, 500)
-    TestUtils.waitUntilLiveLeaderIsElected(zkClient, "new-topic", 3, 500)
+    TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 0, 500)
+    TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 1, 500)
+    TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 2, 500)
+    TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 3, 500)
 
     val config = new ProducerConfig(props)
     val producer = new Producer[String, String](config)
@@ -216,7 +216,7 @@ class ProducerTest extends JUnit3Suite w
 
     // create topics in ZK
     CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
-    TestUtils.waitUntilLiveLeaderIsElected(zkClient, "new-topic", 0, 500)
+    TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 0, 500)
 
     // do a simple test to make sure plumbing is okay
     try {
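
Aside, not part of the diff: throughout this commit the test helper waitUntilLiveLeaderIsElected is renamed to waitUntilLeaderIsElected, with the argument list unchanged (zkClient, topic, partition, and what appears to be a timeout in milliseconds). The same mechanical rename recurs in SyncProducerTest, LogRecoveryTest, ReplicaFetchTest, and ServerShutdownTest below. The create-then-await pattern, condensed from the hunks above:

    // Create a topic with 4 partitions and replication factor 2, then block
    // (up to 500 ms each, assumed) until every partition has an elected leader.
    CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0")
    for (partition <- 0 until 4)
      TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", partition, 500)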

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Tue Jul 31 22:50:59 2012
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
+ * 
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -159,9 +159,9 @@ class SyncProducerTest extends JUnit3Sui
 
     // #2 - test that we get correct offsets when partition is owned by broker
     CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1)
-    TestUtils.waitUntilLiveLeaderIsElected(zkClient, "topic1", 0, 500)
+    TestUtils.waitUntilLeaderIsElected(zkClient, "topic1", 0, 500)
     CreateTopicCommand.createTopic(zkClient, "topic3", 1, 1)
-    TestUtils.waitUntilLiveLeaderIsElected(zkClient, "topic3", 0, 500)
+    TestUtils.waitUntilLeaderIsElected(zkClient, "topic3", 0, 500)
 
     val response2 = producer.send(request)
     Assert.assertNotNull(response2)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala Tue Jul 31 22:50:59 2012
@@ -39,9 +39,8 @@ class HighwatermarkPersistenceTest exten
     val scheduler = new KafkaScheduler(2)
     scheduler.startUp
     // create replica manager
-    val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, null)
+    val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler)
     replicaManager.startup()
-    replicaManager.startHighWaterMarksCheckPointThread()
     // sleep until flush ms
     Thread.sleep(configs.head.defaultFlushIntervalMs)
     var fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
@@ -84,9 +83,8 @@ class HighwatermarkPersistenceTest exten
     val scheduler = new KafkaScheduler(2)
     scheduler.startUp
     // create replica manager
-    val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, null)
+    val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler)
     replicaManager.startup()
-    replicaManager.startHighWaterMarksCheckPointThread()
     // sleep until flush ms
     Thread.sleep(configs.head.defaultFlushIntervalMs)
     var topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
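
Aside, not part of the diff: both hunks in this file make the same two changes: ReplicaManager loses its fifth (null) constructor argument, and the explicit startHighWaterMarksCheckPointThread() call disappears, presumably because checkpointing now starts inside the manager itself (an assumption; this diff does not show ReplicaManager's internals). The new setup shape:

    // Four-argument constructor as in the hunks above; startup() alone is
    // now sufficient -- no separate checkpoint-thread start.
    val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler)
    replicaManager.startup()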

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala Tue Jul 31 22:50:59 2012
@@ -100,7 +100,7 @@ class ISRExpirationTest extends JUnit3Su
     // create kafka scheduler
     val scheduler = new KafkaScheduler(2)
     // create replica manager
-    val replicaManager = new ReplicaManager(configs.head, time, zkClient, scheduler, null)
+    val replicaManager = new ReplicaManager(configs.head, time, zkClient, scheduler)
     try {
       val partition0 = replicaManager.getOrCreatePartition(topic, 0, configs.map(_.brokerId).toSet)
       // create leader log

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala Tue Jul 31 22:50:59 2012
@@ -20,11 +20,12 @@ package kafka.server
 import org.scalatest.junit.JUnit3Suite
 import kafka.zk.ZooKeeperTestHarness
 import kafka.admin.CreateTopicCommand
+import kafka.utils.TestUtils._
 import junit.framework.Assert._
 import kafka.utils.{ZkUtils, Utils, TestUtils}
-import kafka.utils.TestUtils._
 
 class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
+
   val brokerId1 = 0
   val brokerId2 = 1
 
@@ -33,23 +34,23 @@ class LeaderElectionTest extends JUnit3S
 
   val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1)
   val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
-  var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
+
 
   override def setUp() {
     super.setUp()
-    // start both servers
-    val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
-    val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
-    servers ++= List(server1, server2)
   }
 
   override def tearDown() {
-    servers.map(server => server.shutdown())
-    servers.map(server => Utils.rm(server.config.logDir))
     super.tearDown()
   }
 
-  def testLeaderElectionAndEpoch {
+  def testLeaderElectionWithCreateTopic {
+    var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
+    // start both servers
+    val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
+    val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
+
+    servers ++= List(server1, server2)
     // start 2 brokers
     val topic = "new-topic"
     val partitionId = 0
@@ -58,39 +59,62 @@ class LeaderElectionTest extends JUnit3S
     CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
 
     // wait until leader is elected
-    val leader1 = waitUntilLiveLeaderIsElected(zkClient, topic, partitionId, 500)
-    val leaderEpoch1 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
-    debug("leader Epoc: " + leaderEpoch1)
-    debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
-    assertTrue("Leader should get elected", leader1.isDefined)
+    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    assertTrue("Leader should get elected", leader.isDefined)
     // NOTE: this is to avoid transient test failures
-    assertTrue("Leader could be broker 0 or broker 1", (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1))
-    assertEquals("First epoch value should be 0", 0, leaderEpoch1)
+    assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
 
     // kill the server hosting the preferred replica
-    servers.last.shutdown()
+    server1.shutdown()
+
     // check if leader moves to the other server
-    val leader2 = waitUntilLiveLeaderIsElected(zkClient, topic, partitionId, 1500)
-    val leaderEpoch2 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
-    debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
-    debug("leader Epoc: " + leaderEpoch2)
-    assertEquals("Leader must move to broker 0", 0, leader2.getOrElse(-1))
-    if(leader1.get == leader2.get)
-      assertEquals("Second epoch value should be " + leaderEpoch1, leaderEpoch1, leaderEpoch2)
-    else
-      assertEquals("Second epoch value should be %d".format(leaderEpoch1+1) , leaderEpoch1+1, leaderEpoch2)
-
-    servers.last.startup()
-    servers.head.shutdown()
-    Thread.sleep(zookeeper.tickTime)
-    val leader3 = waitUntilLiveLeaderIsElected(zkClient, topic, partitionId, 1500)
-    val leaderEpoch3 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
-    debug("leader Epoc: " + leaderEpoch3)
-    debug("Leader is elected to be: %s".format(leader3.getOrElse(-1)))
-    assertEquals("Leader must return to 1", 1, leader3.getOrElse(-1))
-    if(leader2.get == leader3.get)
-      assertEquals("Second epoch value should be " + leaderEpoch2, leaderEpoch2, leaderEpoch3)
-    else
-      assertEquals("Second epoch value should be %d".format(leaderEpoch2+1) , leaderEpoch2+1, leaderEpoch3)
+    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 1500)
+    assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
+
+    val leaderPath = zkClient.getChildren(ZkUtils.getTopicPartitionPath(topic, "0"))
+    // bring the preferred replica back
+    servers.head.startup()
+
+    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    assertEquals("Leader must remain on broker 1", 1, leader.getOrElse(-1))
+
+    // shutdown current leader (broker 1)
+    servers.last.shutdown()
+    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+
+    // test if the leader is the preferred replica
+    assertEquals("Leader must be preferred replica on broker 0", 0, leader.getOrElse(-1))
+    // shutdown the servers and delete data hosted on them
+    servers.map(server => server.shutdown())
+    servers.map(server => Utils.rm(server.config.logDir))
+  }
+
+  // Assuming leader election happens correctly, test if epoch changes as expected
+  def testEpoch() {
+    // keep switching leaders to see if epoch changes correctly
+    val topic = "new-topic"
+    val partitionId = 0
+
+    // setup 2 brokers in ZK
+    val brokers = TestUtils.createBrokersInZk(zkClient, List(brokerId1, brokerId2))
+
+    // create topic with 1 partition, 2 replicas, one on each broker
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
+
+    var newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
+    assertTrue("Broker 0 should become leader", newLeaderEpoch.isDefined)
+    assertEquals("First epoch value should be 1", 1, newLeaderEpoch.get._1)
+
+    ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
+    newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 1)
+    assertTrue("Broker 1 should become leader", newLeaderEpoch.isDefined)
+    assertEquals("Second epoch value should be 2", 2, newLeaderEpoch.get._1)
+
+    ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
+    newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
+    assertTrue("Broker 0 should become leader again", newLeaderEpoch.isDefined)
+    assertEquals("Third epoch value should be 3", 3, newLeaderEpoch.get._1)
+
+    TestUtils.deleteBrokersInZk(zkClient, List(brokerId1, brokerId2))
   }
 }
\ No newline at end of file
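
Aside, not part of the diff: the new testEpoch spells out the contract this rewrite relies on: ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partition, brokerId) returns an Option whose first tuple element is the new leader epoch, starting at 1 and incrementing by one each time leadership is re-acquired after the leader path is deleted. Condensed from the hunk above:

    // Epoch sequence asserted in testEpoch: 1, then 2 (then 3).
    val first = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
    assertEquals(1, first.get._1)

    ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
    val second = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 1)
    assertEquals(2, second.get._1)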

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala Tue Jul 31 22:50:59 2012
@@ -57,7 +57,7 @@ class LogRecoveryTest extends JUnit3Suit
     CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
 
     // wait until leader is elected
-    var leader = waitUntilLiveLeaderIsElected(zkClient, topic, partitionId, 500)
+    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
     assertTrue("Leader should get elected", leader.isDefined)
     // NOTE: this is to avoid transient test failures
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
@@ -89,7 +89,7 @@ class LogRecoveryTest extends JUnit3Suit
     CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
 
     // wait until leader is elected
-    var leader = waitUntilLiveLeaderIsElected(zkClient, topic, partitionId, 500)
+    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
     assertTrue("Leader should get elected", leader.isDefined)
     // NOTE: this is to avoid transient test failures
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
@@ -103,13 +103,13 @@ class LogRecoveryTest extends JUnit3Suit
     assertEquals(30L, hwFile1.read(topic, 0))
 
     // check if leader moves to the other server
-    leader = waitUntilLiveLeaderIsElected(zkClient, topic, partitionId, 500)
+    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
     assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
 
     // bring the preferred replica back
     server1.startup()
 
-    leader = waitUntilLiveLeaderIsElected(zkClient, topic, partitionId, 500)
+    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
     assertEquals("Leader must remain on broker 1", 1, leader.getOrElse(-1))
 
     assertEquals(30L, hwFile1.read(topic, 0))
@@ -118,7 +118,7 @@ class LogRecoveryTest extends JUnit3Suit
     assertEquals(30L, hwFile2.read(topic, 0))
 
     server2.startup()
-    leader = waitUntilLiveLeaderIsElected(zkClient, topic, partitionId, 500)
+    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
     assertEquals("Leader must remain on broker 0", 0, leader.getOrElse(-1))
 
     sendMessages()
@@ -159,7 +159,7 @@ class LogRecoveryTest extends JUnit3Suit
     CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
 
     // wait until leader is elected
-    var leader = waitUntilLiveLeaderIsElected(zkClient, topic, partitionId, 500)
+    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
     assertTrue("Leader should get elected", leader.isDefined)
     // NOTE: this is to avoid transient test failures
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
@@ -202,7 +202,7 @@ class LogRecoveryTest extends JUnit3Suit
     CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
 
     // wait until leader is elected
-    var leader = waitUntilLiveLeaderIsElected(zkClient, topic, partitionId, 500)
+    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
     assertTrue("Leader should get elected", leader.isDefined)
     // NOTE: this is to avoid transient test failures
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
@@ -218,7 +218,7 @@ class LogRecoveryTest extends JUnit3Suit
 
     server2.startup()
     // check if leader moves to the other server
-    leader = waitUntilLiveLeaderIsElected(zkClient, topic, partitionId, 500)
+    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
     assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
 
     assertEquals(60L, hwFile1.read(topic, 0))
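
Aside, not part of the diff: beyond the rename, these hunks document the invariant under test: the checkpointed high watermark, read back per (topic, partition) via hwFile.read, must keep its value (30L, and 60L after further sends) across leader failover and broker restarts, e.g.:

    // Both brokers' checkpoint files must agree on the high watermark.
    assertEquals(30L, hwFile1.read(topic, 0))
    assertEquals(30L, hwFile2.read(topic, 0))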

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala Tue Jul 31 22:50:59 2012
@@ -51,7 +51,7 @@ class ReplicaFetchTest extends JUnit3Sui
     // create a topic and partition and await leadership
     for (topic <- List(topic1,topic2)) {
       CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(c => c.brokerId).mkString(":"))
-      TestUtils.waitUntilLiveLeaderIsElected(zkClient, topic, 0, 1000)
+      TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 1000)
     }
 
     // send test messages to leader

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala Tue Jul 31 22:50:59 2012
@@ -73,7 +73,7 @@ class RequestPurgatoryTest {
     assertTrue("Time for expiration was about 20ms", (elapsed - expiration).abs < 10L)
   }
   
-  class MockRequestPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest]("Mock Request Purgatory") {
+  class MockRequestPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest] {
     val satisfied = mutable.Set[DelayedRequest]()
     val expired = mutable.Set[DelayedRequest]()
     def awaitExpiration(delayed: DelayedRequest) = {
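
Aside, not part of the diff: the hunk above removes RequestPurgatory's string name argument ("Mock Request Purgatory"), so subclasses now extend the bare generic type. Declaration shape only; the purgatory's abstract members are not shown in this hunk and are deliberately omitted here:

    class MockRequestPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest] {
      // tracking sets and awaitExpiration exactly as in the hunk above ...
    }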

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Tue Jul 31 22:50:59 2012
@@ -73,7 +73,7 @@ class ServerShutdownTest extends JUnit3S
       val server = new KafkaServer(config)
       server.startup()
 
-      waitUntilLiveLeaderIsElected(zkClient, topic, 0, 1000)
+      waitUntilLeaderIsElected(zkClient, topic, 0, 1000)
 
       var fetchedMessage: ByteBufferMessageSet = null
       while(fetchedMessage == null || fetchedMessage.validBytes == 0) {