You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink itself is not preserved in this plain-text version).
Posted to commits@crunch.apache.org by gr...@apache.org on 2014/05/13 23:43:56 UTC
git commit: CRUNCH-393 Handle object reuse in Aggregate.top
Repository: crunch
Updated Branches:
refs/heads/apache-crunch-0.8 1101f913b -> 74c18d453
CRUNCH-393 Handle object reuse in Aggregate.top
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/74c18d45
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/74c18d45
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/74c18d45
Branch: refs/heads/apache-crunch-0.8
Commit: 74c18d4535f9735897fb855bfb61924301eb812b
Parents: 1101f91
Author: Gabriel Reid <gr...@apache.org>
Authored: Tue May 13 23:40:15 2014 +0200
Committer: Gabriel Reid <gr...@apache.org>
Committed: Tue May 13 23:43:02 2014 +0200
----------------------------------------------------------------------
.../java/org/apache/crunch/lib/AggregateIT.java | 21 +++++++++++++
.../java/org/apache/crunch/lib/Aggregate.java | 32 ++++++++++++++++----
2 files changed, 47 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/74c18d45/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java b/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java
index 56ee3ac..1408c73 100644
--- a/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java
+++ b/crunch-core/src/it/java/org/apache/crunch/lib/AggregateIT.java
@@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import org.apache.crunch.MapFn;
@@ -33,7 +34,9 @@ import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
import org.apache.crunch.test.Employee;
+import org.apache.crunch.test.StringWrapper;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
import org.apache.crunch.types.PTableType;
@@ -139,6 +142,24 @@ public class AggregateIT {
}
@Test
+ public void testTopN_MRPipeline() throws IOException {
+ Pipeline pipeline = new MRPipeline(AggregateIT.class, tmpDir.getDefaultConfiguration());
+ PTable<StringWrapper, String> entries = pipeline
+ .read(From.textFile(tmpDir.copyResourceFileName("set1.txt"), Avros.strings()))
+ .by(new StringWrapper.StringToStringWrapperMapFn(), Avros.reflects(StringWrapper.class));
+ PTable<StringWrapper, String> topEntries = Aggregate.top(entries, 3, true);
+ List<Pair<StringWrapper, String>> expectedTop3 = Lists.newArrayList(
+ Pair.of(StringWrapper.wrap("e"), "e"),
+ Pair.of(StringWrapper.wrap("c"), "c"),
+ Pair.of(StringWrapper.wrap("b"), "b"));
+
+ assertEquals(
+ expectedTop3,
+ Lists.newArrayList(topEntries.materialize()));
+
+ }
+
+ @Test
public void testCollectValues_Writables() throws IOException {
Pipeline pipeline = new MRPipeline(AggregateIT.class, tmpDir.getDefaultConfiguration());
Map<Integer, Collection<Text>> collectionMap = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt"))
http://git-wip-us.apache.org/repos/asf/crunch/blob/74c18d45/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
index 3d132d4..7a71646 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
@@ -110,19 +110,22 @@ public class Aggregate {
private final int limit;
private final boolean maximize;
+ private final PType<Pair<K, V>> pairType;
private transient PriorityQueue<Pair<K, V>> values;
- public TopKFn(int limit, boolean ascending) {
+ public TopKFn(int limit, boolean ascending, PType<Pair<K, V>> pairType) {
this.limit = limit;
this.maximize = ascending;
+ this.pairType = pairType;
}
public void initialize() {
this.values = new PriorityQueue<Pair<K, V>>(limit, new PairValueComparator<K, V>(maximize));
+ pairType.initialize(getConfiguration());
}
public void process(Pair<K, V> input, Emitter<Pair<Integer, Pair<K, V>>> emitter) {
- values.add(input);
+ values.add(pairType.getDetachedValue(input));
if (values.size() > limit) {
values.poll();
}
@@ -139,10 +142,17 @@ public class Aggregate {
private final int limit;
private final boolean maximize;
+ private PType<Pair<K, V>> pairType;
- public TopKCombineFn(int limit, boolean maximize) {
+ public TopKCombineFn(int limit, boolean maximize, PType<Pair<K, V>> pairType) {
this.limit = limit;
this.maximize = maximize;
+ this.pairType = pairType;
+ }
+
+ @Override
+ public void initialize() {
+ pairType.initialize(getConfiguration());
}
@Override
@@ -151,7 +161,7 @@ public class Aggregate {
Comparator<Pair<K, V>> cmp = new PairValueComparator<K, V>(maximize);
PriorityQueue<Pair<K, V>> queue = new PriorityQueue<Pair<K, V>>(limit, cmp);
for (Pair<K, V> pair : input.second()) {
- queue.add(pair);
+ queue.add(pairType.getDetachedValue(pair));
if (queue.size() > limit) {
queue.poll();
}
@@ -165,13 +175,23 @@ public class Aggregate {
}
}
+ /**
+ * Selects the top N pairs from the given table, with sorting being performed on the values (i.e. the second
+ * value in the pair) of the table.
+ *
+ * @param ptable table containing the pairs from which the top N is to be selected
+ * @param limit number of top elements to select
+ * @param maximize if true, the maximum N values from the table will be selected, otherwise the minimal
+ * N values will be selected
+ * @return table containing the top N values from the incoming table
+ */
public static <K, V> PTable<K, V> top(PTable<K, V> ptable, int limit, boolean maximize) {
PTypeFamily ptf = ptable.getTypeFamily();
PTableType<K, V> base = ptable.getPTableType();
PType<Pair<K, V>> pairType = ptf.pairs(base.getKeyType(), base.getValueType());
PTableType<Integer, Pair<K, V>> inter = ptf.tableOf(ptf.ints(), pairType);
- return ptable.parallelDo("top" + limit + "map", new TopKFn<K, V>(limit, maximize), inter)
- .groupByKey(1).combineValues(new TopKCombineFn<K, V>(limit, maximize))
+ return ptable.parallelDo("top" + limit + "map", new TopKFn<K, V>(limit, maximize, pairType), inter)
+ .groupByKey(1).combineValues(new TopKCombineFn<K, V>(limit, maximize, pairType))
.parallelDo("top" + limit + "reduce", new DoFn<Pair<Integer, Pair<K, V>>, Pair<K, V>>() {
public void process(Pair<Integer, Pair<K, V>> input, Emitter<Pair<K, V>> emitter) {
emitter.emit(input.second());