You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by gr...@apache.org on 2013/01/07 17:24:35 UTC

git commit: CRUNCH-139 Use 1 reducer for PCollection#length

Updated Branches:
  refs/heads/master 08fa84e59 -> 2bf556177


CRUNCH-139 Use 1 reducer for PCollection#length


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/2bf55617
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/2bf55617
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/2bf55617

Branch: refs/heads/master
Commit: 2bf55617739645d1747d13a4aca4daa06a560931
Parents: 08fa84e
Author: Gabriel Reid <gr...@apache.org>
Authored: Mon Jan 7 17:19:08 2013 +0100
Committer: Gabriel Reid <gr...@apache.org>
Committed: Mon Jan 7 17:19:08 2013 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/crunch/lib/Aggregate.java |    4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/2bf55617/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java b/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java
index 453b920..1c7ac80 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/Aggregate.java
@@ -26,6 +26,7 @@ import java.util.PriorityQueue;
 import org.apache.crunch.CombineFn;
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
+import org.apache.crunch.GroupingOptions;
 import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PObject;
@@ -74,7 +75,8 @@ public class Aggregate {
           public Pair<Integer, Long> map(S input) {
             return Pair.of(1, 1L);
           }
-        }, tf.tableOf(tf.ints(), tf.longs())).groupByKey()
+        }, tf.tableOf(tf.ints(), tf.longs()))
+        .groupByKey(GroupingOptions.builder().numReducers(1).build())
         .combineValues(Aggregators.SUM_LONGS());
     PCollection<Long> count = countTable.values();
     return new FirstElementPObject<Long>(count);