You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by gr...@apache.org on 2014/05/13 23:43:56 UTC

git commit: CRUNCH-393 Handle object reuse in Aggregate.top

Repository: crunch
Updated Branches:
  refs/heads/apache-crunch-0.8 1101f913b -> 74c18d453


CRUNCH-393 Handle object reuse in Aggregate.top


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/74c18d45
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/74c18d45
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/74c18d45

Branch: refs/heads/apache-crunch-0.8
Commit: 74c18d4535f9735897fb855bfb61924301eb812b
Parents: 1101f91
Author: Gabriel Reid <gr...@apache.org>
Authored: Tue May 13 23:40:15 2014 +0200
Committer: Gabriel Reid <gr...@apache.org>
Committed: Tue May 13 23:43:02 2014 +0200

----------------------------------------------------------------------
 .../java/org/apache/crunch/lib/AggregateIT.java | 21 +++++++++++++
 .../java/org/apache/crunch/lib/Aggregate.java   | 32 ++++++++++++++++----
 2 files changed, 47 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/74c18d45/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java
index 56ee3ac..1408c73 100644
--- a/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.crunch.MapFn;
@@ -33,7 +34,9 @@ import org.apache.crunch.Pair;
 import org.apache.crunch.Pipeline;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
 import org.apache.crunch.test.Employee;
+import org.apache.crunch.test.StringWrapper;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
 import org.apache.crunch.types.PTableType;
@@ -139,6 +142,24 @@ public class AggregateIT {
   }
 
   @Test
+  public void testTopN_MRPipeline() throws IOException {
+    Pipeline pipeline = new MRPipeline(AggregateIT.class, tmpDir.getDefaultConfiguration());
+    PTable<StringWrapper, String> entries = pipeline
+        .read(From.textFile(tmpDir.copyResourceFileName("set1.txt"), Avros.strings()))
+        .by(new StringWrapper.StringToStringWrapperMapFn(), Avros.reflects(StringWrapper.class));
+    PTable<StringWrapper, String> topEntries = Aggregate.top(entries, 3, true);
+    List<Pair<StringWrapper, String>> expectedTop3 = Lists.newArrayList(
+        Pair.of(StringWrapper.wrap("e"), "e"),
+        Pair.of(StringWrapper.wrap("c"), "c"),
+        Pair.of(StringWrapper.wrap("b"), "b"));
+
+    assertEquals(
+        expectedTop3,
+        Lists.newArrayList(topEntries.materialize()));
+
+  }
+
+  @Test
   public void testCollectValues_Writables() throws IOException {
     Pipeline pipeline = new MRPipeline(AggregateIT.class, tmpDir.getDefaultConfiguration());
     Map<Integer, Collection<Text>> collectionMap = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt"))

http://git-wip-us.apache.org/repos/asf/crunch/blob/74c18d45/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
index 3d132d4..7a71646 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
@@ -110,19 +110,22 @@ public class Aggregate {
 
     private final int limit;
     private final boolean maximize;
+    private final PType<Pair<K, V>> pairType;
     private transient PriorityQueue<Pair<K, V>> values;
 
-    public TopKFn(int limit, boolean ascending) {
+    public TopKFn(int limit, boolean ascending, PType<Pair<K, V>> pairType) {
       this.limit = limit;
       this.maximize = ascending;
+      this.pairType = pairType;
     }
 
     public void initialize() {
       this.values = new PriorityQueue<Pair<K, V>>(limit, new PairValueComparator<K, V>(maximize));
+      pairType.initialize(getConfiguration());
     }
 
     public void process(Pair<K, V> input, Emitter<Pair<Integer, Pair<K, V>>> emitter) {
-      values.add(input);
+      values.add(pairType.getDetachedValue(input));
       if (values.size() > limit) {
         values.poll();
       }
@@ -139,10 +142,17 @@ public class Aggregate {
 
     private final int limit;
     private final boolean maximize;
+    private PType<Pair<K, V>> pairType;
 
-    public TopKCombineFn(int limit, boolean maximize) {
+    public TopKCombineFn(int limit, boolean maximize, PType<Pair<K, V>> pairType) {
       this.limit = limit;
       this.maximize = maximize;
+      this.pairType = pairType;
+    }
+
+    @Override
+    public void initialize() {
+      pairType.initialize(getConfiguration());
     }
 
     @Override
@@ -151,7 +161,7 @@ public class Aggregate {
       Comparator<Pair<K, V>> cmp = new PairValueComparator<K, V>(maximize);
       PriorityQueue<Pair<K, V>> queue = new PriorityQueue<Pair<K, V>>(limit, cmp);
       for (Pair<K, V> pair : input.second()) {
-        queue.add(pair);
+        queue.add(pairType.getDetachedValue(pair));
         if (queue.size() > limit) {
           queue.poll();
         }
@@ -165,13 +175,23 @@ public class Aggregate {
     }
   }
 
+  /**
+   * Selects the top N pairs from the given table, with sorting being performed on the values (i.e. the second
+   * value in the pair) of the table.
+   *
+   * @param ptable table containing the pairs from which the top N is to be selected
+   * @param limit number of top elements to select
+   * @param maximize if true, the maximum N values from the table will be selected, otherwise the minimal
+   *                 N values will be selected
+   * @return table containing the top N values from the incoming table
+   */
   public static <K, V> PTable<K, V> top(PTable<K, V> ptable, int limit, boolean maximize) {
     PTypeFamily ptf = ptable.getTypeFamily();
     PTableType<K, V> base = ptable.getPTableType();
     PType<Pair<K, V>> pairType = ptf.pairs(base.getKeyType(), base.getValueType());
     PTableType<Integer, Pair<K, V>> inter = ptf.tableOf(ptf.ints(), pairType);
-    return ptable.parallelDo("top" + limit + "map", new TopKFn<K, V>(limit, maximize), inter)
-        .groupByKey(1).combineValues(new TopKCombineFn<K, V>(limit, maximize))
+    return ptable.parallelDo("top" + limit + "map", new TopKFn<K, V>(limit, maximize, pairType), inter)
+        .groupByKey(1).combineValues(new TopKCombineFn<K, V>(limit, maximize, pairType))
         .parallelDo("top" + limit + "reduce", new DoFn<Pair<Integer, Pair<K, V>>, Pair<K, V>>() {
           public void process(Pair<Integer, Pair<K, V>> input, Emitter<Pair<K, V>> emitter) {
             emitter.emit(input.second());