Posted to dev@kafka.apache.org by "Dmitry Bugaychenko (JIRA)" <ji...@apache.org> on 2014/04/23 14:38:15 UTC

[jira] [Created] (KAFKA-1414) Speedup broker startup after hard reset

Dmitry Bugaychenko created KAFKA-1414:
-----------------------------------------

             Summary: Speedup broker startup after hard reset
                 Key: KAFKA-1414
                 URL: https://issues.apache.org/jira/browse/KAFKA-1414
             Project: Kafka
          Issue Type: Improvement
          Components: log
    Affects Versions: 0.8.1
            Reporter: Dmitry Bugaychenko
            Assignee: Jay Kreps


After a hard reset due to power failure, the broker takes far too much time recovering unflushed segments in a single thread. This could be easily improved by launching multiple threads (one per data directory, assuming that typically each data directory is on a dedicated drive). Locally we tried this simple patch to LogManager.loadLogs and it seems to work, however I'm too new to Scala, so do not take it literally:

{code}
  /**
   * Recover and load all logs in the given data directories.
   * Recovery is started in one thread per data directory so that unflushed
   * segments on different drives are recovered in parallel.
   */
  private def loadLogs(dirs: Seq[File]) {
    // Start one recovery thread per data directory.
    val threads = dirs.map { dir =>
      val thread = new Thread(new Runnable {
        def run(): Unit = {
          val recoveryPoints = recoveryPointCheckpoints(dir).read
          /* load the logs */
          val subDirs = dir.listFiles()
          if(subDirs != null) {
            val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
            if(cleanShutDownFile.exists())
              info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
            for(logDir <- subDirs) {
              if(logDir.isDirectory) {
                info("Loading log '" + logDir.getName + "'")
                val topicPartition = Log.parseTopicPartitionName(logDir.getName)
                val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
                val log = new Log(logDir,
                  config,
                  recoveryPoints.getOrElse(topicPartition, 0L),
                  scheduler,
                  time)
                // The logs collection is shared across recovery threads, so updates go through addLogWithLock.
                val previous = addLogWithLock(topicPartition, log)
                if(previous != null)
                  throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
              }
            }
            cleanShutDownFile.delete()
          }
        }
      })
      thread.start()
      thread
    }

    // Wait for recovery of every data directory to complete before startup continues.
    for(thread <- threads)
      thread.join()
  }

  def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
    logCreationOrDeletionLock synchronized {
      this.logs.put(topicPartition, log)
    }
  }
{code}
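
For comparison, here is a minimal sketch of the same idea using a fixed-size thread pool instead of raw threads. The helper names (recoverLogsIn, loadLogsInParallel) are illustrative placeholders, not Kafka code:

{code}
import java.io.File
import java.util.concurrent.{Executors, TimeUnit}

object ParallelLogLoadSketch {
  // Placeholder for the per-directory recovery work done in loadLogs above.
  def recoverLogsIn(dir: File): Unit = {
    println("Recovering logs in " + dir.getAbsolutePath)
  }

  def loadLogsInParallel(dirs: Seq[File]): Unit = {
    // One worker per data directory, matching the one-drive-per-directory assumption.
    val pool = Executors.newFixedThreadPool(math.max(1, dirs.size))
    try {
      val futures = dirs.map { dir =>
        pool.submit(new Runnable {
          def run(): Unit = recoverLogsIn(dir)
        })
      }
      // get() rethrows any exception from a worker (wrapped in an ExecutionException),
      // so a failure in one directory still fails startup instead of being swallowed
      // in a background thread.
      futures.foreach(_.get())
    } finally {
      pool.shutdown()
      pool.awaitTermination(1, TimeUnit.MINUTES)
    }
  }
}
{code}

Waiting on the futures (or joining the threads, as in the patch above) before returning keeps the rest of startup unchanged; only the recovery phase runs in parallel.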



--
This message was sent by Atlassian JIRA
(v6.2#6252)