You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/07/31 17:33:31 UTC
svn commit: r1367619 [4/4] - in /incubator/kafka/branches/0.8/core/src:
main/scala/kafka/admin/ main/scala/kafka/api/ main/scala/kafka/cluster/
main/scala/kafka/common/ main/scala/kafka/consumer/ main/scala/kafka/log/
main/scala/kafka/network/ main/sca...
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1367619&r1=1367618&r2=1367619&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Tue Jul 31 15:33:29 2012
@@ -5,7 +5,7 @@
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
@@ -43,33 +43,33 @@ import collection.mutable.{Map, Set}
* Utility functions to help with testing
*/
object TestUtils extends Logging {
-
+
val Letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
val Digits = "0123456789"
val LettersAndDigits = Letters + Digits
-
+
/* A consistent random number generator to make tests repeatable */
val seededRandom = new Random(192348092834L)
val random = new Random()
-
+
/**
* Choose a number of random available ports
*/
def choosePorts(count: Int): List[Int] = {
- val sockets =
+ val sockets =
for(i <- 0 until count)
- yield new ServerSocket(0)
+ yield new ServerSocket(0)
val socketList = sockets.toList
val ports = socketList.map(_.getLocalPort)
socketList.map(_.close)
ports
}
-
+
/**
* Choose an available port
*/
def choosePort(): Int = choosePorts(1).head
-
+
/**
* Create a temporary directory
*/
@@ -80,7 +80,7 @@ object TestUtils extends Logging {
f.deleteOnExit()
f
}
-
+
/**
* Create a temporary file
*/
@@ -89,12 +89,12 @@ object TestUtils extends Logging {
f.deleteOnExit()
f
}
-
+
/**
* Create a temporary file and return an open file channel for this file
*/
def tempChannel(): FileChannel = new RandomAccessFile(tempFile(), "rw").getChannel()
-
+
/**
* Create a kafka server instance with appropriate test settings
* USING THIS IS A SIGN YOU ARE NOT WRITING A REAL UNIT TEST
@@ -105,15 +105,15 @@ object TestUtils extends Logging {
server.startup()
server
}
-
+
/**
* Create a test config for the given node id
*/
def createBrokerConfigs(numConfigs: Int): List[Properties] = {
for((port, node) <- choosePorts(numConfigs).zipWithIndex)
- yield createBrokerConfig(node, port)
+ yield createBrokerConfig(node, port)
}
-
+
/**
* Create a test config for the given node id
*/
@@ -127,7 +127,7 @@ object TestUtils extends Logging {
props.put("replica.socket.timeout.ms", "1500")
props
}
-
+
/**
* Create a test config for a consumer
*/
@@ -150,9 +150,9 @@ object TestUtils extends Logging {
* Wrap the message in a message set
* @param payload The bytes of the message
*/
- def singleMessageSet(payload: Array[Byte]) =
+ def singleMessageSet(payload: Array[Byte]) =
new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(payload))
-
+
/**
* Generate an array of random bytes
* @param numBytes The size of the array
@@ -162,7 +162,7 @@ object TestUtils extends Logging {
seededRandom.nextBytes(bytes)
bytes
}
-
+
/**
* Generate a random string of letters and digits of the given length
* @param len The length of the string
@@ -183,7 +183,7 @@ object TestUtils extends Logging {
for(i <- 0 until b1.limit - b1.position)
assertEquals("byte " + i + " byte not equal.", b1.get(b1.position + i), b2.get(b1.position + i))
}
-
+
/**
* Throw an exception if the two iterators are of differing lengths or contain
* different messages on their Nth element
@@ -197,28 +197,28 @@ object TestUtils extends Logging {
// check if the expected iterator is longer
if (expected.hasNext) {
- var length1 = length;
- while (expected.hasNext) {
- expected.next
- length1 += 1
- }
- assertFalse("Iterators have uneven length-- first has more: "+length1 + " > " + length, true);
+ var length1 = length;
+ while (expected.hasNext) {
+ expected.next
+ length1 += 1
+ }
+ assertFalse("Iterators have uneven length-- first has more: "+length1 + " > " + length, true);
}
// check if the actual iterator was longer
if (actual.hasNext) {
- var length2 = length;
- while (actual.hasNext) {
- actual.next
- length2 += 1
- }
- assertFalse("Iterators have uneven length-- second has more: "+length2 + " > " + length, true);
+ var length2 = length;
+ while (actual.hasNext) {
+ actual.next
+ length2 += 1
+ }
+ assertFalse("Iterators have uneven length-- second has more: "+length2 + " > " + length, true);
}
}
/**
* Throw an exception if an iterable has different length than expected
- *
+ *
*/
def checkLength[T](s1: Iterator[T], expectedLength:Int) {
var n = 0
@@ -269,7 +269,7 @@ object TestUtils extends Logging {
* Create a hexidecimal string for the given bytes
*/
def hexString(bytes: Array[Byte]): String = hexString(ByteBuffer.wrap(bytes))
-
+
/**
* Create a hexidecimal string for the given bytes
*/
@@ -279,7 +279,7 @@ object TestUtils extends Logging {
builder.append(String.format("%x", Integer.valueOf(buffer.get(buffer.position + i))))
builder.toString
}
-
+
/**
* Create a producer for the given host and port
*/
@@ -340,7 +340,7 @@ object TestUtils extends Logging {
buffer += ("msg" + i)
buffer
}
-
+
/**
* Create a wired format request based on simple basic information
*/
@@ -381,34 +381,55 @@ object TestUtils extends Logging {
}
def makeLeaderForPartition(zkClient: ZkClient, topic: String, leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int]) {
- leaderPerPartitionMap.foreach(leaderForPartition => ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic,
- leaderForPartition._1, leaderForPartition._2))
+ leaderPerPartitionMap.foreach
+ {
+ leaderForPartition => {
+ val partition = leaderForPartition._1
+ val leader = leaderForPartition._2
+ try{
+ val currentLeaderAndISROpt = ZkUtils.getLeaderAndISRForPartition(zkClient, topic, partition)
+ var newLeaderAndISR: LeaderAndISR = null
+ if(currentLeaderAndISROpt == None)
+ newLeaderAndISR = new LeaderAndISR(leader, List(leader))
+ else{
+ newLeaderAndISR = currentLeaderAndISROpt.get
+ newLeaderAndISR.leader = leader
+ newLeaderAndISR.leaderEpoch += 1
+ newLeaderAndISR.zkVersion += 1
+ }
+ ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath( topic, partition), newLeaderAndISR.toString)
+ } catch {
+ case oe => error("Error while electing leader for topic %s partition %d".format(topic, partition), oe)
+ }
+ }
+ }
}
- def waitUntilLeaderIsElected(zkClient: ZkClient, topic: String, partition: Int, timeoutMs: Long): Option[Int] = {
+ def waitUntilLiveLeaderIsElected(zkClient: ZkClient, topic: String, partition: Int, timeoutMs: Long): Option[Int] = {
+ // If the current leader is alive, just return it
+ val curLeaderOpt = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
+ if(curLeaderOpt.isDefined && ZkUtils.pathExists(zkClient, ZkUtils.getBrokerPath(curLeaderOpt.get)))
+ return curLeaderOpt
+
val leaderLock = new ReentrantLock()
- val leaderExists = leaderLock.newCondition()
+ val liveLeaderIsElected = leaderLock.newCondition()
- info("Waiting for leader to be elected for topic %s partition %d".format(topic, partition))
leaderLock.lock()
try {
- // check if leader already exists
+ zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndISRPath(topic, partition), new LeaderElectionListener(topic, partition, leaderLock, liveLeaderIsElected, zkClient))
+ liveLeaderIsElected.await(timeoutMs, TimeUnit.MILLISECONDS)
val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
leader match {
- case Some(l) => info("Leader %d exists for topic %s partition %d".format(l, topic, partition))
- leader
- case None => zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderPath(topic, partition.toString),
- new LeaderExistsListener(topic, partition, leaderLock, leaderExists))
- info("No leader exists. Waiting for %d ms".format(timeoutMs))
- leaderExists.await(timeoutMs, TimeUnit.MILLISECONDS)
- // check if leader is elected
- val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
- leader match {
- case Some(l) => info("Leader %d elected for topic %s partition %d".format(l, topic, partition))
- case None => error("Timing out after %d ms since leader is not elected for topic %s partition %d"
- .format(timeoutMs, topic, partition))
- }
- leader
+ case Some(l) =>
+ if(ZkUtils.pathExists(zkClient, ZkUtils.getBrokerPath(l))){
+ info("Leader %d is elected for topic %s partition %d".format(l, topic, partition))
+ return leader
+ } else {
+ warn("Timing out after %d ms but current leader in zookeeper is not alive, and no live leader for partition [%s, %d] is elected".format(topic, partition))
+ return None
+ }
+ case None => warn("Timing out after %d ms but no leader is elected for topic %s partition %d".format(timeoutMs, topic, partition))
+ return None
}
} finally {
leaderLock.unlock()
@@ -430,50 +451,49 @@ object TestUtils extends Logging {
}
object ControllerTestUtils{
- def createSampleLeaderAndISRRequest() : LeaderAndISRRequest = {
+ def createTestLeaderAndISRRequest() : LeaderAndISRRequest = {
val topic1 = "test1"
val topic2 = "test2"
- val leader1 = 1;
- val ISR1 = List(1, 2, 3)
+ val leader1 = 0;
+ val isr1 = List(0, 1, 2)
- val leader2 = 2;
- val ISR2 = List(2, 3, 4)
+ val leader2 = 0;
+ val isr2 = List(0, 2, 3)
- val leaderAndISR1 = new LeaderAndISR(leader1, 1, ISR1, 1)
- val leaderAndISR2 = new LeaderAndISR(leader2, 1, ISR2, 2)
- val map = Map(((topic1, 1), leaderAndISR1), ((topic1, 2), leaderAndISR1),
- ((topic2, 1), leaderAndISR2), ((topic2, 2), leaderAndISR2))
- new LeaderAndISRRequest(1, "client 1", 1, 4, map)
+ val leaderAndISR1 = new LeaderAndISR(leader1, 1, isr1, 1)
+ val leaderAndISR2 = new LeaderAndISR(leader2, 1, isr2, 2)
+ val map = Map(((topic1, 0), leaderAndISR1),
+ ((topic2, 0), leaderAndISR2))
+ new LeaderAndISRRequest( LeaderAndISRRequest.NotInit, map)
}
- def createSampleLeaderAndISRResponse() : LeaderAndISRResponse = {
+ def createTestLeaderAndISRResponse() : LeaderAndISRResponse = {
val topic1 = "test1"
val topic2 = "test2"
- val responseMap = Map(((topic1, 1), ErrorMapping.NoError), ((topic1, 2), ErrorMapping.NoError),
- ((topic2, 1), ErrorMapping.NoError), ((topic2, 2), ErrorMapping.NoError))
+ val responseMap = Map(((topic1, 0), ErrorMapping.NoError),
+ ((topic2, 0), ErrorMapping.NoError))
new LeaderAndISRResponse(1, responseMap)
}
- def createSampleStopReplicaRequest() : StopReplicaRequest = {
+ def createTestStopReplicaRequest() : StopReplicaRequest = {
val topic1 = "test1"
val topic2 = "test2"
- new StopReplicaRequest(1, "client 1", 1000, Set((topic1, 1), (topic1, 2),
- (topic2, 1), (topic2, 2)))
+ new StopReplicaRequest(Set((topic1, 0), (topic2, 0)))
}
- def createSampleStopReplicaResponse() : StopReplicaResponse = {
+ def createTestStopReplicaResponse() : StopReplicaResponse = {
val topic1 = "test1"
val topic2 = "test2"
- val responseMap = Map(((topic1, 1), ErrorMapping.NoError), ((topic1, 2), ErrorMapping.NoError),
- ((topic2, 1), ErrorMapping.NoError), ((topic2, 2), ErrorMapping.NoError))
+ val responseMap = Map(((topic1, 0), ErrorMapping.NoError),
+ ((topic2, 0), ErrorMapping.NoError))
new StopReplicaResponse(1, responseMap)
}
}
object TestZKUtils {
- val zookeeperConnect = "127.0.0.1:2182"
+ val zookeeperConnect = "127.0.0.1:2182"
}
class StringSerializer extends Encoder[String] {
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala?rev=1367619&r1=1367618&r2=1367619&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala Tue Jul 31 15:33:29 2012
@@ -40,7 +40,7 @@ class ZKEphemeralTest extends JUnit3Suit
var testData: String = null
- testData = ZkUtils.readData(zkClient, "/tmp/zktest")
+ testData = ZkUtils.readData(zkClient, "/tmp/zktest")._1
Assert.assertNotNull(testData)
zkClient.close