You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2013/01/29 04:36:20 UTC
[1/3] KAFKA-631 Implement a log cleaner for Kafka. Reviewed by Neha.
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index cd724a3..48487e8 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -18,11 +18,13 @@ package kafka.server
import org.scalatest.junit.JUnit3Suite
import org.junit.Assert._
+import java.io.File
import kafka.admin.CreateTopicCommand
import kafka.utils.TestUtils._
import kafka.utils.IntEncoder
import kafka.utils.{Utils, TestUtils}
import kafka.zk.ZooKeeperTestHarness
+import kafka.common._
import kafka.producer.{ProducerConfig, KeyedMessage, Producer}
class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
@@ -44,8 +46,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
val message = "hello"
var producer: Producer[Int, String] = null
- var hwFile1: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps1.logDirs(0))
- var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDirs(0))
+ var hwFile1: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps1.logDirs(0), ReplicaManager.HighWatermarkFilename))
+ var hwFile2: OffsetCheckpoint = new OffsetCheckpoint(new File(configProps2.logDirs(0), ReplicaManager.HighWatermarkFilename))
var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
@@ -80,9 +82,9 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
servers.foreach(server => server.replicaManager.checkpointHighWatermarks())
producer.close()
- val leaderHW = hwFile1.read(topic, 0)
+ val leaderHW = hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)
assertEquals(numMessages, leaderHW)
- val followerHW = hwFile2.read(topic, 0)
+ val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)
assertEquals(numMessages, followerHW)
servers.foreach(server => { server.shutdown(); Utils.rm(server.config.logDirs(0))})
}
@@ -104,7 +106,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
// NOTE: this is to avoid transient test failures
assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
- assertEquals(0L, hwFile1.read(topic, 0))
+ assertEquals(0L, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
sendMessages(1)
Thread.sleep(1000)
@@ -112,7 +114,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
// kill the server hosting the preferred replica
server1.shutdown()
- assertEquals(hw, hwFile1.read(topic, 0))
+ assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
// check if leader moves to the other server
leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500, leader)
@@ -125,10 +127,10 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
assertTrue("Leader must remain on broker 1, in case of zookeeper session expiration it can move to broker 0",
leader.isDefined && (leader.get == 0 || leader.get == 1))
- assertEquals(hw, hwFile1.read(topic, 0))
+ assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
// since server 2 was never shut down, the hw value of 30 is probably not checkpointed to disk yet
server2.shutdown()
- assertEquals(hw, hwFile2.read(topic, 0))
+ assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
server2.startup()
leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500, leader)
@@ -144,8 +146,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
// shutdown the servers to allow the hw to be checkpointed
servers.foreach(server => server.shutdown())
producer.close()
- assertEquals(hw, hwFile1.read(topic, 0))
- assertEquals(hw, hwFile2.read(topic, 0))
+ assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
+ assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
servers.foreach(server => Utils.rm(server.config.logDirs))
}
@@ -155,8 +157,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
server2 = TestUtils.createServer(configs.last)
servers ++= List(server1, server2)
- hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDirs(0))
- hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDirs(0))
+ hwFile1 = new OffsetCheckpoint(new File(server1.config.logDirs(0), ReplicaManager.HighWatermarkFilename))
+ hwFile2 = new OffsetCheckpoint(new File(server2.config.logDirs(0), ReplicaManager.HighWatermarkFilename))
producer = new Producer[Int, String](new ProducerConfig(producerProps))
@@ -176,9 +178,9 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
// shutdown the servers to allow the hw to be checkpointed
servers.foreach(server => server.shutdown())
producer.close()
- val leaderHW = hwFile1.read(topic, 0)
+ val leaderHW = hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L)
assertEquals(hw, leaderHW)
- val followerHW = hwFile2.read(topic, 0)
+ val followerHW = hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L)
assertEquals(hw, followerHW)
servers.foreach(server => Utils.rm(server.config.logDirs))
}
@@ -189,8 +191,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
server2 = TestUtils.createServer(configs.last)
servers ++= List(server1, server2)
- hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDirs(0))
- hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDirs(0))
+ hwFile1 = new OffsetCheckpoint(new File(server1.config.logDirs(0), ReplicaManager.HighWatermarkFilename))
+ hwFile2 = new OffsetCheckpoint(new File(server2.config.logDirs(0), ReplicaManager.HighWatermarkFilename))
producer = new Producer[Int, String](new ProducerConfig(producerProps))
@@ -212,21 +214,21 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
// kill the server hosting the preferred replica
server1.shutdown()
server2.shutdown()
- assertEquals(hw, hwFile1.read(topic, 0))
- assertEquals(hw, hwFile2.read(topic, 0))
+ assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
+ assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
server2.startup()
// check if leader moves to the other server
leader = waitUntilLeaderIsElectedOrChanged(zkClient, topic, partitionId, 500, leader)
assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
- assertEquals(hw, hwFile1.read(topic, 0))
+ assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
// bring the preferred replica back
server1.startup()
- assertEquals(hw, hwFile1.read(topic, 0))
- assertEquals(hw, hwFile2.read(topic, 0))
+ assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
+ assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
sendMessages(2)
hw += 2
@@ -237,8 +239,8 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
// shutdown the servers to allow the hw to be checkpointed
servers.foreach(server => server.shutdown())
producer.close()
- assertEquals(hw, hwFile1.read(topic, 0))
- assertEquals(hw, hwFile2.read(topic, 0))
+ assertEquals(hw, hwFile1.read.getOrElse(TopicAndPartition(topic, 0), 0L))
+ assertEquals(hw, hwFile2.read.getOrElse(TopicAndPartition(topic, 0), 0L))
servers.foreach(server => Utils.rm(server.config.logDirs))
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index 4619389..571e2df 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -68,7 +68,6 @@ class SimpleFetchTest extends JUnit3Suite {
val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
EasyMock.expect(logManager.getLog(topic, partitionId)).andReturn(Some(log)).anyTimes()
- EasyMock.expect(logManager.config).andReturn(configs.head).anyTimes()
EasyMock.replay(logManager)
val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])
@@ -135,7 +134,6 @@ class SimpleFetchTest extends JUnit3Suite {
val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
EasyMock.expect(logManager.getLog(topic, 0)).andReturn(Some(log)).anyTimes()
- EasyMock.expect(logManager.config).andReturn(configs.head).anyTimes()
EasyMock.replay(logManager)
val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
index 4853f2b..d5896ed 100644
--- a/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
+++ b/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
@@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit
*
* Incrementing the time to the exact next execution time of a task will result in that task executing (it is as if execution itself takes no time).
*/
-@nonthreadsafe
class MockScheduler(val time: Time) extends Scheduler {
/* a priority queue of tasks ordered by next execution time */
@@ -41,7 +40,9 @@ class MockScheduler(val time: Time) extends Scheduler {
def startup() {}
def shutdown() {
- tasks.clear()
+ this synchronized {
+ tasks.clear()
+ }
}
/**
@@ -50,23 +51,26 @@ class MockScheduler(val time: Time) extends Scheduler {
* If you are using the scheduler associated with a MockTime instance this call will be triggered automatically.
*/
def tick() {
- val now = time.milliseconds
- while(!tasks.isEmpty && tasks.head.nextExecution <= now) {
- /* pop and execute the task with the lowest next execution time */
- val curr = tasks.head
- this.tasks = tasks.tail
- curr.fun()
- /* if the task is periodic, reschedule it and re-enqueue */
- if(curr.periodic) {
- curr.nextExecution += curr.period
- this.tasks += curr
+ this synchronized {
+ val now = time.milliseconds
+ while(!tasks.isEmpty && tasks.head.nextExecution <= now) {
+ /* pop and execute the task with the lowest next execution time */
+ val curr = tasks.dequeue
+ curr.fun()
+ /* if the task is periodic, reschedule it and re-enqueue */
+ if(curr.periodic) {
+ curr.nextExecution += curr.period
+ this.tasks += curr
+ }
}
}
}
def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) {
- tasks += MockTask(name, fun, time.milliseconds + delay, period = period)
- tick()
+ this synchronized {
+ tasks += MockTask(name, fun, time.milliseconds + delay, period = period)
+ tick()
+ }
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 5547d63..d12d24e 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -43,6 +43,8 @@ import kafka.common.TopicAndPartition
* Utility functions to help with testing
*/
object TestUtils extends Logging {
+
+ val IoTmpDir = System.getProperty("java.io.tmpdir")
val Letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
val Digits = "0123456789"
@@ -74,8 +76,7 @@ object TestUtils extends Logging {
* Create a temporary directory
*/
def tempDir(): File = {
- val ioDir = System.getProperty("java.io.tmpdir")
- val f = new File(ioDir, "kafka-" + random.nextInt(1000000))
+ val f = new File(IoTmpDir, "kafka-" + random.nextInt(1000000))
f.mkdirs()
f.deleteOnExit()
f
@@ -154,8 +155,8 @@ object TestUtils extends Logging {
* Wrap the message in a message set
* @param payload The bytes of the message
*/
- def singleMessageSet(payload: Array[Byte], codec: CompressionCodec = NoCompressionCodec) =
- new ByteBufferMessageSet(compressionCodec = codec, messages = new Message(payload))
+ def singleMessageSet(payload: Array[Byte], codec: CompressionCodec = NoCompressionCodec, key: Array[Byte] = null) =
+ new ByteBufferMessageSet(compressionCodec = codec, messages = new Message(payload, key))
/**
* Generate an array of random bytes
@@ -497,7 +498,7 @@ object TestUtils extends Logging {
}
object TestZKUtils {
- val zookeeperConnect = "127.0.0.1:2182"
+ val zookeeperConnect = "127.0.0.1:" + TestUtils.choosePort()
}
class IntEncoder(props: VerifiableProperties = null) extends Encoder[Int] {
http://git-wip-us.apache.org/repos/asf/kafka/blob/e7edb5e1/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
index cce6c8e..48dd335 100644
--- a/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/UtilsTest.scala
@@ -19,6 +19,7 @@ package kafka.utils
import java.util.Arrays
import java.nio.ByteBuffer
+import java.io._
import org.apache.log4j.Logger
import org.scalatest.junit.JUnitSuite
import org.junit.Test
@@ -61,6 +62,25 @@ class UtilsTest extends JUnitSuite {
assertTrue(Arrays.equals(bytes, Utils.readBytes(ByteBuffer.wrap(bytes))))
}
}
+
+ @Test
+ def testReplaceSuffix() {
+ assertEquals("blah.foo.text", Utils.replaceSuffix("blah.foo.txt", ".txt", ".text"))
+ assertEquals("blah.foo", Utils.replaceSuffix("blah.foo.txt", ".txt", ""))
+ assertEquals("txt.txt", Utils.replaceSuffix("txt.txt.txt", ".txt", ""))
+ assertEquals("foo.txt", Utils.replaceSuffix("foo", "", ".txt"))
+ }
+
+ @Test
+ def testReadInt() {
+ val values = Array(0, 1, -1, Byte.MaxValue, Short.MaxValue, 2 * Short.MaxValue, Int.MaxValue/2, Int.MinValue/2, Int.MaxValue, Int.MinValue, Int.MaxValue)
+ val buffer = ByteBuffer.allocate(4 * values.size)
+ for(i <- 0 until values.length) {
+ buffer.putInt(i*4, values(i))
+ assertEquals("Written value should match read value.", values(i), Utils.readInt(buffer.array, i*4))
+ }
+
+ }
@Test
def testCsvList() {