You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2012/07/26 01:42:08 UTC

svn commit: r1365841 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/cluster/ main/scala/kafka/consumer/ main/scala/kafka/log/ main/scala/kafka/message/ main/scala/kafka/server/ main/scala/kafka/utils/ test/scala/unit/kafka/log/ test/scala/unit/kafka/producer/ test/scala/unit/kafka/server/

Author: nehanarkhede
Date: Wed Jul 25 23:42:07 2012
New Revision: 1365841

URL: http://svn.apache.org/viewvc?rev=1365841&view=rev
Log:
KAFKA-405 Improve high watermark maintenance to store high watermarks for all partitions in a single .highwatermark file; patched by Neha Narkhede; reviewed by Jay Kreps and Jun Rao

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogStats.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/IteratorTemplate.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Wed Jul 25 23:42:07 2012
@@ -56,7 +56,9 @@ class Partition(val topic: String,
     assignedReplicas
   }
 
-  def getReplica(replicaId: Int): Option[Replica] = assignedReplicas().find(_.brokerId == replicaId)
+  def getReplica(replicaId: Int): Option[Replica] = {
+    assignedReplicas().find(_.brokerId == replicaId)
+  }
 
   def addReplica(replica: Replica): Boolean = {
     if(!assignedReplicas.contains(replica)) {
@@ -65,8 +67,7 @@ class Partition(val topic: String,
     }else false
   }
 
-  def updateReplicaLEO(replica: Replica, leo: Long) {
-    replica.leoUpdateTime = time.milliseconds
+  def updateReplicaLeo(replica: Replica, leo: Long) {
     replica.logEndOffset(Some(leo))
     debug("Updating the leo to %d for replica %d".format(leo, replica.brokerId))
   }
@@ -108,7 +109,7 @@ class Partition(val topic: String,
     val possiblyStuckReplicas = inSyncReplicas.filter(r => r.logEndOffset() < leaderReplica().logEndOffset())
     info("Possibly stuck replicas for topic %s partition %d are %s".format(topic, partitionId,
       possiblyStuckReplicas.map(_.brokerId).mkString(",")))
-    val stuckReplicas = possiblyStuckReplicas.filter(r => r.logEndOffsetUpdateTime() < (time.milliseconds - keepInSyncTimeMs))
+    val stuckReplicas = possiblyStuckReplicas.filter(r => r.logEndOffsetUpdateTime < (time.milliseconds - keepInSyncTimeMs))
     info("Stuck replicas for topic %s partition %d are %s".format(topic, partitionId, stuckReplicas.map(_.brokerId).mkString(",")))
     val leader = leaderReplica()
     // Case 2 above

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala Wed Jul 25 23:42:07 2012
@@ -18,43 +18,38 @@
 package kafka.cluster
 
 import kafka.log.Log
-import kafka.utils.Logging
+import kafka.utils.{SystemTime, Time, Logging}
 import kafka.common.KafkaException
 
 class Replica(val brokerId: Int,
               val partition: Partition,
               val topic: String,
-              var log: Option[Log] = None,
-              var leoUpdateTime: Long = -1L) extends Logging {
+              time: Time = SystemTime,
+              var hw: Option[Long] = None,
+              var log: Option[Log] = None) extends Logging {
   private var logEndOffset: Long = -1L
+  private var logEndOffsetUpdateTimeMs: Long = -1L
 
   def logEndOffset(newLeo: Option[Long] = None): Long = {
     isLocal match {
       case true =>
         newLeo match {
-          case Some(newOffset) => throw new KafkaException("Trying to set the leo %d for local log".format(newOffset))
+          case Some(newOffset) => logEndOffsetUpdateTimeMs = time.milliseconds; newOffset
           case None => log.get.logEndOffset
         }
       case false =>
         newLeo match {
           case Some(newOffset) =>
             logEndOffset = newOffset
+            logEndOffsetUpdateTimeMs = time.milliseconds
+            trace("Setting log end offset for replica %d for topic %s partition %d to %d"
+              .format(brokerId, topic, partition.partitionId, logEndOffset))
             logEndOffset
           case None => logEndOffset
         }
     }
   }
 
-  def logEndOffsetUpdateTime(time: Option[Long] = None): Long = {
-    time match {
-      case Some(t) =>
-        leoUpdateTime = t
-        leoUpdateTime
-      case None =>
-        leoUpdateTime
-    }
-  }
-
   def isLocal: Boolean = {
     log match {
       case Some(l) => true
@@ -62,6 +57,8 @@ class Replica(val brokerId: Int,
     }
   }
 
+  def logEndOffsetUpdateTime = logEndOffsetUpdateTimeMs
+
   def highWatermark(highwaterMarkOpt: Option[Long] = None): Long = {
     highwaterMarkOpt match {
       case Some(highwaterMark) =>
@@ -69,7 +66,7 @@ class Replica(val brokerId: Int,
           case true =>
             trace("Setting hw for topic %s partition %d on broker %d to %d".format(topic, partition.partitionId,
                                                                                    brokerId, highwaterMark))
-            log.get.setHW(highwaterMark)
+            hw = Some(highwaterMark)
             highwaterMark
           case false => throw new KafkaException("Unable to set highwatermark for topic %s ".format(topic) +
             "partition %d on broker %d, since there is no local log for this partition"
@@ -78,7 +75,11 @@ class Replica(val brokerId: Int,
       case None =>
         isLocal match {
           case true =>
-            log.get.getHW()
+            hw match {
+              case Some(highWatermarkValue) => highWatermarkValue
+              case None => throw new KafkaException("HighWatermark does not exist for topic %s ".format(topic) +
+              " partition %d on broker %d but local log exists".format(partition.partitionId, brokerId))
+            }
           case false => throw new KafkaException("Unable to get highwatermark for topic %s ".format(topic) +
             "partition %d on broker %d, since there is no local log for this partition"
               .format(partition.partitionId, brokerId))

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Wed Jul 25 23:42:07 2012
@@ -93,9 +93,8 @@ private[kafka] class ZookeeperConsumerCo
   private var fetcher: Option[ConsumerFetcherManager] = None
   private var zkClient: ZkClient = null
   private var topicRegistry = new Pool[String, Pool[Int, PartitionTopicInfo]]
-  // topicThreadIdAndQueues : (topic,consumerThreadId) -> queue
   private val topicThreadIdAndQueues = new Pool[(String,String), BlockingQueue[FetchedDataChunk]]
-  private val scheduler = new KafkaScheduler(1, "Kafka-consumer-autocommit-", false)
+  private val scheduler = new KafkaScheduler(1)
   private val messageStreamCreated = new AtomicBoolean(false)
 
   private var sessionExpirationListener: ZKSessionExpireListener = null
@@ -121,8 +120,10 @@ private[kafka] class ZookeeperConsumerCo
   connectZk()
   createFetcher()
   if (config.autoCommit) {
+    scheduler.startUp
     info("starting auto committer every " + config.autoCommitIntervalMs + " ms")
-    scheduler.scheduleWithRate(autoCommit, config.autoCommitIntervalMs, config.autoCommitIntervalMs)
+    scheduler.scheduleWithRate(autoCommit, "Kafka-consumer-autocommit-", config.autoCommitIntervalMs,
+      config.autoCommitIntervalMs, false)
   }
 
   def this(config: ConsumerConfig) = this(config, true)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Wed Jul 25 23:42:07 2012
@@ -18,7 +18,7 @@
 package kafka.log
 
 import kafka.api.OffsetRequest
-import java.io.{IOException, RandomAccessFile, File}
+import java.io.{IOException, File}
 import java.util.{Comparator, Collections, ArrayList}
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicInteger}
 import kafka.utils._
@@ -105,10 +105,8 @@ class LogSegment(val file: File, val mes
    * of data to be deleted, we have to compute the offset relative to start of the log segment
    * @param offset Absolute offset for this partition
    */
-  def truncateUpto(offset: Long) = {
-    assert(offset >= start, "Offset %d used for truncating this log segment cannot be smaller than the start offset %d".
-                            format(offset, start))
-    messageSet.truncateUpto(offset - start)
+  def truncateTo(offset: Long) = {
+    messageSet.truncateTo(offset - start)
   }
 }
 
@@ -134,13 +132,6 @@ private[kafka] class Log(val dir: File, 
   /* The actual segments of the log */
   private[log] val segments: SegmentList[LogSegment] = loadSegments()
 
-  /* create the leader highwatermark file handle */
-  private val hwFile = new RandomAccessFile(dir.getAbsolutePath + "/" + hwFileName, "rw")
-  info("Created highwatermark file %s for log %s".format(dir.getAbsolutePath + "/" + hwFileName, name))
-
-  /* If hw file is absent, the hw defaults to 0. If it exists, hw is set to the checkpointed value */
-  private var hw: Long = if(hwFile.length() > 0) hwFile.readLong() else { hwFile.writeLong(0); 0 }
-
   private val logStats = new LogStats(this)
 
   Utils.registerMBean(logStats, "kafka:type=kafka.logs." + dir.getName)
@@ -221,8 +212,6 @@ private[kafka] class Log(val dir: File, 
         info("Closing log segment " + seg.file.getAbsolutePath)
         seg.messageSet.close()
       }
-      checkpointHW()
-      hwFile.close()
     }
   }
 
@@ -363,7 +352,6 @@ private[kafka] class Log(val dir: File, 
       segments.view.last.messageSet.flush()
       unflushed.set(0)
       lastflushedTime.set(System.currentTimeMillis)
-      checkpointHW()
      }
   }
 
@@ -435,47 +423,31 @@ private[kafka] class Log(val dir: File, 
     total
   }
 
-  def recoverUptoLastCheckpointedHW() {
-    if(hwFile.length() > 0) {
-      // read the last checkpointed hw from disk
-      hwFile.seek(0)
-      val lastKnownHW = hwFile.readLong()
-      info("Recovering log %s upto highwatermark %d".format(name, lastKnownHW))
+  def truncateTo(targetOffset: Long) {
       // find the log segment that has this hw
       val segmentToBeTruncated = segments.view.find(segment =>
-        lastKnownHW >= segment.start && lastKnownHW < segment.absoluteEndOffset)
+        targetOffset >= segment.start && targetOffset < segment.absoluteEndOffset)
 
       segmentToBeTruncated match {
         case Some(segment) =>
-          segment.truncateUpto(lastKnownHW)
-          info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, hw))
+          val truncatedSegmentIndex = segments.view.indexOf(segment)
+          segments.truncLast(truncatedSegmentIndex)
+          segment.truncateTo(targetOffset)
+          info("Truncated log segment %s to highwatermark %d".format(segment.file.getAbsolutePath, targetOffset))
         case None =>
-          assert(lastKnownHW <= logEndOffset,
+          assert(targetOffset <= segments.view.last.absoluteEndOffset,
             "Last checkpointed hw %d cannot be greater than the latest message offset %d in the log %s".
-              format(lastKnownHW, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath))
+              format(targetOffset, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath))
           error("Cannot truncate log to %d since the log start offset is %d and end offset is %d"
-            .format(lastKnownHW, segments.view.head.start, logEndOffset))
+            .format(targetOffset, segments.view.head.start, segments.view.last.absoluteEndOffset))
       }
 
-      val segmentsToBeDeleted = segments.view.filter(segment => segment.start > lastKnownHW)
+      val segmentsToBeDeleted = segments.view.filter(segment => segment.start > targetOffset)
+      if(segmentsToBeDeleted.size < segments.view.size) {
       val numSegmentsDeleted = deleteSegments(segmentsToBeDeleted)
       if(numSegmentsDeleted != segmentsToBeDeleted.size)
         error("Failed to delete some segments during log recovery")
-    }else
-      info("Unable to recover log upto hw. No previously checkpointed high watermark found for " + name)
-  }
-
-  def setHW(latestLeaderHW: Long) {
-    hw = latestLeaderHW
-  }
-
-  def getHW(): Long = hw
-
-  def checkpointHW() {
-    hwFile.seek(0)
-    hwFile.writeLong(hw)
-    hwFile.getChannel.force(true)
-    info("Checkpointed highwatermark %d for log %s".format(hw, name))
+      }
   }
 
   def topicName():String = {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Wed Jul 25 23:42:07 2012
@@ -30,6 +30,7 @@ import kafka.common.{KafkaException, Inv
  */
 @threadsafe
 private[kafka] class LogManager(val config: KafkaConfig,
+                                scheduler: KafkaScheduler,
                                 private val time: Time,
                                 val logCleanupIntervalMs: Long,
                                 val logCleanupDefaultAgeMs: Long,
@@ -40,11 +41,9 @@ private[kafka] class LogManager(val conf
   private val maxSize: Long = config.logFileSize
   private val flushInterval = config.flushInterval
   private val logCreationLock = new Object
-  private val logFlusherScheduler = new KafkaScheduler(1, "kafka-logflusher-", false)
   private val logFlushIntervals = config.flushIntervalMap
   private val logRetentionMs = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
   private val logRetentionSize = config.logRetentionSize
-  private val scheduler = new KafkaScheduler(1, "kafka-logcleaner-", false)
 
   /* Initialize a log for each subdirectory of the main log directory */
   private val logs = new Pool[String, Pool[Int, Log]]()
@@ -76,17 +75,13 @@ private[kafka] class LogManager(val conf
   def startup() {
     /* Schedule the cleanup task to delete old logs */
     if(scheduler != null) {
-      if(scheduler.hasShutdown) {
-        println("Restarting log cleaner scheduler")
-        scheduler.startUp
-      }
-      info("starting log cleaner every " + logCleanupIntervalMs + " ms")
-      scheduler.scheduleWithRate(cleanupLogs, 60 * 1000, logCleanupIntervalMs)
+      info("Starting log cleaner every " + logCleanupIntervalMs + " ms")
+      scheduler.scheduleWithRate(cleanupLogs, "kafka-logcleaner-", 60 * 1000, logCleanupIntervalMs, false)
+      info("Starting log flusher every " + config.flushSchedulerThreadRate +
+        " ms with the following overrides " + logFlushIntervals)
+      scheduler.scheduleWithRate(flushAllLogs, "kafka-logflusher-",
+        config.flushSchedulerThreadRate, config.flushSchedulerThreadRate, false)
     }
-
-    if(logFlusherScheduler.hasShutdown) logFlusherScheduler.startUp
-    info("Starting log flusher every " + config.flushSchedulerThreadRate + " ms with the following overrides " + logFlushIntervals)
-    logFlusherScheduler.scheduleWithRate(flushAllLogs, config.flushSchedulerThreadRate, config.flushSchedulerThreadRate)
   }
 
 
@@ -95,7 +90,7 @@ private[kafka] class LogManager(val conf
    */
   private def createLog(topic: String, partition: Int): Log = {
     if (topic.length <= 0)
-      throw new InvalidTopicException("topic name can't be emtpy")
+      throw new InvalidTopicException("Topic name can't be emtpy")
     if (partition < 0 || partition >= config.topicPartitionsMap.getOrElse(topic, numPartitions)) {
       val error = "Wrong partition %d, valid partitions (0, %d)."
         .format(partition, (config.topicPartitionsMap.getOrElse(topic, numPartitions) - 1))
@@ -207,10 +202,8 @@ private[kafka] class LogManager(val conf
   /**
    * Close all the logs
    */
-  def close() {
+  def shutdown() {
     info("Closing log manager")
-    scheduler.shutdown()
-    logFlusherScheduler.shutdown()
     allLogs.foreach(_.close())
   }
   
@@ -220,7 +213,7 @@ private[kafka] class LogManager(val conf
   def allLogs() = logs.values.flatMap(_.values)
 
   private def flushAllLogs() = {
-    debug("flushing the high watermark of all logs")
+    debug("Flushing the high watermark of all logs")
     for (log <- allLogs)
     {
       try{

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogStats.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogStats.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogStats.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogStats.scala Wed Jul 25 23:42:07 2012
@@ -36,7 +36,7 @@ class LogStats(val log: Log) extends Log
   
   def getNumberOfSegments: Int = log.numberOfSegments
   
-  def getCurrentOffset: Long = log.getHW()
+  def getCurrentOffset: Long = log.logEndOffset
   
   def getNumAppendedMessages: Long = numCumulatedMessages.get
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala Wed Jul 25 23:42:07 2012
@@ -196,9 +196,12 @@ class FileMessageSet private[kafka](priv
     len - validUpTo
   }
 
-  def truncateUpto(hw: Long) = {
-    channel.truncate(hw)
-    setSize.set(hw)
+  def truncateTo(targetSize: Long) = {
+    if(targetSize >= sizeInBytes())
+      throw new KafkaException("Attempt to truncate log segment to %d bytes failed since the current ".format(targetSize) +
+        " size of this log segment is only %d bytes".format(sizeInBytes()))
+    channel.truncate(targetSize)
+    setSize.set(targetSize)
   }
 
   /**

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala?rev=1365841&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala Wed Jul 25 23:42:07 2012
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.utils.Logging
+import java.util.concurrent.locks.ReentrantLock
+import java.io._
+
+/**
+ * This class handles the read/write to the highwaterMark checkpoint file. The file stores the high watermark value for
+ * all topics and partitions that this broker hosts. The format of this file is as follows -
+ * version
+ * number of entries
+ * topic partition highwatermark
+ */
+
+object HighwaterMarkCheckpoint {
+  val highWatermarkFileName = ".highwatermark"
+  val currentHighwaterMarkFileVersion = 0
+}
+
+class HighwaterMarkCheckpoint(val path: String) extends Logging {
+  /* create the highwatermark file handle for all partitions */
+  val name = path + "/" + HighwaterMarkCheckpoint.highWatermarkFileName
+  private val hwFile = new File(name)
+  private val hwFileLock = new ReentrantLock()
+  // recover from previous tmp file, if required
+
+  def write(highwaterMarksPerPartition: Map[(String, Int), Long]) {
+    hwFileLock.lock()
+    try {
+      // write to temp file and then swap with the highwatermark file
+      val tempHwFile = new File(hwFile + ".tmp")
+      // it is an error for this file to be present. It could mean that the previous rename operation failed
+      if(tempHwFile.exists()) {
+        fatal("Temporary high watermark %s file exists. This could mean that the ".format(tempHwFile.getAbsolutePath) +
+          "previous high watermark checkpoint operation has failed.")
+        System.exit(1)
+      }
+      val hwFileWriter = new BufferedWriter(new FileWriter(tempHwFile))
+      // checkpoint highwatermark for all partitions
+      // write the current version
+      hwFileWriter.write(HighwaterMarkCheckpoint.currentHighwaterMarkFileVersion.toString)
+      hwFileWriter.newLine()
+      // write the number of entries in the highwatermark file
+      hwFileWriter.write(highwaterMarksPerPartition.size.toString)
+      hwFileWriter.newLine()
+
+      highwaterMarksPerPartition.foreach { partitionAndHw =>
+        val topic = partitionAndHw._1._1
+        val partitionId = partitionAndHw._1._2
+        hwFileWriter.write("%s %s %s".format(topic, partitionId, partitionAndHw._2))
+        hwFileWriter.newLine()
+      }
+      hwFileWriter.flush()
+      hwFileWriter.close()
+      // swap new high watermark file with previous one
+      hwFile.delete()
+      if(!tempHwFile.renameTo(hwFile)) {
+        fatal("Attempt to swap the new high watermark file with the old one failed")
+        System.exit(1)
+      }
+    }finally {
+      hwFileLock.unlock()
+    }
+  }
+
+  def read(topic: String, partition: Int): Long = {
+    hwFileLock.lock()
+    try {
+      hwFile.length() match {
+        case 0 => warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) +
+          "partition %d. Returning 0 as the highwatermark".format(partition))
+        0L
+        case _ =>
+          val hwFileReader = new BufferedReader(new FileReader(hwFile))
+          val version = hwFileReader.readLine().toShort
+          version match {
+            case HighwaterMarkCheckpoint.currentHighwaterMarkFileVersion =>
+              val numberOfHighWatermarks = hwFileReader.readLine().toInt
+              val partitionHighWatermarks =
+                for(i <- 0 until numberOfHighWatermarks) yield {
+                  val nextHwEntry = hwFileReader.readLine()
+                  val partitionHwInfo = nextHwEntry.split(" ")
+                  val highwaterMark = partitionHwInfo.last.toLong
+                  val partitionId = partitionHwInfo.takeRight(2).head
+                  // find the index of partition
+                  val partitionIndex = nextHwEntry.indexOf(partitionId)
+                  val topic = nextHwEntry.substring(0, partitionIndex-1)
+                  ((topic, partitionId.toInt) -> highwaterMark)
+                }
+              hwFileReader.close()
+              val hwOpt = partitionHighWatermarks.toMap.get((topic, partition))
+              hwOpt match {
+                case Some(hw) => debug("Read hw %d for topic %s partition %d from highwatermark checkpoint file"
+                                        .format(hw, topic, partition))
+                  hw
+                case None => warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) +
+                  "partition %d. Returning 0 as the highwatermark".format(partition))
+                0L
+              }
+            case _ => fatal("Unrecognized version of the highwatermark checkpoint file " + version)
+            System.exit(1)
+            -1L
+          }
+      }
+    }finally {
+      hwFileLock.unlock()
+    }
+  }
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Wed Jul 25 23:42:07 2012
@@ -171,7 +171,7 @@ class KafkaApis(val requestChannel: Requ
           kafkaZookeeper.ensurePartitionLeaderOnThisBroker(topicData.topic, partitionData.partition)
           val log = logManager.getOrCreateLog(topicData.topic, partitionData.partition)
           log.append(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
-          replicaManager.recordLeaderLogUpdate(topicData.topic, partitionData.partition)
+          replicaManager.recordLeaderLogEndOffset(topicData.topic, partitionData.partition, log.logEndOffset)
           offsets(msgIndex) = log.logEndOffset
           errors(msgIndex) = ErrorMapping.NoError.toShort
           trace(partitionData.messages.sizeInBytes + " bytes written to logs.")

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Wed Jul 25 23:42:07 2012
@@ -40,11 +40,12 @@ class KafkaServer(val config: KafkaConfi
   private val statsMBeanName = "kafka:type=kafka.SocketServerStats"
   var socketServer: SocketServer = null
   var requestHandlerPool: KafkaRequestHandlerPool = null
-  private var logManager: LogManager = null
+  var logManager: LogManager = null
   var kafkaZookeeper: KafkaZooKeeper = null
-  private var replicaManager: ReplicaManager = null
+  var replicaManager: ReplicaManager = null
   private var apis: KafkaApis = null
   var kafkaController: KafkaController = new KafkaController(config)
+  val kafkaScheduler = new KafkaScheduler(4)
   var zkClient: ZkClient = null
 
   /**
@@ -62,9 +63,13 @@ class KafkaServer(val config: KafkaConfi
       cleanShutDownFile.delete
     }
     /* start client */
-    info("connecting to ZK: " + config.zkConnect)
+    info("Connecting to ZK: " + config.zkConnect)
     zkClient = KafkaZookeeperClient.getZookeeperClient(config)
+    /* start scheduler */
+    kafkaScheduler.startUp
+    /* start log manager */
     logManager = new LogManager(config,
+                                kafkaScheduler,
                                 SystemTime,
                                 1000L * 60 * config.logCleanupIntervalMinutes,
                                 1000L * 60 * 60 * config.logRetentionHours,
@@ -80,7 +85,7 @@ class KafkaServer(val config: KafkaConfi
 
     kafkaZookeeper = new KafkaZooKeeper(config, zkClient, addReplica, getReplica, makeLeader, makeFollower)
 
-    replicaManager = new ReplicaManager(config, time, zkClient)
+    replicaManager = new ReplicaManager(config, time, zkClient, kafkaScheduler)
 
     apis = new KafkaApis(socketServer.requestChannel, logManager, replicaManager, kafkaZookeeper)
     requestHandlerPool = new KafkaRequestHandlerPool(socketServer.requestChannel, apis, config.numIoThreads)
@@ -90,6 +95,9 @@ class KafkaServer(val config: KafkaConfi
 
     // starting relevant replicas and leader election for partitions assigned to this broker
     kafkaZookeeper.startup()
+    // start the replica manager
+    replicaManager.startup()
+    // start the controller
     kafkaController.startup()
 
     info("Server started.")
@@ -103,23 +111,21 @@ class KafkaServer(val config: KafkaConfi
     val canShutdown = isShuttingDown.compareAndSet(false, true);
     if (canShutdown) {
       info("Shutting down Kafka server with id " + config.brokerId)
+      kafkaScheduler.shutdown()
       apis.close()
       if(replicaManager != null)
-        replicaManager.close()
+        replicaManager.shutdown()
       if (socketServer != null)
         socketServer.shutdown()
       if(requestHandlerPool != null)
         requestHandlerPool.shutdown()
       Utils.unregisterMBean(statsMBeanName)
       if(logManager != null)
-        logManager.close()
+        logManager.shutdown()
       if(kafkaController != null)
         kafkaController.shutDown()
-
-      kafkaZookeeper.close
-      info("Closing zookeeper client...")
+      kafkaZookeeper.shutdown()
       zkClient.close()
-
       val cleanShutDownFile = new File(new File(config.logDir), CleanShutdownFile)
       debug("Creating clean shutdown file " + cleanShutDownFile.getAbsolutePath())
       cleanShutDownFile.createNewFile

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaZooKeeper.scala Wed Jul 25 23:42:07 2012
@@ -104,7 +104,7 @@ class KafkaZooKeeper(config: KafkaConfig
     }
   }
 
-  def close() {
+  def shutdown() {
     stateChangeHandler.shutdown()
   }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Wed Jul 25 23:42:07 2012
@@ -13,7 +13,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 package kafka.server
 
 import kafka.log.Log
@@ -25,21 +25,28 @@ import java.util.concurrent.locks.Reentr
 import kafka.utils.{KafkaScheduler, ZkUtils, Time, Logging}
 import kafka.common.{KafkaException, InvalidPartitionException}
 
-class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient) extends Logging {
+class ReplicaManager(val config: KafkaConfig, time: Time, zkClient: ZkClient, kafkaScheduler: KafkaScheduler)
+  extends Logging {
 
   private var allReplicas = new mutable.HashMap[(String, Int), Partition]()
   private var leaderReplicas = new ListBuffer[Partition]()
   private val leaderReplicaLock = new ReentrantLock()
-  private var isrExpirationScheduler = new KafkaScheduler(1, "isr-expiration-thread-", true)
   private val replicaFetcherManager = new ReplicaFetcherManager(config, this)
+  private val highwaterMarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir)
+  info("Created highwatermark file %s on broker %d".format(highwaterMarkCheckpoint.name, config.brokerId))
 
-  // start ISR expiration thread
-  isrExpirationScheduler.startUp
-  isrExpirationScheduler.scheduleWithRate(maybeShrinkISR, 0, config.replicaMaxLagTimeMs)
+  def startup() {
+    // start the highwatermark checkpoint thread
+    kafkaScheduler.scheduleWithRate(checkpointHighwaterMarks, "highwatermark-checkpoint-thread", 0,
+      config.defaultFlushIntervalMs)
+    // start ISR expiration thread
+    kafkaScheduler.scheduleWithRate(maybeShrinkISR, "isr-expiration-thread-", 0, config.replicaMaxLagTimeMs)
+  }
 
   def addLocalReplica(topic: String, partitionId: Int, log: Log, assignedReplicaIds: Set[Int]): Replica = {
     val partition = getOrCreatePartition(topic, partitionId, assignedReplicaIds)
-    val localReplica = new Replica(config.brokerId, partition, topic, Some(log))
+    val localReplica = new Replica(config.brokerId, partition, topic, time,
+      Some(readCheckpointedHighWatermark(topic, partitionId)), Some(log))
 
     val replicaOpt = partition.getReplica(config.brokerId)
     replicaOpt match {
@@ -81,12 +88,12 @@ class ReplicaManager(val config: KafkaCo
       case Some(partition) => partition
       case None =>
         throw new InvalidPartitionException("Partition for topic %s partition %d doesn't exist in replica manager on %d"
-        .format(topic, partitionId, config.brokerId))
+          .format(topic, partitionId, config.brokerId))
     }
   }
 
   def addRemoteReplica(topic: String, partitionId: Int, replicaId: Int, partition: Partition): Replica = {
-    val remoteReplica = new Replica(replicaId, partition, topic)
+    val remoteReplica = new Replica(replicaId, partition, topic, time)
 
     val replicaAdded = partition.addReplica(remoteReplica)
     if(replicaAdded)
@@ -112,20 +119,17 @@ class ReplicaManager(val config: KafkaCo
         Some(replicas.leaderReplica())
       case None =>
         throw new KafkaException("Getting leader replica failed. Partition replica metadata for topic " +
-      "%s partition %d doesn't exist in Replica manager on %d".format(topic, partitionId, config.brokerId))
+          "%s partition %d doesn't exist in Replica manager on %d".format(topic, partitionId, config.brokerId))
     }
   }
 
-  def getPartition(topic: String, partitionId: Int): Option[Partition] =
-    allReplicas.get((topic, partitionId))
-
-  def updateReplicaLEO(replica: Replica, fetchOffset: Long) {
+  private def updateReplicaLeo(replica: Replica, fetchOffset: Long) {
     // set the replica leo
     val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId)
-    partition.updateReplicaLEO(replica, fetchOffset)
+    partition.updateReplicaLeo(replica, fetchOffset)
   }
 
-  def maybeIncrementLeaderHW(replica: Replica) {
+  private def maybeIncrementLeaderHW(replica: Replica) {
     // set the replica leo
     val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId)
     // set the leader HW to min of the leo of all replicas
@@ -141,58 +145,70 @@ class ReplicaManager(val config: KafkaCo
   }
 
   def makeLeader(replica: Replica, currentISRInZk: Seq[Int]) {
-    // read and cache the ISR
-    replica.partition.leaderId(Some(replica.brokerId))
-    replica.partition.updateISR(currentISRInZk.toSet)
-    // stop replica fetcher thread, if any
-    replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
-    // also add this partition to the list of partitions for which the leader is the current broker
+    info("Broker %d started the leader state transition for topic %s partition %d"
+      .format(config.brokerId, replica.topic, replica.partition.partitionId))
     try {
+      // read and cache the ISR
+      replica.partition.leaderId(Some(replica.brokerId))
+      replica.partition.updateISR(currentISRInZk.toSet)
+      // stop replica fetcher thread, if any
+      replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
+      // also add this partition to the list of partitions for which the leader is the current broker
       leaderReplicaLock.lock()
       leaderReplicas += replica.partition
+      info("Broker %d completed the leader state transition for topic %s partition %d"
+        .format(config.brokerId, replica.topic, replica.partition.partitionId))
+    }catch {
+      case e => error("Broker %d failed to complete the leader state transition for topic %s partition %d"
+        .format(config.brokerId, replica.topic, replica.partition.partitionId), e)
     }finally {
       leaderReplicaLock.unlock()
     }
   }
 
   def makeFollower(replica: Replica, leaderBrokerId: Int, zkClient: ZkClient) {
-    info("broker %d intending to follow leader %d for topic %s partition %d"
+    info("Broker %d starting the follower state transition to follow leader %d for topic %s partition %d"
       .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
-    // set the leader for this partition correctly on this broker
-    replica.partition.leaderId(Some(leaderBrokerId))
-    // remove this replica's partition from the ISR expiration queue
     try {
+      // set the leader for this partition correctly on this broker
+      replica.partition.leaderId(Some(leaderBrokerId))
+      replica.log match {
+        case Some(log) =>  // log is already started
+          log.truncateTo(replica.highWatermark())
+        case None =>
+      }
+      // get leader for this replica
+      val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(leaderBrokerId)).head
+      val currentLeaderBroker = replicaFetcherManager.fetcherSourceBroker(replica.topic, replica.partition.partitionId)
+      // become follower only if it is not already following the same leader
+      if( currentLeaderBroker == None || currentLeaderBroker.get != leaderBroker.id) {
+        info("broker %d becoming follower to leader %d for topic %s partition %d"
+          .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
+        // stop fetcher thread to previous leader
+        replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
+        // start fetcher thread to current leader
+        replicaFetcherManager.addFetcher(replica.topic, replica.partition.partitionId, replica.logEndOffset(), leaderBroker)
+      }
+      // remove this replica's partition from the ISR expiration queue
       leaderReplicaLock.lock()
       leaderReplicas -= replica.partition
+      info("Broker %d completed the follower state transition to follow leader %d for topic %s partition %d"
+        .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
+    }catch {
+      case e => error("Broker %d failed to complete the follower state transition to follow leader %d for topic %s partition %d"
+        .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId), e)
     }finally {
       leaderReplicaLock.unlock()
     }
-    replica.log match {
-      case Some(log) =>  // log is already started
-        log.recoverUptoLastCheckpointedHW()
-      case None =>
-    }
-    // get leader for this replica
-    val leaderBroker = ZkUtils.getBrokerInfoFromIds(zkClient, List(leaderBrokerId)).head
-    val currentLeaderBroker = replicaFetcherManager.fetcherSourceBroker(replica.topic, replica.partition.partitionId)
-    // Become follower only if it is not already following the same leader
-    if( currentLeaderBroker == None || currentLeaderBroker.get != leaderBroker.id) {
-      info("broker %d becoming follower to leader %d for topic %s partition %d"
-        .format(config.brokerId, leaderBrokerId, replica.topic, replica.partition.partitionId))
-      // stop fetcher thread to previous leader
-      replicaFetcherManager.removeFetcher(replica.topic, replica.partition.partitionId)
-      // start fetcher thread to current leader
-      replicaFetcherManager.addFetcher(replica.topic, replica.partition.partitionId, replica.logEndOffset(), leaderBroker)
-    }
   }
 
-  def maybeShrinkISR(): Unit = {
+  private def maybeShrinkISR(): Unit = {
     try {
       info("Evaluating ISR list of partitions to see which replicas can be removed from the ISR"
         .format(config.replicaMaxLagTimeMs))
       leaderReplicaLock.lock()
       leaderReplicas.foreach { partition =>
-         // shrink ISR if a follower is slow or stuck
+      // shrink ISR if a follower is slow or stuck
         val outOfSyncReplicas = partition.getOutOfSyncReplicas(config.replicaMaxLagTimeMs, config.replicaMaxLagBytes)
         if(outOfSyncReplicas.size > 0) {
           val newInSyncReplicas = partition.inSyncReplicas -- outOfSyncReplicas
@@ -210,7 +226,7 @@ class ReplicaManager(val config: KafkaCo
     }
   }
 
-  def checkIfISRCanBeExpanded(replica: Replica): Boolean = {
+  private def checkIfISRCanBeExpanded(replica: Replica): Boolean = {
     val partition = ensurePartitionExists(replica.topic, replica.partition.partitionId)
     if(partition.inSyncReplicas.contains(replica)) false
     else if(partition.assignedReplicas().contains(replica)) {
@@ -225,7 +241,7 @@ class ReplicaManager(val config: KafkaCo
     val replicaOpt = getReplica(topic, partition, replicaId)
     replicaOpt match {
       case Some(replica) =>
-        updateReplicaLEO(replica, offset)
+        updateReplicaLeo(replica, offset)
         // check if this replica needs to be added to the ISR
         if(checkIfISRCanBeExpanded(replica)) {
           val newISR = replica.partition.inSyncReplicas + replica
@@ -239,20 +255,45 @@ class ReplicaManager(val config: KafkaCo
     }
   }
 
-  def recordLeaderLogUpdate(topic: String, partition: Int) = {
+  def recordLeaderLogEndOffset(topic: String, partition: Int, logEndOffset: Long) = {
     val replicaOpt = getReplica(topic, partition, config.brokerId)
     replicaOpt match {
-      case Some(replica) =>
-        replica.logEndOffsetUpdateTime(Some(time.milliseconds))
+      case Some(replica) => replica.logEndOffset(Some(logEndOffset))
       case None =>
         throw new KafkaException("No replica %d in replica manager on %d".format(config.brokerId, config.brokerId))
     }
   }
 
-  def close() {
-    info("Closing replica manager on broker " + config.brokerId)
-    isrExpirationScheduler.shutdown()
+  /**
+   * Flushes the highwatermark value for all partitions to the highwatermark file
+   */
+  private def checkpointHighwaterMarks() {
+    val highwaterMarksForAllPartitions = allReplicas.map { partition =>
+      val topic = partition._1._1
+      val partitionId = partition._1._2
+      val localReplicaOpt = partition._2.getReplica(config.brokerId)
+      val hw = localReplicaOpt match {
+        case Some(localReplica) => localReplica.highWatermark()
+        case None =>
+          error("Error while checkpointing highwatermark for topic %s partition %d.".format(topic, partitionId) +
+            " Replica metadata doesn't exist in replica manager on broker " + config.brokerId)
+          0L
+      }
+      (topic, partitionId) -> hw
+    }.toMap
+    highwaterMarkCheckpoint.write(highwaterMarksForAllPartitions)
+    info("Checkpointed highwatermarks")
+  }
+
+  /**
+   * Reads the checkpointed high watermark for the given topic and partition
+   * @return checkpointed value of highwatermark for topic, partition. If one doesn't exist, returns 0
+   */
+  def readCheckpointedHighWatermark(topic: String, partition: Int): Long = highwaterMarkCheckpoint.read(topic, partition)
+
+  def shutdown() {
     replicaFetcherManager.shutdown()
+    checkpointHighwaterMarks()
     info("Replica manager shutdown on broker " + config.brokerId)
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/IteratorTemplate.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/IteratorTemplate.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/IteratorTemplate.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/IteratorTemplate.scala Wed Jul 25 23:42:07 2012
@@ -17,7 +17,6 @@
 
 package kafka.utils
 
-import kafka.common.KafkaException
 import java.lang.IllegalStateException
 
 class State

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/KafkaScheduler.scala Wed Jul 25 23:42:07 2012
@@ -18,20 +18,24 @@
 package kafka.utils
 
 import java.util.concurrent._
-import java.util.concurrent.atomic._
+import atomic._
+import collection.mutable.HashMap
 
 /**
  * A scheduler for running jobs in the background
  */
-class KafkaScheduler(val numThreads: Int, val baseThreadName: String, isDaemon: Boolean) extends Logging {
-  private val threadId = new AtomicLong(0)
+class KafkaScheduler(val numThreads: Int) extends Logging {
   private var executor:ScheduledThreadPoolExecutor = null
-  startUp
+  private val daemonThreadFactory = new ThreadFactory() {
+      def newThread(runnable: Runnable): Thread = Utils.newThread(runnable, true)
+    }
+  private val nonDaemonThreadFactory = new ThreadFactory() {
+      def newThread(runnable: Runnable): Thread = Utils.newThread(runnable, false)
+    }
+  private val threadNamesAndIds = new HashMap[String, AtomicInteger]()
 
   def startUp = {
-    executor = new ScheduledThreadPoolExecutor(numThreads, new ThreadFactory() {
-      def newThread(runnable: Runnable): Thread = Utils.daemonThread(baseThreadName + threadId.getAndIncrement, runnable)
-    })
+    executor = new ScheduledThreadPoolExecutor(numThreads)
     executor.setContinueExistingPeriodicTasksAfterShutdownPolicy(false)
     executor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false)
   }
@@ -43,20 +47,26 @@ class KafkaScheduler(val numThreads: Int
       throw new IllegalStateException("Kafka scheduler has not been started")
   }
 
-  def scheduleWithRate(fun: () => Unit, delayMs: Long, periodMs: Long) = {
+  def scheduleWithRate(fun: () => Unit, name: String, delayMs: Long, periodMs: Long, isDaemon: Boolean = true) = {
     ensureExecutorHasStarted
-    executor.scheduleAtFixedRate(Utils.loggedRunnable(fun), delayMs, periodMs, TimeUnit.MILLISECONDS)
+    if(isDaemon)
+      executor.setThreadFactory(daemonThreadFactory)
+    else
+      executor.setThreadFactory(nonDaemonThreadFactory)
+    val threadId = threadNamesAndIds.getOrElseUpdate(name, new AtomicInteger(0))
+    executor.scheduleAtFixedRate(Utils.loggedRunnable(fun, name + threadId.incrementAndGet()), delayMs, periodMs,
+      TimeUnit.MILLISECONDS)
   }
 
   def shutdownNow() {
     ensureExecutorHasStarted
     executor.shutdownNow()
-    info("Forcing shutdown of scheduler " + baseThreadName)
+    info("Forcing shutdown of Kafka scheduler")
   }
 
   def shutdown() {
     ensureExecutorHasStarted
     executor.shutdown()
-    info("Shutdown scheduler " + baseThreadName)
+    info("Shutdown Kafka scheduler")
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Wed Jul 25 23:42:07 2012
@@ -58,9 +58,10 @@ object Utils extends Logging {
    * @param fun A function
    * @return A Runnable that just executes the function
    */
-  def loggedRunnable(fun: () => Unit): Runnable =
+  def loggedRunnable(fun: () => Unit, name: String): Runnable =
     new Runnable() {
       def run() = {
+        Thread.currentThread().setName(name)
         try {
           fun()
         }
@@ -74,6 +75,14 @@ object Utils extends Logging {
 
   /**
    * Create a daemon thread
+   * @param runnable The runnable to execute in the background
+   * @return The unstarted thread
+   */
+  def daemonThread(runnable: Runnable): Thread =
+    newThread(runnable, true)
+
+  /**
+   * Create a daemon thread
    * @param name The name of the thread
    * @param runnable The runnable to execute in the background
    * @return The unstarted thread
@@ -109,6 +118,23 @@ object Utils extends Logging {
   }
    
   /**
+   * Create a new thread
+   * @param runnable The work for the thread to do
+   * @param daemon Whether to mark the thread as a daemon thread; daemon threads do not block JVM shutdown
+   * @return The unstarted thread
+   */
+  def newThread(runnable: Runnable, daemon: Boolean): Thread = {
+    val thread = new Thread(runnable)
+    thread.setDaemon(daemon)
+    thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
+      def uncaughtException(t: Thread, e: Throwable) {
+        error("Uncaught exception in thread '" + t.getName + "':", e)
+      }
+    })
+    thread
+  }
+
+  /**
    * Read a byte array from the given offset and size in the buffer
    * TODO: Should use System.arraycopy
    */

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala Wed Jul 25 23:42:07 2012
@@ -37,6 +37,7 @@ class LogManagerTest extends JUnit3Suite
   val zookeeperConnect = TestZKUtils.zookeeperConnect
   val name = "kafka"
   val veryLargeLogFlushInterval = 10000000L
+  val scheduler = new KafkaScheduler(2)
 
   override def setUp() {
     super.setUp()
@@ -45,7 +46,8 @@ class LogManagerTest extends JUnit3Suite
                    override val logFileSize = 1024
                    override val flushInterval = 100
                  }
-    logManager = new LogManager(config, time, veryLargeLogFlushInterval, maxLogAge, false)
+    scheduler.startUp
+    logManager = new LogManager(config, scheduler, time, veryLargeLogFlushInterval, maxLogAge, false)
     logManager.startup
     logDir = logManager.logDir
 
@@ -56,8 +58,9 @@ class LogManagerTest extends JUnit3Suite
   }
 
   override def tearDown() {
+    scheduler.shutdown()
     if(logManager != null)
-      logManager.close()
+      logManager.shutdown()
     Utils.rm(logDir)
     super.tearDown()
   }
@@ -114,7 +117,7 @@ class LogManagerTest extends JUnit3Suite
     val retentionHours = 1
     val retentionMs = 1000 * 60 * 60 * retentionHours
     val props = TestUtils.createBrokerConfig(0, -1)
-    logManager.close
+    logManager.shutdown()
     Thread.sleep(100)
     config = new KafkaConfig(props) {
       override val logFileSize = (10 * (setSize - 1)).asInstanceOf[Int] // each segment will be 10 messages
@@ -122,7 +125,7 @@ class LogManagerTest extends JUnit3Suite
       override val logRetentionHours = retentionHours
       override val flushInterval = 100
     }
-    logManager = new LogManager(config, time, veryLargeLogFlushInterval, retentionMs, false)
+    logManager = new LogManager(config, scheduler, time, veryLargeLogFlushInterval, retentionMs, false)
     logManager.startup
 
     // create a log
@@ -159,7 +162,7 @@ class LogManagerTest extends JUnit3Suite
   @Test
   def testTimeBasedFlush() {
     val props = TestUtils.createBrokerConfig(0, -1)
-    logManager.close
+    logManager.shutdown()
     Thread.sleep(100)
     config = new KafkaConfig(props) {
                    override val logFileSize = 1024 *1024 *1024
@@ -167,7 +170,7 @@ class LogManagerTest extends JUnit3Suite
                    override val flushInterval = Int.MaxValue
                    override val flushIntervalMap = Utils.getTopicFlushIntervals("timebasedflush:100")
                  }
-    logManager = new LogManager(config, time, veryLargeLogFlushInterval, maxLogAge, false)
+    logManager = new LogManager(config, scheduler, time, veryLargeLogFlushInterval, maxLogAge, false)
     logManager.startup
     val log = logManager.getOrCreateLog(name, 0)
     for(i <- 0 until 200) {
@@ -182,15 +185,14 @@ class LogManagerTest extends JUnit3Suite
   @Test
   def testConfigurablePartitions() {
     val props = TestUtils.createBrokerConfig(0, -1)
-    logManager.close
+    logManager.shutdown()
     Thread.sleep(100)
     config = new KafkaConfig(props) {
                    override val logFileSize = 256
                    override val topicPartitionsMap = Utils.getTopicPartitions("testPartition:2")
                    override val flushInterval = 100
                  }
-
-    logManager = new LogManager(config, time, veryLargeLogFlushInterval, maxLogAge, false)
+    logManager = new LogManager(config, scheduler, time, veryLargeLogFlushInterval, maxLogAge, false)
     logManager.startup
 
     for(i <- 0 until 1) {

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala Wed Jul 25 23:42:07 2012
@@ -13,7 +13,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package kafka.producer
 
@@ -35,7 +35,7 @@ import java.util
 
 class ProducerTest extends JUnit3Suite with ZooKeeperTestHarness {
   private val brokerId1 = 0
-  private val brokerId2 = 1  
+  private val brokerId2 = 1
   private val ports = TestUtils.choosePorts(2)
   private val (port1, port2) = (ports(0), ports(1))
   private var server1: KafkaServer = null
@@ -80,7 +80,7 @@ class ProducerTest extends JUnit3Suite w
     server2.shutdown
     server2.awaitShutdown()
     Utils.rm(server1.config.logDir)
-    Utils.rm(server2.config.logDir)    
+    Utils.rm(server2.config.logDir)
     Thread.sleep(500)
     super.tearDown()
   }
@@ -97,7 +97,7 @@ class ProducerTest extends JUnit3Suite w
     val props2 = new util.Properties()
     props2.putAll(props1)
     props2.put("producer.request.required.acks", "3")
-    props2.put("producer.request.ack.timeout.ms", "1000")
+    props2.put("producer.request.timeout.ms", "1000")
 
     val config1 = new ProducerConfig(props1)
     val config2 = new ProducerConfig(props2)
@@ -108,33 +108,28 @@ class ProducerTest extends JUnit3Suite w
 
     val producer1 = new Producer[String, String](config1)
     val producer2 = new Producer[String, String](config2)
-    try {
-      // Available partition ids should be 0.
-      producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
-      producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
-      // get the leader
-      val leaderOpt = ZkUtils.getLeaderForPartition(zkClient, "new-topic", 0)
-      assertTrue("Leader for topic new-topic partition 0 should exist", leaderOpt.isDefined)
-      val leader = leaderOpt.get
-
-      val messageSet = if(leader == server1.config.brokerId) {
-        val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-        response1.messageSet("new-topic", 0).iterator
-      }else {
-        val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
-        response2.messageSet("new-topic", 0).iterator
-      }
-      assertTrue("Message set should have 1 message", messageSet.hasNext)
-
-      assertEquals(new Message("test1".getBytes), messageSet.next.message)
-      assertTrue("Message set should have 1 message", messageSet.hasNext)
-      assertEquals(new Message("test1".getBytes), messageSet.next.message)
-      assertFalse("Message set should not have any more messages", messageSet.hasNext)
-    } catch {
-      case e: Exception => fail("Not expected", e)
-    } finally {
-      producer1.close()
-    }
+    // Available partition ids should be 0.
+    producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+    producer1.send(new ProducerData[String, String]("new-topic", "test", Array("test1")))
+    // get the leader
+    val leaderOpt = ZkUtils.getLeaderForPartition(zkClient, "new-topic", 0)
+    assertTrue("Leader for topic new-topic partition 0 should exist", leaderOpt.isDefined)
+    val leader = leaderOpt.get
+
+    val messageSet = if(leader == server1.config.brokerId) {
+      val response1 = consumer1.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+      response1.messageSet("new-topic", 0).iterator
+    }else {
+      val response2 = consumer2.fetch(new FetchRequestBuilder().addFetch("new-topic", 0, 0, 10000).build())
+      response2.messageSet("new-topic", 0).iterator
+    }
+    assertTrue("Message set should have 1 message", messageSet.hasNext)
+
+    assertEquals(new Message("test1".getBytes), messageSet.next.message)
+    assertTrue("Message set should have 1 message", messageSet.hasNext)
+    assertEquals(new Message("test1".getBytes), messageSet.next.message)
+    assertFalse("Message set should not have any more messages", messageSet.hasNext)
+    producer1.close()
 
     try {
       producer2.send(new ProducerData[String, String]("new-topic", "test", Array("test2")))

Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala?rev=1365841&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala Wed Jul 25 23:42:07 2012
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package kafka.server
+
+import kafka.log.Log
+import org.I0Itec.zkclient.ZkClient
+import org.scalatest.junit.JUnit3Suite
+import org.easymock.EasyMock
+import org.junit.Assert._
+import kafka.utils.{KafkaScheduler, TestUtils, MockTime}
+import kafka.common.KafkaException
+
+class HighwatermarkPersistenceTest extends JUnit3Suite {
+
+  val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
+    override val defaultFlushIntervalMs = 100
+  })
+  val topic = "foo"
+
+  def testHighWatermarkPersistenceSinglePartition() {
+    // mock zkclient
+    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    EasyMock.replay(zkClient)
+    // create kafka scheduler
+    val scheduler = new KafkaScheduler(2)
+    scheduler.startUp
+    // create replica manager
+    val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler)
+    replicaManager.startup()
+    // sleep until flush ms
+    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    var fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
+    assertEquals(0L, fooPartition0Hw)
+    val partition0 = replicaManager.getOrCreatePartition(topic, 0, configs.map(_.brokerId).toSet)
+    // create leader log
+    val log0 = getMockLog
+    // create leader and follower replicas
+    val leaderReplicaPartition0 = replicaManager.addLocalReplica(topic, 0, log0, configs.map(_.brokerId).toSet)
+    val followerReplicaPartition0 = replicaManager.addRemoteReplica(topic, 0, configs.last.brokerId, partition0)
+    // sleep until flush ms
+    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
+    assertEquals(leaderReplicaPartition0.highWatermark(), fooPartition0Hw)
+    try {
+      followerReplicaPartition0.highWatermark()
+      fail("Should fail with KafkaException")
+    }catch {
+      case e: KafkaException => // this is ok
+    }
+    // set the leader
+    partition0.leaderId(Some(leaderReplicaPartition0.brokerId))
+    // set the highwatermark for local replica
+    partition0.leaderHW(Some(5L))
+    // sleep until flush interval
+    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    fooPartition0Hw = replicaManager.readCheckpointedHighWatermark(topic, 0)
+    assertEquals(leaderReplicaPartition0.highWatermark(), fooPartition0Hw)
+    EasyMock.verify(zkClient)
+    EasyMock.verify(log0)
+  }
+
+  def testHighWatermarkPersistenceMultiplePartitions() {
+    val topic1 = "foo1"
+    val topic2 = "foo2"
+    // mock zkclient
+    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    EasyMock.replay(zkClient)
+    // create kafka scheduler
+    val scheduler = new KafkaScheduler(2)
+    scheduler.startUp
+    // create replica manager
+    val replicaManager = new ReplicaManager(configs.head, new MockTime(), zkClient, scheduler)
+    replicaManager.startup()
+    // sleep until flush ms
+    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    var topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
+    assertEquals(0L, topic1Partition0Hw)
+    val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0, configs.map(_.brokerId).toSet)
+    // create leader log
+    val topic1Log0 = getMockLog
+    // create the leader replica (this test adds no follower replica)
+    val leaderReplicaTopic1Partition0 = replicaManager.addLocalReplica(topic1, 0, topic1Log0, configs.map(_.brokerId).toSet)
+    // sleep until flush ms
+    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
+    assertEquals(leaderReplicaTopic1Partition0.highWatermark(), topic1Partition0Hw)
+    // set the leader
+    topic1Partition0.leaderId(Some(leaderReplicaTopic1Partition0.brokerId))
+    // set the highwatermark for local replica
+    topic1Partition0.leaderHW(Some(5L))
+    // sleep until flush interval
+    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
+    assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark())
+    assertEquals(5L, topic1Partition0Hw)
+    // add another partition and set highwatermark
+    val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0, configs.map(_.brokerId).toSet)
+    // create leader log
+    val topic2Log0 = getMockLog
+    // create the leader replica (this test adds no follower replica)
+    val leaderReplicaTopic2Partition0 = replicaManager.addLocalReplica(topic2, 0, topic2Log0, configs.map(_.brokerId).toSet)
+    // sleep until flush ms
+    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    var topic2Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic2, 0)
+    assertEquals(leaderReplicaTopic2Partition0.highWatermark(), topic2Partition0Hw)
+    // set the leader
+    topic2Partition0.leaderId(Some(leaderReplicaTopic2Partition0.brokerId))
+    // set the highwatermark for local replica
+    topic2Partition0.leaderHW(Some(15L))
+    assertEquals(15L, leaderReplicaTopic2Partition0.highWatermark())
+    // change the highwatermark for topic1
+    topic1Partition0.leaderHW(Some(10L))
+    assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark())
+    // sleep until flush interval
+    Thread.sleep(configs.head.defaultFlushIntervalMs)
+    // verify checkpointed hw for topic 2
+    topic2Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic2, 0)
+    assertEquals(15L, topic2Partition0Hw)
+    // verify checkpointed hw for topic 1
+    topic1Partition0Hw = replicaManager.readCheckpointedHighWatermark(topic1, 0)
+    assertEquals(10L, topic1Partition0Hw)
+    EasyMock.verify(zkClient)
+    EasyMock.verify(topic1Log0)
+    EasyMock.verify(topic2Log0)
+  }
+
+  private def getMockLog: Log = {
+    val log = EasyMock.createMock(classOf[kafka.log.Log])
+    EasyMock.replay(log)
+    log
+  }
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala Wed Jul 25 23:42:07 2012
@@ -22,9 +22,9 @@ import collection.mutable.Map
 import kafka.cluster.{Partition, Replica}
 import org.easymock.EasyMock
 import kafka.log.Log
-import kafka.utils.{Time, MockTime, TestUtils}
 import org.junit.Assert._
 import org.I0Itec.zkclient.ZkClient
+import kafka.utils.{KafkaScheduler, Time, MockTime, TestUtils}
 
 class ISRExpirationTest extends JUnit3Suite {
 
@@ -40,7 +40,6 @@ class ISRExpirationTest extends JUnit3Su
     // create leader replica
     val log = EasyMock.createMock(classOf[kafka.log.Log])
     EasyMock.expect(log.logEndOffset).andReturn(5L).times(12)
-    EasyMock.expect(log.setHW(5L)).times(1)
     EasyMock.replay(log)
 
     // add one partition
@@ -48,10 +47,10 @@ class ISRExpirationTest extends JUnit3Su
     assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
     val leaderReplica = partition0.getReplica(configs.head.brokerId).get
     // set remote replicas leo to something low, like 2
-    (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLEO(_, 2))
+    (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLeo(_, 2))
 
     time.sleep(150)
-    leaderReplica.logEndOffsetUpdateTime(Some(time.milliseconds))
+    leaderReplica.logEndOffset(Some(5L))
 
     var partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
     assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
@@ -60,9 +59,9 @@ class ISRExpirationTest extends JUnit3Su
     partition0.inSyncReplicas ++= partition0.assignedReplicas()
     assertEquals("Replica 1 should be in sync", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
 
-    leaderReplica.logEndOffsetUpdateTime(Some(time.milliseconds))
+    leaderReplica.logEndOffset(Some(5L))
     // let the follower catch up only upto 3
-    (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLEO(_, 3))
+    (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLeo(_, 3))
     time.sleep(150)
     // now follower broker id 1 has caught upto only 3, while the leader is at 5 AND follower broker id 1 hasn't
     // pulled any data for > replicaMaxLagTimeMs ms. So it is stuck
@@ -80,12 +79,12 @@ class ISRExpirationTest extends JUnit3Su
     assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
     val leaderReplica = partition0.getReplica(configs.head.brokerId).get
     // set remote replicas leo to something low, like 4
-    (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLEO(_, 4))
+    (partition0.assignedReplicas() - leaderReplica).foreach(partition0.updateReplicaLeo(_, 4))
 
     time.sleep(150)
-    leaderReplica.logEndOffsetUpdateTime(Some(time.milliseconds))
+    leaderReplica.logEndOffset(Some(15L))
     time.sleep(10)
-    (partition0.inSyncReplicas - leaderReplica).foreach(r => r.logEndOffsetUpdateTime(Some(time.milliseconds)))
+    (partition0.inSyncReplicas - leaderReplica).foreach(r => r.logEndOffset(Some(4)))
 
     val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
     assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
@@ -98,8 +97,10 @@ class ISRExpirationTest extends JUnit3Su
     // mock zkclient
     val zkClient = EasyMock.createMock(classOf[ZkClient])
     EasyMock.replay(zkClient)
+    // create kafka scheduler
+    val scheduler = new KafkaScheduler(2)
     // create replica manager
-    val replicaManager = new ReplicaManager(configs.head, time, zkClient)
+    val replicaManager = new ReplicaManager(configs.head, time, zkClient, scheduler)
     try {
       val partition0 = replicaManager.getOrCreatePartition(topic, 0, configs.map(_.brokerId).toSet)
       // create leader log
@@ -115,7 +116,7 @@ class ISRExpirationTest extends JUnit3Su
       partition0.leaderHW(Some(5L))
 
       // set the leo for non-leader replicas to something low
-      (partition0.assignedReplicas() - leaderReplicaPartition0).foreach(r => partition0.updateReplicaLEO(r, 2))
+      (partition0.assignedReplicas() - leaderReplicaPartition0).foreach(r => partition0.updateReplicaLeo(r, 2))
 
       val log1 = getLogWithHW(15L)
       // create leader and follower replicas for partition 1
@@ -129,13 +130,13 @@ class ISRExpirationTest extends JUnit3Su
       partition1.leaderHW(Some(15L))
 
       // set the leo for non-leader replicas to something low
-      (partition1.assignedReplicas() - leaderReplicaPartition1).foreach(r => partition1.updateReplicaLEO(r, 4))
+      (partition1.assignedReplicas() - leaderReplicaPartition1).foreach(r => partition1.updateReplicaLeo(r, 4))
 
       time.sleep(150)
-      leaderReplicaPartition0.logEndOffsetUpdateTime(Some(time.milliseconds))
-      leaderReplicaPartition1.logEndOffsetUpdateTime(Some(time.milliseconds))
+      leaderReplicaPartition0.logEndOffset(Some(4L))
+      leaderReplicaPartition1.logEndOffset(Some(4L))
       time.sleep(10)
-      (partition1.inSyncReplicas - leaderReplicaPartition1).foreach(r => r.logEndOffsetUpdateTime(Some(time.milliseconds)))
+      (partition1.inSyncReplicas - leaderReplicaPartition1).foreach(r => r.logEndOffset(Some(4L)))
 
       val partition0OSR = partition0.getOutOfSyncReplicas(configs.head.replicaMaxLagTimeMs, configs.head.replicaMaxLagBytes)
       assertEquals("Replica 1 should be out of sync", Set(configs.last.brokerId), partition0OSR.map(_.brokerId))
@@ -148,16 +149,16 @@ class ISRExpirationTest extends JUnit3Su
     }catch {
       case e => e.printStackTrace()
     }finally {
-      replicaManager.close()
+      replicaManager.shutdown()
     }
   }
 
   private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int,
                                                localLog: Log, leaderHW: Long): Partition = {
     val partition = new Partition(topic, partitionId, time)
-    val leaderReplica = new Replica(leaderId, partition, topic, Some(localLog))
+    val leaderReplica = new Replica(leaderId, partition, topic, time, Some(leaderHW), Some(localLog))
 
-    val allReplicas = getFollowerReplicas(partition, leaderId) :+ leaderReplica
+    val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica
     partition.assignedReplicas(Some(allReplicas.toSet))
     // set in sync replicas for this partition to all the assigned replicas
     partition.inSyncReplicas = allReplicas.toSet
@@ -170,15 +171,14 @@ class ISRExpirationTest extends JUnit3Su
   private def getLogWithHW(hw: Long): Log = {
     val log1 = EasyMock.createMock(classOf[kafka.log.Log])
     EasyMock.expect(log1.logEndOffset).andReturn(hw).times(6)
-    EasyMock.expect(log1.setHW(hw)).times(1)
     EasyMock.replay(log1)
 
     log1
   }
 
-  private def getFollowerReplicas(partition: Partition, leaderId: Int): Seq[Replica] = {
+  private def getFollowerReplicas(partition: Partition, leaderId: Int, time: Time): Seq[Replica] = {
     configs.filter(_.brokerId != leaderId).map { config =>
-      new Replica(config.brokerId, partition, topic)
+      new Replica(config.brokerId, partition, topic, time)
     }
   }
 }
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala?rev=1365841&r1=1365840&r2=1365841&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala Wed Jul 25 23:42:07 2012
@@ -7,7 +7,6 @@ import kafka.utils.TestUtils._
 import kafka.utils.{Utils, TestUtils}
 import kafka.zk.ZooKeeperTestHarness
 import kafka.message.Message
-import java.io.RandomAccessFile
 import kafka.producer.{ProducerConfig, ProducerData, Producer}
 import org.junit.Test
 
@@ -34,15 +33,12 @@ class LogRecoveryTest extends JUnit3Suit
   val configProps1 = configs.head
   val configProps2 = configs.last
 
-  val server1HWFile = configProps1.logDir + "/" + topic + "-0/highwatermark"
-  val server2HWFile = configProps2.logDir + "/" + topic + "-0/highwatermark"
-
   val sent1 = List(new Message("hello".getBytes()), new Message("there".getBytes()))
   val sent2 = List( new Message("more".getBytes()), new Message("messages".getBytes()))
 
   var producer: Producer[Int, Message] = null
-  var hwFile1: RandomAccessFile = null
-  var hwFile2: RandomAccessFile = null
+  var hwFile1: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps1.logDir)
+  var hwFile2: HighwaterMarkCheckpoint = new HighwaterMarkCheckpoint(configProps2.logDir)
   var servers: Seq[KafkaServer] = Seq.empty[KafkaServer]
 
   @Test
@@ -66,22 +62,15 @@ class LogRecoveryTest extends JUnit3Suit
     // NOTE: this is to avoid transient test failures
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
 
-    sendMessages()
-
-    hwFile1 = new RandomAccessFile(server1HWFile, "r")
-    hwFile2 = new RandomAccessFile(server2HWFile, "r")
-
-    sendMessages()
+    sendMessages(2)
     // don't wait for follower to read the leader's hw
     // shutdown the servers to allow the hw to be checkpointed
     servers.map(server => server.shutdown())
     producer.close()
-    val leaderHW = readHW(hwFile1)
+    val leaderHW = hwFile1.read(topic, 0)
     assertEquals(60L, leaderHW)
-    val followerHW = readHW(hwFile2)
+    val followerHW = hwFile2.read(topic, 0)
     assertEquals(30L, followerHW)
-    hwFile1.close()
-    hwFile2.close()
     servers.map(server => Utils.rm(server.config.logDir))
   }
 
@@ -105,16 +94,13 @@ class LogRecoveryTest extends JUnit3Suit
     // NOTE: this is to avoid transient test failures
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
 
-    hwFile1 = new RandomAccessFile(server1HWFile, "r")
-    hwFile2 = new RandomAccessFile(server2HWFile, "r")
-
-    assertEquals(0L, readHW(hwFile1))
+    assertEquals(0L, hwFile1.read(topic, 0))
 
     sendMessages()
 
     // kill the server hosting the preferred replica
     server1.shutdown()
-    assertEquals(30L, readHW(hwFile1))
+    assertEquals(30L, hwFile1.read(topic, 0))
 
     // check if leader moves to the other server
     leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
@@ -126,10 +112,10 @@ class LogRecoveryTest extends JUnit3Suit
     leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
     assertEquals("Leader must remain on broker 1", 1, leader.getOrElse(-1))
 
-    assertEquals(30L, readHW(hwFile1))
+    assertEquals(30L, hwFile1.read(topic, 0))
     // since server 2 was never shut down, the hw value of 30 is probably not checkpointed to disk yet
     server2.shutdown()
-    assertEquals(30L, readHW(hwFile2))
+    assertEquals(30L, hwFile2.read(topic, 0))
 
     server2.startup()
     leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
@@ -137,17 +123,13 @@ class LogRecoveryTest extends JUnit3Suit
 
     sendMessages()
     // give some time for follower 1 to record leader HW of 60
-    Thread.sleep(500)
+    TestUtils.waitUntilTrue(() => server2.getReplica(topic, 0).get.highWatermark() == 60L, 500)
+
     // shutdown the servers to allow the hw to be checkpointed
     servers.map(server => server.shutdown())
-    Thread.sleep(200)
     producer.close()
-    assert(hwFile1.length() > 0)
-    assert(hwFile2.length() > 0)
-    assertEquals(60L, readHW(hwFile1))
-    assertEquals(60L, readHW(hwFile2))
-    hwFile1.close()
-    hwFile2.close()
+    assertEquals(60L, hwFile1.read(topic, 0))
+    assertEquals(60L, hwFile2.read(topic, 0))
     servers.map(server => Utils.rm(server.config.logDir))
   }
 
@@ -160,14 +142,14 @@ class LogRecoveryTest extends JUnit3Suit
       override val logFileSize = 30
     })
 
-    val server1HWFile = configs.head.logDir + "/" + topic + "-0/highwatermark"
-    val server2HWFile = configs.last.logDir + "/" + topic + "-0/highwatermark"
-
     // start both servers
     server1 = TestUtils.createServer(configs.head)
     server2 = TestUtils.createServer(configs.last)
     servers ++= List(server1, server2)
 
+    hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDir)
+    hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDir)
+
     val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
     producerProps.put("producer.request.timeout.ms", "1000")
     producerProps.put("producer.request.required.acks", "-1")
@@ -181,25 +163,16 @@ class LogRecoveryTest extends JUnit3Suit
     assertTrue("Leader should get elected", leader.isDefined)
     // NOTE: this is to avoid transient test failures
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
-
-    sendMessages(10)
-
-    hwFile1 = new RandomAccessFile(server1HWFile, "r")
-    hwFile2 = new RandomAccessFile(server2HWFile, "r")
-
-    sendMessages(10)
-
+    sendMessages(20)
     // give some time for follower 1 to record leader HW of 600
-    Thread.sleep(500)
+    TestUtils.waitUntilTrue(() => server2.getReplica(topic, 0).get.highWatermark() == 600L, 500)
     // shutdown the servers to allow the hw to be checkpointed
     servers.map(server => server.shutdown())
     producer.close()
-    val leaderHW = readHW(hwFile1)
+    val leaderHW = hwFile1.read(topic, 0)
     assertEquals(600L, leaderHW)
-    val followerHW = readHW(hwFile2)
+    val followerHW = hwFile2.read(topic, 0)
     assertEquals(600L, followerHW)
-    hwFile1.close()
-    hwFile2.close()
     servers.map(server => Utils.rm(server.config.logDir))
   }
 
@@ -208,19 +181,18 @@ class LogRecoveryTest extends JUnit3Suit
       override val replicaMaxLagTimeMs = 5000L
       override val replicaMaxLagBytes = 10L
       override val flushInterval = 1000
-      override val flushSchedulerThreadRate = 10
       override val replicaMinBytes = 20
       override val logFileSize = 30
     })
 
-    val server1HWFile = configs.head.logDir + "/" + topic + "-0/highwatermark"
-    val server2HWFile = configs.last.logDir + "/" + topic + "-0/highwatermark"
-
     // start both servers
     server1 = TestUtils.createServer(configs.head)
     server2 = TestUtils.createServer(configs.last)
     servers ++= List(server1, server2)
 
+    hwFile1 = new HighwaterMarkCheckpoint(server1.config.logDir)
+    hwFile2 = new HighwaterMarkCheckpoint(server2.config.logDir)
+
     val producerProps = getProducerConfig(zkConnect, 64*1024, 100000, 10000)
     producerProps.put("producer.request.timeout.ms", "1000")
     producerProps.put("producer.request.required.acks", "-1")
@@ -235,43 +207,36 @@ class LogRecoveryTest extends JUnit3Suit
     // NOTE: this is to avoid transient test failures
     assertTrue("Leader could be broker 0 or broker 1", (leader.getOrElse(-1) == 0) || (leader.getOrElse(-1) == 1))
 
-    val hwFile1 = new RandomAccessFile(server1HWFile, "r")
-    val hwFile2 = new RandomAccessFile(server2HWFile, "r")
-
     sendMessages(2)
     // allow some time for the follower to get the leader HW
-    Thread.sleep(1000)
+    TestUtils.waitUntilTrue(() => server2.getReplica(topic, 0).get.highWatermark() == 60L, 1000)
     // kill the server hosting the preferred replica
     server1.shutdown()
     server2.shutdown()
-    assertEquals(60L, readHW(hwFile1))
-    assertEquals(60L, readHW(hwFile2))
+    assertEquals(60L, hwFile1.read(topic, 0))
+    assertEquals(60L, hwFile2.read(topic, 0))
 
     server2.startup()
     // check if leader moves to the other server
     leader = waitUntilLeaderIsElected(zkClient, topic, partitionId, 500)
     assertEquals("Leader must move to broker 1", 1, leader.getOrElse(-1))
 
-    assertEquals(60L, readHW(hwFile1))
+    assertEquals(60L, hwFile1.read(topic, 0))
 
     // bring the preferred replica back
     server1.startup()
 
-    assertEquals(60L, readHW(hwFile1))
-    assertEquals(60L, readHW(hwFile2))
+    assertEquals(60L, hwFile1.read(topic, 0))
+    assertEquals(60L, hwFile2.read(topic, 0))
 
     sendMessages(2)
     // allow some time for the follower to get the leader HW
-    Thread.sleep(1000)
+    TestUtils.waitUntilTrue(() => server1.getReplica(topic, 0).get.highWatermark() == 120L, 1000)
     // shutdown the servers to allow the hw to be checkpointed
     servers.map(server => server.shutdown())
     producer.close()
-    assert(hwFile1.length() > 0)
-    assert(hwFile2.length() > 0)
-    assertEquals(120L, readHW(hwFile1))
-    assertEquals(120L, readHW(hwFile2))
-    hwFile1.close()
-    hwFile2.close()
+    assertEquals(120L, hwFile1.read(topic, 0))
+    assertEquals(120L, hwFile2.read(topic, 0))
     servers.map(server => Utils.rm(server.config.logDir))
   }
 
@@ -280,9 +245,4 @@ class LogRecoveryTest extends JUnit3Suit
       producer.send(new ProducerData[Int, Message](topic, 0, sent1))
     }
   }
-
-  private def readHW(hwFile: RandomAccessFile): Long = {
-    hwFile.seek(0)
-    hwFile.readLong()
-  }
 }
\ No newline at end of file