You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/08/06 01:00:27 UTC

[30/37] git commit: kafka-1414; Speedup broker startup after hard reset; patched by Anton Karamanov; reviewed by Jay Kreps and Jun Rao

kafka-1414; Speedup broker startup after hard reset; patched by Anton Karamanov; reviewed by Jay Kreps and Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f489493c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f489493c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f489493c

Branch: refs/heads/transactional_messaging
Commit: f489493c385e3266c3bc17db2e8ebc215a6e54e2
Parents: fa34841
Author: Anton Karamanov <at...@gmail.com>
Authored: Sun Jul 27 21:13:20 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Sun Jul 27 21:13:20 2014 -0700

----------------------------------------------------------------------
 config/server.properties                        |   4 +
 core/src/main/scala/kafka/log/LogManager.scala  | 179 ++++++++++++++-----
 .../main/scala/kafka/server/KafkaConfig.scala   |   3 +
 .../main/scala/kafka/server/KafkaServer.scala   |   1 +
 .../main/scala/kafka/utils/KafkaScheduler.scala |  18 +-
 core/src/main/scala/kafka/utils/Utils.scala     |   6 +-
 .../scala/unit/kafka/log/LogManagerTest.scala   |  97 ++--------
 .../server/HighwatermarkPersistenceTest.scala   |  17 +-
 .../unit/kafka/server/ReplicaManagerTest.scala  |  20 +--
 .../test/scala/unit/kafka/utils/TestUtils.scala |  25 +++
 10 files changed, 203 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f489493c/config/server.properties
----------------------------------------------------------------------
diff --git a/config/server.properties b/config/server.properties
index f16c84c..5c0905a 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -62,6 +62,10 @@ log.dirs=/tmp/kafka-logs
 # the brokers.
 num.partitions=1
 
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# Increasing this value is recommended for installations whose data dirs are located on a RAID array.
+num.recovery.threads.per.data.dir=1
+
 ############################# Log Flush Policy #############################
 
 # Messages are immediately written to the filesystem but by default we only fsync() to sync

http://git-wip-us.apache.org/repos/asf/kafka/blob/f489493c/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 1946c94..4d2924d 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -23,6 +23,7 @@ import kafka.utils._
 import scala.collection._
 import kafka.common.{TopicAndPartition, KafkaException}
 import kafka.server.{RecoveringFromUncleanShutdown, BrokerState, OffsetCheckpoint}
+import java.util.concurrent.{Executors, ExecutorService, ExecutionException, Future}
 
 /**
  * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
@@ -39,6 +40,7 @@ class LogManager(val logDirs: Array[File],
                  val topicConfigs: Map[String, LogConfig],
                  val defaultConfig: LogConfig,
                  val cleanerConfig: CleanerConfig,
+                 ioThreads: Int,
                  val flushCheckMs: Long,
                  val flushCheckpointMs: Long,
                  val retentionCheckMs: Long,
@@ -54,7 +56,7 @@ class LogManager(val logDirs: Array[File],
   createAndValidateLogDirs(logDirs)
   private val dirLocks = lockLogDirs(logDirs)
   private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
-  loadLogs(logDirs)
+  loadLogs()
   
   private val cleaner: LogCleaner = 
     if(cleanerConfig.enableCleaner)
@@ -101,36 +103,71 @@ class LogManager(val logDirs: Array[File],
   /**
    * Recover and load all logs in the given data directories
    */
-  private def loadLogs(dirs: Seq[File]) {
-    for(dir <- dirs) {
+  private def loadLogs(): Unit = {
+    info("Loading logs.")
+
+    val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
+    val jobs = mutable.Map.empty[File, Seq[Future[_]]]
+
+    for (dir <- this.logDirs) {
+      val pool = Executors.newFixedThreadPool(ioThreads)
+      threadPools.append(pool)
+
+      val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
+
+      if (cleanShutdownFile.exists) {
+        debug(
+          "Found clean shutdown file. " +
+          "Skipping recovery for all logs in data directory: " +
+          dir.getAbsolutePath)
+      } else {
+        // log recovery itself is performed by the `Log` class during its initialization
+        brokerState.newState(RecoveringFromUncleanShutdown)
+      }
+
       val recoveryPoints = this.recoveryPointCheckpoints(dir).read
-      /* load the logs */
-      val subDirs = dir.listFiles()
-      if(subDirs != null) {
-        val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
-        if(cleanShutDownFile.exists())
-          info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
-        else
-          brokerState.newState(RecoveringFromUncleanShutdown)
-
-        for(dir <- subDirs) {
-          if(dir.isDirectory) {
-            info("Loading log '" + dir.getName + "'")
-            val topicPartition = Log.parseTopicPartitionName(dir.getName)
-            val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
-            val log = new Log(dir, 
-                              config,
-                              recoveryPoints.getOrElse(topicPartition, 0L),
-                              scheduler,
-                              time)
-            val previous = this.logs.put(topicPartition, log)
-            if(previous != null)
-              throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
+
+      val jobsForDir = for {
+        dirContent <- Option(dir.listFiles).toList
+        logDir <- dirContent if logDir.isDirectory
+      } yield {
+        Utils.runnable {
+          debug("Loading log '" + logDir.getName + "'")
+
+          val topicPartition = Log.parseTopicPartitionName(logDir.getName)
+          val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
+          val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
+
+          val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)
+          val previous = this.logs.put(topicPartition, current)
+
+          if (previous != null) {
+            throw new IllegalArgumentException(
+              "Duplicate log directories found: %s, %s!".format(
+              current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
           }
         }
-        cleanShutDownFile.delete()
       }
+
+      jobs(cleanShutdownFile) = jobsForDir.map(pool.submit).toSeq
+    }
+
+
+    try {
+      for ((cleanShutdownFile, dirJobs) <- jobs) {
+        dirJobs.foreach(_.get)
+        cleanShutdownFile.delete()
+      }
+    } catch {
+      case e: ExecutionException => {
+        error("There was an error in one of the threads during logs loading: " + e.getCause)
+        throw e.getCause
+      }
+    } finally {
+      threadPools.foreach(_.shutdown())
     }
+
+    info("Logs loading complete.")
   }
 
   /**
@@ -160,31 +197,69 @@ class LogManager(val logDirs: Array[File],
     if(cleanerConfig.enableCleaner)
       cleaner.startup()
   }
-  
+
   /**
    * Close all the logs
    */
   def shutdown() {
     info("Shutting down.")
+
+    val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
+    val jobs = mutable.Map.empty[File, Seq[Future[_]]]
+
+    // stop the cleaner first
+    if (cleaner != null) {
+      Utils.swallow(cleaner.shutdown())
+    }
+
+    // close logs in each dir
+    for (dir <- this.logDirs) {
+      debug("Flushing and closing logs at " + dir)
+
+      val pool = Executors.newFixedThreadPool(ioThreads)
+      threadPools.append(pool)
+
+      val logsInDir = logsByDir.getOrElse(dir.toString, Map()).values
+
+      val jobsForDir = logsInDir map { log =>
+        Utils.runnable {
+          // flush the log to ensure latest possible recovery point
+          log.flush()
+          log.close()
+        }
+      }
+
+      jobs(dir) = jobsForDir.map(pool.submit).toSeq
+    }
+
+
     try {
-      // stop the cleaner first
-      if(cleaner != null)
-        Utils.swallow(cleaner.shutdown())
-      // flush the logs to ensure latest possible recovery point
-      allLogs.foreach(_.flush())
-      // close the logs
-      allLogs.foreach(_.close())
-      // update the last flush point
-      checkpointRecoveryPointOffsets()
-      // mark that the shutdown was clean by creating the clean shutdown marker file
-      logDirs.foreach(dir => Utils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile()))
+      for ((dir, dirJobs) <- jobs) {
+        dirJobs.foreach(_.get)
+
+        // update the last flush point
+        debug("Updating recovery points at " + dir)
+        checkpointLogsInDir(dir)
+
+        // mark that the shutdown was clean by creating marker file
+        debug("Writing clean shutdown marker at " + dir)
+        Utils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile())
+      }
+    } catch {
+      case e: ExecutionException => {
+        error("There was an error in one of the threads during LogManager shutdown: " + e.getCause)
+        throw e.getCause
+      }
     } finally {
+      threadPools.foreach(_.shutdown())
       // regardless of whether the close succeeded, we need to unlock the data directories
       dirLocks.foreach(_.destroy())
     }
+
     info("Shutdown complete.")
   }
 
+
   /**
    * Truncate the partition logs to the specified offsets and checkpoint the recovery point to this offset
    *
@@ -230,14 +305,19 @@ class LogManager(val logDirs: Array[File],
    * to avoid recovering the whole log on startup.
    */
   def checkpointRecoveryPointOffsets() {
-    val recoveryPointsByDir = this.logsByTopicPartition.groupBy(_._2.dir.getParent.toString)
-    for(dir <- logDirs) {
-        val recoveryPoints = recoveryPointsByDir.get(dir.toString)
-        if(recoveryPoints.isDefined)
-          this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
+    this.logDirs.foreach(checkpointLogsInDir)
+  }
+
+  /**
+   * Write a recovery-point checkpoint for all logs in the provided directory.
+   */
+  private def checkpointLogsInDir(dir: File): Unit = {
+    val recoveryPoints = this.logsByDir.get(dir.toString)
+    if (recoveryPoints.isDefined) {
+      this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
     }
   }
-  
+
   /**
    * Get the log if it exists, otherwise return None
    */
@@ -366,13 +446,22 @@ class LogManager(val logDirs: Array[File],
    * Get all the partition logs
    */
   def allLogs(): Iterable[Log] = logs.values
-  
+
   /**
    * Get a map of TopicAndPartition => Log
    */
   def logsByTopicPartition = logs.toMap
 
   /**
+   * Map of each log dir to the logs (keyed by topic and partition) in that dir
+   */
+  private def logsByDir = {
+    this.logsByTopicPartition.groupBy {
+      case (_, log) => log.dir.getParent
+    }
+  }
+
+  /**
    * Flush any log which has exceeded its flush interval and has unwritten messages.
    */
   private def flushDirtyLogs() = {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f489493c/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 50b09ed..1a45f87 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -190,6 +190,9 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
   /* the frequency with which we update the persistent record of the last flush which acts as the log recovery point */
   val logFlushOffsetCheckpointIntervalMs = props.getIntInRange("log.flush.offset.checkpoint.interval.ms", 60000, (0, Int.MaxValue))
 
+  /* the number of threads per data directory to be used for log recovery at startup and flushing at shutdown */
+  val numRecoveryThreadsPerDataDir = props.getIntInRange("num.recovery.threads.per.data.dir", 1, (1, Int.MaxValue))
+
   /* enable auto creation of topic on the server */
   val autoCreateTopicsEnable = props.getBoolean("auto.create.topics.enable", true)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f489493c/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index def1dc2..2871118 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -303,6 +303,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
                    topicConfigs = configs,
                    defaultConfig = defaultLogConfig,
                    cleanerConfig = cleanerConfig,
+                   ioThreads = config.numRecoveryThreadsPerDataDir,
                    flushCheckMs = config.logFlushSchedulerIntervalMs,
                    flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
                    retentionCheckMs = config.logCleanupIntervalMs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/f489493c/core/src/main/scala/kafka/utils/KafkaScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index 8e37505..9a16343 100644
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -93,16 +93,14 @@ class KafkaScheduler(val threads: Int,
     debug("Scheduling task %s with initial delay %d ms and period %d ms."
         .format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit)))
     ensureStarted
-    val runnable = new Runnable {
-      def run() = {
-        try {
-          trace("Begining execution of scheduled task '%s'.".format(name))
-          fun()
-        } catch {
-          case t: Throwable => error("Uncaught exception in scheduled task '" + name +"'", t)
-        } finally {
-          trace("Completed execution of scheduled task '%s'.".format(name))
-        }
+    val runnable = Utils.runnable {
+      try {
+        trace("Begining execution of scheduled task '%s'.".format(name))
+        fun()
+      } catch {
+        case t: Throwable => error("Uncaught exception in scheduled task '" + name +"'", t)
+      } finally {
+        trace("Completed execution of scheduled task '%s'.".format(name))
       }
     }
     if(period >= 0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/f489493c/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 09bfbce..da52b42 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -49,9 +49,9 @@ object Utils extends Logging {
    * @param fun A function
    * @return A Runnable that just executes the function
    */
-  def runnable(fun: () => Unit): Runnable = 
-    new Runnable() {
-      def run() = fun()
+  def runnable(fun: => Unit): Runnable =
+    new Runnable {
+      def run() = fun
     }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/f489493c/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index d03d4c4..7d4c70c 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -35,21 +35,11 @@ class LogManagerTest extends JUnit3Suite {
   var logManager: LogManager = null
   val name = "kafka"
   val veryLargeLogFlushInterval = 10000000L
-  val cleanerConfig = CleanerConfig(enableCleaner = false)
 
   override def setUp() {
     super.setUp()
     logDir = TestUtils.tempDir()
-    logManager = new LogManager(logDirs = Array(logDir), 
-                                topicConfigs = Map(), 
-                                defaultConfig = logConfig, 
-                                cleanerConfig = cleanerConfig, 
-                                flushCheckMs = 1000L, 
-                                flushCheckpointMs = 100000L, 
-                                retentionCheckMs = 1000L, 
-                                scheduler = time.scheduler, 
-                                time = time,
-                                brokerState = new BrokerState())
+    logManager = createLogManager()
     logManager.startup
     logDir = logManager.logDirs(0)
   }
@@ -125,18 +115,7 @@ class LogManagerTest extends JUnit3Suite {
     logManager.shutdown()
 
     val config = logConfig.copy(segmentSize = 10 * (setSize - 1), retentionSize = 5L * 10L * setSize + 10L)
-    logManager = new LogManager(
-      logDirs = Array(logDir),
-      topicConfigs = Map(),
-      defaultConfig = config,
-      cleanerConfig = cleanerConfig,
-      flushCheckMs = 1000L,
-      flushCheckpointMs = 100000L,
-      retentionCheckMs = 1000L,
-      scheduler = time.scheduler,
-      brokerState = new BrokerState(),
-      time = time
-    )
+    logManager = createLogManager()
     logManager.startup
 
     // create a log
@@ -176,18 +155,7 @@ class LogManagerTest extends JUnit3Suite {
   def testTimeBasedFlush() {
     logManager.shutdown()
     val config = logConfig.copy(flushMs = 1000)
-    logManager = new LogManager(
-      logDirs = Array(logDir),
-      topicConfigs = Map(),
-      defaultConfig = config,
-      cleanerConfig = cleanerConfig,
-      flushCheckMs = 1000L,
-      flushCheckpointMs = 10000L,
-      retentionCheckMs = 1000L,
-      scheduler = time.scheduler,
-      brokerState = new BrokerState(),
-      time = time
-    )
+    logManager = createLogManager()
     logManager.startup
     val log = logManager.createLog(TopicAndPartition(name, 0), config)
     val lastFlush = log.lastFlushTime
@@ -209,19 +177,8 @@ class LogManagerTest extends JUnit3Suite {
                      TestUtils.tempDir(), 
                      TestUtils.tempDir())
     logManager.shutdown()
-    logManager = new LogManager(
-      logDirs = dirs,
-      topicConfigs = Map(),
-      defaultConfig = logConfig,
-      cleanerConfig = cleanerConfig,
-      flushCheckMs = 1000L,
-      flushCheckpointMs = 10000L,
-      retentionCheckMs = 1000L,
-      scheduler = time.scheduler,
-      brokerState = new BrokerState(),
-      time = time
-    )
-    
+    logManager = createLogManager()
+
     // verify that logs are always assigned to the least loaded partition
     for(partition <- 0 until 20) {
       logManager.createLog(TopicAndPartition("test", partition), logConfig)
@@ -237,18 +194,7 @@ class LogManagerTest extends JUnit3Suite {
   @Test
   def testTwoLogManagersUsingSameDirFails() {
     try {
-      new LogManager(
-        logDirs = Array(logDir),
-        topicConfigs = Map(),
-        defaultConfig = logConfig,
-        cleanerConfig = cleanerConfig,
-        flushCheckMs = 1000L,
-        flushCheckpointMs = 10000L,
-        retentionCheckMs = 1000L,
-        scheduler = time.scheduler,
-        brokerState = new BrokerState(),
-        time = time
-      )
+      createLogManager()
       fail("Should not be able to create a second log manager instance with the same data directory")
     } catch {
       case e: KafkaException => // this is good 
@@ -270,16 +216,8 @@ class LogManagerTest extends JUnit3Suite {
   def testRecoveryDirectoryMappingWithTrailingSlash() {
     logManager.shutdown()
     logDir = TestUtils.tempDir()
-    logManager = new LogManager(logDirs = Array(new File(logDir.getAbsolutePath + File.separator)),
-      topicConfigs = Map(),
-      defaultConfig = logConfig,
-      cleanerConfig = cleanerConfig,
-      flushCheckMs = 1000L,
-      flushCheckpointMs = 100000L,
-      retentionCheckMs = 1000L,
-      scheduler = time.scheduler,
-      time = time,
-      brokerState = new BrokerState())
+    logManager = TestUtils.createLogManager(
+      logDirs = Array(new File(logDir.getAbsolutePath + File.separator)))
     logManager.startup
     verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager)
   }
@@ -293,16 +231,7 @@ class LogManagerTest extends JUnit3Suite {
     logDir = new File("data" + File.separator + logDir.getName)
     logDir.mkdirs()
     logDir.deleteOnExit()
-    logManager = new LogManager(logDirs = Array(logDir),
-      topicConfigs = Map(),
-      defaultConfig = logConfig,
-      cleanerConfig = cleanerConfig,
-      flushCheckMs = 1000L,
-      flushCheckpointMs = 100000L,
-      retentionCheckMs = 1000L,
-      scheduler = time.scheduler,
-      time = time,
-      brokerState = new BrokerState())
+    logManager = createLogManager()
     logManager.startup
     verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager)
   }
@@ -327,4 +256,12 @@ class LogManagerTest extends JUnit3Suite {
       }
     }
   }
+
+
+  private def createLogManager(logDirs: Array[File] = Array(this.logDir)): LogManager = {
+    TestUtils.createLogManager(
+      defaultConfig = logConfig,
+      logDirs = logDirs,
+      time = this.time)
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f489493c/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 558a5d6..e532c28 100644
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -32,16 +32,11 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
 
   val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_))
   val topic = "foo"
-  val logManagers = configs.map(config => new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
-                                                         topicConfigs = Map(),
-                                                         defaultConfig = LogConfig(),
-                                                         cleanerConfig = CleanerConfig(),
-                                                         flushCheckMs = 30000,
-                                                         flushCheckpointMs = 10000L,
-                                                         retentionCheckMs = 30000,
-                                                         scheduler = new KafkaScheduler(1),
-                                                         brokerState = new BrokerState(),
-                                                         time = new MockTime))
+  val logManagers = configs map { config =>
+    TestUtils.createLogManager(
+      logDirs = config.logDirs.map(new File(_)).toArray,
+      cleanerConfig = CleanerConfig())
+  }
     
   @After
   def teardown() {
@@ -147,4 +142,4 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
     replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs(0)).getAbsolutePath).read.getOrElse(TopicAndPartition(topic, partition), 0L)
   }
   
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/f489493c/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 518d416..9abf219 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -18,7 +18,6 @@
 package kafka.server
 
 import kafka.utils.{MockScheduler, MockTime, TestUtils}
-import kafka.log.{CleanerConfig, LogManager, LogConfig}
 
 import java.util.concurrent.atomic.AtomicBoolean
 import java.io.File
@@ -37,7 +36,7 @@ class ReplicaManagerTest extends JUnit3Suite {
     val props = TestUtils.createBrokerConfig(1)
     val config = new KafkaConfig(props)
     val zkClient = EasyMock.createMock(classOf[ZkClient])
-    val mockLogMgr = createLogManager(config.logDirs.map(new File(_)).toArray)
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val time: MockTime = new MockTime()
     val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false))
     val partition = rm.getOrCreatePartition(topic, 1, 1)
@@ -51,26 +50,11 @@ class ReplicaManagerTest extends JUnit3Suite {
     props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
     val config = new KafkaConfig(props)
     val zkClient = EasyMock.createMock(classOf[ZkClient])
-    val mockLogMgr = createLogManager(config.logDirs.map(new File(_)).toArray)
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val time: MockTime = new MockTime()
     val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false))
     val partition = rm.getOrCreatePartition(topic, 1, 1)
     partition.getOrCreateReplica(1)
     rm.checkpointHighWatermarks()
   }
-
-  private def createLogManager(logDirs: Array[File]): LogManager = {
-    val time = new MockTime()
-    return new LogManager(logDirs,
-      topicConfigs = Map(),
-      defaultConfig = new LogConfig(),
-      cleanerConfig = CleanerConfig(enableCleaner = false),
-      flushCheckMs = 1000L,
-      flushCheckpointMs = 100000L,
-      retentionCheckMs = 1000L,
-      scheduler = time.scheduler,
-      brokerState = new BrokerState(),
-      time = time)
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f489493c/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 4d01d25..c4e13c5 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -39,6 +39,7 @@ import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
 import kafka.common.TopicAndPartition
 import kafka.admin.AdminUtils
 import kafka.producer.ProducerConfig
+import kafka.log._
 
 import junit.framework.AssertionFailedError
 import junit.framework.Assert._
@@ -689,6 +690,30 @@ object TestUtils extends Logging {
   def checkIfReassignPartitionPathExists(zkClient: ZkClient): Boolean = {
     ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
   }
+
+
+  /**
+   * Create a new LogManager instance with default configuration for testing
+   */
+  def createLogManager(
+    logDirs: Array[File] = Array.empty[File],
+    defaultConfig: LogConfig = LogConfig(),
+    cleanerConfig: CleanerConfig = CleanerConfig(enableCleaner = false),
+    time: MockTime = new MockTime()) =
+  {
+    new LogManager(
+      logDirs = logDirs,
+      topicConfigs = Map(),
+      defaultConfig = defaultConfig,
+      cleanerConfig = cleanerConfig,
+      ioThreads = 4,
+      flushCheckMs = 1000L,
+      flushCheckpointMs = 10000L,
+      retentionCheckMs = 1000L,
+      scheduler = time.scheduler,
+      time = time,
+      brokerState = new BrokerState())
+  }
 }
 
 object TestZKUtils {