You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/12/14 11:13:08 UTC
incubator-gearpump git commit: [GEARPUMP-249] Fix function chains to
enforce execution
Repository: incubator-gearpump
Updated Branches:
refs/heads/master 215531cd8 -> 1bf6a9ba8
[GEARPUMP-249] Fix function chains to enforce execution
Be sure to do all of the following to help us incorporate your contribution
quickly and easily:
- [x] Make sure the commit message is formatted like:
`[GEARPUMP-<Jira issue #>] Meaningful description of pull request`
- [x] Make sure tests pass via `sbt clean test`.
- [x] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality.
Author: manuzhang <ow...@gmail.com>
Closes #123 from manuzhang/GEARPUMP-249.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/1bf6a9ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/1bf6a9ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/1bf6a9ba
Branch: refs/heads/master
Commit: 1bf6a9ba8fa774b985fcd90a5d6bef129a4fb9a5
Parents: 215531c
Author: manuzhang <ow...@gmail.com>
Authored: Wed Dec 14 19:12:52 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Wed Dec 14 19:12:52 2016 +0800
----------------------------------------------------------------------
.../dsl/plan/functions/SingleInputFunction.scala | 9 +++++++--
.../dsl/plan/functions/SingleInputFunctionSpec.scala | 10 +++++++---
2 files changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1bf6a9ba/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
index 609fbb0..5322648 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
@@ -20,17 +20,22 @@ package org.apache.gearpump.streaming.dsl.plan.functions
trait SingleInputFunction[IN, OUT] extends Serializable {
def process(value: IN): TraversableOnce[OUT]
def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = {
- new AndThen(this, other)
+ AndThen(this, other)
}
def finish(): TraversableOnce[OUT] = None
def clearState(): Unit = {}
def description: String
}
-class AndThen[IN, MIDDLE, OUT](
+case class AndThen[IN, MIDDLE, OUT](
first: SingleInputFunction[IN, MIDDLE], second: SingleInputFunction[MIDDLE, OUT])
extends SingleInputFunction[IN, OUT] {
+ override def andThen[OUTER](
+ other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = {
+ first.andThen(second.andThen(other))
+ }
+
override def process(value: IN): TraversableOnce[OUT] = {
first.process(value).flatMap(second.process)
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1bf6a9ba/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
index 94feae4..ad12e33 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
@@ -45,7 +45,7 @@ class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar {
val first = mock[SingleInputFunction[R, S]]
val second = mock[SingleInputFunction[S, T]]
- val andThen = new AndThen(first, second)
+ val andThen = AndThen(first, second)
"chain first and second functions when processing input value" in {
val input = mock[R]
@@ -86,7 +86,11 @@ class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar {
"return AndThen on andThen" in {
val third = mock[SingleInputFunction[T, Any]]
- andThen.andThen[Any](third) shouldBe an [AndThen[_, _, _]]
+ when(second.andThen(third)).thenReturn(AndThen(second, third))
+
+ andThen.andThen[Any](third)
+
+ verify(first).andThen(AndThen(second, third))
}
}
@@ -241,7 +245,7 @@ class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar {
val taskContext = MockUtil.mockTaskContext
implicit val actorSystem = MockUtil.system
- val data = "one two three".split("\\s")
+ val data = "one two three".split("\\s+")
val dataSource = new CollectionDataSource[String](data)
val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, dataSource)