You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2014/08/06 01:00:27 UTC
[30/37] git commit: kafka-1414;
Speedup broker startup after hard reset; patched by Anton Karamanov;
reviewed by Jay Kreps and Jun Rao
kafka-1414; Speedup broker startup after hard reset; patched by Anton Karamanov; reviewed by Jay Kreps and Jun Rao
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f489493c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f489493c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f489493c
Branch: refs/heads/transactional_messaging
Commit: f489493c385e3266c3bc17db2e8ebc215a6e54e2
Parents: fa34841
Author: Anton Karamanov <at...@gmail.com>
Authored: Sun Jul 27 21:13:20 2014 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Sun Jul 27 21:13:20 2014 -0700
----------------------------------------------------------------------
config/server.properties | 4 +
core/src/main/scala/kafka/log/LogManager.scala | 179 ++++++++++++++-----
.../main/scala/kafka/server/KafkaConfig.scala | 3 +
.../main/scala/kafka/server/KafkaServer.scala | 1 +
.../main/scala/kafka/utils/KafkaScheduler.scala | 18 +-
core/src/main/scala/kafka/utils/Utils.scala | 6 +-
.../scala/unit/kafka/log/LogManagerTest.scala | 97 ++--------
.../server/HighwatermarkPersistenceTest.scala | 17 +-
.../unit/kafka/server/ReplicaManagerTest.scala | 20 +--
.../test/scala/unit/kafka/utils/TestUtils.scala | 25 +++
10 files changed, 203 insertions(+), 167 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/f489493c/config/server.properties
----------------------------------------------------------------------
diff --git a/config/server.properties b/config/server.properties
index f16c84c..5c0905a 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -62,6 +62,10 @@ log.dirs=/tmp/kafka-logs
# the brokers.
num.partitions=1
+# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
+# This value is recommended to be increased for installations with data dirs located in RAID array.
+num.recovery.threads.per.data.dir=1
+
############################# Log Flush Policy #############################
# Messages are immediately written to the filesystem but by default we only fsync() to sync
http://git-wip-us.apache.org/repos/asf/kafka/blob/f489493c/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 1946c94..4d2924d 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -23,6 +23,7 @@ import kafka.utils._
import scala.collection._
import kafka.common.{TopicAndPartition, KafkaException}
import kafka.server.{RecoveringFromUncleanShutdown, BrokerState, OffsetCheckpoint}
+import java.util.concurrent.{Executors, ExecutorService, ExecutionException, Future}
/**
* The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
@@ -39,6 +40,7 @@ class LogManager(val logDirs: Array[File],
val topicConfigs: Map[String, LogConfig],
val defaultConfig: LogConfig,
val cleanerConfig: CleanerConfig,
+ ioThreads: Int,
val flushCheckMs: Long,
val flushCheckpointMs: Long,
val retentionCheckMs: Long,
@@ -54,7 +56,7 @@ class LogManager(val logDirs: Array[File],
createAndValidateLogDirs(logDirs)
private val dirLocks = lockLogDirs(logDirs)
private val recoveryPointCheckpoints = logDirs.map(dir => (dir, new OffsetCheckpoint(new File(dir, RecoveryPointCheckpointFile)))).toMap
- loadLogs(logDirs)
+ loadLogs()
private val cleaner: LogCleaner =
if(cleanerConfig.enableCleaner)
@@ -101,36 +103,71 @@ class LogManager(val logDirs: Array[File],
/**
* Recover and load all logs in the given data directories
*/
- private def loadLogs(dirs: Seq[File]) {
- for(dir <- dirs) {
+ private def loadLogs(): Unit = {
+ info("Loading logs.")
+
+ val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
+ val jobs = mutable.Map.empty[File, Seq[Future[_]]]
+
+ for (dir <- this.logDirs) {
+ val pool = Executors.newFixedThreadPool(ioThreads)
+ threadPools.append(pool)
+
+ val cleanShutdownFile = new File(dir, Log.CleanShutdownFile)
+
+ if (cleanShutdownFile.exists) {
+ debug(
+ "Found clean shutdown file. " +
+ "Skipping recovery for all logs in data directory: " +
+ dir.getAbsolutePath)
+ } else {
+ // log recovery itself is being performed by `Log` class during initialization
+ brokerState.newState(RecoveringFromUncleanShutdown)
+ }
+
val recoveryPoints = this.recoveryPointCheckpoints(dir).read
- /* load the logs */
- val subDirs = dir.listFiles()
- if(subDirs != null) {
- val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
- if(cleanShutDownFile.exists())
- info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
- else
- brokerState.newState(RecoveringFromUncleanShutdown)
-
- for(dir <- subDirs) {
- if(dir.isDirectory) {
- info("Loading log '" + dir.getName + "'")
- val topicPartition = Log.parseTopicPartitionName(dir.getName)
- val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
- val log = new Log(dir,
- config,
- recoveryPoints.getOrElse(topicPartition, 0L),
- scheduler,
- time)
- val previous = this.logs.put(topicPartition, log)
- if(previous != null)
- throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
+
+ val jobsForDir = for {
+ dirContent <- Option(dir.listFiles).toList
+ logDir <- dirContent if logDir.isDirectory
+ } yield {
+ Utils.runnable {
+ debug("Loading log '" + logDir.getName + "'")
+
+ val topicPartition = Log.parseTopicPartitionName(logDir.getName)
+ val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
+ val logRecoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
+
+ val current = new Log(logDir, config, logRecoveryPoint, scheduler, time)
+ val previous = this.logs.put(topicPartition, current)
+
+ if (previous != null) {
+ throw new IllegalArgumentException(
+ "Duplicate log directories found: %s, %s!".format(
+ current.dir.getAbsolutePath, previous.dir.getAbsolutePath))
}
}
- cleanShutDownFile.delete()
}
+
+ jobs(cleanShutdownFile) = jobsForDir.map(pool.submit).toSeq
+ }
+
+
+ try {
+ for ((cleanShutdownFile, dirJobs) <- jobs) {
+ dirJobs.foreach(_.get)
+ cleanShutdownFile.delete()
+ }
+ } catch {
+ case e: ExecutionException => {
+ error("There was an error in one of the threads during logs loading: " + e.getCause)
+ throw e.getCause
+ }
+ } finally {
+ threadPools.foreach(_.shutdown())
}
+
+ info("Logs loading complete.")
}
/**
@@ -160,31 +197,69 @@ class LogManager(val logDirs: Array[File],
if(cleanerConfig.enableCleaner)
cleaner.startup()
}
-
+
/**
* Close all the logs
*/
def shutdown() {
info("Shutting down.")
+
+ val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
+ val jobs = mutable.Map.empty[File, Seq[Future[_]]]
+
+ // stop the cleaner first
+ if (cleaner != null) {
+ Utils.swallow(cleaner.shutdown())
+ }
+
+ // close logs in each dir
+ for (dir <- this.logDirs) {
+ debug("Flushing and closing logs at " + dir)
+
+ val pool = Executors.newFixedThreadPool(ioThreads)
+ threadPools.append(pool)
+
+ val logsInDir = logsByDir.getOrElse(dir.toString, Map()).values
+
+ val jobsForDir = logsInDir map { log =>
+ Utils.runnable {
+ // flush the log to ensure latest possible recovery point
+ log.flush()
+ log.close()
+ }
+ }
+
+ jobs(dir) = jobsForDir.map(pool.submit).toSeq
+ }
+
+
try {
- // stop the cleaner first
- if(cleaner != null)
- Utils.swallow(cleaner.shutdown())
- // flush the logs to ensure latest possible recovery point
- allLogs.foreach(_.flush())
- // close the logs
- allLogs.foreach(_.close())
- // update the last flush point
- checkpointRecoveryPointOffsets()
- // mark that the shutdown was clean by creating the clean shutdown marker file
- logDirs.foreach(dir => Utils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile()))
+ for ((dir, dirJobs) <- jobs) {
+ dirJobs.foreach(_.get)
+
+ // update the last flush point
+ debug("Updating recovery points at " + dir)
+ checkpointLogsInDir(dir)
+
+ // mark that the shutdown was clean by creating marker file
+ debug("Writing clean shutdown marker at " + dir)
+ Utils.swallow(new File(dir, Log.CleanShutdownFile).createNewFile())
+ }
+ } catch {
+ case e: ExecutionException => {
+ error("There was an error in one of the threads during LogManager shutdown: " + e.getCause)
+ throw e.getCause
+ }
} finally {
+ threadPools.foreach(_.shutdown())
// regardless of whether the close succeeded, we need to unlock the data directories
dirLocks.foreach(_.destroy())
}
+
info("Shutdown complete.")
}
+
/**
* Truncate the partition logs to the specified offsets and checkpoint the recovery point to this offset
*
@@ -230,14 +305,19 @@ class LogManager(val logDirs: Array[File],
* to avoid recovering the whole log on startup.
*/
def checkpointRecoveryPointOffsets() {
- val recoveryPointsByDir = this.logsByTopicPartition.groupBy(_._2.dir.getParent.toString)
- for(dir <- logDirs) {
- val recoveryPoints = recoveryPointsByDir.get(dir.toString)
- if(recoveryPoints.isDefined)
- this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
+ this.logDirs.foreach(checkpointLogsInDir)
+ }
+
+ /**
+ * Make a checkpoint for all logs in provided directory.
+ */
+ private def checkpointLogsInDir(dir: File): Unit = {
+ val recoveryPoints = this.logsByDir.get(dir.toString)
+ if (recoveryPoints.isDefined) {
+ this.recoveryPointCheckpoints(dir).write(recoveryPoints.get.mapValues(_.recoveryPoint))
}
}
-
+
/**
* Get the log if it exists, otherwise return None
*/
@@ -366,13 +446,22 @@ class LogManager(val logDirs: Array[File],
* Get all the partition logs
*/
def allLogs(): Iterable[Log] = logs.values
-
+
/**
* Get a map of TopicAndPartition => Log
*/
def logsByTopicPartition = logs.toMap
/**
+ * Map of log dir to logs by topic and partitions in that dir
+ */
+ private def logsByDir = {
+ this.logsByTopicPartition.groupBy {
+ case (_, log) => log.dir.getParent
+ }
+ }
+
+ /**
* Flush any log which has exceeded its flush interval and has unwritten messages.
*/
private def flushDirtyLogs() = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/f489493c/core/src/main/scala/kafka/server/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 50b09ed..1a45f87 100644
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -190,6 +190,9 @@ class KafkaConfig private (val props: VerifiableProperties) extends ZKConfig(pro
/* the frequency with which we update the persistent record of the last flush which acts as the log recovery point */
val logFlushOffsetCheckpointIntervalMs = props.getIntInRange("log.flush.offset.checkpoint.interval.ms", 60000, (0, Int.MaxValue))
+ /* the number of threads per data directory to be used for log recovery at startup and flushing at shutdown */
+ val numRecoveryThreadsPerDataDir = props.getIntInRange("num.recovery.threads.per.data.dir", 1, (1, Int.MaxValue))
+
/* enable auto creation of topic on the server */
val autoCreateTopicsEnable = props.getBoolean("auto.create.topics.enable", true)
http://git-wip-us.apache.org/repos/asf/kafka/blob/f489493c/core/src/main/scala/kafka/server/KafkaServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index def1dc2..2871118 100644
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -303,6 +303,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
topicConfigs = configs,
defaultConfig = defaultLogConfig,
cleanerConfig = cleanerConfig,
+ ioThreads = config.numRecoveryThreadsPerDataDir,
flushCheckMs = config.logFlushSchedulerIntervalMs,
flushCheckpointMs = config.logFlushOffsetCheckpointIntervalMs,
retentionCheckMs = config.logCleanupIntervalMs,
http://git-wip-us.apache.org/repos/asf/kafka/blob/f489493c/core/src/main/scala/kafka/utils/KafkaScheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index 8e37505..9a16343 100644
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -93,16 +93,14 @@ class KafkaScheduler(val threads: Int,
debug("Scheduling task %s with initial delay %d ms and period %d ms."
.format(name, TimeUnit.MILLISECONDS.convert(delay, unit), TimeUnit.MILLISECONDS.convert(period, unit)))
ensureStarted
- val runnable = new Runnable {
- def run() = {
- try {
- trace("Begining execution of scheduled task '%s'.".format(name))
- fun()
- } catch {
- case t: Throwable => error("Uncaught exception in scheduled task '" + name +"'", t)
- } finally {
- trace("Completed execution of scheduled task '%s'.".format(name))
- }
+ val runnable = Utils.runnable {
+ try {
+ trace("Begining execution of scheduled task '%s'.".format(name))
+ fun()
+ } catch {
+ case t: Throwable => error("Uncaught exception in scheduled task '" + name +"'", t)
+ } finally {
+ trace("Completed execution of scheduled task '%s'.".format(name))
}
}
if(period >= 0)
http://git-wip-us.apache.org/repos/asf/kafka/blob/f489493c/core/src/main/scala/kafka/utils/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/utils/Utils.scala b/core/src/main/scala/kafka/utils/Utils.scala
index 09bfbce..da52b42 100644
--- a/core/src/main/scala/kafka/utils/Utils.scala
+++ b/core/src/main/scala/kafka/utils/Utils.scala
@@ -49,9 +49,9 @@ object Utils extends Logging {
* @param fun A function
* @return A Runnable that just executes the function
*/
- def runnable(fun: () => Unit): Runnable =
- new Runnable() {
- def run() = fun()
+ def runnable(fun: => Unit): Runnable =
+ new Runnable {
+ def run() = fun
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/f489493c/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index d03d4c4..7d4c70c 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -35,21 +35,11 @@ class LogManagerTest extends JUnit3Suite {
var logManager: LogManager = null
val name = "kafka"
val veryLargeLogFlushInterval = 10000000L
- val cleanerConfig = CleanerConfig(enableCleaner = false)
override def setUp() {
super.setUp()
logDir = TestUtils.tempDir()
- logManager = new LogManager(logDirs = Array(logDir),
- topicConfigs = Map(),
- defaultConfig = logConfig,
- cleanerConfig = cleanerConfig,
- flushCheckMs = 1000L,
- flushCheckpointMs = 100000L,
- retentionCheckMs = 1000L,
- scheduler = time.scheduler,
- time = time,
- brokerState = new BrokerState())
+ logManager = createLogManager()
logManager.startup
logDir = logManager.logDirs(0)
}
@@ -125,18 +115,7 @@ class LogManagerTest extends JUnit3Suite {
logManager.shutdown()
val config = logConfig.copy(segmentSize = 10 * (setSize - 1), retentionSize = 5L * 10L * setSize + 10L)
- logManager = new LogManager(
- logDirs = Array(logDir),
- topicConfigs = Map(),
- defaultConfig = config,
- cleanerConfig = cleanerConfig,
- flushCheckMs = 1000L,
- flushCheckpointMs = 100000L,
- retentionCheckMs = 1000L,
- scheduler = time.scheduler,
- brokerState = new BrokerState(),
- time = time
- )
+ logManager = createLogManager()
logManager.startup
// create a log
@@ -176,18 +155,7 @@ class LogManagerTest extends JUnit3Suite {
def testTimeBasedFlush() {
logManager.shutdown()
val config = logConfig.copy(flushMs = 1000)
- logManager = new LogManager(
- logDirs = Array(logDir),
- topicConfigs = Map(),
- defaultConfig = config,
- cleanerConfig = cleanerConfig,
- flushCheckMs = 1000L,
- flushCheckpointMs = 10000L,
- retentionCheckMs = 1000L,
- scheduler = time.scheduler,
- brokerState = new BrokerState(),
- time = time
- )
+ logManager = createLogManager()
logManager.startup
val log = logManager.createLog(TopicAndPartition(name, 0), config)
val lastFlush = log.lastFlushTime
@@ -209,19 +177,8 @@ class LogManagerTest extends JUnit3Suite {
TestUtils.tempDir(),
TestUtils.tempDir())
logManager.shutdown()
- logManager = new LogManager(
- logDirs = dirs,
- topicConfigs = Map(),
- defaultConfig = logConfig,
- cleanerConfig = cleanerConfig,
- flushCheckMs = 1000L,
- flushCheckpointMs = 10000L,
- retentionCheckMs = 1000L,
- scheduler = time.scheduler,
- brokerState = new BrokerState(),
- time = time
- )
-
+ logManager = createLogManager()
+
// verify that logs are always assigned to the least loaded partition
for(partition <- 0 until 20) {
logManager.createLog(TopicAndPartition("test", partition), logConfig)
@@ -237,18 +194,7 @@ class LogManagerTest extends JUnit3Suite {
@Test
def testTwoLogManagersUsingSameDirFails() {
try {
- new LogManager(
- logDirs = Array(logDir),
- topicConfigs = Map(),
- defaultConfig = logConfig,
- cleanerConfig = cleanerConfig,
- flushCheckMs = 1000L,
- flushCheckpointMs = 10000L,
- retentionCheckMs = 1000L,
- scheduler = time.scheduler,
- brokerState = new BrokerState(),
- time = time
- )
+ createLogManager()
fail("Should not be able to create a second log manager instance with the same data directory")
} catch {
case e: KafkaException => // this is good
@@ -270,16 +216,8 @@ class LogManagerTest extends JUnit3Suite {
def testRecoveryDirectoryMappingWithTrailingSlash() {
logManager.shutdown()
logDir = TestUtils.tempDir()
- logManager = new LogManager(logDirs = Array(new File(logDir.getAbsolutePath + File.separator)),
- topicConfigs = Map(),
- defaultConfig = logConfig,
- cleanerConfig = cleanerConfig,
- flushCheckMs = 1000L,
- flushCheckpointMs = 100000L,
- retentionCheckMs = 1000L,
- scheduler = time.scheduler,
- time = time,
- brokerState = new BrokerState())
+ logManager = TestUtils.createLogManager(
+ logDirs = Array(new File(logDir.getAbsolutePath + File.separator)))
logManager.startup
verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager)
}
@@ -293,16 +231,7 @@ class LogManagerTest extends JUnit3Suite {
logDir = new File("data" + File.separator + logDir.getName)
logDir.mkdirs()
logDir.deleteOnExit()
- logManager = new LogManager(logDirs = Array(logDir),
- topicConfigs = Map(),
- defaultConfig = logConfig,
- cleanerConfig = cleanerConfig,
- flushCheckMs = 1000L,
- flushCheckpointMs = 100000L,
- retentionCheckMs = 1000L,
- scheduler = time.scheduler,
- time = time,
- brokerState = new BrokerState())
+ logManager = createLogManager()
logManager.startup
verifyCheckpointRecovery(Seq(TopicAndPartition("test-a", 1)), logManager)
}
@@ -327,4 +256,12 @@ class LogManagerTest extends JUnit3Suite {
}
}
}
+
+
+ private def createLogManager(logDirs: Array[File] = Array(this.logDir)): LogManager = {
+ TestUtils.createLogManager(
+ defaultConfig = logConfig,
+ logDirs = logDirs,
+ time = this.time)
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f489493c/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 558a5d6..e532c28 100644
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -32,16 +32,11 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_))
val topic = "foo"
- val logManagers = configs.map(config => new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
- topicConfigs = Map(),
- defaultConfig = LogConfig(),
- cleanerConfig = CleanerConfig(),
- flushCheckMs = 30000,
- flushCheckpointMs = 10000L,
- retentionCheckMs = 30000,
- scheduler = new KafkaScheduler(1),
- brokerState = new BrokerState(),
- time = new MockTime))
+ val logManagers = configs map { config =>
+ TestUtils.createLogManager(
+ logDirs = config.logDirs.map(new File(_)).toArray,
+ cleanerConfig = CleanerConfig())
+ }
@After
def teardown() {
@@ -147,4 +142,4 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
replicaManager.highWatermarkCheckpoints(new File(replicaManager.config.logDirs(0)).getAbsolutePath).read.getOrElse(TopicAndPartition(topic, partition), 0L)
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f489493c/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 518d416..9abf219 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -18,7 +18,6 @@
package kafka.server
import kafka.utils.{MockScheduler, MockTime, TestUtils}
-import kafka.log.{CleanerConfig, LogManager, LogConfig}
import java.util.concurrent.atomic.AtomicBoolean
import java.io.File
@@ -37,7 +36,7 @@ class ReplicaManagerTest extends JUnit3Suite {
val props = TestUtils.createBrokerConfig(1)
val config = new KafkaConfig(props)
val zkClient = EasyMock.createMock(classOf[ZkClient])
- val mockLogMgr = createLogManager(config.logDirs.map(new File(_)).toArray)
+ val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
val time: MockTime = new MockTime()
val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false))
val partition = rm.getOrCreatePartition(topic, 1, 1)
@@ -51,26 +50,11 @@ class ReplicaManagerTest extends JUnit3Suite {
props.put("log.dir", TestUtils.tempRelativeDir("data").getAbsolutePath)
val config = new KafkaConfig(props)
val zkClient = EasyMock.createMock(classOf[ZkClient])
- val mockLogMgr = createLogManager(config.logDirs.map(new File(_)).toArray)
+ val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
val time: MockTime = new MockTime()
val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false))
val partition = rm.getOrCreatePartition(topic, 1, 1)
partition.getOrCreateReplica(1)
rm.checkpointHighWatermarks()
}
-
- private def createLogManager(logDirs: Array[File]): LogManager = {
- val time = new MockTime()
- return new LogManager(logDirs,
- topicConfigs = Map(),
- defaultConfig = new LogConfig(),
- cleanerConfig = CleanerConfig(enableCleaner = false),
- flushCheckMs = 1000L,
- flushCheckpointMs = 100000L,
- retentionCheckMs = 1000L,
- scheduler = time.scheduler,
- brokerState = new BrokerState(),
- time = time)
- }
-
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/f489493c/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 4d01d25..c4e13c5 100644
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -39,6 +39,7 @@ import kafka.serializer.{StringEncoder, DefaultEncoder, Encoder}
import kafka.common.TopicAndPartition
import kafka.admin.AdminUtils
import kafka.producer.ProducerConfig
+import kafka.log._
import junit.framework.AssertionFailedError
import junit.framework.Assert._
@@ -689,6 +690,30 @@ object TestUtils extends Logging {
def checkIfReassignPartitionPathExists(zkClient: ZkClient): Boolean = {
ZkUtils.pathExists(zkClient, ZkUtils.ReassignPartitionsPath)
}
+
+
+ /**
+ * Create new LogManager instance with default configuration for testing
+ */
+ def createLogManager(
+ logDirs: Array[File] = Array.empty[File],
+ defaultConfig: LogConfig = LogConfig(),
+ cleanerConfig: CleanerConfig = CleanerConfig(enableCleaner = false),
+ time: MockTime = new MockTime()) =
+ {
+ new LogManager(
+ logDirs = logDirs,
+ topicConfigs = Map(),
+ defaultConfig = defaultConfig,
+ cleanerConfig = cleanerConfig,
+ ioThreads = 4,
+ flushCheckMs = 1000L,
+ flushCheckpointMs = 10000L,
+ retentionCheckMs = 1000L,
+ scheduler = time.scheduler,
+ time = time,
+ brokerState = new BrokerState())
+ }
}
object TestZKUtils {