You are viewing a plain-text version of this content; the hyperlink to the canonical version did not survive the plain-text conversion.
Posted to commits@crunch.apache.org by gr...@apache.org on 2012/09/20 21:10:17 UTC

git commit: CRUNCH-72 - Reduce memory usage of Cartesian#cross

Updated Branches:
  refs/heads/master 883c565a3 -> a4cf3edf0


CRUNCH-72 - Reduce memory usage of Cartesian#cross

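Use PTable#join instead of PTable#cogroup in Cartesian#cross to reduce
memory usage: cogroup collects every value for a key into a Collection
for each side before the nested-loop cross product is emitted, whereas
join yields each matching pair as a separate record.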


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/a4cf3edf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/a4cf3edf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/a4cf3edf

Branch: refs/heads/master
Commit: a4cf3edf0bdebb8969bf25b304fca7b13a198623
Parents: 883c565
Author: Gabriel Reid <gr...@apache.org>
Authored: Thu Sep 20 21:00:44 2012 +0200
Committer: Gabriel Reid <gr...@apache.org>
Committed: Thu Sep 20 21:00:44 2012 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/crunch/lib/Cartesian.java |   45 ++++------
 .../java/org/apache/crunch/lib/CartesianTest.java  |   72 +++++++++------
 2 files changed, 60 insertions(+), 57 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a4cf3edf/crunch/src/main/java/org/apache/crunch/lib/Cartesian.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Cartesian.java b/crunch/src/main/java/org/apache/crunch/lib/Cartesian.java
index f3fc5f5..08327dd 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/Cartesian.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/Cartesian.java
@@ -17,11 +17,11 @@
  */
 package org.apache.crunch.lib;
 
-import java.util.Collection;
 import java.util.Random;
 
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
@@ -132,26 +132,22 @@ public class Cartesian {
     PTable<Pair<Integer, Integer>, Pair<K2, V>> rightCross = right.parallelDo(new GFCross<Pair<K2, V>>(1, parallelism),
         rtf.tableOf(rtf.pairs(rtf.ints(), rtf.ints()), rtf.pairs(right.getKeyType(), right.getValueType())));
 
-    PTable<Pair<Integer, Integer>, Pair<Collection<Pair<K1, U>>, Collection<Pair<K2, V>>>> cg = leftCross
-        .cogroup(rightCross);
+    PTable<Pair<Integer, Integer>, Pair<Pair<K1, U>, Pair<K2, V>>> cg = leftCross.join(rightCross);
 
     PTypeFamily ctf = cg.getTypeFamily();
 
-    return cg
-        .parallelDo(
-            new DoFn<Pair<Pair<Integer, Integer>, Pair<Collection<Pair<K1, U>>, Collection<Pair<K2, V>>>>, Pair<Pair<K1, K2>, Pair<U, V>>>() {
-              @Override
-              public void process(
-                  Pair<Pair<Integer, Integer>, Pair<Collection<Pair<K1, U>>, Collection<Pair<K2, V>>>> input,
-                  Emitter<Pair<Pair<K1, K2>, Pair<U, V>>> emitter) {
-                for (Pair<K1, U> l : input.second().first()) {
-                  for (Pair<K2, V> r : input.second().second()) {
-                    emitter.emit(Pair.of(Pair.of(l.first(), r.first()), Pair.of(l.second(), r.second())));
-                  }
-                }
-              }
-            }, ctf.tableOf(ctf.pairs(left.getKeyType(), right.getKeyType()),
-                ctf.pairs(left.getValueType(), right.getValueType())));
+    return cg.parallelDo(
+        new MapFn<Pair<Pair<Integer, Integer>, Pair<Pair<K1, U>, Pair<K2, V>>>, Pair<Pair<K1, K2>, Pair<U, V>>>() {
+
+          @Override
+          public Pair<Pair<K1, K2>, Pair<U, V>> map(Pair<Pair<Integer, Integer>, Pair<Pair<K1, U>, Pair<K2, V>>> input) {
+            Pair<Pair<K1, U>, Pair<K2, V>> valuePair = input.second();
+            return Pair.of(Pair.of(valuePair.first().first(), valuePair.second().first()),
+                Pair.of(valuePair.first().second(), valuePair.second().second()));
+          }
+        },
+        ctf.tableOf(ctf.pairs(left.getKeyType(), right.getKeyType()),
+            ctf.pairs(left.getValueType(), right.getValueType())));
   }
 
   /**
@@ -205,19 +201,14 @@ public class Cartesian {
     PTable<Pair<Integer, Integer>, V> rightCross = right.parallelDo(new GFCross<V>(1, parallelism),
         rtf.tableOf(rtf.pairs(rtf.ints(), rtf.ints()), right.getPType()));
 
-    PTable<Pair<Integer, Integer>, Pair<Collection<U>, Collection<V>>> cg = leftCross.cogroup(rightCross);
+    PTable<Pair<Integer, Integer>, Pair<U, V>> cg = leftCross.join(rightCross);
 
     PTypeFamily ctf = cg.getTypeFamily();
 
-    return cg.parallelDo(new DoFn<Pair<Pair<Integer, Integer>, Pair<Collection<U>, Collection<V>>>, Pair<U, V>>() {
+    return cg.parallelDo(new MapFn<Pair<Pair<Integer, Integer>, Pair<U, V>>, Pair<U, V>>() {
       @Override
-      public void process(Pair<Pair<Integer, Integer>, Pair<Collection<U>, Collection<V>>> input,
-          Emitter<Pair<U, V>> emitter) {
-        for (U l : input.second().first()) {
-          for (V r : input.second().second()) {
-            emitter.emit(Pair.of(l, r));
-          }
-        }
+      public Pair<U, V> map(Pair<Pair<Integer, Integer>, Pair<U, V>> input) {
+        return input.second();
       }
     }, ctf.pairs(left.getPType(), right.getPType()));
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/a4cf3edf/crunch/src/test/java/org/apache/crunch/lib/CartesianTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/lib/CartesianTest.java b/crunch/src/test/java/org/apache/crunch/lib/CartesianTest.java
index eba7429..b19097c 100644
--- a/crunch/src/test/java/org/apache/crunch/lib/CartesianTest.java
+++ b/crunch/src/test/java/org/apache/crunch/lib/CartesianTest.java
@@ -18,48 +18,60 @@
 package org.apache.crunch.lib;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
-import java.util.HashSet;
-import java.util.Iterator;
+import java.util.Collections;
+import java.util.List;
 
 import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.impl.mem.MemPipeline;
 import org.apache.crunch.types.writable.Writables;
 import org.junit.Test;
 
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 
 public class CartesianTest {
 
   @Test
-  public void testCartesianCollection() {
-    ImmutableList<ImmutableList<Integer>> testCases = ImmutableList.of(ImmutableList.of(1, 2, 3, 4, 5),
-        ImmutableList.<Integer> of(1, 2, 3), ImmutableList.<Integer> of());
-
-    for (int t1 = 0; t1 < testCases.size(); t1++) {
-      ImmutableList<Integer> testCase1 = testCases.get(t1);
-      for (int t2 = t1; t2 < testCases.size(); t2++) {
-        ImmutableList<Integer> testCase2 = testCases.get(t2);
-
-        PCollection<Integer> X = MemPipeline.typedCollectionOf(Writables.ints(), testCase1);
-        PCollection<Integer> Y = MemPipeline.typedCollectionOf(Writables.ints(), testCase2);
-
-        PCollection<Pair<Integer, Integer>> cross = Cartesian.cross(X, Y);
-        HashSet<Pair<Integer, Integer>> crossSet = new HashSet<Pair<Integer, Integer>>();
-        for (Iterator<Pair<Integer, Integer>> i = cross.materialize().iterator(); i.hasNext();) {
-          crossSet.add(i.next());
-        }
-        assertEquals(crossSet.size(), testCase1.size() * testCase2.size());
-
-        for (int i = 0; i < testCase1.size(); i++) {
-          for (int j = 0; j < testCase2.size(); j++) {
-            assertTrue(crossSet.contains(Pair.of(testCase1.get(i), testCase2.get(j))));
-          }
-        }
-      }
-    }
+  public void testCartesianCollection_SingleValues() {
+
+    PCollection<String> letters = MemPipeline.typedCollectionOf(Writables.strings(), "a", "b");
+    PCollection<Integer> ints = MemPipeline.typedCollectionOf(Writables.ints(), 1, 2);
+
+    PCollection<Pair<String, Integer>> cartesianProduct = Cartesian.cross(letters, ints);
+
+    @SuppressWarnings("unchecked")
+    List<Pair<String, Integer>> expectedResults = Lists.newArrayList(Pair.of("a", 1), Pair.of("a", 2), Pair.of("b", 1),
+        Pair.of("b", 2));
+    List<Pair<String, Integer>> actualResults = Lists.newArrayList(cartesianProduct.materialize());
+    Collections.sort(actualResults);
+
+    assertEquals(expectedResults, actualResults);
+  }
+
+  @Test
+  public void testCartesianCollection_Tables() {
+
+    PTable<String, Integer> leftTable = MemPipeline.typedTableOf(
+        Writables.tableOf(Writables.strings(), Writables.ints()), "a", 1, "b", 2);
+    PTable<String, Float> rightTable = MemPipeline.typedTableOf(
+        Writables.tableOf(Writables.strings(), Writables.floats()), "A", 1.0f, "B", 2.0f);
+
+    PTable<Pair<String, String>, Pair<Integer, Float>> cartesianProduct = Cartesian.cross(leftTable, rightTable);
+
+    List<Pair<Pair<String, String>, Pair<Integer, Float>>> expectedResults = Lists.newArrayList();
+    expectedResults.add(Pair.of(Pair.of("a", "A"), Pair.of(1, 1.0f)));
+    expectedResults.add(Pair.of(Pair.of("a", "B"), Pair.of(1, 2.0f)));
+    expectedResults.add(Pair.of(Pair.of("b", "A"), Pair.of(2, 1.0f)));
+    expectedResults.add(Pair.of(Pair.of("b", "B"), Pair.of(2, 2.0f)));
+
+    List<Pair<Pair<String, String>, Pair<Integer, Float>>> actualResults = Lists.newArrayList(cartesianProduct
+        .materialize());
+    Collections.sort(actualResults);
+
+    assertEquals(expectedResults, actualResults);
+
   }
 
 }