You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/11/19 18:05:35 UTC

git commit: CRUNCH-297: Add parallelism options for joins/cogroups to the Scrunch API

Updated Branches:
  refs/heads/master 79e6c896b -> 2aa692e52


CRUNCH-297: Add parallelism options for joins/cogroups to the Scrunch API


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/2aa692e5
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/2aa692e5
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/2aa692e5

Branch: refs/heads/master
Commit: 2aa692e5299ee9d775218d4754ae73f3d58beed1
Parents: 79e6c89
Author: Josh Wills <jw...@apache.org>
Authored: Tue Nov 19 07:44:21 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Tue Nov 19 07:44:21 2013 -0800

----------------------------------------------------------------------
 .../org/apache/crunch/scrunch/PTable.scala      | 34 ++++++++++----------
 1 file changed, 17 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/2aa692e5/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
index 5775262..7d5bd66 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
@@ -23,7 +23,8 @@ import scala.collection.JavaConversions._
 
 import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn}
 import org.apache.crunch.{GroupingOptions, PTable => JTable, Pair => CPair}
-import org.apache.crunch.lib.{Join, Cartesian, Aggregate, Cogroup, PTables}
+import org.apache.crunch.lib.{Cartesian, Aggregate, Cogroup, PTables}
+import org.apache.crunch.lib.join.{JoinStrategy, DefaultJoinStrategy, JoinType}
 import org.apache.crunch.scrunch.interpreter.InterpreterRunner
 import java.util
 
@@ -64,8 +65,8 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V]
 
   def values() = new PCollection[V](PTables.values(native))
 
-  def cogroup[V2](other: PTable[K, V2]) = {
-    val jres = Cogroup.cogroup[K, V, V2](this.native, other.native)
+  def cogroup[V2](other: PTable[K, V2], parallelism: Int = 0) = {
+    val jres = Cogroup.cogroup[K, V, V2](parallelism, this.native, other.native)
     val ptf = getTypeFamily()
     val inter = new PTable[K, CPair[JCollect[V], JCollect[V2]]](jres)
     inter.parallelDo(new SMapTableValuesFn[K, CPair[JCollect[V], JCollect[V2]], (Iterable[V], Iterable[V2])] {
@@ -75,10 +76,9 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V]
     }, ptf.tableOf(keyType, ptf.tuple2(ptf.collections(valueType), ptf.collections(other.valueType))))
   }
 
-  type JoinFn[V2] = (JTable[K, V], JTable[K, V2]) => JTable[K, CPair[V, V2]]
-
-  protected def join[V2](joinFn: JoinFn[V2], other: PTable[K, V2]): PTable[K, (V, V2)] = {
-    val jres = joinFn(this.native, other.native)
+  protected def join[V2](other: PTable[K, V2], joinType: JoinType, parallelism: Int): PTable[K, (V, V2)] = {
+    val strategy = new DefaultJoinStrategy[K, V, V2](parallelism)
+    val jres = strategy.join(this.native, other.native, joinType)
     val ptf = getTypeFamily()
     val ptype = ptf.tableOf(keyType, ptf.tuple2(valueType, other.valueType))
     val inter = new PTable[K, CPair[V, V2]](jres)
@@ -87,24 +87,24 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V]
     }, ptype)
   }
 
-  def join[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = {
-    innerJoin(other)
+  def join[V2](other: PTable[K, V2], parallelism: Int = -1): PTable[K, (V, V2)] = {
+    innerJoin(other, parallelism)
   }
 
-  def innerJoin[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = {
-    join[V2](Join.innerJoin[K, V, V2](_, _), other)
+  def innerJoin[V2](other: PTable[K, V2], parallelism: Int = -1): PTable[K, (V, V2)] = {
+    join[V2](other, JoinType.INNER_JOIN, parallelism)
   }
 
-  def leftJoin[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = {
-    join[V2](Join.leftJoin[K, V, V2](_, _), other)
+  def leftJoin[V2](other: PTable[K, V2], parallelism: Int = -1): PTable[K, (V, V2)] = {
+    join[V2](other, JoinType.LEFT_OUTER_JOIN, parallelism)
   }
 
-  def rightJoin[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = {
-    join[V2](Join.rightJoin[K, V, V2](_, _), other)
+  def rightJoin[V2](other: PTable[K, V2], parallelism: Int = -1): PTable[K, (V, V2)] = {
+    join[V2](other, JoinType.RIGHT_OUTER_JOIN, parallelism)
   }
 
-  def fullJoin[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = {
-    join[V2](Join.fullJoin[K, V, V2](_, _), other)
+  def fullJoin[V2](other: PTable[K, V2], parallelism: Int = -1): PTable[K, (V, V2)] = {
+    join[V2](other, JoinType.FULL_OUTER_JOIN, parallelism)
   }
 
   def cross[K2, V2](other: PTable[K2, V2]): PTable[(K, K2), (V, V2)] = {