You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/01/04 18:15:59 UTC
git commit: High watermark recovered incorrectly from file; kafka-681;
patched by Jun Rao; reviewed by Joel Koshy, Neha Narkhede and John Fung
Updated Branches:
refs/heads/0.8 25d77cc69 -> 6cf46e7ca
High watermark recovered incorrectly from file; kafka-681; patched by Jun Rao; reviewed by Joel Koshy, Neha Narkhede and John Fung
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6cf46e7c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6cf46e7c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6cf46e7c
Branch: refs/heads/0.8
Commit: 6cf46e7cabd2041cb0c171bab4166d5f8b377a51
Parents: 25d77cc
Author: Jun Rao <ju...@gmail.com>
Authored: Fri Jan 4 09:13:22 2013 -0800
Committer: Jun Rao <ju...@gmail.com>
Committed: Fri Jan 4 09:13:22 2013 -0800
----------------------------------------------------------------------
.../kafka/server/HighwaterMarkCheckpoint.scala | 14 ++++++--------
1 files changed, 6 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/6cf46e7c/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala b/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
index b726a7e..5aa0141 100644
--- a/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
+++ b/core/src/main/scala/kafka/server/HighwaterMarkCheckpoint.scala
@@ -77,8 +77,8 @@ class HighwaterMarkCheckpoint(val path: String) extends Logging {
try {
hwFile.length() match {
case 0 =>
- warn("No previously checkpointed highwatermark value found for topic %s ".format(topic) +
- "partition %d. Returning 0 as the highwatermark".format(partition))
+ warn("No highwatermark file is found. Returning 0 as the highwatermark for topic %s partition %d."
+ .format(topic, partition))
0L
case _ =>
val hwFileReader = new BufferedReader(new FileReader(hwFile))
@@ -90,12 +90,10 @@ class HighwaterMarkCheckpoint(val path: String) extends Logging {
for(i <- 0 until numberOfHighWatermarks) yield {
val nextHwEntry = hwFileReader.readLine()
val partitionHwInfo = nextHwEntry.split(" ")
- val highwaterMark = partitionHwInfo.last.toLong
- val partitionId = partitionHwInfo.takeRight(2).head
- // find the index of partition
- val partitionIndex = nextHwEntry.indexOf(partitionId)
- val topic = nextHwEntry.substring(0, partitionIndex-1)
- (TopicAndPartition(topic, partitionId.toInt) -> highwaterMark)
+ val topic = partitionHwInfo(0)
+ val partitionId = partitionHwInfo(1).toInt
+ val highWatermark = partitionHwInfo(2).toLong
+ (TopicAndPartition(topic, partitionId) -> highWatermark)
}
hwFileReader.close()
val hwOpt = partitionHighWatermarks.toMap.get(TopicAndPartition(topic, partition))