You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@kafka.apache.org by "Alexey Ozeritskiy (JIRA)" <ji...@apache.org> on 2014/04/25 15:18:15 UTC

[jira] [Commented] (KAFKA-1414) Speedup broker startup after hard reset

    [ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13980971#comment-13980971 ] 

Alexey Ozeritskiy commented on KAFKA-1414:
------------------------------------------

What to do with exceptions? Maybe we should use ParArray here?

{code}
  /**
   * Load all logs found under the given data directories, recovering each
   * directory on its own worker so startup is not bound to a single thread.
   * Any exception thrown while loading one directory propagates out of the
   * parallel foreach.
   */
  private def loadLogs(dirs: Seq[File]) {
    // One parallel task per data directory (typically one per physical drive).
    dirs.toParArray.foreach(loadDir)
  }

  /**
   * Recover and load every log hosted in a single data directory.
   *
   * Reads the recovery-point checkpoint for the directory, then instantiates a
   * [[Log]] for each per-partition subdirectory and registers it under the
   * creation/deletion lock. The clean-shutdown marker file, if present, is
   * logged and removed once loading completes.
   *
   * @param dataDir the data directory (e.g. one mount point) to load logs from
   * @throws IllegalArgumentException if two subdirectories map to the same
   *                                  topic-partition
   */
  private def loadDir(dataDir: File) {
    val recoveryPoints = this.recoveryPointCheckpoints(dataDir).read
    // listFiles() returns null on I/O error or if dataDir is not a directory,
    // so guard before iterating.
    val subDirs = dataDir.listFiles()
    if (subDirs != null) {
      val cleanShutDownFile = new File(dataDir, Log.CleanShutdownFile)
      if (cleanShutDownFile.exists())
        info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dataDir.getAbsolutePath))
      // NOTE(review): renamed the loop variable — the original shadowed the
      // enclosing `dir` parameter, making it easy to confuse the data
      // directory with the per-partition log directory.
      for (logDir <- subDirs if logDir.isDirectory) {
        info("Loading log '" + logDir.getName + "'")
        val topicPartition = Log.parseTopicPartitionName(logDir.getName)
        val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
        val log = new Log(logDir,
          config,
          recoveryPoints.getOrElse(topicPartition, 0L),
          scheduler,
          time)
        val previous = addLogWithLock(topicPartition, log)
        if (previous != null)
          throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
      }
      // Consume the marker so the next (possibly unclean) shutdown is detected.
      cleanShutDownFile.delete()
    }
  }

  /**
   * Register a log for the given topic-partition, serialized against
   * concurrent creation/deletion.
   *
   * @return the log previously registered under this topic-partition,
   *         or null if none existed
   */
  private def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log =
    logCreationOrDeletionLock synchronized {
      logs.put(topicPartition, log)
    }
{code}

> Speedup broker startup after hard reset
> ---------------------------------------
>
>                 Key: KAFKA-1414
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1414
>             Project: Kafka
>          Issue Type: Improvement
>          Components: log
>    Affects Versions: 0.8.1
>            Reporter: Dmitry Bugaychenko
>            Assignee: Jay Kreps
>
> After hard reset due to power failure broker takes way too much time recovering unflushed segments in a single thread. This could be easily improved by launching multiple threads (one per data directory, assuming that typically each data directory is on a dedicated drive). Locally we tried this simple patch to LogManager.loadLogs and it seems to work, however I'm too new to Scala, so do not take it literally:
> {code}
>   /**
>    * Recover and load all logs in the given data directories
>    */
>   private def loadLogs(dirs: Seq[File]) {
>     val threads : Array[Thread] = new Array[Thread](dirs.size)
>     var i: Int = 0
>     val me = this
>     for(dir <- dirs) {
>       val thread = new Thread( new Runnable {
>         def run()
>         {
>           val recoveryPoints = me.recoveryPointCheckpoints(dir).read
>           /* load the logs */
>           val subDirs = dir.listFiles()
>           if(subDirs != null) {
>             val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
>             if(cleanShutDownFile.exists())
>               info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
>             for(dir <- subDirs) {
>               if(dir.isDirectory) {
>                 info("Loading log '" + dir.getName + "'")
>                 val topicPartition = Log.parseTopicPartitionName(dir.getName)
>                 val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
>                 val log = new Log(dir,
>                   config,
>                   recoveryPoints.getOrElse(topicPartition, 0L),
>                   scheduler,
>                   time)
>                 val previous = addLogWithLock(topicPartition, log)
>                 if(previous != null)
>                   throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
>               }
>             }
>             cleanShutDownFile.delete()
>           }
>         }
>       })
>       thread.start()
>       threads(i) = thread
>       i = i + 1
>     }
>     for(thread <- threads) {
>       thread.join()
>     }
>   }
>   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
>     logCreationOrDeletionLock synchronized {
>       this.logs.put(topicPartition, log)
>     }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)