Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/01 18:14:01 UTC

svn commit: r1368092 [3/4] - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/admin/ main/scala/kafka/api/ main/scala/kafka/cluster/ main/scala/kafka/common/ main/scala/kafka/consumer/ main/scala/kafka/log/ main/scala/kafka/network/ main/sca...

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala Wed Aug  1 16:13:59 2012
@@ -33,7 +33,7 @@ object ConsumerOffsetChecker extends Log
   // e.g., 127.0.0.1-1315436360737:127.0.0.1:9092
 
   private def getConsumer(zkClient: ZkClient, bid: String): Option[SimpleConsumer] = {
-    val brokerInfo = ZkUtils.readDataMaybeNull(zkClient, "/brokers/ids/%s".format(bid))
+    val brokerInfo = ZkUtils.readDataMaybeNull(zkClient, "/brokers/ids/%s".format(bid))._1
     val consumer = brokerInfo match {
       case BrokerIpPattern(ip, port) =>
         Some(new SimpleConsumer(ip, port.toInt, 10000, 100000))
@@ -47,9 +47,9 @@ object ConsumerOffsetChecker extends Log
   private def processPartition(zkClient: ZkClient,
                                group: String, topic: String, bidPid: String) {
     val offset = ZkUtils.readData(zkClient, "/consumers/%s/offsets/%s/%s".
-            format(group, topic, bidPid)).toLong
+            format(group, topic, bidPid))._1.toLong
     val owner = ZkUtils.readDataMaybeNull(zkClient, "/consumers/%s/owners/%s/%s".
-            format(group, topic, bidPid))
+            format(group, topic, bidPid))._1
     println("%s,%s,%s (Group,Topic,BrokerId-PartitionId)".format(group, topic, bidPid))
     println("%20s%s".format("Owner = ", owner))
     println("%20s%d".format("Consumer offset = ", offset))

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ExportZkOffsets.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ExportZkOffsets.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ExportZkOffsets.scala Wed Aug  1 16:13:59 2012
@@ -100,7 +100,7 @@ object ExportZkOffsets extends Logging {
           for (bidPid <- bidPidList) {
             val zkGrpTpDir = new ZKGroupTopicDirs(consumerGrp,topic)
             val offsetPath = zkGrpTpDir.consumerOffsetDir + "/" + bidPid
-            val offsetVal  = ZkUtils.readDataMaybeNull(zkClient, offsetPath)
+            val offsetVal  = ZkUtils.readDataMaybeNull(zkClient, offsetPath)._1
             fileWriter.write(offsetPath + ":" + offsetVal + "\n")
             debug(offsetPath + " => " + offsetVal)
           }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala Wed Aug  1 16:13:59 2012
@@ -104,7 +104,7 @@ object VerifyConsumerRebalance extends L
         }
         // try reading the partition owner path for see if a valid consumer id exists there
         val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition
-        val partitionOwner = ZkUtils.readDataMaybeNull(zkClient, partitionOwnerPath)
+        val partitionOwner = ZkUtils.readDataMaybeNull(zkClient, partitionOwnerPath)._1
         if(partitionOwner == null) {
           error("No owner for topic %s partition %s".format(topic, partition))
           rebalanceSucceeded = false

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala Wed Aug  1 16:13:59 2012
@@ -45,7 +45,7 @@ object UpdateOffsetsInZK {
   private def getAndSetOffsets(zkClient: ZkClient, offsetOption: Long, config: ConsumerConfig, topic: String): Unit = {
     val cluster = ZkUtils.getCluster(zkClient)
     val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, List(topic).iterator)
-    var partitions: Seq[String] = Nil
+    var partitions: Seq[Int] = Nil
 
     partitionsPerTopicMap.get(topic) match {
       case Some(l) =>  partitions = l.sortWith((s,t) => s < t)
@@ -54,7 +54,7 @@ object UpdateOffsetsInZK {
 
     var numParts = 0
     for (partition <- partitions) {
-      val brokerHostingPartition = ZkUtils.getLeaderForPartition(zkClient, topic, partition.toInt)
+      val brokerHostingPartition = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
 
       val broker = brokerHostingPartition match {
         case Some(b) => b
@@ -68,7 +68,7 @@ object UpdateOffsetsInZK {
 
       val brokerInfo = brokerInfos.head
       val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024)
-      val offsets = consumer.getOffsetsBefore(topic, partition.toInt, offsetOption, 1)
+      val offsets = consumer.getOffsetsBefore(topic, partition, offsetOption, 1)
       val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
 
       println("updating partition " + partition + " with new offset: " + offsets(0))

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Wed Aug  1 16:13:59 2012
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -19,20 +19,21 @@ package kafka.utils
 
 import java.util.Properties
 import kafka.cluster.{Broker, Cluster}
-import kafka.common.NoEpochForPartitionException
 import kafka.consumer.TopicCount
 import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
 import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
 import org.I0Itec.zkclient.serialize.ZkSerializer
 import scala.collection._
 import util.parsing.json.JSON
+import kafka.api.LeaderAndISR
+import kafka.common.NoEpochForPartitionException
+import org.apache.zookeeper.data.Stat
 import java.util.concurrent.locks.{ReentrantLock, Condition}
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
   val BrokerIdsPath = "/brokers/ids"
   val BrokerTopicsPath = "/brokers/topics"
-  val BrokerStatePath = "/brokers/state"
   val ControllerPath = "/controller"
 
   def getTopicPath(topic: String): String ={
@@ -44,53 +45,63 @@ object ZkUtils extends Logging {
   }
 
   def getController(zkClient: ZkClient): Int= {
-    val controller = readDataMaybeNull(zkClient, ControllerPath)
+    val controller = readDataMaybeNull(zkClient, ControllerPath)._1
     controller.toInt
   }
 
-  def getTopicPartitionPath(topic: String, partitionId: String): String ={
+  def getTopicPartitionPath(topic: String, partitionId: Int): String ={
     getTopicPartitionsPath(topic) + "/" + partitionId
   }
 
-  def getTopicPartitionLeaderAndISR(topic: String, partitionId: String): String ={
+  def getTopicPartitionLeaderAndISRPath(topic: String, partitionId: Int): String ={
     getTopicPartitionPath(topic, partitionId) + "/" + "leaderAndISR"
   }
 
-  def getTopicVersion(zkClient: ZkClient, topic: String): String ={
-    readDataMaybeNull(zkClient, getTopicPath(topic))
-  }
-
-  def getTopicPartitionReplicasPath(topic: String, partitionId: String): String ={
-    getTopicPartitionPath(topic, partitionId) + "/" + "replicas"
+  def getSortedBrokerList(zkClient: ZkClient): Seq[String] ={
+    ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
   }
 
-  def getTopicPartitionInSyncPath(topic: String, partitionId: String): String ={
-    getTopicPartitionPath(topic, partitionId) + "/" + "isr"
+  def getAllLiveBrokerIds(zkClient: ZkClient): Set[Int] = {
+    ZkUtils.getChildren(zkClient, BrokerIdsPath).map(_.toInt).toSet
   }
 
-  def getTopicPartitionLeaderPath(topic: String, partitionId: String): String ={
-    getTopicPartitionPath(topic, partitionId) + "/" + "leader"
+  def getAllBrokersInCluster(zkClient: ZkClient): Seq[Broker] = {
+    val brokerIds = ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
+    getBrokerInfoFromIds(zkClient, brokerIds.map(_.toInt))
   }
 
-  def getBrokerStateChangePath(brokerId: Int): String = {
-    BrokerStatePath + "/" + brokerId
-  }
 
-  def getSortedBrokerList(zkClient: ZkClient): Seq[String] ={
-      ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
+  def getLeaderAndISRForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndISR] = {
+    val leaderAndISRPath = getTopicPartitionLeaderAndISRPath(topic, partition)
+    val ret = readDataMaybeNull(zkClient, leaderAndISRPath)
+    val leaderAndISRStr: String = ret._1
+    val stat = ret._2
+    if(leaderAndISRStr == null) None
+    else {
+      JSON.parseFull(leaderAndISRStr) match {
+        case Some(m) =>
+          val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
+          val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
+          val ISRString = m.asInstanceOf[Map[String, String]].get("ISR").get
+          val ISR = Utils.getCSVList(ISRString).map(r => r.toInt)
+          val zkPathVersion = stat.getVersion
+          debug("Leader %d, Epoch %d, isr %s, zk path version %d for topic %s and partition %d".format(leader, epoch, ISR.toString(), zkPathVersion, topic, partition))
+          Some(LeaderAndISR(leader, epoch, ISR.toList, zkPathVersion))
+        case None => None
+      }
+    }
   }
 
-  def getAllBrokersInCluster(zkClient: ZkClient): Seq[Broker] = {
-    val brokerIds = ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
-    getBrokerInfoFromIds(zkClient, brokerIds.map(b => b.toInt))
-  }
 
   def getLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int): Option[Int] = {
-    val leaderAndEpoch = readDataMaybeNull(zkClient, getTopicPartitionLeaderPath(topic, partition.toString))
-    if(leaderAndEpoch == null) None
+    val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1
+    if(leaderAndISR == null) None
     else {
-      val leaderAndEpochInfo = leaderAndEpoch.split(";")
-      Some(leaderAndEpochInfo.head.toInt)
+      JSON.parseFull(leaderAndISR) match {
+        case Some(m) =>
+          Some(m.asInstanceOf[Map[String, String]].get("leader").get.toInt)
+        case None => None
+      }
     }
   }
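
Both getLeaderAndISRForPartition and getLeaderForPartition above treat the leaderAndISR znode as a JSON map of strings. A self-contained parse mirroring that logic, with illustrative field values (only the keys "leader", "leaderEpoch" and "ISR" are taken from the code above; the sample payload is invented):

    import scala.util.parsing.json.JSON

    val json = """{"leader": "2", "leaderEpoch": "3", "ISR": "1,2,3"}"""
    JSON.parseFull(json) match {
      case Some(m) =>
        val fields = m.asInstanceOf[Map[String, String]]
        val leader = fields("leader").toInt
        val epoch  = fields("leaderEpoch").toInt
        // the ISR is a comma-separated list; Utils.getCSVList does the same split
        val isr    = fields("ISR").split(",").map(_.trim.toInt).toList
        println("leader=%d epoch=%d isr=%s".format(leader, epoch, isr))
      case None =>
        println("leaderAndISR data is not valid JSON")
    }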
 
@@ -99,51 +110,53 @@ object ZkUtils extends Logging {
    * leader fails after updating epoch in the leader path and before updating epoch in the ISR path, effectively some
    * other broker will retry becoming leader with the same new epoch value.
    */
-  def getEpochForPartition(client: ZkClient, topic: String, partition: Int): Int = {
-    val lastKnownEpoch = try {
-      val isrAndEpoch = readData(client, getTopicPartitionInSyncPath(topic, partition.toString))
-      if(isrAndEpoch != null) {
-        val isrAndEpochInfo = isrAndEpoch.split(";")
-        if(isrAndEpochInfo.last.isEmpty)
-          throw new NoEpochForPartitionException("No epoch in ISR path for topic %s partition %d is empty".format(topic, partition))
-        else
-          isrAndEpochInfo.last.toInt
-      }else {
-        throw new NoEpochForPartitionException("ISR path for topic %s partition %d is empty".format(topic, partition))
+  def getEpochForPartition(zkClient: ZkClient, topic: String, partition: Int): Int = {
+    val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1
+    if(leaderAndISR != null) {
+      val epoch = JSON.parseFull(leaderAndISR) match {
+        case None => throw new NoEpochForPartitionException("No epoch, leaderAndISR data for topic %s partition %d is invalid".format(topic, partition))
+        case Some(m) =>
+          m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
       }
-    } catch {
-      case e: ZkNoNodeException =>
-        throw new NoEpochForPartitionException("No epoch since leader never existed for topic %s partition %d".format(topic, partition))
-      case e1 => throw e1
+      epoch
     }
-    lastKnownEpoch
+    else
+      throw new NoEpochForPartitionException("No epoch, ISR path for topic %s partition %d is empty".format(topic, partition))
   }
 
   /**
-   * Gets the assigned replicas (AR) for a specific topic and partition
+   * Gets the in-sync replicas (ISR) for a specific topic and partition
    */
-  def getReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[String] = {
-    val topicAndPartitionAssignment = getPartitionAssignmentForTopics(zkClient, List(topic).iterator)
-    topicAndPartitionAssignment.get(topic) match {
-      case Some(partitionAssignment) => partitionAssignment.get(partition.toString) match {
-        case Some(replicaList) => replicaList
-        case None => Seq.empty[String]
+  def getInSyncReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[Int] = {
+    val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1
+    if(leaderAndISR == null) Seq.empty[Int]
+    else {
+      JSON.parseFull(leaderAndISR) match {
+        case Some(m) =>
+          val ISRString = m.asInstanceOf[Map[String, String]].get("ISR").get
+          Utils.getCSVList(ISRString).map(r => r.toInt)
+        case None => Seq.empty[Int]
       }
-      case None => Seq.empty[String]
     }
   }
 
   /**
-   * Gets the in-sync replicas (ISR) for a specific topic and partition
+   * Gets the assigned replicas (AR) for a specific topic and partition
    */
-  def getInSyncReplicasForPartition(client: ZkClient, topic: String, partition: Int): Seq[Int] = {
-    val replicaListAndEpochString = readDataMaybeNull(client, getTopicPartitionInSyncPath(topic, partition.toString))
-    if(replicaListAndEpochString == null)
+  def getReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[Int] = {
+    val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))._1
+    val assignedReplicas = if (jsonPartitionMap == null) {
       Seq.empty[Int]
-    else {
-      val replicasAndEpochInfo = replicaListAndEpochString.split(";")
-      Utils.getCSVList(replicasAndEpochInfo.head).map(r => r.toInt)
+    } else {
+      JSON.parseFull(jsonPartitionMap) match {
+        case Some(m) => m.asInstanceOf[Map[String, List[String]]].get(partition.toString) match {
+          case None => Seq.empty[Int]
+          case Some(seq) => seq.map(_.toInt)
+        }
+        case None => Seq.empty[Int]
+      }
     }
+    assignedReplicas
   }
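
getReplicasForPartition above reads the topic znode, which this code treats as a JSON map from partition id (as a string) to the assigned replica list. An illustrative payload and lookup (the sample values are invented; the cast and lookup mirror the code above):

    import scala.util.parsing.json.JSON

    val jsonPartitionMap = """{"0": ["0", "1"], "1": ["1", "2"]}"""
    val partition = 1
    val assignedReplicas = JSON.parseFull(jsonPartitionMap) match {
      case Some(m) => m.asInstanceOf[Map[String, List[String]]].get(partition.toString) match {
        case Some(seq) => seq.map(_.toInt)   // replica ids for the partition
        case None => Seq.empty[Int]          // partition not in the map
      }
      case None => Seq.empty[Int]            // malformed JSON
    }
    println(assignedReplicas)                // prints List(1, 2)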
 
   def isPartitionOnBroker(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = {
@@ -152,25 +165,7 @@ object ZkUtils extends Logging {
     replicas.contains(brokerId.toString)
   }
 
-  def tryToBecomeLeaderForPartition(client: ZkClient, topic: String, partition: Int, brokerId: Int): Option[(Int, Seq[Int])] = {
-    try {
-      // NOTE: first increment epoch, then become leader
-      val newEpoch = incrementEpochForPartition(client, topic, partition, brokerId)
-      createEphemeralPathExpectConflict(client, getTopicPartitionLeaderPath(topic, partition.toString),
-        "%d;%d".format(brokerId, newEpoch))
-      val currentISR = getInSyncReplicasForPartition(client, topic, partition)
-      val updatedISR = if(currentISR.size == 0) List(brokerId) else currentISR
-      updatePersistentPath(client, getTopicPartitionInSyncPath(topic, partition.toString),
-        "%s;%d".format(updatedISR.mkString(","), newEpoch))
-      info("Elected broker %d with epoch %d to be leader for topic %s partition %d".format(brokerId, newEpoch, topic, partition))
-      Some(newEpoch, updatedISR)
-    } catch {
-      case e: ZkNodeExistsException => error("Leader exists for topic %s partition %d".format(topic, partition)); None
-      case oe => error("Error while electing leader for topic %s partition %d".format(topic, partition), oe); None
-    }
-  }
-
-  def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int, leader: Int) = {
+  def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int, leader: Int): Int = {
     // read previous epoch, increment it and write it to the leader path and the ISR path.
     val epoch = try {
       Some(getEpochForPartition(client, topic, partition))
@@ -198,15 +193,12 @@ object ZkUtils extends Logging {
       createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZKString)
     } catch {
       case e: ZkNodeExistsException =>
-        throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " +
-                                   "indicates that you either have configured a brokerid that is already in use, or " +
-                                   "else you have shutdown this broker and restarted it faster than the zookeeper " +
-                                   "timeout so it appears to be re-registering.")
+        throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or " + "else you have shutdown this broker and restarted it faster than the zookeeper " + "timeout so it appears to be re-registering.")
     }
     info("Registering broker " + brokerIdPath + " succeeded with " + broker)
   }
 
-  def getConsumerPartitionOwnerPath(group: String, topic: String, partition: String): String = {
+  def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = {
     val topicDirs = new ZKGroupTopicDirs(group, topic)
     topicDirs.consumerOwnerDir + "/" + partition
   }
@@ -254,7 +246,7 @@ object ZkUtils extends Logging {
         // this can happen when there is connection loss; make sure the data is what we intend to write
         var storedData: String = null
         try {
-          storedData = readData(client, path)
+          storedData = readData(client, path)._1
         } catch {
           case e1: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
           case e2 => throw e2
@@ -292,17 +284,24 @@ object ZkUtils extends Logging {
   /**
    * Update the value of a persistent node with the given path and data.
    * create parrent directory if necessary. Never throw NodeExistException.
+   * Return the updated path zkVersion
    */
-  def updatePersistentPath(client: ZkClient, path: String, data: String): Unit = {
+  def updatePersistentPath(client: ZkClient, path: String, data: String): Int = {
+    var stat: Stat = null
     try {
-      client.writeData(path, data)
+      stat = client.writeData(path, data)
+      return stat.getVersion
     } catch {
       case e: ZkNoNodeException => {
         createParentPath(client, path)
         try {
           client.createPersistent(path, data)
+          // When the new path is created, its zkVersion always starts from 0
+          return 0
         } catch {
-          case e: ZkNodeExistsException => client.writeData(path, data)
+          case e: ZkNodeExistsException =>
+            stat = client.writeData(path, data)
+            return  stat.getVersion
           case e2 => throw e2
         }
       }
@@ -311,6 +310,22 @@ object ZkUtils extends Logging {
   }
 
   /**
+   * Conditional update the persistent path data, return (true, newVersion) if it succeeds, otherwise (the path doesn't
+   * exist, the current version is not the expected version, etc.) return (false, -1)
+   */
+  def conditionalUpdatePersistentPath(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = {
+    try {
+      val stat = client.writeData(path, data, expectVersion)
+      info("Conditional update the zkPath %s with expected version %d succeed and return the new version: %d".format(path, expectVersion, stat.getVersion))
+      (true, stat.getVersion)
+    } catch {
+      case e: Exception =>
+        info("Conditional update the zkPath %s with expected version %d failed".format(path, expectVersion))
+        (false, -1)
+    }
+  }
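
conditionalUpdatePersistentPath is the compare-and-set primitive: the caller passes the zkVersion it read, and the write fails with (false, -1) if the znode was modified in between. A read-modify-write sketch, assuming the pair-returning readData introduced in a later hunk of this file:

    import org.I0Itec.zkclient.ZkClient
    import kafka.utils.ZkUtils

    def casUpdate(zkClient: ZkClient, path: String, newData: String): Boolean = {
      val (_, stat) = ZkUtils.readData(zkClient, path)
      // succeeds only if the znode still has the version we just read
      val (updated, newVersion) =
        ZkUtils.conditionalUpdatePersistentPath(zkClient, path, newData, stat.getVersion)
      if (updated)
        println("wrote %s at version %d".format(path, newVersion))
      updated
    }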
+
+  /**
    * Update the value of a persistent node with the given path and data.
    * create parrent directory if necessary. Never throw NodeExistException.
    */
@@ -349,12 +364,23 @@ object ZkUtils extends Logging {
     }
   }
 
-  def readData(client: ZkClient, path: String): String = {
-    client.readData(path)
+  def readData(client: ZkClient, path: String): (String, Stat) = {
+    val stat: Stat = new Stat()
+    val dataStr: String = client.readData(path, stat)
+    (dataStr, stat)
   }
 
-  def readDataMaybeNull(client: ZkClient, path: String): String = {
-    client.readData(path, true)
+  def readDataMaybeNull(client: ZkClient, path: String): (String, Stat) = {
+    val stat: Stat = new Stat()
+    var dataStr: String = null
+    try{
+      dataStr = client.readData(path, stat)
+      return (dataStr, stat)
+    } catch {
+      case e: ZkNoNodeException =>
+        return (null, stat)
+      case e2 => throw e2
+    }
   }
 
   def getChildren(client: ZkClient, path: String): Seq[String] = {
@@ -366,7 +392,6 @@ object ZkUtils extends Logging {
   def getChildrenParentMayNotExist(client: ZkClient, path: String): Seq[String] = {
     import scala.collection.JavaConversions._
     // triggers implicit conversion from java list to scala Seq
-
     try {
       client.getChildren(path)
     } catch {
@@ -388,32 +413,77 @@ object ZkUtils extends Logging {
     val cluster = new Cluster
     val nodes = getChildrenParentMayNotExist(zkClient, BrokerIdsPath)
     for (node <- nodes) {
-      val brokerZKString = readData(zkClient, BrokerIdsPath + "/" + node)
+      val brokerZKString = readData(zkClient, BrokerIdsPath + "/" + node)._1
       cluster.add(Broker.createBroker(node.toInt, brokerZKString))
     }
     cluster
   }
 
-  def getPartitionAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, Map[String, List[String]]] = {
-    val ret = new mutable.HashMap[String, Map[String, List[String]]]()
+  def getReplicaAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[(String, Int), Seq[Int]] = {
+    val ret = new mutable.HashMap[(String, Int), Seq[Int]]
     topics.foreach{ topic =>
-      val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))
+      val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))._1
+      if (jsonPartitionMap != null) {
+        JSON.parseFull(jsonPartitionMap) match {
+          case Some(m) =>
+            val replicaMap = m.asInstanceOf[Map[String, Seq[String]]]
+            for((partition, replicas) <- replicaMap){
+              ret.put((topic, partition.toInt), replicas.map(_.toInt))
+              debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas))
+            }
+          case None =>
+        }
+      }
+                  }
+    ret
+  }
+
+  def getPartitionLeaderAndISRForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[(String, Int), LeaderAndISR] = {
+    val ret = new mutable.HashMap[(String, Int), LeaderAndISR]
+    val partitionsForTopics = getPartitionsForTopics(zkClient, topics)
+    for((topic, partitions) <- partitionsForTopics){
+      for(partition <- partitions){
+        val leaderAndISROpt = ZkUtils.getLeaderAndISRForPartition(zkClient, topic, partition.toInt)
+        if(leaderAndISROpt.isDefined)
+          ret.put((topic, partition.toInt), leaderAndISROpt.get)
+      }
+    }
+    ret
+  }
+
+  def getPartitionAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, collection.Map[Int, Seq[Int]]] = {
+    val ret = new mutable.HashMap[String, Map[Int, Seq[Int]]]()
+    topics.foreach{ topic =>
+      val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))._1
       val partitionMap = if (jsonPartitionMap == null) {
-        Map[String, List[String]]()
+        Map[Int, Seq[Int]]()
       } else {
         JSON.parseFull(jsonPartitionMap) match {
-          case Some(m) => m.asInstanceOf[Map[String, List[String]]]
-          case None => Map[String, List[String]]()
+          case Some(m) =>
+            val m1 = m.asInstanceOf[Map[String, Seq[String]]]
+            m1.map(p => (p._1.toInt, p._2.map(_.toInt)))
+          case None => Map[Int, Seq[Int]]()
         }
       }
       debug("partition map for /brokers/topics/%s is %s".format(topic, partitionMap))
       ret += (topic -> partitionMap)
+                  }
+    ret
+  }
+
+  def getReplicaAssignmentFromPartitionAssignment(topicPartitionAssignment: mutable.Map[String, collection.Map[Int, Seq[Int]]]): mutable.Map[(String, Int), Seq[Int]] = {
+    val ret = new mutable.HashMap[(String, Int), Seq[Int]]
+    for((topic, partitionAssignment) <- topicPartitionAssignment){
+      for((partition, replicaAssignment) <- partitionAssignment){
+        ret.put((topic, partition), replicaAssignment)
+      }
     }
     ret
   }
 
-  def getPartitionsForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, Seq[String]] = {
-    getPartitionAssignmentForTopics(zkClient, topics).map{ topicAndPartitionMap =>
+  def getPartitionsForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, Seq[Int]] = {
+    getPartitionAssignmentForTopics(zkClient, topics).map
+    { topicAndPartitionMap =>
       val topic = topicAndPartitionMap._1
       val partitionMap = topicAndPartitionMap._2
       debug("partition assignment of /brokers/topics/%s is %s".format(topic, partitionMap))
@@ -421,14 +491,20 @@ object ZkUtils extends Logging {
     }
   }
 
-  def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int): Map[String, Seq[Int]] = {
+  def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int): Map[(String, Int), Seq[Int]] = {
+    val ret = new mutable.HashMap[(String, Int), Seq[Int]]
     val topicsAndPartitions = getPartitionAssignmentForTopics(zkClient, topics.iterator)
-    topicsAndPartitions.map{ topicAndPartitionMap =>
-      val topic = topicAndPartitionMap._1
-      val partitionMap = topicAndPartitionMap._2
-      val relevantPartitions = partitionMap.filter( m => m._2.contains(brokerId.toString) )
-      (topic -> relevantPartitions.keySet.map(_.toInt).toSeq)
+    topicsAndPartitions.map
+    {
+      topicAndPartitionMap =>
+        val topic = topicAndPartitionMap._1
+        val partitionMap = topicAndPartitionMap._2
+        val relevantPartitionsMap = partitionMap.filter( m => m._2.contains(brokerId) )
+        for((relevantPartition, replicaAssignment) <- relevantPartitionsMap){
+          ret.put((topic, relevantPartition), replicaAssignment)
+        }
     }
+    ret
   }
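
The new accessors in this hunk key their results by (topic, partition) tuples instead of nested per-topic maps. A small traversal sketch, assuming the signatures above:

    import org.I0Itec.zkclient.ZkClient
    import kafka.utils.ZkUtils

    def dumpAssignment(zkClient: ZkClient, topics: Seq[String]) {
      val assignment = ZkUtils.getReplicaAssignmentForTopics(zkClient, topics.iterator)
      for (((topic, partition), replicas) <- assignment)
        println("[%s, %d] => replicas %s".format(topic, partition, replicas.mkString(",")))
    }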
 
   def deletePartition(zkClient : ZkClient, brokerId: Int, topic: String) {
@@ -446,8 +522,7 @@ object ZkUtils extends Logging {
   def getConsumerTopicMaps(zkClient: ZkClient, group: String): Map[String, TopicCount] = {
     val dirs = new ZKGroupDirs(group)
     val consumersInGroup = getConsumersInGroup(zkClient, group)
-    val topicCountMaps = consumersInGroup.map(consumerId => TopicCount.constructTopicCount(consumerId,
-      ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId), zkClient))
+    val topicCountMaps = consumersInGroup.map(consumerId => TopicCount.constructTopicCount(consumerId,ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)._1, zkClient))
     consumersInGroup.zip(topicCountMaps).toMap
   }
 
@@ -470,8 +545,7 @@ object ZkUtils extends Logging {
     consumersPerTopicMap
   }
 
-  def getBrokerInfoFromIds(zkClient: ZkClient, brokerIds: Seq[Int]): Seq[Broker] =
-    brokerIds.map( bid => Broker.createBroker(bid, ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)) )
+  def getBrokerInfoFromIds(zkClient: ZkClient, brokerIds: Seq[Int]): Seq[Broker] = brokerIds.map( bid => Broker.createBroker(bid, ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)._1))
 
   def getAllTopics(zkClient: ZkClient): Seq[String] = {
     val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, BrokerTopicsPath)
@@ -479,17 +553,55 @@ object ZkUtils extends Logging {
     else topics
   }
 
+  def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int) = {
+    // read previous epoch, increment it and write it to the leader path and the ISR path.
+    val epoch = try {
+      Some(getEpochForPartition(client, topic, partition))
+    }catch {
+      case e: NoEpochForPartitionException => None
+      case e1 => throw e1
+    }
+    val newEpoch = epoch match {
+      case Some(partitionEpoch) =>
+        debug("Existing epoch for topic %s partition %d is %d".format(topic, partition, partitionEpoch))
+        partitionEpoch + 1
+      case None =>
+        // this is the first time leader is elected for this partition. So set epoch to 1
+        debug("First epoch is 1 for topic %s partition %d".format(topic, partition))
+        LeaderAndISR.initialLeaderEpoch
+    }
+    newEpoch
+  }
 }
 
-class LeaderExistsListener(topic: String, partition: Int, leaderLock: ReentrantLock, leaderExists: Condition) extends IZkDataListener {
+
+
+
+class LeaderExistsOrChangedListener(topic: String,
+                                    partition: Int,
+                                    leaderLock: ReentrantLock,
+                                    leaderExistsOrChanged: Condition,
+                                    oldLeaderOpt: Option[Int] = None,
+                                    zkClient: ZkClient = null) extends IZkDataListener with Logging {
   @throws(classOf[Exception])
   def handleDataChange(dataPath: String, data: Object) {
     val t = dataPath.split("/").takeRight(3).head
     val p = dataPath.split("/").takeRight(2).head.toInt
     leaderLock.lock()
     try {
-      if(t == topic && p == partition)
-        leaderExists.signal()
+      if(t == topic && p == partition){
+        if(oldLeaderOpt == None){
+          trace("In leader existence listener on partition [%s, %d], leader has been created".format(topic, partition))
+          leaderExistsOrChanged.signal()
+        }
+        else {
+          val newLeaderOpt = ZkUtils.getLeaderForPartition(zkClient, t, p)
+          if(newLeaderOpt.isDefined && newLeaderOpt.get != oldLeaderOpt.get){
+            trace("In leader change listener on partition [%s, %d], leader has been moved from %d to %d".format(topic, partition, oldLeaderOpt.get, newLeaderOpt.get))
+            leaderExistsOrChanged.signal()
+          }
+        }
+      }
     }
     finally {
       leaderLock.unlock()
@@ -500,12 +612,11 @@ class LeaderExistsListener(topic: String
   def handleDataDeleted(dataPath: String) {
     leaderLock.lock()
     try {
-      leaderExists.signal()
+      leaderExistsOrChanged.signal()
     }finally {
       leaderLock.unlock()
     }
   }
-
 }
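
LeaderExistsOrChangedListener is driven by a caller that registers it on the partition's leaderAndISR path and blocks on the shared condition; presumably this is what backs the waitUntilLeaderIsElectedOrChanged test helper used throughout the test changes below. A hedged wiring sketch (the path choice, timeout handling, and the absence of a pre-wait leader check are my simplifications):

    import java.util.concurrent.TimeUnit
    import java.util.concurrent.locks.ReentrantLock
    import org.I0Itec.zkclient.ZkClient
    import kafka.utils.{LeaderExistsOrChangedListener, ZkUtils}

    def awaitLeader(zkClient: ZkClient, topic: String, partition: Int,
                    oldLeaderOpt: Option[Int], timeoutMs: Long): Option[Int] = {
      val leaderLock = new ReentrantLock()
      val leaderExistsOrChanged = leaderLock.newCondition()
      val path = ZkUtils.getTopicPartitionLeaderAndISRPath(topic, partition)
      val listener = new LeaderExistsOrChangedListener(topic, partition, leaderLock,
                                                       leaderExistsOrChanged, oldLeaderOpt, zkClient)
      zkClient.subscribeDataChanges(path, listener)
      leaderLock.lock()
      try {
        // woken by handleDataChange/handleDataDeleted, or by the timeout
        leaderExistsOrChanged.await(timeoutMs, TimeUnit.MILLISECONDS)
        ZkUtils.getLeaderForPartition(zkClient, topic, partition)
      } finally {
        leaderLock.unlock()
        zkClient.unsubscribeDataChanges(path, listener)
      }
    }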
 
 object ZKStringSerializer extends ZkSerializer {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Wed Aug  1 16:13:59 2012
@@ -95,8 +95,8 @@ class ZookeeperConsumerConnectorTest ext
     val sentMessages1 = (sentMessages1_1 ++ sentMessages1_2).sortWith((s,t) => s.checksum < t.checksum)
 
     // wait to make sure the topic and partition have a leader for the successful case
-    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
-    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
+    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
 
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(
@@ -128,8 +128,8 @@ class ZookeeperConsumerConnectorTest ext
     val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
     val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
 
-    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
-    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
+    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
 
     val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
     val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
@@ -152,8 +152,8 @@ class ZookeeperConsumerConnectorTest ext
     val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
     val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
 
-    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
-    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
+    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
 
     val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
     val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
@@ -181,8 +181,8 @@ class ZookeeperConsumerConnectorTest ext
     val sentMessages1_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
     val sentMessages1 = (sentMessages1_1 ++ sentMessages1_2).sortWith((s,t) => s.checksum < t.checksum)
 
-    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
-    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
+    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
 
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(
@@ -213,8 +213,8 @@ class ZookeeperConsumerConnectorTest ext
     val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
     val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
 
-    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
-    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
+    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
 
     val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
     val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
@@ -237,8 +237,8 @@ class ZookeeperConsumerConnectorTest ext
     val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
     val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
 
-    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
-    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
+    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
 
     val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
     val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
@@ -317,8 +317,8 @@ class ZookeeperConsumerConnectorTest ext
 
     val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
 
-    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
-    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
+    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
 
     val zkConsumerConnector =
       new ZookeeperConsumerConnector(consumerConfig, true)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala Wed Aug  1 16:13:59 2012
@@ -25,7 +25,8 @@ import kafka.server.{KafkaServer, KafkaC
 import kafka.api._
 import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicInteger
-import kafka.utils.{ControllerTestUtils, ZkUtils, TestUtils}
+import kafka.admin.CreateTopicCommand
+import kafka.utils.{ZkUtils, ControllerTestUtils, TestUtils}
 
 
 class ControllerBasicTest extends JUnit3Suite with ZooKeeperTestHarness  {
@@ -36,11 +37,13 @@ class ControllerBasicTest extends JUnit3
   override def setUp() {
     super.setUp()
     brokers = configs.map(config => TestUtils.createServer(config))
+    CreateTopicCommand.createTopic(zkClient, "test1", 1, 4, "0:1:2:3")
+    CreateTopicCommand.createTopic(zkClient, "test2", 1, 4, "0:1:2:3")
   }
 
   override def tearDown() {
-    super.tearDown()
     brokers.foreach(_.shutdown())
+    super.tearDown()
   }
 
   def testControllerFailOver(){
@@ -48,36 +51,37 @@ class ControllerBasicTest extends JUnit3
     brokers(1).shutdown()
     brokers(3).shutdown()
     assertTrue("Controller not elected", TestUtils.waitUntilTrue(() =>
-      ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath) != null, zookeeper.tickTime))
-    var curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)
+      ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)._1 != null, zookeeper.tickTime))
+    var curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)._1
     assertEquals("Controller should move to broker 2", "2", curController)
 
+
     brokers(1).startup()
     brokers(2).shutdown()
     assertTrue("Controller not elected", TestUtils.waitUntilTrue(() =>
-      ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath) != null, zookeeper.tickTime))
-    curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)
+      ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)._1 != null, zookeeper.tickTime))
+    curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)._1
     assertEquals("Controller should move to broker 1", "1", curController)
   }
 
   def testControllerCommandSend(){
     for(broker <- brokers){
       if(broker.kafkaController.isActive){
-        val leaderAndISRRequest = ControllerTestUtils.createSampleLeaderAndISRRequest()
-        val stopReplicaRequest = ControllerTestUtils.createSampleStopReplicaRequest()
+        val leaderAndISRRequest = ControllerTestUtils.createTestLeaderAndISRRequest()
+        val stopReplicaRequest = ControllerTestUtils.createTestStopReplicaRequest()
 
         val successCount: AtomicInteger = new AtomicInteger(0)
         val countDownLatch: CountDownLatch = new CountDownLatch(8)
 
         def compareLeaderAndISRResponseWithExpectedOne(response: RequestOrResponse){
-          val expectedResponse = ControllerTestUtils.createSampleLeaderAndISRResponse()
+          val expectedResponse = ControllerTestUtils.createTestLeaderAndISRResponse()
           if(response.equals(expectedResponse))
             successCount.addAndGet(1)
           countDownLatch.countDown()
         }
 
         def compareStopReplicaResponseWithExpectedOne(response: RequestOrResponse){
-          val expectedResponse = ControllerTestUtils.createSampleStopReplicaResponse()
+          val expectedResponse = ControllerTestUtils.createTestStopReplicaResponse()
           if(response.equals(expectedResponse))
             successCount.addAndGet(1)
           countDownLatch.countDown()
@@ -87,10 +91,10 @@ class ControllerBasicTest extends JUnit3
         broker.kafkaController.sendRequest(1, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
         broker.kafkaController.sendRequest(2, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
         broker.kafkaController.sendRequest(3, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
-        broker.kafkaController.sendRequest(0, stopReplicaRequest, compareStopReplicaResponseWithExpectedOne)
-        broker.kafkaController.sendRequest(1, stopReplicaRequest, compareStopReplicaResponseWithExpectedOne)
-        broker.kafkaController.sendRequest(2, stopReplicaRequest, compareStopReplicaResponseWithExpectedOne)
-        broker.kafkaController.sendRequest(3, stopReplicaRequest, compareStopReplicaResponseWithExpectedOne)
+        broker.kafkaController.sendRequest(0, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
+        broker.kafkaController.sendRequest(1, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
+        broker.kafkaController.sendRequest(2, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
+        broker.kafkaController.sendRequest(3, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
         countDownLatch.await()
 
         assertEquals(successCount.get(), 8)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala Wed Aug  1 16:13:59 2012
@@ -57,7 +57,7 @@ class BackwardsCompatibilityTest extends
   // test for reading data with magic byte 0
   def testProtocolVersion0() {
     CreateTopicCommand.createTopic(zkClient, topic, 0, 1, configs.head.brokerId.toString)
-    TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
     val lastOffset = simpleConsumer.getOffsetsBefore(topic, 0, OffsetRequest.LatestTime, 1)
     var fetchOffset: Long = 0L
     var messageCount: Int = 0

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala Wed Aug  1 16:13:59 2012
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -35,27 +35,28 @@ import kafka.admin.CreateTopicCommand
 class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
 
   val numNodes = 1
-  val configs = 
+  val configs =
     for(props <- TestUtils.createBrokerConfigs(numNodes))
-      yield new KafkaConfig(props)
+    yield new KafkaConfig(props)
   val messages = new mutable.HashMap[Int, Seq[Message]]
   val topic = "topic"
   val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, c.brokerId.toString, "localhost", c.port)))
   val shutdown = ZookeeperConsumerConnector.shutdownCommand
   val queue = new LinkedBlockingQueue[FetchedDataChunk]
   val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
-                                                      c.brokerId,
-                                                      0,
-                                                      queue, 
-                                                      new AtomicLong(0), 
-                                                      new AtomicLong(0), 
-                                                      new AtomicInteger(0)))
-  
+                                                           c.brokerId,
+                                                           0,
+                                                           queue,
+                                                           new AtomicLong(0),
+                                                           new AtomicLong(0),
+                                                           new AtomicInteger(0)))
+
   var fetcher: ConsumerFetcherManager = null
 
   override def setUp() {
     super.setUp
     CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
+    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
     fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkClient)
     fetcher.stopAllConnections()
     fetcher.startConnections(topicInfos, cluster)
@@ -65,20 +66,20 @@ class FetcherTest extends JUnit3Suite wi
     fetcher.shutdown()
     super.tearDown
   }
-    
+
   def testFetcher() {
     val perNode = 2
     var count = sendMessages(perNode)
-    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+
     fetch(count)
     assertQueueEmpty()
     count = sendMessages(perNode)
     fetch(count)
     assertQueueEmpty()
   }
-  
+
   def assertQueueEmpty(): Unit = assertEquals(0, queue.size)
-  
+
   def sendMessages(messagesPerNode: Int): Int = {
     var count = 0
     for(conf <- configs) {
@@ -91,7 +92,7 @@ class FetcherTest extends JUnit3Suite wi
     }
     count
   }
-  
+
   def fetch(expected: Int) {
     var count = 0
     while(true) {
@@ -103,5 +104,5 @@ class FetcherTest extends JUnit3Suite wi
         return
     }
   }
-    
+
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala Wed Aug  1 16:13:59 2012
@@ -133,7 +133,7 @@ class LazyInitProducerTest extends JUnit
       builder.addFetch(topic, 0, 0, 10000)
     }
     // wait until leader is elected
-    topics.foreach(topic => TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 500))
+    topics.foreach(topic => TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500))
     producer.send(produceList: _*)
 
     // wait a bit for produced message to be available
@@ -158,7 +158,7 @@ class LazyInitProducerTest extends JUnit
       builder.addFetch(topic, 0, 0, 10000)
     }
     // wait until leader is elected
-    topics.foreach(topic => TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 1500))
+    topics.foreach(topic => TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1500))
 
     producer.send(produceList: _*)
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Wed Aug  1 16:13:59 2012
@@ -337,6 +337,7 @@ class PrimitiveApiTest extends JUnit3Sui
     assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
       AdminUtils.getTopicMetaDataFromZK(List(newTopic),
         zkClient).head.errorCode != ErrorMapping.UnknownTopicCode, zookeeper.tickTime))
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, newTopic, 0, 500)
     val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build())
     assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext)
   }
@@ -348,7 +349,7 @@ class PrimitiveApiTest extends JUnit3Sui
   def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Seq[String], brokerId: Int) {
     for( topic <- topics ) {
       CreateTopicCommand.createTopic(zkClient, topic, 1, 1, brokerId.toString)
-      TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
     }
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala Wed Aug  1 16:13:59 2012
@@ -117,7 +117,8 @@ class TopicMetadataTest extends JUnit3Su
 
     // create the kafka request handler
     val requestChannel = new RequestChannel(2, 5)
-    val apis = new KafkaApis(requestChannel, logManager, replicaManager, kafkaZookeeper)
+    val apis = new KafkaApis(requestChannel, logManager, replicaManager, kafkaZookeeper, null,
+                             null, null, null, 1)
 
     // mock the receive API to return the request buffer as created above
     val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala Wed Aug  1 16:13:59 2012
@@ -54,8 +54,8 @@ class ZookeeperConsumerConnectorTest ext
     // send some messages to each broker
     val sentMessages1 = sendMessages(nMessages, "batch1")
 
-    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
-    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
+    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
 
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zookeeperConnect, group, consumer1))

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala Wed Aug  1 16:13:59 2012
@@ -118,7 +118,7 @@ class LogOffsetTest extends JUnit3Suite 
 
     // setup brokers in zookeeper as owners of partitions for this test
     CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
-    TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
 
     var offsetChanged = false
     for(i <- 1 to 14) {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala Wed Aug  1 16:13:59 2012
@@ -27,9 +27,10 @@ import junit.framework.Assert._
 
 class SocketServerTest extends JUnitSuite {
 
-  val server: SocketServer = new SocketServer(port = TestUtils.choosePort, 
-                                              numProcessorThreads = 1, 
-                                              monitoringPeriodSecs = 30, 
+  val server: SocketServer = new SocketServer(0,
+                                              port = TestUtils.choosePort,
+                                              numProcessorThreads = 1,
+                                              monitoringPeriodSecs = 30,
                                               maxQueuedRequests = 50,
                                               maxRequestSize = 50)
   server.startup()

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Wed Aug  1 16:13:59 2012
@@ -101,7 +101,7 @@ class ProducerTest extends JUnit3Suite w
 
     // create topic with 1 partition and await leadership
     CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
-    TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 0, 500)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
 
     val producer1 = new Producer[String, String](config1)
     val producer2 = new Producer[String, String](config2)
@@ -153,10 +153,10 @@ class ProducerTest extends JUnit3Suite w
 
     // create topic
     CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0")
-    TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 0, 500)
-    TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 1, 500)
-    TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 2, 500)
-    TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 3, 500)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 1, 500)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 2, 500)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 3, 500)
 
     val config = new ProducerConfig(props)
     val producer = new Producer[String, String](config)
@@ -182,6 +182,8 @@ class ProducerTest extends JUnit3Suite w
 
     // restart server 1
     server1.startup()
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
+
     try {
       // cross check if broker 1 got the messages
       val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
@@ -209,7 +211,7 @@ class ProducerTest extends JUnit3Suite w
 
     // create topics in ZK
     CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
-    TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 0, 500)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
 
     // do a simple test to make sure plumbing is okay
     try {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Wed Aug  1 16:13:59 2012
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -159,9 +159,9 @@ class SyncProducerTest extends JUnit3Sui
 
     // #2 - test that we get correct offsets when partition is owned by broker
     CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1)
-    TestUtils.waitUntilLeaderIsElected(zkClient, "topic1", 0, 500)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "topic1", 0, 500)
     CreateTopicCommand.createTopic(zkClient, "topic3", 1, 1)
-    TestUtils.waitUntilLeaderIsElected(zkClient, "topic3", 0, 500)
+    TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "topic3", 0, 500)
 
     val response2 = producer.send(request)
     Assert.assertNotNull(response2)

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala Wed Aug  1 16:13:59 2012
@@ -39,9 +39,11 @@ class HighwatermarkPersistenceTest exten
     val scheduler = new KafkaScheduler(2)
     scheduler.startUp
     // create replica manager
-    val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler)
+    val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, null)
     replicaManager.startup()
-    replicaManager.checkpointHighwaterMarks()
+    replicaManager.startHighWaterMarksCheckPointThread()
+    // sleep for at least one flush interval so the checkpoint thread can run
+    Thread.sleep(configs.head.defaultFlushIntervalMs)
     var fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
     assertEquals(0L, fooPartition0Hw)
     val partition0 = replicaManager.getOrCreatePartition(topic, 0, configs.map(_.brokerId).toSet)
@@ -80,7 +82,7 @@ class HighwatermarkPersistenceTest exten
     val scheduler = new KafkaScheduler(2)
     scheduler.startUp
     // create replica manager
-    val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler)
+    val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, null)
     replicaManager.startup()
     replicaManager.checkpointHighwaterMarks()
     var topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)

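The first HighwatermarkPersistenceTest hunk swaps a direct checkpointHighwaterMarks() call for startHighWaterMarksCheckPointThread() followed by a sleep of one flush interval, so checkpointing now happens on a background schedule. A minimal sketch of what such a method could look like inside ReplicaManager, assuming a KafkaScheduler.scheduleWithRate(fun, delayMs, periodMs) signature:

    // Hedged sketch: run checkpointHighwaterMarks() on the scheduler every
    // defaultFlushIntervalMs, so high watermarks reach disk periodically.
    def startHighWaterMarksCheckPointThread() =
      scheduler.scheduleWithRate(() => checkpointHighwaterMarks(),
                                 0,                              // initial delay (ms)
                                 config.defaultFlushIntervalMs)  // period (ms)

Under that reading, the test's Thread.sleep(configs.head.defaultFlushIntervalMs) simply waits out one period so the first scheduled checkpoint has a chance to fire before the high watermark is read back.
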
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala Wed Aug  1 16:13:59 2012
@@ -100,7 +100,7 @@ class ISRExpirationTest extends JUnit3Su
     // create kafka scheduler
     val scheduler = new KafkaScheduler(2)
     // create replica manager
-    val replicaManager = new ReplicaManager(configs.head, time, zkClient, scheduler)
+    val replicaManager = new ReplicaManager(configs.head, time, zkClient, scheduler, null)
     try {
       val partition0 = replicaManager.getOrCreatePartition(topic, 0, configs.map(_.brokerId).toSet)
       // create leader log

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala Wed Aug  1 16:13:59 2012
@@ -25,7 +25,6 @@ import junit.framework.Assert._
 import kafka.utils.{ZkUtils, Utils, TestUtils}
 
 class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
-
   val brokerId1 = 0
   val brokerId2 = 1
 
@@ -34,23 +33,23 @@ class LeaderElectionTest extends JUnit3S
 
   val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1)
   val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
-
+  var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
 
   override def setUp() {
     super.setUp()
+    // start both servers
+    val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
+    val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
+    servers ++= List(server1, server2)
   }
 
   override def tearDown() {
+    servers.foreach(server => server.shutdown())
+    servers.foreach(server => Utils.rm(server.config.logDir))
     super.tearDown()
   }
 
-  def testLeaderElectionWithCreateTopic {
-    var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
-    // start both servers
-    val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
-    val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
-
-    servers ++= List(server1, server2)
+  def testLeaderElectionAndEpoch {
     // start 2 brokers
     val topic = "new-topic"
     val partitionId = 0
@@ -59,62 +58,39 @@ class LeaderElectionTest extends JUnit3S
     CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
 
     // wait until leader is elected
-    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
-    assertTrue("Leader should get elected", leader.isDefined)
+    val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
+    val leaderEpoch1 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
+    debug("leader Epoc: " + leaderEpoch1)
+    debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
+    assertTrue("Leader should get elected", leader1.isDefined)
     // NOTE: this is to avoid transient test failures
-    assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
+    assertTrue("Leader could be broker 0 or broker 1", (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1))
+    assertEquals("First epoch value should be 0", 0, leaderEpoch1)
 
     // kill the server hosting the preferred replica
-    server1.shutdown()
-
-    // check if leader moves to the other server
-    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 1500)
-    assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
-
-    val leaderPath = zkClient.getChildren(ZkUtils.getTopicPartitionPath(topic, "0"))
-    // bring the preferred replica back
-    servers.head.startup()
-
-    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
-    assertEquals("Leader must remain on broker 1", 1, leader.getOrElse(-1))
-
-    // shutdown current leader (broker 1)
     servers.last.shutdown()
-    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
-
-    // test if the leader is the preferred replica
-    assertEquals("Leader must be preferred replica on broker 0", 0, leader.getOrElse(-1))
-    // shutdown the servers and delete data hosted on them
-    servers.map(server => server.shutdown())
-    servers.map(server => Utils.rm(server.config.logDir))
-  }
-
-  // Assuming leader election happens correctly, test if epoch changes as expected
-  def testEpoch() {
-    // keep switching leaders to see if epoch changes correctly
-    val topic = "new-topic"
-    val partitionId = 0
-
-    // setup 2 brokers in ZK
-    val brokers = TestUtils.createBrokersInZk(zkClient, List(brokerId1, brokerId2))
-
-    // create topic with 1 partition, 2 replicas, one on each broker
-    CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
-
-    var newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
-    assertTrue("Broker 0 should become leader", newLeaderEpoch.isDefined)
-    assertEquals("First epoch value should be 1", 1, newLeaderEpoch.get._1)
-
-    ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
-    newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 1)
-    assertTrue("Broker 1 should become leader", newLeaderEpoch.isDefined)
-    assertEquals("Second epoch value should be 2", 2, newLeaderEpoch.get._1)
-
-    ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
-    newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
-    assertTrue("Broker 0 should become leader again", newLeaderEpoch.isDefined)
-    assertEquals("Third epoch value should be 3", 3, newLeaderEpoch.get._1)
-
-    TestUtils.deleteBrokersInZk(zkClient, List(brokerId1, brokerId2))
+    // check if leader moves to the other server
+    val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1500, if(leader1.get == 0) None else leader1)
+    val leaderEpoch2 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
+    debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
+    debug("leader Epoc: " + leaderEpoch2)
+    assertEquals("Leader must move to broker 0", 0, leader2.getOrElse(-1))
+    if(leader1.get == leader2.get)
+      assertEquals("Second epoch value should be " + leaderEpoch1, leaderEpoch1, leaderEpoch2)
+    else
+      assertEquals("Second epoch value should be %d".format(leaderEpoch1+1) , leaderEpoch1+1, leaderEpoch2)
+
+    servers.last.startup()
+    servers.head.shutdown()
+    Thread.sleep(zookeeper.tickTime)
+    val leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1500, if(leader2.get == 1) None else leader2)
+    val leaderEpoch3 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
+    debug("leader Epoc: " + leaderEpoch3)
+    debug("Leader is elected to be: %s".format(leader3.getOrElse(-1)))
+    assertEquals("Leader must return to 1", 1, leader3.getOrElse(-1))
+    if(leader2.get == leader3.get)
+      assertEquals("Second epoch value should be " + leaderEpoch2, leaderEpoch2, leaderEpoch3)
+    else
+      assertEquals("Second epoch value should be %d".format(leaderEpoch2+1) , leaderEpoch2+1, leaderEpoch3)
   }
 }
\ No newline at end of file

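The epoch assertions in testLeaderElectionAndEpoch encode one invariant: the leader epoch is unchanged when the same broker stays leader, and is incremented by one when leadership moves. Distilled into a standalone helper (the helper name is hypothetical, not part of the commit):

    // Hypothetical helper mirroring the assertions above: an unchanged leader
    // keeps its epoch, a changed leader bumps it by one.
    def expectedEpoch(prevLeader: Option[Int], newLeader: Option[Int], prevEpoch: Int): Int =
      if (prevLeader == newLeader) prevEpoch else prevEpoch + 1

    // e.g. expectedEpoch(Some(0), Some(0), 0) == 0
    //      expectedEpoch(Some(0), Some(1), 0) == 1
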
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala Wed Aug  1 16:13:59 2012
@@ -57,7 +57,7 @@ class LogRecoveryTest extends JUnit3Suit
     CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
 
     // wait until leader is elected
-    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
     assertTrue("Leader should get elected", leader.isDefined)
     // NOTE: this is to avoid transient test failures
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
@@ -89,7 +89,7 @@ class LogRecoveryTest extends JUnit3Suit
     CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
 
     // wait until leader is elected
-    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
     assertTrue("Leader should get elected", leader.isDefined)
     // NOTE: this is to avoid transient test failures
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
@@ -103,13 +103,13 @@ class LogRecoveryTest extends JUnit3Suit
     assertEquals(30L, hwFile1.read(topic, 0))
 
     // check if leader moves to the other server
-    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500, leader)
     assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
 
     // bring the preferred replica back
     server1.startup()
 
-    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
     assertEquals("Leader must remain on broker 1", 1, leader.getOrElse(-1))
 
     assertEquals(30L, hwFile1.read(topic, 0))
@@ -118,7 +118,7 @@ class LogRecoveryTest extends JUnit3Suit
     assertEquals(30L, hwFile2.read(topic, 0))
 
     server2.startup()
-    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500, leader)
     assertEquals("Leader must remain on broker 0", 0, leader.getOrElse(-1))
 
     sendMessages()
@@ -159,7 +159,7 @@ class LogRecoveryTest extends JUnit3Suit
     CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
 
     // wait until leader is elected
-    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
     assertTrue("Leader should get elected", leader.isDefined)
     // NOTE: this is to avoid transient test failures
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
@@ -203,7 +203,7 @@ class LogRecoveryTest extends JUnit3Suit
     CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
 
     // wait until leader is elected
-    var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
     assertTrue("Leader should get elected", leader.isDefined)
     // NOTE: this is to avoid transient test failures
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
@@ -220,7 +220,7 @@ class LogRecoveryTest extends JUnit3Suit
 
     server2.startup()
     // check if leader moves to the other server
-    leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+    leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500, leader)
     assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
 
     assertEquals(60L, hwFile1.read(topic, 0))

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala Wed Aug  1 16:13:59 2012
@@ -51,7 +51,7 @@ class ReplicaFetchTest extends JUnit3Sui
     // create a topic and partition and await leadership
     for (topic <- List(topic1,topic2)) {
       CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(c => c.brokerId).mkString(":"))
-      TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 1000)
+      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
     }
 
     // send test messages to leader

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala Wed Aug  1 16:13:59 2012
@@ -73,7 +73,7 @@ class RequestPurgatoryTest {
     assertTrue("Time for expiration was about 20ms", (elapsed - expiration).abs < 10L)
   }
   
-  class MockRequestPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest] {
+  class MockRequestPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest]("Mock Request Purgatory") {
     val satisfied = mutable.Set[DelayedRequest]()
     val expired = mutable.Set[DelayedRequest]()
     def awaitExpiration(delayed: DelayedRequest) = {

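RequestPurgatory now takes a display name in its constructor; MockRequestPurgatory above passes "Mock Request Purgatory". A hedged subclassing sketch follows — the two abstract members, checkSatisfied and expire, are assumptions about the purgatory API rather than confirmed signatures:

    // Sketch of a named RequestPurgatory subclass; method signatures are assumed.
    class NoopPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest]("Noop Purgatory") {
      // decide whether an incoming request lets a delayed request complete
      def checkSatisfied(request: ProducerRequest, delayed: DelayedRequest): Boolean = true
      // called when a delayed request times out before being satisfied
      def expire(delayed: DelayedRequest) { }
    }
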
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Wed Aug  1 16:13:59 2012
@@ -72,7 +72,7 @@ class ServerShutdownTest extends JUnit3S
       val server = new KafkaServer(config)
       server.startup()
 
-      waitUntilLeaderIsElected(zkClient, topic, 0, 1000)
+      waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
 
       var fetchedMessage: ByteBufferMessageSet = null
       while(fetchedMessage == null || fetchedMessage.validBytes == 0) {