Posted to commits@kafka.apache.org by ij...@apache.org on 2020/03/26 11:27:47 UTC

[kafka] branch trunk updated: KAFKA-9373: Reduce shutdown time by avoiding unnecessary loading of indexes (#8346)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 222726d  KAFKA-9373: Reduce shutdown time by avoiding unnecessary loading of indexes (#8346)
222726d is described below

commit 222726d6f97f624711e64487c8e48114aca7526e
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Thu Mar 26 04:26:51 2020 -0700

    KAFKA-9373: Reduce shutdown time by avoiding unnecessary loading of indexes (#8346)
    
    KAFKA-7283 enabled lazy mmap on index files by initializing indices
    on-demand rather than performing costly disk/memory operations for all
    indices on broker startup. This helped reduce broker startup time.
    However, segment indices were still created when closing segments, even
    if they had never been loaded and did not need to be closed.
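
    As a rough, hypothetical sketch (not the actual Kafka classes), the
    lazy-loading idea boils down to a thread-safe wrapper that only
    materializes the expensive, mmapped index on first access:

        import java.io.File
        import java.util.concurrent.locks.ReentrantLock

        // Hypothetical sketch: defer the costly load (open + mmap) until `get`
        // is called for the first time. Double-checked locking keeps the
        // wrapper thread safe while leaving the fast path lock-free.
        final class Lazily[T](val file: File, load: File => T) {
          @volatile private var value: Option[T] = None
          private val lock = new ReentrantLock()

          def get: T = value match {
            case Some(v) => v
            case None =>
              lock.lock()
              try value match {
                case Some(v) => v
                case None =>
                  val v = load(file) // expensive: opens and mmaps the index file
                  value = Some(v)
                  v
              } finally lock.unlock()
          }
        }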
    
    This is a cleaned up version of #7900, which was submitted by @efeg. It
    eliminates unnecessary disk accesses and memory map operations while
    deleting, renaming or closing offset and time indexes.
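
    The key observation, visible in the LazyIndex changes below, is that
    rename and delete only need the backing `File` when the index was never
    loaded, so they can be served by plain file-system calls instead of
    mmapping the index first. A minimal sketch of that fallback path (helper
    names are hypothetical):

        import java.io.File
        import java.nio.file.{Files, NoSuchFileException}
        import org.apache.kafka.common.utils.Utils

        object UnloadedIndexOps {
          // Rename the not-yet-loaded index file. A missing source file is
          // tolerated because a lazily created index may not exist on disk yet.
          def renameUnloaded(current: File, target: File): File = {
            try Utils.atomicMoveWithFallback(current.toPath, target.toPath)
            catch { case _: NoSuchFileException if !current.exists => () }
            target
          }

          // Delete the not-yet-loaded index file without mmapping it.
          def deleteUnloaded(current: File): Boolean =
            Files.deleteIfExists(current.toPath)
        }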
    
    In a cluster with 31 brokers, each holding 13K to 20K segments, @efeg
    and team observed LogManager shutdown times drop by up to two orders of
    magnitude, from tens of seconds to hundreds of milliseconds per broker.
    
    To avoid confusion between `renameTo` and `setFile`, I replaced the
    latter with the more restricted `updateParentDir` (it turns out that's
    all we need).
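
    For clarity, `updateParentDir` only re-points the in-memory file
    reference at a new directory while keeping the same file name; unlike
    `renameTo` it performs no file-system I/O and does not reopen any
    channel. Roughly (illustrative only):

        import java.io.File

        // Illustrative only: keep the file name, swap the parent directory,
        // and leave the file system untouched.
        def updateParentDir(current: File, parentDir: File): File =
          new File(parentDir, current.getName)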
    
    Reviewers: Jun Rao <ju...@gmail.com>, Andrew Choi <a2...@edu.uwaterloo.ca>
    
    Co-authored-by: Adem Efe Gencer <ag...@linkedin.com>
    Co-authored-by: Ismael Juma <is...@juma.me.uk>
---
 .../apache/kafka/common/record/FileRecords.java    |   8 +-
 core/src/main/scala/kafka/log/AbstractIndex.scala  |  10 +-
 core/src/main/scala/kafka/log/LazyIndex.scala      | 108 ++++++++++++++++++---
 core/src/main/scala/kafka/log/Log.scala            |   2 +-
 core/src/main/scala/kafka/log/LogSegment.scala     |  32 +++---
 .../main/scala/kafka/log/TransactionIndex.scala    |  11 ++-
 .../test/scala/unit/kafka/log/LogSegmentTest.scala |  17 ++++
 .../jmh/server/HighwatermarkCheckpointBench.java   |  11 ++-
 8 files changed, 152 insertions(+), 47 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 9b312d9..c097f55 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -208,11 +208,11 @@ public class FileRecords extends AbstractRecords implements Closeable {
     }
 
     /**
-     * Update the file reference (to be used with caution since this does not reopen the file channel)
-     * @param file The new file to use
+     * Update the parent directory (to be used with caution since this does not reopen the file channel)
+     * @param parentDir The new parent directory
      */
-    public void setFile(File file) {
-        this.file = file;
+    public void updateParentDir(File parentDir) {
+        this.file = new File(parentDir, file.getName());
     }
 
     /**
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index e8e7136..675fbcf 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -32,11 +32,11 @@ import org.apache.kafka.common.utils.{ByteBufferUnmapper, OperatingSystem, Utils
 /**
  * The abstract index class which holds entry format agnostic methods.
  *
- * @param file The index file
+ * @param _file The index file
  * @param baseOffset the base offset of the segment that this index is corresponding to.
  * @param maxIndexSize The maximum index size in bytes.
  */
-abstract class AbstractIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1,
+abstract class AbstractIndex(@volatile private var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1,
                              val writable: Boolean) extends Closeable {
   import AbstractIndex._
 
@@ -153,12 +153,16 @@ abstract class AbstractIndex(@volatile var file: File, val baseOffset: Long, val
    */
   def isFull: Boolean = _entries >= _maxEntries
 
+  def file: File = _file
+
   def maxEntries: Int = _maxEntries
 
   def entries: Int = _entries
 
   def length: Long = _length
 
+  def updateParentDir(parentDir: File): Unit = _file = new File(parentDir, file.getName)
+
   /**
    * Reset the size of the memory map and the underneath file. This is used in two kinds of cases: (1) in
    * trimToValidSize() which is called at closing the segment or new segment being rolled; (2) at
@@ -205,7 +209,7 @@ abstract class AbstractIndex(@volatile var file: File, val baseOffset: Long, val
    */
   def renameTo(f: File): Unit = {
     try Utils.atomicMoveWithFallback(file.toPath, f.toPath)
-    finally file = f
+    finally _file = f
   }
 
   /**
diff --git a/core/src/main/scala/kafka/log/LazyIndex.scala b/core/src/main/scala/kafka/log/LazyIndex.scala
index cd7e0e5..a5a7c34 100644
--- a/core/src/main/scala/kafka/log/LazyIndex.scala
+++ b/core/src/main/scala/kafka/log/LazyIndex.scala
@@ -18,22 +18,32 @@
 package kafka.log
 
 import java.io.File
+import java.nio.file.{Files, NoSuchFileException}
 import java.util.concurrent.locks.ReentrantLock
 
 import LazyIndex._
 import kafka.utils.CoreUtils.inLock
 import kafka.utils.threadsafe
+import org.apache.kafka.common.utils.Utils
 
 /**
-  * A wrapper over an `AbstractIndex` instance that provides a mechanism to defer loading (i.e. memory mapping) the
-  * underlying index until it is accessed for the first time via the `get` method.
+  * A wrapper over an `AbstractIndex` instance that provides a mechanism to defer loading
+  * (i.e. memory mapping) the underlying index until it is accessed for the first time via the
+  * `get` method.
   *
-  * This is an important optimization with regards to broker start-up time if it has a large number of segments.
+  * In addition, this class exposes a number of methods (e.g. updateParentDir, renameTo, close,
+  * etc.) that provide the desired behavior without causing the index to be loaded. If the index
+  * had previously been loaded, the methods in this class simply delegate to the relevant method in
+  * the index.
   *
-  * Methods of this class are thread safe. Make sure to check `AbstractIndex` subclasses documentation
-  * to establish their thread safety.
+  * This is an important optimization with regards to broker start-up and shutdown time if it has a
+  * large number of segments.
   *
-  * @param loadIndex A function that takes a `File` pointing to an index and returns a loaded `AbstractIndex` instance.
+  * Methods of this class are thread safe. Make sure to check `AbstractIndex` subclasses
+  * documentation to establish their thread safety.
+  *
+  * @param loadIndex A function that takes a `File` pointing to an index and returns a loaded
+  *                  `AbstractIndex` instance.
   */
 @threadsafe
 class LazyIndex[T <: AbstractIndex] private (@volatile private var indexWrapper: IndexWrapper, loadIndex: File => T) {
@@ -42,12 +52,6 @@ class LazyIndex[T <: AbstractIndex] private (@volatile private var indexWrapper:
 
   def file: File = indexWrapper.file
 
-  def file_=(f: File): Unit = {
-    inLock(lock) {
-      indexWrapper.file = f
-    }
-  }
-
   def get: T = {
     indexWrapper match {
       case indexValue: IndexValue[T] => indexValue.index
@@ -64,6 +68,36 @@ class LazyIndex[T <: AbstractIndex] private (@volatile private var indexWrapper:
     }
   }
 
+  def updateParentDir(parentDir: File): Unit = {
+    inLock(lock) {
+      indexWrapper.updateParentDir(parentDir)
+    }
+  }
+
+  def renameTo(f: File): Unit = {
+    inLock(lock) {
+      indexWrapper.renameTo(f)
+    }
+  }
+
+  def deleteIfExists(): Boolean = {
+    inLock(lock) {
+      indexWrapper.deleteIfExists()
+    }
+  }
+
+  def close(): Unit = {
+    inLock(lock) {
+      indexWrapper.close()
+    }
+  }
+
+  def closeHandler(): Unit = {
+    inLock(lock) {
+      indexWrapper.closeHandler()
+    }
+  }
+
 }
 
 object LazyIndex {
@@ -75,15 +109,57 @@ object LazyIndex {
     new LazyIndex(new IndexFile(file), file => new TimeIndex(file, baseOffset, maxIndexSize, writable))
 
   private sealed trait IndexWrapper {
+
     def file: File
-    def file_=(f: File): Unit
+
+    def updateParentDir(f: File): Unit
+
+    def renameTo(f: File): Unit
+
+    def deleteIfExists(): Boolean
+
+    def close(): Unit
+
+    def closeHandler(): Unit
+
   }
 
-  private class IndexFile(@volatile var file: File) extends IndexWrapper
+  private class IndexFile(@volatile private var _file: File) extends IndexWrapper {
+
+    def file: File = _file
+
+    def updateParentDir(parentDir: File): Unit = _file = new File(parentDir, file.getName)
+
+    def renameTo(f: File): Unit = {
+      try Utils.atomicMoveWithFallback(file.toPath, f.toPath)
+      catch {
+        case _: NoSuchFileException if !file.exists => ()
+      }
+      finally _file = f
+    }
+
+    def deleteIfExists(): Boolean = Files.deleteIfExists(file.toPath)
+
+    def close(): Unit = ()
+
+    def closeHandler(): Unit = ()
+
+  }
 
   private class IndexValue[T <: AbstractIndex](val index: T) extends IndexWrapper {
-    override def file: File = index.file
-    override def file_=(f: File): Unit = index.file = f
+
+    def file: File = index.file
+
+    def updateParentDir(parentDir: File): Unit = index.updateParentDir(parentDir)
+
+    def renameTo(f: File): Unit = index.renameTo(f)
+
+    def deleteIfExists(): Boolean = index.deleteIfExists()
+
+    def close(): Unit = index.close()
+
+    def closeHandler(): Unit = index.closeHandler()
+
   }
 
 }
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 20b0cd2..6c6f376 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -973,7 +973,7 @@ class Log(@volatile private var _dir: File,
         if (renamedDir != dir) {
           _dir = renamedDir
           _parentDir = renamedDir.getParent
-          logSegments.foreach(_.updateDir(renamedDir))
+          logSegments.foreach(_.updateParentDir(renamedDir))
           producerStateManager.logDir = dir
           // re-initialize leader epoch cache so that LeaderEpochCheckpointFile.checkpoint can correctly reference
           // the checkpoint file in renamed log directory
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 64e96e1..6d3beb7 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -480,21 +480,21 @@ class LogSegment private[log] (val log: FileRecords,
    * Update the directory reference for the log and indices in this segment. This would typically be called after a
    * directory is renamed.
    */
-  def updateDir(dir: File): Unit = {
-    log.setFile(new File(dir, log.file.getName))
-    lazyOffsetIndex.file = new File(dir, lazyOffsetIndex.file.getName)
-    lazyTimeIndex.file = new File(dir, lazyTimeIndex.file.getName)
-    txnIndex.file = new File(dir, txnIndex.file.getName)
+  def updateParentDir(dir: File): Unit = {
+    log.updateParentDir(dir)
+    lazyOffsetIndex.updateParentDir(dir)
+    lazyTimeIndex.updateParentDir(dir)
+    txnIndex.updateParentDir(dir)
   }
 
   /**
-   * Change the suffix for the index and log file for this log segment
+   * Change the suffix for the index and log files for this log segment
    * IOException from this method should be handled by the caller
    */
   def changeFileSuffixes(oldSuffix: String, newSuffix: String): Unit = {
     log.renameTo(new File(CoreUtils.replaceSuffix(log.file.getPath, oldSuffix, newSuffix)))
-    offsetIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyOffsetIndex.file.getPath, oldSuffix, newSuffix)))
-    timeIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyTimeIndex.file.getPath, oldSuffix, newSuffix)))
+    lazyOffsetIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyOffsetIndex.file.getPath, oldSuffix, newSuffix)))
+    lazyTimeIndex.renameTo(new File(CoreUtils.replaceSuffix(lazyTimeIndex.file.getPath, oldSuffix, newSuffix)))
     txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, oldSuffix, newSuffix)))
   }
 
@@ -585,9 +585,11 @@ class LogSegment private[log] (val log: FileRecords,
    * Close this log segment
    */
   def close(): Unit = {
-    CoreUtils.swallow(timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, skipFullCheck = true), this)
-    CoreUtils.swallow(offsetIndex.close(), this)
-    CoreUtils.swallow(timeIndex.close(), this)
+    if (_maxTimestampSoFar.nonEmpty || _offsetOfMaxTimestampSoFar.nonEmpty)
+      CoreUtils.swallow(timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar,
+        skipFullCheck = true), this)
+    CoreUtils.swallow(lazyOffsetIndex.close(), this)
+    CoreUtils.swallow(lazyTimeIndex.close(), this)
     CoreUtils.swallow(log.close(), this)
     CoreUtils.swallow(txnIndex.close(), this)
   }
@@ -596,8 +598,8 @@ class LogSegment private[log] (val log: FileRecords,
     * Close file handlers used by the log segment but don't write to disk. This is used when the disk may have failed
     */
   def closeHandlers(): Unit = {
-    CoreUtils.swallow(offsetIndex.closeHandler(), this)
-    CoreUtils.swallow(timeIndex.closeHandler(), this)
+    CoreUtils.swallow(lazyOffsetIndex.closeHandler(), this)
+    CoreUtils.swallow(lazyTimeIndex.closeHandler(), this)
     CoreUtils.swallow(log.closeHandlers(), this)
     CoreUtils.swallow(txnIndex.close(), this)
   }
@@ -620,8 +622,8 @@ class LogSegment private[log] (val log: FileRecords,
 
     CoreUtils.tryAll(Seq(
       () => delete(log.deleteIfExists _, "log", log.file, logIfMissing = true),
-      () => delete(offsetIndex.deleteIfExists _, "offset index", lazyOffsetIndex.file, logIfMissing = true),
-      () => delete(timeIndex.deleteIfExists _, "time index", lazyTimeIndex.file, logIfMissing = true),
+      () => delete(lazyOffsetIndex.deleteIfExists _, "offset index", lazyOffsetIndex.file, logIfMissing = true),
+      () => delete(lazyTimeIndex.deleteIfExists _, "time index", lazyTimeIndex.file, logIfMissing = true),
       () => delete(txnIndex.deleteIfExists _, "transaction index", txnIndex.file, logIfMissing = false)
     ))
   }
diff --git a/core/src/main/scala/kafka/log/TransactionIndex.scala b/core/src/main/scala/kafka/log/TransactionIndex.scala
index 696bc3a..9152bc4 100644
--- a/core/src/main/scala/kafka/log/TransactionIndex.scala
+++ b/core/src/main/scala/kafka/log/TransactionIndex.scala
@@ -42,12 +42,13 @@ private[log] case class TxnIndexSearchResult(abortedTransactions: List[AbortedTx
  * order to find the start of the transactions.
  */
 @nonthreadsafe
-class TransactionIndex(val startOffset: Long, @volatile var file: File) extends Logging {
+class TransactionIndex(val startOffset: Long, @volatile private var _file: File) extends Logging {
+
   // note that the file is not created until we need it
   @volatile private var maybeChannel: Option[FileChannel] = None
   private var lastOffset: Option[Long] = None
 
-  if (file.exists)
+  if (_file.exists)
     openChannel()
 
   def append(abortedTxn: AbortedTxn): Unit = {
@@ -62,6 +63,10 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
 
   def flush(): Unit = maybeChannel.foreach(_.force(true))
 
+  def file: File = _file
+
+  def updateParentDir(parentDir: File): Unit = _file = new File(parentDir, file.getName)
+
   /**
    * Delete this index.
    *
@@ -106,7 +111,7 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
     try {
       if (file.exists)
         Utils.atomicMoveWithFallback(file.toPath, f.toPath)
-    } finally file = f
+    } finally _file = f
   }
 
   def truncateTo(offset: Long): Unit = {
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 6cc6354..a29e7e5 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -145,6 +145,9 @@ class LogSegmentTest {
     val maxSegmentMs = 300000
     val time = new MockTime
     val seg = createSegment(0, time = time)
+    // Force load indexes before closing the segment
+    seg.timeIndex
+    seg.offsetIndex
     seg.close()
 
     val reopened = createSegment(0, time = time)
@@ -262,11 +265,25 @@ class LogSegmentTest {
     val seg = createSegment(40)
     val logFile = seg.log.file
     val indexFile = seg.lazyOffsetIndex.file
+    val timeIndexFile = seg.lazyTimeIndex.file
+    // Ensure that files for offset and time indices have not been created eagerly.
+    assertFalse(seg.lazyOffsetIndex.file.exists)
+    assertFalse(seg.lazyTimeIndex.file.exists)
     seg.changeFileSuffixes("", ".deleted")
+    // Ensure that attempt to change suffixes for non-existing offset and time indices does not create new files.
+    assertFalse(seg.lazyOffsetIndex.file.exists)
+    assertFalse(seg.lazyTimeIndex.file.exists)
+    // Ensure that file names are updated accordingly.
     assertEquals(logFile.getAbsolutePath + ".deleted", seg.log.file.getAbsolutePath)
     assertEquals(indexFile.getAbsolutePath + ".deleted", seg.lazyOffsetIndex.file.getAbsolutePath)
+    assertEquals(timeIndexFile.getAbsolutePath + ".deleted", seg.lazyTimeIndex.file.getAbsolutePath)
     assertTrue(seg.log.file.exists)
+    // Ensure lazy creation of offset index file upon accessing it.
+    seg.lazyOffsetIndex.get
     assertTrue(seg.lazyOffsetIndex.file.exists)
+    // Ensure lazy creation of time index file upon accessing it.
+    seg.lazyTimeIndex.get
+    assertTrue(seg.lazyTimeIndex.file.exists)
   }
 
   /**
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java
index 9cb8ac6..98861dc 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/HighwatermarkCheckpointBench.java
@@ -51,7 +51,6 @@ import org.openjdk.jmh.annotations.State;
 import org.openjdk.jmh.annotations.TearDown;
 import org.openjdk.jmh.annotations.Threads;
 import org.openjdk.jmh.annotations.Warmup;
-import scala.Option;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -59,8 +58,9 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
-import scala.jdk.CollectionConverters;
 
+import scala.collection.JavaConverters;
+import scala.Option;
 
 @Warmup(iterations = 5)
 @Measurement(iterations = 5)
@@ -91,6 +91,7 @@ public class HighwatermarkCheckpointBench {
     private LogManager logManager;
 
 
+    @SuppressWarnings("deprecation")
     @Setup(Level.Trial)
     public void setup() {
         this.scheduler = new KafkaScheduler(1, "scheduler-thread", true);
@@ -101,8 +102,8 @@ public class HighwatermarkCheckpointBench {
         this.time = new MockTime();
         this.failureChannel = new LogDirFailureChannel(brokerProperties.logDirs().size());
         final List<File> files =
-            CollectionConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
-        this.logManager = TestUtils.createLogManager(CollectionConverters.asScalaBuffer(files),
+            JavaConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
+        this.logManager = TestUtils.createLogManager(JavaConverters.asScalaBuffer(files),
                 LogConfig.apply(), CleanerConfig.apply(1, 4 * 1024 * 1024L, 0.9d,
                         1024 * 1024, 32 * 1024 * 1024,
                         Double.MAX_VALUE, 15 * 1000, true, "MD5"), time);
@@ -160,7 +161,7 @@ public class HighwatermarkCheckpointBench {
         this.metrics.close();
         this.scheduler.shutdown();
         this.quotaManagers.shutdown();
-        for (File dir : CollectionConverters.asJavaCollection(logManager.liveLogDirs())) {
+        for (File dir : JavaConverters.asJavaCollection(logManager.liveLogDirs())) {
             Utils.delete(dir);
         }
     }