You are viewing a plain-text version of this content. The canonical link for it is "here" (the hyperlink target was not preserved in this plain-text version).
Posted to commits@kafka.apache.org by gu...@apache.org on 2016/04/05 20:46:09 UTC
kafka git commit: KAFKA-3510; OffsetIndex thread safety
Repository: kafka
Updated Branches:
refs/heads/trunk 732b111f4 -> aee8ebb46
KAFKA-3510; OffsetIndex thread safety
* Make all fields accessed outside of a lock `volatile`
* Only allow mutation within the class
* Remove unnecessary `AtomicInteger` since mutation always happens inside a lock
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Guozhang Wang <wa...@gmail.com>
Closes #1188 from ijuma/kafka-3510-offset-index-thread-safety
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/aee8ebb4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/aee8ebb4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/aee8ebb4
Branch: refs/heads/trunk
Commit: aee8ebb46cd9393a0886c42cb88b080d065da397
Parents: 732b111
Author: Ismael Juma <is...@juma.me.uk>
Authored: Tue Apr 5 11:46:04 2016 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Tue Apr 5 11:46:04 2016 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/log/Log.scala | 2 +-
core/src/main/scala/kafka/log/LogSegment.scala | 2 +-
core/src/main/scala/kafka/log/OffsetIndex.scala | 173 ++++++++++---------
.../scala/kafka/tools/DumpLogSegments.scala | 2 +-
.../scala/unit/kafka/log/OffsetIndexTest.scala | 10 +-
5 files changed, 98 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/aee8ebb4/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 81c19fa..8465b64 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -215,7 +215,7 @@ class Log(val dir: File,
val fileName = logFile.getName
val startOffset = fileName.substring(0, fileName.length - LogFileSuffix.length).toLong
val indexFile = new File(CoreUtils.replaceSuffix(logFile.getPath, LogFileSuffix, IndexFileSuffix) + SwapFileSuffix)
- val index = new OffsetIndex(file = indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
+ val index = new OffsetIndex(indexFile, baseOffset = startOffset, maxIndexSize = config.maxIndexSize)
val swapSegment = new LogSegment(new FileMessageSet(file = swapFile),
index = index,
baseOffset = startOffset,
http://git-wip-us.apache.org/repos/asf/kafka/blob/aee8ebb4/core/src/main/scala/kafka/log/LogSegment.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogSegment.scala b/core/src/main/scala/kafka/log/LogSegment.scala
index 9fc68a4..3a4bbc8 100755
--- a/core/src/main/scala/kafka/log/LogSegment.scala
+++ b/core/src/main/scala/kafka/log/LogSegment.scala
@@ -55,7 +55,7 @@ class LogSegment(val log: FileMessageSet,
def this(dir: File, startOffset: Long, indexIntervalBytes: Int, maxIndexSize: Int, rollJitterMs: Long, time: Time, fileAlreadyExists: Boolean = false, initFileSize: Int = 0, preallocate: Boolean = false) =
this(new FileMessageSet(file = Log.logFilename(dir, startOffset), fileAlreadyExists = fileAlreadyExists, initFileSize = initFileSize, preallocate = preallocate),
- new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
+ new OffsetIndex(Log.indexFilename(dir, startOffset), baseOffset = startOffset, maxIndexSize = maxIndexSize),
startOffset,
indexIntervalBytes,
rollJitterMs,
http://git-wip-us.apache.org/repos/asf/kafka/blob/aee8ebb4/core/src/main/scala/kafka/log/OffsetIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetIndex.scala b/core/src/main/scala/kafka/log/OffsetIndex.scala
index e95c9d1..ce35d68 100755
--- a/core/src/main/scala/kafka/log/OffsetIndex.scala
+++ b/core/src/main/scala/kafka/log/OffsetIndex.scala
@@ -24,7 +24,6 @@ import java.io._
import java.nio._
import java.nio.channels._
import java.util.concurrent.locks._
-import java.util.concurrent.atomic._
import kafka.utils._
import kafka.utils.CoreUtils.inLock
import kafka.common.InvalidOffsetException
@@ -54,62 +53,70 @@ import kafka.common.InvalidOffsetException
* All external APIs translate from relative offsets to full offsets, so users of this class do not interact with the internal
* storage format.
*/
-class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
+class OffsetIndex(@volatile private[this] var _file: File, val baseOffset: Long, val maxIndexSize: Int = -1) extends Logging {
private val lock = new ReentrantLock
/* initialize the memory mapping for this index */
- private var mmap: MappedByteBuffer =
- {
- val newlyCreated = file.createNewFile()
- val raf = new RandomAccessFile(file, "rw")
- try {
- /* pre-allocate the file if necessary */
- if(newlyCreated) {
- if(maxIndexSize < 8)
- throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
- raf.setLength(roundToExactMultiple(maxIndexSize, 8))
- }
-
- /* memory-map the file */
- val len = raf.length()
- val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
-
- /* set the position in the index for the next entry */
- if(newlyCreated)
- idx.position(0)
- else
- // if this is a pre-existing index, assume it is all valid and set position to last entry
- idx.position(roundToExactMultiple(idx.limit, 8))
- idx
- } finally {
- CoreUtils.swallow(raf.close())
+ @volatile
+ private[this] var mmap: MappedByteBuffer = {
+ val newlyCreated = _file.createNewFile()
+ val raf = new RandomAccessFile(_file, "rw")
+ try {
+ /* pre-allocate the file if necessary */
+ if (newlyCreated) {
+ if (maxIndexSize < 8)
+ throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
+ raf.setLength(roundToExactMultiple(maxIndexSize, 8))
}
+
+ /* memory-map the file */
+ val len = raf.length()
+ val idx = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, len)
+
+ /* set the position in the index for the next entry */
+ if (newlyCreated)
+ idx.position(0)
+ else
+ // if this is a pre-existing index, assume it is all valid and set position to last entry
+ idx.position(roundToExactMultiple(idx.limit, 8))
+ idx
+ } finally {
+ CoreUtils.swallow(raf.close())
}
-
+ }
+
/* the number of eight-byte entries currently in the index */
- private var size = new AtomicInteger(mmap.position / 8)
-
- /**
- * The maximum number of eight-byte entries this index can hold
- */
@volatile
- var maxEntries = mmap.limit / 8
-
- /* the last offset in the index */
- var lastOffset = readLastEntry.offset
+ private[this] var _entries = mmap.position / 8
+
+ /* The maximum number of eight-byte entries this index can hold */
+ @volatile
+ private[this] var _maxEntries = mmap.limit / 8
+
+ @volatile
+ private[this] var _lastOffset = readLastEntry.offset
debug("Loaded index file %s with maxEntries = %d, maxIndexSize = %d, entries = %d, lastOffset = %d, file position = %d"
- .format(file.getAbsolutePath, maxEntries, maxIndexSize, entries(), lastOffset, mmap.position))
+ .format(_file.getAbsolutePath, _maxEntries, maxIndexSize, _entries, _lastOffset, mmap.position))
+
+ /** The maximum number of entries this index can hold */
+ def maxEntries: Int = _maxEntries
+
+ /** The last offset in the index */
+ def lastOffset: Long = _lastOffset
+
+ /** The index file */
+ def file: File = _file
/**
* The last entry in the index
*/
def readLastEntry(): OffsetPosition = {
inLock(lock) {
- size.get match {
+ _entries match {
case 0 => OffsetPosition(baseOffset, 0)
- case s => OffsetPosition(baseOffset + relativeOffset(this.mmap, s-1), physical(this.mmap, s-1))
+ case s => OffsetPosition(baseOffset + relativeOffset(mmap, s - 1), physical(mmap, s - 1))
}
}
}
@@ -149,22 +156,22 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
val relOffset = targetOffset - baseOffset
// check if the index is empty
- if(entries == 0)
+ if (_entries == 0)
return -1
// check if the target offset is smaller than the least offset
- if(relativeOffset(idx, 0) > relOffset)
+ if (relativeOffset(idx, 0) > relOffset)
return -1
// binary search for the entry
var lo = 0
- var hi = entries-1
- while(lo < hi) {
+ var hi = _entries - 1
+ while (lo < hi) {
val mid = ceil(hi/2.0 + lo/2.0).toInt
val found = relativeOffset(idx, mid)
- if(found == relOffset)
+ if (found == relOffset)
return mid
- else if(found < relOffset)
+ else if (found < relOffset)
lo = mid
else
hi = mid - 1
@@ -185,8 +192,8 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
*/
def entry(n: Int): OffsetPosition = {
maybeLock(lock) {
- if(n >= entries)
- throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, entries))
+ if(n >= _entries)
+ throw new IllegalArgumentException("Attempt to fetch the %dth entry from an index of size %d.".format(n, _entries))
val idx = mmap.duplicate
OffsetPosition(relativeOffset(idx, n), physical(idx, n))
}
@@ -197,17 +204,17 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
*/
def append(offset: Long, position: Int) {
inLock(lock) {
- require(!isFull, "Attempt to append to a full index (size = " + size + ").")
- if (size.get == 0 || offset > lastOffset) {
- debug("Adding index entry %d => %d to %s.".format(offset, position, file.getName))
- this.mmap.putInt((offset - baseOffset).toInt)
- this.mmap.putInt(position)
- this.size.incrementAndGet()
- this.lastOffset = offset
- require(entries * 8 == mmap.position, entries + " entries but file position in index is " + mmap.position + ".")
+ require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
+ if (_entries == 0 || offset > _lastOffset) {
+ debug("Adding index entry %d => %d to %s.".format(offset, position, _file.getName))
+ mmap.putInt((offset - baseOffset).toInt)
+ mmap.putInt(position)
+ _entries += 1
+ _lastOffset = offset
+ require(_entries * 8 == mmap.position, _entries + " entries but file position in index is " + mmap.position + ".")
} else {
throw new InvalidOffsetException("Attempt to append an offset (%d) to position %d no larger than the last offset appended (%d) to %s."
- .format(offset, entries, lastOffset, file.getAbsolutePath))
+ .format(offset, _entries, _lastOffset, _file.getAbsolutePath))
}
}
}
@@ -215,7 +222,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
/**
* True iff there are no more slots available in this index
*/
- def isFull: Boolean = entries >= this.maxEntries
+ def isFull: Boolean = _entries >= _maxEntries
/**
* Truncate the entire index, deleting all entries
@@ -252,9 +259,9 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
*/
private def truncateToEntries(entries: Int) {
inLock(lock) {
- this.size.set(entries)
- mmap.position(this.size.get * 8)
- this.lastOffset = readLastEntry.offset
+ _entries = entries
+ mmap.position(_entries * 8)
+ _lastOffset = readLastEntry.offset
}
}
@@ -264,7 +271,7 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
*/
def trimToValidSize() {
inLock(lock) {
- resize(entries * 8)
+ resize(_entries * 8)
}
}
@@ -276,18 +283,18 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
*/
def resize(newSize: Int) {
inLock(lock) {
- val raf = new RandomAccessFile(file, "rw")
+ val raf = new RandomAccessFile(_file, "rw")
val roundedNewSize = roundToExactMultiple(newSize, 8)
- val position = this.mmap.position
+ val position = mmap.position
/* Windows won't let us modify the file length while the file is mmapped :-( */
- if(Os.isWindows)
- forceUnmap(this.mmap)
+ if (Os.isWindows)
+ forceUnmap(mmap)
try {
raf.setLength(roundedNewSize)
- this.mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
- this.maxEntries = this.mmap.limit / 8
- this.mmap.position(position)
+ mmap = raf.getChannel().map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize)
+ _maxEntries = mmap.limit / 8
+ mmap.position(position)
} finally {
CoreUtils.swallow(raf.close())
}
@@ -319,19 +326,19 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
* Delete this index file
*/
def delete(): Boolean = {
- info("Deleting index " + this.file.getAbsolutePath)
- if(Os.isWindows)
- CoreUtils.swallow(forceUnmap(this.mmap))
- this.file.delete()
+ info("Deleting index " + _file.getAbsolutePath)
+ if (Os.isWindows)
+ CoreUtils.swallow(forceUnmap(mmap))
+ _file.delete()
}
/** The number of entries in this index */
- def entries() = size.get
+ def entries = _entries
/**
* The number of bytes actually used by this index
*/
- def sizeInBytes() = 8 * entries
+ def sizeInBytes() = 8 * _entries
/** Close the index */
def close() {
@@ -343,8 +350,8 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
* @throws IOException if rename fails
*/
def renameTo(f: File) {
- try Utils.atomicMoveWithFallback(file.toPath, f.toPath)
- finally this.file = f
+ try Utils.atomicMoveWithFallback(_file.toPath, f.toPath)
+ finally _file = f
}
/**
@@ -352,13 +359,13 @@ class OffsetIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSi
* @throws IllegalArgumentException if any problems are found
*/
def sanityCheck() {
- require(entries == 0 || lastOffset > baseOffset,
+ require(_entries == 0 || lastOffset > baseOffset,
"Corrupt index found, index file (%s) has non-zero size but the last offset is %d and the base offset is %d"
- .format(file.getAbsolutePath, lastOffset, baseOffset))
- val len = file.length()
- require(len % 8 == 0,
- "Index file " + file.getName + " is corrupt, found " + len +
- " bytes which is not positive or not a multiple of 8.")
+ .format(_file.getAbsolutePath, lastOffset, baseOffset))
+ val len = _file.length()
+ require(len % 8 == 0,
+ "Index file " + _file.getName + " is corrupt, found " + len +
+ " bytes which is not positive or not a multiple of 8.")
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/aee8ebb4/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index e882a30..dc99672 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -124,7 +124,7 @@ object DumpLogSegments {
val startOffset = file.getName().split("\\.")(0).toLong
val logFile = new File(file.getAbsoluteFile.getParent, file.getName.split("\\.")(0) + Log.LogFileSuffix)
val messageSet = new FileMessageSet(logFile, false)
- val index = new OffsetIndex(file = file, baseOffset = startOffset)
+ val index = new OffsetIndex(file, baseOffset = startOffset)
//Check that index passes sanityCheck, this is the check that determines if indexes will be rebuilt on startup or not.
if (indexSanityOnly) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/aee8ebb4/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
index dfd7b54..869e618 100644
--- a/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
@@ -34,7 +34,7 @@ class OffsetIndexTest extends JUnitSuite {
@Before
def setup() {
- this.idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 45L, maxIndexSize = 30 * 8)
+ this.idx = new OffsetIndex(nonExistantTempFile(), baseOffset = 45L, maxIndexSize = 30 * 8)
}
@After
@@ -103,7 +103,7 @@ class OffsetIndexTest extends JUnitSuite {
idx.append(first.offset, first.position)
idx.append(sec.offset, sec.position)
idx.close()
- val idxRo = new OffsetIndex(file = idx.file, baseOffset = idx.baseOffset)
+ val idxRo = new OffsetIndex(idx.file, baseOffset = idx.baseOffset)
assertEquals(first, idxRo.lookup(first.offset))
assertEquals(sec, idxRo.lookup(sec.offset))
assertEquals(sec.offset, idxRo.lastOffset)
@@ -113,7 +113,7 @@ class OffsetIndexTest extends JUnitSuite {
@Test
def truncate() {
- val idx = new OffsetIndex(file = nonExistantTempFile(), baseOffset = 0L, maxIndexSize = 10 * 8)
+ val idx = new OffsetIndex(nonExistantTempFile(), baseOffset = 0L, maxIndexSize = 10 * 8)
idx.truncate()
for(i <- 1 until 10)
idx.append(i, i)
@@ -140,7 +140,7 @@ class OffsetIndexTest extends JUnitSuite {
idx.append(5, 5)
idx.truncate()
- assertEquals("Full truncation should leave no entries", 0, idx.entries())
+ assertEquals("Full truncation should leave no entries", 0, idx.entries)
idx.append(0, 0)
}
@@ -169,4 +169,4 @@ class OffsetIndexTest extends JUnitSuite {
file.delete()
file
}
-}
\ No newline at end of file
+}