You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2015/03/19 23:58:31 UTC
crunch git commit: CRUNCH-503: Fix MAX_N and MIN_N aggregators and
add MAX_UNIQUE_N and MIN_UNIQUE_N
Repository: crunch
Updated Branches:
refs/heads/master 8c35a1399 -> 3ab0b078c
CRUNCH-503: Fix MAX_N and MIN_N aggregators and add MAX_UNIQUE_N and MIN_UNIQUE_N
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/3ab0b078
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/3ab0b078
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/3ab0b078
Branch: refs/heads/master
Commit: 3ab0b078c47f23b3ba893fdfb05fd723f663d02b
Parents: 8c35a13
Author: Josh Wills <jw...@apache.org>
Authored: Thu Mar 19 12:47:11 2015 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Mar 19 15:45:32 2015 -0700
----------------------------------------------------------------------
.../java/org/apache/crunch/fn/Aggregators.java | 102 +++++++++++++++++--
.../org/apache/crunch/fn/AggregatorsTest.java | 31 +++---
2 files changed, 109 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/3ab0b078/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
index e7aeb18..5a9c157 100644
--- a/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
@@ -22,7 +22,10 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
+import java.util.TreeSet;
+import com.google.common.collect.SortedMultiset;
+import com.google.common.collect.TreeMultiset;
import org.apache.crunch.Aggregator;
import org.apache.crunch.CombineFn;
import org.apache.crunch.Emitter;
@@ -209,6 +212,17 @@ public final class Aggregators {
}
/**
+ * Return the {@code n} largest unique values (or fewer if there are fewer
+ * unique values than {@code n}).
+ * @param n The number of values to return
+ * @param cls The type of the values to aggregate (must implement {@link Comparable}!)
+ * @return The newly constructed instance
+ */
+ public static <V extends Comparable<V>> Aggregator<V> MAX_UNIQUE_N(int n, Class<V> cls) {
+ return new MaxUniqueNAggregator<V>(n);
+ }
+
+ /**
* Return the minimum of all given {@code long} values.
* @return The newly constructed instance
*/
@@ -310,6 +324,16 @@ public final class Aggregators {
}
/**
+ * Return the {@code n} smallest unique values (or fewer if there are fewer unique values than {@code n}).
+ * @param n The number of values to return
+ * @param cls The type of the values to aggregate (must implement {@link Comparable}!)
+ * @return The newly constructed instance
+ */
+ public static <V extends Comparable<V>> Aggregator<V> MIN_UNIQUE_N(int n, Class<V> cls) {
+ return new MinUniqueNAggregator<V>(n);
+ }
+
+ /**
* Return the first {@code n} values (or fewer if there are fewer values than {@code n}).
*
* @param n The number of values to return
@@ -810,7 +834,7 @@ public final class Aggregators {
private static class MaxNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
private final int arity;
- private transient SortedSet<V> elements;
+ private transient SortedMultiset<V> elements;
public MaxNAggregator(int arity) {
this.arity = arity;
@@ -819,7 +843,7 @@ public final class Aggregators {
@Override
public void reset() {
if (elements == null) {
- elements = Sets.newTreeSet();
+ elements = TreeMultiset.create();
} else {
elements.clear();
}
@@ -829,7 +853,40 @@ public final class Aggregators {
public void update(V value) {
if (elements.size() < arity) {
elements.add(value);
- } else if (value.compareTo(elements.first()) > 0) {
+ } else if (value.compareTo(elements.firstEntry().getElement()) > 0) {
+ elements.remove(elements.firstEntry().getElement());
+ elements.add(value);
+ }
+ }
+
+ @Override
+ public Iterable<V> results() {
+ return ImmutableList.copyOf(elements);
+ }
+ }
+
+ private static class MaxUniqueNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
+ private final int arity;
+ private transient SortedSet<V> elements;
+
+ public MaxUniqueNAggregator(int arity) {
+ this.arity = arity;
+ }
+
+ @Override
+ public void reset() {
+ if (elements == null) {
+ elements = new TreeSet<V>();
+ } else {
+ elements.clear();
+ }
+ }
+
+ @Override
+ public void update(V value) {
+ if (elements.size() < arity) {
+ elements.add(value);
+ } else if (!elements.contains(value) && value.compareTo(elements.first()) > 0) {
elements.remove(elements.first());
elements.add(value);
}
@@ -843,7 +900,7 @@ public final class Aggregators {
private static class MinNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
private final int arity;
- private transient SortedSet<V> elements;
+ private transient SortedMultiset<V> elements;
public MinNAggregator(int arity) {
this.arity = arity;
@@ -852,7 +909,40 @@ public final class Aggregators {
@Override
public void reset() {
if (elements == null) {
- elements = Sets.newTreeSet();
+ elements = TreeMultiset.create();
+ } else {
+ elements.clear();
+ }
+ }
+
+ @Override
+ public void update(V value) {
+ if (elements.size() < arity) {
+ elements.add(value);
+ } else if (value.compareTo(elements.lastEntry().getElement()) < 0) {
+ elements.remove(elements.lastEntry().getElement());
+ elements.add(value);
+ }
+ }
+
+ @Override
+ public Iterable<V> results() {
+ return ImmutableList.copyOf(elements);
+ }
+ }
+
+ private static class MinUniqueNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
+ private final int arity;
+ private transient SortedSet<V> elements;
+
+ public MinUniqueNAggregator(int arity) {
+ this.arity = arity;
+ }
+
+ @Override
+ public void reset() {
+ if (elements == null) {
+ elements = new TreeSet<V>();
} else {
elements.clear();
}
@@ -862,7 +952,7 @@ public final class Aggregators {
public void update(V value) {
if (elements.size() < arity) {
elements.add(value);
- } else if (value.compareTo(elements.last()) < 0) {
+ } else if (!elements.contains(value) && value.compareTo(elements.last()) < 0) {
elements.remove(elements.last());
elements.add(value);
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/3ab0b078/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java b/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java
index 6ee1972..9417b08 100644
--- a/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java
@@ -17,24 +17,7 @@
*/
package org.apache.crunch.fn;
-import static org.apache.crunch.fn.Aggregators.MAX_BIGINTS;
-import static org.apache.crunch.fn.Aggregators.MAX_DOUBLES;
-import static org.apache.crunch.fn.Aggregators.MAX_FLOATS;
-import static org.apache.crunch.fn.Aggregators.MAX_INTS;
-import static org.apache.crunch.fn.Aggregators.MAX_LONGS;
-import static org.apache.crunch.fn.Aggregators.MAX_N;
-import static org.apache.crunch.fn.Aggregators.MIN_BIGINTS;
-import static org.apache.crunch.fn.Aggregators.MIN_DOUBLES;
-import static org.apache.crunch.fn.Aggregators.MIN_FLOATS;
-import static org.apache.crunch.fn.Aggregators.MIN_INTS;
-import static org.apache.crunch.fn.Aggregators.MIN_LONGS;
-import static org.apache.crunch.fn.Aggregators.MIN_N;
-import static org.apache.crunch.fn.Aggregators.STRING_CONCAT;
-import static org.apache.crunch.fn.Aggregators.SUM_BIGINTS;
-import static org.apache.crunch.fn.Aggregators.SUM_DOUBLES;
-import static org.apache.crunch.fn.Aggregators.SUM_FLOATS;
-import static org.apache.crunch.fn.Aggregators.SUM_INTS;
-import static org.apache.crunch.fn.Aggregators.SUM_LONGS;
+import static org.apache.crunch.fn.Aggregators.*;
import static org.hamcrest.Matchers.closeTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertEquals;
@@ -105,6 +88,12 @@ public class AggregatorsTest {
assertThat(apply(MAX_INTS(2), 17, 34, 98, 29, 1009), is(ImmutableList.of(98, 1009)));
assertThat(apply(MAX_N(1, String.class), "b", "a"), is(ImmutableList.of("b")));
assertThat(apply(MAX_N(3, String.class), "b", "a", "d", "c"), is(ImmutableList.of("b", "c", "d")));
+ assertThat(apply(MAX_N(2, Integer.class), 1, 2, 3, 3), is(ImmutableList.of(3, 3)));
+ }
+
+ @Test
+ public void testMaxUniqueN() {
+ assertThat(apply(MAX_UNIQUE_N(2, Integer.class), 1, 2, 3, 3), is(ImmutableList.of(2, 3)));
}
@Test
@@ -112,6 +101,12 @@ public class AggregatorsTest {
assertThat(apply(MIN_INTS(2), 17, 34, 98, 29, 1009), is(ImmutableList.of(17, 29)));
assertThat(apply(MIN_N(1, String.class), "b", "a"), is(ImmutableList.of("a")));
assertThat(apply(MIN_N(3, String.class), "b", "a", "d", "c"), is(ImmutableList.of("a", "b", "c")));
+ assertThat(apply(MIN_N(2, Integer.class), 1, 1, 2, 3), is(ImmutableList.of(1, 1)));
+ }
+
+ @Test
+ public void testMinUniqueN() {
+ assertThat(apply(MIN_UNIQUE_N(2, Integer.class), 3, 2, 1, 1), is(ImmutableList.of(1, 2)));
}
@Test