You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/12/13 17:46:01 UTC
kafka git commit: KAFKA-4497: LogCleaner appended the wrong offset to time index. Backport the fix to 0.10.1.
Repository: kafka
Updated Branches:
refs/heads/0.10.1 dce2e3f59 -> 5b869e7ee
KAFKA-4497: LogCleaner appended the wrong offset to time index. Backport the fix to 0.10.1.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5b869e7e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5b869e7e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5b869e7e
Branch: refs/heads/0.10.1
Commit: 5b869e7ee4c6b554787330c386988b5447b127ca
Parents: dce2e3f
Author: Jiangjie Qin <be...@gmail.com>
Authored: Sun Dec 11 17:32:05 2016 -0800
Committer: Jiangjie Qin <be...@gmail.com>
Committed: Sun Dec 11 17:32:05 2016 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/log/LogCleaner.scala | 9 +++++----
1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/5b869e7e/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 219957f..e0b0bb8 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -494,14 +494,15 @@ private[log] class Cleaner(val id: Int,
writeOriginalMessageSet = false
retainedMessages += deepMessageAndOffset
- // We need the max timestamp and last offset for time index
- if (deepMessageAndOffset.message.timestamp > maxTimestamp)
+ // We need the max timestamp and message offset for time index
+ if (deepMessageAndOffset.message.timestamp > maxTimestamp) {
maxTimestamp = deepMessageAndOffset.message.timestamp
+ offsetOfMaxTimestamp = deepMessageAndOffset.offset
+ }
} else {
writeOriginalMessageSet = false
}
}
- offsetOfMaxTimestamp = if (retainedMessages.nonEmpty) retainedMessages.last.offset else -1L
// There are no messages compacted out and no message format conversion, write the original message set back
if (writeOriginalMessageSet)
ByteBufferMessageSet.writeMessage(writeBuffer, shallowMessage, shallowOffset)
@@ -758,7 +759,7 @@ private case class CleanerStats(time: Time = SystemTime) {
def elapsedSecs = (endTime - startTime)/1000.0
def elapsedIndexSecs = (mapCompleteTime - startTime)/1000.0
-
+
def clear() {
startTime = time.milliseconds
mapCompleteTime = -1L