You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2015/01/31 02:15:59 UTC
crunch git commit: CRUNCH-494: Add Pipeline.union methods to avoid need to chain long unions of PCollections
Repository: crunch
Updated Branches:
refs/heads/master 7157c0a1f -> ebb1b2e32
CRUNCH-494: Add Pipeline.union methods to avoid need to chain long unions of PCollections
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ebb1b2e3
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ebb1b2e3
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ebb1b2e3
Branch: refs/heads/master
Commit: ebb1b2e32a901caaa2858e9d12536ce1095c1696
Parents: 7157c0a
Author: Josh Wills <jw...@apache.org>
Authored: Thu Jan 29 23:21:03 2015 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Fri Jan 30 16:57:04 2015 -0800
----------------------------------------------------------------------
.../main/java/org/apache/crunch/Pipeline.java | 6 +++++
.../crunch/impl/dist/DistributedPipeline.java | 26 ++++++++++++++++++++
.../org/apache/crunch/impl/mem/MemPipeline.java | 20 +++++++++++++++
.../crunch/impl/mem/collect/MemCollection.java | 10 ++------
.../crunch/impl/mem/collect/MemTable.java | 11 +++------
5 files changed, 57 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/ebb1b2e3/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
index ee11fee..cd4ce03 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
@@ -21,6 +21,8 @@ import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
import org.apache.hadoop.conf.Configuration;
+import java.util.List;
+
/**
* Manages the state of a pipeline execution.
*
@@ -189,6 +191,10 @@ public interface Pipeline {
*/
<K, V> PTable<K, V> create(Iterable<Pair<K, V>> contents, PTableType<K, V> ptype, CreateOptions options);
+ <S> PCollection<S> union(List<PCollection<S>> collections);
+
+ <K, V> PTable<K, V> unionTables(List<PTable<K, V>> tables);
+
/**
* Executes the given {@code PipelineCallable} on the client after the {@code Targets}
* that the PipelineCallable depends on (if any) have been created by other pipeline
http://git-wip-us.apache.org/repos/asf/crunch/blob/ebb1b2e3/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
index 61c01f1..88da5a6 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
@@ -17,7 +17,9 @@
*/
package org.apache.crunch.impl.dist;
+import com.google.common.base.Function;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.crunch.CreateOptions;
@@ -44,6 +46,7 @@ import org.apache.crunch.impl.dist.collect.EmptyPCollection;
import org.apache.crunch.impl.dist.collect.EmptyPTable;
import org.apache.crunch.impl.dist.collect.PCollectionImpl;
import org.apache.crunch.impl.dist.collect.PCollectionFactory;
+import org.apache.crunch.impl.dist.collect.PTableBase;
import org.apache.crunch.io.From;
import org.apache.crunch.io.ReadableSource;
import org.apache.crunch.io.ReadableSourceTarget;
@@ -59,6 +62,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
@@ -126,6 +130,28 @@ public abstract class DistributedPipeline implements Pipeline {
}
@Override
+ public <S> PCollection<S> union(List<PCollection<S>> collections) {
+ return factory.createUnionCollection(
+ Lists.transform(collections, new Function<PCollection<S>, PCollectionImpl<S>>() {
+ @Override
+ public PCollectionImpl<S> apply(PCollection<S> in) {
+ return (PCollectionImpl<S>) in;
+ }
+ }));
+ }
+
+ @Override
+ public <K, V> PTable<K, V> unionTables(List<PTable<K, V>> tables) {
+ return factory.createUnionTable(
+ Lists.transform(tables, new Function<PTable<K, V>, PTableBase<K, V>>() {
+ @Override
+ public PTableBase<K, V> apply(PTable<K, V> in) {
+ return (PTableBase<K, V>) in;
+ }
+ }));
+ }
+
+ @Override
public <Output> Output sequentialDo(PipelineCallable<Output> pipelineCallable) {
allPipelineCallables.put(pipelineCallable, getDependencies(pipelineCallable));
PipelineCallable last = currentPipelineCallable;
http://git-wip-us.apache.org/repos/asf/crunch/blob/ebb1b2e3/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index e61b6dc..4d4d5dd 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -18,6 +18,7 @@
package org.apache.crunch.impl.mem;
import java.io.IOException;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -27,6 +28,7 @@ import java.util.concurrent.TimeoutException;
import com.google.common.base.Charsets;
+import com.google.common.collect.Iterables;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.io.DatumWriter;
import org.apache.crunch.CachingOptions;
@@ -364,6 +366,24 @@ public class MemPipeline implements Pipeline {
}
@Override
+ public <S> PCollection<S> union(List<PCollection<S>> collections) {
+ List<S> output = Lists.newArrayList();
+ for (PCollection<S> pcollect : collections) {
+ Iterables.addAll(output, pcollect.materialize());
+ }
+ return new MemCollection<S>(output, collections.get(0).getPType());
+ }
+
+ @Override
+ public <K, V> PTable<K, V> unionTables(List<PTable<K, V>> tables) {
+ List<Pair<K, V>> values = Lists.newArrayList();
+ for (PTable<K, V> table : tables) {
+ Iterables.addAll(values, table.materialize());
+ }
+ return new MemTable<K, V>(values, tables.get(0).getPTableType(), null);
+ }
+
+ @Override
public <Output> Output sequentialDo(PipelineCallable<Output> callable) {
Output out = callable.generateOutput(this);
try {
http://git-wip-us.apache.org/repos/asf/crunch/blob/ebb1b2e3/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index eaaab59..55b7821 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -93,14 +93,8 @@ public class MemCollection<S> implements PCollection<S> {
@Override
public PCollection<S> union(PCollection<S>... collections) {
- Collection<S> output = Lists.newArrayList();
- for (PCollection<S> pcollect : collections) {
- for (S s : pcollect.materialize()) {
- output.add(s);
- }
- }
- output.addAll(collect);
- return new MemCollection<S>(output, collections[0].getPType());
+ return getPipeline().union(
+ ImmutableList.<PCollection<S>>builder().add(this).add(collections).build());
}
private <S, T> DoFn<S, T> verifySerializable(String name, DoFn<S, T> doFn) {
http://git-wip-us.apache.org/repos/asf/crunch/blob/ebb1b2e3/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
index 60279a9..3f3bd77 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
@@ -21,6 +21,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
+import com.google.common.collect.ImmutableList;
import org.apache.crunch.CachingOptions;
import org.apache.crunch.FilterFn;
import org.apache.crunch.GroupingOptions;
@@ -62,14 +63,8 @@ public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable<
@Override
public PTable<K, V> union(PTable<K, V>... others) {
- List<Pair<K, V>> values = Lists.newArrayList();
- values.addAll(getCollection());
- for (PTable<K, V> ptable : others) {
- for (Pair<K, V> p : ptable.materialize()) {
- values.add(p);
- }
- }
- return new MemTable<K, V>(values, others[0].getPTableType(), null);
+ return getPipeline().unionTables(
+ ImmutableList.<PTable<K, V>>builder().add(this).add(others).build());
}
@Override