Posted to commits@kafka.apache.org by jq...@apache.org on 2017/11/10 06:08:19 UTC
kafka git commit: KAFKA-6175; AbstractIndex should cache index file to avoid unnecessary disk access during resize()
Repository: kafka
Updated Branches:
refs/heads/trunk 66cab2f7f -> 12af521c4
KAFKA-6175; AbstractIndex should cache index file to avoid unnecessary disk access during resize()
This patch also adds a test for log deletion after close.
Author: Dong Lin <li...@gmail.com>
Reviewers: Jiangjie Qin <be...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #4179 from lindong28/KAFKA-6175
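For context on the change below: file.length() is a call into the file system, so AbstractIndex now caches the length in a volatile field and lets resize() return early when the rounded size is unchanged, reporting via a Boolean whether anything was remapped. A minimal standalone sketch of that pattern (illustrative names and simplified structure, not the Kafka class itself):

import java.io.{File, RandomAccessFile}

// Sketch of the caching pattern this patch introduces: keep the file length
// in a volatile field so resize() can detect a no-op without touching disk.
class CachedLengthFile(val file: File, entrySize: Int) {
  @volatile private var _length: Long = file.length()

  def length: Long = _length

  // Round down to a multiple of entrySize, mirroring the index semantics.
  private def roundDownToExactMultiple(number: Int, factor: Int): Int =
    factor * (number / factor)

  // Returns true only if the underlying file size actually changed.
  def resize(newSize: Int): Boolean = {
    val roundedNewSize = roundDownToExactMultiple(newSize, entrySize)
    if (_length == roundedNewSize) {
      false // fast path: no RandomAccessFile, no disk access
    } else {
      val raf = new RandomAccessFile(file, "rw")
      try {
        raf.setLength(roundedNewSize)
        _length = roundedNewSize
        true
      } finally {
        raf.close()
      }
    }
  }
}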
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/12af521c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/12af521c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/12af521c
Branch: refs/heads/trunk
Commit: 12af521c487a146456442f895b9fc99a45ed100f
Parents: 66cab2f
Author: Dong Lin <li...@gmail.com>
Authored: Thu Nov 9 22:08:03 2017 -0800
Committer: Jiangjie Qin <be...@gmail.com>
Committed: Thu Nov 9 22:08:03 2017 -0800
----------------------------------------------------------------------
.../main/scala/kafka/log/AbstractIndex.scala | 48 +++++++++++++-------
core/src/main/scala/kafka/log/OffsetIndex.scala | 37 ++++++++-------
core/src/main/scala/kafka/log/TimeIndex.scala | 14 +++---
.../src/test/scala/unit/kafka/log/LogTest.scala | 18 ++++++++
4 files changed, 76 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/12af521c/core/src/main/scala/kafka/log/AbstractIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index c214dad..899c107 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -39,6 +39,10 @@ import scala.math.ceil
abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Long,
val maxIndexSize: Int = -1, val writable: Boolean) extends Logging {
+ // Length of the index file
+ @volatile
+ private var _length: Long = _
+
protected def entrySize: Int
protected val lock = new ReentrantLock
@@ -56,12 +60,12 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
}
/* memory-map the file */
- val len = raf.length()
+ _length = raf.length()
val idx = {
if (writable)
- raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
+ raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, _length)
else
- raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, len)
+ raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, _length)
}
/* set the position in the index for the next entry */
if(newlyCreated)
@@ -94,28 +98,40 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
def entries: Int = _entries
+ def length: Long = _length
+
/**
* Reset the size of the memory map and the underneath file. This is used in two kinds of cases: (1) in
* trimToValidSize() which is called at closing the segment or new segment being rolled; (2) at
* loading segments from disk or truncating back to an old segment where a new log segment became active;
* we want to reset the index size to maximum index size to avoid rolling new segment.
+ *
+ * @param newSize new size of the index file
+ * @return a boolean indicating whether the size of the memory map and the underneath file is changed or not.
*/
- def resize(newSize: Int) {
+ def resize(newSize: Int): Boolean = {
inLock(lock) {
- val raf = new RandomAccessFile(file, "rw")
val roundedNewSize = roundDownToExactMultiple(newSize, entrySize)
- val position = mmap.position()
- /* Windows won't let us modify the file length while the file is mmapped :-( */
- if (OperatingSystem.IS_WINDOWS)
- safeForceUnmap()
- try {
- raf.setLength(roundedNewSize)
- mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
- _maxEntries = mmap.limit() / entrySize
- mmap.position(position)
- } finally {
- CoreUtils.swallow(raf.close())
+ if (_length == roundedNewSize) {
+ false
+ } else {
+ val raf = new RandomAccessFile(file, "rw")
+ try {
+ val position = mmap.position()
+
+ /* Windows won't let us modify the file length while the file is mmapped :-( */
+ if (OperatingSystem.IS_WINDOWS)
+ safeForceUnmap()
+ raf.setLength(roundedNewSize)
+ _length = roundedNewSize
+ mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
+ _maxEntries = mmap.limit() / entrySize
+ mmap.position(position)
+ true
+ } finally {
+ CoreUtils.swallow(raf.close())
+ }
}
}
}
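A note on the new return value: callers that keep state derived from the mapped buffer can use the Boolean to decide whether that state needs refreshing. The TimeIndex change further down does exactly that, re-reading its last entry from the index file only when the map was actually remapped.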
http://git-wip-us.apache.org/repos/asf/kafka/blob/12af521c/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index c156972..eb15842 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -26,26 +26,26 @@ import kafka.common.InvalidOffsetException
/**
* An index that maps offsets to physical file locations for a particular log segment. This index may be sparse:
* that is it may not hold an entry for all messages in the log.
- *
+ *
* The index is stored in a file that is pre-allocated to hold a fixed maximum number of 8-byte entries.
- *
+ *
* The index supports lookups against a memory-map of this file. These lookups are done using a simple binary search variant
* to locate the offset/location pair for the greatest offset less than or equal to the target offset.
- *
+ *
* Index files can be opened in two ways: either as an empty, mutable index that allows appends or
- * an immutable read-only index file that has previously been populated. The makeReadOnly method will turn a mutable file into an
+ * an immutable read-only index file that has previously been populated. The makeReadOnly method will turn a mutable file into an
* immutable one and truncate off any extra bytes. This is done when the index file is rolled over.
- *
+ *
* No attempt is made to checksum the contents of this file, in the event of a crash it is rebuilt.
- *
- * The file format is a series of entries. The physical format is a 4 byte "relative" offset and a 4 byte file location for the
+ *
+ * The file format is a series of entries. The physical format is a 4 byte "relative" offset and a 4 byte file location for the
* message with that offset. The offset stored is relative to the base offset of the index file. So, for example,
* if the base offset was 50, then the offset 55 would be stored as 5. Using relative offsets in this way let's us use
* only 4 bytes for the offset.
- *
+ *
* The frequency of entries is up to the user of this class.
- *
- * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal
+ *
+ * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal
* storage format.
*/
// Avoid shadowing mutable `file` in AbstractIndex
@@ -53,10 +53,10 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
extends AbstractIndex[Long, Int](_file, baseOffset, maxIndexSize, writable) {
override def entrySize = 8
-
+
/* the last offset in the index */
private[this] var _lastOffset = lastEntry.offset
-
+
debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
.format(file.getAbsolutePath, maxEntries, maxIndexSize, _entries, _lastOffset, mmap.position()))
@@ -75,9 +75,9 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
def lastOffset: Long = _lastOffset
/**
- * Find the largest offset less than or equal to the given targetOffset
+ * Find the largest offset less than or equal to the given targetOffset
* and return a pair holding this offset and its corresponding physical file position.
- *
+ *
* @param targetOffset The offset to look up.
* @return The offset found and the corresponding file position for this offset
* If the target offset is smaller than the least entry in the index (or the index is empty),
@@ -117,7 +117,7 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
override def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry = {
OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n))
}
-
+
/**
* Get the nth offset mapping from the index
* @param n The entry number in the index
@@ -164,7 +164,7 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
* 2) if there is an entry for this exact offset, delete it and everything larger than it
* 3) if there is no entry for this offset, delete everything larger than the next smallest
*/
- val newEntries =
+ val newEntries =
if(slot < 0)
0
else if(relativeOffset(idx, slot) == offset - baseOffset)
@@ -190,9 +190,8 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
require(_entries == 0 || _lastOffset > baseOffset,
s"Corrupt index found, index file (${file.getAbsolutePath}) has non-zero size but the last offset " +
s"is ${_lastOffset} which is no larger than the base offset $baseOffset.")
- val len = file.length()
- require(len % entrySize == 0,
- "Index file " + file.getAbsolutePath + " is corrupt, found " + len +
+ require(length % entrySize == 0,
+ "Index file " + file.getAbsolutePath + " is corrupt, found " + length +
" bytes which is not positive or not a multiple of 8.")
}
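The doc comment above spells out the 8-byte entry layout: a 4-byte offset stored relative to the index's base offset, followed by a 4-byte physical file position. A small self-contained sketch of that encoding (illustrative demo code, not Kafka's actual I/O path):

import java.nio.ByteBuffer

// Demonstrates the entry format described in the OffsetIndex doc comment:
// a 4-byte relative offset followed by a 4-byte physical file position.
object OffsetEntryFormatDemo extends App {
  val baseOffset = 50L
  val buf = ByteBuffer.allocate(8)

  // Write one entry: with base offset 50, the offset 55 is stored as 5.
  buf.putInt((55L - baseOffset).toInt)
  buf.putInt(4096) // physical position of the message in the log file

  // Read entry n back, translating the relative offset to a full offset.
  def entry(n: Int): (Long, Int) =
    (baseOffset + buf.getInt(n * 8), buf.getInt(n * 8 + 4))

  println(entry(0)) // prints (55,4096)
}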
http://git-wip-us.apache.org/repos/asf/kafka/blob/12af521c/core/src/main/scala/kafka/log/TimeIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala
index def1f7a..47ab2e5 100644
--- a/core/src/main/scala/kafka/log/TimeIndex.scala
+++ b/core/src/main/scala/kafka/log/TimeIndex.scala
@@ -184,10 +184,13 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
}
}
- override def resize(newSize: Int) {
+ override def resize(newSize: Int): Boolean = {
inLock(lock) {
- super.resize(newSize)
- _lastEntry = lastEntryFromIndexFile
+ if (super.resize(newSize)) {
+ _lastEntry = lastEntryFromIndexFile
+ true
+ } else
+ false
}
}
@@ -211,9 +214,8 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
require(_entries == 0 || lastOffset >= baseOffset,
s"Corrupt time index found, time index file (${file.getAbsolutePath}) has non-zero size but the last offset " +
s"is $lastOffset which is smaller than the first offset $baseOffset")
- val len = file.length()
- require(len % entrySize == 0,
- "Time index file " + file.getAbsolutePath + " is corrupt, found " + len +
+ require(length % entrySize == 0,
+ "Time index file " + file.getAbsolutePath + " is corrupt, found " + length +
" bytes which is not positive or not a multiple of 12.")
}
}
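This override is where the cached length pays off for TimeIndex: lastEntryFromIndexFile reads its result back from the index, so skipping it when super.resize() reports no change avoids that read on every no-op resize() call.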
http://git-wip-us.apache.org/repos/asf/kafka/blob/12af521c/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 723ccab..1e408ec 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -2145,6 +2145,24 @@ class LogTest {
}
@Test
+ def testLogDeletionAfterClose() {
+ def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000)
+ val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
+ val log = createLog(logDir, logConfig)
+
+ // append some messages to create some segments
+ log.appendAsLeader(createRecords, leaderEpoch = 0)
+
+ assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
+ assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries().size)
+
+ log.close()
+ log.delete()
+ assertEquals("The number of segments should be 0", 0, log.numberOfSegments)
+ assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries().size)
+ }
+
+ @Test
def testLogDeletionAfterDeleteRecords() {
def createRecords = TestUtils.singletonRecords("test".getBytes)
val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5)