You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2017/10/10 15:29:19 UTC
kafka git commit: KAFKA-6027; Access to log should throw KafkaStorageException after the log has been marked offline
Repository: kafka
Updated Branches:
refs/heads/trunk b411f57c1 -> 29c46ddb9
KAFKA-6027; Access to log should throw KafkaStorageException after the log has been marked offline
Author: Dong Lin <li...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
Closes #4045 from lindong28/KAFKA-6027
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/29c46ddb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/29c46ddb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/29c46ddb
Branch: refs/heads/trunk
Commit: 29c46ddb91fac8c0fb3f35c1511e2afdcf67fd2e
Parents: b411f57
Author: Dong Lin <li...@gmail.com>
Authored: Tue Oct 10 08:29:17 2017 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Tue Oct 10 08:29:17 2017 -0700
----------------------------------------------------------------------
.../main/scala/kafka/log/AbstractIndex.scala | 6 ++++-
core/src/main/scala/kafka/log/Log.scala | 23 +++++++++++++++++++-
core/src/main/scala/kafka/log/LogCleaner.scala | 4 +++-
.../scala/kafka/server/ReplicaManager.scala | 6 ++---
.../src/test/scala/unit/kafka/log/LogTest.scala | 8 +++++++
5 files changed, 41 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/29c46ddb/core/src/main/scala/kafka/log/AbstractIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index 40ec870..c214dad 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -174,7 +174,11 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
trimToValidSize()
}
- def closeHandler(): Unit = safeForceUnmap()
+ def closeHandler(): Unit = {
+ inLock(lock) {
+ safeForceUnmap()
+ }
+ }
/**
* Do a basic sanity check on this index to detect obvious problems
http://git-wip-us.apache.org/repos/asf/kafka/blob/29c46ddb/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index a6f76ab..c82bf56 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -151,6 +151,7 @@ class Log(@volatile var dir: File,
/* A lock that guards all modifications to the log */
private val lock = new Object
+ @volatile private var isClosed = false
/* last time it was flushed */
private val lastflushedTime = new AtomicLong(time.milliseconds)
@@ -162,6 +163,11 @@ class Log(@volatile var dir: File,
0
}
+ private def checkIfLogOffline(): Unit = {
+ if (isClosed)
+ throw new KafkaStorageException(s"The log for partition $topicPartition is offline")
+ }
+
@volatile private var nextOffsetMetadata: LogOffsetMetadata = _
/* The earliest offset which is part of an incomplete transaction. This is used to compute the
@@ -464,6 +470,7 @@ class Log(@volatile var dir: File,
}
private def loadProducerState(lastOffset: Long, reloadFromCleanShutdown: Boolean): Unit = lock synchronized {
+ checkIfLogOffline()
val messageFormatVersion = config.messageFormatVersion.messageFormatVersion
info(s"Loading producer state from offset $lastOffset for partition $topicPartition with message " +
s"format version $messageFormatVersion")
@@ -554,6 +561,8 @@ class Log(@volatile var dir: File,
def close() {
debug(s"Closing log $name")
lock synchronized {
+ checkIfLogOffline()
+ isClosed = true
// We take a snapshot at the last written offset to hopefully avoid the need to scan the log
// after restarting and to ensure that we cannot inadvertently hit the upgrade optimization
// (the clean shutdown file is written after the logs are all closed).
@@ -568,6 +577,7 @@ class Log(@volatile var dir: File,
def closeHandlers() {
debug(s"Closing handlers of log $name")
lock synchronized {
+ isClosed = true
logSegments.foreach(_.closeHandlers())
}
}
@@ -619,7 +629,7 @@ class Log(@volatile var dir: File,
// they are valid, insert them in the log
lock synchronized {
-
+ checkIfLogOffline()
if (assignOffsets) {
// assign offsets to the message set
val offset = new LongRef(nextOffsetMetadata.messageOffset)
@@ -751,6 +761,7 @@ class Log(@volatile var dir: File,
}
private def updateFirstUnstableOffset(): Unit = lock synchronized {
+ checkIfLogOffline()
val updatedFirstStableOffset = producerStateManager.firstUnstableOffset match {
case Some(logOffsetMetadata) if logOffsetMetadata.messageOffsetOnly || logOffsetMetadata.messageOffset < logStartOffset =>
val offset = math.max(logOffsetMetadata.messageOffset, logStartOffset)
@@ -775,6 +786,7 @@ class Log(@volatile var dir: File,
// in an unclean manner within log.flush.start.offset.checkpoint.interval.ms. The chance of this happening is low.
maybeHandleIOException(s"Exception while increasing log start offset for $topicPartition to $newLogStartOffset in dir ${dir.getParent}") {
lock synchronized {
+ checkIfLogOffline()
if (newLogStartOffset > logStartOffset) {
info(s"Incrementing log start offset of partition $topicPartition to $newLogStartOffset in dir ${dir.getParent}")
logStartOffset = newLogStartOffset
@@ -1116,6 +1128,7 @@ class Log(@volatile var dir: File,
*/
private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = {
lock synchronized {
+ checkIfLogOffline()
val deletable = deletableSegments(predicate)
if (deletable.nonEmpty)
info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason")
@@ -1131,6 +1144,7 @@ class Log(@volatile var dir: File,
if (segments.size == numToDelete)
roll()
lock synchronized {
+ checkIfLogOffline()
// remove the segments for lookups
deletable.foreach(deleteSegment)
maybeIncrementLogStartOffset(segments.firstEntry.getValue.baseOffset)
@@ -1280,6 +1294,7 @@ class Log(@volatile var dir: File,
maybeHandleIOException(s"Error while rolling log segment for $topicPartition in dir ${dir.getParent}") {
val start = time.nanoseconds
lock synchronized {
+ checkIfLogOffline()
val newOffset = math.max(expectedNextOffset, logEndOffset)
val logFile = Log.logFile(dir, newOffset)
val offsetIdxFile = offsetIndexFile(dir, newOffset)
@@ -1356,6 +1371,7 @@ class Log(@volatile var dir: File,
segment.flush()
lock synchronized {
+ checkIfLogOffline()
if (offset > this.recoveryPoint) {
this.recoveryPoint = offset
lastflushedTime.set(time.milliseconds)
@@ -1406,6 +1422,7 @@ class Log(@volatile var dir: File,
private[log] def delete() {
maybeHandleIOException(s"Error while deleting log for $topicPartition in dir ${dir.getParent}") {
lock synchronized {
+ checkIfLogOffline()
logSegments.foreach(_.delete())
segments.clear()
leaderEpochCache.clear()
@@ -1416,6 +1433,7 @@ class Log(@volatile var dir: File,
// visible for testing
private[log] def takeProducerSnapshot(): Unit = lock synchronized {
+ checkIfLogOffline()
producerStateManager.takeSnapshot()
}
@@ -1450,6 +1468,7 @@ class Log(@volatile var dir: File,
} else {
info("Truncating log %s to offset %d.".format(name, targetOffset))
lock synchronized {
+ checkIfLogOffline()
if (segments.firstEntry.getValue.baseOffset > targetOffset) {
truncateFullyAndStartAt(targetOffset)
} else {
@@ -1477,6 +1496,7 @@ class Log(@volatile var dir: File,
maybeHandleIOException(s"Error while truncating the entire log for $topicPartition in dir ${dir.getParent}") {
debug(s"Truncate and start log '$name' at offset $newOffset")
lock synchronized {
+ checkIfLogOffline()
val segmentsToDelete = logSegments.toList
segmentsToDelete.foreach(deleteSegment)
addSegment(new LogSegment(dir,
@@ -1603,6 +1623,7 @@ class Log(@volatile var dir: File,
*/
private[log] def replaceSegments(newSegment: LogSegment, oldSegments: Seq[LogSegment], isRecoveredSwapFile: Boolean = false) {
lock synchronized {
+ checkIfLogOffline()
// need to do this in two phases to be crash safe AND do the delete asynchronously
// if we crash in the middle of this we complete the swap in loadSegments()
if (!isRecoveredSwapFile)
http://git-wip-us.apache.org/repos/asf/kafka/blob/29c46ddb/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 3ab244b..9017e4b 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -30,6 +30,7 @@ import kafka.utils._
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.record.MemoryRecords.RecordFilter
import org.apache.kafka.common.record.MemoryRecords.RecordFilter.BatchRetention
@@ -264,6 +265,7 @@ class LogCleaner(val config: CleanerConfig,
endOffset = nextDirtyOffset
} catch {
case _: LogCleaningAbortedException => // task can be aborted, let it go.
+ case _: KafkaStorageException => // partition is already offline. let it go.
case e: IOException =>
val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir ${cleanable.log.dir.getParent} due to IOException"
logDirFailureChannel.maybeAddOfflineLogDir(cleanable.log.dir.getParent, msg, e)
@@ -493,7 +495,7 @@ private[log] class Cleaner(val id: Int,
* provided
*
* @param topicPartition The topic and partition of the log segment to clean
- * @param source The dirty log segment
+ * @param sourceRecords The dirty log segment
* @param dest The cleaned log segment
* @param map The key=>offset mapping
* @param retainDeletes Should delete tombstones be retained while cleaning this segment
http://git-wip-us.apache.org/repos/asf/kafka/blob/29c46ddb/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 560fdd4..755a043 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1368,7 +1368,7 @@ class ReplicaManager(val config: KafkaConfig,
partition.getReplica(config.brokerId).exists { replica =>
replica.log.isDefined && replica.log.get.dir.getParent == dir
}
- }.map(_.topicPartition)
+ }.map(_.topicPartition).toSet
info(s"Partitions ${newOfflinePartitions.mkString(",")} are offline due to failure on log directory $dir")
@@ -1377,13 +1377,13 @@ class ReplicaManager(val config: KafkaConfig,
partition.removePartitionMetrics()
}
- newOfflinePartitions.map(_.topic).toSet.foreach { topic: String =>
+ newOfflinePartitions.map(_.topic).foreach { topic: String =>
val topicHasPartitions = allPartitions.values.exists(partition => topic == partition.topic)
if (!topicHasPartitions)
brokerTopicStats.removeMetrics(topic)
}
- replicaFetcherManager.removeFetcherForPartitions(newOfflinePartitions.toSet)
+ replicaFetcherManager.removeFetcherForPartitions(newOfflinePartitions)
highWatermarkCheckpoints = highWatermarkCheckpoints.filterKeys(_ != dir)
info("Broker %d stopped fetcher for partitions %s because they are in the failed log dir %s"
.format(localBrokerId, newOfflinePartitions.mkString(", "), dir))
http://git-wip-us.apache.org/repos/asf/kafka/blob/29c46ddb/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 1d0fd15..ec59a26 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -1181,6 +1181,14 @@ class LogTest {
log.readUncommitted(1, 200, None).records.batches.iterator.next().lastOffset)
}
+ @Test(expected = classOf[KafkaStorageException])
+ def testLogRollAfterLogHandlerClosed() {
+ val logConfig = createLogConfig()
+ val log = createLog(logDir, logConfig)
+ log.closeHandlers()
+ log.roll(1)
+ }
+
@Test
def testReadWithMinMessage() {
val logConfig = createLogConfig(segmentBytes = 72)