You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2014/06/20 05:41:42 UTC

git commit: CRUNCH-424: Aggregators for Scrunch based on Algebird's Monoid type (for sums) and Scala's Ordering type (for min/max).

Repository: crunch
Updated Branches:
  refs/heads/master 189febe62 -> a8a8a9b86


CRUNCH-424: Aggregators for Scrunch based on Algebird's Monoid type (for sums) and
Scala's Ordering type (for min/max).


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/a8a8a9b8
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/a8a8a9b8
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/a8a8a9b8

Branch: refs/heads/master
Commit: a8a8a9b86168e4e38a4cee2e542c4aa3f8df870c
Parents: 189febe
Author: Josh Wills <jw...@apache.org>
Authored: Tue Jun 17 23:40:32 2014 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Thu Jun 19 19:49:45 2014 -0700

----------------------------------------------------------------------
 .../java/org/apache/crunch/PCollection.java     |   1 -
 .../java/org/apache/crunch/fn/Aggregators.java  |   2 +-
 .../java/org/apache/crunch/lib/Aggregate.java   |   8 +-
 crunch-scrunch/pom.xml                          |   4 +
 .../apache/crunch/scrunch/ScalaTypesTest.scala  |   4 +-
 .../org/apache/crunch/scrunch/Aggregators.scala | 271 +++++++++++++++++++
 .../org/apache/crunch/scrunch/PCollection.scala |   2 +
 .../apache/crunch/scrunch/AggregatorsTest.scala |  49 ++++
 pom.xml                                         |   7 +
 9 files changed, 340 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/a8a8a9b8/crunch-core/src/main/java/org/apache/crunch/PCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PCollection.java b/crunch-core/src/main/java/org/apache/crunch/PCollection.java
index bf5bacc..878fbb9 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PCollection.java
@@ -277,5 +277,4 @@ public interface PCollection<S> {
    * Returns a {@code PCollection} that contains the result of aggregating all values in this instance.
    */
   PCollection<S> aggregate(Aggregator<S> aggregator);
-
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/a8a8a9b8/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
index 0ac79e2..084cca4 100644
--- a/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
+++ b/crunch-core/src/main/java/org/apache/crunch/fn/Aggregators.java
@@ -121,7 +121,7 @@ public final class Aggregators {
    * @return The newly constructed instance
    */
   public static Aggregator<Long> MAX_LONGS(int n) {
-    return new MaxLongs();
+    return new MaxNAggregator<Long>(n);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/crunch/blob/a8a8a9b8/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
index 5ef437c..3d2170c 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Aggregate.java
@@ -305,11 +305,11 @@ public class Aggregate {
   
   public static <S> PCollection<S> aggregate(PCollection<S> collect, Aggregator<S> aggregator) {
     PTypeFamily tf = collect.getTypeFamily();
-    return collect.parallelDo("Aggregate.aggregator", new MapFn<S, Pair<Boolean, S>>() {
-      public Pair<Boolean, S> map(S input) {
-        return Pair.of(false, input);
+    return collect.parallelDo("Aggregate.aggregator", new MapFn<S, Pair<Void, S>>() {
+      public Pair<Void, S> map(S input) {
+        return Pair.of(null, input);
       }
-    }, tf.tableOf(tf.booleans(), collect.getPType()))
+    }, tf.tableOf(tf.nulls(), collect.getPType()))
     .groupByKey(1)
     .combineValues(aggregator)
     .values();

http://git-wip-us.apache.org/repos/asf/crunch/blob/a8a8a9b8/crunch-scrunch/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-scrunch/pom.xml b/crunch-scrunch/pom.xml
index 1774431..84fe223 100644
--- a/crunch-scrunch/pom.xml
+++ b/crunch-scrunch/pom.xml
@@ -42,6 +42,10 @@ under the License.
       <artifactId>jline</artifactId>
     </dependency>
     <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>algebird-core_${scala.base.version}</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.crunch</groupId>
       <artifactId>crunch-core</artifactId>
     </dependency>

http://git-wip-us.apache.org/repos/asf/crunch/blob/a8a8a9b8/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/ScalaTypesTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/ScalaTypesTest.scala b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/ScalaTypesTest.scala
index de9a5f9..e4dc771 100644
--- a/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/ScalaTypesTest.scala
+++ b/crunch-scrunch/src/it/scala/org/apache/crunch/scrunch/ScalaTypesTest.scala
@@ -40,10 +40,10 @@ class ScalaTypesTest extends CrunchSuite {
 
     val out = pipeline.read(From.textFile(shakespeare))
         .map(x => if (x.startsWith("a")) Some(x) else None)
+        .aggregate(Aggregators.sum) // uses Algebird Monoid[Option[String]]
         .materialize
-        .take(100)
-    pipeline.done
     assert(out.exists(!_.isEmpty))
+    pipeline.done
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/crunch/blob/a8a8a9b8/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Aggregators.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Aggregators.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Aggregators.scala
new file mode 100644
index 0000000..238a01f
--- /dev/null
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/Aggregators.scala
@@ -0,0 +1,271 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.crunch.scrunch
+
+import org.apache.crunch.fn.{Aggregators => JAgg}
+import org.apache.crunch._
+import com.google.common.collect.ImmutableList
+import org.apache.hadoop.conf.Configuration
+import java.lang.{Iterable => JIterable}
+import scala.collection.JavaConversions
+import com.twitter.algebird.Monoid
+
+/**
+ * Scrunch versions of the common Aggregator types from Crunch.
+ */
+object Aggregators {
+
+  import scala.math.Ordering._
+
+  /**
+   * Sum the values using the implicitly-available Algebird {@code Monoid} for {@code T}.
+   *
+   * @return The newly constructed instance
+   */
+  def sum[T: Monoid]: Aggregator[T] = sumUsing(implicitly[Monoid[T]])
+
+  /**
+   * Sum the values using the given Algebird {@code Monoid}. The running total starts at, and is
+   * reset to, {@code m.zero}; the single result is the final total.
+   *
+   * @param m The monoid used to combine values
+   * @return The newly constructed instance
+   */
+  def sumUsing[T](m: Monoid[T]): Aggregator[T] = new SimpleAggregator[T] {
+    private var sum: T = m.zero
+
+    def reset { sum = m.zero }
+
+    def update(next: T) { sum = m.plus(sum, next) }
+
+    def results: JIterable[T] = ImmutableList.of(sum)
+  }
+
+  /**
+   * Return the largest value according to the implicitly-available {@code Ordering} for
+   * {@code T}, or no value if there were no inputs. On ties, the value seen first is kept.
+   *
+   * @return The newly constructed instance
+   */
+  def max[T: Ordering]: SimpleAggregator[T] = new SimpleAggregator[T] {
+    private val ord = implicitly[Ordering[T]]
+    private var largest: Option[T] = None
+
+    def reset { largest = None }
+
+    def update(next: T) {
+      if (largest.isEmpty || ord.lt(largest.get, next)) largest = Some(next)
+    }
+
+    def results: JIterable[T] = JavaConversions.asJavaIterable(largest.toIterable)
+  }
+
+  /**
+   * Return the smallest value according to the implicitly-available {@code Ordering} for
+   * {@code T}, or no value if there were no inputs. On ties, the value seen first is kept.
+   *
+   * @return The newly constructed instance
+   */
+  def min[T: Ordering]: SimpleAggregator[T] = new SimpleAggregator[T] {
+    private val ord = implicitly[Ordering[T]]
+    private var smallest: Option[T] = None
+
+    def reset { smallest = None }
+
+    def update(next: T) {
+      if (smallest.isEmpty || ord.gt(smallest.get, next)) smallest = Some(next)
+    }
+
+    def results: JIterable[T] = JavaConversions.asJavaIterable(smallest.toIterable)
+  }
+
+  /**
+   * Return the first {@code n} values (or fewer if there are fewer values than {@code n}).
+   *
+   * @param n The number of values to return
+   * @return The newly constructed instance
+   */
+  def first[V](n: Int): Aggregator[V] = JAgg.FIRST_N(n)
+
+  /**
+   * Return the last {@code n} values (or fewer if there are fewer values than {@code n}).
+   *
+   * @param n The number of values to return
+   * @return The newly constructed instance
+   */
+  def last[V](n: Int): Aggregator[V] = JAgg.LAST_N(n) // explicit type, else V is inferred as Nothing
+
+  /**
+   * Concatenate strings, with a separator between strings. There
+   * is no limit on the length of the concatenated string.
+   *
+   * <p><em>Note: String concatenation is not commutative, which means the
+   * result of the aggregation is not deterministic!</em></p>
+   *
+   * @param separator
+   * the separator which will be appended between each string
+   * @param skipNull
+   * whether null values should be skipped. If set to false, a
+   * NullPointerException is thrown when a null value is
+   * encountered.
+   * @return The newly constructed instance
+   */
+  def concat(separator: String, skipNull: Boolean): Aggregator[String] = JAgg.STRING_CONCAT(separator, skipNull)
+
+  /**
+   * Concatenate strings, with a separator between strings. You can specify
+   * the maximum length of the output string and of the input strings, if
+   * they are &gt; 0. If a value is &lt;= 0, there is no limit.
+   *
+   * <p>Any input string that is too long (or that would make the output too
+   * long) will be silently discarded.</p>
+   *
+   * <p><em>Note: String concatenation is not commutative, which means the
+   * result of the aggregation is not deterministic!</em></p>
+   *
+   * @param separator
+   * the separator which will be appended between each string
+   * @param skipNull
+   * whether null values should be skipped. If set to false, a
+   * NullPointerException is thrown when a null value is
+   * encountered.
+   * @param maxOutputLength
+   * the maximum length of the output string. If it's set &lt;= 0,
+   * there is no limit. The number of characters of the output
+   * string will be &lt; maxOutputLength.
+   * @param maxInputLength
+   * the maximum length of the input strings. If it's set &lt;= 0,
+   * there is no limit. The number of characters of the input string
+   * will be &lt; maxInputLength to be concatenated.
+   * @return The newly constructed instance
+   */
+  def concat(separator: String, skipNull: Boolean, maxOutputLength: Long, maxInputLength: Long): Aggregator[String] =
+    JAgg.STRING_CONCAT(separator, skipNull, maxOutputLength, maxInputLength)
+
+  /**
+   * Collect the unique elements of the input, as defined by the {@code equals} method for
+   * the input objects. No guarantees are made about the order in which the final elements
+   * will be returned.
+   *
+   * @return The newly constructed instance
+   */
+  def unique[V]: Aggregator[V] = JAgg.UNIQUE_ELEMENTS()
+
+  /**
+   * Collect a sample of unique elements from the input, where 'unique' is defined by
+   * the {@code equals} method for the input objects. No guarantees are made about which
+   * elements will be returned, simply that there will not be any more than the given sample
+   * size for any key.
+   *
+   * @param maximumSampleSize The maximum number of unique elements to return per key
+   * @return The newly constructed instance
+   */
+  def sampleUnique[V](maximumSampleSize: Int): Aggregator[V] = JAgg.SAMPLE_UNIQUE_ELEMENTS(maximumSampleSize)
+
+  /**
+   * Apply separate aggregators to each component of a {@link Tuple2}. The component results
+   * are zipped, so there are only as many outputs as the shortest component result has.
+   */
+  def pair[V1, V2](a1: Aggregator[V1], a2: Aggregator[V2]): Aggregator[(V1, V2)] =
+    new PairAggregator[V1, V2](a1, a2)
+
+  /**
+   * Apply separate aggregators to each component of a {@link Tuple3}. The component results
+   * are zipped, so there are only as many outputs as the shortest component result has.
+   */
+  def trip[V1, V2, V3](a1: Aggregator[V1], a2: Aggregator[V2], a3: Aggregator[V3]): Aggregator[(V1, V2, V3)] =
+    new TripAggregator[V1, V2, V3](a1, a2, a3)
+
+  /**
+   * Apply separate aggregators to each component of a {@link Tuple4}. The component results
+   * are zipped, so there are only as many outputs as the shortest component result has.
+   */
+  def quad[V1, V2, V3, V4](a1: Aggregator[V1], a2: Aggregator[V2], a3: Aggregator[V3], a4: Aggregator[V4])
+    : Aggregator[(V1, V2, V3, V4)] = new QuadAggregator[V1, V2, V3, V4](a1, a2, a3, a4)
+
+  /**
+   * Base class for aggregators that do not require any initialization.
+   */
+  abstract class SimpleAggregator[T] extends Aggregator[T] {
+    def initialize(conf: Configuration) {}
+  }
+
+  /** Shared logic for the tuple aggregators: element i of each input tuple is fed to aggs(i). */
+  private abstract class ProductAggregator[T <: Product](aggs: Array[Aggregator[_]]) extends Aggregator[T] {
+    def initialize(configuration: Configuration) {
+      aggs.foreach(_.initialize(configuration))
+    }
+
+    def reset {
+      aggs.foreach(_.reset)
+    }
+
+    protected def updateTuple(t: T) {
+      // aggs holds Aggregator[_], so cast each one to Aggregator[Any] to pass it the untyped element.
+      var i = 0
+      while (i < aggs.length) {
+        aggs(i).asInstanceOf[Aggregator[Any]].update(t.productElement(i))
+        i += 1
+      }
+    }
+
+    /** The results of the component aggregator at {@code index}, as a Scala Iterable. */
+    protected def componentResults[R](index: Int): Iterable[R] =
+      JavaConversions.iterableAsScalaIterable(aggs(index).results.asInstanceOf[JIterable[R]])
+  }
+
+  private class PairAggregator[A, B](val a1: Aggregator[A], val a2: Aggregator[B])
+    extends ProductAggregator[(A, B)](Array(a1, a2)) {
+
+    def update(value: (A, B)) {
+      updateTuple(value)
+    }
+
+    def results: JIterable[(A, B)] =
+      JavaConversions.asJavaIterable(componentResults[A](0).zip(componentResults[B](1)))
+  }
+
+  private class TripAggregator[A, B, C](val a1: Aggregator[A], val a2: Aggregator[B], val a3: Aggregator[C])
+    extends ProductAggregator[(A, B, C)](Array(a1, a2, a3)) {
+
+    def update(value: (A, B, C)) {
+      updateTuple(value)
+    }
+
+    def results: JIterable[(A, B, C)] = JavaConversions.asJavaIterable(
+      componentResults[A](0).zip(componentResults[B](1)).zip(componentResults[C](2))
+        .map(x => (x._1._1, x._1._2, x._2)))
+  }
+
+  private class QuadAggregator[A, B, C, D](val a1: Aggregator[A], val a2: Aggregator[B],
+                                           val a3: Aggregator[C], val a4: Aggregator[D])
+    extends ProductAggregator[(A, B, C, D)](Array(a1, a2, a3, a4)) {
+
+    def update(value: (A, B, C, D)) {
+      updateTuple(value)
+    }
+
+    def results: JIterable[(A, B, C, D)] = JavaConversions.asJavaIterable(
+      componentResults[A](0).zip(componentResults[B](1)).zip(componentResults[C](2).zip(componentResults[D](3)))
+        .map(x => (x._1._1, x._1._2, x._2._1, x._2._2)))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a8a8a9b8/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
index e2f7b5b..ed39d30 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollection.scala
@@ -53,6 +53,8 @@ class PCollection[S](val native: JCollection[S]) extends PCollectionLike[S, PCol
     new PCollection[S](native.union(others.map(_.native) : _*))
   }
 
+  /** Returns a PCollection that contains the result of aggregating all values in this one with {@code agg}. */
+  def aggregate(agg: Aggregator[S]): PCollection[S] = wrap(native.aggregate(agg))
   def by[K: PTypeH](f: S => K): PTable[K, S] = {
     val ptype = getTypeFamily().tableOf(implicitly[PTypeH[K]].get(getTypeFamily()), native.getPType())
     parallelDo(mapKeyFn[S, K](f), ptype)

http://git-wip-us.apache.org/repos/asf/crunch/blob/a8a8a9b8/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/AggregatorsTest.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/AggregatorsTest.scala b/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/AggregatorsTest.scala
new file mode 100644
index 0000000..e5d4d78
--- /dev/null
+++ b/crunch-scrunch/src/test/scala/org/apache/crunch/scrunch/AggregatorsTest.scala
@@ -0,0 +1,49 @@
+/*
+ * *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.crunch.scrunch
+
+import org.scalatest.junit.JUnitSuite
+import _root_.org.junit.{Assert, Test}
+
+/** Exercises the Monoid-based sum and Ordering-based min/max aggregators on in-memory data. */
+class AggregatorsTest extends JUnitSuite {
+
+  @Test def testSum {
+    val totals = Mem.collectionOf((1, 0.1), (2, 1.2), (3, 2.2), (4, 2.0), (5, 0.0))
+      .aggregate(Aggregators.sum).materialize().toList
+    Assert.assertEquals(1, totals.size)
+    val (intTotal, doubleTotal) = totals.head
+    Assert.assertEquals(15, intTotal)
+    Assert.assertEquals(5.5, doubleTotal, 0.001)
+  }
+
+  @Test def testMin {
+    val smallest = Mem.collectionOf(1, 2, 3, 4, 5).aggregate(Aggregators.min).materialize().toList
+    Assert.assertEquals(1, smallest.size)
+    Assert.assertEquals(1, smallest.head)
+  }
+
+  @Test def testMax {
+    val largest = Mem.collectionOf(1, 2, 3, 4, 5).aggregate(Aggregators.max).materialize().toList
+    Assert.assertEquals(1, largest.size)
+    Assert.assertEquals(5, largest.head)
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/a8a8a9b8/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 990fe39..92028bd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -81,6 +81,7 @@ under the License.
     <libthrift.version>0.8.0</libthrift.version>
     <slf4j.version>1.6.1</slf4j.version>
     <log4j.version>1.2.15</log4j.version>
+    <algebird.version>0.6.0</algebird.version>
     <junit.version>4.10</junit.version>
     <hamcrest.version>1.1</hamcrest.version>
     <mockito.version>1.9.0</mockito.version>
@@ -425,6 +426,12 @@ under the License.
       </dependency>
 
       <dependency>
+        <groupId>com.twitter</groupId>
+        <artifactId>algebird-core_${scala.base.version}</artifactId>
+        <version>${algebird.version}</version>
+      </dependency>
+
+      <dependency>
         <groupId>org.scala-lang</groupId>
         <artifactId>scala-library</artifactId>
         <version>${scala.version}</version>