You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/03/06 14:15:31 UTC

incubator-gearpump git commit: [GEARPUMP-289] Add FoldFunction

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 4d8c02dfe -> d78683328


[GEARPUMP-289] Add FoldFunction

Author: manuzhang <ow...@gmail.com>

Closes #167 from manuzhang/fold.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/d7868332
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/d7868332
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/d7868332

Branch: refs/heads/master
Commit: d78683328e1fb6a1143c5bc448a627de90466d7c
Parents: 4d8c02d
Author: manuzhang <ow...@gmail.com>
Authored: Mon Mar 6 22:14:44 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Mar 6 22:15:08 2017 +0800

----------------------------------------------------------------------
 .../examples/wordcountjava/dsl/WordCount.java   |  8 +-
 .../dsl/api/functions/FilterFunction.scala      |  4 +-
 .../dsl/api/functions/FoldFunction.scala        | 34 ++++++++
 .../dsl/api/functions/MapFunction.scala         |  4 +-
 .../dsl/api/functions/ReduceFunction.scala      | 17 ++--
 .../streaming/dsl/javaapi/JavaStream.scala      | 12 ++-
 .../dsl/javaapi/functions/FlatMapFunction.scala |  2 +-
 .../dsl/javaapi/functions/GroupByFunction.scala |  9 +-
 .../dsl/plan/functions/FunctionRunner.scala     | 21 ++---
 .../streaming/dsl/scalaapi/Stream.scala         | 15 +++-
 .../scalaapi/functions/FlatMapFunction.scala    | 16 ++--
 .../streaming/dsl/plan/PlannerSpec.scala        |  8 +-
 .../dsl/plan/functions/FunctionRunnerSpec.scala | 88 +++++++++++---------
 13 files changed, 154 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
index 7d8400d..fd32408 100644
--- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
@@ -95,7 +95,7 @@ public class WordCount {
   private static class Split extends FlatMapFunction<String, String> {
 
     @Override
-    public Iterator<String> apply(String s) {
+    public Iterator<String> flatMap(String s) {
       return Arrays.asList(s.split("\\s+")).iterator();
     }
   }
@@ -103,7 +103,7 @@ public class WordCount {
   private static class Ones extends MapFunction<String, Tuple2<String, Integer>> {
 
     @Override
-    public Tuple2<String, Integer> apply(String s) {
+    public Tuple2<String, Integer> map(String s) {
       return new Tuple2<>(s, 1);
     }
   }
@@ -111,7 +111,7 @@ public class WordCount {
   private static class Count extends ReduceFunction<Tuple2<String, Integer>> {
 
     @Override
-    public Tuple2<String, Integer> apply(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
+    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
       return new Tuple2<>(t1._1(), t1._2() + t2._2());
     }
   }
@@ -119,7 +119,7 @@ public class WordCount {
   private static class TupleKey extends GroupByFunction<Tuple2<String, Integer>, String> {
 
     @Override
-    public String apply(Tuple2<String, Integer> tuple) {
+    public String groupBy(Tuple2<String, Integer> tuple) {
       return tuple._1();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
index e4e7309..25a0929 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
@@ -23,7 +23,7 @@ object FilterFunction {
 
   def apply[T](fn: T => Boolean): FilterFunction[T] = {
     new FilterFunction[T] {
-      override def apply(t: T): Boolean = {
+      override def filter(t: T): Boolean = {
         fn(t)
       }
     }
@@ -37,6 +37,6 @@ object FilterFunction {
  */
 abstract class FilterFunction[T] extends SerializableFunction {
 
-  def apply(t: T): Boolean
+  def filter(t: T): Boolean
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala
new file mode 100644
index 0000000..9ff44a8
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.api.functions
+
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
+
+/**
+ * Combines input into an accumulator.
+ *
+ * @param A type of accumulator
+ * @param T Type of input
+ */
+abstract class FoldFunction[T, A] extends SerializableFunction {
+
+  def init: A
+
+  def fold(accumulator: A, t: T): A
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
index 70fe9d4..a4fdca6 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
@@ -23,7 +23,7 @@ object MapFunction {
 
   def apply[T, R](fn: T => R): MapFunction[T, R] = {
     new MapFunction[T, R] {
-      override def apply(t: T): R = {
+      override def map(t: T): R = {
         fn(t)
       }
     }
@@ -38,6 +38,6 @@ object MapFunction {
  */
 abstract class MapFunction[T, R] extends SerializableFunction {
 
-  def apply(t: T): R
+  def map(t: T): R
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala
index 25b12be..25f157b 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala
@@ -17,13 +17,11 @@
  */
 package org.apache.gearpump.streaming.dsl.api.functions
 
-import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
-
 object ReduceFunction {
 
   def apply[T](fn: (T, T) => T): ReduceFunction[T] = {
     new ReduceFunction[T] {
-      override def apply(t1: T, t2: T): T = {
+      override def reduce(t1: T, t2: T): T = {
         fn(t1, t2)
       }
     }
@@ -35,8 +33,17 @@ object ReduceFunction {
  *
  * @param T Type of both inputs and output
  */
-abstract class ReduceFunction[T] extends SerializableFunction {
+abstract class ReduceFunction[T] extends FoldFunction[T, Option[T]] {
+
+  override def init: Option[T] = None
 
-  def apply(t1: T, t2: T): T
+  override def fold(accumulator: Option[T], t: T): Option[T] = {
+    if (accumulator.isEmpty) {
+      Option(t)
+    } else {
+      accumulator.map(reduce(_, t))
+    }
+  }
 
+  def reduce(t1: T, t2: T): T
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
index 592c4dc..da0e4db 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
@@ -18,8 +18,8 @@
 package org.apache.gearpump.streaming.dsl.javaapi
 
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, ReduceFunction}
-import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => JFlatMapFunction, GroupByFunction}
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, FoldFunction, MapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.javaapi.functions.{GroupByFunction, FlatMapFunction => JFlatMapFunction}
 import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
 import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, WindowStream}
 import org.apache.gearpump.streaming.dsl.window.api.Windows
@@ -45,6 +45,10 @@ class JavaStream[T](val stream: Stream[T]) {
     new JavaStream[T](stream.flatMap(FlatMapFunction(fn), description))
   }
 
+  def fold[A](fn: FoldFunction[T, A], description: String): JavaStream[A] = {
+    new JavaStream[A](stream.fold(fn, description))
+  }
+
   /** Does aggregation on the stream */
   def reduce(fn: ReduceFunction[T], description: String): JavaStream[T] = {
     new JavaStream[T](stream.reduce(fn, description))
@@ -65,7 +69,7 @@ class JavaStream[T](val stream: Stream[T]) {
    */
   def groupBy[GROUP](fn: GroupByFunction[T, GROUP],
       parallelism: Int, description: String): JavaStream[T] = {
-    new JavaStream[T](stream.groupBy(fn.apply, parallelism, description))
+    new JavaStream[T](stream.groupBy(fn.groupBy, parallelism, description))
   }
 
   def window(win: Windows[T], description: String): JavaWindowStream[T] = {
@@ -84,6 +88,6 @@ class JavaWindowStream[T](stream: WindowStream[T]) {
 
   def groupBy[GROUP](fn: GroupByFunction[T, GROUP], parallelism: Int,
       description: String): JavaStream[T] = {
-    new JavaStream[T](stream.groupBy(fn.apply, parallelism, description))
+    new JavaStream[T](stream.groupBy(fn.groupBy, parallelism, description))
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
index 85d597d..11e2416 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
@@ -28,5 +28,5 @@ import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
  */
 abstract class FlatMapFunction[T, R] extends SerializableFunction {
 
-  def apply(t: T): java.util.Iterator[R]
+  def flatMap(t: T): java.util.Iterator[R]
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala
index 7656cba..5a86a86 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala
@@ -25,4 +25,11 @@ import org.apache.gearpump.streaming.dsl.api.functions.MapFunction
  * @param T Input value type
  * @param GROUP Group value type
  */
-abstract class GroupByFunction[T, GROUP] extends MapFunction[T, GROUP]
+abstract class GroupByFunction[T, GROUP] extends MapFunction[T, GROUP] {
+
+  override def map(t: T): GROUP = {
+    groupBy(t)
+  }
+
+  def groupBy(t: T): GROUP
+}

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
index 9dfa6ad..c27300f 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.gearpump.streaming.dsl.plan.functions
 
-import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
+import org.apache.gearpump.streaming.dsl.api.functions.{FoldFunction, ReduceFunction}
 import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
 
 object FunctionRunner {
@@ -88,7 +88,7 @@ class FlatMapper[IN, OUT](fn: FlatMapFunction[IN, OUT], val description: String)
   }
 
   override def process(value: IN): TraversableOnce[OUT] = {
-    fn(value)
+    fn.flatMap(value)
   }
 
   override def teardown(): Unit = {
@@ -96,25 +96,22 @@ class FlatMapper[IN, OUT](fn: FlatMapFunction[IN, OUT], val description: String)
   }
 }
 
-class Reducer[T](fn: ReduceFunction[T], val description: String)
-  extends FunctionRunner[T, T] {
+class FoldRunner[T, A](fn: FoldFunction[T, A], val description: String)
+  extends FunctionRunner[T, A] {
 
-  private var state: Option[T] = None
+  private var state: Option[A] = None
 
   override def setup(): Unit = {
     fn.setup()
+    state = Option(fn.init)
   }
 
-  override def process(value: T): TraversableOnce[T] = {
-    if (state.isEmpty) {
-      state = Option(value)
-    } else {
-      state = state.map(fn(_, value))
-    }
+  override def process(value: T): TraversableOnce[A] = {
+    state = state.map(fn.fold(_, value))
     None
   }
 
-  override def finish(): TraversableOnce[T] = {
+  override def finish(): TraversableOnce[A] = {
     state
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
index f71276b..9a614e8 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.dsl.scalaapi
 
 import org.apache.gearpump.Message
 import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, FoldFunction, MapFunction, ReduceFunction}
 import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
 import org.apache.gearpump.streaming.dsl.plan._
 import org.apache.gearpump.streaming.dsl.plan.functions._
@@ -100,6 +100,17 @@ class Stream[T](
   def filter(fn: FilterFunction[T], description: String): Stream[T] = {
     this.flatMap(FlatMapFunction(fn), description)
   }
+
+  /**
+   * Returns a new stream by applying a fold function over all the elements
+   *
+   * @param fn fold function
+   * @return a new stream after fold
+   */
+  def fold[A](fn: FoldFunction[T, A], description: String): Stream[A] = {
+    transform(new FoldRunner(fn, description))
+  }
+
   /**
    * Returns a new stream by applying a reduce function over all the elements.
    *
@@ -119,7 +130,7 @@ class Stream[T](
    * @return a new stream after reduce
    */
   def reduce(fn: ReduceFunction[T], description: String): Stream[T] = {
-    transform(new Reducer[T](fn, description))
+    fold(fn, description).map(_.get)
   }
 
   private def transform[R](fn: FunctionRunner[T, R]): Stream[R] = {

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
index f10a3db..252b5bd 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
@@ -31,8 +31,8 @@ object FlatMapFunction {
         fn.setup()
       }
 
-      override def apply(t: T): TraversableOnce[R] = {
-        fn.apply(t).asScala
+      override def flatMap(t: T): TraversableOnce[R] = {
+        fn.flatMap(t).asScala
       }
 
 
@@ -44,7 +44,7 @@ object FlatMapFunction {
 
   def apply[T, R](fn: T => TraversableOnce[R]): FlatMapFunction[T, R] = {
     new FlatMapFunction[T, R] {
-      override def apply(t: T): TraversableOnce[R] = {
+      override def flatMap(t: T): TraversableOnce[R] = {
         fn(t)
       }
     }
@@ -57,8 +57,8 @@ object FlatMapFunction {
         fn.setup()
       }
 
-      override def apply(t: T): TraversableOnce[R] = {
-        Option(fn(t))
+      override def flatMap(t: T): TraversableOnce[R] = {
+        Option(fn.map(t))
       }
 
       override def teardown(): Unit = {
@@ -74,8 +74,8 @@ object FlatMapFunction {
         fn.setup()
       }
 
-      override def apply(t: T): TraversableOnce[T] = {
-        if (fn(t)) {
+      override def flatMap(t: T): TraversableOnce[T] = {
+        if (fn.filter(t)) {
           Option(t)
         } else {
           None
@@ -98,6 +98,6 @@ object FlatMapFunction {
  */
 abstract class FlatMapFunction[T, R] extends SerializableFunction {
 
-  def apply(t: T): TraversableOnce[R]
+  def flatMap(t: T): TraversableOnce[R]
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
index 2e4bbb3..70abde9 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
@@ -27,13 +27,13 @@ import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
 import org.apache.gearpump.streaming.partitioner.CoLocationPartitioner
 import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
 import org.apache.gearpump.streaming.dsl.plan.PlannerSpec._
-import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, Reducer}
+import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, FoldRunner}
 import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
 import org.apache.gearpump.streaming.dsl.window.api.CountWindows
 import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
 import org.apache.gearpump.streaming.sink.DataSink
 import org.apache.gearpump.streaming.source.DataSource
-import org.apache.gearpump.streaming.{MockUtil, Processor}
+import org.apache.gearpump.streaming.MockUtil
 import org.apache.gearpump.streaming.task.{Task, TaskContext}
 import org.apache.gearpump.util.Graph
 import org.scalatest.mock.MockitoSugar
@@ -61,7 +61,7 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc
     val groupBy = GroupAlsoByWindow((any: Any) => any, CountWindows.apply[Any](1))
     val groupByOp = GroupByOp(groupBy)
     val flatMapOp = ChainableOp[Any, Any](anyFlatMapper)
-    val reduceOp = ChainableOp[Any, Any](anyReducer)
+    val reduceOp = ChainableOp[Any, Option[Any]](anyReducer)
     val processorOp = new ProcessorOp[AnyTask]
     val sinkOp = DataSinkOp(new AnySink)
     val directEdge = Direct
@@ -97,7 +97,7 @@ object PlannerSpec {
 
   private val anyFlatMapper = new FlatMapper[Any, Any](
     FlatMapFunction(Option(_)), "flatMap")
-  private val anyReducer = new Reducer[Any](
+  private val anyReducer = new FoldRunner[Any, Option[Any]](
     ReduceFunction((left: Any, right: Any) => (left, right)), "reduce")
 
   class AnyTask(context: TaskContext, config: UserConfig) extends Task(context, config)

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
index d26b7d9..a9b23fe 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
@@ -25,7 +25,7 @@ import org.apache.gearpump.cluster.{TestUtil, UserConfig}
 import org.apache.gearpump.streaming.MockUtil
 import org.apache.gearpump.streaming.source.{DataSourceTask, Watermark}
 import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
+import org.apache.gearpump.streaming.dsl.api.functions.{FoldFunction, ReduceFunction}
 import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource
 import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
 import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform
@@ -102,7 +102,8 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
 
       val map = new FlatMapper[String, Int](FlatMapFunction(word => Some(1)), "map")
 
-      val sum = new Reducer[Int](ReduceFunction({(left, right) => left + right}), "sum")
+      val sum = new FoldRunner[Int, Option[Int]](
+        ReduceFunction({(left, right) => left + right}), "sum")
 
       val all = AndThen(split, AndThen(filter, AndThen(map, sum)))
 
@@ -116,11 +117,14 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
       five  four
       five
         """
+
+      all.setup()
       // force eager evaluation
       all.process(data).toList
-      val result = all.finish().toList
+      val result = all.finish().toList.map(_.get)
       assert(result.nonEmpty)
       assert(result.last == 15)
+      all.teardown()
     }
   }
 
@@ -132,7 +136,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
     "call flatMap function when processing input value" in {
       val input = mock[R]
       flatMapper.process(input)
-      verify(flatMapFunction).apply(input)
+      verify(flatMapFunction).flatMap(input)
     }
 
     "return passed in description" in {
@@ -156,54 +160,59 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
     }
   }
 
-  "ReduceFunction" should {
+  "FoldRunner" should {
 
-    "call reduce function when processing input value" in {
-      val reduceFunction = mock[ReduceFunction[T]]
-      val reducer = new Reducer[T](reduceFunction, "reduce")
+    "call fold function when processing input value" in {
+      val foldFunction = mock[FoldFunction[T, List[T]]]
+      val foldRunner = new FoldRunner[T, List[T]](foldFunction, "fold")
       val input1 = mock[T]
       val input2 = mock[T]
-      val output = mock[T]
-
-      when(reduceFunction.apply(input1, input2)).thenReturn(output, output)
-
-      reducer.process(input1) shouldBe List.empty[T]
-      reducer.process(input2) shouldBe List.empty[T]
-      reducer.finish() shouldBe List(output)
 
-      reducer.teardown()
-      reducer.process(input1) shouldBe List.empty[T]
-      reducer.teardown()
-      reducer.process(input2) shouldBe List.empty[T]
-      reducer.finish() shouldBe List(input2)
+      when(foldFunction.init).thenReturn(Nil)
+      when(foldFunction.fold(Nil, input1)).thenReturn(List(input1))
+      when(foldFunction.fold(Nil, input2)).thenReturn(List(input2))
+      when(foldFunction.fold(List(input1), input2)).thenReturn(List(input1, input2))
+
+      foldRunner.setup()
+      foldRunner.process(input1) shouldBe List.empty[T]
+      foldRunner.process(input2) shouldBe List.empty[T]
+      foldRunner.finish() shouldBe List(List(input1, input2))
+      foldRunner.teardown()
+
+      foldRunner.setup()
+      foldRunner.process(input1) shouldBe List.empty[T]
+      foldRunner.teardown()
+      foldRunner.setup()
+      foldRunner.process(input2) shouldBe List.empty[T]
+      foldRunner.finish() shouldBe List(List(input2))
     }
 
     "return passed in description" in {
-      val reduceFunction = mock[ReduceFunction[T]]
-      val reducer = new Reducer[T](reduceFunction, "reduce")
-      reducer.description shouldBe "reduce"
+      val foldFunction = mock[FoldFunction[S, T]]
+      val foldRunner = new FoldRunner[S, T](foldFunction, "fold")
+      foldRunner.description shouldBe "fold"
     }
 
     "return None on finish" in {
-      val reduceFunction = mock[ReduceFunction[T]]
-      val reducer = new Reducer[T](reduceFunction, "reduce")
-      reducer.finish() shouldBe List.empty[T]
+      val foldFunction = mock[FoldFunction[S, T]]
+      val foldRunner = new FoldRunner[S, T](foldFunction, "fold")
+      foldRunner.finish() shouldBe List.empty[T]
     }
 
-    "set up reduce function on setup" in {
-      val reduceFunction = mock[ReduceFunction[T]]
-      val reducer = new Reducer[T](reduceFunction, "reduce")
-      reducer.setup()
+    "set up fold function on setup" in {
+      val foldFunction = mock[FoldFunction[S, T]]
+      val foldRunner = new FoldRunner[S, T](foldFunction, "fold")
+      foldRunner.setup()
 
-      verify(reduceFunction).setup()
+      verify(foldFunction).setup()
     }
 
-    "tear down reduce function on teardown" in {
-      val reduceFunction = mock[ReduceFunction[T]]
-      val reducer = new Reducer[T](reduceFunction, "reduce")
-      reducer.teardown()
+    "tear down fold function on teardown" in {
+      val foldFunction = mock[FoldFunction[S, T]]
+      val foldRunner = new FoldRunner[S, T](foldFunction, "fold")
+      foldRunner.teardown()
 
-      verify(reduceFunction).teardown()
+      verify(foldFunction).teardown()
     }
   }
 
@@ -284,11 +293,11 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
 
       val data = "1 2  2  3 3  3"
 
-      val concat = new Reducer[String](ReduceFunction({ (left, right) =>
+      val concat = new FoldRunner[String, Option[String]](ReduceFunction({ (left, right) =>
         left + right}), "concat")
 
       implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
-      val config = UserConfig.empty.withValue[FunctionRunner[String, String]](
+      val config = UserConfig.empty.withValue[FunctionRunner[String, Option[String]]](
         GEARPUMP_STREAMING_OPERATOR, concat)
 
       val taskContext = MockUtil.mockTaskContext
@@ -307,7 +316,8 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
 
       import scala.collection.JavaConverters._
 
-      val values = peopleCaptor.getAllValues.asScala.map(input => input.msg.asInstanceOf[String])
+      val values = peopleCaptor.getAllValues.asScala.map(input =>
+        input.msg.asInstanceOf[Option[String]].get)
       assert(values.mkString(",") == "1,2,22,3,33,333")
       system.terminate()
       Await.result(system.whenTerminated, Duration.Inf)