You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/08/22 05:24:06 UTC

[2/2] incubator-gearpump git commit: fix GEARPUMP-32, introduce source watermark

fix GEARPUMP-32, introduce source watermark

This is for early review and contains some example code that will be removed before merge.

Author: manuzhang <ow...@gmail.com>

Closes #67 from manuzhang/watermark.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/529799cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/529799cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/529799cc

Branch: refs/heads/master
Commit: 529799cc400a72ae9e0d2385044ce1fd5e329bb3
Parents: 23daf0c
Author: manuzhang <ow...@gmail.com>
Authored: Mon Aug 22 13:23:53 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Aug 22 13:23:53 2016 +0800

----------------------------------------------------------------------
 .../streaming/examples/complexdag/Source.scala  |  8 +-
 .../examples/fsio/SeqFileStreamProducer.scala   |  5 +-
 .../examples/sol/SOLStreamProducer.scala        | 10 +--
 .../examples/state/MessageCountApp.scala        |  6 +-
 .../state/processor/CountProcessor.scala        |  5 +-
 .../processor/NumberGeneratorProcessor.scala    |  5 +-
 .../state/processor/CountProcessorSpec.scala    |  9 +--
 .../NumberGeneratorProcessorSpec.scala          |  5 +-
 .../processor/WindowAverageProcessorSpec.scala  | 11 +--
 .../streaming/examples/wordcountjava/Split.java | 11 +--
 .../streaming/examples/wordcount/Split.scala    |  5 +-
 .../examples/wordcount/SplitSpec.scala          |  4 +-
 .../storm/producer/StormProducer.scala          |  5 +-
 .../storm/producer/StormProducerSpec.scala      |  5 +-
 .../streaming/kafka/util/KafkaConfig.java       | 18 +----
 .../kafka/lib/KafkaMessageDecoder.scala         | 36 +++++++++
 .../kafka/lib/source/AbstractKafkaSource.scala  | 52 ++++++-------
 .../lib/source/DefaultKafkaMessageDecoder.scala | 34 ++++++++
 .../lib/source/DefaultMessageDecoder.scala      | 38 ---------
 .../streaming/kafka/lib/util/KafkaClient.scala  |  2 -
 .../streaming/kafka/KafkaSourceSpec.scala       | 17 ++--
 .../source/DefaultKafkaMessageDecoderSpec.scala | 43 ++++++++++
 .../lib/source/DefaultMessageDecoderSpec.scala  | 52 -------------
 .../kafka/lib/util/KafkaClientSpec.scala        |  4 +-
 .../checklist/DynamicDagSpec.scala              |  4 +-
 .../streaming/appmaster/AppMaster.scala         |  2 +-
 .../streaming/appmaster/ClockService.scala      | 79 +++++++++----------
 .../streaming/appmaster/TaskManager.scala       |  2 -
 .../gearpump/streaming/dsl/StreamApp.scala      |  4 +-
 .../gearpump/streaming/source/DataSource.scala  |  8 ++
 .../streaming/source/DataSourceTask.scala       | 16 +++-
 .../source/DefaultTimeStampFilter.scala         | 31 --------
 .../gearpump/streaming/source/Watermark.scala   | 29 +++++++
 .../streaming/state/api/PersistentTask.scala    | 44 ++++-------
 .../apache/gearpump/streaming/task/Task.scala   | 12 ++-
 .../gearpump/streaming/task/TaskActor.scala     | 82 ++++++++++++--------
 .../gearpump/streaming/task/TaskWrapper.scala   |  6 +-
 .../transaction/api/MessageDecoder.scala        | 34 --------
 .../streaming/appmaster/AppMasterSpec.scala     | 10 +--
 .../streaming/appmaster/ClockServiceSpec.scala  |  5 --
 .../streaming/appmaster/TaskManagerSpec.scala   |  2 -
 .../source/DefaultTimeStampFilterSpec.scala     | 48 ------------
 42 files changed, 368 insertions(+), 440 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
index 7abb3fc..074b389 100644
--- a/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
+++ b/examples/streaming/complexdag/src/main/scala/org/apache/gearpump/streaming/examples/complexdag/Source.scala
@@ -22,19 +22,21 @@ import java.time.Instant
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.source.Watermark
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 class Source(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
   import taskContext.output
 
   override def onStart(startTime: Instant): Unit = {
-    self ! Message("start")
+    self ! Watermark(Instant.now)
   }
 
   override def onNext(msg: Message): Unit = {
     val list = Vector(getClass.getCanonicalName)
-    output(new Message(list, System.currentTimeMillis))
-    self ! Message("continue", System.currentTimeMillis())
+    val now = Instant.now
+    output(new Message(list, now.toEpochMilli))
+    self ! Watermark(now)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
index 4106a2c..a3b4d97 100644
--- a/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
+++ b/examples/streaming/fsio/src/main/scala/org/apache/gearpump/streaming/examples/fsio/SeqFileStreamProducer.scala
@@ -19,6 +19,7 @@ package org.apache.gearpump.streaming.examples.fsio
 
 import java.time.Instant
 
+import org.apache.gearpump.streaming.source.Watermark
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.SequenceFile._
 import org.apache.hadoop.io.{SequenceFile, Text}
@@ -64,6 +65,6 @@ class SeqFileStreamProducer(taskContext: TaskContext, config: UserConfig)
 object SeqFileStreamProducer {
   def INPUT_PATH: String = "inputpath"
 
-  val Start = Message("start")
-  val Continue = Message("continue")
+  val Start = Watermark(Instant.now)
+  val Continue = Watermark(Instant.now)
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
index c1b11e5..2b443e5 100644
--- a/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
+++ b/examples/streaming/sol/src/main/scala/org/apache/gearpump/streaming/examples/sol/SOLStreamProducer.scala
@@ -24,6 +24,7 @@ import java.util.Random
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.examples.sol.SOLStreamProducer._
+import org.apache.gearpump.streaming.source.Watermark
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 class SOLStreamProducer(taskContext: TaskContext, conf: UserConfig)
@@ -39,7 +40,7 @@ class SOLStreamProducer(taskContext: TaskContext, conf: UserConfig)
 
   override def onStart(startTime: Instant): Unit = {
     prepareRandomMessage
-    self ! Start
+    self ! Watermark(Instant.now)
   }
 
   private def prepareRandomMessage = {
@@ -62,18 +63,13 @@ class SOLStreamProducer(taskContext: TaskContext, conf: UserConfig)
     val message = messages(rand.nextInt(messages.length))
     output(new Message(message, System.currentTimeMillis()))
     messageCount = messageCount + 1L
-    self ! messageSourceMinClock
+    self ! Watermark(Instant.now)
   }
 
-  // messageSourceMinClock represent the min clock of the message source
-  private def messageSourceMinClock: Message = {
-    Message("tick", System.currentTimeMillis())
-  }
 }
 
 object SOLStreamProducer {
   val DEFAULT_MESSAGE_SIZE = 100
   // Bytes
   val BYTES_PER_MESSAGE = "bytesPerMessage"
-  val Start = Message("start")
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala
index 5a3954a..9bd2bc5 100644
--- a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala
+++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/MessageCountApp.scala
@@ -52,14 +52,14 @@ object MessageCountApp extends AkkaApp with ArgumentsParser {
 
   override val options: Array[(String, CLIOption[Any])] = Array(
     SOURCE_TASK -> CLIOption[Int]("<how many kafka source tasks>", required = false,
-    defaultValue = Some(1)),
+      defaultValue = Some(1)),
     COUNT_TASK -> CLIOption("<how many count tasks>", required = false, defaultValue = Some(1)),
     SINK_TASK -> CLIOption[Int]("<how many kafka sink tasks>", required = false,
-    defaultValue = Some(1)),
+      defaultValue = Some(1)),
     SOURCE_TOPIC -> CLIOption[String]("<kafka source topic>", required = true),
     SINK_TOPIC -> CLIOption[String]("<kafka sink topic>", required = true),
     ZOOKEEPER_CONNECT -> CLIOption[String]("<Zookeeper connect string, e.g. localhost:2181/kafka>",
-    required = true),
+      required = true),
     BROKER_LIST -> CLIOption[String]("<Kafka broker list, e.g. localhost:9092>", required = true),
     DEFAULT_FS -> CLIOption[String]("<name of the default file system, e.g. hdfs://localhost:9000>",
       required = true)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala
index 9650a0a..2d31eeb 100644
--- a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala
+++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessor.scala
@@ -29,13 +29,16 @@ import org.apache.gearpump.streaming.task.TaskContext
 class CountProcessor(taskContext: TaskContext, conf: UserConfig)
   extends PersistentTask[Int](taskContext, conf) {
 
+  private val serializer = new ChillSerializer[Int]
+
   override def persistentState: PersistentState[Int] = {
     import com.twitter.algebird.Monoid.intMonoid
-    new NonWindowState[Int](new AlgebirdMonoid(intMonoid), new ChillSerializer[Int])
+    new NonWindowState[Int](new AlgebirdMonoid(intMonoid), serializer)
   }
 
   override def processMessage(state: PersistentState[Int], message: Message): Unit = {
     state.update(message.timestamp, 1)
+    state.get.foreach(s => taskContext.output(Message(serializer.serialize(s), message.timestamp)))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
index 134afba..e6030d6 100644
--- a/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
+++ b/examples/streaming/state/src/main/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessor.scala
@@ -22,6 +22,7 @@ import java.time.Instant
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.source.Watermark
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 class NumberGeneratorProcessor(taskContext: TaskContext, conf: UserConfig)
@@ -31,7 +32,7 @@ class NumberGeneratorProcessor(taskContext: TaskContext, conf: UserConfig)
   private var num = 0L
   override def onStart(startTime: Instant): Unit = {
     num = startTime.toEpochMilli
-    self ! Message("start")
+    self ! Watermark(startTime)
   }
 
   override def onNext(msg: Message): Unit = {
@@ -39,6 +40,6 @@ class NumberGeneratorProcessor(taskContext: TaskContext, conf: UserConfig)
     num += 1
 
     import scala.concurrent.duration._
-    taskContext.scheduleOnce(Duration(1, MILLISECONDS))(self ! Message("next"))
+    taskContext.scheduleOnce(Duration(1, MILLISECONDS))(self ! Watermark(Instant.ofEpochMilli(num)))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
index b95d164..5affb5e 100644
--- a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
+++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/CountProcessorSpec.scala
@@ -66,16 +66,11 @@ class CountProcessorSpec extends PropSpec with PropertyChecks with Matchers {
 
         for (i <- 0L to num) {
           count.onNext(Message("", i))
-          count.state.get shouldBe Some(i + 1)
+          count.getState.get shouldBe Some(i + 1)
         }
-        // Next checkpoint time is not arrived yet
-        when(taskContext.upstreamMinClock).thenReturn(0L)
-        count.onNext(PersistentTask.CHECKPOINT)
-        appMaster.expectNoMsg(10.milliseconds)
 
         // Time to checkpoint
-        when(taskContext.upstreamMinClock).thenReturn(num)
-        count.onNext(PersistentTask.CHECKPOINT)
+        count.onWatermarkProgress(Instant.ofEpochMilli(num))
         // Only the state before checkpoint time is checkpointed
         appMaster.expectMsg(UpdateCheckpointClock(taskContext.taskId, num))
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
index d3f645c..b562b6b 100644
--- a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
+++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/NumberGeneratorProcessorSpec.scala
@@ -20,6 +20,8 @@ package org.apache.gearpump.streaming.examples.state.processor
 
 import java.time.Instant
 
+import org.apache.gearpump.streaming.source.Watermark
+
 import scala.concurrent.Await
 import scala.concurrent.duration.Duration
 
@@ -49,11 +51,10 @@ class NumberGeneratorProcessorSpec extends WordSpec with Matchers {
       val conf = UserConfig.empty
       val genNum = new NumberGeneratorProcessor(taskContext, conf)
       genNum.onStart(Instant.EPOCH)
-      mockTaskActor.expectMsgType[Message]
+      mockTaskActor.expectMsgType[Watermark]
 
       genNum.onNext(Message("next"))
       verify(taskContext).output(MockitoMatchers.any[Message])
-      // mockTaskActor.expectMsgType[Message]
 
       system.terminate()
       Await.result(system.whenTerminated, Duration.Inf)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
index 255f869..f3706e2 100644
--- a/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
+++ b/examples/streaming/state/src/test/scala/org/apache/gearpump/streaming/examples/state/processor/WindowAverageProcessorSpec.scala
@@ -34,7 +34,6 @@ import org.scalatest.{Matchers, PropSpec}
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.state.api.PersistentTask
 import org.apache.gearpump.streaming.state.impl.{InMemoryCheckpointStoreFactory, PersistentStateConfig, WindowConfig}
 import org.apache.gearpump.streaming.task.UpdateCheckpointClock
 import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
@@ -68,17 +67,11 @@ class WindowAverageProcessorSpec extends PropSpec with PropertyChecks with Match
 
         for (i <- 0L until num) {
           windowAverage.onNext(Message("" + data, i))
-          windowAverage.state.get shouldBe Some(AveragedValue(i + 1, data))
+          windowAverage.getState.get shouldBe Some(AveragedValue(i + 1, data))
         }
 
-        // Next checkpoint time is not arrived yet
-        when(taskContext.upstreamMinClock).thenReturn(0L)
-        windowAverage.onNext(PersistentTask.CHECKPOINT)
-        appMaster.expectNoMsg(10.milliseconds)
-
         // Time to checkpoint
-        when(taskContext.upstreamMinClock).thenReturn(num)
-        windowAverage.onNext(PersistentTask.CHECKPOINT)
+        windowAverage.onWatermarkProgress(Instant.ofEpochMilli(num))
         appMaster.expectMsg(UpdateCheckpointClock(taskContext.taskId, num))
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
index 0a8fb4f..a0996b3 100644
--- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/Split.java
@@ -21,6 +21,7 @@ package org.apache.gearpump.streaming.examples.wordcountjava;
 import org.apache.gearpump.Message;
 import org.apache.gearpump.cluster.UserConfig;
 import org.apache.gearpump.streaming.javaapi.Task;
+import org.apache.gearpump.streaming.source.Watermark;
 import org.apache.gearpump.streaming.task.TaskContext;
 
 import java.time.Instant;
@@ -33,13 +34,9 @@ public class Split extends Task {
     super(taskContext, userConf);
   }
 
-  private Long now() {
-    return System.currentTimeMillis();
-  }
-
   @Override
   public void onStart(Instant startTime) {
-    self().tell(new Message("start", now()), self());
+    self().tell(new Watermark(Instant.now()), self());
   }
 
   @Override
@@ -48,8 +45,8 @@ public class Split extends Task {
     // Split the TEXT to words
     String[] words = TEXT.split(" ");
     for (int i = 0; i < words.length; i++) {
-      context.output(new Message(words[i], now()));
+      context.output(new Message(words[i], Instant.now().toEpochMilli()));
     }
-    self().tell(new Message("next", now()), self());
+    self().tell(new Watermark(Instant.now()), self());
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
index af3c04c..44cf211 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/Split.scala
@@ -23,13 +23,14 @@ import java.util.concurrent.TimeUnit
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.source.Watermark
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext, conf) {
   import taskContext.output
 
   override def onStart(startTime: Instant): Unit = {
-    self ! Message("start")
+    self ! Watermark(Instant.now)
   }
 
   override def onNext(msg: Message): Unit = {
@@ -41,7 +42,7 @@ class Split(taskContext: TaskContext, conf: UserConfig) extends Task(taskContext
 
     import scala.concurrent.duration._
     taskContext.scheduleOnce(Duration(100, TimeUnit.MILLISECONDS))(self !
-      Message("continue", System.currentTimeMillis()))
+      Watermark(Instant.now))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
index 8b50890..46d9e97 100644
--- a/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
+++ b/examples/streaming/wordcount/src/test/scala/org/apache/gearpump/streaming/examples/wordcount/SplitSpec.scala
@@ -19,6 +19,8 @@ package org.apache.gearpump.streaming.examples.wordcount
 
 import java.time.Instant
 
+import org.apache.gearpump.streaming.source.Watermark
+
 import scala.concurrent.Await
 import scala.concurrent.duration.Duration
 
@@ -49,7 +51,7 @@ class SplitSpec extends WordSpec with Matchers {
       val conf = UserConfig.empty
       val split = new Split(taskContext, conf)
       split.onStart(Instant.EPOCH)
-      mockTaskActor.expectMsgType[Message]
+      mockTaskActor.expectMsgType[Watermark]
 
       val expectedWordCount = Split.TEXT_TO_SPLIT.split( """[\s\n]+""").count(_.nonEmpty)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala
index b92f037..bc665c9 100644
--- a/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala
+++ b/experiments/storm/src/main/scala/org/apache/gearpump/experiments/storm/producer/StormProducer.scala
@@ -26,6 +26,7 @@ import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpSpout
 import org.apache.gearpump.experiments.storm.util._
+import org.apache.gearpump.streaming.source.Watermark
 import org.apache.gearpump.streaming.task._
 
 import scala.concurrent.duration.Duration
@@ -55,7 +56,7 @@ private[storm] class StormProducer(gearpumpSpout: GearpumpSpout,
       getCheckpointClock
     }
     timeoutMillis.foreach(scheduleTimeout)
-    self ! Message("start")
+    self ! Watermark(Instant.now)
   }
 
   override def onNext(msg: Message): Unit = {
@@ -68,7 +69,7 @@ private[storm] class StormProducer(gearpumpSpout: GearpumpSpout,
       case _ =>
         gearpumpSpout.next(msg)
     }
-    self ! Message("continue")
+    self ! Watermark(Instant.now)
   }
 
   override def receiveUnManagedMessage: Receive = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala
index ee89a4a..2e304a1 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/producer/StormProducerSpec.scala
@@ -25,6 +25,7 @@ import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.GearpumpSpout
 import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.source.Watermark
 import org.mockito.Mockito._
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.{Matchers, WordSpec}
@@ -46,7 +47,7 @@ class StormProducerSpec extends WordSpec with Matchers with MockitoSugar {
       stormProducer.onStart(startTime)
 
       verify(gearpumpSpout).start(startTime)
-      taskActor.expectMsg(Message("start"))
+      taskActor.expectMsgType[Watermark]
     }
 
     "pass message to GearpumpBolt onNext" in {
@@ -64,7 +65,7 @@ class StormProducerSpec extends WordSpec with Matchers with MockitoSugar {
       stormProducer.onNext(message)
 
       verify(gearpumpSpout).next(message)
-      taskActor.expectMsg(Message("continue"))
+      taskActor.expectMsgType[Watermark]
 
       stormProducer.onNext(StormProducer.TIMEOUT)
       verify(gearpumpSpout).timeout(timeout)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java
index 0d5bec7..8c931cd 100644
--- a/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java
+++ b/external/kafka/src/main/java/org/apache/gearpump/streaming/kafka/util/KafkaConfig.java
@@ -21,10 +21,9 @@ package org.apache.gearpump.streaming.kafka.util;
 import kafka.api.OffsetRequest;
 import kafka.common.TopicAndPartition;
 import kafka.consumer.ConsumerConfig;
-import org.apache.gearpump.streaming.kafka.lib.source.DefaultMessageDecoder;
+import org.apache.gearpump.streaming.kafka.lib.source.DefaultKafkaMessageDecoder;
 import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient;
 import org.apache.gearpump.streaming.kafka.lib.source.grouper.DefaultPartitionGrouper;
-import org.apache.gearpump.streaming.source.DefaultTimeStampFilter;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 
@@ -87,10 +86,6 @@ public class KafkaConfig extends AbstractConfig implements Serializable {
   private static final String MESSAGE_DECODER_CLASS_DOC =
       "Message decoder class that implements the <code>MessageDecoder</code> interface.";
 
-  public static final String TIMESTAMP_FILTER_CLASS_CONFIG = "timestamp.filter.class";
-  private static final String TIMESTAMP_FILTER_CLASS_DOC =
-      "Timestamp filter class that implements the <code>TimeStampFilter</code> interface";
-
   public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper";
   private static final String PARTITION_GROUPER_CLASS_DOC =
       "Partition grouper class that implements the <code>KafkaGrouper</code> interface.";
@@ -119,8 +114,9 @@ public class KafkaConfig extends AbstractConfig implements Serializable {
             "",
             ConfigDef.Importance.HIGH,
             GROUP_ID_DOC)
-        .define(ZOOKEEPER_CONNECT_CONFIG, // required with no default value
+        .define(ZOOKEEPER_CONNECT_CONFIG,
             ConfigDef.Type.STRING,
+            "",
             ConfigDef.Importance.HIGH,
             ZOOKEEPER_CONNECT_DOC)
         .define(REPLICATION_FACTOR_CONFIG,
@@ -131,14 +127,9 @@ public class KafkaConfig extends AbstractConfig implements Serializable {
             REPLICATION_FACTOR_DOC)
         .define(MESSAGE_DECODER_CLASS_CONFIG,
             ConfigDef.Type.CLASS,
-            DefaultMessageDecoder.class.getName(),
+            DefaultKafkaMessageDecoder.class.getName(),
             ConfigDef.Importance.MEDIUM,
             MESSAGE_DECODER_CLASS_DOC)
-        .define(TIMESTAMP_FILTER_CLASS_CONFIG,
-            ConfigDef.Type.CLASS,
-            DefaultTimeStampFilter.class.getName(),
-            ConfigDef.Importance.MEDIUM,
-            TIMESTAMP_FILTER_CLASS_DOC)
         .define(PARTITION_GROUPER_CLASS_CONFIG,
             ConfigDef.Type.CLASS,
             DefaultPartitionGrouper.class.getName(),
@@ -228,7 +219,6 @@ public class KafkaConfig extends AbstractConfig implements Serializable {
     props.remove(FETCH_THRESHOLD_CONFIG);
     props.remove(PARTITION_GROUPER_CLASS_CONFIG);
     props.remove(MESSAGE_DECODER_CLASS_CONFIG);
-    props.remove(TIMESTAMP_FILTER_CLASS_CONFIG);
     props.remove(REPLICATION_FACTOR_CONFIG);
     props.remove(CHECKPOINT_STORE_NAME_PREFIX_CONFIG);
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaMessageDecoder.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaMessageDecoder.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaMessageDecoder.scala
new file mode 100644
index 0000000..9357781
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/KafkaMessageDecoder.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.kafka.lib
+
+import java.time.Instant
+
+import org.apache.gearpump._
+
+/**
+ * Decodes Kafka raw message of (key, value) bytes
+ */
+trait KafkaMessageDecoder extends java.io.Serializable {
+  /**
+   * @param key key of a kafka message, can be NULL
+   * @param value value of a kafka message
+   * @return a gearpump Message and watermark (i.e. event time progress)
+   */
+  def fromBytes(key: Array[Byte], value: Array[Byte]): MessageAndWatermark
+}
+
+case class MessageAndWatermark(message: Message, watermark: Instant)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
index 314eae8..ba49899 100644
--- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/AbstractKafkaSource.scala
@@ -24,6 +24,7 @@ import java.util.Properties
 import com.twitter.bijection.Injection
 import kafka.common.TopicAndPartition
 import org.apache.gearpump.streaming.kafka.KafkaSource
+import org.apache.gearpump.streaming.kafka.lib.KafkaMessageDecoder
 import org.apache.gearpump.streaming.kafka.lib.source.consumer.FetchThread.FetchThreadFactory
 import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient
 import KafkaClient.KafkaClientFactory
@@ -46,17 +47,7 @@ object AbstractKafkaSource {
  * Contains implementation for Kafka source connectors, users should use
  * [[org.apache.gearpump.streaming.kafka.KafkaSource]].
  *
- * This is a TimeReplayableSource which is able to replay messages given a start time.
- * Each kafka message is tagged with a timestamp by
- * [[org.apache.gearpump.streaming.transaction.api.MessageDecoder]] and the (timestamp, offset)
- * mapping is stored to a [[org.apache.gearpump.streaming.transaction.api.CheckpointStore]].
- * On recovery, we could retrieve the previously stored offset from the
- * [[org.apache.gearpump.streaming.transaction.api.CheckpointStore]] by timestamp and start to read
- * from there.
- *
- * kafka message is wrapped into gearpump [[org.apache.gearpump.Message]] and further filtered by a
- * [[org.apache.gearpump.streaming.transaction.api.TimeStampFilter]]
- * such that obsolete messages are dropped.
+ * This is a TimeReplayableSource which is able to replay messages from kafka given a start time.
  */
 abstract class AbstractKafkaSource(
     topic: String,
@@ -75,11 +66,9 @@ abstract class AbstractKafkaSource(
   private lazy val kafkaClient: KafkaClient = kafkaClientFactory.getKafkaClient(config)
   private lazy val fetchThread: FetchThread = fetchThreadFactory.getFetchThread(config, kafkaClient)
   private lazy val messageDecoder = config.getConfiguredInstance(
-    KafkaConfig.MESSAGE_DECODER_CLASS_CONFIG, classOf[MessageDecoder])
-  private lazy val timestampFilter = config.getConfiguredInstance(
-    KafkaConfig.TIMESTAMP_FILTER_CLASS_CONFIG, classOf[TimeStampFilter])
+    KafkaConfig.MESSAGE_DECODER_CLASS_CONFIG, classOf[KafkaMessageDecoder])
 
-  private var startTime: Long = 0L
+  private var watermark: Instant = Instant.EPOCH
   private var checkpointStoreFactory: Option[CheckpointStoreFactory] = None
   private var checkpointStores: Map[TopicAndPartition, CheckpointStore] =
     Map.empty[TopicAndPartition, CheckpointStore]
@@ -92,7 +81,7 @@ abstract class AbstractKafkaSource(
     import context.{parallelism, taskId}
 
     LOG.info("KafkaSource opened at start time {}", startTime)
-    this.startTime = startTime.toEpochMilli
+    this.watermark = startTime
     val topicList = topic.split(",", -1).toList
     val grouper = config.getConfiguredInstance(KafkaConfig.PARTITION_GROUPER_CLASS_CONFIG,
       classOf[PartitionGrouper])
@@ -102,7 +91,7 @@ abstract class AbstractKafkaSource(
 
     fetchThread.setTopicAndPartitions(topicAndPartitions)
     maybeSetupCheckpointStores(topicAndPartitions)
-    maybeRecover()
+    maybeRecover(startTime.toEpochMilli)
   }
 
   /**
@@ -111,7 +100,7 @@ abstract class AbstractKafkaSource(
    * @return a [[org.apache.gearpump.Message]] or null
    */
   override def read(): Message = {
-    fetchThread.poll.flatMap(filterAndCheckpointMessage).orNull
+    fetchThread.poll.map(decodeMessageAndCheckpointOffset).orNull
   }
 
   override def close(): Unit = {
@@ -120,22 +109,25 @@ abstract class AbstractKafkaSource(
     LOG.info("KafkaSource closed")
   }
 
+  override def getWatermark: Instant = watermark
+
   /**
    * 1. Decodes raw bytes into Message and watermark
    * 2. Updates the source watermark to that of the decoded message
    * 3. Checkpoints (timestamp, kafka_offset)
    */
-  private def filterAndCheckpointMessage(kafkaMsg: KafkaMessage): Option[Message] = {
-    val msg = messageDecoder.fromBytes(kafkaMsg.key.orNull, kafkaMsg.msg)
-    LOG.debug("read message {}", msg)
-    val filtered = timestampFilter.filter(msg, startTime)
-    filtered.foreach { m =>
-      val time = m.timestamp
-      val offset = kafkaMsg.offset
-      LOG.debug("checkpoint message state ({}, {})", time, offset)
-      checkpointOffsets(kafkaMsg.topicAndPartition, time, offset)
-    }
-    filtered
+  private def decodeMessageAndCheckpointOffset(kafkaMsg: KafkaMessage): Message = {
+    val msgAndWmk = messageDecoder.fromBytes(kafkaMsg.key.orNull, kafkaMsg.msg)
+    LOG.debug("read message and watermark {}", msgAndWmk)
+
+    val msg = msgAndWmk.message
+    this.watermark = msgAndWmk.watermark
+    val time = msg.timestamp
+    val offset = kafkaMsg.offset
+    checkpointOffsets(kafkaMsg.topicAndPartition, time, offset)
+    LOG.debug("checkpoint message state ({}, {})", time, offset)
+
+    msg
   }
 
   private def checkpointOffsets(tp: TopicAndPartition, time: TimeStamp, offset: Long): Unit = {
@@ -153,7 +145,7 @@ abstract class AbstractKafkaSource(
     }
   }
 
-  private def maybeRecover(): Unit = {
+  private def maybeRecover(startTime: TimeStamp): Unit = {
     checkpointStores.foreach { case (tp, store) =>
       for {
         bytes <- store.recover(startTime)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoder.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoder.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoder.scala
new file mode 100644
index 0000000..5e13230
--- /dev/null
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoder.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.source
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.streaming.kafka.lib.{MessageAndWatermark, KafkaMessageDecoder}
+
+class DefaultKafkaMessageDecoder extends KafkaMessageDecoder {
+
+  override def fromBytes(key: Array[Byte], value: Array[Byte]): MessageAndWatermark = {
+    val time = Instant.now()
+    MessageAndWatermark(Message(value, time.toEpochMilli), time)
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultMessageDecoder.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultMessageDecoder.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultMessageDecoder.scala
deleted file mode 100644
index 1c1214d..0000000
--- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultMessageDecoder.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.kafka.lib.source
-
-import com.twitter.bijection.Injection
-import org.apache.gearpump.Message
-import org.apache.gearpump.streaming.transaction.api.MessageDecoder
-
-import scala.util.{Failure, Success}
-
-class DefaultMessageDecoder extends MessageDecoder {
-  override def fromBytes(key: Array[Byte], value: Array[Byte]): Message = {
-    Message(value, System.currentTimeMillis())
-  }
-}
-
-class StringMessageDecoder extends MessageDecoder {
-  override def fromBytes(key: Array[Byte], value: Array[Byte]): Message = {
-    Message(Injection.invert[String, Array[Byte]](value).get,
-      System.currentTimeMillis())
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/util/KafkaClient.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/util/KafkaClient.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/util/KafkaClient.scala
index 417b6de..581be6a 100644
--- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/util/KafkaClient.scala
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/lib/util/KafkaClient.scala
@@ -18,8 +18,6 @@
 
 package org.apache.gearpump.streaming.kafka.lib.util
 
-import java.util.Properties
-
 import kafka.admin.AdminUtils
 import kafka.cluster.Broker
 import kafka.common.TopicAndPartition

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala
index 6ccb231..88d7420 100644
--- a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala
+++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaSourceSpec.scala
@@ -24,15 +24,15 @@ import java.util.Properties
 import com.twitter.bijection.Injection
 import kafka.common.TopicAndPartition
 import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.kafka.lib.{MessageAndWatermark, KafkaMessageDecoder}
 import org.apache.gearpump.streaming.kafka.lib.source.consumer.FetchThread.FetchThreadFactory
 import org.apache.gearpump.streaming.kafka.lib.source.grouper.PartitionGrouper
-import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient
-import KafkaClient.KafkaClientFactory
+import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient.KafkaClientFactory
 import org.apache.gearpump.streaming.kafka.lib.source.consumer.{KafkaMessage, FetchThread}
 import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient
 import org.apache.gearpump.streaming.kafka.util.KafkaConfig
 import org.apache.gearpump.streaming.kafka.util.KafkaConfig.KafkaConfigFactory
-import org.apache.gearpump.streaming.transaction.api.{CheckpointStore, CheckpointStoreFactory, MessageDecoder, TimeStampFilter}
+import org.apache.gearpump.streaming.transaction.api.{CheckpointStore, CheckpointStoreFactory}
 import org.apache.gearpump.Message
 import org.mockito.Matchers._
 import org.mockito.Mockito._
@@ -140,8 +140,7 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo
       val properties = mock[Properties]
       val config = mock[KafkaConfig]
       val configFactory = mock[KafkaConfigFactory]
-      val timestampFilter = mock[TimeStampFilter]
-      val messageDecoder = mock[MessageDecoder]
+      val messageDecoder = mock[KafkaMessageDecoder]
       val kafkaClient = mock[KafkaClient]
       val clientFactory = mock[KafkaClientFactory]
       val fetchThread = mock[FetchThread]
@@ -156,10 +155,8 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo
           .getCheckpointStoreNameSuffix(tp))).thenReturn(store)
       }
       when(configFactory.getKafkaConfig(properties)).thenReturn(config)
-      when(config.getConfiguredInstance(KafkaConfig.TIMESTAMP_FILTER_CLASS_CONFIG,
-        classOf[TimeStampFilter])).thenReturn(timestampFilter)
       when(config.getConfiguredInstance(KafkaConfig.MESSAGE_DECODER_CLASS_CONFIG,
-        classOf[MessageDecoder])).thenReturn(messageDecoder)
+        classOf[KafkaMessageDecoder])).thenReturn(messageDecoder)
       when(clientFactory.getKafkaClient(config)).thenReturn(kafkaClient)
       when(threadFactory.getFetchThread(config, kafkaClient)).thenReturn(fetchThread)
 
@@ -174,8 +171,8 @@ class KafkaSourceSpec extends PropSpec with PropertyChecks with Matchers with Mo
         msgQueue.foreach { kafkaMsg =>
           when(fetchThread.poll).thenReturn(Option(kafkaMsg))
           val message = Message(kafkaMsg.msg, kafkaMsg.offset)
-          when(messageDecoder.fromBytes(kafkaMsg.key.get, kafkaMsg.msg)).thenReturn(message)
-          when(timestampFilter.filter(message, 0)).thenReturn(Some(message))
+          val msgAndWmk = MessageAndWatermark(message, Instant.ofEpochMilli(kafkaMsg.offset))
+          when(messageDecoder.fromBytes(kafkaMsg.key.get, kafkaMsg.msg)).thenReturn(msgAndWmk)
           source.read() shouldBe Message(kafkaMsg.msg, kafkaMsg.offset)
           verify(checkpointStores(kafkaMsg.topicAndPartition)).persist(
             kafkaMsg.offset, Injection[Long, Array[Byte]](kafkaMsg.offset))

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala
new file mode 100644
index 0000000..81b2661
--- /dev/null
+++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultKafkaMessageDecoderSpec.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.kafka.lib.source
+
+import com.twitter.bijection.Injection
+import org.scalacheck.Gen
+import org.scalatest.prop.PropertyChecks
+import org.scalatest.{Matchers, PropSpec}
+
+class DefaultKafkaMessageDecoderSpec extends PropSpec with PropertyChecks with Matchers {
+  property("DefaultKafkaMessageDecoder should keep the original bytes data in Message") {
+    val decoder = new DefaultKafkaMessageDecoder()
+    forAll(Gen.chooseNum[Int](0, 100), Gen.alphaStr) { (k: Int, v: String) =>
+      val kbytes = Injection[Int, Array[Byte]](k)
+      val vbytes = Injection[String, Array[Byte]](v)
+      val timestamp = System.currentTimeMillis()
+      val msgAndWmk = decoder.fromBytes(kbytes, vbytes)
+      val message = msgAndWmk.message
+      val watermark = msgAndWmk.watermark
+      message.msg shouldBe vbytes
+      // processing time as message timestamp and watermark
+      message.timestamp shouldBe watermark.toEpochMilli
+      message.timestamp should be >= timestamp
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultMessageDecoderSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultMessageDecoderSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultMessageDecoderSpec.scala
deleted file mode 100644
index 843aab7..0000000
--- a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/source/DefaultMessageDecoderSpec.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.kafka.lib.source
-
-import com.twitter.bijection.Injection
-import org.scalacheck.Gen
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class DefaultMessageDecoderSpec extends PropSpec with PropertyChecks with Matchers {
-  property("DefaultMessageDecoder should keep the original bytes data in Message") {
-    val decoder = new DefaultMessageDecoder()
-    forAll(Gen.chooseNum[Int](0, 100), Gen.alphaStr) { (k: Int, v: String) =>
-      val kbytes = Injection[Int, Array[Byte]](k)
-      val vbytes = Injection[String, Array[Byte]](v)
-      val timestamp = System.currentTimeMillis()
-      val message = decoder.fromBytes(kbytes, vbytes)
-      message.msg shouldBe vbytes
-      message.timestamp should be >= timestamp
-    }
-  }
-}
-
-class StringMessageDecoderSpec extends PropSpec with PropertyChecks with Matchers {
-  property("StringMessageDecoder should decode original bytes data into string") {
-    val decoder = new StringMessageDecoder()
-    forAll(Gen.alphaStr, Gen.alphaStr) { (k: String, v: String) =>
-      val kbytes = Injection[String, Array[Byte]](k)
-      val vbytes = Injection[String, Array[Byte]](v)
-      val timestamp = System.currentTimeMillis()
-      val message = decoder.fromBytes(kbytes, vbytes)
-      message.msg shouldBe v
-      message.timestamp should be >= timestamp
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/util/KafkaClientSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/util/KafkaClientSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/util/KafkaClientSpec.scala
index b2db243..8d2579f 100644
--- a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/util/KafkaClientSpec.scala
+++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/lib/util/KafkaClientSpec.scala
@@ -27,8 +27,10 @@ import org.apache.gearpump.streaming.kafka.util.{KafkaConfig, KafkaServerHarness
 import org.apache.kafka.clients.producer.KafkaProducer
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, BeforeAndAfterEach, PropSpec}
+import org.scalatest.{Ignore, Matchers, BeforeAndAfterEach, PropSpec}
 
+// Ignore since KafkaClientSpec randomly fails on Travis
+@Ignore
 class KafkaClientSpec extends PropSpec with PropertyChecks
   with BeforeAndAfterEach with Matchers with KafkaServerHarness {
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala
----------------------------------------------------------------------
diff --git a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala
index 56b33c1..81e7b2a 100644
--- a/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala
+++ b/integrationtest/core/src/it/scala/org/apache/gearpump/integrationtest/checklist/DynamicDagSpec.scala
@@ -42,7 +42,7 @@ class DynamicDagSpec extends TestSpecBase {
       // todo: blocked by #1450
     }
 
-    "can replace down stream with wordcount's sum processor (new processor will have metrics)" in {
+    "can replace downstream with wordcount's sum processor (new processor will have metrics)" in {
       // setup
       val appId = expectSolJarSubmittedWithAppId()
 
@@ -57,7 +57,7 @@ class DynamicDagSpec extends TestSpecBase {
       processorHasThroughput(appId, laterProcessors.keySet.max, "receiveThroughput")
     }
 
-    "can replace up stream with wordcount's split processor (new processor will have metrics)" in {
+    "can replace upstream with wordcount's split processor (new processor will have metrics)" in {
       // setup
       val appId = expectSolJarSubmittedWithAppId()
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
index d1a03de..31e1151 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
@@ -147,7 +147,7 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli
   /** Handles messages from Tasks */
   def taskMessageHandler: Receive = {
     case clock: ClockEvent =>
-      taskManager.foreach(_ forward clock)
+      clockService.foreach(_ forward clock)
     case register: RegisterTask =>
       taskManager.foreach(_ forward register)
     case unRegister: UnRegisterTask =>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
index 6fc0782..68db354 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
@@ -61,7 +61,7 @@ class ClockService(private var dag: DAG, store: AppDataStore) extends Actor with
       minCheckpointClock = Some(startClock)
 
       // Recover the application by restarting from last persisted startClock.
-      // Only messge after startClock will be replayed.
+      // Only message after startClock will be replayed.
       self ! StoredStartClock(startClock)
       LOG.info(s"Start Clock Retrieved, starting ClockService, startClock: $startClock")
     }
@@ -70,8 +70,8 @@ class ClockService(private var dag: DAG, store: AppDataStore) extends Actor with
   }
 
   override def postStop(): Unit = {
-    Option(healthCheckScheduler).map(_.cancel())
-    Option(snapshotScheduler).map(_.cancel())
+    Option(healthCheckScheduler).foreach(_.cancel())
+    Option(snapshotScheduler).foreach(_.cancel())
   }
 
   // Keep track of clock value of all processors.
@@ -118,13 +118,7 @@ class ClockService(private var dag: DAG, store: AppDataStore) extends Actor with
         (processorId, clock)
       }
 
-    this.upstreamClocks = clocks.map { pair =>
-      val (processorId, _) = pair
-
-      val upstreams = dag.graph.incomingEdgesOf(processorId).map(_._1)
-      val upstreamClocks = upstreams.flatMap(clocks.get)
-      (processorId, upstreamClocks.toArray)
-    }
+    this.upstreamClocks = getUpstreamClocks(clocks)
 
     this.processorClocks = clocks.toArray.map(_._2)
 
@@ -147,24 +141,19 @@ class ClockService(private var dag: DAG, store: AppDataStore) extends Actor with
 
     this.clocks = newClocks
 
-    this.upstreamClocks = newClocks.map { pair =>
-      val (processorId, _) = pair
-
-      val upstreams = dag.graph.incomingEdgesOf(processorId).map(_._1)
-      val upstreamClocks = upstreams.flatMap(newClocks.get)
-      (processorId, upstreamClocks.toArray)
-    }
+    this.upstreamClocks = getUpstreamClocks(clocks)
 
     // Inits the clock of all processors.
-    newClocks.foreach { pair =>
+    clocks.foreach { pair =>
       val (processorId, processorClock) = pair
       val upstreamClock = getUpStreamMinClock(processorId)
       val birth = processorClock.life.birth
 
-      if (dag.graph.inDegreeOf(processorId) == 0) {
-        processorClock.init(Longs.max(birth, startClock))
-      } else {
-        processorClock.init(upstreamClock)
+      upstreamClock match {
+        case Some(clock) =>
+          processorClock.init(clock)
+        case None =>
+          processorClock.init(Longs.max(birth, startClock))
       }
     }
 
@@ -195,33 +184,40 @@ class ClockService(private var dag: DAG, store: AppDataStore) extends Actor with
       stash()
   }
 
-  private def getUpStreamMinClock(processorId: ProcessorId): TimeStamp = {
-    val clocks = upstreamClocks.get(processorId)
-    if (clocks.isDefined) {
-      if (clocks.get == null || clocks.get.length == 0) {
-        Long.MaxValue
-      } else {
-        ProcessorClocks.minClock(clocks.get)
-      }
-    } else {
-      Long.MaxValue
+  private def getUpstreamClocks(
+      clocks: Map[ProcessorId, ProcessorClock]): Map[ProcessorId, Array[ProcessorClock]] = {
+    clocks.foldLeft(Map.empty[ProcessorId, Array[ProcessorClock]]) {
+      case (accum, (processorId, clock)) =>
+        val upstreams = dag.graph.incomingEdgesOf(processorId).map(_._1)
+        if (upstreams.nonEmpty) {
+          val upstreamClocks = upstreams.collect(clocks)
+          if (upstreamClocks.nonEmpty) {
+            accum + (processorId -> upstreamClocks.toArray)
+          } else {
+            accum
+          }
+        } else {
+          accum
+        }
     }
   }
 
+  private def getUpStreamMinClock(processorId: ProcessorId): Option[TimeStamp] = {
+    upstreamClocks.get(processorId).map(ProcessorClocks.minClock)
+  }
+
   def clockService: Receive = {
     case GetUpstreamMinClock(task) =>
-      sender ! UpstreamMinClock(getUpStreamMinClock(task.processorId))
+      getUpStreamMinClock(task.processorId).foreach(sender ! UpstreamMinClock(_))
 
     case update@UpdateClock(task, clock) =>
-      val upstreamMinClock = getUpStreamMinClock(task.processorId)
-
       val processorClock = clocks.get(task.processorId)
       if (processorClock.isDefined) {
         processorClock.get.updateMinClock(task.index, clock)
       } else {
         LOG.error(s"Cannot updateClock for task $task")
       }
-      sender ! UpstreamMinClock(upstreamMinClock)
+      getUpStreamMinClock(task.processorId).foreach(sender ! UpstreamMinClock(_))
 
     case GetLatestMinClock =>
       sender ! LatestMinClock(minClock)
@@ -265,9 +261,9 @@ class ClockService(private var dag: DAG, store: AppDataStore) extends Actor with
         dynamicDAG(dag, getStartClock)
       } else {
         // Restarts current dag.
-        recoverDag(dag, getStartClock)
+        recoverDag(newDag, getStartClock)
       }
-      LOG.info(s"Change to new DAG(dag = ${dag.version}), send back ChangeToNewDAGSuccess")
+      LOG.info(s"Change to new DAG(dag = ${newDag.version}), send back ChangeToNewDAGSuccess")
       sender ! ChangeToNewDAGSuccess(clocks.map { pair =>
         val (id, clock) = pair
         (id, clock.min)
@@ -287,6 +283,7 @@ class ClockService(private var dag: DAG, store: AppDataStore) extends Actor with
     upstreamClocks = upstreamClocks - processorId
 
     // Removes dead processor from checkpoints.
+    checkpointClocks = checkpointClocks.filter(_._1.processorId != processorId)
     checkpointClocks = checkpointClocks.filter { kv =>
       val (taskId, _) = kv
       taskId.processorId != processorId
@@ -329,11 +326,11 @@ object ClockService {
 
   case object HealthCheck
 
-  class ProcessorClock(val processorId: ProcessorId, val life: LifeTime, val parallism: Int,
+  class ProcessorClock(val processorId: ProcessorId, val life: LifeTime, val parallelism: Int,
       private var _min: TimeStamp = 0L, private var _taskClocks: Array[TimeStamp] = null) {
 
     def copy(life: LifeTime): ProcessorClock = {
-      new ProcessorClock(processorId, life, parallism, _min, _taskClocks)
+      new ProcessorClock(processorId, life, parallelism, _min, _taskClocks)
     }
 
     def min: TimeStamp = _min
@@ -342,7 +339,7 @@ object ClockService {
     def init(startClock: TimeStamp): Unit = {
       if (taskClocks == null) {
         this._min = startClock
-        this._taskClocks = new Array(parallism)
+        this._taskClocks = new Array(parallelism)
         util.Arrays.fill(taskClocks, startClock)
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
index 48cc50e..085b3f0 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
@@ -103,8 +103,6 @@ private[appmaster] class TaskManager(
   def receive: Receive = applicationReady(DagReadyState.empty)
 
   private def onClientQuery(taskRegistry: TaskRegistry): Receive = {
-    case clock: ClockEvent =>
-      clockService forward clock
     case GetTaskList =>
       sender ! TaskList(taskRegistry.getTaskExecutorMap)
     case LookupTaskActorRef(taskId) =>

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
index b6c087e..d45737b 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
@@ -111,6 +111,8 @@ object StreamApp {
 class CollectionDataSource[T](seq: Seq[T]) extends DataSource {
   private lazy val iterator: Iterator[T] = seq.iterator
 
+  override def open(context: TaskContext, startTime: Instant): Unit = {}
+
   override def read(): Message = {
     if (iterator.hasNext) {
       Message(iterator.next())
@@ -121,5 +123,5 @@ class CollectionDataSource[T](seq: Seq[T]) extends DataSource {
 
   override def close(): Unit = {}
 
-  override def open(context: TaskContext, startTime: Instant): Unit = {}
+  override def getWatermark: Instant = Instant.now()
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala
index f55d102..f4c87da 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSource.scala
@@ -45,6 +45,7 @@ import org.apache.gearpump.Message
  */
 trait DataSource extends java.io.Serializable {
 
+
   /**
    * Opens connection to data source
    * invoked in onStart() method of [[org.apache.gearpump.streaming.source.DataSourceTask]]
@@ -67,4 +68,11 @@ trait DataSource extends java.io.Serializable {
    * invoked in onStop() method of [[org.apache.gearpump.streaming.source.DataSourceTask]]
    */
   def close(): Unit
+
+  /**
+   * Returns the current watermark of this source.
+   * No message with a timestamp earlier than the
+   * watermark should enter the system afterwards.
+   */
+  def getWatermark: Instant
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
index 468ae3b..5d1a11e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
@@ -19,10 +19,13 @@
 package org.apache.gearpump.streaming.source
 
 import java.time.Instant
+import java.util.concurrent.TimeUnit
 
 import org.apache.gearpump._
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.streaming.task.{UpstreamMinClock, Task, TaskContext}
+
+import scala.concurrent.duration._
 
 object DataSourceTask {
   val DATA_SOURCE = "data_source"
@@ -51,18 +54,23 @@ class DataSourceTask private[source](context: TaskContext, conf: UserConfig, sou
   override def onStart(startTime: Instant): Unit = {
     LOG.info(s"opening data source at $startTime")
     source.open(context, startTime)
-    self ! Message("start", System.currentTimeMillis())
+
+    self ! Watermark(source.getWatermark)
   }
 
   override def onNext(message: Message): Unit = {
     0.until(batchSize).foreach { _ =>
-      Option(source.read()).foreach(context.output)
+      Option(source.read()).foreach { msg =>
+        context.output(msg)
+      }
     }
-    self ! Message("continue", System.currentTimeMillis())
+
+    self ! Watermark(source.getWatermark)
   }
 
   override def onStop(): Unit = {
     LOG.info("closing data source...")
     source.close()
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/streaming/src/main/scala/org/apache/gearpump/streaming/source/DefaultTimeStampFilter.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DefaultTimeStampFilter.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DefaultTimeStampFilter.scala
deleted file mode 100644
index df54cc2..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DefaultTimeStampFilter.scala
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.source
-
-import org.apache.gearpump.streaming.transaction.api.TimeStampFilter
-import org.apache.gearpump.{Message, TimeStamp}
-
-/**
- * TimeStampFilter filters out messages which have obsolete (smaller) timestamp.
- */
-class DefaultTimeStampFilter extends TimeStampFilter {
-  override def filter(msg: Message, predicate: TimeStamp): Option[Message] = {
-    Option(msg).find(_.timestamp >= predicate)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
new file mode 100644
index 0000000..36099c1
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.source
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+
+/**
+ * Message used by a source task to report its watermark.
+ */
+case class Watermark(instant: Instant) {
+  def toMessage: Message = Message("watermark", instant.toEpochMilli)
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
index aceff4a..5eaed40 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
@@ -19,8 +19,6 @@
 package org.apache.gearpump.streaming.state.api
 
 import java.time.Instant
-import java.util.concurrent.TimeUnit
-import scala.concurrent.duration.FiniteDuration
 
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig}
@@ -30,7 +28,6 @@ import org.apache.gearpump.util.LogUtil
 import org.apache.gearpump.{Message, TimeStamp}
 
 object PersistentTask {
-  val CHECKPOINT = Message("checkpoint")
   val LOG = LogUtil.getLogger(getClass)
 }
 
@@ -52,8 +49,6 @@ abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig)
     s"app$appId-task${taskId.processorId}_${taskId.index}")
   val checkpointInterval = conf.getLong(PersistentStateConfig.STATE_CHECKPOINT_INTERVAL_MS).get
   val checkpointManager = new CheckpointManager(checkpointInterval, checkpointStore)
-  // System time interval to attempt checkpoint
-  private val checkpointAttemptInterval = 1000L
 
   /**
    * Subclass should override this method to pass in a PersistentState. the framework has already
@@ -69,48 +64,41 @@ abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig)
   def processMessage(state: PersistentState[T], message: Message): Unit
 
   /** Persistent state that will be stored (by checkpointing) automatically to storage like HDFS */
-  val state = persistentState
+  private var state: PersistentState[T] = _
+
+  def getState: PersistentState[T] = state
 
   final override def onStart(startTime: Instant): Unit = {
+    state = persistentState
     val timestamp = startTime.toEpochMilli
     checkpointManager
       .recover(timestamp)
       .foreach(state.recover(timestamp, _))
 
     reportCheckpointClock(timestamp)
-    scheduleCheckpoint(checkpointAttemptInterval)
   }
 
   final override def onNext(message: Message): Unit = {
-    message match {
-      case CHECKPOINT =>
-        val upstreamMinClock = taskContext.upstreamMinClock
-        if (checkpointManager.shouldCheckpoint(upstreamMinClock)) {
-          checkpointManager.getCheckpointTime.foreach { checkpointTime =>
-            val serialized = state.checkpoint()
-            checkpointManager.checkpoint(checkpointTime, serialized)
-              .foreach(state.setNextCheckpointTime)
-            taskContext.output(Message(serialized, checkpointTime))
-            reportCheckpointClock(checkpointTime)
-          }
-        }
-        scheduleCheckpoint(checkpointAttemptInterval)
-      case _ =>
-        checkpointManager.update(message.timestamp)
+    checkpointManager.update(message.timestamp)
+      .foreach(state.setNextCheckpointTime)
+    processMessage(state, message)
+  }
+
+  final override def onWatermarkProgress(watermark: Instant): Unit = {
+    if (checkpointManager.shouldCheckpoint(watermark.toEpochMilli)) {
+      checkpointManager.getCheckpointTime.foreach { checkpointTime =>
+        val serialized = state.checkpoint()
+        checkpointManager.checkpoint(checkpointTime, serialized)
           .foreach(state.setNextCheckpointTime)
-        processMessage(state, message)
+        reportCheckpointClock(checkpointTime)
+      }
     }
   }
 
-
   final override def onStop(): Unit = {
     checkpointManager.close()
   }
 
-  private def scheduleCheckpoint(interval: Long): Unit = {
-    scheduleOnce(new FiniteDuration(interval, TimeUnit.MILLISECONDS))(self ! CHECKPOINT)
-  }
-
   private def reportCheckpointClock(timestamp: TimeStamp): Unit = {
     appMaster ! UpdateCheckpointClock(taskContext.taskId, timestamp)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
index c94dec4..5b174bd 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Task.scala
@@ -21,7 +21,6 @@ package org.apache.gearpump.streaming.task
 import java.time.Instant
 
 import scala.concurrent.duration.FiniteDuration
-
 import akka.actor.Actor.Receive
 import akka.actor.{ActorRef, ActorSystem, Cancellable, Props}
 import org.slf4j.Logger
@@ -157,6 +156,14 @@ trait TaskInterface {
    * @return the handler
    */
   def receiveUnManagedMessage: Receive = null
+
+  /**
+   * Invoked when the watermark is updated.
+   * It is usually safe to output or checkpoint state earlier than the watermark.
+   *
+   * @param watermark the current event-time progress.
+   */
+  def onWatermarkProgress(watermark: Instant): Unit
 }
 
 abstract class Task(taskContext: TaskContext, userConf: UserConfig) extends TaskInterface {
@@ -188,4 +195,7 @@ abstract class Task(taskContext: TaskContext, userConf: UserConfig) extends Task
     case msg =>
       LOG.error("Failed! Received unknown message " + "taskId: " + taskId + ", " + msg.toString)
   }
+
+  override def onWatermarkProgress(watermark: Instant): Unit = {}
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index 30a24fa..c0b6a29 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -23,6 +23,7 @@ import java.util
 import java.util.concurrent.TimeUnit
 
 import akka.actor._
+import org.apache.gearpump.streaming.source.{Watermark, DataSourceTask}
 import org.slf4j.Logger
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.gs.collections.impl.map.mutable.primitive.IntShortHashMap
@@ -30,7 +31,7 @@ import org.apache.gearpump.metrics.Metrics
 import org.apache.gearpump.serializer.SerializationFramework
 import org.apache.gearpump.streaming.AppMasterToExecutor._
 import org.apache.gearpump.streaming.ExecutorToAppMaster._
-import org.apache.gearpump.streaming.{Constants, ProcessorId}
+import org.apache.gearpump.streaming.ProcessorId
 import org.apache.gearpump.util.{LogUtil, TimeOutScheduler}
 import org.apache.gearpump.{Message, TimeStamp}
 
@@ -47,6 +48,7 @@ class TaskActor(
     extends Actor with ExpressTransport with TimeOutScheduler {
   var upstreamMinClock: TimeStamp = 0L
   private var _minClock: TimeStamp = 0L
+  private var minClockReported: Boolean = true
 
   def serializerPool: SerializationFramework = inputSerializerPool
 
@@ -194,7 +196,7 @@ class TaskActor(
     // Put this as the last step so that the subscription is already initialized.
     // Message sending in current Task before onStart will not be delivered to
     // target
-    onStart(Instant.ofEpochMilli(upstreamMinClock))
+    onStart(Instant.ofEpochMilli(_minClock))
 
     appMaster ! GetUpstreamMinClock(taskId)
     context.become(handleMessages(sender))
@@ -203,7 +205,7 @@ class TaskActor(
   def waitForTaskRegistered: Receive = {
     case start@TaskRegistered(_, sessionId, startClock) =>
       this.sessionId = sessionId
-      this.upstreamMinClock = startClock
+      this._minClock = startClock
       context.become(waitForStartClock)
   }
 
@@ -240,34 +242,15 @@ class TaskActor(
       receiveMessage(message, sender)
     case inputMessage: Message =>
       receiveMessage(inputMessage, sender)
-    case upstream@UpstreamMinClock(upstreamClock) =>
-      this.upstreamMinClock = upstreamClock
-
-      val subMinClock = subscriptions.foldLeft(Long.MaxValue) { (min, sub) =>
-        val subMin = sub._2.minClock
-        // A subscription is holding back the _minClock;
-        // we send AckRequest to its tasks to push _minClock forward
-        if (subMin == _minClock) {
-          sub._2.sendAckRequestOnStallingTime(_minClock)
-        }
-        Math.min(min, subMin)
-      }
-
-      _minClock = Math.max(life.birth, Math.min(upstreamMinClock, subMinClock))
-
-      val update = UpdateClock(taskId, _minClock)
-      context.system.scheduler.scheduleOnce(CLOCK_REPORT_INTERVAL) {
-        appMaster ! update
+    case watermark@Watermark(instant) =>
+      if (self.eq(sender) && minClockReported) {
+        updateUpstreamMinClock(instant.toEpochMilli)
+        minClockReported = false
       }
+      receiveMessage(watermark.toMessage, sender)
 
-      // Checks whether current task is dead.
-      if (_minClock > life.death) {
-        // There will be no more message received...
-        val unRegister = UnRegisterTask(taskId, executorId)
-        executor ! unRegister
-
-        LOG.info(s"Sending $unRegister, current minclock: ${_minClock}, life: $life")
-      }
+    case upstream@UpstreamMinClock(upstreamClock) =>
+      updateUpstreamMinClock(upstreamClock)
 
     case ChangeTask(_, dagVersion, life, subscribers) =>
       this.life = life
@@ -310,8 +293,8 @@ class TaskActor(
   private def receiveMessage(msg: Message, sender: ActorRef): Unit = {
     val messageAfterCheck = securityChecker.checkMessage(msg, sender)
     messageAfterCheck match {
-      case Some(msg) =>
-        queue.add(msg)
+      case Some(m) =>
+        queue.add(m)
         doHandleMessage()
       case None =>
       // TODO: Indicate the error and avoid the LOG flood
@@ -322,11 +305,44 @@ class TaskActor(
   private def getSubscription(processorId: ProcessorId): Option[Subscription] = {
     subscriptions.find(_._1 == processorId).map(_._2)
   }
+
+  private def updateUpstreamMinClock(upstreamClock: TimeStamp): Unit = {
+    if (upstreamClock > this.upstreamMinClock) {
+      task.onWatermarkProgress(Instant.ofEpochMilli(this.upstreamMinClock))
+    }
+    this.upstreamMinClock = upstreamClock
+
+    val subMinClock = subscriptions.foldLeft(Long.MaxValue) { (min, sub) =>
+      val subMin = sub._2.minClock
+      // A subscription is holding back the _minClock;
+      // we send AckRequest to its tasks to push _minClock forward
+      if (subMin == _minClock) {
+        sub._2.sendAckRequestOnStallingTime(_minClock)
+      }
+      Math.min(min, subMin)
+    }
+
+    _minClock = Math.max(life.birth, Math.min(upstreamMinClock, subMinClock))
+
+    val update = UpdateClock(taskId, _minClock)
+    context.system.scheduler.scheduleOnce(CLOCK_REPORT_INTERVAL) {
+      appMaster ! update
+      minClockReported = true
+    }
+
+
+    // Checks whether current task is dead.
+    if (_minClock > life.death) {
+      // There will be no more message received...
+      val unRegister = UnRegisterTask(taskId, executorId)
+      executor ! unRegister
+
+      LOG.info(s"Sending $unRegister, current minclock: ${_minClock}, life: $life")
+    }
+  }
 }
 
 object TaskActor {
-  // 3 seconds
-  val CLOCK_SYNC_TIMEOUT_INTERVAL = 3 * 1000
 
   // If the message comes from an unknown source, securityChecker will drop it
   class SecurityChecker(task_id: TaskId, self: ActorRef) {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
index 31c991e..1b3f30c 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
@@ -36,7 +36,7 @@ import org.apache.gearpump.{TimeStamp, Message}
  * @param userConf user config
  */
 class TaskWrapper(
-    val taskId: TaskId, taskClass: Class[_ <: Task], context: TaskContextData,
+    val taskId: TaskId, val taskClass: Class[_ <: Task], context: TaskContextData,
     userConf: UserConfig) extends TaskContext with TaskInterface {
 
   private val LOG = LogUtil.getLogger(taskClass, task = taskId)
@@ -131,4 +131,8 @@ class TaskWrapper(
    * containing environment.
    */
   override def logger: Logger = LOG
+
+  override def onWatermarkProgress(watermark: Instant): Unit = {
+    task.foreach(_.onWatermarkProgress(watermark))
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/529799cc/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/MessageDecoder.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/MessageDecoder.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/MessageDecoder.scala
deleted file mode 100644
index 3ea33a5..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/transaction/api/MessageDecoder.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.transaction.api
-
-import org.apache.gearpump.Message
-
-/**
- * Decodes raw bytes to Message.
- * It is usually written by end user and passed into TimeReplayableSource
- */
-trait MessageDecoder extends java.io.Serializable {
-  /**
-   * @param key key of a kafka message, can be NULL
-   * @param value value of a kafka message
-   * @return a gearpump Message
-   */
-  def fromBytes(key: Array[Byte], value: Array[Byte]): Message
-}