You are viewing a plain-text version of this content. The canonical link for it was a hyperlink in the original page and is not preserved in this text version.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/09/16 02:03:59 UTC
git commit: CRUNCH-265: Allow clients to specify the number of reducers to use for default and one-to-many joins
Updated Branches:
refs/heads/master ac17f4f72 -> acdf396b5
CRUNCH-265: Allow clients to specify the number of reducers to use for default and one-to-many joins
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/acdf396b
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/acdf396b
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/acdf396b
Branch: refs/heads/master
Commit: acdf396b516e9a9c8348eb041be6adc1cd0de278
Parents: ac17f4f
Author: Josh Wills <jw...@apache.org>
Authored: Sun Sep 15 13:14:56 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Sun Sep 15 13:19:46 2013 -0700
----------------------------------------------------------------------
.../crunch/lib/join/DefaultJoinStrategy.java | 21 ++++++++++++++++----
.../apache/crunch/lib/join/OneToManyJoin.java | 19 ++++++++++++++++--
2 files changed, 34 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/acdf396b/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java
index 3edceeb..bfc8ab3 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java
@@ -32,7 +32,17 @@ import org.apache.crunch.types.PTypeFamily;
* efficient due to its passing all data through the shuffle phase.
*/
public class DefaultJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
-
+
+ private final int numReducers;
+
+ public DefaultJoinStrategy() {
+ this(-1);
+ }
+
+ public DefaultJoinStrategy(int numReducers) {
+ this.numReducers = numReducers;
+ }
+
@Override
public PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right, JoinType joinType) {
switch (joinType) {
@@ -60,14 +70,15 @@ public class DefaultJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
*/
public PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right, JoinFn<K, U, V> joinFn) {
PTypeFamily ptf = left.getTypeFamily();
- PGroupedTable<Pair<K, Integer>, Pair<U, V>> grouped = preJoin(left, right);
+ PGroupedTable<Pair<K, Integer>, Pair<U, V>> grouped = preJoin(left, right, numReducers);
PTableType<K, Pair<U, V>> ret = ptf
.tableOf(left.getKeyType(), ptf.pairs(left.getValueType(), right.getValueType()));
return grouped.parallelDo(joinFn.getJoinType() + grouped.getName(), joinFn, ret);
}
- static <K, U, V> PGroupedTable<Pair<K, Integer>, Pair<U, V>> preJoin(PTable<K, U> left, PTable<K, V> right) {
+ static <K, U, V> PGroupedTable<Pair<K, Integer>, Pair<U, V>> preJoin(PTable<K, U> left, PTable<K, V> right,
+ int numReducers) {
PTypeFamily ptf = left.getTypeFamily();
PTableType<Pair<K, Integer>, Pair<U, V>> ptt = ptf.tableOf(ptf.pairs(left.getKeyType(), ptf.ints()),
ptf.pairs(left.getValueType(), right.getValueType()));
@@ -89,7 +100,9 @@ public class DefaultJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
GroupingOptions.Builder optionsBuilder = GroupingOptions.builder();
optionsBuilder.partitionerClass(JoinUtils.getPartitionerClass(ptf));
-
+ if (numReducers > 0) {
+ optionsBuilder.numReducers(numReducers);
+ }
return (tag1.union(tag2)).groupByKey(optionsBuilder.build());
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/acdf396b/crunch-core/src/main/java/org/apache/crunch/lib/join/OneToManyJoin.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/OneToManyJoin.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/OneToManyJoin.java
index 25556ec..c09fd05 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/join/OneToManyJoin.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/OneToManyJoin.java
@@ -62,9 +62,24 @@ public class OneToManyJoin {
* @return the post-processed output of the join
*/
public static <K, U, V, T> PCollection<T> oneToManyJoin(PTable<K, U> left, PTable<K, V> right,
- DoFn<Pair<U, Iterable<V>>, T> postProcessFn, PType<T> ptype) {
+ DoFn<Pair<U, Iterable<V>>, T> postProcessFn, PType<T> ptype) {
+ return oneToManyJoin(left, right, postProcessFn, ptype, -1);
+ }
+
+ /**
+ * Supports a user-specified number of reducers for the one-to-many join.
+ *
+ * @param left left-side table to join
+ * @param right right-side table to join
+ * @param postProcessFn DoFn to process the results of the join
+ * @param ptype type of the output of the postProcessFn
+ * @param numReducers The number of reducers to use
+ * @return the post-processed output of the join
+ */
+ public static <K, U, V, T> PCollection<T> oneToManyJoin(PTable<K, U> left, PTable<K, V> right,
+ DoFn<Pair<U, Iterable<V>>, T> postProcessFn, PType<T> ptype, int numReducers) {
- PGroupedTable<Pair<K, Integer>, Pair<U, V>> grouped = DefaultJoinStrategy.preJoin(left, right);
+ PGroupedTable<Pair<K, Integer>, Pair<U, V>> grouped = DefaultJoinStrategy.preJoin(left, right, numReducers);
return grouped.parallelDo("One to many join " + grouped.getName(),
new OneToManyJoinFn<K, U, V, T>(left.getValueType(), postProcessFn), ptype);
}