You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by pr...@apache.org on 2012/06/18 03:17:45 UTC

svn commit: r1351188 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/admin/ main/scala/kafka/server/ main/scala/kafka/utils/ test/scala/unit/kafka/admin/ test/scala/unit/kafka/consumer/ test/scala/unit/kafka/producer/ test/scala/unit/kafk...

Author: prashanthmenon
Date: Mon Jun 18 01:17:44 2012
New Revision: 1351188

URL: http://svn.apache.org/viewvc?rev=1351188&view=rev
Log:
Create topic support (revisit based on v3 design); patched by Prashanth Menon; reviewed by Jun Rao; KAFKA-329

Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala?rev=1351188&r1=1351187&r2=1351188&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala Mon Jun 18 01:17:44 2012
@@ -18,12 +18,12 @@
 package kafka.admin
 
 import java.util.Random
-import org.I0Itec.zkclient.ZkClient
-import org.I0Itec.zkclient.exception.ZkNodeExistsException
 import kafka.api.{TopicMetadata, PartitionMetadata}
-import kafka.utils.{Logging, SystemTime, Utils, ZkUtils}
 import kafka.cluster.Broker
-import collection.mutable.HashMap
+import kafka.utils.{Logging, Utils, ZkUtils}
+import org.I0Itec.zkclient.ZkClient
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import scala.collection.mutable
 
 object AdminUtils extends Logging {
   val rand = new Random
@@ -49,7 +49,7 @@ object AdminUtils extends Logging {
    */
   def assignReplicasToBrokers(brokerList: Seq[String], nPartitions: Int, replicationFactor: Int,
           fixedStartIndex: Int = -1)  // for testing only
-    : Array[List[String]] = {
+    : Map[Int, List[String]] = {
     if (nPartitions <= 0)
       throw new AdministrationException("number of partitions must be larger than 0")
     if (replicationFactor <= 0)
@@ -57,7 +57,7 @@ object AdminUtils extends Logging {
     if (replicationFactor > brokerList.size)
       throw new AdministrationException("replication factor: " + replicationFactor +
               " larger than available brokers: " + brokerList.size)
-    val ret = new Array[List[String]](nPartitions)
+    val ret = new mutable.HashMap[Int, List[String]]()
     val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
 
     var secondReplicaShift = -1
@@ -68,47 +68,40 @@ object AdminUtils extends Logging {
       var replicaList = List(brokerList(firstReplicaIndex))
       for (j <- 0 until replicationFactor - 1)
         replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, secondReplicaShift, j, brokerList.size))
-      ret(i) = replicaList.reverse
+      ret.put(i, replicaList.reverse)
     }
-    ret
+    ret.toMap
   }
 
-  def createReplicaAssignmentPathInZK(topic: String, replicaAssignmentList: Seq[List[String]], zkClient: ZkClient) {
+  def createTopicPartitionAssignmentPathInZK(topic: String, replicaAssignment: Map[Int, List[String]], zkClient: ZkClient) {
     try {
-      val topicVersion = SystemTime.milliseconds
-      ZkUtils.createPersistentPath(zkClient, ZkUtils.BrokerTopicsPath + "/" + topic, topicVersion.toString)
-      for (i <- 0 until replicaAssignmentList.size) {
-        val zkPath = ZkUtils.getTopicPartitionReplicasPath(topic, i.toString)
-        ZkUtils.updatePersistentPath(zkClient, zkPath, Utils.seqToCSV(replicaAssignmentList(i)))
-        debug("Updated path %s with %s for replica assignment".format(zkPath, Utils.seqToCSV(replicaAssignmentList(i))))
-      }
-    }
-    catch {
-      case e: ZkNodeExistsException =>
-        throw new AdministrationException("topic " + topic + " already exists, with version "
-          + ZkUtils.getTopicVersion (zkClient, topic))
-      case e2 =>
-        throw new AdministrationException(e2.toString)      
+      val zkPath = ZkUtils.getTopicPath(topic)
+      val jsonPartitionMap = Utils.mapToJson(replicaAssignment.map(e => (e._1.toString -> e._2)))
+      ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionMap)
+      debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
+    } catch {
+      case e: ZkNodeExistsException => throw new AdministrationException("topic %s already exists".format(topic))
+      case e2 => throw new AdministrationException(e2.toString)
     }
   }
 
   def getTopicMetaDataFromZK(topics: Seq[String], zkClient: ZkClient): Seq[Option[TopicMetadata]] = {
-    val cachedBrokerInfo = new HashMap[Int, Broker]()
-
-    val metadataList = topics.map { topic =>
+    val cachedBrokerInfo = new mutable.HashMap[Int, Broker]()
+    topics.map { topic =>
       if (ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
-        val partitions = ZkUtils.getSortedPartitionIdsForTopic(zkClient, topic)
-        val partitionMetadata = new Array[PartitionMetadata](partitions.size)
+        val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic).iterator).get(topic).get
+        val sortedPartitions = topicPartitionAssignment.toList.sortWith( (m1,m2) => m1._1.toInt < m2._1.toInt )
 
-        for (i <-0 until partitionMetadata.size) {
-          val replicas = ZkUtils.readData(zkClient, ZkUtils.getTopicPartitionReplicasPath(topic, partitions(i).toString))
-          val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitions(i))
-          val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitions(i))
+        val partitionMetadata = sortedPartitions.map { partitionMap =>
+          val partition = partitionMap._1.toInt
+          val replicas = partitionMap._2
+          val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition)
+          val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
           debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
 
-          partitionMetadata(i) = new PartitionMetadata(partitions(i),
-            leader match { case None => None case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l.toInt)).head) },
-            getBrokerInfoFromCache(zkClient, cachedBrokerInfo, Utils.getCSVList(replicas).map(id => id.toInt)),
+          new PartitionMetadata(partition,
+            leader.map(l => getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head),
+            getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt)),
             getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas),
             None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */)
         }
@@ -117,7 +110,6 @@ object AdminUtils extends Logging {
         None
       }
     }
-    metadataList.toList
   }
 
   private def getBrokerInfoFromCache(zkClient: ZkClient,

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala?rev=1351188&r1=1351187&r2=1351188&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala Mon Jun 18 01:17:44 2012
@@ -18,8 +18,9 @@
 package kafka.admin
 
 import joptsimple.OptionParser
-import org.I0Itec.zkclient.ZkClient
 import kafka.utils.{Logging, Utils, ZKStringSerializer, ZkUtils}
+import org.I0Itec.zkclient.ZkClient
+import scala.collection.mutable
 
 object CreateTopicCommand extends Logging {
 
@@ -71,13 +72,11 @@ object CreateTopicCommand extends Loggin
       zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
       createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr)
       println("creation succeeded!")
-    }
-    catch {
+    } catch {
       case e =>
         println("creation failed because of " + e.getMessage)
         println(Utils.stackTrace(e))
-    }
-    finally {
+    } finally {
       if (zkClient != null)
         zkClient.close()
     }
@@ -85,19 +84,19 @@ object CreateTopicCommand extends Loggin
 
   def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "") {
     val brokerList = ZkUtils.getSortedBrokerList(zkClient)
-    var replicaAssignment: Seq[List[String]] = null
 
-    if (replicaAssignmentStr == "")
-      replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, replicationFactor)
+    val partitionReplicaAssignment = if (replicaAssignmentStr == "")
+      AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, replicationFactor)
     else
-      replicaAssignment = getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet)
-    debug("Replica assignment list for %s is %s".format(topic, replicaAssignment))
-    AdminUtils.createReplicaAssignmentPathInZK(topic, replicaAssignment, zkClient)
+      getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet)
+
+    debug("Replica assignment list for %s is %s".format(topic, partitionReplicaAssignment))
+    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, partitionReplicaAssignment, zkClient)
   }
 
-  def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[String]): Array[List[String]] = {
+  def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[String]): Map[Int, List[String]] = {
     val partitionList = replicaAssignmentList.split(",")
-    val ret = new Array[List[String]](partitionList.size)
+    val ret = new mutable.HashMap[Int, List[String]]()
     for (i <- 0 until partitionList.size) {
       val brokerList = partitionList(i).split(":").map(s => s.trim())
       if (brokerList.size <= 0)
@@ -107,10 +106,10 @@ object CreateTopicCommand extends Loggin
       if (!brokerList.toSet.subsetOf(availableBrokerList))
         throw new AdministrationException("some specified brokers not available. specified brokers: " + brokerList.toString +
                 "available broker:" + availableBrokerList.toString)
-      ret(i) = brokerList.toList
+      ret.put(i, brokerList.toList)
       if (ret(i).size != ret(0).size)
         throw new AdministrationException("partition " + i + " has different replication factor: " + brokerList)
     }
-    ret
+    ret.toMap
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1351188&r1=1351187&r2=1351188&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Mon Jun 18 01:17:44 2012
@@ -329,8 +329,7 @@ class KafkaApis(val requestChannel: Requ
         case None =>
           /* check if auto creation of topics is turned on */
           if(config.autoCreateTopics) {
-            CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions,
-              config.defaultReplicationFactor)
+            CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
             info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
               .format(topic, config.numPartitions, config.defaultReplicationFactor))
             val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1351188&r1=1351187&r2=1351188&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Mon Jun 18 01:17:44 2012
@@ -133,9 +133,8 @@ class KafkaZooKeeper(config: KafkaConfig
   def handleNewTopics(topics: Seq[String]) {
     // get relevant partitions to this broker
     val topicsAndPartitionsOnThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId)
-    topicsAndPartitionsOnThisBroker.foreach { tp =>
-      val topic = tp._1
-      val partitionsAssignedToThisBroker = tp._2
+    debug("Partitions assigned to broker %d are %s".format(config.brokerId, topicsAndPartitionsOnThisBroker.mkString(",")))
+    for( (topic, partitionsAssignedToThisBroker) <- topicsAndPartitionsOnThisBroker ) {
       // subscribe to leader changes for these partitions
       subscribeToLeaderForPartitions(topic, partitionsAssignedToThisBroker)
       // start replicas for these partitions
@@ -143,37 +142,19 @@ class KafkaZooKeeper(config: KafkaConfig
     }
   }
 
-  def handleNewPartitions(topic: String, partitions: Seq[Int]) {
-    info("Handling topic %s partitions %s".format(topic, partitions.mkString(",")))
-    // find the partitions relevant to this broker
-    val partitionsAssignedToThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topic, partitions, config.brokerId)
-    info("Partitions assigned to broker %d for topic %s are %s"
-      .format(config.brokerId, topic, partitionsAssignedToThisBroker.mkString(",")))
-
-    // subscribe to leader changes for these partitions
-    subscribeToLeaderForPartitions(topic, partitionsAssignedToThisBroker)
-    // start replicas for these partitions
-    startReplicasForPartitions(topic, partitionsAssignedToThisBroker)
-  }
-
   def subscribeToTopicAndPartitionsChanges(startReplicas: Boolean) {
     info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
     zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicPartitionsChangeListener)
     val topics = ZkUtils.getAllTopics(zkClient)
-    debug("Existing topics are %s".format(topics.mkString(",")))
-    topics.foreach(topic => zkClient.subscribeChildChanges(ZkUtils.getTopicPartitionsPath(topic), topicPartitionsChangeListener))
+    val topicsAndPartitionsOnThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId)
+    debug("Partitions assigned to broker %d are %s".format(config.brokerId, topicsAndPartitionsOnThisBroker.mkString(",")))
+    for( (topic, partitionsAssignedToThisBroker) <- topicsAndPartitionsOnThisBroker ) {
+      // subscribe to leader changes for these partitions
+      subscribeToLeaderForPartitions(topic, partitionsAssignedToThisBroker)
 
-    val partitionsAssignedToThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId)
-    debug("Partitions assigned to broker %d are %s".format(config.brokerId, partitionsAssignedToThisBroker.mkString(",")))
-    partitionsAssignedToThisBroker.foreach { tp =>
-      val topic = tp._1
-      val partitions = tp._2.map(p => p.toInt)
-      partitions.foreach { partition =>
-          // register leader change listener
-        zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderPath(topic, partition.toString), leaderChangeListener)
-      }
+      // start replicas for these partitions
       if(startReplicas)
-        startReplicasForPartitions(topic, partitions)
+        startReplicasForPartitions(topic, partitionsAssignedToThisBroker)
     }
   }
 
@@ -199,11 +180,11 @@ class KafkaZooKeeper(config: KafkaConfig
   }
 
   private def startReplica(replica: Replica) {
-    info("Starting replica for topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId,
-      replica.brokerId))
+    info("Starting replica for topic %s partition %d on broker %d"
+      .format(replica.topic, replica.partition.partitionId, replica.brokerId))
     ZkUtils.getLeaderForPartition(zkClient, replica.topic, replica.partition.partitionId) match {
-      case Some(leader) => info("Topic %s partition %d has leader %d".format(replica.topic, replica.partition.partitionId,
-        leader))
+      case Some(leader) =>
+        info("Topic %s partition %d has leader %d".format(replica.topic, replica.partition.partitionId,leader))
         // check if this broker is the leader, if not, then become follower
         if(leader != config.brokerId)
           becomeFollower(replica, leader, zkClient)
@@ -218,10 +199,9 @@ class KafkaZooKeeper(config: KafkaConfig
     val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, replica.topic, replica.partition.partitionId).map(_.toInt)
     val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, replica.topic, replica.partition.partitionId)
     val liveBrokers = ZkUtils.getSortedBrokerList(zkClient).map(_.toInt)
-    if(canBecomeLeader(config.brokerId, replica.topic, replica.partition.partitionId,
-                       assignedReplicas, inSyncReplicas, liveBrokers)) {
-      info("Broker %d will participate in leader election for topic %s partition %d".format(config.brokerId, replica.topic,
-                                                                                           replica.partition.partitionId))
+    if(canBecomeLeader(config.brokerId, replica.topic, replica.partition.partitionId, assignedReplicas, inSyncReplicas, liveBrokers)) {
+      info("Broker %d will participate in leader election for topic %s partition %d"
+        .format(config.brokerId, replica.topic, replica.partition.partitionId))
       // wait for some time if it is not the preferred replica
       try {
         if(replica.brokerId != assignedReplicas.head) {
@@ -233,7 +213,7 @@ class KafkaZooKeeper(config: KafkaConfig
             Thread.sleep(config.preferredReplicaWaitTime)
           }
         }
-      }catch {
+      } catch {
         case e => // ignoring
       }
       val newLeaderEpochAndISR = ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic,
@@ -279,7 +259,7 @@ class KafkaZooKeeper(config: KafkaConfig
                     " partition %d is alive. Broker %d can become leader since it is in the assigned replicas %s"
                       .format(partition, brokerId, assignedReplicas.mkString(",")))
                   true
-                }else {
+                } else {
                   info("No broker in the ISR %s for topic %s".format(inSyncReplicas.mkString(","), topic) +
                     " partition %d is alive. Broker %d can become leader since it is in the assigned replicas %s"
                       .format(partition, brokerId, assignedReplicas.mkString(",")))
@@ -297,7 +277,7 @@ class KafkaZooKeeper(config: KafkaConfig
           info("ISR for topic %s partition %d is empty. Broker %d can become leader since it "
             .format(topic, partition, brokerId) + "is part of the assigned replicas list")
           true
-        }else {
+        } else {
           info("ISR for topic %s partition %d is empty. Broker %d cannot become leader since it "
             .format(topic, partition, brokerId) + "is not part of the assigned replicas list")
           false
@@ -310,27 +290,19 @@ class KafkaZooKeeper(config: KafkaConfig
 
     @throws(classOf[Exception])
     def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
+      import collection.JavaConversions
       topicListenerLock.synchronized {
         debug("Topic/partition change listener fired for path " + parentPath)
-        import scala.collection.JavaConversions._
-        val currentChildren = asBuffer(curChilds)
+        val currentChildren = JavaConversions.asBuffer(curChilds).toSet
+        val newTopics = currentChildren -- allTopics
+        val deletedTopics = allTopics -- currentChildren
         allTopics.clear()
-        // check if topic has changed or a partition for an existing topic has changed
-        if(parentPath == ZkUtils.BrokerTopicsPath) {
-          val currentTopics = currentChildren
-          debug("New topics " + currentTopics.mkString(","))
-          // for each new topic [topic], watch the path /brokers/topics/[topic]/partitions
-          currentTopics.foreach { topic =>
-            zkClient.subscribeChildChanges(ZkUtils.getTopicPartitionsPath(topic), this)
-            allTopics += topic
-          }
-          handleNewTopics(currentTopics)
-        }else {
-          val topic = parentPath.split("/").takeRight(2).head
-          debug("Partitions changed for topic %s on broker %d with new value %s"
-            .format(topic, config.brokerId, currentChildren.mkString(",")))
-          handleNewPartitions(topic, currentChildren.map(p => p.toInt).toSeq)
-        }
+        allTopics ++ currentChildren
+
+        debug("New topics: [%s]. Deleted topics: [%s]".format(newTopics.mkString(","), deletedTopics.mkString(",")))
+        handleNewTopics(newTopics.toSeq)
+        // TODO: Handle topic deletions
+        //handleDeletedTopics(deletedTopics.toSeq)
       }
     }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1351188&r1=1351187&r2=1351188&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Mon Jun 18 01:17:44 2012
@@ -749,6 +749,21 @@ object Utils extends Logging {
     builder.toString
   }
 
+  def mapToJson[T <: Any](map: Map[String, List[String]]): String = {
+    val builder = new StringBuilder
+    builder.append("{ ")
+    var numElements = 0
+    for ( (key, value) <- map ) {
+      if (numElements > 0)
+        builder.append(",")
+      builder.append("\"" + key + "\": ")
+      builder.append("[%s]".format(value.map("\""+_+"\"").mkString(",")))
+      numElements += 1
+    }
+    builder.append(" }")
+    builder.toString
+  }
+
   def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
     for(arg <- required) {
       if(!options.has(arg)) {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1351188&r1=1351187&r2=1351188&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Mon Jun 18 01:17:44 2012
@@ -17,15 +17,16 @@
 
 package kafka.utils
 
-import org.I0Itec.zkclient.serialize.ZkSerializer
-import kafka.cluster.{Broker, Cluster}
-import scala.collection._
 import java.util.Properties
-import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
-import kafka.consumer.TopicCount
-import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
 import java.util.concurrent.locks.Condition
+import kafka.cluster.{Broker, Cluster}
 import kafka.common.NoEpochForPartitionException
+import kafka.consumer.TopicCount
+import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
+import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
+import org.I0Itec.zkclient.serialize.ZkSerializer
+import scala.collection._
+import util.parsing.json.JSON
 
 object ZkUtils extends Logging {
   val ConsumersPath = "/consumers"
@@ -110,7 +111,7 @@ object ZkUtils extends Logging {
       }else {
         throw new NoEpochForPartitionException("ISR path for topic %s partition %d is empty".format(topic, partition))
       }
-    }catch {
+    } catch {
       case e: ZkNoNodeException =>
         throw new NoEpochForPartitionException("No epoch since leader never existed for topic %s partition %d".format(topic, partition))
       case e1 => throw e1
@@ -118,15 +119,23 @@ object ZkUtils extends Logging {
     lastKnownEpoch
   }
 
+  /**
+   * Gets the assigned replicas (AR) for a specific topic and partition
+   */
   def getReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[String] = {
-    val replicaListString = readDataMaybeNull(zkClient, getTopicPartitionReplicasPath(topic, partition.toString))
-    if(replicaListString == null)
-      Seq.empty[String]
-    else {
-      Utils.getCSVList(replicaListString)
+    val topicAndPartitionAssignment = getPartitionAssignmentForTopics(zkClient, List(topic).iterator)
+    topicAndPartitionAssignment.get(topic) match {
+      case Some(partitionAssignment) => partitionAssignment.get(partition.toString) match {
+        case Some(replicaList) => replicaList
+        case None => Seq.empty[String]
+      }
+      case None => Seq.empty[String]
     }
   }
 
+  /**
+   * Gets the in-sync replicas (ISR) for a specific topic and partition
+   */
   def getInSyncReplicasForPartition(client: ZkClient, topic: String, partition: Int): Seq[Int] = {
     val replicaListAndEpochString = readDataMaybeNull(client, getTopicPartitionInSyncPath(topic, partition.toString))
     if(replicaListAndEpochString == null)
@@ -225,8 +234,7 @@ object ZkUtils extends Logging {
   private def createEphemeralPath(client: ZkClient, path: String, data: String): Unit = {
     try {
       client.createEphemeral(path, data)
-    }
-    catch {
+    } catch {
       case e: ZkNoNodeException => {
         createParentPath(client, path)
         client.createEphemeral(path, data)
@@ -241,23 +249,20 @@ object ZkUtils extends Logging {
   def createEphemeralPathExpectConflict(client: ZkClient, path: String, data: String): Unit = {
     try {
       createEphemeralPath(client, path, data)
-    }
-    catch {
+    } catch {
       case e: ZkNodeExistsException => {
         // this can happen when there is connection loss; make sure the data is what we intend to write
         var storedData: String = null
         try {
           storedData = readData(client, path)
-        }
-        catch {
+        } catch {
           case e1: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
           case e2 => throw e2
         }
         if (storedData == null || storedData != data) {
           info("conflict in " + path + " data: " + data + " stored data: " + storedData)
           throw e
-        }
-        else {
+        } else {
           // otherwise, the creation succeeded, return normally
           info(path + " exists with value " + data + " during connection loss; this is ok")
         }
@@ -272,8 +277,7 @@ object ZkUtils extends Logging {
   def createPersistentPath(client: ZkClient, path: String, data: String = ""): Unit = {
     try {
       client.createPersistent(path, data)
-    }
-    catch {
+    } catch {
       case e: ZkNoNodeException => {
         createParentPath(client, path)
         client.createPersistent(path, data)
@@ -292,14 +296,12 @@ object ZkUtils extends Logging {
   def updatePersistentPath(client: ZkClient, path: String, data: String): Unit = {
     try {
       client.writeData(path, data)
-    }
-    catch {
+    } catch {
       case e: ZkNoNodeException => {
         createParentPath(client, path)
         try {
           client.createPersistent(path, data)
-        }
-        catch {
+        } catch {
           case e: ZkNodeExistsException => client.writeData(path, data)
           case e2 => throw e2
         }
@@ -315,8 +317,7 @@ object ZkUtils extends Logging {
   def updateEphemeralPath(client: ZkClient, path: String, data: String): Unit = {
     try {
       client.writeData(path, data)
-    }
-    catch {
+    } catch {
       case e: ZkNoNodeException => {
         createParentPath(client, path)
         client.createEphemeral(path, data)
@@ -328,8 +329,7 @@ object ZkUtils extends Logging {
   def deletePath(client: ZkClient, path: String): Boolean = {
     try {
       client.delete(path)
-    }
-    catch {
+    } catch {
       case e: ZkNoNodeException =>
         // this can happen during a connection loss event, return normally
         info(path + " deleted during connection loss; this is ok")
@@ -341,8 +341,7 @@ object ZkUtils extends Logging {
   def deletePathRecursive(client: ZkClient, path: String) {
     try {
       client.deleteRecursive(path)
-    }
-    catch {
+    } catch {
       case e: ZkNoNodeException =>
         // this can happen during a connection loss event, return normally
         info(path + " deleted during connection loss; this is ok")
@@ -368,16 +367,12 @@ object ZkUtils extends Logging {
     import scala.collection.JavaConversions._
     // triggers implicit conversion from java list to scala Seq
 
-    var ret: java.util.List[String] = null
     try {
-      ret = client.getChildren(path)
-    }
-    catch {
-      case e: ZkNoNodeException =>
-        return Nil
+      client.getChildren(path)
+    } catch {
+      case e: ZkNoNodeException => return Nil
       case e2 => throw e2
     }
-    return ret
   }
 
   /**
@@ -399,35 +394,40 @@ object ZkUtils extends Logging {
     cluster
   }
 
-  def getPartitionsForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, Seq[String]] = {
-    val ret = new mutable.HashMap[String, Seq[String]]()
-    topics.foreach { topic =>
-      // get the partitions that exist for topic
-      val partitions = getChildrenParentMayNotExist(zkClient, getTopicPartitionsPath(topic))
-      debug("children of /brokers/topics/%s are %s".format(topic, partitions))
-      ret += (topic -> partitions.sortWith((s,t) => s < t))
+  def getPartitionAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, Map[String, List[String]]] = {
+    val ret = new mutable.HashMap[String, Map[String, List[String]]]()
+    topics.foreach{ topic =>
+      val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))
+      val partitionMap = if (jsonPartitionMap == null) {
+        Map[String, List[String]]()
+      } else {
+        JSON.parseFull(jsonPartitionMap) match {
+          case Some(m) => m.asInstanceOf[Map[String, List[String]]]
+          case None => Map[String, List[String]]()
+        }
+      }
+      debug("partition map for /brokers/topics/%s is %s".format(topic, partitionMap))
+      ret += (topic -> partitionMap)
     }
     ret
   }
 
-  def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int): Map[String, Seq[Int]] = {
-    val topicsAndPartitions = getPartitionsForTopics(zkClient, topics.iterator)
-
-    topicsAndPartitions.map { tp =>
-      val topic = tp._1
-      val partitions = tp._2.map(p => p.toInt)
-      val relevantPartitions = partitions.filter { partition =>
-        val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partition).map(r => r.toInt)
-        assignedReplicas.contains(brokerId)
-      }
-      (topic -> relevantPartitions)
+  def getPartitionsForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, Seq[String]] = {
+    getPartitionAssignmentForTopics(zkClient, topics).map{ topicAndPartitionMap =>
+      val topic = topicAndPartitionMap._1
+      val partitionMap = topicAndPartitionMap._2
+      debug("partition assignment of /brokers/topics/%s is %s".format(topic, partitionMap))
+      (topic -> partitionMap.keys.toSeq.sortWith((s,t) => s < t))
     }
   }
 
-  def getPartitionsAssignedToBroker(zkClient: ZkClient, topic: String, partitions: Seq[Int], broker: Int): Seq[Int] = {
-    partitions.filter { p =>
-      val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, p).map(r => r.toInt)
-      assignedReplicas.contains(broker)
+  def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int): Map[String, Seq[Int]] = {
+    val topicsAndPartitions = getPartitionAssignmentForTopics(zkClient, topics.iterator)
+    topicsAndPartitions.map{ topicAndPartitionMap =>
+      val topic = topicAndPartitionMap._1
+      val partitionMap = topicAndPartitionMap._2
+      val relevantPartitions = partitionMap.filter( m => m._2.contains(brokerId.toString) )
+      (topic -> relevantPartitions.keySet.map(_.toInt).toSeq)
     }
   }
 
@@ -470,14 +470,6 @@ object ZkUtils extends Logging {
     consumersPerTopicMap
   }
 
-  /**
-   * For a given topic, this returns the sorted list of partition ids registered for this topic
-   */
-  def getSortedPartitionIdsForTopic(zkClient: ZkClient, topic: String): Seq[Int] = {
-    val topicPartitionsPath = ZkUtils.getTopicPartitionsPath(topic)
-    ZkUtils.getChildrenParentMayNotExist(zkClient, topicPartitionsPath).map(pid => pid.toInt).sortWith((s,t) => s < t)
-  }
-
   def getBrokerInfoFromIds(zkClient: ZkClient, brokerIds: Seq[Int]): Seq[Broker] =
     brokerIds.map( bid => Broker.createBroker(bid, ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)) )
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala?rev=1351188&r1=1351187&r2=1351188&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala Mon Jun 18 01:17:44 2012
@@ -50,18 +50,18 @@ class AdminTest extends JUnit3Suite with
 
     // correct assignment
     {
-      val expectedAssignment = Array(
-        List("0", "1", "2"),
-        List("1", "2", "3"),
-        List("2", "3", "4"),
-        List("3", "4", "0"),
-        List("4", "0", "1"),
-        List("0", "2", "3"),
-        List("1", "3", "4"),
-        List("2", "4", "0"),
-        List("3", "0", "1"),
-        List("4", "1", "2")
-        )
+      val expectedAssignment = Map(
+        0 -> List("0", "1", "2"),
+        1 -> List("1", "2", "3"),
+        2 -> List("2", "3", "4"),
+        3 -> List("3", "4", "0"),
+        4 -> List("4", "0", "1"),
+        5 -> List("0", "2", "3"),
+        6 -> List("1", "3", "4"),
+        7 -> List("2", "4", "0"),
+        8 -> List("3", "0", "1"),
+        9 -> List("4", "1", "2")
+      )
 
       val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, 10, 3, 0)
       val e = (expectedAssignment.toList == actualAssignment.toList)
@@ -109,46 +109,51 @@ class AdminTest extends JUnit3Suite with
     // good assignment
     {
       val replicationAssignmentStr = "0:1:2,1:2:3"
-      val expectedReplicationAssignment = Array(
-        List("0", "1", "2"),
-        List("1", "2", "3")
+      val expectedReplicationAssignment = Map(
+        0 -> List("0", "1", "2"),
+        1 -> List("1", "2", "3")
       )
       val actualReplicationAssignment = CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
-      assertTrue(expectedReplicationAssignment.toList == actualReplicationAssignment.toList)
+      assertEquals(expectedReplicationAssignment.size, actualReplicationAssignment.size)
+      for( (part, replicas) <- expectedReplicationAssignment ) {
+        assertEquals(replicas, actualReplicationAssignment(part))
+      }
     }
   }
 
   @Test
   def testTopicCreationInZK() {
-    val expectedReplicaAssignment = Array(
-      List("0", "1", "2"),
-      List("1", "2", "3"),
-      List("2", "3", "4"),
-      List("3", "4", "0"),
-      List("4", "0", "1"),
-      List("0", "2", "3"),
-      List("1", "3", "4"),
-      List("2", "4", "0"),
-      List("3", "0", "1"),
-      List("4", "1", "2"),
-      List("1", "2", "3"),
-      List("1", "3", "4")      
-      )
+    val expectedReplicaAssignment = Map(
+      0  -> List("0", "1", "2"),
+      1  -> List("1", "2", "3"),
+      2  -> List("2", "3", "4"),
+      3  -> List("3", "4", "0"),
+      4  -> List("4", "0", "1"),
+      5  -> List("0", "2", "3"),
+      6  -> List("1", "3", "4"),
+      7  -> List("2", "4", "0"),
+      8  -> List("3", "0", "1"),
+      9  -> List("4", "1", "2"),
+      10 -> List("1", "2", "3"),
+      11 -> List("1", "3", "4")
+    )
     TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
 
     val topic = "test"
     // create the topic
-    AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
     val actualReplicaAssignment = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
                                   .get.partitionsMetadata.map(p => p.replicas)
     val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
-    expectedReplicaAssignment.toList.zip(actualReplicaList).foreach(l => assertEquals(l._1, l._2))
+    assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
+    for( i <- 0 until actualReplicaList.size ) {
+      assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i))
+    }
 
     try {
-      AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+      AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
       fail("shouldn't be able to create a topic already exists")
-    }
-    catch {
+    } catch {
       case e: AdministrationException => // this is good
       case e2 => throw e2
     }
@@ -156,22 +161,26 @@ class AdminTest extends JUnit3Suite with
 
   @Test
   def testGetTopicMetadata() {
-    val expectedReplicaAssignment = Array(
-      List("0", "1", "2"),
-      List("1", "2", "3")
+    val expectedReplicaAssignment = Map(
+      0 -> List("0", "1", "2"),
+      1 -> List("1", "2", "3")
     )
     val topic = "auto-topic"
     TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3))
-    AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+    AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
 
     val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
     newTopicMetadata match {
-      case Some(metadata) => assertEquals(topic, metadata.topic)
+      case Some(metadata) =>
+        assertEquals(topic, metadata.topic)
         assertNotNull("partition metadata list cannot be null", metadata.partitionsMetadata)
         assertEquals("partition metadata list length should be 2", 2, metadata.partitionsMetadata.size)
         val actualReplicaAssignment = metadata.partitionsMetadata.map(p => p.replicas)
         val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
-        assertEquals(expectedReplicaAssignment.toList, actualReplicaList)
+        assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
+        for(i <- 0 until actualReplicaList.size) {
+          assertEquals(expectedReplicaAssignment(i), actualReplicaList(i))
+        }
       case None => fail("Topic " + topic + " should've been automatically created")
     }
   }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1351188&r1=1351187&r2=1351188&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Mon Jun 18 01:17:44 2012
@@ -81,8 +81,7 @@ class ZookeeperConsumerConnectorTest ext
       try {
         getMessages(nMessages*2, topicMessageStreams0)
         fail("should get an exception")
-      }
-      catch {
+      } catch {
         case e: ConsumerTimeoutException => // this is ok
         case e => throw e
       }
@@ -90,15 +89,15 @@ class ZookeeperConsumerConnectorTest ext
 
     zkConsumerConnector0.shutdown
 
-    // wait to make sure the topic and partition have a leader for the successful case
-    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
-    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
-
     // send some messages to each broker
     val sentMessages1_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages)
     val sentMessages1_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
     val sentMessages1 = (sentMessages1_1 ++ sentMessages1_2).sortWith((s,t) => s.checksum < t.checksum)
 
+    // wait to make sure the topic and partition have a leader for the successful case
+    waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+    waitUntilLeaderIsElected(zkClient, topic, 1, 500)
+
     // create a consumer
     val consumerConfig1 = new ConsumerConfig(
       TestUtils.createConsumerProperties(zkConnect, group, consumer1))
@@ -141,7 +140,7 @@ class ZookeeperConsumerConnectorTest ext
     val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
     val expected_2 = List( ("0", "group1_consumer1-0"),
                            ("1", "group1_consumer2-0"))
-   assertEquals(expected_2, actual_2)
+    assertEquals(expected_2, actual_2)
 
     // create a consumer with empty map
     val consumerConfig3 = new ConsumerConfig(
@@ -165,7 +164,7 @@ class ZookeeperConsumerConnectorTest ext
 
     // also check partition ownership
     val actual_3 = getZKChildrenValues(dirs.consumerOwnerDir)
-   assertEquals(expected_2, actual_3)
+    assertEquals(expected_2, actual_3)
 
     zkConsumerConnector1.shutdown
     zkConsumerConnector2.shutdown
@@ -199,7 +198,7 @@ class ZookeeperConsumerConnectorTest ext
     val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
     val expected_1 = List( ("0", "group1_consumer1-0"),
                            ("1", "group1_consumer1-0"))
-   assertEquals(expected_1, actual_1)
+    assertEquals(expected_1, actual_1)
 
     // commit consumed offsets
     zkConsumerConnector1.commitOffsets
@@ -227,7 +226,7 @@ class ZookeeperConsumerConnectorTest ext
     val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
     val expected_2 = List( ("0", "group1_consumer1-0"),
                            ("1", "group1_consumer2-0"))
-   assertEquals(expected_2, actual_2)
+    assertEquals(expected_2, actual_2)
 
     // create a consumer with empty map
     val consumerConfig3 = new ConsumerConfig(
@@ -251,7 +250,7 @@ class ZookeeperConsumerConnectorTest ext
 
     // also check partition ownership
     val actual_3 = getZKChildrenValues(dirs.consumerOwnerDir)
-   assertEquals(expected_2, actual_3)
+    assertEquals(expected_2, actual_3)
 
     zkConsumerConnector1.shutdown
     zkConsumerConnector2.shutdown
@@ -264,8 +263,6 @@ class ZookeeperConsumerConnectorTest ext
     val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
     requestHandlerLogger.setLevel(Level.FATAL)
 
-    var actualMessages: List[Message] = Nil
-
     // shutdown one server
     servers.last.shutdown
     Thread.sleep(500)
@@ -288,7 +285,7 @@ class ZookeeperConsumerConnectorTest ext
     val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
     val expected_1 = List( ("0", "group1_consumer0-0"),
                            ("1", "group1_consumer0-0"))
-   assertEquals(expected_1, actual_1)
+    assertEquals(expected_1, actual_1)
 
     zkConsumerConnector0.shutdown
     // at this point, only some part of the message set was consumed. So consumed offset should still be 0
@@ -361,8 +358,7 @@ class ZookeeperConsumerConnectorTest ext
     val sentMessages1 = sendMessages(configs.head, nMessages, "batch1", NoCompressionCodec, 1)
 
     // create a consumer
-    val consumerConfig1 = new ConsumerConfig(
-      TestUtils.createConsumerProperties(zkConnect, group, consumer1))
+    val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
     val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
     val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
     val topicRegistry = zkConsumerConnector1.getTopicRegistry

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1351188&r1=1351187&r2=1351188&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Mon Jun 18 01:17:44 2012
@@ -91,8 +91,11 @@ class ProducerTest extends JUnit3Suite w
     props.put("zk.connect", TestZKUtils.zookeeperConnect)
 
     val config = new ProducerConfig(props)
-    // create topic with 1 partition
+
+    // create topic with 1 partition and await leadership
     CreateTopicCommand.createTopic(zkClient, "new-topic", 1)
+    TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 0, 500)
+
     val producer = new Producer[String, String](config)
     try {
       // Available partition ids should be 0.
@@ -132,6 +135,10 @@ class ProducerTest extends JUnit3Suite w
 
     // create topic
     CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0")
+    TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 0, 500)
+    TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 1, 500)
+    TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 2, 500)
+    TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 3, 500)
 
     val config = new ProducerConfig(props)
     val producer = new Producer[String, String](config)
@@ -189,6 +196,7 @@ class ProducerTest extends JUnit3Suite w
 
     // create topics in ZK
     CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
+    TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 0, 500)
 
     // do a simple test to make sure plumbing is okay
     try {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala?rev=1351188&r1=1351187&r2=1351188&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala Mon Jun 18 01:17:44 2012
@@ -52,8 +52,9 @@ class ReplicaFetchTest extends JUnit3Sui
     val followerBrokerId = configs.last.brokerId
     val leaderBroker = new Broker(leaderBrokerId, "localhost", "localhost", configs.head.port)
 
-    // create a topic and partition
+    // create a topic and partition and await leadership
     CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(c => c.brokerId).mkString(":"))
+    TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 1000)
 
     // send test messages to leader
     val producer = TestUtils.createProducer[String, String](zkConnect, new StringEncoder)