You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by gr...@apache.org on 2013/04/02 21:53:56 UTC

git commit: CRUNCH-190 Deep-copy retained values in CoGroup

Updated Branches:
  refs/heads/master 154c8fbd3 -> 64497fa4f


CRUNCH-190 Deep-copy retained values in CoGroup

Use PType#getDetachedValue to deep-copy values where necessary
before retaining them in the reduce portion of the Cogroup
library.


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/64497fa4
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/64497fa4
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/64497fa4

Branch: refs/heads/master
Commit: 64497fa4f40827d740d341d5513bafff6ed20a9d
Parents: 154c8fb
Author: Gabriel Reid <gr...@apache.org>
Authored: Tue Apr 2 21:48:11 2013 +0200
Committer: Gabriel Reid <gr...@apache.org>
Committed: Tue Apr 2 21:48:11 2013 +0200

----------------------------------------------------------------------
 .../it/java/org/apache/crunch/lib/CogroupIT.java   |   47 +++++++++++++++
 .../main/java/org/apache/crunch/lib/Cogroup.java   |   23 ++++++-
 2 files changed, 67 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/64497fa4/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java b/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
index 99950a4..af3329f 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
@@ -17,16 +17,19 @@
  */
 package org.apache.crunch.lib;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.crunch.DoFn;
 import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
@@ -36,16 +39,21 @@ import org.apache.crunch.fn.MapKeysFn;
 import org.apache.crunch.fn.MapValuesFn;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.From;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.test.StringWrapper.StringToStringWrapperMapFn;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PTypeFamily;
 import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
 import org.apache.crunch.types.writable.WritableTypeFamily;
 import org.junit.Rule;
 import org.junit.Test;
 
 import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 
 public class CogroupIT {
@@ -123,4 +131,43 @@ public class CogroupIT {
     }
     assertTrue(passed);
   }
+  
+  static class ConstantMapFn extends MapFn<StringWrapper, StringWrapper> {
+
+    @Override
+    public StringWrapper map(StringWrapper input) {
+      return StringWrapper.wrap("key");
+    }
+    
+  }
+  
+  @Test
+  public void testCogroup_CheckObjectResultOnRichObjects() throws IOException {
+    Pipeline pipeline = new MRPipeline(CogroupIT.class, tmpDir.getDefaultConfiguration());
+    PTable<StringWrapper, StringWrapper> tableA = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt"))
+      .parallelDo(new StringToStringWrapperMapFn(), Avros.reflects(StringWrapper.class))
+      .by(new ConstantMapFn(), Avros.reflects(StringWrapper.class));
+    PTable<StringWrapper, StringWrapper> tableB = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt"))
+        .parallelDo(new StringToStringWrapperMapFn(), Avros.reflects(StringWrapper.class))
+        .by(new ConstantMapFn(), Avros.reflects(StringWrapper.class));
+    
+    List<String> set1Values = Lists.newArrayList();
+    List<String> set2Values = Lists.newArrayList();
+    PTable<StringWrapper, Pair<Collection<StringWrapper>, Collection<StringWrapper>>> cogroup = Cogroup.cogroup(tableA, tableB);
+    for (Pair<StringWrapper, Pair<Collection<StringWrapper>, Collection<StringWrapper>>> entry : cogroup.materialize()) {
+      for (StringWrapper stringWrapper : entry.second().first()) {
+        set1Values.add(stringWrapper.getValue());
+      }
+      for (StringWrapper stringWrapper : entry.second().second()) {
+        set2Values.add(stringWrapper.getValue());
+      }
+    }
+    
+    Collections.sort(set1Values);
+    Collections.sort(set2Values);
+    
+    assertEquals(ImmutableList.of("a", "b", "c", "e"), set1Values);
+    assertEquals(ImmutableList.of("a", "c", "d"), set2Values);
+    
+  }
 }

http://git-wip-us.apache.org/repos/asf/crunch/blob/64497fa4/crunch/src/main/java/org/apache/crunch/lib/Cogroup.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Cogroup.java b/crunch/src/main/java/org/apache/crunch/lib/Cogroup.java
index df1e4a5..07d873c 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/Cogroup.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/Cogroup.java
@@ -51,7 +51,8 @@ public class Cogroup {
     PTable<K, Pair<U, V>> both = cgLeft.union(cgRight);
 
     PType<Pair<Collection<U>, Collection<V>>> otype = ptf.pairs(ptf.collections(leftType), ptf.collections(rightType));
-    return both.groupByKey().parallelDo("cogroup", new PostGroupFn<K, U, V>(), ptf.tableOf(keyType, otype));
+    return both.groupByKey().parallelDo("cogroup", 
+        new PostGroupFn<K, U, V>(leftType, rightType), ptf.tableOf(keyType, otype));
   }
 
   private static class CogroupFn1<K, V, U> extends MapValuesFn<K, V, Pair<V, U>> {
@@ -70,6 +71,22 @@ public class Cogroup {
 
   private static class PostGroupFn<K, V, U> extends
       DoFn<Pair<K, Iterable<Pair<V, U>>>, Pair<K, Pair<Collection<V>, Collection<U>>>> {
+    
+    private PType<V> ptypeV;
+    private PType<U> ptypeU;
+    
+    public PostGroupFn(PType<V> ptypeV, PType<U> ptypeU) {
+      this.ptypeV = ptypeV;
+      this.ptypeU = ptypeU;
+    }
+    
+    @Override
+    public void initialize() {
+      super.initialize();
+      ptypeV.initialize(getConfiguration());
+      ptypeU.initialize(getConfiguration());
+    }
+    
     @Override
     public void process(Pair<K, Iterable<Pair<V, U>>> input,
         Emitter<Pair<K, Pair<Collection<V>, Collection<U>>>> emitter) {
@@ -77,9 +94,9 @@ public class Cogroup {
       Collection<U> cu = Lists.newArrayList();
       for (Pair<V, U> pair : input.second()) {
         if (pair.first() != null) {
-          cv.add(pair.first());
+          cv.add(ptypeV.getDetachedValue(pair.first()));
         } else if (pair.second() != null) {
-          cu.add(pair.second());
+          cu.add(ptypeU.getDetachedValue(pair.second()));
         }
       }
       emitter.emit(Pair.of(input.first(), Pair.of(cv, cu)));