You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jq...@apache.org on 2017/11/06 17:28:37 UTC

kafka git commit: KAFKA-6172; Cache lastEntry in TimeIndex to avoid unnecessary disk access

Repository: kafka
Updated Branches:
  refs/heads/trunk 7672e9ec3 -> 0c895706e


KAFKA-6172; Cache lastEntry in TimeIndex to avoid unnecessary disk access

Author: Dong Lin <li...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jiangjie Qin <be...@gmail.com>

Closes #4177 from lindong28/KAFKA-6172


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0c895706
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0c895706
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0c895706

Branch: refs/heads/trunk
Commit: 0c895706e8ab511efe352a824a0c9e2dab62499e
Parents: 7672e9e
Author: Dong Lin <li...@gmail.com>
Authored: Mon Nov 6 09:28:29 2017 -0800
Committer: Jiangjie Qin <be...@gmail.com>
Committed: Mon Nov 6 09:28:29 2017 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/TimeIndex.scala | 22 +++++++++++++++++-----
 1 file changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0c895706/core/src/main/scala/kafka/log/TimeIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala
index aab9300..def1f7a 100644
--- a/core/src/main/scala/kafka/log/TimeIndex.scala
+++ b/core/src/main/scala/kafka/log/TimeIndex.scala
@@ -53,6 +53,8 @@ import org.apache.kafka.common.record.RecordBatch
 class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true)
     extends AbstractIndex[Long, Long](_file, baseOffset, maxIndexSize, writable) with Logging {
 
+  @volatile private var _lastEntry = lastEntryFromIndexFile
+
   override def entrySize = 12
 
   // We override the full check to reserve the last time index entry slot for the on roll call.
@@ -62,10 +64,12 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
 
   private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize + 8)
 
+  def lastEntry: TimestampOffset = _lastEntry
+
   /**
-   * The last entry in the index
+   * Read the last entry from the index file. This operation involves disk access.
    */
-  def lastEntry: TimestampOffset = {
+  private def lastEntryFromIndexFile: TimestampOffset = {
     inLock(lock) {
       _entries match {
         case 0 => TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset)
@@ -126,6 +130,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
         mmap.putLong(timestamp)
         mmap.putInt((offset - baseOffset).toInt)
         _entries += 1
+        _lastEntry = TimestampOffset(timestamp, offset)
         require(_entries * entrySize == mmap.position(), _entries + " entries but file position in index is " + mmap.position() + ".")
       }
     }
@@ -179,6 +184,13 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
     }
   }
 
+  override def resize(newSize: Int) {
+    inLock(lock) {
+      super.resize(newSize)
+      _lastEntry = lastEntryFromIndexFile
+    }
+  }
+
   /**
    * Truncates index to a known number of entries.
    */
@@ -186,13 +198,13 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
     inLock(lock) {
       _entries = entries
       mmap.position(_entries * entrySize)
+      _lastEntry = lastEntryFromIndexFile
     }
   }
 
   override def sanityCheck() {
-    val entry = lastEntry
-    val lastTimestamp = entry.timestamp
-    val lastOffset = entry.offset
+    val lastTimestamp = lastEntry.timestamp
+    val lastOffset = lastEntry.offset
     require(_entries == 0 || (lastTimestamp >= timestamp(mmap, 0)),
       s"Corrupt time index found, time index file (${file.getAbsolutePath}) has non-zero size but the last timestamp " +
           s"is $lastTimestamp which is no larger than the first timestamp ${timestamp(mmap, 0)}")