You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by pr...@apache.org on 2012/06/18 03:17:45 UTC
svn commit: r1351188 - in /incubator/kafka/branches/0.8/core/src:
main/scala/kafka/admin/ main/scala/kafka/server/ main/scala/kafka/utils/
test/scala/unit/kafka/admin/ test/scala/unit/kafka/consumer/
test/scala/unit/kafka/producer/ test/scala/unit/kafka/server/
Author: prashanthmenon
Date: Mon Jun 18 01:17:44 2012
New Revision: 1351188
URL: http://svn.apache.org/viewvc?rev=1351188&view=rev
Log:
Create topic support (revisit based on v3 design); patched by Prashanth Menon; reviewed by Jun Rao; KAFKA-329
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala?rev=1351188&r1=1351187&r2=1351188&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/AdminUtils.scala Mon Jun 18 01:17:44 2012
@@ -18,12 +18,12 @@
package kafka.admin
import java.util.Random
-import org.I0Itec.zkclient.ZkClient
-import org.I0Itec.zkclient.exception.ZkNodeExistsException
import kafka.api.{TopicMetadata, PartitionMetadata}
-import kafka.utils.{Logging, SystemTime, Utils, ZkUtils}
import kafka.cluster.Broker
-import collection.mutable.HashMap
+import kafka.utils.{Logging, Utils, ZkUtils}
+import org.I0Itec.zkclient.ZkClient
+import org.I0Itec.zkclient.exception.ZkNodeExistsException
+import scala.collection.mutable
object AdminUtils extends Logging {
val rand = new Random
@@ -49,7 +49,7 @@ object AdminUtils extends Logging {
*/
def assignReplicasToBrokers(brokerList: Seq[String], nPartitions: Int, replicationFactor: Int,
fixedStartIndex: Int = -1) // for testing only
- : Array[List[String]] = {
+ : Map[Int, List[String]] = {
if (nPartitions <= 0)
throw new AdministrationException("number of partitions must be larger than 0")
if (replicationFactor <= 0)
@@ -57,7 +57,7 @@ object AdminUtils extends Logging {
if (replicationFactor > brokerList.size)
throw new AdministrationException("replication factor: " + replicationFactor +
" larger than available brokers: " + brokerList.size)
- val ret = new Array[List[String]](nPartitions)
+ val ret = new mutable.HashMap[Int, List[String]]()
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
var secondReplicaShift = -1
@@ -68,47 +68,40 @@ object AdminUtils extends Logging {
var replicaList = List(brokerList(firstReplicaIndex))
for (j <- 0 until replicationFactor - 1)
replicaList ::= brokerList(getWrappedIndex(firstReplicaIndex, secondReplicaShift, j, brokerList.size))
- ret(i) = replicaList.reverse
+ ret.put(i, replicaList.reverse)
}
- ret
+ ret.toMap
}
- def createReplicaAssignmentPathInZK(topic: String, replicaAssignmentList: Seq[List[String]], zkClient: ZkClient) {
+ def createTopicPartitionAssignmentPathInZK(topic: String, replicaAssignment: Map[Int, List[String]], zkClient: ZkClient) {
try {
- val topicVersion = SystemTime.milliseconds
- ZkUtils.createPersistentPath(zkClient, ZkUtils.BrokerTopicsPath + "/" + topic, topicVersion.toString)
- for (i <- 0 until replicaAssignmentList.size) {
- val zkPath = ZkUtils.getTopicPartitionReplicasPath(topic, i.toString)
- ZkUtils.updatePersistentPath(zkClient, zkPath, Utils.seqToCSV(replicaAssignmentList(i)))
- debug("Updated path %s with %s for replica assignment".format(zkPath, Utils.seqToCSV(replicaAssignmentList(i))))
- }
- }
- catch {
- case e: ZkNodeExistsException =>
- throw new AdministrationException("topic " + topic + " already exists, with version "
- + ZkUtils.getTopicVersion (zkClient, topic))
- case e2 =>
- throw new AdministrationException(e2.toString)
+ val zkPath = ZkUtils.getTopicPath(topic)
+ val jsonPartitionMap = Utils.mapToJson(replicaAssignment.map(e => (e._1.toString -> e._2)))
+ ZkUtils.createPersistentPath(zkClient, zkPath, jsonPartitionMap)
+ debug("Updated path %s with %s for replica assignment".format(zkPath, jsonPartitionMap))
+ } catch {
+ case e: ZkNodeExistsException => throw new AdministrationException("topic %s already exists".format(topic))
+ case e2 => throw new AdministrationException(e2.toString)
}
}
def getTopicMetaDataFromZK(topics: Seq[String], zkClient: ZkClient): Seq[Option[TopicMetadata]] = {
- val cachedBrokerInfo = new HashMap[Int, Broker]()
-
- val metadataList = topics.map { topic =>
+ val cachedBrokerInfo = new mutable.HashMap[Int, Broker]()
+ topics.map { topic =>
if (ZkUtils.pathExists(zkClient, ZkUtils.getTopicPath(topic))) {
- val partitions = ZkUtils.getSortedPartitionIdsForTopic(zkClient, topic)
- val partitionMetadata = new Array[PartitionMetadata](partitions.size)
+ val topicPartitionAssignment = ZkUtils.getPartitionAssignmentForTopics(zkClient, List(topic).iterator).get(topic).get
+ val sortedPartitions = topicPartitionAssignment.toList.sortWith( (m1,m2) => m1._1.toInt < m2._1.toInt )
- for (i <-0 until partitionMetadata.size) {
- val replicas = ZkUtils.readData(zkClient, ZkUtils.getTopicPartitionReplicasPath(topic, partitions(i).toString))
- val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partitions(i))
- val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partitions(i))
+ val partitionMetadata = sortedPartitions.map { partitionMap =>
+ val partition = partitionMap._1.toInt
+ val replicas = partitionMap._2
+ val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, topic, partition)
+ val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
debug("replicas = " + replicas + ", in sync replicas = " + inSyncReplicas + ", leader = " + leader)
- partitionMetadata(i) = new PartitionMetadata(partitions(i),
- leader match { case None => None case Some(l) => Some(getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l.toInt)).head) },
- getBrokerInfoFromCache(zkClient, cachedBrokerInfo, Utils.getCSVList(replicas).map(id => id.toInt)),
+ new PartitionMetadata(partition,
+ leader.map(l => getBrokerInfoFromCache(zkClient, cachedBrokerInfo, List(l)).head),
+ getBrokerInfoFromCache(zkClient, cachedBrokerInfo, replicas.map(id => id.toInt)),
getBrokerInfoFromCache(zkClient, cachedBrokerInfo, inSyncReplicas),
None /* Return log segment metadata when getOffsetsBefore will be replaced with this API */)
}
@@ -117,7 +110,6 @@ object AdminUtils extends Logging {
None
}
}
- metadataList.toList
}
private def getBrokerInfoFromCache(zkClient: ZkClient,
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala?rev=1351188&r1=1351187&r2=1351188&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/admin/CreateTopicCommand.scala Mon Jun 18 01:17:44 2012
@@ -18,8 +18,9 @@
package kafka.admin
import joptsimple.OptionParser
-import org.I0Itec.zkclient.ZkClient
import kafka.utils.{Logging, Utils, ZKStringSerializer, ZkUtils}
+import org.I0Itec.zkclient.ZkClient
+import scala.collection.mutable
object CreateTopicCommand extends Logging {
@@ -71,13 +72,11 @@ object CreateTopicCommand extends Loggin
zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer)
createTopic(zkClient, topic, nPartitions, replicationFactor, replicaAssignmentStr)
println("creation succeeded!")
- }
- catch {
+ } catch {
case e =>
println("creation failed because of " + e.getMessage)
println(Utils.stackTrace(e))
- }
- finally {
+ } finally {
if (zkClient != null)
zkClient.close()
}
@@ -85,19 +84,19 @@ object CreateTopicCommand extends Loggin
def createTopic(zkClient: ZkClient, topic: String, numPartitions: Int = 1, replicationFactor: Int = 1, replicaAssignmentStr: String = "") {
val brokerList = ZkUtils.getSortedBrokerList(zkClient)
- var replicaAssignment: Seq[List[String]] = null
- if (replicaAssignmentStr == "")
- replicaAssignment = AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, replicationFactor)
+ val partitionReplicaAssignment = if (replicaAssignmentStr == "")
+ AdminUtils.assignReplicasToBrokers(brokerList, numPartitions, replicationFactor)
else
- replicaAssignment = getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet)
- debug("Replica assignment list for %s is %s".format(topic, replicaAssignment))
- AdminUtils.createReplicaAssignmentPathInZK(topic, replicaAssignment, zkClient)
+ getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet)
+
+ debug("Replica assignment list for %s is %s".format(topic, partitionReplicaAssignment))
+ AdminUtils.createTopicPartitionAssignmentPathInZK(topic, partitionReplicaAssignment, zkClient)
}
- def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[String]): Array[List[String]] = {
+ def getManualReplicaAssignment(replicaAssignmentList: String, availableBrokerList: Set[String]): Map[Int, List[String]] = {
val partitionList = replicaAssignmentList.split(",")
- val ret = new Array[List[String]](partitionList.size)
+ val ret = new mutable.HashMap[Int, List[String]]()
for (i <- 0 until partitionList.size) {
val brokerList = partitionList(i).split(":").map(s => s.trim())
if (brokerList.size <= 0)
@@ -107,10 +106,10 @@ object CreateTopicCommand extends Loggin
if (!brokerList.toSet.subsetOf(availableBrokerList))
throw new AdministrationException("some specified brokers not available. specified brokers: " + brokerList.toString +
"available broker:" + availableBrokerList.toString)
- ret(i) = brokerList.toList
+ ret.put(i, brokerList.toList)
if (ret(i).size != ret(0).size)
throw new AdministrationException("partition " + i + " has different replication factor: " + brokerList)
}
- ret
+ ret.toMap
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1351188&r1=1351187&r2=1351188&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Mon Jun 18 01:17:44 2012
@@ -329,8 +329,7 @@ class KafkaApis(val requestChannel: Requ
case None =>
/* check if auto creation of topics is turned on */
if(config.autoCreateTopics) {
- CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions,
- config.defaultReplicationFactor)
+ CreateTopicCommand.createTopic(zkClient, topic, config.numPartitions, config.defaultReplicationFactor)
info("Auto creation of topic %s with %d partitions and replication factor %d is successful!"
.format(topic, config.numPartitions, config.defaultReplicationFactor))
val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1351188&r1=1351187&r2=1351188&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Mon Jun 18 01:17:44 2012
@@ -133,9 +133,8 @@ class KafkaZooKeeper(config: KafkaConfig
def handleNewTopics(topics: Seq[String]) {
// get relevant partitions to this broker
val topicsAndPartitionsOnThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId)
- topicsAndPartitionsOnThisBroker.foreach { tp =>
- val topic = tp._1
- val partitionsAssignedToThisBroker = tp._2
+ debug("Partitions assigned to broker %d are %s".format(config.brokerId, topicsAndPartitionsOnThisBroker.mkString(",")))
+ for( (topic, partitionsAssignedToThisBroker) <- topicsAndPartitionsOnThisBroker ) {
// subscribe to leader changes for these partitions
subscribeToLeaderForPartitions(topic, partitionsAssignedToThisBroker)
// start replicas for these partitions
@@ -143,37 +142,19 @@ class KafkaZooKeeper(config: KafkaConfig
}
}
- def handleNewPartitions(topic: String, partitions: Seq[Int]) {
- info("Handling topic %s partitions %s".format(topic, partitions.mkString(",")))
- // find the partitions relevant to this broker
- val partitionsAssignedToThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topic, partitions, config.brokerId)
- info("Partitions assigned to broker %d for topic %s are %s"
- .format(config.brokerId, topic, partitionsAssignedToThisBroker.mkString(",")))
-
- // subscribe to leader changes for these partitions
- subscribeToLeaderForPartitions(topic, partitionsAssignedToThisBroker)
- // start replicas for these partitions
- startReplicasForPartitions(topic, partitionsAssignedToThisBroker)
- }
-
def subscribeToTopicAndPartitionsChanges(startReplicas: Boolean) {
info("Subscribing to %s path to watch for new topics".format(ZkUtils.BrokerTopicsPath))
zkClient.subscribeChildChanges(ZkUtils.BrokerTopicsPath, topicPartitionsChangeListener)
val topics = ZkUtils.getAllTopics(zkClient)
- debug("Existing topics are %s".format(topics.mkString(",")))
- topics.foreach(topic => zkClient.subscribeChildChanges(ZkUtils.getTopicPartitionsPath(topic), topicPartitionsChangeListener))
+ val topicsAndPartitionsOnThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId)
+ debug("Partitions assigned to broker %d are %s".format(config.brokerId, topicsAndPartitionsOnThisBroker.mkString(",")))
+ for( (topic, partitionsAssignedToThisBroker) <- topicsAndPartitionsOnThisBroker ) {
+ // subscribe to leader changes for these partitions
+ subscribeToLeaderForPartitions(topic, partitionsAssignedToThisBroker)
- val partitionsAssignedToThisBroker = ZkUtils.getPartitionsAssignedToBroker(zkClient, topics, config.brokerId)
- debug("Partitions assigned to broker %d are %s".format(config.brokerId, partitionsAssignedToThisBroker.mkString(",")))
- partitionsAssignedToThisBroker.foreach { tp =>
- val topic = tp._1
- val partitions = tp._2.map(p => p.toInt)
- partitions.foreach { partition =>
- // register leader change listener
- zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderPath(topic, partition.toString), leaderChangeListener)
- }
+ // start replicas for these partitions
if(startReplicas)
- startReplicasForPartitions(topic, partitions)
+ startReplicasForPartitions(topic, partitionsAssignedToThisBroker)
}
}
@@ -199,11 +180,11 @@ class KafkaZooKeeper(config: KafkaConfig
}
private def startReplica(replica: Replica) {
- info("Starting replica for topic %s partition %d on broker %d".format(replica.topic, replica.partition.partitionId,
- replica.brokerId))
+ info("Starting replica for topic %s partition %d on broker %d"
+ .format(replica.topic, replica.partition.partitionId, replica.brokerId))
ZkUtils.getLeaderForPartition(zkClient, replica.topic, replica.partition.partitionId) match {
- case Some(leader) => info("Topic %s partition %d has leader %d".format(replica.topic, replica.partition.partitionId,
- leader))
+ case Some(leader) =>
+ info("Topic %s partition %d has leader %d".format(replica.topic, replica.partition.partitionId,leader))
// check if this broker is the leader, if not, then become follower
if(leader != config.brokerId)
becomeFollower(replica, leader, zkClient)
@@ -218,10 +199,9 @@ class KafkaZooKeeper(config: KafkaConfig
val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, replica.topic, replica.partition.partitionId).map(_.toInt)
val inSyncReplicas = ZkUtils.getInSyncReplicasForPartition(zkClient, replica.topic, replica.partition.partitionId)
val liveBrokers = ZkUtils.getSortedBrokerList(zkClient).map(_.toInt)
- if(canBecomeLeader(config.brokerId, replica.topic, replica.partition.partitionId,
- assignedReplicas, inSyncReplicas, liveBrokers)) {
- info("Broker %d will participate in leader election for topic %s partition %d".format(config.brokerId, replica.topic,
- replica.partition.partitionId))
+ if(canBecomeLeader(config.brokerId, replica.topic, replica.partition.partitionId, assignedReplicas, inSyncReplicas, liveBrokers)) {
+ info("Broker %d will participate in leader election for topic %s partition %d"
+ .format(config.brokerId, replica.topic, replica.partition.partitionId))
// wait for some time if it is not the preferred replica
try {
if(replica.brokerId != assignedReplicas.head) {
@@ -233,7 +213,7 @@ class KafkaZooKeeper(config: KafkaConfig
Thread.sleep(config.preferredReplicaWaitTime)
}
}
- }catch {
+ } catch {
case e => // ignoring
}
val newLeaderEpochAndISR = ZkUtils.tryToBecomeLeaderForPartition(zkClient, replica.topic,
@@ -279,7 +259,7 @@ class KafkaZooKeeper(config: KafkaConfig
" partition %d is alive. Broker %d can become leader since it is in the assigned replicas %s"
.format(partition, brokerId, assignedReplicas.mkString(",")))
true
- }else {
+ } else {
info("No broker in the ISR %s for topic %s".format(inSyncReplicas.mkString(","), topic) +
" partition %d is alive. Broker %d can become leader since it is in the assigned replicas %s"
.format(partition, brokerId, assignedReplicas.mkString(",")))
@@ -297,7 +277,7 @@ class KafkaZooKeeper(config: KafkaConfig
info("ISR for topic %s partition %d is empty. Broker %d can become leader since it "
.format(topic, partition, brokerId) + "is part of the assigned replicas list")
true
- }else {
+ } else {
info("ISR for topic %s partition %d is empty. Broker %d cannot become leader since it "
.format(topic, partition, brokerId) + "is not part of the assigned replicas list")
false
@@ -310,27 +290,19 @@ class KafkaZooKeeper(config: KafkaConfig
@throws(classOf[Exception])
def handleChildChange(parentPath : String, curChilds : java.util.List[String]) {
+ import collection.JavaConversions
topicListenerLock.synchronized {
debug("Topic/partition change listener fired for path " + parentPath)
- import scala.collection.JavaConversions._
- val currentChildren = asBuffer(curChilds)
+ val currentChildren = JavaConversions.asBuffer(curChilds).toSet
+ val newTopics = currentChildren -- allTopics
+ val deletedTopics = allTopics -- currentChildren
allTopics.clear()
- // check if topic has changed or a partition for an existing topic has changed
- if(parentPath == ZkUtils.BrokerTopicsPath) {
- val currentTopics = currentChildren
- debug("New topics " + currentTopics.mkString(","))
- // for each new topic [topic], watch the path /brokers/topics/[topic]/partitions
- currentTopics.foreach { topic =>
- zkClient.subscribeChildChanges(ZkUtils.getTopicPartitionsPath(topic), this)
- allTopics += topic
- }
- handleNewTopics(currentTopics)
- }else {
- val topic = parentPath.split("/").takeRight(2).head
- debug("Partitions changed for topic %s on broker %d with new value %s"
- .format(topic, config.brokerId, currentChildren.mkString(",")))
- handleNewPartitions(topic, currentChildren.map(p => p.toInt).toSeq)
- }
+ allTopics ++ currentChildren
+
+ debug("New topics: [%s]. Deleted topics: [%s]".format(newTopics.mkString(","), deletedTopics.mkString(",")))
+ handleNewTopics(newTopics.toSeq)
+ // TODO: Handle topic deletions
+ //handleDeletedTopics(deletedTopics.toSeq)
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1351188&r1=1351187&r2=1351188&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Mon Jun 18 01:17:44 2012
@@ -749,6 +749,21 @@ object Utils extends Logging {
builder.toString
}
+ def mapToJson[T <: Any](map: Map[String, List[String]]): String = {
+ val builder = new StringBuilder
+ builder.append("{ ")
+ var numElements = 0
+ for ( (key, value) <- map ) {
+ if (numElements > 0)
+ builder.append(",")
+ builder.append("\"" + key + "\": ")
+ builder.append("[%s]".format(value.map("\""+_+"\"").mkString(",")))
+ numElements += 1
+ }
+ builder.append(" }")
+ builder.toString
+ }
+
def checkRequiredArgs(parser: OptionParser, options: OptionSet, required: OptionSpec[_]*) {
for(arg <- required) {
if(!options.has(arg)) {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1351188&r1=1351187&r2=1351188&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Mon Jun 18 01:17:44 2012
@@ -17,15 +17,16 @@
package kafka.utils
-import org.I0Itec.zkclient.serialize.ZkSerializer
-import kafka.cluster.{Broker, Cluster}
-import scala.collection._
import java.util.Properties
-import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
-import kafka.consumer.TopicCount
-import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
import java.util.concurrent.locks.Condition
+import kafka.cluster.{Broker, Cluster}
import kafka.common.NoEpochForPartitionException
+import kafka.consumer.TopicCount
+import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
+import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
+import org.I0Itec.zkclient.serialize.ZkSerializer
+import scala.collection._
+import util.parsing.json.JSON
object ZkUtils extends Logging {
val ConsumersPath = "/consumers"
@@ -110,7 +111,7 @@ object ZkUtils extends Logging {
}else {
throw new NoEpochForPartitionException("ISR path for topic %s partition %d is empty".format(topic, partition))
}
- }catch {
+ } catch {
case e: ZkNoNodeException =>
throw new NoEpochForPartitionException("No epoch since leader never existed for topic %s partition %d".format(topic, partition))
case e1 => throw e1
@@ -118,15 +119,23 @@ object ZkUtils extends Logging {
lastKnownEpoch
}
+ /**
+ * Gets the assigned replicas (AR) for a specific topic and partition
+ */
def getReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[String] = {
- val replicaListString = readDataMaybeNull(zkClient, getTopicPartitionReplicasPath(topic, partition.toString))
- if(replicaListString == null)
- Seq.empty[String]
- else {
- Utils.getCSVList(replicaListString)
+ val topicAndPartitionAssignment = getPartitionAssignmentForTopics(zkClient, List(topic).iterator)
+ topicAndPartitionAssignment.get(topic) match {
+ case Some(partitionAssignment) => partitionAssignment.get(partition.toString) match {
+ case Some(replicaList) => replicaList
+ case None => Seq.empty[String]
+ }
+ case None => Seq.empty[String]
}
}
+ /**
+ * Gets the in-sync replicas (ISR) for a specific topic and partition
+ */
def getInSyncReplicasForPartition(client: ZkClient, topic: String, partition: Int): Seq[Int] = {
val replicaListAndEpochString = readDataMaybeNull(client, getTopicPartitionInSyncPath(topic, partition.toString))
if(replicaListAndEpochString == null)
@@ -225,8 +234,7 @@ object ZkUtils extends Logging {
private def createEphemeralPath(client: ZkClient, path: String, data: String): Unit = {
try {
client.createEphemeral(path, data)
- }
- catch {
+ } catch {
case e: ZkNoNodeException => {
createParentPath(client, path)
client.createEphemeral(path, data)
@@ -241,23 +249,20 @@ object ZkUtils extends Logging {
def createEphemeralPathExpectConflict(client: ZkClient, path: String, data: String): Unit = {
try {
createEphemeralPath(client, path, data)
- }
- catch {
+ } catch {
case e: ZkNodeExistsException => {
// this can happen when there is connection loss; make sure the data is what we intend to write
var storedData: String = null
try {
storedData = readData(client, path)
- }
- catch {
+ } catch {
case e1: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
case e2 => throw e2
}
if (storedData == null || storedData != data) {
info("conflict in " + path + " data: " + data + " stored data: " + storedData)
throw e
- }
- else {
+ } else {
// otherwise, the creation succeeded, return normally
info(path + " exists with value " + data + " during connection loss; this is ok")
}
@@ -272,8 +277,7 @@ object ZkUtils extends Logging {
def createPersistentPath(client: ZkClient, path: String, data: String = ""): Unit = {
try {
client.createPersistent(path, data)
- }
- catch {
+ } catch {
case e: ZkNoNodeException => {
createParentPath(client, path)
client.createPersistent(path, data)
@@ -292,14 +296,12 @@ object ZkUtils extends Logging {
def updatePersistentPath(client: ZkClient, path: String, data: String): Unit = {
try {
client.writeData(path, data)
- }
- catch {
+ } catch {
case e: ZkNoNodeException => {
createParentPath(client, path)
try {
client.createPersistent(path, data)
- }
- catch {
+ } catch {
case e: ZkNodeExistsException => client.writeData(path, data)
case e2 => throw e2
}
@@ -315,8 +317,7 @@ object ZkUtils extends Logging {
def updateEphemeralPath(client: ZkClient, path: String, data: String): Unit = {
try {
client.writeData(path, data)
- }
- catch {
+ } catch {
case e: ZkNoNodeException => {
createParentPath(client, path)
client.createEphemeral(path, data)
@@ -328,8 +329,7 @@ object ZkUtils extends Logging {
def deletePath(client: ZkClient, path: String): Boolean = {
try {
client.delete(path)
- }
- catch {
+ } catch {
case e: ZkNoNodeException =>
// this can happen during a connection loss event, return normally
info(path + " deleted during connection loss; this is ok")
@@ -341,8 +341,7 @@ object ZkUtils extends Logging {
def deletePathRecursive(client: ZkClient, path: String) {
try {
client.deleteRecursive(path)
- }
- catch {
+ } catch {
case e: ZkNoNodeException =>
// this can happen during a connection loss event, return normally
info(path + " deleted during connection loss; this is ok")
@@ -368,16 +367,12 @@ object ZkUtils extends Logging {
import scala.collection.JavaConversions._
// triggers implicit conversion from java list to scala Seq
- var ret: java.util.List[String] = null
try {
- ret = client.getChildren(path)
- }
- catch {
- case e: ZkNoNodeException =>
- return Nil
+ client.getChildren(path)
+ } catch {
+ case e: ZkNoNodeException => return Nil
case e2 => throw e2
}
- return ret
}
/**
@@ -399,35 +394,40 @@ object ZkUtils extends Logging {
cluster
}
- def getPartitionsForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, Seq[String]] = {
- val ret = new mutable.HashMap[String, Seq[String]]()
- topics.foreach { topic =>
- // get the partitions that exist for topic
- val partitions = getChildrenParentMayNotExist(zkClient, getTopicPartitionsPath(topic))
- debug("children of /brokers/topics/%s are %s".format(topic, partitions))
- ret += (topic -> partitions.sortWith((s,t) => s < t))
+ def getPartitionAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, Map[String, List[String]]] = {
+ val ret = new mutable.HashMap[String, Map[String, List[String]]]()
+ topics.foreach{ topic =>
+ val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))
+ val partitionMap = if (jsonPartitionMap == null) {
+ Map[String, List[String]]()
+ } else {
+ JSON.parseFull(jsonPartitionMap) match {
+ case Some(m) => m.asInstanceOf[Map[String, List[String]]]
+ case None => Map[String, List[String]]()
+ }
+ }
+ debug("partition map for /brokers/topics/%s is %s".format(topic, partitionMap))
+ ret += (topic -> partitionMap)
}
ret
}
- def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int): Map[String, Seq[Int]] = {
- val topicsAndPartitions = getPartitionsForTopics(zkClient, topics.iterator)
-
- topicsAndPartitions.map { tp =>
- val topic = tp._1
- val partitions = tp._2.map(p => p.toInt)
- val relevantPartitions = partitions.filter { partition =>
- val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, partition).map(r => r.toInt)
- assignedReplicas.contains(brokerId)
- }
- (topic -> relevantPartitions)
+ def getPartitionsForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, Seq[String]] = {
+ getPartitionAssignmentForTopics(zkClient, topics).map{ topicAndPartitionMap =>
+ val topic = topicAndPartitionMap._1
+ val partitionMap = topicAndPartitionMap._2
+ debug("partition assignment of /brokers/topics/%s is %s".format(topic, partitionMap))
+ (topic -> partitionMap.keys.toSeq.sortWith((s,t) => s < t))
}
}
- def getPartitionsAssignedToBroker(zkClient: ZkClient, topic: String, partitions: Seq[Int], broker: Int): Seq[Int] = {
- partitions.filter { p =>
- val assignedReplicas = ZkUtils.getReplicasForPartition(zkClient, topic, p).map(r => r.toInt)
- assignedReplicas.contains(broker)
+ def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int): Map[String, Seq[Int]] = {
+ val topicsAndPartitions = getPartitionAssignmentForTopics(zkClient, topics.iterator)
+ topicsAndPartitions.map{ topicAndPartitionMap =>
+ val topic = topicAndPartitionMap._1
+ val partitionMap = topicAndPartitionMap._2
+ val relevantPartitions = partitionMap.filter( m => m._2.contains(brokerId.toString) )
+ (topic -> relevantPartitions.keySet.map(_.toInt).toSeq)
}
}
@@ -470,14 +470,6 @@ object ZkUtils extends Logging {
consumersPerTopicMap
}
- /**
- * For a given topic, this returns the sorted list of partition ids registered for this topic
- */
- def getSortedPartitionIdsForTopic(zkClient: ZkClient, topic: String): Seq[Int] = {
- val topicPartitionsPath = ZkUtils.getTopicPartitionsPath(topic)
- ZkUtils.getChildrenParentMayNotExist(zkClient, topicPartitionsPath).map(pid => pid.toInt).sortWith((s,t) => s < t)
- }
-
def getBrokerInfoFromIds(zkClient: ZkClient, brokerIds: Seq[Int]): Seq[Broker] =
brokerIds.map( bid => Broker.createBroker(bid, ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)) )
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala?rev=1351188&r1=1351187&r2=1351188&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/admin/AdminTest.scala Mon Jun 18 01:17:44 2012
@@ -50,18 +50,18 @@ class AdminTest extends JUnit3Suite with
// correct assignment
{
- val expectedAssignment = Array(
- List("0", "1", "2"),
- List("1", "2", "3"),
- List("2", "3", "4"),
- List("3", "4", "0"),
- List("4", "0", "1"),
- List("0", "2", "3"),
- List("1", "3", "4"),
- List("2", "4", "0"),
- List("3", "0", "1"),
- List("4", "1", "2")
- )
+ val expectedAssignment = Map(
+ 0 -> List("0", "1", "2"),
+ 1 -> List("1", "2", "3"),
+ 2 -> List("2", "3", "4"),
+ 3 -> List("3", "4", "0"),
+ 4 -> List("4", "0", "1"),
+ 5 -> List("0", "2", "3"),
+ 6 -> List("1", "3", "4"),
+ 7 -> List("2", "4", "0"),
+ 8 -> List("3", "0", "1"),
+ 9 -> List("4", "1", "2")
+ )
val actualAssignment = AdminUtils.assignReplicasToBrokers(brokerList, 10, 3, 0)
val e = (expectedAssignment.toList == actualAssignment.toList)
@@ -109,46 +109,51 @@ class AdminTest extends JUnit3Suite with
// good assignment
{
val replicationAssignmentStr = "0:1:2,1:2:3"
- val expectedReplicationAssignment = Array(
- List("0", "1", "2"),
- List("1", "2", "3")
+ val expectedReplicationAssignment = Map(
+ 0 -> List("0", "1", "2"),
+ 1 -> List("1", "2", "3")
)
val actualReplicationAssignment = CreateTopicCommand.getManualReplicaAssignment(replicationAssignmentStr, brokerList)
- assertTrue(expectedReplicationAssignment.toList == actualReplicationAssignment.toList)
+ assertEquals(expectedReplicationAssignment.size, actualReplicationAssignment.size)
+ for( (part, replicas) <- expectedReplicationAssignment ) {
+ assertEquals(replicas, actualReplicationAssignment(part))
+ }
}
}
@Test
def testTopicCreationInZK() {
- val expectedReplicaAssignment = Array(
- List("0", "1", "2"),
- List("1", "2", "3"),
- List("2", "3", "4"),
- List("3", "4", "0"),
- List("4", "0", "1"),
- List("0", "2", "3"),
- List("1", "3", "4"),
- List("2", "4", "0"),
- List("3", "0", "1"),
- List("4", "1", "2"),
- List("1", "2", "3"),
- List("1", "3", "4")
- )
+ val expectedReplicaAssignment = Map(
+ 0 -> List("0", "1", "2"),
+ 1 -> List("1", "2", "3"),
+ 2 -> List("2", "3", "4"),
+ 3 -> List("3", "4", "0"),
+ 4 -> List("4", "0", "1"),
+ 5 -> List("0", "2", "3"),
+ 6 -> List("1", "3", "4"),
+ 7 -> List("2", "4", "0"),
+ 8 -> List("3", "0", "1"),
+ 9 -> List("4", "1", "2"),
+ 10 -> List("1", "2", "3"),
+ 11 -> List("1", "3", "4")
+ )
TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3, 4))
val topic = "test"
// create the topic
- AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
val actualReplicaAssignment = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
.get.partitionsMetadata.map(p => p.replicas)
val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
- expectedReplicaAssignment.toList.zip(actualReplicaList).foreach(l => assertEquals(l._1, l._2))
+ assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
+ for( i <- 0 until actualReplicaList.size ) {
+ assertEquals(expectedReplicaAssignment.get(i).get, actualReplicaList(i))
+ }
try {
- AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
fail("shouldn't be able to create a topic already exists")
- }
- catch {
+ } catch {
case e: AdministrationException => // this is good
case e2 => throw e2
}
@@ -156,22 +161,26 @@ class AdminTest extends JUnit3Suite with
@Test
def testGetTopicMetadata() {
- val expectedReplicaAssignment = Array(
- List("0", "1", "2"),
- List("1", "2", "3")
+ val expectedReplicaAssignment = Map(
+ 0 -> List("0", "1", "2"),
+ 1 -> List("1", "2", "3")
)
val topic = "auto-topic"
TestUtils.createBrokersInZk(zkClient, List(0, 1, 2, 3))
- AdminUtils.createReplicaAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
+ AdminUtils.createTopicPartitionAssignmentPathInZK(topic, expectedReplicaAssignment, zkClient)
val newTopicMetadata = AdminUtils.getTopicMetaDataFromZK(List(topic), zkClient).head
newTopicMetadata match {
- case Some(metadata) => assertEquals(topic, metadata.topic)
+ case Some(metadata) =>
+ assertEquals(topic, metadata.topic)
assertNotNull("partition metadata list cannot be null", metadata.partitionsMetadata)
assertEquals("partition metadata list length should be 2", 2, metadata.partitionsMetadata.size)
val actualReplicaAssignment = metadata.partitionsMetadata.map(p => p.replicas)
val actualReplicaList = actualReplicaAssignment.map(r => r.map(b => b.id.toString).toList).toList
- assertEquals(expectedReplicaAssignment.toList, actualReplicaList)
+ assertEquals(expectedReplicaAssignment.size, actualReplicaList.size)
+ for(i <- 0 until actualReplicaList.size) {
+ assertEquals(expectedReplicaAssignment(i), actualReplicaList(i))
+ }
case None => fail("Topic " + topic + " should've been automatically created")
}
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1351188&r1=1351187&r2=1351188&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Mon Jun 18 01:17:44 2012
@@ -81,8 +81,7 @@ class ZookeeperConsumerConnectorTest ext
try {
getMessages(nMessages*2, topicMessageStreams0)
fail("should get an exception")
- }
- catch {
+ } catch {
case e: ConsumerTimeoutException => // this is ok
case e => throw e
}
@@ -90,15 +89,15 @@ class ZookeeperConsumerConnectorTest ext
zkConsumerConnector0.shutdown
- // wait to make sure the topic and partition have a leader for the successful case
- waitUntilLeaderIsElected(zkClient, topic, 0, 500)
- waitUntilLeaderIsElected(zkClient, topic, 1, 500)
-
// send some messages to each broker
val sentMessages1_1 = sendMessagesToBrokerPartition(configs.head, topic, 0, nMessages)
val sentMessages1_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
val sentMessages1 = (sentMessages1_1 ++ sentMessages1_2).sortWith((s,t) => s.checksum < t.checksum)
+ // wait to make sure the topic and partition have a leader for the successful case
+ waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 1, 500)
+
// create a consumer
val consumerConfig1 = new ConsumerConfig(
TestUtils.createConsumerProperties(zkConnect, group, consumer1))
@@ -141,7 +140,7 @@ class ZookeeperConsumerConnectorTest ext
val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
val expected_2 = List( ("0", "group1_consumer1-0"),
("1", "group1_consumer2-0"))
- assertEquals(expected_2, actual_2)
+ assertEquals(expected_2, actual_2)
// create a consumer with empty map
val consumerConfig3 = new ConsumerConfig(
@@ -165,7 +164,7 @@ class ZookeeperConsumerConnectorTest ext
// also check partition ownership
val actual_3 = getZKChildrenValues(dirs.consumerOwnerDir)
- assertEquals(expected_2, actual_3)
+ assertEquals(expected_2, actual_3)
zkConsumerConnector1.shutdown
zkConsumerConnector2.shutdown
@@ -199,7 +198,7 @@ class ZookeeperConsumerConnectorTest ext
val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
val expected_1 = List( ("0", "group1_consumer1-0"),
("1", "group1_consumer1-0"))
- assertEquals(expected_1, actual_1)
+ assertEquals(expected_1, actual_1)
// commit consumed offsets
zkConsumerConnector1.commitOffsets
@@ -227,7 +226,7 @@ class ZookeeperConsumerConnectorTest ext
val actual_2 = getZKChildrenValues(dirs.consumerOwnerDir)
val expected_2 = List( ("0", "group1_consumer1-0"),
("1", "group1_consumer2-0"))
- assertEquals(expected_2, actual_2)
+ assertEquals(expected_2, actual_2)
// create a consumer with empty map
val consumerConfig3 = new ConsumerConfig(
@@ -251,7 +250,7 @@ class ZookeeperConsumerConnectorTest ext
// also check partition ownership
val actual_3 = getZKChildrenValues(dirs.consumerOwnerDir)
- assertEquals(expected_2, actual_3)
+ assertEquals(expected_2, actual_3)
zkConsumerConnector1.shutdown
zkConsumerConnector2.shutdown
@@ -264,8 +263,6 @@ class ZookeeperConsumerConnectorTest ext
val requestHandlerLogger = Logger.getLogger(classOf[kafka.server.KafkaRequestHandler])
requestHandlerLogger.setLevel(Level.FATAL)
- var actualMessages: List[Message] = Nil
-
// shutdown one server
servers.last.shutdown
Thread.sleep(500)
@@ -288,7 +285,7 @@ class ZookeeperConsumerConnectorTest ext
val actual_1 = getZKChildrenValues(dirs.consumerOwnerDir)
val expected_1 = List( ("0", "group1_consumer0-0"),
("1", "group1_consumer0-0"))
- assertEquals(expected_1, actual_1)
+ assertEquals(expected_1, actual_1)
zkConsumerConnector0.shutdown
// at this point, only some part of the message set was consumed. So consumed offset should still be 0
@@ -361,8 +358,7 @@ class ZookeeperConsumerConnectorTest ext
val sentMessages1 = sendMessages(configs.head, nMessages, "batch1", NoCompressionCodec, 1)
// create a consumer
- val consumerConfig1 = new ConsumerConfig(
- TestUtils.createConsumerProperties(zkConnect, group, consumer1))
+ val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
val zkConsumerConnector1 = new ZookeeperConsumerConnector(consumerConfig1, true)
val topicMessageStreams1 = zkConsumerConnector1.createMessageStreams(Predef.Map(topic -> 1))
val topicRegistry = zkConsumerConnector1.getTopicRegistry
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1351188&r1=1351187&r2=1351188&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Mon Jun 18 01:17:44 2012
@@ -91,8 +91,11 @@ class ProducerTest extends JUnit3Suite w
props.put("zk.connect", TestZKUtils.zookeeperConnect)
val config = new ProducerConfig(props)
- // create topic with 1 partition
+
+ // create topic with 1 partition and await leadership
CreateTopicCommand.createTopic(zkClient, "new-topic", 1)
+ TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 0, 500)
+
val producer = new Producer[String, String](config)
try {
// Available partition ids should be 0.
@@ -132,6 +135,10 @@ class ProducerTest extends JUnit3Suite w
// create topic
CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0")
+ TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 0, 500)
+ TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 1, 500)
+ TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 2, 500)
+ TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 3, 500)
val config = new ProducerConfig(props)
val producer = new Producer[String, String](config)
@@ -189,6 +196,7 @@ class ProducerTest extends JUnit3Suite w
// create topics in ZK
CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
+ TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 0, 500)
// do a simple test to make sure plumbing is okay
try {
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala?rev=1351188&r1=1351187&r2=1351188&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala Mon Jun 18 01:17:44 2012
@@ -52,8 +52,9 @@ class ReplicaFetchTest extends JUnit3Sui
val followerBrokerId = configs.last.brokerId
val leaderBroker = new Broker(leaderBrokerId, "localhost", "localhost", configs.head.port)
- // create a topic and partition
+ // create a topic and partition and await leadership
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(c => c.brokerId).mkString(":"))
+ TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 1000)
// send test messages to leader
val producer = TestUtils.createProducer[String, String](zkConnect, new StringEncoder)