Posted to commits@kafka.apache.org by ju...@apache.org on 2017/10/10 15:29:19 UTC

kafka git commit: KAFKA-6027; Access to log should throw KafkaStorageException after the log has been marked offline

Repository: kafka
Updated Branches:
  refs/heads/trunk b411f57c1 -> 29c46ddb9


KAFKA-6027; Access to log should throw KafkaStorageException after the log has been marked offline

Author: Dong Lin <li...@gmail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>

Closes #4045 from lindong28/KAFKA-6027


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/29c46ddb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/29c46ddb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/29c46ddb

Branch: refs/heads/trunk
Commit: 29c46ddb91fac8c0fb3f35c1511e2afdcf67fd2e
Parents: b411f57
Author: Dong Lin <li...@gmail.com>
Authored: Tue Oct 10 08:29:17 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Oct 10 08:29:17 2017 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/log/AbstractIndex.scala    |  6 ++++-
 core/src/main/scala/kafka/log/Log.scala         | 23 +++++++++++++++++++-
 core/src/main/scala/kafka/log/LogCleaner.scala  |  4 +++-
 .../scala/kafka/server/ReplicaManager.scala     |  6 ++---
 .../src/test/scala/unit/kafka/log/LogTest.scala |  8 +++++++
 5 files changed, 41 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
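
At a high level, the patch guards every state-mutating Log operation with an
offline check. A minimal sketch of the pattern (not the real kafka.log.Log
class; names mirror the diff below):

    import org.apache.kafka.common.errors.KafkaStorageException

    class OfflineAwareLog(topicPartition: String) {
      private val lock = new Object
      @volatile private var isClosed = false

      // Reads a volatile flag, so it is cheap enough to call at the start of
      // every mutating operation; all writes to the flag happen under `lock`.
      private def checkIfLogOffline(): Unit = {
        if (isClosed)
          throw new KafkaStorageException(s"The log for partition $topicPartition is offline")
      }

      def closeHandlers(): Unit = lock synchronized {
        isClosed = true
        // ... close file handles and unmap index files ...
      }

      def roll(): Unit = lock synchronized {
        checkIfLogOffline()
        // ... create and add a new active segment ...
      }
    }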


http://git-wip-us.apache.org/repos/asf/kafka/blob/29c46ddb/core/src/main/scala/kafka/log/AbstractIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index 40ec870..c214dad 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -174,7 +174,11 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
     trimToValidSize()
   }
 
-  def closeHandler(): Unit = safeForceUnmap()
+  def closeHandler(): Unit = {
+    inLock(lock) {
+      safeForceUnmap()
+    }
+  }
 
   /**
    * Do a basic sanity check on this index to detect obvious problems
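
The change above matters because unmapping a memory-mapped index while another
thread is still reading or resizing it can crash the JVM. Taking the index's
lock in closeHandler() serializes the unmap with those operations. For
reference, the inLock helper used here (from kafka.utils.CoreUtils) is
essentially:

    import java.util.concurrent.locks.Lock

    // Acquire the lock, run the body, and always release, even on exception.
    def inLock[T](lock: Lock)(fun: => T): T = {
      lock.lock()
      try fun
      finally lock.unlock()
    }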

http://git-wip-us.apache.org/repos/asf/kafka/blob/29c46ddb/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index a6f76ab..c82bf56 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -151,6 +151,7 @@ class Log(@volatile var dir: File,
 
   /* A lock that guards all modifications to the log */
   private val lock = new Object
+  @volatile private var isClosed = false
 
   /* last time it was flushed */
   private val lastflushedTime = new AtomicLong(time.milliseconds)
@@ -162,6 +163,11 @@ class Log(@volatile var dir: File,
       0
   }
 
+  private def checkIfLogOffline(): Unit = {
+    if (isClosed)
+      throw new KafkaStorageException(s"The log for partition $topicPartition is offline")
+  }
+
   @volatile private var nextOffsetMetadata: LogOffsetMetadata = _
 
   /* The earliest offset which is part of an incomplete transaction. This is used to compute the
@@ -464,6 +470,7 @@ class Log(@volatile var dir: File,
   }
 
   private def loadProducerState(lastOffset: Long, reloadFromCleanShutdown: Boolean): Unit = lock synchronized {
+    checkIfLogOffline()
     val messageFormatVersion = config.messageFormatVersion.messageFormatVersion
     info(s"Loading producer state from offset $lastOffset for partition $topicPartition with message " +
       s"format version $messageFormatVersion")
@@ -554,6 +561,8 @@ class Log(@volatile var dir: File,
   def close() {
     debug(s"Closing log $name")
     lock synchronized {
+      checkIfLogOffline()
+      isClosed = true
       // We take a snapshot at the last written offset to hopefully avoid the need to scan the log
       // after restarting and to ensure that we cannot inadvertently hit the upgrade optimization
       // (the clean shutdown file is written after the logs are all closed).
@@ -568,6 +577,7 @@ class Log(@volatile var dir: File,
   def closeHandlers() {
     debug(s"Closing handlers of log $name")
     lock synchronized {
+      isClosed = true
       logSegments.foreach(_.closeHandlers())
     }
   }
@@ -619,7 +629,7 @@ class Log(@volatile var dir: File,
 
       // they are valid, insert them in the log
       lock synchronized {
-
+        checkIfLogOffline()
         if (assignOffsets) {
           // assign offsets to the message set
           val offset = new LongRef(nextOffsetMetadata.messageOffset)
@@ -751,6 +761,7 @@ class Log(@volatile var dir: File,
   }
 
   private def updateFirstUnstableOffset(): Unit = lock synchronized {
+    checkIfLogOffline()
     val updatedFirstStableOffset = producerStateManager.firstUnstableOffset match {
       case Some(logOffsetMetadata) if logOffsetMetadata.messageOffsetOnly || logOffsetMetadata.messageOffset < logStartOffset =>
         val offset = math.max(logOffsetMetadata.messageOffset, logStartOffset)
@@ -775,6 +786,7 @@ class Log(@volatile var dir: File,
     // in an unclean manner within log.flush.start.offset.checkpoint.interval.ms. The chance of this happening is low.
     maybeHandleIOException(s"Exception while increasing log start offset for $topicPartition to $newLogStartOffset in dir ${dir.getParent}") {
       lock synchronized {
+        checkIfLogOffline()
         if (newLogStartOffset > logStartOffset) {
           info(s"Incrementing log start offset of partition $topicPartition to $newLogStartOffset in dir ${dir.getParent}")
           logStartOffset = newLogStartOffset
@@ -1116,6 +1128,7 @@ class Log(@volatile var dir: File,
    */
   private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = {
     lock synchronized {
+      checkIfLogOffline()
       val deletable = deletableSegments(predicate)
       if (deletable.nonEmpty)
         info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason")
@@ -1131,6 +1144,7 @@ class Log(@volatile var dir: File,
         if (segments.size == numToDelete)
           roll()
         lock synchronized {
+          checkIfLogOffline()
           // remove the segments for lookups
           deletable.foreach(deleteSegment)
           maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset)
@@ -1280,6 +1294,7 @@ class Log(@volatile var dir: File,
     maybeHandleIOException(s"Error while rolling log segment for $topicPartition in dir ${dir.getParent}") {
       val start = time.nanoseconds
       lock synchronized {
+        checkIfLogOffline()
         val newOffset = math.max(expectedNextOffset, logEndOffset)
         val logFile = Log.logFile(dir, newOffset)
         val offsetIdxFile = offsetIndexFile(dir, newOffset)
@@ -1356,6 +1371,7 @@ class Log(@volatile var dir: File,
         segment.flush()
 
       lock synchronized {
+        checkIfLogOffline()
         if (offset > this.recoveryPoint) {
           this.recoveryPoint = offset
           lastflushedTime.set(time.milliseconds)
@@ -1406,6 +1422,7 @@ class Log(@volatile var dir: File,
   private[log] def delete() {
     maybeHandleIOException(s"Error while deleting log for $topicPartition in dir ${dir.getParent}") {
       lock synchronized {
+        checkIfLogOffline()
         logSegments.foreach(_.delete())
         segments.clear()
         leaderEpochCache.clear()
@@ -1416,6 +1433,7 @@ class Log(@volatile var dir: File,
 
   // visible for testing
   private[log] def takeProducerSnapshot(): Unit = lock synchronized {
+    checkIfLogOffline()
     producerStateManager.takeSnapshot()
   }
 
@@ -1450,6 +1468,7 @@ class Log(@volatile var dir: File,
       } else {
         info("Truncating log %s to offset %d.".format(name, targetOffset))
         lock synchronized {
+          checkIfLogOffline()
           if (segments.firstEntry.getValue.baseOffset > targetOffset) {
             truncateFullyAndStartAt(targetOffset)
           } else {
@@ -1477,6 +1496,7 @@ class Log(@volatile var dir: File,
     maybeHandleIOException(s"Error while truncating the entire log for $topicPartition in dir ${dir.getParent}") {
       debug(s"Truncate and start log '$name' at offset $newOffset")
       lock synchronized {
+        checkIfLogOffline()
         val segmentsToDelete = logSegments.toList
         segmentsToDelete.foreach(deleteSegment)
         addSegment(new LogSegment(dir,
@@ -1603,6 +1623,7 @@ class Log(@volatile var dir: File,
    */
   private[log] def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false) {
     lock synchronized {
+      checkIfLogOffline()
       // need to do this in two phases to be crash safe AND do the delete asynchronously
       // if we crash in the middle of this we complete the swap in loadSegments()
       if (!isRecoveredSwapFile)
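
From a caller's perspective, every mutating operation above now fails fast
once the log is offline, instead of touching a possibly unmapped buffer. A
hypothetical usage sketch (log is a kafka.log.Log whose handlers were closed):

    import kafka.log.Log
    import org.apache.kafka.common.errors.KafkaStorageException

    def safeRoll(log: Log): Unit = {
      try log.roll(1) // any mutating operation; roll(1) matches the new test
      catch {
        case e: KafkaStorageException =>
          // Expected once the log has been marked offline: the caller can
          // mark the partition offline instead of retrying the I/O.
          println(s"log is offline: ${e.getMessage}")
      }
    }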

http://git-wip-us.apache.org/repos/asf/kafka/blob/29c46ddb/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 3ab244b..9017e4b 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -30,6 +30,7 @@ import kafka.utils._
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
 
@@ -264,6 +265,7 @@ class LogCleaner(val config: CleanerConfig,
             endOffset = nextDirtyOffset
           } catch {
             case _: LogCleaningAbortedException => // task can be aborted, let it go.
+            case _: KafkaStorageException => // partition is already offline. let it go.
             case e: IOException =>
               val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir ${cleanable.log.dir.getParent} due to IOException"
               logDirFailureChannel.maybeAddOfflineLogDir(cleanable.log.dir.getParent, msg, e)
@@ -493,7 +495,7 @@ private[log] class Cleaner(val id: Int,
    * provided
    *
    * @param topicPartition The topic and partition of the log segment to clean
-   * @param source The dirty log segment
+   * @param sourceRecords The dirty log segment
    * @param dest The cleaned log segment
    * @param map The key=>offset mapping
    * @param retainDeletes Should delete tombstones be retained while cleaning this segment
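
With the new catch clause, the cleaner treats a storage exception from an
already-offline partition as a benign signal (like an aborted cleaning) rather
than a fresh dir failure. A condensed sketch of the resulting error handling,
with markDirOffline standing in for logDirFailureChannel.maybeAddOfflineLogDir:

    import java.io.IOException
    import org.apache.kafka.common.errors.KafkaStorageException

    def cleanOnce(clean: () => Unit, markDirOffline: IOException => Unit): Unit = {
      try clean()
      catch {
        case _: KafkaStorageException => // partition already offline, let it go
        case e: IOException => markDirOffline(e) // new failure: offline the dir
      }
    }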

http://git-wip-us.apache.org/repos/asf/kafka/blob/29c46ddb/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 560fdd4..755a043 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1368,7 +1368,7 @@ class ReplicaManager(val config: KafkaConfig,
         partition.getReplica(config.brokerId).exists { replica =>
           replica.log.isDefined && replica.log.get.dir.getParent == dir
         }
-      }.map(_.topicPartition)
+      }.map(_.topicPartition).toSet
 
       info(s"Partitions ${newOfflinePartitions.mkString(",")} are offline due to failure on log directory $dir")
 
@@ -1377,13 +1377,13 @@ class ReplicaManager(val config: KafkaConfig,
         partition.removePartitionMetrics()
       }
 
-      newOfflinePartitions.map(_.topic).toSet.foreach { topic: String =>
+      newOfflinePartitions.map(_.topic).foreach { topic: String =>
         val topicHasPartitions = allPartitions.values.exists(partition => topic == partition.topic)
         if (!topicHasPartitions)
           brokerTopicStats.removeMetrics(topic)
       }
 
-      replicaFetcherManager.removeFetcherForPartitions(newOfflinePartitions.toSet)
+      replicaFetcherManager.removeFetcherForPartitions(newOfflinePartitions)
       highWatermarkCheckpoints = highWatermarkCheckpoints.filterKeys(_ != dir)
       info("Broker %d stopped fetcher for partitions %s because they are in the failed log dir %s"
         .format(localBrokerId, newOfflinePartitions.mkString(", "), dir))
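
The ReplicaManager change converts newOfflinePartitions to a Set once, up
front. Since mapping over a Set yields a Set, the later .toSet calls become
redundant, e.g.:

    val offline: Set[(String, Int)] = Set(("t", 0), ("t", 1), ("u", 0))
    // Mapping a Set returns a Set, so topics are already deduplicated:
    val topics: Set[String] = offline.map(_._1) // Set("t", "u")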

http://git-wip-us.apache.org/repos/asf/kafka/blob/29c46ddb/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 1d0fd15..ec59a26 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -1181,6 +1181,14 @@ class LogTest {
       log.readUncommitted(1, 200, None).records.batches.iterator.next().lastOffset)
   }
 
+  @Test(expected = classOf[KafkaStorageException])
+  def testLogRollAfterLogHandlerClosed() {
+    val logConfig = createLogConfig()
+    val log = createLog(logDir,  logConfig)
+    log.closeHandlers()
+    log.roll(1)
+  }
+
   @Test
   def testReadWithMinMessage() {
     val logConfig = createLogConfig(segmentBytes = 72)