You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/02/11 16:16:31 UTC

git commit: CRUNCH-132: Second cut at adding a WriteMode for Targets and changing the default to fail if an output directory already exists.

Updated Branches:
  refs/heads/master 6f33d586f -> 849ad5fe4


CRUNCH-132: Second cut at adding a WriteMode for Targets and changing the default
to fail if an output directory already exists.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/849ad5fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/849ad5fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/849ad5fe

Branch: refs/heads/master
Commit: 849ad5fe48a411a4d925556f32ba5d8a366511d4
Parents: 6f33d58
Author: Josh Wills <jw...@apache.org>
Authored: Sat Feb 9 14:02:34 2013 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Sun Feb 10 22:23:28 2013 -0800

----------------------------------------------------------------------
 .../org/apache/crunch/io/hbase/HBaseTarget.java    |    9 ++
 .../apache/crunch/scrunch/PCollectionLike.scala    |    4 +
 .../org/apache/crunch/scrunch/PipelineLike.scala   |   22 +++
 .../crunch/impl/mem/MemPipelineFileWritingIT.java  |    4 +-
 .../main/java/org/apache/crunch/PCollection.java   |   12 ++
 crunch/src/main/java/org/apache/crunch/PTable.java |    6 +
 .../src/main/java/org/apache/crunch/Pipeline.java  |   20 +++-
 crunch/src/main/java/org/apache/crunch/Target.java |   56 ++++++++-
 .../org/apache/crunch/impl/mem/MemPipeline.java    |   27 ++++-
 .../crunch/impl/mem/collect/MemCollection.java     |    6 +
 .../apache/crunch/impl/mem/collect/MemTable.java   |    6 +
 .../java/org/apache/crunch/impl/mr/MRPipeline.java |   22 +++-
 .../crunch/impl/mr/collect/PCollectionImpl.java    |   13 ++-
 .../apache/crunch/impl/mr/collect/PTableBase.java  |   20 +++-
 .../org/apache/crunch/io/impl/FileTargetImpl.java  |   52 ++++++++
 .../apache/crunch/io/impl/SourceTargetImpl.java    |    5 +
 .../test/java/org/apache/crunch/WriteModeTest.java |  103 +++++++++++++++
 17 files changed, 376 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
index 44864e8..eceb31d 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
@@ -20,6 +20,8 @@ package org.apache.crunch.io.hbase;
 import java.io.IOException;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.io.CrunchOutputs;
@@ -39,6 +41,8 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 public class HBaseTarget implements MapReduceTarget {
 
+  private static final Log LOG = LogFactory.getLog(HBaseTarget.class);
+  
   protected String table;
 
   public HBaseTarget(String table) {
@@ -110,4 +114,9 @@ public class HBaseTarget implements MapReduceTarget {
   public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
     return null;
   }
+
+  @Override
+  public void handleExisting(WriteMode strategy, Configuration conf) {
+    LOG.info("HBaseTarget ignores checks for existing outputs...");
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
index f6441ac..68fe7a5 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PCollectionLike.scala
@@ -28,6 +28,10 @@ trait PCollectionLike[S, +FullType, +NativeType <: JCollection[S]] {
 
   def write(target: Target): FullType = wrap(native.write(target))
 
+  def write(target: Target, writeMode: Target.WriteMode): FullType = {
+    wrap(native.write(target, writeMode))
+  }
+
   def parallelDo[T](fn: DoFn[S, T], ptype: PType[T]) = {
     new PCollection[T](native.parallelDo(fn, ptype))
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
----------------------------------------------------------------------
diff --git a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
index 5a10ee7..c183e5e 100644
--- a/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
+++ b/crunch-scrunch/src/main/scala/org/apache/crunch/scrunch/PipelineLike.scala
@@ -61,6 +61,17 @@ trait PipelineLike {
   def write(collection: PCollection[_], target: Target): Unit = jpipeline.write(collection.native, target)
 
   /**
+   * Writes a parallel collection to a target using an output strategy.
+   *
+   * @param collection The collection to write.
+   * @param target The destination target for this write.
+   * @param writeMode The WriteMode to use for handling existing outputs.
+   */
+  def write(collection: PCollection[_], target: Target, writeMode: Target.WriteMode): Unit = {
+    jpipeline.write(collection.native, target, writeMode)
+  }
+
+  /**
    * Writes a parallel table to a target.
    *
    * @param table The table to write.
@@ -69,6 +80,17 @@ trait PipelineLike {
   def write(table: PTable[_, _], target: Target): Unit = jpipeline.write(table.native, target)
 
   /**
+   * Writes a parallel table to a target.
+   *
+   * @param table The table to write.
+   * @param target The destination target for this write.
+   * @param writeMode The write mode to use on the target
+   */
+  def write(table: PTable[_, _], target: Target, writeMode: Target.WriteMode): Unit = {
+    jpipeline.write(table.native, target, writeMode)
+  }
+
+  /**
    * Constructs and executes a series of MapReduce jobs in order
    * to write data to the output targets.
    */

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java
----------------------------------------------------------------------
diff --git a/crunch/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java b/crunch/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java
index dc9652d..976a43e 100644
--- a/crunch/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java
+++ b/crunch/src/it/java/org/apache/crunch/impl/mem/MemPipelineFileWritingIT.java
@@ -40,10 +40,10 @@ public class MemPipelineFileWritingIT {
 
   @Test
   public void testMemPipelineFileWriter() throws Exception {
-    File tmpDir = baseTmpDir.getRootFile();
+    File tmpDir = baseTmpDir.getFile("mempipe");
     Pipeline p = MemPipeline.getInstance();
     PCollection<String> lines = MemPipeline.collectionOf("hello", "world");
-    p.writeTextFile(lines, tmpDir.getAbsolutePath());
+    p.writeTextFile(lines, tmpDir.toString());
     p.done();
     assertTrue(tmpDir.exists());
     File[] files = tmpDir.listFiles();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/PCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PCollection.java b/crunch/src/main/java/org/apache/crunch/PCollection.java
index 798c262..1783677 100644
--- a/crunch/src/main/java/org/apache/crunch/PCollection.java
+++ b/crunch/src/main/java/org/apache/crunch/PCollection.java
@@ -136,6 +136,18 @@ public interface PCollection<S> {
   PCollection<S> write(Target target);
 
   /**
+   * Write the contents of this {@code PCollection} to the given {@code Target},
+   * using the given {@code Target.WriteMode} to handle existing
+   * targets.
+   * 
+   * @param target
+   *          The target
+   * @param writeMode
+   *          The rule for handling existing outputs at the target location
+   */
+  PCollection<S> write(Target target, Target.WriteMode writeMode);
+  
+  /**
    * Returns a reference to the data set represented by this PCollection that
    * may be used by the client to read the data locally.
    */

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/PTable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/PTable.java b/crunch/src/main/java/org/apache/crunch/PTable.java
index b754a2c..b32bd80 100644
--- a/crunch/src/main/java/org/apache/crunch/PTable.java
+++ b/crunch/src/main/java/org/apache/crunch/PTable.java
@@ -68,6 +68,12 @@ public interface PTable<K, V> extends PCollection<Pair<K, V>> {
   PTable<K, V> write(Target target);
 
   /**
+   * Writes this {@code PTable} to the given {@code Target}, using the
+   * given {@code Target.WriteMode} to handle existing targets.
+   */
+  PTable<K, V> write(Target target, Target.WriteMode writeMode);
+
+  /**
    * Returns the {@code PTableType} of this {@code PTable}.
    */
   PTableType<K, V> getPTableType();

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/Pipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Pipeline.java b/crunch/src/main/java/org/apache/crunch/Pipeline.java
index bcf8727..af1d86a 100644
--- a/crunch/src/main/java/org/apache/crunch/Pipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/Pipeline.java
@@ -63,7 +63,9 @@ public interface Pipeline {
   <K, V> PTable<K, V> read(TableSource<K, V> tableSource);
 
   /**
-   * Write the given collection to the given target on the next pipeline run.
+   * Write the given collection to the given target on the next pipeline run. The
+   * system will check to see if the target's location already exists using the
+   * {@code WriteMode.DEFAULT} rule for the given {@code Target}.
    * 
    * @param collection
    *          The collection
@@ -73,6 +75,22 @@ public interface Pipeline {
   void write(PCollection<?> collection, Target target);
 
   /**
+  * Write the contents of the {@code PCollection} to the given {@code Target},
+  * using the storage format specified by the target and the given
+  * {@code WriteMode} for cases where the referenced {@code Target}
+  * already exists.
+  *
+  * @param collection
+  *          The collection
+  * @param target
+  *          The target to write to
+  * @param writeMode
+  *          The strategy to use for handling existing outputs
+  */
+ void write(PCollection<?> collection, Target target,
+     Target.WriteMode writeMode);
+
+ /**
    * Create the given PCollection and read the data it contains into the
    * returned Collection instance for client use.
    * 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/Target.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/Target.java b/crunch/src/main/java/org/apache/crunch/Target.java
index ea6fd9d..0a0c23d 100644
--- a/crunch/src/main/java/org/apache/crunch/Target.java
+++ b/crunch/src/main/java/org/apache/crunch/Target.java
@@ -19,13 +19,65 @@ package org.apache.crunch;
 
 import org.apache.crunch.io.OutputHandler;
 import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
 
 /**
- * A {@code Target} represents the output destination of a Crunch job.
- * 
+ * A {@code Target} represents the output destination of a Crunch {@code PCollection}
+ * in the context of a Crunch job.
  */
 public interface Target {
+
+  /**
+   * An enum to represent different options the client may specify
+   * for handling the case where the output path, table, etc. referenced
+   * by a {@code Target} already exists.
+   */
+  enum WriteMode {
+    /**
+     * Check to see if the output target already exists before running
+     * the pipeline, and if it does, print an error and throw an exception.
+     */
+    DEFAULT,
+    
+    /**
+     * Check to see if the output target already exists, and if it does,
+     * delete it and overwrite it with the new output (if any).
+     */
+    OVERWRITE,
+
+    /**
+     * If the output target does not exist, create it. If it does exist,
+     * add the output of this pipeline to the target. This was the
+     * behavior in Crunch up to version 0.4.0.
+     */
+    APPEND
+  }
+
+  /**
+   * Apply the given {@code WriteMode} to this {@code Target} instance.
+   * 
+   * @param writeMode The strategy for handling existing outputs
+   * @param conf The ever-useful {@code Configuration} instance
+   */
+  void handleExisting(WriteMode writeMode, Configuration conf);
+  
+  /**
+   * Checks to see if this {@code Target} instance is compatible with the
+   * given {@code PType}.
+   * 
+   * @param handler The {@link OutputHandler} that is managing the output for the job
+   * @param ptype The {@code PType} to check
+   * @return True if this Target can write data in the form of the given {@code PType},
+   * false otherwise
+   */
   boolean accept(OutputHandler handler, PType<?> ptype);
 
+  /**
+   * Attempt to create the {@code SourceTarget} type that corresponds to this {@code Target}
+   * for the given {@code PType}, if possible. If it is not possible, return {@code null}.
+   * 
+   * @param ptype The {@code PType} to use in constructing the {@code SourceTarget}
+   * @return A new {@code SourceTarget} or null if such a {@code SourceTarget} does not exist
+   */
   <T> SourceTarget<T> asSourceTarget(PType<T> ptype);
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
index 95c9e72..488cdd9 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java
@@ -19,9 +19,11 @@ package org.apache.crunch.impl.mem;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.PCollection;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
@@ -30,6 +32,7 @@ import org.apache.crunch.PipelineResult;
 import org.apache.crunch.Source;
 import org.apache.crunch.TableSource;
 import org.apache.crunch.Target;
+import org.apache.crunch.Target.WriteMode;
 import org.apache.crunch.impl.mem.collect.MemCollection;
 import org.apache.crunch.impl.mem.collect.MemTable;
 import org.apache.crunch.io.At;
@@ -45,6 +48,7 @@ import org.apache.hadoop.mapreduce.Counters;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 public class MemPipeline implements Pipeline {
 
@@ -52,6 +56,8 @@ public class MemPipeline implements Pipeline {
   private static Counters COUNTERS = new Counters();
   private static final MemPipeline INSTANCE = new MemPipeline();
 
+  private int outputIndex = 0;
+  
   public static Counters getCounters() {
     return COUNTERS;
   }
@@ -103,7 +109,8 @@ public class MemPipeline implements Pipeline {
   }
 
   private Configuration conf = new Configuration();
-
+  private Set<Target> activeTargets = Sets.newHashSet();
+  
   private MemPipeline() {
   }
 
@@ -149,11 +156,24 @@ public class MemPipeline implements Pipeline {
 
   @Override
   public void write(PCollection<?> collection, Target target) {
+    write(collection, target, Target.WriteMode.DEFAULT);
+  }
+  
+  @Override
+  public void write(PCollection<?> collection, Target target,
+      Target.WriteMode writeMode) {
+    target.handleExisting(writeMode, getConfiguration());
+    if (writeMode != WriteMode.APPEND && activeTargets.contains(target)) {
+      throw new CrunchRuntimeException("Target " + target + " is already written in the current run." +
+          " Use WriteMode.APPEND in order to write additional data to it.");
+    }
+    activeTargets.add(target);
     if (target instanceof PathTarget) {
       Path path = ((PathTarget) target).getPath();
       try {
         FileSystem fs = path.getFileSystem(conf);
-        FSDataOutputStream os = fs.create(new Path(path, "out"));
+        FSDataOutputStream os = fs.create(new Path(path, "out" + outputIndex));
+        outputIndex++;
         if (collection instanceof PTable) {
           for (Object o : collection.materialize()) {
             Pair p = (Pair) o;
@@ -193,12 +213,13 @@ public class MemPipeline implements Pipeline {
 
   @Override
   public PipelineResult run() {
+    activeTargets.clear();
     return PipelineResult.EMPTY;
   }
 
   @Override
   public PipelineResult done() {
-    return PipelineResult.EMPTY;
+    return run();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
index cc9f3fc..b1d6be5 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemCollection.java
@@ -142,6 +142,12 @@ public class MemCollection<S> implements PCollection<S> {
   }
 
   @Override
+  public PCollection<S> write(Target target, Target.WriteMode writeMode) {
+    getPipeline().write(this, target, writeMode);
+    return this;
+  }
+
+  @Override
   public Iterable<S> materialize() {
     return collect;
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
index 524d492..8d9649d 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mem/collect/MemTable.java
@@ -87,6 +87,12 @@ public class MemTable<K, V> extends MemCollection<Pair<K, V>> implements PTable<
   }
 
   @Override
+  public PTable<K, V> write(Target target, Target.WriteMode writeMode) {
+    getPipeline().write(this, target, writeMode);
+    return this;
+  }
+  
+  @Override
   public PTableType<K, V> getPTableType() {
     return ptype;
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
index 9c98937..2d4d137 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/MRPipeline.java
@@ -34,6 +34,7 @@ import org.apache.crunch.Source;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.TableSource;
 import org.apache.crunch.Target;
+import org.apache.crunch.Target.WriteMode;
 import org.apache.crunch.fn.IdentityFn;
 import org.apache.crunch.impl.mr.collect.InputCollection;
 import org.apache.crunch.impl.mr.collect.InputTable;
@@ -206,17 +207,36 @@ public class MRPipeline implements Pipeline {
     return read(From.textFile(pathName));
   }
 
-  @SuppressWarnings("unchecked")
   public void write(PCollection<?> pcollection, Target target) {
+    write(pcollection, target, Target.WriteMode.DEFAULT);
+  }
+  
+  @SuppressWarnings("unchecked")
+  public void write(PCollection<?> pcollection, Target target,
+      Target.WriteMode writeMode) {
     if (pcollection instanceof PGroupedTableImpl) {
       pcollection = ((PGroupedTableImpl<?, ?>) pcollection).ungroup();
     } else if (pcollection instanceof UnionCollection || pcollection instanceof UnionTable) {
       pcollection = pcollection.parallelDo("UnionCollectionWrapper",
           (MapFn) IdentityFn.<Object> getInstance(), pcollection.getPType());
     }
+    target.handleExisting(writeMode, getConfiguration());
+    if (writeMode != WriteMode.APPEND && targetInCurrentRun(target)) {
+      throw new CrunchRuntimeException("Target " + target + " is already written in current run." +
+          " Use WriteMode.APPEND in order to write additional data to it.");
+    }
     addOutput((PCollectionImpl<?>) pcollection, target);
   }
 
+  private boolean targetInCurrentRun(Target target) {
+    for (Set<Target> targets : outputTargets.values()) {
+      if (targets.contains(target)) {
+        return true;
+      }
+    }
+    return false;
+  }
+  
   private void addOutput(PCollectionImpl<?> impl, Target target) {
     if (!outputTargets.containsKey(impl)) {
       outputTargets.put(impl, Sets.<Target> newHashSet());

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
index f48308a..79b7c83 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PCollectionImpl.java
@@ -54,7 +54,7 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
 
   private final String name;
   protected MRPipeline pipeline;
-  private SourceTarget<S> materializedAt;
+  protected SourceTarget<S> materializedAt;
   private final ParallelDoOptions options;
   
   public PCollectionImpl(String name) {
@@ -130,6 +130,17 @@ public abstract class PCollectionImpl<S> implements PCollection<S> {
   }
 
   @Override
+  public PCollection<S> write(Target target, Target.WriteMode writeMode) {
+    if (materializedAt != null) {
+      getPipeline().write(new InputCollection<S>(materializedAt, (MRPipeline) getPipeline()), target,
+          writeMode);
+    } else {
+      getPipeline().write(this, target, writeMode);
+    }
+    return this;
+  }
+  
+  @Override
   public Iterable<S> materialize() {
     if (getSize() == 0) {
       LOG.warn("Materializing an empty PCollection: " + this.getName());

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
index 69ea8a3..a41e979 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/collect/PTableBase.java
@@ -28,7 +28,9 @@ import org.apache.crunch.PObject;
 import org.apache.crunch.PTable;
 import org.apache.crunch.Pair;
 import org.apache.crunch.ParallelDoOptions;
+import org.apache.crunch.TableSource;
 import org.apache.crunch.Target;
+import org.apache.crunch.impl.mr.MRPipeline;
 import org.apache.crunch.lib.Aggregate;
 import org.apache.crunch.lib.Cogroup;
 import org.apache.crunch.lib.Join;
@@ -81,11 +83,27 @@ abstract class PTableBase<K, V> extends PCollectionImpl<Pair<K, V>> implements P
 
   @Override
   public PTable<K, V> write(Target target) {
-    getPipeline().write(this, target);
+    if (getMaterializedAt() != null) {
+      getPipeline().write(new InputTable<K, V>(
+          (TableSource<K, V>) getMaterializedAt(), (MRPipeline) getPipeline()), target);
+    } else {
+      getPipeline().write(this, target);
+    }
     return this;
   }
 
   @Override
+  public PTable<K, V> write(Target target, Target.WriteMode writeMode) {
+    if (getMaterializedAt() != null) {
+      getPipeline().write(new InputTable<K, V>(
+          (TableSource<K, V>) getMaterializedAt(), (MRPipeline) getPipeline()), target, writeMode);
+    } else {
+      getPipeline().write(this, target, writeMode);
+    }
+    return this;
+  }
+  
+  @Override
   public PTable<K, V> filter(FilterFn<Pair<K, V>> filterFn) {
     return parallelDo(filterFn, getPTableType());
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index 46a6386..c1c29e4 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -17,7 +17,12 @@
  */
 package org.apache.crunch.io.impl;
 
+import java.io.IOException;
+
 import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.io.CrunchOutputs;
 import org.apache.crunch.io.FileNamingScheme;
@@ -25,12 +30,17 @@ import org.apache.crunch.io.OutputHandler;
 import org.apache.crunch.io.PathTarget;
 import org.apache.crunch.types.Converter;
 import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
 public class FileTargetImpl implements PathTarget {
 
+  private static final Log LOG = LogFactory.getLog(FileTargetImpl.class);
+  
   protected final Path path;
   private final Class<? extends FileOutputFormat> outputFormatClass;
   private final FileNamingScheme fileNamingScheme;
@@ -107,4 +117,46 @@ public class FileTargetImpl implements PathTarget {
     // By default, assume that we cannot do this.
     return null;
   }
+
+  @Override
+  public void handleExisting(WriteMode strategy, Configuration conf) {
+    FileSystem fs = null;
+    try {
+      fs = FileSystem.get(conf);
+    } catch (IOException e) {
+      LOG.error("Could not retrieve FileSystem object to check for existing path", e);
+      throw new CrunchRuntimeException(e);
+    }
+    
+    boolean exists = false;
+    try {
+      exists = fs.exists(path);
+    } catch (IOException e) {
+      LOG.error("Exception checking existence of path: " + path, e);
+      throw new CrunchRuntimeException(e);
+    }
+    
+    if (exists) {
+      switch (strategy) {
+      case DEFAULT:
+        LOG.error("Path " + path + " already exists!");
+        throw new CrunchRuntimeException("Path already exists: " + path);
+      case OVERWRITE:
+        LOG.info("Removing data at existing path: " + path);
+        try {
+          fs.delete(path, true);
+        } catch (IOException e) {
+          LOG.error("Exception thrown removing data at path: " + path, e);
+        }
+        break;
+      case APPEND:
+        LOG.info("Adding output files to existing path: " + path);
+        break;
+      default:
+        throw new CrunchRuntimeException("Unknown WriteMode:  " + strategy);
+      }
+    } else {
+      LOG.info("Will write output files to new path: " + path);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
index 9626b26..4d2b88a 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java
@@ -81,4 +81,9 @@ class SourceTargetImpl<T> implements SourceTarget<T> {
   public String toString() {
     return source.toString();
   }
+
+  @Override
+  public void handleExisting(WriteMode strategy, Configuration conf) {
+    target.handleExisting(strategy, conf);  
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/849ad5fe/crunch/src/test/java/org/apache/crunch/WriteModeTest.java
----------------------------------------------------------------------
diff --git a/crunch/src/test/java/org/apache/crunch/WriteModeTest.java b/crunch/src/test/java/org/apache/crunch/WriteModeTest.java
new file mode 100644
index 0000000..e99ac7b
--- /dev/null
+++ b/crunch/src/test/java/org/apache/crunch/WriteModeTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.crunch.Target.WriteMode;
+import org.apache.crunch.impl.mem.MemPipeline;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.test.TemporaryPaths;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Rule;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+
+public class WriteModeTest {
+
+  @Rule
+  public TemporaryPath tmpDir = TemporaryPaths.create();
+
+  @Test(expected=CrunchRuntimeException.class)
+  public void testDefault() throws Exception {
+    run(null, true);
+  }
+
+  @Test(expected=CrunchRuntimeException.class)
+  public void testDefaultNoRun() throws Exception {
+    run(null, false);
+  }
+  
+  @Test
+  public void testOverwrite() throws Exception {
+    Path p = run(WriteMode.OVERWRITE, true);
+    PCollection<String> lines = MemPipeline.getInstance().readTextFile(p.toString());
+    assertEquals(ImmutableList.of("some", "string", "values"), lines.materialize());
+  }
+  
+  @Test(expected=CrunchRuntimeException.class)
+  public void testOverwriteNoRun() throws Exception {
+    run(WriteMode.OVERWRITE, false);
+  }
+  
+  @Test
+  public void testAppend() throws Exception {
+    Path p = run(WriteMode.APPEND, true);
+    PCollection<String> lines = MemPipeline.getInstance().readTextFile(p.toString());
+    assertEquals(ImmutableList.of("some", "string", "values", "some", "string", "values"),
+        lines.materialize());
+  }
+  
+  @Test
+  public void testAppendNoRun() throws Exception {
+    Path p = run(WriteMode.APPEND, false);
+    PCollection<String> lines = MemPipeline.getInstance().readTextFile(p.toString());
+    assertEquals(ImmutableList.of("some", "string", "values", "some", "string", "values"),
+        lines.materialize());
+  }
+  
+  Path run(WriteMode writeMode, boolean doRun) throws Exception {
+    Path output = tmpDir.getPath("existing");
+    FileSystem fs = FileSystem.get(tmpDir.getDefaultConfiguration());
+    if (fs.exists(output)) {
+      fs.delete(output, true);
+    }
+    Pipeline p = MemPipeline.getInstance();
+    PCollection<String> data = MemPipeline.typedCollectionOf(Avros.strings(),
+        ImmutableList.of("some", "string", "values"));
+    data.write(To.textFile(output));
+
+    if (doRun) {
+      p.run();
+    }
+    
+    if (writeMode == null) {
+      data.write(To.textFile(output));
+    } else {
+      data.write(To.textFile(output), writeMode);
+    }
+    
+    p.run();
+    
+    return output;
+  }
+}