You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@crunch.apache.org by gr...@apache.org on 2013/11/01 13:52:05 UTC

git commit: CRUNCH-286 Allow distinct Combiner to be supplied

Updated Branches:
  refs/heads/master cb92723af -> 9a689d1ed


CRUNCH-286 Allow distinct Combiner to be supplied

Allow setting a Combiner implementation that is distinct from the
Reducer implementation, via the addition of a new
combineValues(CombineFn, CombineFn) method.

Original patch contributed by Stefan De Smit.

Signed-off-by: Gabriel Reid <gr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/9a689d1e
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/9a689d1e
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/9a689d1e

Branch: refs/heads/master
Commit: 9a689d1ed2e89417af597aad8f52354dc42d797b
Parents: cb92723
Author: Josh Wills <jw...@apache.org>
Authored: Fri Oct 25 14:01:51 2013 -0700
Committer: Gabriel Reid <gr...@apache.org>
Committed: Fri Nov 1 13:14:22 2013 +0100

----------------------------------------------------------------------
 .../java/org/apache/crunch/CombineReduceIT.java | 147 +++++++++++++++++++
 .../java/org/apache/crunch/PGroupedTable.java   |  22 +++
 .../impl/mem/collect/MemGroupedTable.java       |  11 ++
 .../crunch/impl/mr/collect/DoTableImpl.java     |  29 +++-
 .../impl/mr/collect/PGroupedTableImpl.java      |  14 +-
 .../crunch/impl/mr/plan/JobPrototype.java       |   2 +-
 6 files changed, 219 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/9a689d1e/crunch-core/src/it/java/org/apache/crunch/CombineReduceIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/CombineReduceIT.java b/crunch-core/src/it/java/org/apache/crunch/CombineReduceIT.java
new file mode 100644
index 0000000..a567d38
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/CombineReduceIT.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.Avros;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMultiset;
+import com.google.common.collect.Multiset;
+
+/**
+ * Integration tests for two-phase CombineFns that use separate functions for the combine and reduce phases.
+ */
+public class CombineReduceIT {
+
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+  private String docsPath;
+
+  @Before
+  public void setUp() throws Exception {
+    docsPath = tmpDir.copyResourceFileName("docs.txt");
+  }
+
+  static class CountCombiner extends CombineFn<String, Long> {
+
+    private final int stringLengthLimit; // keys longer than this are emitted without being summed
+
+    public CountCombiner(int stringLengthLimit) {
+      this.stringLengthLimit = stringLengthLimit;
+    }
+
+    @Override
+    public void process(Pair<String, Iterable<Long>> input, Emitter<Pair<String, Long>> emitter) {
+      String key = input.first();
+      if (key.length() <= stringLengthLimit) {
+        long sum = 0L;
+        for (Long countValue : input.second()) {
+          sum += countValue;
+        }
+        emitter.emit(Pair.of(key, sum));
+      } else {
+        for (Long countValue : input.second()) {
+          emitter.emit(Pair.of(key, countValue));
+        }
+      }
+    }
+
+  }
+
+  @Test
+  public void testCombineValues_NoCombineOrReduceOfLongWords() throws Exception {
+    Iterable<Pair<String, Long>> mrResult = run(
+        new MRPipeline(CombineReduceIT.class, tmpDir.getDefaultConfiguration()),
+        docsPath, false, false);
+    Iterable<Pair<String, Long>> memResult = run(
+        MemPipeline.getInstance(), docsPath, false, false);
+    Multiset<Pair<String, Long>> mrResultSet = ImmutableMultiset.copyOf(mrResult);
+    Multiset<Pair<String, Long>> memResultSet = ImmutableMultiset.copyOf(memResult);
+
+    assertEquals(mrResultSet, memResultSet);
+
+    // Words with more than 3 characters shouldn't be combined at all
+    assertTrue(mrResultSet.contains(Pair.of("this", 1L)));
+    assertEquals(5, mrResultSet.count(Pair.of("this", 1L)));
+  }
+
+  @Test
+  public void testCombineValues_OnlyReduceLongWords() throws Exception {
+    Iterable<Pair<String, Long>> mrResult = run(
+        new MRPipeline(CombineReduceIT.class, tmpDir.getDefaultConfiguration()),
+        docsPath, false, true);
+    Iterable<Pair<String, Long>> memResult = run(
+        MemPipeline.getInstance(), docsPath, false, true);
+
+    Multiset<Pair<String, Long>> mrResultSet = ImmutableMultiset.copyOf(mrResult);
+    Multiset<Pair<String, Long>> memResultSet = ImmutableMultiset.copyOf(memResult);
+
+    assertEquals(mrResultSet, memResultSet);
+
+    // All words should be combined, although longer words will only
+    // have been combined in the reduce phase
+    assertTrue(mrResultSet.contains(Pair.of("this", 5L)));
+    assertEquals(1, mrResultSet.count(Pair.of("this", 5L)));
+  }
+
+  @Test
+  public void testCombineValues_CombineAndReduceLongWords() throws Exception {
+    Iterable<Pair<String, Long>> mrResult = run(
+        new MRPipeline(CombineReduceIT.class, tmpDir.getDefaultConfiguration()),
+        docsPath, true, true);
+    Iterable<Pair<String, Long>> memResult = run(
+        MemPipeline.getInstance(), docsPath, true, true);
+
+    Multiset<Pair<String, Long>> mrResultSet = ImmutableMultiset.copyOf(mrResult);
+    Multiset<Pair<String, Long>> memResultSet = ImmutableMultiset.copyOf(memResult);
+
+    assertEquals(mrResultSet, memResultSet);
+
+    // All words should be combined, both in the combiner and reducer
+    assertTrue(mrResultSet.contains(Pair.of("this", 5L)));
+    assertEquals(1, mrResultSet.count(Pair.of("this", 5L)));
+  }
+
+  public static Iterable<Pair<String, Long>> run(Pipeline p, String inputPath, boolean combineLongWords, boolean reduceLongWords)
+      throws Exception {
+    return p.read(From.textFile(inputPath))
+        .parallelDo("split", new DoFn<String, Pair<String, Long>>() {
+          @Override
+          public void process(String input, Emitter<Pair<String, Long>> emitter) {
+            for (String word : input.split("\\s+")) {
+              emitter.emit(Pair.of(word, 1L));
+            }
+          }
+        }, Avros.tableOf(Avros.strings(), Avros.longs()))
+        .groupByKey()
+        .combineValues(
+            new CountCombiner(combineLongWords ? Integer.MAX_VALUE : 3),
+            new CountCombiner(reduceLongWords ? Integer.MAX_VALUE : 3))
+        .materialize();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/9a689d1e/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
index 68085c6..14bdb32 100644
--- a/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/PGroupedTable.java
@@ -35,6 +35,17 @@ public interface PGroupedTable<K, V> extends PCollection<Pair<K, Iterable<V>>> {
    * @return A {@code PTable} where each key has a single value
    */
   PTable<K, V> combineValues(CombineFn<K, V> combineFn);
+  
+  /**
+   * Combines and reduces the values of this grouping using the given {@code CombineFn} instances.
+   *
+   * @param combineFn
+   *          The {@code CombineFn} to apply during the combine phase
+   * @param reduceFn
+   *          The {@code CombineFn} to apply during the reduce phase
+   * @return A {@code PTable} of combined values; a key may repeat if {@code reduceFn} emits several values
+   */
+  PTable<K, V> combineValues(CombineFn<K, V> combineFn, CombineFn<K, V> reduceFn);
 
   /**
    * Combine the values in each group using the given {@link Aggregator}.
@@ -47,6 +58,17 @@ public interface PGroupedTable<K, V> extends PCollection<Pair<K, Iterable<V>>> {
   PTable<K, V> combineValues(Aggregator<V> aggregator);
 
   /**
+   * Combines and reduces the values in each group using the given {@link Aggregator} instances.
+   *
+   * @param combineAggregator The aggregator to apply during the combine phase
+   * @param reduceAggregator The aggregator to apply during the reduce phase
+   * @return A {@link PTable} where each group key maps to an aggregated
+   *         value. Group keys may be repeated if an aggregator returns
+   *         more than one value.
+   */
+  PTable<K, V> combineValues(Aggregator<V> combineAggregator, Aggregator<V> reduceAggregator);
+
+  /**
    * Maps the {@code Iterable<V>} elements of each record to a new type. Just like
    * any {@code parallelDo} operation on a {@code PGroupedTable}, this may only be
    * called once.

http://git-wip-us.apache.org/repos/asf/crunch/blob/9a689d1e/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
index 12c17b6..172fe36 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemGroupedTable.java
@@ -110,11 +110,22 @@ class MemGroupedTable<K, V> extends MemCollection<Pair<K, Iterable<V>>> implemen
   }
 
   @Override
+  public PTable<K, V> combineValues(CombineFn<K, V> combineFn, CombineFn<K, V> reduceFn) {
+    // The in-memory pipeline has no separate combine phase, so combineFn is ignored and only reduceFn is applied.
+    return combineValues(reduceFn);
+  }
+
+  @Override
   public PTable<K, V> combineValues(Aggregator<V> agg) {
     return combineValues(Aggregators.<K, V>toCombineFn(agg));
   }
 
   @Override
+  public PTable<K, V> combineValues(Aggregator<V> combineAgg, Aggregator<V> reduceAgg) { // only reduceAgg is applied
+    return combineValues(Aggregators.<K, V>toCombineFn(combineAgg), Aggregators.<K, V>toCombineFn(reduceAgg));
+  }
+
+  @Override
   public <U> PTable<K, U> mapValues(MapFn<Iterable<V>, U> mapFn, PType<U> ptype) {
     return PTables.mapValues(this, mapFn, ptype);
   }

http://git-wip-us.apache.org/repos/asf/crunch/blob/9a689d1e/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
index 9c8e53d..f843945 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/DoTableImpl.java
@@ -34,17 +34,36 @@ import com.google.common.collect.ImmutableList;
 public class DoTableImpl<K, V> extends PTableBase<K, V> implements PTable<K, V> {
 
   private final PCollectionImpl<?> parent;
+  private final DoFn<?, Pair<K, V>> combineFn; // combine-phase fn; null when hasCombineFn() is false
   private final DoFn<?, Pair<K, V>> fn;
   private final PTableType<K, V> type;
 
+  private static <S, K, V> DoFn<S, Pair<K, V>> asCombineFn(final DoFn<S, Pair<K, V>> fn) {
+    if (fn instanceof CombineFn) {
+      return fn; // a CombineFn is reused as its own combine-phase fn
+    }
+    return null; // plain DoFns get no combine-phase fn
+  }
+
   <S> DoTableImpl(String name, PCollectionImpl<S> parent, DoFn<S, Pair<K, V>> fn, PTableType<K, V> ntype) {
     this(name, parent, fn, ntype, ParallelDoOptions.builder().build());
   }
-  
+
   <S> DoTableImpl(String name, PCollectionImpl<S> parent, DoFn<S, Pair<K, V>> fn, PTableType<K, V> ntype,
-      ParallelDoOptions options) {
+                  ParallelDoOptions options) {
+    this(name, parent, asCombineFn(fn), fn, ntype, options); // a CombineFn doubles as its own combiner
+  }
+
+  <S> DoTableImpl(final String name, final PCollectionImpl<S> parent, final DoFn<S, Pair<K, V>> combineFn,
+                  final DoFn<S, Pair<K, V>> fn, final PTableType<K, V> ntype) {
+    this(name, parent, combineFn, fn, ntype, ParallelDoOptions.builder().build());
+  }
+
+  <S> DoTableImpl(final String name, final PCollectionImpl<S> parent, final DoFn<S, Pair<K, V>> combineFn,
+                  final DoFn<S, Pair<K, V>> fn, final PTableType<K, V> ntype, final ParallelDoOptions options) {
     super(name, options);
     this.parent = parent;
+    this.combineFn = combineFn; // may be null (see asCombineFn)
     this.fn = fn;
     this.type = ntype;
   }
@@ -87,8 +106,12 @@ public class DoTableImpl<K, V> extends PTableBase<K, V> implements PTable<K, V>
     return DoNode.createFnNode(getName(), fn, type, doOptions);
   }
 
+  public DoNode createCombineNode() {
+    return DoNode.createFnNode(getName(), combineFn, type, doOptions); // NOTE(review): null if !hasCombineFn()
+  }
+  
   public boolean hasCombineFn() {
-    return fn instanceof CombineFn;
+    return combineFn != null; // set explicitly or derived via asCombineFn(fn)
   }
   
   @Override

http://git-wip-us.apache.org/repos/asf/crunch/blob/9a689d1e/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
index e62d9c3..ab5f48c 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
@@ -91,10 +91,15 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>
   public PType<Pair<K, Iterable<V>>> getPType() {
     return ptype;
   }
-
+
+  @Override
+  public PTable<K, V> combineValues(CombineFn<K, V> combineFn, CombineFn<K, V> reduceFn) {
+    return new DoTableImpl<K, V>("combine", getChainingCollection(), combineFn, reduceFn, parent.getPTableType());
+  }
+
   @Override
   public PTable<K, V> combineValues(CombineFn<K, V> combineFn) {
-    return new DoTableImpl<K, V>("combine", getChainingCollection(), combineFn, parent.getPTableType());
+    return combineValues(combineFn, combineFn); // same fn for the combine and reduce phases
   }
 
   @Override
@@ -102,6 +107,11 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>
     return combineValues(Aggregators.<K, V>toCombineFn(agg));
   }
 
+  @Override
+  public PTable<K, V> combineValues(Aggregator<V> combineAgg, Aggregator<V> reduceAgg) { // one Aggregator per phase
+    return combineValues(Aggregators.<K, V>toCombineFn(combineAgg), Aggregators.<K, V>toCombineFn(reduceAgg));
+  }
+
   private static class Ungroup<K, V> extends DoFn<Pair<K, Iterable<V>>, Pair<K, V>> {
     @Override
     public void process(Pair<K, Iterable<V>> input, Emitter<Pair<K, V>> emitter) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/9a689d1e/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
index 0699db5..a192a22 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java
@@ -188,7 +188,7 @@ class JobPrototype {
       if (combineFnTable != null) {
         job.setCombinerClass(CrunchCombiner.class);
         DoNode combinerInputNode = group.createDoNode();
-        DoNode combineNode = combineFnTable.createDoNode();
+        DoNode combineNode = combineFnTable.createCombineNode();
         combineNode.addChild(group.getGroupingNode());
         combinerInputNode.addChild(combineNode);
         serialize(ImmutableList.of(combinerInputNode), conf, workingPath, NodeContext.COMBINE);