You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2016/04/26 11:42:31 UTC

[21/49] incubator-gearpump git commit: GEARPUMP-17, fix KafkaStorage lookup timestamp

GEARPUMP-17, fix KafkaStorage lookup timestamp


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/678a5096
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/678a5096
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/678a5096

Branch: refs/heads/master
Commit: 678a5096d02cdd770326ab246e23116c5701e852
Parents: 4bde18c
Author: manuzhang <ow...@gmail.com>
Authored: Wed Apr 6 16:57:46 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Tue Apr 26 14:26:52 2016 +0800

----------------------------------------------------------------------
 .../scala/io/gearpump/streaming/kafka/KafkaStorage.scala  | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/678a5096/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala
index c768253..eacd267 100644
--- a/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala
+++ b/external/kafka/src/main/scala/io/gearpump/streaming/kafka/KafkaStorage.scala
@@ -94,7 +94,13 @@ class KafkaStorage private[kafka](
     }
   }
 
-
+  /**
+   * Offsets with timestamp < `time` have already been processed by the system,
+   * so on replay we look up the storage for the first offset with timestamp >= `time`.
+   *
+   * @param time the timestamp to look up for the earliest unprocessed offset
+   * @return the earliest unprocessed offset if `time` is within the stored range, otherwise a failure
+   */
   override def lookUp(time: TimeStamp): Try[Array[Byte]] = {
     if (dataByTime.isEmpty) {
       Failure(StorageEmpty)
@@ -106,7 +112,7 @@ class KafkaStorage private[kafka](
       } else if (time > max._1) {
         Failure(Overflow(max._2))
       } else {
-        Success(dataByTime.reverse.find(_._1 <= time).get._2)
+        Success(dataByTime.find(_._1 >= time).get._2)
       }
     }
   }