You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by ma...@apache.org on 2016/12/14 11:13:08 UTC

incubator-gearpump git commit: [GEARPUMP-249] Fix function chains to enforce execution

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master 215531cd8 -> 1bf6a9ba8


[GEARPUMP-249] Fix function chains to enforce execution

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the commit message is formatted like:
   `[GEARPUMP-<Jira issue #>] Meaningful description of pull request`
 - [x] Make sure tests pass via `sbt clean test`.
 - [x] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality.

Author: manuzhang <ow...@gmail.com>

Closes #123 from manuzhang/GEARPUMP-249.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/1bf6a9ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/1bf6a9ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/1bf6a9ba

Branch: refs/heads/master
Commit: 1bf6a9ba8fa774b985fcd90a5d6bef129a4fb9a5
Parents: 215531c
Author: manuzhang <ow...@gmail.com>
Authored: Wed Dec 14 19:12:52 2016 +0800
Committer: manuzhang <ow...@gmail.com>
Committed: Wed Dec 14 19:12:52 2016 +0800

----------------------------------------------------------------------
 .../dsl/plan/functions/SingleInputFunction.scala          |  9 +++++++--
 .../dsl/plan/functions/SingleInputFunctionSpec.scala      | 10 +++++++---
 2 files changed, 14 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1bf6a9ba/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
index 609fbb0..5322648 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
@@ -20,17 +20,22 @@ package org.apache.gearpump.streaming.dsl.plan.functions
 trait SingleInputFunction[IN, OUT] extends Serializable {
   def process(value: IN): TraversableOnce[OUT]
   def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = {
-    new AndThen(this, other)
+    AndThen(this, other)
   }
   def finish(): TraversableOnce[OUT] = None
   def clearState(): Unit = {}
   def description: String
 }
 
-class AndThen[IN, MIDDLE, OUT](
+case class AndThen[IN, MIDDLE, OUT](
     first: SingleInputFunction[IN, MIDDLE], second: SingleInputFunction[MIDDLE, OUT])
   extends SingleInputFunction[IN, OUT] {
 
+  override def andThen[OUTER](
+      other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = {
+    first.andThen(second.andThen(other))
+  }
+
   override def process(value: IN): TraversableOnce[OUT] = {
     first.process(value).flatMap(second.process)
   }

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/1bf6a9ba/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
index 94feae4..ad12e33 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
@@ -45,7 +45,7 @@ class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar {
 
     val first = mock[SingleInputFunction[R, S]]
     val second = mock[SingleInputFunction[S, T]]
-    val andThen = new AndThen(first, second)
+    val andThen = AndThen(first, second)
 
     "chain first and second functions when processing input value" in {
       val input = mock[R]
@@ -86,7 +86,11 @@ class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar {
 
     "return AndThen on andThen" in {
       val third = mock[SingleInputFunction[T, Any]]
-      andThen.andThen[Any](third) shouldBe an [AndThen[_, _, _]]
+      when(second.andThen(third)).thenReturn(AndThen(second, third))
+
+      andThen.andThen[Any](third)
+
+      verify(first).andThen(AndThen(second, third))
     }
   }
 
@@ -241,7 +245,7 @@ class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar {
       val taskContext = MockUtil.mockTaskContext
       implicit val actorSystem = MockUtil.system
 
-      val data = "one two three".split("\\s")
+      val data = "one two three".split("\\s+")
       val dataSource = new CollectionDataSource[String](data)
       val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, dataSource)