You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2015/01/31 23:01:13 UTC
crunch git commit: CRUNCH-497: Add union methods to Scrunch's
PipelineLike
Repository: crunch
Updated Branches:
refs/heads/master ebb1b2e32 -> e84751081
CRUNCH-497: Add union methods to Scrunch's PipelineLike
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/e8475108
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/e8475108
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/e8475108
Branch: refs/heads/master
Commit: e84751081ec6d8f995b6a41f0c4488f0ff86daf1
Parents: ebb1b2e
Author: Josh Wills <jw...@apache.org>
Authored: Sat Jan 31 09:13:48 2015 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Sat Jan 31 09:13:48 2015 -0800
----------------------------------------------------------------------
.../org/apache/crunch/scrunch/PipelineLike.scala | 19 +++++++++++++++++++
1 file changed, 19 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/e8475108/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
index e948904..b66d289 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
@@ -23,6 +23,7 @@ import org.apache.crunch.{Pipeline => JPipeline, _}
import org.apache.crunch.scrunch.interpreter.InterpreterRunner
import org.apache.crunch.types.{PTableType, PType}
+import scala.collection.JavaConversions
import scala.collection.JavaConversions.asJavaCollection
trait PipelineLike {
@@ -165,6 +166,24 @@ trait PipelineLike {
}
/**
+ * Creates a new PCollection as the union of the given elements.
+ */
+ def union[S](elements: Seq[PCollection[S]]) = {
+ val natives = elements.map(pc => pc.native)
+ val jpc = jpipeline.union(JavaConversions.seqAsJavaList(natives))
+ new PCollection[S](jpc)
+ }
+
+ /**
+ * Creates a new PTable as the union of the given elements.
+ */
+ def unionTables[K, V](elements: Seq[PTable[K, V]]) = {
+ val natives = elements.map(pc => pc.native)
+ val jpt = jpipeline.unionTables(JavaConversions.seqAsJavaList(natives))
+ new PTable[K, V](jpt)
+ }
+
+ /**
* Adds the given {@code SeqDoFn} to the pipeline execution and returns its output.
*/
def sequentialDo[Output](seqDoFn: PipelineCallable[Output]) = jpipeline.sequentialDo(seqDoFn)