You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/01 00:51:01 UTC
svn commit: r1367811 [3/4] - in /incubator/kafka/branches/0.8/core/src:
main/scala/kafka/admin/ main/scala/kafka/api/ main/scala/kafka/cluster/
main/scala/kafka/common/ main/scala/kafka/consumer/ main/scala/kafka/log/
main/scala/kafka/network/ main/sca...
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala Tue Jul 31 22:50:59 2012
@@ -33,7 +33,7 @@ object ConsumerOffsetChecker extends Log
// e.g., 127.0.0.1-1315436360737:127.0.0.1:9092
private def getConsumer(zkClient: ZkClient, bid: String): Option[SimpleConsumer] = {
- val brokerInfo = ZkUtils.readDataMaybeNull(zkClient, "/brokers/ids/%s".format(bid))._1
+ val brokerInfo = ZkUtils.readDataMaybeNull(zkClient, "/brokers/ids/%s".format(bid))
val consumer = brokerInfo match {
case BrokerIpPattern(ip, port) =>
Some(new SimpleConsumer(ip, port.toInt, 10000, 100000))
@@ -47,9 +47,9 @@ object ConsumerOffsetChecker extends Log
private def processPartition(zkClient: ZkClient,
group: String, topic: String, bidPid: String) {
val offset = ZkUtils.readData(zkClient, "/consumers/%s/offsets/%s/%s".
- format(group, topic, bidPid))._1.toLong
+ format(group, topic, bidPid)).toLong
val owner = ZkUtils.readDataMaybeNull(zkClient, "/consumers/%s/owners/%s/%s".
- format(group, topic, bidPid))._1
+ format(group, topic, bidPid))
println("%s,%s,%s (Group,Topic,BrokerId-PartitionId)".format(group, topic, bidPid))
println("%20s%s".format("Owner = ", owner))
println("%20s%d".format("Consumer offset = ", offset))
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ExportZkOffsets.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ExportZkOffsets.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ExportZkOffsets.scala Tue Jul 31 22:50:59 2012
@@ -100,7 +100,7 @@ object ExportZkOffsets extends Logging {
for (bidPid <- bidPidList) {
val zkGrpTpDir = new ZKGroupTopicDirs(consumerGrp,topic)
val offsetPath = zkGrpTpDir.consumerOffsetDir + "/" + bidPid
- val offsetVal = ZkUtils.readDataMaybeNull(zkClient, offsetPath)._1
+ val offsetVal = ZkUtils.readDataMaybeNull(zkClient, offsetPath)
fileWriter.write(offsetPath + ":" + offsetVal + "\n")
debug(offsetPath + " => " + offsetVal)
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala Tue Jul 31 22:50:59 2012
@@ -104,7 +104,7 @@ object VerifyConsumerRebalance extends L
}
// try reading the partition owner path to see if a valid consumer id exists there
val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition
- val partitionOwner = ZkUtils.readDataMaybeNull(zkClient, partitionOwnerPath)._1
+ val partitionOwner = ZkUtils.readDataMaybeNull(zkClient, partitionOwnerPath)
if(partitionOwner == null) {
error("No owner for topic %s partition %s".format(topic, partition))
rebalanceSucceeded = false
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala Tue Jul 31 22:50:59 2012
@@ -45,7 +45,7 @@ object UpdateOffsetsInZK {
private def getAndSetOffsets(zkClient: ZkClient, offsetOption: Long, config: ConsumerConfig, topic: String): Unit = {
val cluster = ZkUtils.getCluster(zkClient)
val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, List(topic).iterator)
- var partitions: Seq[Int] = Nil
+ var partitions: Seq[String] = Nil
partitionsPerTopicMap.get(topic) match {
case Some(l) => partitions = l.sortWith((s,t) => s < t)
@@ -54,7 +54,7 @@ object UpdateOffsetsInZK {
var numParts = 0
for (partition <- partitions) {
- val brokerHostingPartition = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
+ val brokerHostingPartition = ZkUtils.getLeaderForPartition(zkClient, topic, partition.toInt)
val broker = brokerHostingPartition match {
case Some(b) => b
@@ -68,7 +68,7 @@ object UpdateOffsetsInZK {
val brokerInfo = brokerInfos.head
val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024)
- val offsets = consumer.getOffsetsBefore(topic, partition, offsetOption, 1)
+ val offsets = consumer.getOffsetsBefore(topic, partition.toInt, offsetOption, 1)
val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
println("updating partition " + partition + " with new offset: " + offsets(0))
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Tue Jul 31 22:50:59 2012
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -19,29 +19,22 @@ package kafka.utils
import java.util.Properties
import kafka.cluster.{Broker, Cluster}
+import kafka.common.NoEpochForPartitionException
import kafka.consumer.TopicCount
import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
import org.I0Itec.zkclient.serialize.ZkSerializer
import scala.collection._
import util.parsing.json.JSON
-import kafka.api.LeaderAndISR
-import kafka.common.NoEpochForPartitionException
-import org.apache.zookeeper.data.Stat
import java.util.concurrent.locks.{ReentrantLock, Condition}
-import scala.throws
-
object ZkUtils extends Logging {
val ConsumersPath = "/consumers"
val BrokerIdsPath = "/brokers/ids"
val BrokerTopicsPath = "/brokers/topics"
+ val BrokerStatePath = "/brokers/state"
val ControllerPath = "/controller"
- def getBrokerPath(brokerId: Int): String = {
- BrokerIdsPath + "/" + brokerId
- }
-
def getTopicPath(topic: String): String ={
BrokerTopicsPath + "/" + topic
}
@@ -51,63 +44,53 @@ object ZkUtils extends Logging {
}
def getController(zkClient: ZkClient): Int= {
- val controller = readDataMaybeNull(zkClient, ControllerPath)._1
+ val controller = readDataMaybeNull(zkClient, ControllerPath)
controller.toInt
}
- def getTopicPartitionPath(topic: String, partitionId: Int): String ={
+ def getTopicPartitionPath(topic: String, partitionId: String): String ={
getTopicPartitionsPath(topic) + "/" + partitionId
}
- def getTopicPartitionLeaderAndISRPath(topic: String, partitionId: Int): String ={
+ def getTopicPartitionLeaderAndISR(topic: String, partitionId: String): String ={
getTopicPartitionPath(topic, partitionId) + "/" + "leaderAndISR"
}
- def getSortedBrokerList(zkClient: ZkClient): Seq[String] ={
- ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
+ def getTopicVersion(zkClient: ZkClient, topic: String): String ={
+ readDataMaybeNull(zkClient, getTopicPath(topic))
}
- def getAllLiveBrokerIds(zkClient: ZkClient): Set[Int] = {
- ZkUtils.getChildren(zkClient, BrokerIdsPath).map(_.toInt).toSet
+ def getTopicPartitionReplicasPath(topic: String, partitionId: String): String ={
+ getTopicPartitionPath(topic, partitionId) + "/" + "replicas"
}
- def getAllBrokersInCluster(zkClient: ZkClient): Seq[Broker] = {
- val brokerIds = ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
- getBrokerInfoFromIds(zkClient, brokerIds.map(_.toInt))
+ def getTopicPartitionInSyncPath(topic: String, partitionId: String): String ={
+ getTopicPartitionPath(topic, partitionId) + "/" + "isr"
}
+ def getTopicPartitionLeaderPath(topic: String, partitionId: String): String ={
+ getTopicPartitionPath(topic, partitionId) + "/" + "leader"
+ }
- def getLeaderAndISRForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndISR] = {
- val leaderAndISRPath = getTopicPartitionLeaderAndISRPath(topic, partition)
- val ret = readDataMaybeNull(zkClient, leaderAndISRPath)
- val leaderAndISRStr: String = ret._1
- val stat = ret._2
- if(leaderAndISRStr == null) None
- else {
- JSON.parseFull(leaderAndISRStr) match {
- case Some(m) =>
- val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
- val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
- val ISRString = m.asInstanceOf[Map[String, String]].get("ISR").get
- val ISR = Utils.getCSVList(ISRString).map(r => r.toInt)
- val zkPathVersion = stat.getVersion
- debug("Leader %d, Epoch %d, isr %s, zk path version %d for topic %s and partition %d".format(leader, epoch, ISR.toString(), zkPathVersion, topic, partition))
- Some(LeaderAndISR(leader, epoch, ISR.toList, zkPathVersion))
- case None => None
- }
- }
+ def getBrokerStateChangePath(brokerId: Int): String = {
+ BrokerStatePath + "/" + brokerId
+ }
+
+ def getSortedBrokerList(zkClient: ZkClient): Seq[String] ={
+ ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
}
+ def getAllBrokersInCluster(zkClient: ZkClient): Seq[Broker] = {
+ val brokerIds = ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
+ getBrokerInfoFromIds(zkClient, brokerIds.map(b => b.toInt))
+ }
def getLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int): Option[Int] = {
- val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1
- if(leaderAndISR == null) None
+ val leaderAndEpoch = readDataMaybeNull(zkClient, getTopicPartitionLeaderPath(topic, partition.toString))
+ if(leaderAndEpoch == null) None
else {
- JSON.parseFull(leaderAndISR) match {
- case Some(m) =>
- Some(m.asInstanceOf[Map[String, String]].get("leader").get.toInt)
- case None => None
- }
+ val leaderAndEpochInfo = leaderAndEpoch.split(";")
+ Some(leaderAndEpochInfo.head.toInt)
}
}
@@ -116,53 +99,51 @@ object ZkUtils extends Logging {
* leader fails after updating epoch in the leader path and before updating epoch in the ISR path, effectively some
* other broker will retry becoming leader with the same new epoch value.
*/
- def getEpochForPartition(zkClient: ZkClient, topic: String, partition: Int): Int = {
- val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1
- if(leaderAndISR != null) {
- val epoch = JSON.parseFull(leaderAndISR) match {
- case None => throw new NoEpochForPartitionException("No epoch, leaderAndISR data for topic %s partition %d is invalid".format(topic, partition))
- case Some(m) =>
- m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
+ def getEpochForPartition(client: ZkClient, topic: String, partition: Int): Int = {
+ val lastKnownEpoch = try {
+ val isrAndEpoch = readData(client, getTopicPartitionInSyncPath(topic, partition.toString))
+ if(isrAndEpoch != null) {
+ val isrAndEpochInfo = isrAndEpoch.split(";")
+ if(isrAndEpochInfo.last.isEmpty)
+ throw new NoEpochForPartitionException("No epoch in ISR path for topic %s partition %d is empty".format(topic, partition))
+ else
+ isrAndEpochInfo.last.toInt
+ }else {
+ throw new NoEpochForPartitionException("ISR path for topic %s partition %d is empty".format(topic, partition))
}
- epoch
+ } catch {
+ case e: ZkNoNodeException =>
+ throw new NoEpochForPartitionException("No epoch since leader never existed for topic %s partition %d".format(topic, partition))
+ case e1 => throw e1
}
- else
- throw new NoEpochForPartitionException("No epoch, ISR path for topic %s partition %d is empty".format(topic, partition))
+ lastKnownEpoch
}
/**
- * Gets the in-sync replicas (ISR) for a specific topic and partition
+ * Gets the assigned replicas (AR) for a specific topic and partition
*/
- def getInSyncReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[Int] = {
- val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1
- if(leaderAndISR == null) Seq.empty[Int]
- else {
- JSON.parseFull(leaderAndISR) match {
- case Some(m) =>
- val ISRString = m.asInstanceOf[Map[String, String]].get("ISR").get
- Utils.getCSVList(ISRString).map(r => r.toInt)
- case None => Seq.empty[Int]
+ def getReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[String] = {
+ val topicAndPartitionAssignment = getPartitionAssignmentForTopics(zkClient, List(topic).iterator)
+ topicAndPartitionAssignment.get(topic) match {
+ case Some(partitionAssignment) => partitionAssignment.get(partition.toString) match {
+ case Some(replicaList) => replicaList
+ case None => Seq.empty[String]
}
+ case None => Seq.empty[String]
}
}
/**
- * Gets the assigned replicas (AR) for a specific topic and partition
+ * Gets the in-sync replicas (ISR) for a specific topic and partition
*/
- def getReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[Int] = {
- val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))._1
- val assignedReplicas = if (jsonPartitionMap == null) {
+ def getInSyncReplicasForPartition(client: ZkClient, topic: String, partition: Int): Seq[Int] = {
+ val replicaListAndEpochString = readDataMaybeNull(client, getTopicPartitionInSyncPath(topic, partition.toString))
+ if(replicaListAndEpochString == null)
Seq.empty[Int]
- } else {
- JSON.parseFull(jsonPartitionMap) match {
- case Some(m) => m.asInstanceOf[Map[String, List[String]]].get(partition.toString) match {
- case None => Seq.empty[Int]
- case Some(seq) => seq.map(_.toInt)
- }
- case None => Seq.empty[Int]
- }
+ else {
+ val replicasAndEpochInfo = replicaListAndEpochString.split(";")
+ Utils.getCSVList(replicasAndEpochInfo.head).map(r => r.toInt)
}
- assignedReplicas
}
def isPartitionOnBroker(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = {
@@ -171,7 +152,25 @@ object ZkUtils extends Logging {
replicas.contains(brokerId.toString)
}
- def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int, leader: Int): Int = {
+ def tryToBecomeLeaderForPartition(client: ZkClient, topic: String, partition: Int, brokerId: Int): Option[(Int, Seq[Int])] = {
+ try {
+ // NOTE: first increment epoch, then become leader
+ val newEpoch = incrementEpochForPartition(client, topic, partition, brokerId)
+ createEphemeralPathExpectConflict(client, getTopicPartitionLeaderPath(topic, partition.toString),
+ "%d;%d".format(brokerId, newEpoch))
+ val currentISR = getInSyncReplicasForPartition(client, topic, partition)
+ val updatedISR = if(currentISR.size == 0) List(brokerId) else currentISR
+ updatePersistentPath(client, getTopicPartitionInSyncPath(topic, partition.toString),
+ "%s;%d".format(updatedISR.mkString(","), newEpoch))
+ info("Elected broker %d with epoch %d to be leader for topic %s partition %d".format(brokerId, newEpoch, topic, partition))
+ Some(newEpoch, updatedISR)
+ } catch {
+ case e: ZkNodeExistsException => error("Leader exists for topic %s partition %d".format(topic, partition)); None
+ case oe => error("Error while electing leader for topic %s partition %d".format(topic, partition), oe); None
+ }
+ }
+
+ def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int, leader: Int) = {
// read previous epoch, increment it and write it to the leader path and the ISR path.
val epoch = try {
Some(getEpochForPartition(client, topic, partition))
@@ -199,12 +198,15 @@ object ZkUtils extends Logging {
createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZKString)
} catch {
case e: ZkNodeExistsException =>
- throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or " + "else you have shutdown this broker and restarted it faster than the zookeeper " + "timeout so it appears to be re-registering.")
+ throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " +
+ "indicates that you either have configured a brokerid that is already in use, or " +
+ "else you have shutdown this broker and restarted it faster than the zookeeper " +
+ "timeout so it appears to be re-registering.")
}
info("Registering broker " + brokerIdPath + " succeeded with " + broker)
}
- def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = {
+ def getConsumerPartitionOwnerPath(group: String, topic: String, partition: String): String = {
val topicDirs = new ZKGroupTopicDirs(group, topic)
topicDirs.consumerOwnerDir + "/" + partition
}
@@ -252,7 +254,7 @@ object ZkUtils extends Logging {
// this can happen when there is connection loss; make sure the data is what we intend to write
var storedData: String = null
try {
- storedData = readData(client, path)._1
+ storedData = readData(client, path)
} catch {
case e1: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
case e2 => throw e2
@@ -290,24 +292,17 @@ object ZkUtils extends Logging {
/**
* Update the value of a persistent node with the given path and data.
* create parent directory if necessary. Never throw NodeExistException.
- * Return the updated path zkVersion
*/
- def updatePersistentPath(client: ZkClient, path: String, data: String): Int = {
- var stat: Stat = null
+ def updatePersistentPath(client: ZkClient, path: String, data: String): Unit = {
try {
- stat = client.writeData(path, data)
- return stat.getVersion
+ client.writeData(path, data)
} catch {
case e: ZkNoNodeException => {
createParentPath(client, path)
try {
client.createPersistent(path, data)
- // When the new path is created, its zkVersion always starts from 0
- return 0
} catch {
- case e: ZkNodeExistsException =>
- stat = client.writeData(path, data)
- return stat.getVersion
+ case e: ZkNodeExistsException => client.writeData(path, data)
case e2 => throw e2
}
}
@@ -316,22 +311,6 @@ object ZkUtils extends Logging {
}
/**
- * Conditional update the persistent path data, return (true, newVersion) if it succeeds, otherwise (the path doesn't
- * exist, the current version is not the expected version, etc.) return (false, -1)
- */
- def conditionalUpdatePersistentPath(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = {
- try {
- val stat = client.writeData(path, data, expectVersion)
- info("Conditional update the zkPath %s with expected version %d succeed and return the new version: %d".format(path, expectVersion, stat.getVersion))
- (true, stat.getVersion)
- } catch {
- case e: Exception =>
- info("Conditional update the zkPath %s with expected version %d failed".format(path, expectVersion))
- (false, -1)
- }
- }
-
- /**
* Update the value of a persistent node with the given path and data.
* create parent directory if necessary. Never throw NodeExistException.
*/
@@ -370,23 +349,12 @@ object ZkUtils extends Logging {
}
}
- def readData(client: ZkClient, path: String): (String, Stat) = {
- val stat: Stat = new Stat()
- val dataStr: String = client.readData(path, stat)
- (dataStr, stat)
+ def readData(client: ZkClient, path: String): String = {
+ client.readData(path)
}
- def readDataMaybeNull(client: ZkClient, path: String): (String, Stat) = {
- val stat: Stat = new Stat()
- var dataStr: String = null
- try{
- dataStr = client.readData(path, stat)
- return (dataStr, stat)
- } catch {
- case e: ZkNoNodeException =>
- return (null, stat)
- case e2 => throw e2
- }
+ def readDataMaybeNull(client: ZkClient, path: String): String = {
+ client.readData(path, true)
}
def getChildren(client: ZkClient, path: String): Seq[String] = {
@@ -398,6 +366,7 @@ object ZkUtils extends Logging {
def getChildrenParentMayNotExist(client: ZkClient, path: String): Seq[String] = {
import scala.collection.JavaConversions._
// triggers implicit conversion from java list to scala Seq
+
try {
client.getChildren(path)
} catch {
@@ -419,77 +388,32 @@ object ZkUtils extends Logging {
val cluster = new Cluster
val nodes = getChildrenParentMayNotExist(zkClient, BrokerIdsPath)
for (node <- nodes) {
- val brokerZKString = readData(zkClient, BrokerIdsPath + "/" + node)._1
+ val brokerZKString = readData(zkClient, BrokerIdsPath + "/" + node)
cluster.add(Broker.createBroker(node.toInt, brokerZKString))
}
cluster
}
- def getReplicaAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[(String, Int), Seq[Int]] = {
- val ret = new mutable.HashMap[(String, Int), Seq[Int]]
+ def getPartitionAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, Map[String, List[String]]] = {
+ val ret = new mutable.HashMap[String, Map[String, List[String]]]()
topics.foreach{ topic =>
- val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))._1
- if (jsonPartitionMap != null) {
- JSON.parseFull(jsonPartitionMap) match {
- case Some(m) =>
- val replicaMap = m.asInstanceOf[Map[String, Seq[String]]]
- for((partition, replicas) <- replicaMap){
- ret.put((topic, partition.toInt), replicas.map(_.toInt))
- debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas))
- }
- case None =>
- }
- }
- }
- ret
- }
-
- def getPartitionLeaderAndISRForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[(String, Int), LeaderAndISR] = {
- val ret = new mutable.HashMap[(String, Int), LeaderAndISR]
- val partitionsForTopics = getPartitionsForTopics(zkClient, topics)
- for((topic, partitions) <- partitionsForTopics){
- for(partition <- partitions){
- val leaderAndISROpt = ZkUtils.getLeaderAndISRForPartition(zkClient, topic, partition.toInt)
- if(leaderAndISROpt.isDefined)
- ret.put((topic, partition.toInt), leaderAndISROpt.get)
- }
- }
- ret
- }
-
- def getPartitionAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, collection.Map[Int, Seq[Int]]] = {
- val ret = new mutable.HashMap[String, Map[Int, Seq[Int]]]()
- topics.foreach{ topic =>
- val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))._1
+ val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))
val partitionMap = if (jsonPartitionMap == null) {
- Map[Int, Seq[Int]]()
+ Map[String, List[String]]()
} else {
JSON.parseFull(jsonPartitionMap) match {
- case Some(m) =>
- val m1 = m.asInstanceOf[Map[String, Seq[String]]]
- m1.map(p => (p._1.toInt, p._2.map(_.toInt)))
- case None => Map[Int, Seq[Int]]()
+ case Some(m) => m.asInstanceOf[Map[String, List[String]]]
+ case None => Map[String, List[String]]()
}
}
debug("partition map for /brokers/topics/%s is %s".format(topic, partitionMap))
ret += (topic -> partitionMap)
- }
- ret
- }
-
- def getReplicaAssignmentFromPartitionAssignment(topicPartitionAssignment: mutable.Map[String, collection.Map[Int, Seq[Int]]]): mutable.Map[(String, Int), Seq[Int]] = {
- val ret = new mutable.HashMap[(String, Int), Seq[Int]]
- for((topic, partitionAssignment) <- topicPartitionAssignment){
- for((partition, replicaAssignment) <- partitionAssignment){
- ret.put((topic, partition), replicaAssignment)
- }
}
ret
}
- def getPartitionsForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, Seq[Int]] = {
- getPartitionAssignmentForTopics(zkClient, topics).map
- { topicAndPartitionMap =>
+ def getPartitionsForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, Seq[String]] = {
+ getPartitionAssignmentForTopics(zkClient, topics).map{ topicAndPartitionMap =>
val topic = topicAndPartitionMap._1
val partitionMap = topicAndPartitionMap._2
debug("partition assignment of /brokers/topics/%s is %s".format(topic, partitionMap))
@@ -497,20 +421,14 @@ object ZkUtils extends Logging {
}
}
- def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int): Map[(String, Int), Seq[Int]] = {
- val ret = new mutable.HashMap[(String, Int), Seq[Int]]
+ def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int): Map[String, Seq[Int]] = {
val topicsAndPartitions = getPartitionAssignmentForTopics(zkClient, topics.iterator)
- topicsAndPartitions.map
- {
- topicAndPartitionMap =>
- val topic = topicAndPartitionMap._1
- val partitionMap = topicAndPartitionMap._2
- val relevantPartitionsMap = partitionMap.filter( m => m._2.contains(brokerId) )
- for((relevantPartition, replicaAssignment) <- relevantPartitionsMap){
- ret.put((topic, relevantPartition), replicaAssignment)
- }
+ topicsAndPartitions.map{ topicAndPartitionMap =>
+ val topic = topicAndPartitionMap._1
+ val partitionMap = topicAndPartitionMap._2
+ val relevantPartitions = partitionMap.filter( m => m._2.contains(brokerId.toString) )
+ (topic -> relevantPartitions.keySet.map(_.toInt).toSeq)
}
- ret
}
def deletePartition(zkClient : ZkClient, brokerId: Int, topic: String) {
@@ -528,7 +446,8 @@ object ZkUtils extends Logging {
def getConsumerTopicMaps(zkClient: ZkClient, group: String): Map[String, TopicCount] = {
val dirs = new ZKGroupDirs(group)
val consumersInGroup = getConsumersInGroup(zkClient, group)
- val topicCountMaps = consumersInGroup.map(consumerId => TopicCount.constructTopicCount(consumerId,ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)._1, zkClient))
+ val topicCountMaps = consumersInGroup.map(consumerId => TopicCount.constructTopicCount(consumerId,
+ ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId), zkClient))
consumersInGroup.zip(topicCountMaps).toMap
}
@@ -551,7 +470,8 @@ object ZkUtils extends Logging {
consumersPerTopicMap
}
- def getBrokerInfoFromIds(zkClient: ZkClient, brokerIds: Seq[Int]): Seq[Broker] = brokerIds.map( bid => Broker.createBroker(bid, ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)._1))
+ def getBrokerInfoFromIds(zkClient: ZkClient, brokerIds: Seq[Int]): Seq[Broker] =
+ brokerIds.map( bid => Broker.createBroker(bid, ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)) )
def getAllTopics(zkClient: ZkClient): Seq[String] = {
val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, BrokerTopicsPath)
@@ -559,45 +479,17 @@ object ZkUtils extends Logging {
else topics
}
- def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int) = {
- // read previous epoch, increment it and write it to the leader path and the ISR path.
- val epoch = try {
- Some(getEpochForPartition(client, topic, partition))
- }catch {
- case e: NoEpochForPartitionException => None
- case e1 => throw e1
- }
- val newEpoch = epoch match {
- case Some(partitionEpoch) =>
- debug("Existing epoch for topic %s partition %d is %d".format(topic, partition, partitionEpoch))
- partitionEpoch + 1
- case None =>
- // this is the first time leader is elected for this partition. So set epoch to 1
- debug("First epoch is 1 for topic %s partition %d".format(topic, partition))
- LeaderAndISR.initialLeaderEpoch
- }
- newEpoch
- }
}
-class LeaderElectionListener(topic: String,
- partition: Int,
- leaderLock: ReentrantLock,
- leaderExistsOrChanged: Condition,
- zkClient: ZkClient = null) extends IZkDataListener with Logging {
+class LeaderExistsListener(topic: String, partition: Int, leaderLock: ReentrantLock, leaderExists: Condition) extends IZkDataListener {
@throws(classOf[Exception])
def handleDataChange(dataPath: String, data: Object) {
val t = dataPath.split("/").takeRight(3).head
val p = dataPath.split("/").takeRight(2).head.toInt
leaderLock.lock()
try {
- if(t == topic && p == partition){
- val newLeaderOpt = ZkUtils.getLeaderForPartition(zkClient, t, p)
- if(newLeaderOpt.isDefined && ZkUtils.pathExists (zkClient, ZkUtils.getBrokerPath(newLeaderOpt.get))){
- trace("In leader election listener on partition [%s, %d], live leader %d is elected".format(topic, partition, newLeaderOpt.get))
- leaderExistsOrChanged.signal()
- }
- }
+ if(t == topic && p == partition)
+ leaderExists.signal()
}
finally {
leaderLock.unlock()
@@ -605,9 +497,15 @@ class LeaderElectionListener(topic: Stri
}
@throws(classOf[Exception])
- def handleDataDeleted(dataPath: String){
- // Nothing
+ def handleDataDeleted(dataPath: String) {
+ leaderLock.lock()
+ try {
+ leaderExists.signal()
+ }finally {
+ leaderLock.unlock()
+ }
}
+
}
object ZKStringSerializer extends ZkSerializer {
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Tue Jul 31 22:50:59 2012
@@ -95,8 +95,8 @@ class ZookeeperConsumerConnectorTest ext
val sentMessages1 = (sentMessages1_1 ++ sentMessages1_2).sortWith((s,t) => s.checksum < t.checksum)
// wait to make sure the topic and partition have a leader for the successful case
- waitUntilLiveLeaderIsElected(zkClient, topic, 0, 500)
- waitUntilLiveLeaderIsElected(zkClient, topic, 1, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 1, 500)
// create a consumer
val consumerConfig1 = new ConsumerConfig(
@@ -128,8 +128,8 @@ class ZookeeperConsumerConnectorTest ext
val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
- waitUntilLiveLeaderIsElected(zkClient, topic, 0, 500)
- waitUntilLiveLeaderIsElected(zkClient, topic, 1, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 1, 500)
val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
@@ -153,8 +153,8 @@ class ZookeeperConsumerConnectorTest ext
val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
- waitUntilLiveLeaderIsElected(zkClient, topic, 0, 500)
- waitUntilLiveLeaderIsElected(zkClient, topic, 1, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 1, 500)
val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
@@ -182,8 +182,8 @@ class ZookeeperConsumerConnectorTest ext
val sentMessages1_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
val sentMessages1 = (sentMessages1_1 ++ sentMessages1_2).sortWith((s,t) => s.checksum < t.checksum)
- waitUntilLiveLeaderIsElected(zkClient, topic, 0, 500)
- waitUntilLiveLeaderIsElected(zkClient, topic, 1, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 1, 500)
// create a consumer
val consumerConfig1 = new ConsumerConfig(
@@ -214,8 +214,8 @@ class ZookeeperConsumerConnectorTest ext
val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
- waitUntilLiveLeaderIsElected(zkClient, topic, 0, 500)
- waitUntilLiveLeaderIsElected(zkClient, topic, 1, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 1, 500)
val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
@@ -239,8 +239,8 @@ class ZookeeperConsumerConnectorTest ext
val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
- waitUntilLiveLeaderIsElected(zkClient, topic, 0, 500)
- waitUntilLiveLeaderIsElected(zkClient, topic, 1, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 1, 500)
val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
@@ -320,8 +320,8 @@ class ZookeeperConsumerConnectorTest ext
val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
- waitUntilLiveLeaderIsElected(zkClient, topic, 0, 500)
- waitUntilLiveLeaderIsElected(zkClient, topic, 1, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 1, 500)
val zkConsumerConnector =
new ZookeeperConsumerConnector(consumerConfig, true)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala Tue Jul 31 22:50:59 2012
@@ -25,8 +25,7 @@ import kafka.server.{KafkaServer, KafkaC
import kafka.api._
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger
-import kafka.admin.CreateTopicCommand
-import kafka.utils.{ZkUtils, ControllerTestUtils, TestUtils}
+import kafka.utils.{ControllerTestUtils, ZkUtils, TestUtils}
class ControllerBasicTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -37,13 +36,11 @@ class ControllerBasicTest extends JUnit3
override def setUp() {
super.setUp()
brokers = configs.map(config => TestUtils.createServer(config))
- CreateTopicCommand.createTopic(zkClient, "test1", 1, 4, "0:1:2:3")
- CreateTopicCommand.createTopic(zkClient, "test2", 1, 4, "0:1:2:3")
}
override def tearDown() {
- brokers.foreach(_.shutdown())
super.tearDown()
+ brokers.foreach(_.shutdown())
}
def testControllerFailOver(){
@@ -52,39 +49,35 @@ class ControllerBasicTest extends JUnit3
brokers(3).shutdown()
Thread.sleep(1000)
- var curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)._1
- info("cur controller " + curController)
+ var curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)
assertEquals(curController, "2")
-
brokers(1).startup()
brokers(2).shutdown()
Thread.sleep(1000)
- curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)._1
- info("cur controller " + curController)
- assertEquals("Controller should be on broker 1", curController, "1")
+ curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)
+ assertEquals(curController, "1")
}
def testControllerCommandSend(){
Thread.sleep(1000)
-
for(broker <- brokers){
if(broker.kafkaController.isActive){
- val leaderAndISRRequest = ControllerTestUtils.createTestLeaderAndISRRequest()
- val stopReplicaRequest = ControllerTestUtils.createTestStopReplicaRequest()
+ val leaderAndISRRequest = ControllerTestUtils.createSampleLeaderAndISRRequest()
+ val stopReplicaRequest = ControllerTestUtils.createSampleStopReplicaRequest()
val successCount: AtomicInteger = new AtomicInteger(0)
val countDownLatch: CountDownLatch = new CountDownLatch(8)
def compareLeaderAndISRResponseWithExpectedOne(response: RequestOrResponse){
- val expectedResponse = ControllerTestUtils.createTestLeaderAndISRResponse()
+ val expectedResponse = ControllerTestUtils.createSampleLeaderAndISRResponse()
if(response.equals(expectedResponse))
successCount.addAndGet(1)
countDownLatch.countDown()
}
def compareStopReplicaResponseWithExpectedOne(response: RequestOrResponse){
- val expectedResponse = ControllerTestUtils.createTestStopReplicaResponse()
+ val expectedResponse = ControllerTestUtils.createSampleStopReplicaResponse()
if(response.equals(expectedResponse))
successCount.addAndGet(1)
countDownLatch.countDown()
@@ -94,10 +87,10 @@ class ControllerBasicTest extends JUnit3
broker.kafkaController.sendRequest(1, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
broker.kafkaController.sendRequest(2, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
broker.kafkaController.sendRequest(3, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
- broker.kafkaController.sendRequest(0, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
- broker.kafkaController.sendRequest(1, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
- broker.kafkaController.sendRequest(2, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
- broker.kafkaController.sendRequest(3, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
+ broker.kafkaController.sendRequest(0, stopReplicaRequest, compareStopReplicaResponseWithExpectedOne)
+ broker.kafkaController.sendRequest(1, stopReplicaRequest, compareStopReplicaResponseWithExpectedOne)
+ broker.kafkaController.sendRequest(2, stopReplicaRequest, compareStopReplicaResponseWithExpectedOne)
+ broker.kafkaController.sendRequest(3, stopReplicaRequest, compareStopReplicaResponseWithExpectedOne)
countDownLatch.await()
assertEquals(successCount.get(), 8)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala Tue Jul 31 22:50:59 2012
@@ -57,7 +57,7 @@ class BackwardsCompatibilityTest extends
// test for reading data with magic byte 0
def testProtocolVersion0() {
CreateTopicCommand.createTopic(zkClient, topic, 0, 1, configs.head.brokerId.toString)
- TestUtils.waitUntilLiveLeaderIsElected(zkClient, topic, 0, 500)
+ TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 500)
val lastOffset = simpleConsumer.getOffsetsBefore(topic, 0, OffsetRequest.LatestTime, 1)
var fetchOffset: Long = 0L
var messageCount: Int = 0
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala Tue Jul 31 22:50:59 2012
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -35,28 +35,27 @@ import kafka.admin.CreateTopicCommand
class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
val numNodes = 1
- val configs =
+ val configs =
for(props <- TestUtils.createBrokerConfigs(numNodes))
- yield new KafkaConfig(props)
+ yield new KafkaConfig(props)
val messages = new mutable.HashMap[Int, Seq[Message]]
val topic = "topic"
val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, c.brokerId.toString, "localhost", c.port)))
val shutdown = ZookeeperConsumerConnector.shutdownCommand
val queue = new LinkedBlockingQueue[FetchedDataChunk]
val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
- c.brokerId,
- 0,
- queue,
- new AtomicLong(0),
- new AtomicLong(0),
- new AtomicInteger(0)))
-
+ c.brokerId,
+ 0,
+ queue,
+ new AtomicLong(0),
+ new AtomicLong(0),
+ new AtomicInteger(0)))
+
var fetcher: ConsumerFetcherManager = null
override def setUp() {
super.setUp
CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
- waitUntilLiveLeaderIsElected(zkClient, topic, 0, 500)
fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkClient)
fetcher.stopAllConnections()
fetcher.startConnections(topicInfos, cluster)
@@ -66,20 +65,20 @@ class FetcherTest extends JUnit3Suite wi
fetcher.shutdown()
super.tearDown
}
-
+
def testFetcher() {
val perNode = 2
var count = sendMessages(perNode)
-
+ waitUntilLeaderIsElected(zkClient, topic, 0, 500)
fetch(count)
assertQueueEmpty()
count = sendMessages(perNode)
fetch(count)
assertQueueEmpty()
}
-
+
def assertQueueEmpty(): Unit = assertEquals(0, queue.size)
-
+
def sendMessages(messagesPerNode: Int): Int = {
var count = 0
for(conf <- configs) {
@@ -92,7 +91,7 @@ class FetcherTest extends JUnit3Suite wi
}
count
}
-
+
def fetch(expected: Int) {
var count = 0
while(true) {
@@ -104,5 +103,5 @@ class FetcherTest extends JUnit3Suite wi
return
}
}
-
+
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala Tue Jul 31 22:50:59 2012
@@ -134,7 +134,7 @@ class LazyInitProducerTest extends JUnit
builder.addFetch(topic, 0, 0, 10000)
}
// wait until leader is elected
- topics.foreach(topic => TestUtils.waitUntilLiveLeaderIsElected(zkClient, topic, 0, 500))
+ topics.foreach(topic => TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 500))
producer.send(produceList: _*)
// wait a bit for produced message to be available
@@ -160,7 +160,7 @@ class LazyInitProducerTest extends JUnit
builder.addFetch(topic, 0, 0, 10000)
}
// wait until leader is elected
- topics.foreach(topic => TestUtils.waitUntilLiveLeaderIsElected(zkClient, topic, 0, 1500))
+ topics.foreach(topic => TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 1500))
producer.send(produceList: _*)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Tue Jul 31 22:50:59 2012
@@ -352,7 +352,7 @@ class PrimitiveApiTest extends JUnit3Sui
def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Seq[String], brokerId: Int) {
for( topic <- topics ) {
CreateTopicCommand.createTopic(zkClient, topic, 1, 1, brokerId.toString)
- TestUtils.waitUntilLiveLeaderIsElected(zkClient, topic, 0, 500)
+ TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 500)
}
}
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala Tue Jul 31 22:50:59 2012
@@ -117,8 +117,7 @@ class TopicMetadataTest extends JUnit3Su
// create the kafka request handler
val requestChannel = new RequestChannel(2, 5)
- val apis = new KafkaApis(requestChannel, logManager, replicaManager, kafkaZookeeper, null,
- null, null, null, 1)
+ val apis = new KafkaApis(requestChannel, logManager, replicaManager, kafkaZookeeper)
// mock the receive API to return the request buffer as created above
val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala Tue Jul 31 22:50:59 2012
@@ -54,8 +54,8 @@ class ZookeeperConsumerConnectorTest ext
// send some messages to each broker
val sentMessages1 = sendMessages(nMessages, "batch1")
- waitUntilLiveLeaderIsElected(zkClient, topic, 0, 500)
- waitUntilLiveLeaderIsElected(zkClient, topic, 1, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+ waitUntilLeaderIsElected(zkClient, topic, 1, 500)
// create a consumer
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zookeeperConnect, group, consumer1))
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala Tue Jul 31 22:50:59 2012
@@ -118,7 +118,7 @@ class LogOffsetTest extends JUnit3Suite
// setup brokers in zookeeper as owners of partitions for this test
CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
- TestUtils.waitUntilLiveLeaderIsElected(zkClient, topic, 0, 500)
+ TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 500)
var offsetChanged = false
for(i <- 1 to 14) {
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala Tue Jul 31 22:50:59 2012
@@ -27,10 +27,9 @@ import junit.framework.Assert._
class SocketServerTest extends JUnitSuite {
- val server: SocketServer = new SocketServer(0,
- port = TestUtils.choosePort,
- numProcessorThreads = 1,
- monitoringPeriodSecs = 30,
+ val server: SocketServer = new SocketServer(port = TestUtils.choosePort,
+ numProcessorThreads = 1,
+ monitoringPeriodSecs = 30,
maxQueuedRequests = 50,
maxRequestSize = 50)
server.startup()
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Tue Jul 31 22:50:59 2012
@@ -106,6 +106,13 @@ class AsyncProducerTest extends JUnit3Su
}
}
+ def getProduceData(nEvents: Int): Seq[ProducerData[String,String]] = {
+ val producerDataList = new ListBuffer[ProducerData[String,String]]
+ for (i <- 0 until nEvents)
+ producerDataList.append(new ProducerData[String,String]("topic1", null, List("msg" + i)))
+ producerDataList
+ }
+
@Test
def testBatchSize() {
/**
@@ -523,13 +530,6 @@ class AsyncProducerTest extends JUnit3Su
}
}
- def getProduceData(nEvents: Int): Seq[ProducerData[String,String]] = {
- val producerDataList = new ListBuffer[ProducerData[String,String]]
- for (i <- 0 until nEvents)
- producerDataList.append(new ProducerData[String,String]("topic1", null, List("msg" + i)))
- producerDataList
- }
-
private def messagesToSet(messages: Seq[String]): ByteBufferMessageSet = {
val encoder = new StringEncoder
new ByteBufferMessageSet(NoCompressionCodec, messages.map(m => encoder.toMessage(m)): _*)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Tue Jul 31 22:50:59 2012
@@ -104,7 +104,7 @@ class ProducerTest extends JUnit3Suite w
// create topic with 1 partition and await leadership
CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
- TestUtils.waitUntilLiveLeaderIsElected(zkClient, "new-topic", 0, 500)
+ TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 0, 500)
val producer1 = new Producer[String, String](config1)
val producer2 = new Producer[String, String](config2)
@@ -155,10 +155,10 @@ class ProducerTest extends JUnit3Suite w
// create topic
CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0")
- TestUtils.waitUntilLiveLeaderIsElected(zkClient, "new-topic", 0, 500)
- TestUtils.waitUntilLiveLeaderIsElected(zkClient, "new-topic", 1, 500)
- TestUtils.waitUntilLiveLeaderIsElected(zkClient, "new-topic", 2, 500)
- TestUtils.waitUntilLiveLeaderIsElected(zkClient, "new-topic", 3, 500)
+ TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 0, 500)
+ TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 1, 500)
+ TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 2, 500)
+ TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 3, 500)
val config = new ProducerConfig(props)
val producer = new Producer[String, String](config)
@@ -216,7 +216,7 @@ class ProducerTest extends JUnit3Suite w
// create topics in ZK
CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
- TestUtils.waitUntilLiveLeaderIsElected(zkClient, "new-topic", 0, 500)
+ TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 0, 500)
// do a simple test to make sure plumbing is okay
try {
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Tue Jul 31 22:50:59 2012
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -159,9 +159,9 @@ class SyncProducerTest extends JUnit3Sui
// #2 - test that we get correct offsets when partition is owned by broker
CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1)
- TestUtils.waitUntilLiveLeaderIsElected(zkClient, "topic1", 0, 500)
+ TestUtils.waitUntilLeaderIsElected(zkClient, "topic1", 0, 500)
CreateTopicCommand.createTopic(zkClient, "topic3", 1, 1)
- TestUtils.waitUntilLiveLeaderIsElected(zkClient, "topic3", 0, 500)
+ TestUtils.waitUntilLeaderIsElected(zkClient, "topic3", 0, 500)
val response2 = producer.send(request)
Assert.assertNotNull(response2)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala Tue Jul 31 22:50:59 2012
@@ -39,9 +39,8 @@ class HighwatermarkPersistenceTest exten
val scheduler = new KafkaScheduler(2)
scheduler.startUp
// create replica manager
- val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, null)
+ val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler)
replicaManager.startup()
- replicaManager.startHighWaterMarksCheckPointThread()
// sleep until flush ms
Thread.sleep(configs.head.defaultFlushIntervalMs)
var fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
@@ -84,9 +83,8 @@ class HighwatermarkPersistenceTest exten
val scheduler = new KafkaScheduler(2)
scheduler.startUp
// create replica manager
- val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, null)
+ val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler)
replicaManager.startup()
- replicaManager.startHighWaterMarksCheckPointThread()
// sleep until flush ms
Thread.sleep(configs.head.defaultFlushIntervalMs)
var topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala Tue Jul 31 22:50:59 2012
@@ -100,7 +100,7 @@ class ISRExpirationTest extends JUnit3Su
// create kafka scheduler
val scheduler = new KafkaScheduler(2)
// create replica manager
- val replicaManager = new ReplicaManager(configs.head, time, zkClient, scheduler, null)
+ val replicaManager = new ReplicaManager(configs.head, time, zkClient, scheduler)
try {
val partition0 = replicaManager.getOrCreatePartition(topic, 0, configs.map(_.brokerId).toSet)
// create leader log
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala Tue Jul 31 22:50:59 2012
@@ -20,11 +20,12 @@ package kafka.server
import org.scalatest.junit.JUnit3Suite
import kafka.zk.ZooKeeperTestHarness
import kafka.admin.CreateTopicCommand
+import kafka.utils.TestUtils._
import junit.framework.Assert._
import kafka.utils.{ZkUtils, Utils, TestUtils}
-import kafka.utils.TestUtils._
class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
+
val brokerId1 = 0
val brokerId2 = 1
@@ -33,23 +34,23 @@ class LeaderElectionTest extends JUnit3S
val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1)
val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
- var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
+
override def setUp() {
super.setUp()
- // start both servers
- val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
- val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
- servers ++= List(server1, server2)
}
override def tearDown() {
- servers.map(server => server.shutdown())
- servers.map(server => Utils.rm(server.config.logDir))
super.tearDown()
}
- def testLeaderElectionAndEpoch {
+ def testLeaderElectionWithCreateTopic {
+ var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
+ // start both servers
+ val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
+ val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
+
+ servers ++= List(server1, server2)
// start 2 brokers
val topic = "new-topic"
val partitionId = 0
@@ -58,39 +59,62 @@ class LeaderElectionTest extends JUnit3S
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
// wait until leader is elected
- val leader1 = waitUntilLiveLeaderIsElected(zkClient, topic, partitionId, 500)
- val leaderEpoch1 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
- debug("leader Epoc: " + leaderEpoch1)
- debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
- assertTrue("Leader should get elected", leader1.isDefined)
+ var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+ assertTrue("Leader should get elected", leader.isDefined)
// NOTE: this is to avoid transient test failures
- assertTrue("Leader could be broker 0 or broker 1", (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1))
- assertEquals("First epoch value should be 0", 0, leaderEpoch1)
+ assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
// kill the server hosting the preferred replica
- servers.last.shutdown()
+ server1.shutdown()
+
// check if leader moves to the other server
- val leader2 = waitUntilLiveLeaderIsElected(zkClient, topic, partitionId, 1500)
- val leaderEpoch2 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
- debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
- debug("leader Epoc: " + leaderEpoch2)
- assertEquals("Leader must move to broker 0", 0, leader2.getOrElse(-1))
- if(leader1.get == leader2.get)
- assertEquals("Second epoch value should be " + leaderEpoch1, leaderEpoch1, leaderEpoch2)
- else
- assertEquals("Second epoch value should be %d".format(leaderEpoch1+1) , leaderEpoch1+1, leaderEpoch2)
-
- servers.last.startup()
- servers.head.shutdown()
- Thread.sleep(zookeeper.tickTime)
- val leader3 = waitUntilLiveLeaderIsElected(zkClient, topic, partitionId, 1500)
- val leaderEpoch3 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
- debug("leader Epoc: " + leaderEpoch3)
- debug("Leader is elected to be: %s".format(leader3.getOrElse(-1)))
- assertEquals("Leader must return to 1", 1, leader3.getOrElse(-1))
- if(leader2.get == leader3.get)
- assertEquals("Second epoch value should be " + leaderEpoch2, leaderEpoch2, leaderEpoch3)
- else
- assertEquals("Second epoch value should be %d".format(leaderEpoch2+1) , leaderEpoch2+1, leaderEpoch3)
+ leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 1500)
+ assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
+
+ val leaderPath = zkClient.getChildren(ZkUtils.getTopicPartitionPath(topic, "0"))
+ // bring the preferred replica back
+ servers.head.startup()
+
+ leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+ assertEquals("Leader must remain on broker 1", 1, leader.getOrElse(-1))
+
+ // shutdown current leader (broker 1)
+ servers.last.shutdown()
+ leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+
+ // test if the leader is the preferred replica
+ assertEquals("Leader must be preferred replica on broker 0", 0, leader.getOrElse(-1))
+ // shutdown the servers and delete data hosted on them
+ servers.map(server => server.shutdown())
+ servers.map(server => Utils.rm(server.config.logDir))
+ }
+
+ // Assuming leader election happens correctly, test if epoch changes as expected
+ def testEpoch() {
+ // keep switching leaders to see if epoch changes correctly
+ val topic = "new-topic"
+ val partitionId = 0
+
+ // setup 2 brokers in ZK
+ val brokers = TestUtils.createBrokersInZk(zkClient, List(brokerId1, brokerId2))
+
+ // create topic with 1 partition, 2 replicas, one on each broker
+ CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
+
+ var newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
+ assertTrue("Broker 0 should become leader", newLeaderEpoch.isDefined)
+ assertEquals("First epoch value should be 1", 1, newLeaderEpoch.get._1)
+
+ ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
+ newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 1)
+ assertTrue("Broker 1 should become leader", newLeaderEpoch.isDefined)
+ assertEquals("Second epoch value should be 2", 2, newLeaderEpoch.get._1)
+
+ ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
+ newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
+ assertTrue("Broker 0 should become leader again", newLeaderEpoch.isDefined)
+ assertEquals("Third epoch value should be 3", 3, newLeaderEpoch.get._1)
+
+ TestUtils.deleteBrokersInZk(zkClient, List(brokerId1, brokerId2))
}
}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala Tue Jul 31 22:50:59 2012
@@ -57,7 +57,7 @@ class LogRecoveryTest extends JUnit3Suit
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
// wait until leader is elected
- var leader = waitUntilLiveLeaderIsElected(zkClient, topic, partitionId, 500)
+ var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
assertTrue("Leader should get elected", leader.isDefined)
// NOTE: this is to avoid transient test failures
assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
@@ -89,7 +89,7 @@ class LogRecoveryTest extends JUnit3Suit
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
// wait until leader is elected
- var leader = waitUntilLiveLeaderIsElected(zkClient, topic, partitionId, 500)
+ var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
assertTrue("Leader should get elected", leader.isDefined)
// NOTE: this is to avoid transient test failures
assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
@@ -103,13 +103,13 @@ class LogRecoveryTest extends JUnit3Suit
assertEquals(30L, hwFile1.read(topic, 0))
// check if leader moves to the other server
- leader = waitUntilLiveLeaderIsElected(zkClient, topic, partitionId, 500)
+ leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
// bring the preferred replica back
server1.startup()
- leader = waitUntilLiveLeaderIsElected(zkClient, topic, partitionId, 500)
+ leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
assertEquals("Leader must remain on broker 1", 1, leader.getOrElse(-1))
assertEquals(30L, hwFile1.read(topic, 0))
@@ -118,7 +118,7 @@ class LogRecoveryTest extends JUnit3Suit
assertEquals(30L, hwFile2.read(topic, 0))
server2.startup()
- leader = waitUntilLiveLeaderIsElected(zkClient, topic, partitionId, 500)
+ leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
assertEquals("Leader must remain on broker 0", 0, leader.getOrElse(-1))
sendMessages()
@@ -159,7 +159,7 @@ class LogRecoveryTest extends JUnit3Suit
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
// wait until leader is elected
- var leader = waitUntilLiveLeaderIsElected(zkClient, topic, partitionId, 500)
+ var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
assertTrue("Leader should get elected", leader.isDefined)
// NOTE: this is to avoid transient test failures
assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
@@ -202,7 +202,7 @@ class LogRecoveryTest extends JUnit3Suit
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
// wait until leader is elected
- var leader = waitUntilLiveLeaderIsElected(zkClient, topic, partitionId, 500)
+ var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
assertTrue("Leader should get elected", leader.isDefined)
// NOTE: this is to avoid transient test failures
assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
@@ -218,7 +218,7 @@ class LogRecoveryTest extends JUnit3Suit
server2.startup()
// check if leader moves to the other server
- leader = waitUntilLiveLeaderIsElected(zkClient, topic, partitionId, 500)
+ leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
assertEquals(60L, hwFile1.read(topic, 0))
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala Tue Jul 31 22:50:59 2012
@@ -51,7 +51,7 @@ class ReplicaFetchTest extends JUnit3Sui
// create a topic and partition and await leadership
for (topic <- List(topic1,topic2)) {
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(c => c.brokerId).mkString(":"))
- TestUtils.waitUntilLiveLeaderIsElected(zkClient, topic, 0, 1000)
+ TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 1000)
}
// send test messages to leader
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala Tue Jul 31 22:50:59 2012
@@ -73,7 +73,7 @@ class RequestPurgatoryTest {
assertTrue("Time for expiration was about 20ms", (elapsed - expiration).abs < 10L)
}
- class MockRequestPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest]("Mock Request Purgatory") {
+ class MockRequestPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest] {
val satisfied = mutable.Set[DelayedRequest]()
val expired = mutable.Set[DelayedRequest]()
def awaitExpiration(delayed: DelayedRequest) = {
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1367811&r1=1367810&r2=1367811&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Tue Jul 31 22:50:59 2012
@@ -73,7 +73,7 @@ class ServerShutdownTest extends JUnit3S
val server = new KafkaServer(config)
server.startup()
- waitUntilLiveLeaderIsElected(zkClient, topic, 0, 1000)
+ waitUntilLeaderIsElected(zkClient, topic, 0, 1000)
var fetchedMessage: ByteBufferMessageSet = null
while(fetchedMessage == null || fetchedMessage.validBytes == 0) {