You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gearpump.apache.org by hu...@apache.org on 2017/01/09 01:18:02 UTC
incubator-gearpump git commit: [GEARPUMP-261] Translate ChainableOp
to Processor of TransformTask
Repository: incubator-gearpump
Updated Branches:
refs/heads/master f200da531 -> 385a612bb
[GEARPUMP-261] Translate ChainableOp to Processor of TransformTask
Be sure to do all of the following to help us incorporate your contribution
quickly and easily:
- [x] Make sure the commit message is formatted like:
`[GEARPUMP-<Jira issue #>] Meaningful description of pull request`
- [x] Make sure tests pass via `sbt clean test`.
- [x] Make sure old documentation affected by the pull request has been updated and new documentation added for new functionality.
Author: manuzhang <ow...@gmail.com>
Closes #130 from manuzhang/chainable_op.
Project: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/commit/385a612b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/tree/385a612b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gearpump/diff/385a612b
Branch: refs/heads/master
Commit: 385a612bb916512fdf40afc8cc9c8cab888554c1
Parents: f200da5
Author: manuzhang <ow...@gmail.com>
Authored: Mon Jan 9 08:57:02 2017 +0800
Committer: huafengw <fv...@gmail.com>
Committed: Mon Jan 9 08:57:02 2017 +0800
----------------------------------------------------------------------
.../org/apache/gearpump/streaming/dsl/plan/OP.scala | 3 ++-
.../apache/gearpump/streaming/dsl/plan/OpSpec.scala | 15 ++++++++++-----
2 files changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/385a612b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
index 744976b..f15d875 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -141,7 +141,8 @@ case class ChainableOp[IN, OUT](
}
override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = {
- throw new UnsupportedOperationException("ChainedOp cannot be translated to Processor")
+ Processor[TransformTask[Any, Any]](1, description,
+ userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, fn))
}
}
http://git-wip-us.apache.org/repos/asf/incubator-gearpump/blob/385a612b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
index bf52abc..98bf24f 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
@@ -155,12 +155,17 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS
}
}
- "throw exception on getProcessor" in {
- val fn1 = mock[SingleInputFunction[Any, Any]]
- val chainableOp1 = ChainableOp[Any, Any](fn1)
- intercept[UnsupportedOperationException] {
- chainableOp1.getProcessor
+ "get Processor" in {
+ val fn = new SingleInputFunction[Any, Any] {
+ override def process(value: Any): TraversableOnce[Any] = null
+
+ override def description: String = null
}
+ val chainableOp = ChainableOp[Any, Any](fn)
+
+ val processor = chainableOp.getProcessor
+ processor shouldBe a[Processor[_]]
+ processor.parallelism shouldBe 1
}
}