You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2015/01/12 19:26:01 UTC
crunch git commit: CRUNCH-484: Add library features from
spotify/crunch-lib into crunch-core.
Repository: crunch
Updated Branches:
refs/heads/master 48e4e7941 -> 3477ea431
CRUNCH-484: Add library features from spotify/crunch-lib into crunch-core.
Signed-off-by: Josh Wills <jw...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/3477ea43
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/3477ea43
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/3477ea43
Branch: refs/heads/master
Commit: 3477ea431015a81ad2cee825ca9410f6da441dee
Parents: 48e4e79
Author: David Whiting <da...@spotify.com>
Authored: Mon Jan 12 15:04:27 2015 +0100
Committer: Josh Wills <jw...@apache.org>
Committed: Mon Jan 12 10:15:55 2015 -0800
----------------------------------------------------------------------
.../java/org/apache/crunch/lib/Average.java | 59 +++++
.../main/java/org/apache/crunch/lib/DoFns.java | 101 ++++++++
.../java/org/apache/crunch/lib/PTables.java | 17 ++
.../java/org/apache/crunch/lib/Quantiles.java | 254 +++++++++++++++++++
.../java/org/apache/crunch/lib/TopList.java | 110 ++++++++
.../java/org/apache/crunch/lib/AverageTest.java | 52 ++++
.../java/org/apache/crunch/lib/DoFnsTest.java | 86 +++++++
.../java/org/apache/crunch/lib/PTablesTest.java | 40 +++
.../org/apache/crunch/lib/QuantilesTest.java | 126 +++++++++
.../java/org/apache/crunch/lib/TopListTest.java | 68 +++++
10 files changed, 913 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/main/java/org/apache/crunch/lib/Average.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Average.java b/crunch-core/src/main/java/org/apache/crunch/lib/Average.java
new file mode 100644
index 0000000..df6ed3c
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Average.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PGroupedTable;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PTypeFamily;
+
+import static org.apache.crunch.fn.Aggregators.SUM_DOUBLES;
+import static org.apache.crunch.fn.Aggregators.SUM_LONGS;
+import static org.apache.crunch.fn.Aggregators.pairAggregator;
+
+/**
+ * Per-key averaging helpers for numeric PTables.
+ */
+public class Average {
+
+  /**
+   * Computes the arithmetic mean of the values associated with each key.
+   *
+   * Every value is first turned into a (value, 1) partial aggregate, the partial aggregates are summed per key with
+   * combineValues and a pair aggregator, and each resulting (sum, count) pair is finally divided to obtain the mean.
+   *
+   * @param table PTable of (key, value) pairs to operate on
+   * @param <K> Key type, can be any type
+   * @param <V> Value type, must be numeric (ie. extend java.lang.Number)
+   * @return PTable<K, Double> of (key, mean(values)) pairs
+   */
+  public static <K, V extends Number> PTable<K, Double> meanValue(PTable<K, V> table) {
+    PTypeFamily ptf = table.getTypeFamily();
+
+    PTable<K, Pair<Double, Long>> partials =
+        table.mapValues(new ToSumAndCount<V>(), ptf.pairs(ptf.doubles(), ptf.longs()));
+    PGroupedTable<K, Pair<Double, Long>> byKey = partials.groupByKey();
+    PTable<K, Pair<Double, Long>> totals = byKey.combineValues(pairAggregator(SUM_DOUBLES(), SUM_LONGS()));
+
+    return totals.mapValues(new DivideSumByCount(), ptf.doubles());
+  }
+
+  /** Turns a single numeric value into a (value as double, 1) partial aggregate. */
+  private static class ToSumAndCount<V extends Number> extends MapFn<V, Pair<Double, Long>> {
+    @Override
+    public Pair<Double, Long> map(V value) {
+      return Pair.of(value.doubleValue(), 1L);
+    }
+  }
+
+  /** Divides an aggregated (sum, count) pair to obtain the mean. */
+  private static class DivideSumByCount extends MapFn<Pair<Double, Long>, Double> {
+    @Override
+    public Double map(Pair<Double, Long> sumAndCount) {
+      return sumAndCount.first() / sumAndCount.second();
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/main/java/org/apache/crunch/lib/DoFns.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/DoFns.java b/crunch-core/src/main/java/org/apache/crunch/lib/DoFns.java
new file mode 100644
index 0000000..cbf819f
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/DoFns.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Emitter;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+import java.io.Serializable;
+
+/**
+ * Utilities for wrapping and adapting {@link DoFn}s.
+ */
+public class DoFns {
+  /**
+   * "Reduce" DoFn wrapper which detaches the values in the iterable, preventing the unexpected behaviour related to
+   * object reuse often observed when using Avro. Wrap your DoFn in a detach(...) and pass in a PType for the Iterable
+   * value, and then you'll be handed an Iterable of real distinct objects, instead of the same object being handed to
+   * you multiple times with different data.
+   *
+   * You should use this when you have a parallelDo after a groupBy, and you'd like to capture the objects arriving in
+   * the Iterable part of the incoming Pair and pass it through to the output (for example if you want to create an
+   * array of outputs from the values to be output as one record).
+   *
+   * This will incur a performance hit, as it means that every object read from the Iterable will allocate a new Java
+   * object for the record and objects for all its non-primitive fields too. If you are rolling up records into a
+   * collection then this will be necessary anyway, but if you are only outputting derived data this may impact the
+   * speed and memory usage of your job unnecessarily.
+   *
+   * The returned wrapper forwards setContext, configure, initialize, cleanup and scaleFactor to the wrapped DoFn, so
+   * the wrapped DoFn sees the same lifecycle it would see if it were used directly.
+   *
+   * @param reduceFn Underlying DoFn to wrap
+   * @param valueType PType of the object contained within the Iterable
+   * @param <K> Reduce key
+   * @param <V> Iterable value
+   * @param <T> Output type of DoFn
+   * @return DoFn which will detach values for you
+   */
+  public static <K, V, T> DoFn<Pair<K, Iterable<V>>, T> detach(final DoFn<Pair<K, Iterable<V>>, T> reduceFn, final PType<V> valueType) {
+    return new DetachingDoFn<K, V, T>(reduceFn, valueType);
+  }
+
+  /** Guava Function returning a detached copy of each value, using a PType that has already been initialized. */
+  private static class DetachFunction<T> implements Function<T, T>, Serializable {
+    private final PType<T> pType;
+
+    public DetachFunction(PType<T> initializedPType) {
+      this.pType = initializedPType;
+    }
+
+    @Override
+    public T apply(T t) {
+      return pType.getDetachedValue(t);
+    }
+  }
+
+  /** Delegating DoFn that hands the wrapped reduce function a lazily-detaching view of the grouped values. */
+  private static class DetachingDoFn<K, V, T> extends DoFn<Pair<K, Iterable<V>>, T> {
+
+    private final DoFn<Pair<K, Iterable<V>>, T> reduceFn;
+    private final PType<V> valueType;
+
+    public DetachingDoFn(DoFn<Pair<K, Iterable<V>>, T> reduceFn, PType<V> valueType) {
+      this.reduceFn = reduceFn;
+      this.valueType = valueType;
+    }
+
+    @Override
+    public void setContext(TaskInputOutputContext<?, ?, ?, ?> context) {
+      super.setContext(context);
+      // Propagate the task context so the wrapped DoFn can reach it (e.g. through getConfiguration()) exactly as it
+      // would if it were not wrapped.
+      reduceFn.setContext(context);
+    }
+
+    @Override
+    public void configure(Configuration configuration) {
+      super.configure(configuration);
+      reduceFn.configure(configuration);
+    }
+
+    @Override
+    public void initialize() {
+      reduceFn.initialize();
+      // DetachFunction expects an initialized PType; fall back to an empty Configuration when none is available
+      // (for example when the DoFn is driven by hand without a task context).
+      valueType.initialize(getConfiguration() == null ? new Configuration() : getConfiguration());
+    }
+
+    @Override
+    public void process(Pair<K, Iterable<V>> input, Emitter<T> emitter) {
+      reduceFn.process(Pair.of(input.first(), detachIterable(input.second(), valueType)), emitter);
+    }
+
+    @Override
+    public void cleanup(Emitter<T> emitter) {
+      // Without this, anything the wrapped DoFn emits from its own cleanup() would be silently dropped.
+      reduceFn.cleanup(emitter);
+    }
+
+    @Override
+    public float scaleFactor() {
+      // Detaching does not change how many outputs are produced, so report the wrapped DoFn's estimate.
+      return reduceFn.scaleFactor();
+    }
+
+    /** Returns a lazy view of the iterable in which every element is detached as it is read. */
+    public Iterable<V> detachIterable(Iterable<V> iterable, final PType<V> pType) {
+      return Iterables.transform(iterable, new DetachFunction<V>(pType));
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java b/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java
index e0a3bf3..465e3f1 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/PTables.java
@@ -213,4 +213,21 @@ public class PTables {
return Pair.of(tableType.getKeyType().getDetachedValue(value.first()),
(Iterable<V>) detachedIterable);
}
+
+  /**
+   * Swap the key and value part of a table. The original PTypes are used in the opposite order.
+   *
+   * No de-duplication takes place: if several input entries share a value, the result contains that value as a key
+   * several times.
+   *
+   * @param table PTable to process
+   * @param <K> Key type (will become value type)
+   * @param <V> Value type (will become key type)
+   * @return PTable<V, K> containing the same data as the original, with each key and value exchanged
+   */
+  public static <K, V> PTable<V, K> swapKeyValue(PTable<K, V> table) {
+    PTypeFamily ptf = table.getTypeFamily();
+    return table.parallelDo(new MapFn<Pair<K, V>, Pair<V, K>>() {
+      @Override
+      public Pair<V, K> map(Pair<K, V> input) {
+        return Pair.of(input.second(), input.first());
+      }
+    }, ptf.tableOf(table.getValueType(), table.getKeyType()));
+  }
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/main/java/org/apache/crunch/lib/Quantiles.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Quantiles.java b/crunch-core/src/main/java/org/apache/crunch/lib/Quantiles.java
new file mode 100644
index 0000000..d6fc454
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Quantiles.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import com.google.common.base.Function;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.PeekingIterator;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Per-key quantile computations using the "nearest rank" definition.
+ */
+public class Quantiles {
+
+  /**
+   * Calculate a set of quantiles for each key in a numerically-valued table.
+   *
+   * Quantiles are calculated on a per-key basis by counting, joining and sorting. This is highly scalable, but takes
+   * 2 more map-reduce cycles than if you can guarantee that the value set will fit into memory. Use inMemory
+   * if you have less than the order of 10M values per key.
+   *
+   * The quantile definition that we use here is the "nearest rank" defined here:
+   * http://en.wikipedia.org/wiki/Percentile#Definition
+   *
+   * @param table numerically-valued PTable
+   * @param p1 First quantile (in the range 0.0 - 1.0)
+   * @param pn More quantiles (in the range 0.0 - 1.0)
+   * @param <K> Key type of the table
+   * @param <V> Value type of the table (must extend java.lang.Number)
+   * @return PTable of each key with a collection of pairs of the quantile provided and its result.
+   */
+  public static <K, V extends Number> PTable<K, Result<V>> distributed(PTable<K, V> table,
+      double p1, double... pn) {
+    final List<Double> quantileList = createListFromVarargs(p1, pn);
+
+    PTypeFamily ptf = table.getTypeFamily();
+    // Attach the per-key total count to every value: (K, (count, V)).
+    PTable<K, Long> totalCounts = table.keys().count();
+    PTable<K, Pair<Long, V>> countValuePairs = totalCounts.join(table);
+    // Put the value first so the secondary sort orders each key's values by V: (K, (V, count)).
+    PTable<K, Pair<V, Long>> valueCountPairs =
+        countValuePairs.mapValues(new SwapPairComponents<Long, V>(), ptf.pairs(table.getValueType(), ptf.longs()));
+
+    return SecondarySort.sortAndApply(
+        valueCountPairs,
+        new DistributedQuantiles<K, V>(quantileList),
+        ptf.tableOf(table.getKeyType(), Result.pType(table.getValueType())));
+  }
+
+  /**
+   * Calculate a set of quantiles for each key in a table with Comparable values.
+   *
+   * Quantiles are calculated on a per-key basis by grouping, reading the data into memory, then sorting and
+   * calculating. This is much faster than the distributed option, but if you get into the order of 10M+ per key, then
+   * performance might start to degrade or even cause OOMs.
+   *
+   * Grouped values are detached (see {@link DoFns#detach}) before being buffered, so value types whose instances are
+   * reused while iterating over a group (as can happen with Avro records) are handled correctly.
+   *
+   * The quantile definition that we use here is the "nearest rank" defined here:
+   * http://en.wikipedia.org/wiki/Percentile#Definition
+   *
+   * @param table PTable with Comparable values
+   * @param p1 First quantile (in the range 0.0 - 1.0)
+   * @param pn More quantiles (in the range 0.0 - 1.0)
+   * @param <K> Key type of the table
+   * @param <V> Value type of the table (must implement java.lang.Comparable)
+   * @return PTable of each key with a collection of pairs of the quantile provided and its result.
+   */
+  public static <K, V extends Comparable> PTable<K, Result<V>> inMemory(PTable<K, V> table,
+      double p1, double... pn) {
+    final List<Double> quantileList = createListFromVarargs(p1, pn);
+
+    PTypeFamily ptf = table.getTypeFamily();
+
+    return table
+        .groupByKey()
+        // The values are collected into a list, so they must be distinct objects rather than one reused instance.
+        .parallelDo(DoFns.detach(new InMemoryQuantiles<K, V>(quantileList), table.getValueType()),
+            ptf.tableOf(table.getKeyType(), Result.pType(table.getValueType())));
+  }
+
+  private static List<Double> createListFromVarargs(double p1, double[] pn) {
+    final List<Double> quantileList = Lists.newArrayList(p1);
+    for (double p : pn) {
+      quantileList.add(p);
+    }
+    return quantileList;
+  }
+
+  /** Turns (a, b) into (b, a). */
+  private static class SwapPairComponents<T1, T2> extends MapFn<Pair<T1, T2>, Pair<T2, T1>> {
+    @Override
+    public Pair<T2, T1> map(Pair<T1, T2> input) {
+      return Pair.of(input.second(), input.first());
+    }
+  }
+
+  /**
+   * Picks the nearest-rank quantiles out of an ascending sequence of values.
+   *
+   * @param sortedCollectionIterator values in ascending order
+   * @param collectionSize total number of values the iterator would produce
+   * @param quantiles quantiles to look up (expected in the range 0.0 - 1.0)
+   * @return (quantile, value) pairs; a quantile whose rank lies beyond the end of the iterator is omitted
+   */
+  private static <V> Collection<Pair<Double, V>> findQuantiles(Iterator<V> sortedCollectionIterator,
+      long collectionSize, List<Double> quantiles) {
+    Collection<Pair<Double, V>> output = Lists.newArrayList();
+    Multimap<Long, Double> quantileIndices = ArrayListMultimap.create();
+
+    long maxIndex = 0;
+    for (double quantile : quantiles) {
+      // Nearest rank is ceil(p * n) (1-based). Use long arithmetic: the distributed variant can see more than
+      // Integer.MAX_VALUE values for one key, and an int cast would saturate and select the wrong element.
+      long idx = Math.max((long) Math.ceil(quantile * collectionSize) - 1, 0L);
+      quantileIndices.put(idx, quantile);
+      maxIndex = Math.max(maxIndex, idx);
+    }
+
+    long index = 0;
+    // Values past the highest requested rank cannot contribute, so stop reading there.
+    while (index <= maxIndex && sortedCollectionIterator.hasNext()) {
+      V value = sortedCollectionIterator.next();
+      for (double quantile : quantileIndices.get(index)) {
+        output.add(Pair.of(quantile, value));
+      }
+      index++;
+    }
+    return output;
+  }
+
+  /** Buffers and sorts all values of a group in memory, then selects the requested quantiles. */
+  private static class InMemoryQuantiles<K, V extends Comparable> extends
+      MapFn<Pair<K, Iterable<V>>, Pair<K, Result<V>>> {
+    private final List<Double> quantileList;
+
+    public InMemoryQuantiles(List<Double> quantiles) {
+      this.quantileList = quantiles;
+    }
+
+    @Override
+    public Pair<K, Result<V>> map(Pair<K, Iterable<V>> input) {
+      List<V> values = Lists.newArrayList(input.second().iterator());
+      Collections.sort(values);
+      return Pair.of(input.first(), new Result<V>(values.size(), findQuantiles(values.iterator(), values.size(), quantileList)));
+    }
+  }
+
+  /** Selects quantiles from a group whose values arrive already sorted, each tagged with the group's total count. */
+  private static class DistributedQuantiles<K, V> extends
+      MapFn<Pair<K, Iterable<Pair<V, Long>>>, Pair<K, Result<V>>> {
+    private final List<Double> quantileList;
+
+    public DistributedQuantiles(List<Double> quantileList) {
+      this.quantileList = quantileList;
+    }
+
+    @Override
+    public Pair<K, Result<V>> map(Pair<K, Iterable<Pair<V, Long>>> input) {
+      PeekingIterator<Pair<V, Long>> iterator = Iterators.peekingIterator(input.second().iterator());
+      // Every element carries the same total count for this key, so the first one is enough.
+      long count = iterator.peek().second();
+
+      Iterator<V> valueIterator = Iterators.transform(iterator, new Function<Pair<V, Long>, V>() {
+        @Override
+        public V apply(@Nullable Pair<V, Long> input) {
+          return input.first();
+        }
+      });
+
+      Collection<Pair<Double, V>> output = findQuantiles(valueIterator, count, quantileList);
+      return Pair.of(input.first(), new Result<V>(count, output));
+    }
+  }
+
+  /**
+   * Output type for storing the results of a Quantiles computation.
+   *
+   * @param <V> Quantile value type
+   */
+  public static class Result<V> {
+    /** Total number of values seen for the key. */
+    public final long count;
+    /** Requested quantile mapped to the selected value, ordered by quantile. */
+    public final Map<Double, V> quantiles = Maps.newTreeMap();
+
+    public Result(long count, Iterable<Pair<Double, V>> quantiles) {
+      this.count = count;
+      for (Pair<Double, V> quantile : quantiles) {
+        this.quantiles.put(quantile.first(), quantile.second());
+      }
+    }
+
+    /**
+     * Create a PType for the result type, to be stored as a derived type from Crunch primitives.
+     *
+     * @param valuePType PType for the V type, whose family will also be used to create the derived type
+     * @param <V> Value type
+     * @return PType for serializing Result<V>
+     */
+    public static <V> PType<Result<V>> pType(PType<V> valuePType) {
+      PTypeFamily ptf = valuePType.getFamily();
+
+      @SuppressWarnings("unchecked")
+      Class<Result<V>> prClass = (Class<Result<V>>) (Class) Result.class;
+
+      return ptf.derivedImmutable(prClass, new MapFn<Pair<Collection<Pair<Double, V>>, Long>, Result<V>>() {
+        @Override
+        public Result<V> map(Pair<Collection<Pair<Double, V>>, Long> input) {
+          return new Result<V>(input.second(), input.first());
+        }
+      }, new MapFn<Result<V>, Pair<Collection<Pair<Double, V>>, Long>>() {
+        @Override
+        public Pair<Collection<Pair<Double, V>>, Long> map(Result<V> input) {
+          return Pair.of(asCollection(input.quantiles), input.count);
+        }
+      }, ptf.pairs(ptf.collections(ptf.pairs(ptf.doubles(), valuePType)), ptf.longs()));
+    }
+
+    private static <K, V> Collection<Pair<K, V>> asCollection(Map<K, V> map) {
+      Collection<Pair<K, V>> collection = Lists.newArrayList();
+      for (Map.Entry<K, V> entry : map.entrySet()) {
+        collection.add(Pair.of(entry.getKey(), entry.getValue()));
+      }
+      return collection;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      Result<?> result = (Result<?>) o;
+
+      if (count != result.count) return false;
+      if (!quantiles.equals(result.quantiles)) return false;
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = (int) (count ^ (count >>> 32));
+      result = 31 * result + quantiles.hashCode();
+      return result;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/main/java/org/apache/crunch/lib/TopList.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/TopList.java b/crunch-core/src/main/java/org/apache/crunch/lib/TopList.java
new file mode 100644
index 0000000..54d20f9
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/TopList.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import com.google.common.collect.Lists;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * Tools for creating top lists of items in PTables and PCollections.
+ */
+public class TopList {
+
+  /**
+   * Create a top-list of elements in the provided PTable, categorised by the key of the input table and using the count
+   * of the value part of the input table. Example: if input = Table(Country, Track), then this will give you the most
+   * common n tracks for each country.
+   *
+   * @param input table of X Y pairs
+   * @param n How many Y values to include in the toplist per X (this will be in memory, so don't make this ridiculous)
+   * @param <X> group type
+   * @param <Y> value type
+   * @return table of each unique X value mapped to a collection of (count, Y) pairs
+   */
+  public static <X, Y> PTable<X, Collection<Pair<Long, Y>>> topNYbyX(PTable<X, Y> input, final int n) {
+    final PType<X> xType = input.getKeyType();
+    final PType<Y> yType = input.getValueType();
+    PTypeFamily f = xType.getFamily();
+    // ((X, Y), count) -> (X, (-count, Y)): negating the count makes an ascending secondary sort yield the most
+    // frequent Y values first.
+    PTable<X, Pair<Long, Y>> counted = input.count().parallelDo(new MapFn<Pair<Pair<X, Y>, Long>, Pair<X, Pair<Long, Y>>>() {
+      @Override
+      public Pair<X, Pair<Long, Y>> map(Pair<Pair<X, Y>, Long> input) {
+        return Pair.of(input.first().first(), Pair.of(-input.second(), input.first().second()));
+      }
+    }, f.tableOf(xType, f.pairs(f.longs(), yType)));
+    return SecondarySort.sortAndApply(counted, new MapFn<Pair<X, Iterable<Pair<Long, Y>>>, Pair<X, Collection<Pair<Long, Y>>>>() {
+
+      private PTableType<Long, Y> tableType;
+
+      @Override
+      public void initialize() {
+        // Built here rather than captured so it can be initialized with the task configuration.
+        PTypeFamily ptf = yType.getFamily();
+        tableType = ptf.tableOf(ptf.longs(), yType);
+        tableType.initialize(getConfiguration());
+      }
+
+      @Override
+      public Pair<X, Collection<Pair<Long, Y>>> map(Pair<X, Iterable<Pair<Long, Y>>> input) {
+        Collection<Pair<Long, Y>> values = Lists.newArrayList();
+        Iterator<Pair<Long, Y>> iter = input.second().iterator();
+        for (int i = 0; i < n; i++) {
+          if (!iter.hasNext()) {
+            break;
+          }
+          // The pairs outlive this iteration step, so keep a detached copy rather than the iterator's object.
+          Pair<Long, Y> pair = PTables.getDetachedValue(tableType, iter.next());
+          // Undo the negation applied before the sort.
+          values.add(Pair.of(-pair.first(), pair.second()));
+        }
+        return Pair.of(input.first(), values);
+      }
+    }, f.tableOf(xType, f.collections(f.pairs(f.longs(), yType))));
+  }
+
+  /**
+   * Create a list of unique items in the input collection with their count, sorted descending by their frequency.
+   *
+   * All counts are routed through a single reducer to obtain one ordered output, so the number of distinct items
+   * should be manageable for one task.
+   *
+   * @param input input collection
+   * @param <X> record type
+   * @return global toplist
+   */
+  public static <X> PTable<X, Long> globalToplist(PCollection<X> input) {
+    // (item, -count): with negated counts, an ascending key order is a descending frequency order.
+    PTable<X, Long> negated = negateCounts(input.count());
+    // Key by the negated count before grouping; grouping by the item would order the output by item instead.
+    PTable<Long, X> sortedByCount = PTables.swapKeyValue(negated).groupByKey(1).ungroup();
+    // Restore the (item, count) layout and the original sign of the counts.
+    return negateCounts(PTables.swapKeyValue(sortedByCount));
+  }
+
+  /**
+   * When creating toplists, it is often required to sort by count descending. As some sort operations don't support
+   * order (such as SecondarySort), this method will negate counts so that a natural-ordered sort will produce a
+   * descending order.
+   *
+   * @param table PTable to process
+   * @param <K> key type
+   * @return PTable of the same format with the value negated
+   */
+  public static <K> PTable<K, Long> negateCounts(PTable<K, Long> table) {
+    return table.parallelDo(new MapFn<Pair<K, Long>, Pair<K, Long>>() {
+      @Override
+      public Pair<K, Long> map(Pair<K, Long> input) {
+        return Pair.of(input.first(), -input.second());
+      }
+    }, table.getPTableType());
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/test/java/org/apache/crunch/lib/AverageTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/AverageTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/AverageTest.java
new file mode 100644
index 0000000..6fadd35
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/AverageTest.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.crunch.PTable;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.apache.crunch.types.avro.Avros.*;
+import static org.junit.Assert.assertEquals;
+
+public class AverageTest {
+  /**
+   * Checks per-key means, including a key ("d") whose mean is not a whole number so that an accidental integer
+   * division in the implementation would be detected.
+   */
+  @Test
+  public void testMeanValue() {
+    PTable<String, Integer> testTable = MemPipeline.typedTableOf(
+        tableOf(strings(), ints()),
+        "a", 2,
+        "a", 10,
+        "b", 3,
+        "c", 3,
+        "c", 4,
+        "c", 5,
+        "d", 1,
+        "d", 2);
+    Map<String, Double> actual = Average.meanValue(testTable).materializeToMap();
+    Map<String, Double> expected = ImmutableMap.of(
+        "a", 6.0,
+        "b", 3.0,
+        "c", 4.0,
+        "d", 1.5
+    );
+
+    assertEquals(expected, actual);
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/test/java/org/apache/crunch/lib/DoFnsTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/DoFnsTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/DoFnsTest.java
new file mode 100644
index 0000000..3e70882
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/DoFnsTest.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import com.google.common.base.Strings;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Lists;
+import org.apache.avro.util.Utf8;
+import org.apache.crunch.DoFn;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mem.emit.InMemoryEmitter;
+import org.apache.crunch.test.Employee;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import static org.junit.Assert.assertEquals;
+
+public class DoFnsTest {
+
+  /**
+   * Iterable that imitates object reuse: each iterator yields the same Employee instance three times, mutating its
+   * department in place to "*", "**" and then "***".
+   */
+  private static class AvroIterable implements Iterable<Employee> {
+    @Override
+    public Iterator<Employee> iterator() {
+      final Employee shared = new Employee(new Utf8("something"), 10000, new Utf8(""));
+      return new AbstractIterator<Employee>() {
+        private int produced = 0;
+
+        @Override
+        protected Employee computeNext() {
+          produced++;
+          if (produced <= 3) {
+            shared.setDepartment(new Utf8(Strings.repeat("*", produced)));
+            return shared;
+          }
+          return endOfData();
+        }
+      };
+    }
+  }
+
+  /** Gathers every value of the grouped input into a single output collection. */
+  private static class CollectingMapFn extends
+      MapFn<Pair<String, Iterable<Employee>>, Collection<Employee>> {
+
+    @Override
+    public Collection<Employee> map(Pair<String, Iterable<Employee>> input) {
+      Collection<Employee> collected = Lists.newArrayList();
+      for (Employee employee : input.second()) {
+        collected.add(employee);
+      }
+      return collected;
+    }
+  }
+
+  /** Without detaching, the collected list would hold one instance three times; with it, three distinct records. */
+  @Test
+  public void testDetach() {
+    Collection<Employee> expected = Lists.newArrayList();
+    for (String department : new String[] {"*", "**", "***"}) {
+      expected.add(new Employee(new Utf8("something"), 10000, new Utf8(department)));
+    }
+
+    DoFn<Pair<String, Iterable<Employee>>, Collection<Employee>> doFn =
+        DoFns.detach(new CollectingMapFn(), Avros.specifics(Employee.class));
+    Iterable<Employee> reusingValues = new AvroIterable();
+    Pair<String, Iterable<Employee>> input = Pair.of("key", reusingValues);
+    InMemoryEmitter<Collection<Employee>> emitter = new InMemoryEmitter<Collection<Employee>>();
+
+    // Drive the DoFn through its lifecycle by hand.
+    doFn.configure(new Configuration());
+    doFn.initialize();
+    doFn.process(input, emitter);
+    doFn.cleanup(emitter);
+
+    assertEquals(expected, emitter.getOutput().get(0));
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/test/java/org/apache/crunch/lib/PTablesTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/PTablesTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/PTablesTest.java
new file mode 100644
index 0000000..a53a33c
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/PTablesTest.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.crunch.PTable;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.apache.crunch.types.avro.Avros.longs;
+import static org.apache.crunch.types.avro.Avros.strings;
+import static org.apache.crunch.types.avro.Avros.tableOf;
+import static org.junit.Assert.assertEquals;
+
+public class PTablesTest {
+  /** With unique values, swapping keys and values should produce the inverse mapping. */
+  @Test
+  public void testSwapKeyValue() {
+    PTable<String, Long> original =
+        MemPipeline.typedTableOf(tableOf(strings(), longs()), "hello", 14L, "goodbye", 21L);
+
+    Map<Long, String> swapped = PTables.swapKeyValue(original).materializeToMap();
+
+    assertEquals(ImmutableMap.of(14L, "hello", 21L, "goodbye"), swapped);
+  }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/test/java/org/apache/crunch/lib/QuantilesTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/QuantilesTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/QuantilesTest.java
new file mode 100644
index 0000000..ea2a312
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/QuantilesTest.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.lib.Quantiles.Result;
+
+import org.junit.Test;
+
+import java.util.Map;
+
+import static org.apache.crunch.types.avro.Avros.*;
+import static org.junit.Assert.assertEquals;
+
+public class QuantilesTest {
+
+ private static <T> Quantiles.Result<T> result(long count, Pair<Double, T>... quantiles) {
+ return new Quantiles.Result<T>(count, Lists.newArrayList(quantiles));
+ }
+
+ @Test
+ public void testQuantilesExact() {
+ PTable<String, Integer> testTable = MemPipeline.typedTableOf(
+ tableOf(strings(), ints()),
+ "a", 5,
+ "a", 2,
+ "a", 3,
+ "a", 4,
+ "a", 1);
+ Map<String, Result<Integer>> actualS = Quantiles.distributed(testTable, 0, 0.5, 1.0).materializeToMap();
+ Map<String, Result<Integer>> actualM = Quantiles.inMemory(testTable, 0, 0.5, 1.0).materializeToMap();
+ Map<String, Result<Integer>> expected = ImmutableMap.of(
+ "a", result(5, Pair.of(0.0, 1), Pair.of(0.5, 3), Pair.of(1.0, 5))
+ );
+
+ assertEquals(expected, actualS);
+ assertEquals(expected, actualM);
+ }
+
+ @Test
+ public void testQuantilesBetween() {
+ PTable<String, Integer> testTable = MemPipeline.typedTableOf(
+ tableOf(strings(), ints()),
+ "a", 5,
+ "a", 2, // We expect the 0.5 to correspond to this element, according to the "nearest rank" %ile definition.
+ "a", 4,
+ "a", 1);
+ Map<String, Result<Integer>> actualS = Quantiles.distributed(testTable, 0.5).materializeToMap();
+ Map<String, Result<Integer>> actualM = Quantiles.inMemory(testTable, 0.5).materializeToMap();
+ Map<String, Result<Integer>> expected = ImmutableMap.of(
+ "a", result(4, Pair.of(0.5, 2))
+ );
+
+ assertEquals(expected, actualS);
+ assertEquals(expected, actualM);
+ }
+
+ @Test
+ public void testQuantilesNines() {
+ PTable<String, Integer> testTable = MemPipeline.typedTableOf(
+ tableOf(strings(), ints()),
+ "a", 10,
+ "a", 20,
+ "a", 30,
+ "a", 40,
+ "a", 50,
+ "a", 60,
+ "a", 70,
+ "a", 80,
+ "a", 90,
+ "a", 100);
+ Map<String, Result<Integer>> actualS = Quantiles.distributed(testTable, 0.9, 0.99).materializeToMap();
+ Map<String, Result<Integer>> actualM = Quantiles.inMemory(testTable, 0.9, 0.99).materializeToMap();
+ Map<String, Result<Integer>> expected = ImmutableMap.of(
+ "a", result(10, Pair.of(0.9, 90), Pair.of(0.99, 100))
+ );
+
+ assertEquals(expected, actualS);
+ assertEquals(expected, actualM);
+ }
+
+ @Test
+ public void testQuantilesLessThanOrEqual() {
+ PTable<String, Integer> testTable = MemPipeline.typedTableOf(
+ tableOf(strings(), ints()),
+ "a", 10,
+ "a", 20,
+ "a", 30,
+ "a", 40,
+ "a", 50,
+ "a", 60,
+ "a", 70,
+ "a", 80,
+ "a", 90,
+ "a", 100);
+ Map<String, Result<Integer>> actualS = Quantiles.distributed(testTable, 0.5).materializeToMap();
+ Map<String, Result<Integer>> actualM = Quantiles.inMemory(testTable, 0.5).materializeToMap();
+ Map<String, Result<Integer>> expected = ImmutableMap.of(
+ "a", result(10, Pair.of(0.5, 50))
+ );
+
+ assertEquals(expected, actualS);
+ assertEquals(expected, actualM);
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/3477ea43/crunch-core/src/test/java/org/apache/crunch/lib/TopListTest.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/test/java/org/apache/crunch/lib/TopListTest.java b/crunch-core/src/test/java/org/apache/crunch/lib/TopListTest.java
new file mode 100644
index 0000000..3d0cdbd
--- /dev/null
+++ b/crunch-core/src/test/java/org/apache/crunch/lib/TopListTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.lib;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Map;
+
+import static org.apache.crunch.types.avro.Avros.strings;
+import static org.apache.crunch.types.avro.Avros.tableOf;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link TopList}.
+ */
+public class TopListTest {
+
+  /**
+   * Collects {@code elements}, in the given order, into a list-backed {@link Collection} so it can be
+   * compared with the per-key collections produced by {@link TopList}. Static because it uses no
+   * instance state.
+   */
+  private static <T> Collection<T> collectionOf(T... elements) {
+    return Lists.newArrayList(elements);
+  }
+
+  /**
+   * With N = 2, each key should keep the two most frequent values as (count, value) pairs. Key "a"
+   * has x three times, y twice and z once, so z is dropped. Key "b" has only two distinct values, so
+   * both are kept. The expected pairs are listed in descending count order.
+   */
+  @Test
+  public void testTopNYbyX() {
+    PTable<String, String> data = MemPipeline.typedTableOf(tableOf(strings(), strings()),
+        "a", "x",
+        "a", "x",
+        "a", "x",
+        "a", "y",
+        "a", "y",
+        "a", "z",
+        "b", "x",
+        "b", "x",
+        "b", "z");
+    Map<String, Collection<Pair<Long, String>>> actual = TopList.topNYbyX(data, 2).materializeToMap();
+    Map<String, Collection<Pair<Long, String>>> expected = ImmutableMap.of(
+        "a", collectionOf(Pair.of(3L, "x"), Pair.of(2L, "y")),
+        "b", collectionOf(Pair.of(2L, "x"), Pair.of(1L, "z")));
+
+    assertEquals(expected, actual);
+  }
+
+  /**
+   * {@code globalToplist} should pair each distinct element of the collection with its number of
+   * occurrences.
+   */
+  @Test
+  public void testGlobalToplist() {
+    PCollection<String> data = MemPipeline.typedCollectionOf(strings(), "a", "a", "a", "b", "b", "c", "c", "c", "c");
+    Map<String, Long> actual = TopList.globalToplist(data).materializeToMap();
+    Map<String, Long> expected = ImmutableMap.of("c", 4L, "a", 3L, "b", 2L);
+    // NOTE(review): Map equality ignores entry order, so this assertion checks only the counts,
+    // not that the result is ranked by frequency.
+    assertEquals(expected, actual);
+  }
+}
\ No newline at end of file