Posted to commits@kafka.apache.org by ij...@apache.org on 2017/10/20 23:50:25 UTC

kafka git commit: KAFKA-6093; log.close() should not prevent log from being accessed

Repository: kafka
Updated Branches:
  refs/heads/1.0 51bb83d0d -> 0fb01be23


KAFKA-6093; log.close() should not prevent log from being accessed

Author: Dong Lin <li...@gmail.com>

Reviewers: Jun Rao <ju...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #4100 from lindong28/KAFKA-6093


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0fb01be2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0fb01be2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0fb01be2

Branch: refs/heads/1.0
Commit: 0fb01be2325298a40df074c3a1bc0e5c0865dfcd
Parents: 51bb83d
Author: Dong Lin <li...@gmail.com>
Authored: Sat Oct 21 00:50:21 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Sat Oct 21 00:50:21 2017 +0100

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         | 57 +++++++++++---------
 .../src/test/scala/unit/kafka/log/LogTest.scala | 18 +++++++
 2 files changed, 49 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
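For readers skimming the diff below: the patch replaces the old isClosed flag, which
close() used to set, with an isMemoryMappedBufferClosed flag that is only set once the
memory mapped index buffers are actually released, i.e. by closeHandlers() or delete().
As a result, delete() can still be called on a log that has already been closed. The
following is a minimal, self-contained sketch of that guard pattern, not the real Log
class; MiniLog, FakeSegment and the use of IllegalStateException in place of
KafkaStorageException are all illustrative stand-ins.

// Illustrative stand-in for a log segment whose index files are memory mapped.
final class FakeSegment(val baseOffset: Long) {
  def close(): Unit = ()          // flush and close, but keep the index mmaps usable
  def closeHandlers(): Unit = ()  // unmap index buffers / close file handles
  def delete(): Unit = ()         // unmap index buffers and remove files from disk
}

final class MiniLog(partition: String) {
  private val lock = new Object
  // Only set once the memory mapped index buffers are gone (closeHandlers or delete),
  // mirroring isMemoryMappedBufferClosed in the patch; plain close() leaves it false.
  @volatile private var isMemoryMappedBufferClosed = false
  private var segments = List(new FakeSegment(0L))

  private def checkIfMemoryMappedBufferClosed(): Unit =
    if (isMemoryMappedBufferClosed)
      throw new IllegalStateException(s"The memory mapped buffer for log of $partition is already closed")

  def close(): Unit = lock synchronized {
    checkIfMemoryMappedBufferClosed()
    segments.foreach(_.close())   // the flag stays false, so delete() may still run afterwards
  }

  def closeHandlers(): Unit = lock synchronized {
    segments.foreach(_.closeHandlers())
    isMemoryMappedBufferClosed = true
  }

  def delete(): Unit = lock synchronized {
    checkIfMemoryMappedBufferClosed()
    segments.foreach(_.delete())
    segments = Nil
    isMemoryMappedBufferClosed = true
  }

  def numberOfSegments: Int = segments.size
}

object MiniLogDemo extends App {
  val log = new MiniLog("test-topic-0")
  log.close()   // with the old isClosed flag this close would have blocked the delete below
  log.delete()  // still allowed: close() no longer marks the log as inaccessible
  println(s"segments after delete: ${log.numberOfSegments}")
}

Reading the flag outside the lock stays cheap because it is volatile, while every mutation
of it, and every re-check before touching the buffers, happens inside the lock.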


http://git-wip-us.apache.org/repos/asf/kafka/blob/0fb01be2/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index c82bf56..bd192a3 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -151,7 +151,9 @@ class Log(@volatile var dir: File,
 
   /* A lock that guards all modifications to the log */
   private val lock = new Object
-  @volatile private var isClosed = false
+  // The memory mapped buffer for index files of this log will be closed with either delete() or closeHandlers()
+  // After the memory mapped buffer is closed, no disk IO operation should be performed for this log
+  @volatile private var isMemoryMappedBufferClosed = false
 
   /* last time it was flushed */
   private val lastflushedTime = new AtomicLong(time.milliseconds)
@@ -163,9 +165,9 @@ class Log(@volatile var dir: File,
       0
   }
 
-  private def checkIfLogOffline(): Unit = {
-    if (isClosed)
-      throw new KafkaStorageException(s"The log for partition $topicPartition is offline")
+  private def checkIfMemoryMappedBufferClosed(): Unit = {
+    if (isMemoryMappedBufferClosed)
+      throw new KafkaStorageException(s"The memory mapped buffer for log of $topicPartition is already closed")
   }
 
   @volatile private var nextOffsetMetadata: LogOffsetMetadata = _
@@ -470,7 +472,7 @@ class Log(@volatile var dir: File,
   }
 
   private def loadProducerState(lastOffset: Long, reloadFromCleanShutdown: Boolean): Unit = lock synchronized {
-    checkIfLogOffline()
+    checkIfMemoryMappedBufferClosed()
     val messageFormatVersion = config.messageFormatVersion.messageFormatVersion
     info(s"Loading producer state from offset $lastOffset for partition $topicPartition with message " +
       s"format version $messageFormatVersion")
@@ -556,18 +558,20 @@ class Log(@volatile var dir: File,
   def numberOfSegments: Int = segments.size
 
   /**
-   * Close this log
+   * Close this log.
+   * The memory mapped buffer for index files of this log will be left open until the log is deleted.
    */
   def close() {
     debug(s"Closing log $name")
     lock synchronized {
-      checkIfLogOffline()
-      isClosed = true
-      // We take a snapshot at the last written offset to hopefully avoid the need to scan the log
-      // after restarting and to ensure that we cannot inadvertently hit the upgrade optimization
-      // (the clean shutdown file is written after the logs are all closed).
-      producerStateManager.takeSnapshot()
-      logSegments.foreach(_.close())
+      checkIfMemoryMappedBufferClosed()
+      maybeHandleIOException(s"Error while closing log for $topicPartition in dir ${dir.getParent}") {
+        // We take a snapshot at the last written offset to hopefully avoid the need to scan the log
+        // after restarting and to ensure that we cannot inadvertently hit the upgrade optimization
+        // (the clean shutdown file is written after the logs are all closed).
+        producerStateManager.takeSnapshot()
+        logSegments.foreach(_.close())
+      }
     }
   }
 
@@ -577,8 +581,8 @@ class Log(@volatile var dir: File,
   def closeHandlers() {
     debug(s"Closing handlers of log $name")
     lock synchronized {
-      isClosed = true
       logSegments.foreach(_.closeHandlers())
+      isMemoryMappedBufferClosed = true
     }
   }
 
@@ -629,7 +633,7 @@ class Log(@volatile var dir: File,
 
       // they are valid, insert them in the log
       lock synchronized {
-        checkIfLogOffline()
+        checkIfMemoryMappedBufferClosed()
         if (assignOffsets) {
           // assign offsets to the message set
           val offset = new LongRef(nextOffsetMetadata.messageOffset)
@@ -761,7 +765,7 @@ class Log(@volatile var dir: File,
   }
 
   private def updateFirstUnstableOffset(): Unit = lock synchronized {
-    checkIfLogOffline()
+    checkIfMemoryMappedBufferClosed()
     val updatedFirstStableOffset = producerStateManager.firstUnstableOffset match {
       case Some(logOffsetMetadata) if logOffsetMetadata.messageOffsetOnly || logOffsetMetadata.messageOffset < logStartOffset =>
         val offset = math.max(logOffsetMetadata.messageOffset, logStartOffset)
@@ -786,7 +790,7 @@ class Log(@volatile var dir: File,
     // in an unclean manner within log.flush.start.offset.checkpoint.interval.ms. The chance of this happening is low.
     maybeHandleIOException(s"Exception while increasing log start offset for $topicPartition to $newLogStartOffset in dir ${dir.getParent}") {
       lock synchronized {
-        checkIfLogOffline()
+        checkIfMemoryMappedBufferClosed()
         if (newLogStartOffset > logStartOffset) {
           info(s"Incrementing log start offset of partition $topicPartition to $newLogStartOffset in dir ${dir.getParent}")
           logStartOffset = newLogStartOffset
@@ -1128,7 +1132,6 @@ class Log(@volatile var dir: File,
    */
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = {
     lock synchronized {
-      checkIfLogOffline()
       val deletable = deletableSegments(predicate)
       if (deletable.nonEmpty)
         info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason")
@@ -1144,7 +1147,7 @@ class Log(@volatile var dir: File,
         if (segments.size == numToDelete)
           roll()
         lock synchronized {
-          checkIfLogOffline()
+          checkIfMemoryMappedBufferClosed()
           // remove the segments for lookups
           deletable.foreach(deleteSegment)
           maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset)
@@ -1294,7 +1297,7 @@ class Log(@volatile var dir: File,
     maybeHandleIOException(s"Error while rolling log segment for $topicPartition in dir ${dir.getParent}") {
       val start = time.nanoseconds
       lock synchronized {
-        checkIfLogOffline()
+        checkIfMemoryMappedBufferClosed()
         val newOffset = math.max(expectedNextOffset, logEndOffset)
         val logFile = Log.logFile(dir, newOffset)
         val offsetIdxFile = offsetIndexFile(dir, newOffset)
@@ -1371,7 +1374,7 @@ class Log(@volatile var dir: File,
         segment.flush()
 
       lock synchronized {
-        checkIfLogOffline()
+        checkIfMemoryMappedBufferClosed()
         if (offset > this.recoveryPoint) {
           this.recoveryPoint = offset
           lastflushedTime.set(time.milliseconds)
@@ -1422,18 +1425,20 @@ class Log(@volatile var dir: File,
   private[log] def delete() {
     maybeHandleIOException(s"Error while deleting log for $topicPartition in dir ${dir.getParent}") {
       lock synchronized {
-        checkIfLogOffline()
+        checkIfMemoryMappedBufferClosed()
         logSegments.foreach(_.delete())
         segments.clear()
         leaderEpochCache.clear()
         Utils.delete(dir)
+        // File handlers will be closed if this log is deleted
+        isMemoryMappedBufferClosed = true
       }
     }
   }
 
   // visible for testing
   private[log] def takeProducerSnapshot(): Unit = lock synchronized {
-    checkIfLogOffline()
+    checkIfMemoryMappedBufferClosed()
     producerStateManager.takeSnapshot()
   }
 
@@ -1468,7 +1473,7 @@ class Log(@volatile var dir: File,
       } else {
         info("Truncating log %s to offset %d.".format(name, targetOffset))
         lock synchronized {
-          checkIfLogOffline()
+          checkIfMemoryMappedBufferClosed()
           if (segments.firstEntry.getValue.baseOffset > targetOffset) {
             truncateFullyAndStartAt(targetOffset)
           } else {
@@ -1496,7 +1501,7 @@ class Log(@volatile var dir: File,
     maybeHandleIOException(s"Error while truncating the entire log for $topicPartition in dir ${dir.getParent}") {
       debug(s"Truncate and start log '$name' at offset $newOffset")
       lock synchronized {
-        checkIfLogOffline()
+        checkIfMemoryMappedBufferClosed()
         val segmentsToDelete = logSegments.toList
         segmentsToDelete.foreach(deleteSegment)
         addSegment(new LogSegment(dir,
@@ -1623,7 +1628,7 @@ class Log(@volatile var dir: File,
    */
   private[log] def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false) {
     lock synchronized {
-      checkIfLogOffline()
+      checkIfMemoryMappedBufferClosed()
       // need to do this in two phases to be crash safe AND do the delete asynchronously
       // if we crash in the middle of this we complete the swap in loadSegments()
       if (!isRecoveredSwapFile)
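
One more detail worth calling out from the close() hunk earlier in this file's diff:
close() now wraps its IO in maybeHandleIOException, the same wrapper already used by
roll(), flush() and delete(). The diff only shows the call sites, so the sketch below is
an assumption about the wrapper's shape rather than the real method; the actual helper in
Log.scala presumably also reports the failed log directory, which is omitted here.

import java.io.IOException

object IOGuardSketch {
  // Assumed shape of a maybeHandleIOException-style wrapper, inferred only from the
  // call sites in the diff: run a block of disk IO and, on IOException, rethrow with a
  // descriptive, partition-specific message. RuntimeException stands in for
  // KafkaStorageException to keep the sketch self-contained.
  def maybeHandleIOException[T](msg: => String)(fun: => T): T =
    try fun
    catch {
      case e: IOException => throw new RuntimeException(msg, e)
    }

  def main(args: Array[String]): Unit = {
    val bytesFlushed = maybeHandleIOException("Error while flushing segment for test-topic-0") {
      0L // the guarded disk IO would go here
    }
    println(s"flushed $bytesFlushed bytes")
  }
}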

http://git-wip-us.apache.org/repos/asf/kafka/blob/0fb01be2/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index e313cd4..0a0ce19 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -2094,6 +2094,24 @@ class LogTest {
     topic + "-" + partition
 
   @Test
+  def testLogDeletionAfterClose() {
+    def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000)
+    val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)
+    val log = createLog(logDir, logConfig)
+
+    // append some messages to create some segments
+    log.appendAsLeader(createRecords, leaderEpoch = 0)
+
+    assertEquals("The deleted segments should be gone.", 1, log.numberOfSegments)
+    assertEquals("Epoch entries should have gone.", 1, epochCache(log).epochEntries().size)
+
+    log.close()
+    log.delete()
+    assertEquals("The number of segments should be 0", 0, log.numberOfSegments)
+    assertEquals("Epoch entries should have gone.", 0, epochCache(log).epochEntries().size)
+  }
+
+  @Test
   def testDeleteOldSegments() {
     def createRecords = TestUtils.singletonRecords(value = "test".getBytes, timestamp = mockTime.milliseconds - 1000)
     val logConfig = createLogConfig(segmentBytes = createRecords.sizeInBytes * 5, segmentIndexBytes = 1000, retentionMs = 999)