You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2015/01/31 23:01:13 UTC

crunch git commit: CRUNCH-497: Add union methods to Scrunch's PipelineLike

Repository: crunch
Updated Branches:
  refs/heads/master ebb1b2e32 -> e84751081


CRUNCH-497: Add union methods to Scrunch's PipelineLike


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/e8475108
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/e8475108
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/e8475108

Branch: refs/heads/master
Commit: e84751081ec6d8f995b6a41f0c4488f0ff86daf1
Parents: ebb1b2e
Author: Josh Wills <jw...@apache.org>
Authored: Sat Jan 31 09:13:48 2015 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Sat Jan 31 09:13:48 2015 -0800

----------------------------------------------------------------------
 .../org/apache/crunch/scrunch/PipelineLike.scala | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/e8475108/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
index e948904..b66d289 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
@@ -23,6 +23,7 @@ import org.apache.crunch.{Pipeline => JPipeline, _}
 import org.apache.crunch.scrunch.interpreter.InterpreterRunner
 import org.apache.crunch.types.{PTableType, PType}
 
+import scala.collection.JavaConversions
 import scala.collection.JavaConversions.asJavaCollection
 
 trait PipelineLike {
@@ -165,6 +166,24 @@ trait PipelineLike {
   }
 
   /**
+   * Returns a new PCollection wrapping the result of `jpipeline.union` over the given elements.
+   * @param elements the PCollections to union; each one's `native` value is what gets passed on
+   */
+  def union[S](elements: Seq[PCollection[S]]): PCollection[S] = {
+    val natives = elements.map(_.native)
+    new PCollection[S](jpipeline.union(JavaConversions.seqAsJavaList(natives)))
+  }
+
+  /**
+   * Returns a new PTable wrapping the result of `jpipeline.unionTables` over the given elements.
+   * @param elements the PTables to union; each one's `native` value is what gets passed on
+   */
+  def unionTables[K, V](elements: Seq[PTable[K, V]]): PTable[K, V] = {
+    val natives = elements.map(_.native)
+    new PTable[K, V](jpipeline.unionTables(JavaConversions.seqAsJavaList(natives)))
+  }
+
+  /**
    * Adds the given {@code SeqDoFn} to the pipeline execution and returns its output.
    */
   def sequentialDo[Output](seqDoFn: PipelineCallable[Output]) = jpipeline.sequentialDo(seqDoFn)