You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2015/03/19 23:58:31 UTC

crunch git commit: CRUNCH-503: Fix MAX_N and MIN_N aggregators and add MAX_UNIQUE_N and MIN_UNIQUE_N

Repository: crunch
Updated Branches:
  refs/heads/master 8c35a1399 -> 3ab0b078c


CRUNCH-503: Fix MAX_N and MIN_N aggregators and add MAX_UNIQUE_N and MIN_UNIQUE_N


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/3ab0b078
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/3ab0b078
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/3ab0b078

Branch: refs/heads/master
Commit: 3ab0b078c47f23b3ba893fdfb05fd723f663d02b
Parents: 8c35a13
Author: Josh Wills <jw...@apache.org>
Authored: Thu Mar 19 12:47:11 2015 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Mar 19 15:45:32 2015 -0700

----------------------------------------------------------------------
 .../java/org/apache/crunch/fn/Aggregators.java  | 102 +++++++++++++++++--
 .../org/apache/crunch/fn/AggregatorsTest.java   |  31 +++---
 2 files changed, 109 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/3ab0b078/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
index e7aeb18..5a9c157 100644
--- a/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
@@ -22,7 +22,10 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import java.util.SortedSet;
+import java.util.TreeSet;
 
+import com.google.common.collect.SortedMultiset;
+import com.google.common.collect.TreeMultiset;
 import org.apache.crunch.Aggregator;
 import org.apache.crunch.CombineFn;
 import org.apache.crunch.Emitter;
@@ -209,6 +212,17 @@ public final class Aggregators {
   }
 
   /**
+   * Return the {@code n} largest unique values (or fewer if there are fewer
+   * unique values than {@code n}).
+   * @param n The number of values to return
+   * @param cls The type of the values to aggregate (must implement {@link Comparable}!)
+   * @return The newly constructed instance
+   */
+  public static <V extends Comparable<V>> Aggregator<V> MAX_UNIQUE_N(int n, Class<V> cls) {
+    return new MaxUniqueNAggregator<V>(n);
+  }
+
+  /**
    * Return the minimum of all given {@code long} values.
    * @return The newly constructed instance
    */
@@ -310,6 +324,16 @@ public final class Aggregators {
   }
 
   /**
+   * Return the {@code n} smallest unique values (or fewer if there are fewer unique values than {@code n}).
+   * @param n The number of values to return
+   * @param cls The type of the values to aggregate (must implement {@link Comparable}!)
+   * @return The newly constructed instance
+   */
+  public static <V extends Comparable<V>> Aggregator<V> MIN_UNIQUE_N(int n, Class<V> cls) {
+    return new MinUniqueNAggregator<V>(n);
+  }
+
+  /**
    * Return the first {@code n} values (or fewer if there are fewer values than {@code n}).
    *
    * @param n The number of values to return
@@ -810,7 +834,7 @@ public final class Aggregators {
 
   private static class MaxNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
     private final int arity;
-    private transient SortedSet<V> elements;
+    private transient SortedMultiset<V> elements;
 
     public MaxNAggregator(int arity) {
       this.arity = arity;
@@ -819,7 +843,7 @@ public final class Aggregators {
     @Override
     public void reset() {
       if (elements == null) {
-        elements = Sets.newTreeSet();
+        elements = TreeMultiset.create();
       } else {
         elements.clear();
       }
@@ -829,7 +853,40 @@ public final class Aggregators {
     public void update(V value) {
       if (elements.size() < arity) {
         elements.add(value);
-      } else if (value.compareTo(elements.first()) > 0) {
+      } else if (value.compareTo(elements.firstEntry().getElement()) > 0) {
+        elements.remove(elements.firstEntry().getElement());
+        elements.add(value);
+      }
+    }
+
+    @Override
+    public Iterable<V> results() {
+      return ImmutableList.copyOf(elements);
+    }
+  }
+
+  private static class MaxUniqueNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
+    private final int arity;
+    private transient SortedSet<V> elements;
+
+    public MaxUniqueNAggregator(int arity) {
+      this.arity = arity;
+    }
+
+    @Override
+    public void reset() {
+      if (elements == null) {
+        elements = new TreeSet<V>();
+      } else {
+        elements.clear();
+      }
+    }
+
+    @Override
+    public void update(V value) {
+      if (elements.size() < arity) {
+        elements.add(value);
+      } else if (!elements.contains(value) && value.compareTo(elements.first()) > 0) {
         elements.remove(elements.first());
         elements.add(value);
       }
@@ -843,7 +900,7 @@ public final class Aggregators {
 
   private static class MinNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
     private final int arity;
-    private transient SortedSet<V> elements;
+    private transient SortedMultiset<V> elements;
 
     public MinNAggregator(int arity) {
       this.arity = arity;
@@ -852,7 +909,40 @@ public final class Aggregators {
     @Override
     public void reset() {
       if (elements == null) {
-        elements = Sets.newTreeSet();
+        elements = TreeMultiset.create();
+      } else {
+        elements.clear();
+      }
+    }
+
+    @Override
+    public void update(V value) {
+      if (elements.size() < arity) {
+        elements.add(value);
+      } else if (value.compareTo(elements.lastEntry().getElement()) < 0) {
+        elements.remove(elements.lastEntry().getElement());
+        elements.add(value);
+      }
+    }
+
+    @Override
+    public Iterable<V> results() {
+      return ImmutableList.copyOf(elements);
+    }
+  }
+
+  private static class MinUniqueNAggregator<V extends Comparable<V>> extends SimpleAggregator<V> {
+    private final int arity;
+    private transient SortedSet<V> elements;
+
+    public MinUniqueNAggregator(int arity) {
+      this.arity = arity;
+    }
+
+    @Override
+    public void reset() {
+      if (elements == null) {
+        elements = new TreeSet<V>();
       } else {
         elements.clear();
       }
@@ -862,7 +952,7 @@ public final class Aggregators {
     public void update(V value) {
       if (elements.size() < arity) {
         elements.add(value);
-      } else if (value.compareTo(elements.last()) < 0) {
+      } else if (!elements.contains(value) && value.compareTo(elements.last()) < 0) {
         elements.remove(elements.last());
         elements.add(value);
       }

http://git-wip-us.apache.org/repos/asf/crunch/blob/3ab0b078/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java b/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java
index 6ee1972..9417b08 100644
--- a/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java
+++ b/crunch-core/src/test/java/org/apache/crunch/fn/AggregatorsTest.java
@@ -17,24 +17,7 @@
  */
 package org.apache.crunch.fn;
 
-import static org.apache.crunch.fn.Aggregators.MAX_BIGINTS;
-import static org.apache.crunch.fn.Aggregators.MAX_DOUBLES;
-import static org.apache.crunch.fn.Aggregators.MAX_FLOATS;
-import static org.apache.crunch.fn.Aggregators.MAX_INTS;
-import static org.apache.crunch.fn.Aggregators.MAX_LONGS;
-import static org.apache.crunch.fn.Aggregators.MAX_N;
-import static org.apache.crunch.fn.Aggregators.MIN_BIGINTS;
-import static org.apache.crunch.fn.Aggregators.MIN_DOUBLES;
-import static org.apache.crunch.fn.Aggregators.MIN_FLOATS;
-import static org.apache.crunch.fn.Aggregators.MIN_INTS;
-import static org.apache.crunch.fn.Aggregators.MIN_LONGS;
-import static org.apache.crunch.fn.Aggregators.MIN_N;
-import static org.apache.crunch.fn.Aggregators.STRING_CONCAT;
-import static org.apache.crunch.fn.Aggregators.SUM_BIGINTS;
-import static org.apache.crunch.fn.Aggregators.SUM_DOUBLES;
-import static org.apache.crunch.fn.Aggregators.SUM_FLOATS;
-import static org.apache.crunch.fn.Aggregators.SUM_INTS;
-import static org.apache.crunch.fn.Aggregators.SUM_LONGS;
+import static org.apache.crunch.fn.Aggregators.*;
 import static org.hamcrest.Matchers.closeTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertEquals;
@@ -105,6 +88,12 @@ public class AggregatorsTest {
     assertThat(apply(MAX_INTS(2), 17, 34, 98, 29, 1009), is(ImmutableList.of(98, 1009)));
     assertThat(apply(MAX_N(1, String.class), "b", "a"), is(ImmutableList.of("b")));
     assertThat(apply(MAX_N(3, String.class), "b", "a", "d", "c"), is(ImmutableList.of("b", "c", "d")));
+    assertThat(apply(MAX_N(2, Integer.class), 1, 2, 3, 3), is(ImmutableList.of(3, 3)));
+  }
+
+  @Test
+  public void testMaxUniqueN() {
+    assertThat(apply(MAX_UNIQUE_N(2, Integer.class), 1, 2, 3, 3), is(ImmutableList.of(2, 3)));
   }
 
   @Test
@@ -112,6 +101,12 @@ public class AggregatorsTest {
     assertThat(apply(MIN_INTS(2), 17, 34, 98, 29, 1009), is(ImmutableList.of(17, 29)));
     assertThat(apply(MIN_N(1, String.class), "b", "a"), is(ImmutableList.of("a")));
     assertThat(apply(MIN_N(3, String.class), "b", "a", "d", "c"), is(ImmutableList.of("a", "b", "c")));
+    assertThat(apply(MIN_N(2, Integer.class), 1, 1, 2, 3), is(ImmutableList.of(1, 1)));
+  }
+
+  @Test
+  public void testMinUniqueN() {
+    assertThat(apply(MIN_UNIQUE_N(2, Integer.class), 3, 2, 1, 1), is(ImmutableList.of(1, 2)));
   }
 
   @Test