You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:31 UTC
[21/49] incubator-gearpump git commit: GEARPUMP-17, fix KafkaStorage lookup timestamp
GEARPUMP-17, fix KafkaStorage lookup timestamp
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/678a5096
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/678a5096
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/678a5096
Branch: refs/heads/master
Commit: 678a5096d02cdd770326ab246e23116c5701e852
Parents: 4bde18c
Author: manuzhang <ow...@gmail.com>
Authored: Wed Apr 6 16:57:46 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Tue Apr 26 14:26:52 2016 +0800
----------------------------------------------------------------------
.../scala/io/gearpump/streaming/kafka/KafkaStorage.scala | 10 ++++++++--
1 file changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/678a5096/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala
index c768253..eacd267 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala
@@ -94,7 +94,13 @@ class KafkaStorage private[kafka](
}
}
-
+ /**
+ * Offsets with timestamp < `time` have already been processed by the system,
+ * so on replay we look up the storage for the first offset with timestamp >= `time`.
+ *
+ * @param time the timestamp at which to look up the earliest unprocessed offset
+ * @return the earliest unprocessed offset if `time` is in the range, otherwise a failure
+ */
override def lookUp(time: TimeStamp): Try[Array[Byte]] = {
if (dataByTime.isEmpty) {
Failure(StorageEmpty)
@@ -106,7 +112,7 @@ class KafkaStorage private[kafka](
} else if (time > max._1) {
Failure(Overflow(max._2))
} else {
- Success(dataByTime.reverse.find(_._1 <= time).get._2)
+ Success(dataByTime.find(_._1 >= time).get._2)
}
}
}