Posted to commits@kafka.apache.org by ij...@apache.org on 2019/12/02 20:06:25 UTC

[kafka] branch 2.4 updated: KAFKA-9156: Fix LazyTimeIndex & LazyOffsetIndex concurrency (#7760)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 58fe5ee  KAFKA-9156: Fix LazyTimeIndex & LazyOffsetIndex concurrency (#7760)
58fe5ee is described below

commit 58fe5eebf274e31cdf6fa162797a117eb1fa84e6
Author: Alex Mironov <al...@gmail.com>
AuthorDate: Mon Dec 2 21:05:14 2019 +0100

    KAFKA-9156: Fix LazyTimeIndex & LazyOffsetIndex concurrency (#7760)
    
    A race condition in concurrent `get` invocations on the lazy indexes can lead
    to multiple `OffsetIndex`/`TimeIndex` objects being created concurrently. When
    that happens, the position of the `MappedByteBuffer` in `AbstractIndex` is
    advanced to the last entry, which in turn causes a critical
    `BufferOverflowException` to be thrown whenever the leader or a replica tries
    to append a record to the segment.
    
    Moreover, the `file_=` setter is vulnerable to the same race: multiple threads
    can attempt to set a new file reference while others create new
    `TimeIndex`/`OffsetIndex` objects. This can leave e.g. a `LazyTimeIndex` and
    its underlying `TimeIndex` holding different `File` references.
    
    This patch fixes the issue by ensuring that index objects are constructed
    atomically when loaded lazily via the `get` method. `file` reference
    modifications are likewise made atomic and thread safe.
    
    Note that the `Lazy*Index` mutation operations are executed with a lock held by
    the callers, but `get` can be called without a lock (e.g. from `Log.read`).
    
    Reviewers: Jun Rao <ju...@gmail.com>, Jason Gustafson <ja...@confluent.io>, Shilin Lu, Ismael Juma <is...@juma.me.uk>
---
 core/src/main/scala/kafka/log/AbstractIndex.scala | 12 +--
 core/src/main/scala/kafka/log/LazyIndex.scala     | 90 +++++++++++++++++++++++
 core/src/main/scala/kafka/log/LogSegment.scala    |  8 +-
 core/src/main/scala/kafka/log/OffsetIndex.scala   | 36 +--------
 core/src/main/scala/kafka/log/TimeIndex.scala     | 36 +--------
 core/src/test/scala/unit/kafka/log/LogUtils.scala |  4 +-
 6 files changed, 104 insertions(+), 82 deletions(-)
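
For context, the pre-patch wrappers (`LazyOffsetIndex`/`LazyTimeIndex`, removed
below) used a non-atomic check-then-act on a @volatile field. A minimal sketch of
that pattern, with a hypothetical `RacyLazy` class standing in for the removed
code:

    // Simplified stand-in for the pre-patch wrappers, not code from this commit.
    // The check-then-act on a @volatile Option is not atomic: two threads can
    // both observe None and both invoke load(), creating two index objects
    // (and two memory mappings) over the same file.
    class RacyLazy[T](load: () => T) {
      @volatile private var value: Option[T] = None
      def get: T = {
        if (value.isEmpty)      // threads A and B may both pass this check...
          value = Some(load())  // ...and each construct its own T
        value.get
      }
    }

The `LazyIndex` class added below replaces this with double-checked locking.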

diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index 242d074..007ab07 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -38,8 +38,8 @@ import scala.math.ceil
  * @param baseOffset the base offset of the segment that this index is corresponding to.
  * @param maxIndexSize The maximum index size in bytes.
  */
-abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Long,
-                                   val maxIndexSize: Int = -1, val writable: Boolean) extends Closeable {
+abstract class AbstractIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1,
+                             val writable: Boolean) extends Closeable {
   import AbstractIndex._
 
   // Length of the index file
@@ -144,11 +144,11 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
    * The maximum number of entries this index can hold
    */
   @volatile
-  private[this] var _maxEntries = mmap.limit() / entrySize
+  private[this] var _maxEntries: Int = mmap.limit() / entrySize
 
   /** The number of entries in this index */
   @volatile
-  protected var _entries = mmap.position() / entrySize
+  protected var _entries: Int = mmap.position() / entrySize
 
   /**
    * True iff there are no more slots available in this index
@@ -244,7 +244,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
   /**
    * The number of bytes actually used by this index
    */
-  def sizeInBytes = entrySize * _entries
+  def sizeInBytes: Int = entrySize * _entries
 
   /** Close the index */
   def close(): Unit = {
@@ -427,7 +427,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
 }
 
 object AbstractIndex extends Logging {
-  override val loggerName: String = classOf[AbstractIndex[_, _]].getName
+  override val loggerName: String = classOf[AbstractIndex].getName
 }
 
 object IndexSearchType extends Enumeration {
diff --git a/core/src/main/scala/kafka/log/LazyIndex.scala b/core/src/main/scala/kafka/log/LazyIndex.scala
new file mode 100644
index 0000000..596fcaa
--- /dev/null
+++ b/core/src/main/scala/kafka/log/LazyIndex.scala
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io.File
+import java.util.concurrent.locks.ReentrantLock
+
+import LazyIndex._
+import kafka.utils.CoreUtils.inLock
+import kafka.utils.threadsafe
+
+/**
+  * A wrapper over an `AbstractIndex` instance that provides a mechanism to defer loading (i.e. memory mapping) the
+  * underlying index until it is accessed for the first time via the `get` method.
+  *
+  * This is an important optimization with regards to broker start-up time if it has a large number of segments.
+  *
+  * Methods of this class are thread safe. Make sure to check `AbstractIndex` subclasses documentation
+  * to establish their thread safety.
+  *
+  * @param loadIndex A function that takes a `File` pointing to an index and returns a loaded `AbstractIndex` instance.
+  */
+@threadsafe
+class LazyIndex[T <: AbstractIndex] private (@volatile private var indexWrapper: IndexWrapper, loadIndex: File => T) {
+
+  private val lock = new ReentrantLock()
+
+  def file: File = indexWrapper.file
+
+  def file_=(f: File): Unit = {
+    inLock(lock) {
+      indexWrapper.file = f
+    }
+  }
+
+  def get: T = {
+    indexWrapper match {
+      case indexValue: IndexValue[T] => indexValue.index
+      case _: IndexFile =>
+        inLock(lock) {
+          indexWrapper match {
+            case indexValue: IndexValue[T] => indexValue.index
+            case indexFile: IndexFile =>
+              val indexValue = new IndexValue(loadIndex(indexFile.file))
+              indexWrapper = indexValue
+              indexValue.index
+          }
+        }
+    }
+  }
+
+}
+
+object LazyIndex {
+
+  def forOffset(file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true): LazyIndex[OffsetIndex] =
+    new LazyIndex(new IndexFile(file), file => new OffsetIndex(file, baseOffset, maxIndexSize, writable))
+
+  def forTime(file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true): LazyIndex[TimeIndex] =
+    new LazyIndex(new IndexFile(file), file => new TimeIndex(file, baseOffset, maxIndexSize, writable))
+
+  private sealed trait IndexWrapper {
+    def file: File
+    def file_=(f: File)
+  }
+
+  private class IndexFile(@volatile var file: File) extends IndexWrapper
+
+  private class IndexValue[T <: AbstractIndex](val index: T) extends IndexWrapper {
+    override def file: File = index.file
+    override def file_=(f: File): Unit = index.file = f
+  }
+
+}
+
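
The `get` implementation above is classic double-checked locking: the first,
lock-free read of the @volatile `indexWrapper` serves the common already-loaded
case, and the re-check under the `ReentrantLock` guarantees at most one thread
runs `loadIndex`. An illustrative driver (not part of the patch; assumes
/tmp/kafka-test exists) showing the invariant this buys:

    import java.io.File
    import java.util.concurrent.{Callable, CountDownLatch, Executors}
    import kafka.log.{LazyIndex, OffsetIndex}

    // All racing threads must observe the same OffsetIndex instance.
    val lazyIndex = LazyIndex.forOffset(
      new File("/tmp/kafka-test/00000000000000000000.index"),
      baseOffset = 0L, maxIndexSize = 1000)
    val start = new CountDownLatch(1)
    val pool = Executors.newFixedThreadPool(16)
    val tasks = (1 to 16).map { _ =>
      pool.submit(new Callable[OffsetIndex] {
        override def call(): OffsetIndex = { start.await(); lazyIndex.get }
      })
    }
    start.countDown()
    val distinct = tasks.map(_.get()).toSet
    assert(distinct.size == 1, s"expected a single OffsetIndex, saw ${distinct.size}")
    pool.shutdown()
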
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 6e33665..a54c2c2 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -54,8 +54,8 @@ import scala.math._
  */
 @nonthreadsafe
 class LogSegment private[log] (val log: FileRecords,
-                               val lazyOffsetIndex: LazyOffsetIndex,
-                               val lazyTimeIndex: LazyTimeIndex,
+                               val lazyOffsetIndex: LazyIndex[OffsetIndex],
+                               val lazyTimeIndex: LazyIndex[TimeIndex],
                                val txnIndex: TransactionIndex,
                                val baseOffset: Long,
                                val indexIntervalBytes: Int,
@@ -655,8 +655,8 @@ object LogSegment {
     val maxIndexSize = config.maxIndexSize
     new LogSegment(
       FileRecords.open(Log.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate),
-      new LazyOffsetIndex(Log.offsetIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize),
-      new LazyTimeIndex(Log.timeIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize),
+      LazyIndex.forOffset(Log.offsetIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize),
+      LazyIndex.forTime(Log.timeIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize),
       new TransactionIndex(baseOffset, Log.transactionIndexFile(dir, baseOffset, fileSuffix)),
       baseOffset,
       indexIntervalBytes = config.indexInterval,
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index 204124a..cd90799 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -51,7 +51,7 @@ import org.apache.kafka.common.errors.InvalidOffsetException
  */
 // Avoid shadowing mutable `file` in AbstractIndex
 class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true)
-    extends AbstractIndex[Long, Int](_file, baseOffset, maxIndexSize, writable) {
+    extends AbstractIndex(_file, baseOffset, maxIndexSize, writable) {
   import OffsetIndex._
 
   override def entrySize = 8
@@ -205,37 +205,3 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
 object OffsetIndex extends Logging {
   override val loggerName: String = classOf[OffsetIndex].getName
 }
-
-
-
-/**
-  * A thin wrapper on top of the raw OffsetIndex object to avoid initialization on construction. This defers the OffsetIndex
-  * initialization to the time it gets accessed so the cost of the heavy memory mapped operation gets amortized over time.
-  *
-  * Combining with skipping sanity check for safely flushed segments, the startup time of a broker can be reduced, especially
-  * for the the broker with a lot of log segments
-  *
-  */
-class LazyOffsetIndex(@volatile private var _file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true) {
-  @volatile private var offsetIndex: Option[OffsetIndex] = None
-
-  def file: File = {
-    if (offsetIndex.isDefined)
-      offsetIndex.get.file
-    else
-      _file
-  }
-
-  def file_=(f: File): Unit = {
-    if (offsetIndex.isDefined)
-      offsetIndex.get.file = f
-    else
-      _file = f
-  }
-
-  def get: OffsetIndex = {
-    if (offsetIndex.isEmpty)
-      offsetIndex = Some(new OffsetIndex(_file, baseOffset, maxIndexSize, writable))
-    offsetIndex.get
-  }
-}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/log/TimeIndex.scala b/core/src/main/scala/kafka/log/TimeIndex.scala
index a5f4e79..40e1dcf 100644
--- a/core/src/main/scala/kafka/log/TimeIndex.scala
+++ b/core/src/main/scala/kafka/log/TimeIndex.scala
@@ -51,7 +51,7 @@ import org.apache.kafka.common.record.RecordBatch
  */
 // Avoid shadowing mutable file in AbstractIndex
 class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true)
-    extends AbstractIndex[Long, Long](_file, baseOffset, maxIndexSize, writable) {
+    extends AbstractIndex(_file, baseOffset, maxIndexSize, writable) {
   import TimeIndex._
 
   @volatile private var _lastEntry = lastEntryFromIndexFile
@@ -227,37 +227,3 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
 object TimeIndex extends Logging {
   override val loggerName: String = classOf[TimeIndex].getName
 }
-
-
-
-/**
-  * A thin wrapper on top of the raw TimeIndex object to avoid initialization on construction. This defers the TimeIndex
-  * initialization to the time it gets accessed so the cost of the heavy memory mapped operation gets amortized over time.
-  *
-  * Combining with skipping sanity check for safely flushed segments, the startup time of a broker can be reduced, especially
-  * for the the broker with a lot of log segments
-  *
-  */
-class LazyTimeIndex(@volatile private var _file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true) {
-  @volatile private var timeIndex: Option[TimeIndex] = None
-
-  def file: File = {
-    if (timeIndex.isDefined)
-      timeIndex.get.file
-    else
-      _file
-  }
-
-  def file_=(f: File): Unit = {
-    if (timeIndex.isDefined)
-      timeIndex.get.file = f
-    else
-      _file = f
-  }
-
-  def get: TimeIndex = {
-    if (timeIndex.isEmpty)
-      timeIndex = Some(new TimeIndex(_file, baseOffset, maxIndexSize, writable))
-    timeIndex.get
-  }
-}
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/log/LogUtils.scala b/core/src/test/scala/unit/kafka/log/LogUtils.scala
index 838a699..8bf9812 100644
--- a/core/src/test/scala/unit/kafka/log/LogUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogUtils.scala
@@ -31,8 +31,8 @@ object LogUtils {
                     indexIntervalBytes: Int = 10,
                     time: Time = Time.SYSTEM): LogSegment = {
     val ms = FileRecords.open(Log.logFile(logDir, offset))
-    val idx = new LazyOffsetIndex(Log.offsetIndexFile(logDir, offset), offset, maxIndexSize = 1000)
-    val timeIdx = new LazyTimeIndex(Log.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500)
+    val idx = LazyIndex.forOffset(Log.offsetIndexFile(logDir, offset), offset, maxIndexSize = 1000)
+    val timeIdx = LazyIndex.forTime(Log.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500)
     val txnIndex = new TransactionIndex(offset, Log.transactionIndexFile(logDir, offset))
 
     new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, time)
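
Finally, note how the `file_=` half of the fix composes with lazy loading: because
the setter and the slow path of `get` share the same lock, a file-reference update
cannot interleave with index construction. A hedged sketch (hypothetical driver,
not code from this commit; the on-disk rename that `LogSegment` performs is
omitted, and /tmp/kafka-test is assumed to exist):

    import java.io.File
    import kafka.log.LazyIndex

    val lazyTimeIndex = LazyIndex.forTime(
      new File("/tmp/kafka-test/00000000000000000000.timeindex"),
      baseOffset = 0L, maxIndexSize = 1500)

    // One thread triggers lazy loading while another swaps the file reference,
    // as a log roll would.
    val reader = new Thread(() => lazyTimeIndex.get)
    val roller = new Thread(() =>
      lazyTimeIndex.file = new File("/tmp/kafka-test/00000000000000000000.timeindex.deleted"))
    reader.start(); roller.start()
    reader.join(); roller.join()

    // Post-patch invariant: the wrapper and the loaded index agree on the File,
    // whichever order the lock serialized the two operations into.
    assert(lazyTimeIndex.file == lazyTimeIndex.get.file)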