You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2012/12/10 20:31:31 UTC
svn commit: r1419692 - in /kafka/trunk/core/src: main/scala/kafka/consumer/
main/scala/kafka/log/ main/scala/kafka/server/ main/scala/kafka/utils/
test/scala/unit/kafka/log/ test/scala/unit/kafka/server/
test/scala/unit/kafka/utils/
Author: jkreps
Date: Mon Dec 10 19:31:29 2012
New Revision: 1419692
URL: http://svn.apache.org/viewvc?rev=1419692&view=rev
Log:
KAFKA-597 Refactor scheduler. Fixes a couple of bugs, and adds the ability to mock scheduled tasks.
Added:
kafka/trunk/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
kafka/trunk/core/src/test/scala/unit/kafka/utils/MockTime.scala
kafka/trunk/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
Removed:
kafka/trunk/core/src/main/scala/kafka/utils/MockTime.scala
Modified:
kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
kafka/trunk/core/src/main/scala/kafka/log/Log.scala
kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala
kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
kafka/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala
kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala
kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala
kafka/trunk/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala
Modified: kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1419692&r1=1419691&r2=1419692&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Mon Dec 10 19:31:29 2012
@@ -86,7 +86,7 @@ private[kafka] class ZookeeperConsumerCo
private var zkClient: ZkClient = null
private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
private val topicThreadIdAndQueues = new Pool[(String,String), BlockingQueue[FetchedDataChunk]]
- private val scheduler = new KafkaScheduler(1)
+ private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "kafka-consumer-scheduler-")
private val messageStreamCreated = new AtomicBoolean(false)
private var sessionExpirationListener: ZKSessionExpireListener = null
@@ -114,8 +114,11 @@ private[kafka] class ZookeeperConsumerCo
if (config.autoCommit) {
scheduler.startup
info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
- scheduler.scheduleWithRate(autoCommit, "Kafka-consumer-autocommit-", config.autoCommitIntervalMs,
- config.autoCommitIntervalMs, false)
+ scheduler.schedule("kafka-consumer-autocommit",
+ autoCommit,
+ delay = config.autoCommitIntervalMs,
+ period = config.autoCommitIntervalMs,
+ unit = TimeUnit.MILLISECONDS)
}
KafkaMetricsReporter.startReporters(config.props)
@@ -160,7 +163,7 @@ private[kafka] class ZookeeperConsumerCo
wildcardTopicWatcher.shutdown()
try {
if (config.autoCommit)
- scheduler.shutdownNow()
+ scheduler.shutdown()
fetcher match {
case Some(f) => f.shutdown
case None =>
Modified: kafka/trunk/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/log/Log.scala?rev=1419692&r1=1419691&r2=1419692&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/log/Log.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/log/Log.scala Mon Dec 10 19:31:29 2012
@@ -394,7 +394,7 @@ class Log(val dir: File,
maxIndexSize = maxIndexSize)
val prev = segments.put(segment.baseOffset, segment)
if(prev != null)
- throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exsits".format(dir.getName, newOffset))
+ throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(dir.getName, newOffset))
segment
}
}
Modified: kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala?rev=1419692&r1=1419691&r2=1419692&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala Mon Dec 10 19:31:29 2012
@@ -18,6 +18,7 @@
package kafka.log
import java.io._
+import java.util.concurrent.TimeUnit
import kafka.utils._
import scala.collection._
import kafka.common.{TopicAndPartition, KafkaException}
@@ -35,12 +36,13 @@ import kafka.server.KafkaConfig
* A background thread handles log retention by periodically truncating excess log segments.
*/
@threadsafe
-class LogManager(val config: KafkaConfig,
- scheduler: KafkaScheduler,
- private val time: Time) extends Logging {
+private[kafka] class LogManager(val config: KafkaConfig,
+ scheduler: Scheduler,
+ private val time: Time) extends Logging {
val CleanShutdownFile = ".kafka_cleanshutdown"
val LockFile = ".lock"
+ val InitialTaskDelayMs = 30*1000
val logDirs: Array[File] = config.logDirs.map(new File(_)).toArray
private val logFileSizeMap = config.logFileSizeMap
private val logFlushInterval = config.flushInterval
@@ -138,15 +140,19 @@ class LogManager(val config: KafkaConfig
def startup() {
/* Schedule the cleanup task to delete old logs */
if(scheduler != null) {
- info("Starting log cleaner every " + logCleanupIntervalMs + " ms")
- scheduler.scheduleWithRate(cleanupLogs, "kafka-logcleaner-", 60 * 1000, logCleanupIntervalMs, false)
- info("Starting log flusher every " + config.flushSchedulerThreadRate +
- " ms with the following overrides " + logFlushIntervals)
- scheduler.scheduleWithRate(flushDirtyLogs,
- "kafka-logflusher-",
- config.flushSchedulerThreadRate,
- config.flushSchedulerThreadRate,
- isDaemon = false)
+ info("Starting log cleanup with a period of %d ms.".format(logCleanupIntervalMs))
+ scheduler.schedule("kafka-log-cleaner",
+ cleanupLogs,
+ delay = InitialTaskDelayMs,
+ period = logCleanupIntervalMs,
+ TimeUnit.MILLISECONDS)
+ info("Starting log flusher with a default period of %d ms with the following overrides: %s."
+ .format(config.defaultFlushIntervalMs, logFlushIntervals.map(e => e._1.toString + "=" + e._2.toString).mkString(", ")))
+ scheduler.schedule("kafka-log-flusher",
+ flushDirtyLogs,
+ delay = InitialTaskDelayMs,
+ period = config.flushSchedulerThreadRate,
+ TimeUnit.MILLISECONDS)
}
}
Modified: kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1419692&r1=1419691&r2=1419692&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala Mon Dec 10 19:31:29 2012
@@ -48,6 +48,9 @@ class KafkaConfig private (val props: Ve
/* the number of io threads that the server uses for carrying out network requests */
val numIoThreads = props.getIntInRange("io.threads", 8, (1, Int.MaxValue))
+ /* the number of threads to use for various background processing tasks */
+ val backgroundThreads = props.getIntInRange("background.threads", 4, (1, Int.MaxValue))
+
/* the number of queued requests allowed before blocking the network threads */
val numQueuedRequests = props.getIntInRange("max.queued.requests", 500, (1, Int.MaxValue))
Modified: kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1419692&r1=1419691&r2=1419692&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala Mon Dec 10 19:31:29 2012
@@ -40,7 +40,7 @@ class KafkaServer(val config: KafkaConfi
var replicaManager: ReplicaManager = null
var apis: KafkaApis = null
var kafkaController: KafkaController = null
- val kafkaScheduler = new KafkaScheduler(4)
+ val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
var zkClient: ZkClient = null
/**
@@ -53,7 +53,7 @@ class KafkaServer(val config: KafkaConfi
shutdownLatch = new CountDownLatch(1)
/* start scheduler */
- kafkaScheduler.startup
+ kafkaScheduler.startup()
/* start log manager */
logManager = new LogManager(config,
Modified: kafka/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1419692&r1=1419691&r2=1419692&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala Mon Dec 10 19:31:29 2012
@@ -38,7 +38,7 @@ object ReplicaManager {
class ReplicaManager(val config: KafkaConfig,
time: Time,
val zkClient: ZkClient,
- kafkaScheduler: KafkaScheduler,
+ scheduler: Scheduler,
val logManager: LogManager) extends Logging with KafkaMetricsGroup {
/* epoch of the controller that last changed the leader */
@volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
@@ -72,7 +72,7 @@ class ReplicaManager(val config: KafkaCo
def startHighWaterMarksCheckPointThread() = {
if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
- kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.highWaterMarkCheckpointIntervalMs)
+ scheduler.schedule("highwatermark-checkpoint", checkpointHighWatermarks, period = config.highWaterMarkCheckpointIntervalMs, unit = TimeUnit.MILLISECONDS)
}
/**
@@ -91,7 +91,7 @@ class ReplicaManager(val config: KafkaCo
def startup() {
// start ISR expiration thread
- kafkaScheduler.scheduleWithRate(maybeShrinkIsr, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs)
+ scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaMaxLagTimeMs, unit = TimeUnit.MILLISECONDS)
}
def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short = {
Modified: kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala?rev=1419692&r1=1419691&r2=1419692&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala Mon Dec 10 19:31:29 2012
@@ -22,51 +22,97 @@ import atomic._
import collection.mutable.HashMap
/**
- * A scheduler for running jobs in the background
+ * A scheduler for running jobs
+ *
+ * This interface controls a job scheduler that allows scheduling either repeating background jobs
+ * that execute periodically or delayed one-time actions that are scheduled in the future.
*/
-class KafkaScheduler(val numThreads: Int) extends Logging {
- private var executor:ScheduledThreadPoolExecutor = null
- private val daemonThreadFactory = new ThreadFactory() {
- def newThread(runnable: Runnable): Thread = Utils.newThread(runnable, true)
- }
- private val nonDaemonThreadFactory = new ThreadFactory() {
- def newThread(runnable: Runnable): Thread = Utils.newThread(runnable, false)
- }
- private val threadNamesAndIds = new HashMap[String, AtomicInteger]()
+trait Scheduler {
+
+ /**
+ * Initialize this scheduler so it is ready to accept scheduling of tasks
+ */
+ def startup()
+
+ /**
+ * Shutdown this scheduler. When this method is complete no more executions of background tasks will occur.
+ * This includes tasks scheduled with a delayed execution.
+ */
+ def shutdown()
+
+ /**
+ * Schedule a task
+ * @param name The name of this task
+ * @param delay The amount of time to wait before the first execution
+ * @param period The period with which to execute the task. If < 0 the task will execute only once.
+ * @param unit The unit for the preceding times.
+ */
+ def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS)
+}
- def startup() = {
- executor = new ScheduledThreadPoolExecutor(numThreads)
- executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
- executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
+/**
+ * A scheduler based on java.util.concurrent.ScheduledThreadPoolExecutor
+ *
+ * It has a pool of kafka-scheduler- threads that do the actual work.
+ *
+ * @param threads The number of threads in the thread pool
+ * @param threadNamePrefix The name to use for scheduler threads. This prefix will have a number appended to it.
+ * @param daemon If true the scheduler threads will be "daemon" threads and will not block jvm shutdown.
+ */
+@threadsafe
+class KafkaScheduler(val threads: Int,
+ val threadNamePrefix: String = "kafka-scheduler-",
+ daemon: Boolean = true) extends Scheduler with Logging {
+ @volatile private var executor: ScheduledThreadPoolExecutor = null
+ private val schedulerThreadId = new AtomicInteger(0)
+
+ override def startup() {
+ debug("Initializing task scheduler.")
+ this synchronized {
+ if(executor != null)
+ throw new IllegalStateException("This scheduler has already been started!")
+ executor = new ScheduledThreadPoolExecutor(threads)
+ executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
+ executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
+ executor.setThreadFactory(new ThreadFactory() {
+ def newThread(runnable: Runnable): Thread =
+ Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
+ })
+ }
}
-
- def hasShutdown: Boolean = executor.isShutdown
-
- private def ensureExecutorHasStarted = {
- if(executor == null)
- throw new IllegalStateException("Kafka scheduler has not been started")
+
+ override def shutdown() {
+ debug("Shutting down task scheduler.")
+ ensureStarted
+ executor.shutdown()
+ executor.awaitTermination(1, TimeUnit.DAYS)
+ this.executor = null
}
- def scheduleWithRate(fun: () => Unit, name: String, delayMs: Long, periodMs: Long, isDaemon: Boolean = true) = {
- ensureExecutorHasStarted
- if(isDaemon)
- executor.setThreadFactory(daemonThreadFactory)
+ def schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit) = {
+ debug("Scheduling task %s with initial delay %d ms and period %d ms."
+ .format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit)))
+ ensureStarted
+ val runnable = new Runnable {
+ def run() = {
+ try {
+ trace("Begining execution of scheduled task '%s'.".format(name))
+ fun()
+ } catch {
+ case t => error("Uncaught exception in scheduled task '" + name +"'", t)
+ } finally {
+ trace("Completed execution of scheduled task '%s'.".format(name))
+ }
+ }
+ }
+ if(period >= 0)
+ executor.scheduleAtFixedRate(runnable, delay, period, unit)
else
- executor.setThreadFactory(nonDaemonThreadFactory)
- val threadId = threadNamesAndIds.getOrElseUpdate(name, new AtomicInteger(0))
- executor.scheduleAtFixedRate(Utils.loggedRunnable(fun, name + threadId.incrementAndGet()), delayMs, periodMs,
- TimeUnit.MILLISECONDS)
- }
-
- def shutdownNow() {
- ensureExecutorHasStarted
- executor.shutdownNow()
- info("Forcing shutdown of Kafka scheduler")
+ executor.schedule(runnable, delay, unit)
}
-
- def shutdown() {
- ensureExecutorHasStarted
- executor.shutdown()
- info("Shutdown Kafka scheduler")
+
+ private def ensureStarted = {
+ if(executor == null)
+ throw new IllegalStateException("Kafka scheduler has not been started")
}
}
Modified: kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala?rev=1419692&r1=1419691&r2=1419692&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala Mon Dec 10 19:31:29 2012
@@ -52,26 +52,6 @@ object Utils extends Logging {
new Runnable() {
def run() = fun()
}
-
- /**
- * Wrap the given function in a java.lang.Runnable that logs any errors encountered
- * @param fun A function
- * @return A Runnable that just executes the function
- */
- def loggedRunnable(fun: () => Unit, name: String): Runnable =
- new Runnable() {
- def run() = {
- Thread.currentThread().setName(name)
- try {
- fun()
- }
- catch {
- case t =>
- // log any error and the stack trace
- error("error in loggedRunnable", t)
- }
- }
- }
/**
* Create a daemon thread
Modified: kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1419692&r1=1419691&r2=1419692&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Mon Dec 10 19:31:29 2012
@@ -35,7 +35,6 @@ class LogManagerTest extends JUnit3Suite
var config: KafkaConfig = null
val name = "kafka"
val veryLargeLogFlushInterval = 10000000L
- val scheduler = new KafkaScheduler(2)
override def setUp() {
super.setUp()
@@ -44,14 +43,12 @@ class LogManagerTest extends JUnit3Suite
override val flushInterval = 10000
override val logRetentionHours = maxLogAgeHours
}
- scheduler.startup
- logManager = new LogManager(config, scheduler, time)
+ logManager = new LogManager(config, time.scheduler, time)
logManager.startup
logDir = logManager.logDirs(0)
}
override def tearDown() {
- scheduler.shutdown()
if(logManager != null)
logManager.shutdown()
Utils.rm(logDir)
@@ -97,10 +94,9 @@ class LogManagerTest extends JUnit3Suite
assertTrue("There should be more than one segment now.", log.numberOfSegments > 1)
// update the last modified time of all log segments
- log.logSegments.foreach(_.log.file.setLastModified(time.currentMs))
+ log.logSegments.foreach(_.log.file.setLastModified(time.milliseconds))
- time.currentMs += maxLogAgeHours*60*60*1000 + 1
- logManager.cleanupLogs()
+ time.sleep(maxLogAgeHours*60*60*1000 + 1)
assertEquals("Now there should only be only one segment.", 1, log.numberOfSegments)
assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).sizeInBytes)
@@ -120,18 +116,14 @@ class LogManagerTest extends JUnit3Suite
@Test
def testCleanupSegmentsToMaintainSize() {
val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes
- val retentionHours = 1
- val retentionMs = 1000 * 60 * 60 * retentionHours
val props = TestUtils.createBrokerConfig(0, -1)
logManager.shutdown()
config = new KafkaConfig(props) {
override val logFileSize = (10 * (setSize - 1)) // each segment will be 10 messages
override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long]
- override val logRetentionHours = retentionHours
- override val flushInterval = 100
override val logRollHours = maxRollInterval
}
- logManager = new LogManager(config, scheduler, time)
+ logManager = new LogManager(config, time.scheduler, time)
logManager.startup
// create a log
@@ -149,7 +141,7 @@ class LogManagerTest extends JUnit3Suite
assertEquals("There should be example 100 segments.", 100, log.numberOfSegments)
// this cleanup shouldn't find any expired segments but should delete some to reduce size
- logManager.cleanupLogs()
+ time.sleep(logManager.InitialTaskDelayMs)
assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments)
assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).sizeInBytes)
try {
@@ -170,11 +162,11 @@ class LogManagerTest extends JUnit3Suite
val props = TestUtils.createBrokerConfig(0, -1)
logManager.shutdown()
config = new KafkaConfig(props) {
- override val flushSchedulerThreadRate = 5
- override val defaultFlushIntervalMs = 5
+ override val flushSchedulerThreadRate = 1000
+ override val defaultFlushIntervalMs = 1000
override val flushInterval = Int.MaxValue
}
- logManager = new LogManager(config, scheduler, SystemTime)
+ logManager = new LogManager(config, time.scheduler, time)
logManager.startup
val log = logManager.getOrCreateLog(name, 0)
val lastFlush = log.lastFlushTime
@@ -182,7 +174,7 @@ class LogManagerTest extends JUnit3Suite
var set = TestUtils.singleMessageSet("test".getBytes())
log.append(set)
}
- Thread.sleep(config.flushSchedulerThreadRate)
+ time.sleep(logManager.InitialTaskDelayMs)
assertTrue("Time based flush should have been triggered triggered", lastFlush != log.lastFlushTime)
}
@@ -198,7 +190,7 @@ class LogManagerTest extends JUnit3Suite
TestUtils.tempDir().getAbsolutePath)
props.put("log.directories", dirs.mkString(","))
logManager.shutdown()
- logManager = new LogManager(new KafkaConfig(props), scheduler, time)
+ logManager = new LogManager(new KafkaConfig(props), time.scheduler, time)
// verify that logs are always assigned to the least loaded partition
for(partition <- 0 until 20) {
@@ -214,7 +206,7 @@ class LogManagerTest extends JUnit3Suite
*/
def testTwoLogManagersUsingSameDirFails() {
try {
- new LogManager(logManager.config, scheduler, time)
+ new LogManager(logManager.config, time.scheduler, time)
fail("Should not be able to create a second log manager instance with the same data directory")
} catch {
case e: KafkaException => // this is good
Modified: kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1419692&r1=1419691&r2=1419692&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala Mon Dec 10 19:31:29 2012
@@ -66,7 +66,7 @@ class LogTest extends JUnitSuite {
// create a log
val log = new Log(logDir, 1000, config.maxMessageSize, 1000, rollMs, needsRecovery = false, time = time)
- time.currentMs += rollMs + 1
+ time.sleep(rollMs + 1)
// segment age is less than its limit
log.append(set)
@@ -76,13 +76,13 @@ class LogTest extends JUnitSuite {
assertEquals("There should still be exactly one segment.", 1, log.numberOfSegments)
for(numSegments <- 2 until 4) {
- time.currentMs += rollMs + 1
+ time.sleep(rollMs + 1)
log.append(set)
assertEquals("Changing time beyond rollMs and appending should create a new segment.", numSegments, log.numberOfSegments)
}
val numSegments = log.numberOfSegments
- time.currentMs += rollMs + 1
+ time.sleep(rollMs + 1)
log.append(new ByteBufferMessageSet())
assertEquals("Appending an empty message set should not roll log even if succient time has passed.", numSegments, log.numberOfSegments)
}
Modified: kafka/trunk/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala?rev=1419692&r1=1419691&r2=1419692&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala Mon Dec 10 19:31:29 2012
@@ -24,7 +24,7 @@ import org.junit._
import org.junit.Assert._
import kafka.common.KafkaException
import kafka.cluster.Replica
-import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime, Utils}
+import kafka.utils._
class HighwatermarkPersistenceTest extends JUnit3Suite {
Added: kafka/trunk/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/utils/MockScheduler.scala?rev=1419692&view=auto
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/utils/MockScheduler.scala (added)
+++ kafka/trunk/core/src/test/scala/unit/kafka/utils/MockScheduler.scala Mon Dec 10 19:31:29 2012
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.utils
+
+import scala.collection._
+import java.util.concurrent.TimeUnit
+
+/**
+ * A mock scheduler that executes tasks synchronously using a mock time instance. Tasks are executed synchronously when
+ * the time is advanced. This class is meant to be used in conjunction with MockTime.
+ *
+ * Example usage
+ * <code>
+ * val time = new MockTime
+ * time.scheduler.schedule("a task", println("hello world: " + time.milliseconds), delay = 1000)
+ * time.sleep(1001) // this should cause our scheduled task to fire
+ * </code>
+ *
+ * Two gotchas:
+ * <ol>
+ * <li> Incrementing the time by more than one task period will result in the correct number of executions of each scheduled task
+ * but the order of these executions is not specified.
+ * <li> Incrementing the time to the exact next execution time of a task will result in that task executing (it as if execution itself takes no time)
+ * </ol>
+ */
+class MockScheduler(val time: Time) extends Scheduler {
+
+ var tasks = mutable.ArrayBuffer[MockScheduled]()
+
+ def startup() {}
+
+ def shutdown() {
+ tasks.clear()
+ }
+
+ /**
+ * Check for any tasks that need to execute. Since this is a mock scheduler this check only occurs
+ * when this method is called and the execution happens synchronously in the calling thread.
+ * If you are using the scheduler associated with a MockTime instance this call will happen automatically.
+ */
+ def tick() {
+ var tasks = mutable.ArrayBuffer[MockScheduled]()
+ val now = time.milliseconds
+ for(task <- this.tasks) {
+ if(task.nextExecution <= now) {
+ if(task.period >= 0) {
+ val executions = (now - task.nextExecution) / task.period
+ for(i <- 0 to executions.toInt)
+ task.fun()
+ task.nextExecution += (executions + 1) * task.period
+ tasks += task
+ } else {
+ task.fun()
+ }
+ } else {
+ tasks += task
+ }
+ }
+ this.tasks = tasks
+ }
+
+ def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) {
+ tasks += MockScheduled(name, fun, time.milliseconds + delay, period = period)
+ tick()
+ }
+
+}
+
+case class MockScheduled(val name: String, val fun: () => Unit, var nextExecution: Long, val period: Long)
\ No newline at end of file
Added: kafka/trunk/core/src/test/scala/unit/kafka/utils/MockTime.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/utils/MockTime.scala?rev=1419692&view=auto
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/utils/MockTime.scala (added)
+++ kafka/trunk/core/src/test/scala/unit/kafka/utils/MockTime.scala Mon Dec 10 19:31:29 2012
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import java.util.concurrent._
+
+/**
+ * A class used for unit testing things which depend on the Time interface.
+ *
+ * This class never manually advances the clock, it only does so when you call
+ * sleep(ms)
+ *
+ * It also comes with an associated scheduler instance for managing background tasks in
+ * a deterministic way.
+ */
+class MockTime(@volatile private var currentMs: Long) extends Time {
+
+ val scheduler = new MockScheduler(this)
+
+ def this() = this(System.currentTimeMillis)
+
+ def milliseconds: Long = currentMs
+
+ def nanoseconds: Long =
+ TimeUnit.NANOSECONDS.convert(currentMs, TimeUnit.MILLISECONDS)
+
+ def sleep(ms: Long) {
+ this.currentMs += ms
+ scheduler.tick()
+ }
+
+ override def toString() = "MockTime(%d)".format(milliseconds)
+
+}
Added: kafka/trunk/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala?rev=1419692&view=auto
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala (added)
+++ kafka/trunk/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala Mon Dec 10 19:31:29 2012
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.utils
+
+import junit.framework.Assert._
+import java.util.concurrent.atomic._
+import org.junit.{Test, After, Before}
+import kafka.utils.TestUtils.retry
+
+class SchedulerTest {
+
+ val scheduler = new KafkaScheduler(1)
+ val mockTime = new MockTime
+ val counter1 = new AtomicInteger(0)
+ val counter2 = new AtomicInteger(0)
+
+ @Before
+ def setup() {
+ scheduler.startup()
+ }
+
+ @After
+ def teardown() {
+ scheduler.shutdown()
+ }
+
+ @Test
+ def testMockSchedulerNonPeriodicTask() {
+ mockTime.scheduler.schedule("test1", counter1.getAndIncrement, delay=1)
+ mockTime.scheduler.schedule("test2", counter2.getAndIncrement, delay=100)
+ assertEquals("Counter1 should not be incremented prior to task running.", 0, counter1.get)
+ assertEquals("Counter2 should not be incremented prior to task running.", 0, counter2.get)
+ mockTime.sleep(1)
+ assertEquals("Counter1 should be incremented", 1, counter1.get)
+ assertEquals("Counter2 should not be incremented", 0, counter2.get)
+ mockTime.sleep(100000)
+ assertEquals("More sleeping should not result in more incrementing on counter1.", 1, counter1.get)
+ assertEquals("Counter2 should now be incremented.", 1, counter2.get)
+ }
+
+ @Test
+ def testMockSchedulerPeriodicTask() {
+ mockTime.scheduler.schedule("test1", counter1.getAndIncrement, delay=1, period=1)
+ mockTime.scheduler.schedule("test2", counter2.getAndIncrement, delay=100, period=100)
+ assertEquals("Counter1 should not be incremented prior to task running.", 0, counter1.get)
+ assertEquals("Counter2 should not be incremented prior to task running.", 0, counter2.get)
+ mockTime.sleep(1)
+ assertEquals("Counter1 should be incremented", 1, counter1.get)
+ assertEquals("Counter2 should not be incremented", 0, counter2.get)
+ mockTime.sleep(100)
+ assertEquals("Counter1 should be incremented 101 times", 101, counter1.get)
+ assertEquals("Counter2 should not be incremented once", 1, counter2.get)
+ }
+
+ @Test
+ def testNonPeriodicTask() {
+ scheduler.schedule("test", counter1.getAndIncrement, delay = 0)
+ retry(30000, () => assertEquals(counter1.get, 1))
+ Thread.sleep(5)
+ assertEquals("Should only run once", 1, counter1.get)
+ }
+
+ @Test
+ def testPeriodicTask() {
+ scheduler.schedule("test", counter1.getAndIncrement, delay = 0, period = 5)
+ retry(30000, () => assertTrue("Should count to 20", counter1.get >= 20))
+ }
+}
\ No newline at end of file
Modified: kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1419692&r1=1419691&r2=1419692&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala Mon Dec 10 19:31:29 2012
@@ -433,17 +433,21 @@ object TestUtils extends Logging {
* Execute the given block. If it throws an assert error, retry. Repeat
* until no error is thrown or the time limit ellapses
*/
- def retry(waitTime: Long, block: () => Unit) {
+ def retry(maxWaitMs: Long, block: () => Unit) {
+ var wait = 1L
val startTime = System.currentTimeMillis()
while(true) {
try {
block()
+ return
} catch {
case e: AssertionError =>
- if(System.currentTimeMillis - startTime > waitTime)
+ if(System.currentTimeMillis - startTime > maxWaitMs) {
throw e
- else
- Thread.sleep(100)
+ } else {
+ Thread.sleep(wait)
+ wait += math.min(wait, 1000)
+ }
}
}
}