You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/05/26 21:13:59 UTC

git commit: CRUNCH-210: Remove deprecated MapValuesFn references from cogroup and add support for user-specified parallelism for cogroup jobs

Updated Branches:
  refs/heads/master 125f1b4e1 -> ceaa6a5e0


CRUNCH-210: Remove deprecated MapValuesFn references from cogroup and add support for user-specified parallelism for cogroup jobs


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ceaa6a5e
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ceaa6a5e
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ceaa6a5e

Branch: refs/heads/master
Commit: ceaa6a5e0ab4c1d2b23e55c0d9a7cc0b63a41000
Parents: 125f1b4
Author: Josh Wills <jw...@apache.org>
Authored: Sat May 25 17:05:31 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Sun May 26 12:08:52 2013 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/crunch/lib/Cogroup.java   |   62 ++++++++++-----
 1 files changed, 41 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/ceaa6a5e/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java b/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java
index 07d873c..3bf3e4d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java
@@ -19,11 +19,10 @@ package org.apache.crunch.lib;
 
 import java.util.Collection;
 
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PGroupedTable;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
-import org.apache.crunch.fn.MapValuesFn;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.PTypeFamily;
 
@@ -34,43 +33,65 @@ public class Cogroup {
   /**
    * Co-groups the two {@link PTable} arguments.
    * 
-   * @return a {@code PTable} representing the co-grouped tables.
+   * @param left The left (smaller) PTable
+   * @param right The right (larger) PTable
+   * @return a {@code PTable} representing the co-grouped tables
    */
   public static <K, U, V> PTable<K, Pair<Collection<U>, Collection<V>>> cogroup(PTable<K, U> left, PTable<K, V> right) {
+    return cogroup(left, right, 0);
+  }
+  
+  /**
+   * Co-groups the two {@link PTable} arguments with a user-specified degree of parallelism (i.e., the number of
+   * reducers).
+   * 
+   * @param left The left (smaller) PTable
+   * @param right The right (larger) PTable
+   * @param numReducers The number of reducers to use; if not positive, no explicit count is passed to {@code groupByKey}
+   * @return A new {@code PTable} representing the co-grouped tables
+   */
+  public static <K, U, V> PTable<K, Pair<Collection<U>, Collection<V>>> cogroup(
+      PTable<K, U> left,
+      PTable<K, V> right,
+      int numReducers) {
     PTypeFamily ptf = left.getTypeFamily();
-    PType<K> keyType = left.getPTableType().getKeyType();
     PType<U> leftType = left.getPTableType().getValueType();
     PType<V> rightType = right.getPTableType().getValueType();
     PType<Pair<U, V>> itype = ptf.pairs(leftType, rightType);
 
-    PTable<K, Pair<U, V>> cgLeft = left.parallelDo("coGroupTag1", new CogroupFn1<K, U, V>(),
-        ptf.tableOf(keyType, itype));
-    PTable<K, Pair<U, V>> cgRight = right.parallelDo("coGroupTag2", new CogroupFn2<K, U, V>(),
-        ptf.tableOf(keyType, itype));
+    PTable<K, Pair<U, V>> cgLeft = left.mapValues("coGroupTag1", new CogroupFn1<U, V>(),
+        itype);
+    PTable<K, Pair<U, V>> cgRight = right.mapValues("coGroupTag2", new CogroupFn2<U, V>(),
+        itype);
 
+    PType<Pair<Collection<U>, Collection<V>>> otype = ptf.pairs(ptf.collections(leftType),
+        ptf.collections(rightType));
     PTable<K, Pair<U, V>> both = cgLeft.union(cgRight);
-
-    PType<Pair<Collection<U>, Collection<V>>> otype = ptf.pairs(ptf.collections(leftType), ptf.collections(rightType));
-    return both.groupByKey().parallelDo("cogroup", 
-        new PostGroupFn<K, U, V>(leftType, rightType), ptf.tableOf(keyType, otype));
+    PGroupedTable<K, Pair<U, V>> grouped = null;
+    if (numReducers > 0) {
+      grouped = both.groupByKey(numReducers);
+    } else {
+      grouped = both.groupByKey();
+    }
+    return grouped.mapValues("cogroup", new PostGroupFn<U, V>(leftType, rightType), otype);
   }
 
-  private static class CogroupFn1<K, V, U> extends MapValuesFn<K, V, Pair<V, U>> {
+  private static class CogroupFn1<V, U> extends MapFn<V, Pair<V, U>> {
     @Override
     public Pair<V, U> map(V v) {
       return Pair.of(v, null);
     }
   }
 
-  private static class CogroupFn2<K, V, U> extends MapValuesFn<K, U, Pair<V, U>> {
+  private static class CogroupFn2<V, U> extends MapFn<U, Pair<V, U>> {
     @Override
     public Pair<V, U> map(U u) {
       return Pair.of(null, u);
     }
   }
 
-  private static class PostGroupFn<K, V, U> extends
-      DoFn<Pair<K, Iterable<Pair<V, U>>>, Pair<K, Pair<Collection<V>, Collection<U>>>> {
+  private static class PostGroupFn<V, U> extends
+      MapFn<Iterable<Pair<V, U>>, Pair<Collection<V>, Collection<U>>> {
     
     private PType<V> ptypeV;
     private PType<U> ptypeU;
@@ -88,18 +109,17 @@ public class Cogroup {
     }
     
     @Override
-    public void process(Pair<K, Iterable<Pair<V, U>>> input,
-        Emitter<Pair<K, Pair<Collection<V>, Collection<U>>>> emitter) {
+    public Pair<Collection<V>, Collection<U>> map(Iterable<Pair<V, U>> input) {
       Collection<V> cv = Lists.newArrayList();
       Collection<U> cu = Lists.newArrayList();
-      for (Pair<V, U> pair : input.second()) {
+      for (Pair<V, U> pair : input) {
         if (pair.first() != null) {
           cv.add(ptypeV.getDetachedValue(pair.first()));
         } else if (pair.second() != null) {
           cu.add(ptypeU.getDetachedValue(pair.second()));
         }
       }
-      emitter.emit(Pair.of(input.first(), Pair.of(cv, cu)));
+      return Pair.of(cv, cu);
     }
   }