Posted to commits@kafka.apache.org by jk...@apache.org on 2013/01/29 04:36:20 UTC

[1/3] KAFKA-631 Implement a log cleaner for Kafka. Reviewed by Neha.

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index cd724a3..48487e8 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -18,11 +18,13 @@ package kafka.server
 
 import org.scalatest.junit.JUnit3Suite
 import org.junit.Assert._
+import java.io.File
 import kafka.admin.CreateTopicCommand
 import kafka.utils.TestUtils._
 import kafka.utils.IntEncoder
 import kafka.utils.{Utils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
+import kafka.common._
 import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
 
 class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -44,8 +46,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
   val message = "hello"
 
   var producer: Producer[Int, String] = null
-  var hwFile1: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps1.logDirs(0))
-  var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDirs(0))
+  var hwFile1: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps1.logDirs(0), ReplicaManager.HighWatermarkFilename))
+  var hwFile2: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps2.logDirs(0), ReplicaManager.HighWatermarkFilename))
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
   
   val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
@@ -80,9 +82,9 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     servers.foreach(server => server.replicaManager.checkpointHighWatermarks())
     producer.close()
-    val leaderHW = hwFile1.read(topic, 0)
+    val leaderHW = hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)
     assertEquals(numMessages, leaderHW)
-    val followerHW = hwFile2.read(topic, 0)
+    val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)
     assertEquals(numMessages, followerHW)
     servers.foreach(server => { server.shutdown(); Utils.rm(server.config.logDirs(0))})
   }
@@ -104,7 +106,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     // NOTE: this is to avoid transient test failures
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
     
-    assertEquals(0L, hwFile1.read(topic, 0))
+    assertEquals(0L, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
 
     sendMessages(1)
     Thread.sleep(1000)
@@ -112,7 +114,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     // kill the server hosting the preferred replica
     server1.shutdown()
-    assertEquals(hw, hwFile1.read(topic, 0))
+    assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
 
     // check if leader moves to the other server
     leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500, leader)
@@ -125,10 +127,10 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     assertTrue("Leader must remain on broker 1, in case of zookeeper session expiration it can move to broker 0",
       leader.isDefined && (leader.get == 0 || leader.get == 1))
 
-    assertEquals(hw, hwFile1.read(topic, 0))
+    assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
     // since server 2 was never shut down, the hw value of 30 is probably not checkpointed to disk yet
     server2.shutdown()
-    assertEquals(hw, hwFile2.read(topic, 0))
+    assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
 
     server2.startup()
     leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500, leader)
@@ -144,8 +146,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     // shutdown the servers to allow the hw to be checkpointed
     servers.foreach(server => server.shutdown())
     producer.close()
-    assertEquals(hw, hwFile1.read(topic, 0))
-    assertEquals(hw, hwFile2.read(topic, 0))
+    assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
+    assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
     servers.foreach(server => Utils.rm(server.config.logDirs))
   }
 
@@ -155,8 +157,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     server2 = TestUtils.createServer(configs.last)
     servers ++= List(server1, server2)
 
-    hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDirs(0))
-    hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDirs(0))
+    hwFile1 = new OffsetCheckpoint(new File(server1.config.logDirs(0), ReplicaManager.HighWatermarkFilename))
+    hwFile2 = new OffsetCheckpoint(new File(server2.config.logDirs(0), ReplicaManager.HighWatermarkFilename))
 
     producer = new Producer[Int, String](new ProducerConfig(producerProps))
 
@@ -176,9 +178,9 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     // shutdown the servers to allow the hw to be checkpointed
     servers.foreach(server => server.shutdown())
     producer.close()
-    val leaderHW = hwFile1.read(topic, 0)
+    val leaderHW = hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)
     assertEquals(hw, leaderHW)
-    val followerHW = hwFile2.read(topic, 0)
+    val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)
     assertEquals(hw, followerHW)
     servers.foreach(server => Utils.rm(server.config.logDirs))
   }
@@ -189,8 +191,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     server2 = TestUtils.createServer(configs.last)
     servers ++= List(server1, server2)
 
-    hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDirs(0))
-    hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDirs(0))
+    hwFile1 = new OffsetCheckpoint(new File(server1.config.logDirs(0), ReplicaManager.HighWatermarkFilename))
+    hwFile2 = new OffsetCheckpoint(new File(server2.config.logDirs(0), ReplicaManager.HighWatermarkFilename))
 
     producer = new Producer[Int, String](new ProducerConfig(producerProps))
 
@@ -212,21 +214,21 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     // kill the server hosting the preferred replica
     server1.shutdown()
     server2.shutdown()
-    assertEquals(hw, hwFile1.read(topic, 0))
-    assertEquals(hw, hwFile2.read(topic, 0))
+    assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
+    assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
 
     server2.startup()
     // check if leader moves to the other server
     leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500, leader)
     assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
 
-    assertEquals(hw, hwFile1.read(topic, 0))
+    assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
 
     // bring the preferred replica back
     server1.startup()
 
-    assertEquals(hw, hwFile1.read(topic, 0))
-    assertEquals(hw, hwFile2.read(topic, 0))
+    assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
+    assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
 
     sendMessages(2)
     hw += 2
@@ -237,8 +239,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     // shutdown the servers to allow the hw to be checkpointed
     servers.foreach(server => server.shutdown())
     producer.close()
-    assertEquals(hw, hwFile1.read(topic, 0))
-    assertEquals(hw, hwFile2.read(topic, 0))
+    assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
+    assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
     servers.foreach(server => Utils.rm(server.config.logDirs))
   }
 

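The hunks above replace the old per-partition HighwaterMarkCheckpoint.read(topic, partition) calls with the new OffsetCheckpoint API, which reads the whole checkpoint file into a Map keyed by TopicAndPartition. A minimal sketch of the lookup pattern the test now repeats (readHighWatermark is an illustrative helper name, and the import lines assume OffsetCheckpoint and ReplicaManager live in kafka.server, as the test's package implies):

    import java.io.File
    import kafka.common.TopicAndPartition
    import kafka.server.{OffsetCheckpoint, ReplicaManager}

    // Open the high-watermark checkpoint in a broker's log directory and return
    // the offset recorded for one partition, defaulting to 0L when no entry exists.
    def readHighWatermark(logDir: String, topic: String, partition: Int): Long = {
      val checkpoint = new OffsetCheckpoint(new File(logDir, ReplicaManager.HighWatermarkFilename))
      checkpoint.read.getOrElse(TopicAndPartition(topic, partition), 0L)
    }

    // e.g. assertEquals(hw, readHighWatermark(server1.config.logDirs(0), topic, 0))
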
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 4619389..571e2df 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -68,7 +68,6 @@ class SimpleFetchTest extends JUnit3Suite {
 
     val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
     EasyMock.expect(logManager.getLog(topic, partitionId)).andReturn(Some(log)).anyTimes()
-    EasyMock.expect(logManager.config).andReturn(configs.head).anyTimes()
     EasyMock.replay(logManager)
 
     val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])
@@ -135,7 +134,6 @@ class SimpleFetchTest extends JUnit3Suite {
 
     val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
     EasyMock.expect(logManager.getLog(topic, 0)).andReturn(Some(log)).anyTimes()
-    EasyMock.expect(logManager.config).andReturn(configs.head).anyTimes()
     EasyMock.replay(logManager)
 
     val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
index 4853f2b..d5896ed 100644
--- a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
+++ b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
@@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit
  *   
  *   Incrementing the time to the exact next execution time of a task will result in that task executing (it is as if execution itself takes no time).
  */
-@nonthreadsafe
 class MockScheduler(val time: Time) extends Scheduler {
   
   /* a priority queue of tasks ordered by next execution time */
@@ -41,7 +40,9 @@ class MockScheduler(val time: Time) extends Scheduler {
   def startup() {}
   
   def shutdown() {
-    tasks.clear()
+    this synchronized {
+      tasks.clear()
+    }
   }
   
   /**
@@ -50,23 +51,26 @@ class MockScheduler(val time: Time) extends Scheduler {
    * If you are using the scheduler associated with a MockTime instance this call will be triggered automatically.
    */
   def tick() {
-    val now = time.milliseconds
-    while(!tasks.isEmpty && tasks.head.nextExecution <= now) {
-      /* pop and execute the task with the lowest next execution time */
-      val curr = tasks.head
-      this.tasks = tasks.tail
-      curr.fun()
-      /* if the task is periodic, reschedule it and re-enqueue */
-      if(curr.periodic) {
-        curr.nextExecution += curr.period
-        this.tasks += curr
+    this synchronized {
+      val now = time.milliseconds
+      while(!tasks.isEmpty && tasks.head.nextExecution <= now) {
+        /* pop and execute the task with the lowest next execution time */
+        val curr = tasks.dequeue
+        curr.fun()
+        /* if the task is periodic, reschedule it and re-enqueue */
+        if(curr.periodic) {
+          curr.nextExecution += curr.period
+          this.tasks += curr
+        }
       }
     }
   }
   
   def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) {
-    tasks += MockTask(name, fun, time.milliseconds + delay, period = period)
-    tick()
+    this synchronized {
+      tasks += MockTask(name, fun, time.milliseconds + delay, period = period)
+      tick()
+    }
   }
   
 }
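
The MockScheduler changes above synchronize shutdown(), tick(), and schedule() on the scheduler and switch tick() to dequeue tasks from the priority queue. A minimal sketch of driving it deterministically in a test, assuming kafka.utils.MockTime from the same test utilities and that its sleep() simply advances the mock clock:

    import java.util.concurrent.TimeUnit
    import kafka.utils.{MockScheduler, MockTime}

    // Build a scheduler on a mock clock and register a periodic task.
    val time = new MockTime
    val scheduler = new MockScheduler(time)
    var runs = 0
    scheduler.schedule("counter", () => runs += 1, delay = 10, period = 10, unit = TimeUnit.MILLISECONDS)

    // Advance the clock past two execution times, then tick: the task fires once
    // per elapsed execution time, so runs == 2 afterwards.
    time.sleep(25)
    scheduler.tick()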

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 5547d63..d12d24e 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -43,6 +43,8 @@ import kafka.common.TopicAndPartition
  * Utility functions to help with testing
  */
 object TestUtils extends Logging {
+  
+  val IoTmpDir = System.getProperty("java.io.tmpdir")
 
   val Letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
   val Digits = "0123456789"
@@ -74,8 +76,7 @@ object TestUtils extends Logging {
    * Create a temporary directory
    */
   def tempDir(): File = {
-    val ioDir = System.getProperty("java.io.tmpdir")
-    val f = new File(ioDir, "kafka-" + random.nextInt(1000000))
+    val f = new File(IoTmpDir, "kafka-" + random.nextInt(1000000))
     f.mkdirs()
     f.deleteOnExit()
     f
@@ -154,8 +155,8 @@ object TestUtils extends Logging {
    * Wrap the message in a message set
    * @param payload The bytes of the message
    */
-  def singleMessageSet(payload: Array[Byte], codec: CompressionCodec = NoCompressionCodec) =
-    new ByteBufferMessageSet(compressionCodec = codec, messages = new Message(payload))
+  def singleMessageSet(payload: Array[Byte], codec: CompressionCodec = NoCompressionCodec, key: Array[Byte] = null) =
+    new ByteBufferMessageSet(compressionCodec = codec, messages = new Message(payload, key))
 
   /**
    * Generate an array of random bytes
@@ -497,7 +498,7 @@ object TestUtils extends Logging {
 }
 
 object TestZKUtils {
-  val zookeeperConnect = "127.0.0.1:2182"
+  val zookeeperConnect = "127.0.0.1:" + TestUtils.choosePort()
 }
 
 class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {
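
Two changes above matter for the log cleaner tests: singleMessageSet gains an optional key (the cleaner deduplicates messages by key), and TestZKUtils now picks a free ZooKeeper port via TestUtils.choosePort() instead of hard-coding 2182. A minimal sketch of building a keyed single-message set the way the updated helper does (keyedSet is an illustrative name, not part of the patch):

    import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}

    // Wrap one keyed message in a message set, mirroring the new
    // TestUtils.singleMessageSet(payload, codec, key) default-codec path.
    def keyedSet(payload: Array[Byte], key: Array[Byte]): ByteBufferMessageSet =
      new ByteBufferMessageSet(compressionCodec = NoCompressionCodec,
                               messages = new Message(payload, key))

    val ms = keyedSet("value".getBytes, "key".getBytes)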

http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
index cce6c8e..48dd335 100644
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
@@ -19,6 +19,7 @@ package kafka.utils
 
 import java.util.Arrays
 import java.nio.ByteBuffer
+import java.io._
 import org.apache.log4j.Logger
 import org.scalatest.junit.JUnitSuite
 import org.junit.Test
@@ -61,6 +62,25 @@ class UtilsTest extends JUnitSuite {
       assertTrue(Arrays.equals(bytes, Utils.readBytes(ByteBuffer.wrap(bytes))))
     }
   }
+  
+  @Test
+  def testReplaceSuffix() {
+    assertEquals("blah.foo.text", Utils.replaceSuffix("blah.foo.txt", ".txt", ".text"))
+    assertEquals("blah.foo", Utils.replaceSuffix("blah.foo.txt", ".txt", ""))
+    assertEquals("txt.txt", Utils.replaceSuffix("txt.txt.txt", ".txt", ""))
+    assertEquals("foo.txt", Utils.replaceSuffix("foo", "", ".txt"))
+  }
+  
+  @Test
+  def testReadInt() {
+    val values = Array(0, 1, -1, Byte.MaxValue, Short.MaxValue, 2 * Short.MaxValue, Int.MaxValue/2, Int.MinValue/2, Int.MaxValue, Int.MinValue, Int.MaxValue)
+    val buffer = ByteBuffer.allocate(4 * values.size)
+    for(i <- 0 until values.length) {
+      buffer.putInt(i*4, values(i))
+      assertEquals("Written value should match read value.", values(i), Utils.readInt(buffer.array, i*4))
+    }
+
+  }
 
   @Test
   def testCsvList() {