You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by gr...@apache.org on 2013/11/01 13:52:05 UTC
git commit: CRUNCH-286 Allow distinct Combiner to be supplied
Updated Branches:
refs/heads/master cb92723af -> 9a689d1ed
CRUNCH-286 Allow distinct Combiner to be supplied
Allow setting a Combiner implementation that is different from the
Reducer implementation, via the addition of a new
combineValues(CombineFn, CombineFn) method (plus a matching
combineValues(Aggregator, Aggregator) overload).
Original patch contributed by Stefan De Smit.
Signed-off-by: Gabriel Reid <gr...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/9a689d1e
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/9a689d1e
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/9a689d1e
Branch: refs/heads/master
Commit: 9a689d1ed2e89417af597aad8f52354dc42d797b
Parents: cb92723
Author: Josh Wills <jw...@apache.org>
Authored: Fri Oct 25 14:01:51 2013 -0700
Committer: Gabriel Reid <gr...@apache.org>
Committed: Fri Nov 1 13:14:22 2013 +0100
----------------------------------------------------------------------
.../java/org/apache/crunch/CombineReduceIT.java | 147 +++++++++++++++++++
.../java/org/apache/crunch/PGroupedTable.java | 22 +++
.../impl/mem/collect/MemGroupedTable.java | 11 ++
.../crunch/impl/mr/collect/DoTableImpl.java | 29 +++-
.../impl/mr/collect/PGroupedTableImpl.java | 14 +-
.../crunch/impl/mr/plan/JobPrototype.java | 2 +-
6 files changed, 219 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/9a689d1e/crunch-core/src/it/java/org/apache/crunch/CombineReduceIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/CombineReduceIT.java b/crunch-core/src/it/java/org/apache/crunch/CombineReduceIT.java
new file mode 100644
index 0000000..a567d38
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/CombineReduceIT.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMultiset;
+import com.google.common.collect.Multiset;
+
+/**
+ * Tests for two phase (combine and reduce) CombineFns.
+ */
+public class CombineReduceIT {
+
+ @Rule
+ public TemporaryPath tmpDir = TemporaryPaths.create();
+ private String docsPath;
+
+ @Before
+ public void setUp() throws Exception {
+ docsPath = tmpDir.copyResourceFileName("docs.txt");
+ }
+
+ static class CountCombiner extends CombineFn<String,Long> {
+
+ private final int stringLengthLimit;
+
+ public CountCombiner(int stringLengthLimit) {
+ this.stringLengthLimit = stringLengthLimit;
+ }
+
+ @Override
+ public void process(Pair<String, Iterable<Long>> input, Emitter<Pair<String, Long>> emitter) {
+ String key = input.first();
+ if (key.length() <= stringLengthLimit) {
+ long sum = 0L;
+ for (Long countValue : input.second()) {
+ sum += countValue;
+ }
+ emitter.emit(Pair.of(key, sum));
+ } else {
+ for (Long countValue : input.second()) {
+ emitter.emit(Pair.of(key, countValue));
+ }
+ }
+ }
+
+ }
+
+ @Test
+ public void testCombineValues_NoCombineOrReduceOfLongWords() throws Exception {
+ Iterable<Pair<String, Long>> mrResult = run(
+ new MRPipeline(CombineReduceIT.class, tmpDir.getDefaultConfiguration()),
+ docsPath, false, false);
+ Iterable<Pair<String, Long>> memResult = run(
+ MemPipeline.getInstance(), docsPath, false, false);
+ Multiset<Pair<String, Long>> mrResultSet = ImmutableMultiset.copyOf(mrResult);
+ Multiset<Pair<String, Long>> memResultSet = ImmutableMultiset.copyOf(memResult);
+
+ assertEquals(mrResultSet, memResultSet);
+
+ // Words with more than 3 characters shouldn't be combined at all
+ assertTrue(mrResultSet.contains(Pair.of("this", 1L)));
+ assertEquals(5, mrResultSet.count(Pair.of("this", 1L)));
+ }
+
+ @Test
+ public void testCombineValues_OnlyReduceLongWords() throws Exception {
+ Iterable<Pair<String, Long>> mrResult = run(
+ new MRPipeline(CombineReduceIT.class, tmpDir.getDefaultConfiguration()),
+ docsPath, false, true);
+ Iterable<Pair<String, Long>> memResult = run(
+ MemPipeline.getInstance(), docsPath, false, true);
+
+ Multiset<Pair<String, Long>> mrResultSet = ImmutableMultiset.copyOf(mrResult);
+ Multiset<Pair<String, Long>> memResultSet = ImmutableMultiset.copyOf(memResult);
+
+ assertEquals(mrResultSet, memResultSet);
+
+ // All words should be combined, although longer words will only
+ // have been combined in the reduce phase
+ assertTrue(mrResultSet.contains(Pair.of("this", 5L)));
+ assertEquals(1, mrResultSet.count(Pair.of("this", 5L)));
+ }
+
+ @Test
+ public void testCombineValues_CombineAndReduceLongWords() throws Exception {
+ Iterable<Pair<String, Long>> mrResult = run(
+ new MRPipeline(CombineReduceIT.class, tmpDir.getDefaultConfiguration()),
+ docsPath, true, true);
+ Iterable<Pair<String, Long>> memResult = run(
+ MemPipeline.getInstance(), docsPath, true, true);
+
+ Multiset<Pair<String, Long>> mrResultSet = ImmutableMultiset.copyOf(mrResult);
+ Multiset<Pair<String, Long>> memResultSet = ImmutableMultiset.copyOf(memResult);
+
+ assertEquals(mrResultSet, memResultSet);
+
+ // All words should be combined, both in the combiner and reducer
+ assertTrue(mrResultSet.contains(Pair.of("this", 5L)));
+ assertEquals(1, mrResultSet.count(Pair.of("this", 5L)));
+ }
+
+ public static Iterable<Pair<String, Long>> run(Pipeline p, String inputPath, boolean combineLongWords, boolean reduceLongWords)
+ throws Exception {
+ return p.read(From.textFile(inputPath))
+ .parallelDo("split", new DoFn<String, Pair<String, Long>>() {
+ @Override
+ public void process(String input, Emitter<Pair<String, Long>> emitter) {
+ for (String word : input.split("\\s+")) {
+ emitter.emit(Pair.of(word, 1L));
+ }
+ }
+ }, Avros.tableOf(Avros.strings(), Avros.longs()))
+ .groupByKey()
+ .combineValues(
+ new CountCombiner(combineLongWords ? Integer.MAX_VALUE : 3),
+ new CountCombiner(reduceLongWords ? Integer.MAX_VALUE : 3))
+ .materialize();
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/9a689d1e/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
index 68085c6..14bdb32 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
@@ -35,6 +35,17 @@ public interface PGroupedTable<K, V> extends PCollection<Pair<K, Iterable<V>>> {
* @return A {@code PTable} where each key has a single value
*/
PTable<K, V> combineValues(CombineFn<K, V> combineFn);
+
+ /**
+ * Combines and reduces the values of this grouping using two (possibly different)
+ * {@code CombineFn} instances: {@code combineFn} is applied during the combine phase and
+ * {@code reduceFn} during the reduce phase.
+ *
+ * <p>An implementation is not required to apply {@code combineFn} at all (for example, the
+ * in-memory implementation runs only {@code reduceFn}). {@code reduceFn} should therefore
+ * produce correct results both from uncombined values and from the output of {@code combineFn}.
+ *
+ * @param combineFn
+ * The function applied during the combine phase
+ * @param reduceFn
+ * The function applied during the reduce phase
+ * @return A {@code PTable} holding the values emitted by {@code reduceFn} for each key
+ * (typically a single value per key, although a {@code CombineFn} may emit more than one)
+ */
+ PTable<K, V> combineValues(CombineFn<K, V> combineFn, CombineFn<K, V> reduceFn);
/**
* Combine the values in each group using the given {@link Aggregator}.
@@ -47,6 +58,17 @@ public interface PGroupedTable<K, V> extends PCollection<Pair<K, Iterable<V>>> {
PTable<K, V> combineValues(Aggregator<V> aggregator);
/**
+ * Combines and reduces the values in each group using the given {@link Aggregator} instances:
+ * {@code combineAggregator} is used during the combine phase and {@code reduceAggregator}
+ * during the reduce phase.
+ *
+ * @param combineAggregator The aggregator to use during the combine phase
+ * @param reduceAggregator The aggregator to use during the reduce phase
+ * @return A {@link PTable} where each group key maps to an aggregated
+ * value. Group keys may be repeated if an aggregator returns
+ * more than one value.
+ */
+ PTable<K, V> combineValues(Aggregator<V> combineAggregator, Aggregator<V> reduceAggregator);
+
+ /**
* Maps the {@code Iterable<V>} elements of each record to a new type. Just like
* any {@code parallelDo} operation on a {@code PGroupedTable}, this may only be
* called once.
http://git-wip-us.apache.org/repos/asf/crunch/blob/9a689d1e/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
index 12c17b6..172fe36 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
@@ -110,11 +110,22 @@ class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implemen
}
+  /**
+   * Runs only {@code reduceFn} over each group, by delegating to the single-function overload.
+   */
@Override
+  public PTable<K, V> combineValues(CombineFn<K, V> combineFn, CombineFn<K, V> reduceFn) {
+    // In-memory execution has no separate map-side phase, so combineFn is intentionally
+    // not applied here.
+    return this.combineValues(reduceFn);
+  }
+
+ /**
+ * Adapts {@code agg} with {@code Aggregators.toCombineFn} and delegates to
+ * {@code combineValues(CombineFn)}.
+ */
+ @Override
public PTable<K, V> combineValues(Aggregator<V> agg) {
return combineValues(Aggregators.<K, V>toCombineFn(agg));
}
@Override
+ public PTable<K, V> combineValues(Aggregator<V> combineAgg, Aggregator<V> reduceAgg) {
+ return combineValues(Aggregators.<K, V>toCombineFn(combineAgg), Aggregators.<K, V>toCombineFn(reduceAgg));
+ }
+
+ /**
+ * Maps each group's {@code Iterable<V>} to a single {@code U} by delegating to
+ * {@code PTables.mapValues}.
+ */
+ @Override
public <U> PTable<K, U> mapValues(MapFn<Iterable<V>, U> mapFn, PType<U> ptype) {
return PTables.mapValues(this, mapFn, ptype);
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/9a689d1e/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
index 9c8e53d..f843945 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
@@ -34,17 +34,36 @@ import com.google.common.collect.ImmutableList;
public class DoTableImpl<K, V> extends PTableBase<K, V> implements PTable<K, V> {
private final PCollectionImpl<?> parent;
+ /** Function run in the combine phase, or {@code null} when this table has no combine step. */
+ private final DoFn<?, Pair<K, V>> combineFn;
private final DoFn<?, Pair<K, V>> fn;
private final PTableType<K, V> type;
+  /**
+   * Derives a table's combine function from its primary function.
+   *
+   * @param fn the primary function of the table
+   * @return {@code fn} itself when it is a {@link CombineFn}, otherwise {@code null}
+   *     (meaning the table has no combine function)
+   */
+  private static <S, K, V> DoFn<S, Pair<K, V>> asCombineFn(final DoFn<S, Pair<K, V>> fn) {
+    return (fn instanceof CombineFn) ? fn : null;
+  }
+
+ /**
+ * Creates a table with default {@code ParallelDoOptions}. The combine function is derived from
+ * {@code fn}: {@code fn} itself if it is a {@code CombineFn}, otherwise none.
+ */
<S> DoTableImpl(String name, PCollectionImpl<S> parent, DoFn<S, Pair<K, V>> fn, PTableType<K, V> ntype) {
this(name, parent, fn, ntype, ParallelDoOptions.builder().build());
}
-
+
+ /**
+ * Creates a table with the given options. The combine function is derived from {@code fn}:
+ * {@code fn} itself if it is a {@code CombineFn}, otherwise none.
+ */
<S> DoTableImpl(String name, PCollectionImpl<S> parent, DoFn<S, Pair<K, V>> fn, PTableType<K, V> ntype,
- ParallelDoOptions options) {
+ ParallelDoOptions options) {
+ this(name, parent, asCombineFn(fn), fn, ntype, options);
+ }
+
+ /**
+ * Creates a table with an explicit combine function and default {@code ParallelDoOptions}.
+ * Passing a {@code null} combineFn makes {@code hasCombineFn()} return false.
+ */
+ <S> DoTableImpl(final String name, final PCollectionImpl<S> parent, final DoFn<S, Pair<K, V>> combineFn,
+ final DoFn<S, Pair<K, V>> fn, final PTableType<K, V> ntype) {
+ this(name, parent, combineFn, fn, ntype, ParallelDoOptions.builder().build());
+ }
+
+ /**
+ * Primary constructor; the other constructors delegate here.
+ *
+ * @param combineFn function for the combine phase, or {@code null} for none
+ * @param fn the primary function of this table
+ */
+ <S> DoTableImpl(final String name, final PCollectionImpl<S> parent, final DoFn<S, Pair<K, V>> combineFn,
+ final DoFn<S, Pair<K, V>> fn, final PTableType<K, V> ntype, final ParallelDoOptions options) {
super(name, options);
this.parent = parent;
+ this.combineFn = combineFn;
this.fn = fn;
this.type = ntype;
}
@@ -87,8 +106,12 @@ public class DoTableImpl<K, V> extends PTableBase<K, V> implements PTable<K, V>
return DoNode.createFnNode(getName(), fn, type, doOptions);
}
+  /**
+   * Creates the plan node that runs this table's combine function during the combine phase.
+   *
+   * @return a {@code DoNode} wrapping the combine function
+   * @throws IllegalStateException if this table has no combine function (see hasCombineFn())
+   */
+  public DoNode createCombineNode() {
+    if (combineFn == null) {
+      // Fail fast instead of handing a null function to the planner.
+      throw new IllegalStateException("No combine function configured for " + getName());
+    }
+    return DoNode.createFnNode(getName(), combineFn, type, doOptions);
+  }
+
+ /**
+ * Returns whether this table has a combine function, i.e. whether its combineFn is non-null.
+ */
public boolean hasCombineFn() {
- return fn instanceof CombineFn;
+ return combineFn != null;
}
@Override
http://git-wip-us.apache.org/repos/asf/crunch/blob/9a689d1e/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
index e62d9c3..ab5f48c 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
@@ -91,10 +91,15 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>
public PType<Pair<K, Iterable<V>>> getPType() {
return ptype;
}
-
+
+  /**
+   * Builds a "combine" table whose combine function is {@code combineFn} and whose primary
+   * function is {@code reduceFn}.
+   */
+  @Override
+  public PTable<K, V> combineValues(CombineFn<K, V> combineFn, CombineFn<K, V> reduceFn) {
+    return new DoTableImpl<K, V>("combine", this.getChainingCollection(),
+        combineFn, reduceFn, this.parent.getPTableType());
+  }
+
@Override
public PTable<K, V> combineValues(CombineFn<K, V> combineFn) {
- return new DoTableImpl<K, V>("combine", getChainingCollection(), combineFn, parent.getPTableType());
+ // A single function serves as both the combine-phase and the reduce-phase function.
+ return combineValues(combineFn, combineFn);
}
@Override
@@ -102,6 +107,11 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>
return combineValues(Aggregators.<K, V>toCombineFn(agg));
}
+ @Override
+ public PTable<K, V> combineValues(Aggregator<V> combineAgg, Aggregator<V> reduceAgg) {
+ return combineValues(Aggregators.<K, V>toCombineFn(combineAgg), Aggregators.<K, V>toCombineFn(reduceAgg));
+ }
+
private static class Ungroup<K, V> extends DoFn<Pair<K, Iterable<V>>, Pair<K, V>> {
@Override
public void process(Pair<K, Iterable<V>> input, Emitter<Pair<K, V>> emitter) {
http://git-wip-us.apache.org/repos/asf/crunch/blob/9a689d1e/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
index 0699db5..a192a22 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
@@ -188,7 +188,7 @@ class JobPrototype {
if (combineFnTable != null) {
job.setCombinerClass(CrunchCombiner.class);
DoNode combinerInputNode = group.createDoNode();
- DoNode combineNode = combineFnTable.createDoNode();
+ DoNode combineNode = combineFnTable.createCombineNode();
combineNode.addChild(group.getGroupingNode());
combinerInputNode.addChild(combineNode);
serialize(ImmutableList.of(combinerInputNode), conf, workingPath, NodeContext.COMBINE);