You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/08/06 22:03:09 UTC

incubator-gearpump git commit: [GEARPUMP-339] Add ScalaDoc to Streaming DSL

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master f96aca995 -> e1228a314


[GEARPUMP-339] Add ScalaDoc to Streaming DSL

Author: manuzhang <ow...@gmail.com>

Closes #212 from manuzhang/plan_doc.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/e1228a31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/e1228a31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/e1228a31

Branch: refs/heads/master
Commit: e1228a31445418e6aca77ad3cf4c2c18fb6748ee
Parents: f96aca9
Author: manuzhang <ow...@gmail.com>
Authored: Mon Aug 7 06:02:29 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Aug 7 06:02:47 2017 +0800

----------------------------------------------------------------------
 .../topology/GearpumpStormComponentSpec.scala   |   5 +-
 .../storm/util/StormOutputCollectorSpec.scala   |   9 +-
 .../dsl/api/functions/FilterFunction.scala      |   2 -
 .../dsl/api/functions/FoldFunction.scala        |   2 -
 .../dsl/api/functions/MapFunction.scala         |   2 -
 .../api/functions/SerializableFunction.scala    |  32 ++++
 .../dsl/javaapi/functions/FlatMapFunction.scala |   2 +-
 .../apache/gearpump/streaming/dsl/package.scala |  48 ++++++
 .../apache/gearpump/streaming/dsl/plan/OP.scala | 171 ++++++++++++-------
 .../gearpump/streaming/dsl/plan/Planner.scala   |  20 ++-
 .../streaming/dsl/scalaapi/Stream.scala         |   1 +
 .../streaming/dsl/scalaapi/StreamApp.scala      |  25 ++-
 .../scalaapi/functions/FlatMapFunction.scala    |   2 +-
 .../functions/SerializableFunction.scala        |  32 ----
 14 files changed, 229 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
index 0891070..50204ca 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/topology/GearpumpStormComponentSpec.scala
@@ -24,12 +24,13 @@ import akka.actor.ActorRef
 import backtype.storm.spout.{ISpout, SpoutOutputCollector}
 import backtype.storm.task.{GeneralTopologyContext, IBolt, OutputCollector, TopologyContext}
 import backtype.storm.tuple.Tuple
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.experiments.storm.producer.StormSpoutOutputCollector
 import org.apache.gearpump.experiments.storm.topology.GearpumpStormComponent.{GearpumpBolt, GearpumpSpout}
 import org.apache.gearpump.experiments.storm.util.StormOutputCollector
 import org.apache.gearpump.streaming.task.{TaskContext, TaskId}
 import org.apache.gearpump.streaming.{DAG, MockUtil}
-import org.apache.gearpump.{Message, TimeStamp}
+import org.apache.gearpump.Message
 import org.mockito.Matchers.{anyObject, eq => mockitoEq}
 import org.mockito.Mockito._
 import org.scalacheck.Gen
@@ -75,7 +76,7 @@ class GearpumpStormComponentSpec
   property("GearpumpBolt lifecycle") {
     val timestampGen = Gen.chooseNum[Long](0L, 1000L)
     val freqGen = Gen.chooseNum[Int](1, 100)
-    forAll(timestampGen, freqGen) { (timestamp: TimeStamp, freq: Int) =>
+    forAll(timestampGen, freqGen) { (timestamp: MilliSeconds, freq: Int) =>
       val config = mock[JMap[AnyRef, AnyRef]]
       val bolt = mock[IBolt]
       val taskContext = MockUtil.mockTaskContext

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
----------------------------------------------------------------------
diff --git a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
index 05627c9..7fab2cc 100644
--- a/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
+++ b/experiments/storm/src/test/scala/org/apache/gearpump/experiments/storm/util/StormOutputCollectorSpec.scala
@@ -27,7 +27,8 @@ import org.scalacheck.Gen
 import org.scalatest.mock.MockitoSugar
 import org.scalatest.prop.PropertyChecks
 import org.scalatest.{Matchers, PropSpec}
-import org.apache.gearpump.{MIN_TIME_MILLIS, Message, TimeStamp}
+import org.apache.gearpump.{Message, Time}
+import org.apache.gearpump.Time.MilliSeconds
 import org.apache.gearpump.experiments.storm.topology.GearpumpTuple
 import org.apache.gearpump.streaming.MockUtil
 
@@ -41,7 +42,7 @@ class StormOutputCollectorSpec
 
   property("StormOutputCollector emits tuple values into a stream") {
     forAll(timestampGen, streamIdGen, valuesGen) {
-      (timestamp: TimeStamp, streamId: String, values: JList[AnyRef]) =>
+      (timestamp: MilliSeconds, streamId: String, values: JList[AnyRef]) =>
         val targets = mock[JMap[String, JMap[String, Grouping]]]
         val taskToComponent = mock[JMap[Integer, String]]
         val getTargetPartitionsFn = mock[(String, JList[AnyRef]) =>
@@ -52,7 +53,7 @@ class StormOutputCollectorSpec
           targetStormTaskIds))
         val taskContext = MockUtil.mockTaskContext
         val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent,
-          targets, getTargetPartitionsFn, taskContext, MIN_TIME_MILLIS)
+          targets, getTargetPartitionsFn, taskContext, Time.MIN_TIME_MILLIS)
 
         when(targets.containsKey(streamId)).thenReturn(false)
         stormOutputCollector.emit(streamId, values) shouldBe StormOutputCollector.EMPTY_LIST
@@ -85,7 +86,7 @@ class StormOutputCollectorSpec
           targetStormTaskIds))
         val taskContext = MockUtil.mockTaskContext
         val stormOutputCollector = new StormOutputCollector(stormTaskId, taskToComponent,
-          targets, getTargetPartitionsFn, taskContext, MIN_TIME_MILLIS)
+          targets, getTargetPartitionsFn, taskContext, Time.MIN_TIME_MILLIS)
 
         when(targets.containsKey(streamId)).thenReturn(false)
         verify(taskContext, times(0)).output(anyObject[Message])

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
index 25a0929..8d3ffb3 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
@@ -17,8 +17,6 @@
  */
 package org.apache.gearpump.streaming.dsl.api.functions
 
-import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
-
 object FilterFunction {
 
   def apply[T](fn: T => Boolean): FilterFunction[T] = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala
index 9ff44a8..1525d6e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala
@@ -18,8 +18,6 @@
 
 package org.apache.gearpump.streaming.dsl.api.functions
 
-import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
-
 /**
  * Combines input into an accumulator.
  *

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
index a4fdca6..7880c2f 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
@@ -17,8 +17,6 @@
  */
 package org.apache.gearpump.streaming.dsl.api.functions
 
-import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
-
 object MapFunction {
 
   def apply[T, R](fn: T => R): MapFunction[T, R] = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/SerializableFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/SerializableFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/SerializableFunction.scala
new file mode 100644
index 0000000..b90ba28
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/SerializableFunction.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.api.functions
+
+/**
+ * Superclass for all user defined function interfaces.
+ * This ensures all functions are serializable and provides common methods
+ * like setup and teardown. Users should not extend this class directly
+ * but subclasses like [[org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction]].
+ */
+abstract class SerializableFunction extends java.io.Serializable {
+
+  def setup(): Unit = {}
+
+  def teardown(): Unit = {}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
index 11e2416..adad878 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.gearpump.streaming.dsl.javaapi.functions
 
-import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
+import org.apache.gearpump.streaming.dsl.api.functions.SerializableFunction
 
 /**
  * Transforms one input into zero or more outputs of possibly different types.

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala
new file mode 100644
index 0000000..6d43f16
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming
+
+
+// scalastyle:off line.size.limit
+/**
+ *
+ * The architecture of Gearpump Streaming DSL consists of several layers:
+ *
+ *   * User facing [[org.apache.gearpump.streaming.dsl.scalaapi.Stream]] DSL. Stream is created by [[org.apache.gearpump.streaming.dsl.scalaapi.StreamApp]]
+ *     from an input source like Kafka or by applying high level operations (e.g. flatMap, window, groupBy) to user defined functions (UDFs). UDFs are subclasses
+ *     of [[org.apache.gearpump.streaming.dsl.api.functions.SerializableFunction]], represented by [[org.apache.gearpump.streaming.dsl.plan.Op]]
+ *     in the underlying [[org.apache.gearpump.util.Graph]].
+ *   * [[org.apache.gearpump.streaming.dsl.plan.Planner]], responsible for interpreting the Op Graph, optimizing it and building a low level Graph of
+ *     [[org.apache.gearpump.streaming.Processor]]. Finally, it creates a runnable Graph of [[org.apache.gearpump.streaming.task.Task]].
+ *   * The execution layer is usually composed of the following four tasks.
+ *
+ *     * [[org.apache.gearpump.streaming.source.DataSourceTask]] for [[org.apache.gearpump.streaming.source.DataSource]] to ingest data into Gearpump
+ *     * [[org.apache.gearpump.streaming.sink.DataSinkTask]] for [[org.apache.gearpump.streaming.sink.DataSink]] to write data out.
+ *     * [[org.apache.gearpump.streaming.dsl.task.GroupByTask]] to execute Ops followed by [[org.apache.gearpump.streaming.dsl.plan.GroupByOp]]
+ *     * [[org.apache.gearpump.streaming.dsl.task.TransformTask]] to execute all other Ops.
+ *
+ *     All but [[org.apache.gearpump.streaming.sink.DataSinkTask]] delegate execution to [[org.apache.gearpump.streaming.dsl.window.impl.WindowRunner]], which internally
+ *     runs a chain of [[org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner]] grouped by windows. Window assignments are either explicitly defined with
+ *     [[org.apache.gearpump.streaming.dsl.window.api.Windows]] API or implicitly in [[org.apache.gearpump.streaming.dsl.window.api.GlobalWindows]]. UDFs are eventually
+ *     executed by [[org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner]].
+ *
+ */
+// scalastyle:on line.size.limit
+package object dsl {
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
index 2a45a8f..c37ced6 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -35,16 +35,29 @@ import scala.reflect.ClassTag
 
 object Op {
 
+  /**
+   * Concatenates two descriptions with "." or returns one if the other is empty.
+   */
   def concatenate(desc1: String, desc2: String): String = {
     if (desc1 == null || desc1.isEmpty) desc2
     else if (desc2 == null || desc2.isEmpty) desc1
     else desc1 + "." + desc2
   }
 
+  /**
+   * Concatenates two configs according to the following rules:
+   *   1. The first config cannot be null.
+   *   2. The first config is returned if the second config is null.
+   *   3. The second config takes precedence for overlapping config keys.
+   */
   def concatenate(config1: UserConfig, config2: UserConfig): UserConfig = {
     config1.withConfig(config2)
   }
 
+  /**
+   * This adds a [[org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner]] in
+   * [[GlobalWindows]] if a targeting [[Task]] has no executable UDF.
+   */
   def withGlobalWindowsDummyRunner(op: Op, userConfig: UserConfig,
       processor: Processor[_ <: Task])(implicit system: ActorSystem): Processor[_ <: Task] = {
     if (userConfig.getValue(Constants.GEARPUMP_STREAMING_OPERATOR).isEmpty) {
@@ -59,22 +72,37 @@ object Op {
 }
 
 /**
- * This is a vertex on the logical plan.
+ * This is a vertex on the logical Graph, representing user defined functions in
+ * [[org.apache.gearpump.streaming.dsl.scalaapi.Stream]] DSL.
  */
 sealed trait Op {
 
+  /**
+   * This comes from user function description and is used to display it on front end.
+   */
   def description: String
 
+  /**
+   * This will ship user function to [[org.apache.gearpump.streaming.task.Task]] to be executed.
+   */
   def userConfig: UserConfig
 
+  /**
+   * This creates a new Op by merging their user functions, user configs and descriptions.
+   */
   def chain(op: Op)(implicit system: ActorSystem): Op
 
+  /**
+   * This creates a Processor after chaining.
+   */
   def toProcessor(implicit system: ActorSystem): Processor[_ <: Task]
 }
 
 /**
- * This represents a low level Processor.
+ * This represents a low level Processor. It is deprecated since it
+ * doesn't work with other Ops.
  */
+@deprecated
 case class ProcessorOp[T <: Task](
     processor: Class[T],
     parallelism: Int,
@@ -99,7 +127,8 @@ case class ProcessorOp[T <: Task](
 }
 
 /**
- * This represents a DataSource.
+ * This represents a DataSource and creates a
+ * [[org.apache.gearpump.streaming.source.DataSourceTask]].
  */
 case class DataSourceOp(
     dataSource: DataSource,
@@ -142,7 +171,7 @@ case class DataSourceOp(
 }
 
 /**
- * This represents a DataSink.
+ * This represents a DataSink and creates a [[org.apache.gearpump.streaming.sink.DataSinkTask]].
  */
 case class DataSinkOp(
     dataSink: DataSink,
@@ -163,7 +192,7 @@ case class DataSinkOp(
 /**
  * This represents operations that can be chained together
  * (e.g. flatMap, map, filter, reduce) and further chained
- * to another Op to be used
+ * to another Op to be executed
  */
 case class TransformOp[IN, OUT](
     fn: FunctionRunner[IN, OUT],
@@ -201,61 +230,7 @@ case class TransformOp[IN, OUT](
   }
 }
 
-/**
- * This is an intermediate operation, produced by chaining WindowOp and TransformOp.
- * Usually, it will be chained to a DataSourceOp, GroupByOp or MergeOp.
- * Otherwise, it will be translated to a Processor of TransformTask.
- */
-case class WindowTransformOp[IN, OUT](
-    windowRunner: WindowRunner[IN, OUT],
-    description: String,
-    userConfig: UserConfig) extends Op {
-
-  override def chain(other: Op)(implicit system: ActorSystem): Op = {
-    other match {
-      case op: WindowTransformOp[OUT, _] =>
-        WindowTransformOp(
-          WindowRunnerAT(windowRunner, op.windowRunner),
-          Op.concatenate(description, op.description),
-          Op.concatenate(userConfig, op.userConfig)
-        )
-      case _ =>
-        throw new OpChainException(this, other)
-    }
-  }
-
-  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
-    // TODO: this should be chained to DataSourceOp / GroupByOp / MergeOp
-    Processor[TransformTask[Any, Any]](1, description, userConfig.withValue(
-      Constants.GEARPUMP_STREAMING_OPERATOR, windowRunner))
-  }
-}
-
-/**
- * This is an intermediate operation, produced by chaining TransformOp and WindowOp.
- * It will later be chained to a WindowOp, which results in two WindowTransformOps.
- * Finally, they will be chained to a single WindowTransformOp.
- */
-case class TransformWindowTransformOp[IN, MIDDLE, OUT](
-    transformOp: TransformOp[IN, MIDDLE],
-    windowTransformOp: WindowTransformOp[MIDDLE, OUT]) extends Op {
-
-  override def description: String = {
-    throw new UnsupportedOperationException(s"description is not supported on $this")
-  }
-
-  override def userConfig: UserConfig = {
-    throw new UnsupportedOperationException(s"userConfig is not supported on $this")
-  }
-
-  override def chain(op: Op)(implicit system: ActorSystem): Op = {
-    throw new UnsupportedOperationException(s"chain is not supported on $this")
-  }
 
-  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
-    WindowOp(GlobalWindows()).chain(this).toProcessor
-  }
-}
 
 /**
  * This represents a window aggregation, together with a following TransformOp
@@ -290,7 +265,14 @@ case class WindowOp(
 }
 
 /**
- * This represents a Processor with groupBy and window aggregation
+ * This represents an operation with groupBy followed by window aggregation.
+ *
+ * It can only be chained with [[WindowTransformOp]] to be executed in
+ * [[org.apache.gearpump.streaming.dsl.task.GroupByTask]].
+ * However, it's possible a window function has no following aggregations. In that case,
+ * we manually tail a [[WindowOp]] with [[TransformOp]] of
+ * [[org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner]] to create a
+ * [[WindowTransformOp]].
  */
 case class GroupByOp[IN, GROUP] private(
     groupBy: IN => GROUP,
@@ -325,7 +307,14 @@ case class GroupByOp[IN, GROUP] private(
 }
 
 /**
- * This represents a Processor transforming merged streams
+ * This represents an operation with merge followed by window aggregation.
+ *
+ * It can only be chained with [[WindowTransformOp]] to be executed in
+ * [[org.apache.gearpump.streaming.dsl.task.TransformTask]].
+ * However, it's possible a merge function has no following aggregations. In that case,
+ * we manually tail a [[WindowOp]] with [[TransformOp]] of
+ * [[org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner]] to create a
+ * [[WindowTransformOp]].
  */
 case class MergeOp(
     parallelism: Int = 1,
@@ -357,7 +346,65 @@ case class MergeOp(
 }
 
 /**
- * This is an edge on the logical plan.
+ * This is an intermediate operation, produced by chaining [[WindowOp]] and [[TransformOp]].
+ * Usually, it will be chained to a [[DataSourceOp]], [[GroupByOp]] or [[MergeOp]]. Nonetheless,
+ * Op with more than 1 outgoing edge or incoming edge cannot be chained. In that case,
+ * it will be translated to a [[org.apache.gearpump.streaming.dsl.task.TransformTask]].
+ */
+private case class WindowTransformOp[IN, OUT](
+    windowRunner: WindowRunner[IN, OUT],
+    description: String,
+    userConfig: UserConfig) extends Op {
+
+  override def chain(other: Op)(implicit system: ActorSystem): Op = {
+    other match {
+      case op: WindowTransformOp[OUT, _] =>
+        WindowTransformOp(
+          WindowRunnerAT(windowRunner, op.windowRunner),
+          Op.concatenate(description, op.description),
+          Op.concatenate(userConfig, op.userConfig)
+        )
+      case _ =>
+        throw new OpChainException(this, other)
+    }
+  }
+
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    // TODO: this should be chained to DataSourceOp / GroupByOp / MergeOp
+    Processor[TransformTask[Any, Any]](1, description, userConfig.withValue(
+      Constants.GEARPUMP_STREAMING_OPERATOR, windowRunner))
+  }
+}
+
+/**
+ * This is an intermediate operation, produced by chaining [[TransformOp]] and
+ * [[WindowTransformOp]]. It will later be chained to a [[WindowOp]], which results in
+ * two [[WindowTransformOp]]s. Finally, they will be chained to a single WindowTransformOp.
+ */
+private case class TransformWindowTransformOp[IN, MIDDLE, OUT](
+    transformOp: TransformOp[IN, MIDDLE],
+    windowTransformOp: WindowTransformOp[MIDDLE, OUT]) extends Op {
+
+  override def description: String = {
+    throw new UnsupportedOperationException(s"description is not supported on $this")
+  }
+
+  override def userConfig: UserConfig = {
+    throw new UnsupportedOperationException(s"userConfig is not supported on $this")
+  }
+
+  override def chain(op: Op)(implicit system: ActorSystem): Op = {
+    throw new UnsupportedOperationException(s"chain is not supported on $this")
+  }
+
+  override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
+    WindowOp(GlobalWindows()).chain(this).toProcessor
+  }
+}
+
+/**
+ * This is an edge on the logical plan. It defines whether data should be transported locally
+ * or shuffled remotely between [[Op]].
  */
 trait OpEdge
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
index b1b39c9..04b5337 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
@@ -24,11 +24,26 @@ import org.apache.gearpump.streaming.Processor
 import org.apache.gearpump.streaming.task.Task
 import org.apache.gearpump.util.Graph
 
+/**
+ * This class is responsible for turning the high level
+ * [[org.apache.gearpump.streaming.dsl.scalaapi.Stream]] DSL into low level
+ * [[org.apache.gearpump.streaming.Processor]] API.
+ */
 class Planner {
 
   /**
-   * Converts Dag of Op to Dag of TaskDescription. TaskDescription is part of the low
-   * level Graph API.
+   * This method interprets a Graph of [[Op]] and creates a Graph of
+   * [[org.apache.gearpump.streaming.Processor]].
+   *
+   * It first traverses the Graph in reverse, starting from a leaf Op, and merges an Op
+   * with its downstream Op according to the following rules.
+   *
+   *   1. The Op has only one outgoing edge and the downstream Op has only one incoming edge
+   *   2. Neither Op is [[ProcessorOp]]
+   *   3. The edge is [[Direct]]
+   *
+   * Finally the vertices of the optimized Graph are translated to Processors
+   * and the edges to Partitioners.
    */
   def plan(dag: Graph[Op, OpEdge])
     (implicit system: ActorSystem): Graph[Processor[_ <: Task], _ <: Partitioner] = {
@@ -43,6 +58,7 @@ class Planner {
             case _ => new HashPartitioner
           }
         case Direct =>
+          // FIXME: This is never used
           new CoLocationPartitioner
       }
     }.mapVertex(_.toProcessor)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
index ef2753e..d0c733e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
@@ -210,6 +210,7 @@ class Stream[T](
    * @param parallelism  parallelism level
    * @return  new stream after processing with type [R]
    */
+  @deprecated
   def process[R](
       processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty,
       description: String = "process"): Stream[R] = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
index bce8c0c..17d77bc 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
@@ -62,6 +62,17 @@ class StreamApp(
     val dag = planner.plan(graph)
     StreamApplication(name, dag, userConfig)
   }
+
+  def source[T](dataSource: DataSource, parallelism: Int = 1,
+      conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = {
+    val sourceOp = DataSourceOp(dataSource, parallelism, description, conf)
+    graph.addVertex(sourceOp)
+    new Stream[T](graph, sourceOp)
+  }
+
+  def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = {
+    this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description)
+  }
 }
 
 object StreamApp {
@@ -73,20 +84,6 @@ object StreamApp {
   implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication = {
     streamApp.plan()
   }
-
-  implicit class Source(app: StreamApp) extends java.io.Serializable {
-
-    def source[T](dataSource: DataSource, parallelism: Int = 1,
-        conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = {
-      implicit val sourceOp = DataSourceOp(dataSource, parallelism, description, conf)
-      app.graph.addVertex(sourceOp)
-      new Stream[T](app.graph, sourceOp)
-    }
-
-    def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = {
-      this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description)
-    }
-  }
 }
 
 /** A test message source which generated message sequence repeatedly. */

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
index 252b5bd..2d26df6 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.gearpump.streaming.dsl.scalaapi.functions
 
-import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction}
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, SerializableFunction}
 import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => JFlatMapFunction}
 
 import scala.collection.JavaConverters._

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/e1228a31/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
deleted file mode 100644
index ab88bf1..0000000
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.gearpump.streaming.dsl.scalaapi.functions
-
-/**
- * Superclass for all user defined function interfaces.
- * This ensures all functions are serializable and provides common methods
- * like setup and teardown. Users should not extend this class directly
- * but subclasses like [[FlatMapFunction]].
- */
-abstract class SerializableFunction extends java.io.Serializable {
-
-  def setup(): Unit = {}
-
-  def teardown(): Unit = {}
-
-}