You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jk...@apache.org on 2012/11/02 20:01:39 UTC

svn commit: r1405102 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/cluster/ main/scala/kafka/log/ main/scala/kafka/server/ main/scala/kafka/utils/ test/scala/unit/kafka/integration/ test/scala/unit/kafka/log/ test/scala/unit/kafka/produ...

Author: jkreps
Date: Fri Nov  2 19:01:38 2012
New Revision: 1405102

URL: http://svn.apache.org/viewvc?rev=1405102&view=rev
Log:
KAFKA-188 Support multiple data directories. 
Patch reviewed by Neha and Jun.


Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/FileLock.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/VerifiableProperties.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Fri Nov  2 19:01:38 2012
@@ -37,7 +37,6 @@ class Partition(val topic: String,
   private val localBrokerId = replicaManager.config.brokerId
   private val logManager = replicaManager.logManager
   private val replicaFetcherManager = replicaManager.replicaFetcherManager
-  private val highwaterMarkCheckpoint = replicaManager.highWatermarkCheckpoint
   private val zkClient = replicaManager.zkClient
   var leaderReplicaIdOpt: Option[Int] = None
   var inSyncReplicas: Set[Replica] = Set.empty[Replica]
@@ -69,8 +68,8 @@ class Partition(val topic: String,
       case None =>
         if (isReplicaLocal(replicaId)) {
           val log = logManager.getOrCreateLog(topic, partitionId)
-          val localReplica = new Replica(replicaId, this, time,
-            highwaterMarkCheckpoint.read(topic, partitionId).min(log.logEndOffset), Some(log))
+          val offset = replicaManager.highWatermarkCheckpoints(log.dir.getParent).read(topic, partitionId).min(log.logEndOffset)
+          val localReplica = new Replica(replicaId, this, time, offset, Some(log))
           addReplicaIfNotExists(localReplica)
         }
         else {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Fri Nov  2 19:01:38 2012
@@ -26,61 +26,107 @@ import kafka.server.{HighwaterMarkCheckp
 
 
 /**
- * The guy who creates and hands out logs
+ * The entry point to the kafka log management subsystem. The log manager is responsible for log creation, retrieval, and cleaning.
+ * All read and write operations are delegated to the individual log instances.
+ * 
+ * The log manager maintains logs in one or more directories. New logs are created in the data directory
+ * with the fewest logs. No attempt is made to move partitions after the fact or balance based on
+ * size or I/O rate.
+ * 
+ * A background thread handles log retention by periodically truncating excess log segments.
  */
 @threadsafe
 private[kafka] class LogManager(val config: KafkaConfig,
                                 scheduler: KafkaScheduler,
-                                private val time: Time,
-                                val logRollDefaultIntervalMs: Long,
-                                val logCleanupIntervalMs: Long,
-                                val logCleanupDefaultAgeMs: Long,
-                                needRecovery: Boolean) extends Logging {
+                                private val time: Time) extends Logging {
 
-  val logDir: File = new File(config.logDir)
+  val CleanShutdownFile = ".kafka_cleanshutdown"
+  val LockFile = ".lock"
+  val logDirs: Array[File] = config.logDirs.map(new File(_)).toArray
   private val logFileSizeMap = config.logFileSizeMap
-  private val flushInterval = config.flushInterval
-  private val logCreationLock = new Object
+  private val logFlushInterval = config.flushInterval
   private val logFlushIntervals = config.flushIntervalMap
+  private val logCreationLock = new Object
   private val logRetentionSizeMap = config.logRetentionSizeMap
   private val logRetentionMsMap = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
   private val logRollMsMap = config.logRollHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L))
-  this.logIdent = "[Log Manager on Broker " + config.brokerId + "] "
+  private val logRollDefaultIntervalMs = 1000L * 60 * 60 * config.logRollHours
+  private val logCleanupIntervalMs = 1000L * 60 * config.logCleanupIntervalMinutes
+  private val logCleanupDefaultAgeMs = 1000L * 60 * 60 * config.logRetentionHours
 
-  /* Initialize a log for each subdirectory of the main log directory */
-  private val logs = new Pool[String, Pool[Int, Log]]()
-  if(!logDir.exists()) {
-    info("No log directory found, creating '" + logDir.getAbsolutePath() + "'")
-    logDir.mkdirs()
-  }
-  if(!logDir.isDirectory() || !logDir.canRead())
-    throw new KafkaException(logDir.getAbsolutePath() + " is not a readable log directory.")
-  val subDirs = logDir.listFiles()
-  if(subDirs != null) {
-    for(dir <- subDirs) {
-      if(dir.getName.equals(HighwaterMarkCheckpoint.highWatermarkFileName)){
-        // skip valid metadata file
+  this.logIdent = "[Log Manager on Broker " + config.brokerId + "] "
+  private val logs = new Pool[TopicAndPartition, Log]()
+  
+  createAndValidateLogDirs(logDirs)
+  private var dirLocks = lockLogDirs(logDirs)
+  loadLogs(logDirs)
+  
+  /**
+   * 1. Ensure that there are no duplicates in the directory list
+   * 2. Create each directory if it doesn't exist
+   * 3. Check that each path is a readable directory 
+   */
+  private def createAndValidateLogDirs(dirs: Seq[File]) {
+    if(dirs.map(_.getCanonicalPath).toSet.size < dirs.size)
+      throw new KafkaException("Duplicate log directory found: " + logDirs.mkString(", "))
+    for(dir <- dirs) {
+      if(!dir.exists) {
+        info("Log directory '" + dir.getAbsolutePath + "' not found, creating it.")
+        val created = dir.mkdirs()
+        if(!created)
+          throw new KafkaException("Failed to create data directory " + dir.getAbsolutePath)
       }
-      else if(!dir.isDirectory()) {
-        warn("Skipping unexplainable file '" + dir.getAbsolutePath() + "'--should it be there?")
-      } else {
-        info("Loading log '" + dir.getName() + "'")
-        val topicPartition = parseTopicPartitionName(dir.getName)
-        val rollIntervalMs = logRollMsMap.get(topicPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
-        val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logFileSize)
-        val log = new Log(dir, 
-                          maxLogFileSize, 
-                          config.maxMessageSize, 
-                          flushInterval, 
-                          rollIntervalMs, 
-                          needRecovery, 
-                          config.logIndexMaxSizeBytes,
-                          config.logIndexIntervalBytes,
-                          time, 
-                          config.brokerId)
-        logs.putIfNotExists(topicPartition.topic, new Pool[Int, Log]())
-        val parts = logs.get(topicPartition.topic)
-        parts.put(topicPartition.partition, log)
+      if(!dir.isDirectory || !dir.canRead)
+        throw new KafkaException(dir.getAbsolutePath + " is not a readable log directory.")
+    }
+  }
+  
+  /**
+   * Lock all the given directories
+   */
+  private def lockLogDirs(dirs: Seq[File]): Seq[FileLock] = {
+    dirs.map { dir =>
+      val lock = new FileLock(new File(dir, LockFile))
+      if(!lock.tryLock())
+        throw new KafkaException("Failed to acquire lock on file .lock in " + lock.file.getParentFile.getAbsolutePath + 
+                               ". A Kafka instance in another process or thread is using this directory.")
+      lock
+    }
+  }
+  
+  /**
+   * Recovery and load all logs in the given data directories
+   */
+  private def loadLogs(dirs: Seq[File]) {
+    for(dir <- dirs) {
+      /* check if this set of logs was shut down cleanly */
+      val cleanShutDownFile = new File(dir, CleanShutdownFile)
+      val needsRecovery = cleanShutDownFile.exists
+      cleanShutDownFile.delete
+      /* load the logs */
+      val subDirs = dir.listFiles()
+      if(subDirs != null) {
+        for(dir <- subDirs) {
+          if(dir.isDirectory){
+            info("Loading log '" + dir.getName + "'")
+            val topicPartition = parseTopicPartitionName(dir.getName)
+            val rollIntervalMs = logRollMsMap.get(topicPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
+            val maxLogFileSize = logFileSizeMap.get(topicPartition.topic).getOrElse(config.logFileSize)
+            val log = new Log(dir, 
+                              maxLogFileSize, 
+                              config.maxMessageSize, 
+                              logFlushInterval, 
+                              rollIntervalMs, 
+                              needsRecovery, 
+                              config.logIndexMaxSizeBytes,
+                              config.logIndexIntervalBytes,
+                              time, 
+                              config.brokerId)
+            val previous = this.logs.put(topicPartition, log)
+            if(previous != null)
+              throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
+          }
+        }
       }
     }
   }
@@ -99,18 +145,81 @@ private[kafka] class LogManager(val conf
                                  config.flushSchedulerThreadRate, config.flushSchedulerThreadRate, false)
     }
   }
+  
+  /**
+   * Get the log if it exists
+   */
+  def getLog(topic: String, partition: Int): Option[Log] = {
+    val topicAndPartiton = TopicAndPartition(topic, partition)
+    val log = logs.get(topicAndPartiton)
+    if (log == null)
+      None
+    else
+      Some(log)
+  }
 
+  /**
+   * Create the log if it does not exist, if it exists just return it
+   */
+  def getOrCreateLog(topic: String, partition: Int): Log = {
+    val topicAndPartition = TopicAndPartition(topic, partition)
+    logs.get(topicAndPartition) match {
+      case null => createLogIfNotExists(topicAndPartition)
+      case log: Log => log
+    }
+  }
 
   /**
    * Create a log for the given topic and the given partition
+   * If the log already exists, just return a copy of the existing log
    */
-  private def createLog(topic: String, partition: Int): Log = {
+  private def createLogIfNotExists(topicAndPartition: TopicAndPartition): Log = {
     logCreationLock synchronized {
-      val d = new File(logDir, topic + "-" + partition)
-      d.mkdirs()
-      val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs)
-      val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize)
-      new Log(d, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, needsRecovery = false, config.logIndexMaxSizeBytes, config.logIndexIntervalBytes, time, config.brokerId)
+      var log = logs.get(topicAndPartition)
+      
+      // check if the log has already been created in another thread
+      if(log != null)
+        return log
+      
+      // if not, create it
+      val dataDir = nextLogDir()
+      val dir = new File(dataDir, topicAndPartition.topic + "-" + topicAndPartition.partition)
+      dir.mkdirs()
+      val rollIntervalMs = logRollMsMap.get(topicAndPartition.topic).getOrElse(this.logRollDefaultIntervalMs)
+      val maxLogFileSize = logFileSizeMap.get(topicAndPartition.topic).getOrElse(config.logFileSize)
+      log = new Log(dir, 
+                    maxLogFileSize, 
+                    config.maxMessageSize, 
+                    logFlushInterval, 
+                    rollIntervalMs, 
+                    needsRecovery = false, 
+                    config.logIndexMaxSizeBytes, 
+                    config.logIndexIntervalBytes, 
+                    time, 
+                    config.brokerId)
+      info("Created log for topic %s partition %d in %s.".format(topicAndPartition.topic, topicAndPartition.partition, dataDir.getAbsolutePath))
+      logs.put(topicAndPartition, log)
+      log
+    }
+  }
+  
+  /**
+   * Choose the next directory in which to create a log. Currently this is done
+   * by calculating the number of partitions in each directory and then choosing the
+   * data directory with the fewest partitions.
+   */
+  private def nextLogDir(): File = {
+    if(logDirs.size == 1) {
+      logDirs(0)
+    } else {
+      // count the number of logs in each parent directory (including 0 for empty directories
+      val logCounts = allLogs.groupBy(_.dir.getParent).mapValues(_.size)
+      val zeros = logDirs.map(dir => (dir.getPath, 0)).toMap
+      var dirCounts = (zeros ++ logCounts).toBuffer
+    
+      // choose the directory with the least logs in it
+      val leastLoaded = dirCounts.sortBy(_._2).head
+      new File(leastLoaded._1)
     }
   }
 
@@ -123,48 +232,8 @@ private[kafka] class LogManager(val conf
   }
 
   /**
-   * Get the log if it exists
+   * Runs through the log removing segments older than a certain age
    */
-  def getLog(topic: String, partition: Int): Option[Log] = {
-    val parts = logs.get(topic)
-    if (parts == null) None
-    else {
-      val log = parts.get(partition)
-      if(log == null) None
-      else Some(log)
-    }
-  }
-
-  /**
-   * Create the log if it does not exist, if it exists just return it
-   */
-  def getOrCreateLog(topic: String, partition: Int): Log = {
-    var hasNewTopic = false
-    var parts = logs.get(topic)
-    if (parts == null) {
-      val found = logs.putIfNotExists(topic, new Pool[Int, Log])
-      if (found == null)
-        hasNewTopic = true
-      parts = logs.get(topic)
-    }
-    var log = parts.get(partition)
-    if(log == null) {
-      // check if this broker hosts this partition
-      log = createLog(topic, partition)
-      val found = parts.putIfNotExists(partition, log)
-      if(found != null) {
-        // there was already somebody there
-        log.close()
-        log = found
-      }
-      else
-        info("Created log for '" + topic + "'-" + partition)
-    }
-
-    log
-  }
-
-  /* Runs through the log removing segments older than a certain age */
   private def cleanupExpiredSegments(log: Log): Int = {
     val startMs = time.milliseconds
     val topic = parseTopicPartitionName(log.name).topic
@@ -216,14 +285,22 @@ private[kafka] class LogManager(val conf
    */
   def shutdown() {
     debug("Shutting down.")
-    allLogs.foreach(_.close())
+    try {
+      // close the logs
+      allLogs.foreach(_.close())
+      // mark that the shutdown was clean by creating the clean shutdown marker file
+      logDirs.foreach(dir => Utils.swallow(new File(dir, CleanShutdownFile).createNewFile()))
+    } finally {
+      // regardless of whether the close succeeded, we need to unlock the data directories
+      dirLocks.foreach(_.destroy())
+    }
     debug("Shutdown complete.")
   }
 
   /**
    * Get all the partition logs
    */
-  def allLogs() = logs.values.flatMap(_.values)
+  def allLogs(): Iterable[Log] = logs.values
 
   /**
    * Flush any log which has exceeded its flush interval and has unwritten messages.
@@ -253,12 +330,11 @@ private[kafka] class LogManager(val conf
     }
   }
 
-
-  def topics(): Iterable[String] = logs.keys
-  
   private def parseTopicPartitionName(name: String): TopicAndPartition = {
     val index = name.lastIndexOf('-')
     TopicAndPartition(name.substring(0,index), name.substring(index+1).toInt)
   }
 
+  def topics(): Iterable[String] = logs.keys.map(_.topic)
+
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala Fri Nov  2 19:01:38 2012
@@ -17,6 +17,7 @@
 package kafka.server
 
 import kafka.utils.Logging
+import kafka.common._
 import java.util.concurrent.locks.ReentrantLock
 import java.io._
 
@@ -40,7 +41,7 @@ class HighwaterMarkCheckpoint(val path: 
   private val hwFileLock = new ReentrantLock()
   // recover from previous tmp file, if required
 
-  def write(highwaterMarksPerPartition: Map[(String, Int), Long]) {
+  def write(highwaterMarksPerPartition: Map[TopicAndPartition, Long]) {
     hwFileLock.lock()
     try {
       // write to temp file and then swap with the highwatermark file
@@ -56,9 +57,7 @@ class HighwaterMarkCheckpoint(val path: 
       hwFileWriter.newLine()
 
       highwaterMarksPerPartition.foreach { partitionAndHw =>
-        val topic = partitionAndHw._1._1
-        val partitionId = partitionAndHw._1._2
-        hwFileWriter.write("%s %s %s".format(topic, partitionId, partitionAndHw._2))
+        hwFileWriter.write("%s %s %s".format(partitionAndHw._1.topic, partitionAndHw._1.partition, partitionAndHw._2))
         hwFileWriter.newLine()
       }
       hwFileWriter.flush()
@@ -77,9 +76,10 @@ class HighwaterMarkCheckpoint(val path: 
     hwFileLock.lock()
     try {
       hwFile.length() match {
-        case 0 => warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) +
-          "partition %d. Returning 0 as the highwatermark".format(partition))
-        0L
+        case 0 => 
+          warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) +
+               "partition %d. Returning 0 as the highwatermark".format(partition))
+          0L
         case _ =>
           val hwFileReader = new BufferedReader(new FileReader(hwFile))
           val version = hwFileReader.readLine().toShort
@@ -95,17 +95,18 @@ class HighwaterMarkCheckpoint(val path: 
                   // find the index of partition
                   val partitionIndex = nextHwEntry.indexOf(partitionId)
                   val topic = nextHwEntry.substring(0, partitionIndex-1)
-                  ((topic, partitionId.toInt) -> highwaterMark)
+                  (TopicAndPartition(topic, partitionId.toInt) -> highwaterMark)
                 }
               hwFileReader.close()
-              val hwOpt = partitionHighWatermarks.toMap.get((topic, partition))
+              val hwOpt = partitionHighWatermarks.toMap.get(TopicAndPartition(topic, partition))
               hwOpt match {
-                case Some(hw) => debug("Read hw %d for topic %s partition %d from highwatermark checkpoint file"
-                                        .format(hw, topic, partition))
+                case Some(hw) => 
+                  debug("Read hw %d for topic %s partition %d from highwatermark checkpoint file".format(hw, topic, partition))
                   hw
-                case None => warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) +
-                  "partition %d. Returning 0 as the highwatermark".format(partition))
-                0L
+                case None => 
+                  warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) +
+                       "partition %d. Returning 0 as the highwatermark".format(partition))
+                  0L
               }
             case _ => fatal("Unrecognized version of the highwatermark checkpoint file " + version)
             System.exit(1)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala Fri Nov  2 19:01:38 2012
@@ -21,7 +21,7 @@ import java.util.Properties
 import kafka.message.Message
 import kafka.consumer.ConsumerConfig
 import java.net.InetAddress
-import kafka.utils.{VerifiableProperties, ZKConfig}
+import kafka.utils.{VerifiableProperties, ZKConfig, Utils}
 
 /**
  * Configuration settings for the kafka server
@@ -74,7 +74,8 @@ class KafkaConfig private (val props: Ve
   val numPartitions = props.getIntInRange("num.partitions", 1, (1, Int.MaxValue))
   
   /* the directories in which the log data is kept */
-  val logDir = props.getString("log.dir")
+  val logDirs = Utils.parseCsvList(props.getString("log.directories", props.getString("log.dir", "")))
+  require(logDirs.size > 0)
   
   /* the maximum size of a single log file */
   val logFileSize = props.getIntInRange("log.file.size", 1*1024*1024*1024, (Message.MinHeaderSize, Int.MaxValue))

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Fri Nov  2 19:01:38 2012
@@ -32,7 +32,6 @@ import kafka.controller.{ControllerStat,
  */
 class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logging {
   this.logIdent = "[Kafka Server " + config.brokerId + "], "
-  val CleanShutdownFile = ".kafka_cleanshutdown"
   private var isShuttingDown = new AtomicBoolean(false)
   private var shutdownLatch = new CountDownLatch(1)
   var socketServer: SocketServer = null
@@ -53,12 +52,6 @@ class KafkaServer(val config: KafkaConfi
     info("starting")
     isShuttingDown = new AtomicBoolean(false)
     shutdownLatch = new CountDownLatch(1)
-    var needRecovery = true
-    val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
-    if (cleanShutDownFile.exists) {
-      needRecovery = false
-      cleanShutDownFile.delete
-    }
 
     /* start scheduler */
     kafkaScheduler.startup
@@ -66,11 +59,7 @@ class KafkaServer(val config: KafkaConfi
     /* start log manager */
     logManager = new LogManager(config,
                                 kafkaScheduler,
-                                time,
-                                1000L * 60 * 60 * config.logRollHours,
-                                1000L * 60 * config.logCleanupIntervalMinutes,
-                                1000L * 60 * 60 * config.logRetentionHours,
-                                needRecovery)
+                                time)
     logManager.startup()
 
     socketServer = new SocketServer(config.brokerId,
@@ -122,25 +111,22 @@ class KafkaServer(val config: KafkaConfi
     val canShutdown = isShuttingDown.compareAndSet(false, true);
     if (canShutdown) {
       if(requestHandlerPool != null)
-        requestHandlerPool.shutdown()
-      kafkaScheduler.shutdown()
+        Utils.swallow(requestHandlerPool.shutdown())
+      Utils.swallow(kafkaScheduler.shutdown())
       if(apis != null)
-        apis.close()
+        Utils.swallow(apis.close())
       if(kafkaZookeeper != null)
-        kafkaZookeeper.shutdown()
+        Utils.swallow(kafkaZookeeper.shutdown())
       if(replicaManager != null)
-        replicaManager.shutdown()
+        Utils.swallow(replicaManager.shutdown())
       if(socketServer != null)
-        socketServer.shutdown()
+        Utils.swallow(socketServer.shutdown())
       if(logManager != null)
-        logManager.shutdown()
+        Utils.swallow(logManager.shutdown())
 
       if(kafkaController != null)
-        kafkaController.shutdown()
+        Utils.swallow(kafkaController.shutdown())
 
-      val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
-      debug("creating clean shutdown file " + cleanShutDownFile.getAbsolutePath())
-      cleanShutDownFile.createNewFile
       shutdownLatch.countDown()
       info("shut down completed")
     }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Fri Nov  2 19:01:38 2012
@@ -25,7 +25,7 @@ import kafka.log.LogManager
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
 import java.util.concurrent.TimeUnit
-import kafka.common.{ReplicaNotAvailableException, UnknownTopicOrPartitionException, LeaderNotAvailableException, ErrorMapping}
+import kafka.common._
 import kafka.api.{PartitionStateInfo, LeaderAndIsrRequest}
 
 
@@ -44,8 +44,7 @@ class ReplicaManager(val config: KafkaCo
   val replicaFetcherManager = new ReplicaFetcherManager(config, this)
   this.logIdent = "Replica Manager on Broker " + config.brokerId + ": "
   private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
-  val highWatermarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir)
-  info("Created highwatermark file %s".format(highWatermarkCheckpoint.name))
+  val highWatermarkCheckpoints = config.logDirs.map(dir => (dir, new HighwaterMarkCheckpoint(dir))).toMap
 
   newGauge(
     "LeaderCount",
@@ -245,22 +244,12 @@ class ReplicaManager(val config: KafkaCo
    * Flushes the highwatermark value for all partitions to the highwatermark file
    */
   def checkpointHighWatermarks() {
-    val highWaterarksForAllPartitions = allPartitions.map {
-      partition =>
-        val topic = partition._1._1
-        val partitionId = partition._1._2
-        val localReplicaOpt = partition._2.getReplica(config.brokerId)
-        val hw = localReplicaOpt match {
-          case Some(localReplica) => localReplica.highWatermark
-          case None =>
-            error("Highwatermark for topic %s partition %d doesn't exist during checkpointing"
-                  .format(topic, partitionId))
-             0L
-        }
-        (topic, partitionId) -> hw
-    }.toMap
-    highWatermarkCheckpoint.write(highWaterarksForAllPartitions)
-    trace("Checkpointed high watermark data: %s".format(highWaterarksForAllPartitions))
+    val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
+    val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParent)
+    for((dir, reps) <- replicasByDir) {
+      val hwms = reps.map(r => (TopicAndPartition(r.topic, r.partitionId) -> r.highWatermark)).toMap
+      highWatermarkCheckpoints(dir).write(hwms)
+    }
   }
 
   def shutdown() {

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/FileLock.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/FileLock.scala?rev=1405102&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/FileLock.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/FileLock.scala Fri Nov  2 19:01:38 2012
@@ -0,0 +1,64 @@
+package kafka.utils
+
+import java.io._
+import java.nio.channels._
+
+/**
+ * A file lock a la flock/funlock
+ * 
+ * The given path will be created and opened if it doesn't exist.
+ */
+class FileLock(val file: File) extends Logging {
+    file.createNewFile()
+    private val channel = new RandomAccessFile(file, "rw").getChannel()
+    private var flock: java.nio.channels.FileLock = null
+    
+    /**
+     * Lock the file or throw an exception if the lock is already held
+     */
+    def lock() {
+      this synchronized {
+        trace("Acquiring lock on " + file.getAbsolutePath)
+        flock = channel.lock()
+      }
+    }
+    
+    /**
+     * Try to lock the file and return true if the locking succeeds
+     */
+    def tryLock(): Boolean = {
+      this synchronized {
+        trace("Acquiring lock on " + file.getAbsolutePath)
+        try {
+          // weirdly this method will return null if the lock is held by another
+          // process, but will throw an exception if the lock is held by this process
+          // so we have to handle both cases
+          flock = channel.tryLock()
+          flock != null
+        } catch {
+          case e: OverlappingFileLockException => false
+        }
+      }
+    }
+    
+    /**
+     * Unlock the lock if it is held
+     */
+    def unlock() {
+      this synchronized {
+        trace("Releasing lock on " + file.getAbsolutePath)
+        if(flock != null)
+          flock.release()
+      }
+    }
+    
+    /**
+     * Destroy this lock, closing the associated FileChannel
+     */
+    def destroy() = {
+      this synchronized {
+        unlock()
+        channel.close()
+      }
+    }
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Fri Nov  2 19:01:38 2012
@@ -226,22 +226,28 @@ object Utils extends Logging {
   def rm(file: String): Unit = rm(new File(file))
   
   /**
+   * Recursively delete the list of files/directories and any subfiles (if any exist)
+   * @param a sequence of files to be deleted
+   */
+  def rm(files: Seq[String]): Unit = files.map(f => rm(new File(f)))
+  
+  /**
    * Recursively delete the given file/directory and any subfiles (if any exist)
    * @param file The root file at which to begin deleting
    */
-  def rm(file: File): Unit = {
-    if(file == null) {
-      return
-    } else if(file.isDirectory) {
-      val files = file.listFiles()
-      if(files != null) {
-        for(f <- files)
-          rm(f)
-      }
-      file.delete()
-    } else {
-      file.delete()
-    }
+  def rm(file: File) {
+	if(file == null) {
+	  return
+	} else if(file.isDirectory) {
+	  val files = file.listFiles()
+	  if(files != null) {
+	    for(f <- files)
+	      rm(f)
+	  }
+	  file.delete()
+	} else {
+	  file.delete()
+	}
   }
   
   /**

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/VerifiableProperties.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/VerifiableProperties.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/VerifiableProperties.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/VerifiableProperties.scala Fri Nov  2 19:01:38 2012
@@ -22,6 +22,8 @@ import scala.collection._
 
 class VerifiableProperties(val props: Properties) extends Logging {
   private val referenceSet = mutable.HashSet[String]()
+  
+  def this() = this(new Properties)
 
   def containsKey(name: String): Boolean = {
     props.containsKey(name)
@@ -185,4 +187,6 @@ class VerifiableProperties(val props: Pr
         info("Property %s is overridden to %s".format(key, props.getProperty(key)))
     }
   }
+  
+  override def toString(): String = props.toString
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala Fri Nov  2 19:01:38 2012
@@ -40,7 +40,7 @@ trait KafkaServerTestHarness extends JUn
 
   override def tearDown() {
     servers.map(server => server.shutdown())
-    servers.map(server => Utils.rm(server.config.logDir))
+    servers.map(server => server.config.logDirs.map(Utils.rm(_)))
     super.tearDown
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Fri Nov  2 19:01:38 2012
@@ -18,21 +18,23 @@
 package kafka.log
 
 import java.io._
+import java.nio.channels.OverlappingFileLockException
 import junit.framework.Assert._
 import org.junit.Test
 import kafka.common.OffsetOutOfRangeException
 import org.scalatest.junit.JUnit3Suite
 import kafka.server.KafkaConfig
+import kafka.common._
 import kafka.utils._
 
 class LogManagerTest extends JUnit3Suite {
 
   val time: MockTime = new MockTime()
   val maxRollInterval = 100
-  val maxLogAge = 1000
+  val maxLogAgeHours = 10
   var logDir: File = null
   var logManager: LogManager = null
-  var config:KafkaConfig = null
+  var config: KafkaConfig = null
   val name = "kafka"
   val veryLargeLogFlushInterval = 10000000L
   val scheduler = new KafkaScheduler(2)
@@ -41,12 +43,13 @@ class LogManagerTest extends JUnit3Suite
     super.setUp()
     config = new KafkaConfig(TestUtils.createBrokerConfig(0, -1)) {
                    override val logFileSize = 1024
-                   override val flushInterval = 100
+                   override val flushInterval = 10000
+                   override val logRetentionHours = maxLogAgeHours
                  }
     scheduler.startup
-    logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, maxLogAge, false)
+    logManager = new LogManager(config, scheduler, time)
     logManager.startup
-    logDir = logManager.logDir
+    logDir = logManager.logDirs(0)
   }
 
   override def tearDown() {
@@ -54,13 +57,14 @@ class LogManagerTest extends JUnit3Suite
     if(logManager != null)
       logManager.shutdown()
     Utils.rm(logDir)
+    logManager.logDirs.map(Utils.rm(_))
     super.tearDown()
   }
   
   @Test
   def testCreateLog() {
     val log = logManager.getOrCreateLog(name, 0)
-    val logFile = new File(config.logDir, name + "-0")
+    val logFile = new File(config.logDirs(0), name + "-0")
     assertTrue(logFile.exists)
     log.append(TestUtils.singleMessageSet("test".getBytes()))
   }
@@ -68,7 +72,7 @@ class LogManagerTest extends JUnit3Suite
   @Test
   def testGetLog() {
     val log = logManager.getLog(name, 0)
-    val logFile = new File(config.logDir, name + "-0")
+    val logFile = new File(config.logDirs(0), name + "-0")
     assertTrue(!logFile.exists)
   }
 
@@ -87,12 +91,13 @@ class LogManagerTest extends JUnit3Suite
 
     // update the last modified time of all log segments
     val logSegments = log.segments.view
-    logSegments.foreach(s => s.messageSet.file.setLastModified(time.currentMs))
+    logSegments.foreach(_.messageSet.file.setLastModified(time.currentMs))
 
-    time.currentMs += maxLogAge + 3000
+    time.currentMs += maxLogAgeHours*60*60*1000 + 1
     logManager.cleanupLogs()
     assertEquals("Now there should only be only one segment.", 1, log.numberOfSegments)
     assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).sizeInBytes)
+
     try {
       log.read(0, 1024)
       fail("Should get exception from fetching earlier.")
@@ -115,8 +120,9 @@ class LogManagerTest extends JUnit3Suite
       override val logRetentionSize = (5 * 10 * setSize + 10).asInstanceOf[Long]
       override val logRetentionHours = retentionHours
       override val flushInterval = 100
+      override val logRollHours = maxRollInterval
     }
-    logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, retentionMs, false)
+    logManager = new LogManager(config, scheduler, time)
     logManager.startup
 
     // create a log
@@ -157,17 +163,47 @@ class LogManagerTest extends JUnit3Suite
                    override val logFileSize = 1024 *1024 *1024
                    override val flushSchedulerThreadRate = 50
                    override val flushInterval = Int.MaxValue
+                   override val logRollHours = maxRollInterval
                    override val flushIntervalMap = Map("timebasedflush" -> 100)
                  }
-    logManager = new LogManager(config, scheduler, time, maxRollInterval, veryLargeLogFlushInterval, maxLogAge, false)
+    logManager = new LogManager(config, scheduler, time)
     logManager.startup
     val log = logManager.getOrCreateLog(name, 0)
     for(i <- 0 until 200) {
       var set = TestUtils.singleMessageSet("test".getBytes())
       log.append(set)
     }
-    println("now = " + System.currentTimeMillis + " last flush = " + log.getLastFlushedTime)
-    assertTrue("The last flush time has to be within defaultflushInterval of current time ",
-                     (System.currentTimeMillis - log.getLastFlushedTime) < 150)
+    val ellapsed = System.currentTimeMillis - log.getLastFlushedTime
+    assertTrue("The last flush time has to be within defaultflushInterval of current time (was %d)".format(ellapsed),
+                     ellapsed < 2*config.flushSchedulerThreadRate)
+  }
+  
+  @Test
+  def testLeastLoadedAssignment() {
+    // create a log manager with multiple data directories
+    val props = TestUtils.createBrokerConfig(0, -1)
+    val dirs = Seq(TestUtils.tempDir().getAbsolutePath, 
+                   TestUtils.tempDir().getAbsolutePath, 
+                   TestUtils.tempDir().getAbsolutePath)
+    props.put("log.directories", dirs.mkString(","))
+    logManager.shutdown()
+    logManager = new LogManager(new KafkaConfig(props), scheduler, time)
+    
+    // verify that logs are always assigned to the least loaded partition
+    for(partition <- 0 until 20) {
+      logManager.getOrCreateLog("test", partition)
+      assertEquals("We should have created the right number of logs", partition + 1, logManager.allLogs.size)
+      val counts = logManager.allLogs.groupBy(_.dir.getParent).values.map(_.size)
+      assertTrue("Load should balance evenly", counts.max <= counts.min + 1)
+    }
+  }
+  
+  def testTwoLogManagersUsingSameDirFails() {
+    try {
+      new LogManager(logManager.config, scheduler, time)
+      fail("Should not be able to create a second log manager instance with the same data directory")
+    } catch {
+      case e: KafkaException => // this is good 
+    }
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Fri Nov  2 19:01:38 2012
@@ -79,8 +79,8 @@ class ProducerTest extends JUnit3Suite w
     server1.awaitShutdown()
     server2.shutdown
     server2.awaitShutdown()
-    Utils.rm(server1.config.logDir)
-    Utils.rm(server2.config.logDir)
+    Utils.rm(server1.config.logDirs)
+    Utils.rm(server2.config.logDirs)
     super.tearDown()
   }
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala Fri Nov  2 19:01:38 2012
@@ -16,14 +16,15 @@
 */
 package kafka.server
 
-import kafka.log.Log
+import kafka.log.{Log, LogManager}
 import org.I0Itec.zkclient.ZkClient
 import org.scalatest.junit.JUnit3Suite
 import org.easymock.EasyMock
+import org.junit._
 import org.junit.Assert._
 import kafka.common.KafkaException
 import kafka.cluster.Replica
-import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime}
+import kafka.utils.{SystemTime, KafkaScheduler, TestUtils, MockTime, Utils}
 
 class HighwatermarkPersistenceTest extends JUnit3Suite {
 
@@ -31,30 +32,37 @@ class HighwatermarkPersistenceTest exten
     override val defaultFlushIntervalMs = 100
   })
   val topic = "foo"
+  val logManagers = configs.map(config => new LogManager(config, new KafkaScheduler(1), new MockTime))
+    
+  @After
+  def teardown() {
+    for(manager <- logManagers; dir <- manager.logDirs)
+      Utils.rm(dir)
+  }
 
   def testHighWatermarkPersistenceSinglePartition() {
     // mock zkclient
     val zkClient = EasyMock.createMock(classOf[ZkClient])
     EasyMock.replay(zkClient)
+    
     // create kafka scheduler
     val scheduler = new KafkaScheduler(2)
     scheduler.startup
     // create replica manager
-    val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, null)
+    val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, logManagers(0))
     replicaManager.startup()
     replicaManager.checkpointHighWatermarks()
-    var fooPartition0Hw = replicaManager.highWatermarkCheckpoint.read(topic, 0)
+    var fooPartition0Hw = hwmFor(replicaManager, topic, 0)
     assertEquals(0L, fooPartition0Hw)
     val partition0 = replicaManager.getOrCreatePartition(topic, 0, 1)
-    // create leader log
-    val log0 = getMockLog
     // create leader and follower replicas
+    val log0 = logManagers(0).getOrCreateLog(topic, 0)
     val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, SystemTime, 0, Some(log0))
     partition0.addReplicaIfNotExists(leaderReplicaPartition0)
     val followerReplicaPartition0 = new Replica(configs.last.brokerId, partition0, SystemTime)
     partition0.addReplicaIfNotExists(followerReplicaPartition0)
     replicaManager.checkpointHighWatermarks()
-    fooPartition0Hw = replicaManager.highWatermarkCheckpoint.read(topic, 0)
+    fooPartition0Hw = hwmFor(replicaManager, topic, 0)
     assertEquals(leaderReplicaPartition0.highWatermark, fooPartition0Hw)
     try {
       followerReplicaPartition0.highWatermark
@@ -65,10 +73,9 @@ class HighwatermarkPersistenceTest exten
     // set the highwatermark for local replica
     partition0.getReplica().get.highWatermark = 5L
     replicaManager.checkpointHighWatermarks()
-    fooPartition0Hw = replicaManager.highWatermarkCheckpoint.read(topic, 0)
+    fooPartition0Hw = hwmFor(replicaManager, topic, 0)
     assertEquals(leaderReplicaPartition0.highWatermark, fooPartition0Hw)
     EasyMock.verify(zkClient)
-    EasyMock.verify(log0)
   }
 
   def testHighWatermarkPersistenceMultiplePartitions() {
@@ -81,35 +88,35 @@ class HighwatermarkPersistenceTest exten
     val scheduler = new KafkaScheduler(2)
     scheduler.startup
     // create replica manager
-    val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, null)
+    val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler, logManagers(0))
     replicaManager.startup()
     replicaManager.checkpointHighWatermarks()
-    var topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0)
+    var topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
     assertEquals(0L, topic1Partition0Hw)
     val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0, 1)
     // create leader log
-    val topic1Log0 = getMockLog
+    val topic1Log0 = logManagers(0).getOrCreateLog(topic1, 0)
     // create a local replica for topic1
     val leaderReplicaTopic1Partition0 = new Replica(configs.head.brokerId, topic1Partition0, SystemTime, 0, Some(topic1Log0))
     topic1Partition0.addReplicaIfNotExists(leaderReplicaTopic1Partition0)
     replicaManager.checkpointHighWatermarks()
-    topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0)
+    topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
     assertEquals(leaderReplicaTopic1Partition0.highWatermark, topic1Partition0Hw)
     // set the highwatermark for local replica
     topic1Partition0.getReplica().get.highWatermark = 5L
     replicaManager.checkpointHighWatermarks()
-    topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0)
+    topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
     assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark)
     assertEquals(5L, topic1Partition0Hw)
     // add another partition and set highwatermark
     val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0, 1)
     // create leader log
-    val topic2Log0 = getMockLog
+    val topic2Log0 = logManagers(0).getOrCreateLog(topic2, 0)
     // create a local replica for topic2
     val leaderReplicaTopic2Partition0 =  new Replica(configs.head.brokerId, topic2Partition0, SystemTime, 0, Some(topic2Log0))
     topic2Partition0.addReplicaIfNotExists(leaderReplicaTopic2Partition0)
     replicaManager.checkpointHighWatermarks()
-    var topic2Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic2, 0)
+    var topic2Partition0Hw = hwmFor(replicaManager, topic2, 0)
     assertEquals(leaderReplicaTopic2Partition0.highWatermark, topic2Partition0Hw)
     // set the highwatermark for local replica
     topic2Partition0.getReplica().get.highWatermark = 15L
@@ -119,19 +126,16 @@ class HighwatermarkPersistenceTest exten
     assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark)
     replicaManager.checkpointHighWatermarks()
     // verify checkpointed hw for topic 2
-    topic2Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic2, 0)
+    topic2Partition0Hw = hwmFor(replicaManager, topic2, 0)
     assertEquals(15L, topic2Partition0Hw)
     // verify checkpointed hw for topic 1
-    topic1Partition0Hw = replicaManager.highWatermarkCheckpoint.read(topic1, 0)
+    topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
     assertEquals(10L, topic1Partition0Hw)
     EasyMock.verify(zkClient)
-    EasyMock.verify(topic1Log0)
-    EasyMock.verify(topic2Log0)
   }
 
-  private def getMockLog: Log = {
-    val log = EasyMock.createMock(classOf[kafka.log.Log])
-    EasyMock.replay(log)
-    log
+  def hwmFor(replicaManager: ReplicaManager, topic: String, partition: Int): Long = {
+    replicaManager.highWatermarkCheckpoints(replicaManager.config.logDirs(0)).read(topic, partition)
   }
+  
 }
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala Fri Nov  2 19:01:38 2012
@@ -45,7 +45,7 @@ class LeaderElectionTest extends JUnit3S
 
   override def tearDown() {
     servers.map(server => server.shutdown())
-    servers.map(server => Utils.rm(server.config.logDir))
+    servers.map(server => Utils.rm(server.config.logDirs))
     super.tearDown()
   }
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala Fri Nov  2 19:01:38 2012
@@ -45,8 +45,8 @@ class LogRecoveryTest extends JUnit3Suit
   val message = new Message("hello".getBytes())
 
   var producer: Producer[Int, Message] = null
-  var hwFile1: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps1.logDir)
-  var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDir)
+  var hwFile1: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps1.logDirs(0))
+  var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDirs(0))
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
 
   def testHWCheckpointNoFailuresSingleLogSegment {
@@ -83,7 +83,7 @@ class LogRecoveryTest extends JUnit3Suit
     assertEquals(numMessages, leaderHW)
     val followerHW = hwFile2.read(topic, 0)
     assertEquals(numMessages, followerHW)
-    servers.foreach(server => { server.shutdown(); Utils.rm(server.config.logDir)})
+    servers.foreach(server => { server.shutdown(); Utils.rm(server.config.logDirs(0))})
   }
 
   def testHWCheckpointWithFailuresSingleLogSegment {
@@ -148,7 +148,7 @@ class LogRecoveryTest extends JUnit3Suit
     producer.close()
     assertEquals(hw, hwFile1.read(topic, 0))
     assertEquals(hw, hwFile2.read(topic, 0))
-    servers.foreach(server => Utils.rm(server.config.logDir))
+    servers.foreach(server => Utils.rm(server.config.logDirs))
   }
 
   def testHWCheckpointNoFailuresMultipleLogSegments {
@@ -165,8 +165,8 @@ class LogRecoveryTest extends JUnit3Suit
     server2 = TestUtils.createServer(configs.last)
     servers ++= List(server1, server2)
 
-    hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDir)
-    hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDir)
+    hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDirs(0))
+    hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDirs(0))
 
     val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
     producerProps.put("producer.request.timeout.ms", "1000")
@@ -193,7 +193,7 @@ class LogRecoveryTest extends JUnit3Suit
     assertEquals(hw, leaderHW)
     val followerHW = hwFile2.read(topic, 0)
     assertEquals(hw, followerHW)
-    servers.foreach(server => Utils.rm(server.config.logDir))
+    servers.foreach(server => Utils.rm(server.config.logDirs))
   }
 
   def testHWCheckpointWithFailuresMultipleLogSegments {
@@ -210,8 +210,8 @@ class LogRecoveryTest extends JUnit3Suit
     server2 = TestUtils.createServer(configs.last)
     servers ++= List(server1, server2)
 
-    hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDir)
-    hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDir)
+    hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDirs(0))
+    hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDirs(0))
 
     val producerProps = getProducerConfig(TestUtils.getBrokerListStrFromConfigs(configs), 64*1024, 100000, 10000)
     producerProps.put("producer.request.timeout.ms", "1000")
@@ -263,7 +263,7 @@ class LogRecoveryTest extends JUnit3Suit
     producer.close()
     assertEquals(hw, hwFile1.read(topic, 0))
     assertEquals(hw, hwFile2.read(topic, 0))
-    servers.foreach(server => Utils.rm(server.config.logDir))
+    servers.foreach(server => Utils.rm(server.config.logDirs))
   }
 
   private def sendMessages(n: Int = 1) {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala Fri Nov  2 19:01:38 2012
@@ -31,71 +31,66 @@ import kafka.utils.{TestUtils, Utils}
 
 class ServerShutdownTest extends JUnit3Suite with ZooKeeperTestHarness {
   val port = TestUtils.choosePort
+  val props = TestUtils.createBrokerConfig(0, port)
+  val config = new KafkaConfig(props)
+
+  val host = "localhost"
+  val topic = "test"
+  val sent1 = List(new Message("hello".getBytes()), new Message("there".getBytes()))
+  val sent2 = List( new Message("more".getBytes()), new Message("messages".getBytes()))
 
   @Test
   def testCleanShutdown() {
-    val props = TestUtils.createBrokerConfig(0, port)
-    val config = new KafkaConfig(props)
-
-    val host = "localhost"
-    val topic = "test"
-    val sent1 = List(new Message("hello".getBytes()), new Message("there".getBytes()))
-    val sent2 = List( new Message("more".getBytes()), new Message("messages".getBytes()))
-
-    {
-      val server = new KafkaServer(config)
-      server.startup()
-
-      // create topic
-      CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
-
-      val producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000)))
-
-      // send some messages
-      producer.send(new ProducerData[Int, Message](topic, 0, sent1))
-
-      // do a clean shutdown
-      server.shutdown()
-      val cleanShutDownFile = new File(new File(config.logDir), server.CleanShutdownFile)
+    var server = new KafkaServer(config)
+    server.startup()
+    var producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000)))
+
+    // create topic
+    CreateTopicCommand.createTopic(zkClient, topic, 1, 1, "0")
+    // send some messages
+    producer.send(new ProducerData[Int, Message](topic, 0, sent1))
+
+    // do a clean shutdown and check that the clean shudown file is written out
+    server.shutdown()
+    for(logDir <- config.logDirs) {
+      val cleanShutDownFile = new File(logDir, server.logManager.CleanShutdownFile)
       assertTrue(cleanShutDownFile.exists)
-      producer.close()
     }
+    producer.close()
+    
+    /* now restart the server and check that the written data is still readable and everything still works */
+    server = new KafkaServer(config)
+    server.startup()
+
+    producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000)))
+    val consumer = new SimpleConsumer(host,
+                                      port,
+                                      1000000,
+                                      64*1024)
+
+    waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
+
+    var fetchedMessage: ByteBufferMessageSet = null
+    while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
+      val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).maxWait(0).build())
+      fetchedMessage = fetched.messageSet(topic, 0)
+    }
+    TestUtils.checkEquals(sent1.iterator, fetchedMessage.map(m => m.message).iterator)
+    val newOffset = fetchedMessage.last.nextOffset
 
+    // send some more messages
+    producer.send(new ProducerData[Int, Message](topic, 0, sent2))
 
-    {
-      val producer = new Producer[Int, Message](new ProducerConfig(getProducerConfig(TestUtils.getBrokerListStrFromConfigs(Seq(config)), 64*1024, 100000, 10000)))
-      val consumer = new SimpleConsumer(host,
-                                        port,
-                                        1000000,
-                                        64*1024)
-
-      val server = new KafkaServer(config)
-      server.startup()
-
-      waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0, 1000)
-
-      var fetchedMessage: ByteBufferMessageSet = null
-      while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
-        val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, 0, 10000).build())
-        fetchedMessage = fetched.messageSet(topic, 0)
-      }
-      TestUtils.checkEquals(sent1.iterator, fetchedMessage.map(m => m.message).iterator)
-      val newOffset = fetchedMessage.last.nextOffset
-
-      // send some more messages
-      producer.send(new ProducerData[Int, Message](topic, 0, sent2))
-
-      fetchedMessage = null
-      while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
-        val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, newOffset, 10000).build())
-        fetchedMessage = fetched.messageSet(topic, 0)
-      }
-      TestUtils.checkEquals(sent2.iterator, fetchedMessage.map(m => m.message).iterator)
-
-      server.shutdown()
-      Utils.rm(server.config.logDir)
-      producer.close()
+    fetchedMessage = null
+    while(fetchedMessage == null || fetchedMessage.validBytes == 0) {
+      val fetched = consumer.fetch(new FetchRequestBuilder().addFetch(topic, 0, newOffset, 10000).build())
+      fetchedMessage = fetched.messageSet(topic, 0)
     }
+    TestUtils.checkEquals(sent2.iterator, fetchedMessage.map(m => m.message).iterator)
 
+    consumer.close()
+    producer.close()
+    server.shutdown()
+    Utils.rm(server.config.logDirs)
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala Fri Nov  2 19:01:38 2012
@@ -74,7 +74,6 @@ class SimpleFetchTest extends JUnit3Suit
     EasyMock.expect(replicaManager.config).andReturn(configs.head)
     EasyMock.expect(replicaManager.logManager).andReturn(logManager)
     EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
-    EasyMock.expect(replicaManager.highWatermarkCheckpoint).andReturn(EasyMock.createMock(classOf[HighwaterMarkCheckpoint]))
     EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
     EasyMock.replay(replicaManager)
 
@@ -169,7 +168,6 @@ class SimpleFetchTest extends JUnit3Suit
     EasyMock.expect(replicaManager.config).andReturn(configs.head)
     EasyMock.expect(replicaManager.logManager).andReturn(logManager)
     EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
-    EasyMock.expect(replicaManager.highWatermarkCheckpoint).andReturn(EasyMock.createMock(classOf[HighwaterMarkCheckpoint]))
     EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
     EasyMock.replay(replicaManager)
 

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1405102&r1=1405101&r2=1405102&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Fri Nov  2 19:01:38 2012
@@ -483,6 +483,7 @@ object TestUtils extends Logging {
     byteBuffer.rewind()
     byteBuffer
   }
+  
 }
 
 object TestZKUtils {