You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2016/03/10 19:50:58 UTC

crunch git commit: Added the ability to specify the number of reducers when doing a sharded join.

Repository: crunch
Updated Branches:
  refs/heads/master 20fc3ab79 -> fa04e3c7b


Added the ability to specify the number of reducers when doing a sharded join.

Signed-off-by: Josh Wills <jw...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/fa04e3c7
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/fa04e3c7
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/fa04e3c7

Branch: refs/heads/master
Commit: fa04e3c7b31f2416efb2022757c1866f63e8b5f3
Parents: 20fc3ab
Author: Joel <os...@gmail.com>
Authored: Thu Dec 24 17:55:03 2015 +0100
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Mar 10 10:33:24 2016 -0800

----------------------------------------------------------------------
 .../crunch/lib/join/ShardedJoinStrategy.java    | 24 ++++++++++++++++++++
 1 file changed, 24 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/fa04e3c7/crunch-core/src/main/java/org/apache/crunch/lib/join/ShardedJoinStrategy.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/join/ShardedJoinStrategy.java b/crunch-core/src/main/java/org/apache/crunch/lib/join/ShardedJoinStrategy.java
index b881e66..2a38457 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/join/ShardedJoinStrategy.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/join/ShardedJoinStrategy.java
@@ -63,6 +63,16 @@ public class ShardedJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
   public ShardedJoinStrategy(int numShards) {
     this(new ConstantShardingStrategy<K>(numShards));
   }
+
+  /**
+   * Instantiate with a constant number of shards to use for all keys and an explicit number of reducers.
+   *
+   * @param numShards number of shards to use
+   * @param numReducers the number of reducers to run the join with; must be at least 1
+   */
+  public ShardedJoinStrategy(int numShards, int numReducers) {
+    this(new ConstantShardingStrategy<K>(numShards), numReducers);
+  }
   
   /**
    * Instantiate with a custom sharding strategy.
@@ -74,6 +84,20 @@ public class ShardedJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
     this.shardingStrategy = shardingStrategy;
   }
 
+  /**
+   * Instantiate with a custom sharding strategy and an explicit number of reducers for the wrapped default join.
+   *
+   * @param shardingStrategy strategy to be used for sharding
+   * @param numReducers the number of reducers to run the join with; an {@link IllegalArgumentException} is thrown if it is less than 1
+   */
+  public ShardedJoinStrategy(ShardingStrategy<K> shardingStrategy, int numReducers) {
+    if (numReducers < 1) {
+      throw new IllegalArgumentException("Num reducers must be > 0, got " + numReducers);
+    }
+    this.wrappedJoinStrategy = new DefaultJoinStrategy<Pair<K, Integer>, U, V>(numReducers);
+    this.shardingStrategy = shardingStrategy;
+  }
+
   @Override
   public PTable<K, Pair<U, V>> join(PTable<K, U> left, PTable<K, V> right, JoinType joinType) {