You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/08/19 17:07:12 UTC

[2/2] kafka git commit: KAFKA-3163; Add time based index to Kafka.

KAFKA-3163; Add time based index to Kafka.

This patch is for KIP-33.
https://cwiki.apache.org/confluence/display/KAFKA/KIP-33+-+Add+a+time+based+log+index

Author: Jiangjie Qin <be...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>, Liquan Pei <li...@gmail.com>

Closes #1215 from becketqin/KAFKA-3163


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/79d3fd2b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/79d3fd2b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/79d3fd2b

Branch: refs/heads/trunk
Commit: 79d3fd2bf0e5c89ff74a2988c403882ae8a9852e
Parents: 05ed54b
Author: Jiangjie Qin <be...@gmail.com>
Authored: Fri Aug 19 10:07:07 2016 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Aug 19 10:07:07 2016 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/log/AbstractIndex.scala    | 287 +++++++++++++++++
 .../main/scala/kafka/log/FileMessageSet.scala   |  78 ++++-
 core/src/main/scala/kafka/log/IndexEntry.scala  |  46 +++
 core/src/main/scala/kafka/log/Log.scala         | 196 +++++++++---
 core/src/main/scala/kafka/log/LogCleaner.scala  |  41 ++-
 core/src/main/scala/kafka/log/LogManager.scala  |   6 +-
 core/src/main/scala/kafka/log/LogSegment.scala  | 169 ++++++++--
 core/src/main/scala/kafka/log/OffsetIndex.scala | 306 +++----------------
 .../main/scala/kafka/log/OffsetPosition.scala   |  25 --
 core/src/main/scala/kafka/log/TimeIndex.scala   | 208 +++++++++++++
 .../kafka/message/ByteBufferMessageSet.scala    |  91 ++++--
 .../scala/kafka/message/MessageAndOffset.scala  |   8 +
 .../src/main/scala/kafka/server/KafkaApis.scala |   3 +-
 .../scala/kafka/server/ReplicaManager.scala     |   2 +-
 .../scala/kafka/tools/DumpLogSegments.scala     | 122 +++++++-
 .../test/scala/unit/kafka/log/CleanerTest.scala |   5 +-
 .../unit/kafka/log/FileMessageSetTest.scala     |  16 +-
 .../scala/unit/kafka/log/LogManagerTest.scala   |   4 +-
 .../scala/unit/kafka/log/LogSegmentTest.scala   | 114 +++++--
 .../src/test/scala/unit/kafka/log/LogTest.scala | 152 +++++++--
 .../scala/unit/kafka/log/TimeIndexTest.scala    |  97 ++++++
 .../message/ByteBufferMessageSetTest.scala      | 135 +++++---
 .../test/scala/unit/kafka/utils/TestUtils.scala |   3 +-
 docs/upgrade.html                               |  16 +-
 24 files changed, 1631 insertions(+), 499 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/AbstractIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
new file mode 100644
index 0000000..d594f18
--- /dev/null
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.{File, RandomAccessFile}
+import java.nio.{ByteBuffer, MappedByteBuffer}
+import java.nio.channels.FileChannel
+import java.util.concurrent.locks.{Lock, ReentrantLock}
+
+import kafka.log.IndexSearchType.IndexSearchEntity
+import kafka.utils.CoreUtils.inLock
+import kafka.utils.{CoreUtils, Logging, Os}
+import org.apache.kafka.common.utils.Utils
+import sun.nio.ch.DirectBuffer
+
+import scala.math.ceil
+
+/**
+ * The abstract index class which holds entry format agnostic methods.
+ *
+ * @param _file The index file
+ * @param baseOffset the base offset of the segment that this index corresponds to.
+ * @param maxIndexSize The maximum index size in bytes.
+ */
+abstract class AbstractIndex[K, V](@volatile private[this] var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1)
+    extends Logging {
+
+  protected def entrySize: Int
+
+  protected val lock = new ReentrantLock
+
+  @volatile
+  protected var mmap: MappedByteBuffer = {
+    val newlyCreated = _file.createNewFile()
+    val raf = new RandomAccessFile(_file, "rw")
+    try {
+      /* pre-allocate the file if necessary */
+      if(newlyCreated) {
+        if(maxIndexSize < entrySize)
+          throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
+        raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
+      }
+
+      /* memory-map the file */
+      val len = raf.length()
+      val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
+
+      /* set the position in the index for the next entry */
+      if(newlyCreated)
+        idx.position(0)
+      else
+        // if this is a pre-existing index, assume it is valid and set position to last entry
+        idx.position(roundDownToExactMultiple(idx.limit, entrySize))
+      idx
+    } finally {
+      CoreUtils.swallow(raf.close())
+    }
+  }
+
+  /**
+   * The maximum number of entries this index can hold
+   */
+  @volatile
+  private[this] var _maxEntries = mmap.limit / entrySize
+
+  /** The number of entries in this index */
+  @volatile
+  protected var _entries = mmap.position / entrySize
+
+  /**
+   * True iff there are no more slots available in this index
+   */
+  def isFull: Boolean = _entries >= _maxEntries
+
+  def maxEntries: Int = _maxEntries
+
+  def entries: Int = _entries
+
+  /**
+   * The index file
+   */
+  def file: File = _file
+
+  /**
+   * Reset the size of the memory map and the underlying file. This is used in two kinds of cases: (1) in
+   * trimToValidSize(), which is called when closing the segment or when a new segment is rolled; (2) when
+   * loading segments from disk or truncating back to an old segment where a new log segment became active;
+   * in that case we want to reset the index size to the maximum index size to avoid rolling a new segment.
+   */
+  def resize(newSize: Int) {
+    inLock(lock) {
+      val raf = new RandomAccessFile(_file, "rw")
+      val roundedNewSize = roundDownToExactMultiple(newSize, entrySize)
+      val position = mmap.position
+
+      /* Windows won't let us modify the file length while the file is mmapped :-( */
+      if(Os.isWindows)
+        forceUnmap(mmap)
+      try {
+        raf.setLength(roundedNewSize)
+        mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
+        _maxEntries = mmap.limit / entrySize
+        mmap.position(position)
+      } finally {
+        CoreUtils.swallow(raf.close())
+      }
+    }
+  }
+
+  /**
+   * Rename the file that backs this index
+   *
+   * @throws IOException if rename fails
+   */
+  def renameTo(f: File) {
+    try Utils.atomicMoveWithFallback(_file.toPath, f.toPath)
+    finally _file = f
+  }
+
+  /**
+   * Flush the data in the index to disk
+   */
+  def flush() {
+    inLock(lock) {
+      mmap.force()
+    }
+  }
+
+  /**
+   * Delete this index file
+   */
+  def delete(): Boolean = {
+    info(s"Deleting index ${_file.getAbsolutePath}")
+    if(Os.isWindows)
+      CoreUtils.swallow(forceUnmap(mmap))
+    _file.delete()
+  }
+
+  /**
+   * Trim this index to fit just the valid entries, deleting all trailing unwritten bytes from
+   * the file.
+   */
+  def trimToValidSize() {
+    inLock(lock) {
+      resize(entrySize * _entries)
+    }
+  }
+
+  /**
+   * The number of bytes actually used by this index
+   */
+  def sizeInBytes = entrySize * _entries
+
+  /** Close the index */
+  def close() {
+    trimToValidSize()
+  }
+
+  /**
+   * Do a basic sanity check on this index to detect obvious problems
+   *
+   * @throws IllegalArgumentException if any problems are found
+   */
+  def sanityCheck(): Unit
+
+  /**
+   * Remove all the entries from the index.
+   */
+  def truncate(): Unit
+
+  /**
+   * Remove all entries from the index which have an offset greater than or equal to the given offset.
+   * Truncating to an offset larger than the largest in the index has no effect.
+   */
+  def truncateTo(offset: Long): Unit
+
+  /**
+   * Forcefully free the buffer's mmap. We do this only on windows.
+   */
+  protected def forceUnmap(m: MappedByteBuffer) {
+    try {
+      m match {
+        case buffer: DirectBuffer =>
+          val bufferCleaner = buffer.cleaner()
+          /* cleaner can be null if the mapped region has size 0 */
+          if (bufferCleaner != null)
+            bufferCleaner.clean()
+        case _ =>
+      }
+    } catch {
+      case t: Throwable => error("Error when freeing index buffer", t)
+    }
+  }
+
+  /**
+   * Execute the given function in a lock only if we are running on windows. We do this
+   * because Windows won't let us resize a file while it is mmapped. As a result we have to force unmap it
+   * and this requires synchronizing reads.
+   */
+  protected def maybeLock[T](lock: Lock)(fun: => T): T = {
+    if(Os.isWindows)
+      lock.lock()
+    try {
+      fun
+    } finally {
+      if(Os.isWindows)
+        lock.unlock()
+    }
+  }
+
+  /**
+   * To parse an entry in the index.
+   *
+   * @param buffer the buffer of this memory mapped index.
+   * @param n the slot
+   * @return the index entry stored in the given slot.
+   */
+  protected def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry
+
+  /**
+   * Find the slot in which the largest entry less than or equal to the given target key or value is stored.
+   * The comparison is made against either the entry's key or its value, as selected by `searchEntity`.
+   *
+   * @param idx The index buffer
+   * @param target The index key or value to look for
+   * @return The slot found or -1 if the least entry in the index is larger than the target key or the index is empty
+   */
+  protected def indexSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): Int = {
+    // check if the index is empty
+    if(_entries == 0)
+      return -1
+
+    // check if the target is smaller than the least entry in the index
+    if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
+      return -1
+
+    // binary search for the entry
+    var lo = 0
+    var hi = _entries - 1
+    while(lo < hi) {
+      val mid = ceil(hi/2.0 + lo/2.0).toInt
+      val found = parseEntry(idx, mid)
+      val compareResult = compareIndexEntry(found, target, searchEntity)
+      if(compareResult > 0)
+        hi = mid - 1
+      else if(compareResult < 0)
+        lo = mid
+      else
+        return mid
+    }
+    lo
+  }
+
+  private def compareIndexEntry(indexEntry: IndexEntry, target: Long, searchEntity: IndexSearchEntity): Int = {
+    searchEntity match {
+      case IndexSearchType.KEY => indexEntry.indexKey.compareTo(target)
+      case IndexSearchType.VALUE => indexEntry.indexValue.compareTo(target)
+    }
+  }
+
+  /**
+   * Round a number down to the greatest exact multiple of the given factor that is less than or equal to the given number.
+   * E.g. roundDownToExactMultiple(67, 8) == 64
+   */
+  private def roundDownToExactMultiple(number: Int, factor: Int) = factor * (number / factor)
+
+}
+
+object IndexSearchType extends Enumeration {
+  type IndexSearchEntity = Value
+  val KEY, VALUE = Value
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/FileMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/FileMessageSet.scala b/core/src/main/scala/kafka/log/FileMessageSet.scala
index 8e92f95..5763042 100755
--- a/core/src/main/scala/kafka/log/FileMessageSet.scala
+++ b/core/src/main/scala/kafka/log/FileMessageSet.scala
@@ -117,7 +117,13 @@ class FileMessageSet private[kafka](@volatile var file: File,
     new FileMessageSet(file,
                        channel,
                        start = this.start + position,
-                       end = math.min(this.start + position + size, sizeInBytes()))
+                       end = {
+                         // Handle the integer overflow
+                         if (this.start + position + size < 0)
+                           sizeInBytes()
+                         else
+                           math.min(this.start + position + size, sizeInBytes())
+                       })
   }
 
   /**
@@ -126,7 +132,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
    * @param targetOffset The offset to search for.
    * @param startingPosition The starting position in the file to begin searching from.
    */
-  def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
+  def searchForOffset(targetOffset: Long, startingPosition: Int): OffsetPosition = {
     var position = startingPosition
     val buffer = ByteBuffer.allocate(MessageSet.LogOverhead)
     val size = sizeInBytes()
@@ -135,7 +141,7 @@ class FileMessageSet private[kafka](@volatile var file: File,
       channel.read(buffer, position)
       if(buffer.hasRemaining)
         throw new IllegalStateException("Failed to read complete buffer for targetOffset %d startPosition %d in %s"
-                                        .format(targetOffset, startingPosition, file.getAbsolutePath))
+          .format(targetOffset, startingPosition, file.getAbsolutePath))
       buffer.rewind()
       val offset = buffer.getLong()
       if(offset >= targetOffset)
@@ -149,6 +155,72 @@ class FileMessageSet private[kafka](@volatile var file: File,
   }
 
   /**
+   * Search forward for the message whose timestamp is greater than or equal to the target timestamp.
+   *
+   * The search will stop immediately when it sees a message in format version before 0.10.0. This is to avoid
+   * scanning the entire log when all the messages are still in old format.
+   *
+   * @param targetTimestamp The timestamp to search for.
+   * @param startingPosition The starting position to search.
+   * @return None, if no message exists at or after the starting position.
+   *         Some(the_next_offset_to_read) otherwise.
+   */
+  def searchForTimestamp(targetTimestamp: Long, startingPosition: Int): Option[Long] = {
+    var maxTimestampChecked = Message.NoTimestamp
+    var lastOffsetChecked = -1L
+    val messagesToSearch = read(startingPosition, sizeInBytes)
+    for (messageAndOffset <- messagesToSearch) {
+      val message = messageAndOffset.message
+      lastOffsetChecked = messageAndOffset.offset
+      // Stop searching once we see message format before 0.10.0.
+      // This is equivalent to treating a message without a timestamp as having the largest timestamp.
+      // We do this to avoid scanning the entire log if no message has a timestamp.
+      if (message.magic == Message.MagicValue_V0)
+        return Some(messageAndOffset.offset)
+      else if (message.timestamp >= targetTimestamp) {
+        // We found a message
+        message.compressionCodec match {
+          case NoCompressionCodec =>
+            return Some(messageAndOffset.offset)
+          case _ =>
+            // Iterate over the inner messages to get the exact offset.
+            for (innerMessage <- ByteBufferMessageSet.deepIterator(messageAndOffset)) {
+              val timestamp = innerMessage.message.timestamp
+              if (timestamp >= targetTimestamp)
+                return Some(innerMessage.offset)
+            }
+            throw new IllegalStateException(s"The message set (max timestamp = ${message.timestamp}, max offset = ${messageAndOffset.offset}" +
+                s" should contain target timestamp $targetTimestamp but it does not.")
+        }
+      } else
+        maxTimestampChecked = math.max(maxTimestampChecked, message.timestamp)
+    }
+
+    if (lastOffsetChecked >= 0)
+      Some(lastOffsetChecked + 1)
+    else
+      None
+  }
+
+  /**
+   * Return the largest timestamp of the messages after a given position in this file message set.
+   * @param startingPosition The starting position.
+   * @return The largest timestamp of the messages after the given position, together with the offset of that message.
+   */
+  def largestTimestampAfter(startingPosition: Int): TimestampOffset = {
+    var maxTimestamp = Message.NoTimestamp
+    var offsetOfMaxTimestamp = -1L
+    val messagesToSearch = read(startingPosition, Int.MaxValue)
+    for (messageAndOffset <- messagesToSearch) {
+      if (messageAndOffset.message.timestamp > maxTimestamp) {
+        maxTimestamp = messageAndOffset.message.timestamp
+        offsetOfMaxTimestamp = messageAndOffset.offset
+      }
+    }
+    TimestampOffset(maxTimestamp, offsetOfMaxTimestamp)
+  }
+
+  /**
    * Write some of this set to the given channel.
    * @param destChannel The channel to write to.
    * @param writePosition The position in the message set to begin writing from.

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/IndexEntry.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/IndexEntry.scala b/core/src/main/scala/kafka/log/IndexEntry.scala
new file mode 100644
index 0000000..2f5a6a7
--- /dev/null
+++ b/core/src/main/scala/kafka/log/IndexEntry.scala
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+sealed trait IndexEntry {
+  // We always use Long for both key and value to avoid boxing.
+  def indexKey: Long
+  def indexValue: Long
+}
+
+/**
+ * The mapping between a logical log offset and the physical position
+ * in some log file of the beginning of the message set entry with the
+ * given offset.
+ */
+case class OffsetPosition(offset: Long, position: Int) extends IndexEntry {
+  override def indexKey = offset
+  override def indexValue = position.toLong
+}
+
+
+/**
+ * The mapping from a timestamp to a message offset. The entry means that any message whose timestamp is greater
+ * than that timestamp must be at or after that offset.
+ * @param timestamp The max timestamp before the given offset.
+ * @param offset The message offset.
+ */
+case class TimestampOffset(timestamp: Long, offset: Long) extends IndexEntry {
+  override def indexKey = timestamp
+  override def indexValue = offset
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 1a7719a..b4aa470 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -17,6 +17,7 @@
 
 package kafka.log
 
+import kafka.api.OffsetRequest
 import kafka.utils._
 import kafka.message._
 import kafka.common._
@@ -30,19 +31,22 @@ import java.text.NumberFormat
 import org.apache.kafka.common.errors.{CorruptRecordException, OffsetOutOfRangeException, RecordBatchTooLargeException, RecordTooLargeException}
 import org.apache.kafka.common.record.TimestampType
 
-import scala.collection.JavaConversions
+import scala.collection.{Seq, JavaConversions}
 import com.yammer.metrics.core.Gauge
 import org.apache.kafka.common.utils.Utils
 
 object LogAppendInfo {
-  val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
+  val UnknownLogAppendInfo = LogAppendInfo(-1, -1, Message.NoTimestamp, -1L, Message.NoTimestamp, NoCompressionCodec, NoCompressionCodec, -1, -1, offsetsMonotonic = false)
 }
 
 /**
  * Struct to hold various quantities we compute about each message set before appending to the log
+ *
  * @param firstOffset The first offset in the message set
  * @param lastOffset The last offset in the message set
- * @param timestamp The log append time (if used) of the message set, otherwise Message.NoTimestamp
+ * @param maxTimestamp The maximum timestamp of the message set.
+ * @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp.
+ * @param logAppendTime The log append time (if used) of the message set, otherwise Message.NoTimestamp
  * @param sourceCodec The source codec used in the message set (send by the producer)
  * @param targetCodec The target codec of the message set(after applying the broker compression configuration if any)
  * @param shallowCount The number of shallow messages
@@ -51,7 +55,9 @@ object LogAppendInfo {
  */
 case class LogAppendInfo(var firstOffset: Long,
                          var lastOffset: Long,
-                         var timestamp: Long,
+                         var maxTimestamp: Long,
+                         var offsetOfMaxTimestamp: Long,
+                         var logAppendTime: Long,
                          sourceCodec: CompressionCodec,
                          targetCodec: CompressionCodec,
                          shallowCount: Int,
@@ -95,7 +101,7 @@ class Log(val dir: File,
     else
       0
   }
-
+  val t = time.milliseconds
   /* the actual segments of the log */
   private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
   loadSegments()
@@ -105,7 +111,8 @@ class Log(val dir: File,
 
   val topicAndPartition: TopicAndPartition = Log.parseTopicPartitionName(dir)
 
-  info("Completed load of log %s with log end offset %d".format(name, logEndOffset))
+  info("Completed load of log %s with %d log segments and log end offset %d in %d ms"
+      .format(name, segments.size(), logEndOffset, time.milliseconds - t))
 
   val tags = Map("topic" -> topicAndPartition.topic, "partition" -> topicAndPartition.partition.toString)
 
@@ -167,12 +174,17 @@ class Log(val dir: File,
       }
     }
 
-    // now do a second pass and load all the .log and .index files
+    // now do a second pass and load all the .log and all index files
     for(file <- dir.listFiles if file.isFile) {
       val filename = file.getName
-      if(filename.endsWith(IndexFileSuffix)) {
+      if(filename.endsWith(IndexFileSuffix) || filename.endsWith(TimeIndexFileSuffix)) {
         // if it is an index file, make sure it has a corresponding .log file
-        val logFile = new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
+        val logFile =
+          if (filename.endsWith(TimeIndexFileSuffix))
+            new File(file.getAbsolutePath.replace(TimeIndexFileSuffix, LogFileSuffix))
+          else
+            new File(file.getAbsolutePath.replace(IndexFileSuffix, LogFileSuffix))
+
         if(!logFile.exists) {
           warn("Found an orphaned index file, %s, with no corresponding log file.".format(file.getAbsolutePath))
           file.delete()
@@ -181,6 +193,9 @@ class Log(val dir: File,
         // if its a log file, load the corresponding log segment
         val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
         val indexFile = Log.indexFilename(dir, start)
+        val timeIndexFile = Log.timeIndexFilename(dir, start)
+
+        val indexFileExists = indexFile.exists()
         val segment = new LogSegment(dir = dir,
                                      startOffset = start,
                                      indexIntervalBytes = config.indexInterval,
@@ -189,20 +204,23 @@ class Log(val dir: File,
                                      time = time,
                                      fileAlreadyExists = true)
 
-        if(indexFile.exists()) {
+        if (indexFileExists) {
           try {
-              segment.index.sanityCheck()
+            segment.index.sanityCheck()
+            segment.timeIndex.sanityCheck()
           } catch {
             case e: java.lang.IllegalArgumentException =>
-              warn("Found a corrupted index file, %s, deleting and rebuilding index. Error Message: %s".format(indexFile.getAbsolutePath, e.getMessage))
+              warn(s"Found a corrupted index file due to ${e.getMessage}}. deleting ${timeIndexFile.getAbsolutePath}, " +
+                s"${indexFile.getAbsolutePath} and rebuilding index...")
               indexFile.delete()
+              timeIndexFile.delete()
               segment.recover(config.maxMessageSize)
           }
-        }
-        else {
+        } else {
           error("Could not find index file corresponding to log file %s, rebuilding index...".format(segment.log.file.getAbsolutePath))
           segment.recover(config.maxMessageSize)
         }
+
         segments.put(start, segment)
       }
     }
@@ -216,8 +234,11 @@ class Log(val dir: File,
       val startOffset = fileName.substring(0, fileName.length - LogFileSuffix.length).toLong
       val indexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, IndexFileSuffix) + SwapFileSuffix)
       val index =  new OffsetIndex(indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
+      val timeIndexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, TimeIndexFileSuffix) + SwapFileSuffix)
+      val timeIndex = new TimeIndex(timeIndexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
       val swapSegment = new LogSegment(new FileMessageSet(file = swapFile),
                                        index = index,
+                                       timeIndex = timeIndex,
                                        baseOffset = startOffset,
                                        indexIntervalBytes = config.indexInterval,
                                        rollJitterMs = config.randomSegmentJitter,
@@ -243,6 +264,7 @@ class Log(val dir: File,
       recoverLog()
       // reset the index size of the currently active log segment to allow more entries
       activeSegment.index.resize(config.maxIndexSize)
+      activeSegment.timeIndex.resize(config.maxIndexSize)
     }
 
   }
@@ -298,8 +320,7 @@ class Log(val dir: File,
   def close() {
     debug("Closing log " + name)
     lock synchronized {
-      for(seg <- logSegments)
-        seg.close()
+      logSegments.foreach(_.close())
     }
   }
 
@@ -311,9 +332,7 @@ class Log(val dir: File,
    *
    * @param messages The message set to append
    * @param assignOffsets Should the log assign offsets to this message set or blindly apply what it is given
-   *
    * @throws KafkaStorageException If the append fails due to an I/O error.
-   *
    * @return Information about the appended messages including the first and last offset.
    */
   def append(messages: ByteBufferMessageSet, assignOffsets: Boolean = true): LogAppendInfo = {
@@ -335,7 +354,7 @@ class Log(val dir: File,
           val offset = new LongRef(nextOffsetMetadata.messageOffset)
           appendInfo.firstOffset = offset.value
           val now = time.milliseconds
-          val (validatedMessages, messageSizesMaybeChanged) = try {
+          val validateAndOffsetAssignResult = try {
             validMessages.validateMessagesAndAssignOffsets(offset,
                                                            now,
                                                            appendInfo.sourceCodec,
@@ -347,14 +366,16 @@ class Log(val dir: File,
           } catch {
             case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
           }
-          validMessages = validatedMessages
+          validMessages = validateAndOffsetAssignResult.validatedMessages
+          appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
+          appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.offsetOfMaxTimestamp
           appendInfo.lastOffset = offset.value - 1
           if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
-            appendInfo.timestamp = now
+            appendInfo.logAppendTime = now
 
           // re-validate message sizes if there's a possibility that they have changed (due to re-compression or message
           // format conversion)
-          if (messageSizesMaybeChanged) {
+          if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
             for (messageAndOffset <- validMessages.shallowIterator) {
               if (MessageSet.entrySize(messageAndOffset.message) > config.maxMessageSize) {
                 // we record the original message set size instead of the trimmed size
@@ -383,7 +404,8 @@ class Log(val dir: File,
         val segment = maybeRoll(validMessages.sizeInBytes)
 
         // now append to the log
-        segment.append(appendInfo.firstOffset, validMessages)
+        segment.append(firstOffset = appendInfo.firstOffset, largestTimestamp = appendInfo.maxTimestamp,
+          offsetOfLargestTimestamp = appendInfo.offsetOfMaxTimestamp, messages = validMessages)
 
         // increment the log end offset
         updateLogEndOffset(appendInfo.lastOffset + 1)
@@ -424,6 +446,8 @@ class Log(val dir: File,
     var firstOffset, lastOffset = -1L
     var sourceCodec: CompressionCodec = NoCompressionCodec
     var monotonic = true
+    var maxTimestamp = Message.NoTimestamp
+    var offsetOfMaxTimestamp = -1L
     for(messageAndOffset <- messages.shallowIterator) {
       // update the first offset if on the first message
       if(firstOffset < 0)
@@ -447,7 +471,10 @@ class Log(val dir: File,
 
       // check the validity of the message by checking CRC
       m.ensureValid()
-
+      if (m.timestamp > maxTimestamp) {
+        maxTimestamp = m.timestamp
+        offsetOfMaxTimestamp = lastOffset
+      }
       shallowMessageCount += 1
       validBytesCount += messageSize
 
@@ -459,11 +486,12 @@ class Log(val dir: File,
     // Apply broker-side compression if any
     val targetCodec = BrokerCompressionCodec.getTargetCompressionCodec(config.compressionType, sourceCodec)
 
-    LogAppendInfo(firstOffset, lastOffset, Message.NoTimestamp, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
+    LogAppendInfo(firstOffset, lastOffset, maxTimestamp, offsetOfMaxTimestamp, Message.NoTimestamp, sourceCodec, targetCodec, shallowMessageCount, validBytesCount, monotonic)
   }
 
   /**
    * Trim any invalid bytes from the end of this message set (if there are any)
+   *
    * @param messages The message set to trim
    * @param info The general information of the message set
    * @return A trimmed message set. This may be the same as what was passed in or it may not.
@@ -544,6 +572,71 @@ class Log(val dir: File,
   }
 
   /**
+   * Get an offset based on the given timestamp
+   * The offset returned is the offset of the first message whose timestamp is greater than or equal to the
+   * given timestamp.
+   *
+   * If no such message is found, the log end offset is returned.
+   *
+   * `NOTE:` OffsetRequest V0 does not use this method; the behavior of OffsetRequest V0 remains the same as before,
+   * i.e. it only gives back the timestamp based on the last modification time of the log segments.
+   *
+   * @param timestamp The given timestamp for offset fetching.
+   * @return The offset of the first message whose timestamp is greater than or equal to the given timestamp.
+   */
+  def fetchOffsetsByTimestamp(timestamp: Long): Long = {
+    debug(s"Searching offset for timestamp $timestamp")
+    val segsArray = logSegments.toArray
+    if (timestamp == OffsetRequest.EarliestTime)
+      return segsArray(0).baseOffset
+
+    // set the target timestamp to be Long.MaxValue if we need to find from the latest.
+    val targetTimestamp = timestamp match {
+      case OffsetRequest.LatestTime => Long.MaxValue
+      case _ => timestamp
+    }
+
+    var foundOffset: Long = -1L
+    // We have this while loop here to make sure we are returning the valid offsets to our best knowledge.
+    // This while loop is to handle the case where the log is truncated during the timestamp search and we did not
+    // find any message. In this case, we need to retry the search.
+    do {
+      val targetSeg = {
+        // Get all the segments whose largest timestamp is smaller than target timestamp
+        val earlierSegs = segsArray.takeWhile(_.largestTimestamp < targetTimestamp)
+        // We need to search the first segment whose largest timestamp is greater than or equal to the target timestamp if there is one.
+        if (earlierSegs.length < segsArray.length)
+          segsArray(earlierSegs.length)
+        else
+          earlierSegs.last
+      }
+
+      // First cache the current log end offset
+      val leo = logEndOffset
+      foundOffset = {
+        // Use the cached log end offsets if
+        // 1. user is asking for latest messages, or,
+        // 2. we are searching on the active segment and the target timestamp is greater than the largestTimestamp
+        // after we cached the log end offset. (We have to use the cached log end offsets because it is possible that
+        // some messages with a larger timestamp are appended after we check the largest timestamp. Using log end offset
+        // after the timestamp check might skip those messages.)
+        if (targetTimestamp == Long.MaxValue
+          || (targetTimestamp > targetSeg.largestTimestamp && targetSeg == activeSegment))
+          leo
+        else
+        // The findOffsetByTimestamp() method may return None when the log is truncated during the timestamp search.
+        // In that case we simply set the foundOffset to -1 so that we will search the timestamp again in the
+        // while loop.
+          targetSeg.findOffsetByTimestamp(targetTimestamp) match {
+            case Some(offset) => offset
+            case None => -1L
+          }
+      }
+    } while (foundOffset < 0)
+    foundOffset
+  }
+
+  /**
    * Given a message offset, find its corresponding offset metadata in the log.
    * If the message offset is out of range, return unknown offset metadata
    */
@@ -559,6 +652,7 @@ class Log(val dir: File,
   /**
    * Delete any log segments matching the given predicate function,
    * starting with the oldest segment and moving forward until a segment doesn't match.
+   *
    * @param predicate A function that takes in a single log segment and returns true iff it is deletable
    * @return The number of segments deleted
    */
@@ -609,24 +703,22 @@ class Log(val dir: File,
    * logSegment will be rolled if one of the following conditions met
    * <ol>
    * <li> The logSegment is full
-   * <li> The maxTime has elapsed
+   * <li> The maxTime has elapsed since the timestamp of the first message in the segment (or since the create time if
+   * the first message does not have a timestamp)
    * <li> The index is full
    * </ol>
    * @return The currently active segment after (perhaps) rolling to a new segment
    */
   private def maybeRoll(messagesSize: Int): LogSegment = {
     val segment = activeSegment
+    val reachedRollMs = segment.timeWaitedForRoll(time.milliseconds) > config.segmentMs - segment.rollJitterMs
     if (segment.size > config.segmentSize - messagesSize ||
-        segment.size > 0 && time.milliseconds - segment.created > config.segmentMs - segment.rollJitterMs ||
-        segment.index.isFull) {
-      debug("Rolling new log segment in %s (log_size = %d/%d, index_size = %d/%d, age_ms = %d/%d)."
-            .format(name,
-                    segment.size,
-                    config.segmentSize,
-                    segment.index.entries,
-                    segment.index.maxEntries,
-                    time.milliseconds - segment.created,
-                    config.segmentMs - segment.rollJitterMs))
+        (segment.size > 0 && reachedRollMs) ||
+        segment.index.isFull || segment.timeIndex.isFull) {
+      debug(s"Rolling new log segment in $name (log_size = ${segment.size}/${config.segmentSize}}, " +
+          s"index_size = ${segment.index.entries}/${segment.index.maxEntries}, " +
+          s"time_index_size = ${segment.timeIndex.entries}/${segment.timeIndex.maxEntries}, " +
+          s"inactive_time_ms = ${segment.timeWaitedForRoll(time.milliseconds)}/${config.segmentMs - segment.rollJitterMs}).")
       roll()
     } else {
       segment
@@ -636,6 +728,7 @@ class Log(val dir: File,
   /**
    * Roll the log over to a new active segment starting with the current logEndOffset.
    * This will trim the index to the exact size of the number of entries it currently contains.
+   *
    * @return The newly rolled segment
    */
   def roll(): LogSegment = {
@@ -644,7 +737,8 @@ class Log(val dir: File,
       val newOffset = logEndOffset
       val logFile = logFilename(dir, newOffset)
       val indexFile = indexFilename(dir, newOffset)
-      for(file <- List(logFile, indexFile); if file.exists) {
+      val timeIndexFile = timeIndexFilename(dir, newOffset)
+      for(file <- List(logFile, indexFile, timeIndexFile); if file.exists) {
         warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
         file.delete()
       }
@@ -652,8 +746,11 @@ class Log(val dir: File,
       segments.lastEntry() match {
         case null =>
         case entry => {
-          entry.getValue.index.trimToValidSize()
-          entry.getValue.log.trim()
+          val seg = entry.getValue
+          seg.onBecomeInactiveSegment()
+          seg.index.trimToValidSize()
+          seg.timeIndex.trimToValidSize()
+          seg.log.trim()
         }
       }
       val segment = new LogSegment(dir,
@@ -692,6 +789,7 @@ class Log(val dir: File,
 
   /**
    * Flush log segments for all offsets up to offset-1
+   *
    * @param offset The offset to flush up to (non-inclusive); the new recovery point
    */
   def flush(offset: Long) : Unit = {
@@ -723,6 +821,7 @@ class Log(val dir: File,
 
   /**
    * Truncate this log so that it ends with the greatest offset < targetOffset.
+   *
    * @param targetOffset The offset to truncate to, an upper bound on all offsets in the log after truncation is complete.
    */
   private[log] def truncateTo(targetOffset: Long) {
@@ -748,6 +847,7 @@ class Log(val dir: File,
 
   /**
    *  Delete all data in the log and start at the new offset
+   *
    *  @param newOffset The new offset to start the log with
    */
   private[log] def truncateFullyAndStartAt(newOffset: Long) {
@@ -826,6 +926,7 @@ class Log(val dir: File,
 
   /**
    * Perform an asynchronous delete on the given file if it exists (otherwise do nothing)
+   *
    * @throws KafkaStorageException if the file can't be renamed and still exists
    */
   private def asyncDeleteSegment(segment: LogSegment) {
@@ -893,6 +994,7 @@ class Log(val dir: File,
   }
   /**
    * Add the given segment to the segments in this log. If this segment replaces an existing segment, delete it.
+   *
    * @param segment The segment to add
    */
   def addSegment(segment: LogSegment) = this.segments.put(segment.baseOffset, segment)
@@ -910,6 +1012,9 @@ object Log {
   /** an index file */
   val IndexFileSuffix = ".index"
 
+  /** a time index file */
+  val TimeIndexFileSuffix = ".timeindex"
+
   /** a file that is scheduled to be deleted */
   val DeletedFileSuffix = ".deleted"
 
@@ -920,13 +1025,14 @@ object Log {
   val SwapFileSuffix = ".swap"
 
   /** Clean shutdown file that indicates the broker was cleanly shutdown in 0.8. This is required to maintain backwards compatibility
-    * with 0.8 and avoid unnecessary log recovery when upgrading from 0.8 to 0.8.1 */
+   * with 0.8 and avoid unnecessary log recovery when upgrading from 0.8 to 0.8.1 */
   /** TODO: Get rid of CleanShutdownFile in 0.8.2 */
   val CleanShutdownFile = ".kafka_cleanshutdown"
 
   /**
    * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
    * so that ls sorts the files numerically.
+   *
    * @param offset The offset to use in the file name
    * @return The filename
    */
@@ -940,6 +1046,7 @@ object Log {
 
   /**
    * Construct a log file name in the given dir with the given base offset
+   *
    * @param dir The directory in which the log will reside
    * @param offset The base offset of the log file
    */
@@ -948,12 +1055,21 @@ object Log {
 
   /**
    * Construct an index file name in the given dir using the given base offset
+   *
    * @param dir The directory in which the log will reside
    * @param offset The base offset of the log file
    */
   def indexFilename(dir: File, offset: Long) =
     new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix)
 
+  /**
+   * Construct a time index file name in the given dir using the given base offset
+   *
+   * @param dir The directory in which the log will reside
+   * @param offset The base offset of the log file
+   */
+  def timeIndexFilename(dir: File, offset: Long) =
+    new File(dir, filenamePrefixFromOffset(offset) + TimeIndexFileSuffix)
 
   /**
    * Parse the topic and partition out of the directory name of a log

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 25c36e7..d4bb1f2 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -334,7 +334,7 @@ private[log] class Cleaner(val id: Int,
     val deleteHorizonMs = 
       log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {
         case None => 0L
-        case Some(seg) => seg.lastModified - log.config.deleteRetentionMs
+        case Some(seg) => seg.largestTimestamp - log.config.deleteRetentionMs
     }
         
     // group the segments and clean the groups
@@ -366,23 +366,32 @@ private[log] class Cleaner(val id: Int,
     val logFile = new File(segments.head.log.file.getPath + Log.CleanedFileSuffix)
     logFile.delete()
     val indexFile = new File(segments.head.index.file.getPath + Log.CleanedFileSuffix)
+    val timeIndexFile = new File(segments.head.timeIndex.file.getPath + Log.CleanedFileSuffix)
     indexFile.delete()
+    timeIndexFile.delete()
     val messages = new FileMessageSet(logFile, fileAlreadyExists = false, initFileSize = log.initFileSize(), preallocate = log.config.preallocate)
     val index = new OffsetIndex(indexFile, segments.head.baseOffset, segments.head.index.maxIndexSize)
-    val cleaned = new LogSegment(messages, index, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time)
+    val timeIndex = new TimeIndex(timeIndexFile, segments.head.baseOffset, segments.head.timeIndex.maxIndexSize)
+    val cleaned = new LogSegment(messages, index, timeIndex, segments.head.baseOffset, segments.head.indexIntervalBytes, log.config.randomSegmentJitter, time)
 
     try {
       // clean segments into the new destination segment
       for (old <- segments) {
-        val retainDeletes = old.lastModified > deleteHorizonMs
-        info("Cleaning segment %s in log %s (last modified %s) into %s, %s deletes."
-            .format(old.baseOffset, log.name, new Date(old.lastModified), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
+        val retainDeletes = old.largestTimestamp > deleteHorizonMs
+        info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
+            .format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
         cleanInto(log.topicAndPartition, old, cleaned, map, retainDeletes, log.config.messageFormatVersion.messageFormatVersion)
       }
 
       // trim excess index
       index.trimToValidSize()
 
+      // Append the last index entry
+      cleaned.onBecomeInactiveSegment()
+
+      // trim time index
+      timeIndex.trimToValidSize()
+
       // flush new segment to disk before swap
       cleaned.flush()
 
@@ -422,6 +431,8 @@ private[log] class Cleaner(val id: Int,
       // read a chunk of messages and copy any that are to be retained to the write buffer to be written out
       readBuffer.clear()
       writeBuffer.clear()
+      var maxTimestamp = Message.NoTimestamp
+      var offsetOfMaxTimestamp = -1L
       val messages = new ByteBufferMessageSet(source.log.readInto(readBuffer, position))
       throttler.maybeThrottle(messages.sizeInBytes)
       // check each message to see if it is to be retained
@@ -433,6 +444,10 @@ private[log] class Cleaner(val id: Int,
           if (shouldRetainMessage(source, map, retainDeletes, entry)) {
             ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
             stats.recopyMessage(size)
+            if (entry.message.timestamp > maxTimestamp) {
+              maxTimestamp = entry.message.timestamp
+              offsetOfMaxTimestamp = entry.offset
+            }
           }
           messagesRead += 1
         } else {
@@ -443,12 +458,16 @@ private[log] class Cleaner(val id: Int,
           val retainedMessages = new mutable.ArrayBuffer[MessageAndOffset]
           messages.foreach { messageAndOffset =>
             messagesRead += 1
-            if (shouldRetainMessage(source, map, retainDeletes, messageAndOffset))
+            if (shouldRetainMessage(source, map, retainDeletes, messageAndOffset)) {
               retainedMessages += messageAndOffset
+              // We need the max timestamp and last offset for time index
+              if (messageAndOffset.message.timestamp > maxTimestamp)
+                maxTimestamp = messageAndOffset.message.timestamp
+            }
             else writeOriginalMessageSet = false
           }
-
-          // There are no messages compacted out, write the original message set back
+          offsetOfMaxTimestamp = if (retainedMessages.nonEmpty) retainedMessages.last.offset else -1L
+          // There are no messages compacted out and no message format conversion, write the original message set back
           if (writeOriginalMessageSet)
             ByteBufferMessageSet.writeMessage(writeBuffer, entry.message, entry.offset)
           else
@@ -461,7 +480,8 @@ private[log] class Cleaner(val id: Int,
       if (writeBuffer.position > 0) {
         writeBuffer.flip()
         val retained = new ByteBufferMessageSet(writeBuffer)
-        dest.append(retained.head.offset, retained)
+        dest.append(firstOffset = retained.head.offset, largestTimestamp = maxTimestamp,
+          offsetOfLargestTimestamp = offsetOfMaxTimestamp, messages = retained)
         throttler.maybeThrottle(writeBuffer.limit)
       }
       
@@ -569,14 +589,17 @@ private[log] class Cleaner(val id: Int,
       var group = List(segs.head)
       var logSize = segs.head.size
       var indexSize = segs.head.index.sizeInBytes
+      var timeIndexSize = segs.head.timeIndex.sizeInBytes
       segs = segs.tail
       while(segs.nonEmpty &&
             logSize + segs.head.size <= maxSize &&
             indexSize + segs.head.index.sizeInBytes <= maxIndexSize &&
+            timeIndexSize + segs.head.timeIndex.sizeInBytes <= maxIndexSize &&
             segs.head.index.lastOffset - group.last.index.baseOffset <= Int.MaxValue) {
         group = segs.head :: group
         logSize += segs.head.size
         indexSize += segs.head.index.sizeInBytes
+        timeIndexSize += segs.head.timeIndex.sizeInBytes
         segs = segs.tail
       }
       grouped ::= group.reverse

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 4357ef4..e6c60b9 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -108,7 +108,7 @@ class LogManager(val logDirs: Array[File],
    */
   private def loadLogs(): Unit = {
     info("Loading logs.")
-
+    val startMs = time.milliseconds
     val threadPools = mutable.ArrayBuffer.empty[ExecutorService]
     val jobs = mutable.Map.empty[File, Seq[Future[_]]]
 
@@ -177,7 +177,7 @@ class LogManager(val logDirs: Array[File],
       threadPools.foreach(_.shutdown())
     }
 
-    info("Logs loading complete.")
+    info(s"Logs loading complete in ${time.milliseconds - startMs} ms.")
   }
 
   /**
@@ -423,7 +423,7 @@ class LogManager(val logDirs: Array[File],
     if (log.config.retentionMs < 0)
       return 0
     val startMs = time.milliseconds
-    log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs)
+    log.deleteOldSegments(startMs - _.largestTimestamp > log.config.retentionMs)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 6bbc50c..d894020 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -36,6 +36,7 @@ import java.io.{IOException, File}
  *
  * @param log The message set containing log entries
  * @param index The offset index
+ * @param timeIndex The timestamp index
  * @param baseOffset A lower bound on the offsets in this segment
  * @param indexIntervalBytes The approximate number of bytes between entries in the index
  * @param time The time instance
@@ -43,6 +44,7 @@ import java.io.{IOException, File}
 @nonthreadsafe
 class LogSegment(val log: FileMessageSet,
                  val index: OffsetIndex,
+                 val timeIndex: TimeIndex,
                  val baseOffset: Long,
                  val indexIntervalBytes: Int,
                  val rollJitterMs: Long,
@@ -53,9 +55,17 @@ class LogSegment(val log: FileMessageSet,
   /* the number of bytes since we last added an entry in the offset index */
   private var bytesSinceLastIndexEntry = 0
 
+  /* The timestamp we used for time based log rolling */
+  private var rollingBasedTimestamp: Option[Long] = None
+
+  /* The maximum timestamp we have seen so far */
+  @volatile private var maxTimestampSoFar = timeIndex.lastEntry.timestamp
+  @volatile private var offsetOfMaxTimestamp = timeIndex.lastEntry.offset
+
   def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) =
     this(new FileMessageSet(file = Log.logFilename(dir, startOffset), fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate),
          new OffsetIndex(Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
+         new TimeIndex(Log.timeIndexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
          startOffset,
          indexIntervalBytes,
          rollJitterMs,
@@ -70,21 +80,33 @@ class LogSegment(val log: FileMessageSet,
    *
    * It is assumed this method is being called from within a lock.
    *
-   * @param offset The first offset in the message set.
+   * @param firstOffset The first offset in the message set.
+   * @param largestTimestamp The largest timestamp in the message set.
+   * @param offsetOfLargestTimestamp The offset of the message that has the largest timestamp in the messages to append.
    * @param messages The messages to append.
    */
   @nonthreadsafe
-  def append(offset: Long, messages: ByteBufferMessageSet) {
+  def append(firstOffset: Long, largestTimestamp: Long, offsetOfLargestTimestamp: Long, messages: ByteBufferMessageSet) {
     if (messages.sizeInBytes > 0) {
-      trace("Inserting %d bytes at offset %d at position %d".format(messages.sizeInBytes, offset, log.sizeInBytes()))
+      trace("Inserting %d bytes at offset %d at position %d with largest timestamp %d at offset %d"
+          .format(messages.sizeInBytes, firstOffset, log.sizeInBytes(), largestTimestamp, offsetOfLargestTimestamp))
+      val physicalPosition = log.sizeInBytes()
+      if (physicalPosition == 0)
+        rollingBasedTimestamp = Some(largestTimestamp)
+      // append the messages
+      log.append(messages)
+      // Update the in memory max timestamp and corresponding offset.
+      if (largestTimestamp > maxTimestampSoFar) {
+        maxTimestampSoFar = largestTimestamp
+        offsetOfMaxTimestamp = offsetOfLargestTimestamp
+      }
       // append an entry to the index (if needed)
       if(bytesSinceLastIndexEntry > indexIntervalBytes) {
-        index.append(offset, log.sizeInBytes())
-        this.bytesSinceLastIndexEntry = 0
+        index.append(firstOffset, physicalPosition)
+        timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
+        bytesSinceLastIndexEntry = 0
       }
-      // append the messages
-      log.append(messages)
-      this.bytesSinceLastIndexEntry += messages.sizeInBytes
+      bytesSinceLastIndexEntry += messages.sizeInBytes
     }
   }
 
@@ -97,13 +119,12 @@ class LogSegment(val log: FileMessageSet,
    * @param offset The offset we want to translate
    * @param startingFilePosition A lower bound on the file position from which to begin the search. This is purely an optimization and
    * when omitted, the search will begin at the position in the offset index.
-   *
    * @return The position in the log storing the message with the least offset >= the requested offset or null if no message meets this criteria.
    */
   @threadsafe
   private[log] def translateOffset(offset: Long, startingFilePosition: Int = 0): OffsetPosition = {
     val mapping = index.lookup(offset)
-    log.searchFor(offset, max(mapping.position, startingFilePosition))
+    log.searchForOffset(offset, max(mapping.position, startingFilePosition))
   }
 
   /**
@@ -165,30 +186,34 @@ class LogSegment(val log: FileMessageSet,
    *
    * @param maxMessageSize A bound the memory allocation in the case of a corrupt message size--we will assume any message larger than this
    * is corrupt.
-   *
    * @return The number of bytes truncated from the log
    */
   @nonthreadsafe
   def recover(maxMessageSize: Int): Int = {
     index.truncate()
     index.resize(index.maxIndexSize)
+    timeIndex.truncate()
+    timeIndex.resize(timeIndex.maxIndexSize)
     var validBytes = 0
     var lastIndexEntry = 0
     val iter = log.iterator(maxMessageSize)
+    maxTimestampSoFar = Message.NoTimestamp
     try {
       while(iter.hasNext) {
         val entry = iter.next
         entry.message.ensureValid()
+
+        // The max timestamp should have been put in the outer message, so we don't need to iterate over the inner messages.
+        if (entry.message.timestamp > maxTimestampSoFar) {
+          maxTimestampSoFar = entry.message.timestamp
+          offsetOfMaxTimestamp = entry.offset
+        }
+
+        // Build offset index
         if(validBytes - lastIndexEntry > indexIntervalBytes) {
-          // we need to decompress the message, if required, to get the offset of the first uncompressed message
-          val startOffset =
-            entry.message.compressionCodec match {
-              case NoCompressionCodec =>
-                entry.offset
-              case _ =>
-                ByteBufferMessageSet.deepIterator(entry).next().offset
-          }
+          val startOffset = entry.firstOffset
           index.append(startOffset, validBytes)
+          timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp)
           lastIndexEntry = validBytes
         }
         validBytes += MessageSet.entrySize(entry.message)
@@ -200,14 +225,35 @@ class LogSegment(val log: FileMessageSet,
     val truncated = log.sizeInBytes - validBytes
     log.truncateTo(validBytes)
     index.trimToValidSize()
+    // A normally closed segment always appends the biggest timestamp ever seen into the time index; we do this as well.
+    timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true)
+    timeIndex.trimToValidSize()
     truncated
   }
 
+  def loadLargestTimestamp(readToLogEnd: Boolean = false) {
+    // Get the last time index entry. If the time index is empty, it will return (-1, baseOffset)
+    val lastTimeIndexEntry = timeIndex.lastEntry
+    maxTimestampSoFar = lastTimeIndexEntry.timestamp
+    offsetOfMaxTimestamp = lastTimeIndexEntry.offset
+    if (readToLogEnd) {
+      val offsetPosition = index.lookup(lastTimeIndexEntry.offset)
+      // Scan the rest of the messages to see if there is a larger timestamp after the last time index entry.
+      val maxTimestampOffsetAfterLastEntry = log.largestTimestampAfter(offsetPosition.position)
+      if (maxTimestampOffsetAfterLastEntry.timestamp > lastTimeIndexEntry.timestamp) {
+        maxTimestampSoFar = maxTimestampOffsetAfterLastEntry.timestamp
+        offsetOfMaxTimestamp = maxTimestampOffsetAfterLastEntry.offset
+      }
+    }
+  }
+
+
   override def toString = "LogSegment(baseOffset=" + baseOffset + ", size=" + size + ")"
 
   /**
    * Truncate off all index and log entries with offsets >= the given offset.
    * If the given offset is larger than the largest message in this segment, do nothing.
+   *
    * @param offset The offset to truncate to
    * @return The number of log bytes truncated
    */
@@ -217,12 +263,19 @@ class LogSegment(val log: FileMessageSet,
     if(mapping == null)
       return 0
     index.truncateTo(offset)
+    timeIndex.truncateTo(offset)
     // after truncation, reset and allocate more space for the (new currently  active) index
     index.resize(index.maxIndexSize)
+    timeIndex.resize(timeIndex.maxIndexSize)
     val bytesTruncated = log.truncateTo(mapping.position)
-    if(log.sizeInBytes == 0)
+    if(log.sizeInBytes == 0) {
       created = time.milliseconds
+      rollingBasedTimestamp = None
+    }
     bytesSinceLastIndexEntry = 0
+    // We may need to reload the max timestamp after truncation.
+    if (maxTimestampSoFar >= 0)
+      loadLargestTimestamp(readToLogEnd = true)
     bytesTruncated
   }
 
@@ -251,6 +304,7 @@ class LogSegment(val log: FileMessageSet,
     LogFlushStats.logFlushTimer.time {
       log.flush()
       index.flush()
+      timeIndex.flush()
     }
   }
 
@@ -270,27 +324,96 @@ class LogSegment(val log: FileMessageSet,
     catch {
       case e: IOException => throw kafkaStorageException("index", e)
     }
+    try timeIndex.renameTo(new File(CoreUtils.replaceSuffix(timeIndex.file.getPath, oldSuffix, newSuffix)))
+    catch {
+      case e: IOException => throw kafkaStorageException("timeindex", e)
+    }
+  }
+
+  /**
+   * Append the largest time index entry to the time index when this log segment becomes an inactive segment.
+   * This entry will be used to decide when to delete the segment.
+   */
+  def onBecomeInactiveSegment() {
+    timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true)
+  }
+
+  /**
+   * The time this segment has waited to be rolled. If the first message in the segment does not have a timestamp,
+   * the time is based on the create time of the segment. Otherwise the time is based on the timestamp of that message.
+   */
+  def timeWaitedForRoll(now: Long) : Long= {
+    // Load the timestamp of the first message into memory
+    if (!rollingBasedTimestamp.isDefined) {
+      val iter = log.iterator
+      if (iter.hasNext)
+        rollingBasedTimestamp = Some(iter.next.message.timestamp)
+      else
+        // If the log is empty, the time waited is measured from the create time of the segment.
+        return now - created
+    }
+    now - {if (rollingBasedTimestamp.get >= 0) rollingBasedTimestamp.get else created}
+  }
+
+  /**
+   * Search the message offset based on timestamp.
+   * This method returns an option of TimestampOffset. The offset is the offset of the first message whose timestamp is
+   * greater than or equal to the target timestamp.
+   *
+   * If all the messages in the segment have smaller timestamps, the returned offset will be last offset + 1 and the
+   * timestamp will be max timestamp in the segment.
+   *
+   * If all the messages in the segment have larger timestamps, or no message in the segment has a timestamp,
+   * the returned offset will be the base offset of the segment and the timestamp will be Message.NoTimestamp.
+   *
+   * This method only returns None when the log is not empty but we did not see any messages when scanning the log
+   * from the indexed position. This could happen if the log is truncated after we get the indexed position but
+   * before we scan the log from there. In this case we simply return None and the caller will need to check on
+   * the truncated log and maybe retry or even do the search on another log segment.
+   *
+   * @param timestamp The timestamp to search for.
+   * @return an offset which points to the first message whose timestamp is larger than or equal to the
+   *         target timestamp.
+   *         None may be returned when the log is truncated.
+   */
+  def findOffsetByTimestamp(timestamp: Long): Option[Long] = {
+    if (log.end == log.start) {
+      // The log segment is empty, just return base offset with no timestamp.
+      Some(baseOffset)
+    } else {
+      // Get the index entry with a timestamp less than or equal to the target timestamp
+      val timestampOffset = timeIndex.lookup(timestamp)
+      val position = index.lookup(timestampOffset.offset).position
+      // Search the timestamp
+      log.searchForTimestamp(timestamp, position)
+    }
   }
 
   /**
    * Close this log segment
    */
   def close() {
+    timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestamp, skipFullCheck = true)
     CoreUtils.swallow(index.close)
+    CoreUtils.swallow(timeIndex.close())
     CoreUtils.swallow(log.close)
   }
 
   /**
    * Delete this log segment from the filesystem.
+   *
    * @throws KafkaStorageException if the delete fails.
    */
   def delete() {
     val deletedLog = log.delete()
     val deletedIndex = index.delete()
+    val deletedTimeIndex = timeIndex.delete()
     if(!deletedLog && log.file.exists)
       throw new KafkaStorageException("Delete of log " + log.file.getName + " failed.")
     if(!deletedIndex && index.file.exists)
       throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.")
+    if(!deletedTimeIndex && timeIndex.file.exists)
+      throw new KafkaStorageException("Delete of time index " + timeIndex.file.getName + " failed.")
   }
 
   /**
@@ -299,10 +422,16 @@ class LogSegment(val log: FileMessageSet,
   def lastModified = log.file.lastModified
 
   /**
+   * The largest timestamp in this segment, or the log file's last modified time if maxTimestampSoFar is negative.
+   */
+  def largestTimestamp = if (maxTimestampSoFar >= 0) maxTimestampSoFar else lastModified
+
+  /**
    * Change the last modified time for this log segment
    */
   def lastModified_=(ms: Long) = {
     log.file.setLastModified(ms)
     index.file.setLastModified(ms)
+    timeIndex.file.setLastModified(ms)
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 848fe3b..ad1b196 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -17,18 +17,11 @@
 
 package kafka.log
 
-import org.apache.kafka.common.utils.Utils
+import java.io.File
+import java.nio.ByteBuffer
 
-import scala.math._
-import java.io._
-import java.nio._
-import java.nio.channels._
-import java.util.concurrent.locks._
-
-import kafka.utils._
 import kafka.utils.CoreUtils.inLock
 import kafka.common.InvalidOffsetException
-import sun.nio.ch.DirectBuffer
 
 /**
  * An index that maps offsets to physical file locations for a particular log segment. This index may be sparse:
@@ -55,137 +48,58 @@ import sun.nio.ch.DirectBuffer
  * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal 
  * storage format.
  */
-class OffsetIndex(@volatile private[this] var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
-  
-  private val lock = new ReentrantLock
-  
-  /* initialize the memory mapping for this index */
-  @volatile
-  private[this] var mmap: MappedByteBuffer = {
-    val newlyCreated = _file.createNewFile()
-    val raf = new RandomAccessFile(_file, "rw")
-    try {
-      /* pre-allocate the file if necessary */
-      if (newlyCreated) {
-        if (maxIndexSize < 8)
-          throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
-        raf.setLength(roundToExactMultiple(maxIndexSize, 8))
-      }
-
-      /* memory-map the file */
-      val len = raf.length()
-      val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
-
-      /* set the position in the index for the next entry */
-      if (newlyCreated)
-        idx.position(0)
-      else
-        // if this is a pre-existing index, assume it is all valid and set position to last entry
-        idx.position(roundToExactMultiple(idx.limit, 8))
-      idx
-    } finally {
-      CoreUtils.swallow(raf.close())
-    }
-  }
+class OffsetIndex(file: File, baseOffset: Long, maxIndexSize: Int = -1)
+    extends AbstractIndex[Long, Int](file, baseOffset, maxIndexSize) {
 
-  /* the number of eight-byte entries currently in the index */
-  @volatile
-  private[this] var _entries = mmap.position / 8
-
-  /* The maximum number of eight-byte entries this index can hold */
-  @volatile
-  private[this] var _maxEntries = mmap.limit / 8
-
-  @volatile
-  private[this] var _lastOffset = readLastEntry.offset
+  override def entrySize = 8
+  
+  /* the last offset in the index */
+  private[this] var _lastOffset = lastEntry.offset
   
   debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
-    .format(_file.getAbsolutePath, _maxEntries, maxIndexSize, _entries, _lastOffset, mmap.position))
-
-  /** The maximum number of entries this index can hold */
-  def maxEntries: Int = _maxEntries
-
-  /** The last offset in the index */
-  def lastOffset: Long = _lastOffset
-
-  /** The index file */
-  def file: File = _file
+    .format(file.getAbsolutePath, maxEntries, maxIndexSize, _entries, _lastOffset, mmap.position))
 
   /**
    * The last entry in the index
    */
-  def readLastEntry(): OffsetPosition = {
+  private def lastEntry: OffsetPosition = {
     inLock(lock) {
       _entries match {
         case 0 => OffsetPosition(baseOffset, 0)
-        case s => OffsetPosition(baseOffset + relativeOffset(mmap, s - 1), physical(mmap, s - 1))
+        case s => parseEntry(mmap, s - 1).asInstanceOf[OffsetPosition]
       }
     }
   }
 
+  def lastOffset: Long = _lastOffset
+
   /**
    * Find the largest offset less than or equal to the given targetOffset 
    * and return a pair holding this offset and its corresponding physical file position.
    * 
    * @param targetOffset The offset to look up.
-   * 
-   * @return The offset found and the corresponding file position for this offset. 
-   * If the target offset is smaller than the least entry in the index (or the index is empty),
-   * the pair (baseOffset, 0) is returned.
+   * @return The offset found and the corresponding file position for this offset.
+   *         If the target offset is smaller than the least entry in the index (or the index is empty),
+   *         the pair (baseOffset, 0) is returned.
    */
   def lookup(targetOffset: Long): OffsetPosition = {
     maybeLock(lock) {
       val idx = mmap.duplicate
-      val slot = indexSlotFor(idx, targetOffset)
+      val slot = indexSlotFor(idx, targetOffset, IndexSearchType.KEY)
       if(slot == -1)
         OffsetPosition(baseOffset, 0)
       else
-        OffsetPosition(baseOffset + relativeOffset(idx, slot), physical(idx, slot))
-      }
-  }
-  
-  /**
-   * Find the slot in which the largest offset less than or equal to the given
-   * target offset is stored.
-   * 
-   * @param idx The index buffer
-   * @param targetOffset The offset to look for
-   * 
-   * @return The slot found or -1 if the least entry in the index is larger than the target offset or the index is empty
-   */
-  private def indexSlotFor(idx: ByteBuffer, targetOffset: Long): Int = {
-    // we only store the difference from the base offset so calculate that
-    val relOffset = targetOffset - baseOffset
-    
-    // check if the index is empty
-    if (_entries == 0)
-      return -1
-    
-    // check if the target offset is smaller than the least offset
-    if (relativeOffset(idx, 0) > relOffset)
-      return -1
-      
-    // binary search for the entry
-    var lo = 0
-    var hi = _entries - 1
-    while (lo < hi) {
-      val mid = ceil(hi/2.0 + lo/2.0).toInt
-      val found = relativeOffset(idx, mid)
-      if (found == relOffset)
-        return mid
-      else if (found < relOffset)
-        lo = mid
-      else
-        hi = mid - 1
+        parseEntry(idx, slot).asInstanceOf[OffsetPosition]
     }
-    lo
   }
-  
-  /* return the nth offset relative to the base offset */
-  private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8)
-  
-  /* return the nth physical position */
-  private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * 8 + 4)
+
+  private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize)
+
+  private def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize + 4)
+
+  override def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry = {
+      OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n))
+  }
   
   /**
    * Get the nth offset mapping from the index
@@ -208,37 +122,25 @@ class OffsetIndex(@volatile private[this] var _file: File, val baseOffset: Long,
     inLock(lock) {
       require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
       if (_entries == 0 || offset > _lastOffset) {
-        debug("Adding index entry %d => %d to %s.".format(offset, position, _file.getName))
+        debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
         mmap.putInt((offset - baseOffset).toInt)
         mmap.putInt(position)
         _entries += 1
         _lastOffset = offset
-        require(_entries * 8 == mmap.position, _entries + " entries but file position in index is " + mmap.position + ".")
+        require(_entries * entrySize == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
       } else {
         throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
-          .format(offset, _entries, _lastOffset, _file.getAbsolutePath))
+          .format(offset, entries, _lastOffset, file.getAbsolutePath))
       }
     }
   }
-  
-  /**
-   * True iff there are no more slots available in this index
-   */
-  def isFull: Boolean = _entries >= _maxEntries
-  
-  /**
-   * Truncate the entire index, deleting all entries
-   */
-  def truncate() = truncateToEntries(0)
-  
-  /**
-   * Remove all entries from the index which have an offset greater than or equal to the given offset.
-   * Truncating to an offset larger than the largest in the index has no effect.
-   */
-  def truncateTo(offset: Long) {
+
+  override def truncate() = truncateToEntries(0)
+
+  override def truncateTo(offset: Long) {
     inLock(lock) {
       val idx = mmap.duplicate
-      val slot = indexSlotFor(idx, offset)
+      val slot = indexSlotFor(idx, offset, IndexSearchType.KEY)
 
       /* There are 3 cases for choosing the new size
        * 1) if there is no entry in the index <= the offset, delete everything
@@ -262,139 +164,19 @@ class OffsetIndex(@volatile private[this] var _file: File, val baseOffset: Long,
   private def truncateToEntries(entries: Int) {
     inLock(lock) {
       _entries = entries
-      mmap.position(_entries * 8)
-      _lastOffset = readLastEntry.offset
-    }
-  }
-  
-  /**
-   * Trim this segment to fit just the valid entries, deleting all trailing unwritten bytes from
-   * the file.
-   */
-  def trimToValidSize() {
-    inLock(lock) {
-      resize(_entries * 8)
+      mmap.position(_entries * entrySize)
+      _lastOffset = lastEntry.offset
     }
   }
 
-  /**
-   * Reset the size of the memory map and the underneath file. This is used in two kinds of cases: (1) in
-   * trimToValidSize() which is called at closing the segment or new segment being rolled; (2) at
-   * loading segments from disk or truncating back to an old segment where a new log segment became active;
-   * we want to reset the index size to maximum index size to avoid rolling new segment.
-   */
-  def resize(newSize: Int) {
-    inLock(lock) {
-      val raf = new RandomAccessFile(_file, "rw")
-      val roundedNewSize = roundToExactMultiple(newSize, 8)
-      val position = mmap.position
-      
-      /* Windows won't let us modify the file length while the file is mmapped :-( */
-      if (Os.isWindows)
-        forceUnmap(mmap)
-      try {
-        raf.setLength(roundedNewSize)
-        mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
-        _maxEntries = mmap.limit / 8
-        mmap.position(position)
-      } finally {
-        CoreUtils.swallow(raf.close())
-      }
-    }
-  }
-  
-  /**
-   * Forcefully free the buffer's mmap. We do this only on windows.
-   */
-  private def forceUnmap(m: MappedByteBuffer) {
-    try {
-      m match {
-        case buffer: DirectBuffer =>
-          val bufferCleaner = buffer.cleaner()
-          /* cleaner can be null if the mapped region has size 0 */
-          if (bufferCleaner != null)
-            bufferCleaner.clean()
-        case _ =>
-      }
-    } catch {
-      case t: Throwable => warn("Error when freeing index buffer", t)
-    }
-  }
-  
-  /**
-   * Flush the data in the index to disk
-   */
-  def flush() {
-    inLock(lock) {
-      mmap.force()
-    }
-  }
-  
-  /**
-   * Delete this index file
-   */
-  def delete(): Boolean = {
-    info("Deleting index " + _file.getAbsolutePath)
-    if (Os.isWindows)
-      CoreUtils.swallow(forceUnmap(mmap))
-    _file.delete()
-  }
-  
-  /** The number of entries in this index */
-  def entries = _entries
-  
-  /**
-   * The number of bytes actually used by this index
-   */
-  def sizeInBytes() = 8 * _entries
-  
-  /** Close the index */
-  def close() {
-    trimToValidSize()
-  }
-  
-  /**
-   * Rename the file that backs this offset index
-   * @throws IOException if rename fails
-   */
-  def renameTo(f: File) {
-    try Utils.atomicMoveWithFallback(_file.toPath, f.toPath)
-    finally _file = f
-  }
-  
-  /**
-   * Do a basic sanity check on this index to detect obvious problems
-   * @throws IllegalArgumentException if any problems are found
-   */
-  def sanityCheck() {
-    require(_entries == 0 || lastOffset > baseOffset,
-            "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d"
-            .format(_file.getAbsolutePath, lastOffset, baseOffset))
-    val len = _file.length()
-    require(len % 8 == 0,
-            "Index file " + _file.getName + " is corrupt, found " + len +
+  override def sanityCheck() {
+    require(_entries == 0 || _lastOffset > baseOffset,
+            s"Corrupt index found, index file (${file.getAbsolutePath}) has non-zero size but the last offset " +
+                s"is ${_lastOffset} which is no larger than the base offset $baseOffset.")
+    val len = file.length()
+    require(len % entrySize == 0,
+            "Index file " + file.getAbsolutePath + " is corrupt, found " + len +
             " bytes which is not positive or not a multiple of 8.")
   }
-  
-  /**
-   * Round a number to the greatest exact multiple of the given factor less than the given number.
-   * E.g. roundToExactMultiple(67, 8) == 64
-   */
-  private def roundToExactMultiple(number: Int, factor: Int) = factor * (number / factor)
-  
-  /**
-   * Execute the given function in a lock only if we are running on windows. We do this 
-   * because Windows won't let us resize a file while it is mmapped. As a result we have to force unmap it
-   * and this requires synchronizing reads.
-   */
-  private def maybeLock[T](lock: Lock)(fun: => T): T = {
-    if(Os.isWindows)
-      lock.lock()
-    try {
-      fun
-    } finally {
-      if(Os.isWindows)
-        lock.unlock()
-    }
-  }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/OffsetPosition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetPosition.scala b/core/src/main/scala/kafka/log/OffsetPosition.scala
deleted file mode 100644
index 24b6dcf..0000000
--- a/core/src/main/scala/kafka/log/OffsetPosition.scala
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.log
-
-/**
- * The mapping between a logical log offset and the physical position
- * in some log file of the beginning of the message set entry with the
- * given offset.
- */
-case class OffsetPosition(offset: Long, position: Int)

http://git-wip-us.apache.org/repos/asf/kafka/blob/79d3fd2b/core/src/main/scala/kafka/log/TimeIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala
new file mode 100644
index 0000000..7f24081
--- /dev/null
+++ b/core/src/main/scala/kafka/log/TimeIndex.scala
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.File
+import java.nio.ByteBuffer
+
+import kafka.common.InvalidOffsetException
+import kafka.message.Message
+import kafka.utils.CoreUtils._
+import kafka.utils.Logging
+
+/**
+ * An index that maps from the timestamp to the logical offsets of the messages in a segment. This index might be
+ * sparse, i.e. it may not hold an entry for all the messages in the segment.
+ *
+ * The index is stored in a file that is preallocated to hold a fixed maximum number of 12-byte time index entries.
+ * The file format is a series of time index entries. The physical format is an 8-byte timestamp and a 4-byte "relative"
+ * offset as used in the [[OffsetIndex]]. A time index entry (TIMESTAMP, OFFSET) means that the biggest timestamp seen
+ * before OFFSET is TIMESTAMP, i.e. any message whose timestamp is greater than TIMESTAMP must come after OFFSET.
+ *
+ * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal
+ * storage format.
+ *
+ * The timestamps in the same time index file are guaranteed to be monotonically increasing.
+ *
+ * The index supports timestamp lookup against a memory map of this file. The lookup is done using a binary search to find
+ * the offset of the message whose indexed timestamp is closest to, but smaller than or equal to, the target timestamp.
+ *
+ * Time index files can be opened in two ways: either as an empty, mutable index that allows appends or
+ * an immutable read-only index file that has previously been populated. The makeReadOnly method will turn a mutable file into an
+ * immutable one and truncate off any extra bytes. This is done when the index file is rolled over.
+ *
+ * No attempt is made to checksum the contents of this file, in the event of a crash it is rebuilt.
+ *
+ */
+class TimeIndex(file: File,
+                baseOffset: Long,
+                maxIndexSize: Int = -1)
+    extends AbstractIndex[Long, Long](file, baseOffset, maxIndexSize) with Logging {
+
+  override def entrySize = 12
+
+  // We override the full check to reserve the last time index entry slot for the on roll call.
+  override def isFull: Boolean = entries >= maxEntries - 1
+
+  private def timestamp(buffer: ByteBuffer, n: Int): Long = buffer.getLong(n * entrySize)
+
+  private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize + 8)
+
+  /**
+   * The last entry in the index
+   */
+  def lastEntry: TimestampOffset = {
+    inLock(lock) {
+      _entries match {
+        case 0 => TimestampOffset(Message.NoTimestamp, baseOffset)
+        case s => parseEntry(mmap, s - 1).asInstanceOf[TimestampOffset]
+      }
+    }
+  }
+
+  /**
+   * Get the nth timestamp mapping from the time index
+   * @param n The entry number in the time index
+   * @return The timestamp/offset pair at that entry
+   */
+  def entry(n: Int): TimestampOffset = {
+    maybeLock(lock) {
+      if(n >= _entries)
+        throw new IllegalArgumentException("Attempt to fetch the %dth entry from a time index of size %d.".format(n, _entries))
+      val idx = mmap.duplicate
+      TimestampOffset(timestamp(idx, n), relativeOffset(idx, n))
+    }
+  }
+
+  override def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry = {
+    TimestampOffset(timestamp(buffer, n), baseOffset + relativeOffset(buffer, n))
+  }
+
+  /**
+   * Attempt to append a time index entry to the time index.
+   * The new entry is appended only if its timestamp is greater than the timestamp of the last entry; an equal timestamp
+   * makes the call a no-op. A timestamp or offset smaller than that of the last entry is rejected with an exception.
+   *
+   * @param timestamp The timestamp of the new time index entry
+   * @param offset The offset of the new time index entry
+   * @param skipFullCheck Whether to skip the check that the index is full. The check is only skipped when the segment
+   *                      gets rolled or the segment is closed.
+   */
+  def maybeAppend(timestamp: Long, offset: Long, skipFullCheck: Boolean = false) {
+    inLock(lock) {
+      if (!skipFullCheck)
+        require(!isFull, "Attempt to append to a full time index (size = " + _entries + ").")
+      // We do not throw exception when the offset equals to the offset of last entry. That means we are trying
+      // to insert the same time index entry as the last entry.
+      // If the timestamp index entry to be inserted is the same as the last entry, we simply ignore the insertion
+      // because that could happen in the following two scenarios:
+      // 1. A log segment is closed.
+      // 2. LogSegment.onBecomeInactiveSegment() is called when an active log segment is rolled.
+      if (_entries != 0 && offset < lastEntry.offset)
+        throw new InvalidOffsetException("Attempt to append an offset (%d) to slot %d no larger than the last offset appended (%d) to %s."
+          .format(offset, _entries, lastEntry.offset, file.getAbsolutePath))
+      if (_entries != 0 && timestamp < lastEntry.timestamp)
+        throw new IllegalStateException("Attempt to append an timestamp (%d) to slot %d no larger than the last timestamp appended (%d) to %s."
+            .format(timestamp, _entries, lastEntry.timestamp, file.getAbsolutePath))
+      // We only append to the time index when the timestamp is greater than the last inserted timestamp.
+      // If all the messages are in message format v0, the timestamp will always be NoTimestamp. In that case, the time
+      // index will be empty.
+      if (timestamp > lastEntry.timestamp) {
+        debug("Adding index entry %d => %d to %s.".format(timestamp, offset, file.getName))
+        mmap.putLong(timestamp)
+        mmap.putInt((offset - baseOffset).toInt)
+        _entries += 1
+        require(_entries * entrySize == mmap.position, _entries + " entries but file position in index is " + mmap.position + ".")
+      }
+    }
+  }
+
+  /**
+   * Find the time index entry whose timestamp is less than or equal to the given timestamp.
+   * If the target timestamp is smaller than the least timestamp in the time index, (NoTimestamp, baseOffset) is
+   * returned.
+   *
+   * @param targetTimestamp The timestamp to look up.
+   * @return The time index entry found.
+   */
+  def lookup(targetTimestamp: Long): TimestampOffset = {
+    maybeLock(lock) {
+      val idx = mmap.duplicate
+      val slot = indexSlotFor(idx, targetTimestamp, IndexSearchType.KEY)
+      if (slot == -1)
+        TimestampOffset(Message.NoTimestamp, baseOffset)
+      else {
+        val entry = parseEntry(idx, slot).asInstanceOf[TimestampOffset]
+        TimestampOffset(entry.timestamp, entry.offset)
+      }
+    }
+  }
+
+  override def truncate() = truncateToEntries(0)
+
+  /**
+   * Remove all entries from the index which have an offset greater than or equal to the given offset.
+   * Truncating to an offset larger than the largest in the index has no effect.
+   */
+  override def truncateTo(offset: Long) {
+    inLock(lock) {
+      val idx = mmap.duplicate
+      val slot = indexSlotFor(idx, offset, IndexSearchType.VALUE)
+
+      /* There are 3 cases for choosing the new size
+       * 1) if there is no entry in the index <= the offset, delete everything
+       * 2) if there is an entry for this exact offset, delete it and everything larger than it
+       * 3) if there is no entry for this offset, delete everything larger than the next smallest
+       */
+      val newEntries =
+        if(slot < 0)
+          0
+        else if(relativeOffset(idx, slot) == offset - baseOffset)
+          slot
+        else
+          slot + 1
+      truncateToEntries(newEntries)
+    }
+  }
+
+  /**
+   * Truncates index to a known number of entries.
+   */
+  private def truncateToEntries(entries: Int) {
+    inLock(lock) {
+      _entries = entries
+      mmap.position(_entries * entrySize)
+    }
+  }
+
+  override def sanityCheck() {
+    val entry = lastEntry
+    val lastTimestamp = entry.timestamp
+    val lastOffset = entry.offset
+    require(_entries == 0 || (lastTimestamp >= timestamp(mmap, 0)),
+      s"Corrupt time index found, time index file (${file.getAbsolutePath}) has non-zero size but the last timestamp " +
+          s"is $lastTimestamp which is no larger than the first timestamp ${timestamp(mmap, 0)}")
+    require(_entries == 0 || lastOffset >= baseOffset,
+      s"Corrupt time index found, time index file (${file.getAbsolutePath}) has non-zero size but the last offset " +
+          s"is $lastOffset which is smaller than the first offset $baseOffset")
+    val len = file.length()
+    require(len % entrySize == 0,
+      "Time index file " + file.getAbsolutePath + " is corrupt, found " + len +
+          " bytes which is not positive or not a multiple of 12.")
+  }
+}
\ No newline at end of file