You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/02/11 16:16:31 UTC
git commit: CRUNCH-132: Second cut at adding a WriteMode for Targets
and changing the default to fail if an output directory already exists.
Updated Branches:
refs/heads/master 6f33d586f -> 849ad5fe4
CRUNCH-132: Second cut at adding a WriteMode for Targets and changing the default
to fail if an output directory already exists.
Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/849ad5fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/849ad5fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/849ad5fe
Branch: refs/heads/master
Commit: 849ad5fe48a411a4d925556f32ba5d8a366511d4
Parents: 6f33d58
Author: Josh Wills <jw...@apache.org>
Authored: Sat Feb 9 14:02:34 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Sun Feb 10 22:23:28 2013 -0800
----------------------------------------------------------------------
.../org/apache/crunch/io/hbase/HBaseTarget.java | 9 ++
.../apache/crunch/scrunch/PCollectionLike.scala | 4 +
.../org/apache/crunch/scrunch/PipelineLike.scala | 22 +++
.../crunch/impl/mem/MemPipelineFileWritingIT.java | 4 +-
.../main/java/org/apache/crunch/PCollection.java | 12 ++
crunch/src/main/java/org/apache/crunch/PTable.java | 6 +
.../src/main/java/org/apache/crunch/Pipeline.java | 20 +++-
crunch/src/main/java/org/apache/crunch/Target.java | 56 ++++++++-
.../org/apache/crunch/impl/mem/MemPipeline.java | 27 ++++-
.../crunch/impl/mem/collect/MemCollection.java | 6 +
.../apache/crunch/impl/mem/collect/MemTable.java | 6 +
.../java/org/apache/crunch/impl/mr/MRPipeline.java | 22 +++-
.../crunch/impl/mr/collect/PCollectionImpl.java | 13 ++-
.../apache/crunch/impl/mr/collect/PTableBase.java | 20 +++-
.../org/apache/crunch/io/impl/FileTargetImpl.java | 52 ++++++++
.../apache/crunch/io/impl/SourceTargetImpl.java | 5 +
.../test/java/org/apache/crunch/WriteModeTest.java | 103 +++++++++++++++
17 files changed, 376 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
index 44864e8..eceb31d 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
@@ -20,6 +20,8 @@ package org.apache.crunch.io.hbase;
import java.io.IOException;
import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.io.CrunchOutputs;
@@ -39,6 +41,8 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class HBaseTarget implements MapReduceTarget {
+ private static final Log LOG = LogFactory.getLog(HBaseTarget.class);
+
protected String table;
public HBaseTarget(String table) {
@@ -110,4 +114,9 @@ public class HBaseTarget implements MapReduceTarget {
public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
return null;
}
+
+ @Override
+ public void handleExisting(WriteMode strategy, Configuration conf) {
+ LOG.info("HBaseTarget ignores checks for existing outputs...");
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
index f6441ac..68fe7a5 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
@@ -28,6 +28,10 @@ trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] {
def write(target: Target): FullType = wrap(native.write(target))
+ def write(target: Target, writeMode: Target.WriteMode): FullType = {
+ wrap(native.write(target, writeMode))
+ }
+
def parallelDo[T](fn: DoFn[S, T], ptype: PType[T]) = {
new PCollection[T](native.parallelDo(fn, ptype))
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
index 5a10ee7..c183e5e 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
@@ -61,6 +61,17 @@ trait PipelineLike {
def write(collection: PCollection[_], target: Target): Unit = jpipeline.write(collection.native, target)
/**
+ * Writes a parallel collection to a target using an output strategy.
+ *
+ * @param collection The collection to write.
+ * @param target The destination target for this write.
+ * @param writeMode The WriteMode to use for handling existing outputs.
+ */
+ def write(collection: PCollection[_], target: Target, writeMode: Target.WriteMode): Unit = {
+ jpipeline.write(collection.native, target, writeMode)
+ }
+
+ /**
* Writes a parallel table to a target.
*
* @param table The table to write.
@@ -69,6 +80,17 @@ trait PipelineLike {
def write(table: PTable[_, _], target: Target): Unit = jpipeline.write(table.native, target)
/**
+ * Writes a parallel table to a target using an output strategy.
+ *
+ * @param table The table to write.
+ * @param target The destination target for this write.
+ * @param writeMode The WriteMode to use for handling existing outputs.
+ */
+ def write(table: PTable[_, _], target: Target, writeMode: Target.WriteMode): Unit = {
+ jpipeline.write(table.native, target, writeMode)
+ }
+
+ /**
* Constructs and executes a series of MapReduce jobs in order
* to write data to the output targets.
*/
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java b/crunch/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java
index dc9652d..976a43e 100644
--- a/crunch/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java
+++ b/crunch/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java
@@ -40,10 +40,10 @@ public class MemPipelineFileWritingIT {
@Test
public void testMemPipelineFileWriter() throws Exception {
- File tmpDir = baseTmpDir.getRootFile();
+ File tmpDir = baseTmpDir.getFile("mempipe");
Pipeline p = MemPipeline.getInstance();
PCollection<String> lines = MemPipeline.collectionOf("hello", "world");
- p.writeTextFile(lines, tmpDir.getAbsolutePath());
+ p.writeTextFile(lines, tmpDir.toString());
p.done();
assertTrue(tmpDir.exists());
File[] files = tmpDir.listFiles();
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/PCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PCollection.java b/crunch/src/main/java/org/apache/crunch/PCollection.java
index 798c262..1783677 100644
--- a/crunch/src/main/java/org/apache/crunch/PCollection.java
+++ b/crunch/src/main/java/org/apache/crunch/PCollection.java
@@ -136,6 +136,18 @@ public interface PCollection<S> {
PCollection<S> write(Target target);
/**
+ * Write the contents of this {@code PCollection} to the given {@code Target},
+ * using the given {@code Target.WriteMode} to handle existing
+ * targets.
+ *
+ * @param target
+ * The target
+ * @param writeMode
+ * The rule for handling existing outputs at the target location
+ */
+ PCollection<S> write(Target target, Target.WriteMode writeMode);
+
+ /**
* Returns a reference to the data set represented by this PCollection that
* may be used by the client to read the data locally.
*/
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/PTable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PTable.java b/crunch/src/main/java/org/apache/crunch/PTable.java
index b754a2c..b32bd80 100644
--- a/crunch/src/main/java/org/apache/crunch/PTable.java
+++ b/crunch/src/main/java/org/apache/crunch/PTable.java
@@ -68,6 +68,12 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> {
PTable<K, V> write(Target target);
/**
+ * Writes this {@code PTable} to the given {@code Target}, using the
+ * given {@code Target.WriteMode} to handle existing targets.
+ */
+ PTable<K, V> write(Target target, Target.WriteMode writeMode);
+
+ /**
* Returns the {@code PTableType} of this {@code PTable}.
*/
PTableType<K, V> getPTableType();
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/Pipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Pipeline.java b/crunch/src/main/java/org/apache/crunch/Pipeline.java
index bcf8727..af1d86a 100644
--- a/crunch/src/main/java/org/apache/crunch/Pipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/Pipeline.java
@@ -63,7 +63,9 @@ public interface Pipeline {
<K, V> PTable<K, V> read(TableSource<K, V> tableSource);
/**
- * Write the given collection to the given target on the next pipeline run.
+ * Write the given collection to the given target on the next pipeline run. The
+ * system will check to see if the target's location already exists using the
+ * {@code WriteMode.DEFAULT} rule for the given {@code Target}.
*
* @param collection
* The collection
@@ -73,6 +75,22 @@ public interface Pipeline {
void write(PCollection<?> collection, Target target);
/**
+ * Write the contents of the {@code PCollection} to the given {@code Target},
+ * using the storage format specified by the target and the given
+ * {@code WriteMode} for cases where the referenced {@code Target}
+ * already exists.
+ *
+ * @param collection
+ * The collection
+ * @param target
+ * The target to write to
+ * @param writeMode
+ * The strategy to use for handling existing outputs
+ */
+ void write(PCollection<?> collection, Target target,
+ Target.WriteMode writeMode);
+
+ /**
* Create the given PCollection and read the data it contains into the
* returned Collection instance for client use.
*
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/Target.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Target.java b/crunch/src/main/java/org/apache/crunch/Target.java
index ea6fd9d..0a0c23d 100644
--- a/crunch/src/main/java/org/apache/crunch/Target.java
+++ b/crunch/src/main/java/org/apache/crunch/Target.java
@@ -19,13 +19,65 @@ package org.apache.crunch;
import org.apache.crunch.io.OutputHandler;
import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
/**
- * A {@code Target} represents the output destination of a Crunch job.
- *
+ * A {@code Target} represents the output destination of a Crunch {@code PCollection}
+ * in the context of a Crunch job.
*/
public interface Target {
+
+ /**
+ * An enum to represent different options the client may specify
+ * for handling the case where the output path, table, etc. referenced
+ * by a {@code Target} already exists.
+ */
+ enum WriteMode {
+ /**
+ * Check to see if the output target already exists before running
+ * the pipeline, and if it does, print an error and throw an exception.
+ */
+ DEFAULT,
+
+ /**
+ * Check to see if the output target already exists, and if it does,
+ * delete it and overwrite it with the new output (if any).
+ */
+ OVERWRITE,
+
+ /**
+ * If the output target does not exist, create it. If it does exist,
+ * add the output of this pipeline to the target. This was the
+ * behavior in Crunch up to version 0.4.0.
+ */
+ APPEND
+ }
+
+ /**
+ * Apply the given {@code WriteMode} to this {@code Target} instance.
+ *
+ * @param writeMode The strategy for handling existing outputs
+ * @param conf The ever-useful {@code Configuration} instance
+ */
+ void handleExisting(WriteMode writeMode, Configuration conf);
+
+ /**
+ * Checks to see if this {@code Target} instance is compatible with the
+ * given {@code PType}.
+ *
+ * @param handler The {@link OutputHandler} that is managing the output for the job
+ * @param ptype The {@code PType} to check
+ * @return True if this Target can write data in the form of the given {@code PType},
+ * false otherwise
+ */
boolean accept(OutputHandler handler, PType<?> ptype);
+ /**
+ * Attempt to create the {@code SourceTarget} type that corresponds to this {@code Target}
+ * for the given {@code PType}, if possible. If it is not possible, return {@code null}.
+ *
+ * @param ptype The {@code PType} to use in constructing the {@code SourceTarget}
+ * @return A new {@code SourceTarget} or null if such a {@code SourceTarget} does not exist
+ */
<T> SourceTarget<T> asSourceTarget(PType<T> ptype);
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 95c9e72..488cdd9 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -19,9 +19,11 @@ package org.apache.crunch.impl.mem;
import java.io.IOException;
import java.util.List;
+import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
@@ -30,6 +32,7 @@ import org.apache.crunch.PipelineResult;
import org.apache.crunch.Source;
import org.apache.crunch.TableSource;
import org.apache.crunch.Target;
+import org.apache.crunch.Target.WriteMode;
import org.apache.crunch.impl.mem.collect.MemCollection;
import org.apache.crunch.impl.mem.collect.MemTable;
import org.apache.crunch.io.At;
@@ -45,6 +48,7 @@ import org.apache.hadoop.mapreduce.Counters;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
public class MemPipeline implements Pipeline {
@@ -52,6 +56,8 @@ public class MemPipeline implements Pipeline {
private static Counters COUNTERS = new Counters();
private static final MemPipeline INSTANCE = new MemPipeline();
+ private int outputIndex = 0;
+
public static Counters getCounters() {
return COUNTERS;
}
@@ -103,7 +109,8 @@ public class MemPipeline implements Pipeline {
}
private Configuration conf = new Configuration();
-
+ private Set<Target> activeTargets = Sets.newHashSet();
+
private MemPipeline() {
}
@@ -149,11 +156,24 @@ public class MemPipeline implements Pipeline {
@Override
public void write(PCollection<?> collection, Target target) {
+ write(collection, target, Target.WriteMode.DEFAULT);
+ }
+
+ @Override
+ public void write(PCollection<?> collection, Target target,
+ Target.WriteMode writeMode) {
+ target.handleExisting(writeMode, getConfiguration());
+ if (writeMode != WriteMode.APPEND && activeTargets.contains(target)) {
+ throw new CrunchRuntimeException("Target " + target + " is already written in the current run." +
+ " Use WriteMode.APPEND in order to write additional data to it.");
+ }
+ activeTargets.add(target);
if (target instanceof PathTarget) {
Path path = ((PathTarget) target).getPath();
try {
FileSystem fs = path.getFileSystem(conf);
- FSDataOutputStream os = fs.create(new Path(path, "out"));
+ FSDataOutputStream os = fs.create(new Path(path, "out" + outputIndex));
+ outputIndex++;
if (collection instanceof PTable) {
for (Object o : collection.materialize()) {
Pair p = (Pair) o;
@@ -193,12 +213,13 @@ public class MemPipeline implements Pipeline {
@Override
public PipelineResult run() {
+ activeTargets.clear();
return PipelineResult.EMPTY;
}
@Override
public PipelineResult done() {
- return PipelineResult.EMPTY;
+ return run();
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index cc9f3fc..b1d6be5 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -142,6 +142,12 @@ public class MemCollection<S> implements PCollection<S> {
}
@Override
+ public PCollection<S> write(Target target, Target.WriteMode writeMode) {
+ getPipeline().write(this, target, writeMode);
+ return this;
+ }
+
+ @Override
public Iterable<S> materialize() {
return collect;
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
index 524d492..8d9649d 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
@@ -87,6 +87,12 @@ public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable<
}
@Override
+ public PTable<K, V> write(Target target, Target.WriteMode writeMode) {
+ getPipeline().write(this, target, writeMode);
+ return this;
+ }
+
+ @Override
public PTableType<K, V> getPTableType() {
return ptype;
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index 9c98937..2d4d137 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -34,6 +34,7 @@ import org.apache.crunch.Source;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.TableSource;
import org.apache.crunch.Target;
+import org.apache.crunch.Target.WriteMode;
import org.apache.crunch.fn.IdentityFn;
import org.apache.crunch.impl.mr.collect.InputCollection;
import org.apache.crunch.impl.mr.collect.InputTable;
@@ -206,17 +207,36 @@ public class MRPipeline implements Pipeline {
return read(From.textFile(pathName));
}
- @SuppressWarnings("unchecked")
public void write(PCollection<?> pcollection, Target target) {
+ write(pcollection, target, Target.WriteMode.DEFAULT);
+ }
+
+ @SuppressWarnings("unchecked")
+ public void write(PCollection<?> pcollection, Target target,
+ Target.WriteMode writeMode) {
if (pcollection instanceof PGroupedTableImpl) {
pcollection = ((PGroupedTableImpl<?, ?>) pcollection).ungroup();
} else if (pcollection instanceof UnionCollection || pcollection instanceof UnionTable) {
pcollection = pcollection.parallelDo("UnionCollectionWrapper",
(MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
}
+ target.handleExisting(writeMode, getConfiguration());
+ if (writeMode != WriteMode.APPEND && targetInCurrentRun(target)) {
+ throw new CrunchRuntimeException("Target " + target + " is already written in current run." +
+ " Use WriteMode.APPEND in order to write additional data to it.");
+ }
addOutput((PCollectionImpl<?>) pcollection, target);
}
+ private boolean targetInCurrentRun(Target target) {
+ for (Set<Target> targets : outputTargets.values()) {
+ if (targets.contains(target)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
private void addOutput(PCollectionImpl<?> impl, Target target) {
if (!outputTargets.containsKey(impl)) {
outputTargets.put(impl, Sets.<Target> newHashSet());
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
index f48308a..79b7c83 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
@@ -54,7 +54,7 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
private final String name;
protected MRPipeline pipeline;
- private SourceTarget<S> materializedAt;
+ protected SourceTarget<S> materializedAt;
private final ParallelDoOptions options;
public PCollectionImpl(String name) {
@@ -130,6 +130,17 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
}
@Override
+ public PCollection<S> write(Target target, Target.WriteMode writeMode) {
+ if (materializedAt != null) {
+ getPipeline().write(new InputCollection<S>(materializedAt, (MRPipeline) getPipeline()), target,
+ writeMode);
+ } else {
+ getPipeline().write(this, target, writeMode);
+ }
+ return this;
+ }
+
+ @Override
public Iterable<S> materialize() {
if (getSize() == 0) {
LOG.warn("Materializing an empty PCollection: " + this.getName());
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
index 69ea8a3..a41e979 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
@@ -28,7 +28,9 @@ import org.apache.crunch.PObject;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.TableSource;
import org.apache.crunch.Target;
+import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.lib.Aggregate;
import org.apache.crunch.lib.Cogroup;
import org.apache.crunch.lib.Join;
@@ -81,11 +83,27 @@ abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> implements P
@Override
public PTable<K, V> write(Target target) {
- getPipeline().write(this, target);
+ if (getMaterializedAt() != null) {
+ getPipeline().write(new InputTable<K, V>(
+ (TableSource<K, V>) getMaterializedAt(), (MRPipeline) getPipeline()), target);
+ } else {
+ getPipeline().write(this, target);
+ }
return this;
}
@Override
+ public PTable<K, V> write(Target target, Target.WriteMode writeMode) {
+ if (getMaterializedAt() != null) {
+ getPipeline().write(new InputTable<K, V>(
+ (TableSource<K, V>) getMaterializedAt(), (MRPipeline) getPipeline()), target, writeMode);
+ } else {
+ getPipeline().write(this, target, writeMode);
+ }
+ return this;
+ }
+
+ @Override
public PTable<K, V> filter(FilterFn<Pair<K, V>> filterFn) {
return parallelDo(filterFn, getPTableType());
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index 46a6386..c1c29e4 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -17,7 +17,12 @@
*/
package org.apache.crunch.io.impl;
+import java.io.IOException;
+
import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.io.CrunchOutputs;
import org.apache.crunch.io.FileNamingScheme;
@@ -25,12 +30,17 @@ import org.apache.crunch.io.OutputHandler;
import org.apache.crunch.io.PathTarget;
import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class FileTargetImpl implements PathTarget {
+ private static final Log LOG = LogFactory.getLog(FileTargetImpl.class);
+
protected final Path path;
private final Class<? extends FileOutputFormat> outputFormatClass;
private final FileNamingScheme fileNamingScheme;
@@ -107,4 +117,46 @@ public class FileTargetImpl implements PathTarget {
// By default, assume that we cannot do this.
return null;
}
+
+  /**
+   * Applies the given {@code WriteMode} to the output path of this target.
+   *
+   * <p>If the path does not exist, nothing is changed. If it exists, {@code DEFAULT}
+   * fails with a {@code CrunchRuntimeException}, {@code OVERWRITE} recursively deletes
+   * the path, and {@code APPEND} leaves the existing contents in place.
+   *
+   * @param strategy the rule for handling an existing output path
+   * @param conf the configuration used to resolve the path's file system
+   * @throws CrunchRuntimeException if the file system cannot be obtained or queried,
+   *         if an existing path cannot be removed under {@code OVERWRITE}, or if the
+   *         path already exists under {@code DEFAULT}
+   */
+  @Override
+  public void handleExisting(WriteMode strategy, Configuration conf) {
+    FileSystem fs = null;
+    try {
+      // Resolve the file system from the path itself rather than from the default
+      // file system, so that fully-qualified paths on another file system are
+      // checked (and deleted) in the right place.
+      fs = path.getFileSystem(conf);
+    } catch (IOException e) {
+      LOG.error("Could not retrieve FileSystem object to check for existing path", e);
+      throw new CrunchRuntimeException(e);
+    }
+
+    boolean exists = false;
+    try {
+      exists = fs.exists(path);
+    } catch (IOException e) {
+      LOG.error("Exception checking existence of path: " + path, e);
+      throw new CrunchRuntimeException(e);
+    }
+
+    if (exists) {
+      switch (strategy) {
+      case DEFAULT:
+        LOG.error("Path " + path + " already exists!");
+        throw new CrunchRuntimeException("Path already exists: " + path);
+      case OVERWRITE:
+        LOG.info("Removing data at existing path: " + path);
+        try {
+          fs.delete(path, true);
+        } catch (IOException e) {
+          LOG.error("Exception thrown removing data at path: " + path, e);
+          // Fail here instead of continuing: otherwise the new output would be
+          // written alongside the data that was supposed to be replaced.
+          throw new CrunchRuntimeException(e);
+        }
+        break;
+      case APPEND:
+        LOG.info("Adding output files to existing path: " + path);
+        break;
+      default:
+        throw new CrunchRuntimeException("Unknown WriteMode: " + strategy);
+      }
+    } else {
+      LOG.info("Will write output files to new path: " + path);
+    }
+  }
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
index 9626b26..4d2b88a 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
@@ -81,4 +81,9 @@ class SourceTargetImpl<T> implements SourceTarget<T> {
public String toString() {
return source.toString();
}
+
+ @Override
+ public void handleExisting(WriteMode strategy, Configuration conf) {
+ target.handleExisting(strategy, conf);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/test/java/org/apache/crunch/WriteModeTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/WriteModeTest.java b/crunch/src/test/java/org/apache/crunch/WriteModeTest.java
new file mode 100644
index 0000000..e99ac7b
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/WriteModeTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.Target.WriteMode;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+public class WriteModeTest {
+
+ @Rule
+ public TemporaryPath tmpDir = TemporaryPaths.create();
+
+ @Test(expected=CrunchRuntimeException.class)
+ public void testDefault() throws Exception {
+ run(null, true);
+ }
+
+ @Test(expected=CrunchRuntimeException.class)
+ public void testDefaultNoRun() throws Exception {
+ run(null, false);
+ }
+
+ @Test
+ public void testOverwrite() throws Exception {
+ Path p = run(WriteMode.OVERWRITE, true);
+ PCollection<String> lines = MemPipeline.getInstance().readTextFile(p.toString());
+ assertEquals(ImmutableList.of("some", "string", "values"), lines.materialize());
+ }
+
+ @Test(expected=CrunchRuntimeException.class)
+ public void testOverwriteNoRun() throws Exception {
+ run(WriteMode.OVERWRITE, false);
+ }
+
+ @Test
+ public void testAppend() throws Exception {
+ Path p = run(WriteMode.APPEND, true);
+ PCollection<String> lines = MemPipeline.getInstance().readTextFile(p.toString());
+ assertEquals(ImmutableList.of("some", "string", "values", "some", "string", "values"),
+ lines.materialize());
+ }
+
+ @Test
+ public void testAppendNoRun() throws Exception {
+ Path p = run(WriteMode.APPEND, false);
+ PCollection<String> lines = MemPipeline.getInstance().readTextFile(p.toString());
+ assertEquals(ImmutableList.of("some", "string", "values", "some", "string", "values"),
+ lines.materialize());
+ }
+
+ Path run(WriteMode writeMode, boolean doRun) throws Exception {
+ Path output = tmpDir.getPath("existing");
+ FileSystem fs = FileSystem.get(tmpDir.getDefaultConfiguration());
+ if (fs.exists(output)) {
+ fs.delete(output, true);
+ }
+ Pipeline p = MemPipeline.getInstance();
+ PCollection<String> data = MemPipeline.typedCollectionOf(Avros.strings(),
+ ImmutableList.of("some", "string", "values"));
+ data.write(To.textFile(output));
+
+ if (doRun) {
+ p.run();
+ }
+
+ if (writeMode == null) {
+ data.write(To.textFile(output));
+ } else {
+ data.write(To.textFile(output), writeMode);
+ }
+
+ p.run();
+
+ return output;
+ }
+}