You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by gr...@apache.org on 2012/09/20 21:10:17 UTC
git commit: CRUNCH-72 - Reduce memory usage of Cartesian#cross
Updated Branches:
refs/heads/master 883c565a3 -> a4cf3edf0
CRUNCH-72 - Reduce memory usage of Cartesian#cross
Use PTable#join instead of PTable#cogroup to minimize memory
usage in Cartesian#cross.
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/a4cf3edf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/a4cf3edf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/a4cf3edf
Branch: refs/heads/master
Commit: a4cf3edf0bdebb8969bf25b304fca7b13a198623
Parents: 883c565
Author: Gabriel Reid <gr...@apache.org>
Authored: Thu Sep 20 21:00:44 2012 +0200
Committer: Gabriel Reid <gr...@apache.org>
Committed: Thu Sep 20 21:00:44 2012 +0200
----------------------------------------------------------------------
.../main/java/org/apache/crunch/lib/Cartesian.java | 45 ++++------
.../java/org/apache/crunch/lib/CartesianTest.java | 72 +++++++++------
2 files changed, 60 insertions(+), 57 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a4cf3edf/crunch/src/main/java/org/apache/crunch/lib/Cartesian.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Cartesian.java b/crunch/src/main/java/org/apache/crunch/lib/Cartesian.java
index f3fc5f5..08327dd 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/Cartesian.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/Cartesian.java
@@ -17,11 +17,11 @@
*/
package org.apache.crunch.lib;
-import java.util.Collection;
import java.util.Random;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
@@ -132,26 +132,22 @@ public class Cartesian {
PTable<Pair<Integer, Integer>, Pair<K2, V>> rightCross = right.parallelDo(new GFCross<Pair<K2, V>>(1, parallelism),
rtf.tableOf(rtf.pairs(rtf.ints(), rtf.ints()), rtf.pairs(right.getKeyType(), right.getValueType())));
- PTable<Pair<Integer, Integer>, Pair<Collection<Pair<K1, U>>, Collection<Pair<K2, V>>>> cg = leftCross
- .cogroup(rightCross);
+ PTable<Pair<Integer, Integer>, Pair<Pair<K1, U>, Pair<K2, V>>> cg = leftCross.join(rightCross);
PTypeFamily ctf = cg.getTypeFamily();
- return cg
- .parallelDo(
- new DoFn<Pair<Pair<Integer, Integer>, Pair<Collection<Pair<K1, U>>, Collection<Pair<K2, V>>>>, Pair<Pair<K1, K2>, Pair<U, V>>>() {
- @Override
- public void process(
- Pair<Pair<Integer, Integer>, Pair<Collection<Pair<K1, U>>, Collection<Pair<K2, V>>>> input,
- Emitter<Pair<Pair<K1, K2>, Pair<U, V>>> emitter) {
- for (Pair<K1, U> l : input.second().first()) {
- for (Pair<K2, V> r : input.second().second()) {
- emitter.emit(Pair.of(Pair.of(l.first(), r.first()), Pair.of(l.second(), r.second())));
- }
- }
- }
- }, ctf.tableOf(ctf.pairs(left.getKeyType(), right.getKeyType()),
- ctf.pairs(left.getValueType(), right.getValueType())));
+ return cg.parallelDo(
+ new MapFn<Pair<Pair<Integer, Integer>, Pair<Pair<K1, U>, Pair<K2, V>>>, Pair<Pair<K1, K2>, Pair<U, V>>>() {
+
+ @Override
+ public Pair<Pair<K1, K2>, Pair<U, V>> map(Pair<Pair<Integer, Integer>, Pair<Pair<K1, U>, Pair<K2, V>>> input) {
+ Pair<Pair<K1, U>, Pair<K2, V>> valuePair = input.second();
+ return Pair.of(Pair.of(valuePair.first().first(), valuePair.second().first()),
+ Pair.of(valuePair.first().second(), valuePair.second().second()));
+ }
+ },
+ ctf.tableOf(ctf.pairs(left.getKeyType(), right.getKeyType()),
+ ctf.pairs(left.getValueType(), right.getValueType())));
}
/**
@@ -205,19 +201,14 @@ public class Cartesian {
PTable<Pair<Integer, Integer>, V> rightCross = right.parallelDo(new GFCross<V>(1, parallelism),
rtf.tableOf(rtf.pairs(rtf.ints(), rtf.ints()), right.getPType()));
- PTable<Pair<Integer, Integer>, Pair<Collection<U>, Collection<V>>> cg = leftCross.cogroup(rightCross);
+ PTable<Pair<Integer, Integer>, Pair<U, V>> cg = leftCross.join(rightCross);
PTypeFamily ctf = cg.getTypeFamily();
- return cg.parallelDo(new DoFn<Pair<Pair<Integer, Integer>, Pair<Collection<U>, Collection<V>>>, Pair<U, V>>() {
+ return cg.parallelDo(new MapFn<Pair<Pair<Integer, Integer>, Pair<U, V>>, Pair<U, V>>() {
@Override
- public void process(Pair<Pair<Integer, Integer>, Pair<Collection<U>, Collection<V>>> input,
- Emitter<Pair<U, V>> emitter) {
- for (U l : input.second().first()) {
- for (V r : input.second().second()) {
- emitter.emit(Pair.of(l, r));
- }
- }
+ public Pair<U, V> map(Pair<Pair<Integer, Integer>, Pair<U, V>> input) {
+ return input.second();
}
}, ctf.pairs(left.getPType(), right.getPType()));
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a4cf3edf/crunch/src/test/java/org/apache/crunch/lib/CartesianTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/CartesianTest.java b/crunch/src/test/java/org/apache/crunch/lib/CartesianTest.java
index eba7429..b19097c 100644
--- a/crunch/src/test/java/org/apache/crunch/lib/CartesianTest.java
+++ b/crunch/src/test/java/org/apache/crunch/lib/CartesianTest.java
@@ -18,48 +18,60 @@
package org.apache.crunch.lib;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import java.util.HashSet;
-import java.util.Iterator;
+import java.util.Collections;
+import java.util.List;
import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.impl.mem.MemPipeline;
import org.apache.crunch.types.writable.Writables;
import org.junit.Test;
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
public class CartesianTest {
@Test
- public void testCartesianCollection() {
- ImmutableList<ImmutableList<Integer>> testCases = ImmutableList.of(ImmutableList.of(1, 2, 3, 4, 5),
- ImmutableList.<Integer> of(1, 2, 3), ImmutableList.<Integer> of());
-
- for (int t1 = 0; t1 < testCases.size(); t1++) {
- ImmutableList<Integer> testCase1 = testCases.get(t1);
- for (int t2 = t1; t2 < testCases.size(); t2++) {
- ImmutableList<Integer> testCase2 = testCases.get(t2);
-
- PCollection<Integer> X = MemPipeline.typedCollectionOf(Writables.ints(), testCase1);
- PCollection<Integer> Y = MemPipeline.typedCollectionOf(Writables.ints(), testCase2);
-
- PCollection<Pair<Integer, Integer>> cross = Cartesian.cross(X, Y);
- HashSet<Pair<Integer, Integer>> crossSet = new HashSet<Pair<Integer, Integer>>();
- for (Iterator<Pair<Integer, Integer>> i = cross.materialize().iterator(); i.hasNext();) {
- crossSet.add(i.next());
- }
- assertEquals(crossSet.size(), testCase1.size() * testCase2.size());
-
- for (int i = 0; i < testCase1.size(); i++) {
- for (int j = 0; j < testCase2.size(); j++) {
- assertTrue(crossSet.contains(Pair.of(testCase1.get(i), testCase2.get(j))));
- }
- }
- }
- }
+ public void testCartesianCollection_SingleValues() {
+
+ PCollection<String> letters = MemPipeline.typedCollectionOf(Writables.strings(), "a", "b");
+ PCollection<Integer> ints = MemPipeline.typedCollectionOf(Writables.ints(), 1, 2);
+
+ PCollection<Pair<String, Integer>> cartesianProduct = Cartesian.cross(letters, ints);
+
+ @SuppressWarnings("unchecked")
+ List<Pair<String, Integer>> expectedResults = Lists.newArrayList(Pair.of("a", 1), Pair.of("a", 2), Pair.of("b", 1),
+ Pair.of("b", 2));
+ List<Pair<String, Integer>> actualResults = Lists.newArrayList(cartesianProduct.materialize());
+ Collections.sort(actualResults);
+
+ assertEquals(expectedResults, actualResults);
+ }
+
+ @Test
+ public void testCartesianCollection_Tables() {
+
+ PTable<String, Integer> leftTable = MemPipeline.typedTableOf(
+ Writables.tableOf(Writables.strings(), Writables.ints()), "a", 1, "b", 2);
+ PTable<String, Float> rightTable = MemPipeline.typedTableOf(
+ Writables.tableOf(Writables.strings(), Writables.floats()), "A", 1.0f, "B", 2.0f);
+
+ PTable<Pair<String, String>, Pair<Integer, Float>> cartesianProduct = Cartesian.cross(leftTable, rightTable);
+
+ List<Pair<Pair<String, String>, Pair<Integer, Float>>> expectedResults = Lists.newArrayList();
+ expectedResults.add(Pair.of(Pair.of("a", "A"), Pair.of(1, 1.0f)));
+ expectedResults.add(Pair.of(Pair.of("a", "B"), Pair.of(1, 2.0f)));
+ expectedResults.add(Pair.of(Pair.of("b", "A"), Pair.of(2, 1.0f)));
+ expectedResults.add(Pair.of(Pair.of("b", "B"), Pair.of(2, 2.0f)));
+
+ List<Pair<Pair<String, String>, Pair<Integer, Float>>> actualResults = Lists.newArrayList(cartesianProduct
+ .materialize());
+ Collections.sort(actualResults);
+
+ assertEquals(expectedResults, actualResults);
+
}
}