You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2015/01/12 19:26:01 UTC

crunch git commit: CRUNCH-484: Add library features from spotify/crunch-lib into crunch-core.

Repository: crunch
Updated Branches:
  refs/heads/master 48e4e7941 -> 3477ea431


CRUNCH-484: Add library features from spotify/crunch-lib into crunch-core.

Signed-off-by: Josh Wills <jw...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/3477ea43
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/3477ea43
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/3477ea43

Branch: refs/heads/master
Commit: 3477ea431015a81ad2cee825ca9410f6da441dee
Parents: 48e4e79
Author: David Whiting <da...@spotify.com>
Authored: Mon Jan 12 15:04:27 2015 +0100
Committer: Josh Wills <jw...@apache.org>
Committed: Mon Jan 12 10:15:55 2015 -0800

----------------------------------------------------------------------
 .../java/org/apache/crunch/lib/Average.java     |  59 +++++
 .../main/java/org/apache/crunch/lib/DoFns.java  | 101 ++++++++
 .../java/org/apache/crunch/lib/PTables.java     |  17 ++
 .../java/org/apache/crunch/lib/Quantiles.java   | 254 +++++++++++++++++++
 .../java/org/apache/crunch/lib/TopList.java     | 110 ++++++++
 .../java/org/apache/crunch/lib/AverageTest.java |  52 ++++
 .../java/org/apache/crunch/lib/DoFnsTest.java   |  86 +++++++
 .../java/org/apache/crunch/lib/PTablesTest.java |  40 +++
 .../org/apache/crunch/lib/QuantilesTest.java    | 126 +++++++++
 .../java/org/apache/crunch/lib/TopListTest.java |  68 +++++
 10 files changed, 913 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/main/java/org/apache/crunch/lib/Average.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Average.java b/crunch-core/src/main/java/org/apache/crunch/lib/Average.java
new file mode 100644
index 0000000..df6ed3c
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Average.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PTypeFamily;
+
+import static org.apache.crunch.fn.Aggregators.SUM_DOUBLES;
+import static org.apache.crunch.fn.Aggregators.SUM_LONGS;
+import static org.apache.crunch.fn.Aggregators.pairAggregator;
+
+public class Average {
+
+  /**
+   * Calculate the mean average value by key for a table with numeric values.
+   * @param table PTable of (key, value) pairs to operate on
+   * @param <K> Key type, can be any type
+   * @param <V> Value type, must be numeric (ie. extend java.lang.Number)
+   * @return PTable&lt;K, Double&gt; of (key, mean(values)) pairs
+   */
+  public static <K, V extends Number> PTable<K, Double> meanValue(PTable<K, V> table) {
+    PTypeFamily ptf = table.getTypeFamily();
+    PTable<K, Pair<Double, Long>> withCounts = table.mapValues(new MapFn<V, Pair<Double, Long>>() {
+
+      @Override
+      public Pair<Double, Long> map(V input) {
+        return Pair.of(input.doubleValue(), 1L);
+      }
+    }, ptf.pairs(ptf.doubles(), ptf.longs()));
+    PGroupedTable<K, Pair<Double, Long>> grouped = withCounts.groupByKey();
+
+    return grouped.combineValues(pairAggregator(SUM_DOUBLES(), SUM_LONGS()))
+            .mapValues(new MapFn<Pair<Double, Long>, Double>() {
+             @Override
+             public Double map(Pair<Double, Long> input) {
+               return input.first() / input.second();
+             }
+            }, ptf.doubles());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/main/java/org/apache/crunch/lib/DoFns.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/DoFns.java b/crunch-core/src/main/java/org/apache/crunch/lib/DoFns.java
new file mode 100644
index 0000000..cbf819f
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/DoFns.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.Serializable;
+
+public class DoFns {
+  /**
+   * "Reduce" DoFn wrapper which detaches the values in the iterable, preventing the unexpected behaviour related to
+   * object reuse often observed when using Avro. Wrap your DoFn in a detach(...) and pass in a PType for the Iterable
+   * value, and then you'll be handed an Iterable of real distinct objects, instead of the same object being handed to
+   * you multiple times with different data.
+   *
+   * You should use this when you have a parallelDo after a groupBy, and you'd like to capture the objects arriving in
+   * the Iterable part of the incoming Pair and pass it through to the output (for example if you want to create an
+   * array of outputs from the values to be output as one record).
+   *
+   * This will incur a performance hit, as it means that every object read from the Iterable will allocate a new Java
+   * object for the record and objects for all its non-primitive fields too. If you are rolling up records into a
+   * collection then this will be necessary anyway, but if you are only outputting derived data this may impact the
+   * speed and memory usage of your job unnecessarily.
+   *
+   * @param reduceFn Underlying DoFn to wrap
+   * @param valueType PType of the object contained within the Iterable
+   * @param <K> Reduce key
+   * @param <V> Iterable value
+   * @param <T> Output type of DoFn
+   * @return DoFn which will detach values for you
+   */
+  public static <K, V, T> DoFn<Pair<K, Iterable<V>>, T> detach(final DoFn<Pair<K, Iterable<V>>, T> reduceFn, final PType<V> valueType) {
+    return new DetachingDoFn<K, V, T>(reduceFn, valueType);
+  }
+
+  private static class DetachFunction<T> implements Function<T, T>, Serializable {
+    private final PType<T> pType;
+
+    public DetachFunction(PType<T> initializedPType) {
+      this.pType = initializedPType;
+    }
+
+    @Override
+    public T apply(T t) {
+      return pType.getDetachedValue(t);
+    }
+  }
+
+  private static class DetachingDoFn<K, V, T> extends DoFn<Pair<K, Iterable<V>>, T> {
+
+    private final DoFn<Pair<K, Iterable<V>>, T> reduceFn;
+    private final PType<V> valueType;
+
+    public DetachingDoFn(DoFn<Pair<K, Iterable<V>>, T> reduceFn, PType<V> valueType) {
+      this.reduceFn = reduceFn;
+      this.valueType = valueType;
+    }
+
+    @Override
+    public void configure(Configuration configuration) {
+      super.configure(configuration);
+      reduceFn.configure(configuration);
+    }
+
+    @Override
+    public void initialize() {
+      reduceFn.initialize();
+      valueType.initialize(getConfiguration() == null ? new Configuration() : getConfiguration());
+    }
+
+    @Override
+    public void process(Pair<K, Iterable<V>> input, Emitter<T> emitter) {
+      reduceFn.process(Pair.of(input.first(), detachIterable(input.second(), valueType)), emitter);
+    }
+
+    public Iterable<V> detachIterable(Iterable<V> iterable, final PType<V> pType) {
+      return Iterables.transform(iterable, new DetachFunction<V>(pType));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java b/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java
index e0a3bf3..465e3f1 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java
@@ -213,4 +213,21 @@ public class PTables {
     return Pair.of(tableType.getKeyType().getDetachedValue(value.first()),
         (Iterable<V>) detachedIterable);
   }
+
+  /**
+   * Swap the key and value part of a table. The original PTypes are used in the opposite order
+   * @param table PTable to process
+   * @param <K> Key type (will become value type)
+   * @param <V> Value type (will become key type)
+   * @return PTable&lt;V, K&gt; containing the same data as the original
+   */
+  public static <K, V> PTable<V, K> swapKeyValue(PTable<K, V> table) {
+    PTypeFamily ptf = table.getTypeFamily();
+    return table.parallelDo(new MapFn<Pair<K, V>, Pair<V, K>>() {
+      @Override
+      public Pair<V, K> map(Pair<K, V> input) {
+        return Pair.of(input.second(), input.first());
+      }
+    }, ptf.tableOf(table.getValueType(), table.getKeyType()));
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/main/java/org/apache/crunch/lib/Quantiles.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Quantiles.java b/crunch-core/src/main/java/org/apache/crunch/lib/Quantiles.java
new file mode 100644
index 0000000..d6fc454
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Quantiles.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.PeekingIterator;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+public class Quantiles {
+
+  /**
+   * Calculate a set of quantiles for each key in a numerically-valued table.
+   *
+   * Quantiles are calculated on a per-key basis by counting, joining and sorting. This is highly scalable, but takes
+   * 2 more map-reduce cycles than if you can guarantee that the value set will fit into memory. Use inMemory
+   * if you have less than the order of 10M values per key.
+   *
+   * The quantile definition that we use here is the "nearest rank" defined here:
+   * http://en.wikipedia.org/wiki/Percentile#Definition
+   *
+   * @param table numerically-valued PTable
+   * @param p1 First quantile (in the range 0.0 - 1.0)
+   * @param pn More quantiles (in the range 0.0 - 1.0)
+   * @param <K> Key type of the table
+   * @param <V> Value type of the table (must extend java.lang.Number)
+   * @return PTable of each key with a collection of pairs of the quantile provided and its result.
+   */
+  public static <K, V extends Number> PTable<K, Result<V>> distributed(PTable<K, V> table,
+          double p1, double... pn) {
+    final List<Double> quantileList = createListFromVarargs(p1, pn);
+
+    PTypeFamily ptf = table.getTypeFamily();
+    PTable<K, Long> totalCounts = table.keys().count();
+    PTable<K, Pair<Long, V>> countValuePairs = totalCounts.join(table);
+    PTable<K, Pair<V, Long>> valueCountPairs =
+            countValuePairs.mapValues(new SwapPairComponents<Long, V>(), ptf.pairs(table.getValueType(), ptf.longs()));
+
+
+    return SecondarySort.sortAndApply(
+            valueCountPairs,
+            new DistributedQuantiles<K, V>(quantileList),
+            ptf.tableOf(table.getKeyType(), Result.pType(table.getValueType())));
+  }
+
+  /**
+   * Calculate a set of quantiles for each key in a numerically-valued table.
+   *
+   * Quantiles are calculated on a per-key basis by grouping, reading the data into memory, then sorting and
+   * calculating. This is much faster than the distributed option, but if you get into the order of 10M+ per key, then
+   * performance might start to degrade or even cause OOMs.
+   *
+   * The quantile definition that we use here is the "nearest rank" defined here:
+   * http://en.wikipedia.org/wiki/Percentile#Definition
+   *
+   * @param table numerically-valued PTable
+   * @param p1 First quantile (in the range 0.0 - 1.0)
+   * @param pn More quantiles (in the range 0.0 - 1.0)
+   * @param <K> Key type of the table
+   * @param <V> Value type of the table (must implement java.lang.Comparable)
+   * @return PTable of each key with a collection of pairs of the quantile provided and its result.
+   */
+  public static <K, V extends Comparable> PTable<K, Result<V>> inMemory(PTable<K, V> table,
+          double p1, double... pn) {
+    final List<Double> quantileList = createListFromVarargs(p1, pn);
+
+    PTypeFamily ptf = table.getTypeFamily();
+
+    return table
+            .groupByKey()
+            .parallelDo(new InMemoryQuantiles<K, V>(quantileList),
+                        ptf.tableOf(table.getKeyType(), Result.pType(table.getValueType())));
+  }
+
+  private static List<Double> createListFromVarargs(double p1, double[] pn) {
+    final List<Double> quantileList = Lists.newArrayList(p1);
+    for (double p: pn) {
+      quantileList.add(p);
+    }
+    return quantileList;
+  }
+
+  private static class SwapPairComponents<T1, T2> extends MapFn<Pair<T1, T2>, Pair<T2, T1>> {
+    @Override
+    public Pair<T2, T1> map(Pair<T1, T2> input) {
+      return Pair.of(input.second(), input.first());
+    }
+  }
+
+  /** Picks the nearest-rank value for each requested quantile in a single pass over the sorted values. */
+  private static <V> Collection<Pair<Double, V>> findQuantiles(Iterator<V> sortedCollectionIterator,
+          long collectionSize, List<Double> quantiles) {
+    Collection<Pair<Double, V>> output = Lists.newArrayList();
+    Multimap<Long, Double> quantileIndices = ArrayListMultimap.create();
+    for (double quantile: quantiles) {
+      // Stay in long arithmetic: an (int) cast saturates once a key has more than Integer.MAX_VALUE values.
+      long idx = Math.max((long) Math.ceil(quantile * collectionSize) - 1, 0L);
+      quantileIndices.put(idx, quantile);
+    }
+    long index = 0;
+    while (sortedCollectionIterator.hasNext()) {
+      V value = sortedCollectionIterator.next();
+      if (quantileIndices.containsKey(index)) {
+        for (double quantile: quantileIndices.get(index)) {
+          output.add(Pair.of(quantile, value));
+        }
+      }
+      index++;
+    }
+    return output;
+  }
+
+  private static class InMemoryQuantiles<K, V extends Comparable> extends
+          MapFn<Pair<K, Iterable<V>>, Pair<K, Result<V>>> {
+    private final List<Double> quantileList;
+
+    public InMemoryQuantiles(List<Double> quantiles) {
+      this.quantileList = quantiles;
+    }
+
+    @Override
+    public Pair<K, Result<V>> map(Pair<K, Iterable<V>> input) {
+      List<V> values = Lists.newArrayList(input.second().iterator());
+      Collections.sort(values);
+      return Pair.of(input.first(), new Result<V>(values.size(), findQuantiles(values.iterator(), values.size(), quantileList)));
+    }
+  }
+
+  private static class DistributedQuantiles<K, V> extends
+          MapFn<Pair<K, Iterable<Pair<V, Long>>>, Pair<K, Result<V>>> {
+    private final List<Double> quantileList;
+
+    public DistributedQuantiles(List<Double> quantileList) {
+      this.quantileList = quantileList;
+    }
+
+    @Override
+    public Pair<K, Result<V>> map(Pair<K, Iterable<Pair<V, Long>>> input) {
+
+      PeekingIterator<Pair<V, Long>> iterator = Iterators.peekingIterator(input.second().iterator());
+      long count = iterator.peek().second();
+
+      Iterator<V> valueIterator = Iterators.transform(iterator, new Function<Pair<V, Long>, V>() {
+        @Override
+        public V apply(@Nullable Pair<V, Long> input) {
+          return input.first();
+        }
+      });
+
+      Collection<Pair<Double, V>> output = findQuantiles(valueIterator, count, quantileList);
+      return Pair.of(input.first(), new Result<V>(count, output));
+    }
+  }
+
+  /**
+   * Output type for storing the results of a Quantiles computation
+   * @param <V> Quantile value type
+   */
+  public static class Result<V> {
+    public final long count;
+    public final Map<Double, V> quantiles = Maps.newTreeMap();
+
+    public Result(long count, Iterable<Pair<Double, V>> quantiles) {
+      this.count = count;
+      for (Pair<Double,V> quantile: quantiles) {
+        this.quantiles.put(quantile.first(), quantile.second());
+      }
+    }
+
+    /**
+     * Create a PType for the result type, to be stored as a derived type from Crunch primitives
+     * @param valuePType PType for the V type, whose family will also be used to create the derived type
+     * @param <V> Value type
+     * @return PType for serializing Result&lt;V&gt;
+     */
+    public static <V> PType<Result<V>> pType(PType<V> valuePType) {
+      PTypeFamily ptf = valuePType.getFamily();
+
+      @SuppressWarnings("unchecked")
+      Class<Result<V>> prClass = (Class<Result<V>>)(Class)Result.class;
+
+      return ptf.derivedImmutable(prClass, new MapFn<Pair<Collection<Pair<Double, V>>, Long>, Result<V>>() {
+        @Override
+        public Result<V> map(Pair<Collection<Pair<Double, V>>, Long> input) {
+          return new Result<V>(input.second(), input.first());
+        }
+      }, new MapFn<Result<V>, Pair<Collection<Pair<Double, V>>, Long>>() {
+        @Override
+        public Pair<Collection<Pair<Double, V>>, Long> map(Result<V> input) {
+          return Pair.of(asCollection(input.quantiles), input.count);
+        }
+      }, ptf.pairs(ptf.collections(ptf.pairs(ptf.doubles(), valuePType)), ptf.longs()));
+    }
+
+    private static <K, V> Collection<Pair<K, V>> asCollection(Map<K, V> map) {
+      Collection<Pair<K, V>> collection = Lists.newArrayList();
+      for (Map.Entry<K, V> entry: map.entrySet()) {
+        collection.add(Pair.of(entry.getKey(), entry.getValue()));
+      }
+      return collection;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      Result result = (Result) o;
+
+      if (count != result.count) return false;
+      if (!quantiles.equals(result.quantiles)) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = (int) (count ^ (count >>> 32));
+      result = 31 * result + quantiles.hashCode();
+      return result;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/main/java/org/apache/crunch/lib/TopList.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/TopList.java b/crunch-core/src/main/java/org/apache/crunch/lib/TopList.java
new file mode 100644
index 0000000..54d20f9
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/TopList.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import com.google.common.collect.Lists;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * Tools for creating top lists of items in PTables and PCollections
+ */
+public class TopList {
+
+  /**
+   * Create a top-list of elements in the provided PTable, categorised by the key of the input table and using the count
+   * of the value part of the input table. Example: if input = Table(Country, Track), then this will give you the most
+   * common n tracks for each country.
+   * @param input table of X Y pairs
+   * @param n How many Y values to include in the toplist per X (this will be in memory, so don't make this ridiculous)
+   * @param <X> group type
+   * @param <Y> value type
+   * @return table of each unique X value mapped to a collection of (count, Y) pairs
+   */
+  public static <X, Y> PTable<X, Collection<Pair<Long, Y>>> topNYbyX(PTable<X, Y> input, final int n) {
+    final PType<X> xType = input.getKeyType();
+    final PType<Y> yType = input.getValueType();
+    PTypeFamily f = xType.getFamily();
+    PTable<X, Pair<Long, Y>> counted = input.count().parallelDo(new MapFn<Pair<Pair<X, Y>, Long>, Pair<X, Pair<Long, Y>>>() {
+      @Override
+      public Pair<X, Pair<Long, Y>> map(Pair<Pair<X, Y>, Long> input) {
+        return Pair.of(input.first().first(), Pair.of(-input.second(), input.first().second()));
+      }
+    }, f.tableOf(xType, f.pairs(f.longs(), yType)));
+    return SecondarySort.sortAndApply(counted, new MapFn<Pair<X, Iterable<Pair<Long, Y>>>, Pair<X, Collection<Pair<Long, Y>>>>() {
+
+      private PTableType<Long, Y> tableType;
+
+      @Override
+      public void initialize() {
+        PTypeFamily ptf = yType.getFamily();
+        tableType = ptf.tableOf(ptf.longs(), yType);
+        tableType.initialize(getConfiguration());
+      }
+
+      @Override
+      public Pair<X, Collection<Pair<Long, Y>>> map(Pair<X, Iterable<Pair<Long, Y>>> input) {
+        Collection<Pair<Long, Y>> values = Lists.newArrayList();
+        Iterator<Pair<Long, Y>> iter = input.second().iterator();
+        for (int i = 0; i < n; i++) {
+          if (!iter.hasNext()) {
+            break;
+          }
+          Pair<Long, Y> pair = PTables.getDetachedValue(tableType, iter.next());
+          values.add(Pair.of(-pair.first(), pair.second()));
+        }
+        return Pair.of(input.first(), values);
+      }
+    }, f.tableOf(xType, f.collections(f.pairs(f.longs(), yType))));
+  }
+
+  /**
+   * Create a list of unique items in the input collection with their count, sorted descending by their frequency.
+   * @param input input collection
+   * @param <X> record type
+   * @return global toplist
+   */
+  public static <X> PTable<X, Long> globalToplist(PCollection<X> input) {
+    return negateCounts(negateCounts(input.count()).groupByKey(1).ungroup());
+  }
+
+  /**
+   * When creating toplists, it is often required to sort by count descending. As some sort operations don't support
+   * order (such as SecondarySort), this method will negate counts so that a natural-ordered sort will produce a
+   * descending order.
+   * @param table PTable to process
+   * @param <K> key type
+   * @return PTable of the same format with the value negated
+   */
+  public static <K> PTable<K, Long> negateCounts(PTable<K, Long> table) {
+    return table.parallelDo(new MapFn<Pair<K, Long>, Pair<K, Long>>() {
+      @Override
+      public Pair<K, Long> map(Pair<K, Long> input) {
+        return Pair.of(input.first(), -input.second());
+      }
+    }, table.getPTableType());
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/test/java/org/apache/crunch/lib/AverageTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/AverageTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/AverageTest.java
new file mode 100644
index 0000000..6fadd35
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/AverageTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.crunch.PTable;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.apache.crunch.types.avro.Avros.*;
+import static org.junit.Assert.assertEquals;
+
+public class AverageTest {
+  @Test
+  public void testMeanValue() {
+    PTable<String, Integer> testTable = MemPipeline.typedTableOf(
+            tableOf(strings(), ints()),
+            "a", 2,
+            "a", 10,
+            "b", 3,
+            "c", 3,
+            "c", 4,
+            "c", 5);
+    Map<String, Double> actual = Average.meanValue(testTable).materializeToMap();
+    Map<String, Double> expected = ImmutableMap.of(
+            "a", 6.0,
+            "b", 3.0,
+            "c", 4.0
+    );
+
+    assertEquals(expected, actual);
+  }
+
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/test/java/org/apache/crunch/lib/DoFnsTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/DoFnsTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/DoFnsTest.java
new file mode 100644
index 0000000..3e70882
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/DoFnsTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Lists;
+import org.apache.avro.util.Utf8;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mem.emit.InMemoryEmitter;
+import org.apache.crunch.test.Employee;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import static org.junit.Assert.assertEquals;
+
+public class DoFnsTest {
+
+  private static class AvroIterable implements Iterable<Employee> {
+    @Override
+    public Iterator<Employee> iterator() {
+      final Employee rec = new Employee(new Utf8("something"), 10000, new Utf8(""));
+      return new AbstractIterator<Employee>() {
+        private int n = 0;
+        @Override
+        protected Employee computeNext() {
+          n++;
+          if (n > 3) return endOfData();
+          rec.setDepartment(new Utf8(Strings.repeat("*", n)));
+          return rec;
+        }
+      };
+    }
+  }
+
+  private static class CollectingMapFn extends
+          MapFn<Pair<String, Iterable<Employee>>, Collection<Employee>> {
+
+    @Override
+    public Collection<Employee> map(Pair<String, Iterable<Employee>> input) {
+      return Lists.newArrayList(input.second());
+    }
+  }
+
+  @Test
+  public void testDetach() {
+    Collection<Employee> expected = Lists.newArrayList(
+            new Employee(new Utf8("something"), 10000, new Utf8("*")),
+            new Employee(new Utf8("something"), 10000, new Utf8("**")),
+            new Employee(new Utf8("something"), 10000, new Utf8("***"))
+    );
+    DoFn<Pair<String, Iterable<Employee>>, Collection<Employee>> doFn =
+            DoFns.detach(new CollectingMapFn(), Avros.specifics(Employee.class));
+    Pair<String, Iterable<Employee>> input = Pair.of("key", (Iterable<Employee>) new AvroIterable());
+    InMemoryEmitter<Collection<Employee>> emitter = new InMemoryEmitter<Collection<Employee>>();
+
+    doFn.configure(new Configuration());
+    doFn.initialize();
+    doFn.process(input, emitter);
+    doFn.cleanup(emitter);
+
+    assertEquals(expected, emitter.getOutput().get(0));
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/test/java/org/apache/crunch/lib/PTablesTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/PTablesTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/PTablesTest.java
new file mode 100644
index 0000000..a53a33c
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/PTablesTest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.crunch.PTable;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.apache.crunch.types.avro.Avros.longs;
+import static org.apache.crunch.types.avro.Avros.strings;
+import static org.apache.crunch.types.avro.Avros.tableOf;
+import static org.junit.Assert.assertEquals;
+
+/** Unit test for {@link PTables}. */
+public class PTablesTest {
+  @Test
+  public void testSwapKeyValue() {
+    PTable<String, Long> original = MemPipeline.typedTableOf(tableOf(strings(), longs()), "hello", 14L, "goodbye", 21L);
+    Map<Long, String> swapped = PTables.swapKeyValue(original).materializeToMap();
+    assertEquals(ImmutableMap.of(14L, "hello", 21L, "goodbye"), swapped);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/test/java/org/apache/crunch/lib/QuantilesTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/QuantilesTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/QuantilesTest.java
new file mode 100644
index 0000000..ea2a312
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/QuantilesTest.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.lib.Quantiles.Result;
+
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.apache.crunch.types.avro.Avros.*;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link Quantiles}. Every case computes its quantiles with both
+ * {@code Quantiles.distributed} and {@code Quantiles.inMemory} and expects
+ * the two implementations to produce the same result.
+ */
+public class QuantilesTest {
+
+  /**
+   * Builds an expected {@link Result} from a count and its (quantile, value) pairs.
+   */
+  private static <T> Result<T> result(long count, Pair<Double, T>... quantiles) {
+    return new Result<T>(count, Lists.newArrayList(quantiles));
+  }
+
+  /**
+   * Five values: quantiles 0, 0.5 and 1.0 are expected to be the smallest, middle and largest value.
+   */
+  @Test
+  public void testQuantilesExact() {
+    PTable<String, Integer> input = MemPipeline.typedTableOf(
+            tableOf(strings(), ints()),
+            "a", 5, "a", 2, "a", 3, "a", 4, "a", 1);
+    Map<String, Result<Integer>> expected = ImmutableMap.of(
+            "a", result(5, Pair.of(0.0, 1), Pair.of(0.5, 3), Pair.of(1.0, 5)));
+
+    // Compute the same quantiles with both implementations.
+    Map<String, Result<Integer>> distributed = Quantiles.distributed(input, 0, 0.5, 1.0).materializeToMap();
+    Map<String, Result<Integer>> inMemory = Quantiles.inMemory(input, 0, 0.5, 1.0).materializeToMap();
+
+    assertEquals(expected, distributed);
+    assertEquals(expected, inMemory);
+  }
+
+  /**
+   * Four values: 0.5 falls between two elements and is expected to resolve to the lower one.
+   */
+  @Test
+  public void testQuantilesBetween() {
+    // We expect the 0.5 to correspond to the element 2, according to the "nearest rank" %ile definition.
+    PTable<String, Integer> input = MemPipeline.typedTableOf(
+            tableOf(strings(), ints()),
+            "a", 5, "a", 2, "a", 4, "a", 1);
+    Map<String, Result<Integer>> expected = ImmutableMap.of(
+            "a", result(4, Pair.of(0.5, 2)));
+
+    // Compute the same quantiles with both implementations.
+    Map<String, Result<Integer>> distributed = Quantiles.distributed(input, 0.5).materializeToMap();
+    Map<String, Result<Integer>> inMemory = Quantiles.inMemory(input, 0.5).materializeToMap();
+
+    assertEquals(expected, distributed);
+    assertEquals(expected, inMemory);
+  }
+
+  /**
+   * Ten values 10..100: 0.9 is expected to yield 90 and 0.99 is expected to yield 100.
+   */
+  @Test
+  public void testQuantilesNines() {
+    PTable<String, Integer> input = MemPipeline.typedTableOf(
+            tableOf(strings(), ints()),
+            "a", 10, "a", 20, "a", 30, "a", 40, "a", 50,
+            "a", 60, "a", 70, "a", 80, "a", 90, "a", 100);
+    Map<String, Result<Integer>> expected = ImmutableMap.of(
+            "a", result(10, Pair.of(0.9, 90), Pair.of(0.99, 100)));
+
+    // Compute the same quantiles with both implementations.
+    Map<String, Result<Integer>> distributed = Quantiles.distributed(input, 0.9, 0.99).materializeToMap();
+    Map<String, Result<Integer>> inMemory = Quantiles.inMemory(input, 0.9, 0.99).materializeToMap();
+
+    assertEquals(expected, distributed);
+    assertEquals(expected, inMemory);
+  }
+
+  /**
+   * Ten values 10..100: 0.5 is expected to yield 50.
+   */
+  @Test
+  public void testQuantilesLessThanOrEqual() {
+    PTable<String, Integer> input = MemPipeline.typedTableOf(
+            tableOf(strings(), ints()),
+            "a", 10, "a", 20, "a", 30, "a", 40, "a", 50,
+            "a", 60, "a", 70, "a", 80, "a", 90, "a", 100);
+    Map<String, Result<Integer>> expected = ImmutableMap.of(
+            "a", result(10, Pair.of(0.5, 50)));
+
+    // Compute the same quantiles with both implementations.
+    Map<String, Result<Integer>> distributed = Quantiles.distributed(input, 0.5).materializeToMap();
+    Map<String, Result<Integer>> inMemory = Quantiles.inMemory(input, 0.5).materializeToMap();
+
+    assertEquals(expected, distributed);
+    assertEquals(expected, inMemory);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/test/java/org/apache/crunch/lib/TopListTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/TopListTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/TopListTest.java
new file mode 100644
index 0000000..3d0cdbd
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/TopListTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Map;
+
+import static org.apache.crunch.types.avro.Avros.strings;
+import static org.apache.crunch.types.avro.Avros.tableOf;
+import static org.junit.Assert.assertEquals;
+
+/** Tests for {@link TopList}. */
+public class TopListTest {
+
+  /** Collects the given elements into a new list typed as a {@link Collection}. */
+  private <T> Collection<T> collectionOf(T... elements) {
+    return Lists.newArrayList(elements);
+  }
+
+  /** Expects, per key, the two most frequent values paired with their counts, highest count first. */
+  @Test
+  public void testTopNYbyX() {
+    PTable<String, String> input = MemPipeline.typedTableOf(tableOf(strings(), strings()),
+            "a", "x", "a", "x", "a", "x",
+            "a", "y", "a", "y",
+            "a", "z",
+            "b", "x", "b", "x",
+            "b", "z");
+    Map<String, Collection<Pair<Long, String>>> expected = ImmutableMap.of(
+            "a", collectionOf(Pair.of(3L, "x"), Pair.of(2L, "y")),
+            "b", collectionOf(Pair.of(2L, "x"), Pair.of(1L, "z")));
+
+    Map<String, Collection<Pair<Long, String>>> topTwo = TopList.topNYbyX(input, 2).materializeToMap();
+    assertEquals(expected, topTwo);
+  }
+
+  /** Expects every distinct value of the collection mapped to its number of occurrences. */
+  @Test
+  public void testGlobalToplist() {
+    PCollection<String> input = MemPipeline.typedCollectionOf(strings(), "a", "a", "a", "b", "b", "c", "c", "c", "c");
+    // Map equality ignores iteration order, so only the counts are checked here, not the ranking.
+    Map<String, Long> expected = ImmutableMap.of("c", 4L, "a", 3L, "b", 2L);
+    assertEquals(expected, TopList.globalToplist(input).materializeToMap());
+  }
+}
\ No newline at end of file