You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/07/31 17:33:31 UTC

svn commit: r1367619 [4/4] - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/admin/ main/scala/kafka/api/ main/scala/kafka/cluster/ main/scala/kafka/common/ main/scala/kafka/consumer/ main/scala/kafka/log/ main/scala/kafka/network/ main/sca...

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1367619&r1=1367618&r2=1367619&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Tue Jul 31 15:33:29 2012
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -43,33 +43,33 @@ import collection.mutable.{Map, Set}
  * Utility functions to help with testing
  */
 object TestUtils extends Logging {
-  
+
   val Letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
   val Digits = "0123456789"
   val LettersAndDigits = Letters + Digits
-  
+
   /* A consistent random number generator to make tests repeatable */
   val seededRandom = new Random(192348092834L)
   val random = new Random()
-  
+
   /**
    * Choose a number of random available ports
    */
   def choosePorts(count: Int): List[Int] = {
-    val sockets = 
+    val sockets =
       for(i <- 0 until count)
-        yield new ServerSocket(0)
+      yield new ServerSocket(0)
     val socketList = sockets.toList
     val ports = socketList.map(_.getLocalPort)
     socketList.map(_.close)
     ports
   }
-  
+
   /**
    * Choose an available port
    */
   def choosePort(): Int = choosePorts(1).head
-  
+
   /**
    * Create a temporary directory
    */
@@ -80,7 +80,7 @@ object TestUtils extends Logging {
     f.deleteOnExit()
     f
   }
-  
+
   /**
    * Create a temporary file
    */
@@ -89,12 +89,12 @@ object TestUtils extends Logging {
     f.deleteOnExit()
     f
   }
-  
+
   /**
    * Create a temporary file and return an open file channel for this file
    */
   def tempChannel(): FileChannel = new RandomAccessFile(tempFile(), "rw").getChannel()
-  
+
   /**
    * Create a kafka server instance with appropriate test settings
    * USING THIS IS A SIGN YOU ARE NOT WRITING A REAL UNIT TEST
@@ -105,15 +105,15 @@ object TestUtils extends Logging {
     server.startup()
     server
   }
-  
+
   /**
    * Create a test config for the given node id
    */
   def createBrokerConfigs(numConfigs: Int): List[Properties] = {
     for((port, node) <- choosePorts(numConfigs).zipWithIndex)
-      yield createBrokerConfig(node, port)
+    yield createBrokerConfig(node, port)
   }
-  
+
   /**
    * Create a test config for the given node id
    */
@@ -127,7 +127,7 @@ object TestUtils extends Logging {
     props.put("replica.socket.timeout.ms", "1500")
     props
   }
-  
+
   /**
    * Create a test config for a consumer
    */
@@ -150,9 +150,9 @@ object TestUtils extends Logging {
    * Wrap the message in a message set
    * @param payload The bytes of the message
    */
-  def singleMessageSet(payload: Array[Byte]) = 
+  def singleMessageSet(payload: Array[Byte]) =
     new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(payload))
-  
+
   /**
    * Generate an array of random bytes
    * @param numBytes The size of the array
@@ -162,7 +162,7 @@ object TestUtils extends Logging {
     seededRandom.nextBytes(bytes)
     bytes
   }
-  
+
   /**
    * Generate a random string of letters and digits of the given length
    * @param len The length of the string
@@ -183,7 +183,7 @@ object TestUtils extends Logging {
     for(i <- 0 until b1.limit - b1.position)
       assertEquals("byte " + i + " byte not equal.", b1.get(b1.position + i), b2.get(b1.position + i))
   }
-  
+
   /**
    * Throw an exception if the two iterators are of differing lengths or contain
    * different messages on their Nth element
@@ -197,28 +197,28 @@ object TestUtils extends Logging {
 
     // check if the expected iterator is longer
     if (expected.hasNext) {
-     var length1 = length;
-     while (expected.hasNext) {
-       expected.next
-       length1 += 1
-     }
-     assertFalse("Iterators have uneven length-- first has more: "+length1 + " > " + length, true);
+      var length1 = length;
+      while (expected.hasNext) {
+        expected.next
+        length1 += 1
+      }
+      assertFalse("Iterators have uneven length-- first has more: "+length1 + " > " + length, true);
     }
 
     // check if the actual iterator was longer
     if (actual.hasNext) {
-     var length2 = length;
-     while (actual.hasNext) {
-       actual.next
-       length2 += 1
-     }
-     assertFalse("Iterators have uneven length-- second has more: "+length2 + " > " + length, true);
+      var length2 = length;
+      while (actual.hasNext) {
+        actual.next
+        length2 += 1
+      }
+      assertFalse("Iterators have uneven length-- second has more: "+length2 + " > " + length, true);
     }
   }
 
   /**
    *  Throw an exception if an iterable has different length than expected
-   *  
+   *
    */
   def checkLength[T](s1: Iterator[T], expectedLength:Int) {
     var n = 0
@@ -269,7 +269,7 @@ object TestUtils extends Logging {
    * Create a hexidecimal string for the given bytes
    */
   def hexString(bytes: Array[Byte]): String = hexString(ByteBuffer.wrap(bytes))
-  
+
   /**
    * Create a hexidecimal string for the given bytes
    */
@@ -279,7 +279,7 @@ object TestUtils extends Logging {
       builder.append(String.format("%x", Integer.valueOf(buffer.get(buffer.position + i))))
     builder.toString
   }
-  
+
   /**
    * Create a producer for the given host and port
    */
@@ -340,7 +340,7 @@ object TestUtils extends Logging {
       buffer += ("msg" + i)
     buffer
   }
-  
+
   /**
    * Create a wired format request based on simple basic information
    */
@@ -381,34 +381,55 @@ object TestUtils extends Logging {
   }
 
   def makeLeaderForPartition(zkClient: ZkClient, topic: String, leaderPerPartitionMap: scala.collection.immutable.Map[Int, Int]) {
-    leaderPerPartitionMap.foreach(leaderForPartition => ZkUtils.tryToBecomeLeaderForPartition(zkClient, topic,
-      leaderForPartition._1, leaderForPartition._2))
+    leaderPerPartitionMap.foreach
+    {
+      leaderForPartition => {
+        val partition = leaderForPartition._1
+        val leader = leaderForPartition._2
+        try{
+          val currentLeaderAndISROpt = ZkUtils.getLeaderAndISRForPartition(zkClient, topic, partition)
+          var newLeaderAndISR: LeaderAndISR = null
+          if(currentLeaderAndISROpt == None)
+            newLeaderAndISR = new LeaderAndISR(leader, List(leader))
+          else{
+            newLeaderAndISR = currentLeaderAndISROpt.get
+            newLeaderAndISR.leader = leader
+            newLeaderAndISR.leaderEpoch += 1
+            newLeaderAndISR.zkVersion += 1
+          }
+          ZkUtils.updatePersistentPath(zkClient, ZkUtils.getTopicPartitionLeaderAndISRPath( topic, partition), newLeaderAndISR.toString)
+        } catch {
+          case oe => error("Error while electing leader for topic %s partition %d".format(topic, partition), oe)
+        }
+      }
+    }
   }
 
-  def waitUntilLeaderIsElected(zkClient: ZkClient, topic: String, partition: Int, timeoutMs: Long): Option[Int] = {
+  def waitUntilLiveLeaderIsElected(zkClient: ZkClient, topic: String, partition: Int, timeoutMs: Long): Option[Int] = {
+    // If the current leader is alive, just return it
+    val curLeaderOpt = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
+    if(curLeaderOpt.isDefined && ZkUtils.pathExists(zkClient, ZkUtils.getBrokerPath(curLeaderOpt.get)))
+      return curLeaderOpt
+
     val leaderLock = new ReentrantLock()
-    val leaderExists = leaderLock.newCondition()
+    val liveLeaderIsElected = leaderLock.newCondition()
 
-    info("Waiting for leader to be elected for topic %s partition %d".format(topic, partition))
     leaderLock.lock()
     try {
-      // check if leader already exists
+      zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderAndISRPath(topic, partition), new LeaderElectionListener(topic, partition, leaderLock,  liveLeaderIsElected, zkClient))
+      liveLeaderIsElected.await(timeoutMs, TimeUnit.MILLISECONDS)
       val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
       leader match {
-        case Some(l) => info("Leader %d exists for topic %s partition %d".format(l, topic, partition))
-          leader
-        case None => zkClient.subscribeDataChanges(ZkUtils.getTopicPartitionLeaderPath(topic, partition.toString),
-          new LeaderExistsListener(topic, partition, leaderLock, leaderExists))
-        info("No leader exists. Waiting for %d ms".format(timeoutMs))
-        leaderExists.await(timeoutMs, TimeUnit.MILLISECONDS)
-          // check if leader is elected
-        val leader = ZkUtils.getLeaderForPartition(zkClient, topic, partition)
-        leader match {
-          case Some(l) => info("Leader %d elected for topic %s partition %d".format(l, topic, partition))
-          case None => error("Timing out after %d ms since leader is not elected for topic %s partition %d"
-            .format(timeoutMs, topic, partition))
-        }
-        leader
+        case Some(l) =>
+          if(ZkUtils.pathExists(zkClient, ZkUtils.getBrokerPath(l))){
+            info("Leader %d is elected for topic %s partition %d".format(l, topic, partition))
+            return leader
+          } else {
+            warn("Timing out after %d ms but current leader in zookeeper is not alive, and no live leader for partition [%s, %d] is elected".format(topic, partition))
+            return None
+          }
+        case None => warn("Timing out after %d ms but no leader is elected for topic %s partition %d".format(timeoutMs, topic, partition))
+        return None
       }
     } finally {
       leaderLock.unlock()
@@ -430,50 +451,49 @@ object TestUtils extends Logging {
 }
 
 object ControllerTestUtils{
-  def createSampleLeaderAndISRRequest() : LeaderAndISRRequest = {
+  def createTestLeaderAndISRRequest() : LeaderAndISRRequest = {
     val topic1 = "test1"
     val topic2 = "test2"
 
-    val leader1 = 1;
-    val ISR1 = List(1, 2, 3)
+    val leader1 = 0;
+    val isr1 = List(0, 1, 2)
 
-    val leader2 = 2;
-    val ISR2 = List(2, 3, 4)
+    val leader2 = 0;
+    val isr2 = List(0, 2, 3)
 
-    val leaderAndISR1 = new LeaderAndISR(leader1, 1, ISR1, 1)
-    val leaderAndISR2 = new LeaderAndISR(leader2, 1, ISR2, 2)
-    val map = Map(((topic1, 1), leaderAndISR1), ((topic1, 2), leaderAndISR1),
-                  ((topic2, 1), leaderAndISR2), ((topic2, 2), leaderAndISR2))
-    new LeaderAndISRRequest(1, "client 1", 1, 4, map)
+    val leaderAndISR1 = new LeaderAndISR(leader1, 1, isr1, 1)
+    val leaderAndISR2 = new LeaderAndISR(leader2, 1, isr2, 2)
+    val map = Map(((topic1, 0), leaderAndISR1),
+                  ((topic2, 0), leaderAndISR2))
+    new LeaderAndISRRequest( LeaderAndISRRequest.NotInit, map)
   }
 
-  def createSampleLeaderAndISRResponse() : LeaderAndISRResponse = {
+  def createTestLeaderAndISRResponse() : LeaderAndISRResponse = {
     val topic1 = "test1"
     val topic2 = "test2"
-    val responseMap = Map(((topic1, 1), ErrorMapping.NoError), ((topic1, 2), ErrorMapping.NoError),
-                          ((topic2, 1), ErrorMapping.NoError), ((topic2, 2), ErrorMapping.NoError))
+    val responseMap = Map(((topic1, 0), ErrorMapping.NoError),
+                          ((topic2, 0), ErrorMapping.NoError))
     new LeaderAndISRResponse(1, responseMap)
   }
 
 
-  def createSampleStopReplicaRequest() : StopReplicaRequest = {
+  def createTestStopReplicaRequest() : StopReplicaRequest = {
     val topic1 = "test1"
     val topic2 = "test2"
-    new StopReplicaRequest(1, "client 1", 1000, Set((topic1, 1), (topic1, 2),
-                                                    (topic2, 1), (topic2, 2)))
+    new StopReplicaRequest(Set((topic1, 0), (topic2, 0)))
   }
 
-  def createSampleStopReplicaResponse() : StopReplicaResponse = {
+  def createTestStopReplicaResponse() : StopReplicaResponse = {
     val topic1 = "test1"
     val topic2 = "test2"
-    val responseMap = Map(((topic1, 1), ErrorMapping.NoError), ((topic1, 2), ErrorMapping.NoError),
-                          ((topic2, 1), ErrorMapping.NoError), ((topic2, 2), ErrorMapping.NoError))
+    val responseMap = Map(((topic1, 0), ErrorMapping.NoError),
+                          ((topic2, 0), ErrorMapping.NoError))
     new StopReplicaResponse(1, responseMap)
   }
 }
 
 object TestZKUtils {
-  val zookeeperConnect = "127.0.0.1:2182"  
+  val zookeeperConnect = "127.0.0.1:2182"
 }
 
 class StringSerializer extends Encoder[String] {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala?rev=1367619&r1=1367618&r2=1367619&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/zk/ZKEphemeralTest.scala Tue Jul 31 15:33:29 2012
@@ -40,7 +40,7 @@ class ZKEphemeralTest extends JUnit3Suit
 
     var testData: String = null
 
-    testData = ZkUtils.readData(zkClient, "/tmp/zktest")
+    testData = ZkUtils.readData(zkClient, "/tmp/zktest")._1
     Assert.assertNotNull(testData)
 
     zkClient.close