You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/08/01 18:14:01 UTC
svn commit: r1368092 [3/4] - in /incubator/kafka/branches/0.8/core/src:
main/scala/kafka/admin/ main/scala/kafka/api/ main/scala/kafka/cluster/
main/scala/kafka/common/ main/scala/kafka/consumer/ main/scala/kafka/log/
main/scala/kafka/network/ main/sca...
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala Wed Aug 1 16:13:59 2012
@@ -33,7 +33,7 @@ object ConsumerOffsetChecker extends Log
// e.g., 127.0.0.1-1315436360737:127.0.0.1:9092
private def getConsumer(zkClient: ZkClient, bid: String): Option[SimpleConsumer] = {
- val brokerInfo = ZkUtils.readDataMaybeNull(zkClient, "/brokers/ids/%s".format(bid))
+ val brokerInfo = ZkUtils.readDataMaybeNull(zkClient, "/brokers/ids/%s".format(bid))._1
val consumer = brokerInfo match {
case BrokerIpPattern(ip, port) =>
Some(new SimpleConsumer(ip, port.toInt, 10000, 100000))
@@ -47,9 +47,9 @@ object ConsumerOffsetChecker extends Log
private def processPartition(zkClient: ZkClient,
group: String, topic: String, bidPid: String) {
val offset = ZkUtils.readData(zkClient, "/consumers/%s/offsets/%s/%s".
- format(group, topic, bidPid)).toLong
+ format(group, topic, bidPid))._1.toLong
val owner = ZkUtils.readDataMaybeNull(zkClient, "/consumers/%s/owners/%s/%s".
- format(group, topic, bidPid))
+ format(group, topic, bidPid))._1
println("%s,%s,%s (Group,Topic,BrokerId-PartitionId)".format(group, topic, bidPid))
println("%20s%s".format("Owner = ", owner))
println("%20s%d".format("Consumer offset = ", offset))
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ExportZkOffsets.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ExportZkOffsets.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ExportZkOffsets.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/ExportZkOffsets.scala Wed Aug 1 16:13:59 2012
@@ -100,7 +100,7 @@ object ExportZkOffsets extends Logging {
for (bidPid <- bidPidList) {
val zkGrpTpDir = new ZKGroupTopicDirs(consumerGrp,topic)
val offsetPath = zkGrpTpDir.consumerOffsetDir + "/" + bidPid
- val offsetVal = ZkUtils.readDataMaybeNull(zkClient, offsetPath)
+ val offsetVal = ZkUtils.readDataMaybeNull(zkClient, offsetPath)._1
fileWriter.write(offsetPath + ":" + offsetVal + "\n")
debug(offsetPath + " => " + offsetVal)
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/VerifyConsumerRebalance.scala Wed Aug 1 16:13:59 2012
@@ -104,7 +104,7 @@ object VerifyConsumerRebalance extends L
}
// try reading the partition owner path for see if a valid consumer id exists there
val partitionOwnerPath = topicDirs.consumerOwnerDir + "/" + partition
- val partitionOwner = ZkUtils.readDataMaybeNull(zkClient, partitionOwnerPath)
+ val partitionOwner = ZkUtils.readDataMaybeNull(zkClient, partitionOwnerPath)._1
if(partitionOwner == null) {
error("No owner for topic %s partition %s".format(topic, partition))
rebalanceSucceeded = false
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/UpdateOffsetsInZK.scala Wed Aug 1 16:13:59 2012
@@ -45,7 +45,7 @@ object UpdateOffsetsInZK {
private def getAndSetOffsets(zkClient: ZkClient, offsetOption: Long, config: ConsumerConfig, topic: String): Unit = {
val cluster = ZkUtils.getCluster(zkClient)
val partitionsPerTopicMap = ZkUtils.getPartitionsForTopics(zkClient, List(topic).iterator)
- var partitions: Seq[String] = Nil
+ var partitions: Seq[Int] = Nil
partitionsPerTopicMap.get(topic) match {
case Some(l) => partitions = l.sortWith((s,t) => s < t)
@@ -54,7 +54,7 @@ object UpdateOffsetsInZK {
var numParts = 0
for (partition <- partitions) {
- val brokerHostingPartition = ZkUtils.getLeaderForPartition(zkClient, topic, partition.toInt)
+ val brokerHostingPartition = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
val broker = brokerHostingPartition match {
case Some(b) => b
@@ -68,7 +68,7 @@ object UpdateOffsetsInZK {
val brokerInfo = brokerInfos.head
val consumer = new SimpleConsumer(brokerInfo.host, brokerInfo.port, 10000, 100 * 1024)
- val offsets = consumer.getOffsetsBefore(topic, partition.toInt, offsetOption, 1)
+ val offsets = consumer.getOffsetsBefore(topic, partition, offsetOption, 1)
val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
println("updating partition " + partition + " with new offset: " + offsets(0))
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala Wed Aug 1 16:13:59 2012
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -19,20 +19,21 @@ package kafka.utils
import java.util.Properties
import kafka.cluster.{Broker, Cluster}
-import kafka.common.NoEpochForPartitionException
import kafka.consumer.TopicCount
import org.I0Itec.zkclient.{IZkDataListener, ZkClient}
import org.I0Itec.zkclient.exception.{ZkNodeExistsException, ZkNoNodeException, ZkMarshallingError}
import org.I0Itec.zkclient.serialize.ZkSerializer
import scala.collection._
import util.parsing.json.JSON
+import kafka.api.LeaderAndISR
+import kafka.common.NoEpochForPartitionException
+import org.apache.zookeeper.data.Stat
import java.util.concurrent.locks.{ReentrantLock, Condition}
object ZkUtils extends Logging {
val ConsumersPath = "/consumers"
val BrokerIdsPath = "/brokers/ids"
val BrokerTopicsPath = "/brokers/topics"
- val BrokerStatePath = "/brokers/state"
val ControllerPath = "/controller"
def getTopicPath(topic: String): String ={
@@ -44,53 +45,63 @@ object ZkUtils extends Logging {
}
def getController(zkClient: ZkClient): Int= {
- val controller = readDataMaybeNull(zkClient, ControllerPath)
+ val controller = readDataMaybeNull(zkClient, ControllerPath)._1
controller.toInt
}
- def getTopicPartitionPath(topic: String, partitionId: String): String ={
+ def getTopicPartitionPath(topic: String, partitionId: Int): String ={
getTopicPartitionsPath(topic) + "/" + partitionId
}
- def getTopicPartitionLeaderAndISR(topic: String, partitionId: String): String ={
+ def getTopicPartitionLeaderAndISRPath(topic: String, partitionId: Int): String ={
getTopicPartitionPath(topic, partitionId) + "/" + "leaderAndISR"
}
- def getTopicVersion(zkClient: ZkClient, topic: String): String ={
- readDataMaybeNull(zkClient, getTopicPath(topic))
- }
-
- def getTopicPartitionReplicasPath(topic: String, partitionId: String): String ={
- getTopicPartitionPath(topic, partitionId) + "/" + "replicas"
+ def getSortedBrokerList(zkClient: ZkClient): Seq[String] ={
+ ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
}
- def getTopicPartitionInSyncPath(topic: String, partitionId: String): String ={
- getTopicPartitionPath(topic, partitionId) + "/" + "isr"
+ def getAllLiveBrokerIds(zkClient: ZkClient): Set[Int] = {
+ ZkUtils.getChildren(zkClient, BrokerIdsPath).map(_.toInt).toSet
}
- def getTopicPartitionLeaderPath(topic: String, partitionId: String): String ={
- getTopicPartitionPath(topic, partitionId) + "/" + "leader"
+ def getAllBrokersInCluster(zkClient: ZkClient): Seq[Broker] = {
+ val brokerIds = ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
+ getBrokerInfoFromIds(zkClient, brokerIds.map(_.toInt))
}
- def getBrokerStateChangePath(brokerId: Int): String = {
- BrokerStatePath + "/" + brokerId
- }
- def getSortedBrokerList(zkClient: ZkClient): Seq[String] ={
- ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
+ def getLeaderAndISRForPartition(zkClient: ZkClient, topic: String, partition: Int):Option[LeaderAndISR] = {
+ val leaderAndISRPath = getTopicPartitionLeaderAndISRPath(topic, partition)
+ val ret = readDataMaybeNull(zkClient, leaderAndISRPath)
+ val leaderAndISRStr: String = ret._1
+ val stat = ret._2
+ if(leaderAndISRStr == null) None
+ else {
+ JSON.parseFull(leaderAndISRStr) match {
+ case Some(m) =>
+ val leader = m.asInstanceOf[Map[String, String]].get("leader").get.toInt
+ val epoch = m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
+ val ISRString = m.asInstanceOf[Map[String, String]].get("ISR").get
+ val ISR = Utils.getCSVList(ISRString).map(r => r.toInt)
+ val zkPathVersion = stat.getVersion
+ debug("Leader %d, Epoch %d, isr %s, zk path version %d for topic %s and partition %d".format(leader, epoch, ISR.toString(), zkPathVersion, topic, partition))
+ Some(LeaderAndISR(leader, epoch, ISR.toList, zkPathVersion))
+ case None => None
+ }
+ }
}
- def getAllBrokersInCluster(zkClient: ZkClient): Seq[Broker] = {
- val brokerIds = ZkUtils.getChildren(zkClient, ZkUtils.BrokerIdsPath).sorted
- getBrokerInfoFromIds(zkClient, brokerIds.map(b => b.toInt))
- }
def getLeaderForPartition(zkClient: ZkClient, topic: String, partition: Int): Option[Int] = {
- val leaderAndEpoch = readDataMaybeNull(zkClient, getTopicPartitionLeaderPath(topic, partition.toString))
- if(leaderAndEpoch == null) None
+ val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1
+ if(leaderAndISR == null) None
else {
- val leaderAndEpochInfo = leaderAndEpoch.split(";")
- Some(leaderAndEpochInfo.head.toInt)
+ JSON.parseFull(leaderAndISR) match {
+ case Some(m) =>
+ Some(m.asInstanceOf[Map[String, String]].get("leader").get.toInt)
+ case None => None
+ }
}
}
@@ -99,51 +110,53 @@ object ZkUtils extends Logging {
* leader fails after updating epoch in the leader path and before updating epoch in the ISR path, effectively some
* other broker will retry becoming leader with the same new epoch value.
*/
- def getEpochForPartition(client: ZkClient, topic: String, partition: Int): Int = {
- val lastKnownEpoch = try {
- val isrAndEpoch = readData(client, getTopicPartitionInSyncPath(topic, partition.toString))
- if(isrAndEpoch != null) {
- val isrAndEpochInfo = isrAndEpoch.split(";")
- if(isrAndEpochInfo.last.isEmpty)
- throw new NoEpochForPartitionException("No epoch in ISR path for topic %s partition %d is empty".format(topic, partition))
- else
- isrAndEpochInfo.last.toInt
- }else {
- throw new NoEpochForPartitionException("ISR path for topic %s partition %d is empty".format(topic, partition))
+ def getEpochForPartition(zkClient: ZkClient, topic: String, partition: Int): Int = {
+ val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1
+ if(leaderAndISR != null) {
+ val epoch = JSON.parseFull(leaderAndISR) match {
+ case None => throw new NoEpochForPartitionException("No epoch, leaderAndISR data for topic %s partition %d is invalid".format(topic, partition))
+ case Some(m) =>
+ m.asInstanceOf[Map[String, String]].get("leaderEpoch").get.toInt
}
- } catch {
- case e: ZkNoNodeException =>
- throw new NoEpochForPartitionException("No epoch since leader never existed for topic %s partition %d".format(topic, partition))
- case e1 => throw e1
+ epoch
}
- lastKnownEpoch
+ else
+ throw new NoEpochForPartitionException("No epoch, ISR path for topic %s partition %d is empty".format(topic, partition))
}
/**
- * Gets the assigned replicas (AR) for a specific topic and partition
+ * Gets the in-sync replicas (ISR) for a specific topic and partition
*/
- def getReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[String] = {
- val topicAndPartitionAssignment = getPartitionAssignmentForTopics(zkClient, List(topic).iterator)
- topicAndPartitionAssignment.get(topic) match {
- case Some(partitionAssignment) => partitionAssignment.get(partition.toString) match {
- case Some(replicaList) => replicaList
- case None => Seq.empty[String]
+ def getInSyncReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[Int] = {
+ val leaderAndISR = readDataMaybeNull(zkClient, getTopicPartitionLeaderAndISRPath(topic, partition))._1
+ if(leaderAndISR == null) Seq.empty[Int]
+ else {
+ JSON.parseFull(leaderAndISR) match {
+ case Some(m) =>
+ val ISRString = m.asInstanceOf[Map[String, String]].get("ISR").get
+ Utils.getCSVList(ISRString).map(r => r.toInt)
+ case None => Seq.empty[Int]
}
- case None => Seq.empty[String]
}
}
/**
- * Gets the in-sync replicas (ISR) for a specific topic and partition
+ * Gets the assigned replicas (AR) for a specific topic and partition
*/
- def getInSyncReplicasForPartition(client: ZkClient, topic: String, partition: Int): Seq[Int] = {
- val replicaListAndEpochString = readDataMaybeNull(client, getTopicPartitionInSyncPath(topic, partition.toString))
- if(replicaListAndEpochString == null)
+ def getReplicasForPartition(zkClient: ZkClient, topic: String, partition: Int): Seq[Int] = {
+ val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))._1
+ val assignedReplicas = if (jsonPartitionMap == null) {
Seq.empty[Int]
- else {
- val replicasAndEpochInfo = replicaListAndEpochString.split(";")
- Utils.getCSVList(replicasAndEpochInfo.head).map(r => r.toInt)
+ } else {
+ JSON.parseFull(jsonPartitionMap) match {
+ case Some(m) => m.asInstanceOf[Map[String, List[String]]].get(partition.toString) match {
+ case None => Seq.empty[Int]
+ case Some(seq) => seq.map(_.toInt)
+ }
+ case None => Seq.empty[Int]
+ }
}
+ assignedReplicas
}
def isPartitionOnBroker(zkClient: ZkClient, topic: String, partition: Int, brokerId: Int): Boolean = {
@@ -152,25 +165,7 @@ object ZkUtils extends Logging {
replicas.contains(brokerId.toString)
}
- def tryToBecomeLeaderForPartition(client: ZkClient, topic: String, partition: Int, brokerId: Int): Option[(Int, Seq[Int])] = {
- try {
- // NOTE: first increment epoch, then become leader
- val newEpoch = incrementEpochForPartition(client, topic, partition, brokerId)
- createEphemeralPathExpectConflict(client, getTopicPartitionLeaderPath(topic, partition.toString),
- "%d;%d".format(brokerId, newEpoch))
- val currentISR = getInSyncReplicasForPartition(client, topic, partition)
- val updatedISR = if(currentISR.size == 0) List(brokerId) else currentISR
- updatePersistentPath(client, getTopicPartitionInSyncPath(topic, partition.toString),
- "%s;%d".format(updatedISR.mkString(","), newEpoch))
- info("Elected broker %d with epoch %d to be leader for topic %s partition %d".format(brokerId, newEpoch, topic, partition))
- Some(newEpoch, updatedISR)
- } catch {
- case e: ZkNodeExistsException => error("Leader exists for topic %s partition %d".format(topic, partition)); None
- case oe => error("Error while electing leader for topic %s partition %d".format(topic, partition), oe); None
- }
- }
-
- def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int, leader: Int) = {
+ def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int, leader: Int): Int = {
// read previous epoch, increment it and write it to the leader path and the ISR path.
val epoch = try {
Some(getEpochForPartition(client, topic, partition))
@@ -198,15 +193,12 @@ object ZkUtils extends Logging {
createEphemeralPathExpectConflict(zkClient, brokerIdPath, broker.getZKString)
} catch {
case e: ZkNodeExistsException =>
- throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " +
- "indicates that you either have configured a brokerid that is already in use, or " +
- "else you have shutdown this broker and restarted it faster than the zookeeper " +
- "timeout so it appears to be re-registering.")
+ throw new RuntimeException("A broker is already registered on the path " + brokerIdPath + ". This probably " + "indicates that you either have configured a brokerid that is already in use, or " + "else you have shutdown this broker and restarted it faster than the zookeeper " + "timeout so it appears to be re-registering.")
}
info("Registering broker " + brokerIdPath + " succeeded with " + broker)
}
- def getConsumerPartitionOwnerPath(group: String, topic: String, partition: String): String = {
+ def getConsumerPartitionOwnerPath(group: String, topic: String, partition: Int): String = {
val topicDirs = new ZKGroupTopicDirs(group, topic)
topicDirs.consumerOwnerDir + "/" + partition
}
@@ -254,7 +246,7 @@ object ZkUtils extends Logging {
// this can happen when there is connection loss; make sure the data is what we intend to write
var storedData: String = null
try {
- storedData = readData(client, path)
+ storedData = readData(client, path)._1
} catch {
case e1: ZkNoNodeException => // the node disappeared; treat as if node existed and let caller handles this
case e2 => throw e2
@@ -292,17 +284,24 @@ object ZkUtils extends Logging {
/**
* Update the value of a persistent node with the given path and data.
* create parrent directory if necessary. Never throw NodeExistException.
+ * Return the updated path zkVersion
*/
- def updatePersistentPath(client: ZkClient, path: String, data: String): Unit = {
+ def updatePersistentPath(client: ZkClient, path: String, data: String): Int = {
+ var stat: Stat = null
try {
- client.writeData(path, data)
+ stat = client.writeData(path, data)
+ return stat.getVersion
} catch {
case e: ZkNoNodeException => {
createParentPath(client, path)
try {
client.createPersistent(path, data)
+ // When the new path is created, its zkVersion always starts from 0
+ return 0
} catch {
- case e: ZkNodeExistsException => client.writeData(path, data)
+ case e: ZkNodeExistsException =>
+ stat = client.writeData(path, data)
+ return stat.getVersion
case e2 => throw e2
}
}
@@ -311,6 +310,22 @@ object ZkUtils extends Logging {
}
/**
+ * Conditionally update the persistent path data. Returns (true, newVersion) if the update succeeds; otherwise (the path doesn't
+ * exist, the current version is not the expected version, etc.) returns (false, -1).
+ */
+ def conditionalUpdatePersistentPath(client: ZkClient, path: String, data: String, expectVersion: Int): (Boolean, Int) = {
+ try {
+ val stat = client.writeData(path, data, expectVersion)
+ info("Conditional update the zkPath %s with expected version %d succeed and return the new version: %d".format(path, expectVersion, stat.getVersion))
+ (true, stat.getVersion)
+ } catch {
+ case e: Exception =>
+ info("Conditional update the zkPath %s with expected version %d failed".format(path, expectVersion))
+ (false, -1)
+ }
+ }
+
+ /**
* Update the value of a persistent node with the given path and data.
* create parrent directory if necessary. Never throw NodeExistException.
*/
@@ -349,12 +364,23 @@ object ZkUtils extends Logging {
}
}
- def readData(client: ZkClient, path: String): String = {
- client.readData(path)
+ def readData(client: ZkClient, path: String): (String, Stat) = {
+ val stat: Stat = new Stat()
+ val dataStr: String = client.readData(path, stat)
+ (dataStr, stat)
}
- def readDataMaybeNull(client: ZkClient, path: String): String = {
- client.readData(path, true)
+ def readDataMaybeNull(client: ZkClient, path: String): (String, Stat) = {
+ val stat: Stat = new Stat()
+ var dataStr: String = null
+ try{
+ dataStr = client.readData(path, stat)
+ return (dataStr, stat)
+ } catch {
+ case e: ZkNoNodeException =>
+ return (null, stat)
+ case e2 => throw e2
+ }
}
def getChildren(client: ZkClient, path: String): Seq[String] = {
@@ -366,7 +392,6 @@ object ZkUtils extends Logging {
def getChildrenParentMayNotExist(client: ZkClient, path: String): Seq[String] = {
import scala.collection.JavaConversions._
// triggers implicit conversion from java list to scala Seq
-
try {
client.getChildren(path)
} catch {
@@ -388,32 +413,77 @@ object ZkUtils extends Logging {
val cluster = new Cluster
val nodes = getChildrenParentMayNotExist(zkClient, BrokerIdsPath)
for (node <- nodes) {
- val brokerZKString = readData(zkClient, BrokerIdsPath + "/" + node)
+ val brokerZKString = readData(zkClient, BrokerIdsPath + "/" + node)._1
cluster.add(Broker.createBroker(node.toInt, brokerZKString))
}
cluster
}
- def getPartitionAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, Map[String, List[String]]] = {
- val ret = new mutable.HashMap[String, Map[String, List[String]]]()
+ def getReplicaAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[(String, Int), Seq[Int]] = {
+ val ret = new mutable.HashMap[(String, Int), Seq[Int]]
topics.foreach{ topic =>
- val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))
+ val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))._1
+ if (jsonPartitionMap != null) {
+ JSON.parseFull(jsonPartitionMap) match {
+ case Some(m) =>
+ val replicaMap = m.asInstanceOf[Map[String, Seq[String]]]
+ for((partition, replicas) <- replicaMap){
+ ret.put((topic, partition.toInt), replicas.map(_.toInt))
+ debug("Replicas assigned to topic [%s], partition [%s] are [%s]".format(topic, partition, replicas))
+ }
+ case None =>
+ }
+ }
+ }
+ ret
+ }
+
+ def getPartitionLeaderAndISRForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[(String, Int), LeaderAndISR] = {
+ val ret = new mutable.HashMap[(String, Int), LeaderAndISR]
+ val partitionsForTopics = getPartitionsForTopics(zkClient, topics)
+ for((topic, partitions) <- partitionsForTopics){
+ for(partition <- partitions){
+ val leaderAndISROpt = ZkUtils.getLeaderAndISRForPartition(zkClient, topic, partition.toInt)
+ if(leaderAndISROpt.isDefined)
+ ret.put((topic, partition.toInt), leaderAndISROpt.get)
+ }
+ }
+ ret
+ }
+
+ def getPartitionAssignmentForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, collection.Map[Int, Seq[Int]]] = {
+ val ret = new mutable.HashMap[String, Map[Int, Seq[Int]]]()
+ topics.foreach{ topic =>
+ val jsonPartitionMap = readDataMaybeNull(zkClient, getTopicPath(topic))._1
val partitionMap = if (jsonPartitionMap == null) {
- Map[String, List[String]]()
+ Map[Int, Seq[Int]]()
} else {
JSON.parseFull(jsonPartitionMap) match {
- case Some(m) => m.asInstanceOf[Map[String, List[String]]]
- case None => Map[String, List[String]]()
+ case Some(m) =>
+ val m1 = m.asInstanceOf[Map[String, Seq[String]]]
+ m1.map(p => (p._1.toInt, p._2.map(_.toInt)))
+ case None => Map[Int, Seq[Int]]()
}
}
debug("partition map for /brokers/topics/%s is %s".format(topic, partitionMap))
ret += (topic -> partitionMap)
+ }
+ ret
+ }
+
+ def getReplicaAssignmentFromPartitionAssignment(topicPartitionAssignment: mutable.Map[String, collection.Map[Int, Seq[Int]]]): mutable.Map[(String, Int), Seq[Int]] = {
+ val ret = new mutable.HashMap[(String, Int), Seq[Int]]
+ for((topic, partitionAssignment) <- topicPartitionAssignment){
+ for((partition, replicaAssignment) <- partitionAssignment){
+ ret.put((topic, partition), replicaAssignment)
+ }
}
ret
}
- def getPartitionsForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, Seq[String]] = {
- getPartitionAssignmentForTopics(zkClient, topics).map{ topicAndPartitionMap =>
+ def getPartitionsForTopics(zkClient: ZkClient, topics: Iterator[String]): mutable.Map[String, Seq[Int]] = {
+ getPartitionAssignmentForTopics(zkClient, topics).map
+ { topicAndPartitionMap =>
val topic = topicAndPartitionMap._1
val partitionMap = topicAndPartitionMap._2
debug("partition assignment of /brokers/topics/%s is %s".format(topic, partitionMap))
@@ -421,14 +491,20 @@ object ZkUtils extends Logging {
}
}
- def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int): Map[String, Seq[Int]] = {
+ def getPartitionsAssignedToBroker(zkClient: ZkClient, topics: Seq[String], brokerId: Int): Map[(String, Int), Seq[Int]] = {
+ val ret = new mutable.HashMap[(String, Int), Seq[Int]]
val topicsAndPartitions = getPartitionAssignmentForTopics(zkClient, topics.iterator)
- topicsAndPartitions.map{ topicAndPartitionMap =>
- val topic = topicAndPartitionMap._1
- val partitionMap = topicAndPartitionMap._2
- val relevantPartitions = partitionMap.filter( m => m._2.contains(brokerId.toString) )
- (topic -> relevantPartitions.keySet.map(_.toInt).toSeq)
+ topicsAndPartitions.map
+ {
+ topicAndPartitionMap =>
+ val topic = topicAndPartitionMap._1
+ val partitionMap = topicAndPartitionMap._2
+ val relevantPartitionsMap = partitionMap.filter( m => m._2.contains(brokerId) )
+ for((relevantPartition, replicaAssignment) <- relevantPartitionsMap){
+ ret.put((topic, relevantPartition), replicaAssignment)
+ }
}
+ ret
}
def deletePartition(zkClient : ZkClient, brokerId: Int, topic: String) {
@@ -446,8 +522,7 @@ object ZkUtils extends Logging {
def getConsumerTopicMaps(zkClient: ZkClient, group: String): Map[String, TopicCount] = {
val dirs = new ZKGroupDirs(group)
val consumersInGroup = getConsumersInGroup(zkClient, group)
- val topicCountMaps = consumersInGroup.map(consumerId => TopicCount.constructTopicCount(consumerId,
- ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId), zkClient))
+ val topicCountMaps = consumersInGroup.map(consumerId => TopicCount.constructTopicCount(consumerId,ZkUtils.readData(zkClient, dirs.consumerRegistryDir + "/" + consumerId)._1, zkClient))
consumersInGroup.zip(topicCountMaps).toMap
}
@@ -470,8 +545,7 @@ object ZkUtils extends Logging {
consumersPerTopicMap
}
- def getBrokerInfoFromIds(zkClient: ZkClient, brokerIds: Seq[Int]): Seq[Broker] =
- brokerIds.map( bid => Broker.createBroker(bid, ZkUtils.readData(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)) )
+ def getBrokerInfoFromIds(zkClient: ZkClient, brokerIds: Seq[Int]): Seq[Broker] = brokerIds.map( bid => Broker.createBroker(bid, ZkUtils.readDataMaybeNull(zkClient, ZkUtils.BrokerIdsPath + "/" + bid)._1))
def getAllTopics(zkClient: ZkClient): Seq[String] = {
val topics = ZkUtils.getChildrenParentMayNotExist(zkClient, BrokerTopicsPath)
@@ -479,17 +553,55 @@ object ZkUtils extends Logging {
else topics
}
+ def incrementEpochForPartition(client: ZkClient, topic: String, partition: Int) = {
+ // read the previous epoch and compute the next one (or the initial epoch if none exists); nothing is written here.
+ val epoch = try {
+ Some(getEpochForPartition(client, topic, partition))
+ }catch {
+ case e: NoEpochForPartitionException => None
+ case e1 => throw e1
+ }
+ val newEpoch = epoch match {
+ case Some(partitionEpoch) =>
+ debug("Existing epoch for topic %s partition %d is %d".format(topic, partition, partitionEpoch))
+ partitionEpoch + 1
+ case None =>
+ // this is the first time leader is elected for this partition. So set epoch to 1
+ debug("First epoch is 1 for topic %s partition %d".format(topic, partition))
+ LeaderAndISR.initialLeaderEpoch
+ }
+ newEpoch
+ }
}
-class LeaderExistsListener(topic: String, partition: Int, leaderLock: ReentrantLock, leaderExists: Condition) extends IZkDataListener {
+
+
+
+class LeaderExistsOrChangedListener(topic: String,
+ partition: Int,
+ leaderLock: ReentrantLock,
+ leaderExistsOrChanged: Condition,
+ oldLeaderOpt: Option[Int] = None,
+ zkClient: ZkClient = null) extends IZkDataListener with Logging {
@throws(classOf[Exception])
def handleDataChange(dataPath: String, data: Object) {
val t = dataPath.split("/").takeRight(3).head
val p = dataPath.split("/").takeRight(2).head.toInt
leaderLock.lock()
try {
- if(t == topic && p == partition)
- leaderExists.signal()
+ if(t == topic && p == partition){
+ if(oldLeaderOpt == None){
+ trace("In leader existence listener on partition [%s, %d], leader has been created".format(topic, partition))
+ leaderExistsOrChanged.signal()
+ }
+ else {
+ val newLeaderOpt = ZkUtils.getLeaderForPartition(zkClient, t, p)
+ if(newLeaderOpt.isDefined && newLeaderOpt.get != oldLeaderOpt.get){
+ trace("In leader change listener on partition [%s, %d], leader has been moved from %d to %d".format(topic, partition, oldLeaderOpt.get, newLeaderOpt.get))
+ leaderExistsOrChanged.signal()
+ }
+ }
+ }
}
finally {
leaderLock.unlock()
@@ -500,12 +612,11 @@ class LeaderExistsListener(topic: String
def handleDataDeleted(dataPath: String) {
leaderLock.lock()
try {
- leaderExists.signal()
+ leaderExistsOrChanged.signal()
}finally {
leaderLock.unlock()
}
}
-
}
object ZKStringSerializer extends ZkSerializer {
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala Wed Aug 1 16:13:59 2012
@@ -95,8 +95,8 @@ class ZookeeperConsumerConnectorTest ext
val sentMessages1 = (sentMessages1_1 ++ sentMessages1_2).sortWith((s,t) => s.checksum < t.checksum)
// wait to make sure the topic and partition have a leader for the successful case
- waitUntilLeaderIsElected(zkClient, topic, 0, 500)
- waitUntilLeaderIsElected(zkClient, topic, 1, 500)
+ waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+ waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
// create a consumer
val consumerConfig1 = new ConsumerConfig(
@@ -128,8 +128,8 @@ class ZookeeperConsumerConnectorTest ext
val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
- waitUntilLeaderIsElected(zkClient, topic, 0, 500)
- waitUntilLeaderIsElected(zkClient, topic, 1, 500)
+ waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+ waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
@@ -152,8 +152,8 @@ class ZookeeperConsumerConnectorTest ext
val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages)
val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
- waitUntilLeaderIsElected(zkClient, topic, 0, 500)
- waitUntilLeaderIsElected(zkClient, topic, 1, 500)
+ waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+ waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
@@ -181,8 +181,8 @@ class ZookeeperConsumerConnectorTest ext
val sentMessages1_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
val sentMessages1 = (sentMessages1_1 ++ sentMessages1_2).sortWith((s,t) => s.checksum < t.checksum)
- waitUntilLeaderIsElected(zkClient, topic, 0, 500)
- waitUntilLeaderIsElected(zkClient, topic, 1, 500)
+ waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+ waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
// create a consumer
val consumerConfig1 = new ConsumerConfig(
@@ -213,8 +213,8 @@ class ZookeeperConsumerConnectorTest ext
val sentMessages2_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
val sentMessages2 = (sentMessages2_1 ++ sentMessages2_2).sortWith((s,t) => s.checksum < t.checksum)
- waitUntilLeaderIsElected(zkClient, topic, 0, 500)
- waitUntilLeaderIsElected(zkClient, topic, 1, 500)
+ waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+ waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
val receivedMessages2_1 = getMessages(nMessages, topicMessageStreams1)
val receivedMessages2_2 = getMessages(nMessages, topicMessageStreams2)
@@ -237,8 +237,8 @@ class ZookeeperConsumerConnectorTest ext
val sentMessages3_2 = sendMessagesToBrokerPartition(configs.last, topic, 1, nMessages, GZIPCompressionCodec)
val sentMessages3 = (sentMessages3_1 ++ sentMessages3_2).sortWith((s,t) => s.checksum < t.checksum)
- waitUntilLeaderIsElected(zkClient, topic, 0, 500)
- waitUntilLeaderIsElected(zkClient, topic, 1, 500)
+ waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+ waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
val receivedMessages3_1 = getMessages(nMessages, topicMessageStreams1)
val receivedMessages3_2 = getMessages(nMessages, topicMessageStreams2)
@@ -317,8 +317,8 @@ class ZookeeperConsumerConnectorTest ext
val consumerConfig = new ConsumerConfig(TestUtils.createConsumerProperties(zkConnect, group, consumer1))
- waitUntilLeaderIsElected(zkClient, topic, 0, 500)
- waitUntilLeaderIsElected(zkClient, topic, 1, 500)
+ waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+ waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
val zkConsumerConnector =
new ZookeeperConsumerConnector(consumerConfig, true)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/controller/ControllerBasicTest.scala Wed Aug 1 16:13:59 2012
@@ -25,7 +25,8 @@ import kafka.server.{KafkaServer, KafkaC
import kafka.api._
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger
-import kafka.utils.{ControllerTestUtils, ZkUtils, TestUtils}
+import kafka.admin.CreateTopicCommand
+import kafka.utils.{ZkUtils, ControllerTestUtils, TestUtils}
class ControllerBasicTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -36,11 +37,13 @@ class ControllerBasicTest extends JUnit3
override def setUp() {
super.setUp()
brokers = configs.map(config => TestUtils.createServer(config))
+ CreateTopicCommand.createTopic(zkClient, "test1", 1, 4, "0:1:2:3")
+ CreateTopicCommand.createTopic(zkClient, "test2", 1, 4, "0:1:2:3")
}
override def tearDown() {
- super.tearDown()
brokers.foreach(_.shutdown())
+ super.tearDown()
}
def testControllerFailOver(){
@@ -48,36 +51,37 @@ class ControllerBasicTest extends JUnit3
brokers(1).shutdown()
brokers(3).shutdown()
assertTrue("Controller not elected", TestUtils.waitUntilTrue(() =>
- ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath) != null, zookeeper.tickTime))
- var curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)
+ ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)._1 != null, zookeeper.tickTime))
+ var curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)._1
assertEquals("Controller should move to broker 2", "2", curController)
+
brokers(1).startup()
brokers(2).shutdown()
assertTrue("Controller not elected", TestUtils.waitUntilTrue(() =>
- ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath) != null, zookeeper.tickTime))
- curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)
+ ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)._1 != null, zookeeper.tickTime))
+ curController = ZkUtils.readDataMaybeNull(zkClient, ZkUtils.ControllerPath)._1
assertEquals("Controller should move to broker 1", "1", curController)
}
def testControllerCommandSend(){
for(broker <- brokers){
if(broker.kafkaController.isActive){
- val leaderAndISRRequest = ControllerTestUtils.createSampleLeaderAndISRRequest()
- val stopReplicaRequest = ControllerTestUtils.createSampleStopReplicaRequest()
+ val leaderAndISRRequest = ControllerTestUtils.createTestLeaderAndISRRequest()
+ val stopReplicaRequest = ControllerTestUtils.createTestStopReplicaRequest()
val successCount: AtomicInteger = new AtomicInteger(0)
val countDownLatch: CountDownLatch = new CountDownLatch(8)
def compareLeaderAndISRResponseWithExpectedOne(response: RequestOrResponse){
- val expectedResponse = ControllerTestUtils.createSampleLeaderAndISRResponse()
+ val expectedResponse = ControllerTestUtils.createTestLeaderAndISRResponse()
if(response.equals(expectedResponse))
successCount.addAndGet(1)
countDownLatch.countDown()
}
def compareStopReplicaResponseWithExpectedOne(response: RequestOrResponse){
- val expectedResponse = ControllerTestUtils.createSampleStopReplicaResponse()
+ val expectedResponse = ControllerTestUtils.createTestStopReplicaResponse()
if(response.equals(expectedResponse))
successCount.addAndGet(1)
countDownLatch.countDown()
@@ -87,10 +91,10 @@ class ControllerBasicTest extends JUnit3
broker.kafkaController.sendRequest(1, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
broker.kafkaController.sendRequest(2, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
broker.kafkaController.sendRequest(3, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
- broker.kafkaController.sendRequest(0, stopReplicaRequest, compareStopReplicaResponseWithExpectedOne)
- broker.kafkaController.sendRequest(1, stopReplicaRequest, compareStopReplicaResponseWithExpectedOne)
- broker.kafkaController.sendRequest(2, stopReplicaRequest, compareStopReplicaResponseWithExpectedOne)
- broker.kafkaController.sendRequest(3, stopReplicaRequest, compareStopReplicaResponseWithExpectedOne)
+ broker.kafkaController.sendRequest(0, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
+ broker.kafkaController.sendRequest(1, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
+ broker.kafkaController.sendRequest(2, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
+ broker.kafkaController.sendRequest(3, leaderAndISRRequest, compareLeaderAndISRResponseWithExpectedOne)
countDownLatch.await()
assertEquals(successCount.get(), 8)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/BackwardsCompatibilityTest.scala Wed Aug 1 16:13:59 2012
@@ -57,7 +57,7 @@ class BackwardsCompatibilityTest extends
// test for reading data with magic byte 0
def testProtocolVersion0() {
CreateTopicCommand.createTopic(zkClient, topic, 0, 1, configs.head.brokerId.toString)
- TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+ TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
val lastOffset = simpleConsumer.getOffsetsBefore(topic, 0, OffsetRequest.LatestTime, 1)
var fetchOffset: Long = 0L
var messageCount: Int = 0
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala Wed Aug 1 16:13:59 2012
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -35,27 +35,28 @@ import kafka.admin.CreateTopicCommand
class FetcherTest extends JUnit3Suite with KafkaServerTestHarness {
val numNodes = 1
- val configs =
+ val configs =
for(props <- TestUtils.createBrokerConfigs(numNodes))
- yield new KafkaConfig(props)
+ yield new KafkaConfig(props)
val messages = new mutable.HashMap[Int, Seq[Message]]
val topic = "topic"
val cluster = new Cluster(configs.map(c => new Broker(c.brokerId, c.brokerId.toString, "localhost", c.port)))
val shutdown = ZookeeperConsumerConnector.shutdownCommand
val queue = new LinkedBlockingQueue[FetchedDataChunk]
val topicInfos = configs.map(c => new PartitionTopicInfo(topic,
- c.brokerId,
- 0,
- queue,
- new AtomicLong(0),
- new AtomicLong(0),
- new AtomicInteger(0)))
-
+ c.brokerId,
+ 0,
+ queue,
+ new AtomicLong(0),
+ new AtomicLong(0),
+ new AtomicInteger(0)))
+
var fetcher: ConsumerFetcherManager = null
override def setUp() {
super.setUp
CreateTopicCommand.createTopic(zkClient, topic, 1, 1, configs.head.brokerId.toString)
+ waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
fetcher = new ConsumerFetcherManager("consumer1", new ConsumerConfig(TestUtils.createConsumerProperties("", "", "")), zkClient)
fetcher.stopAllConnections()
fetcher.startConnections(topicInfos, cluster)
@@ -65,20 +66,20 @@ class FetcherTest extends JUnit3Suite wi
fetcher.shutdown()
super.tearDown
}
-
+
def testFetcher() {
val perNode = 2
var count = sendMessages(perNode)
- waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+
fetch(count)
assertQueueEmpty()
count = sendMessages(perNode)
fetch(count)
assertQueueEmpty()
}
-
+
def assertQueueEmpty(): Unit = assertEquals(0, queue.size)
-
+
def sendMessages(messagesPerNode: Int): Int = {
var count = 0
for(conf <- configs) {
@@ -91,7 +92,7 @@ class FetcherTest extends JUnit3Suite wi
}
count
}
-
+
def fetch(expected: Int) {
var count = 0
while(true) {
@@ -103,5 +104,5 @@ class FetcherTest extends JUnit3Suite wi
return
}
}
-
+
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala Wed Aug 1 16:13:59 2012
@@ -133,7 +133,7 @@ class LazyInitProducerTest extends JUnit
builder.addFetch(topic, 0, 0, 10000)
}
// wait until leader is elected
- topics.foreach(topic => TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 500))
+ topics.foreach(topic => TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500))
producer.send(produceList: _*)
// wait a bit for produced message to be available
@@ -158,7 +158,7 @@ class LazyInitProducerTest extends JUnit
builder.addFetch(topic, 0, 0, 10000)
}
// wait until leader is elected
- topics.foreach(topic => TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 1500))
+ topics.foreach(topic => TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1500))
producer.send(produceList: _*)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala Wed Aug 1 16:13:59 2012
@@ -337,6 +337,7 @@ class PrimitiveApiTest extends JUnit3Sui
assertTrue("Topic new-topic not created after timeout", TestUtils.waitUntilTrue(() =>
AdminUtils.getTopicMetaDataFromZK(List(newTopic),
zkClient).head.errorCode != ErrorMapping.UnknownTopicCode, zookeeper.tickTime))
+ TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, newTopic, 0, 500)
val fetchResponse = consumer.fetch(new FetchRequestBuilder().addFetch(newTopic, 0, 0, 10000).build())
assertFalse(fetchResponse.messageSet(newTopic, 0).iterator.hasNext)
}
@@ -348,7 +349,7 @@ class PrimitiveApiTest extends JUnit3Sui
def createSimpleTopicsAndAwaitLeader(zkClient: ZkClient, topics: Seq[String], brokerId: Int) {
for( topic <- topics ) {
CreateTopicCommand.createTopic(zkClient, topic, 1, 1, brokerId.toString)
- TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+ TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
}
}
}
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala Wed Aug 1 16:13:59 2012
@@ -117,7 +117,8 @@ class TopicMetadataTest extends JUnit3Su
// create the kafka request handler
val requestChannel = new RequestChannel(2, 5)
- val apis = new KafkaApis(requestChannel, logManager, replicaManager, kafkaZookeeper)
+ val apis = new KafkaApis(requestChannel, logManager, replicaManager, kafkaZookeeper, null,
+ null, null, null, 1)
// mock the receive API to return the request buffer as created above
val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/consumer/ZookeeperConsumerConnectorTest.scala Wed Aug 1 16:13:59 2012
@@ -54,8 +54,8 @@ class ZookeeperConsumerConnectorTest ext
// send some messages to each broker
val sentMessages1 = sendMessages(nMessages, "batch1")
- waitUntilLeaderIsElected(zkClient, topic, 0, 500)
- waitUntilLeaderIsElected(zkClient, topic, 1, 500)
+ waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
+ waitUntilLeaderIsElectedOrChanged(zkClient, topic, 1, 500)
// create a consumer
val consumerConfig1 = new ConsumerConfig(TestUtils.createConsumerProperties(zookeeperConnect, group, consumer1))
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala Wed Aug 1 16:13:59 2012
@@ -118,7 +118,7 @@ class LogOffsetTest extends JUnit3Suite
// setup brokers in zookeeper as owners of partitions for this test
CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "1")
- TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 500)
+ TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 500)
var offsetChanged = false
for(i <- 1 to 14) {
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala Wed Aug 1 16:13:59 2012
@@ -27,9 +27,10 @@ import junit.framework.Assert._
class SocketServerTest extends JUnitSuite {
- val server: SocketServer = new SocketServer(port = TestUtils.choosePort,
- numProcessorThreads = 1,
- monitoringPeriodSecs = 30,
+ val server: SocketServer = new SocketServer(0,
+ port = TestUtils.choosePort,
+ numProcessorThreads = 1,
+ monitoringPeriodSecs = 30,
maxQueuedRequests = 50,
maxRequestSize = 50)
server.startup()
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Wed Aug 1 16:13:59 2012
@@ -101,7 +101,7 @@ class ProducerTest extends JUnit3Suite w
// create topic with 1 partition and await leadership
CreateTopicCommand.createTopic(zkClient, "new-topic", 1, 2)
- TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 0, 500)
+ TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
val producer1 = new Producer[String, String](config1)
val producer2 = new Producer[String, String](config2)
@@ -153,10 +153,10 @@ class ProducerTest extends JUnit3Suite w
// create topic
CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0,0,0,0")
- TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 0, 500)
- TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 1, 500)
- TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 2, 500)
- TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 3, 500)
+ TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
+ TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 1, 500)
+ TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 2, 500)
+ TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 3, 500)
val config = new ProducerConfig(props)
val producer = new Producer[String, String](config)
@@ -182,6 +182,8 @@ class ProducerTest extends JUnit3Suite w
// restart server 1
server1.startup()
+ TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
+
try {
// cross check if broker 1 got the messages
val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
@@ -209,7 +211,7 @@ class ProducerTest extends JUnit3Suite w
// create topics in ZK
CreateTopicCommand.createTopic(zkClient, "new-topic", 4, 2, "0:1,0:1,0:1,0:1")
- TestUtils.waitUntilLeaderIsElected(zkClient, "new-topic", 0, 500)
+ TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "new-topic", 0, 500)
// do a simple test to make sure plumbing is okay
try {
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Wed Aug 1 16:13:59 2012
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -159,9 +159,9 @@ class SyncProducerTest extends JUnit3Sui
// #2 - test that we get correct offsets when partition is owned by broker
CreateTopicCommand.createTopic(zkClient, "topic1", 1, 1)
- TestUtils.waitUntilLeaderIsElected(zkClient, "topic1", 0, 500)
+ TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "topic1", 0, 500)
CreateTopicCommand.createTopic(zkClient, "topic3", 1, 1)
- TestUtils.waitUntilLeaderIsElected(zkClient, "topic3", 0, 500)
+ TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, "topic3", 0, 500)
val response2 = producer.send(request)
Assert.assertNotNull(response2)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala Wed Aug 1 16:13:59 2012
@@ -39,9 +39,11 @@ class HighwatermarkPersistenceTest exten
val scheduler = new KafkaScheduler(2)
scheduler.startUp
// create replica manager
- val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler)
+ val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, null)
replicaManager.startup()
- replicaManager.checkpointHighwaterMarks()
+ replicaManager.startHighWaterMarksCheckPointThread()
+ // sleep for one flush interval (defaultFlushIntervalMs) before reading the checkpointed high watermark
+ Thread.sleep(configs.head.defaultFlushIntervalMs)
var fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
assertEquals(0L, fooPartition0Hw)
val partition0 = replicaManager.getOrCreatePartition(topic, 0, configs.map(_.brokerId).toSet)
@@ -80,7 +82,7 @@ class HighwatermarkPersistenceTest exten
val scheduler = new KafkaScheduler(2)
scheduler.startUp
// create replica manager
- val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler)
+ val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, null)
replicaManager.startup()
replicaManager.checkpointHighwaterMarks()
var topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala Wed Aug 1 16:13:59 2012
@@ -100,7 +100,7 @@ class ISRExpirationTest extends JUnit3Su
// create kafka scheduler
val scheduler = new KafkaScheduler(2)
// create replica manager
- val replicaManager = new ReplicaManager(configs.head, time, zkClient, scheduler)
+ val replicaManager = new ReplicaManager(configs.head, time, zkClient, scheduler, null)
try {
val partition0 = replicaManager.getOrCreatePartition(topic, 0, configs.map(_.brokerId).toSet)
// create leader log
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala Wed Aug 1 16:13:59 2012
@@ -25,7 +25,6 @@ import junit.framework.Assert._
import kafka.utils.{ZkUtils, Utils, TestUtils}
class LeaderElectionTest extends JUnit3Suite with ZooKeeperTestHarness {
-
val brokerId1 = 0
val brokerId2 = 1
@@ -34,23 +33,23 @@ class LeaderElectionTest extends JUnit3S
val configProps1 = TestUtils.createBrokerConfig(brokerId1, port1)
val configProps2 = TestUtils.createBrokerConfig(brokerId2, port2)
-
+ var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
override def setUp() {
super.setUp()
+ // start both servers
+ val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
+ val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
+ servers ++= List(server1, server2)
}
override def tearDown() {
+ servers.map(server => server.shutdown())
+ servers.map(server => Utils.rm(server.config.logDir))
super.tearDown()
}
- def testLeaderElectionWithCreateTopic {
- var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
- // start both servers
- val server1 = TestUtils.createServer(new KafkaConfig(configProps1))
- val server2 = TestUtils.createServer(new KafkaConfig(configProps2))
-
- servers ++= List(server1, server2)
+ def testLeaderElectionAndEpoch {
// start 2 brokers
val topic = "new-topic"
val partitionId = 0
@@ -59,62 +58,39 @@ class LeaderElectionTest extends JUnit3S
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
// wait until leader is elected
- var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
- assertTrue("Leader should get elected", leader.isDefined)
+ val leader1 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
+ val leaderEpoch1 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
+ debug("leader Epoc: " + leaderEpoch1)
+ debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
+ assertTrue("Leader should get elected", leader1.isDefined)
// NOTE: this is to avoid transient test failures
- assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
+ assertTrue("Leader could be broker 0 or broker 1", (leader1.getOrElse(-1) == 0) || (leader1.getOrElse(-1) == 1))
+ assertEquals("First epoch value should be 0", 0, leaderEpoch1)
// kill the server hosting the preferred replica
- server1.shutdown()
-
- // check if leader moves to the other server
- leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 1500)
- assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
-
- val leaderPath = zkClient.getChildren(ZkUtils.getTopicPartitionPath(topic, "0"))
- // bring the preferred replica back
- servers.head.startup()
-
- leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
- assertEquals("Leader must remain on broker 1", 1, leader.getOrElse(-1))
-
- // shutdown current leader (broker 1)
servers.last.shutdown()
- leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
-
- // test if the leader is the preferred replica
- assertEquals("Leader must be preferred replica on broker 0", 0, leader.getOrElse(-1))
- // shutdown the servers and delete data hosted on them
- servers.map(server => server.shutdown())
- servers.map(server => Utils.rm(server.config.logDir))
- }
-
- // Assuming leader election happens correctly, test if epoch changes as expected
- def testEpoch() {
- // keep switching leaders to see if epoch changes correctly
- val topic = "new-topic"
- val partitionId = 0
-
- // setup 2 brokers in ZK
- val brokers = TestUtils.createBrokersInZk(zkClient, List(brokerId1, brokerId2))
-
- // create topic with 1 partition, 2 replicas, one on each broker
- CreateTopicCommand.createTopic(zkClient, topic, 1, 2, "0:1")
-
- var newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
- assertTrue("Broker 0 should become leader", newLeaderEpoch.isDefined)
- assertEquals("First epoch value should be 1", 1, newLeaderEpoch.get._1)
-
- ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
- newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 1)
- assertTrue("Broker 1 should become leader", newLeaderEpoch.isDefined)
- assertEquals("Second epoch value should be 2", 2, newLeaderEpoch.get._1)
-
- ZkUtils.deletePath(zkClient, ZkUtils.getTopicPartitionLeaderPath(topic, partitionId.toString))
- newLeaderEpoch = ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic, partitionId, 0)
- assertTrue("Broker 0 should become leader again", newLeaderEpoch.isDefined)
- assertEquals("Third epoch value should be 3", 3, newLeaderEpoch.get._1)
-
- TestUtils.deleteBrokersInZk(zkClient, List(brokerId1, brokerId2))
+ // check if leader moves to the other server
+ val leader2 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1500, if(leader1.get == 0) None else leader1)
+ val leaderEpoch2 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
+ debug("Leader is elected to be: %s".format(leader1.getOrElse(-1)))
+ debug("leader Epoc: " + leaderEpoch2)
+ assertEquals("Leader must move to broker 0", 0, leader2.getOrElse(-1))
+ if(leader1.get == leader2.get)
+ assertEquals("Second epoch value should be " + leaderEpoch1, leaderEpoch1, leaderEpoch2)
+ else
+ assertEquals("Second epoch value should be %d".format(leaderEpoch1+1) , leaderEpoch1+1, leaderEpoch2)
+
+ servers.last.startup()
+ servers.head.shutdown()
+ Thread.sleep(zookeeper.tickTime)
+ val leader3 = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 1500, if(leader2.get == 1) None else leader2)
+ val leaderEpoch3 = ZkUtils.getEpochForPartition(zkClient, topic, partitionId)
+ debug("leader Epoc: " + leaderEpoch3)
+ debug("Leader is elected to be: %s".format(leader3.getOrElse(-1)))
+ assertEquals("Leader must return to 1", 1, leader3.getOrElse(-1))
+ if(leader2.get == leader3.get)
+ assertEquals("Second epoch value should be " + leaderEpoch2, leaderEpoch2, leaderEpoch3)
+ else
+ assertEquals("Second epoch value should be %d".format(leaderEpoch2+1) , leaderEpoch2+1, leaderEpoch3)
}
}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala Wed Aug 1 16:13:59 2012
@@ -57,7 +57,7 @@ class LogRecoveryTest extends JUnit3Suit
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
// wait until leader is elected
- var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+ var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
assertTrue("Leader should get elected", leader.isDefined)
// NOTE: this is to avoid transient test failures
assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
@@ -89,7 +89,7 @@ class LogRecoveryTest extends JUnit3Suit
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
// wait until leader is elected
- var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+ var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
assertTrue("Leader should get elected", leader.isDefined)
// NOTE: this is to avoid transient test failures
assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
@@ -103,13 +103,13 @@ class LogRecoveryTest extends JUnit3Suit
assertEquals(30L, hwFile1.read(topic, 0))
// check if leader moves to the other server
- leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+ leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500, leader)
assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
// bring the preferred replica back
server1.startup()
- leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+ leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
assertEquals("Leader must remain on broker 1", 1, leader.getOrElse(-1))
assertEquals(30L, hwFile1.read(topic, 0))
@@ -118,7 +118,7 @@ class LogRecoveryTest extends JUnit3Suit
assertEquals(30L, hwFile2.read(topic, 0))
server2.startup()
- leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+ leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500, leader)
assertEquals("Leader must remain on broker 0", 0, leader.getOrElse(-1))
sendMessages()
@@ -159,7 +159,7 @@ class LogRecoveryTest extends JUnit3Suit
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
// wait until leader is elected
- var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+ var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
assertTrue("Leader should get elected", leader.isDefined)
// NOTE: this is to avoid transient test failures
assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
@@ -203,7 +203,7 @@ class LogRecoveryTest extends JUnit3Suit
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(_.brokerId).mkString(":"))
// wait until leader is elected
- var leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+ var leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500)
assertTrue("Leader should get elected", leader.isDefined)
// NOTE: this is to avoid transient test failures
assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
@@ -220,7 +220,7 @@ class LogRecoveryTest extends JUnit3Suit
server2.startup()
// check if leader moves to the other server
- leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
+ leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500, leader)
assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
assertEquals(60L, hwFile1.read(topic, 0))
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala Wed Aug 1 16:13:59 2012
@@ -51,7 +51,7 @@ class ReplicaFetchTest extends JUnit3Sui
// create a topic and partition and await leadership
for (topic <- List(topic1,topic2)) {
CreateTopicCommand.createTopic(zkClient, topic, 1, 2, configs.map(c => c.brokerId).mkString(":"))
- TestUtils.waitUntilLeaderIsElected(zkClient, topic, 0, 1000)
+ TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
}
// send test messages to leader
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala Wed Aug 1 16:13:59 2012
@@ -73,7 +73,7 @@ class RequestPurgatoryTest {
assertTrue("Time for expiration was about 20ms", (elapsed - expiration).abs < 10L)
}
- class MockRequestPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest] {
+ class MockRequestPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest]("Mock Request Purgatory") {
val satisfied = mutable.Set[DelayedRequest]()
val expired = mutable.Set[DelayedRequest]()
def awaitExpiration(delayed: DelayedRequest) = {
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1368092&r1=1368091&r2=1368092&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Wed Aug 1 16:13:59 2012
@@ -72,7 +72,7 @@ class ServerShutdownTest extends JUnit3S
val server = new KafkaServer(config)
server.startup()
- waitUntilLeaderIsElected(zkClient, topic, 0, 1000)
+ waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
var fetchedMessage: ByteBufferMessageSet = null
while(fetchedMessage == null || fetchedMessage.validBytes == 0) {