Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/10/09 00:52:12 UTC

[2/2] incubator-gearpump git commit: [GEARPUMP-23] add window dsl

[GEARPUMP-23] add window dsl

This PR is opened for early review; the work is in progress with the following todos (a usage sketch of the new window DSL follows the list).

- [x] basic window dsl support with `WindowedWordCount` example
- [x] improve `ReduceFunction` to not emit intermediate results
- [x] add unit tests
- [ ] add comments and update documentation
- [ ] support different types of computation (e.g. a monoid, which doesn't require input elements to be held in the window)
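
For reference, a minimal sketch of the window DSL usage introduced here, based on the
WindowedWordCount example added in this commit (TimedDataSource is the example's own
in-memory source, akkaConf is the application's Akka config, and the 5 ms fixed window
with an event-time trigger follows the example):

    import java.time.Duration
    import org.apache.gearpump.cluster.client.ClientContext
    import org.apache.gearpump.streaming.dsl.{LoggerSink, StreamApp}
    import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, FixedWindow}

    val context = ClientContext(akkaConf)
    val app = StreamApp("dsl", context)
    app.source[String](new TimedDataSource).
      // split lines into words and pair each word with a count of 1
      flatMap(line => line.split("[\\s]+")).map((_, 1)).
      // assign pairs to 5 ms fixed windows, triggered on event time
      window(FixedWindow.apply(Duration.ofMillis(5L)).triggering(EventTimeTrigger)).
      // group by word within each window and sum the counts per window
      groupBy(_._1).
      sum.sink(new LoggerSink)
    context.submit(app)
    context.close()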

Author: manuzhang <ow...@gmail.com>

Closes #85 from manuzhang/window_dsl.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/66017ab7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/66017ab7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/66017ab7

Branch: refs/heads/master
Commit: 66017ab7bf5166ec312684f0e3e49e7219b4c24d
Parents: 5c4d60c
Author: manuzhang <ow...@gmail.com>
Authored: Sun Oct 9 08:52:00 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Sun Oct 9 08:52:00 2016 +0800

----------------------------------------------------------------------
 .../wordcount/dsl/WindowedWordCount.scala       |  87 +++++
 .../apache/gearpump/streaming/Constants.scala   |   1 +
 .../gearpump/streaming/StreamApplication.scala  |   2 +-
 .../apache/gearpump/streaming/dsl/Stream.scala  | 106 +++---
 .../gearpump/streaming/dsl/StreamApp.scala      |  34 +-
 .../streaming/dsl/javaapi/JavaStream.scala      |  22 +-
 .../apache/gearpump/streaming/dsl/op/OP.scala   | 109 ------
 .../dsl/partitioner/GroupByPartitioner.scala    |  49 +++
 .../dsl/partitioner/GroupbyPartitioner.scala    |  46 ---
 .../apache/gearpump/streaming/dsl/plan/OP.scala | 214 ++++++++++++
 .../streaming/dsl/plan/OpTranslator.scala       | 222 -------------
 .../gearpump/streaming/dsl/plan/Planner.scala   |  65 ++--
 .../plan/functions/SingleInputFunction.scala    | 107 ++++++
 .../streaming/dsl/task/CountTriggerTask.scala   |  63 ++++
 .../dsl/task/EventTimeTriggerTask.scala         |  59 ++++
 .../dsl/task/ProcessingTimeTriggerTask.scala    |  82 +++++
 .../streaming/dsl/task/TransformTask.scala      |  47 +++
 .../dsl/window/api/AccumulationMode.scala       |  24 ++
 .../streaming/dsl/window/api/GroupByFn.scala    |  47 +++
 .../streaming/dsl/window/api/Trigger.scala      |  27 ++
 .../streaming/dsl/window/api/Window.scala       |  77 +++++
 .../streaming/dsl/window/api/WindowFn.scala     |  63 ++++
 .../dsl/window/impl/ReduceFnRunner.scala        |  29 ++
 .../streaming/dsl/window/impl/Window.scala      |  75 +++++
 .../dsl/window/impl/WindowRunner.scala          | 114 +++++++
 .../streaming/source/DataSourceTask.scala       |  15 +-
 .../gearpump/streaming/task/TaskActor.scala     |   4 +-
 .../gearpump/streaming/dsl/StreamAppSpec.scala  |  67 ++--
 .../gearpump/streaming/dsl/StreamSpec.scala     |  24 +-
 .../partitioner/GroupByPartitionerSpec.scala    |  23 +-
 .../gearpump/streaming/dsl/plan/OpSpec.scala    | 244 ++++++++++++++
 .../streaming/dsl/plan/OpTranslatorSpec.scala   | 148 ---------
 .../streaming/dsl/plan/PlannerSpec.scala        | 132 ++++++++
 .../functions/SingleInputFunctionSpec.scala     | 333 +++++++++++++++++++
 .../dsl/task/CountTriggerTaskSpec.scala         |  61 ++++
 .../dsl/task/EventTimeTriggerTaskSpec.scala     |  66 ++++
 .../task/ProcessingTimeTriggerTaskSpec.scala    |  69 ++++
 37 files changed, 2246 insertions(+), 711 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
new file mode 100644
index 0000000..4f43fd4
--- /dev/null
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.examples.wordcount.dsl
+
+import java.time.{Duration, Instant}
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.streaming.dsl.{LoggerSink, StreamApp}
+import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, FixedWindow}
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.util.AkkaApp
+
+object WindowedWordCount extends AkkaApp with ArgumentsParser {
+
+  override val options: Array[(String, CLIOption[Any])] = Array.empty
+
+  override def main(akkaConf: Config, args: Array[String]): Unit = {
+    val context = ClientContext(akkaConf)
+    val app = StreamApp("dsl", context)
+    app.source[String](new TimedDataSource).
+      // word => (word, count)
+      flatMap(line => line.split("[\\s]+")).map((_, 1)).
+      // fix window
+      window(FixedWindow.apply(Duration.ofMillis(5L))
+        .triggering(EventTimeTrigger)).
+      // (word, count1), (word, count2) => (word, count1 + count2)
+      groupBy(_._1).
+      sum.sink(new LoggerSink)
+
+    context.submit(app)
+    context.close()
+  }
+
+  private class TimedDataSource extends DataSource {
+
+    private var data = List(
+      Message("foo", 1L),
+      Message("bar", 2L),
+      Message("foo", 3L),
+      Message("foo", 5L),
+      Message("bar", 7L),
+      Message("bar", 8L)
+    )
+
+    private var watermark: Instant = Instant.ofEpochMilli(0)
+
+    override def read(): Message = {
+      if (data.nonEmpty) {
+        val msg = data.head
+        data = data.tail
+        watermark = Instant.ofEpochMilli(msg.timestamp)
+        msg
+      } else {
+        null
+      }
+    }
+
+    override def open(context: TaskContext, startTime: Instant): Unit = {}
+
+    override def close(): Unit = {}
+
+    override def getWatermark: Instant = {
+      if (data.isEmpty) {
+        watermark = watermark.plusMillis(1)
+      }
+      watermark
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
index cd33b50..f99a436 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala
@@ -22,6 +22,7 @@ object Constants {
   val GEARPUMP_STREAMING_OPERATOR = "gearpump.streaming.dsl.operator"
   val GEARPUMP_STREAMING_SOURCE = "gearpump.streaming.source"
   val GEARPUMP_STREAMING_GROUPBY_FUNCTION = "gearpump.streaming.dsl.groupby-function"
+  val GEARPUMP_STREAMING_WINDOW_FUNCTION = "gearpump.streaming.dsl.window-function"
 
   val GEARPUMP_STREAMING_LOCALITIES = "gearpump.streaming.localities"
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
index 66ec873..a6588a1 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala
@@ -123,7 +123,7 @@ object LifeTime {
  */
 class StreamApplication(
     override val name: String, val inputUserConfig: UserConfig,
-    val dag: Graph[ProcessorDescription, PartitionerDescription])
+    dag: Graph[ProcessorDescription, PartitionerDescription])
   extends Application {
 
   require(!dag.hasDuplicatedEdge(), "Graph should not have duplicated edges")

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
index 786d496..440a45e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
@@ -20,7 +20,10 @@ package org.apache.gearpump.streaming.dsl
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.dsl.op._
+import org.apache.gearpump.streaming.dsl.plan._
+import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.window.api._
+import org.apache.gearpump.streaming.dsl.window.impl._
 import org.apache.gearpump.streaming.sink.DataSink
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 import org.apache.gearpump.util.Graph
@@ -35,12 +38,12 @@ class Stream[T](
   /**
    * converts a value[T] to a list of value[R]
    *
-   * @param fun FlatMap function
+   * @param fn FlatMap function
    * @param description The description message for this operation
    * @return A new stream with type [R]
    */
-  def flatMap[R](fun: T => TraversableOnce[R], description: String = null): Stream[R] = {
-    val flatMapOp = FlatMapOp(fun, Option(description).getOrElse("flatmap"))
+  def flatMap[R](fn: T => TraversableOnce[R], description: String = "flatMap"): Stream[R] = {
+    val flatMapOp = ChainableOp(new FlatMapFunction[T, R](fn, description))
     graph.addVertex(flatMapOp)
     graph.addEdge(thisNode, edge.getOrElse(Direct), flatMapOp)
     new Stream[R](graph, flatMapOp)
@@ -49,36 +52,36 @@ class Stream[T](
   /**
    * Maps message of type T message of type R
    *
-   * @param fun Function
+   * @param fn Function
    * @return A new stream with type [R]
    */
-  def map[R](fun: T => R, description: String = null): Stream[R] = {
+  def map[R](fn: T => R, description: String = "map"): Stream[R] = {
     this.flatMap({ data =>
-      Option(fun(data))
-    }, Option(description).getOrElse("map"))
+      Option(fn(data))
+    }, description)
   }
 
   /**
    * Keeps records when fun(T) == true
    *
-   * @param fun  the filter
+   * @param fn  the filter
    * @return  a new stream after filter
    */
-  def filter(fun: T => Boolean, description: String = null): Stream[T] = {
+  def filter(fn: T => Boolean, description: String = "filter"): Stream[T] = {
     this.flatMap({ data =>
-      if (fun(data)) Option(data) else None
-    }, Option(description).getOrElse("filter"))
+      if (fn(data)) Option(data) else None
+    }, description)
   }
 
   /**
    * Reduces operations.
    *
-   * @param fun  reduction function
+   * @param fn  reduction function
    * @param description description message for this operator
    * @return a new stream after reduction
    */
-  def reduce(fun: (T, T) => T, description: String = null): Stream[T] = {
-    val reduceOp = ReduceOp(fun, Option(description).getOrElse("reduce"))
+  def reduce(fn: (T, T) => T, description: String = "reduce"): Stream[T] = {
+    val reduceOp = ChainableOp(new ReduceFunction(fn, description))
     graph.addVertex(reduceOp)
     graph.addEdge(thisNode, edge.getOrElse(Direct), reduceOp)
     new Stream(graph, reduceOp)
@@ -88,7 +91,10 @@ class Stream[T](
    * Log to task log file
    */
   def log(): Unit = {
-    this.map(msg => LoggerFactory.getLogger("dsl").info(msg.toString), "log")
+    this.map(msg => {
+      LoggerFactory.getLogger("dsl").info(msg.toString)
+      msg
+    }, "log")
   }
 
   /**
@@ -97,8 +103,8 @@ class Stream[T](
    * @param other the other stream
    * @return  the merged stream
    */
-  def merge(other: Stream[T], description: String = null): Stream[T] = {
-    val mergeOp = MergeOp(Option(description).getOrElse("merge"))
+  def merge(other: Stream[T], description: String = "merge"): Stream[T] = {
+    val mergeOp = MergeOp(description, UserConfig.empty)
     graph.addVertex(mergeOp)
     graph.addEdge(thisNode, edge.getOrElse(Direct), mergeOp)
     graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), mergeOp)
@@ -115,20 +121,29 @@ class Stream[T](
    *
    * For example,
    * {{{
-   * Stream[People].groupBy(_.gender).flatmap(..).filter.(..).reduce(..)
+   * Stream[People].groupBy(_.gender).flatMap(..).filter(..).reduce(..)
    * }}}
    *
-   * @param fun  Group by function
+   * @param fn  Group by function
    * @param parallelism  Parallelism level
    * @param description  The description
    * @return  the grouped stream
    */
-  def groupBy[Group](fun: T => Group, parallelism: Int = 1, description: String = null)
-    : Stream[T] = {
-    val groupOp = GroupByOp(fun, parallelism, Option(description).getOrElse("groupBy"))
-    graph.addVertex(groupOp)
-    graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)
-    new Stream[T](graph, groupOp)
+  def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
+      description: String = "groupBy"): Stream[T] = {
+    window(CountWindow.apply(1).accumulating)
+      .groupBy[GROUP](fn, parallelism, description)
+  }
+
+  /**
+   * Window function
+   *
+   * @param win window definition
+   * @param description window description
+   * @return [[WindowStream]] where groupBy could be applied
+   */
+  def window(win: Window, description: String = "window"): WindowStream[T] = {
+    new WindowStream[T](graph, edge, thisNode, win, description)
   }
 
   /**
@@ -140,15 +155,28 @@ class Stream[T](
    */
   def process[R](
       processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty,
-      description: String = null): Stream[R] = {
-    val processorOp = ProcessorOp(processor, parallelism, conf,
-      Option(description).getOrElse("process"))
+      description: String = "process"): Stream[R] = {
+    val processorOp = ProcessorOp(processor, parallelism, conf, description)
     graph.addVertex(processorOp)
     graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp)
     new Stream[R](graph, processorOp, Some(Shuffle))
   }
 }
 
+class WindowStream[T](graph: Graph[Op, OpEdge], edge: Option[OpEdge], thisNode: Op,
+    window: Window, winDesc: String) {
+
+  def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
+      description: String = "groupBy"): Stream[T] = {
+    val groupBy: GroupByFn[T, (GROUP, List[Bucket])] = GroupAlsoByWindow(fn, window)
+    val groupOp = GroupByOp[T, (GROUP, List[Bucket])](groupBy, parallelism,
+      s"$winDesc.$description")
+    graph.addVertex(groupOp)
+    graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)
+    new Stream[T](graph, groupOp)
+  }
+}
+
 class KVStream[K, V](stream: Stream[Tuple2[K, V]]) {
   /**
    * GroupBy key
@@ -192,30 +220,18 @@ object Stream {
   }
 
   implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable {
-    def sink[T](dataSink: DataSink, parallism: Int, conf: UserConfig, description: String)
-      : Stream[T] = {
-      implicit val sink = DataSinkOp[T](dataSink, parallism, conf,
-        Some(description).getOrElse("traversable"))
+    def sink(dataSink: DataSink, parallelism: Int = 1,
+        conf: UserConfig = UserConfig.empty, description: String = "sink"): Stream[T] = {
+      implicit val sink = DataSinkOp(dataSink, parallelism, conf, description)
       stream.graph.addVertex(sink)
       stream.graph.addEdge(stream.thisNode, Shuffle, sink)
       new Stream[T](stream.graph, sink)
     }
-
-    def sink[T](
-        sink: Class[_ <: Task], parallism: Int, conf: UserConfig = UserConfig.empty,
-        description: String = null): Stream[T] = {
-      val sinkOp = ProcessorOp(sink, parallism, conf, Option(description).getOrElse("source"))
-      stream.graph.addVertex(sinkOp)
-      stream.graph.addEdge(stream.thisNode, Shuffle, sinkOp)
-      new Stream[T](stream.graph, sinkOp)
-    }
   }
 }
 
 class LoggerSink[T] extends DataSink {
-  var logger: Logger = null
-
-  private var context: TaskContext = null
+  var logger: Logger = _
 
   override def open(context: TaskContext): Unit = {
     this.logger = context.logger

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
index d45737b..8116146 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
@@ -24,10 +24,9 @@ import akka.actor.ActorSystem
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.cluster.client.ClientContext
 import org.apache.gearpump.streaming.StreamApplication
-import org.apache.gearpump.streaming.dsl.op.{DataSourceOp, Op, OpEdge, ProcessorOp}
-import org.apache.gearpump.streaming.dsl.plan.Planner
+import org.apache.gearpump.streaming.dsl.plan._
 import org.apache.gearpump.streaming.source.DataSource
-import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.streaming.task.TaskContext
 import org.apache.gearpump.util.Graph
 import org.apache.gearpump.Message
 
@@ -50,7 +49,8 @@ import scala.language.implicitConversions
  * @param name name of app
  */
 class StreamApp(
-    val name: String, system: ActorSystem, userConfig: UserConfig, val graph: Graph[Op, OpEdge]) {
+    name: String, system: ActorSystem, userConfig: UserConfig,
+    private val graph: Graph[Op, OpEdge]) {
 
   def this(name: String, system: ActorSystem, userConfig: UserConfig) = {
     this(name, system, userConfig, Graph.empty[Op, OpEdge])
@@ -76,34 +76,16 @@ object StreamApp {
 
   implicit class Source(app: StreamApp) extends java.io.Serializable {
 
-    def source[T](dataSource: DataSource, parallelism: Int): Stream[T] = {
-      source(dataSource, parallelism, UserConfig.empty)
-    }
-
-    def source[T](dataSource: DataSource, parallelism: Int, description: String): Stream[T] = {
-      source(dataSource, parallelism, UserConfig.empty, description)
-    }
-
-    def source[T](dataSource: DataSource, parallelism: Int, conf: UserConfig): Stream[T] = {
-      source(dataSource, parallelism, conf, description = null)
-    }
-
-    def source[T](dataSource: DataSource, parallelism: Int, conf: UserConfig, description: String)
-      : Stream[T] = {
+    def source[T](dataSource: DataSource, parallelism: Int = 1,
+        conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = {
       implicit val sourceOp = DataSourceOp(dataSource, parallelism, conf, description)
       app.graph.addVertex(sourceOp)
       new Stream[T](app.graph, sourceOp)
     }
+
     def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = {
       this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description)
     }
-
-    def source[T](source: Class[_ <: Task], parallelism: Int, conf: UserConfig, description: String)
-      : Stream[T] = {
-      val sourceOp = ProcessorOp(source, parallelism, conf, Option(description).getOrElse("source"))
-      app.graph.addVertex(sourceOp)
-      new Stream[T](app.graph, sourceOp)
-    }
   }
 }
 
@@ -115,7 +97,7 @@ class CollectionDataSource[T](seq: Seq[T]) extends DataSource {
 
   override def read(): Message = {
     if (iterator.hasNext) {
-      Message(iterator.next())
+      Message(iterator.next(), Instant.now().toEpochMilli)
     } else {
       null
     }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
index 6eff20c..3003b98 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
@@ -19,9 +19,9 @@
 package org.apache.gearpump.streaming.dsl.javaapi
 
 import scala.collection.JavaConverters._
-
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.dsl.Stream
+import org.apache.gearpump.streaming.dsl.window.api.Window
+import org.apache.gearpump.streaming.dsl.{Stream, WindowStream}
 import org.apache.gearpump.streaming.javaapi.dsl.functions._
 import org.apache.gearpump.streaming.task.Task
 
@@ -63,9 +63,13 @@ class JavaStream[T](val stream: Stream[T]) {
    * Group by a stream and turns it to a list of sub-streams. Operations chained after
    * groupBy applies to sub-streams.
    */
-  def groupBy[Group](fn: GroupByFunction[T, Group], parallelism: Int, description: String)
-    : JavaStream[T] = {
-    new JavaStream[T](stream.groupBy({t: T => fn(t)}, parallelism, description))
+  def groupBy[GROUP](fn: GroupByFunction[T, GROUP],
+      parallelism: Int, description: String): JavaStream[T] = {
+    new JavaStream[T](stream.groupBy((t: T) => fn, parallelism, description))
+  }
+
+  def window(win: Window, description: String): JavaWindowStream[T] = {
+    new JavaWindowStream[T](stream.window(win, description))
   }
 
   /** Add a low level Processor to process messages */
@@ -75,3 +79,11 @@ class JavaStream[T](val stream: Stream[T]) {
     new JavaStream[R](stream.process(processor, parallelism, conf, description))
   }
 }
+
+class JavaWindowStream[T](stream: WindowStream[T]) {
+
+  def groupBy[GROUP](fn: GroupByFunction[T, GROUP], parallelism: Int,
+      description: String): JavaStream[T] = {
+    new JavaStream[T](stream.groupBy((t: T) => fn, parallelism, description))
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala
deleted file mode 100644
index 49d9dec..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl.op
-
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.sink.DataSink
-import org.apache.gearpump.streaming.source.DataSource
-import org.apache.gearpump.streaming.task.Task
-
-/**
- * Operators for the DSL
- */
-sealed trait Op {
-  def description: String
-  def conf: UserConfig
-}
-
-/**
- * When translated to running DAG, SlaveOP can be attach to MasterOP or other SlaveOP
- * "Attach" means running in same Actor.
- */
-trait SlaveOp[T] extends Op
-
-case class FlatMapOp[T, R](
-    fun: (T) => TraversableOnce[R], description: String, conf: UserConfig = UserConfig.empty)
-  extends SlaveOp[T]
-
-case class ReduceOp[T](fun: (T, T) => T, description: String, conf: UserConfig = UserConfig.empty)
-  extends SlaveOp[T]
-
-trait MasterOp extends Op
-
-trait ParameterizedOp[T] extends MasterOp
-
-case class MergeOp(description: String, override val conf: UserConfig = UserConfig.empty)
-  extends MasterOp
-
-case class GroupByOp[T, R](
-    fun: T => R, parallelism: Int, description: String,
-    override val conf: UserConfig = UserConfig.empty)
-  extends ParameterizedOp[T]
-
-case class ProcessorOp[T <: Task](
-    processor: Class[T], parallelism: Int, conf: UserConfig, description: String)
-  extends ParameterizedOp[T]
-
-case class DataSourceOp[T](
-    dataSource: DataSource, parallelism: Int, conf: UserConfig, description: String)
-  extends ParameterizedOp[T]
-
-case class DataSinkOp[T](
-    dataSink: DataSink, parallelism: Int, conf: UserConfig, description: String)
-  extends ParameterizedOp[T]
-
-/**
- * Contains operators which can be chained to single one.
- *
- * For example, flatmap().map().reduce() can be chained to single operator as
- * no data shuffling is required.
- * @param ops list of operations
- */
-case class OpChain(ops: List[Op]) extends Op {
-  def head: Op = ops.head
-  def last: Op = ops.last
-
-  def description: String = null
-
-  override def conf: UserConfig = {
-    // The head's conf has priority
-    ops.reverse.foldLeft(UserConfig.empty) { (conf, op) =>
-      conf.withConfig(op.conf)
-    }
-  }
-}
-
-trait OpEdge
-
-/**
- * The upstream OP and downstream OP doesn't require network data shuffle.
- *
- * For example, map, flatmap operation doesn't require network shuffle, we can use Direct
- * to represent the relation with upstream operators.
- */
-case object Direct extends OpEdge
-
-/**
- * The upstream OP and downstream OP DOES require network data shuffle.
- *
- * For example, map, flatmap operation doesn't require network shuffle, we can use Direct
- * to represent the relation with upstream operators.
- */
-case object Shuffle extends OpEdge
-

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
new file mode 100644
index 0000000..2ec881b
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.partitioner
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.partitioner.UnicastPartitioner
+import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
+
+/**
+ * Partition messages by applying group by function first.
+ *
+ * For example:
+ * {{{
+ * case class People(name: String, gender: String)
+ *
+ * object Test{
+ *
+ *   val groupBy: (People => String) = people => people.gender
+ *   val partitioner = GroupByPartitioner(groupBy)
+ * }
+ * }}}
+ *
+ * @param fn First apply message with groupBy function, then pick the hashCode of the output
+ *   to do the partitioning. You must define hashCode() for output type of groupBy function.
+ */
+class GroupByPartitioner[T, Group](fn: GroupByFn[T, Group])
+  extends UnicastPartitioner {
+  override def getPartition(message: Message, partitionNum: Int, currentPartitionId: Int): Int = {
+    val hashCode = fn.groupBy(message).hashCode()
+    (hashCode & Integer.MAX_VALUE) % partitionNum
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
deleted file mode 100644
index b2e2932..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl.partitioner
-
-import org.apache.gearpump.Message
-import org.apache.gearpump.partitioner.UnicastPartitioner
-
-/**
- * Partition messages by applying group by function first.
- *
- * For example:
- * {{{
- * case class People(name: String, gender: String)
- *
- * object Test{
- *
- *   val groupBy: (People => String) = people => people.gender
- *   val partitioner = GroupByPartitioner(groupBy)
- * }
- * }}}
- *
- * @param groupBy First apply message with groupBy function, then pick the hashCode of the output
- *   to do the partitioning. You must define hashCode() for output type of groupBy function.
- */
-class GroupByPartitioner[T, GROUP](groupBy: T => GROUP = null) extends UnicastPartitioner {
-  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
-    val hashCode = groupBy(msg.msg.asInstanceOf[T]).hashCode()
-    (hashCode & Integer.MAX_VALUE) % partitionNum
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
new file mode 100644
index 0000000..744976b
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.plan
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.Processor.DefaultProcessor
+import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.apache.gearpump.streaming.{Constants, Processor}
+import org.apache.gearpump.streaming.dsl.task.TransformTask
+import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
+import org.apache.gearpump.streaming.sink.{DataSink, DataSinkProcessor}
+import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask}
+import org.apache.gearpump.streaming.task.Task
+
+import scala.reflect.ClassTag
+
+/**
+ * This is a vertex on the logical plan.
+ */
+sealed trait Op {
+
+  def description: String
+
+  def userConfig: UserConfig
+
+  def chain(op: Op)(implicit system: ActorSystem): Op
+
+  def getProcessor(implicit system: ActorSystem): Processor[_ <: Task]
+}
+
+/**
+ * This represents a low level Processor.
+ */
+case class ProcessorOp[T <: Task](
+    processor: Class[T],
+    parallelism: Int,
+    userConfig: UserConfig,
+    description: String)
+  extends Op {
+
+  def this(
+      parallelism: Int = 1,
+      userConfig: UserConfig = UserConfig.empty,
+      description: String = "processor")(implicit classTag: ClassTag[T]) = {
+    this(classTag.runtimeClass.asInstanceOf[Class[T]], parallelism, userConfig, description)
+  }
+
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    throw new OpChainException(this, other)
+  }
+
+  override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    DefaultProcessor(parallelism, description, userConfig, processor)
+  }
+}
+
+/**
+ * This represents a DataSource.
+ */
+case class DataSourceOp(
+    dataSource: DataSource,
+    parallelism: Int = 1,
+    userConfig: UserConfig = UserConfig.empty,
+    description: String = "source")
+  extends Op {
+
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    other match {
+      case op: ChainableOp[_, _] =>
+        DataSourceOp(dataSource, parallelism,
+          userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn),
+          description)
+      case _ =>
+        throw new OpChainException(this, other)
+    }
+  }
+
+  override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    Processor[DataSourceTask[Any, Any]](parallelism, description,
+      userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource))
+  }
+}
+
+/**
+ * This represents a DataSink.
+ */
+case class DataSinkOp(
+    dataSink: DataSink,
+    parallelism: Int = 1,
+    userConfig: UserConfig = UserConfig.empty,
+    description: String = "sink")
+  extends Op {
+
+  override def chain(op: Op)(implicit system: ActorSystem): Op = {
+    throw new OpChainException(this, op)
+  }
+
+  override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    DataSinkProcessor(dataSink, parallelism, description)
+  }
+}
+
+/**
+ * This represents operations that can be chained together
+ * (e.g. flatMap, map, filter, reduce) and further chained
+ * to another Op to be used
+ */
+case class ChainableOp[IN, OUT](
+    fn: SingleInputFunction[IN, OUT]) extends Op {
+
+  override def description: String = fn.description
+
+  override def userConfig: UserConfig = UserConfig.empty
+
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    other match {
+      case op: ChainableOp[OUT, _] =>
+        // TODO: preserve type info
+        ChainableOp(fn.andThen(op.fn))
+      case _ =>
+        throw new OpChainException(this, other)
+    }
+  }
+
+  override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    throw new UnsupportedOperationException("ChainedOp cannot be translated to Processor")
+  }
+}
+
+/**
+ * This represents a Processor with window aggregation
+ */
+case class GroupByOp[IN, GROUP](
+    groupByFn: GroupByFn[IN, GROUP],
+    parallelism: Int = 1,
+    description: String = "groupBy",
+    override val userConfig: UserConfig = UserConfig.empty)
+  extends Op {
+
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    other match {
+      case op: ChainableOp[_, _] =>
+        GroupByOp(groupByFn, parallelism, description,
+          userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn))
+      case _ =>
+        throw new OpChainException(this, other)
+    }
+  }
+
+  override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    groupByFn.getProcessor(parallelism, description, userConfig)
+  }
+}
+
+/**
+ * This represents a Processor transforming merged streams
+ */
+case class MergeOp(description: String, userConfig: UserConfig = UserConfig.empty)
+  extends Op {
+
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    other match {
+      case op: ChainableOp[_, _] =>
+        MergeOp(description, userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn))
+      case _ =>
+        throw new OpChainException(this, other)
+    }
+  }
+
+  override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    Processor[TransformTask[Any, Any]](1, description, userConfig)
+  }
+
+}
+
+/**
+ * This is an edge on the logical plan.
+ */
+trait OpEdge
+
+/**
+ * The upstream OP and downstream OP doesn't require network data shuffle.
+ * e.g. ChainableOp
+ */
+case object Direct extends OpEdge
+
+/**
+ * The upstream OP and downstream OP DOES require network data shuffle.
+ * e.g. GroupByOp
+ */
+case object Shuffle extends OpEdge
+
+/**
+ * Runtime exception thrown on chaining.
+ */
+class OpChainException(op1: Op, op2: Op) extends RuntimeException(s"$op1 cannot be chained by $op2")
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
deleted file mode 100644
index 8de291c..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala
+++ /dev/null
@@ -1,222 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl.plan
-
-import scala.collection.TraversableOnce
-import akka.actor.ActorSystem
-import org.slf4j.Logger
-import org.apache.gearpump._
-import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.Processor
-import org.apache.gearpump.streaming.Processor.DefaultProcessor
-import org.apache.gearpump.streaming.dsl.op._
-import org.apache.gearpump.streaming.dsl.plan.OpTranslator._
-import org.apache.gearpump.streaming.sink.DataSinkProcessor
-import org.apache.gearpump.streaming.source.DataSourceTask
-import org.apache.gearpump.streaming.task.{Task, TaskContext}
-import org.apache.gearpump.util.LogUtil
-
-/**
- * Translates a OP to a TaskDescription
- */
-class OpTranslator extends java.io.Serializable {
-  val LOG: Logger = LogUtil.getLogger(getClass)
-
-  def translate(ops: OpChain)(implicit system: ActorSystem): Processor[_ <: Task] = {
-
-    val baseConfig = ops.conf
-
-    ops.ops.head match {
-      case op: MasterOp =>
-        val tail = ops.ops.tail
-        val func = toFunction(tail)
-        val userConfig = baseConfig.withValue(GEARPUMP_STREAMING_OPERATOR, func)
-
-        op match {
-          case DataSourceOp(dataSource, parallelism, conf, description) =>
-            Processor[DataSourceTask[Any, Any]](parallelism,
-              description = description + "." + func.description,
-              userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource))
-          case groupby@GroupByOp(_, parallelism, description, _) =>
-            Processor[GroupByTask[Object, Object, Object]](parallelism,
-              description = description + "." + func.description,
-              userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, groupby))
-          case merge: MergeOp =>
-            Processor[TransformTask[Object, Object]](1,
-              description = op.description + "." + func.description,
-              userConfig)
-          case ProcessorOp(processor, parallelism, conf, description) =>
-            DefaultProcessor(parallelism,
-              description = description + "." + func.description,
-              userConfig, processor)
-          case DataSinkOp(dataSink, parallelism, conf, description) =>
-            DataSinkProcessor(dataSink, parallelism, description + func.description)
-        }
-      case op: SlaveOp[_] =>
-        val func = toFunction(ops.ops)
-        val userConfig = baseConfig.withValue(GEARPUMP_STREAMING_OPERATOR, func)
-
-        Processor[TransformTask[Object, Object]](1,
-          description = func.description,
-          taskConf = userConfig)
-      case chain: OpChain =>
-        throw new RuntimeException("Not supposed to be called!")
-    }
-  }
-
-  private def toFunction(ops: List[Op]): SingleInputFunction[Object, Object] = {
-    val func: SingleInputFunction[Object, Object] = new DummyInputFunction[Object]()
-    val totalFunction = ops.foldLeft(func) { (fun, op) =>
-
-      val opFunction = op match {
-        case flatmap: FlatMapOp[Object @unchecked, Object @unchecked] =>
-          new FlatMapFunction(flatmap.fun, flatmap.description)
-        case reduce: ReduceOp[Object @unchecked] =>
-          new ReduceFunction(reduce.fun, reduce.description)
-        case _ =>
-          throw new RuntimeException("Not supposed to be called!")
-      }
-      fun.andThen(opFunction.asInstanceOf[SingleInputFunction[Object, Object]])
-    }
-    totalFunction.asInstanceOf[SingleInputFunction[Object, Object]]
-  }
-}
-
-object OpTranslator {
-
-  trait SingleInputFunction[IN, OUT] extends Serializable {
-    def process(value: IN): TraversableOnce[OUT]
-    def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = {
-      new AndThen(this, other)
-    }
-
-    def description: String
-  }
-
-  class DummyInputFunction[T] extends SingleInputFunction[T, T] {
-    override def andThen[OUTER](other: SingleInputFunction[T, OUTER])
-    : SingleInputFunction[T, OUTER] = {
-      other
-    }
-
-    // Should never be called
-    override def process(value: T): TraversableOnce[T] = None
-
-    override def description: String = ""
-  }
-
-  class AndThen[IN, MIDDLE, OUT](
-      first: SingleInputFunction[IN, MIDDLE], second: SingleInputFunction[MIDDLE, OUT])
-    extends SingleInputFunction[IN, OUT] {
-
-    override def process(value: IN): TraversableOnce[OUT] = {
-      first.process(value).flatMap(second.process)
-    }
-
-    override def description: String = {
-      Option(first.description).flatMap { description =>
-        Option(second.description).map(description + "." + _)
-      }.orNull
-    }
-  }
-
-  class FlatMapFunction[IN, OUT](fun: IN => TraversableOnce[OUT], descriptionMessage: String)
-    extends SingleInputFunction[IN, OUT] {
-
-    override def process(value: IN): TraversableOnce[OUT] = {
-      fun(value)
-    }
-
-    override def description: String = {
-      this.descriptionMessage
-    }
-  }
-
-  class ReduceFunction[T](fun: (T, T) => T, descriptionMessage: String)
-    extends SingleInputFunction[T, T] {
-
-    private var state: Any = _
-
-    override def process(value: T): TraversableOnce[T] = {
-      if (state == null) {
-        state = value
-      } else {
-        state = fun(state.asInstanceOf[T], value)
-      }
-      Some(state.asInstanceOf[T])
-    }
-
-    override def description: String = descriptionMessage
-  }
-
-  class GroupByTask[IN, GROUP, OUT](
-      groupBy: IN => GROUP, taskContext: TaskContext, userConf: UserConfig)
-    extends Task(taskContext, userConf) {
-
-    def this(taskContext: TaskContext, userConf: UserConfig) = {
-      this(userConf.getValue[GroupByOp[IN, GROUP]](
-        GEARPUMP_STREAMING_GROUPBY_FUNCTION )(taskContext.system).get.fun,
-        taskContext, userConf)
-    }
-
-    private var groups = Map.empty[GROUP, SingleInputFunction[IN, OUT]]
-
-    override def onNext(msg: Message): Unit = {
-      val time = msg.timestamp
-
-      val group = groupBy(msg.msg.asInstanceOf[IN])
-      if (!groups.contains(group)) {
-        val operator =
-          userConf.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get
-        groups += group -> operator
-      }
-
-      val operator = groups(group)
-
-      operator.process(msg.msg.asInstanceOf[IN]).foreach { msg =>
-        taskContext.output(new Message(msg.asInstanceOf[AnyRef], time))
-      }
-    }
-  }
-
-  class TransformTask[IN, OUT](
-      operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext,
-      userConf: UserConfig) extends Task(taskContext, userConf) {
-
-    def this(taskContext: TaskContext, userConf: UserConfig) = {
-      this(userConf.getValue[SingleInputFunction[IN, OUT]](
-        GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf)
-    }
-
-    override def onNext(msg: Message): Unit = {
-      val time = msg.timestamp
-
-      operator match {
-        case Some(op) =>
-          op.process(msg.msg.asInstanceOf[IN]).foreach { msg =>
-            taskContext.output(new Message(msg.asInstanceOf[AnyRef], time))
-          }
-        case None =>
-          taskContext.output(new Message(msg.msg, time))
-      }
-    }
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
index f5bbd65..16d5c06 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
@@ -22,7 +22,6 @@ import akka.actor.ActorSystem
 
 import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner, Partitioner}
 import org.apache.gearpump.streaming.Processor
-import org.apache.gearpump.streaming.dsl.op._
 import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
 import org.apache.gearpump.streaming.task.Task
 import org.apache.gearpump.util.Graph
@@ -33,64 +32,60 @@ class Planner {
    * Converts Dag of Op to Dag of TaskDescription. TaskDescription is part of the low
    * level Graph API.
    */
-  def plan(dag: Graph[Op, OpEdge])(implicit system: ActorSystem)
-    : Graph[Processor[_ <: Task], _ <: Partitioner] = {
+  def plan(dag: Graph[Op, OpEdge])
+    (implicit system: ActorSystem): Graph[Processor[_ <: Task], _ <: Partitioner] = {
 
-    val opTranslator = new OpTranslator()
-
-    val newDag = optimize(dag)
-    newDag.mapEdge { (node1, edge, node2) =>
+    val graph = optimize(dag)
+    graph.mapEdge { (node1, edge, node2) =>
       edge match {
         case Shuffle =>
-          node2.head match {
-            case groupBy: GroupByOp[Any @unchecked, Any @unchecked] =>
-              new GroupByPartitioner(groupBy.fun)
+          node2 match {
+            case groupBy: GroupByOp[_, _] =>
+              new GroupByPartitioner(groupBy.groupByFn)
             case _ => new HashPartitioner
           }
         case Direct =>
           new CoLocationPartitioner
       }
-    }.mapVertex { opChain =>
-      opTranslator.translate(opChain)
-    }
+    }.mapVertex(_.getProcessor)
   }
 
-  private def optimize(dag: Graph[Op, OpEdge]): Graph[OpChain, OpEdge] = {
-    val newGraph = dag.mapVertex(op => OpChain(List(op)))
-
-    val nodes = newGraph.topologicalOrderWithCirclesIterator.toList.reverse
+  private def optimize(dag: Graph[Op, OpEdge])
+    (implicit system: ActorSystem): Graph[Op, OpEdge] = {
+    val graph = dag.copy
+    val nodes = graph.topologicalOrderWithCirclesIterator.toList.reverse
     for (node <- nodes) {
-      val outGoingEdges = newGraph.outgoingEdgesOf(node)
+      val outGoingEdges = graph.outgoingEdgesOf(node)
       for (edge <- outGoingEdges) {
-        merge(newGraph, edge._1, edge._3)
+        merge(graph, edge._1, edge._3)
       }
     }
-    newGraph
+    graph
   }
 
-  private def merge(dag: Graph[OpChain, OpEdge], node1: OpChain, node2: OpChain)
-    : Graph[OpChain, OpEdge] = {
-    if (dag.outDegreeOf(node1) == 1 &&
-      dag.inDegreeOf(node2) == 1 &&
+  private def merge(graph: Graph[Op, OpEdge], node1: Op, node2: Op)
+    (implicit system: ActorSystem): Unit = {
+    if (graph.outDegreeOf(node1) == 1 &&
+      graph.inDegreeOf(node2) == 1 &&
       // For processor node, we don't allow it to merge with downstream operators
-      !node1.head.isInstanceOf[ProcessorOp[_ <: Task]]) {
-      val (_, edge, _) = dag.outgoingEdgesOf(node1).head
+      !node1.isInstanceOf[ProcessorOp[_ <: Task]] &&
+      !node2.isInstanceOf[ProcessorOp[_ <: Task]]) {
+      val (_, edge, _) = graph.outgoingEdgesOf(node1).head
       if (edge == Direct) {
-        val opList = OpChain(node1.ops ++ node2.ops)
-        dag.addVertex(opList)
-        for (incomingEdge <- dag.incomingEdgesOf(node1)) {
-          dag.addEdge(incomingEdge._1, incomingEdge._2, opList)
+        val chainedOp = node1.chain(node2)
+        graph.addVertex(chainedOp)
+        for (incomingEdge <- graph.incomingEdgesOf(node1)) {
+          graph.addEdge(incomingEdge._1, incomingEdge._2, chainedOp)
         }
 
-        for (outgoingEdge <- dag.outgoingEdgesOf(node2)) {
-          dag.addEdge(opList, outgoingEdge._2, outgoingEdge._3)
+        for (outgoingEdge <- graph.outgoingEdgesOf(node2)) {
+          graph.addEdge(chainedOp, outgoingEdge._2, outgoingEdge._3)
         }
 
         // Remove the old vertex
-        dag.removeVertex(node1)
-        dag.removeVertex(node2)
+        graph.removeVertex(node1)
+        graph.removeVertex(node2)
       }
     }
-    dag
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
new file mode 100644
index 0000000..609fbb0
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.plan.functions
+
+trait SingleInputFunction[IN, OUT] extends Serializable {
+  def process(value: IN): TraversableOnce[OUT]
+  def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = {
+    new AndThen(this, other)
+  }
+  def finish(): TraversableOnce[OUT] = None
+  def clearState(): Unit = {}
+  def description: String
+}
+
+class AndThen[IN, MIDDLE, OUT](
+    first: SingleInputFunction[IN, MIDDLE], second: SingleInputFunction[MIDDLE, OUT])
+  extends SingleInputFunction[IN, OUT] {
+
+  override def process(value: IN): TraversableOnce[OUT] = {
+    first.process(value).flatMap(second.process)
+  }
+
+  override def finish(): TraversableOnce[OUT] = {
+    val firstResult = first.finish().flatMap(second.process)
+    if (firstResult.isEmpty) {
+      second.finish()
+    } else {
+      firstResult
+    }
+  }
+
+  override def clearState(): Unit = {
+    first.clearState()
+    second.clearState()
+  }
+
+  override def description: String = {
+    Option(first.description).flatMap { description =>
+      Option(second.description).map(description + "." + _)
+    }.orNull
+  }
+}
+
+class FlatMapFunction[IN, OUT](fn: IN => TraversableOnce[OUT], descriptionMessage: String)
+  extends SingleInputFunction[IN, OUT] {
+
+  override def process(value: IN): TraversableOnce[OUT] = {
+    fn(value)
+  }
+
+  override def description: String = descriptionMessage
+}
+
+
+class ReduceFunction[T](fn: (T, T) => T, descriptionMessage: String)
+  extends SingleInputFunction[T, T] {
+
+  private var state: Option[T] = None
+
+  override def process(value: T): TraversableOnce[T] = {
+    if (state.isEmpty) {
+      state = Option(value)
+    } else {
+      state = state.map(fn(_, value))
+    }
+    None
+  }
+
+  override def finish(): TraversableOnce[T] = {
+    state
+  }
+
+  override def clearState(): Unit = {
+    state = None
+  }
+
+  override def description: String = descriptionMessage
+}
+
+class EmitFunction[T](emit: T => Unit) extends SingleInputFunction[T, Unit] {
+
+  override def process(value: T): TraversableOnce[Unit] = {
+    emit(value)
+    None
+  }
+
+  override def andThen[R](other: SingleInputFunction[Unit, R]): SingleInputFunction[T, R] = {
+    throw new UnsupportedOperationException("andThen is not supposed to be called on EmitFunction")
+  }
+
+  override def description: String = ""
+}

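For illustration only (not part of the diff above), a minimal sketch of how these per-element functions compose: map- and filter-style operations can both be expressed as FlatMapFunction, and andThen fuses them into a single SingleInputFunction whose description concatenates the stage names.

    import org.apache.gearpump.streaming.dsl.plan.functions.FlatMapFunction

    // Split lines into words, then drop empty tokens; both stages are FlatMapFunctions.
    val split = new FlatMapFunction[String, String](line => line.split("\\s+").toList, "split")
    val nonEmpty = new FlatMapFunction[String, String](
      w => if (w.nonEmpty) List(w) else Nil, "filter")

    // andThen produces one fused SingleInputFunction[String, String].
    val fused = split.andThen(nonEmpty)

    fused.process("apache gearpump").toList   // List("apache", "gearpump")
    fused.description                         // "split.filter"
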
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
new file mode 100644
index 0000000..4ee2fa8
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.window.api.CountWindowFn
+import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+
+/**
+ * This task triggers output based on the number of messages received in a window.
+ */
+class CountTriggerTask[IN, GROUP](
+    groupBy: GroupAlsoByWindow[IN, GROUP],
+    windowRunner: WindowRunner,
+    taskContext: TaskContext,
+    userConfig: UserConfig)
+  extends Task(taskContext, userConfig) {
+
+  def this(groupBy: GroupAlsoByWindow[IN, GROUP],
+      taskContext: TaskContext, userConfig: UserConfig) = {
+    this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system),
+      taskContext, userConfig)
+  }
+
+  def this(taskContext: TaskContext, userConfig: UserConfig) = {
+    this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]](
+      GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get,
+      taskContext, userConfig)
+  }
+
+  private val windowSize = groupBy.window.windowFn.asInstanceOf[CountWindowFn].size
+  private var num = 0
+
+  override def onNext(msg: Message): Unit = {
+    windowRunner.process(msg)
+    num += 1
+    if (windowSize == num) {
+      windowRunner.trigger(Instant.ofEpochMilli(windowSize))
+      num = 0
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
new file mode 100644
index 0000000..4b7649f
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+
+/**
+ * This task triggers output whenever the input watermark advances.
+ */
+class EventTimeTriggerTask[IN, GROUP](
+    groupBy: GroupAlsoByWindow[IN, GROUP],
+    windowRunner: WindowRunner,
+    taskContext: TaskContext,
+    userConfig: UserConfig)
+  extends Task(taskContext, userConfig) {
+
+  def this(groupBy: GroupAlsoByWindow[IN, GROUP],
+      taskContext: TaskContext, userConfig: UserConfig) = {
+    this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system),
+      taskContext, userConfig)
+  }
+
+  def this(taskContext: TaskContext, userConfig: UserConfig) = {
+    this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]](
+      GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get,
+      taskContext, userConfig)
+  }
+
+  override def onNext(message: Message): Unit = {
+    windowRunner.process(message)
+  }
+
+  override def onWatermarkProgress(watermark: Instant): Unit = {
+    windowRunner.trigger(watermark)
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
new file mode 100644
index 0000000..980a54b
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import java.time.Instant
+import java.util.concurrent.TimeUnit
+
+import akka.actor.Actor.Receive
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.task.ProcessingTimeTriggerTask.Triggering
+import org.apache.gearpump.streaming.dsl.window.api.SlidingWindowFn
+import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner}
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+
+import scala.concurrent.duration.FiniteDuration
+
+object ProcessingTimeTriggerTask {
+  case object Triggering
+}
+
+/**
+ * This task triggers output at scheduled processing-time (system clock) intervals.
+ */
+class ProcessingTimeTriggerTask[IN, GROUP](
+    groupBy: GroupAlsoByWindow[IN, GROUP],
+    windowRunner: WindowRunner,
+    taskContext: TaskContext,
+    userConfig: UserConfig)
+  extends Task(taskContext, userConfig) {
+
+  def this(groupBy: GroupAlsoByWindow[IN, GROUP],
+      taskContext: TaskContext, userConfig: UserConfig) = {
+    this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system),
+      taskContext, userConfig)
+  }
+
+  def this(taskContext: TaskContext, userConfig: UserConfig) = {
+    this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]](
+      GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get,
+      taskContext, userConfig)
+  }
+
+  private val windowFn = groupBy.window.windowFn.asInstanceOf[SlidingWindowFn]
+  private val windowSizeMs = windowFn.size.toMillis
+  private val windowStepMs = windowFn.step.toMillis
+
+  override def onStart(startTime: Instant): Unit = {
+    val initialDelay = windowSizeMs - Instant.now.toEpochMilli % windowSizeMs
+    taskContext.scheduleOnce(
+      new FiniteDuration(initialDelay, TimeUnit.MILLISECONDS))(self ! Triggering)
+  }
+
+  override def onNext(message: Message): Unit = {
+    windowRunner.process(message)
+  }
+
+  override def receiveUnManagedMessage: Receive = {
+    case Triggering =>
+      windowRunner.trigger(Instant.now)
+      taskContext.scheduleOnce(
+        new FiniteDuration(windowStepMs, TimeUnit.MILLISECONDS))(self ! Triggering)
+  }
+
+}

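The onStart arithmetic above aligns the first Triggering message with a window-size boundary of the system clock; a small worked example (the clock reading is hypothetical):

    val windowSizeMs = 5000L
    val nowMs = 1000123L                                    // hypothetical Instant.now.toEpochMilli
    val initialDelay = windowSizeMs - nowMs % windowSizeMs  // 5000 - 123 = 4877 ms
    // The first trigger fires at 1005000 ms, i.e. on a 5-second boundary;
    // subsequent triggers are then rescheduled every windowStepMs.
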
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
new file mode 100644
index 0000000..e35f085
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.task
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+
+class TransformTask[IN, OUT](
+    operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext,
+    userConf: UserConfig) extends Task(taskContext, userConf) {
+
+  def this(taskContext: TaskContext, userConf: UserConfig) = {
+    this(userConf.getValue[SingleInputFunction[IN, OUT]](
+      GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf)
+  }
+
+  override def onNext(msg: Message): Unit = {
+    val time = msg.timestamp
+
+    operator match {
+      case Some(op) =>
+        op.process(msg.msg.asInstanceOf[IN]).foreach { result =>
+          taskContext.output(new Message(result, time))
+        }
+      case None =>
+        taskContext.output(new Message(msg.msg, time))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala
new file mode 100644
index 0000000..a4524a8
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.api
+
+sealed trait AccumulationMode
+
+case object Accumulating extends AccumulationMode
+
+case object Discarding extends AccumulationMode

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala
new file mode 100644
index 0000000..30e68ba
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.api
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Processor
+import org.apache.gearpump.streaming.task.Task
+
+/**
+ * Divides messages into groups according to their payload and timestamp.
+ * See [[org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow]]
+ * for the default implementation.
+ */
+trait GroupByFn[T, GROUP] {
+
+  /**
+   * Used by
+   *   1. GroupByPartitioner to shuffle messages
+   *   2. WindowRunner to group messages for time-based aggregation
+   */
+  def groupBy(message: Message): GROUP
+
+  /**
+   * Returns a Processor chosen according to the window trigger; used during planning
+   */
+  def getProcessor(parallelism: Int, description: String,
+      userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: Task]
+}
+
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
new file mode 100644
index 0000000..9865e18
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.api
+
+sealed trait Trigger
+
+case object EventTimeTrigger extends Trigger
+
+case object ProcessingTimeTrigger extends Trigger
+
+case object CountTrigger extends Trigger
+

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala
new file mode 100644
index 0000000..4b94879
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.api
+
+import java.time.Duration
+
+/**
+ * Defines how messages are assigned to windows and when window results are emitted.
+ * @param windowFn how messages are assigned to windows
+ * @param trigger when the results of a window are emitted
+ * @param accumulationMode whether window state is kept or discarded across triggers
+ */
+case class Window(
+    windowFn: WindowFn,
+    trigger: Trigger = EventTimeTrigger,
+    accumulationMode: AccumulationMode = Discarding) {
+
+  def triggering(trigger: Trigger): Window = {
+    Window(windowFn, trigger, accumulationMode)
+  }
+
+  def accumulating: Window = {
+    Window(windowFn, trigger, Accumulating)
+  }
+
+  def discarding: Window = {
+    Window(windowFn, trigger, Discarding)
+  }
+}
+
+object CountWindow {
+
+  def apply(size: Int): Window = {
+    Window(CountWindowFn(size), CountTrigger)
+  }
+}
+
+object FixedWindow {
+
+  /**
+   * Defines a FixedWindow.
+   * @param size window size
+   * @return a Window definition
+   */
+  def apply(size: Duration): Window = {
+    Window(SlidingWindowFn(size, size))
+  }
+}
+
+object SlidingWindow {
+
+  /**
+   * Defines a SlidingWindow
+   * @param size window size
+   * @param step window step to slide forward
+   * @return a Window definition
+   */
+  def apply(size: Duration, step: Duration): Window = {
+    Window(SlidingWindowFn(size, step))
+  }
+}
+

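A minimal usage sketch (assumed, not taken from this commit) of composing a window definition with the builders above:

    import java.time.Duration
    import org.apache.gearpump.streaming.dsl.window.api._

    // 10-second windows sliding every 5 seconds, emitted on processing time,
    // with window state kept across triggers.
    val sliding: Window = SlidingWindow(Duration.ofSeconds(10), Duration.ofSeconds(5))
      .triggering(ProcessingTimeTrigger)
      .accumulating

    // Tumbling 5-second windows with the defaults (EventTimeTrigger, Discarding).
    val tumbling: Window = FixedWindow(Duration.ofSeconds(5))
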
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala
new file mode 100644
index 0000000..0768730
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.api
+
+import java.time.{Duration, Instant}
+
+import org.apache.gearpump.TimeStamp
+import org.apache.gearpump.streaming.dsl.window.impl.Bucket
+
+import scala.collection.mutable.ArrayBuffer
+
+sealed trait WindowFn {
+  def apply(timestamp: Instant): List[Bucket]
+}
+
+case class SlidingWindowFn(size: Duration, step: Duration)
+  extends WindowFn {
+
+  def this(size: Duration) = {
+    this(size, size)
+  }
+
+  override def apply(timestamp: Instant): List[Bucket] = {
+    val sizeMillis = size.toMillis
+    val stepMillis = step.toMillis
+    val timeMillis = timestamp.toEpochMilli
+    val windows = ArrayBuffer.empty[Bucket]
+    var start = lastStartFor(timeMillis, stepMillis)
+    windows += Bucket.ofEpochMilli(start, start + sizeMillis)
+    start -= stepMillis
+    while (start + sizeMillis > timeMillis) {
+      windows += Bucket.ofEpochMilli(start, start + sizeMillis)
+      start -= stepMillis
+    }
+    windows.toList
+  }
+
+  private def lastStartFor(timestamp: TimeStamp, windowStep: Long): TimeStamp = {
+    timestamp - (timestamp + windowStep) % windowStep
+  }
+}
+
+case class CountWindowFn(size: Int) extends WindowFn {
+
+  override def apply(timestamp: Instant): List[Bucket] = {
+    List(Bucket.ofEpochMilli(0, size))
+  }
+}

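A short sketch of bucket assignment: every window covering a message's timestamp is returned, so with a 10-second window sliding every 5 seconds a message at t = 12 s falls into two overlapping buckets (output shown approximately):

    import java.time.{Duration, Instant}
    import org.apache.gearpump.streaming.dsl.window.api.SlidingWindowFn

    val fn = SlidingWindowFn(Duration.ofSeconds(10), Duration.ofSeconds(5))
    fn(Instant.ofEpochMilli(12000))
    // List(Bucket(1970-01-01T00:00:10Z, 1970-01-01T00:00:20Z),
    //      Bucket(1970-01-01T00:00:05Z, 1970-01-01T00:00:15Z))
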
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala
new file mode 100644
index 0000000..e978983
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.impl
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.streaming.dsl.window.api.Trigger
+
+trait ReduceFnRunner {
+
+  def process(message: Message): Unit
+
+  def onTrigger(trigger: Trigger): Unit
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
new file mode 100644
index 0000000..53cf5d0
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.impl
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.Processor
+import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.streaming.dsl.window.api._
+import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, EventTimeTriggerTask, ProcessingTimeTriggerTask}
+import org.apache.gearpump.streaming.task.Task
+
+object Bucket {
+  def ofEpochMilli(startTime: TimeStamp, endTime: TimeStamp): Bucket = {
+    Bucket(Instant.ofEpochMilli(startTime), Instant.ofEpochMilli(endTime))
+  }
+}
+
+/**
+ * A window unit whose range includes startTime and excludes endTime.
+ */
+case class Bucket(startTime: Instant, endTime: Instant) extends Comparable[Bucket] {
+  override def compareTo(o: Bucket): Int = {
+    val ret = startTime.compareTo(o.startTime)
+    if (ret != 0) {
+      ret
+    } else {
+      endTime.compareTo(o.endTime)
+    }
+  }
+}
+
+case class GroupAlsoByWindow[T, GROUP](groupByFn: T => GROUP, window: Window)
+  extends GroupByFn[T, (GROUP, List[Bucket])] {
+
+  override def groupBy(message: Message): (GROUP, List[Bucket]) = {
+    val group = groupByFn(message.msg.asInstanceOf[T])
+    val buckets = window.windowFn(Instant.ofEpochMilli(message.timestamp))
+    group -> buckets
+  }
+
+  override def getProcessor(parallelism: Int, description: String,
+      userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: Task] = {
+    val config = userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, this)
+    window.trigger match {
+      case CountTrigger =>
+        Processor[CountTriggerTask[T, GROUP]](parallelism, description, config)
+      case ProcessingTimeTrigger =>
+        Processor[ProcessingTimeTriggerTask[T, GROUP]](parallelism, description, config)
+      case EventTimeTrigger =>
+        Processor[EventTimeTriggerTask[T, GROUP]](parallelism, description, config)
+    }
+  }
+
+}
+
+

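A small sketch (with illustrative values) of how GroupAlsoByWindow combines the user's key function with window assignment, and how the trigger selects the task type at planning time:

    import java.time.Duration
    import org.apache.gearpump.Message
    import org.apache.gearpump.streaming.dsl.window.api.FixedWindow
    import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow

    val groupBy = GroupAlsoByWindow[String, String](
      word => word, FixedWindow(Duration.ofSeconds(5)))

    // A message at t = 7 s is keyed by its payload and assigned to bucket [5 s, 10 s).
    groupBy.groupBy(Message("gearpump", 7000))
    // ("gearpump", List(Bucket(1970-01-01T00:00:05Z, 1970-01-01T00:00:10Z)))

    // getProcessor maps the window trigger to a task type: EventTimeTrigger (the
    // default here) yields EventTimeTriggerTask, CountTrigger yields CountTriggerTask,
    // and ProcessingTimeTrigger yields ProcessingTimeTriggerTask.
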
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/66017ab7/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
new file mode 100644
index 0000000..9af5e61
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.window.impl
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.gs.collections.api.block.procedure.Procedure
+import org.apache.gearpump.gs.collections.impl.list.mutable.FastList
+import org.apache.gearpump.gs.collections.impl.map.mutable.UnifiedMap
+import org.apache.gearpump.gs.collections.impl.map.sorted.mutable.TreeSortedMap
+import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.plan.functions.{EmitFunction, SingleInputFunction}
+import org.apache.gearpump.streaming.dsl.window.api.Discarding
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.util.LogUtil
+import org.slf4j.Logger
+
+trait WindowRunner {
+
+  def process(message: Message): Unit
+
+  def trigger(time: Instant): Unit
+
+}
+
+object DefaultWindowRunner {
+
+  private val LOG: Logger = LogUtil.getLogger(classOf[DefaultWindowRunner[_, _, _]])
+
+  case class WindowGroup[GROUP](bucket: Bucket, group: GROUP)
+    extends Comparable[WindowGroup[GROUP]] {
+    override def compareTo(o: WindowGroup[GROUP]): Int = {
+      val ret = bucket.compareTo(o.bucket)
+      if (ret != 0) {
+        ret
+      } else if (group.equals(o.group)) {
+        0
+      } else {
+        -1
+      }
+    }
+  }
+}
+
+class DefaultWindowRunner[IN, GROUP, OUT](
+    taskContext: TaskContext, userConfig: UserConfig,
+    groupBy: GroupAlsoByWindow[IN, GROUP])(implicit system: ActorSystem)
+  extends WindowRunner {
+  import org.apache.gearpump.streaming.dsl.window.impl.DefaultWindowRunner._
+
+  private val windowGroups = new TreeSortedMap[WindowGroup[GROUP], FastList[IN]]
+  private val groupFns = new UnifiedMap[GROUP, SingleInputFunction[IN, OUT]]
+
+
+  override def process(message: Message): Unit = {
+    val (group, buckets) = groupBy.groupBy(message)
+    buckets.foreach { bucket =>
+      val wg = WindowGroup(bucket, group)
+      val inputs = windowGroups.getOrDefault(wg, new FastList[IN](1))
+      inputs.add(message.msg.asInstanceOf[IN])
+      windowGroups.put(wg, inputs)
+    }
+    groupFns.putIfAbsent(group,
+      userConfig.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get)
+  }
+
+  override def trigger(time: Instant): Unit = {
+    onTrigger()
+
+    @annotation.tailrec
+    def onTrigger(): Unit = {
+      if (windowGroups.notEmpty()) {
+        val first = windowGroups.firstKey
+        if (!time.isBefore(first.bucket.endTime)) {
+          val inputs = windowGroups.remove(first)
+          val reduceFn = groupFns.get(first.group)
+            .andThen[Unit](new EmitFunction[OUT](emitResult(_, time)))
+          inputs.forEach(new Procedure[IN] {
+            override def value(t: IN): Unit = {
+              reduceFn.process(t)
+            }
+          })
+          reduceFn.finish()
+          if (groupBy.window.accumulationMode == Discarding) {
+            reduceFn.clearState()
+          }
+          onTrigger()
+        }
+      }
+    }
+
+    def emitResult(result: OUT, time: Instant): Unit = {
+      taskContext.output(Message(result, time.toEpochMilli))
+    }
+  }
+}
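For illustration, a minimal sketch of the per-(window, group) chain that trigger() assembles: the stored ReduceFunction keeps only its running state, and the appended EmitFunction turns finish() into exactly one emitted aggregate per pane; under Discarding the state is then cleared so the next pane starts fresh.

    import org.apache.gearpump.streaming.dsl.plan.functions.{EmitFunction, ReduceFunction}

    import scala.collection.mutable.ArrayBuffer

    val emitted = ArrayBuffer.empty[Int]
    val sum = new ReduceFunction[Int](_ + _, "sum")
    val chain = sum.andThen(new EmitFunction[Int](i => emitted += i))

    List(1, 2, 3).foreach(chain.process)  // nothing emitted yet; state is Some(6)
    chain.finish()                        // emits 6 for this (window, group)
    chain.clearState()                    // Discarding mode: reset before the next pane
    // emitted == ArrayBuffer(6)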