You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2017/01/20 08:47:42 UTC

[02/19] incubator-gearpump git commit: merge master into akka-streams branch

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
index 3003b98..7f3c250 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
@@ -15,14 +15,14 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.gearpump.streaming.dsl.javaapi
 
-import scala.collection.JavaConverters._
 import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => JFlatMapFunction, GroupByFunction}
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, WindowStream}
 import org.apache.gearpump.streaming.dsl.window.api.Window
-import org.apache.gearpump.streaming.dsl.{Stream, WindowStream}
-import org.apache.gearpump.streaming.javaapi.dsl.functions._
 import org.apache.gearpump.streaming.task.Task
 
 /**
@@ -31,23 +31,23 @@ import org.apache.gearpump.streaming.task.Task
 class JavaStream[T](val stream: Stream[T]) {
 
   /** FlatMap on stream */
-  def flatMap[R](fn: FlatMapFunction[T, R], description: String): JavaStream[R] = {
-    new JavaStream[R](stream.flatMap({ t: T => fn(t).asScala }, description))
+  def flatMap[R](fn: JFlatMapFunction[T, R], description: String): JavaStream[R] = {
+    new JavaStream[R](stream.flatMap(FlatMapFunction(fn), description))
   }
 
   /** Map on stream */
   def map[R](fn: MapFunction[T, R], description: String): JavaStream[R] = {
-    new JavaStream[R](stream.map({ t: T => fn(t) }, description))
+    new JavaStream[R](stream.flatMap(FlatMapFunction(fn), description))
   }
 
   /** Only keep the messages that FilterFunction returns true.  */
   def filter(fn: FilterFunction[T], description: String): JavaStream[T] = {
-    new JavaStream[T](stream.filter({ t: T => fn(t) }, description))
+    new JavaStream[T](stream.flatMap(FlatMapFunction(fn), description))
   }
 
   /** Does aggregation on the stream */
   def reduce(fn: ReduceFunction[T], description: String): JavaStream[T] = {
-    new JavaStream[T](stream.reduce({ (t1: T, t2: T) => fn(t1, t2) }, description))
+    new JavaStream[T](stream.reduce(fn, description))
   }
 
   def log(): Unit = {
@@ -65,7 +65,7 @@ class JavaStream[T](val stream: Stream[T]) {
    */
   def groupBy[GROUP](fn: GroupByFunction[T, GROUP],
       parallelism: Int, description: String): JavaStream[T] = {
-    new JavaStream[T](stream.groupBy((t: T) => fn, parallelism, description))
+    new JavaStream[T](stream.groupBy(fn.apply, parallelism, description))
   }
 
   def window(win: Window, description: String): JavaWindowStream[T] = {
@@ -84,6 +84,6 @@ class JavaWindowStream[T](stream: WindowStream[T]) {
 
   def groupBy[GROUP](fn: GroupByFunction[T, GROUP], parallelism: Int,
       description: String): JavaStream[T] = {
-    new JavaStream[T](stream.groupBy((t: T) => fn, parallelism, description))
+    new JavaStream[T](stream.groupBy(fn.apply, parallelism, description))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
index 0d841be..f5b2910 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
@@ -19,13 +19,14 @@
 package org.apache.gearpump.streaming.dsl.javaapi
 
 import java.util.Collection
-import scala.collection.JavaConverters._
 
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.streaming.dsl.{CollectionDataSource, StreamApp}
+import org.apache.gearpump.cluster.client.{ClientContext, RunningApplication}
+import org.apache.gearpump.streaming.dsl.scalaapi.{CollectionDataSource, StreamApp}
 import org.apache.gearpump.streaming.source.DataSource
 
+import scala.collection.JavaConverters._
+
 class JavaStreamApp(name: String, context: ClientContext, userConfig: UserConfig) {
 
   private val streamApp = StreamApp(name, context, userConfig)
@@ -41,7 +42,7 @@ class JavaStreamApp(name: String, context: ClientContext, userConfig: UserConfig
     new JavaStream[T](streamApp.source(dataSource, parallelism, conf, description))
   }
 
-  def run(): Unit = {
+  def submit(): RunningApplication = {
     context.submit(streamApp)
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
new file mode 100644
index 0000000..85d597d
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.javaapi.functions
+
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
+
+/**
+ * Transforms one input into zero or more outputs of possibly different types.
+ * This Java version of FlatMapFunction returns a java.util.Iterator.
+ *
+ * @tparam T Input value type
+ * @tparam R Output value type
+ */
+abstract class FlatMapFunction[T, R] extends SerializableFunction {
+
+  def apply(t: T): java.util.Iterator[R]
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala
new file mode 100644
index 0000000..7656cba
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.javaapi.functions
+
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction
+
+/**
+ * Assigns the input value into a group.
+ *
+ * @tparam T Input value type
+ * @tparam GROUP Group value type
+ */
+abstract class GroupByFunction[T, GROUP] extends MapFunction[T, GROUP]

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
index 2ec881b..efa7409 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala
@@ -19,7 +19,7 @@
 package org.apache.gearpump.streaming.dsl.partitioner
 
 import org.apache.gearpump.Message
-import org.apache.gearpump.partitioner.UnicastPartitioner
+import org.apache.gearpump.streaming.partitioner.UnicastPartitioner
 import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
index b2c5506..5aaf2fa 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -22,11 +22,10 @@ import akka.actor.ActorSystem
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._
 import org.apache.gearpump.streaming.Processor.DefaultProcessor
-import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, SingleInputFunction}
 import org.apache.gearpump.streaming.{Constants, Processor}
 import org.apache.gearpump.streaming.dsl.task.TransformTask
-import org.apache.gearpump.streaming.dsl.window.api.{CountWindow, GroupByFn}
-import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
+import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
 import org.apache.gearpump.streaming.sink.{DataSink, DataSinkProcessor}
 import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask}
 import org.apache.gearpump.streaming.task.Task
@@ -130,12 +129,11 @@ case class ChainableOp[IN, OUT](
 
   override def description: String = fn.description
 
-
   override def chain(other: Op)(implicit system: ActorSystem): Op = {
     other match {
       case op: ChainableOp[OUT, _] =>
         // TODO: preserve type info
-        ChainableOp(fn.andThen(op.fn))
+        ChainableOp(AndThen(fn, op.fn))
       case _ =>
         throw new OpChainException(this, other)
     }
@@ -147,15 +145,6 @@ case class ChainableOp[IN, OUT](
   }
 }
 
-object GroupByOp {
-
-  def apply[IN, GROUP](groupBy: IN => GROUP, parallelism: Int,
-      description: String, userConfig: UserConfig): Op = {
-    GroupByOp(GroupAlsoByWindow(groupBy, CountWindow.apply(1).accumulating), parallelism,
-      description, userConfig)
-  }
-}
-
 /**
  * This represents a Processor with window aggregation
  */

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
index 16d5c06..65f9cd2 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.dsl.plan
 
 import akka.actor.ActorSystem
 
-import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, HashPartitioner, Partitioner}
 import org.apache.gearpump.streaming.Processor
 import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
 import org.apache.gearpump.streaming.task.Task

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
index 609fbb0..687fd2e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
@@ -17,20 +17,37 @@
  */
 package org.apache.gearpump.streaming.dsl.plan.functions
 
-trait SingleInputFunction[IN, OUT] extends Serializable {
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+
+/**
+ * Internal function to process single input
+ *
+ * @tparam IN input value type
+ * @tparam OUT output value type
+ */
+sealed trait SingleInputFunction[IN, OUT] extends java.io.Serializable {
+
+  def setup(): Unit = {}
+
   def process(value: IN): TraversableOnce[OUT]
-  def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = {
-    new AndThen(this, other)
-  }
+
   def finish(): TraversableOnce[OUT] = None
-  def clearState(): Unit = {}
+
+  def teardown(): Unit = {}
+
   def description: String
 }
 
-class AndThen[IN, MIDDLE, OUT](
-    first: SingleInputFunction[IN, MIDDLE], second: SingleInputFunction[MIDDLE, OUT])
+case class AndThen[IN, MIDDLE, OUT](first: SingleInputFunction[IN, MIDDLE],
+    second: SingleInputFunction[MIDDLE, OUT])
   extends SingleInputFunction[IN, OUT] {
 
+  override def setup(): Unit = {
+    first.setup()
+    second.setup()
+  }
+
   override def process(value: IN): TraversableOnce[OUT] = {
     first.process(value).flatMap(second.process)
   }
@@ -44,9 +61,9 @@ class AndThen[IN, MIDDLE, OUT](
     }
   }
 
-  override def clearState(): Unit = {
-    first.clearState()
-    second.clearState()
+  override def teardown(): Unit = {
+    first.teardown()
+    second.teardown()
   }
 
   override def description: String = {
@@ -56,22 +73,31 @@ class AndThen[IN, MIDDLE, OUT](
   }
 }
 
-class FlatMapFunction[IN, OUT](fn: IN => TraversableOnce[OUT], descriptionMessage: String)
+class FlatMapper[IN, OUT](fn: FlatMapFunction[IN, OUT], val description: String)
   extends SingleInputFunction[IN, OUT] {
 
+  override def setup(): Unit = {
+    fn.setup()
+  }
+
   override def process(value: IN): TraversableOnce[OUT] = {
     fn(value)
   }
 
-  override def description: String = descriptionMessage
+  override def teardown(): Unit = {
+    fn.teardown()
+  }
 }
 
-
-class ReduceFunction[T](fn: (T, T) => T, descriptionMessage: String)
+class Reducer[T](fn: ReduceFunction[T], val description: String)
   extends SingleInputFunction[T, T] {
 
   private var state: Option[T] = None
 
+  override def setup(): Unit = {
+    fn.setup()
+  }
+
   override def process(value: T): TraversableOnce[T] = {
     if (state.isEmpty) {
       state = Option(value)
@@ -85,23 +111,18 @@ class ReduceFunction[T](fn: (T, T) => T, descriptionMessage: String)
     state
   }
 
-  override def clearState(): Unit = {
+  override def teardown(): Unit = {
     state = None
+    fn.teardown()
   }
-
-  override def description: String = descriptionMessage
 }
 
-class EmitFunction[T](emit: T => Unit) extends SingleInputFunction[T, Unit] {
+class Emit[T](emit: T => Unit) extends SingleInputFunction[T, Unit] {
 
   override def process(value: T): TraversableOnce[Unit] = {
     emit(value)
     None
   }
 
-  override def andThen[R](other: SingleInputFunction[Unit, R]): SingleInputFunction[T, R] = {
-    throw new UnsupportedOperationException("andThen is not supposed to be called on EmitFunction")
-  }
-
   override def description: String = ""
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
new file mode 100644
index 0000000..430d795
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.scalaapi
+
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+import org.apache.gearpump.streaming.dsl.plan._
+import org.apache.gearpump.streaming.dsl.plan.functions._
+import org.apache.gearpump.streaming.dsl.window.api._
+import org.apache.gearpump.streaming.dsl.window.impl.{Bucket, GroupAlsoByWindow}
+import org.apache.gearpump.streaming.sink.DataSink
+import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.util.Graph
+import org.slf4j.{Logger, LoggerFactory}
+
+import scala.language.implicitConversions
+
+class Stream[T](
+    private val graph: Graph[Op, OpEdge], private val thisNode: Op,
+    private val edge: Option[OpEdge] = None) {
+
+  /**
+   * Returns a new stream by applying a flatMap function to each element
+   * and flatten the results.
+   *
+   * @param fn flatMap function
+   * @param description The description message for this operation
+   * @return A new stream with type [R]
+   */
+  def flatMap[R](fn: T => TraversableOnce[R], description: String = "flatMap"): Stream[R] = {
+    this.flatMap(FlatMapFunction(fn), description)
+  }
+
+  /**
+   * Returns a new stream by applying a flatMap function to each element
+   * and flatten the results.
+   *
+   * @param fn flatMap function
+   * @param description The description message for this operation
+   * @return A new stream with type [R]
+   */
+  def flatMap[R](fn: FlatMapFunction[T, R], description: String): Stream[R] = {
+    transform(new FlatMapper[T, R](fn, description))
+  }
+
+  /**
+   * Returns a new stream by applying a map function to each element.
+   *
+   * @param fn map function
+   * @return A new stream with type [R]
+   */
+  def map[R](fn: T => R, description: String = "map"): Stream[R] = {
+    this.map(MapFunction(fn), description)
+  }
+
+  /**
+   * Returns a new stream by applying a map function to each element.
+   *
+   * @param fn map function
+   * @return A new stream with type [R]
+   */
+  def map[R](fn: MapFunction[T, R], description: String): Stream[R] = {
+    this.flatMap(FlatMapFunction(fn), description)
+  }
+
+  /**
+   * Returns a new Stream keeping the elements that satisfy the filter function.
+   *
+   * @param fn filter function
+   * @return a new stream after filter
+   */
+  def filter(fn: T => Boolean, description: String = "filter"): Stream[T] = {
+    this.filter(FilterFunction(fn), description)
+  }
+
+  /**
+   * Returns a new Stream keeping the elements that satisfy the filter function.
+   *
+   * @param fn filter function
+   * @return a new stream after filter
+   */
+  def filter(fn: FilterFunction[T], description: String): Stream[T] = {
+    this.flatMap(FlatMapFunction(fn), description)
+  }
+  /**
+   * Returns a new stream by applying a reduce function over all the elements.
+   *
+   * @param fn reduce function
+   * @param description description message for this operator
+   * @return a new stream after reduce
+   */
+  def reduce(fn: (T, T) => T, description: String = "reduce"): Stream[T] = {
+    reduce(ReduceFunction(fn), description)
+  }
+
+  /**
+   * Returns a new stream by applying a reduce function over all the elements.
+   *
+   * @param fn reduce function
+   * @param description description message for this operator
+   * @return a new stream after reduce
+   */
+  def reduce(fn: ReduceFunction[T], description: String): Stream[T] = {
+    transform(new Reducer[T](fn, description))
+  }
+
+  private def transform[R](fn: SingleInputFunction[T, R]): Stream[R] = {
+    val op = ChainableOp(fn)
+    graph.addVertex(op)
+    graph.addEdge(thisNode, edge.getOrElse(Direct), op)
+    new Stream(graph, op)
+  }
+
+  /**
+   * Log to task log file
+   */
+  def log(): Unit = {
+    this.map(msg => {
+      LoggerFactory.getLogger("dsl").info(msg.toString)
+      msg
+    }, "log")
+  }
+
+  /**
+   * Merges data from two stream into one
+   *
+   * @param other the other stream
+   * @return  the merged stream
+   */
+  def merge(other: Stream[T], description: String = "merge"): Stream[T] = {
+    val mergeOp = MergeOp(description, UserConfig.empty)
+    graph.addVertex(mergeOp)
+    graph.addEdge(thisNode, edge.getOrElse(Direct), mergeOp)
+    graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), mergeOp)
+    new Stream[T](graph, mergeOp)
+  }
+
+  /**
+   * Group by function (T => Group)
+   *
+   * For example, we have T type, People(name: String, gender: String, age: Int)
+   * groupBy[People](_.gender) will group the people by gender.
+   *
+   * You can append other combinators after groupBy
+   *
+   * For example,
+   * {{{
+   * Stream[People].groupBy(_.gender).flatMap(..).filter(..).reduce(..)
+   * }}}
+   *
+   * @param fn  Group by function
+   * @param parallelism  Parallelism level
+   * @param description  The description
+   * @return  the grouped stream
+   */
+  def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
+      description: String = "groupBy"): Stream[T] = {
+    window(CountWindow.apply(1).accumulating)
+      .groupBy[GROUP](fn, parallelism, description)
+  }
+
+  /**
+   * Window function
+   *
+   * @param win window definition
+   * @param description window description
+   * @return [[WindowStream]] where groupBy could be applied
+   */
+  def window(win: Window, description: String = "window"): WindowStream[T] = {
+    new WindowStream[T](graph, edge, thisNode, win, description)
+  }
+
+  /**
+   * Connects with a low level Processor(TaskDescription)
+   *
+   * @param processor  a user defined processor
+   * @param parallelism  parallelism level
+   * @return  new stream after processing with type [R]
+   */
+  def process[R](
+      processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty,
+      description: String = "process"): Stream[R] = {
+    val processorOp = ProcessorOp(processor, parallelism, conf, description)
+    graph.addVertex(processorOp)
+    graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp)
+    new Stream[R](graph, processorOp, Some(Shuffle))
+  }
+}
+
+class WindowStream[T](graph: Graph[Op, OpEdge], edge: Option[OpEdge], thisNode: Op,
+    window: Window, winDesc: String) {
+
+  def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1,
+      description: String = "groupBy"): Stream[T] = {
+    val groupBy: GroupByFn[T, (GROUP, List[Bucket])] = GroupAlsoByWindow(fn, window)
+    val groupOp = GroupByOp[T, (GROUP, List[Bucket])](groupBy, parallelism,
+      s"$winDesc.$description")
+    graph.addVertex(groupOp)
+    graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp)
+    new Stream[T](graph, groupOp)
+  }
+}
+
+class KVStream[K, V](stream: Stream[Tuple2[K, V]]) {
+  /**
+   * GroupBy key
+   *
+   * Applies to Stream[Tuple2[K,V]]
+   *
+   * @param parallelism  the parallelism for this operation
+   * @return  the new KV stream
+   */
+  def groupByKey(parallelism: Int = 1): Stream[Tuple2[K, V]] = {
+    stream.groupBy(Stream.getTupleKey[K, V], parallelism, "groupByKey")
+  }
+
+  /**
+   * Sum the value of the tuples
+   *
+   * Apply to Stream[Tuple2[K,V]], V must be of type Number
+   *
+   * For input (key, value1), (key, value2), will generate (key, value1 + value2)
+   * @param numeric  the numeric operations
+   * @return  the sum stream
+   */
+  def sum(implicit numeric: Numeric[V]): Stream[(K, V)] = {
+    stream.reduce(Stream.sumByKey[K, V](numeric), "sum")
+  }
+}
+
+object Stream {
+
+  def apply[T](graph: Graph[Op, OpEdge], node: Op, edge: Option[OpEdge]): Stream[T] = {
+    new Stream[T](graph, node, edge)
+  }
+
+  def getTupleKey[K, V](tuple: Tuple2[K, V]): K = tuple._1
+
+  def sumByKey[K, V](numeric: Numeric[V]): (Tuple2[K, V], Tuple2[K, V]) => Tuple2[K, V]
+  = (tuple1, tuple2) => Tuple2(tuple1._1, numeric.plus(tuple1._2, tuple2._2))
+
+  implicit def streamToKVStream[K, V](stream: Stream[Tuple2[K, V]]): KVStream[K, V] = {
+    new KVStream(stream)
+  }
+
+  implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable {
+    def sink(dataSink: DataSink, parallelism: Int = 1,
+        conf: UserConfig = UserConfig.empty, description: String = "sink"): Stream[T] = {
+      implicit val sink = DataSinkOp(dataSink, parallelism, conf, description)
+      stream.graph.addVertex(sink)
+      stream.graph.addEdge(stream.thisNode, Shuffle, sink)
+      new Stream[T](stream.graph, sink)
+    }
+  }
+}
+
+class LoggerSink[T] extends DataSink {
+  var logger: Logger = _ // null until open() assigns the task's logger
+
+  override def open(context: TaskContext): Unit = {
+    this.logger = context.logger
+  }
+
+  override def write(message: Message): Unit = {
+    logger.info("logging message " + message.msg) // logs the message payload at info level
+  }
+
+  override def close(): Unit = {} // no-op: this sink opens nothing it would need to release
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
new file mode 100644
index 0000000..d6eed2e
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.scalaapi
+
+import java.time.Instant
+
+import akka.actor.ActorSystem
+import org.apache.gearpump.Message
+import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.streaming.StreamApplication
+import org.apache.gearpump.streaming.dsl.plan._
+import org.apache.gearpump.streaming.source.DataSource
+import org.apache.gearpump.streaming.task.TaskContext
+import org.apache.gearpump.util.Graph
+
+import scala.language.implicitConversions
+
+/**
+ * Example:
+ * {{{
+ * val data = "This is a good start, bingo!! bingo!!"
+ * app.fromCollection(data.lines.toList).
+ * // word => (word, count)
+ * flatMap(line => line.split("[\\s]+")).map((_, 1)).
+ * // (word, count1), (word, count2) => (word, count1 + count2)
+ * groupBy(kv => kv._1).reduce(sum(_, _))
+ *
+ * val appId = context.submit(app)
+ * context.close()
+ * }}}
+ *
+ * @param name name of app
+ */
+class StreamApp(
+    name: String, system: ActorSystem, userConfig: UserConfig,
+    private val graph: Graph[Op, OpEdge]) {
+
+  def this(name: String, system: ActorSystem, userConfig: UserConfig) = {
+    this(name, system, userConfig, Graph.empty[Op, OpEdge])
+  }
+
+  def plan(): StreamApplication = {
+    implicit val actorSystem = system
+    val planner = new Planner
+    val dag = planner.plan(graph)
+    StreamApplication(name, dag, userConfig)
+  }
+}
+
+object StreamApp {
+  def apply(name: String, context: ClientContext, userConfig: UserConfig = UserConfig.empty)
+    : StreamApp = {
+    new StreamApp(name, context.system, userConfig)
+  }
+
+  implicit def streamAppToApplication(streamApp: StreamApp): StreamApplication = {
+    streamApp.plan()
+  }
+
+  implicit class Source(app: StreamApp) extends java.io.Serializable {
+
+    def source[T](dataSource: DataSource, parallelism: Int = 1,
+        conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = {
+      implicit val sourceOp = DataSourceOp(dataSource, parallelism, conf, description)
+      app.graph.addVertex(sourceOp)
+      new Stream[T](app.graph, sourceOp)
+    }
+
+    def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = {
+      this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description)
+    }
+  }
+}
+
+/** A test message source that emits each element of `seq` once; read() then returns null. */
+class CollectionDataSource[T](seq: Seq[T]) extends DataSource {
+  private lazy val iterator: Iterator[T] = seq.iterator
+
+  override def open(context: TaskContext, startTime: Instant): Unit = {}
+
+  override def read(): Message = {
+    if (iterator.hasNext) {
+      Message(iterator.next(), Instant.now().toEpochMilli)
+    } else {
+      null
+    }
+  }
+
+  override def close(): Unit = {}
+
+  override def getWatermark: Instant = Instant.now()
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
new file mode 100644
index 0000000..f10a3db
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.scalaapi.functions
+
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction}
+import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => JFlatMapFunction}
+
+import scala.collection.JavaConverters._
+
+object FlatMapFunction {
+
+  def apply[T, R](fn: JFlatMapFunction[T, R]): FlatMapFunction[T, R] = {
+    new FlatMapFunction[T, R] {
+
+      override def setup(): Unit = {
+        fn.setup()
+      }
+
+      override def apply(t: T): TraversableOnce[R] = {
+        fn.apply(t).asScala
+      }
+
+
+      override def teardown(): Unit = {
+        fn.teardown()
+      }
+    }
+  }
+
+  def apply[T, R](fn: T => TraversableOnce[R]): FlatMapFunction[T, R] = {
+    new FlatMapFunction[T, R] {
+      override def apply(t: T): TraversableOnce[R] = {
+        fn(t)
+      }
+    }
+  }
+
+  def apply[T, R](fn: MapFunction[T, R]): FlatMapFunction[T, R] = {
+    new FlatMapFunction[T, R] {
+
+      override def setup(): Unit = {
+        fn.setup()
+      }
+
+      override def apply(t: T): TraversableOnce[R] = {
+        Option(fn(t))
+      }
+
+      override def teardown(): Unit = {
+        fn.teardown()
+      }
+    }
+  }
+
+  def apply[T, R](fn: FilterFunction[T]): FlatMapFunction[T, T] = {
+    new FlatMapFunction[T, T] {
+
+      override def setup(): Unit = {
+        fn.setup()
+      }
+
+      override def apply(t: T): TraversableOnce[T] = {
+        if (fn(t)) {
+          Option(t)
+        } else {
+          None
+        }
+      }
+
+      override def teardown(): Unit = {
+        fn.teardown()
+      }
+    }
+  }
+}
+
+/**
+ * Transforms one input into zero or more outputs of possibly different types.
+ * This Scala version of FlatMapFunction returns a TraversableOnce.
+ *
+ * @tparam T Input value type
+ * @tparam R Output value type
+ */
+abstract class FlatMapFunction[T, R] extends SerializableFunction {
+
+  def apply(t: T): TraversableOnce[R]
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
new file mode 100644
index 0000000..ab88bf1
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.scalaapi.functions
+
+/**
+ * Superclass for all user defined function interfaces.
+ * This ensures all functions are serializable and provides common methods
+ * like setup and teardown. Users should not extend this class directly
+ * but subclasses like [[FlatMapFunction]].
+ */
+abstract class SerializableFunction extends java.io.Serializable {
+
+  def setup(): Unit = {}
+
+  def teardown(): Unit = {}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
index 4ee2fa8..06f2964 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala
@@ -19,7 +19,6 @@ package org.apache.gearpump.streaming.dsl.task
 
 import java.time.Instant
 
-import akka.actor.ActorSystem
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
index 4b7649f..0674339 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala
@@ -19,7 +19,6 @@ package org.apache.gearpump.streaming.dsl.task
 
 import java.time.Instant
 
-import akka.actor.ActorSystem
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
index 980a54b..78ba762 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala
@@ -21,7 +21,6 @@ import java.time.Instant
 import java.util.concurrent.TimeUnit
 
 import akka.actor.Actor.Receive
-import akka.actor.ActorSystem
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
index e35f085..f8fbefa 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
@@ -17,21 +17,26 @@
  */
 package org.apache.gearpump.streaming.dsl.task
 
+import java.time.Instant
+
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants._
 import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 
-class TransformTask[IN, OUT](
-    operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext,
-    userConf: UserConfig) extends Task(taskContext, userConf) {
+class TransformTask[IN, OUT](operator: Option[SingleInputFunction[IN, OUT]],
+    taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
 
   def this(taskContext: TaskContext, userConf: UserConfig) = {
     this(userConf.getValue[SingleInputFunction[IN, OUT]](
       GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf)
   }
 
+  override def onStart(startTime: Instant): Unit = {
+    operator.foreach(_.setup())
+  }
+
   override def onNext(msg: Message): Unit = {
     val time = msg.timestamp
 
@@ -44,4 +49,8 @@ class TransformTask[IN, OUT](
         taskContext.output(new Message(msg.msg, time))
     }
   }
+
+  override def onStop(): Unit = {
+    operator.foreach(_.teardown())
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index 9af5e61..223a4af 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -22,12 +22,13 @@ import java.time.Instant
 import akka.actor.ActorSystem
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.gs.collections.api.block.procedure.Procedure
-import org.apache.gearpump.gs.collections.impl.list.mutable.FastList
-import org.apache.gearpump.gs.collections.impl.map.mutable.UnifiedMap
-import org.apache.gearpump.gs.collections.impl.map.sorted.mutable.TreeSortedMap
+import com.gs.collections.api.block.procedure.Procedure
+import com.gs.collections.impl.list.mutable.FastList
+import com.gs.collections.impl.map.mutable.UnifiedMap
+import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap
+import com.gs.collections.impl.set.mutable.UnifiedSet
 import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.plan.functions.{EmitFunction, SingleInputFunction}
+import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, Emit, SingleInputFunction}
 import org.apache.gearpump.streaming.dsl.window.api.Discarding
 import org.apache.gearpump.streaming.task.TaskContext
 import org.apache.gearpump.util.LogUtil
@@ -38,7 +39,6 @@ trait WindowRunner {
   def process(message: Message): Unit
 
   def trigger(time: Instant): Unit
-
 }
 
 object DefaultWindowRunner {
@@ -46,18 +46,6 @@ object DefaultWindowRunner {
   private val LOG: Logger = LogUtil.getLogger(classOf[DefaultWindowRunner[_, _, _]])
 
   case class WindowGroup[GROUP](bucket: Bucket, group: GROUP)
-    extends Comparable[WindowGroup[GROUP]] {
-    override def compareTo(o: WindowGroup[GROUP]): Int = {
-      val ret = bucket.compareTo(o.bucket)
-      if (ret != 0) {
-        ret
-      } else if (group.equals(o.group)) {
-        0
-      } else {
-        -1
-      }
-    }
-  }
 }
 
 class DefaultWindowRunner[IN, GROUP, OUT](
@@ -66,20 +54,27 @@ class DefaultWindowRunner[IN, GROUP, OUT](
   extends WindowRunner {
   import org.apache.gearpump.streaming.dsl.window.impl.DefaultWindowRunner._
 
-  private val windowGroups = new TreeSortedMap[WindowGroup[GROUP], FastList[IN]]
+  private val windows = new TreeSortedMap[Bucket, UnifiedSet[WindowGroup[GROUP]]]
+  private val windowGroups = new UnifiedMap[WindowGroup[GROUP], FastList[IN]]
   private val groupFns = new UnifiedMap[GROUP, SingleInputFunction[IN, OUT]]
 
-
   override def process(message: Message): Unit = {
     val (group, buckets) = groupBy.groupBy(message)
     buckets.foreach { bucket =>
       val wg = WindowGroup(bucket, group)
+      val wgs = windows.getOrDefault(bucket, new UnifiedSet[WindowGroup[GROUP]](1))
+      wgs.add(wg)
+      windows.put(bucket, wgs)
+
       val inputs = windowGroups.getOrDefault(wg, new FastList[IN](1))
       inputs.add(message.msg.asInstanceOf[IN])
       windowGroups.put(wg, inputs)
     }
-    groupFns.putIfAbsent(group,
-      userConfig.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get)
+    if (!groupFns.containsKey(group)) {
+      val fn = userConfig.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get
+      fn.setup()
+      groupFns.put(group, fn)
+    }
   }
 
   override def trigger(time: Instant): Unit = {
@@ -87,21 +82,28 @@ class DefaultWindowRunner[IN, GROUP, OUT](
 
     @annotation.tailrec
     def onTrigger(): Unit = {
-      if (windowGroups.notEmpty()) {
-        val first = windowGroups.firstKey
-        if (!time.isBefore(first.bucket.endTime)) {
-          val inputs = windowGroups.remove(first)
-          val reduceFn = groupFns.get(first.group)
-            .andThen[Unit](new EmitFunction[OUT](emitResult(_, time)))
-          inputs.forEach(new Procedure[IN] {
-            override def value(t: IN): Unit = {
-              reduceFn.process(t)
+      if (windows.notEmpty()) {
+        val first = windows.firstKey
+        if (!time.isBefore(first.endTime)) {
+          val wgs = windows.remove(first)
+          wgs.forEach(new Procedure[WindowGroup[GROUP]] {
+            override def value(each: WindowGroup[GROUP]): Unit = {
+              val inputs = windowGroups.remove(each)
+              val reduceFn = AndThen(groupFns.get(each.group), new Emit[OUT](emitResult(_, time)))
+              inputs.forEach(new Procedure[IN] {
+                override def value(t: IN): Unit = {
+                  // .toList forces eager evaluation
+                  reduceFn.process(t).toList
+                }
+              })
+              // .toList forces eager evaluation
+              reduceFn.finish().toList
+              if (groupBy.window.accumulationMode == Discarding) {
+                reduceFn.teardown()
+              }
             }
           })
-          reduceFn.finish()
-          if (groupBy.window.accumulationMode == Discarding) {
-            reduceFn.clearState()
-          }
+
           onTrigger()
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala
index d045def..8f8b7ab 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/metrics/ProcessorAggregator.scala
@@ -20,12 +20,11 @@ package org.apache.gearpump.streaming.metrics
 
 import java.util
 
+import com.google.common.collect.Iterators
 import com.typesafe.config.Config
-
 import org.apache.gearpump.TimeStamp
 import org.apache.gearpump.cluster.ClientToMaster.ReadOption
 import org.apache.gearpump.cluster.MasterToClient.HistoryMetricsItem
-import org.apache.gearpump.google.common.collect.Iterators
 import org.apache.gearpump.metrics.Metrics.{Histogram, Meter}
 import org.apache.gearpump.metrics.MetricsAggregator
 import org.apache.gearpump.streaming.metrics.ProcessorAggregator._

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala
new file mode 100644
index 0000000..9b63e04
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/BroadcastPartitioner.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.partitioner
+
+import org.apache.gearpump.Message
+
+/** Used by the storm module to broadcast each message to all downstream tasks. */
+class BroadcastPartitioner extends MulticastPartitioner {
+  private var lastPartitionNum = -1
+  private var partitions = Array.empty[Int]
+
+  override def getPartitions(
+      msg: Message, partitionNum: Int, currentPartitionId: Int): Array[Int] = {
+    if (partitionNum != lastPartitionNum) {
+      partitions = (0 until partitionNum).toArray
+      lastPartitionNum = partitionNum
+    }
+    partitions
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/CoLocationPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/CoLocationPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/CoLocationPartitioner.scala
new file mode 100644
index 0000000..4cb1bad
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/CoLocationPartitioner.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.partitioner
+
+import org.apache.gearpump.Message
+
+/**
+ * The current processor has the same parallelism as the previous processor,
+ * and each of its tasks is co-located with the corresponding task of the previous processor.
+ */
+class CoLocationPartitioner extends UnicastPartitioner {
+  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
+    currentPartitionId
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala
new file mode 100644
index 0000000..6137705
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/HashPartitioner.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.partitioner
+
+import org.apache.gearpump.Message
+
+/**
+ * Only makes sense when the message implements hashCode().
+ * Otherwise, Object.hashCode() is used, which does not return the
+ * same hash code after serialization and deserialization.
+ */
+class HashPartitioner extends UnicastPartitioner {
+  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
+    (msg.msg.hashCode() & Integer.MAX_VALUE) % partitionNum
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/Partitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/Partitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/Partitioner.scala
new file mode 100644
index 0000000..f685cc9
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/Partitioner.scala
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.partitioner
+
+import org.apache.commons.lang.SerializationUtils
+import org.apache.gearpump.Message
+
+import scala.reflect.ClassTag
+
+/**
+ * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), the partitioner decides how ONE
+ * task of upstream processor A sends messages to the tasks of downstream processor B.
+ */
+sealed trait Partitioner extends Serializable
+
+/**
+ * For processor chain: A (3 tasks) {@literal ->} B (3 tasks), UnicastPartitioner does
+ * ONE-task {@literal ->} ONE-task mapping.
+ */
+trait UnicastPartitioner extends Partitioner {
+
+  /**
+   * Gets the SINGLE downstream processor task index to send message to.
+   *
+   * @param msg Message you want to send
+   * @param partitionNum How many tasks does the downstream processor have.
+   * @param upstreamTaskIndex Index of the upstream task that triggers the getPartition() call.
+   *
+   * @return ONE task index of downstream processor.
+   */
+  def getPartition(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Int
+
+  def getPartition(msg: Message, partitionNum: Int): Int = {
+    getPartition(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID)
+  }
+}
+
+trait MulticastPartitioner extends Partitioner {
+
+  /**
+   * Gets a list of downstream processor task indexes to send message to.
+   *
+   * @param upstreamTaskIndex Current sender task's task index.
+   *
+   */
+  def getPartitions(msg: Message, partitionNum: Int, upstreamTaskIndex: Int): Array[Int]
+
+  def getPartitions(msg: Message, partitionNum: Int): Array[Int] = {
+    getPartitions(msg, partitionNum, Partitioner.UNKNOWN_PARTITION_ID)
+  }
+}
+
+sealed trait PartitionerFactory {
+
+  def name: String
+
+  def partitioner: Partitioner
+}
+
+/** Stores a Partitioner instance; each partitioner call returns a fresh serialization clone. */
+class PartitionerObject(private[this] val _partitioner: Partitioner)
+  extends PartitionerFactory with Serializable {
+
+  override def name: String = _partitioner.getClass.getName // class name needs no clone
+
+  override def partitioner: Partitioner = {
+    SerializationUtils.clone(_partitioner).asInstanceOf[Partitioner]
+  }
+}
+
+/** Stores the Partitioner by class name; each partitioner call instantiates a new one. */
+class PartitionerByClassName(partitionerClass: String)
+  extends PartitionerFactory with Serializable {
+
+  override def name: String = partitionerClass
+  override def partitioner: Partitioner = {
+    Class.forName(partitionerClass).newInstance().asInstanceOf[Partitioner]
+  }
+}
+
+/**
+ * @param partitionerFactory How we construct a Partitioner.
+ */
+case class PartitionerDescription(partitionerFactory: PartitionerFactory)
+
+object Partitioner {
+  val UNKNOWN_PARTITION_ID = -1
+
+  def apply[T <: Partitioner](implicit clazz: ClassTag[T]): PartitionerDescription = {
+    PartitionerDescription(new PartitionerByClassName(clazz.runtimeClass.getName))
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShuffleGroupingPartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShuffleGroupingPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShuffleGroupingPartitioner.scala
new file mode 100644
index 0000000..1b223e0
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShuffleGroupingPartitioner.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.partitioner
+
+import org.apache.gearpump.Message
+
+import scala.util.Random
+
+/**
+ * The idea of ShuffleGroupingPartitioner is derived from Storm.
+ * Messages are randomly distributed across the downstream's tasks in a way such that
+ * each task is guaranteed to get an equal number of messages.
+ */
+class ShuffleGroupingPartitioner extends UnicastPartitioner {
+  private val random = new Random
+  private var index = -1
+  private var partitions = List.empty[Int]
+  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
+    index += 1
+    if (partitions.isEmpty) {
+      partitions = 0.until(partitionNum).toList
+      partitions = random.shuffle(partitions)
+    } else if (index >= partitionNum) {
+      index = 0
+      partitions = random.shuffle(partitions)
+    }
+    partitions(index)
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShufflePartitioner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShufflePartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShufflePartitioner.scala
new file mode 100644
index 0000000..39d5e3b
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/partitioner/ShufflePartitioner.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.partitioner
+
+import java.util.Random
+
+import org.apache.gearpump.Message
+
+/**
+ * Partitions messages to downstream processor tasks in round-robin order.
+ */
+class ShufflePartitioner extends UnicastPartitioner {
+  private var seed = 0
+  private var count = 0
+
+  override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = {
+
+    if (seed == 0) {
+      seed = newSeed()
+    }
+
+    val result = ((count + seed) & Integer.MAX_VALUE) % partitionNum
+    count = count + 1
+    result
+  }
+
+  private def newSeed(): Int = new Random().nextInt()
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
index 535497c..450f2d6 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala
@@ -69,6 +69,7 @@ class DataSourceTask[IN, OUT] private[source](
   override def onStart(startTime: Instant): Unit = {
     LOG.info(s"opening data source at $startTime")
     source.open(context, startTime)
+    operator.foreach(_.setup())
 
     self ! Watermark(source.getWatermark)
   }
@@ -82,6 +83,7 @@ class DataSourceTask[IN, OUT] private[source](
   }
 
   override def onStop(): Unit = {
+    operator.foreach(_.teardown())
     LOG.info("closing data source...")
     source.close()
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializerResolver.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializerResolver.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializerResolver.scala
index 902c663..3b32163 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializerResolver.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/SerializerResolver.scala
@@ -18,7 +18,7 @@
 
 package org.apache.gearpump.streaming.task
 
-import org.apache.gearpump.esotericsoftware.kryo.util.{IntMap, ObjectMap}
+import com.esotericsoftware.kryo.util.{IntMap, ObjectMap}
 import org.apache.gearpump.streaming.task.SerializerResolver.Registration
 
 private[task] class SerializerResolver {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala
index 692d7f9..5c99980 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscriber.scala
@@ -18,7 +18,7 @@
 
 package org.apache.gearpump.streaming.task
 
-import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming.partitioner.PartitionerDescription
 import org.apache.gearpump.streaming.{DAG, LifeTime}
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
index d9fbc82..4193fbf 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
@@ -20,8 +20,8 @@ package org.apache.gearpump.streaming.task
 
 import org.slf4j.Logger
 
-import org.apache.gearpump.google.common.primitives.Shorts
-import org.apache.gearpump.partitioner.{MulticastPartitioner, Partitioner, UnicastPartitioner}
+import com.google.common.primitives.Shorts
+import org.apache.gearpump.streaming.partitioner.{MulticastPartitioner, Partitioner, UnicastPartitioner}
 import org.apache.gearpump.streaming.AppMasterToExecutor.MsgLostException
 import org.apache.gearpump.streaming.LifeTime
 import org.apache.gearpump.streaming.task.Subscription._

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index f72e5b8..92f6672 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -23,17 +23,17 @@ import java.util
 import java.util.concurrent.TimeUnit
 
 import akka.actor._
+import com.gs.collections.impl.map.mutable.primitive.IntShortHashMap
+import org.apache.gearpump.streaming.source.Watermark
+import org.slf4j.Logger
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.gs.collections.impl.map.mutable.primitive.IntShortHashMap
 import org.apache.gearpump.metrics.Metrics
 import org.apache.gearpump.serializer.SerializationFramework
 import org.apache.gearpump.streaming.AppMasterToExecutor._
 import org.apache.gearpump.streaming.ExecutorToAppMaster._
 import org.apache.gearpump.streaming.ProcessorId
-import org.apache.gearpump.streaming.source.Watermark
 import org.apache.gearpump.util.{LogUtil, TimeOutScheduler}
 import org.apache.gearpump.{Message, TimeStamp}
-import org.slf4j.Logger
 
 /**
  *
@@ -52,9 +52,10 @@ class TaskActor(
 
   def serializerPool: SerializationFramework = inputSerializerPool
 
+  import taskContextData._
+
   import org.apache.gearpump.streaming.Constants._
   import org.apache.gearpump.streaming.task.TaskActor._
-  import taskContextData._
   val config = context.system.settings.config
 
   val LOG: Logger = LogUtil.getLogger(getClass, app = appId, executor = executorId, task = taskId)
@@ -75,9 +76,9 @@ class TaskActor(
   private var life = taskContextData.life
 
   // Latency probe
-  import context.dispatcher
-
   import scala.concurrent.duration._
+
+  import context.dispatcher
   final val LATENCY_PROBE_INTERVAL = FiniteDuration(1, TimeUnit.SECONDS)
 
   // Clock report interval

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala
index 5f4faee..ccda8f0 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/DAGSpec.scala
@@ -18,7 +18,7 @@
 
 package org.apache.gearpump.streaming
 
-import org.apache.gearpump.partitioner.PartitionerDescription
+import org.apache.gearpump.streaming.partitioner.PartitionerDescription
 import org.apache.gearpump.streaming.task.TaskId
 import org.apache.gearpump.util.Graph
 import org.apache.gearpump.util.Graph.Node

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
index e461ae8..29dfc57 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/AppMasterSpec.scala
@@ -34,7 +34,7 @@ import org.apache.gearpump.cluster.master.MasterProxy
 import org.apache.gearpump.cluster.scheduler.{Resource, ResourceAllocation, ResourceRequest}
 import org.apache.gearpump.cluster.worker.WorkerId
 import org.apache.gearpump.jarstore.FilePath
-import org.apache.gearpump.partitioner.HashPartitioner
+import org.apache.gearpump.streaming.partitioner.HashPartitioner
 import org.apache.gearpump.streaming.AppMasterToExecutor.StopTask
 import org.apache.gearpump.streaming.ExecutorToAppMaster.{MessageLoss, UnRegisterTask}
 import org.apache.gearpump.streaming.appmaster.AppMaster.{TaskActorRef, LookupTaskActorRef}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
index d42fe6f..46175a4 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/ClockServiceSpec.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.appmaster
 import akka.actor.{ActorSystem, Props}
 import akka.testkit.{ImplicitSender, TestKit, TestProbe}
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription}
+import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner, PartitionerDescription}
 import org.apache.gearpump.streaming.appmaster.ClockService.{ChangeToNewDAG, ChangeToNewDAGSuccess, HealthChecker, ProcessorClock}
 import org.apache.gearpump.streaming.appmaster.ClockServiceSpec.Store
 import org.apache.gearpump.streaming.storage.AppDataStore

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
index be3b3b7..adde927 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/DagManagerSpec.scala
@@ -22,7 +22,7 @@ package org.apache.gearpump.streaming.appmaster
 import akka.actor.{ActorSystem, Props}
 import akka.testkit.TestProbe
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
 import org.apache.gearpump.streaming.appmaster.DagManager.{DAGOperationFailed, DAGOperationSuccess, GetLatestDAG, GetTaskLaunchData, LatestDAG, NewDAGDeployed, ReplaceProcessor, TaskLaunchData, WatchChange}
 import org.apache.gearpump.streaming.task.{Subscriber, TaskActor}
 import org.apache.gearpump.streaming._

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala
index 5f6dd04..def9d44 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/JarSchedulerSpec.scala
@@ -22,7 +22,7 @@ import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest}
 import org.apache.gearpump.cluster.worker.WorkerId
 import org.apache.gearpump.cluster.{AppJar, TestUtil}
 import org.apache.gearpump.jarstore.FilePath
-import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
 import org.apache.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, TestTask2}
 import org.apache.gearpump.streaming.task.TaskId
 import org.apache.gearpump.streaming.{DAG, ProcessorDescription, _}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
index 54ecde1..bcf96e4 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskManagerSpec.scala
@@ -25,7 +25,7 @@ import org.apache.gearpump.cluster.scheduler.{Resource, ResourceRequest}
 import org.apache.gearpump.cluster.worker.WorkerId
 import org.apache.gearpump.cluster.{AppJar, TestUtil, UserConfig}
 import org.apache.gearpump.jarstore.FilePath
-import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner, PartitionerDescription}
+import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner, PartitionerDescription}
 import org.apache.gearpump.streaming.AppMasterToExecutor.{LaunchTasks, StartAllTasks, StartDynamicDag, TaskLocationsReady, TaskLocationsReceived, TaskRegistered}
 import org.apache.gearpump.streaming.ExecutorToAppMaster.RegisterTask
 import org.apache.gearpump.streaming.appmaster.AppMaster.AllocateResourceTimeOut

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
index 864aa93..1bfde94 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/appmaster/TaskSchedulerSpec.scala
@@ -21,7 +21,7 @@ import com.typesafe.config.ConfigFactory
 import org.apache.gearpump.cluster.scheduler.{Relaxation, Resource, ResourceRequest}
 import org.apache.gearpump.cluster.worker.WorkerId
 import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.partitioner.{HashPartitioner, Partitioner}
+import org.apache.gearpump.streaming.partitioner.{HashPartitioner, Partitioner}
 import org.apache.gearpump.streaming.appmaster.TaskLocator.Localities
 import org.apache.gearpump.streaming.appmaster.TaskSchedulerSpec.{TestTask1, TestTask2}
 import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskId}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
deleted file mode 100644
index e0407ec..0000000
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl
-
-import akka.actor.ActorSystem
-import org.apache.gearpump.cluster.TestUtil
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.partitioner.PartitionerDescription
-import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
-import org.apache.gearpump.streaming.source.DataSourceTask
-import org.apache.gearpump.util.Graph
-import org.mockito.Mockito.when
-import org.scalatest._
-import org.scalatest.mock.MockitoSugar
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
-
-  implicit var system: ActorSystem = _
-
-  override def beforeAll(): Unit = {
-    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
-  }
-
-  override def afterAll(): Unit = {
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  it should "be able to generate multiple new streams" in {
-    val context: ClientContext = mock[ClientContext]
-    when(context.system).thenReturn(system)
-
-    val dsl = StreamApp("dsl", context)
-    dsl.source(List("A"), 2, "A") shouldBe a [Stream[_]]
-    dsl.source(List("B"), 3, "B") shouldBe a [Stream[_]]
-
-    val application = dsl.plan()
-    application shouldBe a [StreamApplication]
-    application.name shouldBe "dsl"
-    val dag = application.userConfig
-      .getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get
-    dag.vertices.size shouldBe 2
-    dag.vertices.foreach { processor =>
-      processor.taskClass shouldBe classOf[DataSourceTask[_, _]].getName
-      if (processor.description == "A") {
-        processor.parallelism shouldBe 2
-      } else if (processor.description == "B") {
-        processor.parallelism shouldBe 3
-      } else {
-        fail(s"undefined source ${processor.description}")
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
deleted file mode 100644
index fdc721b..0000000
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.dsl
-
-import akka.actor._
-import org.apache.gearpump.Message
-import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner, PartitionerDescription}
-import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
-import org.apache.gearpump.streaming.dsl.StreamSpec.Join
-import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
-import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
-import org.apache.gearpump.streaming.source.DataSourceTask
-import org.apache.gearpump.streaming.task.{Task, TaskContext}
-import org.apache.gearpump.util.Graph
-import org.apache.gearpump.util.Graph._
-import org.mockito.Mockito.when
-import org.scalatest._
-import org.scalatest.mock.MockitoSugar
-
-import scala.concurrent.Await
-import scala.concurrent.duration.Duration
-import scala.util.{Either, Left, Right}
-
-class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar {
-
-  implicit var system: ActorSystem = _
-
-  override def beforeAll(): Unit = {
-    system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
-  }
-
-  override def afterAll(): Unit = {
-    system.terminate()
-    Await.result(system.whenTerminated, Duration.Inf)
-  }
-
-  it should "translate the DSL to a DAG" in {
-    val context: ClientContext = mock[ClientContext]
-    when(context.system).thenReturn(system)
-
-    val dsl = StreamApp("dsl", context)
-
-    val data =
-      """
-        five  four three  two    one
-        five  four three  two
-        five  four three
-        five  four
-        five
-      """
-    val stream = dsl.source(data.lines.toList, 1, "").
-      flatMap(line => line.split("[\\s]+")).filter(_.nonEmpty).
-      map(word => (word, 1)).
-      groupBy(_._1, parallelism = 2).
-      reduce((left, right) => (left._1, left._2 + right._2)).
-      map[Either[(String, Int), String]](Left(_))
-
-    val query = dsl.source(List("two"), 1, "").map[Either[(String, Int), String]](Right(_))
-    stream.merge(query).process[(String, Int)](classOf[Join], 1)
-
-    val app: StreamApplication = dsl.plan()
-    val dag = app.userConfig
-      .getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get
-
-    val dagTopology = dag.mapVertex(_.taskClass).mapEdge { (node1, edge, node2) =>
-      edge.partitionerFactory.partitioner.getClass.getName
-    }
-    val expectedDagTopology = getExpectedDagTopology
-
-    dagTopology.vertices.toSet should contain theSameElementsAs expectedDagTopology.vertices.toSet
-    dagTopology.edges.toSet should contain theSameElementsAs expectedDagTopology.edges.toSet
-  }
-
-  private def getExpectedDagTopology: Graph[String, String] = {
-    val source = classOf[DataSourceTask[_, _]].getName
-    val group = classOf[CountTriggerTask[_, _]].getName
-    val merge = classOf[TransformTask[_, _]].getName
-    val join = classOf[Join].getName
-
-    val hash = classOf[HashPartitioner].getName
-    val groupBy = classOf[GroupByPartitioner[_, _]].getName
-    val colocation = classOf[CoLocationPartitioner].getName
-
-    val expectedDagTopology = Graph(
-      source ~ groupBy ~> group ~ colocation ~> merge ~ hash ~> join,
-      source ~ hash ~> merge
-    )
-    expectedDagTopology
-  }
-}
-
-object StreamSpec {
-
-  class Join(taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
-
-    var query: String = _
-
-    override def onNext(msg: Message): Unit = {
-      msg.msg match {
-        case Left(wordCount: (String @unchecked, Int @unchecked)) =>
-          if (query != null && wordCount._1 == query) {
-            taskContext.output(new Message(wordCount))
-          }
-
-        case Right(query: String) =>
-          this.query = query
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/cc0578e5/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
index bf52abc..f0920de 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
@@ -25,7 +25,8 @@ import org.apache.gearpump.cluster.{TestUtil, UserConfig}
 import org.apache.gearpump.streaming.Processor
 import org.apache.gearpump.streaming.Processor.DefaultProcessor
 import org.apache.gearpump.streaming.dsl.plan.OpSpec.{AnySink, AnySource, AnyTask}
-import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, SingleInputFunction}
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
 import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
 import org.apache.gearpump.streaming.sink.DataSink
 import org.apache.gearpump.streaming.source.DataSource
@@ -145,7 +146,6 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
 
       val chainedOp = chainableOp1.chain(chainableOp2)
 
-      verify(fn1).andThen(fn2)
       chainedOp shouldBe a[ChainableOp[_, _]]
 
       unchainableOps.foreach { op =>
@@ -155,12 +155,14 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
       }
     }
 
-    "throw exception on getProcessor" in {
-      val fn1 = mock[SingleInputFunction[Any, Any]]
-      val chainableOp1 = ChainableOp[Any, Any](fn1)
-      intercept[UnsupportedOperationException] {
-        chainableOp1.getProcessor
-      }
+    "get Processor" in {
+      val fn = mock[FlatMapFunction[Any, Any]]
+      val flatMapper = new FlatMapper(fn, "flatMap")
+      val chainableOp = ChainableOp[Any, Any](flatMapper)
+
+      val processor = chainableOp.getProcessor
+      processor shouldBe a[Processor[_]]
+      processor.parallelism shouldBe 1
     }
   }