You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/08/04 02:12:37 UTC

[1/2] incubator-gearpump git commit: [GEARPUMP-338] Improve time related types and constants

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master b6f5ccd6e -> f96aca995


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index 1fb61bd..fb2aaed 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -24,6 +24,8 @@ import java.util.concurrent.TimeUnit
 
 import akka.actor._
 import com.gs.collections.impl.map.mutable.primitive.IntShortHashMap
+import org.apache.gearpump.Message
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.source.Watermark
 import org.slf4j.Logger
 import org.apache.gearpump.cluster.UserConfig
@@ -35,7 +37,6 @@ import org.apache.gearpump.streaming.ExecutorToAppMaster._
 import org.apache.gearpump.streaming.ProcessorId
 import org.apache.gearpump.streaming.task.TaskActor._
 import org.apache.gearpump.util.{LogUtil, TimeOutScheduler}
-import org.apache.gearpump.{Message, TimeStamp}
 
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
@@ -141,7 +142,7 @@ class TaskActor(
       context.become(waitForStartTask(startClock))
   }
 
-  def waitForStartTask(startClock: TimeStamp): Receive = {
+  def waitForStartTask(startClock: MilliSeconds): Receive = {
     case start@StartTask(tid) =>
       assert(tid == this.taskId, s"$start sent to the wrong task ${this.taskId}")
       onStartTask(startClock)
@@ -227,7 +228,7 @@ class TaskActor(
   /**
    * Returns min clock of upstream task
    */
-  def getUpstreamMinClock: TimeStamp = upstreamWatermark.toEpochMilli
+  def getUpstreamMinClock: MilliSeconds = upstreamWatermark.toEpochMilli
 
   def getProcessingWatermark: Instant = processingWatermark
 
@@ -265,7 +266,7 @@ class TaskActor(
     count
   }
 
-  private def onStartTask(startClock: TimeStamp): Unit = {
+  private def onStartTask(startClock: MilliSeconds): Unit = {
     LOG.info(s"received start, clock: $startClock, sessionId: $sessionId")
     subscriptions = taskContextData.subscribers.map { subscriber =>
       (subscriber.processorId,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
index c2e2faa..4ba9315 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskControlMessage.scala
@@ -18,7 +18,7 @@
 
 package org.apache.gearpump.streaming.task
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.ProcessorId
 
 /*
@@ -42,25 +42,25 @@ case class Ack(taskId: TaskId, seq: Short, actualReceivedNum: Short, sessionId:
 
 sealed trait ClockEvent
 
-case class UpdateClock(taskId: TaskId, time: TimeStamp) extends ClockEvent
+case class UpdateClock(taskId: TaskId, time: MilliSeconds) extends ClockEvent
 
 object GetLatestMinClock extends ClockEvent
 
 case class GetUpstreamMinClock(taskId: TaskId) extends ClockEvent
 
-case class UpdateCheckpointClock(taskId: TaskId, clock: TimeStamp) extends ClockEvent
+case class UpdateCheckpointClock(taskId: TaskId, clock: MilliSeconds) extends ClockEvent
 
 case object GetCheckpointClock extends ClockEvent
 
-case class CheckpointClock(clock: Option[TimeStamp])
+case class CheckpointClock(clock: Option[MilliSeconds])
 
-case class UpstreamMinClock(latestMinClock: Option[TimeStamp])
+case class UpstreamMinClock(latestMinClock: Option[MilliSeconds])
 
-case class LatestMinClock(clock: TimeStamp)
+case class LatestMinClock(clock: MilliSeconds)
 
 case object GetStartClock
 
-case class StartClock(clock: TimeStamp)
+case class StartClock(clock: MilliSeconds)
 
 case object EndingClock
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
index 82cae96..1e4430b 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
@@ -23,10 +23,11 @@ import java.time.Instant
 import scala.concurrent.duration.FiniteDuration
 import akka.actor.Actor._
 import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
-import org.slf4j.Logger
+import org.apache.gearpump.Message
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.util.LogUtil
-import org.apache.gearpump.{Message, TimeStamp}
+import org.slf4j.Logger
 
 /**
  * This provides TaskContext for user defined tasks
@@ -107,7 +108,7 @@ class TaskWrapper(
     task.map(_.receiveUnManagedMessage).getOrElse(defaultMessageHandler)
   }
 
-  override def upstreamMinClock: TimeStamp = {
+  override def upstreamMinClock: MilliSeconds = {
     actor.getUpstreamMinClock
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala
index 1ef255e..8d026db 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/CheckpointStore.scala
@@ -18,7 +18,7 @@
 
 package org.apache.gearpump.streaming.transaction.api
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 
 /**
  * CheckpointStore persistently stores mapping of timestamp to checkpoint
@@ -27,9 +27,9 @@ import org.apache.gearpump.TimeStamp
  */
 trait CheckpointStore {
 
-  def persist(timeStamp: TimeStamp, checkpoint: Array[Byte]): Unit
+  def persist(timeStamp: MilliSeconds, checkpoint: Array[Byte]): Unit
 
-  def recover(timestamp: TimeStamp): Option[Array[Byte]]
+  def recover(timestamp: MilliSeconds): Option[Array[Byte]]
 
   def close(): Unit
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala
index 2ddca3a..856b1c5 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/TimeStampFilter.scala
@@ -18,12 +18,13 @@
 
 package org.apache.gearpump.streaming.transaction.api
 
-import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.Message
+import org.apache.gearpump.Time.MilliSeconds
 
 /**
- * TimeStampFilter filters out messages that are obsolete.
+ * TimeStampFilter filters out messages that are obsolete.
  */
 trait TimeStampFilter extends java.io.Serializable {
-  def filter(msg: Message, predicate: TimeStamp): Option[Message]
+  def filter(msg: Message, predicate: MilliSeconds): Option[Message]
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
index c223a53..d3bd51b 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
@@ -41,7 +41,7 @@ import org.apache.gearpump.streaming.{DAG, LifeTime, ProcessorDescription, Proce
 import org.apache.gearpump.transport.HostPort
 import org.apache.gearpump.util.Graph
 import org.apache.gearpump.util.Graph._
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfterEach, FlatSpec, Matchers}
 
@@ -238,7 +238,7 @@ class TaskManagerSpec extends FlatSpec with Matchers with BeforeAndAfterEach {
 
     // Step11: Tell ClockService to update DAG.
     clockService.expectMsgType[ChangeToNewDAG]
-    clockService.reply(ChangeToNewDAGSuccess(Map.empty[ProcessorId, TimeStamp]))
+    clockService.reply(ChangeToNewDAGSuccess(Map.empty[ProcessorId, MilliSeconds]))
 
     // Step12: start all tasks
     import scala.concurrent.duration._

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
index 9e42e85..bf50fad 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/CheckpointManagerSpec.scala
@@ -25,7 +25,7 @@ import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.transaction.api.CheckpointStore
 
 class CheckpointManagerSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
@@ -34,7 +34,7 @@ class CheckpointManagerSpec extends PropSpec with PropertyChecks with Matchers w
   val checkpointIntervalGen = Gen.chooseNum[Long](100L, 10000L)
   property("CheckpointManager should recover from CheckpointStore") {
     forAll(timestampGen, checkpointIntervalGen) {
-      (timestamp: TimeStamp, checkpointInterval: Long) =>
+      (timestamp: MilliSeconds, checkpointInterval: Long) =>
         val checkpointStore = mock[CheckpointStore]
         val checkpointManager =
           new CheckpointManager(checkpointInterval, checkpointStore)
@@ -47,7 +47,7 @@ class CheckpointManagerSpec extends PropSpec with PropertyChecks with Matchers w
   property("CheckpointManager should write checkpoint to CheckpointStore") {
     val checkpointGen = Gen.alphaStr.map(_.getBytes("UTF-8"))
     forAll(timestampGen, checkpointIntervalGen, checkpointGen) {
-      (timestamp: TimeStamp, checkpointInterval: Long, checkpoint: Array[Byte]) =>
+      (timestamp: MilliSeconds, checkpointInterval: Long, checkpoint: Array[Byte]) =>
         val checkpointStore = mock[CheckpointStore]
         val checkpointManager =
           new CheckpointManager(checkpointInterval, checkpointStore)
@@ -70,7 +70,7 @@ class CheckpointManagerSpec extends PropSpec with PropertyChecks with Matchers w
 
   property("CheckpointManager should update checkpoint time according to max message timestamp") {
     forAll(timestampGen, checkpointIntervalGen) {
-      (timestamp: TimeStamp, checkpointInterval: Long) =>
+      (timestamp: MilliSeconds, checkpointInterval: Long) =>
         val checkpointStore = mock[CheckpointStore]
         val checkpointManager =
           new CheckpointManager(checkpointInterval, checkpointStore)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala
index 4cdff95..41b0624 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/NonWindowStateSpec.scala
@@ -26,7 +26,7 @@ import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.state.api.{Monoid, Serializer}
 
 class NonWindowStateSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
@@ -35,7 +35,7 @@ class NonWindowStateSpec extends PropSpec with PropertyChecks with Matchers with
 
   property("NonWindowState should recover checkpointed state at given timestamp") {
     forAll(longGen) {
-      (timestamp: TimeStamp) =>
+      (timestamp: MilliSeconds) =>
         val monoid = mock[Monoid[AnyRef]]
         val serializer = mock[Serializer[AnyRef]]
         val bytes = Array.empty[Byte]
@@ -61,7 +61,7 @@ class NonWindowStateSpec extends PropSpec with PropertyChecks with Matchers with
 
   property("NonWindowState checkpoints state") {
     forAll(longGen) {
-      (checkpointTime: TimeStamp) =>
+      (checkpointTime: MilliSeconds) =>
         val monoid = mock[Monoid[AnyRef]]
         val serializer = mock[Serializer[AnyRef]]
 
@@ -95,7 +95,7 @@ class NonWindowStateSpec extends PropSpec with PropertyChecks with Matchers with
 
   property("NonWindowState updates state") {
     forAll(longGen) {
-      (checkpointTime: TimeStamp) =>
+      (checkpointTime: MilliSeconds) =>
         val monoid = mock[Monoid[AnyRef]]
         val serializer = mock[Serializer[AnyRef]]
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala
index d9282ae..9975f49 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowSpec.scala
@@ -23,7 +23,7 @@ import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 
 class WindowSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
 
@@ -32,7 +32,7 @@ class WindowSpec extends PropSpec with PropertyChecks with Matchers with Mockito
   val timestampGen = Gen.chooseNum[Long](0L, 1000L)
   property("Window should only slide when time passes window end") {
     forAll(timestampGen, windowSizeGen, windowStepGen) {
-      (timestamp: TimeStamp, windowSize: Long, windowStep: Long) =>
+      (timestamp: MilliSeconds, windowSize: Long, windowStep: Long) =>
         val window = new Window(windowSize, windowStep)
         window.shouldSlide shouldBe false
         window.update(timestamp)
@@ -42,7 +42,7 @@ class WindowSpec extends PropSpec with PropertyChecks with Matchers with Mockito
 
   property("Window should slide by one or to given timestamp") {
     forAll(timestampGen, windowSizeGen, windowStepGen) {
-      (timestamp: TimeStamp, windowSize: Long, windowStep: Long) =>
+      (timestamp: MilliSeconds, windowSize: Long, windowStep: Long) =>
         val window = new Window(windowSize, windowStep)
         window.range shouldBe(0L, windowSize)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala
index 299a626..2b784bf 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/state/impl/WindowStateSpec.scala
@@ -18,16 +18,15 @@
 
 package org.apache.gearpump.streaming.state.impl
 
+import org.apache.gearpump.Time.MilliSeconds
+
 import scala.collection.immutable.TreeMap
 import scala.util.Success
-
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
-
-import org.apache.gearpump._
 import org.apache.gearpump.streaming.MockUtil
 import org.apache.gearpump.streaming.state.api.{Group, Serializer}
 
@@ -74,7 +73,7 @@ class WindowStateSpec extends PropSpec with PropertyChecks with Matchers with Mo
   }
 
   property("WindowState checkpoints") {
-    forAll(longGen) { (checkpointTime: TimeStamp) =>
+    forAll(longGen) { (checkpointTime: MilliSeconds) =>
       val window = mock[Window]
       val taskContext = MockUtil.mockTaskContext
       val group = mock[Group[AnyRef]]
@@ -120,7 +119,7 @@ class WindowStateSpec extends PropSpec with PropertyChecks with Matchers with Mo
   }
 
   property("WindowState updates state") {
-    forAll(longGen) { (checkpointTime: TimeStamp) =>
+    forAll(longGen) { (checkpointTime: MilliSeconds) =>
       val window = mock[Window]
       val taskContext = MockUtil.mockTaskContext
       val group = mock[Group[AnyRef]]
@@ -205,7 +204,7 @@ class WindowStateSpec extends PropSpec with PropertyChecks with Matchers with Mo
 
   property("WindowState gets interval for timestamp") {
     forAll(longGen, longGen, longGen, longGen) {
-      (timestamp: TimeStamp, checkpointTime: TimeStamp, windowSize: Long, windowStep: Long) =>
+      (timestamp: MilliSeconds, checkpointTime: MilliSeconds, windowSize: Long, windowStep: Long) =>
         val windowManager = new Window(windowSize, windowStep)
         val taskContext = MockUtil.mockTaskContext
         val group = mock[Group[AnyRef]]
@@ -225,8 +224,8 @@ class WindowStateSpec extends PropSpec with PropertyChecks with Matchers with Mo
         interval.endTime shouldBe nextInterval.startTime
     }
 
-    def intervalSpec(interval: Interval, timestamp: TimeStamp,
-        checkpointTime: TimeStamp, windowSize: Long, windowStep: Long): Unit = {
+    def intervalSpec(interval: Interval, timestamp: MilliSeconds,
+        checkpointTime: MilliSeconds, windowSize: Long, windowStep: Long): Unit = {
       interval.startTime should be <= interval.endTime
       timestamp / windowStep * windowStep should (be <= interval.startTime)
       (timestamp - windowSize) / windowStep * windowStep should (be <= interval.startTime)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
index 285bf44..b05befa 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
@@ -24,7 +24,7 @@ import java.util.Random
 import org.mockito.Mockito._
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.{FlatSpec, Matchers}
-import org.apache.gearpump.{MIN_TIME_MILLIS, Message}
+import org.apache.gearpump.{Message, Time}
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
 import org.apache.gearpump.streaming.source.Watermark
@@ -73,14 +73,14 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
     subscription.sendMessage(msg1)
 
     verify(sender, times(1)).transport(msg1, TaskId(1, 1))
-    assert(subscription.watermark == MIN_TIME_MILLIS)
+    assert(subscription.watermark == Time.MIN_TIME_MILLIS)
 
     val msg2 = Message("0", timestamp = Instant.ofEpochMilli(50))
     when(sender.getProcessingWatermark).thenReturn(msg2.timestamp)
     subscription.sendMessage(msg2)
 
     verify(sender, times(1)).transport(msg2, TaskId(1, 0))
-    assert(subscription.watermark == MIN_TIME_MILLIS)
+    assert(subscription.watermark == Time.MIN_TIME_MILLIS)
 
     val initialMinClock = subscription.watermark
 


[2/2] incubator-gearpump git commit: [GEARPUMP-338] Improve time related types and constants

Posted by ma...@apache.org.
[GEARPUMP-338] Improve time related types and constants

Author: manuzhang <ow...@gmail.com>

Closes #208 from manuzhang/fix_time.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/f96aca99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/f96aca99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/f96aca99

Branch: refs/heads/master
Commit: f96aca995d6acd0770529a39fb25a4c4ef18c31e
Parents: b6f5ccd
Author: manuzhang <ow...@gmail.com>
Authored: Fri Aug 4 10:11:58 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Fri Aug 4 10:12:12 2017 +0800

----------------------------------------------------------------------
 .../scala/org/apache/gearpump/Message.scala     |  6 ++-
 .../main/scala/org/apache/gearpump/Time.scala   | 34 +++++++++++++
 .../gearpump/cluster/ClusterMessage.scala       | 20 ++++----
 .../appmaster/ApplicationRuntimeInfo.scala      | 12 ++---
 .../gearpump/cluster/master/AppManager.scala    |  4 +-
 .../gearpump/cluster/scheduler/Scheduler.scala  |  4 +-
 .../scala/org/apache/gearpump/package.scala     | 29 -----------
 .../gearpump/util/HistoryMetricsService.scala   |  4 +-
 .../gearpump/util/RestartPolicySpec.scala       |  2 -
 .../producer/StormSpoutOutputCollector.scala    |  8 +--
 .../storm/topology/GearpumpStormComponent.scala |  5 +-
 .../storm/topology/GearpumpTuple.scala          |  6 +--
 .../storm/util/StormOutputCollector.scala       |  9 ++--
 .../storm/topology/GearpumpTupleSpec.scala      |  4 +-
 external/hadoopfs/README.md                     |  2 +-
 .../hadoop/HadoopCheckpointStore.scala          |  6 +--
 .../lib/HadoopCheckpointStoreReader.scala       |  8 +--
 .../lib/HadoopCheckpointStoreWriter.scala       |  4 +-
 .../lib/rotation/FileSizeRotationSpec.scala     |  4 +-
 .../kafka/lib/source/AbstractKafkaSource.scala  |  5 +-
 .../streaming/kafka/lib/store/KafkaStore.scala  | 10 ++--
 .../streaming/kafka/KafkaStoreSpec.scala        | 16 +++---
 .../gearpump/streaming/ClusterMessage.scala     |  4 +-
 .../gearpump/streaming/StreamApplication.scala  | 12 ++---
 .../streaming/appmaster/AppMaster.scala         |  6 +--
 .../streaming/appmaster/ClockService.scala      | 53 ++++++++++----------
 .../streaming/appmaster/JarScheduler.scala      |  6 +--
 .../appmaster/StreamAppMasterSummary.scala      |  8 +--
 .../streaming/appmaster/TaskManager.scala       |  6 +--
 .../dsl/window/api/WindowFunction.scala         |  9 ++--
 .../streaming/dsl/window/impl/Window.scala      |  4 +-
 .../streaming/metrics/ProcessorAggregator.scala | 12 ++---
 .../gearpump/streaming/source/Watermark.scala   | 11 ++--
 .../streaming/state/api/MonoidState.scala       |  6 +--
 .../streaming/state/api/PersistentState.scala   |  8 +--
 .../streaming/state/api/PersistentTask.scala    |  5 +-
 .../state/impl/CheckpointManager.scala          | 14 +++---
 .../state/impl/InMemoryCheckpointStore.scala    | 10 ++--
 .../streaming/state/impl/NonWindowState.scala   |  6 +--
 .../gearpump/streaming/state/impl/Window.scala  | 10 ++--
 .../streaming/state/impl/WindowState.scala      | 16 +++---
 .../streaming/task/SerializedMessage.scala      |  4 +-
 .../gearpump/streaming/task/Subscription.scala  | 13 ++---
 .../apache/gearpump/streaming/task/Task.scala   |  5 +-
 .../gearpump/streaming/task/TaskActor.scala     |  9 ++--
 .../streaming/task/TaskControlMessage.scala     | 14 +++---
 .../gearpump/streaming/task/TaskWrapper.scala   |  7 +--
 .../transaction/api/CheckpointStore.scala       |  6 +--
 .../transaction/api/TimeStampFilter.scala       |  7 +--
 .../streaming/appmaster/TaskManagerSpec.scala   |  4 +-
 .../state/impl/CheckpointManagerSpec.scala      |  8 +--
 .../state/impl/NonWindowStateSpec.scala         |  8 +--
 .../streaming/state/impl/WindowSpec.scala       |  6 +--
 .../streaming/state/impl/WindowStateSpec.scala  | 15 +++---
 .../streaming/task/SubscriptionSpec.scala       |  6 +--
 55 files changed, 270 insertions(+), 250 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/core/src/main/scala/org/apache/gearpump/Message.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/Message.scala b/core/src/main/scala/org/apache/gearpump/Message.scala
index 4dc5c09..7051396 100644
--- a/core/src/main/scala/org/apache/gearpump/Message.scala
+++ b/core/src/main/scala/org/apache/gearpump/Message.scala
@@ -20,6 +20,8 @@ package org.apache.gearpump
 
 import java.time.Instant
 
+import org.apache.gearpump.Time.MilliSeconds
+
 trait Message {
 
   val value: Any
@@ -35,7 +37,7 @@ trait Message {
  *
  * @param value Accept any type except Null, Nothing and Unit
  */
-case class DefaultMessage(value: Any, timeInMillis: TimeStamp) extends Message {
+case class DefaultMessage(value: Any, timeInMillis: MilliSeconds) extends Message {
 
   /**
    * @param value Accept any type except Null, Nothing and Unit
@@ -74,7 +76,7 @@ object Message {
    * @param value Accept any type except Null, Nothing and Unit
    * @param timestamp timestamp must be smaller than Long.MaxValue
    */
-  def apply(value: Any, timestamp: TimeStamp): Message = {
+  def apply(value: Any, timestamp: MilliSeconds): Message = {
     DefaultMessage(value, timestamp)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/core/src/main/scala/org/apache/gearpump/Time.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/Time.scala b/core/src/main/scala/org/apache/gearpump/Time.scala
new file mode 100644
index 0000000..054becf
--- /dev/null
+++ b/core/src/main/scala/org/apache/gearpump/Time.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump
+
+/**
+ * Types and constants of time in gearpump
+ */
+object Time {
+  type MilliSeconds = Long
+
+  // maximum valid time that won't overflow when being converted to milli-seconds
+  // Long.MaxValue is reserved for unreachable time
+  val MAX_TIME_MILLIS: Long = Long.MaxValue - 1
+
+  // minimum valid time that won't overflow when being converted to milli-seconds
+  val MIN_TIME_MILLIS: Long = Long.MinValue
+
+  val UNREACHABLE: Long = Long.MaxValue
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
index e8956ac..8a067b5 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/ClusterMessage.scala
@@ -23,7 +23,7 @@ import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
 import scala.util.Try
 import akka.actor.ActorRef
 import com.typesafe.config.Config
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.appmaster.WorkerInfo
 import org.apache.gearpump.cluster.master.MasterSummary
 import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
@@ -142,7 +142,7 @@ object MasterToClient {
 
   case class MasterConfig(config: Config)
 
-  case class HistoryMetricsItem(time: TimeStamp, value: MetricType)
+  case class HistoryMetricsItem(time: MilliSeconds, value: MetricType)
 
   /**
    * History metrics returned from master, worker, or app master.
@@ -157,7 +157,7 @@ object MasterToClient {
   case class HistoryMetrics(path: String, metrics: List[HistoryMetricsItem])
 
   /** Return the last error of this streaming application job */
-  case class LastFailure(time: TimeStamp, error: String)
+  case class LastFailure(time: MilliSeconds, error: String)
 
   sealed trait ApplicationResult
 
@@ -208,8 +208,8 @@ object AppMasterToMaster {
     def appName: String
     def actorPath: String
     def status: ApplicationStatus
-    def startTime: TimeStamp
-    def uptime: TimeStamp
+    def startTime: MilliSeconds
+    def uptime: MilliSeconds
     def user: String
   }
 
@@ -220,8 +220,8 @@ object AppMasterToMaster {
       appName: String = null,
       actorPath: String = null,
       status: ApplicationStatus = ApplicationStatus.ACTIVE,
-      startTime: TimeStamp = 0L,
-      uptime: TimeStamp = 0L,
+      startTime: MilliSeconds = 0L,
+      uptime: MilliSeconds = 0L,
       user: String = null)
     extends AppMasterSummary
 
@@ -244,7 +244,7 @@ object AppMasterToMaster {
    * Denotes the application state change of an app.
    */
   case class ApplicationStatusChanged(appId: Int, newStatus: ApplicationStatus,
-      timeStamp: TimeStamp, error: Throwable)
+      timeStamp: MilliSeconds, error: Throwable)
 }
 
 object MasterToAppMaster {
@@ -263,8 +263,8 @@ object MasterToAppMaster {
 
   sealed trait StreamingType
   case class AppMasterData(status: ApplicationStatus, appId: Int = 0, appName: String = null,
-      appMasterPath: String = null, workerPath: String = null, submissionTime: TimeStamp = 0,
-      startTime: TimeStamp = 0, finishTime: TimeStamp = 0, user: String = null)
+      appMasterPath: String = null, workerPath: String = null, submissionTime: MilliSeconds = 0,
+      startTime: MilliSeconds = 0, finishTime: MilliSeconds = 0, user: String = null)
 
   case class AppMasterDataRequest(appId: Int, detail: Boolean = false)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala
index d9b73e2..1054628 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/appmaster/ApplicationRuntimeInfo.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.cluster.appmaster
 
 import akka.actor.ActorRef
 import com.typesafe.config.{Config, ConfigFactory}
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.{ApplicationStatus, ApplicationTerminalStatus}
 
 /** Run time info of Application */
@@ -31,9 +31,9 @@ case class ApplicationRuntimeInfo(
     appMaster: ActorRef = ActorRef.noSender,
     worker: ActorRef = ActorRef.noSender,
     user: String = "",
-    submissionTime: TimeStamp = 0,
-    startTime: TimeStamp = 0,
-    finishTime: TimeStamp = 0,
+    submissionTime: MilliSeconds = 0,
+    startTime: MilliSeconds = 0,
+    finishTime: MilliSeconds = 0,
     config: Config = ConfigFactory.empty(),
     status: ApplicationStatus = ApplicationStatus.NONEXIST) {
 
@@ -41,11 +41,11 @@ case class ApplicationRuntimeInfo(
     this.copy(appMaster = appMaster, worker = worker)
   }
 
-  def onAppMasterActivated(timeStamp: TimeStamp): ApplicationRuntimeInfo = {
+  def onAppMasterActivated(timeStamp: MilliSeconds): ApplicationRuntimeInfo = {
     this.copy(startTime = timeStamp, status = ApplicationStatus.ACTIVE)
   }
 
-  def onFinalStatus(timeStamp: TimeStamp, finalStatus: ApplicationTerminalStatus):
+  def onFinalStatus(timeStamp: MilliSeconds, finalStatus: ApplicationTerminalStatus):
     ApplicationRuntimeInfo = {
     this.copy(finishTime = timeStamp, status = finalStatus)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
index b00cc17..450d512 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/master/AppManager.scala
@@ -21,7 +21,7 @@ package org.apache.gearpump.cluster.master
 import akka.actor._
 import akka.pattern.ask
 import com.typesafe.config.ConfigFactory
-import org.apache.gearpump._
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.AppMasterToMaster.{AppDataSaved, SaveAppDataFailed, _}
 import org.apache.gearpump.cluster.AppMasterToWorker._
 import org.apache.gearpump.cluster.{ApplicationStatus, ApplicationTerminalStatus}
@@ -227,7 +227,7 @@ private[cluster] class AppManager(kvService: ActorRef, launcher: AppMasterLaunch
   }
 
   private def onApplicationStatusChanged(appId: Int, newStatus: ApplicationStatus,
-      timeStamp: TimeStamp, error: Throwable): Unit = {
+      timeStamp: MilliSeconds, error: Throwable): Unit = {
     applicationRegistry.get(appId) match {
       case Some(appRuntimeInfo) =>
         if (appRuntimeInfo.status.canTransitTo(newStatus)) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
index ec9f1ba..1329127 100644
--- a/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
+++ b/core/src/main/scala/org/apache/gearpump/cluster/scheduler/Scheduler.scala
@@ -18,7 +18,7 @@
 package org.apache.gearpump.cluster.scheduler
 
 import akka.actor.{Actor, ActorRef}
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.MasterToWorker.{UpdateResourceFailed, UpdateResourceSucceed, WorkerRegistered}
 import org.apache.gearpump.cluster.WorkerToMaster.ResourceUpdate
 import org.apache.gearpump.cluster.master.Master.WorkerTerminated
@@ -71,7 +71,7 @@ abstract class Scheduler extends Actor {
 
 object Scheduler {
   case class PendingRequest(
-      appId: Int, appMaster: ActorRef, request: ResourceRequest, timeStamp: TimeStamp)
+      appId: Int, appMaster: ActorRef, request: ResourceRequest, timeStamp: MilliSeconds)
 
   case class ApplicationFinished(appId: Int)
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/core/src/main/scala/org/apache/gearpump/package.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/package.scala b/core/src/main/scala/org/apache/gearpump/package.scala
deleted file mode 100644
index 6e20277..0000000
--- a/core/src/main/scala/org/apache/gearpump/package.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache
-
-package object gearpump {
-  type TimeStamp = Long
-
-  // maximum time won't overflow when converted to milli-seconds
-  val MAX_TIME_MILLIS: Long = Long.MaxValue - 1
-
-  // minimum time won't overflow when converted to milli-seconds
-  val MIN_TIME_MILLIS: Long = Long.MinValue
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala b/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala
index ee59678..d45d761 100644
--- a/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala
+++ b/core/src/main/scala/org/apache/gearpump/util/HistoryMetricsService.scala
@@ -25,7 +25,7 @@ import akka.actor.Actor
 import com.typesafe.config.Config
 import org.slf4j.Logger
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.ClientToMaster.{QueryHistoryMetrics, ReadOption}
 import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, HistoryMetricsItem}
 import org.apache.gearpump.metrics.Metrics._
@@ -217,7 +217,7 @@ object HistoryMetricsService {
       add(inputMetrics, System.currentTimeMillis())
     }
 
-    def add(inputMetrics: MetricType, now: TimeStamp): Unit = {
+    def add(inputMetrics: MetricType, now: MilliSeconds): Unit = {
 
       val metrics = HistoryMetricsItem(now, inputMetrics)
       latest = List(metrics)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala b/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala
index 5d0c66d..2dcae2f 100644
--- a/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala
+++ b/core/src/test/scala/org/apache/gearpump/util/RestartPolicySpec.scala
@@ -20,8 +20,6 @@ package org.apache.gearpump.util
 
 import org.scalatest.{FlatSpec, Matchers}
 
-import scala.concurrent.duration._
-
 class RestartPolicySpec extends FlatSpec with Matchers {
 
   "RestartPolicy" should "forbid too many restarts" in {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala
index 5794b1d..9b9bea7 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormSpoutOutputCollector.scala
@@ -21,10 +21,10 @@ package org.apache.gearpump.experiments.storm.producer
 import java.util.{List => JList}
 
 import backtype.storm.spout.{ISpout, ISpoutOutputCollector}
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.experiments.storm.util.StormOutputCollector
 
-case class PendingMessage(id: Object, messageTime: TimeStamp, startTime: TimeStamp)
+case class PendingMessage(id: Object, messageTime: MilliSeconds, startTime: MilliSeconds)
 
 /**
  * this is used by Storm Spout to emit messages
@@ -57,7 +57,7 @@ private[storm] class StormSpoutOutputCollector(
     setPendingOrAck(messageId, curTime, curTime)
   }
 
-  def ackPendingMessage(checkpointClock: TimeStamp): Unit = {
+  def ackPendingMessage(checkpointClock: MilliSeconds): Unit = {
     this.checkpointClock = checkpointClock
     nextPendingMessage.foreach { case PendingMessage(_, messageTime, _) =>
       if (messageTime <= this.checkpointClock) {
@@ -83,7 +83,7 @@ private[storm] class StormSpoutOutputCollector(
     nextPendingMessage = None
   }
 
-  private def setPendingOrAck(messageId: Object, startTime: TimeStamp, messageTime: TimeStamp)
+  private def setPendingOrAck(messageId: Object, startTime: MilliSeconds, messageTime: MilliSeconds)
     : Unit = {
     if (ackEnabled) {
       val newPendingMessage = PendingMessage(messageId, messageTime, startTime)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
index 248ca44..6aa5dc9 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponent.scala
@@ -43,7 +43,8 @@ import org.apache.gearpump.experiments.storm.util.{StormOutputCollector, StormUt
 import org.apache.gearpump.streaming.DAG
 import org.apache.gearpump.streaming.task.{GetDAG, TaskContext, TaskId}
 import org.apache.gearpump.util.{Constants, LogUtil}
-import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.Message
+import org.apache.gearpump.Time.MilliSeconds
 import org.slf4j.Logger
 
 import scala.collection.JavaConverters._
@@ -149,7 +150,7 @@ object GearpumpStormComponent {
       }
     }
 
-    def checkpoint(clock: TimeStamp): Unit = {
+    def checkpoint(clock: MilliSeconds): Unit = {
       collector.ackPendingMessage(clock)
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala
index eb61acb..9f2fa1f 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTuple.scala
@@ -23,7 +23,7 @@ import java.util.{List => JList}
 import backtype.storm.task.GeneralTopologyContext
 import backtype.storm.tuple.{Tuple, TupleImpl}
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 
 /**
  * this carries Storm tuple values in the Gearpump world
@@ -42,7 +42,7 @@ private[storm] class GearpumpTuple(
    * @param topologyContext topology context used for all tasks
    * @return a Tuple
    */
-  def toTuple(topologyContext: GeneralTopologyContext, timestamp: TimeStamp): Tuple = {
+  def toTuple(topologyContext: GeneralTopologyContext, timestamp: MilliSeconds): Tuple = {
     TimedTuple(topologyContext, values, sourceTaskId, sourceStreamId, timestamp)
   }
 
@@ -64,6 +64,6 @@ private[storm] class GearpumpTuple(
 }
 
 case class TimedTuple(topologyContext: GeneralTopologyContext, tuple: JList[AnyRef],
-    sourceTaskId: Integer, sourceStreamId: String, timestamp: TimeStamp)
+    sourceTaskId: Integer, sourceStreamId: String, timestamp: MilliSeconds)
   extends TupleImpl(topologyContext, tuple, sourceTaskId, sourceStreamId, null)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala
index fd023a9..a95725e 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollector.scala
@@ -28,7 +28,8 @@ import backtype.storm.task.TopologyContext
 import backtype.storm.tuple.Fields
 import backtype.storm.utils.Utils
 import org.slf4j.Logger
-import org.apache.gearpump.{MIN_TIME_MILLIS, Message, TimeStamp}
+import org.apache.gearpump.{Message, Time}
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
 import org.apache.gearpump.experiments.storm.util.StormUtil._
 import org.apache.gearpump.streaming.ProcessorId
@@ -56,7 +57,7 @@ object StormOutputCollector {
         streamGroupers, componentToProcessorId, values)
     }
     new StormOutputCollector(stormTaskId, taskToComponent, targets, getTargetPartitionsFn,
-      taskContext, MIN_TIME_MILLIS)
+      taskContext, Time.MIN_TIME_MILLIS)
   }
 
   /**
@@ -164,7 +165,7 @@ class StormOutputCollector(
     targets: JMap[String, JMap[String, Grouping]],
     getTargetPartitionsFn: (String, JList[AnyRef]) => (Map[String, Array[Int]], JList[Integer]),
     val taskContext: TaskContext,
-    private var timestamp: TimeStamp) {
+    private var timestamp: MilliSeconds) {
   import org.apache.gearpump.experiments.storm.util.StormOutputCollector._
 
   /**
@@ -213,7 +214,7 @@ class StormOutputCollector(
   /**
    * set timestamp from each incoming Message if not attached.
    */
-  def setTimestamp(timestamp: TimeStamp): Unit = {
+  def setTimestamp(timestamp: MilliSeconds): Unit = {
     this.timestamp = timestamp
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
index f12e54f..dacbdfd 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpTupleSpec.scala
@@ -21,7 +21,7 @@ import java.util.{List => JList}
 
 import backtype.storm.task.GeneralTopologyContext
 import backtype.storm.tuple.Fields
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
@@ -40,7 +40,7 @@ class GearpumpTupleSpec extends PropSpec with PropertyChecks with Matchers with
     } yield new GearpumpTuple(values, new Integer(sourceTaskId), sourceStreamId, null)
 
     forAll(tupleGen, Gen.alphaStr, Gen.chooseNum[Long](0, Long.MaxValue)) {
-      (gearpumpTuple: GearpumpTuple, componentId: String, timestamp: TimeStamp) =>
+      (gearpumpTuple: GearpumpTuple, componentId: String, timestamp: MilliSeconds) =>
         val topologyContext = mock[GeneralTopologyContext]
         val fields = new Fields(gearpumpTuple.values.asScala.map(_.asInstanceOf[String]): _*)
         when(topologyContext.getComponentId(gearpumpTuple.sourceTaskId)).thenReturn(componentId)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/external/hadoopfs/README.md
----------------------------------------------------------------------
diff --git a/external/hadoopfs/README.md b/external/hadoopfs/README.md
index 7a9aeef..b02c378 100644
--- a/external/hadoopfs/README.md
+++ b/external/hadoopfs/README.md
@@ -7,7 +7,7 @@ Gearpump components for interacting with HDFS file systems.
 1. File Rotation interface
 ```scala
 trait Rotation extends Serializable {
-  def mark(timestamp: TimeStamp, offset: Long): Unit
+  def mark(timestamp: MilliSeconds, offset: Long): Unit
   def shouldRotate: Boolean
   def rotate: Unit
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
index e26a2ee..5f3ca74 100644
--- a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/HadoopCheckpointStore.scala
@@ -23,7 +23,7 @@ import java.time.Instant
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.slf4j.Logger
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.hadoop.lib.rotation.Rotation
 import org.apache.gearpump.streaming.hadoop.lib.{HadoopCheckpointStoreReader, HadoopCheckpointStoreWriter}
 import org.apache.gearpump.streaming.transaction.api.CheckpointStore
@@ -72,7 +72,7 @@ class HadoopCheckpointStore(
    *     b. closes current writer and reset
    *     c. rotation rotates
    */
-  override def persist(timestamp: TimeStamp, checkpoint: Array[Byte]): Unit = {
+  override def persist(timestamp: MilliSeconds, checkpoint: Array[Byte]): Unit = {
     curTime = timestamp
     if (curWriter.isEmpty) {
       curStartTime = curTime
@@ -110,7 +110,7 @@ class HadoopCheckpointStore(
    *   5. looks for the checkpoint in the found store
    *   }}}
    */
-  override def recover(timestamp: TimeStamp): Option[Array[Byte]] = {
+  override def recover(timestamp: MilliSeconds): Option[Array[Byte]] = {
     var checkpoint: Option[Array[Byte]] = None
 
     if (fs.exists(dir)) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
index 082e963..cce4b5d 100644
--- a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreReader.scala
@@ -23,15 +23,15 @@ import java.io.EOFException
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 
 class HadoopCheckpointStoreReader(
     path: Path,
     hadoopConfig: Configuration)
-  extends Iterator[(TimeStamp, Array[Byte])] {
+  extends Iterator[(MilliSeconds, Array[Byte])] {
 
   private val stream = HadoopUtil.getInputStream(path, hadoopConfig)
-  private var nextTimeStamp: Option[TimeStamp] = None
+  private var nextTimeStamp: Option[MilliSeconds] = None
   private var nextData: Option[Array[Byte]] = None
 
   override def hasNext: Boolean = {
@@ -56,7 +56,7 @@ class HadoopCheckpointStoreReader(
     }
   }
 
-  override def next(): (TimeStamp, Array[Byte]) = {
+  override def next(): (MilliSeconds, Array[Byte]) = {
     val timeAndData = for {
       time <- nextTimeStamp
       data <- nextData

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
index 11c12c4..ce7154a 100644
--- a/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
+++ b/external/hadoopfs/src/main/scala/org/apache/gearpump/streaming/hadoop/lib/HadoopCheckpointStoreWriter.scala
@@ -21,12 +21,12 @@ package org.apache.gearpump.streaming.hadoop.lib
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 
 class HadoopCheckpointStoreWriter(path: Path, hadoopConfig: Configuration) {
   private lazy val stream = HadoopUtil.getOutputStream(path, hadoopConfig)
 
-  def write(timestamp: TimeStamp, data: Array[Byte]): Long = {
+  def write(timestamp: MilliSeconds, data: Array[Byte]): Long = {
     stream.writeLong(timestamp)
     stream.writeInt(data.length)
     stream.write(data)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
----------------------------------------------------------------------
diff --git a/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
index a469956..8d0170e 100644
--- a/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
+++ b/external/hadoopfs/src/test/scala/org/apache/gearpump/streaming/hadoop/lib/rotation/FileSizeRotationSpec.scala
@@ -23,7 +23,7 @@ import java.time.Instant
 import org.scalacheck.Gen
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 
 class FileSizeRotationSpec extends PropSpec with PropertyChecks with Matchers {
 
@@ -31,7 +31,7 @@ class FileSizeRotationSpec extends PropSpec with PropertyChecks with Matchers {
   val fileSizeGen = Gen.chooseNum[Long](1, Long.MaxValue)
 
   property("FileSize rotation rotates on file size") {
-    forAll(timestampGen, fileSizeGen) { (timestamp: TimeStamp, fileSize: Long) =>
+    forAll(timestampGen, fileSizeGen) { (timestamp: MilliSeconds, fileSize: Long) =>
       val rotation = new FileSizeRotation(fileSize)
       rotation.shouldRotate shouldBe false
       rotation.mark(Instant.ofEpochMilli(timestamp), rotation.maxBytes / 2)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
index d5a8729..6633bf4 100644
--- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
@@ -36,7 +36,8 @@ import org.apache.gearpump.streaming.kafka.util.KafkaConfig.KafkaConfigFactory
 import org.apache.gearpump.streaming.task.TaskContext
 import org.apache.gearpump.streaming.transaction.api._
 import org.apache.gearpump.util.LogUtil
-import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.Message
+import org.apache.gearpump.Time.MilliSeconds
 import org.slf4j.Logger
 
 object AbstractKafkaSource {
@@ -147,7 +148,7 @@ abstract class AbstractKafkaSource(
     }
   }
 
-  private def maybeRecover(startTime: TimeStamp): Unit = {
+  private def maybeRecover(startTime: MilliSeconds): Unit = {
     checkpointStores.foreach { case (tp, store) =>
       for {
         bytes <- store.recover(startTime)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala
index e2450f4..dbbd0ea 100644
--- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/store/KafkaStore.scala
@@ -22,7 +22,7 @@ import java.util.Properties
 
 import com.twitter.bijection.Injection
 import kafka.api.OffsetRequest
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.kafka.lib.source.consumer.KafkaConsumer
 import org.apache.gearpump.streaming.kafka.util.KafkaConfig
 import org.apache.gearpump.streaming.kafka.util.KafkaConfig.KafkaConfigFactory
@@ -82,9 +82,9 @@ class KafkaStore private[kafka](
   extends CheckpointStore {
   import org.apache.gearpump.streaming.kafka.lib.store.KafkaStore._
 
-  private var maxTime: TimeStamp = 0L
+  private var maxTime: MilliSeconds = 0L
 
-  override def persist(time: TimeStamp, checkpoint: Array[Byte]): Unit = {
+  override def persist(time: MilliSeconds, checkpoint: Array[Byte]): Unit = {
     // make sure checkpointed timestamp is monotonically increasing
     // hence (1, 1), (3, 2), (2, 3) is checkpointed as (1, 1), (3, 2), (3, 3)
     if (time > maxTime) {
@@ -98,14 +98,14 @@ class KafkaStore private[kafka](
     LOG.debug("KafkaStore persisted state ({}, {})", key, value)
   }
 
-  override def recover(time: TimeStamp): Option[Array[Byte]] = {
+  override def recover(time: MilliSeconds): Option[Array[Byte]] = {
     var checkpoint: Option[Array[Byte]] = None
     optConsumer.foreach { consumer =>
       while (consumer.hasNext && checkpoint.isEmpty) {
         val kafkaMsg = consumer.next()
         checkpoint = for {
           k <- kafkaMsg.key
-          t <- Injection.invert[TimeStamp, Array[Byte]](k).toOption
+          t <- Injection.invert[MilliSeconds, Array[Byte]](k).toOption
           c = kafkaMsg.msg if t >= time
         } yield c
       }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala
index 67c64c4..da99d64 100644
--- a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala
+++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala
@@ -22,7 +22,7 @@ import java.util.Properties
 import com.twitter.bijection.Injection
 import kafka.api.OffsetRequest
 import kafka.common.TopicAndPartition
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.MockUtil
 import org.apache.gearpump.streaming.kafka.lib.source.consumer.{KafkaMessage, KafkaConsumer}
 import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient
@@ -92,7 +92,7 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc
 
   property("KafkaStore should read checkpoint from timestamp on recover") {
     forAll(Gen.alphaStr, timestampGen) {
-      (topic: String, recoverTime: TimeStamp) =>
+      (topic: String, recoverTime: MilliSeconds) =>
         val consumer = mock[KafkaConsumer]
         val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
         val kafkaStore = new KafkaStore(topic, producer, Some(consumer))
@@ -104,7 +104,7 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc
     }
 
     forAll(Gen.alphaStr, timestampGen) {
-      (topic: String, recoverTime: TimeStamp) =>
+      (topic: String, recoverTime: MilliSeconds) =>
         val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
         val kafkaStore = new KafkaStore(topic, producer, None)
 
@@ -113,12 +113,12 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc
     }
 
     forAll(Gen.alphaStr, timestampGen, timestampGen) {
-      (topic: String, recoverTime: TimeStamp, checkpointTime: TimeStamp) =>
+      (topic: String, recoverTime: MilliSeconds, checkpointTime: MilliSeconds) =>
         val consumer = mock[KafkaConsumer]
         val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
         val kafkaStore = new KafkaStore(topic, producer, Some(consumer))
 
-        val key = Injection[TimeStamp, Array[Byte]](checkpointTime)
+        val key = Injection[MilliSeconds, Array[Byte]](checkpointTime)
         val msg = key
         val kafkaMsg = KafkaMessage(TopicAndPartition(topic, 0), 0, Some(key), msg)
 
@@ -139,7 +139,7 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc
 
   property("KafkaStore persist should write checkpoint with monotonically increasing timestamp") {
     forAll(Gen.alphaStr, timestampGen, Gen.alphaStr) {
-      (topic: String, checkpointTime: TimeStamp, data: String) =>
+      (topic: String, checkpointTime: MilliSeconds, data: String) =>
         val consumer = mock[KafkaConsumer]
         val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
         val kafkaStore = new KafkaStore(topic, producer, Some(consumer))
@@ -155,12 +155,12 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc
     }
 
     def verifyProducer(producer: Producer[Array[Byte], Array[Byte]], count: Int,
-        topic: String, partition: Int, time: TimeStamp, data: String): Unit = {
+        topic: String, partition: Int, time: MilliSeconds, data: String): Unit = {
       verify(producer, times(count)).send(
         MockUtil.argMatch[ProducerRecord[Array[Byte], Array[Byte]]](record =>
           record.topic() == topic
           && record.partition() == partition
-          && Injection.invert[TimeStamp, Array[Byte]](record.key()).get == time
+          && Injection.invert[MilliSeconds, Array[Byte]](record.key()).get == time
           && Injection.invert[String, Array[Byte]](record.value()).get == data
         ))
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala
index d85d042..f0e4f84 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.streaming
 
 import akka.actor.ActorRef
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.appmaster.WorkerInfo
 import org.apache.gearpump.cluster.scheduler.Resource
 import org.apache.gearpump.streaming.appmaster.TaskRegistry.TaskLocations
@@ -61,7 +61,7 @@ object AppMasterToExecutor {
   case class StartAllTasks(dagVersion: Int)
 
   case class StartDynamicDag(dagVersion: Int)
-  case class TaskRegistered(taskId: TaskId, sessionId: Int, startClock: TimeStamp)
+  case class TaskRegistered(taskId: TaskId, sessionId: Int, startClock: MilliSeconds)
   case class TaskRejected(taskId: TaskId)
 
   case object RestartClockService

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
index 125612b..f15e1b3 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
@@ -20,7 +20,8 @@ package org.apache.gearpump.streaming
 
 import scala.reflect.ClassTag
 import akka.actor.ActorSystem
-import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS, TimeStamp}
+import org.apache.gearpump.Time
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster._
 import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner, PartitionerDescription, PartitionerObject}
 import org.apache.gearpump.streaming.appmaster.AppMaster
@@ -101,8 +102,8 @@ object Processor {
  * When input message's timestamp is beyond current processor's lifetime,
  * then it will not be processed by this processor.
  */
-case class LifeTime(birth: TimeStamp, death: TimeStamp) {
-  def contains(timestamp: TimeStamp): Boolean = {
+case class LifeTime(birth: MilliSeconds, death: MilliSeconds) {
+  def contains(timestamp: MilliSeconds): Boolean = {
     timestamp >= birth && timestamp < death
   }
 
@@ -112,8 +113,7 @@ case class LifeTime(birth: TimeStamp, death: TimeStamp) {
 }
 
 object LifeTime {
-  // MAX_TIME_MILLIS is Long.MaxValue - 1
-  val Immortal = LifeTime(MIN_TIME_MILLIS, MAX_TIME_MILLIS + 1)
+  val Immortal = LifeTime(Time.MIN_TIME_MILLIS, Time.UNREACHABLE)
 }
 
 /**
@@ -158,7 +158,7 @@ object StreamApplication {
     val graph = dag.mapVertex { processor =>
       val updatedProcessor = ProcessorToProcessorDescription(indices(processor), processor)
       updatedProcessor
-    }.mapEdge { (node1, edge, node2) =>
+    }.mapEdge { (_, edge, _) =>
       PartitionerDescription(new PartitionerObject(
         Option(edge).getOrElse(StreamApplication.hashPartitioner)))
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
index ba4b058..3c5c7da 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
@@ -21,7 +21,7 @@ package org.apache.gearpump.streaming.appmaster
 import java.lang.management.ManagementFactory
 
 import akka.actor._
-import org.apache.gearpump._
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.AppMasterToMaster.ApplicationStatusChanged
 import org.apache.gearpump.cluster.ClientToMaster._
 import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterActivated, AppMasterDataDetailRequest, ReplayFromTimestampWindowTrailingEdge}
@@ -67,7 +67,7 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli
   import akka.pattern.ask
   private implicit val dispatcher = context.dispatcher
 
-  private val startTime: TimeStamp = System.currentTimeMillis()
+  private val startTime: MilliSeconds = System.currentTimeMillis()
 
   private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
   LOG.info(s"AppMaster[$appId] is launched by $username, app: $app xxxxxxxxxxxxxxxxx")
@@ -322,7 +322,7 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli
       context.stop(self)
   }
 
-  private def getMinClock: Future[TimeStamp] = {
+  private def getMinClock: Future[MilliSeconds] = {
     clockService match {
       case Some(service) =>
         (service ? GetLatestMinClock).asInstanceOf[Future[LatestMinClock]].map(_.clock)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
index b514d6f..90141d4 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
@@ -25,7 +25,8 @@ import java.util.concurrent.TimeUnit
 
 import akka.actor.{Actor, ActorRef, Cancellable, Stash}
 import com.google.common.primitives.Longs
-import org.apache.gearpump.{MIN_TIME_MILLIS, TimeStamp}
+import org.apache.gearpump.Time
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.ClientToMaster.GetStallingTasks
 import org.apache.gearpump.streaming.AppMasterToMaster.StallingTasks
 import org.apache.gearpump.streaming._
@@ -60,8 +61,8 @@ class ClockService(
     LOG.info("Initializing Clock service, get snapshotted StartClock ....")
     store.get(START_CLOCK).map { clock =>
       // check for null first since
-      // (null).asInstanceOf[TimeStamp] is zero
-      val startClock = if (clock != null) clock.asInstanceOf[TimeStamp] else MIN_TIME_MILLIS
+      // (null).asInstanceOf[MilliSeconds] is zero
+      val startClock = if (clock != null) clock.asInstanceOf[MilliSeconds] else Time.MIN_TIME_MILLIS
 
       minCheckpointClock = Some(startClock)
 
@@ -88,32 +89,32 @@ class ClockService(
   // We use Array instead of List for Performance consideration
   private var processorClocks = Array.empty[ProcessorClock]
 
-  private var checkpointClocks: Map[TaskId, Vector[TimeStamp]] = _
+  private var checkpointClocks: Map[TaskId, Vector[MilliSeconds]] = _
 
-  private var minCheckpointClock: Option[TimeStamp] = None
+  private var minCheckpointClock: Option[MilliSeconds] = None
 
   private def checkpointEnabled(processor: ProcessorDescription): Boolean = {
     val taskConf = processor.taskConf
     taskConf != null && taskConf.getBoolean("state.checkpoint.enable").contains(true)
   }
 
-  private def resetCheckpointClocks(dag: DAG, startClock: TimeStamp): Unit = {
+  private def resetCheckpointClocks(dag: DAG, startClock: MilliSeconds): Unit = {
     this.checkpointClocks = dag.processors.filter(startClock < _._2.life.death)
       .filter { case (_, processor) =>
         checkpointEnabled(processor)
       }.flatMap { case (id, processor) =>
-      (0 until processor.parallelism).map(TaskId(id, _) -> Vector.empty[TimeStamp])
+      (0 until processor.parallelism).map(TaskId(id, _) -> Vector.empty[MilliSeconds])
     }
     if (this.checkpointClocks.isEmpty) {
       minCheckpointClock = None
     }
   }
 
-  private def initDag(startClock: TimeStamp): Unit = {
+  private def initDag(startClock: MilliSeconds): Unit = {
     recoverDag(this.dag, startClock)
   }
 
-  private def recoverDag(dag: DAG, startClock: TimeStamp): Unit = {
+  private def recoverDag(dag: DAG, startClock: MilliSeconds): Unit = {
     this.clocks = dag.processors.filter(startClock < _._2.life.death).
       map { pair =>
         val (processorId, processor) = pair
@@ -130,7 +131,7 @@ class ClockService(
     resetCheckpointClocks(dag, startClock)
   }
 
-  private def dynamicDAG(dag: DAG, startClock: TimeStamp): Unit = {
+  private def dynamicDAG(dag: DAG, startClock: MilliSeconds): Unit = {
     val newClocks = dag.processors.filter(startClock < _._2.life.death).
       map { pair =>
         val (processorId, processor) = pair
@@ -207,7 +208,7 @@ class ClockService(
     }
   }
 
-  private def getUpStreamMinClock(processorId: ProcessorId): Option[TimeStamp] = {
+  private def getUpStreamMinClock(processorId: ProcessorId): Option[MilliSeconds] = {
     upstreamClocks.get(processorId).map(ProcessorClocks.minClock)
   }
 
@@ -303,7 +304,7 @@ class ClockService(
     }
   }
 
-  private def minClock: TimeStamp = {
+  private def minClock: MilliSeconds = {
     ProcessorClocks.minClock(processorClocks)
   }
 
@@ -313,7 +314,7 @@ class ClockService(
     healthChecker.check(minTimestamp, clocks, dag, System.currentTimeMillis())
   }
 
-  private def getStartClock: TimeStamp = {
+  private def getStartClock: MilliSeconds = {
     minCheckpointClock.getOrElse(minClock)
   }
 
@@ -321,7 +322,7 @@ class ClockService(
     store.put(START_CLOCK, getStartClock)
   }
 
-  private def updateCheckpointClocks(task: TaskId, time: TimeStamp): Unit = {
+  private def updateCheckpointClocks(task: TaskId, time: MilliSeconds): Unit = {
     val clocks = checkpointClocks(task) :+ time
     checkpointClocks += task -> clocks
 
@@ -340,17 +341,17 @@ object ClockService {
   case object HealthCheck
 
   class ProcessorClock(val processorId: ProcessorId, val life: LifeTime, val parallelism: Int,
-      private var _min: TimeStamp = MIN_TIME_MILLIS,
-      private var _taskClocks: Array[TimeStamp] = null) {
+      private var _min: MilliSeconds = Time.MIN_TIME_MILLIS,
+      private var _taskClocks: Array[MilliSeconds] = null) {
 
     def copy(life: LifeTime): ProcessorClock = {
       new ProcessorClock(processorId, life, parallelism, _min, _taskClocks)
     }
 
-    def min: TimeStamp = _min
-    def taskClocks: Array[TimeStamp] = _taskClocks
+    def min: MilliSeconds = _min
+    def taskClocks: Array[MilliSeconds] = _taskClocks
 
-    def init(startClock: TimeStamp): Unit = {
+    def init(startClock: MilliSeconds): Unit = {
       if (taskClocks == null) {
         this._min = startClock
         this._taskClocks = new Array(parallelism)
@@ -358,7 +359,7 @@ object ClockService {
       }
     }
 
-    def updateMinClock(taskIndex: Int, clock: TimeStamp): Unit = {
+    def updateMinClock(taskIndex: Int, clock: MilliSeconds): Unit = {
       taskClocks(taskIndex) = clock
       _min = Longs.min(taskClocks: _*)
     }
@@ -381,8 +382,8 @@ object ClockService {
 
     /** Check for stalling tasks */
     def check(
-        currentMinClock: TimeStamp, processorClocks: Map[ProcessorId, ProcessorClock],
-        dag: DAG, now: TimeStamp): Unit = {
+        currentMinClock: MilliSeconds, processorClocks: Map[ProcessorId, ProcessorClock],
+        dag: DAG, now: MilliSeconds): Unit = {
       var isClockStalling = false
       if (null == minClock || currentMinClock > minClock.appClock) {
         minClock = ClockValue(systemClock = now, appClock = currentMinClock)
@@ -423,7 +424,7 @@ object ClockService {
   }
 
   object HealthChecker {
-    case class ClockValue(systemClock: TimeStamp, appClock: TimeStamp) {
+    case class ClockValue(systemClock: MilliSeconds, appClock: MilliSeconds) {
       def prettyPrint: String = {
         "(system clock: " + new Date(systemClock).toString + ", app clock: " + appClock + ")"
       }
@@ -433,7 +434,7 @@ object ClockService {
   object ProcessorClocks {
 
     // Get the Min clock of all processors
-    def minClock(clock: Array[ProcessorClock]): TimeStamp = {
+    def minClock(clock: Array[ProcessorClock]): MilliSeconds = {
       var i = 0
       var min = if (clock.length == 0) 0L else clock(0).min
       while (i < clock.length) {
@@ -445,7 +446,7 @@ object ClockService {
   }
 
   case class ChangeToNewDAG(dag: DAG)
-  case class ChangeToNewDAGSuccess(clocks: Map[ProcessorId, TimeStamp])
+  case class ChangeToNewDAGSuccess(clocks: Map[ProcessorId, MilliSeconds])
 
-  case class StoredStartClock(clock: TimeStamp)
+  case class StoredStartClock(clock: MilliSeconds)
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala
index e023cdf..e31f863 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.appmaster
 import akka.actor._
 import akka.pattern.ask
 import com.typesafe.config.Config
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.AppJar
 import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest}
 import org.apache.gearpump.cluster.worker.WorkerId
@@ -47,7 +47,7 @@ class JarScheduler(appId: Int, appName: String, config: Config, factory: ActorRe
   private implicit val timeout = Constants.FUTURE_TIMEOUT
 
   /** Set the current DAG version active */
-  def setDag(dag: DAG, startClock: Future[TimeStamp]): Unit = {
+  def setDag(dag: DAG, startClock: Future[MilliSeconds]): Unit = {
     actor ! TransitToNewDag
     startClock.map { start =>
       actor ! NewDag(dag, start)
@@ -82,7 +82,7 @@ object JarScheduler {
 
   case class ResourceRequestDetail(jar: AppJar, requests: Array[ResourceRequest])
 
-  case class NewDag(dag: DAG, startTime: TimeStamp)
+  case class NewDag(dag: DAG, startTime: MilliSeconds)
 
   case object TransitToNewDag
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
index 126ab92..1214cd0 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
@@ -18,7 +18,7 @@
 
 package org.apache.gearpump.streaming.appmaster
 
-import org.apache.gearpump._
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.AppMasterToMaster.AppMasterSummary
 import org.apache.gearpump.cluster.{ApplicationStatus, UserConfig}
 import org.apache.gearpump.streaming.appmaster.AppMaster.ExecutorBrief
@@ -32,10 +32,10 @@ case class StreamAppMasterSummary(
     appId: Int,
     appName: String = null,
     actorPath: String = null,
-    clock: TimeStamp = 0L,
+    clock: MilliSeconds = 0L,
     status: ApplicationStatus = ApplicationStatus.ACTIVE,
-    startTime: TimeStamp = 0L,
-    uptime: TimeStamp = 0L,
+    startTime: MilliSeconds = 0L,
+    uptime: MilliSeconds = 0L,
     user: String = null,
     homeDirectory: String = "",
     logFile: String = "",

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
index 51c4de9..bae5c02 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.appmaster
 
 import akka.actor._
 import akka.pattern.ask
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.MasterToAppMaster.ReplayFromTimestampWindowTrailingEdge
 import org.apache.gearpump.streaming.AppMasterToExecutor._
 import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, RegisterTask, UnRegisterTask}
@@ -86,11 +86,11 @@ private[appmaster] class TaskManager(
   dagManager ! WatchChange(watcher = self)
   executorManager ! SetTaskManager(self)
 
-  private def getStartClock: Future[TimeStamp] = {
+  private def getStartClock: Future[MilliSeconds] = {
     (clockService ? GetStartClock).asInstanceOf[Future[StartClock]].map(_.clock)
   }
 
-  private var startClock: Future[TimeStamp] = getStartClock
+  private var startClock: Future[MilliSeconds] = getStartClock
 
   def receive: Receive = applicationReady(DagReadyState.empty)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
index a2f51c7..85ca969 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
@@ -19,7 +19,8 @@ package org.apache.gearpump.streaming.dsl.window.api
 
 import java.time.{Duration, Instant}
 
-import org.apache.gearpump.{MIN_TIME_MILLIS, MAX_TIME_MILLIS, TimeStamp}
+import org.apache.gearpump.Time
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.dsl.window.impl.Window
 
 import scala.collection.mutable.ArrayBuffer
@@ -46,8 +47,8 @@ abstract class NonMergingWindowFunction extends WindowFunction {
 
 object GlobalWindowFunction {
 
-  val globalWindow = Array(Window(Instant.ofEpochMilli(MIN_TIME_MILLIS),
-    Instant.ofEpochMilli(MAX_TIME_MILLIS)))
+  val globalWindow = Array(Window(Instant.ofEpochMilli(Time.MIN_TIME_MILLIS),
+    Instant.ofEpochMilli(Time.MAX_TIME_MILLIS)))
 }
 
 case class GlobalWindowFunction() extends NonMergingWindowFunction {
@@ -80,7 +81,7 @@ case class SlidingWindowFunction(size: Duration, step: Duration)
     windows.toArray
   }
 
-  private def lastStartFor(timestamp: TimeStamp, windowStep: Long): TimeStamp = {
+  private def lastStartFor(timestamp: MilliSeconds, windowStep: Long): MilliSeconds = {
     timestamp - (timestamp + windowStep) % windowStep
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
index 2425ff2..7536473 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
@@ -19,10 +19,10 @@ package org.apache.gearpump.streaming.dsl.window.impl
 
 import java.time.Instant
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 
 object Window {
-  def ofEpochMilli(startTime: TimeStamp, endTime: TimeStamp): Window = {
+  def ofEpochMilli(startTime: MilliSeconds, endTime: MilliSeconds): Window = {
     Window(Instant.ofEpochMilli(startTime), Instant.ofEpochMilli(endTime))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala
index 8f8b7ab..058d36b 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala
@@ -22,7 +22,7 @@ import java.util
 
 import com.google.common.collect.Iterators
 import com.typesafe.config.Config
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.ClientToMaster.ReadOption
 import org.apache.gearpump.cluster.MasterToClient.HistoryMetricsItem
 import org.apache.gearpump.metrics.Metrics.{Histogram, Meter}
@@ -64,7 +64,7 @@ class ProcessorAggregator(historyMetricConfig: HistoryMetricsConfig) extends Met
   }
 
   def aggregate(
-      readOption: ReadOption.ReadOption, inputs: Iterator[HistoryMetricsItem], now: TimeStamp)
+      readOption: ReadOption.ReadOption, inputs: Iterator[HistoryMetricsItem], now: MilliSeconds)
     : List[HistoryMetricsItem] = {
     val (start, end, interval) = getTimeRange(readOption, now)
     val timeSlotsCount = ((end - start - 1) / interval + 1).toInt
@@ -103,8 +103,8 @@ class ProcessorAggregator(historyMetricConfig: HistoryMetricsConfig) extends Met
   }
 
   // Returns (start, end, interval)
-  private def getTimeRange(readOption: ReadOption.ReadOption, now: TimeStamp)
-    : (TimeStamp, TimeStamp, TimeStamp) = {
+  private def getTimeRange(readOption: ReadOption.ReadOption, now: MilliSeconds)
+    : (MilliSeconds, MilliSeconds, MilliSeconds) = {
     readOption match {
       case ReadOption.ReadRecent =>
         val end = now
@@ -229,7 +229,7 @@ object ProcessorAggregator {
     var p99: Double = 0
     var p999: Double = 0
 
-    var startTime: TimeStamp = Long.MaxValue
+    var startTime: MilliSeconds = Long.MaxValue
 
     override def aggregate(item: HistoryMetricsItem): Unit = {
       val input = item.value.asInstanceOf[Histogram]
@@ -263,7 +263,7 @@ object ProcessorAggregator {
     var m1: Double = 0
     var rateUnit: String = null
 
-    var startTime: TimeStamp = Long.MaxValue
+    var startTime: MilliSeconds = Long.MaxValue
 
     override def aggregate(item: HistoryMetricsItem): Unit = {
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
index 14abff8..607af85 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
@@ -19,7 +19,7 @@ package org.apache.gearpump.streaming.source
 
 import java.time.Instant
 
-import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS, Message}
+import org.apache.gearpump.{Message, Time}
 
 /**
  * message used by source task to report source watermark.
@@ -28,9 +28,14 @@ case class Watermark(instant: Instant) {
   def toMessage: Message = Message("watermark", instant)
 }
 
+/**
+ * All input data with event times less than watermark have been observed
+ */
 object Watermark {
 
-  val MAX: Instant = Instant.ofEpochMilli(MAX_TIME_MILLIS + 1)
+  // all input data have been observed
+  val MAX: Instant = Instant.ofEpochMilli(Time.MAX_TIME_MILLIS + 1)
 
-  val MIN: Instant = Instant.ofEpochMilli(MIN_TIME_MILLIS)
+  // no input data have been observed
+  val MIN: Instant = Instant.ofEpochMilli(Time.MIN_TIME_MILLIS)
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala
index 0e2f83a..0118c07 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala
@@ -18,7 +18,7 @@
 
 package org.apache.gearpump.streaming.state.api
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 
 /**
  * MonoidState uses Algebird Monoid to aggregate state
@@ -37,11 +37,11 @@ abstract class MonoidState[T](monoid: Monoid[T]) extends PersistentState[T] {
 
   override def get: Option[T] = Option(monoid.plus(left, right))
 
-  override def setNextCheckpointTime(nextCheckpointTime: TimeStamp): Unit = {
+  override def setNextCheckpointTime(nextCheckpointTime: MilliSeconds): Unit = {
     checkpointTime = nextCheckpointTime
   }
 
-  protected def updateState(timestamp: TimeStamp, t: T): Unit = {
+  protected def updateState(timestamp: MilliSeconds, t: T): Unit = {
     if (timestamp < checkpointTime) {
       left = monoid.plus(left, t)
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala
index 906d331..39b17c9 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala
@@ -18,7 +18,7 @@
 
 package org.apache.gearpump.streaming.state.api
 
-import org.apache.gearpump._
+import org.apache.gearpump.Time.MilliSeconds
 
 /**
  * PersistentState is part of the transaction API
@@ -33,19 +33,19 @@ trait PersistentState[T] {
    * Recovers state to a previous checkpoint
    * usually invoked by the framework
    */
-  def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit
+  def recover(timestamp: MilliSeconds, bytes: Array[Byte]): Unit
 
   /**
    * Updates state on a new message
    * this is invoked by user
    */
-  def update(timestamp: TimeStamp, t: T): Unit
+  def update(timestamp: MilliSeconds, t: T): Unit
 
   /**
    * Sets next checkpoint time
    * should be invoked by the framework
    */
-  def setNextCheckpointTime(timeStamp: TimeStamp): Unit
+  def setNextCheckpointTime(timeStamp: MilliSeconds): Unit
 
   /**
    * Gets a binary snapshot of state

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
index df37ba1..3a3b0a7 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
@@ -20,12 +20,13 @@ package org.apache.gearpump.streaming.state.api
 
 import java.time.Instant
 
+import org.apache.gearpump.Message
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig}
 import org.apache.gearpump.streaming.task.{Task, TaskContext, UpdateCheckpointClock}
 import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
 import org.apache.gearpump.util.LogUtil
-import org.apache.gearpump.{Message, TimeStamp}
 
 object PersistentTask {
   val LOG = LogUtil.getLogger(getClass)
@@ -97,7 +98,7 @@ abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig)
     checkpointManager.close()
   }
 
-  private def reportCheckpointClock(timestamp: TimeStamp): Unit = {
+  private def reportCheckpointClock(timestamp: MilliSeconds): Unit = {
     appMaster ! UpdateCheckpointClock(taskContext.taskId, timestamp)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala
index 82b7952..7d9e92a 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala
@@ -18,7 +18,7 @@
 
 package org.apache.gearpump.streaming.state.impl
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.transaction.api.CheckpointStore
 
 /** Manage physical checkpoints to persitent storage like HDFS */
@@ -28,11 +28,11 @@ class CheckpointManager(checkpointInterval: Long,
   private var maxMessageTime: Long = 0L
   private var checkpointTime: Option[Long] = None
 
-  def recover(timestamp: TimeStamp): Option[Array[Byte]] = {
+  def recover(timestamp: MilliSeconds): Option[Array[Byte]] = {
     checkpointStore.recover(timestamp)
   }
 
-  def checkpoint(timestamp: TimeStamp, checkpoint: Array[Byte]): Option[TimeStamp] = {
+  def checkpoint(timestamp: MilliSeconds, checkpoint: Array[Byte]): Option[MilliSeconds] = {
     checkpointStore.persist(timestamp, checkpoint)
     checkpointTime = checkpointTime.collect { case time if maxMessageTime > time =>
       time + (1 + (maxMessageTime - time) / checkpointInterval) * checkpointInterval
@@ -41,7 +41,7 @@ class CheckpointManager(checkpointInterval: Long,
     checkpointTime
   }
 
-  def update(messageTime: TimeStamp): Option[TimeStamp] = {
+  def update(messageTime: MilliSeconds): Option[MilliSeconds] = {
     maxMessageTime = Math.max(maxMessageTime, messageTime)
     if (checkpointTime.isEmpty) {
       checkpointTime = Some((1 + messageTime / checkpointInterval) * checkpointInterval)
@@ -50,15 +50,15 @@ class CheckpointManager(checkpointInterval: Long,
     checkpointTime
   }
 
-  def shouldCheckpoint(upstreamMinClock: TimeStamp): Boolean = {
+  def shouldCheckpoint(upstreamMinClock: MilliSeconds): Boolean = {
     checkpointTime.exists(time => upstreamMinClock >= time)
   }
 
-  def getCheckpointTime: Option[TimeStamp] = checkpointTime
+  def getCheckpointTime: Option[MilliSeconds] = checkpointTime
 
   def close(): Unit = {
     checkpointStore.close()
   }
 
-  private[impl] def getMaxMessageTime: TimeStamp = maxMessageTime
+  private[impl] def getMaxMessageTime: MilliSeconds = maxMessageTime
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
index 8853d07..cecf127 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/InMemoryCheckpointStore.scala
@@ -18,7 +18,7 @@
 
 package org.apache.gearpump.streaming.state.impl
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.transaction.api.{CheckpointStore, CheckpointStoreFactory}
 
 /**
@@ -26,18 +26,18 @@ import org.apache.gearpump.streaming.transaction.api.{CheckpointStore, Checkpoin
  * should not be used in real cases
  */
 class InMemoryCheckpointStore extends CheckpointStore {
-  private var checkpoints = Map.empty[TimeStamp, Array[Byte]]
+  private var checkpoints = Map.empty[MilliSeconds, Array[Byte]]
 
-  override def persist(timestamp: TimeStamp, checkpoint: Array[Byte]): Unit = {
+  override def persist(timestamp: MilliSeconds, checkpoint: Array[Byte]): Unit = {
     checkpoints += timestamp -> checkpoint
   }
 
-  override def recover(timestamp: TimeStamp): Option[Array[Byte]] = {
+  override def recover(timestamp: MilliSeconds): Option[Array[Byte]] = {
     checkpoints.get(timestamp)
   }
 
   override def close(): Unit = {
-    checkpoints = Map.empty[TimeStamp, Array[Byte]]
+    checkpoints = Map.empty[MilliSeconds, Array[Byte]]
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala
index b161713..1393f4a 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/NonWindowState.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.state.impl
 
 import org.slf4j.Logger
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.state.api.{Monoid, MonoidState, Serializer}
 import org.apache.gearpump.streaming.state.impl.NonWindowState._
 import org.apache.gearpump.util.LogUtil
@@ -35,11 +35,11 @@ object NonWindowState {
 class NonWindowState[T](monoid: Monoid[T], serializer: Serializer[T])
   extends MonoidState[T](monoid) {
 
-  override def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit = {
+  override def recover(timestamp: MilliSeconds, bytes: Array[Byte]): Unit = {
     serializer.deserialize(bytes).foreach(left = _)
   }
 
-  override def update(timestamp: TimeStamp, t: T): Unit = {
+  override def update(timestamp: MilliSeconds, t: T): Unit = {
     updateState(timestamp, t)
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala
index c1f647e..0318d3d 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/Window.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.gearpump.streaming.state.impl
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 
 /**
  * Used in window applications
@@ -29,10 +29,10 @@ class Window(val windowSize: Long, val windowStep: Long) {
     this(windowConfig.windowSize, windowConfig.windowStep)
   }
 
-  private var clock: TimeStamp = 0L
+  private var clock: MilliSeconds = 0L
   private var startTime = 0L
 
-  def update(clock: TimeStamp): Unit = {
+  def update(clock: MilliSeconds): Unit = {
     this.clock = clock
   }
 
@@ -40,7 +40,7 @@ class Window(val windowSize: Long, val windowStep: Long) {
     startTime += windowStep
   }
 
-  def slideTo(timestamp: TimeStamp): Unit = {
+  def slideTo(timestamp: MilliSeconds): Unit = {
     startTime = timestamp / windowStep * windowStep
   }
 
@@ -48,7 +48,7 @@ class Window(val windowSize: Long, val windowStep: Long) {
     clock >= (startTime + windowSize)
   }
 
-  def range: (TimeStamp, TimeStamp) = {
+  def range: (MilliSeconds, MilliSeconds) = {
     startTime -> (startTime + windowSize)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala
index 348f09e..a73b6db 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/WindowState.scala
@@ -22,7 +22,7 @@ import scala.collection.immutable.TreeMap
 
 import org.slf4j.Logger
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.state.api.{Group, MonoidState, Serializer}
 import org.apache.gearpump.streaming.state.impl.WindowState._
 import org.apache.gearpump.streaming.task.TaskContext
@@ -31,7 +31,7 @@ import org.apache.gearpump.util.LogUtil
 /**
  * an interval is a dynamic time range that is divided by window boundary and checkpoint time
  */
-case class Interval(startTime: TimeStamp, endTime: TimeStamp) extends Ordered[Interval] {
+case class Interval(startTime: MilliSeconds, endTime: MilliSeconds) extends Ordered[Interval] {
   override def compare(that: Interval): Int = {
     if (startTime < that.startTime) -1
     else if (startTime > that.startTime) 1
@@ -63,7 +63,7 @@ class WindowState[T](group: Group[T],
 
   private var lastCheckpointTime = 0L
 
-  override def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit = {
+  override def recover(timestamp: MilliSeconds, bytes: Array[Byte]): Unit = {
     window.slideTo(timestamp)
     serializer.deserialize(bytes)
       .foreach { states =>
@@ -74,7 +74,7 @@ class WindowState[T](group: Group[T],
       }
   }
 
-  override def update(timestamp: TimeStamp, t: T): Unit = {
+  override def update(timestamp: MilliSeconds, t: T): Unit = {
     val (startTime, endTime) = window.range
     if (timestamp >= startTime && timestamp < endTime) {
       updateState(timestamp, t)
@@ -127,7 +127,7 @@ class WindowState[T](group: Group[T],
    * upperBound2 = step * Nmin2 + size > t
    * }}}
    */
-  private[impl] def getInterval(timestamp: TimeStamp, checkpointTime: TimeStamp): Interval = {
+  private[impl] def getInterval(timestamp: MilliSeconds, checkpointTime: MilliSeconds): Interval = {
     val windowSize = window.windowSize
     val windowStep = window.windowStep
     val lowerBound1 = timestamp / windowStep * windowStep
@@ -147,8 +147,8 @@ class WindowState[T](group: Group[T],
     }
   }
 
-  private[impl] def updateIntervalStates(timestamp: TimeStamp, t: T, checkpointTime: TimeStamp)
-  : Unit = {
+  private[impl] def updateIntervalStates(timestamp: MilliSeconds, t: T,
+      checkpointTime: MilliSeconds): Unit = {
     val interval = getInterval(timestamp, checkpointTime)
     intervalStates.get(interval) match {
       case Some(st) =>
@@ -158,7 +158,7 @@ class WindowState[T](group: Group[T],
     }
   }
 
-  private[impl] def getIntervalStates(startTime: TimeStamp, endTime: TimeStamp)
+  private[impl] def getIntervalStates(startTime: MilliSeconds, endTime: MilliSeconds)
   : TreeMap[Interval, T] = {
     intervalStates.dropWhile(_._1.endTime <= startTime).takeWhile(_._1.endTime <= endTime)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala
index 675d5cc..c3e3b14 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializedMessage.scala
@@ -19,9 +19,9 @@ package org.apache.gearpump.streaming.task
 
 import java.io.{DataInput, DataOutput}
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 
-case class SerializedMessage(timeStamp: TimeStamp, bytes: Array[Byte])
+case class SerializedMessage(timeStamp: MilliSeconds, bytes: Array[Byte])
 
 class SerializedMessageSerializer extends TaskMessageSerializer[SerializedMessage] {
   override def getLength(obj: SerializedMessage): Int = 12 + obj.bytes.length

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
index 8a6f04f..24f1763 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
@@ -20,13 +20,14 @@ package org.apache.gearpump.streaming.task
 
 import org.slf4j.Logger
 import com.google.common.primitives.Shorts
+import org.apache.gearpump.{Message, Time}
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.partitioner.{MulticastPartitioner, Partitioner, UnicastPartitioner}
 import org.apache.gearpump.streaming.AppMasterToExecutor.MsgLostException
 import org.apache.gearpump.streaming.LifeTime
 import org.apache.gearpump.streaming.source.Watermark
 import org.apache.gearpump.streaming.task.Subscription._
 import org.apache.gearpump.util.LogUtil
-import org.apache.gearpump.{MIN_TIME_MILLIS, Message, TimeStamp}
 
 /**
  * Manages the output and message clock for single downstream processor
@@ -59,9 +60,9 @@ class Subscription(
   private val pendingMessageCount: Array[Short] = new Array[Short](parallelism)
   private val processingWatermarkSince: Array[Short] = new Array[Short](parallelism)
 
-  private val outputWatermark: Array[TimeStamp] = Array.fill(parallelism)(
+  private val outputWatermark: Array[MilliSeconds] = Array.fill(parallelism)(
     Watermark.MIN.toEpochMilli)
-  private val processingWatermark: Array[TimeStamp] = Array.fill(parallelism)(
+  private val processingWatermark: Array[MilliSeconds] = Array.fill(parallelism)(
     Watermark.MIN.toEpochMilli)
 
   private var maxPendingCount: Short = 0
@@ -135,7 +136,7 @@ class Subscription(
     }
   }
 
-  private var lastFlushTime: Long = MIN_TIME_MILLIS
+  private var lastFlushTime: Long = Time.MIN_TIME_MILLIS
   private val FLUSH_INTERVAL = 5 * 1000 // ms
   private def needFlush: Boolean = {
     System.currentTimeMillis() - lastFlushTime > FLUSH_INTERVAL &&
@@ -181,7 +182,7 @@ class Subscription(
     }
   }
 
-  def watermark: TimeStamp = {
+  def watermark: MilliSeconds = {
     outputWatermark.min
   }
 
@@ -189,7 +190,7 @@ class Subscription(
     maxPendingCount < maxPendingMessageCount
   }
 
-  def onStallingTime(stallingTime: TimeStamp): Unit = {
+  def onStallingTime(stallingTime: MilliSeconds): Unit = {
     outputWatermark.indices.foreach { i =>
       if (outputWatermark(i) == stallingTime &&
         pendingMessageCount(i) > 0 &&

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/f96aca99/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
index dc80511..b587cc7 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
@@ -23,10 +23,11 @@ import java.time.Instant
 import scala.concurrent.duration.FiniteDuration
 import akka.actor.Actor.Receive
 import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
+import org.apache.gearpump.Message
+import org.apache.gearpump.Time.MilliSeconds
 import org.slf4j.Logger
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.util.LogUtil
-import org.apache.gearpump.{Message, TimeStamp}
 
 /**
  * This provides context information for a task.
@@ -113,7 +114,7 @@ trait TaskContext {
    *
    * @return the min clock
    */
-  def upstreamMinClock: TimeStamp
+  def upstreamMinClock: MilliSeconds
 
   /**
    * Update TaskActor with the processing progress (watermark)