You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2016/12/13 17:46:01 UTC

kafka git commit: KAFKA-4497: LogCleaner appended the wrong offset to time index. Backport the fix to 0.10.1.

Repository: kafka
Updated Branches:
  refs/heads/0.10.1 dce2e3f59 -> 5b869e7ee


KAFKA-4497: LogCleaner appended the wrong offset to time index. Backport the fix to 0.10.1.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5b869e7e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5b869e7e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5b869e7e

Branch: refs/heads/0.10.1
Commit: 5b869e7ee4c6b554787330c386988b5447b127ca
Parents: dce2e3f
Author: Jiangjie Qin <be...@gmail.com>
Authored: Sun Dec 11 17:32:05 2016 -0800
Committer: Jiangjie Qin <be...@gmail.com>
Committed: Sun Dec 11 17:32:05 2016 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogCleaner.scala | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5b869e7e/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 219957f..e0b0bb8 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -494,14 +494,15 @@ private[log] class Cleaner(val id: Int,
                 writeOriginalMessageSet = false
 
               retainedMessages += deepMessageAndOffset
-              // We need the max timestamp and last offset for time index
-              if (deepMessageAndOffset.message.timestamp > maxTimestamp)
+              // We need the max timestamp and message offset for time index
+              if (deepMessageAndOffset.message.timestamp > maxTimestamp) {
                 maxTimestamp = deepMessageAndOffset.message.timestamp
+                offsetOfMaxTimestamp = deepMessageAndOffset.offset
+              }
             } else {
               writeOriginalMessageSet = false
             }
           }
-          offsetOfMaxTimestamp = if (retainedMessages.nonEmpty) retainedMessages.last.offset else -1L
           // There are no messages compacted out and no message format conversion, write the original message set back
           if (writeOriginalMessageSet)
             ByteBufferMessageSet.writeMessage(writeBuffer, shallowMessage, shallowOffset)
@@ -758,7 +759,7 @@ private case class CleanerStats(time: Time = SystemTime) {
   def elapsedSecs = (endTime - startTime)/1000.0
   
   def elapsedIndexSecs = (mapCompleteTime - startTime)/1000.0
-  
+
   def clear() {
     startTime = time.milliseconds
     mapCompleteTime = -1L