You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/04 22:43:53 UTC

[3/5] git commit: High watermark recovered incorrectly from file; kafka-681; patched by Jun Rao; reviewed by Joel Koshy, Neha Narkhede and John Fung

High watermark recovered incorrectly from file; kafka-681; patched by Jun Rao; reviewed by Joel Koshy, Neha Narkhede and John Fung


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6cf46e7c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6cf46e7c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6cf46e7c

Branch: refs/heads/trunk
Commit: 6cf46e7cabd2041cb0c171bab4166d5f8b377a51
Parents: 25d77cc
Author: Jun Rao <ju...@gmail.com>
Authored: Fri Jan 4 09:13:22 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Jan 4 09:13:22 2013 -0800

----------------------------------------------------------------------
 .../kafka/server/HighwaterMarkCheckpoint.scala     |   14 ++++++--------
 1 files changed, 6 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6cf46e7c/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala b/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
index b726a7e..5aa0141 100644
--- a/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
@@ -77,8 +77,8 @@ class HighwaterMarkCheckpoint(val path: String) extends Logging {
     try {
       hwFile.length() match {
         case 0 => 
-          warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) +
-               "partition %d. Returning 0 as the highwatermark".format(partition))
+          warn("No highwatermark file is found. Returning 0 as the highwatermark for topic %s partition %d."
+            .format(topic, partition))
           0L
         case _ =>
           val hwFileReader = new BufferedReader(new FileReader(hwFile))
@@ -90,12 +90,10 @@ class HighwaterMarkCheckpoint(val path: String) extends Logging {
                 for(i <- 0 until numberOfHighWatermarks) yield {
                   val nextHwEntry = hwFileReader.readLine()
                   val partitionHwInfo = nextHwEntry.split(" ")
-                  val highwaterMark = partitionHwInfo.last.toLong
-                  val partitionId = partitionHwInfo.takeRight(2).head
-                  // find the index of partition
-                  val partitionIndex = nextHwEntry.indexOf(partitionId)
-                  val topic = nextHwEntry.substring(0, partitionIndex-1)
-                  (TopicAndPartition(topic, partitionId.toInt) -> highwaterMark)
+                  val topic = partitionHwInfo(0)
+                  val partitionId = partitionHwInfo(1).toInt
+                  val highWatermark = partitionHwInfo(2).toLong
+                  (TopicAndPartition(topic, partitionId) -> highWatermark)
                 }
               hwFileReader.close()
               val hwOpt = partitionHighWatermarks.toMap.get(TopicAndPartition(topic, partition))