Posted to commits@kafka.apache.org by ju...@apache.org on 2016/08/19 17:07:11 UTC

[1/2] kafka git commit: KAFKA-3163; Add time based index to Kafka.

Repository: kafka
Updated Branches:
  refs/heads/trunk 05ed54bf2 -> 79d3fd2bf


http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index 15d4eea..aadda86 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -400,7 +400,8 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
    * If no format conversion or value overwriting is required for messages, this method will perform in-place
    * operations and avoid re-compression.
    *
-   * Returns the message set and a boolean indicating whether the message sizes may have changed.
+   * Returns a ValidationAndOffsetAssignResult containing the validated message set, maximum timestamp, the offset
+   * of the shallow message with the max timestamp and a boolean indicating whether the message sizes may have changed.
    */
   private[kafka] def validateMessagesAndAssignOffsets(offsetCounter: LongRef,
                                                       now: Long,
@@ -409,18 +410,16 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
                                                       compactedTopic: Boolean = false,
                                                       messageFormatVersion: Byte = Message.CurrentMagicValue,
                                                       messageTimestampType: TimestampType,
-                                                      messageTimestampDiffMaxMs: Long): (ByteBufferMessageSet, Boolean) = {
+                                                      messageTimestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = {
     if (sourceCodec == NoCompressionCodec && targetCodec == NoCompressionCodec) {
       // check the magic value
-      if (!isMagicValueInAllWrapperMessages(messageFormatVersion)) {
-        // Message format conversion
-        (convertNonCompressedMessages(offsetCounter, compactedTopic, now, messageTimestampType, messageTimestampDiffMaxMs,
-          messageFormatVersion), true)
-      } else {
+      if (!isMagicValueInAllWrapperMessages(messageFormatVersion))
+        convertNonCompressedMessages(offsetCounter, compactedTopic, now, messageTimestampType, messageTimestampDiffMaxMs,
+          messageFormatVersion)
+      else
         // Do in-place validation, offset assignment and maybe set timestamp
-        (validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter, now, compactedTopic, messageTimestampType,
-          messageTimestampDiffMaxMs), false)
-      }
+        validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter, now, compactedTopic, messageTimestampType,
+          messageTimestampDiffMaxMs)
     } else {
       // Deal with compressed messages
       // We cannot do in place assignment in one of the following situations:
@@ -433,6 +432,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
       var inPlaceAssignment = sourceCodec == targetCodec && messageFormatVersion > Message.MagicValue_V0
 
       var maxTimestamp = Message.NoTimestamp
+      var offsetOfMaxTimestamp = -1L
       val expectedInnerOffset = new LongRef(0)
       val validatedMessages = new mutable.ArrayBuffer[Message]
       this.internalIterator(isShallow = false).foreach { messageAndOffset =>
@@ -446,7 +446,10 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
           // Check if we need to overwrite offset
           if (messageAndOffset.offset != expectedInnerOffset.getAndIncrement())
             inPlaceAssignment = false
-          maxTimestamp = math.max(maxTimestamp, message.timestamp)
+          if (message.timestamp > maxTimestamp) {
+            maxTimestamp = message.timestamp
+            offsetOfMaxTimestamp = offsetCounter.value + expectedInnerOffset.value - 1
+          }
         }
 
         if (sourceCodec != NoCompressionCodec && message.compressionCodec != NoCompressionCodec)
@@ -462,20 +465,23 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
 
       if (!inPlaceAssignment) {
         // Cannot do in place assignment.
-        val wrapperMessageTimestamp = {
+        val (largestTimestampOfMessageSet, offsetOfMaxTimestampInMessageSet) = {
           if (messageFormatVersion == Message.MagicValue_V0)
-            Some(Message.NoTimestamp)
-          else if (messageFormatVersion > Message.MagicValue_V0 && messageTimestampType == TimestampType.CREATE_TIME)
-            Some(maxTimestamp)
+            (Some(Message.NoTimestamp), -1L)
+          else if (messageTimestampType == TimestampType.CREATE_TIME)
+            (Some(maxTimestamp), {if (targetCodec == NoCompressionCodec) offsetOfMaxTimestamp else offsetCounter.value + validatedMessages.length - 1})
           else // Log append time
-            Some(now)
+            (Some(now), {if (targetCodec == NoCompressionCodec) offsetCounter.value else offsetCounter.value + validatedMessages.length - 1})
         }
 
-        (new ByteBufferMessageSet(compressionCodec = targetCodec,
-                                  offsetCounter = offsetCounter,
-                                  wrapperMessageTimestamp = wrapperMessageTimestamp,
-                                  timestampType = messageTimestampType,
-                                  messages = validatedMessages: _*), true)
+        ValidationAndOffsetAssignResult(validatedMessages = new ByteBufferMessageSet(compressionCodec = targetCodec,
+                                                                                     offsetCounter = offsetCounter,
+                                                                                     wrapperMessageTimestamp = largestTimestampOfMessageSet,
+                                                                                     timestampType = messageTimestampType,
+                                                                                     messages = validatedMessages: _*),
+                                        maxTimestamp = largestTimestampOfMessageSet.get,
+                                        offsetOfMaxTimestamp = offsetOfMaxTimestampInMessageSet,
+                                        messageSizeMaybeChanged = true)
       } else {
         // Do not do re-compression but simply update the offset, timestamp and attributes field of the wrapper message.
         buffer.putLong(0, offsetCounter.addAndGet(validatedMessages.size) - 1)
@@ -487,6 +493,7 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
         val attributeOffset = MessageSet.LogOverhead + Message.AttributesOffset
         val timestamp = buffer.getLong(timestampOffset)
         val attributes = buffer.get(attributeOffset)
+        buffer.putLong(timestampOffset, maxTimestamp)
         if (messageTimestampType == TimestampType.CREATE_TIME && timestamp == maxTimestamp)
           // We don't need to recompute crc if the timestamp is not updated.
           crcUpdateNeeded = false
@@ -503,7 +510,11 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
           Utils.writeUnsignedInt(buffer, MessageSet.LogOverhead + Message.CrcOffset, wrapperMessage.computeChecksum)
         }
         buffer.rewind()
-        (this, false)
+        // For compressed messages,
+        ValidationAndOffsetAssignResult(validatedMessages = this,
+                                        maxTimestamp = buffer.getLong(timestampOffset),
+                                        offsetOfMaxTimestamp = buffer.getLong(0),
+                                        messageSizeMaybeChanged = false)
       }
     }
   }
@@ -516,12 +527,14 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
                                            now: Long,
                                            timestampType: TimestampType,
                                            messageTimestampDiffMaxMs: Long,
-                                           toMagicValue: Byte): ByteBufferMessageSet = {
+                                           toMagicValue: Byte): ValidationAndOffsetAssignResult = {
     val sizeInBytesAfterConversion = shallowValidBytes + this.internalIterator(isShallow = true).map { messageAndOffset =>
       Message.headerSizeDiff(messageAndOffset.message.magic, toMagicValue)
     }.sum
     val newBuffer = ByteBuffer.allocate(sizeInBytesAfterConversion)
     var newMessagePosition = 0
+    var maxTimestamp = Message.NoTimestamp
+    var offsetOfMaxTimestamp = -1L
     this.internalIterator(isShallow = true).foreach { case MessageAndOffset(message, _) =>
       validateMessageKey(message, compactedTopic)
       validateTimestamp(message, now, timestampType, messageTimestampDiffMaxMs)
@@ -532,20 +545,31 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
       val newMessageBuffer = newBuffer.slice()
       newMessageBuffer.limit(newMessageSize)
       message.convertToBuffer(toMagicValue, newMessageBuffer, now, timestampType)
-
+      if (toMagicValue > Message.MagicValue_V0) {
+        val timestamp = newMessageBuffer.getLong(Message.TimestampOffset)
+        if (maxTimestamp < timestamp) {
+          maxTimestamp = timestamp
+          offsetOfMaxTimestamp = offsetCounter.value - 1
+        }
+      }
       newMessagePosition += MessageSet.LogOverhead + newMessageSize
     }
     newBuffer.rewind()
-    new ByteBufferMessageSet(newBuffer)
+    new ValidationAndOffsetAssignResult(validatedMessages = new ByteBufferMessageSet(newBuffer),
+                                        maxTimestamp = maxTimestamp,
+                                        offsetOfMaxTimestamp = offsetOfMaxTimestamp,
+                                        messageSizeMaybeChanged = true)
   }
 
   private def validateNonCompressedMessagesAndAssignOffsetInPlace(offsetCounter: LongRef,
                                                                   now: Long,
                                                                   compactedTopic: Boolean,
                                                                   timestampType: TimestampType,
-                                                                  timestampDiffMaxMs: Long): ByteBufferMessageSet = {
+                                                                  timestampDiffMaxMs: Long): ValidationAndOffsetAssignResult = {
     // do in-place validation and offset assignment
     var messagePosition = 0
+    var maxTimestamp = Message.NoTimestamp
+    var offsetOfMaxTimestamp = -1L
     buffer.mark()
     while (messagePosition < sizeInBytes - MessageSet.LogOverhead) {
       buffer.position(messagePosition)
@@ -562,11 +586,19 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
           message.buffer.put(Message.AttributesOffset, timestampType.updateAttributes(message.attributes))
           Utils.writeUnsignedInt(message.buffer, Message.CrcOffset, message.computeChecksum)
         }
+        if (message.timestamp > maxTimestamp) {
+          maxTimestamp = message.timestamp
+          offsetOfMaxTimestamp = offsetCounter.value - 1
+        }
       }
+
       messagePosition += MessageSet.LogOverhead + messageSize
     }
     buffer.reset()
-    this
+    ValidationAndOffsetAssignResult(validatedMessages = this,
+                                    maxTimestamp = maxTimestamp,
+                                    offsetOfMaxTimestamp = offsetOfMaxTimestamp,
+                                    messageSizeMaybeChanged = false)
   }
 
   private def validateMessageKey(message: Message, compactedTopic: Boolean) {
@@ -614,3 +646,8 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
   override def hashCode: Int = buffer.hashCode
 
 }
+
+case class ValidationAndOffsetAssignResult(validatedMessages: ByteBufferMessageSet,
+                                           maxTimestamp: Long,
+                                           offsetOfMaxTimestamp: Long,
+                                           messageSizeMaybeChanged: Boolean)
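
Note: the new ValidationAndOffsetAssignResult replaces the old (ByteBufferMessageSet, Boolean) pair and additionally carries the max timestamp and the offset of the message holding it. Below is a minimal, self-contained sketch (not part of this patch) of how a caller can consume such a result; the Result type and the function parameters are hypothetical stand-ins, not Kafka APIs.

    // Hypothetical, self-contained sketch -- not Kafka code.
    case class Result(maxTimestamp: Long,
                      offsetOfMaxTimestamp: Long,
                      messageSizeMaybeChanged: Boolean)

    def afterValidation(result: Result,
                        appendTimeIndexEntry: (Long, Long) => Unit,
                        recheckMessageSizes: () => Unit): Unit = {
      // The caller can maintain a time index without re-scanning the messages.
      if (result.maxTimestamp >= 0)
        appendTimeIndexEntry(result.maxTimestamp, result.offsetOfMaxTimestamp)
      // Sizes may change after format conversion or re-compression.
      if (result.messageSizeMaybeChanged)
        recheckMessageSizes()
    }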

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/message/MessageAndOffset.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/MessageAndOffset.scala b/core/src/main/scala/kafka/message/MessageAndOffset.scala
index 51edf9f..fab6898 100644
--- a/core/src/main/scala/kafka/message/MessageAndOffset.scala
+++ b/core/src/main/scala/kafka/message/MessageAndOffset.scala
@@ -24,5 +24,13 @@ case class MessageAndOffset(message: Message, offset: Long) {
    * Compute the offset of the next message in the log
    */
   def nextOffset: Long = offset + 1
+
+  /**
+   * We need to decompress the message, if required, to get the offset of the first uncompressed message.
+   */
+  def firstOffset: Long = message.compressionCodec match {
+    case NoCompressionCodec => offset
+    case _ => ByteBufferMessageSet.deepIterator(this).next().offset
+  }
 }
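
Note: the wrapper message of a compressed set carries the offset of its last inner message, which is why firstOffset above has to decompress and take the first deep-iterated offset. A small illustrative model (not part of this patch, plain Longs instead of MessageAndOffset), assuming the inner message count is known and inner offsets are broker-assigned and consecutive:

    // Hypothetical, self-contained sketch -- not Kafka code.
    def firstInnerOffset(wrapperOffset: Long, innerMessageCount: Int): Long =
      wrapperOffset - innerMessageCount + 1

    // a wrapper at offset 104 holding 5 inner messages starts at offset 100
    assert(firstInnerOffset(104L, 5) == 100L)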
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index bb219ca..6eb574f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -38,10 +38,11 @@ import kafka.utils.{Logging, SystemTime, ZKGroupTopicDirs, ZkUtils}
 import org.apache.kafka.common.errors.{ClusterAuthorizationException, InvalidTopicException, NotLeaderForPartitionException, UnknownTopicOrPartitionException, TopicExistsException}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.{ApiKeys, Errors, Protocol, SecurityProtocol}
-import org.apache.kafka.common.requests.{ApiVersionsResponse, DescribeGroupsRequest, DescribeGroupsResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsResponse, ListOffsetRequest, ListOffsetResponse, MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest, ProduceResponse, ResponseHeader, ResponseSend, SaslHandshakeResponse, StopReplicaRequest, StopReplicaResponse, SyncGroupRequest, SyncGroupResponse, UpdateMetadataRequest, UpdateMetadataResponse, CreateTopicsRequest, CreateTopicsResponse, DeleteTopicsRequest, DeleteTopicsResponse}
+import org.apache.kafka.common.requests.{ApiVersionsResponse, DescribeGroupsRequest, DescribeGroupsResponse, GroupCoordinatorRequest, GroupCoordinatorResponse, HeartbeatRequest, HeartbeatResponse, JoinGroupRequest, JoinGroupResponse, LeaderAndIsrRequest, LeaderAndIsrResponse, LeaveGroupRequest, LeaveGroupResponse, ListGroupsResponse, ListOffsetRequest, ListOffsetResponse, MetadataRequest, MetadataResponse, OffsetCommitRequest, OffsetCommitResponse, OffsetFetchRequest, OffsetFetchResponse, ProduceRequest, ProduceResponse, ResponseHeader, ResponseSend, StopReplicaRequest, StopReplicaResponse, SyncGroupRequest, SyncGroupResponse, UpdateMetadataRequest, UpdateMetadataResponse, CreateTopicsRequest, CreateTopicsResponse, DeleteTopicsRequest, DeleteTopicsResponse}
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{Node, TopicPartition}
+import org.apache.kafka.common.requests.SaslHandshakeResponse
 
 import scala.collection._
 import scala.collection.JavaConverters._

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 2b97783..f94cbf9 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -335,7 +335,7 @@ class ReplicaManager(val config: KafkaConfig,
         topicPartition ->
                 ProducePartitionStatus(
                   result.info.lastOffset + 1, // required offset
-                  new PartitionResponse(result.errorCode, result.info.firstOffset, result.info.timestamp)) // response status
+                  new PartitionResponse(result.errorCode, result.info.firstOffset, result.info.logAppendTime)) // response status
       }
 
       if (delayedRequestRequired(requiredAcks, messagesPerPartition, localProduceResults)) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index dc99672..0a659f4 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -25,12 +25,13 @@ import kafka.coordinator.{GroupMetadataKey, GroupMetadataManager, OffsetKey}
 import kafka.log._
 import kafka.message._
 import kafka.serializer.Decoder
-import kafka.utils.{VerifiableProperties, _}
+import kafka.utils._
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.utils.Utils
 
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 object DumpLogSegments {
 
@@ -85,6 +86,7 @@ object DumpLogSegments {
     }
 
     val misMatchesForIndexFilesMap = new mutable.HashMap[String, List[(Long, Long)]]
+    val timeIndexDumpErrors = new TimeIndexDumpErrors
     val nonConsecutivePairsForLogFilesMap = new mutable.HashMap[String, List[(Long, Long)]]
 
     for(arg <- files) {
@@ -95,8 +97,12 @@ object DumpLogSegments {
       } else if(file.getName.endsWith(Log.IndexFileSuffix)) {
         println("Dumping " + file)
         dumpIndex(file, indexSanityOnly, verifyOnly, misMatchesForIndexFilesMap, maxMessageSize)
+      } else if(file.getName.endsWith(Log.TimeIndexFileSuffix)) {
+        println("Dumping " + file)
+        dumpTimeIndex(file, indexSanityOnly, verifyOnly, timeIndexDumpErrors, maxMessageSize)
       }
     }
+
     misMatchesForIndexFilesMap.foreach {
       case (fileName, listOfMismatches) => {
         System.err.println("Mismatches in :" + fileName)
@@ -105,6 +111,9 @@ object DumpLogSegments {
         })
       }
     }
+
+    timeIndexDumpErrors.printErrors()
+
     nonConsecutivePairsForLogFilesMap.foreach {
       case (fileName, listOfNonConsecutivePairs) => {
         System.err.println("Non-secutive offsets in :" + fileName)
@@ -150,6 +159,58 @@ object DumpLogSegments {
     }
   }
 
+  private def dumpTimeIndex(file: File,
+                            indexSanityOnly: Boolean,
+                            verifyOnly: Boolean,
+                            timeIndexDumpErrors: TimeIndexDumpErrors,
+                            maxMessageSize: Int) {
+    val startOffset = file.getName().split("\\.")(0).toLong
+    val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + Log.LogFileSuffix)
+    val messageSet = new FileMessageSet(logFile, false)
+    val indexFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + Log.IndexFileSuffix)
+    val index = new OffsetIndex(indexFile, baseOffset = startOffset)
+    val timeIndex = new TimeIndex(file, baseOffset = startOffset)
+
+    //Check that index passes sanityCheck, this is the check that determines if indexes will be rebuilt on startup or not.
+    if (indexSanityOnly) {
+      timeIndex.sanityCheck
+      println(s"$file passed sanity check.")
+      return
+    }
+
+    var prevTimestamp = Message.NoTimestamp
+    for(i <- 0 until timeIndex.entries) {
+      val entry = timeIndex.entry(i)
+      val position = index.lookup(entry.offset + timeIndex.baseOffset).position
+      val partialFileMessageSet: FileMessageSet = messageSet.read(position, Int.MaxValue)
+      val shallowIter = partialFileMessageSet.iterator
+      var maxTimestamp = Message.NoTimestamp
+      // We first find the message by offset then check if the timestamp is correct.
+      val wrapperMessageOpt = shallowIter.find(_.offset >= entry.offset + timeIndex.baseOffset)
+      if (!wrapperMessageOpt.isDefined || wrapperMessageOpt.get.offset != entry.offset + timeIndex.baseOffset) {
+        timeIndexDumpErrors.recordShallowOffsetNotFound(file, entry.offset + timeIndex.baseOffset,
+          {if (wrapperMessageOpt.isDefined) wrapperMessageOpt.get.offset else -1})
+      } else {
+        val deepIter = getIterator(wrapperMessageOpt.get, isDeepIteration = true)
+        for (messageAndOffset <- deepIter)
+          maxTimestamp = math.max(maxTimestamp, messageAndOffset.message.timestamp)
+
+        if (maxTimestamp != entry.timestamp)
+          timeIndexDumpErrors.recordMismatchTimeIndex(file, entry.timestamp, maxTimestamp)
+
+        if (prevTimestamp >= entry.timestamp)
+          timeIndexDumpErrors.recordOutOfOrderIndexTimestamp(file, entry.timestamp, prevTimestamp)
+
+        // since it is a sparse file, in the event of a crash there may be many zero entries, stop if we see one
+        if (entry.offset == 0 && i > 0)
+          return
+      }
+      if (!verifyOnly)
+        println("timestamp: %s offset: %s".format(entry.timestamp, timeIndex.baseOffset + entry.offset))
+      prevTimestamp = entry.timestamp
+    }
+  }
+
   private trait MessageParser[K, V] {
     def parse(message: Message): (Option[K], Option[V])
   }
@@ -261,7 +322,8 @@ object DumpLogSegments {
         }
         lastOffset = messageAndOffset.offset
 
-        print("offset: " + messageAndOffset.offset + " position: " + validBytes + " isvalid: " + msg.isValid +
+        print("offset: " + messageAndOffset.offset + " position: " + validBytes +
+              " " + msg.timestampType + ": " + msg.timestamp + " isvalid: " + msg.isValid +
               " payloadsize: " + msg.payloadSize + " magic: " + msg.magic +
               " compresscodec: " + msg.compressionCodec + " crc: " + msg.checksum)
         if(msg.hasKey)
@@ -307,4 +369,60 @@ object DumpLogSegments {
     }
   }
 
+  class TimeIndexDumpErrors {
+    val misMatchesForTimeIndexFilesMap = new mutable.HashMap[String, ArrayBuffer[(Long, Long)]]
+    val outOfOrderTimestamp = new mutable.HashMap[String, ArrayBuffer[(Long, Long)]]
+    val shallowOffsetNotFound = new mutable.HashMap[String, ArrayBuffer[(Long, Long)]]
+
+    def recordMismatchTimeIndex(file: File, indexTimestamp: Long, logTimestamp: Long) {
+      var misMatchesSeq = misMatchesForTimeIndexFilesMap.getOrElse(file.getAbsolutePath, new ArrayBuffer[(Long, Long)]())
+      if (misMatchesSeq.isEmpty)
+        misMatchesForTimeIndexFilesMap.put(file.getAbsolutePath, misMatchesSeq)
+      misMatchesSeq += ((indexTimestamp, logTimestamp))
+    }
+
+    def recordOutOfOrderIndexTimestamp(file: File, indexTimestamp: Long, prevIndexTimestamp: Long) {
+      var outOfOrderSeq = outOfOrderTimestamp.getOrElse(file.getAbsolutePath, new ArrayBuffer[(Long, Long)]())
+      if (outOfOrderSeq.isEmpty)
+        outOfOrderTimestamp.put(file.getAbsolutePath, outOfOrderSeq)
+      outOfOrderSeq += ((indexTimestamp, prevIndexTimestamp))
+    }
+
+    def recordShallowOffsetNotFound(file: File, indexOffset: Long, logOffset: Long) {
+      var shallowOffsetNotFoundSeq = shallowOffsetNotFound.getOrElse(file.getAbsolutePath, new ArrayBuffer[(Long, Long)]())
+      if (shallowOffsetNotFoundSeq.isEmpty)
+        shallowOffsetNotFound.put(file.getAbsolutePath, shallowOffsetNotFoundSeq)
+      shallowOffsetNotFoundSeq += ((indexOffset, logOffset))
+    }
+
+    def printErrors() {
+      misMatchesForTimeIndexFilesMap.foreach {
+        case (fileName, listOfMismatches) => {
+          System.err.println("Found timestamp mismatch in :" + fileName)
+          listOfMismatches.foreach(m => {
+            System.err.println("  Index timestamp: %d, log timestamp: %d".format(m._1, m._2))
+          })
+        }
+      }
+
+      outOfOrderTimestamp.foreach {
+        case (fileName, outOfOrderTimestamps) => {
+          System.err.println("Found out of order timestamp in :" + fileName)
+          outOfOrderTimestamps.foreach(m => {
+            System.err.println("  Index timestamp: %d, Previously indexed timestamp: %d".format(m._1, m._2))
+          })
+        }
+      }
+
+      shallowOffsetNotFound.foreach {
+        case (fileName, listOfShallowOffsetNotFound) => {
+          System.err.println("The following indexed offsets are not found in the log.")
+          listOfShallowOffsetNotFound.foreach(m => {
+            System.err.println("Indexed offset: %s, found log offset: %s".format(m._1, m._2))
+          })
+        }
+      }
+    }
+  }
+
 }
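
Note: dumpTimeIndex above checks each time index entry in two ways: indexed timestamps must be strictly increasing, and each indexed timestamp must equal the largest message timestamp found at the indexed offset. A self-contained sketch of those checks (not part of this patch); maxTimestampAt is a hypothetical stand-in for reading the corresponding log segment.

    // Self-contained sketch -- not Kafka code. Each entry is (timestamp, offset).
    def checkTimeIndex(entries: Seq[(Long, Long)],
                       maxTimestampAt: Long => Long): Seq[String] = {
      val outOfOrder = entries.sliding(2).collect {
        case Seq((prevTs, _), (ts, offset)) if ts <= prevTs =>
          s"out of order timestamp $ts at offset $offset (previous was $prevTs)"
      }
      val mismatches = entries.collect {
        case (ts, offset) if maxTimestampAt(offset) != ts =>
          s"index timestamp $ts does not match log timestamp ${maxTimestampAt(offset)} at offset $offset"
      }
      (outOfOrder ++ mismatches).toSeq
    }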

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/CleanerTest.scala b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
index 8212121..15920ad 100755
--- a/core/src/test/scala/unit/kafka/log/CleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/CleanerTest.scala
@@ -233,7 +233,7 @@ class CleanerTest extends JUnitSuite {
     assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize))
     
     // check grouping by index size
-    val indexSize = log.logSegments.take(groupSize).map(_.index.sizeInBytes()).sum + 1
+    val indexSize = log.logSegments.take(groupSize).map(_.index.sizeInBytes).sum + 1
     groups = cleaner.groupSegmentsBySize(log.logSegments, maxSize = Int.MaxValue, maxIndexSize = indexSize)
     checkSegmentOrder(groups)
     assertTrue("All but the last group should be the target size.", groups.dropRight(1).forall(_.size == groupSize))
@@ -391,8 +391,9 @@ class CleanerTest extends JUnitSuite {
     for (file <- dir.listFiles if file.getName.endsWith(Log.DeletedFileSuffix)) {
       Utils.atomicMoveWithFallback(file.toPath, Paths.get(CoreUtils.replaceSuffix(file.getPath, Log.DeletedFileSuffix, "")))
     }   
+    System.out.println("here")
     log = recoverAndCheck(config, cleanedKeys)
-    
+
     // add some more messages and clean the log again
     while(log.numberOfSegments < 10) {
       log.append(message(log.logEndOffset.toInt, log.logEndOffset.toInt))

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
index a64454d..82496f2 100644
--- a/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
@@ -111,21 +111,21 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
     var position = 0
     assertEquals("Should be able to find the first message by its offset", 
                  OffsetPosition(0L, position), 
-                 messageSet.searchFor(0, 0))
+                 messageSet.searchForOffset(0, 0))
     position += MessageSet.entrySize(messageSet.head.message)
     assertEquals("Should be able to find second message when starting from 0", 
                  OffsetPosition(1L, position), 
-                 messageSet.searchFor(1, 0))
+                 messageSet.searchForOffset(1, 0))
     assertEquals("Should be able to find second message starting from its offset", 
                  OffsetPosition(1L, position), 
-                 messageSet.searchFor(1, position))
+                 messageSet.searchForOffset(1, position))
     position += MessageSet.entrySize(messageSet.tail.head.message) + MessageSet.entrySize(messageSet.tail.tail.head.message)
     assertEquals("Should be able to find fourth message from a non-existant offset", 
                  OffsetPosition(50L, position), 
-                 messageSet.searchFor(3, position))
+                 messageSet.searchForOffset(3, position))
     assertEquals("Should be able to find fourth message by correct offset", 
                  OffsetPosition(50L, position), 
-                 messageSet.searchFor(50,  position))
+                 messageSet.searchForOffset(50,  position))
   }
   
   /**
@@ -134,7 +134,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
   @Test
   def testIteratorWithLimits() {
     val message = messageSet.toList(1)
-    val start = messageSet.searchFor(1, 0).position
+    val start = messageSet.searchForOffset(1, 0).position
     val size = message.message.size + 12
     val slice = messageSet.read(start, size)
     assertEquals(List(message), slice.toList)
@@ -148,7 +148,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
   @Test
   def testTruncate() {
     val message = messageSet.toList.head
-    val end = messageSet.searchFor(1, 0).position
+    val end = messageSet.searchForOffset(1, 0).position
     messageSet.truncateTo(end)
     assertEquals(List(message), messageSet.toList)
     assertEquals(MessageSet.entrySize(message.message), messageSet.sizeInBytes)
@@ -272,7 +272,7 @@ class FileMessageSetTest extends BaseMessageSetTestCases {
   @Test
   def testFormatConversionWithPartialMessage() {
     val message = messageSet.toList(1)
-    val start = messageSet.searchFor(1, 0).position
+    val start = messageSet.searchForOffset(1, 0).position
     val size = message.message.size + 12
     val slice = messageSet.read(start, size - 1)
     val messageV0 = slice.toMessageFormat(Message.MagicValue_V0)

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 7b52a09..dc4cc79 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -100,7 +100,7 @@ class LogManagerTest {
     time.sleep(maxLogAgeMs + 1)
     assertEquals("Now there should only be only one segment in the index.", 1, log.numberOfSegments)
     time.sleep(log.config.fileDeleteDelayMs + 1)
-    assertEquals("Files should have been deleted", log.numberOfSegments * 2, log.dir.list.length)
+    assertEquals("Files should have been deleted", log.numberOfSegments * 3, log.dir.list.length)
     assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).messageSet.sizeInBytes)
 
     try {
@@ -146,7 +146,7 @@ class LogManagerTest {
     time.sleep(logManager.InitialTaskDelayMs)
     assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments)
     time.sleep(log.config.fileDeleteDelayMs + 1)
-    assertEquals("Files should have been deleted", log.numberOfSegments * 2, log.dir.list.length)
+    assertEquals("Files should have been deleted", log.numberOfSegments * 3, log.dir.list.length)
     assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).messageSet.sizeInBytes)
     try {
       log.read(0, 1024)
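
Note: the expected file count in these assertions moves from numberOfSegments * 2 to numberOfSegments * 3 because every segment now leaves a third file, the time index, next to the log and offset index files. Illustrative arithmetic only (not from the patch; the code uses Log.LogFileSuffix, Log.IndexFileSuffix and the new Log.TimeIndexFileSuffix):

    // Illustrative arithmetic only -- not Kafka code.
    val segmentFileSuffixes = Seq(".log", ".index", ".timeindex")
    def expectedFileCount(numberOfSegments: Int): Int =
      numberOfSegments * segmentFileSuffixes.size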

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index edbfd99..64140e8 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -26,19 +26,23 @@ import kafka.message._
 import kafka.utils.SystemTime
 
 import scala.collection._
+ import scala.collection.mutable.ListBuffer
 
 class LogSegmentTest {
   
   val segments = mutable.ArrayBuffer[LogSegment]()
   
   /* create a segment with the given base offset */
-  def createSegment(offset: Long): LogSegment = {
+  def createSegment(offset: Long, indexIntervalBytes: Int = 10): LogSegment = {
     val msFile = TestUtils.tempFile()
     val ms = new FileMessageSet(msFile)
     val idxFile = TestUtils.tempFile()
+    val timeIdxFile = TestUtils.tempFile()
     idxFile.delete()
+    timeIdxFile.delete()
     val idx = new OffsetIndex(idxFile, offset, 1000)
-    val seg = new LogSegment(ms, idx, offset, 10, 0, SystemTime)
+    val timeIdx = new TimeIndex(timeIdxFile, offset, 1500)
+    val seg = new LogSegment(ms, idx, timeIdx, offset, indexIntervalBytes, 0, SystemTime)
     segments += seg
     seg
   }
@@ -47,7 +51,7 @@ class LogSegmentTest {
   def messages(offset: Long, messages: String*): ByteBufferMessageSet = {
     new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, 
                              offsetCounter = new LongRef(offset),
-                             messages = messages.map(s => new Message(s.getBytes)):_*)
+                             messages = messages.map(s => new Message(s.getBytes, offset * 10, Message.MagicValue_V1)):_*)
   }
   
   @After
@@ -76,7 +80,7 @@ class LogSegmentTest {
   def testReadBeforeFirstOffset() {
     val seg = createSegment(40)
     val ms = messages(50, "hello", "there", "little", "bee")
-    seg.append(50, ms)
+    seg.append(50, Message.NoTimestamp, -1L, ms)
     val read = seg.read(startOffset = 41, maxSize = 300, maxOffset = None).messageSet
     assertEquals(ms.toList, read.toList)
   }
@@ -90,7 +94,7 @@ class LogSegmentTest {
     val baseOffset = 50
     val seg = createSegment(baseOffset)
     val ms = messages(baseOffset, "hello", "there", "beautiful")
-    seg.append(baseOffset, ms)
+    seg.append(baseOffset, Message.NoTimestamp, -1L, ms)
     def validate(offset: Long) = 
       assertEquals(ms.filter(_.offset == offset).toList, 
                    seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).messageSet.toList)
@@ -106,7 +110,7 @@ class LogSegmentTest {
   def testReadAfterLast() {
     val seg = createSegment(40)
     val ms = messages(50, "hello", "there")
-    seg.append(50, ms)
+    seg.append(50, Message.NoTimestamp, -1L, ms)
     val read = seg.read(startOffset = 52, maxSize = 200, maxOffset = None)
     assertNull("Read beyond the last offset in the segment should give null", read)
   }
@@ -119,9 +123,9 @@ class LogSegmentTest {
   def testReadFromGap() {
     val seg = createSegment(40)
     val ms = messages(50, "hello", "there")
-    seg.append(50, ms)
+    seg.append(50, Message.NoTimestamp, -1L, ms)
     val ms2 = messages(60, "alpha", "beta")
-    seg.append(60, ms2)
+    seg.append(60, Message.NoTimestamp, -1L, ms2)
     val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
     assertEquals(ms2.toList, read.messageSet.toList)
   }
@@ -136,9 +140,9 @@ class LogSegmentTest {
     var offset = 40
     for(i <- 0 until 30) {
       val ms1 = messages(offset, "hello")
-      seg.append(offset, ms1)
-      val ms2 = messages(offset+1, "hello")
-      seg.append(offset+1, ms2)
+      seg.append(offset, Message.NoTimestamp, -1L, ms1)
+      val ms2 = messages(offset + 1, "hello")
+      seg.append(offset + 1, Message.NoTimestamp, -1L, ms2)
       // check that we can read back both messages
       val read = seg.read(offset, None, 10000)
       assertEquals(List(ms1.head, ms2.head), read.messageSet.toList)
@@ -150,7 +154,25 @@ class LogSegmentTest {
       offset += 1
     }
   }
-  
+
+  @Test
+  def testReloadLargestTimestampAfterTruncation() {
+    val numMessages = 30
+    val seg = createSegment(40, 2 * messages(0, "hello").sizeInBytes - 1)
+    var offset = 40
+    for (i <- 0 until numMessages) {
+      seg.append(offset, offset, offset, messages(offset, "hello"))
+      offset += 1
+    }
+    val expectedNumEntries = numMessages / 2 - 1
+    assertEquals(s"Should have $expectedNumEntries time indexes", expectedNumEntries, seg.timeIndex.entries)
+
+    seg.truncateTo(41)
+    assertEquals(s"Should have 0 time indexes", 0, seg.timeIndex.entries)
+    assertEquals(s"Largest timestamp should be 400", 400L, seg.largestTimestamp)
+
+  }
+
   /**
    * Test truncating the whole segment, and check that we can reappend with the original offset.
    */
@@ -158,12 +180,38 @@ class LogSegmentTest {
   def testTruncateFull() {
     // test the case where we fully truncate the log
     val seg = createSegment(40)
-    seg.append(40, messages(40, "hello", "there"))
+    seg.append(40, Message.NoTimestamp, -1L, messages(40, "hello", "there"))
     seg.truncateTo(0)
     assertNull("Segment should be empty.", seg.read(0, None, 1024))
-    seg.append(40, messages(40, "hello", "there"))    
+    seg.append(40, Message.NoTimestamp, -1L, messages(40, "hello", "there"))
   }
-  
+
+  /**
+   * Append messages with timestamp and search message by timestamp.
+   */
+  @Test
+  def testFindOffsetByTimestamp() {
+    val messageSize = messages(0, s"msg00").sizeInBytes
+    val seg = createSegment(40, messageSize * 2 - 1)
+    // Produce some messages
+    for (i <- 40 until 50)
+      seg.append(i, i * 10, i, messages(i, s"msg$i"))
+
+    assertEquals(490, seg.largestTimestamp)
+    // Search for an indexed timestamp
+    assertEquals(42, seg.findOffsetByTimestamp(420).get)
+    assertEquals(43, seg.findOffsetByTimestamp(421).get)
+    // Search for an un-indexed timestamp
+    assertEquals(43, seg.findOffsetByTimestamp(430).get)
+    assertEquals(44, seg.findOffsetByTimestamp(431).get)
+    // Search beyond the last timestamp
+    assertEquals(50, seg.findOffsetByTimestamp(491).get)
+    // Search before the first indexed timestamp
+    assertEquals(41, seg.findOffsetByTimestamp(401).get)
+    // Search before the first timestamp
+    assertEquals(40, seg.findOffsetByTimestamp(399).get)
+  }
+
   /**
    * Test that offsets are assigned sequentially and that the nextOffset variable is incremented
    */
@@ -171,7 +219,7 @@ class LogSegmentTest {
   def testNextOffsetCalculation() {
     val seg = createSegment(40)
     assertEquals(40, seg.nextOffset)
-    seg.append(50, messages(50, "hello", "there", "you"))
+    seg.append(50, Message.NoTimestamp, -1L, messages(50, "hello", "there", "you"))
     assertEquals(53, seg.nextOffset())
   }
   
@@ -198,13 +246,31 @@ class LogSegmentTest {
   def testRecoveryFixesCorruptIndex() {
     val seg = createSegment(0)
     for(i <- 0 until 100)
-      seg.append(i, messages(i, i.toString))
+      seg.append(i, Message.NoTimestamp, -1L, messages(i, i.toString))
     val indexFile = seg.index.file
     TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
     seg.recover(64*1024)
     for(i <- 0 until 100)
       assertEquals(i, seg.read(i, Some(i+1), 1024).messageSet.head.offset)
   }
+
+  /**
+   * Create a segment with some data and an index. Then corrupt the index,
+   * and recover the segment, the entries should all be readable.
+   */
+  @Test
+  def testRecoveryFixesCorruptTimeIndex() {
+    val seg = createSegment(0)
+    for(i <- 0 until 100)
+      seg.append(i, i * 10, i, messages(i, i.toString))
+    val timeIndexFile = seg.timeIndex.file
+    TestUtils.writeNonsenseToFile(timeIndexFile, 5, timeIndexFile.length.toInt)
+    seg.recover(64*1024)
+    for(i <- 0 until 100) {
+      assertEquals(i, seg.findOffsetByTimestamp(i * 10).get)
+      assertEquals(i + 1, seg.findOffsetByTimestamp(i * 10 + 1).get)
+    }
+  }
   
   /**
    * Randomly corrupt a log a number of times and attempt recovery.
@@ -215,10 +281,10 @@ class LogSegmentTest {
     for(iteration <- 0 until 10) {
       val seg = createSegment(0)
       for(i <- 0 until messagesAppended)
-        seg.append(i, messages(i, i.toString))
+        seg.append(i, Message.NoTimestamp, -1L, messages(i, i.toString))
       val offsetToBeginCorruption = TestUtils.random.nextInt(messagesAppended)
       // start corrupting somewhere in the middle of the chosen record all the way to the end
-      val position = seg.log.searchFor(offsetToBeginCorruption, 0).position + TestUtils.random.nextInt(15)
+      val position = seg.log.searchForOffset(offsetToBeginCorruption, 0).position + TestUtils.random.nextInt(15)
       TestUtils.writeNonsenseToFile(seg.log.file, position, seg.log.file.length.toInt - position)
       seg.recover(64*1024)
       assertEquals("Should have truncated off bad messages.", (0 until offsetToBeginCorruption).toList, seg.log.map(_.offset).toList)
@@ -227,7 +293,7 @@ class LogSegmentTest {
   }
 
   /* create a segment with   pre allocate */
-  def createSegment(offset: Long, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false): LogSegment = {
+  def createSegment(offset: Long, fileAlreadyExists: Boolean, initFileSize: Int, preallocate: Boolean): LogSegment = {
     val tempDir = TestUtils.tempDir()
     val seg = new LogSegment(tempDir, offset, 10, 1000, 0, SystemTime, fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate)
     segments += seg
@@ -239,9 +305,9 @@ class LogSegmentTest {
   def testCreateWithInitFileSizeAppendMessage() {
     val seg = createSegment(40, false, 512*1024*1024, true)
     val ms = messages(50, "hello", "there")
-    seg.append(50, ms)
+    seg.append(50, Message.NoTimestamp, -1L, ms)
     val ms2 = messages(60, "alpha", "beta")
-    seg.append(60, ms2)
+    seg.append(60, Message.NoTimestamp, -1L, ms2)
     val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
     assertEquals(ms2.toList, read.messageSet.toList)
   }
@@ -253,9 +319,9 @@ class LogSegmentTest {
     val seg = new LogSegment(tempDir, 40, 10, 1000, 0, SystemTime, false, 512*1024*1024, true)
 
     val ms = messages(50, "hello", "there")
-    seg.append(50, ms)
+    seg.append(50, Message.NoTimestamp, -1L, ms)
     val ms2 = messages(60, "alpha", "beta")
-    seg.append(60, ms2)
+    seg.append(60, Message.NoTimestamp, -1L, ms2)
     val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
     assertEquals(ms2.toList, read.messageSet.toList)
     val oldSize = seg.log.sizeInBytes()
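
Note: testFindOffsetByTimestamp above appends messages at offsets 40..49 with timestamps 400, 410, ..., 490 and expects the lookup to return the first offset whose timestamp is at or after the target. A self-contained model of that behaviour (not part of this patch; the real LogSegment lookup first consults the sparse time index and then scans the log from the indexed position):

    // Self-contained model -- not Kafka code. messages is (offset, timestamp),
    // ordered by offset; returns the first offset whose timestamp is >= target.
    def findOffsetByTimestamp(messages: Seq[(Long, Long)], target: Long): Option[Long] =
      messages.find { case (_, timestamp) => timestamp >= target }
              .map { case (offset, _) => offset }

    // Offsets 40..49 with timestamps 400, 410, ..., 490, as appended in the test:
    val msgs = (40L until 50L).map(offset => (offset, offset * 10))
    assert(findOffsetByTimestamp(msgs, 420) == Some(42L)) // indexed timestamp
    assert(findOffsetByTimestamp(msgs, 421) == Some(43L)) // between two timestamps
    assert(findOffsetByTimestamp(msgs, 399) == Some(40L)) // before the first timestamp
    assert(findOffsetByTimestamp(msgs, 491) == None)      // past the end; the segment returns nextOffset (50) here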

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 33dd68e..2466ef2 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -35,7 +35,7 @@ class LogTest extends JUnitSuite {
 
   val tmpDir = TestUtils.tempDir()
   val logDir = TestUtils.randomPartitionLogDir(tmpDir)
-  val time = new MockTime(0)
+  val time = new MockTime(100)
   var config: KafkaConfig = null
   val logConfig = LogConfig()
 
@@ -88,6 +88,20 @@ class LogTest extends JUnitSuite {
       assertEquals("Changing time beyond rollMs and appending should create a new segment.", numSegments, log.numberOfSegments)
     }
 
+    time.sleep(log.config.segmentMs + 1)
+    val setWithTimestamp =
+      TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = time.milliseconds + log.config.segmentMs + 1)
+    log.append(setWithTimestamp)
+    assertEquals("A new segment should have been rolled out", 5, log.numberOfSegments)
+
+    time.sleep(log.config.segmentMs + 1)
+    log.append(set)
+    assertEquals("Log should not roll because the roll should depend on the index of the first time index entry.", 5, log.numberOfSegments)
+
+    time.sleep(log.config.segmentMs + 1)
+    log.append(set)
+    assertEquals("Log should roll because the time since the timestamp of first time index entry has expired.", 6, log.numberOfSegments)
+
     val numSegments = log.numberOfSegments
     time.sleep(log.config.segmentMs + 1)
     log.append(new ByteBufferMessageSet())
@@ -457,28 +471,64 @@ class LogTest extends JUnitSuite {
     val config = LogConfig(logProps)
     var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
     for(i <- 0 until numMessages)
-      log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(messageSize)))
+      log.append(TestUtils.singleMessageSet(payload = TestUtils.randomBytes(messageSize),
+        timestamp = time.milliseconds + i * 10))
     assertEquals("After appending %d messages to an empty log, the log end offset should be %d".format(numMessages, numMessages), numMessages, log.logEndOffset)
     val lastIndexOffset = log.activeSegment.index.lastOffset
     val numIndexEntries = log.activeSegment.index.entries
     val lastOffset = log.logEndOffset
+    // After segment is closed, the last entry in the time index should be (largest timestamp -> last offset).
+    val lastTimeIndexOffset = log.logEndOffset - 1
+    val lastTimeIndexTimestamp  = log.activeSegment.largestTimestamp
+    // Depending on when the last time index entry is inserted, an entry may or may not be inserted into the time index.
+    val numTimeIndexEntries = log.activeSegment.timeIndex.entries + {
+      if (log.activeSegment.timeIndex.lastEntry.offset == log.logEndOffset - 1) 0 else 1
+    }
     log.close()
 
+    def verifyRecoveredLog(log: Log) {
+      assertEquals(s"Should have $numMessages messages when log is reopened w/o recovery", numMessages, log.logEndOffset)
+      assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
+      assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
+      assertEquals("Should have same last time index timestamp", lastTimeIndexTimestamp, log.activeSegment.timeIndex.lastEntry.timestamp)
+      assertEquals("Should have same last time index offset", lastTimeIndexOffset, log.activeSegment.timeIndex.lastEntry.offset)
+      assertEquals("Should have same number of time index entries as before.", numTimeIndexEntries, log.activeSegment.timeIndex.entries)
+    }
+
     log = new Log(logDir, config, recoveryPoint = lastOffset, time.scheduler, time)
-    assertEquals("Should have %d messages when log is reopened w/o recovery".format(numMessages), numMessages, log.logEndOffset)
-    assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
-    assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
+    verifyRecoveredLog(log)
     log.close()
 
     // test recovery case
     log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
-    assertEquals("Should have %d messages when log is reopened with recovery".format(numMessages), numMessages, log.logEndOffset)
-    assertEquals("Should have same last index offset as before.", lastIndexOffset, log.activeSegment.index.lastOffset)
-    assertEquals("Should have same number of index entries as before.", numIndexEntries, log.activeSegment.index.entries)
+    verifyRecoveredLog(log)
     log.close()
   }
 
   /**
+   * Test building the time index on the follower by setting assignOffsets to false.
+   */
+  @Test
+  def testBuildTimeIndexWhenNotAssigningOffsets() {
+    val numMessages = 100
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 10000: java.lang.Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
+
+    val config = LogConfig(logProps)
+    val log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+
+    val messages = (0 until numMessages).map { i =>
+      new ByteBufferMessageSet(NoCompressionCodec, new LongRef(100 + i), new Message(i.toString.getBytes(), time.milliseconds + i, Message.MagicValue_V1))
+    }
+    messages.foreach(log.append(_, assignOffsets = false))
+    val timeIndexEntries = log.logSegments.foldLeft(0) { (entries, segment) => entries + segment.timeIndex.entries }
+    assertEquals(s"There should be ${numMessages - 1} time index entries", numMessages - 1, timeIndexEntries)
+    assertEquals(s"The last time index entry should have timestamp ${time.milliseconds + numMessages - 1}",
+      time.milliseconds + numMessages - 1, log.activeSegment.timeIndex.lastEntry.timestamp)
+  }
+
+  /**
    * Test that if we manually delete an index segment it is rebuilt when the log is re-opened
    */
   @Test
@@ -492,19 +542,58 @@ class LogTest extends JUnitSuite {
     val config = LogConfig(logProps)
     var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
     for(i <- 0 until numMessages)
-      log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10)))
+      log.append(TestUtils.singleMessageSet(payload = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
     val indexFiles = log.logSegments.map(_.index.file)
+    val timeIndexFiles = log.logSegments.map(_.timeIndex.file)
     log.close()
 
     // delete all the index files
     indexFiles.foreach(_.delete())
+    timeIndexFiles.foreach(_.delete())
 
     // reopen the log
     log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
-    for(i <- 0 until numMessages)
+    assertTrue("The index should have been rebuilt", log.logSegments.head.index.entries > 0)
+    assertTrue("The time index should have been rebuilt", log.logSegments.head.timeIndex.entries > 0)
+    for(i <- 0 until numMessages) {
       assertEquals(i, log.read(i, 100, None).messageSet.head.offset)
+      if (i == 0)
+        assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10))
+      else
+        assertEquals(i, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10))
+    }
+    log.close()
+  }
+
+  /**
+   * Test that if messages format version of the messages in a segment is before 0.10.0, the time index should be empty.
+   */
+  @Test
+  def testRebuildTimeIndexForOldMessages() {
+    val numMessages = 200
+    val segmentSize = 200
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, 1: java.lang.Integer)
+    logProps.put(LogConfig.MessageFormatVersionProp, "0.9.0")
+
+    val config = LogConfig(logProps)
+    var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+    for(i <- 0 until numMessages)
+      log.append(TestUtils.singleMessageSet(payload = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
+    val timeIndexFiles = log.logSegments.map(_.timeIndex.file)
     log.close()
+
+    // Delete the time index.
+    timeIndexFiles.foreach(_.delete())
+
+    // The rebuilt time index should be empty
+    log = new Log(logDir, config, recoveryPoint = numMessages + 1, time.scheduler, time)
+    val segArray = log.logSegments.toArray
+    for (i <- 0 until segArray.size - 1)
+      assertEquals("The time index should be empty", 0, segArray(i).timeIndex.entries)
+
   }
 
   /**
@@ -521,8 +610,9 @@ class LogTest extends JUnitSuite {
     val config = LogConfig(logProps)
     var log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
     for(i <- 0 until numMessages)
-      log.append(TestUtils.singleMessageSet(TestUtils.randomBytes(10)))
+      log.append(TestUtils.singleMessageSet(payload = TestUtils.randomBytes(10), timestamp = time.milliseconds + i * 10))
     val indexFiles = log.logSegments.map(_.index.file)
+    val timeIndexFiles = log.logSegments.map(_.timeIndex.file)
     log.close()
 
     // corrupt all the index files
@@ -532,11 +622,23 @@ class LogTest extends JUnitSuite {
       bw.close()
     }
 
+    // corrupt all the index files
+    for( file <- timeIndexFiles) {
+      val bw = new BufferedWriter(new FileWriter(file))
+      bw.write("  ")
+      bw.close()
+    }
+
     // reopen the log
     log = new Log(logDir, config, recoveryPoint = 200L, time.scheduler, time)
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
-    for(i <- 0 until numMessages)
+    for(i <- 0 until numMessages) {
       assertEquals(i, log.read(i, 100, None).messageSet.head.offset)
+      if (i == 0)
+        assertEquals(log.logSegments.head.baseOffset, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10))
+      else
+        assertEquals(i, log.fetchOffsetsByTimestamp(time.milliseconds + i * 10))
+    }
     log.close()
   }
 
@@ -602,27 +704,37 @@ class LogTest extends JUnitSuite {
    */
   @Test
   def testIndexResizingAtTruncation() {
-    val set = TestUtils.singleMessageSet("test".getBytes)
-    val setSize = set.sizeInBytes
+    val setSize = TestUtils.singleMessageSet(payload = "test".getBytes).sizeInBytes
     val msgPerSeg = 10
     val segmentSize = msgPerSeg * setSize  // each segment will be 10 messages
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentBytesProp, segmentSize: java.lang.Integer)
+    logProps.put(LogConfig.IndexIntervalBytesProp, (setSize - 1): java.lang.Integer)
     val config = LogConfig(logProps)
     val log = new Log(logDir, config, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
+
     for (i<- 1 to msgPerSeg)
-      log.append(set)
+      log.append(TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = time.milliseconds + i))
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
+
+    time.sleep(msgPerSeg)
     for (i<- 1 to msgPerSeg)
-      log.append(set)
+      log.append(TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = time.milliseconds + i))
     assertEquals("There should be exactly 2 segment.", 2, log.numberOfSegments)
-    assertEquals("The index of the first segment should be trimmed to empty", 0, log.logSegments.toList.head.index.maxEntries)
+    val expectedEntries = msgPerSeg - 1
+
+    assertEquals(s"The index of the first segment should have $expectedEntries entries", expectedEntries, log.logSegments.toList.head.index.maxEntries)
+    assertEquals(s"The time index of the first segment should have $expectedEntries entries", expectedEntries, log.logSegments.toList.head.timeIndex.maxEntries)
+
     log.truncateTo(0)
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
     assertEquals("The index of segment 1 should be resized to maxIndexSize", log.config.maxIndexSize/8, log.logSegments.toList.head.index.maxEntries)
+    assertEquals("The time index of segment 1 should be resized to maxIndexSize", log.config.maxIndexSize/12, log.logSegments.toList.head.timeIndex.maxEntries)
+
+    time.sleep(msgPerSeg)
     for (i<- 1 to msgPerSeg)
-      log.append(set)
+      log.append(TestUtils.singleMessageSet(payload = "test".getBytes, timestamp = time.milliseconds + i))
     assertEquals("There should be exactly 1 segment.", 1, log.numberOfSegments)
   }
 
@@ -632,7 +744,9 @@ class LogTest extends JUnitSuite {
   @Test
   def testBogusIndexSegmentsAreRemoved() {
     val bogusIndex1 = Log.indexFilename(logDir, 0)
+    val bogusTimeIndex1 = Log.timeIndexFilename(logDir, 0)
     val bogusIndex2 = Log.indexFilename(logDir, 5)
+    val bogusTimeIndex2 = Log.timeIndexFilename(logDir, 5)
 
     val set = TestUtils.singleMessageSet("test".getBytes)
     val logProps = new Properties()
@@ -646,7 +760,9 @@ class LogTest extends JUnitSuite {
                       time)
 
     assertTrue("The first index file should have been replaced with a larger file", bogusIndex1.length > 0)
+    assertTrue("The first time index file should have been replaced with a larger file", bogusTimeIndex1.length > 0)
     assertFalse("The second index file should have been deleted.", bogusIndex2.exists)
+    assertFalse("The second time index file should have been deleted.", bogusTimeIndex2.exists)
 
     // check that we can append to the log
     for(i <- 0 until 10)

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
new file mode 100644
index 0000000..bc60c72
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.File
+
+import kafka.common.InvalidOffsetException
+import kafka.utils.TestUtils
+import org.junit.{Test, After, Before}
+import org.junit.Assert.{assertEquals}
+import org.scalatest.junit.JUnitSuite
+
+/**
+ * Unit test for time index.
+ */
+class TimeIndexTest extends JUnitSuite {
+  var idx: TimeIndex = null
+  val maxEntries = 30
+  val baseOffset = 45L
+
+  @Before
+  def setup() {
+    this.idx = new TimeIndex(file = nonExistantTempFile(), baseOffset = baseOffset, maxIndexSize = maxEntries * 12)
+  }
+
+  @After
+  def teardown() {
+    if(this.idx != null)
+      this.idx.file.delete()
+  }
+
+  @Test
+  def testLookUp() {
+    // Empty time index
+    assertEquals(TimestampOffset(-1L, baseOffset), idx.lookup(100L))
+
+    // Add several time index entries.
+    appendEntries(maxEntries - 1)
+
+    // look for timestamp smaller than the earliest entry
+    assertEquals(TimestampOffset(-1L, baseOffset), idx.lookup(9))
+    // look for timestamp in the middle of two entries.
+    assertEquals(TimestampOffset(20L, 65L), idx.lookup(25))
+    // look for timestamp same as the one in the entry
+    assertEquals(TimestampOffset(30L, 75L), idx.lookup(30))
+  }
+
+  @Test
+  def testTruncate() {
+    appendEntries(maxEntries - 1)
+    idx.truncate()
+    assertEquals(0, idx.entries)
+
+    appendEntries(maxEntries - 1)
+    idx.truncateTo(10 + baseOffset)
+    assertEquals(0, idx.entries)
+  }
+
+  @Test
+  def testAppend() {
+    appendEntries(maxEntries - 1)
+    intercept[IllegalArgumentException] {
+      idx.maybeAppend(10000L, 1000L)
+    }
+    intercept[InvalidOffsetException] {
+      idx.maybeAppend(10000L, (maxEntries - 2) * 10, true)
+    }
+    idx.maybeAppend(10000L, 1000L, true)
+  }
+
+  private def appendEntries(numEntries: Int) {
+    for (i <- 1 to numEntries)
+      idx.maybeAppend(i * 10, i * 10 + baseOffset)
+  }
+
+  def nonExistantTempFile(): File = {
+    val file = TestUtils.tempFile()
+    file.delete()
+    file
+  }
+}
+
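
As a companion to the test above, a minimal, self-contained Scala sketch (not part of this patch) of the lookup semantics TimeIndex provides: lookup(t) returns the last appended (timestamp, offset) entry whose timestamp is at most t, and a (-1, baseOffset) sentinel when no entry qualifies. The entries mirror what appendEntries() writes.

    object TimeIndexLookupSketch extends App {
      case class TimestampOffset(timestamp: Long, offset: Long)

      val baseOffset = 45L
      // Entries as appended by appendEntries(29): (10, 55), (20, 65), (30, 75), ...
      val entries = (1 to 29).map(i => TimestampOffset(i * 10L, i * 10L + baseOffset))

      // Last entry whose timestamp is <= target, or the sentinel when none qualifies.
      def lookup(target: Long): TimestampOffset =
        entries.takeWhile(_.timestamp <= target).lastOption
          .getOrElse(TimestampOffset(-1L, baseOffset))

      assert(lookup(9L) == TimestampOffset(-1L, baseOffset))  // before the first entry
      assert(lookup(25L) == TimestampOffset(20L, 65L))        // between two entries
      assert(lookup(30L) == TimestampOffset(30L, 75L))        // exact match
    }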

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
index 4810009..39eb84c 100644
--- a/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
+++ b/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
@@ -152,56 +152,69 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
 
   @Test
   def testLogAppendTime() {
-    val startTime = System.currentTimeMillis()
+    val now = System.currentTimeMillis()
     // The timestamps should be overwritten
     val messages = getMessages(magicValue = Message.MagicValue_V1, timestamp = 0L, codec = NoCompressionCodec)
     val compressedMessagesWithRecompresion = getMessages(magicValue = Message.MagicValue_V0, codec = DefaultCompressionCodec)
     val compressedMessagesWithoutRecompression =
-      getMessages(magicValue = Message.MagicValue_V1, timestamp = -1L, codec = DefaultCompressionCodec)
-
-    val (validatedMessages, _) = messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
-                                                                           now = System.currentTimeMillis(),
-                                                                           sourceCodec = NoCompressionCodec,
-                                                                           targetCodec = NoCompressionCodec,
-                                                                           messageFormatVersion = 1,
-                                                                           messageTimestampType = TimestampType.LOG_APPEND_TIME,
-                                                                           messageTimestampDiffMaxMs = 1000L)
-
-    val (validatedCompressedMessages, _) =
+      getMessages(magicValue = Message.MagicValue_V1, timestamp = 0L, codec = DefaultCompressionCodec)
+
+    val validatingResults = messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
+                                                                      now = now,
+                                                                      sourceCodec = NoCompressionCodec,
+                                                                      targetCodec = NoCompressionCodec,
+                                                                      messageFormatVersion = 1,
+                                                                      messageTimestampType = TimestampType.LOG_APPEND_TIME,
+                                                                      messageTimestampDiffMaxMs = 1000L)
+    val validatedMessages = validatingResults.validatedMessages
+
+    val validatingCompressedMessagesResults =
       compressedMessagesWithRecompresion.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
-                                                                          now = System.currentTimeMillis(),
+                                                                          now = now,
                                                                           sourceCodec = DefaultCompressionCodec,
                                                                           targetCodec = DefaultCompressionCodec,
                                                                           messageFormatVersion = 1,
                                                                           messageTimestampType = TimestampType.LOG_APPEND_TIME,
                                                                           messageTimestampDiffMaxMs = 1000L)
+    val validatedCompressedMessages = validatingCompressedMessagesResults.validatedMessages
 
-    val (validatedCompressedMessagesWithoutRecompression, _) =
+    val validatingCompressedMessagesWithoutRecompressionResults =
       compressedMessagesWithoutRecompression.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
-                                                                              now = System.currentTimeMillis(),
+                                                                              now = now,
                                                                               sourceCodec = DefaultCompressionCodec,
                                                                               targetCodec = DefaultCompressionCodec,
                                                                               messageFormatVersion = 1,
                                                                               messageTimestampType = TimestampType.LOG_APPEND_TIME,
                                                                               messageTimestampDiffMaxMs = 1000L)
 
-    val now = System.currentTimeMillis()
+    val validatedCompressedMessagesWithoutRecompression = validatingCompressedMessagesWithoutRecompressionResults.validatedMessages
+
     assertEquals("message set size should not change", messages.size, validatedMessages.size)
     validatedMessages.foreach(messageAndOffset => validateLogAppendTime(messageAndOffset.message))
+    assertEquals(s"Max timestamp should be $now", now, validatingResults.maxTimestamp)
+    assertEquals(s"The offset of max timestamp should be 0", 0, validatingResults.offsetOfMaxTimestamp)
+    assertFalse("Message size should not have been changed", validatingResults.messageSizeMaybeChanged)
 
     assertEquals("message set size should not change", compressedMessagesWithRecompresion.size, validatedCompressedMessages.size)
     validatedCompressedMessages.foreach(messageAndOffset => validateLogAppendTime(messageAndOffset.message))
     assertTrue("MessageSet should still valid", validatedCompressedMessages.shallowIterator.next().message.isValid)
+    assertEquals(s"Max timestamp should be $now", now, validatingCompressedMessagesResults.maxTimestamp)
+    assertEquals(s"The offset of max timestamp should be ${compressedMessagesWithRecompresion.size - 1}",
+      compressedMessagesWithRecompresion.size - 1, validatingCompressedMessagesResults.offsetOfMaxTimestamp)
+    assertTrue("Message size may have been changed", validatingCompressedMessagesResults.messageSizeMaybeChanged)
 
     assertEquals("message set size should not change", compressedMessagesWithoutRecompression.size,
       validatedCompressedMessagesWithoutRecompression.size)
     validatedCompressedMessagesWithoutRecompression.foreach(messageAndOffset => validateLogAppendTime(messageAndOffset.message))
     assertTrue("MessageSet should still valid", validatedCompressedMessagesWithoutRecompression.shallowIterator.next().message.isValid)
+    assertEquals(s"Max timestamp should be $now", now, validatingCompressedMessagesWithoutRecompressionResults.maxTimestamp)
+    assertEquals(s"The offset of max timestamp should be ${compressedMessagesWithoutRecompression.size - 1}",
+      compressedMessagesWithoutRecompression.size - 1, validatingCompressedMessagesWithoutRecompressionResults.offsetOfMaxTimestamp)
+    assertFalse("Message size should not have been changed", validatingCompressedMessagesWithoutRecompressionResults.messageSizeMaybeChanged)
 
     def validateLogAppendTime(message: Message) {
       message.ensureValid()
-      assertTrue(s"Timestamp of message $message should be between $startTime and $now",
-        message.timestamp >= startTime && message.timestamp <= now)
+      assertEquals(s"Timestamp of message $message should be $now", now, message.timestamp)
       assertEquals(TimestampType.LOG_APPEND_TIME, message.timestampType)
     }
   }
@@ -209,18 +222,28 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
   @Test
   def testCreateTime() {
     val now = System.currentTimeMillis()
-    val messages = getMessages(magicValue = Message.MagicValue_V1, timestamp = now, codec = NoCompressionCodec)
-    val compressedMessages = getMessages(magicValue = Message.MagicValue_V1, timestamp = now, codec = DefaultCompressionCodec)
-
-    val (validatedMessages, _) = messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
-                                                                           now = System.currentTimeMillis(),
-                                                                           sourceCodec = NoCompressionCodec,
-                                                                           targetCodec = NoCompressionCodec,
-                                                                           messageFormatVersion = 1,
-                                                                           messageTimestampType = TimestampType.CREATE_TIME,
-                                                                           messageTimestampDiffMaxMs = 1000L)
-
-    val (validatedCompressedMessages, _) =
+    val timestampSeq = Seq(now - 1, now + 1, now)
+    val messages =
+      new ByteBufferMessageSet(NoCompressionCodec,
+                               new Message("hello".getBytes, timestamp = timestampSeq(0), magicValue = Message.MagicValue_V1),
+                               new Message("there".getBytes, timestamp = timestampSeq(1), magicValue = Message.MagicValue_V1),
+                               new Message("beautiful".getBytes, timestamp = timestampSeq(2), magicValue = Message.MagicValue_V1))
+    val compressedMessages =
+      new ByteBufferMessageSet(DefaultCompressionCodec,
+                               new Message("hello".getBytes, timestamp = timestampSeq(0), magicValue = Message.MagicValue_V1),
+                               new Message("there".getBytes, timestamp = timestampSeq(1), magicValue = Message.MagicValue_V1),
+                               new Message("beautiful".getBytes, timestamp = timestampSeq(2), magicValue = Message.MagicValue_V1))
+
+    val validatingResults = messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
+                                                                      now = System.currentTimeMillis(),
+                                                                      sourceCodec = NoCompressionCodec,
+                                                                      targetCodec = NoCompressionCodec,
+                                                                      messageFormatVersion = 1,
+                                                                      messageTimestampType = TimestampType.CREATE_TIME,
+                                                                      messageTimestampDiffMaxMs = 1000L)
+    val validatedMessages = validatingResults.validatedMessages
+
+    val validatingCompressedMessagesResults =
       compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(0),
                                                           now = System.currentTimeMillis(),
                                                           sourceCodec = DefaultCompressionCodec,
@@ -228,17 +251,29 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
                                                           messageFormatVersion = 1,
                                                           messageTimestampType = TimestampType.CREATE_TIME,
                                                           messageTimestampDiffMaxMs = 1000L)
+    val validatedCompressedMessages = validatingCompressedMessagesResults.validatedMessages
 
+    var i = 0
     for (messageAndOffset <- validatedMessages) {
       messageAndOffset.message.ensureValid()
-      assertEquals(messageAndOffset.message.timestamp, now)
+      assertEquals(messageAndOffset.message.timestamp, timestampSeq(i))
       assertEquals(messageAndOffset.message.timestampType, TimestampType.CREATE_TIME)
+      i += 1
     }
+    assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatingResults.maxTimestamp)
+    assertEquals(s"Offset of max timestamp should be 1", 1, validatingResults.offsetOfMaxTimestamp)
+    assertFalse("Message size should not have been changed", validatingResults.messageSizeMaybeChanged)
+    i = 0
     for (messageAndOffset <- validatedCompressedMessages) {
       messageAndOffset.message.ensureValid()
-      assertEquals(messageAndOffset.message.timestamp, now)
+      assertEquals(messageAndOffset.message.timestamp, timestampSeq(i))
       assertEquals(messageAndOffset.message.timestampType, TimestampType.CREATE_TIME)
+      i += 1
     }
+    assertEquals(s"Max timestamp should be ${now + 1}", now + 1, validatingCompressedMessagesResults.maxTimestamp)
+    assertEquals(s"Offset of max timestamp should be ${validatedCompressedMessages.size - 1}",
+      validatedCompressedMessages.size - 1, validatingCompressedMessagesResults.offsetOfMaxTimestamp)
+    assertFalse("Message size should not have been changed", validatingCompressedMessagesResults.messageSizeMaybeChanged)
   }
 
   @Test
@@ -287,7 +322,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
                                                            targetCodec = NoCompressionCodec,
                                                            messageFormatVersion = 0,
                                                            messageTimestampType = TimestampType.CREATE_TIME,
-                                                           messageTimestampDiffMaxMs = 1000L)._1, offset)
+                                                           messageTimestampDiffMaxMs = 1000L).validatedMessages, offset)
 
     // check compressed messages
     checkOffsets(compressedMessages, 0)
@@ -297,7 +332,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
                                                                      targetCodec = DefaultCompressionCodec,
                                                                      messageFormatVersion = 0,
                                                                      messageTimestampType = TimestampType.CREATE_TIME,
-                                                                     messageTimestampDiffMaxMs = 1000L)._1, offset)
+                                                                     messageTimestampDiffMaxMs = 1000L).validatedMessages, offset)
 
   }
 
@@ -310,22 +345,22 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
     // check uncompressed offsets
     checkOffsets(messages, 0)
     val offset = 1234567
-    val (messageWithOffset, _) = messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset),
-                                                                           now = System.currentTimeMillis(),
-                                                                           sourceCodec = NoCompressionCodec,
-                                                                           targetCodec = NoCompressionCodec,
-                                                                           messageTimestampType = TimestampType.CREATE_TIME,
-                                                                           messageTimestampDiffMaxMs = 5000L)
+    val messageWithOffset = messages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset),
+                                                                      now = System.currentTimeMillis(),
+                                                                      sourceCodec = NoCompressionCodec,
+                                                                      targetCodec = NoCompressionCodec,
+                                                                      messageTimestampType = TimestampType.CREATE_TIME,
+                                                                      messageTimestampDiffMaxMs = 5000L).validatedMessages
     checkOffsets(messageWithOffset, offset)
 
     // check compressed messages
     checkOffsets(compressedMessages, 0)
-    val (compressedMessagesWithOffset, _) = compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset),
-                                                                                                now = System.currentTimeMillis(),
-                                                                                                sourceCodec = DefaultCompressionCodec,
-                                                                                                targetCodec = DefaultCompressionCodec,
-                                                                                                messageTimestampType = TimestampType.CREATE_TIME,
-                                                                                                messageTimestampDiffMaxMs = 5000L)
+    val compressedMessagesWithOffset = compressedMessages.validateMessagesAndAssignOffsets(offsetCounter = new LongRef(offset),
+                                                                                           now = System.currentTimeMillis(),
+                                                                                           sourceCodec = DefaultCompressionCodec,
+                                                                                           targetCodec = DefaultCompressionCodec,
+                                                                                           messageTimestampType = TimestampType.CREATE_TIME,
+                                                                                           messageTimestampDiffMaxMs = 5000L).validatedMessages
     checkOffsets(compressedMessagesWithOffset, offset)
   }
 
@@ -343,7 +378,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
                                                              targetCodec = NoCompressionCodec,
                                                              messageFormatVersion = 1,
                                                              messageTimestampType = TimestampType.LOG_APPEND_TIME,
-                                                             messageTimestampDiffMaxMs = 1000L)._1, offset)
+                                                             messageTimestampDiffMaxMs = 1000L).validatedMessages, offset)
 
     // check compressed messages
     checkOffsets(compressedMessagesV0, 0)
@@ -353,7 +388,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
                                                                        targetCodec = DefaultCompressionCodec,
                                                                        messageFormatVersion = 1,
                                                                        messageTimestampType = TimestampType.LOG_APPEND_TIME,
-                                                                       messageTimestampDiffMaxMs = 1000L)._1, offset)
+                                                                       messageTimestampDiffMaxMs = 1000L).validatedMessages, offset)
 
     // Check down conversion
     val now = System.currentTimeMillis()
@@ -368,7 +403,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
                                                              targetCodec = NoCompressionCodec,
                                                              messageFormatVersion = 0,
                                                              messageTimestampType = TimestampType.CREATE_TIME,
-                                                             messageTimestampDiffMaxMs = 5000L)._1, offset)
+                                                             messageTimestampDiffMaxMs = 5000L).validatedMessages, offset)
 
     // check compressed messages
     checkOffsets(compressedMessagesV1, 0)
@@ -378,7 +413,7 @@ class ByteBufferMessageSetTest extends BaseMessageSetTestCases {
                                                                        targetCodec = DefaultCompressionCodec,
                                                                        messageFormatVersion = 0,
                                                                        messageTimestampType = TimestampType.CREATE_TIME,
-                                                                       messageTimestampDiffMaxMs = 5000L)._1, offset)
+                                                                       messageTimestampDiffMaxMs = 5000L).validatedMessages, offset)
   }
 
   @Test
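
For the CREATE_TIME case in testCreateTime above, the expected maxTimestamp and offsetOfMaxTimestamp can be derived directly from the per-message timestamps. A small, self-contained Scala sketch (not part of this patch; names are illustrative):

    object MaxTimestampSketch extends App {
      val now = System.currentTimeMillis()
      val timestampSeq = Seq(now - 1, now + 1, now)  // same shape as testCreateTime

      // Offsets are assigned 0..n-1, so the index of the largest timestamp is its offset.
      val (maxTimestamp, offsetOfMaxTimestamp) = timestampSeq.zipWithIndex.maxBy(_._1)

      assert(maxTimestamp == now + 1)
      assert(offsetOfMaxTimestamp == 1)
    }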

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 05b84ef..131a24a 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -274,8 +274,9 @@ object TestUtils extends Logging {
   def singleMessageSet(payload: Array[Byte],
                        codec: CompressionCodec = NoCompressionCodec,
                        key: Array[Byte] = null,
+                       timestamp: Long = Message.NoTimestamp,
                        magicValue: Byte = Message.CurrentMagicValue) =
-    new ByteBufferMessageSet(compressionCodec = codec, messages = new Message(payload, key, Message.NoTimestamp, magicValue))
+    new ByteBufferMessageSet(compressionCodec = codec, messages = new Message(payload, key, timestamp, magicValue))
 
   /**
    * Generate an array of random bytes

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/docs/upgrade.html
----------------------------------------------------------------------
diff --git a/docs/upgrade.html b/docs/upgrade.html
index dfd20f4..eef21cf 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -15,10 +15,24 @@
  limitations under the License.
 -->
 
+
+
 <h3><a id="upgrade" href="#upgrade">1.5 Upgrading From Previous Versions</a></h3>
 
-<h5><a id="upgrade_1010_notable" href="#upgrade_1010_notable">Notable changes in 0.10.1.0</a></h5>
+<h4><a id="upgrade_10_1" href="#upgrade_10_1">Upgrading from 0.10.0.X to 0.10.1.0</a></h4>
+0.10.1.0 is compatible with 0.10.0.X in terms of wire protocol. The upgrade can be done one broker at a time by simply bringing it down, updating the code, and restarting it.
+However, please review the <a href="#upgrade_10_1_breaking">Potential breaking changes in 0.10.1.0</a> before upgrading.
 
+<h5><a id="upgrade_10_1_breaking" href="#upgrade_10_1_breaking">Potential breaking changes in 0.10.1.0</a></h5>
+<ul>
+    <li> The log retention time is no longer based on the last modified time of the log segments. Instead, it is based on the largest timestamp of the messages in a log segment.</li>
+    <li> The log rolling time no longer depends on the log segment create time. Instead, it is based on the timestamp of the first message in a log segment, i.e. if the timestamp of the first message in the segment is T, the log will be rolled at T + log.roll.ms. </li>
+    <li> The number of open file handles will increase by ~33% compared to 0.10.0 because of the addition of a time index file for each segment.</li>
+    <li> The time index and offset index share the same index size configuration. Since each time index entry is 1.5x the size of an offset index entry, users may need to increase log.index.size.max.bytes to avoid potentially frequent log rolling. </li>
+    <li> Due to the increased number of index files, on some brokers with a large number of log segments (e.g. >15K), the log loading process during broker startup could take longer. Based on our experiments, setting num.recovery.threads.per.data.dir to one may reduce the log loading time. </li>
+</ul>
+
+<h5><a id="upgrade_1010_notable" href="#upgrade_1010_notable">Notable changes in 0.10.1.0</a></h5>
 <ul>
     <li> The BrokerState "RunningAsController" (value 4) has been removed. Due to a bug, a broker would only be in this state briefly before transitioning out of it and hence the impact of the removal should be minimal. The recommended way to detect if a given broker is the controller is via the kafka.controller:type=KafkaController,name=ActiveControllerCount metric. </li>
 </ul>
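
To make the index-size note above concrete, a rough, back-of-the-envelope Scala sketch (not part of this patch), assuming the entry sizes implied by the tests in this patch (8 bytes per offset index entry, 12 bytes per time index entry) and a hypothetical log.index.size.max.bytes of 10 MB:

    object IndexCapacitySketch extends App {
      val maxIndexSize = 10 * 1024 * 1024        // assumed log.index.size.max.bytes
      val offsetIndexEntries = maxIndexSize / 8  // offset index: 8 bytes per entry
      val timeIndexEntries = maxIndexSize / 12   // time index: 12 bytes per entry, ~2/3 the capacity

      println(s"offset index entries: $offsetIndexEntries, time index entries: $timeIndexEntries")
      // Keeping the previous entry capacity would roughly require maxIndexSize * 3 / 2.
    }

If the time index fills up first it may trigger segment rolls earlier than before, which is why the note suggests increasing the setting.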


[2/2] kafka git commit: KAFKA-3163; Add time based index to Kafka.

Posted by ju...@apache.org.
KAFKA-3163; Add time based index to Kafka.

This patch is for KIP-33.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index

Author: Jiangjie Qin <be...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>, Liquan Pei <li...@gmail.com>

Closes #1215 from becketqin/KAFKA-3163


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/79d3fd2b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/79d3fd2b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/79d3fd2b

Branch: refs/heads/trunk
Commit: 79d3fd2bf0e5c89ff74a2988c403882ae8a9852e
Parents: 05ed54b
Author: Jiangjie Qin <be...@gmail.com>
Authored: Fri Aug 19 10:07:07 2016 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Aug 19 10:07:07 2016 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/log/AbstractIndex.scala    | 287 +++++++++++++++++
 .../main/scala/kafka/log/FileMessageSet.scala   |  78 ++++-
 core/src/main/scala/kafka/log/IndexEntry.scala  |  46 +++
 core/src/main/scala/kafka/log/Log.scala         | 196 +++++++++---
 core/src/main/scala/kafka/log/LogCleaner.scala  |  41 ++-
 core/src/main/scala/kafka/log/LogManager.scala  |   6 +-
 core/src/main/scala/kafka/log/LogSegment.scala  | 169 ++++++++--
 core/src/main/scala/kafka/log/OffsetIndex.scala | 306 +++----------------
 .../main/scala/kafka/log/OffsetPosition.scala   |  25 --
 core/src/main/scala/kafka/log/TimeIndex.scala   | 208 +++++++++++++
 .../kafka/message/ByteBufferMessageSet.scala    |  91 ++++--
 .../scala/kafka/message/MessageAndOffset.scala  |   8 +
 .../src/main/scala/kafka/server/KafkaApis.scala |   3 +-
 .../scala/kafka/server/ReplicaManager.scala     |   2 +-
 .../scala/kafka/tools/DumpLogSegments.scala     | 122 +++++++-
 .../test/scala/unit/kafka/log/CleanerTest.scala |   5 +-
 .../unit/kafka/log/FileMessageSetTest.scala     |  16 +-
 .../scala/unit/kafka/log/LogManagerTest.scala   |   4 +-
 .../scala/unit/kafka/log/LogSegmentTest.scala   | 114 +++++--
 .../src/test/scala/unit/kafka/log/LogTest.scala | 152 +++++++--
 .../scala/unit/kafka/log/TimeIndexTest.scala    |  97 ++++++
 .../message/ByteBufferMessageSetTest.scala      | 135 +++++---
 .../test/scala/unit/kafka/utils/TestUtils.scala |   3 +-
 docs/upgrade.html                               |  16 +-
 24 files changed, 1631 insertions(+), 499 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/AbstractIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
new file mode 100644
index 0000000..d594f18
--- /dev/null
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.{File, RandomAccessFile}
+import java.nio.{ByteBuffer, MappedByteBuffer}
+import java.nio.channels.FileChannel
+import java.util.concurrent.locks.{Lock, ReentrantLock}
+
+import kafka.log.IndexSearchType.IndexSearchEntity
+import kafka.utils.CoreUtils.inLock
+import kafka.utils.{CoreUtils, Logging, Os}
+import org.apache.kafka.common.utils.Utils
+import sun.nio.ch.DirectBuffer
+
+import scala.math.ceil
+
+/**
+ * The abstract index class which holds entry format agnostic methods.
+ *
+ * @param _file The index file
+ * @param baseOffset the base offset of the segment that this index is corresponding to.
+ * @param maxIndexSize The maximum index size in bytes.
+ */
+abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1)
+    extends Logging {
+
+  protected def entrySize: Int
+
+  protected val lock = new ReentrantLock
+
+  @volatile
+  protected var mmap: MappedByteBuffer = {
+    val newlyCreated = _file.createNewFile()
+    val raf = new RandomAccessFile(_file, "rw")
+    try {
+      /* pre-allocate the file if necessary */
+      if(newlyCreated) {
+        if(maxIndexSize < entrySize)
+          throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
+        raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
+      }
+
+      /* memory-map the file */
+      val len = raf.length()
+      val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
+
+      /* set the position in the index for the next entry */
+      if(newlyCreated)
+        idx.position(0)
+      else
+        // if this is a pre-existing index, assume it is valid and set position to last entry
+        idx.position(roundDownToExactMultiple(idx.limit, entrySize))
+      idx
+    } finally {
+      CoreUtils.swallow(raf.close())
+    }
+  }
+
+  /**
+   * The maximum number of entries this index can hold
+   */
+  @volatile
+  private[this] var _maxEntries = mmap.limit / entrySize
+
+  /** The number of entries in this index */
+  @volatile
+  protected var _entries = mmap.position / entrySize
+
+  /**
+   * True iff there are no more slots available in this index
+   */
+  def isFull: Boolean = _entries >= _maxEntries
+
+  def maxEntries: Int = _maxEntries
+
+  def entries: Int = _entries
+
+  /**
+   * The index file
+   */
+  def file: File = _file
+
+  /**
+   * Reset the size of the memory map and the underlying file. This is used in two cases: (1) in
+   * trimToValidSize(), which is called when the segment is closed or a new segment is rolled; (2) when
+   * loading segments from disk or truncating back to an old segment where a new log segment becomes active;
+   * here we want to reset the index size to the maximum index size to avoid rolling a new segment.
+   */
+  def resize(newSize: Int) {
+    inLock(lock) {
+      val raf = new RandomAccessFile(_file, "rw")
+      val roundedNewSize = roundDownToExactMultiple(newSize, entrySize)
+      val position = mmap.position
+
+      /* Windows won't let us modify the file length while the file is mmapped :-( */
+      if(Os.isWindows)
+        forceUnmap(mmap)
+      try {
+        raf.setLength(roundedNewSize)
+        mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
+        _maxEntries = mmap.limit / entrySize
+        mmap.position(position)
+      } finally {
+        CoreUtils.swallow(raf.close())
+      }
+    }
+  }
+
+  /**
+   * Rename the file that backs this index
+   *
+   * @throws IOException if rename fails
+   */
+  def renameTo(f: File) {
+    try Utils.atomicMoveWithFallback(_file.toPath, f.toPath)
+    finally _file = f
+  }
+
+  /**
+   * Flush the data in the index to disk
+   */
+  def flush() {
+    inLock(lock) {
+      mmap.force()
+    }
+  }
+
+  /**
+   * Delete this index file
+   */
+  def delete(): Boolean = {
+    info(s"Deleting index ${_file.getAbsolutePath}")
+    if(Os.isWindows)
+      CoreUtils.swallow(forceUnmap(mmap))
+    _file.delete()
+  }
+
+  /**
+   * Trim this index to fit just the valid entries, deleting all trailing unwritten bytes from
+   * the file.
+   */
+  def trimToValidSize() {
+    inLock(lock) {
+      resize(entrySize * _entries)
+    }
+  }
+
+  /**
+   * The number of bytes actually used by this index
+   */
+  def sizeInBytes = entrySize * _entries
+
+  /** Close the index */
+  def close() {
+    trimToValidSize()
+  }
+
+  /**
+   * Do a basic sanity check on this index to detect obvious problems
+   *
+   * @throws IllegalArgumentException if any problems are found
+   */
+  def sanityCheck(): Unit
+
+  /**
+   * Remove all the entries from the index.
+   */
+  def truncate(): Unit
+
+  /**
+   * Remove all entries from the index which have an offset greater than or equal to the given offset.
+   * Truncating to an offset larger than the largest in the index has no effect.
+   */
+  def truncateTo(offset: Long): Unit
+
+  /**
+   * Forcefully free the buffer's mmap. We do this only on Windows.
+   */
+  protected def forceUnmap(m: MappedByteBuffer) {
+    try {
+      m match {
+        case buffer: DirectBuffer =>
+          val bufferCleaner = buffer.cleaner()
+          /* cleaner can be null if the mapped region has size 0 */
+          if (bufferCleaner != null)
+            bufferCleaner.clean()
+        case _ =>
+      }
+    } catch {
+      case t: Throwable => error("Error when freeing index buffer", t)
+    }
+  }
+
+  /**
+   * Execute the given function in a lock only if we are running on Windows. We do this
+   * because Windows won't let us resize a file while it is mmapped. As a result we have to force unmap it
+   * and this requires synchronizing reads.
+   */
+  protected def maybeLock[T](lock: Lock)(fun: => T): T = {
+    if(Os.isWindows)
+      lock.lock()
+    try {
+      fun
+    } finally {
+      if(Os.isWindows)
+        lock.unlock()
+    }
+  }
+
+  /**
+   * To parse an entry in the index.
+   *
+   * @param buffer the buffer of this memory mapped index.
+   * @param n the slot
+   * @return the index entry stored in the given slot.
+   */
+  protected def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry
+
+  /**
+   * Find the slot in which the largest entry less than or equal to the given target key or value is stored.
+   * The comparison is made using the `IndexEntry.compareTo()` method.
+   *
+   * @param idx The index buffer
+   * @param target The index key to look for
+   * @return The slot found or -1 if the least entry in the index is larger than the target key or the index is empty
+   */
+  protected def indexSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): Int = {
+    // check if the index is empty
+    if(_entries == 0)
+      return -1
+
+    // check if the target offset is smaller than the least offset
+    if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
+      return -1
+
+    // binary search for the entry
+    var lo = 0
+    var hi = _entries - 1
+    while(lo < hi) {
+      val mid = ceil(hi/2.0 + lo/2.0).toInt
+      val found = parseEntry(idx, mid)
+      val compareResult = compareIndexEntry(found, target, searchEntity)
+      if(compareResult > 0)
+        hi = mid - 1
+      else if(compareResult < 0)
+        lo = mid
+      else
+        return mid
+    }
+    lo
+  }
+
+  private def compareIndexEntry(indexEntry: IndexEntry, target: Long, searchEntity: IndexSearchEntity): Int = {
+    searchEntity match {
+      case IndexSearchType.KEY => indexEntry.indexKey.compareTo(target)
+      case IndexSearchType.VALUE => indexEntry.indexValue.compareTo(target)
+    }
+  }
+
+  /**
+   * Round a number to the greatest exact multiple of the given factor less than the given number.
+   * E.g. roundDownToExactMultiple(67, 8) == 64
+   */
+  private def roundDownToExactMultiple(number: Int, factor: Int) = factor * (number / factor)
+
+}
+
+object IndexSearchType extends Enumeration {
+  type IndexSearchEntity = Value
+  val KEY, VALUE = Value
+}
\ No newline at end of file
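
The indexSlotFor helper above performs a binary search for the largest entry whose key (or value) is less than or equal to the target. A standalone Scala sketch of the same search over a plain sorted array (not part of this patch; purely illustrative):

    object LargestEntrySearchSketch extends App {
      // One sorted key per index slot.
      val keys = Array(10L, 20L, 30L, 40L, 50L)

      // Slot of the largest key <= target, or -1 if the smallest key is already larger.
      def slotFor(target: Long): Int = {
        if (keys.isEmpty || keys(0) > target) return -1
        var lo = 0
        var hi = keys.length - 1
        while (lo < hi) {
          val mid = math.ceil(hi / 2.0 + lo / 2.0).toInt
          if (keys(mid) > target) hi = mid - 1
          else if (keys(mid) < target) lo = mid
          else return mid
        }
        lo
      }

      assert(slotFor(5L) == -1)   // below the smallest key
      assert(slotFor(25L) == 1)   // falls between 20 and 30, so the slot of 20
      assert(slotFor(30L) == 2)   // exact match
      assert(slotFor(99L) == 4)   // above the largest key, so the last slot
    }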

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index 8e92f95..5763042 100755
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -117,7 +117,13 @@ class FileMessageSet private[kafka](@volatile var file: File,
     new FileMessageSet(file,
                        channel,
                        start = this.start + position,
-                       end = math.min(this.start + position + size, sizeInBytes()))
+                       end = {
+                         // Handle the integer overflow
+                         if (this.start + position + size < 0)
+                           sizeInBytes()
+                         else
+                           math.min(this.start + position + size, sizeInBytes())
+                       })
   }
 
   /**
@@ -126,7 +132,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
    * @param targetOffset The offset to search for.
    * @param startingPosition The starting position in the file to begin searching from.
    */
-  def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
+  def searchForOffset(targetOffset: Long, startingPosition: Int): OffsetPosition = {
     var position = startingPosition
     val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
     val size = sizeInBytes()
@@ -135,7 +141,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
       channel.read(buffer, position)
       if(buffer.hasRemaining)
         throw new IllegalStateException("Failed to read complete buffer for targetOffset %d startPosition %d in %s"
-                                        .format(targetOffset, startingPosition, file.getAbsolutePath))
+          .format(targetOffset, startingPosition, file.getAbsolutePath))
       buffer.rewind()
       val offset = buffer.getLong()
       if(offset >= targetOffset)
@@ -149,6 +155,72 @@ class FileMessageSet private[kafka](@volatile var file: File,
   }
 
   /**
+   * Search forward for the first message whose timestamp is greater than or equal to the target timestamp.
+   *
+   * The search stops immediately when it sees a message in a format version before 0.10.0. This is to avoid
+   * scanning the entire log when all the messages are still in the old format.
+   *
+   * @param targetTimestamp The timestamp to search for.
+   * @param startingPosition The starting position to search from.
+   * @return None if no message exists at or after the starting position;
+   *         otherwise Some(offset) of the first qualifying message, or Some(the next offset to read) if none qualifies.
+   */
+  def searchForTimestamp(targetTimestamp: Long, startingPosition: Int): Option[Long] = {
+    var maxTimestampChecked = Message.NoTimestamp
+    var lastOffsetChecked = -1L
+    val messagesToSearch = read(startingPosition, sizeInBytes)
+    for (messageAndOffset <- messagesToSearch) {
+      val message = messageAndOffset.message
+      lastOffsetChecked = messageAndOffset.offset
+      // Stop searching once we see message format before 0.10.0.
+      // This is equivalent to treating a message without a timestamp as having the largest timestamp.
+      // We do this to avoid scanning the entire log if no message has a timestamp.
+      if (message.magic == Message.MagicValue_V0)
+        return Some(messageAndOffset.offset)
+      else if (message.timestamp >= targetTimestamp) {
+        // We found a message
+        message.compressionCodec match {
+          case NoCompressionCodec =>
+            return Some(messageAndOffset.offset)
+          case _ =>
+            // Iterate over the inner messages to get the exact offset.
+            for (innerMessage <- ByteBufferMessageSet.deepIterator(messageAndOffset)) {
+              val timestamp = innerMessage.message.timestamp
+              if (timestamp >= targetTimestamp)
+                return Some(innerMessage.offset)
+            }
+            throw new IllegalStateException(s"The message set (max timestamp = ${message.timestamp}, max offset = ${messageAndOffset.offset})" +
+                s" should contain target timestamp $targetTimestamp but it does not.")
+        }
+      } else
+        maxTimestampChecked = math.max(maxTimestampChecked, message.timestamp)
+    }
+
+    if (lastOffsetChecked >= 0)
+      Some(lastOffsetChecked + 1)
+    else
+      None
+  }
+
+  /**
+   * Return the largest timestamp of the messages after a given position in this file message set.
+   * @param startingPosition The starting position.
+   * @return The largest timestamp of the messages after the given position.
+   */
+  def largestTimestampAfter(startingPosition: Int): TimestampOffset = {
+    var maxTimestamp = Message.NoTimestamp
+    var offsetOfMaxTimestamp = -1L
+    val messagesToSearch = read(startingPosition, Int.MaxValue)
+    for (messageAndOffset <- messagesToSearch) {
+      if (messageAndOffset.message.timestamp > maxTimestamp) {
+        maxTimestamp = messageAndOffset.message.timestamp
+        offsetOfMaxTimestamp = messageAndOffset.offset
+      }
+    }
+    TimestampOffset(maxTimestamp, offsetOfMaxTimestamp)
+  }
+
+  /**
    * Write some of this set to the given channel.
    * @param destChannel The channel to write to.
    * @param writePosition The position in the message set to begin writing from.
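
The searchForTimestamp scan above can be pictured with plain collections. A simplified, self-contained Scala sketch (not part of this patch; it ignores compression and the pre-0.10.0 format short-circuit):

    object SearchForTimestampSketch extends App {
      case class MessageAndOffset(offset: Long, timestamp: Long)

      // Offset of the first message with timestamp >= target;
      // if none qualifies, the next offset to read; None when there are no messages.
      def searchForTimestamp(messages: Seq[MessageAndOffset], targetTimestamp: Long): Option[Long] =
        messages.find(_.timestamp >= targetTimestamp).map(_.offset)
          .orElse(messages.lastOption.map(_.offset + 1))

      val messages = Seq(MessageAndOffset(0L, 100L), MessageAndOffset(1L, 200L), MessageAndOffset(2L, 300L))
      assert(searchForTimestamp(messages, 150L) == Some(1L))
      assert(searchForTimestamp(messages, 400L) == Some(3L))
      assert(searchForTimestamp(Seq.empty, 100L).isEmpty)
    }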

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/IndexEntry.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/IndexEntry.scala b/core/src/main/scala/kafka/log/IndexEntry.scala
new file mode 100644
index 0000000..2f5a6a7
--- /dev/null
+++ b/core/src/main/scala/kafka/log/IndexEntry.scala
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+sealed trait IndexEntry {
+  // We always use Long for both key and value to avoid boxing.
+  def indexKey: Long
+  def indexValue: Long
+}
+
+/**
+ * The mapping between a logical log offset and the physical position
+ * in some log file of the beginning of the message set entry with the
+ * given offset.
+ */
+case class OffsetPosition(offset: Long, position: Int) extends IndexEntry {
+  override def indexKey = offset
+  override def indexValue = position.toLong
+}
+
+
+/**
+ * The mapping from a timestamp to a message offset. The entry means that any message whose timestamp is greater
+ * than that timestamp must be at or after that offset.
+ * @param timestamp The max timestamp before the given offset.
+ * @param offset The message offset.
+ */
+case class TimestampOffset(timestamp: Long, offset: Long) extends IndexEntry {
+  override def indexKey = timestamp
+  override def indexValue = offset
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 1a7719a..b4aa470 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -17,6 +17,7 @@
 
 package kafka.log
 
+import kafka.api.OffsetRequest
 import kafka.utils._
 import kafka.message._
 import kafka.common._
@@ -30,19 +31,22 @@ import java.text.NumberFormat
 import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException}
 import org.apache.kafka.common.record.TimestampType
 
-import scala.collection.JavaConversions
+import scala.collection.{Seq, JavaConversions}
 import com.yammer.metrics.core.Gauge
 import org.apache.kafka.common.utils.Utils
 
 object LogAppendInfo {
-  val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+  val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, -1L, Message.NoTimestamp, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
 }
 
 /**
  * Struct to hold various quantities we compute about each message set before appending to the log
+ *
  * @param firstOffset The first offset in the message set
  * @param lastOffset The last offset in the message set
- * @param timestamp The log append time (if used) of the message set, otherwise Message.NoTimestamp
+ * @param maxTimestamp The maximum timestamp of the message set.
+ * @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp.
+ * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp
  * @param sourceCodec The source codec used in the message set (send by the producer)
  * @param targetCodec The target codec of the message set(after applying the broker compression configuration if any)
  * @param shallowCount The number of shallow messages
@@ -51,7 +55,9 @@ object LogAppendInfo {
  */
 case class LogAppendInfo(var firstOffset: Long,
                          var lastOffset: Long,
-                         var timestamp: Long,
+                         var maxTimestamp: Long,
+                         var offsetOfMaxTimestamp: Long,
+                         var logAppendTime: Long,
                          sourceCodec: CompressionCodec,
                          targetCodec: CompressionCodec,
                          shallowCount: Int,
@@ -95,7 +101,7 @@ class Log(val dir: File,
     else
       0
   }
-
+  val t = time.milliseconds
   /* the actual segments of the log */
   private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
   loadSegments()
@@ -105,7 +111,8 @@ class Log(val dir: File,
 
   val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(dir)
 
-  info("Completed load of log %s with log end offset %d".format(name, logEndOffset))
+  info("Completed load of log %s with %d log segments and log end offset %d in %d ms"
+      .format(name, segments.size(), logEndOffset, time.milliseconds - t))
 
   val tags = Map("topic" -> topicAndPartition.topic, "partition" -> topicAndPartition.partition.toString)
 
@@ -167,12 +174,17 @@ class Log(val dir: File,
       }
     }
 
-    // now do a second pass and load all the .log and .index files
+    // now do a second pass and load all the .log and all index files
     for(file <- dir.listFiles if file.isFile) {
       val filename = file.getName
-      if(filename.endsWith(IndexFileSuffix)) {
+      if(filename.endsWith(IndexFileSuffix) || filename.endsWith(TimeIndexFileSuffix)) {
         // if it is an index file, make sure it has a corresponding .log file
-        val logFile = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
+        val logFile =
+          if (filename.endsWith(TimeIndexFileSuffix))
+            new File(file.getAbsolutePath.replace(TimeIndexFileSuffix, LogFileSuffix))
+          else
+            new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
+
         if(!logFile.exists) {
           warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
           file.delete()
@@ -181,6 +193,9 @@ class Log(val dir: File,
         // if its a log file, load the corresponding log segment
         val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
         val indexFile = Log.indexFilename(dir, start)
+        val timeIndexFile = Log.timeIndexFilename(dir, start)
+
+        val indexFileExists = indexFile.exists()
         val segment = new LogSegment(dir = dir,
                                      startOffset = start,
                                      indexIntervalBytes = config.indexInterval,
@@ -189,20 +204,23 @@ class Log(val dir: File,
                                      time = time,
                                      fileAlreadyExists = true)
 
-        if(indexFile.exists()) {
+        if (indexFileExists) {
           try {
-              segment.index.sanityCheck()
+            segment.index.sanityCheck()
+            segment.timeIndex.sanityCheck()
           } catch {
             case e: java.lang.IllegalArgumentException =>
-              warn("Found a corrupted index file, %s, deleting and rebuilding index. Error Message: %s".format(indexFile.getAbsolutePath, e.getMessage))
+              warn(s"Found a corrupted index file due to ${e.getMessage}. Deleting ${timeIndexFile.getAbsolutePath}, " +
+                s"${indexFile.getAbsolutePath} and rebuilding index...")
               indexFile.delete()
+              timeIndexFile.delete()
               segment.recover(config.maxMessageSize)
           }
-        }
-        else {
+        } else {
           error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
           segment.recover(config.maxMessageSize)
         }
+
         segments.put(start, segment)
       }
     }
@@ -216,8 +234,11 @@ class Log(val dir: File,
       val startOffset = fileName.substring(0, fileName.length - LogFileSuffix.length).toLong
       val indexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, IndexFileSuffix) + SwapFileSuffix)
       val index =  new OffsetIndex(indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
+      val timeIndexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, TimeIndexFileSuffix) + SwapFileSuffix)
+      val timeIndex = new TimeIndex(timeIndexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
       val swapSegment = new LogSegment(new FileMessageSet(file = swapFile),
                                        index = index,
+                                       timeIndex = timeIndex,
                                        baseOffset = startOffset,
                                        indexIntervalBytes = config.indexInterval,
                                        rollJitterMs = config.randomSegmentJitter,
@@ -243,6 +264,7 @@ class Log(val dir: File,
       recoverLog()
       // reset the index size of the currently active log segment to allow more entries
       activeSegment.index.resize(config.maxIndexSize)
+      activeSegment.timeIndex.resize(config.maxIndexSize)
     }
 
   }
@@ -298,8 +320,7 @@ class Log(val dir: File,
   def close() {
     debug("Closing log " + name)
     lock synchronized {
-      for(seg <- logSegments)
-        seg.close()
+      logSegments.foreach(_.close())
     }
   }
 
@@ -311,9 +332,7 @@ class Log(val dir: File,
    *
    * @param messages The message set to append
    * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
-   *
    * @throws KafkaStorageException If the append fails due to an I/O error.
-   *
    * @return Information about the appended messages including the first and last offset.
    */
   def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
@@ -335,7 +354,7 @@ class Log(val dir: File,
           val offset = new LongRef(nextOffsetMetadata.messageOffset)
           appendInfo.firstOffset = offset.value
           val now = time.milliseconds
-          val (validatedMessages, messageSizesMaybeChanged) = try {
+          val validateAndOffsetAssignResult = try {
             validMessages.validateMessagesAndAssignOffsets(offset,
                                                            now,
                                                            appendInfo.sourceCodec,
@@ -347,14 +366,16 @@ class Log(val dir: File,
           } catch {
             case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
           }
-          validMessages = validatedMessages
+          validMessages = validateAndOffsetAssignResult.validatedMessages
+          appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
+          appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.offsetOfMaxTimestamp
           appendInfo.lastOffset = offset.value - 1
           if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
-            appendInfo.timestamp = now
+            appendInfo.logAppendTime = now
 
           // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
           // format conversion)
-          if (messageSizesMaybeChanged) {
+          if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
             for (messageAndOffset <- validMessages.shallowIterator) {
               if (MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
                 // we record the original message set size instead of the trimmed size
@@ -383,7 +404,8 @@ class Log(val dir: File,
         val segment = maybeRoll(validMessages.sizeInBytes)
 
         // now append to the log
-        segment.append(appendInfo.firstOffset, validMessages)
+        segment.append(firstOffset = appendInfo.firstOffset, largestTimestamp = appendInfo.maxTimestamp,
+          offsetOfLargestTimestamp = appendInfo.offsetOfMaxTimestamp, messages = validMessages)
 
         // increment the log end offset
         updateLogEndOffset(appendInfo.lastOffset + 1)
@@ -424,6 +446,8 @@ class Log(val dir: File,
     var firstOffset, lastOffset = -1L
     var sourceCodec: CompressionCodec = NoCompressionCodec
     var monotonic = true
+    var maxTimestamp = Message.NoTimestamp
+    var offsetOfMaxTimestamp = -1L
     for(messageAndOffset <- messages.shallowIterator) {
       // update the first offset if on the first message
       if(firstOffset < 0)
@@ -447,7 +471,10 @@ class Log(val dir: File,
 
       // check the validity of the message by checking CRC
       m.ensureValid()
-
+      if (m.timestamp > maxTimestamp) {
+        maxTimestamp = m.timestamp
+        offsetOfMaxTimestamp = lastOffset
+      }
       shallowMessageCount += 1
       validBytesCount += messageSize
 
@@ -459,11 +486,12 @@ class Log(val dir: File,
     // Apply broker-side compression if any
     val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
 
-    LogAppendInfo(firstOffset, lastOffset, Message.NoTimestamp, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
+    LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, Message.NoTimestamp, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
   }
 
   /**
    * Trim any invalid bytes from the end of this message set (if there are any)
+   *
    * @param messages The message set to trim
    * @param info The general information of the message set
    * @return A trimmed message set. This may be the same as what was passed in or it may not.
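
The hunk above extends the message-set analysis with a single-pass scan that tracks the largest shallow-message timestamp and the offset it was seen at. A minimal standalone sketch of that bookkeeping, with illustrative names that are not part of the patch:

    object MaxTimestampScanSketch {
      val NoTimestamp = -1L

      // Hypothetical stand-in for a shallow message-and-offset pair.
      case class Entry(offset: Long, timestamp: Long)

      // Returns (maxTimestamp, offsetOfMaxTimestamp) from one pass over the entries.
      def scan(entries: Seq[Entry]): (Long, Long) = {
        var maxTimestamp = NoTimestamp
        var offsetOfMaxTimestamp = -1L
        for (e <- entries) {
          if (e.timestamp > maxTimestamp) {
            maxTimestamp = e.timestamp
            offsetOfMaxTimestamp = e.offset
          }
        }
        (maxTimestamp, offsetOfMaxTimestamp)
      }
    }
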
@@ -544,6 +572,71 @@ class Log(val dir: File,
   }
 
   /**
+   * Get an offset based on the given timestamp
+   * The offset returned is the offset of the first message whose timestamp is greater than or equal to the
+   * given timestamp.
+   *
+   * If no such message is found, the log end offset is returned.
+   *
+   * `NOTE:` OffsetRequest V0 does not use this method; the behavior of OffsetRequest V0 remains the same as before,
+   * i.e. it only gives back the timestamp based on the last modification time of the log segments.
+   *
+   * @param timestamp The given timestamp for offset fetching.
+   * @return The offset of the first message whose timestamp is greater than or equal to the given timestamp.
+   */
+  def fetchOffsetsByTimestamp(timestamp: Long): Long = {
+    debug(s"Searching offset for timestamp $timestamp")
+    val segsArray = logSegments.toArray
+    if (timestamp == OffsetRequest.EarliestTime)
+      return segsArray(0).baseOffset
+
+    // set the target timestamp to be Long.MaxValue if we need to find from the latest.
+    val targetTimestamp = timestamp match {
+      case OffsetRequest.LatestTime => Long.MaxValue
+      case _ => timestamp
+    }
+
+    var foundOffset: Long = -1L
+    // This while loop ensures we return a valid offset to the best of our knowledge. It handles the case where the
+    // log is truncated during the timestamp search and no message is found; in that case we retry the search.
+    do {
+      val targetSeg = {
+        // Get all the segments whose largest timestamp is smaller than target timestamp
+        val earlierSegs = segsArray.takeWhile(_.largestTimestamp < targetTimestamp)
+        // We need to search the first segment whose largest timestamp is greater than the target timestamp if there is one.
+        if (earlierSegs.length < segsArray.length)
+          segsArray(earlierSegs.length)
+        else
+          earlierSegs.last
+      }
+
+      // First cache the current log end offset
+      val leo = logEndOffset
+      foundOffset = {
+        // Use the cached log end offsets if
+        // 1. user is asking for latest messages, or,
+        // 2. we are searching on the active segment and the target timestamp is greater than the largestTimestamp
+        // after we cached the log end offset. (We have to use the cached log end offsets because it is possible that
+        // some messages with a larger timestamp are appended after we check the largest timestamp. Using log end offset
+        // after the timestamp check might skip those messages.)
+        if (targetTimestamp == Long.MaxValue
+          || (targetTimestamp > targetSeg.largestTimestamp && targetSeg == activeSegment))
+          leo
+        else
+        // The findOffsetByTimestamp() method may return None when the log is truncated during the timestamp search.
+        // In that case we simply set the foundOffset to -1 so that we will search the timestamp again in the
+        // while loop.
+          targetSeg.findOffsetByTimestamp(targetTimestamp) match {
+            case Some(offset) => offset
+            case None => -1L
+          }
+      }
+    } while (foundOffset < 0)
+    foundOffset
+  }
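
A condensed, self-contained sketch of the segment selection and retry logic used by fetchOffsetsByTimestamp above; the Segment trait and method names are illustrative stand-ins and not part of the patch:

    object OffsetByTimestampSketch {
      trait Segment {
        def largestTimestamp: Long
        def findOffsetByTimestamp(timestamp: Long): Option[Long]
      }

      def fetchOffset(segments: Vector[Segment], logEndOffset: => Long, target: Long): Long = {
        var found = -1L
        do {
          // First segment whose largest timestamp reaches the target, else the last segment.
          val earlier = segments.takeWhile(_.largestTimestamp < target)
          val seg = if (earlier.length < segments.length) segments(earlier.length) else earlier.last
          // Cache the log end offset before inspecting the segment, as in the patch.
          val leo = logEndOffset
          found =
            if (target > seg.largestTimestamp && seg == segments.last) leo
            else seg.findOffsetByTimestamp(target).getOrElse(-1L) // None => concurrent truncation, retry
        } while (found < 0)
        found
      }
    }
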
+
+  /**
    * Given a message offset, find its corresponding offset metadata in the log.
    * If the message offset is out of range, return unknown offset metadata
    */
@@ -559,6 +652,7 @@ class Log(val dir: File,
   /**
    * Delete any log segments matching the given predicate function,
    * starting with the oldest segment and moving forward until a segment doesn't match.
+   *
    * @param predicate A function that takes in a single log segment and returns true iff it is deletable
    * @return The number of segments deleted
    */
@@ -609,24 +703,22 @@ class Log(val dir: File,
    * logSegment will be rolled if one of the following conditions met
    * <ol>
    * <li> The logSegment is full
-   * <li> The maxTime has elapsed
+   * <li> The maxTime has elapsed since the timestamp of the first message in the segment (or since the create time if
+   * the first message does not have a timestamp)
    * <li> The index is full
    * </ol>
    * @return The currently active segment after (perhaps) rolling to a new segment
    */
   private def maybeRoll(messagesSize: Int): LogSegment = {
     val segment = activeSegment
+    val reachedRollMs = segment.timeWaitedForRoll(time.milliseconds) > config.segmentMs - segment.rollJitterMs
     if (segment.size > config.segmentSize - messagesSize ||
-        segment.size > 0 && time.milliseconds - segment.created > config.segmentMs - segment.rollJitterMs ||
-        segment.index.isFull) {
-      debug("Rolling new log segment in %s (log_size = %d/%d, index_size = %d/%d, age_ms = %d/%d)."
-            .format(name,
-                    segment.size,
-                    config.segmentSize,
-                    segment.index.entries,
-                    segment.index.maxEntries,
-                    time.milliseconds - segment.created,
-                    config.segmentMs - segment.rollJitterMs))
+        (segment.size > 0 && reachedRollMs) ||
+        segment.index.isFull || segment.timeIndex.isFull) {
+      debug(s"Rolling new log segment in $name (log_size = ${segment.size}/${config.segmentSize}}, " +
+          s"index_size = ${segment.index.entries}/${segment.index.maxEntries}, " +
+          s"time_index_size = ${segment.timeIndex.entries}/${segment.timeIndex.maxEntries}, " +
+          s"inactive_time_ms = ${segment.timeWaitedForRoll(time.milliseconds)}/${config.segmentMs - segment.rollJitterMs}).")
       roll()
     } else {
       segment
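
The roll decision above, flattened into a standalone predicate with the segment state and config passed as plain parameters (illustrative names only, not the broker's config keys):

    object RollConditionSketch {
      def shouldRoll(segmentBytes: Int, incomingBytes: Int, maxSegmentBytes: Int,
                     timeWaitedForRollMs: Long, segmentMs: Long, rollJitterMs: Long,
                     offsetIndexFull: Boolean, timeIndexFull: Boolean): Boolean = {
        val reachedRollMs = timeWaitedForRollMs > segmentMs - rollJitterMs
        segmentBytes > maxSegmentBytes - incomingBytes ||
          (segmentBytes > 0 && reachedRollMs) ||
          offsetIndexFull || timeIndexFull
      }
    }
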
@@ -636,6 +728,7 @@ class Log(val dir: File,
   /**
    * Roll the log over to a new active segment starting with the current logEndOffset.
    * This will trim the index to the exact size of the number of entries it currently contains.
+   *
    * @return The newly rolled segment
    */
   def roll(): LogSegment = {
@@ -644,7 +737,8 @@ class Log(val dir: File,
       val newOffset = logEndOffset
       val logFile = logFilename(dir, newOffset)
       val indexFile = indexFilename(dir, newOffset)
-      for(file <- List(logFile, indexFile); if file.exists) {
+      val timeIndexFile = timeIndexFilename(dir, newOffset)
+      for(file <- List(logFile, indexFile, timeIndexFile); if file.exists) {
         warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
         file.delete()
       }
@@ -652,8 +746,11 @@ class Log(val dir: File,
       segments.lastEntry() match {
         case null =>
         case entry => {
-          entry.getValue.index.trimToValidSize()
-          entry.getValue.log.trim()
+          val seg = entry.getValue
+          seg.onBecomeInactiveSegment()
+          seg.index.trimToValidSize()
+          seg.timeIndex.trimToValidSize()
+          seg.log.trim()
         }
       }
       val segment = new LogSegment(dir,
@@ -692,6 +789,7 @@ class Log(val dir: File,
 
   /**
    * Flush log segments for all offsets up to offset-1
+   *
    * @param offset The offset to flush up to (non-inclusive); the new recovery point
    */
   def flush(offset: Long) : Unit = {
@@ -723,6 +821,7 @@ class Log(val dir: File,
 
   /**
    * Truncate this log so that it ends with the greatest offset < targetOffset.
+   *
    * @param targetOffset The offset to truncate to, an upper bound on all offsets in the log after truncation is complete.
    */
   private[log] def truncateTo(targetOffset: Long) {
@@ -748,6 +847,7 @@ class Log(val dir: File,
 
   /**
    *  Delete all data in the log and start at the new offset
+   *
    *  @param newOffset The new offset to start the log with
    */
   private[log] def truncateFullyAndStartAt(newOffset: Long) {
@@ -826,6 +926,7 @@ class Log(val dir: File,
 
   /**
    * Perform an asynchronous delete on the given file if it exists (otherwise do nothing)
+   *
    * @throws KafkaStorageException if the file can't be renamed and still exists
    */
   private def asyncDeleteSegment(segment: LogSegment) {
@@ -893,6 +994,7 @@ class Log(val dir: File,
   }
   /**
    * Add the given segment to the segments in this log. If this segment replaces an existing segment, delete it.
+   *
    * @param segment The segment to add
    */
   def addSegment(segment: LogSegment) = this.segments.put(segment.baseOffset, segment)
@@ -910,6 +1012,9 @@ object Log {
   /** an index file */
   val IndexFileSuffix = ".index"
 
+  /** a time index file */
+  val TimeIndexFileSuffix = ".timeindex"
+
   /** a file that is scheduled to be deleted */
   val DeletedFileSuffix = ".deleted"
 
@@ -920,13 +1025,14 @@ object Log {
   val SwapFileSuffix = ".swap"
 
   /** Clean shutdown file that indicates the broker was cleanly shutdown in 0.8. This is required to maintain backwards compatibility
-    * with 0.8 and avoid unnecessary log recovery when upgrading from 0.8 to 0.8.1 */
+   * with 0.8 and avoid unnecessary log recovery when upgrading from 0.8 to 0.8.1 */
   /** TODO: Get rid of CleanShutdownFile in 0.8.2 */
   val CleanShutdownFile = ".kafka_cleanshutdown"
 
   /**
    * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
    * so that ls sorts the files numerically.
+   *
    * @param offset The offset to use in the file name
    * @return The filename
    */
@@ -940,6 +1046,7 @@ object Log {
 
   /**
    * Construct a log file name in the given dir with the given base offset
+   *
    * @param dir The directory in which the log will reside
    * @param offset The base offset of the log file
    */
@@ -948,12 +1055,21 @@ object Log {
 
   /**
    * Construct an index file name in the given dir using the given base offset
+   *
    * @param dir The directory in which the log will reside
    * @param offset The base offset of the log file
    */
   def indexFilename(dir: File, offset: Long) =
     new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix)
 
+  /**
+   * Construct a time index file name in the given dir using the given base offset
+   *
+   * @param dir The directory in which the log will reside
+   * @param offset The base offset of the log file
+   */
+  def timeIndexFilename(dir: File, offset: Long) =
+    new File(dir, filenamePrefixFromOffset(offset) + TimeIndexFileSuffix)
 
   /**
    * Parse the topic and partition out of the directory name of a log

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 25c36e7..d4bb1f2 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -334,7 +334,7 @@ private[log] class Cleaner(val id: Int,
     val deleteHorizonMs = 
       log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {
         case None => 0L
-        case Some(seg) => seg.lastModified - log.config.deleteRetentionMs
+        case Some(seg) => seg.largestTimestamp - log.config.deleteRetentionMs
     }
         
     // group the segments and clean the groups
@@ -366,23 +366,32 @@ private[log] class Cleaner(val id: Int,
     val logFile = new File(segments.head.log.file.getPath + Log.CleanedFileSuffix)
     logFile.delete()
     val indexFile = new File(segments.head.index.file.getPath + Log.CleanedFileSuffix)
+    val timeIndexFile = new File(segments.head.timeIndex.file.getPath + Log.CleanedFileSuffix)
     indexFile.delete()
+    timeIndexFile.delete()
     val messages = new FileMessageSet(logFile, fileAlreadyExists = false, initFileSize = log.initFileSize(), preallocate = log.config.preallocate)
     val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize)
-    val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time)
+    val timeIndex = new TimeIndex(timeIndexFile, segments.head.baseOffset, segments.head.timeIndex.maxIndexSize)
+    val cleaned = new LogSegment(messages, index, timeIndex, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time)
 
     try {
       // clean segments into the new destination segment
       for (old <- segments) {
-        val retainDeletes = old.lastModified > deleteHorizonMs
-        info("Cleaning segment %s in log %s (last modified %s) into %s, %s deletes."
-            .format(old.baseOffset, log.name, new Date(old.lastModified), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
+        val retainDeletes = old.largestTimestamp > deleteHorizonMs
+        info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
+            .format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
         cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, log.config.messageFormatVersion.messageFormatVersion)
       }
 
       // trim excess index
       index.trimToValidSize()
 
+      // Append the last time index entry
+      cleaned.onBecomeInactiveSegment()
+
+      // trim time index
+      timeIndex.trimToValidSize()
+
       // flush new segment to disk before swap
       cleaned.flush()
 
@@ -422,6 +431,8 @@ private[log] class Cleaner(val id: Int,
       // read a chunk of messages and copy any that are to be retained to the write buffer to be written out
       readBuffer.clear()
       writeBuffer.clear()
+      var maxTimestamp = Message.NoTimestamp
+      var offsetOfMaxTimestamp = -1L
       val messages = new ByteBufferMessageSet(source.log.readInto(readBuffer, position))
       throttler.maybeThrottle(messages.sizeInBytes)
       // check each message to see if it is to be retained
@@ -433,6 +444,10 @@ private[log] class Cleaner(val id: Int,
           if (shouldRetainMessage(source, map, retainDeletes, entry)) {
             ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
             stats.recopyMessage(size)
+            if (entry.message.timestamp > maxTimestamp) {
+              maxTimestamp = entry.message.timestamp
+              offsetOfMaxTimestamp = entry.offset
+            }
           }
           messagesRead += 1
         } else {
@@ -443,12 +458,16 @@ private[log] class Cleaner(val id: Int,
           val retainedMessages = new mutable.ArrayBuffer[MessageAndOffset]
           messages.foreach { messageAndOffset =>
             messagesRead += 1
-            if (shouldRetainMessage(source, map, retainDeletes, messageAndOffset))
+            if (shouldRetainMessage(source, map, retainDeletes, messageAndOffset)) {
               retainedMessages += messageAndOffset
+              // We need the max timestamp and last offset for time index
+              if (messageAndOffset.message.timestamp > maxTimestamp)
+                maxTimestamp = messageAndOffset.message.timestamp
+            }
             else writeOriginalMessageSet = false
           }
-
-          // There are no messages compacted out, write the original message set back
+          offsetOfMaxTimestamp = if (retainedMessages.nonEmpty) retainedMessages.last.offset else -1L
+          // If no messages were compacted out and no message format conversion is needed, write the original message set back
           if (writeOriginalMessageSet)
             ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
           else
@@ -461,7 +480,8 @@ private[log] class Cleaner(val id: Int,
       if (writeBuffer.position > 0) {
         writeBuffer.flip()
         val retained = new ByteBufferMessageSet(writeBuffer)
-        dest.append(retained.head.offset, retained)
+        dest.append(firstOffset = retained.head.offset, largestTimestamp = maxTimestamp,
+          offsetOfLargestTimestamp = offsetOfMaxTimestamp, messages = retained)
         throttler.maybeThrottle(writeBuffer.limit)
       }
       
@@ -569,14 +589,17 @@ private[log] class Cleaner(val id: Int,
       var group = List(segs.head)
       var logSize = segs.head.size
       var indexSize = segs.head.index.sizeInBytes
+      var timeIndexSize = segs.head.timeIndex.sizeInBytes
       segs = segs.tail
       while(segs.nonEmpty &&
             logSize + segs.head.size <= maxSize &&
             indexSize + segs.head.index.sizeInBytes <= maxIndexSize &&
+            timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize &&
             segs.head.index.lastOffset - group.last.index.baseOffset <= Int.MaxValue) {
         group = segs.head :: group
         logSize += segs.head.size
         indexSize += segs.head.index.sizeInBytes
+        timeIndexSize += segs.head.timeIndex.sizeInBytes
         segs = segs.tail
       }
       grouped ::= group.reverse

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 4357ef4..e6c60b9 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -108,7 +108,7 @@ class LogManager(val logDirs: Array[File],
    */
   private def loadLogs(): Unit = {
     info("Loading logs.")
-
+    val startMs = time.milliseconds
     val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
     val jobs = mutable.Map.empty[File, Seq[Future[_]]]
 
@@ -177,7 +177,7 @@ class LogManager(val logDirs: Array[File],
       threadPools.foreach(_.shutdown())
     }
 
-    info("Logs loading complete.")
+    info(s"Logs loading complete in ${time.milliseconds - startMs} ms.")
   }
 
   /**
@@ -423,7 +423,7 @@ class LogManager(val logDirs: Array[File],
     if (log.config.retentionMs < 0)
       return 0
     val startMs = time.milliseconds
-    log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
+    log.deleteOldSegments(startMs - _.largestTimestamp > log.config.retentionMs)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 6bbc50c..d894020 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -36,6 +36,7 @@ import java.io.{IOException, File}
  *
  * @param log The message set containing log entries
  * @param index The offset index
+ * @param timeIndex The timestamp index
  * @param baseOffset A lower bound on the offsets in this segment
  * @param indexIntervalBytes The approximate number of bytes between entries in the index
  * @param time The time instance
@@ -43,6 +44,7 @@ import java.io.{IOException, File}
 @nonthreadsafe
 class LogSegment(val log: FileMessageSet,
                  val index: OffsetIndex,
+                 val timeIndex: TimeIndex,
                  val baseOffset: Long,
                  val indexIntervalBytes: Int,
                  val rollJitterMs: Long,
@@ -53,9 +55,17 @@ class LogSegment(val log: FileMessageSet,
   /* the number of bytes since we last added an entry in the offset index */
   private var bytesSinceLastIndexEntry = 0
 
+  /* The timestamp we used for time based log rolling */
+  private var rollingBasedTimestamp: Option[Long] = None
+
+  /* The maximum timestamp we see so far */
+  @volatile private var maxTimestampSoFar = timeIndex.lastEntry.timestamp
+  @volatile private var offsetOfMaxTimestamp = timeIndex.lastEntry.offset
+
   def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) =
     this(new FileMessageSet(file = Log.logFilename(dir, startOffset), fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate),
          new OffsetIndex(Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
+         new TimeIndex(Log.timeIndexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
          startOffset,
          indexIntervalBytes,
          rollJitterMs,
@@ -70,21 +80,33 @@ class LogSegment(val log: FileMessageSet,
    *
    * It is assumed this method is being called from within a lock.
    *
-   * @param offset The first offset in the message set.
+   * @param firstOffset The first offset in the message set.
+   * @param largestTimestamp The largest timestamp in the message set.
+   * @param offsetOfLargestTimestamp The offset of the message that has the largest timestamp in the messages to append.
    * @param messages The messages to append.
    */
   @nonthreadsafe
-  def append(offset: Long, messages: ByteBufferMessageSet) {
+  def append(firstOffset: Long, largestTimestamp: Long, offsetOfLargestTimestamp: Long, messages: ByteBufferMessageSet) {
     if (messages.sizeInBytes > 0) {
-      trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes()))
+      trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at offset %d"
+          .format(messages.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, offsetOfLargestTimestamp))
+      val physicalPosition = log.sizeInBytes()
+      if (physicalPosition == 0)
+        rollingBasedTimestamp = Some(largestTimestamp)
+      // append the messages
+      log.append(messages)
+      // Update the in-memory max timestamp and its corresponding offset.
+      if (largestTimestamp > maxTimestampSoFar) {
+        maxTimestampSoFar = largestTimestamp
+        offsetOfMaxTimestamp = offsetOfLargestTimestamp
+      }
       // append an entry to the index (if needed)
       if(bytesSinceLastIndexEntry > indexIntervalBytes) {
-        index.append(offset, log.sizeInBytes())
-        this.bytesSinceLastIndexEntry = 0
+        index.append(firstOffset, physicalPosition)
+        timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
+        bytesSinceLastIndexEntry = 0
       }
-      // append the messages
-      log.append(messages)
-      this.bytesSinceLastIndexEntry += messages.sizeInBytes
+      bytesSinceLastIndexEntry += messages.sizeInBytes
     }
   }
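
The append path above writes a (sparse) index entry only after indexIntervalBytes of log data have accumulated, and the time index entry it writes carries the maximum timestamp observed so far rather than the timestamp of the current batch. A stripped-down sketch of that bookkeeping; class and method names are illustrative and not part of the patch:

    class SparseIndexingSketch(indexIntervalBytes: Int) {
      private var bytesSinceLastIndexEntry = 0
      private var maxTimestampSoFar = -1L
      private var offsetOfMaxTimestamp = -1L

      // Returns Some((maxTimestamp, offsetOfMaxTimestamp)) when an index entry is due.
      def onAppend(largestTimestamp: Long, offsetOfLargestTimestamp: Long, sizeInBytes: Int): Option[(Long, Long)] = {
        if (largestTimestamp > maxTimestampSoFar) {
          maxTimestampSoFar = largestTimestamp
          offsetOfMaxTimestamp = offsetOfLargestTimestamp
        }
        val entryDue = bytesSinceLastIndexEntry > indexIntervalBytes
        if (entryDue) bytesSinceLastIndexEntry = 0
        bytesSinceLastIndexEntry += sizeInBytes
        if (entryDue) Some((maxTimestampSoFar, offsetOfMaxTimestamp)) else None
      }
    }
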
 
@@ -97,13 +119,12 @@ class LogSegment(val log: FileMessageSet,
    * @param offset The offset we want to translate
    * @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and
    * when omitted, the search will begin at the position in the offset index.
-   *
    * @return The position in the log storing the message with the least offset >= the requested offset or null if no message meets this criteria.
    */
   @threadsafe
   private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = {
     val mapping = index.lookup(offset)
-    log.searchFor(offset, max(mapping.position, startingFilePosition))
+    log.searchForOffset(offset, max(mapping.position, startingFilePosition))
   }
 
   /**
@@ -165,30 +186,34 @@ class LogSegment(val log: FileMessageSet,
    *
    * @param maxMessageSize A bound the memory allocation in the case of a corrupt message size--we will assume any message larger than this
    * is corrupt.
-   *
    * @return The number of bytes truncated from the log
    */
   @nonthreadsafe
   def recover(maxMessageSize: Int): Int = {
     index.truncate()
     index.resize(index.maxIndexSize)
+    timeIndex.truncate()
+    timeIndex.resize(timeIndex.maxIndexSize)
     var validBytes = 0
     var lastIndexEntry = 0
     val iter = log.iterator(maxMessageSize)
+    maxTimestampSoFar = Message.NoTimestamp
     try {
       while(iter.hasNext) {
         val entry = iter.next
         entry.message.ensureValid()
+
+        // The max timestamp should have been put in the outer message, so we don't need to iterate over the inner messages.
+        if (entry.message.timestamp > maxTimestampSoFar) {
+          maxTimestampSoFar = entry.message.timestamp
+          offsetOfMaxTimestamp = entry.offset
+        }
+
+        // Build offset index
         if(validBytes - lastIndexEntry > indexIntervalBytes) {
-          // we need to decompress the message, if required, to get the offset of the first uncompressed message
-          val startOffset =
-            entry.message.compressionCodec match {
-              case NoCompressionCodec =>
-                entry.offset
-              case _ =>
-                ByteBufferMessageSet.deepIterator(entry).next().offset
-          }
+          val startOffset = entry.firstOffset
           index.append(startOffset, validBytes)
+          timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
           lastIndexEntry = validBytes
         }
         validBytes += MessageSet.entrySize(entry.message)
@@ -200,14 +225,35 @@ class LogSegment(val log: FileMessageSet,
     val truncated = log.sizeInBytes - validBytes
     log.truncateTo(validBytes)
     index.trimToValidSize()
+    // A normally-closed segment always appends the largest timestamp it has seen to its time index; do the same here.
+    timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true)
+    timeIndex.trimToValidSize()
     truncated
   }
 
+  def loadLargestTimestamp(readToLogEnd: Boolean = false) {
+    // Get the last time index entry. If the time index is empty, it will return (-1, baseOffset)
+    val lastTimeIndexEntry = timeIndex.lastEntry
+    maxTimestampSoFar = lastTimeIndexEntry.timestamp
+    offsetOfMaxTimestamp = lastTimeIndexEntry.offset
+    if (readToLogEnd) {
+      val offsetPosition = index.lookup(lastTimeIndexEntry.offset)
+      // Scan the rest of the messages to see if there is a larger timestamp after the last time index entry.
+      val maxTimestampOffsetAfterLastEntry = log.largestTimestampAfter(offsetPosition.position)
+      if (maxTimestampOffsetAfterLastEntry.timestamp > lastTimeIndexEntry.timestamp) {
+        maxTimestampSoFar = maxTimestampOffsetAfterLastEntry.timestamp
+        offsetOfMaxTimestamp = maxTimestampOffsetAfterLastEntry.offset
+      }
+    }
+  }
+
   override def toString = "LogSegment(baseOffset=" + baseOffset + ", size=" + size + ")"
 
   /**
    * Truncate off all index and log entries with offsets >= the given offset.
    * If the given offset is larger than the largest message in this segment, do nothing.
+   *
    * @param offset The offset to truncate to
    * @return The number of log bytes truncated
    */
@@ -217,12 +263,19 @@ class LogSegment(val log: FileMessageSet,
     if(mapping == null)
       return 0
     index.truncateTo(offset)
+    timeIndex.truncateTo(offset)
     // after truncation, reset and allocate more space for the (new currently  active) index
     index.resize(index.maxIndexSize)
+    timeIndex.resize(timeIndex.maxIndexSize)
     val bytesTruncated = log.truncateTo(mapping.position)
-    if(log.sizeInBytes == 0)
+    if(log.sizeInBytes == 0) {
       created = time.milliseconds
+      rollingBasedTimestamp = None
+    }
     bytesSinceLastIndexEntry = 0
+    // We may need to reload the max timestamp after truncation.
+    if (maxTimestampSoFar >= 0)
+      loadLargestTimestamp(readToLogEnd = true)
     bytesTruncated
   }
 
@@ -251,6 +304,7 @@ class LogSegment(val log: FileMessageSet,
     LogFlushStats.logFlushTimer.time {
       log.flush()
       index.flush()
+      timeIndex.flush()
     }
   }
 
@@ -270,27 +324,96 @@ class LogSegment(val log: FileMessageSet,
     catch {
       case e: IOException => throw kafkaStorageException("index", e)
     }
+    try timeIndex.renameTo(new File(CoreUtils.replaceSuffix(timeIndex.file.getPath, oldSuffix, newSuffix)))
+    catch {
+      case e: IOException => throw kafkaStorageException("timeindex", e)
+    }
+  }
+
+  /**
+   * Append the largest time index entry to the time index when this log segment becomes the inactive segment.
+   * This entry will be used to decide when to delete the segment.
+   */
+  def onBecomeInactiveSegment() {
+    timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true)
+  }
+
+  /**
+   * The time this segment has waited to be rolled. If the first message in the segment does not have a timestamp,
+   * the time is based on the create time of the segment. Otherwise the time is based on the timestamp of that message.
+   */
+  def timeWaitedForRoll(now: Long): Long = {
+    // Load the timestamp of the first message into memory
+    if (rollingBasedTimestamp.isEmpty) {
+      val iter = log.iterator
+      if (iter.hasNext)
+        rollingBasedTimestamp = Some(iter.next.message.timestamp)
+      else
+        // If the log is empty, fall back to the time elapsed since the segment was created.
+        return now - created
+    }
+    now - (if (rollingBasedTimestamp.get >= 0) rollingBasedTimestamp.get else created)
+  }
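
timeWaitedForRoll above bases the roll clock on the timestamp of the segment's first message when that message has a valid timestamp, and on the segment's create time otherwise; condensed into a standalone helper (illustrative, not part of the patch):

    object RollClockSketch {
      def timeWaitedForRoll(now: Long, created: Long, firstMessageTimestamp: Option[Long]): Long =
        now - firstMessageTimestamp.filter(_ >= 0).getOrElse(created)
    }
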
+
+  /**
+   * Search the message offset based on timestamp.
+   * This method returns an option of the offset of the first message whose timestamp is
+   * greater than or equal to the target timestamp.
+   *
+   * If all the messages in the segment have smaller timestamps, the returned offset will be the last offset + 1 and the
+   * timestamp will be the max timestamp in the segment.
+   *
+   * If all the messages in the segment have larger timestamps, or no message in the segment has a timestamp,
+   * the returned offset will be the base offset of the segment and the timestamp will be Message.NoTimestamp.
+   *
+   * This method only returns None when the log is not empty but we did not see any messages when scanning the log
+   * from the indexed position. This could happen if the log is truncated after we get the indexed position but
+   * before we scan the log from there. In this case we simply return None and the caller will need to check on
+   * the truncated log and maybe retry or even do the search on another log segment.
+   *
+   * @param timestamp The timestamp to search for.
+   * @return an offset which points to the first message whose timestamp is greater than or equal to the
+   *         target timestamp.
+   *         None may be returned when the log is truncated.
+   */
+  def findOffsetByTimestamp(timestamp: Long): Option[Long] = {
+    if (log.end == log.start) {
+      // The log segment is empty, just return the base offset.
+      Some(baseOffset)
+    } else {
+      // Get the index entry with a timestamp less than or equal to the target timestamp
+      val timestampOffset = timeIndex.lookup(timestamp)
+      val position = index.lookup(timestampOffset.offset).position
+      // Search the timestamp
+      log.searchForTimestamp(timestamp, position)
+    }
   }
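
The two-level lookup findOffsetByTimestamp performs above, flattened into a self-contained sketch over in-memory collections: the time index narrows the search to a starting offset and the log is scanned forward from there for the first message with a timestamp at or above the target. Types and names are illustrative only, not part of the patch:

    object TimestampLookupSketch {
      case class Msg(offset: Long, timestamp: Long)

      def findOffset(timeIndex: Vector[(Long, Long)], // (timestamp, offset), timestamps monotonically increasing
                     messages: Vector[Msg],
                     target: Long,
                     baseOffset: Long): Option[Long] = {
        // The largest indexed timestamp <= target gives the starting offset, else the base offset.
        val startOffset = timeIndex.takeWhile(_._1 <= target).lastOption.map(_._2).getOrElse(baseOffset)
        // Scan forward for the first message whose timestamp is >= the target.
        messages.dropWhile(_.offset < startOffset).find(_.timestamp >= target).map(_.offset)
      }
    }
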
 
   /**
    * Close this log segment
    */
   def close() {
+    timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true)
     CoreUtils.swallow(index.close)
+    CoreUtils.swallow(timeIndex.close())
     CoreUtils.swallow(log.close)
   }
 
   /**
    * Delete this log segment from the filesystem.
+   *
    * @throws KafkaStorageException if the delete fails.
    */
   def delete() {
     val deletedLog = log.delete()
     val deletedIndex = index.delete()
+    val deletedTimeIndex = timeIndex.delete()
     if(!deletedLog && log.file.exists)
       throw new KafkaStorageException("Delete of log " + log.file.getName + " failed.")
     if(!deletedIndex && index.file.exists)
       throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.")
+    if(!deletedTimeIndex && timeIndex.file.exists)
+      throw new KafkaStorageException("Delete of time index " + timeIndex.file.getName + " failed.")
   }
 
   /**
@@ -299,10 +422,16 @@ class LogSegment(val log: FileMessageSet,
   def lastModified = log.file.lastModified
 
   /**
+   * The largest timestamp this segment contains.
+   */
+  def largestTimestamp = if (maxTimestampSoFar >= 0) maxTimestampSoFar else lastModified
+
+  /**
    * Change the last modified time for this log segment
    */
   def lastModified_=(ms: Long) = {
     log.file.setLastModified(ms)
     index.file.setLastModified(ms)
+    timeIndex.file.setLastModified(ms)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 848fe3b..ad1b196 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -17,18 +17,11 @@
 
 package kafka.log
 
-import org.apache.kafka.common.utils.Utils
+import java.io.File
+import java.nio.ByteBuffer
 
-import scala.math._
-import java.io._
-import java.nio._
-import java.nio.channels._
-import java.util.concurrent.locks._
-
-import kafka.utils._
 import kafka.utils.CoreUtils.inLock
 import kafka.common.InvalidOffsetException
-import sun.nio.ch.DirectBuffer
 
 /**
  * An index that maps offsets to physical file locations for a particular log segment. This index may be sparse:
@@ -55,137 +48,58 @@ import sun.nio.ch.DirectBuffer
  * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal 
  * storage format.
  */
-class OffsetIndex(@volatile private[this] var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
-  
-  private val lock = new ReentrantLock
-  
-  /* initialize the memory mapping for this index */
-  @volatile
-  private[this] var mmap: MappedByteBuffer = {
-    val newlyCreated = _file.createNewFile()
-    val raf = new RandomAccessFile(_file, "rw")
-    try {
-      /* pre-allocate the file if necessary */
-      if (newlyCreated) {
-        if (maxIndexSize < 8)
-          throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
-        raf.setLength(roundToExactMultiple(maxIndexSize, 8))
-      }
-
-      /* memory-map the file */
-      val len = raf.length()
-      val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
-
-      /* set the position in the index for the next entry */
-      if (newlyCreated)
-        idx.position(0)
-      else
-        // if this is a pre-existing index, assume it is all valid and set position to last entry
-        idx.position(roundToExactMultiple(idx.limit, 8))
-      idx
-    } finally {
-      CoreUtils.swallow(raf.close())
-    }
-  }
+class OffsetIndex(file: File, baseOffset: Long, maxIndexSize: Int = -1)
+    extends AbstractIndex[Long, Int](file, baseOffset, maxIndexSize) {
 
-  /* the number of eight-byte entries currently in the index */
-  @volatile
-  private[this] var _entries = mmap.position / 8
-
-  /* The maximum number of eight-byte entries this index can hold */
-  @volatile
-  private[this] var _maxEntries = mmap.limit / 8
-
-  @volatile
-  private[this] var _lastOffset = readLastEntry.offset
+  override def entrySize = 8
+  
+  /* the last offset in the index */
+  private[this] var _lastOffset = lastEntry.offset
   
   debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
-    .format(_file.getAbsolutePath, _maxEntries, maxIndexSize, _entries, _lastOffset, mmap.position))
-
-  /** The maximum number of entries this index can hold */
-  def maxEntries: Int = _maxEntries
-
-  /** The last offset in the index */
-  def lastOffset: Long = _lastOffset
-
-  /** The index file */
-  def file: File = _file
+    .format(file.getAbsolutePath, maxEntries, maxIndexSize, _entries, _lastOffset, mmap.position))
 
   /**
    * The last entry in the index
    */
-  def readLastEntry(): OffsetPosition = {
+  private def lastEntry: OffsetPosition = {
     inLock(lock) {
       _entries match {
         case 0 => OffsetPosition(baseOffset, 0)
-        case s => OffsetPosition(baseOffset + relativeOffset(mmap, s - 1), physical(mmap, s - 1))
+        case s => parseEntry(mmap, s - 1).asInstanceOf[OffsetPosition]
       }
     }
   }
 
+  def lastOffset: Long = _lastOffset
+
   /**
    * Find the largest offset less than or equal to the given targetOffset 
    * and return a pair holding this offset and its corresponding physical file position.
    * 
    * @param targetOffset The offset to look up.
-   * 
-   * @return The offset found and the corresponding file position for this offset. 
-   * If the target offset is smaller than the least entry in the index (or the index is empty),
-   * the pair (baseOffset, 0) is returned.
+   * @return The offset found and the corresponding file position for this offset
+   *         If the target offset is smaller than the least entry in the index (or the index is empty),
+   *         the pair (baseOffset, 0) is returned.
    */
   def lookup(targetOffset: Long): OffsetPosition = {
     maybeLock(lock) {
       val idx = mmap.duplicate
-      val slot = indexSlotFor(idx, targetOffset)
+      val slot = indexSlotFor(idx, targetOffset, IndexSearchType.KEY)
       if(slot == -1)
         OffsetPosition(baseOffset, 0)
       else
-        OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
-      }
-  }
-  
-  /**
-   * Find the slot in which the largest offset less than or equal to the given
-   * target offset is stored.
-   * 
-   * @param idx The index buffer
-   * @param targetOffset The offset to look for
-   * 
-   * @return The slot found or -1 if the least entry in the index is larger than the target offset or the index is empty
-   */
-  private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = {
-    // we only store the difference from the base offset so calculate that
-    val relOffset = targetOffset - baseOffset
-    
-    // check if the index is empty
-    if (_entries == 0)
-      return -1
-    
-    // check if the target offset is smaller than the least offset
-    if (relativeOffset(idx, 0) > relOffset)
-      return -1
-      
-    // binary search for the entry
-    var lo = 0
-    var hi = _entries - 1
-    while (lo < hi) {
-      val mid = ceil(hi/2.0 + lo/2.0).toInt
-      val found = relativeOffset(idx, mid)
-      if (found == relOffset)
-        return mid
-      else if (found < relOffset)
-        lo = mid
-      else
-        hi = mid - 1
+        parseEntry(idx, slot).asInstanceOf[OffsetPosition]
     }
-    lo
   }
-  
-  /* return the nth offset relative to the base offset */
-  private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8)
-  
-  /* return the nth physical position */
-  private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4)
+
+  private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize)
+
+  private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize + 4)
+
+  override def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry = {
+      OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n))
+  }
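
An illustrative, standalone sketch of the 8-byte entry layout the relativeOffset/physical helpers above decode: a 4-byte offset relative to the segment's base offset followed by a 4-byte physical file position, round-tripped through a ByteBuffer. Nothing here is part of the patch:

    import java.nio.ByteBuffer

    object OffsetEntrySketch {
      val EntrySize = 8

      def write(buf: ByteBuffer, baseOffset: Long, offset: Long, position: Int): Unit = {
        buf.putInt((offset - baseOffset).toInt) // relative offset
        buf.putInt(position)                    // physical position
      }

      def read(buf: ByteBuffer, baseOffset: Long, n: Int): (Long, Int) =
        (baseOffset + buf.getInt(n * EntrySize), buf.getInt(n * EntrySize + 4))
    }
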
   
   /**
    * Get the nth offset mapping from the index
@@ -208,37 +122,25 @@ class OffsetIndex(@volatile private[this] var _file: File, val baseOffset: Long,
     inLock(lock) {
       require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
       if (_entries == 0 || offset > _lastOffset) {
-        debug("Adding index entry %d => %d to %s.".format(offset, position, _file.getName))
+        debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
         mmap.putInt((offset - baseOffset).toInt)
         mmap.putInt(position)
         _entries += 1
         _lastOffset = offset
-        require(_entries * 8 == mmap.position, _entries + " entries but file position in index is " + mmap.position + ".")
+        require(_entries * entrySize == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
       } else {
         throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
-          .format(offset, _entries, _lastOffset, _file.getAbsolutePath))
+          .format(offset, entries, _lastOffset, file.getAbsolutePath))
       }
     }
   }
-  
-  /**
-   * True iff there are no more slots available in this index
-   */
-  def isFull: Boolean = _entries >= _maxEntries
-  
-  /**
-   * Truncate the entire index, deleting all entries
-   */
-  def truncate() = truncateToEntries(0)
-  
-  /**
-   * Remove all entries from the index which have an offset greater than or equal to the given offset.
-   * Truncating to an offset larger than the largest in the index has no effect.
-   */
-  def truncateTo(offset: Long) {
+
+  override def truncate() = truncateToEntries(0)
+
+  override def truncateTo(offset: Long) {
     inLock(lock) {
       val idx = mmap.duplicate
-      val slot = indexSlotFor(idx, offset)
+      val slot = indexSlotFor(idx, offset, IndexSearchType.KEY)
 
       /* There are 3 cases for choosing the new size
        * 1) if there is no entry in the index <= the offset, delete everything
@@ -262,139 +164,19 @@ class OffsetIndex(@volatile private[this] var _file: File, val baseOffset: Long,
   private def truncateToEntries(entries: Int) {
     inLock(lock) {
       _entries = entries
-      mmap.position(_entries * 8)
-      _lastOffset = readLastEntry.offset
-    }
-  }
-  
-  /**
-   * Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes from
-   * the file.
-   */
-  def trimToValidSize() {
-    inLock(lock) {
-      resize(_entries * 8)
+      mmap.position(_entries * entrySize)
+      _lastOffset = lastEntry.offset
     }
   }
 
-  /**
-   * Reset the size of the memory map and the underneath file. This is used in two kinds of cases: (1) in
-   * trimToValidSize() which is called at closing the segment or new segment being rolled; (2) at
-   * loading segments from disk or truncating back to an old segment where a new log segment became active;
-   * we want to reset the index size to maximum index size to avoid rolling new segment.
-   */
-  def resize(newSize: Int) {
-    inLock(lock) {
-      val raf = new RandomAccessFile(_file, "rw")
-      val roundedNewSize = roundToExactMultiple(newSize, 8)
-      val position = mmap.position
-      
-      /* Windows won't let us modify the file length while the file is mmapped :-( */
-      if (Os.isWindows)
-        forceUnmap(mmap)
-      try {
-        raf.setLength(roundedNewSize)
-        mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
-        _maxEntries = mmap.limit / 8
-        mmap.position(position)
-      } finally {
-        CoreUtils.swallow(raf.close())
-      }
-    }
-  }
-  
-  /**
-   * Forcefully free the buffer's mmap. We do this only on windows.
-   */
-  private def forceUnmap(m: MappedByteBuffer) {
-    try {
-      m match {
-        case buffer: DirectBuffer =>
-          val bufferCleaner = buffer.cleaner()
-          /* cleaner can be null if the mapped region has size 0 */
-          if (bufferCleaner != null)
-            bufferCleaner.clean()
-        case _ =>
-      }
-    } catch {
-      case t: Throwable => warn("Error when freeing index buffer", t)
-    }
-  }
-  
-  /**
-   * Flush the data in the index to disk
-   */
-  def flush() {
-    inLock(lock) {
-      mmap.force()
-    }
-  }
-  
-  /**
-   * Delete this index file
-   */
-  def delete(): Boolean = {
-    info("Deleting index " + _file.getAbsolutePath)
-    if (Os.isWindows)
-      CoreUtils.swallow(forceUnmap(mmap))
-    _file.delete()
-  }
-  
-  /** The number of entries in this index */
-  def entries = _entries
-  
-  /**
-   * The number of bytes actually used by this index
-   */
-  def sizeInBytes() = 8 * _entries
-  
-  /** Close the index */
-  def close() {
-    trimToValidSize()
-  }
-  
-  /**
-   * Rename the file that backs this offset index
-   * @throws IOException if rename fails
-   */
-  def renameTo(f: File) {
-    try Utils.atomicMoveWithFallback(_file.toPath, f.toPath)
-    finally _file = f
-  }
-  
-  /**
-   * Do a basic sanity check on this index to detect obvious problems
-   * @throws IllegalArgumentException if any problems are found
-   */
-  def sanityCheck() {
-    require(_entries == 0 || lastOffset > baseOffset,
-            "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d"
-            .format(_file.getAbsolutePath, lastOffset, baseOffset))
-    val len = _file.length()
-    require(len % 8 == 0,
-            "Index file " + _file.getName + " is corrupt, found " + len +
+  override def sanityCheck() {
+    require(_entries == 0 || _lastOffset > baseOffset,
+            s"Corrupt index found, index file (${file.getAbsolutePath}) has non-zero size but the last offset " +
+                s"is ${_lastOffset} which is no larger than the base offset $baseOffset.")
+    val len = file.length()
+    require(len % entrySize == 0,
+            "Index file " + file.getAbsolutePath + " is corrupt, found " + len +
             " bytes which is not positive or not a multiple of 8.")
   }
-  
-  /**
-   * Round a number to the greatest exact multiple of the given factor less than the given number.
-   * E.g. roundToExactMultiple(67, 8) == 64
-   */
-  private def roundToExactMultiple(number: Int, factor: Int) = factor * (number / factor)
-  
-  /**
-   * Execute the given function in a lock only if we are running on windows. We do this 
-   * because Windows won't let us resize a file while it is mmapped. As a result we have to force unmap it
-   * and this requires synchronizing reads.
-   */
-  private def maybeLock[T](lock: Lock)(fun: => T): T = {
-    if(Os.isWindows)
-      lock.lock()
-    try {
-      fun
-    } finally {
-      if(Os.isWindows)
-        lock.unlock()
-    }
-  }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/OffsetPosition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetPosition.scala b/core/src/main/scala/kafka/log/OffsetPosition.scala
deleted file mode 100644
index 24b6dcf..0000000
--- a/core/src/main/scala/kafka/log/OffsetPosition.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log
-
-/**
- * The mapping between a logical log offset and the physical position
- * in some log file of the beginning of the message set entry with the
- * given offset.
- */
-case class OffsetPosition(offset: Long, position: Int)

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/TimeIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala
new file mode 100644
index 0000000..7f24081
--- /dev/null
+++ b/core/src/main/scala/kafka/log/TimeIndex.scala
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.File
+import java.nio.ByteBuffer
+
+import kafka.common.InvalidOffsetException
+import kafka.message.Message
+import kafka.utils.CoreUtils._
+import kafka.utils.Logging
+
+/**
+ * An index that maps from the timestamp to the logical offsets of the messages in a segment. This index might be
+ * sparse, i.e. it may not hold an entry for all the messages in the segment.
+ *
+ * The index is stored in a file that is preallocated to hold a fixed maximum number of 12-byte time index entries.
+ * The file format is a series of time index entries. The physical format is an 8-byte timestamp and a 4-byte "relative"
+ * offset, as used in the [[OffsetIndex]]. A time index entry (TIMESTAMP, OFFSET) means that the biggest timestamp seen
+ * before OFFSET is TIMESTAMP, i.e. any message whose timestamp is greater than TIMESTAMP must come after OFFSET.
+ *
+ * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal
+ * storage format.
+ *
+ * The timestamps in the same time index file are guaranteed to be monotonically increasing.
+ *
+ * The index supports timestamp lookups on a memory map of this file. The lookup is done using a binary search to find
+ * the offset of the message whose indexed timestamp is the closest one that is smaller than or equal to the target timestamp.
+ *
+ * Time index files can be opened in two ways: either as an empty, mutable index that allows appends or
+ * an immutable read-only index file that has previously been populated. The makeReadOnly method will turn a mutable file into an
+ * immutable one and truncate off any extra bytes. This is done when the index file is rolled over.
+ *
+ * No attempt is made to checksum the contents of this file, in the event of a crash it is rebuilt.
+ *
+ */
+class TimeIndex(file: File,
+                baseOffset: Long,
+                maxIndexSize: Int = -1)
+    extends AbstractIndex[Long, Long](file, baseOffset, maxIndexSize) with Logging {
+
+  override def entrySize = 12
+
+  // We override the full check to reserve the last time index entry slot for the entry appended when the segment rolls.
+  override def isFull: Boolean = entries >= maxEntries - 1
+
+  private def timestamp(buffer: ByteBuffer, n: Int): Long = buffer.getLong(n * entrySize)
+
+  private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize + 8)
+
+  /**
+   * The last entry in the index
+   */
+  def lastEntry: TimestampOffset = {
+    inLock(lock) {
+      _entries match {
+        case 0 => TimestampOffset(Message.NoTimestamp, baseOffset)
+        case s => parseEntry(mmap, s - 1).asInstanceOf[TimestampOffset]
+      }
+    }
+  }
+
+  /**
+   * Get the nth timestamp mapping from the time index
+   * @param n The entry number in the time index
+   * @return The timestamp/offset pair at that entry
+   */
+  def entry(n: Int): TimestampOffset = {
+    maybeLock(lock) {
+      if(n >= _entries)
+        throw new IllegalArgumentException("Attempt to fetch the %dth entry from a time index of size %d.".format(n, _entries))
+      val idx = mmap.duplicate
+      TimestampOffset(timestamp(idx, n), relativeOffset(idx, n))
+    }
+  }
+
+  override def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry = {
+    TimestampOffset(timestamp(buffer, n), baseOffset + relativeOffset(buffer, n))
+  }
+
+  /**
+   * Attempt to append a time index entry to the time index.
+   * The new entry is appended only if both the timestamp and offset are greater than the last appended timestamp and
+   * the last appended offset.
+   *
+   * @param timestamp The timestamp of the new time index entry
+   * @param offset The offset of the new time index entry
+   * @param skipFullCheck To skip checking whether the time index is full or not. We only skip the check when the segment
+   *                      gets rolled or the segment is closed.
+   */
+  def maybeAppend(timestamp: Long, offset: Long, skipFullCheck: Boolean = false) {
+    inLock(lock) {
+      if (!skipFullCheck)
+        require(!isFull, "Attempt to append to a full time index (size = " + _entries + ").")
+      // We do not throw an exception when the offset equals the offset of the last entry. That means we are trying
+      // to insert the same time index entry as the last entry.
+      // If the time index entry to be inserted is the same as the last entry, we simply ignore the insertion
+      // because that could happen in the following two scenarios:
+      // 1. A log segment is closed.
+      // 2. LogSegment.onBecomeInactiveSegment() is called when an active log segment is rolled.
+      if (_entries != 0 && offset < lastEntry.offset)
+        throw new InvalidOffsetException("Attempt to append an offset (%d) to slot %d no larger than the last offset appended (%d) to %s."
+          .format(offset, _entries, lastEntry.offset, file.getAbsolutePath))
+      if (_entries != 0 && timestamp < lastEntry.timestamp)
+        throw new IllegalStateException("Attempt to append a timestamp (%d) to slot %d no larger than the last timestamp appended (%d) to %s."
+            .format(timestamp, _entries, lastEntry.timestamp, file.getAbsolutePath))
+      // We only append to the time index when the timestamp is greater than the last inserted timestamp.
+      // If all the messages are in message format v0, the timestamp will always be NoTimestamp. In that case, the time
+      // index will be empty.
+      if (timestamp > lastEntry.timestamp) {
+        debug("Adding index entry %d => %d to %s.".format(timestamp, offset, file.getName))
+        mmap.putLong(timestamp)
+        mmap.putInt((offset - baseOffset).toInt)
+        _entries += 1
+        require(_entries * entrySize == mmap.position, _entries + " entries but file position in index is " + mmap.position + ".")
+      }
+    }
+  }
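+
+  // A rough usage sketch for maybeAppend (illustrative only; the variable names are hypothetical):
+  // a log segment can periodically record the largest timestamp seen so far via
+  //   timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
+  // and, when it is rolled or closed, force a final entry into the reserved last slot via
+  //   timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true)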
+
+  /**
+   * Find the time index entry with the largest timestamp that is less than or equal to the given timestamp.
+   * If the target timestamp is smaller than the least timestamp in the time index, (NoTimestamp, baseOffset) is
+   * returned.
+   *
+   * @param targetTimestamp The timestamp to look up.
+   * @return The time index entry found.
+   */
+  def lookup(targetTimestamp: Long): TimestampOffset = {
+    maybeLock(lock) {
+      val idx = mmap.duplicate
+      val slot = indexSlotFor(idx, targetTimestamp, IndexSearchType.KEY)
+      if (slot == -1)
+        TimestampOffset(Message.NoTimestamp, baseOffset)
+      else {
+        val entry = parseEntry(idx, slot).asInstanceOf[TimestampOffset]
+        TimestampOffset(entry.timestamp, entry.offset)
+      }
+    }
+  }
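+
+  // Illustrative lookup usage (a sketch, not the actual fetch path): a caller searching for a target
+  // timestamp could start from the returned offset and scan the log forward, e.g.
+  //   val TimestampOffset(indexedTimestamp, startOffset) = timeIndex.lookup(targetTimestamp)
+  //   // then read the segment from startOffset until a message with timestamp >= targetTimestamp is found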
+
+  override def truncate() = truncateToEntries(0)
+
+  /**
+   * Remove all entries from the index which have an offset greater than or equal to the given offset.
+   * Truncating to an offset larger than the largest in the index has no effect.
+   */
+  override def truncateTo(offset: Long) {
+    inLock(lock) {
+      val idx = mmap.duplicate
+      val slot = indexSlotFor(idx, offset, IndexSearchType.VALUE)
+
+      /* There are 3 cases for choosing the new size
+       * 1) if there is no entry in the index <= the offset, delete everything
+       * 2) if there is an entry for this exact offset, delete it and everything larger than it
+       * 3) if there is no entry for this offset, delete everything larger than the next smallest
+       */
+      val newEntries =
+        if(slot < 0)
+          0
+        else if(relativeOffset(idx, slot) == offset - baseOffset)
+          slot
+        else
+          slot + 1
+      truncateToEntries(newEntries)
+    }
+  }
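+
+  // Worked example of the truncation cases above, assuming entries at absolute offsets 50 and 60:
+  // truncateTo(40) empties the index, truncateTo(60) and truncateTo(55) both keep only the entry at 50,
+  // and truncateTo(70) leaves the index unchanged.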
+
+  /**
+   * Truncates index to a known number of entries.
+   */
+  private def truncateToEntries(entries: Int) {
+    inLock(lock) {
+      _entries = entries
+      mmap.position(_entries * entrySize)
+    }
+  }
+
+  override def sanityCheck() {
+    val entry = lastEntry
+    val lastTimestamp = entry.timestamp
+    val lastOffset = entry.offset
+    require(_entries == 0 || (lastTimestamp >= timestamp(mmap, 0)),
+      s"Corrupt time index found, time index file (${file.getAbsolutePath}) has non-zero size but the last timestamp " +
+          s"is $lastTimestamp which is smaller than the first timestamp ${timestamp(mmap, 0)}")
+    require(_entries == 0 || lastOffset >= baseOffset,
+      s"Corrupt time index found, time index file (${file.getAbsolutePath}) has non-zero size but the last offset " +
+          s"is $lastOffset which is smaller than the base offset $baseOffset")
+    val len = file.length()
+    require(len % entrySize == 0,
+      "Time index file " + file.getAbsolutePath + " is corrupt, found " + len +
+          " bytes which is not positive or not a multiple of 12.")
+  }
+}
\ No newline at end of file