Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/08/19 11:58:03 UTC

[2/3] incubator-gearpump git commit: [GEARPUMP-217] Merge master into sql branch

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala
----------------------------------------------------------------------
diff --git a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala
index 67c64c4..da99d64 100644
--- a/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala
+++ b/external/kafka/src/test/scala/org/apache/gearpump/streaming/kafka/KafkaStoreSpec.scala
@@ -22,7 +22,7 @@ import java.util.Properties
 import com.twitter.bijection.Injection
 import kafka.api.OffsetRequest
 import kafka.common.TopicAndPartition
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.MockUtil
 import org.apache.gearpump.streaming.kafka.lib.source.consumer.{KafkaMessage, KafkaConsumer}
 import org.apache.gearpump.streaming.kafka.lib.util.KafkaClient
@@ -92,7 +92,7 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc
 
   property("KafkaStore should read checkpoint from timestamp on recover") {
     forAll(Gen.alphaStr, timestampGen) {
-      (topic: String, recoverTime: TimeStamp) =>
+      (topic: String, recoverTime: MilliSeconds) =>
         val consumer = mock[KafkaConsumer]
         val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
         val kafkaStore = new KafkaStore(topic, producer, Some(consumer))
@@ -104,7 +104,7 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc
     }
 
     forAll(Gen.alphaStr, timestampGen) {
-      (topic: String, recoverTime: TimeStamp) =>
+      (topic: String, recoverTime: MilliSeconds) =>
         val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
         val kafkaStore = new KafkaStore(topic, producer, None)
 
@@ -113,12 +113,12 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc
     }
 
     forAll(Gen.alphaStr, timestampGen, timestampGen) {
-      (topic: String, recoverTime: TimeStamp, checkpointTime: TimeStamp) =>
+      (topic: String, recoverTime: MilliSeconds, checkpointTime: MilliSeconds) =>
         val consumer = mock[KafkaConsumer]
         val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
         val kafkaStore = new KafkaStore(topic, producer, Some(consumer))
 
-        val key = Injection[TimeStamp, Array[Byte]](checkpointTime)
+        val key = Injection[MilliSeconds, Array[Byte]](checkpointTime)
         val msg = key
         val kafkaMsg = KafkaMessage(TopicAndPartition(topic, 0), 0, Some(key), msg)
 
@@ -139,7 +139,7 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc
 
   property("KafkaStore persist should write checkpoint with monotonically increasing timestamp") {
     forAll(Gen.alphaStr, timestampGen, Gen.alphaStr) {
-      (topic: String, checkpointTime: TimeStamp, data: String) =>
+      (topic: String, checkpointTime: MilliSeconds, data: String) =>
         val consumer = mock[KafkaConsumer]
         val producer = mock[KafkaProducer[Array[Byte], Array[Byte]]]
         val kafkaStore = new KafkaStore(topic, producer, Some(consumer))
@@ -155,12 +155,12 @@ class KafkaStoreSpec extends PropSpec with PropertyChecks with Matchers with Moc
     }
 
     def verifyProducer(producer: Producer[Array[Byte], Array[Byte]], count: Int,
-        topic: String, partition: Int, time: TimeStamp, data: String): Unit = {
+        topic: String, partition: Int, time: MilliSeconds, data: String): Unit = {
       verify(producer, times(count)).send(
         MockUtil.argMatch[ProducerRecord[Array[Byte], Array[Byte]]](record =>
           record.topic() == topic
           && record.partition() == partition
-          && Injection.invert[TimeStamp, Array[Byte]](record.key()).get == time
+          && Injection.invert[MilliSeconds, Array[Byte]](record.key()).get == time
           && Injection.invert[String, Array[Byte]](record.value()).get == data
         ))
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/project/BuildExamples.scala
----------------------------------------------------------------------
diff --git a/project/BuildExamples.scala b/project/BuildExamples.scala
index afb7459..b3a8e4a 100644
--- a/project/BuildExamples.scala
+++ b/project/BuildExamples.scala
@@ -106,7 +106,7 @@ object BuildExamples extends sbt.Build {
         "commons-io" % "commons-io" % commonsIOVersion,
         "io.spray" %% "spray-can" % sprayVersion,
         "io.spray" %% "spray-routing-shapeless2" % sprayVersion
-        )
+        ) ++ annotationDependencies
     ) ++ include("examples/distributeservice")
   ).dependsOn(core % "provided; test->test")
 
@@ -160,11 +160,12 @@ object BuildExamples extends sbt.Build {
         CrossVersion.binaryScalaVersion(scalaVersion.value)
     )
 
-  private def include(files: String*): Seq[Def.Setting[_]] = Seq(
-    assemblyExcludedJars in assembly := {
-      val cp = (fullClasspath in assembly).value
-      cp.filterNot(p =>
-        files.exists(p.data.getAbsolutePath.contains))
-    }
-  )
+  private def include(files: String*): Seq[Def.Setting[_]] =
+    Seq(
+      assemblyExcludedJars in assembly := {
+        val cp = (fullClasspath in assembly).value
+        cp.filterNot(p =>
+          files.exists(p.data.getAbsolutePath.contains))
+      }
+    )
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/project/BuildExperiments.scala
----------------------------------------------------------------------
diff --git a/project/BuildExperiments.scala b/project/BuildExperiments.scala
index c603ec1..84c80f0 100644
--- a/project/BuildExperiments.scala
+++ b/project/BuildExperiments.scala
@@ -60,7 +60,7 @@ object BuildExperiments extends sbt.Build {
         libraryDependencies ++= Seq(
           "org.json4s" %% "json4s-jackson" % "3.2.11",
           "com.typesafe.akka" %% "akka-stream" % akkaVersion
-        ),
+        ) ++ annotationDependencies,
         mainClass in(Compile, packageBin) := Some("akka.stream.gearpump.example.Test")
       ))
     .dependsOn (core % "provided", streaming % "test->test; provided")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/project/BuildGearpump.scala
----------------------------------------------------------------------
diff --git a/project/BuildGearpump.scala b/project/BuildGearpump.scala
index 895c042..6c2acad 100644
--- a/project/BuildGearpump.scala
+++ b/project/BuildGearpump.scala
@@ -54,7 +54,17 @@ object BuildGearpump extends sbt.Build {
       useGpg := false,
       pgpSecretRing := file("./secring.asc"),
       pgpPublicRing := file("./pubring.asc"),
-      scalacOptions ++= Seq("-Yclosure-elim", "-Yinline"),
+      scalacOptions ++= Seq(
+        "-deprecation", // Emit warning and location for usages of deprecated APIs
+        "-encoding", "UTF-8", // Specify character encoding used by source files
+        "-feature", // Emit warning and location for usages of features
+                    // that should be imported explicitly
+        "-language:existentials", // Enable existential types
+        "-language:implicitConversions", // Enable implicit conversions
+        "-Yclosure-elim", // Perform closure elimination
+        "-Yinline", // Perform inlining when possible
+        "-Ywarn-unused-import" // Warn on unused imports
+      ),
       publishMavenStyle := true,
 
       pgpPassphrase := Option(System.getenv().get("PASSPHRASE")).map(_.toArray),
@@ -74,30 +84,32 @@ object BuildGearpump extends sbt.Build {
         }
       },
 
-      publishArtifact in Test := true,
-
       pomExtra := {
+        // scalastyle:off line.size.limit
         <url>https://github.com/apache/incubator-gearpump</url>
-          <licenses>
-            <license>
-              <name>Apache 2</name>
-              <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
-            </license>
-          </licenses>
-          <scm>
-            <connection>scm:git://git.apache.org/incubator-gearpump.git</connection>
-            <developerConnection>scm:git:git@github.com:apache/incubator-gearpump</developerConnection>
-            <url>github.com/apache/incubator-gearpump</url>
-          </scm>
-          <developers>
-            <developer>
-              <id>gearpump</id>
-              <name>Gearpump Team</name>
-              <url>http://gearpump.incubator.apache.org/community.html#who-we-are</url>
-            </developer>
-          </developers>
+        <licenses>
+          <license>
+            <name>Apache 2</name>
+            <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
+          </license>
+        </licenses>
+        <scm>
+          <connection>scm:git://git.apache.org/incubator-gearpump.git</connection>
+          <developerConnection>scm:git:git@github.com:apache/incubator-gearpump</developerConnection>
+          <url>github.com/apache/incubator-gearpump</url>
+        </scm>
+        <developers>
+          <developer>
+            <id>gearpump</id>
+            <name>Gearpump Team</name>
+            <url>http://gearpump.incubator.apache.org/community.html#who-we-are</url>
+          </developer>
+        </developers>
+        // scalastyle:on line.size.limit
       },
 
+      publishArtifact in Test := true,
+
       pomPostProcess := {
         (node: xml.Node) => changeShadedDeps(
           Set(
@@ -194,7 +206,7 @@ object BuildGearpump extends sbt.Build {
 
         libraryDependencies ++= Seq(
           "com.goldmansachs" % "gs-collections" % gsCollectionsVersion
-        ),
+        ) ++ annotationDependencies,
 
         pomPostProcess := {
           (node: xml.Node) => changeShadedDeps(

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/project/Dependencies.scala
----------------------------------------------------------------------
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 40b6380..b146c08 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -56,6 +56,14 @@ object Dependencies {
   val rabbitmqVersion = "3.5.3"
   val calciteVersion = "1.12.0"
 
+  val annotationDependencies = Seq(
+    // workaround for compiler warnings like
+    // "Class javax.annotation.CheckReturnValue not found - continuing with a stub"
+    // see https://issues.scala-lang.org/browse/SI-8978
+    // marked as "provided" so it is excluded from the assembled jar
+    "com.google.code.findbugs" % "jsr305" % "3.0.2" % "provided"
+  )
+
   val coreDependencies = Seq(
     libraryDependencies ++= Seq(
       "org.slf4j" % "slf4j-api" % slf4jVersion,
@@ -94,11 +102,12 @@ object Dependencies {
         exclude("org.slf4j", "slf4j-api"),
       "com.codahale.metrics" % "metrics-jvm" % codahaleVersion
         exclude("org.slf4j", "slf4j-api"),
+
       "com.typesafe.akka" %% "akka-testkit" % akkaVersion % "test",
       "org.scalatest" %% "scalatest" % scalaTestVersion % "test",
       "org.scalacheck" %% "scalacheck" % scalaCheckVersion % "test",
       "org.mockito" % "mockito-core" % mockitoVersion % "test",
       "junit" % "junit" % junitVersion % "test"
-    )
+    ) ++ annotationDependencies
   )
 }
\ No newline at end of file
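
The annotationDependencies hunk above is the core of this build change: one shared Seq of compile-only annotation jars that each module appends to its libraryDependencies. A minimal standalone sketch of the same idiom follows; the object name, module settings name and the slf4j version are illustrative, not taken from this commit.

    import sbt._
    import sbt.Keys._

    object ExampleDependencies {
      // jsr305 supplies the javax.annotation.* classes referenced by transitive
      // dependencies (see SI-8978); "provided" keeps it out of the assembled jar.
      val annotationDependencies = Seq(
        "com.google.code.findbugs" % "jsr305" % "3.0.2" % "provided"
      )

      // A module's own dependency list simply appends the shared Seq.
      val exampleModuleDependencies = Seq(
        libraryDependencies ++= Seq(
          "org.slf4j" % "slf4j-api" % "1.7.21" // version illustrative
        ) ++ annotationDependencies
      )
    }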

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
index be96577..1f6141a 100644
--- a/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/MasterService.scala
@@ -40,7 +40,7 @@ import org.apache.gearpump.cluster.MasterToClient.{HistoryMetrics, MasterConfig,
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.worker.WorkerSummary
 import org.apache.gearpump.cluster.{ClusterConfig, UserConfig}
-import org.apache.gearpump.jarstore.{JarStoreClient, FileDirective, JarStoreServer}
+import org.apache.gearpump.jarstore.{JarStoreClient, FileDirective}
 import org.apache.gearpump.streaming.partitioner.{PartitionerByClassName, PartitionerDescription}
 import org.apache.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest}
 // NOTE: This cannot be removed!!!

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala b/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala
index 7b33987..762b9e4 100644
--- a/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala
+++ b/services/jvm/src/main/scala/org/apache/gearpump/services/StaticService.scala
@@ -23,7 +23,6 @@ import akka.http.scaladsl.marshalling.ToResponseMarshallable
 import akka.http.scaladsl.model._
 import akka.http.scaladsl.server.Directives._
 import akka.http.scaladsl.marshalling.ToResponseMarshallable._
-import akka.http.scaladsl.server.{RejectionHandler, StandardRoute}
 import akka.stream.Materializer
 import org.apache.gearpump.util.Util
 // NOTE: This cannot be removed!!!

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala
index 80264d2..4f2b642 100644
--- a/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala
+++ b/services/jvm/src/test/scala/org/apache/gearpump/services/AppMasterServiceSpec.scala
@@ -27,16 +27,14 @@ import akka.testkit.TestActor.{AutoPilot, KeepRunning}
 import akka.testkit.{TestKit, TestProbe}
 import com.typesafe.config.{Config, ConfigFactory}
 import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-import org.slf4j.Logger
 import upickle.default.read
 import org.apache.gearpump.cluster.AppMasterToMaster.GeneralAppMasterSummary
 import org.apache.gearpump.cluster.ClientToMaster.{GetLastFailure, QueryAppMasterConfig, QueryHistoryMetrics, ResolveAppId}
 import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMasterDataDetailRequest, AppMasterDataRequest}
 import org.apache.gearpump.cluster.MasterToClient._
 import org.apache.gearpump.cluster.{ApplicationStatus, TestUtil}
-import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer}
+import org.apache.gearpump.jarstore.JarStoreClient
 import org.apache.gearpump.streaming.executor.Executor.{ExecutorConfig, ExecutorSummary, GetExecutorSummary, QueryExecutorConfig}
-import org.apache.gearpump.util.LogUtil
 // NOTE: This cannot be removed!!!
 import org.apache.gearpump.services.util.UpickleUtil._
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala
----------------------------------------------------------------------
diff --git a/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala b/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala
index 39c0de0..07e44c1 100644
--- a/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala
+++ b/services/jvm/src/test/scala/org/apache/gearpump/services/MasterServiceSpec.scala
@@ -40,7 +40,7 @@ import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterData, AppMastersD
 import org.apache.gearpump.cluster.MasterToClient._
 import org.apache.gearpump.cluster.TestUtil
 import org.apache.gearpump.cluster.worker.{WorkerId, WorkerSummary}
-import org.apache.gearpump.jarstore.{JarStoreClient, JarStoreServer}
+import org.apache.gearpump.jarstore.JarStoreClient
 import org.apache.gearpump.services.MasterService.{BuiltinPartitioners, SubmitApplicationRequest}
 // NOTE: This cannot be removed!!!
 import org.apache.gearpump.services.util.UpickleUtil._
@@ -166,7 +166,7 @@ class MasterServiceSpec extends FlatSpec with ScalatestRouteTest
 
   private def entity(file: File)(implicit ec: ExecutionContext): Future[RequestEntity] = {
     val entity = HttpEntity(MediaTypes.`application/octet-stream`, file.length(),
-      FileIO.fromFile(file, chunkSize = 100000))
+      FileIO.fromPath(file.toPath, chunkSize = 100000))
 
     val body = Source.single(
       Multipart.FormData.BodyPart(

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala
index 8a76916..f0e4f84 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/ClusterMessage.scala
@@ -18,11 +18,9 @@
 
 package org.apache.gearpump.streaming
 
-import scala.language.existentials
-
 import akka.actor.ActorRef
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.appmaster.WorkerInfo
 import org.apache.gearpump.cluster.scheduler.Resource
 import org.apache.gearpump.streaming.appmaster.TaskRegistry.TaskLocations
@@ -63,7 +61,7 @@ object AppMasterToExecutor {
   case class StartAllTasks(dagVersion: Int)
 
   case class StartDynamicDag(dagVersion: Int)
-  case class TaskRegistered(taskId: TaskId, sessionId: Int, startClock: TimeStamp)
+  case class TaskRegistered(taskId: TaskId, sessionId: Int, startClock: MilliSeconds)
   case class TaskRejected(taskId: TaskId)
 
   case object RestartClockService

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
index 435414b..f15e1b3 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
@@ -18,10 +18,10 @@
 
 package org.apache.gearpump.streaming
 
-import scala.language.implicitConversions
 import scala.reflect.ClassTag
 import akka.actor.ActorSystem
-import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS, TimeStamp}
+import org.apache.gearpump.Time
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster._
 import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner, PartitionerDescription, PartitionerObject}
 import org.apache.gearpump.streaming.appmaster.AppMaster
@@ -102,8 +102,8 @@ object Processor {
  * When input message's timestamp is beyond current processor's lifetime,
  * then it will not be processed by this processor.
  */
-case class LifeTime(birth: TimeStamp, death: TimeStamp) {
-  def contains(timestamp: TimeStamp): Boolean = {
+case class LifeTime(birth: MilliSeconds, death: MilliSeconds) {
+  def contains(timestamp: MilliSeconds): Boolean = {
     timestamp >= birth && timestamp < death
   }
 
@@ -113,8 +113,7 @@ case class LifeTime(birth: TimeStamp, death: TimeStamp) {
 }
 
 object LifeTime {
-  // MAX_TIME_MILLIS is Long.MaxValue - 1
-  val Immortal = LifeTime(MIN_TIME_MILLIS, MAX_TIME_MILLIS + 1)
+  val Immortal = LifeTime(Time.MIN_TIME_MILLIS, Time.UNREACHABLE)
 }
 
 /**
@@ -159,7 +158,7 @@ object StreamApplication {
     val graph = dag.mapVertex { processor =>
       val updatedProcessor = ProcessorToProcessorDescription(indices(processor), processor)
       updatedProcessor
-    }.mapEdge { (node1, edge, node2) =>
+    }.mapEdge { (_, edge, _) =>
       PartitionerDescription(new PartitionerObject(
         Option(edge).getOrElse(StreamApplication.hashPartitioner)))
     }
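
For readers skimming the LifeTime hunk above: the interval is half-open, with birth inclusive and death exclusive. The following minimal sketch (values illustrative) spells that out, assuming only the case class shown in this diff.

    import org.apache.gearpump.streaming.LifeTime

    val life = LifeTime(birth = 100L, death = 200L)
    assert(life.contains(100L))   // at birth: the message is processed
    assert(life.contains(199L))   // within the lifetime
    assert(!life.contains(200L))  // at death: no longer processed
    // LifeTime.Immortal spans Time.MIN_TIME_MILLIS until Time.UNREACHABLE,
    // so every reachable timestamp is contained.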

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
index ba4b058..3c5c7da 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/AppMaster.scala
@@ -21,7 +21,7 @@ package org.apache.gearpump.streaming.appmaster
 import java.lang.management.ManagementFactory
 
 import akka.actor._
-import org.apache.gearpump._
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.AppMasterToMaster.ApplicationStatusChanged
 import org.apache.gearpump.cluster.ClientToMaster._
 import org.apache.gearpump.cluster.MasterToAppMaster.{AppMasterActivated, AppMasterDataDetailRequest, ReplayFromTimestampWindowTrailingEdge}
@@ -67,7 +67,7 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli
   import akka.pattern.ask
   private implicit val dispatcher = context.dispatcher
 
-  private val startTime: TimeStamp = System.currentTimeMillis()
+  private val startTime: MilliSeconds = System.currentTimeMillis()
 
   private val LOG: Logger = LogUtil.getLogger(getClass, app = appId)
   LOG.info(s"AppMaster[$appId] is launched by $username, app: $app xxxxxxxxxxxxxxxxx")
@@ -322,7 +322,7 @@ class AppMaster(appContext: AppMasterContext, app: AppDescription) extends Appli
       context.stop(self)
   }
 
-  private def getMinClock: Future[TimeStamp] = {
+  private def getMinClock: Future[MilliSeconds] = {
     clockService match {
       case Some(service) =>
         (service ? GetLatestMinClock).asInstanceOf[Future[LatestMinClock]].map(_.clock)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
index 77a966a..90141d4 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/ClockService.scala
@@ -25,7 +25,8 @@ import java.util.concurrent.TimeUnit
 
 import akka.actor.{Actor, ActorRef, Cancellable, Stash}
 import com.google.common.primitives.Longs
-import org.apache.gearpump.{MIN_TIME_MILLIS, TimeStamp}
+import org.apache.gearpump.Time
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.ClientToMaster.GetStallingTasks
 import org.apache.gearpump.streaming.AppMasterToMaster.StallingTasks
 import org.apache.gearpump.streaming._
@@ -38,7 +39,6 @@ import org.apache.gearpump.util.LogUtil
 import org.slf4j.Logger
 
 import scala.concurrent.duration.FiniteDuration
-import scala.language.implicitConversions
 
 /**
  * Maintains a global view of message timestamp in the application
@@ -61,8 +61,8 @@ class ClockService(
     LOG.info("Initializing Clock service, get snapshotted StartClock ....")
     store.get(START_CLOCK).map { clock =>
       // check for null first since
-      // (null).asInstanceOf[TimeStamp] is zero
-      val startClock = if (clock != null) clock.asInstanceOf[TimeStamp] else MIN_TIME_MILLIS
+      // (null).asInstanceOf[MilliSeconds] is zero
+      val startClock = if (clock != null) clock.asInstanceOf[MilliSeconds] else Time.MIN_TIME_MILLIS
 
       minCheckpointClock = Some(startClock)
 
@@ -89,32 +89,32 @@ class ClockService(
   // We use Array instead of List for Performance consideration
   private var processorClocks = Array.empty[ProcessorClock]
 
-  private var checkpointClocks: Map[TaskId, Vector[TimeStamp]] = _
+  private var checkpointClocks: Map[TaskId, Vector[MilliSeconds]] = _
 
-  private var minCheckpointClock: Option[TimeStamp] = None
+  private var minCheckpointClock: Option[MilliSeconds] = None
 
   private def checkpointEnabled(processor: ProcessorDescription): Boolean = {
     val taskConf = processor.taskConf
     taskConf != null && taskConf.getBoolean("state.checkpoint.enable").contains(true)
   }
 
-  private def resetCheckpointClocks(dag: DAG, startClock: TimeStamp): Unit = {
+  private def resetCheckpointClocks(dag: DAG, startClock: MilliSeconds): Unit = {
     this.checkpointClocks = dag.processors.filter(startClock < _._2.life.death)
       .filter { case (_, processor) =>
         checkpointEnabled(processor)
       }.flatMap { case (id, processor) =>
-      (0 until processor.parallelism).map(TaskId(id, _) -> Vector.empty[TimeStamp])
+      (0 until processor.parallelism).map(TaskId(id, _) -> Vector.empty[MilliSeconds])
     }
     if (this.checkpointClocks.isEmpty) {
       minCheckpointClock = None
     }
   }
 
-  private def initDag(startClock: TimeStamp): Unit = {
+  private def initDag(startClock: MilliSeconds): Unit = {
     recoverDag(this.dag, startClock)
   }
 
-  private def recoverDag(dag: DAG, startClock: TimeStamp): Unit = {
+  private def recoverDag(dag: DAG, startClock: MilliSeconds): Unit = {
     this.clocks = dag.processors.filter(startClock < _._2.life.death).
       map { pair =>
         val (processorId, processor) = pair
@@ -131,7 +131,7 @@ class ClockService(
     resetCheckpointClocks(dag, startClock)
   }
 
-  private def dynamicDAG(dag: DAG, startClock: TimeStamp): Unit = {
+  private def dynamicDAG(dag: DAG, startClock: MilliSeconds): Unit = {
     val newClocks = dag.processors.filter(startClock < _._2.life.death).
       map { pair =>
         val (processorId, processor) = pair
@@ -208,7 +208,7 @@ class ClockService(
     }
   }
 
-  private def getUpStreamMinClock(processorId: ProcessorId): Option[TimeStamp] = {
+  private def getUpStreamMinClock(processorId: ProcessorId): Option[MilliSeconds] = {
     upstreamClocks.get(processorId).map(ProcessorClocks.minClock)
   }
 
@@ -304,7 +304,7 @@ class ClockService(
     }
   }
 
-  private def minClock: TimeStamp = {
+  private def minClock: MilliSeconds = {
     ProcessorClocks.minClock(processorClocks)
   }
 
@@ -314,7 +314,7 @@ class ClockService(
     healthChecker.check(minTimestamp, clocks, dag, System.currentTimeMillis())
   }
 
-  private def getStartClock: TimeStamp = {
+  private def getStartClock: MilliSeconds = {
     minCheckpointClock.getOrElse(minClock)
   }
 
@@ -322,7 +322,7 @@ class ClockService(
     store.put(START_CLOCK, getStartClock)
   }
 
-  private def updateCheckpointClocks(task: TaskId, time: TimeStamp): Unit = {
+  private def updateCheckpointClocks(task: TaskId, time: MilliSeconds): Unit = {
     val clocks = checkpointClocks(task) :+ time
     checkpointClocks += task -> clocks
 
@@ -341,17 +341,17 @@ object ClockService {
   case object HealthCheck
 
   class ProcessorClock(val processorId: ProcessorId, val life: LifeTime, val parallelism: Int,
-      private var _min: TimeStamp = MIN_TIME_MILLIS,
-      private var _taskClocks: Array[TimeStamp] = null) {
+      private var _min: MilliSeconds = Time.MIN_TIME_MILLIS,
+      private var _taskClocks: Array[MilliSeconds] = null) {
 
     def copy(life: LifeTime): ProcessorClock = {
       new ProcessorClock(processorId, life, parallelism, _min, _taskClocks)
     }
 
-    def min: TimeStamp = _min
-    def taskClocks: Array[TimeStamp] = _taskClocks
+    def min: MilliSeconds = _min
+    def taskClocks: Array[MilliSeconds] = _taskClocks
 
-    def init(startClock: TimeStamp): Unit = {
+    def init(startClock: MilliSeconds): Unit = {
       if (taskClocks == null) {
         this._min = startClock
         this._taskClocks = new Array(parallelism)
@@ -359,7 +359,7 @@ object ClockService {
       }
     }
 
-    def updateMinClock(taskIndex: Int, clock: TimeStamp): Unit = {
+    def updateMinClock(taskIndex: Int, clock: MilliSeconds): Unit = {
       taskClocks(taskIndex) = clock
       _min = Longs.min(taskClocks: _*)
     }
@@ -382,8 +382,8 @@ object ClockService {
 
     /** Check for stalling tasks */
     def check(
-        currentMinClock: TimeStamp, processorClocks: Map[ProcessorId, ProcessorClock],
-        dag: DAG, now: TimeStamp): Unit = {
+        currentMinClock: MilliSeconds, processorClocks: Map[ProcessorId, ProcessorClock],
+        dag: DAG, now: MilliSeconds): Unit = {
       var isClockStalling = false
       if (null == minClock || currentMinClock > minClock.appClock) {
         minClock = ClockValue(systemClock = now, appClock = currentMinClock)
@@ -424,7 +424,7 @@ object ClockService {
   }
 
   object HealthChecker {
-    case class ClockValue(systemClock: TimeStamp, appClock: TimeStamp) {
+    case class ClockValue(systemClock: MilliSeconds, appClock: MilliSeconds) {
       def prettyPrint: String = {
         "(system clock: " + new Date(systemClock).toString + ", app clock: " + appClock + ")"
       }
@@ -434,7 +434,7 @@ object ClockService {
   object ProcessorClocks {
 
     // Get the Min clock of all processors
-    def minClock(clock: Array[ProcessorClock]): TimeStamp = {
+    def minClock(clock: Array[ProcessorClock]): MilliSeconds = {
       var i = 0
       var min = if (clock.length == 0) 0L else clock(0).min
       while (i < clock.length) {
@@ -446,7 +446,7 @@ object ClockService {
   }
 
   case class ChangeToNewDAG(dag: DAG)
-  case class ChangeToNewDAGSuccess(clocks: Map[ProcessorId, TimeStamp])
+  case class ChangeToNewDAGSuccess(clocks: Map[ProcessorId, MilliSeconds])
 
-  case class StoredStartClock(clock: TimeStamp)
+  case class StoredStartClock(clock: MilliSeconds)
 }
\ No newline at end of file
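
A quick illustration of the null check commented in the recovery hunk above: casting null to a primitive Long silently yields 0, so without the explicit check a missing start clock would be indistinguishable from a stored clock of 0. The snippet is illustrative only.

    val stored: Any = null                    // what the store returns when no clock was saved
    val asClock = stored.asInstanceOf[Long]   // 0L -- no NullPointerException is thrown
    assert(asClock == 0L)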

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala
index e023cdf..e31f863 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/JarScheduler.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.appmaster
 import akka.actor._
 import akka.pattern.ask
 import com.typesafe.config.Config
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.AppJar
 import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest}
 import org.apache.gearpump.cluster.worker.WorkerId
@@ -47,7 +47,7 @@ class JarScheduler(appId: Int, appName: String, config: Config, factory: ActorRe
   private implicit val timeout = Constants.FUTURE_TIMEOUT
 
   /** Set the current DAG version active */
-  def setDag(dag: DAG, startClock: Future[TimeStamp]): Unit = {
+  def setDag(dag: DAG, startClock: Future[MilliSeconds]): Unit = {
     actor ! TransitToNewDag
     startClock.map { start =>
       actor ! NewDag(dag, start)
@@ -82,7 +82,7 @@ object JarScheduler {
 
   case class ResourceRequestDetail(jar: AppJar, requests: Array[ResourceRequest])
 
-  case class NewDag(dag: DAG, startTime: TimeStamp)
+  case class NewDag(dag: DAG, startTime: MilliSeconds)
 
   case object TransitToNewDag
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
index 126ab92..1214cd0 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/StreamAppMasterSummary.scala
@@ -18,7 +18,7 @@
 
 package org.apache.gearpump.streaming.appmaster
 
-import org.apache.gearpump._
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.AppMasterToMaster.AppMasterSummary
 import org.apache.gearpump.cluster.{ApplicationStatus, UserConfig}
 import org.apache.gearpump.streaming.appmaster.AppMaster.ExecutorBrief
@@ -32,10 +32,10 @@ case class StreamAppMasterSummary(
     appId: Int,
     appName: String = null,
     actorPath: String = null,
-    clock: TimeStamp = 0L,
+    clock: MilliSeconds = 0L,
     status: ApplicationStatus = ApplicationStatus.ACTIVE,
-    startTime: TimeStamp = 0L,
-    uptime: TimeStamp = 0L,
+    startTime: MilliSeconds = 0L,
+    uptime: MilliSeconds = 0L,
     user: String = null,
     homeDirectory: String = "",
     logFile: String = "",

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
index 51c4de9..bae5c02 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/appmaster/TaskManager.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.appmaster
 
 import akka.actor._
 import akka.pattern.ask
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.MasterToAppMaster.ReplayFromTimestampWindowTrailingEdge
 import org.apache.gearpump.streaming.AppMasterToExecutor._
 import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, RegisterTask, UnRegisterTask}
@@ -86,11 +86,11 @@ private[appmaster] class TaskManager(
   dagManager ! WatchChange(watcher = self)
   executorManager ! SetTaskManager(self)
 
-  private def getStartClock: Future[TimeStamp] = {
+  private def getStartClock: Future[MilliSeconds] = {
     (clockService ? GetStartClock).asInstanceOf[Future[StartClock]].map(_.clock)
   }
 
-  private var startClock: Future[TimeStamp] = getStartClock
+  private var startClock: Future[MilliSeconds] = getStartClock
 
   def receive: Receive = applicationReady(DagReadyState.empty)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
index 25a0929..8d3ffb3 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
@@ -17,8 +17,6 @@
  */
 package org.apache.gearpump.streaming.dsl.api.functions
 
-import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
-
 object FilterFunction {
 
   def apply[T](fn: T => Boolean): FilterFunction[T] = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala
index 9ff44a8..1525d6e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala
@@ -18,8 +18,6 @@
 
 package org.apache.gearpump.streaming.dsl.api.functions
 
-import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
-
 /**
  * Combines input into an accumulator.
  *

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
index a4fdca6..7880c2f 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
@@ -17,8 +17,6 @@
  */
 package org.apache.gearpump.streaming.dsl.api.functions
 
-import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
-
 object MapFunction {
 
   def apply[T, R](fn: T => R): MapFunction[T, R] = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/SerializableFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/SerializableFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/SerializableFunction.scala
new file mode 100644
index 0000000..b90ba28
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/SerializableFunction.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.api.functions
+
+/**
+ * Superclass for all user defined function interfaces.
+ * This ensures all functions are serializable and provides common methods
+ * such as setup and teardown. Users should not extend this class directly,
+ * but rather its subclasses, such as [[org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction]].
+ */
+abstract class SerializableFunction extends java.io.Serializable {
+
+  def setup(): Unit = {}
+
+  def teardown(): Unit = {}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
index 11e2416..adad878 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.gearpump.streaming.dsl.javaapi.functions
 
-import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
+import org.apache.gearpump.streaming.dsl.api.functions.SerializableFunction
 
 /**
  * Transforms one input into zero or more outputs of possibly different types.

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala
new file mode 100644
index 0000000..6d43f16
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming
+
+
+// scalastyle:off line.size.limit
+/**
+ *
+ * The architecture of Gearpump Streaming DSL consists of several layers:
+ *
+ *   * User-facing [[org.apache.gearpump.streaming.dsl.scalaapi.Stream]] DSL. A Stream is created by [[org.apache.gearpump.streaming.dsl.scalaapi.StreamApp]]
+ *     from an input source such as Kafka, or by applying high-level operations (e.g. flatMap, window, groupBy) with user-defined functions (UDFs). UDFs are subclasses
+ *     of [[org.apache.gearpump.streaming.dsl.api.functions.SerializableFunction]], represented by [[org.apache.gearpump.streaming.dsl.plan.Op]]
+ *     in the underlying [[org.apache.gearpump.util.Graph]].
+ *   * [[org.apache.gearpump.streaming.dsl.plan.Planner]], responsible for interpreting the Op Graph, optimizing it and building a low level Graph of
+ *     [[org.apache.gearpump.streaming.Processor]]. Finally, it creates a runnable Graph of [[org.apache.gearpump.streaming.task.Task]].
+ *   * The execution layer is usually composed of the following four tasks.
+ *
+ *     * [[org.apache.gearpump.streaming.source.DataSourceTask]] for [[org.apache.gearpump.streaming.source.DataSource]] to ingest data into Gearpump
+ *     * [[org.apache.gearpump.streaming.sink.DataSinkTask]] for [[org.apache.gearpump.streaming.sink.DataSink]] to write data out.
+ *     * [[org.apache.gearpump.streaming.dsl.task.GroupByTask]] to execute Ops following a [[org.apache.gearpump.streaming.dsl.plan.GroupByOp]]
+ *     * [[org.apache.gearpump.streaming.dsl.task.TransformTask]] to execute all other Ops.
+ *
+ *     All but [[org.apache.gearpump.streaming.sink.DataSinkTask]] delegate execution to [[org.apache.gearpump.streaming.dsl.window.impl.WindowRunner]], which internally
+ *     runs a chain of [[org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner]] grouped by windows. Window assignments are either explicitly defined with the
+ *     [[org.apache.gearpump.streaming.dsl.window.api.Windows]] API or fall back implicitly to [[org.apache.gearpump.streaming.dsl.window.api.GlobalWindows]]. UDFs are eventually
+ *     executed by [[org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner]].
+ *
+ */
+// scalastyle:on line.size.limit
+package object dsl {
+
+}
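
To make the user-facing layer described in the package documentation above concrete, here is a hedged word-count style sketch. The source(Seq, parallelism, description) signature matches the StreamApp change later in this commit; the Stream operator signatures (flatMap/map/reduce taking an optional description argument) and the helper name wordCount are assumptions for illustration only.

    import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, StreamApp}

    // Assumes a StreamApp `app` has already been constructed elsewhere.
    def wordCount(app: StreamApp): Stream[(String, Int)] = {
      app.source(Seq("an example line", "another example line"), 1, "memory-source")
        .flatMap(line => line.split("\\s+"), "split")    // each UDF becomes an Op vertex
        .map(word => (word, 1), "pair")                  // chained into the same TransformOp
        .reduce((a, b) => (a._1, a._2 + b._2), "count")  // aggregated per window
    }

The Planner then fuses these Ops and the DataSourceTask/TransformTask described above execute the resulting chain.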

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
index 2a45a8f..c37ced6 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -35,16 +35,29 @@ import scala.reflect.ClassTag
 
 object Op {
 
+  /**
+   * Concatenates two descriptions with "." or returns one if the other is empty.
+   */
   def concatenate(desc1: String, desc2: String): String = {
     if (desc1 == null || desc1.isEmpty) desc2
     else if (desc2 == null || desc2.isEmpty) desc1
     else desc1 + "." + desc2
   }
 
+  /**
+   * Concatenates two configs according to the following rules:
+   *   1. The first config cannot be null.
+   *   2. The first config is returned if the second config is null.
+   *   3. The second config takes precedence for overlapping config keys.
+   */
   def concatenate(config1: UserConfig, config2: UserConfig): UserConfig = {
     config1.withConfig(config2)
   }
 
+  /**
+   * This adds a [[org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner]] in
+   * [[GlobalWindows]] if the targeted [[Task]] has no executable UDF.
+   */
   def withGlobalWindowsDummyRunner(op: Op, userConfig: UserConfig,
       processor: Processor[_ <: Task])(implicit system: ActorSystem): Processor[_ <: Task] = {
     if (userConfig.getValue(Constants.GEARPUMP_STREAMING_OPERATOR).isEmpty) {
@@ -59,22 +72,37 @@ object Op {
 }
 
 /**
- * This is a vertex on the logical plan.
+ * This is a vertex on the logical Graph, representing user defined functions in
+ * [[org.apache.gearpump.streaming.dsl.scalaapi.Stream]] DSL.
  */
 sealed trait Op {
 
+  /**
+   * This comes from the user function description and is used for display on the front end.
+   */
   def description: String
 
+  /**
+   * This will ship the user function to [[org.apache.gearpump.streaming.task.Task]] to be executed.
+   */
   def userConfig: UserConfig
 
+  /**
+   * This creates a new Op by merging the user functions, user configs and descriptions of this Op and the given one.
+   */
   def chain(op: Op)(implicit system: ActorSystem): Op
 
+  /**
+   *  This creates a Processor after chaining.
+   */
   def toProcessor(implicit system: ActorSystem): Processor[_ <: Task]
 }
 
 /**
- * This represents a low level Processor.
+ * This represents a low level Processor. It is deprecated since it
+ * doesn't work with other Ops.
  */
+@deprecated
 case class ProcessorOp[T <: Task](
     processor: Class[T],
     parallelism: Int,
@@ -99,7 +127,8 @@ case class ProcessorOp[T <: Task](
 }
 
 /**
- * This represents a DataSource.
+ * This represents a DataSource and creates a
+ * [[org.apache.gearpump.streaming.source.DataSourceTask]]
  */
 case class DataSourceOp(
     dataSource: DataSource,
@@ -142,7 +171,7 @@ case class DataSourceOp(
 }
 
 /**
- * This represents a DataSink.
+ * This represents a DataSink and creates a [[org.apache.gearpump.streaming.sink.DataSinkTask]].
  */
 case class DataSinkOp(
     dataSink: DataSink,
@@ -163,7 +192,7 @@ case class DataSinkOp(
 /**
  * This represents operations that can be chained together
  * (e.g. flatMap, map, filter, reduce) and further chained
- * to another Op to be used
+ * to another Op to be executed
  */
 case class TransformOp[IN, OUT](
     fn: FunctionRunner[IN, OUT],
@@ -201,61 +230,7 @@ case class TransformOp[IN, OUT](
   }
 }
 
-/**
- * This is an intermediate operation, produced by chaining WindowOp and TransformOp.
- * Usually, it will be chained to a DataSourceOp, GroupByOp or MergeOp.
- * Otherwise, it will be translated to a Processor of TransformTask.
- */
-case class WindowTransformOp[IN, OUT](
-    windowRunner: WindowRunner[IN, OUT],
-    description: String,
-    userConfig: UserConfig) extends Op {
-
-  override def chain(other: Op)(implicit system: ActorSystem): Op = {
-    other match {
-      case op: WindowTransformOp[OUT, _] =>
-        WindowTransformOp(
-          WindowRunnerAT(windowRunner, op.windowRunner),
-          Op.concatenate(description, op.description),
-          Op.concatenate(userConfig, op.userConfig)
-        )
-      case _ =>
-        throw new OpChainException(this, other)
-    }
-  }
-
-  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
-    // TODO: this should be chained to DataSourceOp / GroupByOp / MergeOp
-    Processor[TransformTask[Any, Any]](1, description, userConfig.withValue(
-      Constants.GEARPUMP_STREAMING_OPERATOR, windowRunner))
-  }
-}
-
-/**
- * This is an intermediate operation, produced by chaining TransformOp and WindowOp.
- * It will later be chained to a WindowOp, which results in two WindowTransformOps.
- * Finally, they will be chained to a single WindowTransformOp.
- */
-case class TransformWindowTransformOp[IN, MIDDLE, OUT](
-    transformOp: TransformOp[IN, MIDDLE],
-    windowTransformOp: WindowTransformOp[MIDDLE, OUT]) extends Op {
-
-  override def description: String = {
-    throw new UnsupportedOperationException(s"description is not supported on $this")
-  }
-
-  override def userConfig: UserConfig = {
-    throw new UnsupportedOperationException(s"userConfig is not supported on $this")
-  }
-
-  override def chain(op: Op)(implicit system: ActorSystem): Op = {
-    throw new UnsupportedOperationException(s"chain is not supported on $this")
-  }
 
-  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
-    WindowOp(GlobalWindows()).chain(this).toProcessor
-  }
-}
 
 /**
  * This represents a window aggregation, together with a following TransformOp
@@ -290,7 +265,14 @@ case class WindowOp(
 }
 
 /**
- * This represents a Processor with groupBy and window aggregation
+ * This represents an operation with groupBy followed by window aggregation.
+ *
+ * It can only be chained with [[WindowTransformOp]] to be executed in
+ * [[org.apache.gearpump.streaming.dsl.task.GroupByTask]].
+ * However, it is possible that a window function has no following aggregation. In that case,
+ * we manually append a [[TransformOp]] of
+ * [[org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner]] to the [[WindowOp]] to create a
+ * [[WindowTransformOp]].
  */
 case class GroupByOp[IN, GROUP] private(
     groupBy: IN => GROUP,
@@ -325,7 +307,14 @@ case class GroupByOp[IN, GROUP] private(
 }
 
 /**
- * This represents a Processor transforming merged streams
+ * This represents an operation with merge followed by window aggregation.
+ *
+ * It can only be chained with [[WindowTransformOp]] to be executed in
+ * [[org.apache.gearpump.streaming.dsl.task.TransformTask]].
+ * However, it is possible that a merge function has no following aggregation. In that case,
+ * we manually append a [[TransformOp]] of
+ * [[org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner]] to the [[WindowOp]] to create a
+ * [[WindowTransformOp]].
  */
 case class MergeOp(
     parallelism: Int = 1,
@@ -357,7 +346,65 @@ case class MergeOp(
 }
 
 /**
- * This is an edge on the logical plan.
+ * This is an intermediate operation, produced by chaining [[WindowOp]] and [[TransformOp]].
+ * Usually, it will be chained to a [[DataSourceOp]], [[GroupByOp]] or [[MergeOp]]. However,
+ * an Op with more than one outgoing or incoming edge cannot be chained. In that case,
+ * it will be translated to a [[org.apache.gearpump.streaming.dsl.task.TransformTask]].
+ */
+private case class WindowTransformOp[IN, OUT](
+    windowRunner: WindowRunner[IN, OUT],
+    description: String,
+    userConfig: UserConfig) extends Op {
+
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    other match {
+      case op: WindowTransformOp[OUT, _] =>
+        WindowTransformOp(
+          WindowRunnerAT(windowRunner, op.windowRunner),
+          Op.concatenate(description, op.description),
+          Op.concatenate(userConfig, op.userConfig)
+        )
+      case _ =>
+        throw new OpChainException(this, other)
+    }
+  }
+
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    // TODO: this should be chained to DataSourceOp / GroupByOp / MergeOp
+    Processor[TransformTask[Any, Any]](1, description, userConfig.withValue(
+      Constants.GEARPUMP_STREAMING_OPERATOR, windowRunner))
+  }
+}
+
+/**
+ * This is an intermediate operation, produced by chaining [[TransformOp]] and
+ * [[WindowTransformOp]]. It will later be chained to a [[WindowOp]], which results in
+ * two [[WindowTransformOp]]s. Finally, they will be chained to a single WindowTransformOp.
+ */
+private case class TransformWindowTransformOp[IN, MIDDLE, OUT](
+    transformOp: TransformOp[IN, MIDDLE],
+    windowTransformOp: WindowTransformOp[MIDDLE, OUT]) extends Op {
+
+  override def description: String = {
+    throw new UnsupportedOperationException(s"description is not supported on $this")
+  }
+
+  override def userConfig: UserConfig = {
+    throw new UnsupportedOperationException(s"userConfig is not supported on $this")
+  }
+
+  override def chain(op: Op)(implicit system: ActorSystem): Op = {
+    throw new UnsupportedOperationException(s"chain is not supported on $this")
+  }
+
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    WindowOp(GlobalWindows()).chain(this).toProcessor
+  }
+}
+
+/**
+ * This is an edge on the logical plan. It defines whether data should be transported locally
+ * or shuffled remotely between [[Op]]s.
  */
 trait OpEdge
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
index b1b39c9..04b5337 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
@@ -24,11 +24,26 @@ import org.apache.gearpump.streaming.Processor
 import org.apache.gearpump.streaming.task.Task
 import org.apache.gearpump.util.Graph
 
+/**
+ * This class is responsible for turning the high level
+ * [[org.apache.gearpump.streaming.dsl.scalaapi.Stream]] DSL into low level
+ * [[org.apache.gearpump.streaming.Processor]] API.
+ */
 class Planner {
 
   /**
-   * Converts Dag of Op to Dag of TaskDescription. TaskDescription is part of the low
-   * level Graph API.
+   * This method interprets a Graph of [[Op]] and creates a Graph of
+   * [[org.apache.gearpump.streaming.Processor]].
+   *
+   * It first traverses the Graph in reverse from a leaf Op, merging each Op with
+   * its downstream Op according to the following rules:
+   *
+   *   1. The Op has only one outgoing edge and the downstream Op has only one incoming edge
+   *   2. Neither Op is [[ProcessorOp]]
+   *   3. The edge is [[Direct]]
+   *
+   * Finally the vertices of the optimized Graph are translated to Processors
+   * and the edges to Partitioners.
    */
   def plan(dag: Graph[Op, OpEdge])
     (implicit system: ActorSystem): Graph[Processor[_ <: Task], _ <: Partitioner] = {
@@ -43,6 +58,7 @@ class Planner {
             case _ => new HashPartitioner
           }
         case Direct =>
+          // FIXME: This is never used
           new CoLocationPartitioner
       }
     }.mapVertex(_.toProcessor)
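
Restated as a standalone predicate, the three chaining rules in the Planner Scaladoc above look roughly like the sketch below. This is not the Planner's actual implementation: it takes the vertex degrees as plain arguments instead of assuming any particular Graph API, and canChain is a hypothetical helper name.

    import org.apache.gearpump.streaming.dsl.plan.{Direct, Op, OpEdge, ProcessorOp}

    // outDegree: outgoing edges of `op`; inDegree: incoming edges of `downstream`.
    def canChain(op: Op, outDegree: Int, edge: OpEdge, downstream: Op, inDegree: Int): Boolean = {
      outDegree == 1 && inDegree == 1 &&           // rule 1: single out-edge / in-edge
        !op.isInstanceOf[ProcessorOp[_]] &&        // rule 2: neither vertex is a ProcessorOp
        !downstream.isInstanceOf[ProcessorOp[_]] &&
        edge == Direct                             // rule 3: the connecting edge is Direct
    }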

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
index ef2753e..d0c733e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
@@ -210,6 +210,7 @@ class Stream[T](
    * @param parallelism  parallelism level
    * @return  new stream after processing with type [R]
    */
+  @deprecated
   def process[R](
       processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty,
       description: String = "process"): Stream[R] = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
index bce8c0c..17d77bc 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
@@ -62,6 +62,17 @@ class StreamApp(
     val dag = planner.plan(graph)
     StreamApplication(name, dag, userConfig)
   }
+
+  def source[T](dataSource: DataSource, parallelism: Int = 1,
+      conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = {
+    val sourceOp = DataSourceOp(dataSource, parallelism, description, conf)
+    graph.addVertex(sourceOp)
+    new Stream[T](graph, sourceOp)
+  }
+
+  def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = {
+    this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description)
+  }
 }
 
 object StreamApp {
@@ -73,20 +84,6 @@ object StreamApp {
   implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication = {
     streamApp.plan()
   }
-
-  implicit class Source(app: StreamApp) extends java.io.Serializable {
-
-    def source[T](dataSource: DataSource, parallelism: Int = 1,
-        conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = {
-      implicit val sourceOp = DataSourceOp(dataSource, parallelism, description, conf)
-      app.graph.addVertex(sourceOp)
-      new Stream[T](app.graph, sourceOp)
-    }
-
-    def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = {
-      this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description)
-    }
-  }
 }
 
 /** A test message source which generated message sequence repeatedly. */
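
Editor's note: since source(...) now lives on StreamApp itself rather than the removed
implicit Source class, call sites no longer depend on that implicit wrapper. A minimal
usage sketch, assuming a ClientContext has already been created elsewhere and using
illustrative names:

import org.apache.gearpump.cluster.client.ClientContext
import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp

def run(context: ClientContext): Unit = {
  val app = StreamApp("dsl-example", context)

  // source[T](seq, parallelism, description) wraps the Seq in a CollectionDataSource
  val words = app.source(Seq("foo", "bar", "foo"), 1, "collection-source")
  words.map(word => (word, 1))

  // streamAppToApplication implicitly converts the StreamApp into a StreamApplication
  context.submit(app)
}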

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
index 252b5bd..2d26df6 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.gearpump.streaming.dsl.scalaapi.functions
 
-import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction}
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, SerializableFunction}
 import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => JFlatMapFunction}
 
 import scala.collection.JavaConverters._

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
deleted file mode 100644
index ab88bf1..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.dsl.scalaapi.functions
-
-/**
- * Superclass for all user defined function interfaces.
- * This ensures all functions are serializable and provides common methods
- * like setup and teardown. Users should not extend this class directly
- * but subclasses like [[FlatMapFunction]].
- */
-abstract class SerializableFunction extends java.io.Serializable {
-
-  def setup(): Unit = {}
-
-  def teardown(): Unit = {}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala
index a4524a8..46b8e92 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala
@@ -17,8 +17,18 @@
  */
 package org.apache.gearpump.streaming.dsl.window.api
 
+
+/**
+ * Determines the relationship between multiple results for the same window.
+ */
 sealed trait AccumulationMode
 
+/**
+ * Each result for a window accumulates all previous results for that window.
+ */
 case object Accumulating extends AccumulationMode
 
+/**
+ * Each result for a window is independent; previous results are discarded.
+ */
 case object Discarding extends AccumulationMode
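
Editor's note: a small worked illustration of the two modes (not from this patch). For a
count over one window that is triggered three times, after batches of 2, 3 and 1 new
elements:

// Illustrative only: emitted counts for one window triggered after each batch.
val batches = Seq(2, 3, 1)
val accumulating = batches.scanLeft(0)(_ + _).tail   // Seq(2, 5, 6): each result includes prior ones
val discarding = batches                              // Seq(2, 3, 1): each result stands alone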

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
index 02d52a0..b9a8695 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
@@ -17,7 +17,16 @@
  */
 package org.apache.gearpump.streaming.dsl.window.api
 
+/**
+ * Determines when window results are emitted.
+ * For now, [[EventTimeTrigger]] is used for all applications.
+ */
+// TODO: Make this a public API
 sealed trait Trigger
 
+/**
+ * Triggers emission when the watermark passes the end of a window in event time.
+ */
+// FIXME: This is no more than a tag now and the logic is hard-coded in WindowRunner
 case object EventTimeTrigger extends Trigger
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
index a2f51c7..4db02e7 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
@@ -19,7 +19,8 @@ package org.apache.gearpump.streaming.dsl.window.api
 
 import java.time.{Duration, Instant}
 
-import org.apache.gearpump.{MIN_TIME_MILLIS, MAX_TIME_MILLIS, TimeStamp}
+import org.apache.gearpump.Time
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.dsl.window.impl.Window
 
 import scala.collection.mutable.ArrayBuffer
@@ -32,8 +33,14 @@ object WindowFunction {
   }
 }
 
+/**
+ * Determines how elements are assigned to windows for calculation.
+ */
 trait WindowFunction {
 
+  /**
+   * Assigns elements into windows.
+   */
   def apply[T](context: WindowFunction.Context[T]): Array[Window]
 
   def isNonMerging: Boolean
@@ -46,10 +53,13 @@ abstract class NonMergingWindowFunction extends WindowFunction {
 
 object GlobalWindowFunction {
 
-  val globalWindow = Array(Window(Instant.ofEpochMilli(MIN_TIME_MILLIS),
-    Instant.ofEpochMilli(MAX_TIME_MILLIS)))
+  val globalWindow = Array(Window(Instant.ofEpochMilli(Time.MIN_TIME_MILLIS),
+    Instant.ofEpochMilli(Time.MAX_TIME_MILLIS)))
 }
 
+/**
+ * All elements are assigned to the same global window for calculation.
+ */
 case class GlobalWindowFunction() extends NonMergingWindowFunction {
 
   override def apply[T](context: WindowFunction.Context[T]): Array[Window] = {
@@ -57,6 +67,12 @@ case class GlobalWindowFunction() extends NonMergingWindowFunction {
   }
 }
 
+/**
+ * Elements are assigned to non-merging sliding windows for calculation.
+ *
+ * @param size window size
+ * @param step window step to slide forward
+ */
 case class SlidingWindowFunction(size: Duration, step: Duration)
   extends NonMergingWindowFunction {
 
@@ -80,11 +96,17 @@ case class SlidingWindowFunction(size: Duration, step: Duration)
     windows.toArray
   }
 
-  private def lastStartFor(timestamp: TimeStamp, windowStep: Long): TimeStamp = {
+  private def lastStartFor(timestamp: MilliSeconds, windowStep: Long): MilliSeconds = {
     timestamp - (timestamp + windowStep) % windowStep
   }
 }
 
+/**
+ * Elements are assigned to merging windows for calculation. Windows are merged
+ * if their distance is within the defined gap.
+ *
+ * @param gap session gap
+ */
 case class SessionWindowFunction(gap: Duration) extends WindowFunction {
 
   override def apply[T](context: WindowFunction.Context[T]): Array[Window] = {
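
Editor's note: a worked example of sliding-window assignment using the lastStartFor
arithmetic shown in the hunk above (window size 10 s, step 5 s, element timestamp
12 000 ms). The loop that walks back by the step is paraphrased here and is not the
exact body of SlidingWindowFunction.apply.

import java.time.Duration
import org.apache.gearpump.streaming.dsl.window.impl.Window

val size = Duration.ofSeconds(10).toMillis   // 10000
val step = Duration.ofSeconds(5).toMillis    // 5000
val timestamp = 12000L

// lastStartFor(12000, 5000) = 12000 - (12000 + 5000) % 5000 = 12000 - 2000 = 10000
val lastStart = timestamp - (timestamp + step) % step

// Walk back by `step` while the window still covers the timestamp:
val windows = Iterator.iterate(lastStart)(_ - step)
  .takeWhile(start => start + size > timestamp)
  .map(start => Window.ofEpochMilli(start, start + size))
  .toList
// two windows are assigned: [10000, 20000) and [5000, 15000)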

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
index d53bc96..e15b5c4 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
@@ -20,11 +20,14 @@ package org.apache.gearpump.streaming.dsl.window.api
 import java.time.Duration
 
 /**
- * Defines how to apply window functions.
+ * User-facing Window DSL.
+ * Defines how to apply [[WindowFunction]], [[Trigger]]
+ * and [[AccumulationMode]].
  *
  * @param windowFn how to divide windows
  * @param trigger when to trigger window result
- * @param accumulationMode whether to accumulate results across windows
+ * @param accumulationMode whether to accumulate window results
+ * @param description window description
  */
 case class Windows(
     windowFn: WindowFunction,
@@ -47,6 +50,11 @@ case class Windows(
 
 object GlobalWindows {
 
+  /**
+   * Defines a [[GlobalWindowFunction]].
+   *
+   * @return a Window definition
+   */
   def apply(): Windows = {
     Windows(GlobalWindowFunction(), description = "globalWindows")
   }
@@ -55,7 +63,7 @@ object GlobalWindows {
 object FixedWindows {
 
   /**
-   * Defines a FixedWindow.
+   * Defines a non-overlapping [[SlidingWindowFunction]].
    *
    * @param size window size
    * @return a Window definition
@@ -68,7 +76,7 @@ object FixedWindows {
 object SlidingWindows {
 
   /**
-   * Defines a SlidingWindow.
+   * Defines an overlapping [[SlidingWindowFunction]].
    *
    * @param size window size
    * @param step window step to slide forward
@@ -82,7 +90,7 @@ object SlidingWindows {
 object SessionWindows {
 
   /**
-   * Defines a SessionWindow.
+   * Defines a [[SessionWindowFunction]].
    *
    * @param gap session gap
    * @return a Window definition
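
Editor's note: for reference, the four window definitions produced by the objects above,
as they would typically appear at a call site. The durations are illustrative only.

import java.time.Duration
import org.apache.gearpump.streaming.dsl.window.api.{FixedWindows, GlobalWindows, SessionWindows, SlidingWindows}

val global = GlobalWindows()                                        // one window covering all time
val fixed = FixedWindows.apply(Duration.ofMinutes(1))               // [0, 1m), [1m, 2m), ...
val sliding = SlidingWindows.apply(Duration.ofMinutes(1), Duration.ofSeconds(30))
val session = SessionWindows.apply(Duration.ofSeconds(30))          // windows merged while gaps stay under 30s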

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala
deleted file mode 100644
index e978983..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.dsl.window.impl
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.streaming.dsl.window.api.Trigger
-
-trait ReduceFnRunner {
-
-  def process(message: Message): Unit
-
-  def onTrigger(trigger: Trigger): Unit
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
index 870c334..d6d08c9 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
@@ -19,22 +19,16 @@ package org.apache.gearpump.streaming.dsl.window.impl
 
 import java.time.Instant
 
-import akka.actor.ActorSystem
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.Processor
-import org.apache.gearpump.{Message, TimeStamp}
-import org.apache.gearpump.streaming.dsl.window.api._
-import org.apache.gearpump.streaming.task.Task
+import org.apache.gearpump.Time.MilliSeconds
 
 object Window {
-  def ofEpochMilli(startTime: TimeStamp, endTime: TimeStamp): Window = {
+  def ofEpochMilli(startTime: MilliSeconds, endTime: MilliSeconds): Window = {
     Window(Instant.ofEpochMilli(startTime), Instant.ofEpochMilli(endTime))
   }
 }
 
 /**
- * A window unit including startTime and excluding endTime.
+ * A window unit from startTime (inclusive) to endTime (exclusive).
  */
 case class Window(startTime: Instant, endTime: Instant) extends Comparable[Window] {
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index 17a9525..ee3c067 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -31,11 +31,23 @@ import org.apache.gearpump.streaming.task.TaskUtil
 
 import scala.collection.mutable.ArrayBuffer
 
+/**
+ * Inputs for [[WindowRunner]].
+ */
 case class TimestampedValue[T](value: T, timestamp: Instant)
 
+/**
+ * Outputs triggered by [[WindowRunner]].
+ */
 case class TriggeredOutputs[T](outputs: TraversableOnce[TimestampedValue[T]],
     watermark: Instant)
 
+/**
+ * This is responsible for executing window calculation.
+ *   1. Groups elements into windows as defined by window function
+ *   2. Applies window calculation to each group
+ *   3. Emits results on triggering
+ */
 trait WindowRunner[IN, OUT] extends java.io.Serializable {
 
   def process(timestampedValue: TimestampedValue[IN]): Unit
@@ -43,6 +55,10 @@ trait WindowRunner[IN, OUT] extends java.io.Serializable {
   def trigger(time: Instant): TriggeredOutputs[OUT]
 }
 
+/**
+ * A composite WindowRunner that first executes its left child and feeds the results
+ * into its right child.
+ */
 case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE],
     right: WindowRunner[MIDDLE, OUT]) extends WindowRunner[IN, OUT] {
 
@@ -57,6 +73,9 @@ case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE],
   }
 }
 
+/**
+ * Default implementation for [[WindowRunner]].
+ */
 class DefaultWindowRunner[IN, OUT](
     windows: Windows,
     fnRunner: FunctionRunner[IN, OUT])
@@ -137,11 +156,15 @@ class DefaultWindowRunner[IN, OUT](
           }
           onTrigger(outputs, newWmk)
         } else {
-          // minimum of end of last triggered window and start of first un-triggered window
+          // The output watermark is the minimum of the end of the last triggered window
+          // and the start of the first un-triggered window.
           TriggeredOutputs(outputs, TaskUtil.min(wmk, firstWin.startTime))
         }
       } else {
+        // All windows have been triggered.
         if (time == Watermark.MAX) {
+          // This means there will be no more inputs
+          // so it's safe to advance to the maximum watermark.
           TriggeredOutputs(outputs, Watermark.MAX)
         } else {
           TriggeredOutputs(outputs, wmk)
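
Editor's note: a worked example of the watermark bookkeeping described above, with two
fixed windows [0, 10s) and [10s, 20s) and an input watermark of 15 000 ms. The numbers
are illustrative; the actual logic lives in DefaultWindowRunner.

import java.time.Instant
import org.apache.gearpump.streaming.task.TaskUtil

val endOfLastTriggered = Instant.ofEpochMilli(10000)       // window [0, 10s) has fired
val startOfFirstUntriggered = Instant.ofEpochMilli(10000)  // window [10s, 20s) has not
// The output watermark is held back to the earlier of the two, so downstream tasks
// never see a watermark past data that may still be emitted:
val outputWatermark = TaskUtil.min(endOfLastTriggered, startOfFirstUntriggered)
// outputWatermark == Instant.ofEpochMilli(10000)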

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala
index 8f8b7ab..058d36b 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala
@@ -22,7 +22,7 @@ import java.util
 
 import com.google.common.collect.Iterators
 import com.typesafe.config.Config
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.ClientToMaster.ReadOption
 import org.apache.gearpump.cluster.MasterToClient.HistoryMetricsItem
 import org.apache.gearpump.metrics.Metrics.{Histogram, Meter}
@@ -64,7 +64,7 @@ class ProcessorAggregator(historyMetricConfig: HistoryMetricsConfig) extends Met
   }
 
   def aggregate(
-      readOption: ReadOption.ReadOption, inputs: Iterator[HistoryMetricsItem], now: TimeStamp)
+      readOption: ReadOption.ReadOption, inputs: Iterator[HistoryMetricsItem], now: MilliSeconds)
     : List[HistoryMetricsItem] = {
     val (start, end, interval) = getTimeRange(readOption, now)
     val timeSlotsCount = ((end - start - 1) / interval + 1).toInt
@@ -103,8 +103,8 @@ class ProcessorAggregator(historyMetricConfig: HistoryMetricsConfig) extends Met
   }
 
   // Returns (start, end, interval)
-  private def getTimeRange(readOption: ReadOption.ReadOption, now: TimeStamp)
-    : (TimeStamp, TimeStamp, TimeStamp) = {
+  private def getTimeRange(readOption: ReadOption.ReadOption, now: MilliSeconds)
+    : (MilliSeconds, MilliSeconds, MilliSeconds) = {
     readOption match {
       case ReadOption.ReadRecent =>
         val end = now
@@ -229,7 +229,7 @@ object ProcessorAggregator {
     var p99: Double = 0
     var p999: Double = 0
 
-    var startTime: TimeStamp = Long.MaxValue
+    var startTime: MilliSeconds = Long.MaxValue
 
     override def aggregate(item: HistoryMetricsItem): Unit = {
       val input = item.value.asInstanceOf[Histogram]
@@ -263,7 +263,7 @@ object ProcessorAggregator {
     var m1: Double = 0
     var rateUnit: String = null
 
-    var startTime: TimeStamp = Long.MaxValue
+    var startTime: MilliSeconds = Long.MaxValue
 
     override def aggregate(item: HistoryMetricsItem): Unit = {
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
index 0db44f2..932c750 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala
@@ -52,4 +52,8 @@ class DataSinkTask private[sink](context: TaskContext, conf: UserConfig, sink: D
     LOG.info("closing data sink...")
     sink.close()
   }
+
+  override def onWatermarkProgress(watermark: Instant): Unit = {
+    context.updateWatermark(watermark)
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
index 14abff8..607af85 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
@@ -19,7 +19,7 @@ package org.apache.gearpump.streaming.source
 
 import java.time.Instant
 
-import org.apache.gearpump.{MAX_TIME_MILLIS, MIN_TIME_MILLIS, Message}
+import org.apache.gearpump.{Message, Time}
 
 /**
  * message used by source task to report source watermark.
@@ -28,9 +28,14 @@ case class Watermark(instant: Instant) {
   def toMessage: Message = Message("watermark", instant)
 }
 
+/**
+ * All input data with event times less than the watermark have been observed.
+ */
 object Watermark {
 
-  val MAX: Instant = Instant.ofEpochMilli(MAX_TIME_MILLIS + 1)
+  // all input data have been observed
+  val MAX: Instant = Instant.ofEpochMilli(Time.MAX_TIME_MILLIS + 1)
 
-  val MIN: Instant = Instant.ofEpochMilli(MIN_TIME_MILLIS)
+  // no input data have been observed
+  val MIN: Instant = Instant.ofEpochMilli(Time.MIN_TIME_MILLIS)
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala
index 0e2f83a..0118c07 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/MonoidState.scala
@@ -18,7 +18,7 @@
 
 package org.apache.gearpump.streaming.state.api
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 
 /**
  * MonoidState uses Algebird Monoid to aggregate state
@@ -37,11 +37,11 @@ abstract class MonoidState[T](monoid: Monoid[T]) extends PersistentState[T] {
 
   override def get: Option[T] = Option(monoid.plus(left, right))
 
-  override def setNextCheckpointTime(nextCheckpointTime: TimeStamp): Unit = {
+  override def setNextCheckpointTime(nextCheckpointTime: MilliSeconds): Unit = {
     checkpointTime = nextCheckpointTime
   }
 
-  protected def updateState(timestamp: TimeStamp, t: T): Unit = {
+  protected def updateState(timestamp: MilliSeconds, t: T): Unit = {
     if (timestamp < checkpointTime) {
       left = monoid.plus(left, t)
     } else {
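
Editor's note: a small worked trace of the left/right split in updateState above, using a
sum monoid and an illustrative checkpointTime of 100. The snapshot behaviour in the last
comment depends on the concrete MonoidState subclass.

// Illustrative trace of the left/right split with a sum monoid and checkpointTime = 100
var left = 0
var right = 0
val checkpointTime = 100L

def updateState(timestamp: Long, t: Int): Unit =
  if (timestamp < checkpointTime) left += t else right += t

updateState(50, 3)    // left  = 3
updateState(120, 4)   // right = 4
updateState(90, 2)    // left  = 5 (late data, still before the checkpoint)

val current = Some(left + right)   // Some(9) -- what MonoidState.get would return
// A checkpoint at time 100 would typically snapshot only `left` (5); `right` carries forward.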

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala
index 906d331..39b17c9 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentState.scala
@@ -18,7 +18,7 @@
 
 package org.apache.gearpump.streaming.state.api
 
-import org.apache.gearpump._
+import org.apache.gearpump.Time.MilliSeconds
 
 /**
  * PersistentState is part of the transaction API
@@ -33,19 +33,19 @@ trait PersistentState[T] {
    * Recovers state to a previous checkpoint
    * usually invoked by the framework
    */
-  def recover(timestamp: TimeStamp, bytes: Array[Byte]): Unit
+  def recover(timestamp: MilliSeconds, bytes: Array[Byte]): Unit
 
   /**
    * Updates state on a new message
    * this is invoked by user
    */
-  def update(timestamp: TimeStamp, t: T): Unit
+  def update(timestamp: MilliSeconds, t: T): Unit
 
   /**
    * Sets next checkpoint time
    * should be invoked by the framework
    */
-  def setNextCheckpointTime(timeStamp: TimeStamp): Unit
+  def setNextCheckpointTime(timeStamp: MilliSeconds): Unit
 
   /**
    * Gets a binary snapshot of state

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
index d3ffaa9..3a3b0a7 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/api/PersistentTask.scala
@@ -20,12 +20,13 @@ package org.apache.gearpump.streaming.state.api
 
 import java.time.Instant
 
+import org.apache.gearpump.Message
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.state.impl.{CheckpointManager, PersistentStateConfig}
 import org.apache.gearpump.streaming.task.{Task, TaskContext, UpdateCheckpointClock}
 import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
 import org.apache.gearpump.util.LogUtil
-import org.apache.gearpump.{Message, TimeStamp}
 
 object PersistentTask {
   val LOG = LogUtil.getLogger(getClass)
@@ -41,8 +42,6 @@ abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig)
   extends Task(taskContext, conf) {
   import taskContext._
 
-  import org.apache.gearpump.streaming.state.api.PersistentTask._
-
   val checkpointStoreFactory = conf.getValue[CheckpointStoreFactory](
     PersistentStateConfig.STATE_CHECKPOINT_STORE_FACTORY).get
   val checkpointStore = checkpointStoreFactory.getCheckpointStore(
@@ -99,7 +98,7 @@ abstract class PersistentTask[T](taskContext: TaskContext, conf: UserConfig)
     checkpointManager.close()
   }
 
-  private def reportCheckpointClock(timestamp: TimeStamp): Unit = {
+  private def reportCheckpointClock(timestamp: MilliSeconds): Unit = {
     appMaster ! UpdateCheckpointClock(taskContext.taskId, timestamp)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1cf87bf7/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala
index 82b7952..7d9e92a 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/state/impl/CheckpointManager.scala
@@ -18,7 +18,7 @@
 
 package org.apache.gearpump.streaming.state.impl
 
-import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.streaming.transaction.api.CheckpointStore
 
 /** Manage physical checkpoints to persitent storage like HDFS */
@@ -28,11 +28,11 @@ class CheckpointManager(checkpointInterval: Long,
   private var maxMessageTime: Long = 0L
   private var checkpointTime: Option[Long] = None
 
-  def recover(timestamp: TimeStamp): Option[Array[Byte]] = {
+  def recover(timestamp: MilliSeconds): Option[Array[Byte]] = {
     checkpointStore.recover(timestamp)
   }
 
-  def checkpoint(timestamp: TimeStamp, checkpoint: Array[Byte]): Option[TimeStamp] = {
+  def checkpoint(timestamp: MilliSeconds, checkpoint: Array[Byte]): Option[MilliSeconds] = {
     checkpointStore.persist(timestamp, checkpoint)
     checkpointTime = checkpointTime.collect { case time if maxMessageTime > time =>
       time + (1 + (maxMessageTime - time) / checkpointInterval) * checkpointInterval
@@ -41,7 +41,7 @@ class CheckpointManager(checkpointInterval: Long,
     checkpointTime
   }
 
-  def update(messageTime: TimeStamp): Option[TimeStamp] = {
+  def update(messageTime: MilliSeconds): Option[MilliSeconds] = {
     maxMessageTime = Math.max(maxMessageTime, messageTime)
     if (checkpointTime.isEmpty) {
       checkpointTime = Some((1 + messageTime / checkpointInterval) * checkpointInterval)
@@ -50,15 +50,15 @@ class CheckpointManager(checkpointInterval: Long,
     checkpointTime
   }
 
-  def shouldCheckpoint(upstreamMinClock: TimeStamp): Boolean = {
+  def shouldCheckpoint(upstreamMinClock: MilliSeconds): Boolean = {
     checkpointTime.exists(time => upstreamMinClock >= time)
   }
 
-  def getCheckpointTime: Option[TimeStamp] = checkpointTime
+  def getCheckpointTime: Option[MilliSeconds] = checkpointTime
 
   def close(): Unit = {
     checkpointStore.close()
   }
 
-  private[impl] def getMaxMessageTime: TimeStamp = maxMessageTime
+  private[impl] def getMaxMessageTime: MilliSeconds = maxMessageTime
 }
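
Editor's note: a worked example of the checkpoint-time arithmetic in update and checkpoint
above, with checkpointInterval = 1000 ms. The message times are illustrative.

val checkpointInterval = 1000L

// update(2300): no checkpoint time yet, so align to the next interval boundary
val firstCheckpointTime = (1 + 2300 / checkpointInterval) * checkpointInterval   // 3000

// checkpoint(3000, bytes) with maxMessageTime = 4700 advances the next checkpoint
// time past the maximum message time seen so far:
val nextCheckpointTime = 3000 + (1 + (4700 - 3000) / checkpointInterval) * checkpointInterval  // 5000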