You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/10/20 23:50:25 UTC
kafka git commit: KAFKA-6093;
log.close() should not prevent log from being accessed
Repository: kafka
Updated Branches:
refs/heads/1.0 51bb83d0d -> 0fb01be23
KAFKA-6093; log.close() should not prevent log from being accessed
Author: Dong Lin <li...@gmail.com>
Reviewers: Jun Rao <ju...@gmail.com>, Ismael Juma <is...@juma.me.uk>
Closes #4100 from lindong28/KAFKA-6093
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0fb01be2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0fb01be2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0fb01be2
Branch: refs/heads/1.0
Commit: 0fb01be2325298a40df074c3a1bc0e5c0865dfcd
Parents: 51bb83d
Author: Dong Lin <li...@gmail.com>
Authored: Sat Oct 21 00:50:21 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sat Oct 21 00:50:21 2017 +0100
----------------------------------------------------------------------
core/src/main/scala/kafka/log/Log.scala | 57 +++++++++++---------
.../src/test/scala/unit/kafka/log/LogTest.scala | 18 +++++++
2 files changed, 49 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/0fb01be2/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index c82bf56..bd192a3 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -151,7 +151,9 @@ class Log(@volatile var dir: File,
/* A lock that guards all modifications to the log */
private val lock = new Object
- @volatile private var isClosed = false
+ // The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers()
+ // After memory mapped buffer is closed, no disk IO operation should be performed for this log
+ @volatile private var isMemoryMappedBufferClosed = false
/* last time it was flushed */
private val lastflushedTime = new AtomicLong(time.milliseconds)
@@ -163,9 +165,9 @@ class Log(@volatile var dir: File,
0
}
- private def checkIfLogOffline(): Unit = {
- if (isClosed)
- throw new KafkaStorageException(s"The log for partition $topicPartition is offline")
+ private def checkIfMemoryMappedBufferClosed(): Unit = {
+ if (isMemoryMappedBufferClosed)
+ throw new KafkaStorageException(s"The memory mapped buffer for log of $topicPartition is already closed")
}
@volatile private var nextOffsetMetadata: LogOffsetMetadata = _
@@ -470,7 +472,7 @@ class Log(@volatile var dir: File,
}
private def loadProducerState(lastOffset: Long, reloadFromCleanShutdown: Boolean): Unit = lock synchronized {
- checkIfLogOffline()
+ checkIfMemoryMappedBufferClosed()
val messageFormatVersion = config.messageFormatVersion.messageFormatVersion
info(s"Loading producer state from offset $lastOffset for partition $topicPartition with message " +
s"format version $messageFormatVersion")
@@ -556,18 +558,20 @@ class Log(@volatile var dir: File,
def numberOfSegments: Int = segments.size
/**
- * Close this log
+ * Close this log.
+ * The memory mapped buffer for index files of this log will be left open until the log is deleted.
*/
def close() {
debug(s"Closing log $name")
lock synchronized {
- checkIfLogOffline()
- isClosed = true
- // We take a snapshot at the last written offset to hopefully avoid the need to scan the log
- // after restarting and to ensure that we cannot inadvertently hit the upgrade optimization
- // (the clean shutdown file is written after the logs are all closed).
- producerStateManager.takeSnapshot()
- logSegments.foreach(_.close())
+ checkIfMemoryMappedBufferClosed()
+ maybeHandleIOException(s"Error while renaming dir for $topicPartition in dir ${dir.getParent}") {
+ // We take a snapshot at the last written offset to hopefully avoid the need to scan the log
+ // after restarting and to ensure that we cannot inadvertently hit the upgrade optimization
+ // (the clean shutdown file is written after the logs are all closed).
+ producerStateManager.takeSnapshot()
+ logSegments.foreach(_.close())
+ }
}
}
@@ -577,8 +581,8 @@ class Log(@volatile var dir: File,
def closeHandlers() {
debug(s"Closing handlers of log $name")
lock synchronized {
- isClosed = true
logSegments.foreach(_.closeHandlers())
+ isMemoryMappedBufferClosed = true
}
}
@@ -629,7 +633,7 @@ class Log(@volatile var dir: File,
// they are valid, insert them in the log
lock synchronized {
- checkIfLogOffline()
+ checkIfMemoryMappedBufferClosed()
if (assignOffsets) {
// assign offsets to the message set
val offset = new LongRef(nextOffsetMetadata.messageOffset)
@@ -761,7 +765,7 @@ class Log(@volatile var dir: File,
}
private def updateFirstUnstableOffset(): Unit = lock synchronized {
- checkIfLogOffline()
+ checkIfMemoryMappedBufferClosed()
val updatedFirstStableOffset = producerStateManager.firstUnstableOffset match {
case Some(logOffsetMetadata) if logOffsetMetadata.messageOffsetOnly || logOffsetMetadata.messageOffset < logStartOffset =>
val offset = math.max(logOffsetMetadata.messageOffset, logStartOffset)
@@ -786,7 +790,7 @@ class Log(@volatile var dir: File,
// in an unclean manner within log.flush.start.offset.checkpoint.interval.ms. The chance of this happening is low.
maybeHandleIOException(s"Exception while increasing log start offset for $topicPartition to $newLogStartOffset in dir ${dir.getParent}") {
lock synchronized {
- checkIfLogOffline()
+ checkIfMemoryMappedBufferClosed()
if (newLogStartOffset > logStartOffset) {
info(s"Incrementing log start offset of partition $topicPartition to $newLogStartOffset in dir ${dir.getParent}")
logStartOffset = newLogStartOffset
@@ -1128,7 +1132,6 @@ class Log(@volatile var dir: File,
*/
private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = {
lock synchronized {
- checkIfLogOffline()
val deletable = deletableSegments(predicate)
if (deletable.nonEmpty)
info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason")
@@ -1144,7 +1147,7 @@ class Log(@volatile var dir: File,
if (segments.size == numToDelete)
roll()
lock synchronized {
- checkIfLogOffline()
+ checkIfMemoryMappedBufferClosed()
// remove the segments for lookups
deletable.foreach(deleteSegment)
maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset)
@@ -1294,7 +1297,7 @@ class Log(@volatile var dir: File,
maybeHandleIOException(s"Error while rolling log segment for $topicPartition in dir ${dir.getParent}") {
val start = time.nanoseconds
lock synchronized {
- checkIfLogOffline()
+ checkIfMemoryMappedBufferClosed()
val newOffset = math.max(expectedNextOffset, logEndOffset)
val logFile = Log.logFile(dir, newOffset)
val offsetIdxFile = offsetIndexFile(dir, newOffset)
@@ -1371,7 +1374,7 @@ class Log(@volatile var dir: File,
segment.flush()
lock synchronized {
- checkIfLogOffline()
+ checkIfMemoryMappedBufferClosed()
if (offset > this.recoveryPoint) {
this.recoveryPoint = offset
lastflushedTime.set(time.milliseconds)
@@ -1422,18 +1425,20 @@ class Log(@volatile var dir: File,
private[log] def delete() {
maybeHandleIOException(s"Error while deleting log for $topicPartition in dir ${dir.getParent}") {
lock synchronized {
- checkIfLogOffline()
+ checkIfMemoryMappedBufferClosed()
logSegments.foreach(_.delete())
segments.clear()
leaderEpochCache.clear()
Utils.delete(dir)
+ // File handlers will be closed if this log is deleted
+ isMemoryMappedBufferClosed = true
}
}
}
// visible for testing
private[log] def takeProducerSnapshot(): Unit = lock synchronized {
- checkIfLogOffline()
+ checkIfMemoryMappedBufferClosed()
producerStateManager.takeSnapshot()
}
@@ -1468,7 +1473,7 @@ class Log(@volatile var dir: File,
} else {
info("Truncating log %s to offset %d.".format(name, targetOffset))
lock synchronized {
- checkIfLogOffline()
+ checkIfMemoryMappedBufferClosed()
if (segments.firstEntry.getValue.baseOffset > targetOffset) {
truncateFullyAndStartAt(targetOffset)
} else {
@@ -1496,7 +1501,7 @@ class Log(@volatile var dir: File,
maybeHandleIOException(s"Error while truncating the entire log for $topicPartition in dir ${dir.getParent}") {
debug(s"Truncate and start log '$name' at offset $newOffset")
lock synchronized {
- checkIfLogOffline()
+ checkIfMemoryMappedBufferClosed()
val segmentsToDelete = logSegments.toList
segmentsToDelete.foreach(deleteSegment)
addSegment(new LogSegment(dir,
@@ -1623,7 +1628,7 @@ class Log(@volatile var dir: File,
*/
private[log] def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false) {
lock synchronized {
- checkIfLogOffline()
+ checkIfMemoryMappedBufferClosed()
// need to do this in two phases to be crash safe AND do the delete asynchronously
// if we crash in the middle of this we complete the swap in loadSegments()
if (!isRecoveredSwapFile)
http://git-wip-us.apache.org/repos/asf/kafka/blob/0fb01be2/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index e313cd4..0a0ce19 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -2094,6 +2094,24 @@ class LogTest {
topic + "-" + partition
@Test
+ def testLogDeletionAfterClose() {
+ def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000)
+ val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
+ val log = createLog(logDir, logConfig)
+
+ // append some messages to create some segments
+ log.appendAsLeader(createRecords, leaderEpoch = 0)
+
+ assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
+ assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries().size)
+
+ log.close()
+ log.delete()
+ assertEquals("The number of segments should be 0", 0, log.numberOfSegments)
+ assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries().size)
+ }
+
+ @Test
def testDeleteOldSegments() {
def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000)
val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)