You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/05 20:46:09 UTC

kafka git commit: KAFKA-3510; OffsetIndex thread safety

Repository: kafka
Updated Branches:
  refs/heads/trunk 732b111f4 -> aee8ebb46


KAFKA-3510; OffsetIndex thread safety

* Make all fields accessed outside of a lock `volatile`
* Only allow mutation within the class: fields are now `private[this]` and exposed through read-only accessors
* Remove the unnecessary `AtomicInteger`: mutation always happens inside the lock, and the field is `volatile` so readers outside the lock still see the latest count

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Guozhang Wang <wa...@gmail.com>

Closes #1188 from ijuma/kafka-3510-offset-index-thread-safety


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aee8ebb4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aee8ebb4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aee8ebb4

Branch: refs/heads/trunk
Commit: aee8ebb46cd9393a0886c42cb88b080d065da397
Parents: 732b111
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue Apr 5 11:46:04 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Apr 5 11:46:04 2016 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         |   2 +-
 core/src/main/scala/kafka/log/LogSegment.scala  |   2 +-
 core/src/main/scala/kafka/log/OffsetIndex.scala | 173 ++++++++++---------
 .../scala/kafka/tools/DumpLogSegments.scala     |   2 +-
 .../scala/unit/kafka/log/OffsetIndexTest.scala  |  10 +-
 5 files changed, 98 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/aee8ebb4/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 81c19fa..8465b64 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -215,7 +215,7 @@ class Log(val dir: File,
       val fileName = logFile.getName
       val startOffset = fileName.substring(0, fileName.length - LogFileSuffix.length).toLong
       val indexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, IndexFileSuffix) + SwapFileSuffix)
-      val index =  new OffsetIndex(file = indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
+      val index =  new OffsetIndex(indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
       val swapSegment = new LogSegment(new FileMessageSet(file = swapFile),
                                        index = index,
                                        baseOffset = startOffset,

http://git-wip-us.apache.org/repos/asf/kafka/blob/aee8ebb4/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 9fc68a4..3a4bbc8 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -55,7 +55,7 @@ class LogSegment(val log: FileMessageSet,
 
   def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) =
     this(new FileMessageSet(file = Log.logFilename(dir, startOffset), fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate),
-         new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
+         new OffsetIndex(Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
          startOffset,
          indexIntervalBytes,
          rollJitterMs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/aee8ebb4/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index e95c9d1..ce35d68 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -24,7 +24,6 @@ import java.io._
 import java.nio._
 import java.nio.channels._
 import java.util.concurrent.locks._
-import java.util.concurrent.atomic._
 import kafka.utils._
 import kafka.utils.CoreUtils.inLock
 import kafka.common.InvalidOffsetException
@@ -54,62 +53,70 @@ import kafka.common.InvalidOffsetException
  * All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal 
  * storage format.
  */
-class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
+class OffsetIndex(@volatile private[this] var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
   
   private val lock = new ReentrantLock
   
   /* initialize the memory mapping for this index */
-  private var mmap: MappedByteBuffer = 
-    {
-      val newlyCreated = file.createNewFile()
-      val raf = new RandomAccessFile(file, "rw")
-      try {
-        /* pre-allocate the file if necessary */
-        if(newlyCreated) {
-          if(maxIndexSize < 8)
-            throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
-          raf.setLength(roundToExactMultiple(maxIndexSize, 8))
-        }
-          
-        /* memory-map the file */
-        val len = raf.length()
-        val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
-          
-        /* set the position in the index for the next entry */
-        if(newlyCreated)
-          idx.position(0)
-        else
-          // if this is a pre-existing index, assume it is all valid and set position to last entry
-          idx.position(roundToExactMultiple(idx.limit, 8))
-        idx
-      } finally {
-        CoreUtils.swallow(raf.close())
+  @volatile
+  private[this] var mmap: MappedByteBuffer = {
+    val newlyCreated = _file.createNewFile()
+    val raf = new RandomAccessFile(_file, "rw")
+    try {
+      /* pre-allocate the file if necessary */
+      if (newlyCreated) {
+        if (maxIndexSize < 8)
+          throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
+        raf.setLength(roundToExactMultiple(maxIndexSize, 8))
       }
+
+      /* memory-map the file */
+      val len = raf.length()
+      val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
+
+      /* set the position in the index for the next entry */
+      if (newlyCreated)
+        idx.position(0)
+      else
+        // if this is a pre-existing index, assume it is all valid and set position to last entry
+        idx.position(roundToExactMultiple(idx.limit, 8))
+      idx
+    } finally {
+      CoreUtils.swallow(raf.close())
     }
-  
+  }
+
   /* the number of eight-byte entries currently in the index */
-  private var size = new AtomicInteger(mmap.position / 8)
-  
-  /**
-   * The maximum number of eight-byte entries this index can hold
-   */
   @volatile
-  var maxEntries = mmap.limit / 8
-  
-  /* the last offset in the index */
-  var lastOffset = readLastEntry.offset
+  private[this] var _entries = mmap.position / 8
+
+  /* The maximum number of eight-byte entries this index can hold */
+  @volatile
+  private[this] var _maxEntries = mmap.limit / 8
+
+  @volatile
+  private[this] var _lastOffset = readLastEntry.offset
   
   debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
-    .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position))
+    .format(_file.getAbsolutePath, _maxEntries, maxIndexSize, _entries, _lastOffset, mmap.position))
+
+  /** The maximum number of entries this index can hold */
+  def maxEntries: Int = _maxEntries
+
+  /** The last offset in the index */
+  def lastOffset: Long = _lastOffset
+
+  /** The index file */
+  def file: File = _file
 
   /**
    * The last entry in the index
    */
   def readLastEntry(): OffsetPosition = {
     inLock(lock) {
-      size.get match {
+      _entries match {
         case 0 => OffsetPosition(baseOffset, 0)
-        case s => OffsetPosition(baseOffset + relativeOffset(this.mmap, s-1), physical(this.mmap, s-1))
+        case s => OffsetPosition(baseOffset + relativeOffset(mmap, s - 1), physical(mmap, s - 1))
       }
     }
   }
@@ -149,22 +156,22 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
     val relOffset = targetOffset - baseOffset
     
     // check if the index is empty
-    if(entries == 0)
+    if (_entries == 0)
       return -1
     
     // check if the target offset is smaller than the least offset
-    if(relativeOffset(idx, 0) > relOffset)
+    if (relativeOffset(idx, 0) > relOffset)
       return -1
       
     // binary search for the entry
     var lo = 0
-    var hi = entries-1
-    while(lo < hi) {
+    var hi = _entries - 1
+    while (lo < hi) {
       val mid = ceil(hi/2.0 + lo/2.0).toInt
       val found = relativeOffset(idx, mid)
-      if(found == relOffset)
+      if (found == relOffset)
         return mid
-      else if(found < relOffset)
+      else if (found < relOffset)
         lo = mid
       else
         hi = mid - 1
@@ -185,8 +192,8 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    */
   def entry(n: Int): OffsetPosition = {
     maybeLock(lock) {
-      if(n >= entries)
-        throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, entries))
+      if(n >= _entries)
+        throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, _entries))
       val idx = mmap.duplicate
       OffsetPosition(relativeOffset(idx, n), physical(idx, n))
     }
@@ -197,17 +204,17 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    */
   def append(offset: Long, position: Int) {
     inLock(lock) {
-      require(!isFull, "Attempt to append to a full index (size = " + size + ").")
-      if (size.get == 0 || offset > lastOffset) {
-        debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
-        this.mmap.putInt((offset - baseOffset).toInt)
-        this.mmap.putInt(position)
-        this.size.incrementAndGet()
-        this.lastOffset = offset
-        require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
+      require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
+      if (_entries == 0 || offset > _lastOffset) {
+        debug("Adding index entry %d => %d to %s.".format(offset, position, _file.getName))
+        mmap.putInt((offset - baseOffset).toInt)
+        mmap.putInt(position)
+        _entries += 1
+        _lastOffset = offset
+        require(_entries * 8 == mmap.position, _entries + " entries but file position in index is " + mmap.position + ".")
       } else {
         throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
-          .format(offset, entries, lastOffset, file.getAbsolutePath))
+          .format(offset, _entries, _lastOffset, _file.getAbsolutePath))
       }
     }
   }
@@ -215,7 +222,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
   /**
    * True iff there are no more slots available in this index
    */
-  def isFull: Boolean = entries >= this.maxEntries
+  def isFull: Boolean = _entries >= _maxEntries
   
   /**
    * Truncate the entire index, deleting all entries
@@ -252,9 +259,9 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    */
   private def truncateToEntries(entries: Int) {
     inLock(lock) {
-      this.size.set(entries)
-      mmap.position(this.size.get * 8)
-      this.lastOffset = readLastEntry.offset
+      _entries = entries
+      mmap.position(_entries * 8)
+      _lastOffset = readLastEntry.offset
     }
   }
   
@@ -264,7 +271,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    */
   def trimToValidSize() {
     inLock(lock) {
-      resize(entries * 8)
+      resize(_entries * 8)
     }
   }
 
@@ -276,18 +283,18 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    */
   def resize(newSize: Int) {
     inLock(lock) {
-      val raf = new RandomAccessFile(file, "rw")
+      val raf = new RandomAccessFile(_file, "rw")
       val roundedNewSize = roundToExactMultiple(newSize, 8)
-      val position = this.mmap.position
+      val position = mmap.position
       
       /* Windows won't let us modify the file length while the file is mmapped :-( */
-      if(Os.isWindows)
-        forceUnmap(this.mmap)
+      if (Os.isWindows)
+        forceUnmap(mmap)
       try {
         raf.setLength(roundedNewSize)
-        this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
-        this.maxEntries = this.mmap.limit / 8
-        this.mmap.position(position)
+        mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
+        _maxEntries = mmap.limit / 8
+        mmap.position(position)
       } finally {
         CoreUtils.swallow(raf.close())
       }
@@ -319,19 +326,19 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    * Delete this index file
    */
   def delete(): Boolean = {
-    info("Deleting index " + this.file.getAbsolutePath)
-    if(Os.isWindows)
-      CoreUtils.swallow(forceUnmap(this.mmap))
-    this.file.delete()
+    info("Deleting index " + _file.getAbsolutePath)
+    if (Os.isWindows)
+      CoreUtils.swallow(forceUnmap(mmap))
+    _file.delete()
   }
   
   /** The number of entries in this index */
-  def entries() = size.get
+  def entries = _entries
   
   /**
    * The number of bytes actually used by this index
    */
-  def sizeInBytes() = 8 * entries
+  def sizeInBytes() = 8 * _entries
   
   /** Close the index */
   def close() {
@@ -343,8 +350,8 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    * @throws IOException if rename fails
    */
   def renameTo(f: File) {
-    try Utils.atomicMoveWithFallback(file.toPath, f.toPath)
-    finally this.file = f
+    try Utils.atomicMoveWithFallback(_file.toPath, f.toPath)
+    finally _file = f
   }
   
   /**
@@ -352,13 +359,13 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
    * @throws IllegalArgumentException if any problems are found
    */
   def sanityCheck() {
-    require(entries == 0 || lastOffset > baseOffset,
+    require(_entries == 0 || lastOffset > baseOffset,
             "Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d"
-            .format(file.getAbsolutePath, lastOffset, baseOffset))
-      val len = file.length()
-      require(len % 8 == 0, 
-              "Index file " + file.getName + " is corrupt, found " + len + 
-              " bytes which is not positive or not a multiple of 8.")
+            .format(_file.getAbsolutePath, lastOffset, baseOffset))
+    val len = _file.length()
+    require(len % 8 == 0,
+            "Index file " + _file.getName + " is corrupt, found " + len +
+            " bytes which is not positive or not a multiple of 8.")
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/aee8ebb4/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index e882a30..dc99672 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -124,7 +124,7 @@ object DumpLogSegments {
     val startOffset = file.getName().split("\\.")(0).toLong
     val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + Log.LogFileSuffix)
     val messageSet = new FileMessageSet(logFile, false)
-    val index = new OffsetIndex(file = file, baseOffset = startOffset)
+    val index = new OffsetIndex(file, baseOffset = startOffset)
 
     //Check that index passes sanityCheck, this is the check that determines if indexes will be rebuilt on startup or not.
     if (indexSanityOnly) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/aee8ebb4/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
index dfd7b54..869e618 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
@@ -34,7 +34,7 @@ class OffsetIndexTest extends JUnitSuite {
   
   @Before
   def setup() {
-    this.idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 45L, maxIndexSize = 30 * 8)
+    this.idx = new OffsetIndex(nonExistantTempFile(), baseOffset = 45L, maxIndexSize = 30 * 8)
   }
   
   @After
@@ -103,7 +103,7 @@ class OffsetIndexTest extends JUnitSuite {
     idx.append(first.offset, first.position)
     idx.append(sec.offset, sec.position)
     idx.close()
-    val idxRo = new OffsetIndex(file = idx.file, baseOffset = idx.baseOffset)
+    val idxRo = new OffsetIndex(idx.file, baseOffset = idx.baseOffset)
     assertEquals(first, idxRo.lookup(first.offset))
     assertEquals(sec, idxRo.lookup(sec.offset))
     assertEquals(sec.offset, idxRo.lastOffset)
@@ -113,7 +113,7 @@ class OffsetIndexTest extends JUnitSuite {
   
   @Test
   def truncate() {
-	val idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 0L, maxIndexSize = 10 * 8)
+	val idx = new OffsetIndex(nonExistantTempFile(), baseOffset = 0L, maxIndexSize = 10 * 8)
 	idx.truncate()
     for(i <- 1 until 10)
       idx.append(i, i)
@@ -140,7 +140,7 @@ class OffsetIndexTest extends JUnitSuite {
     idx.append(5, 5)
     
     idx.truncate()
-    assertEquals("Full truncation should leave no entries", 0, idx.entries())
+    assertEquals("Full truncation should leave no entries", 0, idx.entries)
     idx.append(0, 0)
   }
   
@@ -169,4 +169,4 @@ class OffsetIndexTest extends JUnitSuite {
     file.delete()
     file
   }
-}
\ No newline at end of file
+}