You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/05/26 21:13:59 UTC
git commit: CRUNCH-210: Remove deprecated MapValuesFn references from
cogroup and add support for user-specified parallelism for cogroup jobs
Updated Branches:
refs/heads/master 125f1b4e1 -> ceaa6a5e0
CRUNCH-210: Remove deprecated MapValuesFn references from cogroup and add support for user-specified parallelism for cogroup jobs
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ceaa6a5e
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ceaa6a5e
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ceaa6a5e
Branch: refs/heads/master
Commit: ceaa6a5e0ab4c1d2b23e55c0d9a7cc0b63a41000
Parents: 125f1b4
Author: Josh Wills <jw...@apache.org>
Authored: Sat May 25 17:05:31 2013 -0700
Committer: Josh Wills <jw...@apache.org>
Committed: Sun May 26 12:08:52 2013 -0700
----------------------------------------------------------------------
.../main/java/org/apache/crunch/lib/Cogroup.java | 62 ++++++++++-----
1 files changed, 41 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/ceaa6a5e/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java b/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java
index 07d873c..3bf3e4d 100644
--- a/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java
+++ b/crunch-core/src/main/java/org/apache/crunch/lib/Cogroup.java
@@ -19,11 +19,10 @@ package org.apache.crunch.lib;
import java.util.Collection;
-import org.apache.crunch.DoFn;
-import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PGroupedTable;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
-import org.apache.crunch.fn.MapValuesFn;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
@@ -34,43 +33,65 @@ public class Cogroup {
/**
* Co-groups the two {@link PTable} arguments.
*
- * @return a {@code PTable} representing the co-grouped tables.
+ * @param left The left (smaller) PTable
+ * @param right The right (larger) PTable
+ * @return a {@code PTable} representing the co-grouped tables
*/
public static <K, U, V> PTable<K, Pair<Collection<U>, Collection<V>>> cogroup(PTable<K, U> left, PTable<K, V> right) {
+ return cogroup(left, right, 0);
+ }
+
+ /**
+ * Co-groups the two {@link PTable} arguments with a user-specified degree of parallelism (a.k.a. the number of
+ * reducers).
+ *
+ * @param left The left (smaller) PTable
+ * @param right The right (larger) PTable
+ * @param numReducers The number of reducers to use
+ * @return A new {@code PTable} representing the co-grouped tables
+ */
+ public static <K, U, V> PTable<K, Pair<Collection<U>, Collection<V>>> cogroup(
+ PTable<K, U> left,
+ PTable<K, V> right,
+ int numReducers) {
PTypeFamily ptf = left.getTypeFamily();
- PType<K> keyType = left.getPTableType().getKeyType();
PType<U> leftType = left.getPTableType().getValueType();
PType<V> rightType = right.getPTableType().getValueType();
PType<Pair<U, V>> itype = ptf.pairs(leftType, rightType);
- PTable<K, Pair<U, V>> cgLeft = left.parallelDo("coGroupTag1", new CogroupFn1<K, U, V>(),
- ptf.tableOf(keyType, itype));
- PTable<K, Pair<U, V>> cgRight = right.parallelDo("coGroupTag2", new CogroupFn2<K, U, V>(),
- ptf.tableOf(keyType, itype));
+ PTable<K, Pair<U, V>> cgLeft = left.mapValues("coGroupTag1", new CogroupFn1<U, V>(),
+ itype);
+ PTable<K, Pair<U, V>> cgRight = right.mapValues("coGroupTag2", new CogroupFn2<U, V>(),
+ itype);
+ PType<Pair<Collection<U>, Collection<V>>> otype = ptf.pairs(ptf.collections(leftType),
+ ptf.collections(rightType));
PTable<K, Pair<U, V>> both = cgLeft.union(cgRight);
-
- PType<Pair<Collection<U>, Collection<V>>> otype = ptf.pairs(ptf.collections(leftType), ptf.collections(rightType));
- return both.groupByKey().parallelDo("cogroup",
- new PostGroupFn<K, U, V>(leftType, rightType), ptf.tableOf(keyType, otype));
+ PGroupedTable<K, Pair<U, V>> grouped = null;
+ if (numReducers > 0) {
+ grouped = both.groupByKey(numReducers);
+ } else {
+ grouped = both.groupByKey();
+ }
+ return grouped.mapValues("cogroup", new PostGroupFn<U, V>(leftType, rightType), otype);
}
- private static class CogroupFn1<K, V, U> extends MapValuesFn<K, V, Pair<V, U>> {
+ private static class CogroupFn1<V, U> extends MapFn<V, Pair<V, U>> {
@Override
public Pair<V, U> map(V v) {
return Pair.of(v, null);
}
}
- private static class CogroupFn2<K, V, U> extends MapValuesFn<K, U, Pair<V, U>> {
+ private static class CogroupFn2<V, U> extends MapFn<U, Pair<V, U>> {
@Override
public Pair<V, U> map(U u) {
return Pair.of(null, u);
}
}
- private static class PostGroupFn<K, V, U> extends
- DoFn<Pair<K, Iterable<Pair<V, U>>>, Pair<K, Pair<Collection<V>, Collection<U>>>> {
+ private static class PostGroupFn<V, U> extends
+ MapFn<Iterable<Pair<V, U>>, Pair<Collection<V>, Collection<U>>> {
private PType<V> ptypeV;
private PType<U> ptypeU;
@@ -88,18 +109,17 @@ public class Cogroup {
}
@Override
- public void process(Pair<K, Iterable<Pair<V, U>>> input,
- Emitter<Pair<K, Pair<Collection<V>, Collection<U>>>> emitter) {
+ public Pair<Collection<V>, Collection<U>> map(Iterable<Pair<V, U>> input) {
Collection<V> cv = Lists.newArrayList();
Collection<U> cu = Lists.newArrayList();
- for (Pair<V, U> pair : input.second()) {
+ for (Pair<V, U> pair : input) {
if (pair.first() != null) {
cv.add(ptypeV.getDetachedValue(pair.first()));
} else if (pair.second() != null) {
cu.add(ptypeU.getDetachedValue(pair.second()));
}
}
- emitter.emit(Pair.of(input.first(), Pair.of(cv, cu)));
+ return Pair.of(cv, cu);
}
}