You are viewing a plain-text version of this content. The canonical version is available at the original archive link (the hyperlink was not preserved in this plain-text copy).
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/09/16 02:03:59 UTC

git commit: CRUNCH-265: Allow clients to specify the number of reducers to use for default and one-to-many joins

Updated Branches:
  refs/heads/master ac17f4f72 -> acdf396b5


CRUNCH-265: Allow clients to specify the number of reducers to use for default and one-to-many joins


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/acdf396b
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/acdf396b
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/acdf396b

Branch: refs/heads/master
Commit: acdf396b516e9a9c8348eb041be6adc1cd0de278
Parents: ac17f4f
Author: Josh Wills <jw...@apache.org>
Authored: Sun Sep 15 13:14:56 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Sun Sep 15 13:19:46 2013 -0700

----------------------------------------------------------------------
 .../crunch/lib/join/DefaultJoinStrategy.java    | 21 ++++++++++++++++----
 .../apache/crunch/lib/join/OneToManyJoin.java   | 19 ++++++++++++++++--
 2 files changed, 34 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/acdf396b/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java
index 3edceeb..bfc8ab3 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/DefaultJoinStrategy.java
@@ -32,7 +32,17 @@ import org.apache.crunch.types.PTypeFamily;
  * efficient due to its passing all data through the shuffle phase.
  */
 public class DefaultJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
-  
+
+  private final int numReducers;
+
+  public DefaultJoinStrategy() {
+    this(-1);
+  }
+
+  public DefaultJoinStrategy(int numReducers) {
+    this.numReducers = numReducers;
+  }
+
   @Override
   public PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right, JoinType joinType) {
     switch (joinType) {
@@ -60,14 +70,15 @@ public class DefaultJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
    */
   public PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right, JoinFn<K, U, V> joinFn) {
     PTypeFamily ptf = left.getTypeFamily();
-    PGroupedTable<Pair<K, Integer>, Pair<U, V>> grouped = preJoin(left, right);
+    PGroupedTable<Pair<K, Integer>, Pair<U, V>> grouped = preJoin(left, right, numReducers);
     PTableType<K, Pair<U, V>> ret = ptf
         .tableOf(left.getKeyType(), ptf.pairs(left.getValueType(), right.getValueType()));
 
     return grouped.parallelDo(joinFn.getJoinType() + grouped.getName(), joinFn, ret);
   }
 
-  static <K, U, V> PGroupedTable<Pair<K, Integer>, Pair<U, V>> preJoin(PTable<K, U> left, PTable<K, V> right) {
+  static <K, U, V> PGroupedTable<Pair<K, Integer>, Pair<U, V>> preJoin(PTable<K, U> left, PTable<K, V> right,
+                                                                       int numReducers) {
     PTypeFamily ptf = left.getTypeFamily();
     PTableType<Pair<K, Integer>, Pair<U, V>> ptt = ptf.tableOf(ptf.pairs(left.getKeyType(), ptf.ints()),
         ptf.pairs(left.getValueType(), right.getValueType()));
@@ -89,7 +100,9 @@ public class DefaultJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
 
     GroupingOptions.Builder optionsBuilder = GroupingOptions.builder();
     optionsBuilder.partitionerClass(JoinUtils.getPartitionerClass(ptf));
-
+    if (numReducers > 0) {
+      optionsBuilder.numReducers(numReducers);
+    }
     return (tag1.union(tag2)).groupByKey(optionsBuilder.build());
   }
 

http://git-wip-us.apache.org/repos/asf/crunch/blob/acdf396b/crunch-core/src/main/java/org/apache/crunch/lib/join/OneToManyJoin.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/OneToManyJoin.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/OneToManyJoin.java
index 25556ec..c09fd05 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/join/OneToManyJoin.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/OneToManyJoin.java
@@ -62,9 +62,24 @@ public class OneToManyJoin {
    * @return the post-processed output of the join
    */
   public static <K, U, V, T> PCollection<T> oneToManyJoin(PTable<K, U> left, PTable<K, V> right,
-      DoFn<Pair<U, Iterable<V>>, T> postProcessFn, PType<T> ptype) {
+                                                          DoFn<Pair<U, Iterable<V>>, T> postProcessFn, PType<T> ptype) {
+    return oneToManyJoin(left, right, postProcessFn, ptype, -1);
+  }
+
+  /**
+   * Supports a user-specified number of reducers for the one-to-many join.
+   *
+   * @param left left-side table to join
+   * @param right right-side table to join
+   * @param postProcessFn DoFn to process the results of the join
+   * @param ptype type of the output of the postProcessFn
+   * @param numReducers The number of reducers to use
+   * @return the post-processed output of the join
+   */
+  public static <K, U, V, T> PCollection<T> oneToManyJoin(PTable<K, U> left, PTable<K, V> right,
+      DoFn<Pair<U, Iterable<V>>, T> postProcessFn, PType<T> ptype, int numReducers) {
 
-    PGroupedTable<Pair<K, Integer>, Pair<U, V>> grouped = DefaultJoinStrategy.preJoin(left, right);
+    PGroupedTable<Pair<K, Integer>, Pair<U, V>> grouped = DefaultJoinStrategy.preJoin(left, right, numReducers);
     return grouped.parallelDo("One to many join " + grouped.getName(),
         new OneToManyJoinFn<K, U, V, T>(left.getValueType(), postProcessFn), ptype);
   }