Posted to commits@kafka.apache.org by jq...@apache.org on 2017/11/10 06:08:19 UTC

kafka git commit: KAFKA-6175; AbstractIndex should cache index file to avoid unnecessary disk access during resize()

Repository: kafka
Updated Branches:
  refs/heads/trunk 66cab2f7f -> 12af521c4


KAFKA-6175; AbstractIndex should cache index file to avoid unnecessary disk access during resize()

This patch also adds a test for log deletion after close.

Author: Dong Lin <li...@gmail.com>

Reviewers: Jiangjie Qin <be...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #4179 from lindong28/KAFKA-6175
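
For illustration, a minimal self-contained sketch of the caching idea in the
same spirit as this patch (the CachedLengthFile class and all names in it are
invented for the example and are not Kafka's actual API): the file length is
mirrored in a volatile field, so a resize to the current size is answered from
memory without ever opening the file.

    import java.io.{File, RandomAccessFile}

    // Illustrative only: mirror the file length in a volatile field so that
    // checking it never requires a syscall.
    class CachedLengthFile(val file: File) {
      @volatile private var _length: Long = file.length()  // read once at open

      def length: Long = _length  // answered from memory, no disk access

      def resize(newSize: Long): Boolean = {
        if (_length == newSize) {
          false  // fast path: size unchanged, the file is never opened
        } else {
          val raf = new RandomAccessFile(file, "rw")
          try {
            raf.setLength(newSize)  // disk access only on a real size change
            _length = newSize
            true
          } finally raf.close()
        }
      }
    }

A second resize(n) with the same n returns false without touching disk, which
is the fast path this commit adds to AbstractIndex.resize().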


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/12af521c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/12af521c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/12af521c

Branch: refs/heads/trunk
Commit: 12af521c487a146456442f895b9fc99a45ed100f
Parents: 66cab2f
Author: Dong Lin <li...@gmail.com>
Authored: Thu Nov 9 22:08:03 2017 -0800
Committer: Jiangjie Qin <be...@gmail.com>
Committed: Thu Nov 9 22:08:03 2017 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/log/AbstractIndex.scala    | 48 +++++++++++++-------
 core/src/main/scala/kafka/log/OffsetIndex.scala | 37 ++++++++-------
 core/src/main/scala/kafka/log/TimeIndex.scala   | 14 +++---
 .../src/test/scala/unit/kafka/log/LogTest.scala | 18 ++++++++
 4 files changed, 76 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/12af521c/core/src/main/scala/kafka/log/AbstractIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index c214dad..899c107 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -39,6 +39,10 @@ import scala.math.ceil
 abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Long,
                                    val maxIndexSize: Int = -1, val writable: Boolean) extends Logging {
 
+  // Length of the index file
+  @volatile
+  private var _length: Long = _
+
   protected def entrySize: Int
 
   protected val lock = new ReentrantLock
@@ -56,12 +60,12 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
       }
 
       /* memory-map the file */
-      val len = raf.length()
+      _length = raf.length()
       val idx = {
         if (writable)
-          raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
+          raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, _length)
         else
-          raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, len)
+          raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, _length)
       }
       /* set the position in the index for the next entry */
       if(newlyCreated)
@@ -94,28 +98,40 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
 
   def entries: Int = _entries
 
+  def length: Long = _length
+
   /**
    * Reset the size of the memory map and the underneath file. This is used in two kinds of cases: (1) in
    * trimToValidSize() which is called at closing the segment or new segment being rolled; (2) at
    * loading segments from disk or truncating back to an old segment where a new log segment became active;
    * we want to reset the index size to maximum index size to avoid rolling new segment.
+   *
+   * @param newSize new size of the index file
+   * @return a boolean indicating whether the size of the memory map and the underneath file is changed or not.
    */
-  def resize(newSize: Int) {
+  def resize(newSize: Int): Boolean = {
     inLock(lock) {
-      val raf = new RandomAccessFile(file, "rw")
       val roundedNewSize = roundDownToExactMultiple(newSize, entrySize)
-      val position = mmap.position()
 
-      /* Windows won't let us modify the file length while the file is mmapped :-( */
-      if (OperatingSystem.IS_WINDOWS)
-        safeForceUnmap()
-      try {
-        raf.setLength(roundedNewSize)
-        mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
-        _maxEntries = mmap.limit() / entrySize
-        mmap.position(position)
-      } finally {
-        CoreUtils.swallow(raf.close())
+      if (_length == roundedNewSize) {
+        false
+      } else {
+        val raf = new RandomAccessFile(file, "rw")
+        try {
+          val position = mmap.position()
+
+          /* Windows won't let us modify the file length while the file is mmapped :-( */
+          if (OperatingSystem.IS_WINDOWS)
+            safeForceUnmap()
+          raf.setLength(roundedNewSize)
+          _length = roundedNewSize
+          mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
+          _maxEntries = mmap.limit() / entrySize
+          mmap.position(position)
+          true
+        } finally {
+          CoreUtils.swallow(raf.close())
+        }
       }
     }
   }
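
The else branch above changes the file length and then re-derives the mapping.
As a rough standalone sketch of that remap sequence (the object name, the
grow() helper, and its parameters are invented for this example):

    import java.io.{File, RandomAccessFile}
    import java.nio.channels.FileChannel

    object RemapExample {
      // Grow a file and remap it while carrying the old write cursor over to
      // the new mapping, mirroring the sequence in resize() above.
      def grow(file: File, extraBytes: Long): Unit = {
        val raf = new RandomAccessFile(file, "rw")
        try {
          var mmap = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, raf.length())
          val position = mmap.position()  // where the next entry would be written
          raf.setLength(raf.length() + extraBytes)  // resize the file on disk
          mmap = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, raf.length())
          mmap.position(position)  // restore the cursor in the new mapping
        } finally raf.close()
      }
    }

Preserving mmap.position() matters because the position marks where the next
index entry will be written; a fresh mapping would otherwise reset it to zero.
In AbstractIndex the same steps run under the lock and, on Windows, only after
the old mapping is force-unmapped, since Windows refuses to change the length
of a mapped file.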

http://git-wip-us.apache.org/repos/asf/kafka/blob/12af521c/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index c156972..eb15842 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -26,26 +26,26 @@ import kafka.common.InvalidOffsetException
 /**
  * An index that maps offsets to physical file locations for a particular log segment. This index may be sparse:
  * that is it may not hold an entry for all messages in the log.
- * 
+ *
  * The index is stored in a file that is pre-allocated to hold a fixed maximum number of 8-byte entries.
- * 
+ *
  * The index supports lookups against a memory-map of this file. These lookups are done using a simple binary search variant
  * to locate the offset/location pair for the greatest offset less than or equal to the target offset.
- * 
+ *
  * Index files can be opened in two ways: either as an empty, mutable index that allows appends or
- * an immutable read-only index file that has previously been populated. The makeReadOnly method will turn a mutable file into an 
+ * an immutable read-only index file that has previously been populated. The makeReadOnly method will turn a mutable file into an
  * immutable one and truncate off any extra bytes. This is done when the index file is rolled over.
- * 
+ *
  * No attempt is made to checksum the contents of this file, in the event of a crash it is rebuilt.
- * 
- * The file format is a series of entries. The physical format is a 4 byte "relative" offset and a 4 byte file location for the 
+ *
+ * The file format is a series of entries. The physical format is a 4 byte "relative" offset and a 4 byte file location for the
  * message with that offset. The offset stored is relative to the base offset of the index file. So, for example,
  * if the base offset was 50, then the offset 55 would be stored as 5. Using relative offsets in this way let's us use
  * only 4 bytes for the offset.
- * 
+ *
  * The frequency of entries is up to the user of this class.
- * 
- * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal 
+ *
+ * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal
  * storage format.
  */
 // Avoid shadowing mutable `file` in AbstractIndex
@@ -53,10 +53,10 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
     extends AbstractIndex[Long, Int](_file, baseOffset, maxIndexSize, writable) {
 
   override def entrySize = 8
-  
+
   /* the last offset in the index */
   private[this] var _lastOffset = lastEntry.offset
-  
+
   debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
     .format(file.getAbsolutePath, maxEntries, maxIndexSize, _entries, _lastOffset, mmap.position()))
 
@@ -75,9 +75,9 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
   def lastOffset: Long = _lastOffset
 
   /**
-   * Find the largest offset less than or equal to the given targetOffset 
+   * Find the largest offset less than or equal to the given targetOffset
    * and return a pair holding this offset and its corresponding physical file position.
-   * 
+   *
    * @param targetOffset The offset to look up.
    * @return The offset found and the corresponding file position for this offset
    *         If the target offset is smaller than the least entry in the index (or the index is empty),
@@ -117,7 +117,7 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
   override def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry = {
       OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n))
   }
-  
+
   /**
    * Get the nth offset mapping from the index
    * @param n The entry number in the index
@@ -164,7 +164,7 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
        * 2) if there is an entry for this exact offset, delete it and everything larger than it
        * 3) if there is no entry for this offset, delete everything larger than the next smallest
        */
-      val newEntries = 
+      val newEntries =
         if(slot < 0)
           0
         else if(relativeOffset(idx, slot) == offset - baseOffset)
@@ -190,9 +190,8 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
     require(_entries == 0 || _lastOffset > baseOffset,
             s"Corrupt index found, index file (${file.getAbsolutePath}) has non-zero size but the last offset " +
                 s"is ${_lastOffset} which is no larger than the base offset $baseOffset.")
-    val len = file.length()
-    require(len % entrySize == 0,
-            "Index file " + file.getAbsolutePath + " is corrupt, found " + len +
+    require(length % entrySize == 0,
+            "Index file " + file.getAbsolutePath + " is corrupt, found " + length +
             " bytes which is not positive or not a multiple of 8.")
   }
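
To make the entry layout from the class comment concrete, a small example
(the concrete numbers are invented): with a base offset of 50, the message at
offset 55 is stored as the relative offset 5 next to its 4-byte physical
position.

    import java.nio.ByteBuffer

    object RelativeOffsetExample {
      def main(args: Array[String]): Unit = {
        val baseOffset = 50L
        val entry = ByteBuffer.allocate(8)       // one 8-byte index entry
        entry.putInt((55L - baseOffset).toInt)   // offset 55 stored as 5
        entry.putInt(4096)                       // physical position in the log
        entry.flip()
        val offset = baseOffset + entry.getInt() // translated back to 55
        val position = entry.getInt()            // 4096
        println(s"offset=$offset position=$position")
      }
    }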
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/12af521c/core/src/main/scala/kafka/log/TimeIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala
index def1f7a..47ab2e5 100644
--- a/core/src/main/scala/kafka/log/TimeIndex.scala
+++ b/core/src/main/scala/kafka/log/TimeIndex.scala
@@ -184,10 +184,13 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
     }
   }
 
-  override def resize(newSize: Int) {
+  override def resize(newSize: Int): Boolean = {
     inLock(lock) {
-      super.resize(newSize)
-      _lastEntry = lastEntryFromIndexFile
+      if (super.resize(newSize)) {
+        _lastEntry = lastEntryFromIndexFile
+        true
+      } else
+        false
     }
   }
 
@@ -211,9 +214,8 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
     require(_entries == 0 || lastOffset >= baseOffset,
       s"Corrupt time index found, time index file (${file.getAbsolutePath}) has non-zero size but the last offset " +
           s"is $lastOffset which is smaller than the first offset $baseOffset")
-    val len = file.length()
-    require(len % entrySize == 0,
-      "Time index file " + file.getAbsolutePath + " is corrupt, found " + len +
+    require(length % entrySize == 0,
+      "Time index file " + file.getAbsolutePath + " is corrupt, found " + length +
           " bytes which is not positive or not a multiple of 12.")
   }
 }
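
The override above shows why resize() now returns a Boolean: the subclass can
skip re-reading its cached last entry from the file when nothing changed.
Stripped of Kafka specifics, the pattern looks like this (Base, Derived, and
the stub values are invented stand-ins for AbstractIndex and TimeIndex):

    // Derived state is re-read from disk only when the parent reports a
    // real change to the mapping.
    class Base {
      def resize(newSize: Int): Boolean = false  // stub: pretend nothing changed
    }

    class Derived extends Base {
      private var lastEntry: Long = 0L
      private def lastEntryFromFile: Long = 42L  // stub for a read of the mmap

      override def resize(newSize: Int): Boolean = {
        if (super.resize(newSize)) {
          lastEntry = lastEntryFromFile  // refresh only after a real remap
          true
        } else
          false  // cached entry is still valid; no disk access
      }
    }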

http://git-wip-us.apache.org/repos/asf/kafka/blob/12af521c/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 723ccab..1e408ec 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -2145,6 +2145,24 @@ class LogTest {
   }
 
   @Test
+  def testLogDeletionAfterClose() {
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000)
+    val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
+    val log = createLog(logDir, logConfig)
+
+    // append some messages to create some segments
+    log.appendAsLeader(createRecords, leaderEpoch = 0)
+
+    assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
+    assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries().size)
+
+    log.close()
+    log.delete()
+    assertEquals("The number of segments should be 0", 0, log.numberOfSegments)
+    assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries().size)
+  }
+
+  @Test
   def testLogDeletionAfterDeleteRecords() {
     def createRecords = TestUtils.singletonRecords("test".getBytes)
     val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5)