You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2015/01/31 02:15:59 UTC

crunch git commit: CRUNCH-494: Add Pipeline.union methods to avoid need to chain long unions of PCollections

Repository: crunch
Updated Branches:
  refs/heads/master 7157c0a1f -> ebb1b2e32


CRUNCH-494: Add Pipeline.union methods to avoid need to chain long unions of PCollections


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ebb1b2e3
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ebb1b2e3
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ebb1b2e3

Branch: refs/heads/master
Commit: ebb1b2e32a901caaa2858e9d12536ce1095c1696
Parents: 7157c0a
Author: Josh Wills <jw...@apache.org>
Authored: Thu Jan 29 23:21:03 2015 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Fri Jan 30 16:57:04 2015 -0800

----------------------------------------------------------------------
 .../main/java/org/apache/crunch/Pipeline.java   |  6 +++++
 .../crunch/impl/dist/DistributedPipeline.java   | 26 ++++++++++++++++++++
 .../org/apache/crunch/impl/mem/MemPipeline.java | 20 +++++++++++++++
 .../crunch/impl/mem/collect/MemCollection.java  | 10 ++------
 .../crunch/impl/mem/collect/MemTable.java       | 11 +++------
 5 files changed, 57 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/ebb1b2e3/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
index ee11fee..cd4ce03 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
@@ -21,6 +21,8 @@ import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
 
+import java.util.List;
+
 /**
  * Manages the state of a pipeline execution.
  * 
@@ -189,6 +191,10 @@ public interface Pipeline {
    */
   <K, V> PTable<K, V> create(Iterable<Pair<K, V>> contents, PTableType<K, V> ptype, CreateOptions options);
 
+  /**
+   * Combines every collection in the given list into a single {@code PCollection}, so that many
+   * inputs can be unioned in one call instead of chaining {@link PCollection#union} repeatedly.
+   *
+   * <p>Pass a non-empty list: the in-memory implementation takes the {@code PType} of the result
+   * from the first collection in the list.
+   *
+   * @param collections the collections to combine
+   * @return a collection containing the elements of all of the given collections
+   */
+  <S> PCollection<S> union(List<PCollection<S>> collections);
+
+  /**
+   * Combines every table in the given list into a single {@code PTable}; the table counterpart of
+   * {@link #union(List)}.
+   *
+   * <p>Pass a non-empty list: the in-memory implementation takes the {@code PTableType} of the
+   * result from the first table in the list.
+   *
+   * @param tables the tables to combine
+   * @return a table containing the key-value pairs of all of the given tables
+   */
+  <K, V> PTable<K, V> unionTables(List<PTable<K, V>> tables);
+
   /**
    * Executes the given {@code PipelineCallable} on the client after the {@code Targets}
    * that the PipelineCallable depends on (if any) have been created by other pipeline

http://git-wip-us.apache.org/repos/asf/crunch/blob/ebb1b2e3/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
index 61c01f1..88da5a6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
@@ -17,7 +17,9 @@
  */
 package org.apache.crunch.impl.dist;
 
+import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.crunch.CreateOptions;
@@ -44,6 +46,7 @@ import org.apache.crunch.impl.dist.collect.EmptyPCollection;
 import org.apache.crunch.impl.dist.collect.EmptyPTable;
 import org.apache.crunch.impl.dist.collect.PCollectionImpl;
 import org.apache.crunch.impl.dist.collect.PCollectionFactory;
+import org.apache.crunch.impl.dist.collect.PTableBase;
 import org.apache.crunch.io.From;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.ReadableSourceTarget;
@@ -59,6 +62,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
@@ -126,6 +130,28 @@ public abstract class DistributedPipeline implements Pipeline {
   }
 
   @Override
+  public <S> PCollection<S> union(List<PCollection<S>> collections) {
+    return factory.createUnionCollection(
+        Lists.transform(collections, new Function<PCollection<S>, PCollectionImpl<S>>() {
+          @Override
+          public PCollectionImpl<S> apply(PCollection<S> in) {
+            return (PCollectionImpl<S>) in;
+          }
+        }));
+  }
+
+  @Override
+  public <K, V> PTable<K, V> unionTables(List<PTable<K, V>> tables) {
+    return factory.createUnionTable(
+        Lists.transform(tables, new Function<PTable<K, V>, PTableBase<K, V>>() {
+          @Override
+          public PTableBase<K, V> apply(PTable<K, V> in) {
+            return (PTableBase<K, V>) in;
+          }
+        }));
+  }
+
+  @Override
   public <Output> Output sequentialDo(PipelineCallable<Output> pipelineCallable) {
     allPipelineCallables.put(pipelineCallable, getDependencies(pipelineCallable));
     PipelineCallable last = currentPipelineCallable;

http://git-wip-us.apache.org/repos/asf/crunch/blob/ebb1b2e3/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index e61b6dc..4d4d5dd 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -18,6 +18,7 @@
 package org.apache.crunch.impl.mem;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -27,6 +28,7 @@ import java.util.concurrent.TimeoutException;
 
 import com.google.common.base.Charsets;
 
+import com.google.common.collect.Iterables;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.io.DatumWriter;
 import org.apache.crunch.CachingOptions;
@@ -364,6 +366,24 @@ public class MemPipeline implements Pipeline {
   }
 
   @Override
+  public <S> PCollection<S> union(List<PCollection<S>> collections) {
+    // Materializes every input eagerly and copies its elements, in list order, into one new
+    // MemCollection. The result's PType is read from the first input, so an empty list is
+    // rejected here with a clear message instead of surfacing as an IndexOutOfBoundsException
+    // from collections.get(0).
+    if (collections.isEmpty()) {
+      throw new IllegalArgumentException("union requires at least one PCollection");
+    }
+    List<S> output = Lists.newArrayList();
+    for (PCollection<S> pcollect : collections) {
+      Iterables.addAll(output, pcollect.materialize());
+    }
+    return new MemCollection<S>(output, collections.get(0).getPType());
+  }
+
+  @Override
+  public <K, V> PTable<K, V> unionTables(List<PTable<K, V>> tables) {
+    // Materializes every input table eagerly and copies its pairs, in list order, into one new
+    // MemTable. The result's PTableType is read from the first table, so an empty list is
+    // rejected here with a clear message instead of surfacing as an IndexOutOfBoundsException
+    // from tables.get(0).
+    if (tables.isEmpty()) {
+      throw new IllegalArgumentException("unionTables requires at least one PTable");
+    }
+    List<Pair<K, V>> values = Lists.newArrayList();
+    for (PTable<K, V> table : tables) {
+      Iterables.addAll(values, table.materialize());
+    }
+    return new MemTable<K, V>(values, tables.get(0).getPTableType(), null);
+  }
+
+  @Override
   public <Output> Output sequentialDo(PipelineCallable<Output> callable) {
     Output out = callable.generateOutput(this);
     try {

http://git-wip-us.apache.org/repos/asf/crunch/blob/ebb1b2e3/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index eaaab59..55b7821 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -93,14 +93,8 @@ public class MemCollection<S> implements PCollection<S> {
   
   @Override
   public PCollection<S> union(PCollection<S>... collections) {
-    Collection<S> output = Lists.newArrayList();
-    for (PCollection<S> pcollect : collections) {
-      for (S s : pcollect.materialize()) {
-        output.add(s);
-      }
-    }
-    output.addAll(collect);
-    return new MemCollection<S>(output, collections[0].getPType());
+    // Gather the receiver followed by every argument into one list and let the pipeline-level
+    // union combine them. The list always contains at least the receiver; ImmutableList rejects
+    // null entries with a NullPointerException.
+    ImmutableList.Builder<PCollection<S>> all = ImmutableList.builder();
+    all.add(this);
+    all.add(collections);
+    return getPipeline().union(all.build());
   }
 
   private <S, T> DoFn<S, T> verifySerializable(String name, DoFn<S, T> doFn) {

http://git-wip-us.apache.org/repos/asf/crunch/blob/ebb1b2e3/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
index 60279a9..3f3bd77 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
+import com.google.common.collect.ImmutableList;
 import org.apache.crunch.CachingOptions;
 import org.apache.crunch.FilterFn;
 import org.apache.crunch.GroupingOptions;
@@ -62,14 +63,8 @@ public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable<
   
   @Override
   public PTable<K, V> union(PTable<K, V>... others) {
-    List<Pair<K, V>> values = Lists.newArrayList();
-    values.addAll(getCollection());
-    for (PTable<K, V> ptable : others) {
-      for (Pair<K, V> p : ptable.materialize()) {
-        values.add(p);
-      }
-    }
-    return new MemTable<K, V>(values, others[0].getPTableType(), null);
+    // Gather this table followed by every argument into one list and let the pipeline-level
+    // unionTables combine them. The list always contains at least this table; ImmutableList
+    // rejects null entries with a NullPointerException.
+    ImmutableList.Builder<PTable<K, V>> all = ImmutableList.builder();
+    all.add(this);
+    all.add(others);
+    return getPipeline().unionTables(all.build());
   }
 
   @Override