You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by gr...@apache.org on 2012/10/06 09:14:08 UTC
git commit: CRUNCH-88 - route grouped table to multiple outputs
Updated Branches:
refs/heads/master 1c95647be -> b0165faae
CRUNCH-88 - route grouped table to multiple outputs
Fix an issue where the contents of a PGroupedTableImpl could be sent
successfully to only a single PCollection further down the pipeline.
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/b0165faa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/b0165faa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/b0165faa
Branch: refs/heads/master
Commit: b0165faaea013a0d6124308154039c33c7b85002
Parents: 1c95647
Author: Gabriel Reid <gr...@apache.org>
Authored: Thu Oct 4 22:40:59 2012 +0200
Committer: Gabriel Reid <gr...@apache.org>
Committed: Sat Oct 6 09:08:08 2012 +0200
----------------------------------------------------------------------
.../it/java/org/apache/crunch/MRPipelineIT.java | 28 +++++++++++++++
.../crunch/impl/mr/collect/PCollectionImpl.java | 13 ++++++-
.../crunch/impl/mr/collect/PGroupedTableImpl.java | 11 +++++-
3 files changed, 48 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b0165faa/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java b/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java
index e4ff91d..0865820 100644
--- a/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java
+++ b/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java
@@ -17,13 +17,19 @@
*/
package org.apache.crunch;
+import static org.junit.Assert.assertTrue;
+
import java.io.File;
+import java.io.IOException;
import java.io.Serializable;
+import java.util.Arrays;
+import org.apache.crunch.fn.IdentityFn;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.io.To;
import org.apache.crunch.test.TemporaryPath;
import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.writable.Writables;
import org.junit.Rule;
import org.junit.Test;
@@ -51,5 +57,27 @@ public class MRPipelineIT implements Serializable {
write.materialize();
pipeline.run();
}
+
+
+
+ @Test
+ public void testPGroupedTableToMultipleOutputs() throws IOException{
+ Pipeline pipeline = new MRPipeline(MRPipelineIT.class, tmpDir.getDefaultConfiguration());
+ PGroupedTable<String, String> groupedLineTable = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt")).by(IdentityFn.<String>getInstance(), Writables.strings()).groupByKey();
+
+ PTable<String, String> ungroupedTableA = groupedLineTable.ungroup();
+ PTable<String, String> ungroupedTableB = groupedLineTable.ungroup();
+
+ File outputDirA = tmpDir.getFile("output_a");
+ File outputDirB = tmpDir.getFile("output_b");
+
+ pipeline.writeTextFile(ungroupedTableA, outputDirA.getAbsolutePath());
+ pipeline.writeTextFile(ungroupedTableB, outputDirB.getAbsolutePath());
+ pipeline.done();
+
+ // Verify that output from a single PGroupedTable can be sent to multiple collections
+ assertTrue(new File(outputDirA, "part-r-00000").exists());
+ assertTrue(new File(outputDirB, "part-r-00000").exists());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b0165faa/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
index d4948c0..f0d8187 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
@@ -86,7 +86,7 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
@Override
public <T> PCollection<T> parallelDo(String name, DoFn<S, T> fn, PType<T> type) {
- return new DoCollectionImpl<T>(name, this, fn, type);
+ return new DoCollectionImpl<T>(name, getChainingCollection(), fn, type);
}
@Override
@@ -97,7 +97,7 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
@Override
public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
- return new DoTableImpl<K, V>(name, this, fn, type);
+ return new DoTableImpl<K, V>(name, getChainingCollection(), fn, type);
}
public PCollection<S> write(Target target) {
@@ -254,4 +254,13 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
}
protected abstract long getSizeInternal();
+
+ /**
+ * Retrieve the PCollectionImpl to be used for chaining within PCollectionImpls further down the pipeline.
+ * @return The PCollectionImpl instance to be chained
+ */
+ protected PCollectionImpl<S> getChainingCollection(){
+ return this;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b0165faa/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
index 5a40413..fee381d 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
@@ -42,7 +42,7 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>
private final PTableBase<K, V> parent;
private final GroupingOptions groupingOptions;
private final PGroupedTableType<K, V> ptype;
-
+
PGroupedTableImpl(PTableBase<K, V> parent) {
this(parent, null);
}
@@ -79,7 +79,7 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>
}
public PTable<K, V> combineValues(CombineFn<K, V> combineFn) {
- return new DoTableImpl<K, V>("combine", this, combineFn, parent.getPTableType());
+ return new DoTableImpl<K, V>("combine", getChainingCollection(), combineFn, parent.getPTableType());
}
private static class Ungroup<K, V> extends DoFn<Pair<K, Iterable<V>>, Pair<K, V>> {
@@ -113,4 +113,11 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>
public DoNode getGroupingNode() {
return DoNode.createGroupingNode("", ptype);
}
+
+ @Override
+ protected PCollectionImpl<Pair<K, Iterable<V>>> getChainingCollection() {
+ // Use a copy for chaining to allow sending the output of a single grouped table to multiple outputs
+ // TODO This should be implemented in a cleaner way in the planner
+ return new PGroupedTableImpl<K, V>(parent, groupingOptions);
+ }
}