You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2017/03/06 14:15:31 UTC
incubator-gearpump git commit: [GEARPUMP-289] Add FoldFunction
Repository: incubator-gearpump
Updated Branches:
refs/heads/master 4d8c02dfe -> d78683328
[GEARPUMP-289] Add FoldFunction
Author: manuzhang <ow...@gmail.com>
Closes #167 from manuzhang/fold.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/d7868332
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/d7868332
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/d7868332
Branch: refs/heads/master
Commit: d78683328e1fb6a1143c5bc448a627de90466d7c
Parents: 4d8c02d
Author: manuzhang <ow...@gmail.com>
Authored: Mon Mar 6 22:14:44 2017 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Mon Mar 6 22:15:08 2017 +0800
----------------------------------------------------------------------
.../examples/wordcountjava/dsl/WordCount.java | 8 +-
.../dsl/api/functions/FilterFunction.scala | 4 +-
.../dsl/api/functions/FoldFunction.scala | 34 ++++++++
.../dsl/api/functions/MapFunction.scala | 4 +-
.../dsl/api/functions/ReduceFunction.scala | 17 ++--
.../streaming/dsl/javaapi/JavaStream.scala | 12 ++-
.../dsl/javaapi/functions/FlatMapFunction.scala | 2 +-
.../dsl/javaapi/functions/GroupByFunction.scala | 9 +-
.../dsl/plan/functions/FunctionRunner.scala | 21 ++---
.../streaming/dsl/scalaapi/Stream.scala | 15 +++-
.../scalaapi/functions/FlatMapFunction.scala | 16 ++--
.../streaming/dsl/plan/PlannerSpec.scala | 8 +-
.../dsl/plan/functions/FunctionRunnerSpec.scala | 88 +++++++++++---------
13 files changed, 154 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
index 7d8400d..fd32408 100644
--- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
@@ -95,7 +95,7 @@ public class WordCount {
private static class Split extends FlatMapFunction<String, String> {
@Override
- public Iterator<String> apply(String s) {
+ public Iterator<String> flatMap(String s) {
return Arrays.asList(s.split("\\s+")).iterator();
}
}
@@ -103,7 +103,7 @@ public class WordCount {
private static class Ones extends MapFunction<String, Tuple2<String, Integer>> {
@Override
- public Tuple2<String, Integer> apply(String s) {
+ public Tuple2<String, Integer> map(String s) {
return new Tuple2<>(s, 1);
}
}
@@ -111,7 +111,7 @@ public class WordCount {
private static class Count extends ReduceFunction<Tuple2<String, Integer>> {
@Override
- public Tuple2<String, Integer> apply(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
+ public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
return new Tuple2<>(t1._1(), t1._2() + t2._2());
}
}
@@ -119,7 +119,7 @@ public class WordCount {
private static class TupleKey extends GroupByFunction<Tuple2<String, Integer>, String> {
@Override
- public String apply(Tuple2<String, Integer> tuple) {
+ public String groupBy(Tuple2<String, Integer> tuple) {
return tuple._1();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
index e4e7309..25a0929 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
@@ -23,7 +23,7 @@ object FilterFunction {
def apply[T](fn: T => Boolean): FilterFunction[T] = {
new FilterFunction[T] {
- override def apply(t: T): Boolean = {
+ override def filter(t: T): Boolean = {
fn(t)
}
}
@@ -37,6 +37,6 @@ object FilterFunction {
*/
abstract class FilterFunction[T] extends SerializableFunction {
- def apply(t: T): Boolean
+ def filter(t: T): Boolean
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala
new file mode 100644
index 0000000..9ff44a8
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FoldFunction.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.dsl.api.functions
+
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
+
+/**
+ * Combines input into an accumulator.
+ *
+ * @tparam T type of input
+ * @tparam A type of accumulator
+ */
+abstract class FoldFunction[T, A] extends SerializableFunction {
+
+ def init: A
+
+ def fold(accumulator: A, t: T): A
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
index 70fe9d4..a4fdca6 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
@@ -23,7 +23,7 @@ object MapFunction {
def apply[T, R](fn: T => R): MapFunction[T, R] = {
new MapFunction[T, R] {
- override def apply(t: T): R = {
+ override def map(t: T): R = {
fn(t)
}
}
@@ -38,6 +38,6 @@ object MapFunction {
*/
abstract class MapFunction[T, R] extends SerializableFunction {
- def apply(t: T): R
+ def map(t: T): R
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala
index 25b12be..25f157b 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala
@@ -17,13 +17,11 @@
*/
package org.apache.gearpump.streaming.dsl.api.functions
-import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
-
object ReduceFunction {
def apply[T](fn: (T, T) => T): ReduceFunction[T] = {
new ReduceFunction[T] {
- override def apply(t1: T, t2: T): T = {
+ override def reduce(t1: T, t2: T): T = {
fn(t1, t2)
}
}
@@ -35,8 +33,17 @@ object ReduceFunction {
*
* @param T Type of both inputs and output
*/
-abstract class ReduceFunction[T] extends SerializableFunction {
+abstract class ReduceFunction[T] extends FoldFunction[T, Option[T]] {
+
+ override def init: Option[T] = None
- def apply(t1: T, t2: T): T
+ override def fold(accumulator: Option[T], t: T): Option[T] = {
+ if (accumulator.isEmpty) {
+ Option(t)
+ } else {
+ accumulator.map(reduce(_, t))
+ }
+ }
+ def reduce(t1: T, t2: T): T
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
index 592c4dc..da0e4db 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
@@ -18,8 +18,8 @@
package org.apache.gearpump.streaming.dsl.javaapi
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, ReduceFunction}
-import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => JFlatMapFunction, GroupByFunction}
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, FoldFunction, MapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.javaapi.functions.{GroupByFunction, FlatMapFunction => JFlatMapFunction}
import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, WindowStream}
import org.apache.gearpump.streaming.dsl.window.api.Windows
@@ -45,6 +45,10 @@ class JavaStream[T](val stream: Stream[T]) {
new JavaStream[T](stream.flatMap(FlatMapFunction(fn), description))
}
+ def fold[A](fn: FoldFunction[T, A], description: String): JavaStream[A] = {
+ new JavaStream[A](stream.fold(fn, description))
+ }
+
/** Does aggregation on the stream */
def reduce(fn: ReduceFunction[T], description: String): JavaStream[T] = {
new JavaStream[T](stream.reduce(fn, description))
@@ -65,7 +69,7 @@ class JavaStream[T](val stream: Stream[T]) {
*/
def groupBy[GROUP](fn: GroupByFunction[T, GROUP],
parallelism: Int, description: String): JavaStream[T] = {
- new JavaStream[T](stream.groupBy(fn.apply, parallelism, description))
+ new JavaStream[T](stream.groupBy(fn.groupBy, parallelism, description))
}
def window(win: Windows[T], description: String): JavaWindowStream[T] = {
@@ -84,6 +88,6 @@ class JavaWindowStream[T](stream: WindowStream[T]) {
def groupBy[GROUP](fn: GroupByFunction[T, GROUP], parallelism: Int,
description: String): JavaStream[T] = {
- new JavaStream[T](stream.groupBy(fn.apply, parallelism, description))
+ new JavaStream[T](stream.groupBy(fn.groupBy, parallelism, description))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
index 85d597d..11e2416 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
@@ -28,5 +28,5 @@ import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
*/
abstract class FlatMapFunction[T, R] extends SerializableFunction {
- def apply(t: T): java.util.Iterator[R]
+ def flatMap(t: T): java.util.Iterator[R]
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala
index 7656cba..5a86a86 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala
@@ -25,4 +25,11 @@ import org.apache.gearpump.streaming.dsl.api.functions.MapFunction
* @param T Input value type
* @param GROUP Group value type
*/
-abstract class GroupByFunction[T, GROUP] extends MapFunction[T, GROUP]
+abstract class GroupByFunction[T, GROUP] extends MapFunction[T, GROUP] {
+
+ override def map(t: T): GROUP = {
+ groupBy(t)
+ }
+
+ def groupBy(t: T): GROUP
+}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
index 9dfa6ad..c27300f 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala
@@ -17,7 +17,7 @@
*/
package org.apache.gearpump.streaming.dsl.plan.functions
-import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
+import org.apache.gearpump.streaming.dsl.api.functions.{FoldFunction, ReduceFunction}
import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
object FunctionRunner {
@@ -88,7 +88,7 @@ class FlatMapper[IN, OUT](fn: FlatMapFunction[IN, OUT], val description: String)
}
override def process(value: IN): TraversableOnce[OUT] = {
- fn(value)
+ fn.flatMap(value)
}
override def teardown(): Unit = {
@@ -96,25 +96,22 @@ class FlatMapper[IN, OUT](fn: FlatMapFunction[IN, OUT], val description: String)
}
}
-class Reducer[T](fn: ReduceFunction[T], val description: String)
- extends FunctionRunner[T, T] {
+class FoldRunner[T, A](fn: FoldFunction[T, A], val description: String)
+ extends FunctionRunner[T, A] {
- private var state: Option[T] = None
+ private var state: Option[A] = None
override def setup(): Unit = {
fn.setup()
+ state = Option(fn.init)
}
- override def process(value: T): TraversableOnce[T] = {
- if (state.isEmpty) {
- state = Option(value)
- } else {
- state = state.map(fn(_, value))
- }
+ override def process(value: T): TraversableOnce[A] = {
+ state = state.map(fn.fold(_, value))
None
}
- override def finish(): TraversableOnce[T] = {
+ override def finish(): TraversableOnce[A] = {
state
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
index f71276b..9a614e8 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
@@ -20,7 +20,7 @@ package org.apache.gearpump.streaming.dsl.scalaapi
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
-import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, FoldFunction, MapFunction, ReduceFunction}
import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
import org.apache.gearpump.streaming.dsl.plan._
import org.apache.gearpump.streaming.dsl.plan.functions._
@@ -100,6 +100,17 @@ class Stream[T](
def filter(fn: FilterFunction[T], description: String): Stream[T] = {
this.flatMap(FlatMapFunction(fn), description)
}
+
+ /**
+ * Returns a new stream by applying a fold function over all the elements.
+ *
+ * @param fn fold function
+ * @return a new stream after fold
+ */
+ def fold[A](fn: FoldFunction[T, A], description: String): Stream[A] = {
+ transform(new FoldRunner(fn, description))
+ }
+
/**
* Returns a new stream by applying a reduce function over all the elements.
*
@@ -119,7 +130,7 @@ class Stream[T](
* @return a new stream after reduce
*/
def reduce(fn: ReduceFunction[T], description: String): Stream[T] = {
- transform(new Reducer[T](fn, description))
+ fold(fn, description).map(_.get)
}
private def transform[R](fn: FunctionRunner[T, R]): Stream[R] = {
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
index f10a3db..252b5bd 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
@@ -31,8 +31,8 @@ object FlatMapFunction {
fn.setup()
}
- override def apply(t: T): TraversableOnce[R] = {
- fn.apply(t).asScala
+ override def flatMap(t: T): TraversableOnce[R] = {
+ fn.flatMap(t).asScala
}
@@ -44,7 +44,7 @@ object FlatMapFunction {
def apply[T, R](fn: T => TraversableOnce[R]): FlatMapFunction[T, R] = {
new FlatMapFunction[T, R] {
- override def apply(t: T): TraversableOnce[R] = {
+ override def flatMap(t: T): TraversableOnce[R] = {
fn(t)
}
}
@@ -57,8 +57,8 @@ object FlatMapFunction {
fn.setup()
}
- override def apply(t: T): TraversableOnce[R] = {
- Option(fn(t))
+ override def flatMap(t: T): TraversableOnce[R] = {
+ Option(fn.map(t))
}
override def teardown(): Unit = {
@@ -74,8 +74,8 @@ object FlatMapFunction {
fn.setup()
}
- override def apply(t: T): TraversableOnce[T] = {
- if (fn(t)) {
+ override def flatMap(t: T): TraversableOnce[T] = {
+ if (fn.filter(t)) {
Option(t)
} else {
None
@@ -98,6 +98,6 @@ object FlatMapFunction {
*/
abstract class FlatMapFunction[T, R] extends SerializableFunction {
- def apply(t: T): TraversableOnce[R]
+ def flatMap(t: T): TraversableOnce[R]
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
index 2e4bbb3..70abde9 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
@@ -27,13 +27,13 @@ import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
import org.apache.gearpump.streaming.partitioner.CoLocationPartitioner
import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
import org.apache.gearpump.streaming.dsl.plan.PlannerSpec._
-import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, Reducer}
+import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, FoldRunner}
import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
import org.apache.gearpump.streaming.dsl.window.api.CountWindows
import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
import org.apache.gearpump.streaming.sink.DataSink
import org.apache.gearpump.streaming.source.DataSource
-import org.apache.gearpump.streaming.{MockUtil, Processor}
+import org.apache.gearpump.streaming.MockUtil
import org.apache.gearpump.streaming.task.{Task, TaskContext}
import org.apache.gearpump.util.Graph
import org.scalatest.mock.MockitoSugar
@@ -61,7 +61,7 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc
val groupBy = GroupAlsoByWindow((any: Any) => any, CountWindows.apply[Any](1))
val groupByOp = GroupByOp(groupBy)
val flatMapOp = ChainableOp[Any, Any](anyFlatMapper)
- val reduceOp = ChainableOp[Any, Any](anyReducer)
+ val reduceOp = ChainableOp[Any, Option[Any]](anyReducer)
val processorOp = new ProcessorOp[AnyTask]
val sinkOp = DataSinkOp(new AnySink)
val directEdge = Direct
@@ -97,7 +97,7 @@ object PlannerSpec {
private val anyFlatMapper = new FlatMapper[Any, Any](
FlatMapFunction(Option(_)), "flatMap")
- private val anyReducer = new Reducer[Any](
+ private val anyReducer = new FoldRunner[Any, Option[Any]](
ReduceFunction((left: Any, right: Any) => (left, right)), "reduce")
class AnyTask(context: TaskContext, config: UserConfig) extends Task(context, config)
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/d7868332/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
index d26b7d9..a9b23fe 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala
@@ -25,7 +25,7 @@ import org.apache.gearpump.cluster.{TestUtil, UserConfig}
import org.apache.gearpump.streaming.MockUtil
import org.apache.gearpump.streaming.source.{DataSourceTask, Watermark}
import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
+import org.apache.gearpump.streaming.dsl.api.functions.{FoldFunction, ReduceFunction}
import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource
import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
import org.apache.gearpump.streaming.dsl.task.TransformTask.Transform
@@ -102,7 +102,8 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
val map = new FlatMapper[String, Int](FlatMapFunction(word => Some(1)), "map")
- val sum = new Reducer[Int](ReduceFunction({(left, right) => left + right}), "sum")
+ val sum = new FoldRunner[Int, Option[Int]](
+ ReduceFunction({(left, right) => left + right}), "sum")
val all = AndThen(split, AndThen(filter, AndThen(map, sum)))
@@ -116,11 +117,14 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
five four
five
"""
+
+ all.setup()
// force eager evaluation
all.process(data).toList
- val result = all.finish().toList
+ val result = all.finish().toList.map(_.get)
assert(result.nonEmpty)
assert(result.last == 15)
+ all.teardown()
}
}
@@ -132,7 +136,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
"call flatMap function when processing input value" in {
val input = mock[R]
flatMapper.process(input)
- verify(flatMapFunction).apply(input)
+ verify(flatMapFunction).flatMap(input)
}
"return passed in description" in {
@@ -156,54 +160,59 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
}
}
- "ReduceFunction" should {
+ "FoldRunner" should {
- "call reduce function when processing input value" in {
- val reduceFunction = mock[ReduceFunction[T]]
- val reducer = new Reducer[T](reduceFunction, "reduce")
+ "call fold function when processing input value" in {
+ val foldFunction = mock[FoldFunction[T, List[T]]]
+ val foldRunner = new FoldRunner[T, List[T]](foldFunction, "fold")
val input1 = mock[T]
val input2 = mock[T]
- val output = mock[T]
-
- when(reduceFunction.apply(input1, input2)).thenReturn(output, output)
-
- reducer.process(input1) shouldBe List.empty[T]
- reducer.process(input2) shouldBe List.empty[T]
- reducer.finish() shouldBe List(output)
- reducer.teardown()
- reducer.process(input1) shouldBe List.empty[T]
- reducer.teardown()
- reducer.process(input2) shouldBe List.empty[T]
- reducer.finish() shouldBe List(input2)
+ when(foldFunction.init).thenReturn(Nil)
+ when(foldFunction.fold(Nil, input1)).thenReturn(List(input1))
+ when(foldFunction.fold(Nil, input2)).thenReturn(List(input2))
+ when(foldFunction.fold(List(input1), input2)).thenReturn(List(input1, input2))
+
+ foldRunner.setup()
+ foldRunner.process(input1) shouldBe List.empty[T]
+ foldRunner.process(input2) shouldBe List.empty[T]
+ foldRunner.finish() shouldBe List(List(input1, input2))
+ foldRunner.teardown()
+
+ foldRunner.setup()
+ foldRunner.process(input1) shouldBe List.empty[T]
+ foldRunner.teardown()
+ foldRunner.setup()
+ foldRunner.process(input2) shouldBe List.empty[T]
+ foldRunner.finish() shouldBe List(List(input2))
}
"return passed in description" in {
- val reduceFunction = mock[ReduceFunction[T]]
- val reducer = new Reducer[T](reduceFunction, "reduce")
- reducer.description shouldBe "reduce"
+ val foldFunction = mock[FoldFunction[S, T]]
+ val foldRunner = new FoldRunner[S, T](foldFunction, "fold")
+ foldRunner.description shouldBe "fold"
}
"return None on finish" in {
- val reduceFunction = mock[ReduceFunction[T]]
- val reducer = new Reducer[T](reduceFunction, "reduce")
- reducer.finish() shouldBe List.empty[T]
+ val foldFunction = mock[FoldFunction[S, T]]
+ val foldRunner = new FoldRunner[S, T](foldFunction, "fold")
+ foldRunner.finish() shouldBe List.empty[T]
}
- "set up reduce function on setup" in {
- val reduceFunction = mock[ReduceFunction[T]]
- val reducer = new Reducer[T](reduceFunction, "reduce")
- reducer.setup()
+ "set up fold function on setup" in {
+ val foldFunction = mock[FoldFunction[S, T]]
+ val foldRunner = new FoldRunner[S, T](foldFunction, "fold")
+ foldRunner.setup()
- verify(reduceFunction).setup()
+ verify(foldFunction).setup()
}
- "tear down reduce function on teardown" in {
- val reduceFunction = mock[ReduceFunction[T]]
- val reducer = new Reducer[T](reduceFunction, "reduce")
- reducer.teardown()
+ "tear down fold function on teardown" in {
+ val foldFunction = mock[FoldFunction[S, T]]
+ val foldRunner = new FoldRunner[S, T](foldFunction, "fold")
+ foldRunner.teardown()
- verify(reduceFunction).teardown()
+ verify(foldFunction).teardown()
}
}
@@ -284,11 +293,11 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
val data = "1 2 2 3 3 3"
- val concat = new Reducer[String](ReduceFunction({ (left, right) =>
+ val concat = new FoldRunner[String, Option[String]](ReduceFunction({ (left, right) =>
left + right}), "concat")
implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
- val config = UserConfig.empty.withValue[FunctionRunner[String, String]](
+ val config = UserConfig.empty.withValue[FunctionRunner[String, Option[String]]](
GEARPUMP_STREAMING_OPERATOR, concat)
val taskContext = MockUtil.mockTaskContext
@@ -307,7 +316,8 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar {
import scala.collection.JavaConverters._
- val values = peopleCaptor.getAllValues.asScala.map(input => input.msg.asInstanceOf[String])
+ val values = peopleCaptor.getAllValues.asScala.map(input =>
+ input.msg.asInstanceOf[Option[String]].get)
assert(values.mkString(",") == "1,2,22,3,33,333")
system.terminate()
Await.result(system.whenTerminated, Duration.Inf)