You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/12/29 20:51:20 UTC
git commit: CRUNCH-315: Add support for Empty PCollections/PTables.
Updated Branches:
refs/heads/master 58eb227d7 -> 5d666fe97
CRUNCH-315: Add support for Empty PCollections/PTables.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/5d666fe9
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/5d666fe9
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/5d666fe9
Branch: refs/heads/master
Commit: 5d666fe97b2273d17accaa9ec1adcb8cbee41885
Parents: 58eb227
Author: Josh Wills <jw...@apache.org>
Authored: Wed Dec 25 21:43:34 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Sun Dec 29 11:48:14 2013 -0800
----------------------------------------------------------------------
.../org/apache/crunch/EmptyPCollectionIT.java | 83 ++++++++++++++++++++
.../main/java/org/apache/crunch/Pipeline.java | 6 ++
.../crunch/impl/dist/DistributedPipeline.java | 20 +++--
.../impl/dist/collect/EmptyPCollection.java | 67 ++++++++++++++++
.../crunch/impl/dist/collect/EmptyPTable.java | 72 +++++++++++++++++
.../impl/dist/collect/EmptyReadableData.java | 45 +++++++++++
.../org/apache/crunch/impl/mem/MemPipeline.java | 10 +++
.../crunch/impl/mr/MRPipelineExecution.java | 2 -
.../apache/crunch/impl/mr/plan/MSCRPlanner.java | 17 +++-
.../java/org/apache/crunch/io/CrunchInputs.java | 7 +-
.../apache/crunch/SparkEmptyPCollectionIT.java | 83 ++++++++++++++++++++
.../apache/crunch/impl/spark/SparkPipeline.java | 15 ++++
.../impl/spark/collect/EmptyPCollection.java | 38 +++++++++
.../crunch/impl/spark/collect/EmptyPTable.java | 38 +++++++++
14 files changed, 488 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/it/java/org/apache/crunch/EmptyPCollectionIT.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/it/java/org/apache/crunch/EmptyPCollectionIT.java b/crunch-core/src/it/java/org/apache/crunch/EmptyPCollectionIT.java
new file mode 100644
index 0000000..2e5a8c3
--- /dev/null
+++ b/crunch-core/src/it/java/org/apache/crunch/EmptyPCollectionIT.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import com.google.common.collect.Iterables;
+import org.apache.crunch.fn.Aggregators;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.CrunchTestSupport;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Test;
+
+import java.io.Serializable;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class EmptyPCollectionIT extends CrunchTestSupport implements Serializable {
+
+ private static class SplitFn extends DoFn<String, Pair<String, Long>> {
+ @Override
+ public void process(String input, Emitter<Pair<String, Long>> emitter) {
+ for (String word : input.split("\\s+")) {
+ emitter.emit(Pair.of(word, 1L));
+ }
+ }
+ }
+
+ @Test
+ public void testEmptyMR() throws Exception {
+ MRPipeline p = new MRPipeline(EmptyPCollectionIT.class, tempDir.getDefaultConfiguration());
+ assertTrue(Iterables.isEmpty(p.emptyPCollection(Writables.strings())
+ .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs()))
+ .groupByKey()
+ .combineValues(Aggregators.SUM_LONGS())
+ .materialize()));
+ p.done();
+ }
+
+ @Test
+ public void testUnionWithEmptyMR() throws Exception {
+ MRPipeline p = new MRPipeline(EmptyPCollectionIT.class, tempDir.getDefaultConfiguration());
+ assertFalse(Iterables.isEmpty(p.emptyPCollection(Writables.strings())
+ .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs()))
+ .union(
+ p.read(From.textFile(tempDir.copyResourceFileName("shakes.txt")))
+ .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs())))
+ .groupByKey()
+ .combineValues(Aggregators.SUM_LONGS())
+ .materialize()));
+ p.done();
+ }
+
+ @Test
+ public void testUnionTableWithEmptyMR() throws Exception {
+ MRPipeline p = new MRPipeline(EmptyPCollectionIT.class, tempDir.getDefaultConfiguration());
+ assertFalse(Iterables.isEmpty(p.emptyPTable(Writables.tableOf(Writables.strings(), Writables.longs()))
+ .union(
+ p.read(From.textFile(tempDir.copyResourceFileName("shakes.txt")))
+ .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs())))
+ .groupByKey()
+ .combineValues(Aggregators.SUM_LONGS())
+ .materialize()));
+ p.done();
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
index 3b0bac2..f34d0ef 100644
--- a/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/Pipeline.java
@@ -17,6 +17,8 @@
*/
package org.apache.crunch;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
import org.apache.hadoop.conf.Configuration;
/**
@@ -109,6 +111,10 @@ public interface Pipeline {
*/
<T> void cache(PCollection<T> pcollection, CachingOptions options);
+ <T> PCollection<T> emptyPCollection(PType<T> ptype);
+
+ <K, V> PTable<K, V> emptyPTable(PTableType<K, V> ptype);
+
/**
* Constructs and executes a series of MapReduce jobs in order to write data
* to the output targets.
http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
index 28dbaec..82517f3 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/DistributedPipeline.java
@@ -37,6 +37,8 @@ import org.apache.crunch.impl.dist.collect.BaseInputTable;
import org.apache.crunch.impl.dist.collect.BaseGroupedTable;
import org.apache.crunch.impl.dist.collect.BaseUnionCollection;
import org.apache.crunch.impl.dist.collect.BaseUnionTable;
+import org.apache.crunch.impl.dist.collect.EmptyPCollection;
+import org.apache.crunch.impl.dist.collect.EmptyPTable;
import org.apache.crunch.impl.dist.collect.PCollectionImpl;
import org.apache.crunch.impl.dist.collect.PCollectionFactory;
import org.apache.crunch.io.From;
@@ -44,6 +46,7 @@ import org.apache.crunch.io.ReadableSource;
import org.apache.crunch.io.ReadableSourceTarget;
import org.apache.crunch.io.To;
import org.apache.crunch.materialize.MaterializableIterable;
+import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
@@ -170,20 +173,15 @@ public abstract class DistributedPipeline implements Pipeline {
outputTargets.get(impl).add(target);
}
- // TODO: sort this out
- /*
@Override
- public <T> Iterable<T> materialize(PCollection<T> pcollection) {
- C pcollectionImpl = toPCollectionImpl(pcollection);
- ReadableSource<?> readableSrc = getMaterializeSourceTarget(pcollectionImpl);
+ public <S> PCollection<S> emptyPCollection(PType<S> ptype) {
+ return new EmptyPCollection<S>(this, ptype);
+ }
- MaterializableIterable<?> c = new MaterializableIterable(this, readableSrc);
- if (!outputTargetsToMaterialize.containsKey(pcollectionImpl)) {
- outputTargetsToMaterialize.put(pcollectionImpl, c);
- }
- return (Iterable<T>) c;
+ @Override
+ public <K, V> PTable<K, V> emptyPTable(PTableType<K, V> ptype) {
+ return new EmptyPTable<K, V>(this, ptype);
}
- */
/**
* Retrieve a ReadableSourceTarget that provides access to the contents of a {@link PCollection}.
http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPCollection.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPCollection.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPCollection.java
new file mode 100644
index 0000000..bc2d141
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPCollection.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.dist.collect;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.impl.dist.DistributedPipeline;
+import org.apache.crunch.types.PType;
+
+import java.util.List;
+
+public class EmptyPCollection<T> extends PCollectionImpl<T> {
+
+ private final PType<T> ptype;
+
+ public EmptyPCollection(DistributedPipeline pipeline, PType<T> ptype) {
+ super("EMPTY", pipeline);
+ this.ptype = Preconditions.checkNotNull(ptype);
+ }
+
+ @Override
+ protected void acceptInternal(Visitor visitor) {
+ // No-op
+ }
+
+ @Override
+ public List<PCollectionImpl<?>> getParents() {
+ return ImmutableList.of();
+ }
+
+ @Override
+ protected ReadableData<T> getReadableDataInternal() {
+ return new EmptyReadableData<T>();
+ }
+
+ @Override
+ protected long getSizeInternal() {
+ return 0;
+ }
+
+ @Override
+ public long getLastModifiedAt() {
+ return 0;
+ }
+
+ @Override
+ public PType<T> getPType() {
+ return ptype;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPTable.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPTable.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPTable.java
new file mode 100644
index 0000000..6b8c516
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyPTable.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.dist.collect;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.impl.dist.DistributedPipeline;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+
+import java.util.List;
+
+public class EmptyPTable<K, V> extends PTableBase<K, V> {
+
+ private final PTableType<K, V> ptype;
+
+ public EmptyPTable(DistributedPipeline pipeline, PTableType<K, V> ptype) {
+ super("EMPTY", pipeline);
+ this.ptype = ptype;
+ }
+
+ @Override
+ protected void acceptInternal(Visitor visitor) {
+ // No-op
+ }
+
+ @Override
+ public List<PCollectionImpl<?>> getParents() {
+ return ImmutableList.of();
+ }
+
+ @Override
+ protected ReadableData<Pair<K, V>> getReadableDataInternal() {
+ return new EmptyReadableData<Pair<K, V>>();
+ }
+
+ @Override
+ protected long getSizeInternal() {
+ return 0;
+ }
+
+ @Override
+ public long getLastModifiedAt() {
+ return 0;
+ }
+
+ @Override
+ public PTableType<K, V> getPTableType() {
+ return ptype;
+ }
+
+ @Override
+ public PType<Pair<K, V>> getPType() {
+ return ptype;
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyReadableData.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyReadableData.java b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyReadableData.java
new file mode 100644
index 0000000..65825d4
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/dist/collect/EmptyReadableData.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.dist.collect;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.SourceTarget;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+
+import java.io.IOException;
+import java.util.Set;
+
+class EmptyReadableData<T> implements ReadableData<T> {
+
+ @Override
+ public Set<SourceTarget<?>> getSourceTargets() {
+ return ImmutableSet.of();
+ }
+
+ @Override
+ public void configure(Configuration conf) {
+ }
+
+ @Override
+ public Iterable<T> read(TaskInputOutputContext<?, ?, ?, ?> context) throws IOException {
+ return ImmutableList.of();
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index ced1700..7ef9f4f 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -312,6 +312,16 @@ public class MemPipeline implements Pipeline {
}
@Override
+ public <T> PCollection<T> emptyPCollection(PType<T> ptype) {
+ return typedCollectionOf(ptype, ImmutableList.<T>of());
+ }
+
+ @Override
+ public <K, V> PTable<K, V> emptyPTable(PTableType<K, V> ptype) {
+ return typedTableOf(ptype, ImmutableList.<Pair<K, V>>of());
+ }
+
+ @Override
public PipelineExecution runAsync() {
activeTargets.clear();
return new MemExecution();
http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java
index b9d53fe..b7df522 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/MRPipelineExecution.java
@@ -22,7 +22,5 @@ import org.apache.crunch.PipelineExecution;
import java.util.List;
public interface MRPipelineExecution extends PipelineExecution {
-
List<MRJob> getJobs();
-
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
index 97ac866..bce7010 100644
--- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
+++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/MSCRPlanner.java
@@ -24,6 +24,8 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.Target;
import org.apache.crunch.impl.dist.collect.PCollectionImpl;
@@ -42,6 +44,8 @@ import com.google.common.collect.Sets;
public class MSCRPlanner {
+ private static final Log LOG = LogFactory.getLog(MSCRPlanner.class);
+
private final MRPipeline pipeline;
private final Map<PCollectionImpl<?>, Set<Target>> outputs;
private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize;
@@ -98,7 +102,18 @@ public class MSCRPlanner {
}
Graph baseGraph = graphBuilder.getGraph();
-
+ boolean hasInputs = false;
+ for (Vertex v : baseGraph) {
+ if (v.isInput()) {
+ hasInputs = true;
+ break;
+ }
+ }
+ if (!hasInputs) {
+ LOG.warn("No input sources for pipeline, nothing to do...");
+ return new MRExecutor(conf, jarClass, outputs, toMaterialize);
+ }
+
// Create a new graph that splits up dependent GBK nodes.
Graph graph = prepareFinalGraph(baseGraph);
http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java
index c1a0eef..bcdcb55 100644
--- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchInputs.java
@@ -20,6 +20,7 @@ package org.apache.crunch.io;
import java.util.List;
import java.util.Map;
+import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
@@ -53,7 +54,11 @@ public class CrunchInputs {
public static Map<FormatBundle, Map<Integer, List<Path>>> getFormatNodeMap(JobContext job) {
Map<FormatBundle, Map<Integer, List<Path>>> formatNodeMap = Maps.newHashMap();
Configuration conf = job.getConfiguration();
- for (String input : Splitter.on(RECORD_SEP).split(conf.get(CRUNCH_INPUTS))) {
+ String crunchInputs = conf.get(CRUNCH_INPUTS);
+ if (crunchInputs == null || crunchInputs.isEmpty()) {
+ return ImmutableMap.of();
+ }
+ for (String input : Splitter.on(RECORD_SEP).split(crunchInputs)) {
List<String> fields = Lists.newArrayList(SPLITTER.split(input));
FormatBundle<InputFormat> inputBundle = FormatBundle.fromSerialized(fields.get(0), job.getConfiguration());
if (!formatNodeMap.containsKey(inputBundle)) {
http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-spark/src/it/java/org/apache/crunch/SparkEmptyPCollectionIT.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/it/java/org/apache/crunch/SparkEmptyPCollectionIT.java b/crunch-spark/src/it/java/org/apache/crunch/SparkEmptyPCollectionIT.java
new file mode 100644
index 0000000..3137252
--- /dev/null
+++ b/crunch-spark/src/it/java/org/apache/crunch/SparkEmptyPCollectionIT.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import com.google.common.collect.Iterables;
+import org.apache.crunch.fn.Aggregators;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.impl.spark.SparkPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.types.writable.Writables;
+import org.junit.Rule;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SparkEmptyPCollectionIT {
+ private static class SplitFn extends DoFn<String, Pair<String, Long>> {
+ @Override
+ public void process(String input, Emitter<Pair<String, Long>> emitter) {
+ for (String word : input.split("\\s+")) {
+ emitter.emit(Pair.of(word, 1L));
+ }
+ }
+ }
+
+ @Rule
+ public TemporaryPath tempDir = new TemporaryPath();
+
+ @Test
+ public void testEmptyMR() throws Exception {
+ Pipeline p = new SparkPipeline("local", "empty");
+ assertTrue(Iterables.isEmpty(p.emptyPCollection(Writables.strings())
+ .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs()))
+ .groupByKey()
+ .combineValues(Aggregators.SUM_LONGS())
+ .materialize()));
+ p.done();
+ }
+
+ @Test
+ public void testUnionWithEmptyMR() throws Exception {
+ Pipeline p = new SparkPipeline("local", "empty");
+ assertFalse(Iterables.isEmpty(p.emptyPCollection(Writables.strings())
+ .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs()))
+ .union(
+ p.read(From.textFile(tempDir.copyResourceFileName("shakes.txt")))
+ .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs())))
+ .groupByKey()
+ .combineValues(Aggregators.SUM_LONGS())
+ .materialize()));
+ p.done();
+ }
+
+ @Test
+ public void testUnionTableWithEmptyMR() throws Exception {
+ Pipeline p = new SparkPipeline("local", "empty");
+ assertFalse(Iterables.isEmpty(p.emptyPTable(Writables.tableOf(Writables.strings(), Writables.longs()))
+ .union(
+ p.read(From.textFile(tempDir.copyResourceFileName("shakes.txt")))
+ .parallelDo(new SplitFn(), Writables.tableOf(Writables.strings(), Writables.longs())))
+ .groupByKey()
+ .combineValues(Aggregators.SUM_LONGS())
+ .materialize()));
+ p.done();
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
index 674f0c8..49e1d35 100644
--- a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/SparkPipeline.java
@@ -21,13 +21,18 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import org.apache.crunch.CachingOptions;
import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
import org.apache.crunch.PipelineExecution;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.dist.DistributedPipeline;
import org.apache.crunch.impl.dist.collect.PCollectionImpl;
+import org.apache.crunch.impl.spark.collect.EmptyPCollection;
+import org.apache.crunch.impl.spark.collect.EmptyPTable;
import org.apache.crunch.impl.spark.collect.SparkCollectFactory;
import org.apache.crunch.io.ReadableSource;
import org.apache.crunch.materialize.MaterializableIterable;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;
@@ -62,6 +67,16 @@ public class SparkPipeline extends DistributedPipeline {
}
@Override
+ public <S> PCollection<S> emptyPCollection(PType<S> ptype) {
+ return new EmptyPCollection<S>(this, ptype);
+ }
+
+ @Override
+ public <K, V> PTable<K, V> emptyPTable(PTableType<K, V> ptype) {
+ return new EmptyPTable<K, V>(this, ptype);
+ }
+
+ @Override
public <T> void cache(PCollection<T> pcollection, CachingOptions options) {
cachedCollections.put(pcollection, toStorageLevel(options));
}
http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPCollection.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPCollection.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPCollection.java
new file mode 100644
index 0000000..7a298fb
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPCollection.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.collect;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.crunch.impl.dist.DistributedPipeline;
+import org.apache.crunch.impl.spark.SparkCollection;
+import org.apache.crunch.impl.spark.SparkRuntime;
+import org.apache.crunch.types.PType;
+import org.apache.spark.api.java.JavaRDDLike;
+
+public class EmptyPCollection<T> extends org.apache.crunch.impl.dist.collect.EmptyPCollection<T>
+ implements SparkCollection {
+
+ public EmptyPCollection(DistributedPipeline pipeline, PType<T> ptype) {
+ super(pipeline, ptype);
+ }
+
+ @Override
+ public JavaRDDLike<?, ?> getJavaRDDLike(SparkRuntime runtime) {
+ return runtime.getSparkContext().parallelize(ImmutableList.of());
+ }
+}
http://git-wip-us.apache.org/repos/asf/crunch/blob/5d666fe9/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPTable.java
----------------------------------------------------------------------
diff --git a/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPTable.java b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPTable.java
new file mode 100644
index 0000000..97d42fd
--- /dev/null
+++ b/crunch-spark/src/main/java/org/apache/crunch/impl/spark/collect/EmptyPTable.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.impl.spark.collect;
+
+import com.google.common.collect.ImmutableList;
+import org.apache.crunch.impl.dist.DistributedPipeline;
+import org.apache.crunch.impl.spark.SparkCollection;
+import org.apache.crunch.impl.spark.SparkRuntime;
+import org.apache.crunch.types.PTableType;
+import org.apache.spark.api.java.JavaRDDLike;
+import scala.Tuple2;
+
+public class EmptyPTable<K, V> extends org.apache.crunch.impl.dist.collect.EmptyPTable<K, V> implements SparkCollection {
+
+ public EmptyPTable(DistributedPipeline pipeline, PTableType<K, V> ptype) {
+ super(pipeline, ptype);
+ }
+
+ @Override
+ public JavaRDDLike<?, ?> getJavaRDDLike(SparkRuntime runtime) {
+ return runtime.getSparkContext().parallelizePairs(ImmutableList.<Tuple2<K, V>>of());
+ }
+}