You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this version.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/11/19 18:05:35 UTC
git commit: CRUNCH-297: Add parallelism options for joins/cogroups to the Scrunch API
Updated Branches:
refs/heads/master 79e6c896b -> 2aa692e52
CRUNCH-297: Add parallelism options for joins/cogroups to the Scrunch API
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/2aa692e5
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/2aa692e5
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/2aa692e5
Branch: refs/heads/master
Commit: 2aa692e5299ee9d775218d4754ae73f3d58beed1
Parents: 79e6c89
Author: Josh Wills <jw...@apache.org>
Authored: Tue Nov 19 07:44:21 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Tue Nov 19 07:44:21 2013 -0800
----------------------------------------------------------------------
.../org/apache/crunch/scrunch/PTable.scala | 34 ++++++++++----------
1 file changed, 17 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/2aa692e5/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
index 5775262..7d5bd66 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PTable.scala
@@ -23,7 +23,8 @@ import scala.collection.JavaConversions._
import org.apache.crunch.{DoFn, Emitter, FilterFn, MapFn}
import org.apache.crunch.{GroupingOptions, PTable => JTable, Pair => CPair}
-import org.apache.crunch.lib.{Join, Cartesian, Aggregate, Cogroup, PTables}
+import org.apache.crunch.lib.{Cartesian, Aggregate, Cogroup, PTables}
+import org.apache.crunch.lib.join.{JoinStrategy, DefaultJoinStrategy, JoinType}
import org.apache.crunch.scrunch.interpreter.InterpreterRunner
import java.util
@@ -64,8 +65,8 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V]
def values() = new PCollection[V](PTables.values(native))
- def cogroup[V2](other: PTable[K, V2]) = {
- val jres = Cogroup.cogroup[K, V, V2](this.native, other.native)
+ def cogroup[V2](other: PTable[K, V2], parallelism: Int = 0) = {
+ val jres = Cogroup.cogroup[K, V, V2](parallelism, this.native, other.native)
val ptf = getTypeFamily()
val inter = new PTable[K, CPair[JCollect[V], JCollect[V2]]](jres)
inter.parallelDo(new SMapTableValuesFn[K, CPair[JCollect[V], JCollect[V2]], (Iterable[V], Iterable[V2])] {
@@ -75,10 +76,9 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V]
}, ptf.tableOf(keyType, ptf.tuple2(ptf.collections(valueType), ptf.collections(other.valueType))))
}
- type JoinFn[V2] = (JTable[K, V], JTable[K, V2]) => JTable[K, CPair[V, V2]]
-
- protected def join[V2](joinFn: JoinFn[V2], other: PTable[K, V2]): PTable[K, (V, V2)] = {
- val jres = joinFn(this.native, other.native)
+ protected def join[V2](other: PTable[K, V2], joinType: JoinType, parallelism: Int): PTable[K, (V, V2)] = {
+ val strategy = new DefaultJoinStrategy[K, V, V2](parallelism)
+ val jres = strategy.join(this.native, other.native, joinType)
val ptf = getTypeFamily()
val ptype = ptf.tableOf(keyType, ptf.tuple2(valueType, other.valueType))
val inter = new PTable[K, CPair[V, V2]](jres)
@@ -87,24 +87,24 @@ class PTable[K, V](val native: JTable[K, V]) extends PCollectionLike[CPair[K, V]
}, ptype)
}
- def join[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = {
- innerJoin(other)
+ def join[V2](other: PTable[K, V2], parallelism: Int = -1): PTable[K, (V, V2)] = {
+ innerJoin(other, parallelism)
}
- def innerJoin[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = {
- join[V2](Join.innerJoin[K, V, V2](_, _), other)
+ def innerJoin[V2](other: PTable[K, V2], parallelism: Int = -1): PTable[K, (V, V2)] = {
+ join[V2](other, JoinType.INNER_JOIN, parallelism)
}
- def leftJoin[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = {
- join[V2](Join.leftJoin[K, V, V2](_, _), other)
+ def leftJoin[V2](other: PTable[K, V2], parallelism: Int = -1): PTable[K, (V, V2)] = {
+ join[V2](other, JoinType.LEFT_OUTER_JOIN, parallelism)
}
- def rightJoin[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = {
- join[V2](Join.rightJoin[K, V, V2](_, _), other)
+ def rightJoin[V2](other: PTable[K, V2], parallelism: Int = -1): PTable[K, (V, V2)] = {
+ join[V2](other, JoinType.RIGHT_OUTER_JOIN, parallelism)
}
- def fullJoin[V2](other: PTable[K, V2]): PTable[K, (V, V2)] = {
- join[V2](Join.fullJoin[K, V, V2](_, _), other)
+ def fullJoin[V2](other: PTable[K, V2], parallelism: Int = -1): PTable[K, (V, V2)] = {
+ join[V2](other, JoinType.FULL_OUTER_JOIN, parallelism)
}
def cross[K2, V2](other: PTable[K2, V2]): PTable[(K, K2), (V, V2)] = {