You are viewing a plain-text version of this content; the link to the canonical version was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@crunch.apache.org by gr...@apache.org on 2013/04/02 21:53:56 UTC
git commit: CRUNCH-190 Deep-copy retained values in CoGroup
Updated Branches:
refs/heads/master 154c8fbd3 -> 64497fa4f
CRUNCH-190 Deep-copy retained values in CoGroup
Use PType#getDetachedValue to deep-copy values, where necessary,
before retaining them in the reduce phase of the Cogroup
library.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/64497fa4
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/64497fa4
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/64497fa4
Branch: refs/heads/master
Commit: 64497fa4f40827d740d341d5513bafff6ed20a9d
Parents: 154c8fb
Author: Gabriel Reid <gr...@apache.org>
Authored: Tue Apr 2 21:48:11 2013 +0200
Committer: Gabriel Reid <gr...@apache.org>
Committed: Tue Apr 2 21:48:11 2013 +0200
----------------------------------------------------------------------
.../it/java/org/apache/crunch/lib/CogroupIT.java | 47 +++++++++++++++
.../main/java/org/apache/crunch/lib/Cogroup.java | 23 ++++++-
2 files changed, 67 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/64497fa4/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java b/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
index 99950a4..af3329f 100644
--- a/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
+++ b/crunch/src/it/java/org/apache/crunch/lib/CogroupIT.java
@@ -17,16 +17,19 @@
*/
package org.apache.crunch.lib;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
+import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
@@ -36,16 +39,21 @@ import org.apache.crunch.fn.MapKeysFn;
import org.apache.crunch.fn.MapValuesFn;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.From;
+import org.apache.crunch.test.StringWrapper;
+import org.apache.crunch.test.StringWrapper.StringToStringWrapperMapFn;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PTypeFamily;
import org.apache.crunch.types.avro.AvroTypeFamily;
+import org.apache.crunch.types.avro.Avros;
import org.apache.crunch.types.writable.WritableTypeFamily;
import org.junit.Rule;
import org.junit.Test;
import com.google.common.base.Splitter;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
import com.google.common.io.Files;
public class CogroupIT {
@@ -123,4 +131,43 @@ public class CogroupIT {
}
assertTrue(passed);
}
+
+ static class ConstantMapFn extends MapFn<StringWrapper, StringWrapper> { // Key function for the test below: ignores its input and always returns the same key.
+ // Because every record gets the same key, all values from both tables end up in one cogroup entry.
+ @Override
+ public StringWrapper map(StringWrapper input) {
+ return StringWrapper.wrap("key"); // constant key; the input value is deliberately unused
+ }
+
+ }
+
+ @Test
+ public void testCogroup_CheckObjectResultOnRichObjects() throws IOException { // CRUNCH-190 regression: Cogroup must deep-copy the values it retains
+ Pipeline pipeline = new MRPipeline(CogroupIT.class, tmpDir.getDefaultConfiguration());
+ PTable<StringWrapper, StringWrapper> tableA = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt")) // left side: lines of set1.txt
+ .parallelDo(new StringToStringWrapperMapFn(), Avros.reflects(StringWrapper.class)) // presumably wraps each line in a StringWrapper (Avro reflect PType)
+ .by(new ConstantMapFn(), Avros.reflects(StringWrapper.class)); // give every record the same constant key
+ PTable<StringWrapper, StringWrapper> tableB = pipeline.readTextFile(tmpDir.copyResourceFileName("set2.txt")) // right side: lines of set2.txt
+ .parallelDo(new StringToStringWrapperMapFn(), Avros.reflects(StringWrapper.class))
+ .by(new ConstantMapFn(), Avros.reflects(StringWrapper.class)); // same constant key as tableA
+ // With one shared key, all values are collected into a single group after groupByKey.
+ List<String> set1Values = Lists.newArrayList();
+ List<String> set2Values = Lists.newArrayList();
+ PTable<StringWrapper, Pair<Collection<StringWrapper>, Collection<StringWrapper>>> cogroup = Cogroup.cogroup(tableA, tableB);
+ for (Pair<StringWrapper, Pair<Collection<StringWrapper>, Collection<StringWrapper>>> entry : cogroup.materialize()) { // one entry per distinct key
+ for (StringWrapper stringWrapper : entry.second().first()) { // values that came from tableA
+ set1Values.add(stringWrapper.getValue());
+ }
+ for (StringWrapper stringWrapper : entry.second().second()) { // values that came from tableB
+ set2Values.add(stringWrapper.getValue());
+ }
+ }
+ // Sort so the assertions do not depend on the order in which values were grouped.
+ Collections.sort(set1Values);
+ Collections.sort(set2Values);
+ // NOTE(review): if retained values were not detached, entries could alias one reused object and repeat a single value — confirm.
+ assertEquals(ImmutableList.of("a", "b", "c", "e"), set1Values); // presumably the lines of set1.txt — TODO confirm against the resource
+ assertEquals(ImmutableList.of("a", "c", "d"), set2Values); // presumably the lines of set2.txt — TODO confirm against the resource
+
+ }
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/64497fa4/crunch/src/main/java/org/apache/crunch/lib/Cogroup.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/lib/Cogroup.java b/crunch/src/main/java/org/apache/crunch/lib/Cogroup.java
index df1e4a5..07d873c 100644
--- a/crunch/src/main/java/org/apache/crunch/lib/Cogroup.java
+++ b/crunch/src/main/java/org/apache/crunch/lib/Cogroup.java
@@ -51,7 +51,8 @@ public class Cogroup {
PTable<K, Pair<U, V>> both = cgLeft.union(cgRight);
PType<Pair<Collection<U>, Collection<V>>> otype = ptf.pairs(ptf.collections(leftType), ptf.collections(rightType));
- return both.groupByKey().parallelDo("cogroup", new PostGroupFn<K, U, V>(), ptf.tableOf(keyType, otype));
+ return both.groupByKey().parallelDo("cogroup",
+ new PostGroupFn<K, U, V>(leftType, rightType), ptf.tableOf(keyType, otype));
}
private static class CogroupFn1<K, V, U> extends MapValuesFn<K, V, Pair<V, U>> {
@@ -70,6 +71,22 @@ public class Cogroup {
private static class PostGroupFn<K, V, U> extends
DoFn<Pair<K, Iterable<Pair<V, U>>>, Pair<K, Pair<Collection<V>, Collection<U>>>> {
+
+ private PType<V> ptypeV;
+ private PType<U> ptypeU;
+
+ public PostGroupFn(PType<V> ptypeV, PType<U> ptypeU) {
+ this.ptypeV = ptypeV;
+ this.ptypeU = ptypeU;
+ }
+
+ @Override
+ public void initialize() {
+ super.initialize();
+ ptypeV.initialize(getConfiguration());
+ ptypeU.initialize(getConfiguration());
+ }
+
@Override
public void process(Pair<K, Iterable<Pair<V, U>>> input,
Emitter<Pair<K, Pair<Collection<V>, Collection<U>>>> emitter) {
@@ -77,9 +94,9 @@ public class Cogroup {
Collection<U> cu = Lists.newArrayList();
for (Pair<V, U> pair : input.second()) {
if (pair.first() != null) {
- cv.add(pair.first());
+ cv.add(ptypeV.getDetachedValue(pair.first()));
} else if (pair.second() != null) {
- cu.add(pair.second());
+ cu.add(ptypeU.getDetachedValue(pair.second()));
}
}
emitter.emit(Pair.of(input.first(), Pair.of(cv, cu)));