You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@crunch.apache.org by gr...@apache.org on 2012/10/06 09:14:08 UTC

git commit: CRUNCH-88 - route grouped table to multiple outputs

Updated Branches:
  refs/heads/master 1c95647be -> b0165faae


CRUNCH-88 - route grouped table to multiple outputs

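Fix an issue where the contents of a PGroupedTableImpl could be sent
successfully to only a single PCollection further down the pipeline.
Downstream operations now chain from a fresh copy of the grouped table,
so each consumer is attached to its own PGroupedTableImpl instance.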


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/b0165faa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/b0165faa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/b0165faa

Branch: refs/heads/master
Commit: b0165faaea013a0d6124308154039c33c7b85002
Parents: 1c95647
Author: Gabriel Reid <gr...@apache.org>
Authored: Thu Oct 4 22:40:59 2012 +0200
Committer: Gabriel Reid <gr...@apache.org>
Committed: Sat Oct 6 09:08:08 2012 +0200

----------------------------------------------------------------------
 .../it/java/org/apache/crunch/MRPipelineIT.java    |   28 +++++++++++++++
 .../crunch/impl/mr/collect/PCollectionImpl.java    |   13 ++++++-
 .../crunch/impl/mr/collect/PGroupedTableImpl.java  |   11 +++++-
 3 files changed, 48 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b0165faa/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java b/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java
index e4ff91d..0865820 100644
--- a/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java
+++ b/crunch/src/it/java/org/apache/crunch/MRPipelineIT.java
@@ -17,13 +17,19 @@
  */
 package org.apache.crunch;
 
+import static org.junit.Assert.assertTrue;
+
 import java.io.File;
+import java.io.IOException;
 import java.io.Serializable;
+import java.util.Arrays;
 
+import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.io.To;
 import org.apache.crunch.test.TemporaryPath;
 import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.writable.Writables;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -51,5 +57,27 @@ public class MRPipelineIT implements Serializable {
     write.materialize();
     pipeline.run();
   }
+  
+  
+  
+  @Test
+  public void testPGroupedTableToMultipleOutputs() throws IOException{
+    Pipeline pipeline = new MRPipeline(MRPipelineIT.class, tmpDir.getDefaultConfiguration());
+    PGroupedTable<String, String> groupedLineTable = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt")).by(IdentityFn.<String>getInstance(), Writables.strings()).groupByKey();
+    
+    PTable<String, String> ungroupedTableA = groupedLineTable.ungroup();
+    PTable<String, String> ungroupedTableB = groupedLineTable.ungroup();
+    
+    File outputDirA = tmpDir.getFile("output_a");
+    File outputDirB = tmpDir.getFile("output_b");
+    
+    pipeline.writeTextFile(ungroupedTableA, outputDirA.getAbsolutePath());
+    pipeline.writeTextFile(ungroupedTableB, outputDirB.getAbsolutePath());
+    pipeline.done();
+
+    // Verify that output from a single PGroupedTable can be sent to multiple collections
+    assertTrue(new File(outputDirA, "part-r-00000").exists());
+    assertTrue(new File(outputDirB, "part-r-00000").exists());
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b0165faa/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
index d4948c0..f0d8187 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
@@ -86,7 +86,7 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
 
   @Override
   public <T> PCollection<T> parallelDo(String name, DoFn<S, T> fn, PType<T> type) {
-    return new DoCollectionImpl<T>(name, this, fn, type);
+    return new DoCollectionImpl<T>(name, getChainingCollection(), fn, type);
   }
 
   @Override
@@ -97,7 +97,7 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
 
   @Override
   public <K, V> PTable<K, V> parallelDo(String name, DoFn<S, Pair<K, V>> fn, PTableType<K, V> type) {
-    return new DoTableImpl<K, V>(name, this, fn, type);
+    return new DoTableImpl<K, V>(name, getChainingCollection(), fn, type);
   }
 
   public PCollection<S> write(Target target) {
@@ -254,4 +254,13 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
   }
 
   protected abstract long getSizeInternal();
+  
+  /**
+   * Retrieve the PCollectionImpl to be used for chaining within PCollectionImpls further down the pipeline.
+   * @return The PCollectionImpl instance to be chained
+   */
+  protected PCollectionImpl<S> getChainingCollection(){
+    return this;
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/b0165faa/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
index 5a40413..fee381d 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PGroupedTableImpl.java
@@ -42,7 +42,7 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>
   private final PTableBase<K, V> parent;
   private final GroupingOptions groupingOptions;
   private final PGroupedTableType<K, V> ptype;
-
+  
   PGroupedTableImpl(PTableBase<K, V> parent) {
     this(parent, null);
   }
@@ -79,7 +79,7 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>
   }
 
   public PTable<K, V> combineValues(CombineFn<K, V> combineFn) {
-    return new DoTableImpl<K, V>("combine", this, combineFn, parent.getPTableType());
+    return new DoTableImpl<K, V>("combine", getChainingCollection(), combineFn, parent.getPTableType());
   }
 
   private static class Ungroup<K, V> extends DoFn<Pair<K, Iterable<V>>, Pair<K, V>> {
@@ -113,4 +113,11 @@ public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>
   public DoNode getGroupingNode() {
     return DoNode.createGroupingNode("", ptype);
   }
+  
+  @Override
+  protected PCollectionImpl<Pair<K, Iterable<V>>> getChainingCollection() {
+    // Use a copy for chaining to allow sending the output of a single grouped table to multiple outputs
+    // TODO This should be implemented in a cleaner way in the planner
+    return new PGroupedTableImpl<K, V>(parent, groupingOptions);
+  }
 }