Posted to commits@kafka.apache.org by jk...@apache.org on 2012/12/10 20:31:31 UTC

svn commit: r1419692 - in /kafka/trunk/core/src: main/scala/kafka/consumer/ main/scala/kafka/log/ main/scala/kafka/server/ main/scala/kafka/utils/ test/scala/unit/kafka/log/ test/scala/unit/kafka/server/ test/scala/unit/kafka/utils/

Author: jkreps
Date: Mon Dec 10 19:31:29 2012
New Revision: 1419692

URL: http://svn.apache.org/viewvc?rev=1419692&view=rev
Log:
KAFKA-597 Refactor scheduler. Fixes a couple of bugs, and adds the ability to mock scheduled tasks.
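
For context before the diffs: the refactor replaces the old positional scheduleWithRate call with a named-task schedule API. The shape of the change at every call site is roughly the following (a sketch, not taken from the patch; myTask stands for any () => Unit):

    // before: positional arguments, per-call daemon flag, implicit milliseconds
    scheduler.scheduleWithRate(myTask, "my-task-", 60 * 1000, 30 * 1000, false)

    // after: the task is named, the time unit is explicit, and daemon-ness is
    // fixed when the KafkaScheduler is constructed; period defaults to -1,
    // which means "execute only once"
    scheduler.schedule("my-task", myTask, delay = 60 * 1000, period = 30 * 1000, unit = TimeUnit.MILLISECONDS)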


Added:
    kafka/trunk/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
    kafka/trunk/core/src/test/scala/unit/kafka/utils/MockTime.scala
    kafka/trunk/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
Removed:
    kafka/trunk/core/src/main/scala/kafka/utils/MockTime.scala
Modified:
    kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    kafka/trunk/core/src/main/scala/kafka/log/Log.scala
    kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
    kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala
    kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
    kafka/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala
    kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala
    kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
    kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
    kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala
    kafka/trunk/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
    kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala

Modified: kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1419692&r1=1419691&r2=1419692&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Mon Dec 10 19:31:29 2012
@@ -86,7 +86,7 @@ private[kafka] class ZookeeperConsumerCo
   private var zkClient: ZkClient = null
   private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
   private val topicThreadIdAndQueues = new Pool[(String,String), BlockingQueue[FetchedDataChunk]]
-  private val scheduler = new KafkaScheduler(1)
+  private val scheduler = new KafkaScheduler(threads = 1, threadNamePrefix = "kafka-consumer-scheduler-")
   private val messageStreamCreated = new AtomicBoolean(false)
 
   private var sessionExpirationListener: ZKSessionExpireListener = null
@@ -114,8 +114,11 @@ private[kafka] class ZookeeperConsumerCo
   if (config.autoCommit) {
     scheduler.startup
     info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
-    scheduler.scheduleWithRate(autoCommit, "Kafka-consumer-autocommit-", config.autoCommitIntervalMs,
-      config.autoCommitIntervalMs, false)
+    scheduler.schedule("kafka-consumer-autocommit", 
+                       autoCommit, 
+                       delay = config.autoCommitIntervalMs,
+                       period = config.autoCommitIntervalMs, 
+                       unit = TimeUnit.MILLISECONDS)
   }
 
   KafkaMetricsReporter.startReporters(config.props)
@@ -160,7 +163,7 @@ private[kafka] class ZookeeperConsumerCo
         wildcardTopicWatcher.shutdown()
       try {
         if (config.autoCommit)
-          scheduler.shutdownNow()
+          scheduler.shutdown()
         fetcher match {
           case Some(f) => f.shutdown
           case None =>
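
The change from shutdownNow() to shutdown() above is behavioral: the new KafkaScheduler.shutdown() (see KafkaScheduler.scala below) stops accepting work and awaits termination, so an auto-commit already in flight finishes instead of being interrupted. A minimal sketch of the underlying java.util.concurrent distinction (standard library behavior, not code from this patch):

    import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}

    val pool = new ScheduledThreadPoolExecutor(1)
    pool.execute(new Runnable { def run() = Thread.sleep(500) }) // e.g. a commit in flight
    // shutdownNow() would interrupt the running task and drop queued ones;
    // shutdown() + awaitTermination lets the in-flight task complete.
    pool.shutdown()
    pool.awaitTermination(1, TimeUnit.SECONDS)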

Modified: kafka/trunk/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/log/Log.scala?rev=1419692&r1=1419691&r2=1419692&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/log/Log.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/log/Log.scala Mon Dec 10 19:31:29 2012
@@ -394,7 +394,7 @@ class Log(val dir: File, 
                                    maxIndexSize = maxIndexSize)
       val prev = segments.put(segment.baseOffset, segment)
       if(prev != null)
-        throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exsits".format(dir.getName, newOffset))
+        throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(dir.getName, newOffset))
       segment
     }
   }

Modified: kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala?rev=1419692&r1=1419691&r2=1419692&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/log/LogManager.scala Mon Dec 10 19:31:29 2012
@@ -18,6 +18,7 @@
 package kafka.log
 
 import java.io._
+import java.util.concurrent.TimeUnit
 import kafka.utils._
 import scala.collection._
 import kafka.common.{TopicAndPartition, KafkaException}
@@ -35,12 +36,13 @@ import kafka.server.KafkaConfig
  * A background thread handles log retention by periodically truncating excess log segments.
  */
 @threadsafe
-class LogManager(val config: KafkaConfig,
-                 scheduler: KafkaScheduler,
-                 private val time: Time) extends Logging {
+private[kafka] class LogManager(val config: KafkaConfig,
+                                scheduler: Scheduler,
+                                private val time: Time) extends Logging {
 
   val CleanShutdownFile = ".kafka_cleanshutdown"
   val LockFile = ".lock"
+  val InitialTaskDelayMs = 30*1000
   val logDirs: Array[File] = config.logDirs.map(new File(_)).toArray
   private val logFileSizeMap = config.logFileSizeMap
   private val logFlushInterval = config.flushInterval
@@ -138,15 +140,19 @@ class LogManager(val config: KafkaConfig
   def startup() {
     /* Schedule the cleanup task to delete old logs */
     if(scheduler != null) {
-      info("Starting log cleaner every " + logCleanupIntervalMs + " ms")
-      scheduler.scheduleWithRate(cleanupLogs, "kafka-logcleaner-", 60 * 1000, logCleanupIntervalMs, false)
-      info("Starting log flusher every " + config.flushSchedulerThreadRate +
-           " ms with the following overrides " + logFlushIntervals)
-      scheduler.scheduleWithRate(flushDirtyLogs, 
-                                 "kafka-logflusher-",
-                                 config.flushSchedulerThreadRate, 
-                                 config.flushSchedulerThreadRate, 
-                                 isDaemon = false)
+      info("Starting log cleanup with a period of %d ms.".format(logCleanupIntervalMs))
+      scheduler.schedule("kafka-log-cleaner", 
+                         cleanupLogs, 
+                         delay = InitialTaskDelayMs, 
+                         period = logCleanupIntervalMs, 
+                         TimeUnit.MILLISECONDS)
+      info("Starting log flusher with a default period of %d ms with the following overrides: %s."
+          .format(config.defaultFlushIntervalMs, logFlushIntervals.map(e => e._1.toString + "=" + e._2.toString).mkString(", ")))
+      scheduler.schedule("kafka-log-flusher", 
+                         flushDirtyLogs, 
+                         delay = InitialTaskDelayMs, 
+                         period = config.flushSchedulerThreadRate, 
+                         TimeUnit.MILLISECONDS)
     }
   }
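
Note that the constructor now takes the Scheduler trait rather than the concrete KafkaScheduler; that is what lets the tests further down drive these periodic jobs deterministically. Roughly, mirroring LogManagerTest below (a sketch; config comes from the surrounding test setup):

    val time = new MockTime                      // owns a MockScheduler (added in this commit)
    val logManager = new LogManager(config, time.scheduler, time)
    logManager.startup()                         // registers the cleanup and flush tasks; nothing runs yet
    time.sleep(logManager.InitialTaskDelayMs)    // fires them synchronously in the calling thread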
   

Modified: kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1419692&r1=1419691&r2=1419692&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/server/KafkaConfig.scala Mon Dec 10 19:31:29 2012
@@ -48,6 +48,9 @@ class KafkaConfig private (val props: Ve
   /* the number of io threads that the server uses for carrying out network requests */
   val numIoThreads = props.getIntInRange("io.threads", 8, (1, Int.MaxValue))
   
+  /* the number of threads to use for various background processing tasks */
+  val backgroundThreads = props.getIntInRange("background.threads", 4, (1, Int.MaxValue))
+  
   /* the number of queued requests allowed before blocking the network threads */
   val numQueuedRequests = props.getIntInRange("max.queued.requests", 500, (1, Int.MaxValue))
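
The new pool size is wired through to KafkaServer below. Overriding it in a test-style config might look like this (a sketch using the createBrokerConfig helper visible in the test diffs; the value 8 is illustrative):

    val props = TestUtils.createBrokerConfig(0, -1)
    props.put("background.threads", "8")
    val config = new KafkaConfig(props)
    // config.backgroundThreads == 8; KafkaServer sizes its KafkaScheduler from it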
   

Modified: kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1419692&r1=1419691&r2=1419692&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/server/KafkaServer.scala Mon Dec 10 19:31:29 2012
@@ -40,7 +40,7 @@ class KafkaServer(val config: KafkaConfi
   var replicaManager: ReplicaManager = null
   var apis: KafkaApis = null
   var kafkaController: KafkaController = null
-  val kafkaScheduler = new KafkaScheduler(4)
+  val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
   var zkClient: ZkClient = null
 
   /**
@@ -53,7 +53,7 @@ class KafkaServer(val config: KafkaConfi
     shutdownLatch = new CountDownLatch(1)
 
     /* start scheduler */
-    kafkaScheduler.startup
+    kafkaScheduler.startup()
 
     /* start log manager */
     logManager = new LogManager(config,

Modified: kafka/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1419692&r1=1419691&r2=1419692&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/server/ReplicaManager.scala Mon Dec 10 19:31:29 2012
@@ -38,7 +38,7 @@ object ReplicaManager {
 class ReplicaManager(val config: KafkaConfig, 
                      time: Time, 
                      val zkClient: ZkClient, 
-                     kafkaScheduler: KafkaScheduler,
+                     scheduler: Scheduler,
                      val logManager: LogManager) extends Logging with KafkaMetricsGroup {
   /* epoch of the controller that last changed the leader */
   @volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
@@ -72,7 +72,7 @@ class ReplicaManager(val config: KafkaCo
 
   def startHighWaterMarksCheckPointThread() = {
     if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
-      kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.highWaterMarkCheckpointIntervalMs)
+      scheduler.schedule("highwatermark-checkpoint", checkpointHighWatermarks, period = config.highWaterMarkCheckpointIntervalMs, unit = TimeUnit.MILLISECONDS)
   }
 
   /**
@@ -91,7 +91,7 @@ class ReplicaManager(val config: KafkaCo
 
   def startup() {
     // start ISR expiration thread
-    kafkaScheduler.scheduleWithRate(maybeShrinkIsr, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs)
+    scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaMaxLagTimeMs, unit = TimeUnit.MILLISECONDS)
   }
 
   def stopReplica(topic: String, partitionId: Int, deletePartition: Boolean): Short  = {

Modified: kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala?rev=1419692&r1=1419691&r2=1419692&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/utils/KafkaScheduler.scala Mon Dec 10 19:31:29 2012
@@ -22,51 +22,97 @@ import atomic._
 import collection.mutable.HashMap
 
 /**
- * A scheduler for running jobs in the background
+ * A scheduler for running jobs
+ * 
+ * This interface controls a job scheduler that allows scheduling either repeating background jobs 
+ * that execute periodically or delayed one-time actions that are scheduled in the future.
  */
-class KafkaScheduler(val numThreads: Int) extends Logging {
-  private var executor:ScheduledThreadPoolExecutor = null
-  private val daemonThreadFactory = new ThreadFactory() {
-      def newThread(runnable: Runnable): Thread = Utils.newThread(runnable, true)
-    }
-  private val nonDaemonThreadFactory = new ThreadFactory() {
-      def newThread(runnable: Runnable): Thread = Utils.newThread(runnable, false)
-    }
-  private val threadNamesAndIds = new HashMap[String, AtomicInteger]()
+trait Scheduler {
+  
+  /**
+   * Initialize this scheduler so it is ready to accept scheduling of tasks
+   */
+  def startup()
+  
+  /**
+   * Shutdown this scheduler. When this method is complete no more executions of background tasks will occur. 
+   * This includes tasks scheduled with a delayed execution.
+   */
+  def shutdown()
+  
+  /**
+   * Schedule a task
+   * @param name The name of this task
+   * @param fun The function to execute
+   * @param delay The amount of time to wait before the first execution
+   * @param period The period with which to execute the task. If < 0 the task will execute only once.
+   * @param unit The unit for the preceding times.
+   */
+  def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS)
+}
 
-  def startup() = {
-    executor = new ScheduledThreadPoolExecutor(numThreads)
-    executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
-    executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
+/**
+ * A scheduler based on java.util.concurrent.ScheduledThreadPoolExecutor
+ * 
+ * It has a pool of threads, named with the configured prefix, that do the actual work.
+ * 
+ * @param threads The number of threads in the thread pool
+ * @param threadNamePrefix The name to use for scheduler threads. This prefix will have a number appended to it.
+ * @param daemon If true the scheduler threads will be "daemon" threads and will not block jvm shutdown.
+ */
+@threadsafe
+class KafkaScheduler(val threads: Int, 
+                     val threadNamePrefix: String = "kafka-scheduler-", 
+                     daemon: Boolean = true) extends Scheduler with Logging {
+  @volatile private var executor: ScheduledThreadPoolExecutor = null
+  private val schedulerThreadId = new AtomicInteger(0)
+  
+  override def startup() {
+    debug("Initializing task scheduler.")
+    this synchronized {
+      if(executor != null)
+        throw new IllegalStateException("This scheduler has already been started!")
+      executor = new ScheduledThreadPoolExecutor(threads)
+      executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
+      executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
+      executor.setThreadFactory(new ThreadFactory() {
+                                  def newThread(runnable: Runnable): Thread = 
+                                    Utils.newThread(threadNamePrefix + schedulerThreadId.getAndIncrement(), runnable, daemon)
+                                })
+    }
   }
-
-  def hasShutdown: Boolean = executor.isShutdown
-
-  private def ensureExecutorHasStarted = {
-    if(executor == null)
-      throw new IllegalStateException("Kafka scheduler has not been started")
+  
+  override def shutdown() {
+    debug("Shutting down task scheduler.")
+    ensureStarted
+    executor.shutdown()
+    executor.awaitTermination(1, TimeUnit.DAYS)
+    this.executor = null
   }
 
-  def scheduleWithRate(fun: () => Unit, name: String, delayMs: Long, periodMs: Long, isDaemon: Boolean = true) = {
-    ensureExecutorHasStarted
-    if(isDaemon)
-      executor.setThreadFactory(daemonThreadFactory)
+  def schedule(name: String, fun: ()=>Unit, delay: Long, period: Long, unit: TimeUnit) = {
+    debug("Scheduling task %s with initial delay %d ms and period %d ms."
+        .format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit)))
+    ensureStarted
+    val runnable = new Runnable {
+      def run() = {
+        try {
+          trace("Begining execution of scheduled task '%s'.".format(name))
+          fun()
+        } catch {
+          case t => error("Uncaught exception in scheduled task '" + name + "'", t)
+        } finally {
+          trace("Completed execution of scheduled task '%s'.".format(name))
+        }
+      }
+    }
+    if(period >= 0)
+      executor.scheduleAtFixedRate(runnable, delay, period, unit)
     else
-      executor.setThreadFactory(nonDaemonThreadFactory)
-    val threadId = threadNamesAndIds.getOrElseUpdate(name, new AtomicInteger(0))
-    executor.scheduleAtFixedRate(Utils.loggedRunnable(fun, name + threadId.incrementAndGet()), delayMs, periodMs,
-      TimeUnit.MILLISECONDS)
-  }
-
-  def shutdownNow() {
-    ensureExecutorHasStarted
-    executor.shutdownNow()
-    info("Forcing shutdown of Kafka scheduler")
+      executor.schedule(runnable, delay, unit)
   }
-
-  def shutdown() {
-    ensureExecutorHasStarted
-    executor.shutdown()
-    info("Shutdown Kafka scheduler")
+  
+  private def ensureStarted = {
+    if(executor == null)
+      throw new IllegalStateException("Kafka scheduler has not been started")
   }
 }
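
Putting the new interface together, typical standalone usage looks like the following (a sketch; the task names and println bodies are illustrative):

    import java.util.concurrent.TimeUnit
    import kafka.utils.KafkaScheduler

    val scheduler = new KafkaScheduler(threads = 2, threadNamePrefix = "example-", daemon = true)
    scheduler.startup()
    // period defaults to -1, so this runs exactly once, 100 ms from now
    scheduler.schedule("one-shot", () => println("ran once"), delay = 100, unit = TimeUnit.MILLISECONDS)
    // a fixed-rate job: first run immediately, then every 500 ms
    scheduler.schedule("heartbeat", () => println("tick"), period = 500, unit = TimeUnit.MILLISECONDS)
    Thread.sleep(2000)
    scheduler.shutdown() // drains the pool; delayed tasks that have not fired are discarded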

Modified: kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala?rev=1419692&r1=1419691&r2=1419692&view=diff
==============================================================================
--- kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ kafka/trunk/core/src/main/scala/kafka/utils/Utils.scala Mon Dec 10 19:31:29 2012
@@ -52,26 +52,6 @@ object Utils extends Logging {
     new Runnable() {
       def run() = fun()
     }
-  
-  /**
-   * Wrap the given function in a java.lang.Runnable that logs any errors encountered
-   * @param fun A function
-   * @return A Runnable that just executes the function
-   */
-  def loggedRunnable(fun: () => Unit, name: String): Runnable =
-    new Runnable() {
-      def run() = {
-        Thread.currentThread().setName(name)
-        try {
-          fun()
-        }
-        catch {
-          case t =>
-            // log any error and the stack trace
-            error("error in loggedRunnable", t)
-        }
-      }
-    }
 
   /**
    * Create a daemon thread

Modified: kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1419692&r1=1419691&r2=1419692&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Mon Dec 10 19:31:29 2012
@@ -35,7 +35,6 @@ class LogManagerTest extends JUnit3Suite
   var config: KafkaConfig = null
   val name = "kafka"
   val veryLargeLogFlushInterval = 10000000L
-  val scheduler = new KafkaScheduler(2)
 
   override def setUp() {
     super.setUp()
@@ -44,14 +43,12 @@ class LogManagerTest extends JUnit3Suite
                    override val flushInterval = 10000
                    override val logRetentionHours = maxLogAgeHours
                  }
-    scheduler.startup
-    logManager = new LogManager(config, scheduler, time)
+    logManager = new LogManager(config, time.scheduler, time)
     logManager.startup
     logDir = logManager.logDirs(0)
   }
 
   override def tearDown() {
-    scheduler.shutdown()
     if(logManager != null)
       logManager.shutdown()
     Utils.rm(logDir)
@@ -97,10 +94,9 @@ class LogManagerTest extends JUnit3Suite
     assertTrue("There should be more than one segment now.", log.numberOfSegments > 1)
 
     // update the last modified time of all log segments
-    log.logSegments.foreach(_.log.file.setLastModified(time.currentMs))
+    log.logSegments.foreach(_.log.file.setLastModified(time.milliseconds))
 
-    time.currentMs += maxLogAgeHours*60*60*1000 + 1
-    logManager.cleanupLogs()
+    time.sleep(maxLogAgeHours*60*60*1000 + 1)
     assertEquals("Now there should only be only one segment.", 1, log.numberOfSegments)
     assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).sizeInBytes)
 
@@ -120,18 +116,14 @@ class LogManagerTest extends JUnit3Suite
   @Test
   def testCleanupSegmentsToMaintainSize() {
     val setSize = TestUtils.singleMessageSet("test".getBytes()).sizeInBytes
-    val retentionHours = 1
-    val retentionMs = 1000 * 60 * 60 * retentionHours
     val props = TestUtils.createBrokerConfig(0, -1)
     logManager.shutdown()
     config = new KafkaConfig(props) {
       override val logFileSize = (10 * (setSize - 1)) // each segment will be 10 messages
       override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long]
-      override val logRetentionHours = retentionHours
-      override val flushInterval = 100
       override val logRollHours = maxRollInterval
     }
-    logManager = new LogManager(config, scheduler, time)
+    logManager = new LogManager(config, time.scheduler, time)
     logManager.startup
 
     // create a log
@@ -149,7 +141,7 @@ class LogManagerTest extends JUnit3Suite
     assertEquals("There should be example 100 segments.", 100, log.numberOfSegments)
 
     // this cleanup shouldn't find any expired segments but should delete some to reduce size
-    logManager.cleanupLogs()
+    time.sleep(logManager.InitialTaskDelayMs)
     assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments)
     assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).sizeInBytes)
     try {
@@ -170,11 +162,11 @@ class LogManagerTest extends JUnit3Suite
     val props = TestUtils.createBrokerConfig(0, -1)
     logManager.shutdown()
     config = new KafkaConfig(props) {
-                   override val flushSchedulerThreadRate = 5
-                   override val defaultFlushIntervalMs = 5
+                   override val flushSchedulerThreadRate = 1000
+                   override val defaultFlushIntervalMs = 1000
                    override val flushInterval = Int.MaxValue
                  }
-    logManager = new LogManager(config, scheduler, SystemTime)
+    logManager = new LogManager(config, time.scheduler, time)
     logManager.startup
     val log = logManager.getOrCreateLog(name, 0)
     val lastFlush = log.lastFlushTime
@@ -182,7 +174,7 @@ class LogManagerTest extends JUnit3Suite
       var set = TestUtils.singleMessageSet("test".getBytes())
       log.append(set)
     }
-    Thread.sleep(config.flushSchedulerThreadRate)
+    time.sleep(logManager.InitialTaskDelayMs)
     assertTrue("Time based flush should have been triggered triggered", lastFlush != log.lastFlushTime)
   }
   
@@ -198,7 +190,7 @@ class LogManagerTest extends JUnit3Suite
                    TestUtils.tempDir().getAbsolutePath)
     props.put("log.directories", dirs.mkString(","))
     logManager.shutdown()
-    logManager = new LogManager(new KafkaConfig(props), scheduler, time)
+    logManager = new LogManager(new KafkaConfig(props), time.scheduler, time)
     
     // verify that logs are always assigned to the least loaded partition
     for(partition <- 0 until 20) {
@@ -214,7 +206,7 @@ class LogManagerTest extends JUnit3Suite
    */
   def testTwoLogManagersUsingSameDirFails() {
     try {
-      new LogManager(logManager.config, scheduler, time)
+      new LogManager(logManager.config, time.scheduler, time)
       fail("Should not be able to create a second log manager instance with the same data directory")
     } catch {
       case e: KafkaException => // this is good 

Modified: kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala?rev=1419692&r1=1419691&r2=1419692&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/log/LogTest.scala Mon Dec 10 19:31:29 2012
@@ -66,7 +66,7 @@ class LogTest extends JUnitSuite {
 
     // create a log
     val log = new Log(logDir, 1000, config.maxMessageSize, 1000, rollMs, needsRecovery = false, time = time)
-    time.currentMs += rollMs + 1
+    time.sleep(rollMs + 1)
 
     // segment age is less than its limit
     log.append(set)
@@ -76,13 +76,13 @@ class LogTest extends JUnitSuite {
     assertEquals("There should still be exactly one segment.", 1, log.numberOfSegments)
 
     for(numSegments <- 2 until 4) {
-      time.currentMs += rollMs + 1
+      time.sleep(rollMs + 1)
       log.append(set)
       assertEquals("Changing time beyond rollMs and appending should create a new segment.", numSegments, log.numberOfSegments)
     }
 
     val numSegments = log.numberOfSegments
-    time.currentMs += rollMs + 1
+    time.sleep(rollMs + 1)
     log.append(new ByteBufferMessageSet())
     assertEquals("Appending an empty message set should not roll log even if succient time has passed.", numSegments, log.numberOfSegments)
   }

Modified: kafka/trunk/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala?rev=1419692&r1=1419691&r2=1419692&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala Mon Dec 10 19:31:29 2012
@@ -24,7 +24,7 @@ import org.junit._
 import org.junit.Assert._
 import kafka.common.KafkaException
 import kafka.cluster.Replica
-import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime, Utils}
+import kafka.utils._
 
 class HighwatermarkPersistenceTest extends JUnit3Suite {
 

Added: kafka/trunk/core/src/test/scala/unit/kafka/utils/MockScheduler.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/utils/MockScheduler.scala?rev=1419692&view=auto
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/utils/MockScheduler.scala (added)
+++ kafka/trunk/core/src/test/scala/unit/kafka/utils/MockScheduler.scala Mon Dec 10 19:31:29 2012
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.utils
+
+import scala.collection._
+import java.util.concurrent.TimeUnit
+
+/**
+ * A mock scheduler that executes tasks synchronously using a mock time instance. Tasks are executed synchronously when
+ * the time is advanced. This class is meant to be used in conjunction with MockTime.
+ * 
+ * Example usage
+ * <code>
+ *   val time = new MockTime
+ *   time.scheduler.schedule("a task", () => println("hello world: " + time.milliseconds), delay = 1000)
+ *   time.sleep(1001) // this should cause our scheduled task to fire
+ * </code>
+ *   
+ * Two gotchas:
+ * <ol>
+ * <li> Incrementing the time by more than one task period will result in the correct number of executions of each scheduled task
+ * but the order of these executions is not specified.
+ * <li> Incrementing the time to the exact next execution time of a task will result in that task executing (it is as if the execution itself takes no time).
+ * </ol>
+ */
+class MockScheduler(val time: Time) extends Scheduler {
+  
+  var tasks = mutable.ArrayBuffer[MockScheduled]()
+
+  def startup() {}
+  
+  def shutdown() {
+    tasks.clear()
+  }
+  
+  /**
+   * Check for any tasks that need to execute. Since this is a mock scheduler this check only occurs
+   * when this method is called and the execution happens synchronously in the calling thread.
+   * If you are using the scheduler associated with a MockTime instance this call will happen automatically.
+   */
+  def tick() {
+    val remaining = mutable.ArrayBuffer[MockScheduled]()
+    val now = time.milliseconds
+    for(task <- this.tasks) {
+      if(task.nextExecution <= now) {
+        if(task.period >= 0) {
+          // run the task once for each period boundary crossed, then reschedule it
+          val executions = (now - task.nextExecution) / task.period
+          for(i <- 0 to executions.toInt)
+            task.fun()
+          task.nextExecution += (executions + 1) * task.period
+          remaining += task
+        } else {
+          // one-time task: run it and drop it
+          task.fun()
+        }
+      } else {
+        remaining += task
+      }
+    }
+    this.tasks = remaining
+  }
+  
+  def schedule(name: String, fun: ()=>Unit, delay: Long = 0, period: Long = -1, unit: TimeUnit = TimeUnit.MILLISECONDS) {
+    tasks += MockScheduled(name, fun, time.milliseconds + delay, period = period)
+    tick()
+  }
+  
+}
+
+case class MockScheduled(name: String, fun: () => Unit, var nextExecution: Long, period: Long)
\ No newline at end of file
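
The second gotcha deserves a concrete illustration; with MockTime (added below) the following holds (a sketch; counter is illustrative):

    import java.util.concurrent.atomic.AtomicInteger

    val time = new MockTime(0)
    val counter = new AtomicInteger(0)
    time.scheduler.schedule("count", () => counter.getAndIncrement(), delay = 10, period = 10)
    time.sleep(10)  // lands exactly on the first execution time, so the task fires once
    assert(counter.get == 1)
    time.sleep(35)  // now = 45: covers the executions due at t = 20, 30 and 40
    assert(counter.get == 4)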

Added: kafka/trunk/core/src/test/scala/unit/kafka/utils/MockTime.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/utils/MockTime.scala?rev=1419692&view=auto
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/utils/MockTime.scala (added)
+++ kafka/trunk/core/src/test/scala/unit/kafka/utils/MockTime.scala Mon Dec 10 19:31:29 2012
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.utils
+
+import java.util.concurrent._
+
+/**
+ * A class used for unit testing things which depend on the Time interface.
+ * 
+ * This class never manually advances the clock, it only does so when you call
+ *   sleep(ms)
+ * 
+ * It also comes with an associated scheduler instance for managing background tasks in
+ * a deterministic way.
+ */
+class MockTime(@volatile private var currentMs: Long) extends Time {
+  
+  val scheduler = new MockScheduler(this)
+  
+  def this() = this(System.currentTimeMillis)
+  
+  def milliseconds: Long = currentMs
+
+  def nanoseconds: Long = 
+    TimeUnit.NANOSECONDS.convert(currentMs, TimeUnit.MILLISECONDS)
+
+  def sleep(ms: Long) {
+    this.currentMs += ms
+    scheduler.tick()
+  }
+  
+  override def toString() = "MockTime(%d)".format(milliseconds)
+
+}

Added: kafka/trunk/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala?rev=1419692&view=auto
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala (added)
+++ kafka/trunk/core/src/test/scala/unit/kafka/utils/SchedulerTest.scala Mon Dec 10 19:31:29 2012
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.utils
+
+import junit.framework.Assert._
+import java.util.concurrent.atomic._
+import org.junit.{Test, After, Before}
+import kafka.utils.TestUtils.retry
+
+class SchedulerTest {
+
+  val scheduler = new KafkaScheduler(1)
+  val mockTime = new MockTime
+  val counter1 = new AtomicInteger(0)
+  val counter2 = new AtomicInteger(0)
+  
+  @Before
+  def setup() {
+    scheduler.startup()
+  }
+  
+  @After
+  def teardown() {
+    scheduler.shutdown()
+  }
+  
+  @Test
+  def testMockSchedulerNonPeriodicTask() {
+    mockTime.scheduler.schedule("test1", counter1.getAndIncrement, delay=1)
+    mockTime.scheduler.schedule("test2", counter2.getAndIncrement, delay=100)
+    assertEquals("Counter1 should not be incremented prior to task running.", 0, counter1.get)
+    assertEquals("Counter2 should not be incremented prior to task running.", 0, counter2.get)
+    mockTime.sleep(1)
+    assertEquals("Counter1 should be incremented", 1, counter1.get)
+    assertEquals("Counter2 should not be incremented", 0, counter2.get)
+    mockTime.sleep(100000)
+    assertEquals("More sleeping should not result in more incrementing on counter1.", 1, counter1.get)
+    assertEquals("Counter2 should now be incremented.", 1, counter2.get)
+  }
+  
+  @Test
+  def testMockSchedulerPeriodicTask() {
+    mockTime.scheduler.schedule("test1", counter1.getAndIncrement, delay=1, period=1)
+    mockTime.scheduler.schedule("test2", counter2.getAndIncrement, delay=100, period=100)
+    assertEquals("Counter1 should not be incremented prior to task running.", 0, counter1.get)
+    assertEquals("Counter2 should not be incremented prior to task running.", 0, counter2.get)
+    mockTime.sleep(1)
+    assertEquals("Counter1 should be incremented", 1, counter1.get)
+    assertEquals("Counter2 should not be incremented", 0, counter2.get)
+    mockTime.sleep(100)
+    assertEquals("Counter1 should be incremented 101 times", 101, counter1.get)
+    assertEquals("Counter2 should not be incremented once", 1, counter2.get)
+  }
+  
+  @Test
+  def testNonPeriodicTask() {
+    scheduler.schedule("test", counter1.getAndIncrement, delay = 0)
+    retry(30000, () => assertEquals(1, counter1.get))
+    Thread.sleep(5)
+    assertEquals("Should only run once", 1, counter1.get)
+  }
+  
+  @Test
+  def testPeriodicTask() {
+    scheduler.schedule("test", counter1.getAndIncrement, delay = 0, period = 5)
+    retry(30000, () => assertTrue("Should count to 20", counter1.get >= 20))
+  }
+}
\ No newline at end of file

Modified: kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1419692&r1=1419691&r2=1419692&view=diff
==============================================================================
--- kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ kafka/trunk/core/src/test/scala/unit/kafka/utils/TestUtils.scala Mon Dec 10 19:31:29 2012
@@ -433,17 +433,21 @@ object TestUtils extends Logging {
   * Execute the given block. If it throws an assertion error, retry. Repeat
   * until no error is thrown or the time limit elapses.
    */
-  def retry(waitTime: Long, block: () => Unit) {
+  def retry(maxWaitMs: Long, block: () => Unit) {
+    var wait = 1L
     val startTime = System.currentTimeMillis()
     while(true) {
       try {
         block()
+        return
       } catch {
         case e: AssertionError =>
-          if(System.currentTimeMillis - startTime > waitTime)
+          if(System.currentTimeMillis - startTime > maxWaitMs) {
             throw e
-          else
-            Thread.sleep(100)
+          } else {
+            Thread.sleep(wait)
+            wait += math.min(wait, 1000)
+          }
       }
     }
   }
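
The rewrite above also fixes a real bug: without the added return, a successful block() never exited the while(true) loop. The new backoff sleeps 1, 2, 4, ... ms, with each increment capped at 1000 ms. Callers are unchanged; for example (a sketch mirroring SchedulerTest above; counter is illustrative):

    val counter = new java.util.concurrent.atomic.AtomicInteger(0)
    scheduler.schedule("inc", () => counter.getAndIncrement(), delay = 0, period = 5)
    TestUtils.retry(30000, () => assertTrue("should count to 20", counter.get >= 20))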