You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/12 21:37:22 UTC
kafka git commit: KAFKA-3123;
Ensure cleaning is resumed if truncateTo throws
Repository: kafka
Updated Branches:
refs/heads/trunk 7d1ef63be -> 9180cb23d
KAFKA-3123; Ensure cleaning is resumed if truncateTo throws
…f range
>From https://github.com/apache/kafka/pull/1716#discussion_r112000498, ensure the cleaner is restarted if Log.truncateTo throws
Author: Mickael Maison <mi...@gmail.com>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #3296 from mimaison/KAFKA-3123
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9180cb23
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9180cb23
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9180cb23
Branch: refs/heads/trunk
Commit: 9180cb23d7a9816927ae9a7736d7faff673e1def
Parents: 7d1ef63
Author: Mickael Maison <mi...@gmail.com>
Authored: Mon Jun 12 22:36:47 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Mon Jun 12 22:36:47 2017 +0100
----------------------------------------------------------------------
core/src/main/scala/kafka/log/LogManager.scala | 15 +++++++++------
1 file changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/9180cb23/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index d8cdf90..61879be 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -324,13 +324,16 @@ class LogManager(val logDirs: Array[File],
// If the log does not exist, skip it
if (log != null) {
//May need to abort and pause the cleaning of the log, and resume after truncation is done.
- val needToStopCleaner: Boolean = truncateOffset < log.activeSegment.baseOffset
- if (needToStopCleaner && cleaner != null)
+ val needToStopCleaner = cleaner != null && truncateOffset < log.activeSegment.baseOffset
+ if (needToStopCleaner)
cleaner.abortAndPauseCleaning(topicPartition)
- log.truncateTo(truncateOffset)
- if (needToStopCleaner && cleaner != null) {
- cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
- cleaner.resumeCleaning(topicPartition)
+ try {
+ log.truncateTo(truncateOffset)
+ if (needToStopCleaner)
+ cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
+ } finally {
+ if (needToStopCleaner)
+ cleaner.resumeCleaning(topicPartition)
}
}
}