You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2017/01/09 01:18:02 UTC

incubator-gearpump git commit: [GEARPUMP-261] Translate ChainableOp to Processor of TransformTask

Repository: incubator-gearpump
Updated Branches:
  refs/heads/master f200da531 -> 385a612bb


[GEARPUMP-261] Translate ChainableOp to Processor of TransformTask

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [x] Make sure the commit message is formatted like:
   `[GEARPUMP-<Jira issue #>] Meaningful description of pull request`
 - [x] Make sure tests pass via `sbt clean test`.
 - [x] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality.

Author: manuzhang <ow...@gmail.com>

Closes #130 from manuzhang/chainable_op.


Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/385a612b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/385a612b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/385a612b

Branch: refs/heads/master
Commit: 385a612bb916512fdf40afc8cc9c8cab888554c1
Parents: f200da5
Author: manuzhang <ow...@gmail.com>
Authored: Mon Jan 9 08:57:02 2017 +0800
Committer: huafengw <fv...@gmail.com>
Committed: Mon Jan 9 08:57:02 2017 +0800

----------------------------------------------------------------------
 .../org/apache/gearpump/streaming/dsl/plan/OP.scala  |  3 ++-
 .../apache/gearpump/streaming/dsl/plan/OpSpec.scala  | 15 ++++++++++-----
 2 files changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/385a612b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
index 744976b..f15d875 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -141,7 +141,8 @@ case class ChainableOp[IN, OUT](
   }
 
   override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
-    throw new UnsupportedOperationException("ChainedOp cannot be translated to Processor")
+    Processor[TransformTask[Any, Any]](1, description,
+      userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, fn))
   }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/385a612b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
index bf52abc..98bf24f 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
@@ -155,12 +155,17 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
       }
     }
 
-    "throw exception on getProcessor" in {
-      val fn1 = mock[SingleInputFunction[Any, Any]]
-      val chainableOp1 = ChainableOp[Any, Any](fn1)
-      intercept[UnsupportedOperationException] {
-        chainableOp1.getProcessor
+    "get Processor" in {
+      val fn = new SingleInputFunction[Any, Any] {
+        override def process(value: Any): TraversableOnce[Any] = null
+
+        override def description: String = null
       }
+      val chainableOp = ChainableOp[Any, Any](fn)
+
+      val processor = chainableOp.getProcessor
+      processor shouldBe a[Processor[_]]
+      processor.parallelism shouldBe 1
     }
   }