You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/06/14 02:07:18 UTC

[1/2] incubator-gearpump git commit: [GEARPUMP-316] Decouple groupBy from window

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master c1370d9bf -> 24e1a4546


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
index d007e09..ca0135d 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
@@ -25,13 +25,13 @@ import org.apache.gearpump.cluster.{TestUtil, UserConfig}
 import org.apache.gearpump.streaming.Processor
 import org.apache.gearpump.streaming.Processor.DefaultProcessor
 import org.apache.gearpump.streaming.dsl.plan.OpSpec.{AnySink, AnySource, AnyTask}
-import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, FunctionRunner}
+import org.apache.gearpump.streaming.dsl.plan.functions.{DummyRunner, FlatMapper, FunctionRunner}
 import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
-import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
+import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows
+import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, WindowRunner}
 import org.apache.gearpump.streaming.sink.DataSink
 import org.apache.gearpump.streaming.source.DataSource
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
-import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec}
 import org.scalatest.mock.MockitoSugar
@@ -61,16 +61,16 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
 
   "DataSourceOp" should {
 
-    "chain ChainableOp" in {
+    "chain TransformOp" in {
       val dataSource = new AnySource
       val dataSourceOp = DataSourceOp(dataSource)
-      val chainableOp = mock[ChainableOp[Any, Any]]
+      val transformOp = mock[TransformOp[Any, Any]]
       val fn = mock[FunctionRunner[Any, Any]]
+      when(transformOp.fn).thenReturn(fn)
 
-      val chainedOp = dataSourceOp.chain(chainableOp)
+      val chainedOp = dataSourceOp.chain(transformOp)
 
       chainedOp shouldBe a[DataSourceOp]
-      verify(chainableOp).fn
 
       unchainableOps.foreach { op =>
         intercept[OpChainException] {
@@ -79,13 +79,13 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
       }
     }
 
-    "get Processor of DataSource" in {
+    "be translated into processor" in {
       val dataSource = new AnySource
       val dataSourceOp = DataSourceOp(dataSource)
-      val processor = dataSourceOp.getProcessor
+      val processor = dataSourceOp.toProcessor
       processor shouldBe a[Processor[_]]
       processor.parallelism shouldBe dataSourceOp.parallelism
-      processor.description shouldBe dataSourceOp.description
+      processor.description shouldBe s"${dataSourceOp.description}.globalWindows"
     }
   }
 
@@ -94,7 +94,7 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
     "not chain any Op" in {
       val dataSink = new AnySink
       val dataSinkOp = DataSinkOp(dataSink)
-      val chainableOp = mock[ChainableOp[Any, Any]]
+      val chainableOp = mock[TransformOp[Any, Any]]
       val ops = chainableOp +: unchainableOps
       ops.foreach { op =>
         intercept[OpChainException] {
@@ -103,10 +103,10 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
       }
     }
 
-    "get Processor of DataSink" in {
+    "be translated to processor" in {
       val dataSink = new AnySink
       val dataSinkOp = DataSinkOp(dataSink)
-      val processor = dataSinkOp.getProcessor
+      val processor = dataSinkOp.toProcessor
       processor shouldBe a[Processor[_]]
       processor.parallelism shouldBe dataSinkOp.parallelism
       processor.description shouldBe dataSinkOp.description
@@ -117,7 +117,7 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
 
     "not chain any Op" in {
       val processorOp = new ProcessorOp[AnyTask]
-      val chainableOp = mock[ChainableOp[Any, Any]]
+      val chainableOp = mock[TransformOp[Any, Any]]
       val ops = chainableOp +: unchainableOps
       ops.foreach { op =>
         intercept[OpChainException] {
@@ -126,41 +126,41 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
       }
     }
 
-    "get Processor" in {
+    "be translated into processor" in {
       val processorOp = new ProcessorOp[AnyTask]
-      val processor = processorOp.getProcessor
+      val processor = processorOp.toProcessor
       processor shouldBe a [DefaultProcessor[_]]
       processor.parallelism shouldBe processorOp.parallelism
       processor.description shouldBe processorOp.description
     }
   }
 
-  "ChainableOp" should {
+  "TransformOp" should {
 
-    "chain ChainableOp" in {
+    "chain TransformOp" in {
       val fn1 = mock[FunctionRunner[Any, Any]]
-      val chainableOp1 = ChainableOp[Any, Any](fn1)
+      val transformOp1 = TransformOp[Any, Any](fn1)
 
       val fn2 = mock[FunctionRunner[Any, Any]]
-      val chainableOp2 = ChainableOp[Any, Any](fn2)
+      val transformOp2 = TransformOp[Any, Any](fn2)
 
-      val chainedOp = chainableOp1.chain(chainableOp2)
+      val chainedOp = transformOp1.chain(transformOp2)
 
-      chainedOp shouldBe a[ChainableOp[_, _]]
+      chainedOp shouldBe a[TransformOp[_, _]]
 
       unchainableOps.foreach { op =>
         intercept[OpChainException] {
-          chainableOp1.chain(op)
+          transformOp1.chain(op)
         }
       }
     }
 
-    "get Processor" in {
+    "be translated to processor" in {
       val fn = mock[FlatMapFunction[Any, Any]]
       val flatMapper = new FlatMapper(fn, "flatMap")
-      val chainableOp = ChainableOp[Any, Any](flatMapper)
+      val transformOp = TransformOp[Any, Any](flatMapper)
 
-      val processor = chainableOp.getProcessor
+      val processor = transformOp.toProcessor
       processor shouldBe a[Processor[_]]
       processor.parallelism shouldBe 1
     }
@@ -168,14 +168,16 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
 
   "GroupByOp" should {
 
-    "chain ChainableOp" in {
-      val groupBy = mock[GroupAlsoByWindow[Any, Any]]
-      val groupByOp = GroupByOp[Any, Any](groupBy)
-      val fn = mock[FunctionRunner[Any, Any]]
-      val chainableOp = mock[ChainableOp[Any, Any]]
-      when(chainableOp.fn).thenReturn(fn)
+    val groupBy = (any: Any) => any
+    val groupByOp = GroupByOp[Any, Any](groupBy)
+
+    "chain WindowTransformOp" in {
 
-      val chainedOp = groupByOp.chain(chainableOp)
+      val runner = new DefaultWindowRunner[Any, Any](GlobalWindows(), new DummyRunner())
+      val windowTransformOp = mock[WindowTransformOp[Any, Any]]
+      when(windowTransformOp.windowRunner).thenReturn(runner)
+
+      val chainedOp = groupByOp.chain(windowTransformOp)
       chainedOp shouldBe a[GroupByOp[_, _]]
 
       unchainableOps.foreach { op =>
@@ -185,25 +187,23 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
       }
     }
 
-    "delegate to groupByFn on getProcessor" in {
-      val groupBy = mock[GroupAlsoByWindow[Any, Any]]
-      val groupByOp = GroupByOp[Any, Any](groupBy)
-
-      groupByOp.getProcessor
-      verify(groupBy).getProcessor(anyInt, anyString, any[UserConfig])(any[ActorSystem])
+    "be translated to processor" in {
+      val processor = groupByOp.toProcessor
+      processor shouldBe a[Processor[_]]
+      processor.parallelism shouldBe 1
     }
   }
 
   "MergeOp" should {
 
-    val mergeOp = MergeOp("merge")
+    val mergeOp = MergeOp()
 
-    "chain ChainableOp" in {
-      val fn = mock[FunctionRunner[Any, Any]]
-      val chainableOp = mock[ChainableOp[Any, Any]]
-      when(chainableOp.fn).thenReturn(fn)
+    "chain WindowTransformOp" in {
+      val runner = mock[WindowRunner[Any, Any]]
+      val windowTransformOp = mock[WindowTransformOp[Any, Any]]
+      when(windowTransformOp.windowRunner).thenReturn(runner)
 
-      val chainedOp = mergeOp.chain(chainableOp)
+      val chainedOp = mergeOp.chain(windowTransformOp)
       chainedOp shouldBe a [MergeOp]
 
       unchainableOps.foreach { op =>
@@ -213,8 +213,8 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
       }
     }
 
-    "get Processor" in {
-      val processor = mergeOp.getProcessor
+    "be translated to processor" in {
+      val processor = mergeOp.toProcessor
       processor shouldBe a[Processor[_]]
       processor.parallelism shouldBe 1
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
index 70abde9..70d21b5 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
@@ -24,16 +24,14 @@ import akka.actor.ActorSystem
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
 import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
-import org.apache.gearpump.streaming.partitioner.CoLocationPartitioner
-import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
+import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, GroupByPartitioner}
 import org.apache.gearpump.streaming.dsl.plan.PlannerSpec._
 import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, FoldRunner}
 import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
-import org.apache.gearpump.streaming.dsl.window.api.CountWindows
-import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
 import org.apache.gearpump.streaming.sink.DataSink
 import org.apache.gearpump.streaming.source.DataSource
 import org.apache.gearpump.streaming.MockUtil
+import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 import org.apache.gearpump.util.Graph
 import org.scalatest.mock.MockitoSugar
@@ -58,10 +56,11 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc
   "Planner" should "chain operations" in {
     val graph = Graph.empty[Op, OpEdge]
     val sourceOp = DataSourceOp(new AnySource)
-    val groupBy = GroupAlsoByWindow((any: Any) => any, CountWindows.apply[Any](1))
+    val groupBy = (any: Any) => any
     val groupByOp = GroupByOp(groupBy)
-    val flatMapOp = ChainableOp[Any, Any](anyFlatMapper)
-    val reduceOp = ChainableOp[Any, Option[Any]](anyReducer)
+    val windowOp = WindowOp(GlobalWindows())
+    val flatMapOp = TransformOp[Any, Any](anyFlatMapper)
+    val reduceOp = TransformOp[Any, Option[Any]](anyReducer)
     val processorOp = new ProcessorOp[AnyTask]
     val sinkOp = DataSinkOp(new AnySink)
     val directEdge = Direct
@@ -70,8 +69,10 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc
     graph.addVertex(sourceOp)
     graph.addVertex(groupByOp)
     graph.addEdge(sourceOp, shuffleEdge, groupByOp)
+    graph.addVertex(windowOp)
+    graph.addEdge(groupByOp, directEdge, windowOp)
     graph.addVertex(flatMapOp)
-    graph.addEdge(groupByOp, directEdge, flatMapOp)
+    graph.addEdge(windowOp, directEdge, flatMapOp)
     graph.addVertex(reduceOp)
     graph.addEdge(flatMapOp, directEdge, reduceOp)
     graph.addVertex(processorOp)
@@ -86,9 +87,11 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc
       .mapVertex(_.description)
 
     plan.vertices.toSet should contain theSameElementsAs
-      Set("source", "groupBy", "processor", "sink")
-    plan.outgoingEdgesOf("source").iterator.next()._2 shouldBe a[GroupByPartitioner[_, _]]
-    plan.outgoingEdgesOf("groupBy").iterator.next()._2 shouldBe a[CoLocationPartitioner]
+      Set("source.globalWindows", "groupBy.globalWindows.flatMap.reduce", "processor", "sink")
+    plan.outgoingEdgesOf("source.globalWindows").iterator.next()._2 shouldBe
+      a[GroupByPartitioner[_, _]]
+    plan.outgoingEdgesOf("groupBy.globalWindows.flatMap.reduce").iterator.next()._2 shouldBe
+      a[CoLocationPartitioner]
     plan.outgoingEdgesOf("processor").iterator.next()._2 shouldBe a[CoLocationPartitioner]
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
index f5d7c20..6244224 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
@@ -19,28 +19,22 @@ package org.apache.gearpump.streaming.dsl.plan.functions
 
 import java.time.Instant
 
-import akka.actor.ActorSystem
 import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
 import org.apache.gearpump.streaming.source.{DataSourceTask, Watermark}
 import org.apache.gearpump.streaming.Constants._
 import org.apache.gearpump.streaming.dsl.api.functions.{FoldFunction, ReduceFunction}
 import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource
 import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
-import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform
-import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
-import org.apache.gearpump.streaming.dsl.window.api.CountWindows
-import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
-import org.mockito.ArgumentCaptor
+import org.apache.gearpump.streaming.dsl.task.TransformTask
+import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows
+import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, WindowRunner}
 import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest.{Matchers, WordSpec}
 import org.scalatest.mock.MockitoSugar
 
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-
 class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
   import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunnerSpec._
 
@@ -216,40 +210,6 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
     }
   }
 
-  "Emit" should {
-
-    val emitFunction = mock[T => Unit]
-    val emit = new Emit[T](emitFunction)
-
-    "emit input value when processing input value" in {
-      val input = mock[T]
-
-      emit.process(input) shouldBe List.empty[Unit]
-
-      verify(emitFunction).apply(input)
-    }
-
-    "return empty description" in {
-      emit.description shouldBe ""
-    }
-
-    "return None on finish" in {
-      emit.finish() shouldBe List.empty[Unit]
-    }
-
-    "do nothing on setup" in {
-      emit.setup()
-
-      verifyZeroInteractions(emitFunction)
-    }
-
-    "do nothing on teardown" in {
-      emit.teardown()
-
-      verifyZeroInteractions(emitFunction)
-    }
-  }
-
   "Source" should {
     "iterate over input source and apply attached operator" in {
 
@@ -258,7 +218,11 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
 
       val data = "one two three".split("\\s+")
       val dataSource = new CollectionDataSource[String](data)
-      val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, dataSource)
+      val runner1 = new DefaultWindowRunner[String, String](
+        GlobalWindows(), new DummyRunner[String])
+      val conf = UserConfig.empty
+        .withValue(GEARPUMP_STREAMING_SOURCE, dataSource)
+        .withValue[WindowRunner[String, String]](GEARPUMP_STREAMING_OPERATOR, runner1)
 
       // Source with no transformer
       val source = new DataSourceTask[String, String](
@@ -275,8 +239,10 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
       val anotherTaskContext = MockUtil.mockTaskContext
       val double = new FlatMapper[String, String](FlatMapFunction(
         word => List(word, word)), "double")
+      val runner2 = new DefaultWindowRunner[String, String](
+        GlobalWindows(), double)
       val another = new DataSourceTask(anotherTaskContext,
-        conf.withValue(GEARPUMP_STREAMING_OPERATOR, double))
+        conf.withValue(GEARPUMP_STREAMING_OPERATOR, runner2))
       another.onStart(Instant.EPOCH)
       another.onNext(Message("next"))
       another.onWatermarkProgress(Watermark.MAX)
@@ -287,44 +253,8 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
     }
   }
 
-  "CountTriggerTask" should {
-    "group input by groupBy Function and " +
-      "apply attached operator for each group" in {
-
-      val data = "1 2  2  3 3  3"
-
-      val concat = new FoldRunner[String, Option[String]](ReduceFunction({ (left, right) =>
-        left + right}), "concat")
-
-      implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
-      val config = UserConfig.empty.withValue[FunctionRunner[String, Option[String]]](
-        GEARPUMP_STREAMING_OPERATOR, concat)
-
-      val taskContext = MockUtil.mockTaskContext
-
-      val groupBy = GroupAlsoByWindow((input: String) => input,
-        CountWindows.apply[String](1).accumulating)
-      val task = new CountTriggerTask[String, String](groupBy, taskContext, config)
-      task.onStart(Instant.EPOCH)
-
-      val peopleCaptor = ArgumentCaptor.forClass(classOf[Message])
-
-      data.split("\\s+").foreach { word =>
-        task.onNext(Message(word))
-      }
-      verify(taskContext, times(6)).output(peopleCaptor.capture())
-
-      import scala.collection.JavaConverters._
-
-      val values = peopleCaptor.getAllValues.asScala.map(input =>
-        input.value.asInstanceOf[Option[String]].get)
-      assert(values.mkString(",") == "1,2,22,3,33,333")
-      system.terminate()
-      Await.result(system.whenTerminated, Duration.Inf)
-    }
-  }
 
-  "TransformTask" should {
+  "MergeTask" should {
     "accept two stream and apply the attached operator" in {
 
       // Source with transformer
@@ -332,7 +262,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
       val conf = UserConfig.empty
       val double = new FlatMapper[String, String](FlatMapFunction(
         word => List(word, word)), "double")
-      val transform = new Transform[String, String](taskContext, Some(double))
+      val transform = new DefaultWindowRunner[String, String](GlobalWindows(), double)
       val task = new TransformTask[String, String](transform, taskContext, conf)
       task.onStart(Instant.EPOCH)
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
index 5b90a3e..c8c8b9f 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
@@ -61,9 +61,9 @@ class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with M
     dag.vertices.size shouldBe 2
     dag.vertices.foreach { processor =>
       processor.taskClass shouldBe classOf[DataSourceTask[_, _]].getName
-      if (processor.description == "A") {
+      if (processor.description == "A.globalWindows") {
         processor.parallelism shouldBe 2
-      } else if (processor.description == "B") {
+      } else if (processor.description == "B.globalWindows") {
         processor.parallelism shouldBe 3
       } else {
         fail(s"undefined source ${processor.description}")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
index 4c7e209..ef8f932 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
@@ -22,10 +22,9 @@ import akka.actor._
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
 import org.apache.gearpump.streaming.dsl.scalaapi.StreamSpec.Join
-import org.apache.gearpump.streaming.dsl.task.{EventTimeTriggerTask, TransformTask}
-import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, HashPartitioner, PartitionerDescription}
+import org.apache.gearpump.streaming.dsl.task.{GroupByTask, TransformTask}
+import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, GroupByPartitioner, HashPartitioner, PartitionerDescription}
 import org.apache.gearpump.streaming.source.DataSourceTask
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
@@ -92,7 +91,7 @@ class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mock
 
   private def getExpectedDagTopology: Graph[String, String] = {
     val source = classOf[DataSourceTask[_, _]].getName
-    val group = classOf[EventTimeTriggerTask[_, _]].getName
+    val group = classOf[GroupByTask[_, _, _]].getName
     val merge = classOf[TransformTask[_, _]].getName
     val join = classOf[Join].getName
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala
deleted file mode 100644
index 1a4958a..0000000
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.dsl.task
-
-import java.time.Instant
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.window.api.CountWindows
-import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner}
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-import org.scalatest.{Matchers, PropSpec}
-
-class CountTriggerTaskSpec extends PropSpec with PropertyChecks
-  with Matchers with MockitoSugar {
-
-  property("CountTriggerTask should trigger output by number of messages in a window") {
-
-    implicit val system = MockUtil.system
-
-    val numGen = Gen.chooseNum[Int](1, 1000)
-
-    forAll(numGen, numGen) { (windowSize: Int, msgNum: Int) =>
-
-      val groupBy = mock[GroupAlsoByWindow[Any, Any]]
-      val window = CountWindows.apply[Any](windowSize)
-      when(groupBy.window).thenReturn(window)
-      val windowRunner = mock[WindowRunner]
-      val userConfig = UserConfig.empty
-
-      val task = new CountTriggerTask[Any, Any](groupBy, windowRunner,
-        MockUtil.mockTaskContext, userConfig)
-      val message = mock[Message]
-
-      for (i <- 1 to msgNum) {
-        task.onNext(message)
-      }
-      verify(windowRunner, times(msgNum)).process(message)
-      verify(windowRunner, times(msgNum / windowSize)).trigger(Instant.ofEpochMilli(windowSize))
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
deleted file mode 100644
index 9414c76..0000000
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.dsl.task
-
-import java.time.{Duration, Instant}
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, SlidingWindows}
-import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner}
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.{Matchers, PropSpec}
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-
-class EventTimeTriggerTaskSpec extends PropSpec with PropertyChecks
-  with Matchers with MockitoSugar {
-
-  property("EventTimeTriggerTask should trigger on watermark") {
-    val longGen = Gen.chooseNum[Long](1L, 1000L)
-    val windowSizeGen = longGen
-    val windowStepGen = longGen
-    val watermarkGen = longGen.map(Instant.ofEpochMilli)
-
-    forAll(windowSizeGen, windowStepGen, watermarkGen) {
-      (windowSize: Long, windowStep: Long, watermark: Instant) =>
-
-        val window = SlidingWindows.apply[Any](Duration.ofMillis(windowSize),
-          Duration.ofMillis(windowStep)).triggering(EventTimeTrigger)
-        val groupBy = mock[GroupAlsoByWindow[Any, Any]]
-        val windowRunner = mock[WindowRunner]
-        val context = MockUtil.mockTaskContext
-        val config = UserConfig.empty
-
-        when(groupBy.window).thenReturn(window)
-
-        val task = new EventTimeTriggerTask[Any, Any](groupBy, windowRunner, context, config)
-
-        val message = mock[Message]
-        task.onNext(message)
-        verify(windowRunner).process(message)
-
-        task.onWatermarkProgress(watermark)
-        verify(windowRunner).trigger(any[Instant])
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala
new file mode 100644
index 0000000..0f87a1c
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.Instant
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner
+import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows
+import org.apache.gearpump.streaming.{Constants, MockUtil}
+import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, WindowRunner}
+import org.apache.gearpump.streaming.source.Watermark
+import org.mockito.Mockito._
+import org.scalacheck.Gen
+import org.scalatest.{Matchers, PropSpec}
+import org.scalatest.mock.MockitoSugar
+import org.scalatest.prop.PropertyChecks
+
+class GroupByTaskSpec extends PropSpec with PropertyChecks
+  with Matchers with MockitoSugar {
+
+  property("GroupByTask should trigger on watermark") {
+    val longGen = Gen.chooseNum[Long](1L, 1000L).map(Instant.ofEpochMilli)
+
+    forAll(longGen) { (time: Instant) =>
+      val groupBy = mock[Any => Int]
+      val windowRunner = new DefaultWindowRunner[Any, Any](GlobalWindows(), new DummyRunner[Any])
+      val context = MockUtil.mockTaskContext
+      val config = UserConfig.empty
+        .withValue(
+          Constants.GEARPUMP_STREAMING_OPERATOR, windowRunner)(MockUtil.system)
+
+      val task = new GroupByTask[Any, Int, Any](groupBy, context, config)
+      val value = time
+      val message = Message(value, time)
+      when(groupBy(time)).thenReturn(0)
+      task.onNext(message)
+
+      task.onWatermarkProgress(Watermark.MAX)
+      verify(context).output(message)
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
deleted file mode 100644
index cbc9e0c..0000000
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.dsl.task
-
-import java.time.{Duration, Instant}
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.task.ProcessingTimeTriggerTask.Triggering
-import org.apache.gearpump.streaming.dsl.window.api.{ProcessingTimeTrigger, SlidingWindows}
-import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner}
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalacheck.Gen
-import org.scalatest.{Matchers, PropSpec}
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.prop.PropertyChecks
-
-class ProcessingTimeTriggerTaskSpec extends PropSpec with PropertyChecks
-  with Matchers with MockitoSugar {
-
-  property("ProcessingTimeTriggerTask should trigger on system time interval") {
-    val longGen = Gen.chooseNum[Long](1L, 1000L)
-    val windowSizeGen = longGen
-    val windowStepGen = longGen
-    val startTimeGen = longGen.map(Instant.ofEpochMilli)
-
-    forAll(windowSizeGen, windowStepGen, startTimeGen) {
-      (windowSize: Long, windowStep: Long, startTime: Instant) =>
-
-        val window = SlidingWindows.apply[Any](Duration.ofMillis(windowSize),
-          Duration.ofMillis(windowStep)).triggering(ProcessingTimeTrigger)
-        val groupBy = mock[GroupAlsoByWindow[Any, Any]]
-        val windowRunner = mock[WindowRunner]
-        val context = MockUtil.mockTaskContext
-        val config = UserConfig.empty
-
-        when(groupBy.window).thenReturn(window)
-
-        val task = new ProcessingTimeTriggerTask[Any, Any](groupBy, windowRunner, context, config)
-
-        task.onStart(startTime)
-
-        val message = mock[Message]
-        task.onNext(message)
-        verify(windowRunner).process(message)
-
-        task.receiveUnManagedMessage(Triggering)
-        verify(windowRunner).trigger(any[Instant])
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
index 481925a..6b66f01 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala
@@ -22,11 +22,9 @@ import java.time.Instant
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
-import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform
-import org.apache.gearpump.streaming.source.Watermark
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner}
 import org.mockito.{Matchers => MockitoMatchers}
-import org.mockito.Mockito.{times, verify, when}
+import org.mockito.Mockito.{verify, when}
 import org.scalacheck.Gen
 import org.scalatest.{Matchers, PropSpec}
 import org.scalatest.mock.MockitoSugar
@@ -34,43 +32,25 @@ import org.scalatest.prop.PropertyChecks
 
 class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
 
-  private val timeGen = Gen.chooseNum[Long](Watermark.MIN.toEpochMilli,
-    Watermark.MAX.toEpochMilli - 1).map(Instant.ofEpochMilli)
-  private val runnerGen = {
-    val runner = mock[FunctionRunner[Any, Any]]
-    Gen.oneOf(Some(runner), None)
-  }
-
-  property("TransformTask should emit on watermark") {
-    val msgGen = for {
-      str <- Gen.alphaStr.suchThat(!_.isEmpty)
-      t <- timeGen
-    } yield Message(s"$str:$t", t)
-    val msgsGen = Gen.listOfN(10, msgGen)
-
-    forAll(runnerGen, msgsGen) {
-      (runner: Option[FunctionRunner[Any, Any]], msgs: List[Message]) =>
-        val taskContext = MockUtil.mockTaskContext
-        implicit val system = MockUtil.system
-        val config = UserConfig.empty
-        val transform = new Transform[Any, Any](taskContext, runner)
-        val task = new TransformTask[Any, Any](transform, taskContext, config)
-
-        msgs.foreach(task.onNext)
-
-        runner.foreach(r => when(r.finish()).thenReturn(None))
-        task.onWatermarkProgress(Watermark.MIN)
-        verify(taskContext, times(0)).output(MockitoMatchers.any[Message])
-
-        msgs.foreach { msg =>
-          runner.foreach(r =>
-            when(r.process(msg.value)).thenReturn(Some(msg.value)))
-        }
-        task.onWatermarkProgress(Watermark.MAX)
-
-        msgs.foreach { msg =>
-          verify(taskContext).output(MockitoMatchers.eq(msg))
-        }
+  property("MergeTask should trigger on watermark") {
+    val longGen = Gen.chooseNum[Long](1L, 1000L)
+    val watermarkGen = longGen.map(Instant.ofEpochMilli)
+
+    forAll(watermarkGen) { (watermark: Instant) =>
+      val windowRunner = mock[WindowRunner[Any, Any]]
+      val context = MockUtil.mockTaskContext
+      val config = UserConfig.empty
+      val task = new TransformTask[Any, Any](windowRunner, context, config)
+      val time = watermark.minusMillis(1L)
+      val value: Any = time
+      val message = Message(value, time)
+
+      task.onNext(message)
+      verify(windowRunner).process(TimestampedValue(value, time))
+
+      when(windowRunner.trigger(watermark)).thenReturn(Some(TimestampedValue(value, time)))
+      task.onWatermarkProgress(watermark)
+      verify(context).output(message)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
index fbbee3e..98e9919 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala
@@ -21,18 +21,15 @@ package org.apache.gearpump.streaming.dsl.window.impl
 import java.time.{Duration, Instant}
 
 import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
-import org.apache.gearpump.streaming.{Constants, MockUtil}
+import org.apache.gearpump.streaming.MockUtil
 import org.apache.gearpump.streaming.dsl.plan.functions.FoldRunner
 import org.apache.gearpump.streaming.dsl.window.api.SessionWindows
 import org.apache.gearpump.streaming.source.Watermark
-import org.mockito.Mockito.{times, verify}
 import org.scalatest.{Matchers, PropSpec}
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 
-
 class DefaultWindowRunnerSpec extends PropSpec with PropertyChecks
   with Matchers with MockitoSugar {
 
@@ -40,34 +37,25 @@ class DefaultWindowRunnerSpec extends PropSpec with PropertyChecks
 
     val data = List(
       Message(("foo", 1L), Instant.ofEpochMilli(1L)),
-      Message(("bar", 1L), Instant.ofEpochMilli(8L)),
       Message(("foo", 1L), Instant.ofEpochMilli(15L)),
-      Message(("bar", 1L), Instant.ofEpochMilli(17L)),
-      Message(("bar", 1L), Instant.ofEpochMilli(18L)),
       Message(("foo", 1L), Instant.ofEpochMilli(25L)),
-      Message(("foo", 1L), Instant.ofEpochMilli(26L)),
-      Message(("bar", 1L), Instant.ofEpochMilli(30L)),
-      Message(("bar", 1L), Instant.ofEpochMilli(31L))
+      Message(("foo", 1L), Instant.ofEpochMilli(26L))
     )
 
     type KV = (String, Long)
-    val taskContext = MockUtil.mockTaskContext
     implicit val system = MockUtil.system
     val reduce = ReduceFunction[KV]((kv1, kv2) => (kv1._1, kv1._2 + kv2._2))
-    val operator = new FoldRunner(reduce, "reduce")
-    val userConfig = UserConfig.empty.withValue(
-      Constants.GEARPUMP_STREAMING_OPERATOR, operator)
-    val windows = SessionWindows.apply[KV](Duration.ofMillis(4L))
-    val groupBy = GroupAlsoByWindow[KV, String](_._1, windows)
-    val windowRunner = new DefaultWindowRunner(taskContext, userConfig, groupBy)
-
-    data.foreach(windowRunner.process)
-    windowRunner.trigger(Watermark.MAX)
-
-    verify(taskContext, times(2)).output(Message(Some(("foo", 1)), Watermark.MAX))
-    verify(taskContext).output(Message(Some(("foo", 2)), Watermark.MAX))
-    verify(taskContext, times(2)).output(Message(Some(("bar", 2)), Watermark.MAX))
-    verify(taskContext).output(Message(Some(("bar", 1)), Watermark.MAX))
+    val windows = SessionWindows.apply(Duration.ofMillis(4L))
+    val windowRunner = new DefaultWindowRunner[KV, Option[KV]](windows,
+      new FoldRunner[KV, Option[KV]](reduce, "reduce"))
+
+    data.foreach(m => windowRunner.process(TimestampedValue(m.value.asInstanceOf[KV], m.timestamp)))
+    windowRunner.trigger(Watermark.MAX).toList shouldBe
+      List(
+        TimestampedValue(Some(("foo", 1)), Instant.ofEpochMilli(4)),
+        TimestampedValue(Some(("foo", 1)), Instant.ofEpochMilli(18)),
+        TimestampedValue(Some(("foo", 2)), Instant.ofEpochMilli(29))
+      )
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/GroupByPartitionerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/GroupByPartitionerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/GroupByPartitionerSpec.scala
new file mode 100644
index 0000000..038f91d
--- /dev/null
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/partitioner/GroupByPartitionerSpec.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.partitioner
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.streaming.partitioner.GroupByPartitionerSpec.People
+import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
+
+class GroupByPartitionerSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
+
+  it should "group by message payload and window" in {
+    val mark = People("Mark", "male")
+    val tom = People("Tom", "male")
+    val michelle = People("Michelle", "female")
+
+    val partitionNum = 10
+
+    val groupBy = new GroupByPartitioner[People, String](_.gender)
+    groupBy.getPartition(Message(mark, 1L), partitionNum) shouldBe
+      groupBy.getPartition(Message(tom, 2L), partitionNum)
+
+    groupBy.getPartition(Message(mark, 2L), partitionNum) should not be
+      groupBy.getPartition(Message(michelle, 3L), partitionNum)
+  }
+}
+
+object GroupByPartitionerSpec {
+  case class People(name: String, gender: String)
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
index 7651251..f7a3a63 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala
@@ -23,8 +23,7 @@ import java.time.Instant
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
-import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner}
 import org.mockito.Mockito._
 import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
@@ -33,21 +32,16 @@ import org.scalatest.prop.PropertyChecks
 
 class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with MockitoSugar {
 
-  private val runnerGen = {
-    val runner = mock[FunctionRunner[Any, Any]]
-    Gen.oneOf(Some(runner), None)
-  }
-
   property("DataSourceTask should setup data source") {
-    forAll(runnerGen, Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) {
-      (runner: Option[FunctionRunner[Any, Any]], startTime: Instant) =>
+    forAll(Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) {
+      (startTime: Instant) =>
       val taskContext = MockUtil.mockTaskContext
       implicit val system = MockUtil.system
       val dataSource = mock[DataSource]
       val config = UserConfig.empty
         .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
-      val transform = new Transform[Any, Any](taskContext, runner)
-      val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform)
+        val runner = mock[WindowRunner[Any, Any]]
+      val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, taskContext, config)
 
       sourceTask.onStart(startTime)
 
@@ -56,21 +50,20 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with
   }
 
   property("DataSourceTask should read from DataSource and transform inputs") {
-    forAll(runnerGen, Gen.alphaStr, Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) {
-      (runner: Option[FunctionRunner[Any, Any]], str: String, timestamp: Instant) =>
+    forAll(Gen.alphaStr, Gen.chooseNum[Long](0L, 1000L).map(Instant.ofEpochMilli)) {
+      (str: String, timestamp: Instant) =>
         val taskContext = MockUtil.mockTaskContext
         implicit val system = MockUtil.system
         val dataSource = mock[DataSource]
         val config = UserConfig.empty
           .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
-        val transform = new Transform[Any, Any](taskContext, runner)
-        val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform)
+        val runner = mock[WindowRunner[Any, Any]]
+        val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, taskContext, config)
         val msg = Message(str, timestamp)
         when(dataSource.read()).thenReturn(msg)
-        runner.foreach(r => {
-          when(r.process(str)).thenReturn(Some(str))
-          when(r.finish()).thenReturn(None)
-        })
+
+        when(runner.trigger(Watermark.MAX)).thenReturn(
+          Some(TimestampedValue(str.asInstanceOf[Any], timestamp)))
 
         sourceTask.onNext(Message("next"))
         sourceTask.onWatermarkProgress(Watermark.MAX)
@@ -80,18 +73,16 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with
   }
 
   property("DataSourceTask should teardown DataSource") {
-    forAll(runnerGen) { (runner: Option[FunctionRunner[Any, Any]]) =>
-      val taskContext = MockUtil.mockTaskContext
-      implicit val system = MockUtil.system
-      val dataSource = mock[DataSource]
-      val config = UserConfig.empty
-        .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
-      val transform = new Transform[Any, Any](taskContext, runner)
-      val sourceTask = new DataSourceTask[Any, Any](taskContext, config, dataSource, transform)
+    val taskContext = MockUtil.mockTaskContext
+    implicit val system = MockUtil.system
+    val dataSource = mock[DataSource]
+    val config = UserConfig.empty
+      .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1)
+    val runner = mock[WindowRunner[Any, Any]]
+    val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, taskContext, config)
 
-      sourceTask.onStop()
+    sourceTask.onStop()
 
-      verify(dataSource).close()
-    }
+    verify(dataSource).close()
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
index fb0beaa..65cb17a 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/task/SubscriptionSpec.scala
@@ -24,9 +24,10 @@ import java.util.Random
 import org.mockito.Mockito._
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.{FlatSpec, Matchers}
-import org.apache.gearpump.{MAX_TIME_MILLIS, Message}
+import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.source.Watermark
 import org.apache.gearpump.streaming.task.SubscriptionSpec.NextTask
 import org.apache.gearpump.streaming.{LifeTime, ProcessorDescription}
 
@@ -115,7 +116,7 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
       subscription.sendMessage(Message(randomMessage, clock))
     }
 
-    assert(subscription.allowSendingMoreMessages() == false)
+    assert(!subscription.allowSendingMoreMessages())
   }
 
   it should "report minClock as Long.MaxValue when there is no pending message" in {
@@ -124,7 +125,7 @@ class SubscriptionSpec extends FlatSpec with Matchers with MockitoSugar {
     subscription.sendMessage(msg1)
     assert(subscription.minClock == 70)
     subscription.receiveAck(Ack(TaskId(1, 1), 1, 1, session))
-    assert(subscription.minClock == MAX_TIME_MILLIS)
+    assert(subscription.minClock == Watermark.MAX.toEpochMilli)
   }
 
   private def randomMessage: String = new Random().nextInt.toString


[2/2] incubator-gearpump git commit: [GEARPUMP-316] Decouple groupBy from window

Posted by ma...@apache.org.
[GEARPUMP-316] Decouple groupBy from window

This includes the following major changes to the `Stream DSL`:

1. Decouple groupBy from window. Previously, `window` was required before `groupBy`; now users can write `window.groupBy` or `groupBy.window`, or even `window.groupBy.window`.
2. Correspondingly, `GroupByOp` is split into `GroupByOp` and `WindowOp`. When chaining `Op`s, intermediate `Op`s such as `WindowTransformOp` and `TransformWindowTransformOp` are generated before they can be translated into a function (wrapping the UDF).
3. The function will be run by a `WindowRunner` in one of `DataSourceTask`, `GroupByTask` or `TransformTask`. Messages are windowed, processed and triggered as defined by the window function and triggers.
4. A Stream is implicitly in a global window. Its window only changes when the user explicitly defines another window. Hence, `groupBy` is equivalent to `globalWindows.groupBy.globalWindows`, and `groupBy.fixedWindows` is equivalent to `globalWindows.groupBy.fixedWindows`.
5. Bug fixes for output time and `Watermark.MAX`

Author: manuzhang <ow...@gmail.com>

Closes #186 from manuzhang/window.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/24e1a454
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/24e1a454
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/24e1a454

Branch: refs/heads/master
Commit: 24e1a4546260ca414ca84f8df055e57f4d963263
Parents: c1370d9
Author: manuzhang <ow...@gmail.com>
Authored: Wed Jun 14 10:05:32 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Wed Jun 14 10:05:32 2017 +0800

----------------------------------------------------------------------
 .../materializer/RemoteMaterializerImpl.scala   |  22 +-
 .../apache/gearpump/streaming/Constants.scala   |   1 -
 .../streaming/dsl/javaapi/JavaStream.scala      |  17 +-
 .../dsl/partitioner/GroupByPartitioner.scala    |  48 ----
 .../apache/gearpump/streaming/dsl/plan/OP.scala | 243 ++++++++++++++++---
 .../gearpump/streaming/dsl/plan/Planner.scala   |  10 +-
 .../dsl/plan/functions/FunctionRunner.scala     |  16 +-
 .../streaming/dsl/scalaapi/Stream.scala         |  70 +++---
 .../streaming/dsl/scalaapi/StreamApp.scala      |   2 +-
 .../streaming/dsl/task/CountTriggerTask.scala   |  62 -----
 .../dsl/task/EventTimeTriggerTask.scala         |  58 -----
 .../streaming/dsl/task/GroupByTask.scala        |  73 ++++++
 .../dsl/task/ProcessingTimeTriggerTask.scala    |  81 -------
 .../streaming/dsl/task/TransformTask.scala      |  56 +----
 .../streaming/dsl/window/api/Trigger.scala      |   4 -
 .../dsl/window/api/WindowFunction.scala         |  37 ++-
 .../streaming/dsl/window/api/Windows.scala      |  43 ++--
 .../streaming/dsl/window/impl/Window.scala      |  28 ---
 .../dsl/window/impl/WindowRunner.scala          | 155 ++++++------
 .../partitioner/GroupByPartitioner.scala        |  47 ++++
 .../streaming/source/DataSourceTask.scala       |  24 +-
 .../gearpump/streaming/source/Watermark.scala   |   2 +-
 .../gearpump/streaming/task/Subscription.scala  |   4 +-
 .../partitioner/GroupByPartitionerSpec.scala    |  45 ----
 .../gearpump/streaming/dsl/plan/OpSpec.scala    |  94 +++----
 .../streaming/dsl/plan/PlannerSpec.scala        |  25 +-
 .../dsl/plan/functions/FunctionRunnerSpec.scala |  98 ++------
 .../streaming/dsl/scalaapi/StreamAppSpec.scala  |   4 +-
 .../streaming/dsl/scalaapi/StreamSpec.scala     |   7 +-
 .../dsl/task/CountTriggerTaskSpec.scala         |  61 -----
 .../dsl/task/EventTimeTriggerTaskSpec.scala     |  66 -----
 .../streaming/dsl/task/GroupByTaskSpec.scala    |  60 +++++
 .../task/ProcessingTimeTriggerTaskSpec.scala    |  69 ------
 .../streaming/dsl/task/TransformTaskSpec.scala  |  62 ++---
 .../window/impl/DefaultWindowRunnerSpec.scala   |  38 +--
 .../partitioner/GroupByPartitionerSpec.scala    |  45 ++++
 .../streaming/source/DataSourceTaskSpec.scala   |  51 ++--
 .../streaming/task/SubscriptionSpec.scala       |   7 +-
 38 files changed, 761 insertions(+), 1074 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
----------------------------------------------------------------------
diff --git a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
index 74fe077..e2cdbd4 100644
--- a/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
+++ b/experiments/akkastream/src/main/scala/org/apache/gearpump/akkastream/materializer/RemoteMaterializerImpl.scala
@@ -35,11 +35,9 @@ import org.apache.gearpump.akkastream.task.{BalanceTask, BatchTask, BroadcastTas
 import org.apache.gearpump.akkastream.task.TickSourceTask.{INITIAL_DELAY, INTERVAL, TICK}
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.dsl.plan.functions.FlatMapper
-import org.apache.gearpump.streaming.dsl.plan.{ChainableOp, DataSinkOp, DataSourceOp, Direct, GroupByOp, MergeOp, Op, OpEdge, ProcessorOp, Shuffle}
+import org.apache.gearpump.streaming.dsl.plan.{TransformOp, DataSinkOp, DataSourceOp, Direct, GroupByOp, MergeOp, Op, OpEdge, ProcessorOp, Shuffle}
 import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp
 import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
-import org.apache.gearpump.streaming.dsl.window.api.CountWindows
-import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
 import org.apache.gearpump.streaming.{ProcessorId, StreamApplication}
 import org.apache.gearpump.util.Graph
 import org.slf4j.LoggerFactory
@@ -152,10 +150,10 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
       val op = module match {
         case source: SourceTaskModule[_] =>
           val updatedConf = conf.withConfig(source.conf)
-          DataSourceOp(source.source, parallelism, updatedConf, "source")
+          DataSourceOp(source.source, parallelism, "source", updatedConf)
         case sink: SinkTaskModule[_] =>
           val updatedConf = conf.withConfig(sink.conf)
-          DataSinkOp(sink.sink, parallelism, updatedConf, "sink")
+          DataSinkOp(sink.sink, parallelism, "sink", updatedConf)
         case sourceBridge: SourceBridgeModule[_, _] =>
           ProcessorOp(classOf[SourceBridgeTask], parallelism = 1, conf, "source")
         case processor: ProcessorModule[_, _, _] =>
@@ -164,8 +162,7 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
         case sinkBridge: SinkBridgeModule[_, _] =>
           ProcessorOp(classOf[SinkBridgeTask], parallelism, conf, "sink")
         case groupBy: GroupByModule[Any, Any] =>
-          GroupByOp(GroupAlsoByWindow(groupBy.groupBy, CountWindows.apply[Any](1).accumulating),
-            parallelism, "groupBy", conf)
+          GroupByOp(groupBy.groupBy, parallelism, "groupBy", conf)
         case reduce: ReduceModule[_] =>
           reduceOp(reduce.f, conf)
         case graphStage: GraphStageModule =>
@@ -181,7 +178,7 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
       op
     }.mapEdge[OpEdge] { (n1, edge, n2) =>
       n2 match {
-        case chainableOp: ChainableOp[_, _]
+        case chainableOp: TransformOp[_, _]
           if !n1.isInstanceOf[ProcessorOp[_]] && !n2.isInstanceOf[ProcessorOp[_]] =>
           Direct
         case _ =>
@@ -242,8 +239,7 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
           withValue(FoldTask.AGGREGATOR, fold.f)
         ProcessorOp(classOf[FoldTask[_, _]], parallelism, foldConf, "fold")
       case groupBy: GroupBy[Any, Any] =>
-        GroupByOp(GroupAlsoByWindow(groupBy.keyFor, CountWindows.apply[Any](1).accumulating),
-          groupBy.maxSubstreams, "groupBy", conf)
+        GroupByOp(groupBy.keyFor, groupBy.maxSubstreams, "groupBy", conf)
       case groupedWithin: GroupedWithin[_] =>
         val diConf = conf.withValue[FiniteDuration](GroupedWithinTask.TIME_WINDOW, groupedWithin.d).
           withInt(GroupedWithinTask.BATCH_SIZE, groupedWithin.n)
@@ -285,9 +281,9 @@ class RemoteMaterializerImpl(graph: Graph[Module, Edge], system: ActorSystem) {
           withInt(MergeTask.INPUT_PORTS, merge.inputPorts)
         ProcessorOp(classOf[MergeTask], parallelism, mergeConf, "merge")
       case mergePreferred: MergePreferred[_] =>
-        MergeOp("mergePreferred", conf)
+        MergeOp()
       case mergeSorted: MergeSorted[_] =>
-        MergeOp("mergeSorted", conf)
+        MergeOp()
       case prefixAndTail: PrefixAndTail[_] =>
         // TODO
         null
@@ -480,7 +476,7 @@ object RemoteMaterializerImpl {
 
   def flatMapOp[In, Out](fun: In => TraversableOnce[Out], description: String,
       conf: UserConfig): Op = {
-    ChainableOp(new FlatMapper(FlatMapFunction[In, Out](fun), description), conf)
+    TransformOp(new FlatMapper(FlatMapFunction[In, Out](fun), description), conf)
   }
 
   def conflateOp[In, Out](seed: In => Out, aggregate: (Out, In) => Out,

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
index 7ac1b74..d9cfb92 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
@@ -24,7 +24,6 @@ object Constants {
   val GEARPUMP_STREAMING_OPERATOR = "gearpump.streaming.dsl.operator"
   val GEARPUMP_STREAMING_SOURCE = "gearpump.streaming.source"
   val GEARPUMP_STREAMING_GROUPBY_FUNCTION = "gearpump.streaming.dsl.groupby-function"
-  val GEARPUMP_STREAMING_WINDOW_FUNCTION = "gearpump.streaming.dsl.window-function"
 
   val GEARPUMP_STREAMING_LOCALITIES = "gearpump.streaming.localities"
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
index da0e4db..cb9f084 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
@@ -21,7 +21,7 @@ import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, FoldFunction, MapFunction, ReduceFunction}
 import org.apache.gearpump.streaming.dsl.javaapi.functions.{GroupByFunction, FlatMapFunction => JFlatMapFunction}
 import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
-import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, WindowStream}
+import org.apache.gearpump.streaming.dsl.scalaapi.Stream
 import org.apache.gearpump.streaming.dsl.window.api.Windows
 import org.apache.gearpump.streaming.task.Task
 
@@ -59,8 +59,8 @@ class JavaStream[T](val stream: Stream[T]) {
   }
 
   /** Merges streams of same type together */
-  def merge(other: JavaStream[T], description: String): JavaStream[T] = {
-    new JavaStream[T](stream.merge(other.stream, description))
+  def merge(other: JavaStream[T], parallelism: Int, description: String): JavaStream[T] = {
+    new JavaStream[T](stream.merge(other.stream, parallelism, description))
   }
 
   /**
@@ -72,8 +72,8 @@ class JavaStream[T](val stream: Stream[T]) {
     new JavaStream[T](stream.groupBy(fn.groupBy, parallelism, description))
   }
 
-  def window(win: Windows[T], description: String): JavaWindowStream[T] = {
-    new JavaWindowStream[T](stream.window(win, description))
+  def window(win: Windows): JavaStream[T] = {
+    new JavaStream[T](stream.window(win))
   }
 
   /** Add a low level Processor to process messages */
@@ -84,10 +84,3 @@ class JavaStream[T](val stream: Stream[T]) {
   }
 }
 
-class JavaWindowStream[T](stream: WindowStream[T]) {
-
-  def groupBy[GROUP](fn: GroupByFunction[T, GROUP], parallelism: Int,
-      description: String): JavaStream[T] = {
-    new JavaStream[T](stream.groupBy(fn.groupBy, parallelism, description))
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
deleted file mode 100644
index 3789d4e..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl.partitioner
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.streaming.partitioner.UnicastPartitioner
-
-/**
- * Partition messages by applying group by function first.
- *
- * For example:
- * {{{
- * case class People(name: String, gender: String)
- *
- * object Test{
- *
- *   val groupBy: (People => String) = people => people.gender
- *   val partitioner = GroupByPartitioner(groupBy)
- * }
- * }}}
- *
- * @param fn First apply message with groupBy function, then pick the hashCode of the output
- *   to do the partitioning. You must define hashCode() for output type of groupBy function.
- */
-class GroupByPartitioner[T, GROUP](fn: T => GROUP) extends UnicastPartitioner {
-
-  override def getPartition(message: Message, partitionNum: Int, currentPartitionId: Int): Int = {
-    val hashCode = fn(message.value.asInstanceOf[T]).hashCode()
-    (hashCode & Integer.MAX_VALUE) % partitionNum
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
index 708e0d2..2a45a8f 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -20,18 +20,44 @@ package org.apache.gearpump.streaming.dsl.plan
 
 import akka.actor.ActorSystem
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.Constants._
 import org.apache.gearpump.streaming.Processor.DefaultProcessor
-import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, FunctionRunner}
+import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, DummyRunner, FunctionRunner}
+import org.apache.gearpump.streaming.dsl.window.impl.{AndThen => WindowRunnerAT}
 import org.apache.gearpump.streaming.{Constants, Processor}
-import org.apache.gearpump.streaming.dsl.task.TransformTask
-import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
+import org.apache.gearpump.streaming.dsl.task.{GroupByTask, TransformTask}
+import org.apache.gearpump.streaming.dsl.window.api.{GlobalWindows, Windows}
+import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, WindowRunner}
 import org.apache.gearpump.streaming.sink.{DataSink, DataSinkProcessor}
 import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask}
 import org.apache.gearpump.streaming.task.Task
 
 import scala.reflect.ClassTag
 
+object Op {
+
+  def concatenate(desc1: String, desc2: String): String = {
+    if (desc1 == null || desc1.isEmpty) desc2
+    else if (desc2 == null || desc2.isEmpty) desc1
+    else desc1 + "." + desc2
+  }
+
+  def concatenate(config1: UserConfig, config2: UserConfig): UserConfig = {
+    config1.withConfig(config2)
+  }
+
+  def withGlobalWindowsDummyRunner(op: Op, userConfig: UserConfig,
+      processor: Processor[_ <: Task])(implicit system: ActorSystem): Processor[_ <: Task] = {
+    if (userConfig.getValue(Constants.GEARPUMP_STREAMING_OPERATOR).isEmpty) {
+      op.chain(
+        WindowOp(GlobalWindows()).chain(TransformOp(new DummyRunner[Any]))
+      ).toProcessor
+    } else {
+      processor
+    }
+  }
+
+}
+
 /**
  * This is a vertex on the logical plan.
  */
@@ -43,7 +69,7 @@ sealed trait Op {
 
   def chain(op: Op)(implicit system: ActorSystem): Op
 
-  def getProcessor(implicit system: ActorSystem): Processor[_ <: Task]
+  def toProcessor(implicit system: ActorSystem): Processor[_ <: Task]
 }
 
 /**
@@ -67,7 +93,7 @@ case class ProcessorOp[T <: Task](
     throw new OpChainException(this, other)
   }
 
-  override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
     DefaultProcessor(parallelism, description, userConfig, processor)
   }
 }
@@ -78,24 +104,40 @@ case class ProcessorOp[T <: Task](
 case class DataSourceOp(
     dataSource: DataSource,
     parallelism: Int = 1,
-    userConfig: UserConfig = UserConfig.empty,
-    description: String = "source")
+    description: String = "source",
+    userConfig: UserConfig = UserConfig.empty)
   extends Op {
 
   override def chain(other: Op)(implicit system: ActorSystem): Op = {
     other match {
-      case op: ChainableOp[_, _] =>
-        DataSourceOp(dataSource, parallelism,
-          userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn),
-          description)
+      case op: WindowTransformOp[_, _] =>
+        DataSourceOp(
+          dataSource,
+          parallelism,
+          Op.concatenate(description, op.description),
+          Op.concatenate(userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR,
+            op.windowRunner),
+            op.userConfig))
+      case op: TransformOp[_, _] =>
+        chain(
+          WindowOp(GlobalWindows()).chain(op))
+      case op: WindowOp =>
+        chain(
+          op.chain(TransformOp(new DummyRunner[Any]())))
+      case op: TransformWindowTransformOp[_, _, _] =>
+        chain(
+          WindowOp(GlobalWindows()).chain(op.transformOp)
+            .chain(op.windowTransformOp))
       case _ =>
         throw new OpChainException(this, other)
     }
   }
 
-  override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
-    Processor[DataSourceTask[Any, Any]](parallelism, description,
-      userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource))
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    Op.withGlobalWindowsDummyRunner(this, userConfig,
+      Processor[DataSourceTask[Any, Any]](parallelism, description,
+        userConfig.withValue(Constants.GEARPUMP_STREAMING_SOURCE, dataSource))
+    )
   }
 }
 
@@ -105,15 +147,15 @@ case class DataSourceOp(
 case class DataSinkOp(
     dataSink: DataSink,
     parallelism: Int = 1,
-    userConfig: UserConfig = UserConfig.empty,
-    description: String = "sink")
+    description: String = "sink",
+    userConfig: UserConfig = UserConfig.empty)
   extends Op {
 
   override def chain(op: Op)(implicit system: ActorSystem): Op = {
     throw new OpChainException(this, op)
   }
 
-  override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
     DataSinkProcessor(dataSink, parallelism, description)
   }
 }
@@ -123,7 +165,7 @@ case class DataSinkOp(
  * (e.g. flatMap, map, filter, reduce) and further chained
  * to another Op to be used
  */
-case class ChainableOp[IN, OUT](
+case class TransformOp[IN, OUT](
     fn: FunctionRunner[IN, OUT],
     userConfig: UserConfig = UserConfig.empty) extends Op {
 
@@ -131,25 +173,127 @@ case class ChainableOp[IN, OUT](
 
   override def chain(other: Op)(implicit system: ActorSystem): Op = {
     other match {
-      case op: ChainableOp[OUT, _] =>
+      case op: TransformOp[OUT, _] =>
         // TODO: preserve type info
-        ChainableOp(AndThen(fn, op.fn))
+        // f3(f2(f1(in)))
+        // => TransformOp(f1).chain(TransformOp(f2)).chain(TransformOp(f3))
+        // => AndThen(AndThen(f1, f2), f3)
+        TransformOp(
+          AndThen(fn, op.fn),
+          Op.concatenate(userConfig, op.userConfig))
+      case op: WindowOp =>
+        TransformWindowTransformOp(this,
+          WindowTransformOp(new DefaultWindowRunner[OUT, OUT](
+            op.windows, new DummyRunner[OUT]
+          ), op.description, op.userConfig))
+      case op: TransformWindowTransformOp[OUT, _, _] =>
+        TransformWindowTransformOp(TransformOp(
+          AndThen(fn, op.transformOp.fn),
+          Op.concatenate(userConfig, op.transformOp.userConfig)
+        ), op.windowTransformOp)
       case _ =>
         throw new OpChainException(this, other)
     }
   }
 
-  override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
-    Processor[TransformTask[Any, Any]](1, description,
-      userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, fn))
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    WindowOp(GlobalWindows()).chain(this).toProcessor
   }
 }
 
 /**
- * This represents a Processor with window aggregation
+ * This is an intermediate operation, produced by chaining WindowOp and TransformOp.
+ * Usually, it will be chained to a DataSourceOp, GroupByOp or MergeOp.
+ * Otherwise, it will be translated to a Processor of TransformTask.
  */
-case class GroupByOp[IN, GROUP](
-    groupBy: GroupAlsoByWindow[IN, GROUP],
+case class WindowTransformOp[IN, OUT](
+    windowRunner: WindowRunner[IN, OUT],
+    description: String,
+    userConfig: UserConfig) extends Op {
+
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    other match {
+      case op: WindowTransformOp[OUT, _] =>
+        WindowTransformOp(
+          WindowRunnerAT(windowRunner, op.windowRunner),
+          Op.concatenate(description, op.description),
+          Op.concatenate(userConfig, op.userConfig)
+        )
+      case _ =>
+        throw new OpChainException(this, other)
+    }
+  }
+
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    // TODO: this should be chained to DataSourceOp / GroupByOp / MergeOp
+    Processor[TransformTask[Any, Any]](1, description, userConfig.withValue(
+      Constants.GEARPUMP_STREAMING_OPERATOR, windowRunner))
+  }
+}
+
+/**
+ * This is an intermediate operation, produced by chaining TransformOp and WindowOp.
+ * It will later be chained to a WindowOp, which results in two WindowTransformOps.
+ * Finally, they will be chained to a single WindowTransformOp.
+ */
+case class TransformWindowTransformOp[IN, MIDDLE, OUT](
+    transformOp: TransformOp[IN, MIDDLE],
+    windowTransformOp: WindowTransformOp[MIDDLE, OUT]) extends Op {
+
+  override def description: String = {
+    throw new UnsupportedOperationException(s"description is not supported on $this")
+  }
+
+  override def userConfig: UserConfig = {
+    throw new UnsupportedOperationException(s"userConfig is not supported on $this")
+  }
+
+  override def chain(op: Op)(implicit system: ActorSystem): Op = {
+    throw new UnsupportedOperationException(s"chain is not supported on $this")
+  }
+
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    WindowOp(GlobalWindows()).chain(this).toProcessor
+  }
+}
+
+/**
+ * This represents a window aggregation, together with a following TransformOp
+ */
+case class WindowOp(
+    windows: Windows,
+    userConfig: UserConfig = UserConfig.empty) extends Op {
+
+  override def description: String = windows.description
+
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    other match {
+      case op: TransformOp[_, _] =>
+        WindowTransformOp(new DefaultWindowRunner(windows, op.fn),
+          Op.concatenate(description, op.description),
+          Op.concatenate(userConfig, op.userConfig))
+      case op: WindowOp =>
+        chain(TransformOp(new DummyRunner[Any])).chain(op.chain(TransformOp(new DummyRunner[Any])))
+      case op: TransformWindowTransformOp[_, _, _] =>
+        WindowTransformOp(new DefaultWindowRunner(windows, op.transformOp.fn),
+          Op.concatenate(description, op.transformOp.description),
+          Op.concatenate(userConfig, op.transformOp.userConfig)).chain(op.windowTransformOp)
+      case _ =>
+        throw new OpChainException(this, other)
+    }
+  }
+
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    chain(TransformOp(new DummyRunner[Any])).toProcessor
+  }
+
+}
+
+/**
+ * This represents a Processor with groupBy and window aggregation
+ */
+case class GroupByOp[IN, GROUP] private(
+    groupBy: IN => GROUP,
     parallelism: Int = 1,
     description: String = "groupBy",
     override val userConfig: UserConfig = UserConfig.empty)
@@ -157,36 +301,57 @@ case class GroupByOp[IN, GROUP](
 
   override def chain(other: Op)(implicit system: ActorSystem): Op = {
     other match {
-      case op: ChainableOp[_, _] =>
-        GroupByOp(groupBy, parallelism, description,
-          userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn))
+      case op: WindowTransformOp[_, _] =>
+        GroupByOp(
+          groupBy,
+          parallelism,
+          Op.concatenate(description, op.description),
+          Op.concatenate(
+            userConfig
+              .withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.windowRunner),
+            op.userConfig)) // merge the chained op's config, not our own twice
+      case op: WindowOp =>
+        chain(op.chain(TransformOp(new DummyRunner[Any]())))
       case _ =>
         throw new OpChainException(this, other)
     }
   }
 
-  override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
-    groupBy.getProcessor(parallelism, description, userConfig)
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    Op.withGlobalWindowsDummyRunner(this, userConfig,
+      Processor[GroupByTask[IN, GROUP, Any]](parallelism, description,
+        userConfig.withValue(Constants.GEARPUMP_STREAMING_GROUPBY_FUNCTION, groupBy)))
   }
 }
 
 /**
  * This represents a Processor transforming merged streams
  */
-case class MergeOp(description: String, userConfig: UserConfig = UserConfig.empty)
+case class MergeOp(
+    parallelism: Int = 1,
+    description: String = "merge",
+    userConfig: UserConfig = UserConfig.empty)
   extends Op {
 
   override def chain(other: Op)(implicit system: ActorSystem): Op = {
     other match {
-      case op: ChainableOp[_, _] =>
-        MergeOp(description, userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn))
+      case op: WindowTransformOp[_, _] =>
+        MergeOp(
+          parallelism,
+          description,
+          Op.concatenate(userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR,
+            op.windowRunner),
+            op.userConfig))
+      case op: WindowOp =>
+        chain(op.chain(TransformOp(new DummyRunner[Any]())))
       case _ =>
         throw new OpChainException(this, other)
     }
   }
 
-  override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
-    Processor[TransformTask[Any, Any]](1, description, userConfig)
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    Op.withGlobalWindowsDummyRunner(this, userConfig,
+      Processor[TransformTask[Any, Any]](parallelism, description, userConfig))
   }
 
 }
@@ -198,7 +363,7 @@ trait OpEdge
 
 /**
  * The upstream OP and downstream OP doesn't require network data shuffle.
- * e.g. ChainableOp
+ * e.g. TransformOp
  */
 case object Direct extends OpEdge
 
@@ -211,4 +376,4 @@ case object Shuffle extends OpEdge
 /**
  * Runtime exception thrown on chaining.
  */
-class OpChainException(op1: Op, op2: Op) extends RuntimeException(s"$op1 cannot be chained by $op2")
\ No newline at end of file
+class OpChainException(op1: Op, op2: Op) extends RuntimeException(s"$op1 can't be chained by $op2")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
index 1dd8026..b1b39c9 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
@@ -19,10 +19,8 @@
 package org.apache.gearpump.streaming.dsl.plan
 
 import akka.actor.ActorSystem
-
-import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, GroupByPartitioner, HashPartitioner, Partitioner}
 import org.apache.gearpump.streaming.Processor
-import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
 import org.apache.gearpump.streaming.task.Task
 import org.apache.gearpump.util.Graph
 
@@ -36,18 +34,18 @@ class Planner {
     (implicit system: ActorSystem): Graph[Processor[_ <: Task], _ <: Partitioner] = {
 
     val graph = optimize(dag)
-    graph.mapEdge { (node1, edge, node2) =>
+    graph.mapEdge { (_, edge, node2) =>
       edge match {
         case Shuffle =>
           node2 match {
             case op: GroupByOp[_, _] =>
-              new GroupByPartitioner(op.groupBy.groupByFn)
+              new GroupByPartitioner(op.groupBy)
             case _ => new HashPartitioner
           }
         case Direct =>
           new CoLocationPartitioner
       }
-    }.mapVertex(_.getProcessor)
+    }.mapVertex(_.toProcessor)
   }
 
   private def optimize(dag: Graph[Op, OpEdge])

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
index c27300f..2c11238 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
@@ -17,16 +17,9 @@
  */
 package org.apache.gearpump.streaming.dsl.plan.functions
 
-import org.apache.gearpump.streaming.dsl.api.functions.{FoldFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.api.functions.FoldFunction
 import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
 
-object FunctionRunner {
-  def withEmitFn[IN, OUT](runner: FunctionRunner[IN, OUT],
-      fn: OUT => Unit): FunctionRunner[IN, Unit] = {
-    AndThen(runner, new Emit(fn))
-  }
-}
-
 /**
  * Interface to invoke SerializableFunction methods
  *
@@ -121,12 +114,9 @@ class FoldRunner[T, A](fn: FoldFunction[T, A], val description: String)
   }
 }
 
-class Emit[T](emit: T => Unit) extends FunctionRunner[T, Unit] {
+class DummyRunner[T] extends FunctionRunner[T, T] {
 
-  override def process(value: T): TraversableOnce[Unit] = {
-    emit(value)
-    None
-  }
+  override def process(value: T): TraversableOnce[T] = Option(value)
 
   override def description: String = ""
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
index e15d4ae..ef2753e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
@@ -25,7 +25,6 @@ import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
 import org.apache.gearpump.streaming.dsl.plan._
 import org.apache.gearpump.streaming.dsl.plan.functions._
 import org.apache.gearpump.streaming.dsl.window.api._
-import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
 import org.apache.gearpump.streaming.sink.DataSink
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 import org.apache.gearpump.util.Graph
@@ -35,7 +34,8 @@ import scala.language.implicitConversions
 
 class Stream[T](
     private val graph: Graph[Op, OpEdge], private val thisNode: Op,
-    private val edge: Option[OpEdge] = None) {
+    private val edge: Option[OpEdge] = None,
+    private val windows: Windows = GlobalWindows()) {
 
   /**
    * Returns a new stream by applying a flatMap function to each element
@@ -108,11 +108,7 @@ class Stream[T](
    * @return a new stream after fold
    */
   def fold[A](fn: FoldFunction[T, A], description: String): Stream[A] = {
-    if (graph.vertices.exists(_.isInstanceOf[GroupByOp[_, _]])) {
-      transform(new FoldRunner(fn, description))
-    } else {
-      throw new UnsupportedOperationException("fold operation can only be applied on window")
-    }
+    transform(new FoldRunner(fn, description))
   }
 
   /**
@@ -138,10 +134,10 @@ class Stream[T](
   }
 
   private def transform[R](fn: FunctionRunner[T, R]): Stream[R] = {
-    val op = ChainableOp(fn)
+    val op = TransformOp(fn)
     graph.addVertex(op)
     graph.addEdge(thisNode, edge.getOrElse(Direct), op)
-    new Stream(graph, op)
+    new Stream(graph, op, None, windows)
   }
 
   /**
@@ -160,12 +156,13 @@ class Stream[T](
    * @param other the other stream
    * @return  the merged stream
    */
-  def merge(other: Stream[T], description: String = "merge"): Stream[T] = {
-    val mergeOp = MergeOp(description, UserConfig.empty)
+  def merge(other: Stream[T], parallelism: Int = 1, description: String = "merge"): Stream[T] = {
+    val mergeOp = MergeOp(parallelism, description, UserConfig.empty)
     graph.addVertex(mergeOp)
     graph.addEdge(thisNode, edge.getOrElse(Direct), mergeOp)
     graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), mergeOp)
-    new Stream[T](graph, mergeOp)
+    val winOp = Stream.addWindowOp(graph, mergeOp, windows)
+    new Stream[T](graph, winOp, None, windows)
   }
 
   /**
@@ -184,23 +181,26 @@ class Stream[T](
    * @param fn  Group by function
    * @param parallelism  Parallelism level
    * @param description  The description
-   * @return  the grouped stream
+   * @return the grouped stream
    */
   def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
       description: String = "groupBy"): Stream[T] = {
-    window(GlobalWindows())
-      .groupBy[GROUP](fn, parallelism, description)
+    val gbOp = GroupByOp(fn, parallelism, description)
+    graph.addVertex(gbOp)
+    graph.addEdge(thisNode, edge.getOrElse(Shuffle), gbOp)
+    val winOp = Stream.addWindowOp(graph, gbOp, windows)
+    new Stream(graph, winOp, None, windows)
   }
 
   /**
    * Window function
    *
-   * @param win window definition
-   * @param description window description
-   * @return [[WindowStream]] where groupBy could be applied
+   * @param windows window definition
+   * @return the windowed [[Stream]]
    */
-  def window(win: Windows[T], description: String = "window"): WindowStream[T] = {
-    new WindowStream[T](graph, edge, thisNode, win, description)
+  def window(windows: Windows): Stream[T] = {
+    val winOp = Stream.addWindowOp(graph, thisNode, windows)
+    new Stream(graph, winOp, None, windows)
   }
 
   /**
@@ -216,22 +216,10 @@ class Stream[T](
     val processorOp = ProcessorOp(processor, parallelism, conf, description)
     graph.addVertex(processorOp)
     graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp)
-    new Stream[R](graph, processorOp, Some(Shuffle))
+    new Stream[R](graph, processorOp, Some(Shuffle), windows)
   }
-}
 
-class WindowStream[T](graph: Graph[Op, OpEdge], edge: Option[OpEdge], thisNode: Op,
-    window: Windows[T], winDesc: String) {
 
-  def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
-      description: String = "groupBy"): Stream[T] = {
-    val groupBy = GroupAlsoByWindow(fn, window)
-    val groupOp = GroupByOp[T, GROUP](groupBy, parallelism,
-      s"$winDesc.$description")
-    graph.addVertex(groupOp)
-    graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)
-    new Stream[T](graph, groupOp)
-  }
 }
 
 class KVStream[K, V](stream: Stream[Tuple2[K, V]]) {
@@ -263,8 +251,9 @@ class KVStream[K, V](stream: Stream[Tuple2[K, V]]) {
 
 object Stream {
 
-  def apply[T](graph: Graph[Op, OpEdge], node: Op, edge: Option[OpEdge]): Stream[T] = {
-    new Stream[T](graph, node, edge)
+  def apply[T](graph: Graph[Op, OpEdge], node: Op, edge: Option[OpEdge],
+      windows: Windows): Stream[T] = {
+    new Stream[T](graph, node, edge, windows)
   }
 
   def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1
@@ -272,6 +261,13 @@ object Stream {
   def sumByKey[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V]
   = (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2))
 
+  def addWindowOp(graph: Graph[Op, OpEdge], op: Op, win: Windows): Op = {
+    val winOp = WindowOp(win)
+    graph.addVertex(winOp)
+    graph.addEdge(op, Direct, winOp)
+    winOp
+  }
+
   implicit def streamToKVStream[K, V](stream: Stream[Tuple2[K, V]]): KVStream[K, V] = {
     new KVStream(stream)
   }
@@ -279,10 +275,10 @@ object Stream {
   implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable {
     def sink(dataSink: DataSink, parallelism: Int = 1,
         conf: UserConfig = UserConfig.empty, description: String = "sink"): Stream[T] = {
-      implicit val sink = DataSinkOp(dataSink, parallelism, conf, description)
+      implicit val sink = DataSinkOp(dataSink, parallelism, description, conf)
       stream.graph.addVertex(sink)
       stream.graph.addEdge(stream.thisNode, Shuffle, sink)
-      new Stream[T](stream.graph, sink)
+      new Stream[T](stream.graph, sink, None, stream.windows)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
index 6378a18..bce8c0c 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
@@ -78,7 +78,7 @@ object StreamApp {
 
     def source[T](dataSource: DataSource, parallelism: Int = 1,
         conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = {
-      implicit val sourceOp = DataSourceOp(dataSource, parallelism, conf, description)
+      implicit val sourceOp = DataSourceOp(dataSource, parallelism, description, conf)
       app.graph.addVertex(sourceOp)
       new Stream[T](app.graph, sourceOp)
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
deleted file mode 100644
index 0dc28eb..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.dsl.task
-
-import java.time.Instant
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.window.api.CountWindowFunction
-import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner}
-import org.apache.gearpump.streaming.task.{Task, TaskContext}
-
-/**
- * This task triggers output on number of messages in a window.
- */
-class CountTriggerTask[IN, GROUP](
-    groupBy: GroupAlsoByWindow[IN, GROUP],
-    windowRunner: WindowRunner,
-    taskContext: TaskContext,
-    userConfig: UserConfig)
-  extends Task(taskContext, userConfig) {
-
-  def this(groupBy: GroupAlsoByWindow[IN, GROUP],
-      taskContext: TaskContext, userConfig: UserConfig) = {
-    this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system),
-      taskContext, userConfig)
-  }
-
-  def this(taskContext: TaskContext, userConfig: UserConfig) = {
-    this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]](
-      GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get,
-      taskContext, userConfig)
-  }
-
-  private val windowSize = groupBy.window.windowFn.asInstanceOf[CountWindowFunction[IN]].size
-  private var num = 0
-
-  override def onNext(msg: Message): Unit = {
-    windowRunner.process(msg)
-    num += 1
-    if (windowSize == num) {
-      windowRunner.trigger(Instant.ofEpochMilli(windowSize))
-      num = 0
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
deleted file mode 100644
index 0674339..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.dsl.task
-
-import java.time.Instant
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner}
-import org.apache.gearpump.streaming.task.{Task, TaskContext}
-
-/**
- * This task triggers output on watermark progress.
- */
-class EventTimeTriggerTask[IN, GROUP](
-    groupBy: GroupAlsoByWindow[IN, GROUP],
-    windowRunner: WindowRunner,
-    taskContext: TaskContext,
-    userConfig: UserConfig)
-  extends Task(taskContext, userConfig) {
-
-  def this(groupBy: GroupAlsoByWindow[IN, GROUP],
-      taskContext: TaskContext, userConfig: UserConfig) = {
-    this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system),
-      taskContext, userConfig)
-  }
-
-  def this(taskContext: TaskContext, userConfig: UserConfig) = {
-    this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]](
-      GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get,
-      taskContext, userConfig)
-  }
-
-  override def onNext(message: Message): Unit = {
-    windowRunner.process(message)
-  }
-
-  override def onWatermarkProgress(watermark: Instant): Unit = {
-    windowRunner.trigger(watermark)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala
new file mode 100644
index 0000000..8301fb9
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.Instant
+import java.util.function.Consumer
+
+import com.gs.collections.impl.map.mutable.UnifiedMap
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants.{GEARPUMP_STREAMING_GROUPBY_FUNCTION, GEARPUMP_STREAMING_OPERATOR}
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+
+/**
+ * Routes each message to a per-group WindowRunner chosen by applying groupBy to its value.
+ */
+class GroupByTask[IN, GROUP, OUT](
+    groupBy: IN => GROUP,
+    taskContext: TaskContext,
+    userConfig: UserConfig) extends Task(taskContext, userConfig) {
+
+  def this(context: TaskContext, conf: UserConfig) = { // .get throws if no groupBy configured
+    this(
+      conf.getValue[IN => GROUP](GEARPUMP_STREAMING_GROUPBY_FUNCTION)(context.system).get,
+      context, conf
+    )
+  }
+
+  private val groups: UnifiedMap[GROUP, WindowRunner[IN, OUT]] =
+    new UnifiedMap[GROUP, WindowRunner[IN, OUT]]
+
+  override def onNext(message: Message): Unit = {
+    val input = message.value.asInstanceOf[IN]
+    val group = groupBy(input)
+
+    if (!groups.containsKey(group)) { // first message of this group: fetch a runner for it
+      groups.put(group,
+        userConfig.getValue[WindowRunner[IN, OUT]](
+          GEARPUMP_STREAMING_OPERATOR)(taskContext.system).get)
+    }
+
+    groups.get(group).process(TimestampedValue(message.value.asInstanceOf[IN],
+      message.timestamp))
+  }
+
+  override def onWatermarkProgress(watermark: Instant): Unit = {
+    groups.values.forEach(new Consumer[WindowRunner[IN, OUT]] { // trigger every group's runner
+      override def accept(runner: WindowRunner[IN, OUT]): Unit = {
+        runner.trigger(watermark).foreach {
+          result => // emit downstream with the timestamp carried by the result
+            taskContext.output(Message(result.value, result.timestamp))
+        }
+      }
+    })
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
deleted file mode 100644
index a04e3ca..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.dsl.task
-
-import java.time.Instant
-import java.util.concurrent.TimeUnit
-
-import akka.actor.Actor.Receive
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.task.ProcessingTimeTriggerTask.Triggering
-import org.apache.gearpump.streaming.dsl.window.api.SlidingWindowFunction
-import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner}
-import org.apache.gearpump.streaming.task.{Task, TaskContext}
-
-import scala.concurrent.duration.FiniteDuration
-
-object ProcessingTimeTriggerTask {
-  case object Triggering
-}
-
-/**
- * This task triggers output on scheduled system time interval.
- */
-class ProcessingTimeTriggerTask[IN, GROUP](
-    groupBy: GroupAlsoByWindow[IN, GROUP],
-    windowRunner: WindowRunner,
-    taskContext: TaskContext,
-    userConfig: UserConfig)
-  extends Task(taskContext, userConfig) {
-
-  def this(groupBy: GroupAlsoByWindow[IN, GROUP],
-      taskContext: TaskContext, userConfig: UserConfig) = {
-    this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system),
-      taskContext, userConfig)
-  }
-
-  def this(taskContext: TaskContext, userConfig: UserConfig) = {
-    this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]](
-      GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get,
-      taskContext, userConfig)
-  }
-
-  private val windowFn = groupBy.window.windowFn.asInstanceOf[SlidingWindowFunction[IN]]
-  private val windowSizeMs = windowFn.size.toMillis
-  private val windowStepMs = windowFn.step.toMillis
-
-  override def onStart(startTime: Instant): Unit = {
-    val initialDelay = windowSizeMs - Instant.now.toEpochMilli % windowSizeMs
-    taskContext.scheduleOnce(
-      new FiniteDuration(initialDelay, TimeUnit.MILLISECONDS))(self ! Triggering)
-  }
-
-  override def onNext(message: Message): Unit = {
-    windowRunner.process(message)
-  }
-
-  override def receiveUnManagedMessage: Receive = {
-    case Triggering =>
-      windowRunner.trigger(Instant.now)
-      taskContext.scheduleOnce(
-        new FiniteDuration(windowStepMs, TimeUnit.MILLISECONDS))(self ! Triggering)
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
index 572df94..6a455a5 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
@@ -22,58 +22,28 @@ import java.time.Instant
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
-import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner}
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
-object TransformTask {
-
-  class Transform[IN, OUT](taskContext: TaskContext,
-      processor: Option[FunctionRunner[IN, OUT]],
-      private var buffer: Vector[Message] = Vector.empty[Message]) {
-
-    def onNext(msg: Message): Unit = {
-      buffer +:= msg
-    }
-
-    def onWatermarkProgress(watermark: Instant): Unit = {
-      var nextBuffer = Vector.empty[Message]
-      processor.foreach(_.setup())
-      buffer.foreach { message: Message =>
-        if (message.timestamp.isBefore(watermark)) {
-          processor match {
-            case Some(p) =>
-              FunctionRunner
-                .withEmitFn(p, (out: OUT) => taskContext.output(Message(out, message.timestamp)))
-                // .toList forces eager evaluation
-                .process(message.value.asInstanceOf[IN]).toList
-            case None =>
-              taskContext.output(message)
-          }
-        } else {
-          nextBuffer +:= message
-        }
-      }
-      processor.foreach(_.teardown())
-      buffer = nextBuffer
-    }
-  }
-
-}
-
-class TransformTask[IN, OUT](transform: Transform[IN, OUT],
+class TransformTask[IN, OUT](
+    runner: WindowRunner[IN, OUT],
     taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
 
-  def this(taskContext: TaskContext, userConf: UserConfig) = {
-    this(new Transform(taskContext, userConf.getValue[FunctionRunner[IN, OUT]](
-      GEARPUMP_STREAMING_OPERATOR)(taskContext.system)), taskContext, userConf)
+  def this(context: TaskContext, conf: UserConfig) = { // .get throws if no runner configured
+    this(
+      conf.getValue[WindowRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get,
+      context, conf
+    )
   }
 
   override def onNext(msg: Message): Unit = {
-    transform.onNext(msg)
+    runner.process(TimestampedValue(msg.value.asInstanceOf[IN], msg.timestamp))
   }
 
   override def onWatermarkProgress(watermark: Instant): Unit = {
-    transform.onWatermarkProgress(watermark)
+    runner.trigger(watermark).foreach { // emit whatever the runner returns for this watermark
+      result =>
+        taskContext.output(Message(result.value, result.timestamp))
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
index 9865e18..02d52a0 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
@@ -21,7 +21,3 @@ sealed trait Trigger
 
 case object EventTimeTrigger extends Trigger
 
-case object ProcessingTimeTrigger extends Trigger
-
-case object CountTrigger extends Trigger
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
index 7da9c85..a2f51c7 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFunction.scala
@@ -32,35 +32,39 @@ object WindowFunction {
   }
 }
 
-trait WindowFunction[T] {
+trait WindowFunction {
 
-  def apply(context: WindowFunction.Context[T]): Array[Window]
+  def apply[T](context: WindowFunction.Context[T]): Array[Window]
 
   def isNonMerging: Boolean
 }
 
-abstract class NonMergingWindowFunction[T] extends WindowFunction[T] {
+abstract class NonMergingWindowFunction extends WindowFunction {
 
   override def isNonMerging: Boolean = true
 }
 
-case class GlobalWindowFunction[T]() extends NonMergingWindowFunction[T] {
+object GlobalWindowFunction {
 
-  override def apply(context: WindowFunction.Context[T]): Array[Window] = {
-    Array(Window(Instant.ofEpochMilli(MIN_TIME_MILLIS),
-      Instant.ofEpochMilli(MAX_TIME_MILLIS)))
-  }
+  val globalWindow = Array(Window(Instant.ofEpochMilli(MIN_TIME_MILLIS),
+    Instant.ofEpochMilli(MAX_TIME_MILLIS)))
+}
 
+case class GlobalWindowFunction() extends NonMergingWindowFunction {
+
+  override def apply[T](context: WindowFunction.Context[T]): Array[Window] = {
+    GlobalWindowFunction.globalWindow
+  }
 }
 
-case class SlidingWindowFunction[T](size: Duration, step: Duration)
-  extends NonMergingWindowFunction[T] {
+case class SlidingWindowFunction(size: Duration, step: Duration)
+  extends NonMergingWindowFunction {
 
   def this(size: Duration) = {
     this(size, size)
   }
 
-  override def apply(context: WindowFunction.Context[T]): Array[Window] = {
+  override def apply[T](context: WindowFunction.Context[T]): Array[Window] = {
     val timestamp = context.timestamp
     val sizeMillis = size.toMillis
     val stepMillis = step.toMillis
@@ -81,16 +85,9 @@ case class SlidingWindowFunction[T](size: Duration, step: Duration)
   }
 }
 
-case class CountWindowFunction[T](size: Int) extends NonMergingWindowFunction[T] {
-
-  override def apply(context: WindowFunction.Context[T]): Array[Window] = {
-    Array(Window.ofEpochMilli(0, size))
-  }
-}
-
-case class SessionWindowFunction[T](gap: Duration) extends WindowFunction[T] {
+case class SessionWindowFunction(gap: Duration) extends WindowFunction {
 
-  override def apply(context: WindowFunction.Context[T]): Array[Window] = {
+  override def apply[T](context: WindowFunction.Context[T]): Array[Window] = {
     Array(Window(context.timestamp, context.timestamp.plus(gap)))
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
index 467f57c..d53bc96 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Windows.scala
@@ -20,42 +20,35 @@ package org.apache.gearpump.streaming.dsl.window.api
 import java.time.Duration
 
 /**
- *
  * Defines how to apply window functions.
  *
  * @param windowFn how to divide windows
  * @param trigger when to trigger window result
  * @param accumulationMode whether to accumulate results across windows
  */
-case class Windows[T](
-    windowFn: WindowFunction[T],
+case class Windows(
+    windowFn: WindowFunction,
     trigger: Trigger = EventTimeTrigger,
-    accumulationMode: AccumulationMode = Discarding) {
-
-  def triggering(trigger: Trigger): Windows[T] = {
-    Windows(windowFn, trigger)
-  }
+    accumulationMode: AccumulationMode = Discarding,
+    description: String) {
 
-  def accumulating: Windows[T] = {
-    Windows(windowFn, trigger, Accumulating)
+  def triggering(trigger: Trigger): Windows = {
+    Windows(windowFn, trigger, accumulationMode, description)
   }
 
-  def discarding: Windows[T] = {
-    Windows(windowFn, trigger, Discarding)
+  def accumulating: Windows = {
+    Windows(windowFn, trigger, Accumulating, description)
   }
-}
-
-object CountWindows {
 
-  def apply[T](size: Int): Windows[T] = {
-    Windows(CountWindowFunction(size), CountTrigger)
+  def discarding: Windows = {
+    Windows(windowFn, trigger, Discarding, description)
   }
 }
 
 object GlobalWindows {
 
-  def apply[T](): Windows[T] = {
-    Windows(GlobalWindowFunction())
+  def apply(): Windows = {
+    Windows(GlobalWindowFunction(), description = "globalWindows")
   }
 }
 
@@ -67,8 +60,8 @@ object FixedWindows {
    * @param size window size
    * @return a Window definition
    */
-  def apply[T](size: Duration): Windows[T] = {
-    Windows(SlidingWindowFunction(size, size))
+  def apply(size: Duration): Windows = {
+    Windows(SlidingWindowFunction(size, size), description = "fixedWindows")
   }
 }
 
@@ -81,8 +74,8 @@ object SlidingWindows {
    * @param step window step to slide forward
    * @return a Window definition
    */
-  def apply[T](size: Duration, step: Duration): Windows[T] = {
-    Windows(SlidingWindowFunction(size, step))
+  def apply(size: Duration, step: Duration): Windows = {
+    Windows(SlidingWindowFunction(size, step), description = "slidingWindows")
   }
 }
 
@@ -94,8 +87,8 @@ object SessionWindows {
    * @param gap session gap
    * @return a Window definition
    */
-  def apply[T](gap: Duration): Windows[T] = {
-    Windows(SessionWindowFunction(gap))
+  def apply(gap: Duration): Windows = {
+    Windows(SessionWindowFunction(gap), description = "sessionWindows")
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
index 05ce74e..870c334 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
@@ -25,7 +25,6 @@ import org.apache.gearpump.streaming.Constants._
 import org.apache.gearpump.streaming.Processor
 import org.apache.gearpump.{Message, TimeStamp}
 import org.apache.gearpump.streaming.dsl.window.api._
-import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, EventTimeTriggerTask, ProcessingTimeTriggerTask}
 import org.apache.gearpump.streaming.task.Task
 
 object Window {
@@ -65,31 +64,4 @@ case class Window(startTime: Instant, endTime: Instant) extends Comparable[Windo
   }
 }
 
-case class GroupAlsoByWindow[T, GROUP](groupByFn: T => GROUP, window: Windows[T]) {
-
-  def groupBy(message: Message): (GROUP, List[Window]) = {
-    val ele = message.value.asInstanceOf[T]
-    val group = groupByFn(ele)
-    val windows = window.windowFn(new WindowFunction.Context[T] {
-      override def element: T = ele
-      override def timestamp: Instant = message.timestamp
-    })
-    group -> windows.toList
-  }
-
-  def getProcessor(parallelism: Int, description: String,
-      userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: Task] = {
-    val config = userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, this)
-    window.trigger match {
-      case CountTrigger =>
-        Processor[CountTriggerTask[T, GROUP]](parallelism, description, config)
-      case ProcessingTimeTrigger =>
-        Processor[ProcessingTimeTriggerTask[T, GROUP]](parallelism, description, config)
-      case EventTimeTrigger =>
-        Processor[EventTimeTriggerTask[T, GROUP]](parallelism, description, config)
-    }
-  }
-
-}
-
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index f392f70..2025618 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -19,133 +19,124 @@ package org.apache.gearpump.streaming.dsl.window.impl
 
 import java.time.Instant
 
-import akka.actor.ActorSystem
 import com.gs.collections.api.block.predicate.Predicate
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.UserConfig
-import com.gs.collections.api.block.procedure.{Procedure, Procedure2}
+import com.gs.collections.api.block.procedure.Procedure
 import com.gs.collections.impl.list.mutable.FastList
-import com.gs.collections.impl.map.mutable.UnifiedMap
 import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap
-import org.apache.gearpump.streaming.Constants._
 import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
-import org.apache.gearpump.streaming.dsl.window.api.Discarding
-import org.apache.gearpump.streaming.task.TaskContext
-import org.apache.gearpump.util.LogUtil
-import org.slf4j.Logger
+import org.apache.gearpump.streaming.dsl.window.api.WindowFunction.Context
+import org.apache.gearpump.streaming.dsl.window.api.{Discarding, Windows}
 
+import scala.collection.mutable.ArrayBuffer
 
-trait WindowRunner {
+case class TimestampedValue[T](value: T, timestamp: Instant)
 
-  def process(message: Message): Unit
+trait WindowRunner[IN, OUT] extends java.io.Serializable {
 
-  def trigger(time: Instant): Unit
+  def process(timestampedValue: TimestampedValue[IN]): Unit // accepts one timestamped input
+
+  def trigger(time: Instant): TraversableOnce[TimestampedValue[OUT]] // outputs for `time`
 }
 
-object DefaultWindowRunner {
+case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE],
+    right: WindowRunner[MIDDLE, OUT]) extends WindowRunner[IN, OUT] {
+
+  def process(timestampedValue: TimestampedValue[IN]): Unit = {
+    left.process(timestampedValue) // only left sees raw input; right is fed inside trigger
+  }
 
-  private val LOG: Logger = LogUtil.getLogger(classOf[DefaultWindowRunner[_, _, _]])
+  def trigger(time: Instant): TraversableOnce[TimestampedValue[OUT]] = {
+    left.trigger(time).foreach(right.process) // hand left's triggered output to right
+    right.trigger(time) // then trigger right with the same time and return its output
+  }
 }
 
-class DefaultWindowRunner[IN, GROUP, OUT](
-    taskContext: TaskContext, userConfig: UserConfig,
-    groupBy: GroupAlsoByWindow[IN, GROUP])(implicit system: ActorSystem)
-  extends WindowRunner {
-
-  private val windowFn = groupBy.window.windowFn
-  private val groupedWindowInputs = new UnifiedMap[GROUP, TreeSortedMap[Window, FastList[IN]]]
-  private val groupedFnRunners = new UnifiedMap[GROUP, FunctionRunner[IN, OUT]]
-  private val groupedRunnerSetups = new UnifiedMap[GROUP, Boolean]
-
-  override def process(message: Message): Unit = {
-    val input = message.value.asInstanceOf[IN]
-    val (group, windows) = groupBy.groupBy(message)
-    if (!groupedWindowInputs.containsKey(group)) {
-      groupedWindowInputs.put(group, new TreeSortedMap[Window, FastList[IN]]())
-    }
-    val windowInputs = groupedWindowInputs.get(group)
-    windows.foreach { win =>
+class DefaultWindowRunner[IN, OUT](
+    windows: Windows,
+    fnRunner: FunctionRunner[IN, OUT])
+  extends WindowRunner[IN, OUT] {
+
+  private val windowFn = windows.windowFn
+  private val windowInputs = new TreeSortedMap[Window, FastList[TimestampedValue[IN]]]
+  private var setup = false
+
+  override def process(timestampedValue: TimestampedValue[IN]): Unit = {
+    val wins = windowFn(new Context[IN] {
+      override def element: IN = timestampedValue.value
+
+      override def timestamp: Instant = timestampedValue.timestamp
+    })
+    wins.foreach { win =>
       if (windowFn.isNonMerging) {
         if (!windowInputs.containsKey(win)) {
-          val inputs = new FastList[IN](1)
+          val inputs = new FastList[TimestampedValue[IN]]
           windowInputs.put(win, inputs)
         }
-        windowInputs.get(win).add(input)
+        windowInputs.get(win).add(timestampedValue)
       } else {
-        merge(windowInputs, win, input)
+        merge(windowInputs, win, timestampedValue)
       }
     }
 
-    if (!groupedFnRunners.containsKey(group)) {
-      val runner = userConfig.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get
-      groupedFnRunners.put(group, runner)
-      groupedRunnerSetups.put(group, false)
-    }
-
-    def merge(windowInputs: TreeSortedMap[Window, FastList[IN]], win: Window, input: IN): Unit = {
-      val intersected = windowInputs.keySet.select(new Predicate[Window] {
+    def merge(
+        winIns: TreeSortedMap[Window, FastList[TimestampedValue[IN]]],
+        win: Window, tv: TimestampedValue[IN]): Unit = {
+      val intersected = winIns.keySet.select(new Predicate[Window] {
         override def accept(each: Window): Boolean = {
           win.intersects(each)
         }
       })
       var mergedWin = win
-      val mergedInputs = FastList.newListWith(input)
+      val mergedInputs = FastList.newListWith(tv)
       intersected.forEach(new Procedure[Window] {
         override def value(each: Window): Unit = {
           mergedWin = mergedWin.span(each)
-          mergedInputs.addAll(windowInputs.remove(each))
+          mergedInputs.addAll(winIns.remove(each))
         }
       })
-      windowInputs.put(mergedWin, mergedInputs)
+      winIns.put(mergedWin, mergedInputs)
     }
-
   }
 
-  override def trigger(time: Instant): Unit = {
-    groupedWindowInputs.forEachKeyValue(new Procedure2[GROUP, TreeSortedMap[Window, FastList[IN]]] {
-      override def value(group: GROUP, windowInputs: TreeSortedMap[Window, FastList[IN]]): Unit = {
-        onTrigger(group, windowInputs)
-      }
-    })
-
+  override def trigger(time: Instant): TraversableOnce[TimestampedValue[OUT]] = {
     @annotation.tailrec
-    def onTrigger(group: GROUP, windowInputs: TreeSortedMap[Window, FastList[IN]]): Unit = {
+    def onTrigger(
+        outputs: ArrayBuffer[TimestampedValue[OUT]]): TraversableOnce[TimestampedValue[OUT]] = {
       if (windowInputs.notEmpty()) {
         val firstWin = windowInputs.firstKey
         if (!time.isBefore(firstWin.endTime)) {
           val inputs = windowInputs.remove(firstWin)
-          if (groupedFnRunners.containsKey(group)) {
-            val runner = FunctionRunner.withEmitFn(groupedFnRunners.get(group),
-              (output: OUT) => {
-                taskContext.output(Message(output, time))
-              })
-            val setup = groupedRunnerSetups.get(group)
-            if (!setup) {
-              runner.setup()
-              groupedRunnerSetups.put(group, true)
-            }
-            inputs.forEach(new Procedure[IN] {
-              override def value(t: IN): Unit = {
-                // .toList forces eager evaluation
-                runner.process(t).toList
+          if (!setup) {
+            fnRunner.setup()
+            setup = true
+          }
+          inputs.forEach(new Procedure[TimestampedValue[IN]] {
+            override def value(tv: TimestampedValue[IN]): Unit = {
+              fnRunner.process(tv.value).foreach {
+                out: OUT => outputs += TimestampedValue(out, tv.timestamp)
               }
-            })
-            // .toList forces eager evaluation
-            runner.finish().toList
-            if (groupBy.window.accumulationMode == Discarding) {
-              runner.teardown()
-              groupedRunnerSetups.put(group, false)
-              // dicarding, setup need to be called for each window
-              onTrigger(group, windowInputs)
-            } else {
-              // accumulating, setup is only called for the first window
-              onTrigger(group, windowInputs)
             }
+          })
+          fnRunner.finish().foreach {
+            out: OUT => outputs += TimestampedValue(out, firstWin.endTime.minusMillis(1))
+          }
+          if (windows.accumulationMode == Discarding) {
+            fnRunner.teardown()
+            setup = false
+            // discarding, setup needs to be called for each window
+            onTrigger(outputs)
           } else {
-            throw new RuntimeException(s"FunctionRunner not found for group $group")
+            // accumulating, setup is only called for the first window
+            onTrigger(outputs)
           }
+        } else {
+          outputs
         }
+      } else {
+        outputs
       }
     }
+
+    onTrigger(ArrayBuffer.empty[TimestampedValue[OUT]])
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/GroupByPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/GroupByPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/GroupByPartitioner.scala
new file mode 100644
index 0000000..c2ddb0d
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/GroupByPartitioner.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.partitioner
+
+import org.apache.gearpump.Message
+
+/**
+ * Partitions messages by the hashCode of the group that a groupBy function maps them to.
+ *
+ * For example:
+ * {{{
+ * case class People(name: String, gender: String)
+ *
+ * object Test {
+ *
+ *   val groupBy: (People => String) = people => people.gender
+ *   val partitioner = new GroupByPartitioner(groupBy)
+ * }
+ * }}}
+ *
+ * @param fn Applied to each message value; the result's hashCode (sign bit masked off) modulo
+ *   partitionNum picks the partition, so the group type must define a consistent hashCode().
+ */
+class GroupByPartitioner[T, GROUP](fn: T => GROUP) extends UnicastPartitioner {
+
+  override def getPartition(message: Message, partitionNum: Int, currentPartitionId: Int): Int = {
+    val hashCode = fn(message.value.asInstanceOf[T]).hashCode()
+    (hashCode & Integer.MAX_VALUE) % partitionNum
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
index ff1b2d4..74b0cc2 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
@@ -23,8 +23,7 @@ import java.time.Instant
 import org.apache.gearpump._
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner
-import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform
+import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner}
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
 /**
@@ -40,17 +39,17 @@ import org.apache.gearpump.streaming.task.{Task, TaskContext}
  *  - `DataSource.close()` in `onStop`
  */
 class DataSourceTask[IN, OUT] private[source](
-    context: TaskContext,
-    conf: UserConfig,
     source: DataSource,
-    transform: Transform[IN, OUT])
+    windowRunner: WindowRunner[IN, OUT],
+    context: TaskContext,
+    conf: UserConfig)
   extends Task(context, conf) {
 
   def this(context: TaskContext, conf: UserConfig) = {
-    this(context, conf,
+    this(
       conf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(context.system).get,
-      new Transform[IN, OUT](context,
-        conf.getValue[FunctionRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system))
+      conf.getValue[WindowRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get,
+      context, conf
     )
   }
 
@@ -65,14 +64,19 @@ class DataSourceTask[IN, OUT] private[source](
 
   override def onNext(m: Message): Unit = {
     0.until(batchSize).foreach { _ =>
-      Option(source.read()).foreach(transform.onNext)
+      Option(source.read()).foreach(
+        msg => windowRunner.process(
+          TimestampedValue(msg.value.asInstanceOf[IN], msg.timestamp)))
     }
 
     self ! Watermark(source.getWatermark)
   }
 
   override def onWatermarkProgress(watermark: Instant): Unit = {
-    transform.onWatermarkProgress(watermark)
+    windowRunner.trigger(watermark).foreach {
+      result =>
+        context.output(Message(result.value, result.timestamp))
+    }
   }
 
   override def onStop(): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
index 0ec2b6f..14abff8 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/Watermark.scala
@@ -30,7 +30,7 @@ case class Watermark(instant: Instant) {
 
 object Watermark {
 
-  val MAX: Instant = Instant.ofEpochMilli(MAX_TIME_MILLIS)
+  val MAX: Instant = Instant.ofEpochMilli(MAX_TIME_MILLIS + 1)
 
   val MIN: Instant = Instant.ofEpochMilli(MIN_TIME_MILLIS)
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
index 79bcc2a..44ec2c6 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
@@ -57,8 +57,8 @@ class Subscription(
   private val pendingMessageCount: Array[Short] = new Array[Short](parallelism)
   private val candidateMinClockSince: Array[Short] = new Array[Short](parallelism)
 
-  private val minClockValue: Array[TimeStamp] = Array.fill(parallelism)(MAX_TIME_MILLIS)
-  private val candidateMinClock: Array[TimeStamp] = Array.fill(parallelism)(MAX_TIME_MILLIS)
+  private val minClockValue: Array[TimeStamp] = Array.fill(parallelism)(Long.MaxValue)
+  private val candidateMinClock: Array[TimeStamp] = Array.fill(parallelism)(Long.MaxValue)
 
   private var maxPendingCount: Short = 0
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/24e1a454/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
deleted file mode 100644
index 1934d14..0000000
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl.partitioner
-
-import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers}
-import org.apache.gearpump.Message
-import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitionerSpec.People
-
-class GroupByPartitionerSpec extends FlatSpec with Matchers with BeforeAndAfterAll {
-
-  it should "group by message payload and window" in {
-    val mark = People("Mark", "male")
-    val tom = People("Tom", "male")
-    val michelle = People("Michelle", "female")
-
-    val partitionNum = 10
-
-    val groupBy = new GroupByPartitioner[People, String](_.gender)
-    groupBy.getPartition(Message(mark, 1L), partitionNum) shouldBe
-      groupBy.getPartition(Message(tom, 2L), partitionNum)
-
-    groupBy.getPartition(Message(mark, 2L), partitionNum) should not be
-      groupBy.getPartition(Message(michelle, 3L), partitionNum)
-  }
-}
-
-object GroupByPartitionerSpec {
-  case class People(name: String, gender: String)
-}