You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jq...@apache.org on 2017/11/06 17:28:37 UTC
kafka git commit: KAFKA-6172; Cache lastEntry in TimeIndex to avoid unnecessary disk access
Repository: kafka
Updated Branches:
refs/heads/trunk 7672e9ec3 -> 0c895706e
KAFKA-6172; Cache lastEntry in TimeIndex to avoid unnecessary disk access
Author: Dong Lin <li...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jiangjie Qin <be...@gmail.com>
Closes #4177 from lindong28/KAFKA-6172
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0c895706
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0c895706
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0c895706
Branch: refs/heads/trunk
Commit: 0c895706e8ab511efe352a824a0c9e2dab62499e
Parents: 7672e9e
Author: Dong Lin <li...@gmail.com>
Authored: Mon Nov 6 09:28:29 2017 -0800
Committer: Jiangjie Qin <be...@gmail.com>
Committed: Mon Nov 6 09:28:29 2017 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/log/TimeIndex.scala | 22 +++++++++++++++++-----
1 file changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0c895706/core/src/main/scala/kafka/log/TimeIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala
index aab9300..def1f7a 100644
--- a/core/src/main/scala/kafka/log/TimeIndex.scala
+++ b/core/src/main/scala/kafka/log/TimeIndex.scala
@@ -53,6 +53,8 @@ import org.apache.kafka.common.record.RecordBatch
class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true)
extends AbstractIndex[Long, Long](_file, baseOffset, maxIndexSize, writable) with Logging {
+ @volatile private var _lastEntry = lastEntryFromIndexFile
+
override def entrySize = 12
// We override the full check to reserve the last time index entry slot for the on roll call.
@@ -62,10 +64,12 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
private def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * entrySize + 8)
+ def lastEntry: TimestampOffset = _lastEntry
+
/**
- * The last entry in the index
+ * Read the last entry from the index file. This operation involves disk access.
*/
- def lastEntry: TimestampOffset = {
+ private def lastEntryFromIndexFile: TimestampOffset = {
inLock(lock) {
_entries match {
case 0 => TimestampOffset(RecordBatch.NO_TIMESTAMP, baseOffset)
@@ -126,6 +130,7 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
mmap.putLong(timestamp)
mmap.putInt((offset - baseOffset).toInt)
_entries += 1
+ _lastEntry = TimestampOffset(timestamp, offset)
require(_entries * entrySize == mmap.position(), _entries + " entries but file position in index is " + mmap.position() + ".")
}
}
@@ -179,6 +184,13 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
}
}
+ override def resize(newSize: Int) {
+ inLock(lock) {
+ super.resize(newSize)
+ _lastEntry = lastEntryFromIndexFile
+ }
+ }
+
/**
* Truncates index to a known number of entries.
*/
@@ -186,13 +198,13 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
inLock(lock) {
_entries = entries
mmap.position(_entries * entrySize)
+ _lastEntry = lastEntryFromIndexFile
}
}
override def sanityCheck() {
- val entry = lastEntry
- val lastTimestamp = entry.timestamp
- val lastOffset = entry.offset
+ val lastTimestamp = lastEntry.timestamp
+ val lastOffset = lastEntry.offset
require(_entries == 0 || (lastTimestamp >= timestamp(mmap, 0)),
s"Corrupt time index found, time index file (${file.getAbsolutePath}) has non-zero size but the last timestamp " +
s"is $lastTimestamp which is no larger than the first timestamp ${timestamp(mmap, 0)}")