You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Alexey Ozeritskiy (JIRA)" <ji...@apache.org> on 2014/04/25 15:18:15 UTC
[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard
reset
[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13980971#comment-13980971 ]
Alexey Ozeritskiy commented on KAFKA-1414:
------------------------------------------
What to do with exceptions? Maybe we should use ParArray here?
{code}
/**
 * Load the logs of every given data directory by handing each directory to `loadDir`.
 *
 * The directories are converted to a parallel array first, so separate directories
 * may be processed concurrently.
 *
 * NOTE(review): how an exception thrown by `loadDir` on a worker thread is reported
 * to the caller is decided by the parallel-collections runtime — confirm it is
 * surfaced the way startup error handling expects.
 *
 * @param dirs the data directories to scan
 */
private def loadLogs(dirs: Seq[File]): Unit = {
  val parallelDirs = dirs.toParArray
  parallelDirs.foreach(loadDir)
}
/**
 * Load every log that lives directly under one data directory and register it.
 *
 * Steps, in order:
 *  1. read the recovery-point checkpoint for `dir`;
 *  2. list the entries of `dir` (nothing more happens if the listing is null);
 *  3. log a message when the clean-shutdown marker file exists — this method only
 *     logs here, every sub-directory below is still passed to `new Log`;
 *  4. for each entry that is a directory, build a `Log` and register it through
 *     `addLogWithLock`;
 *  5. delete the clean-shutdown marker file.
 *
 * @param dir the data directory whose sub-directories are loaded as logs
 * @throws IllegalArgumentException if `addLogWithLock` returns a non-null previous
 *                                  log for the same topic/partition
 */
private def loadDir(dir: File): Unit = {
  // The checkpoint is read before the directory is listed.
  val recoveryPoints = this.recoveryPointCheckpoints(dir).read
  /* load the logs */
  // Option(...) turns a null listing into None, so the body is skipped entirely.
  Option(dir.listFiles()).foreach { entries =>
    val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
    if (cleanShutDownFile.exists())
      info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
    // Plain files in the data directory are ignored; only directories are treated as logs.
    for (logDir <- entries if logDir.isDirectory) {
      info("Loading log '" + logDir.getName + "'")
      val topicPartition = Log.parseTopicPartitionName(logDir.getName)
      val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
      // Partitions without a checkpoint entry start from offset 0.
      val recoveryPoint = recoveryPoints.getOrElse(topicPartition, 0L)
      val log = new Log(logDir, config, recoveryPoint, scheduler, time)
      val previous = addLogWithLock(topicPartition, log)
      if (previous != null)
        throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
    }
    // The marker is removed only after every log in this directory has been loaded.
    cleanShutDownFile.delete()
  }
}
/**
 * Store `log` under `topicPartition` in `logs` while holding `logCreationOrDeletionLock`.
 *
 * @param topicPartition the key the log is stored under
 * @param log            the log to store
 * @return whatever `logs.put` returns; `loadDir` treats a non-null result as a log
 *         that was already stored under the same key
 */
private def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log =
  logCreationOrDeletionLock.synchronized {
    logs.put(topicPartition, log)
  }
{code}
> Speedup broker startup after hard reset
> ---------------------------------------
>
> Key: KAFKA-1414
> URL: https://issues.apache.org/jira/browse/KAFKA-1414
> Project: Kafka
> Issue Type: Improvement
> Components: log
> Affects Versions: 0.8.1
> Reporter: Dmitry Bugaychenko
> Assignee: Jay Kreps
>
> After a hard reset due to power failure, the broker takes way too much time recovering unflushed segments in a single thread. This could be easily improved by launching multiple threads (one per data directory, assuming that typically each data directory is on a dedicated drive). Locally we tried this simple patch to LogManager.loadLogs and it seems to work; however, I'm too new to Scala, so do not take it literally:
> {code}
> /**
> * Recover and load all logs in the given data directories
> */
> private def loadLogs(dirs: Seq[File]) {
> val threads : Array[Thread] = new Array[Thread](dirs.size)
> var i: Int = 0
> val me = this
> for(dir <- dirs) {
> val thread = new Thread( new Runnable {
> def run()
> {
> val recoveryPoints = me.recoveryPointCheckpoints(dir).read
> /* load the logs */
> val subDirs = dir.listFiles()
> if(subDirs != null) {
> val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
> if(cleanShutDownFile.exists())
> info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
> for(dir <- subDirs) {
> if(dir.isDirectory) {
> info("Loading log '" + dir.getName + "'")
> val topicPartition = Log.parseTopicPartitionName(dir.getName)
> val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
> val log = new Log(dir,
> config,
> recoveryPoints.getOrElse(topicPartition, 0L),
> scheduler,
> time)
> val previous = addLogWithLock(topicPartition, log)
> if(previous != null)
> throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
> }
> }
> cleanShutDownFile.delete()
> }
> }
> })
> thread.start()
> threads(i) = thread
> i = i + 1
> }
> for(thread <- threads) {
> thread.join()
> }
> }
> def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
> logCreationOrDeletionLock synchronized {
> this.logs.put(topicPartition, log)
> }
> }
> {code}
--
This message was sent by Atlassian JIRA
(v6.2#6252)