You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/04/23 01:50:45 UTC

git commit: kafka-866; Recover segment does shallow iteration to fix index causing inconsistencies; patched by Sriram Subramanian; reviewed by Jun Rao

Updated Branches:
  refs/heads/0.8 c5bb1d499 -> f1d2141ca


kafka-866; Recover segment does shallow iteration to fix index causing inconsistencies; patched by Sriram Subramanian; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f1d2141c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f1d2141c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f1d2141c

Branch: refs/heads/0.8
Commit: f1d2141ca065d51b41aeac8c7e2124672298d5f0
Parents: c5bb1d4
Author: Sriram Subramanian <sr...@gmail.com>
Authored: Mon Apr 22 16:50:29 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Mon Apr 22 16:50:29 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala |   10 +++++++++-
 1 files changed, 9 insertions(+), 1 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/f1d2141c/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index e38b95c..ef708e2 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -218,7 +218,15 @@ private[kafka] class Log(val dir: File,
         val entry = iter.next
         entry.message.ensureValid()
         if(validBytes - lastIndexEntry > indexIntervalBytes) {
-          segment.index.append(entry.offset, validBytes)
+          // we need to decompress the message, if required, to get the offset of the first uncompressed message
+          val startOffset =
+            entry.message.compressionCodec match {
+              case NoCompressionCodec =>
+                entry.offset
+              case _ =>
+                ByteBufferMessageSet.decompress(entry.message).head.offset
+          }
+          segment.index.append(startOffset, validBytes)
           lastIndexEntry = validBytes
         }
         validBytes += MessageSet.entrySize(entry.message)