You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:33 UTC

[31/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
new file mode 100644
index 0000000..ccf4fb5
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An analogue of {@link CrunchInputs} for handling multiple {@code OutputFormat} instances
+ * writing to multiple files within a single MapReduce job.
+ */
+public class CrunchOutputs<K, V> {
+  public static final String CRUNCH_OUTPUTS = "crunch.outputs.dir";
+  
+  private static final char RECORD_SEP = ',';
+  private static final char FIELD_SEP = ';';
+  private static final Joiner JOINER = Joiner.on(FIELD_SEP);
+  private static final Splitter SPLITTER = Splitter.on(FIELD_SEP);
+
+  public static void addNamedOutput(Job job, String name,
+      Class<? extends OutputFormat> outputFormatClass,
+      Class keyClass, Class valueClass) {
+    addNamedOutput(job, name, FormatBundle.forOutput(outputFormatClass), keyClass, valueClass);
+  }
+  
+  public static void addNamedOutput(Job job, String name,
+      FormatBundle<? extends OutputFormat> outputBundle,
+      Class keyClass, Class valueClass) {
+    Configuration conf = job.getConfiguration();
+    String inputs = JOINER.join(name, outputBundle.serialize(), keyClass.getName(), valueClass.getName());
+    String existing = conf.get(CRUNCH_OUTPUTS);
+    conf.set(CRUNCH_OUTPUTS, existing == null ? inputs : existing + RECORD_SEP + inputs);
+  }
+  
+  private static class OutputConfig<K, V> {
+    public FormatBundle<OutputFormat<K, V>> bundle;
+    public Class<K> keyClass;
+    public Class<V> valueClass;
+    
+    public OutputConfig(FormatBundle<OutputFormat<K, V>> bundle,
+        Class<K> keyClass, Class<V> valueClass) {
+      this.bundle = bundle;
+      this.keyClass = keyClass;
+      this.valueClass = valueClass;
+    }
+  }
+  
+  private static Map<String, OutputConfig> getNamedOutputs(
+      TaskInputOutputContext<?, ?, ?, ?> context) {
+    Map<String, OutputConfig> out = Maps.newHashMap();
+    Configuration conf = context.getConfiguration();
+    for (String input : Splitter.on(RECORD_SEP).split(conf.get(CRUNCH_OUTPUTS))) {
+      List<String> fields = Lists.newArrayList(SPLITTER.split(input));
+      String name = fields.get(0);
+      FormatBundle<OutputFormat> bundle = FormatBundle.fromSerialized(fields.get(1),
+          OutputFormat.class);
+      try {
+        Class<?> keyClass = Class.forName(fields.get(2));
+        Class<?> valueClass = Class.forName(fields.get(3));
+        out.put(name, new OutputConfig(bundle, keyClass, valueClass));
+      } catch (ClassNotFoundException e) {
+        throw new CrunchRuntimeException(e);
+      }
+    }
+    return out;
+  }
+  
+  private static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
+  private static final String COUNTERS_GROUP = CrunchOutputs.class.getName();
+
+  private TaskInputOutputContext<?, ?, K, V> baseContext;
+  private Map<String, OutputConfig> namedOutputs;
+  private Map<String, RecordWriter<K, V>> recordWriters;
+  private Map<String, TaskAttemptContext> taskContextCache;
+  
+  /**
+   * Creates and initializes multiple outputs support,
+   * it should be instantiated in the Mapper/Reducer setup method.
+   *
+   * @param context the TaskInputOutputContext object
+   */
+  public CrunchOutputs(TaskInputOutputContext<?, ?, K, V> context) {
+    this.baseContext = context;
+    namedOutputs = getNamedOutputs(context);
+    recordWriters = Maps.newHashMap();
+    taskContextCache = Maps.newHashMap();
+  }
+  
+  @SuppressWarnings("unchecked")
+  public void write(String namedOutput, K key, V value)
+      throws IOException, InterruptedException {
+    if (!namedOutputs.containsKey(namedOutput)) {
+      throw new IllegalArgumentException("Undefined named output '" +
+        namedOutput + "'");
+    }
+    TaskAttemptContext taskContext = getContext(namedOutput);
+    baseContext.getCounter(COUNTERS_GROUP, namedOutput).increment(1);
+    getRecordWriter(taskContext, namedOutput).write(key, value);
+  }
+  
+  public void close() throws IOException, InterruptedException {
+    for (RecordWriter<?, ?> writer : recordWriters.values()) {
+      writer.close(baseContext);
+    }
+  }
+  
+  private TaskAttemptContext getContext(String nameOutput) throws IOException {
+    TaskAttemptContext taskContext = taskContextCache.get(nameOutput);
+    if (taskContext != null) {
+      return taskContext;
+    }
+
+    // The following trick leverages the instantiation of a record writer via
+    // the job thus supporting arbitrary output formats.
+    OutputConfig outConfig = namedOutputs.get(nameOutput);
+    Configuration conf = new Configuration(baseContext.getConfiguration());
+    Job job = new Job(conf);
+    job.getConfiguration().set("crunch.namedoutput", nameOutput);
+    job.setOutputFormatClass(outConfig.bundle.getFormatClass());
+    job.setOutputKeyClass(outConfig.keyClass);
+    job.setOutputValueClass(outConfig.valueClass);
+    outConfig.bundle.configure(job.getConfiguration());
+    taskContext = TaskAttemptContextFactory.create(
+      job.getConfiguration(), baseContext.getTaskAttemptID());
+
+    taskContextCache.put(nameOutput, taskContext);
+    return taskContext;
+  }
+  
+  private synchronized RecordWriter<K, V> getRecordWriter(
+      TaskAttemptContext taskContext, String namedOutput) 
+      throws IOException, InterruptedException {
+    // look for record-writer in the cache
+    RecordWriter<K, V> writer = recordWriters.get(namedOutput);
+    
+    // If not in cache, create a new one
+    if (writer == null) {
+      // get the record writer from context output format
+      taskContext.getConfiguration().set(BASE_OUTPUT_NAME, namedOutput);
+      try {
+        OutputFormat format = ReflectionUtils.newInstance(
+            taskContext.getOutputFormatClass(),
+            taskContext.getConfiguration());
+        writer = format.getRecordWriter(taskContext);
+      } catch (ClassNotFoundException e) {
+        throw new IOException(e);
+      }
+      recordWriters.put(namedOutput, writer);
+    }
+    
+    return writer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/FileNamingScheme.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/FileNamingScheme.java b/crunch-core/src/main/java/org/apache/crunch/io/FileNamingScheme.java
new file mode 100644
index 0000000..cf93651
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/FileNamingScheme.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Encapsulates rules for naming output files. It is the responsibility of
+ * implementors to avoid file name collisions.
+ */
+public interface FileNamingScheme {
+
+  /**
+   * Get the output file name for a map task. Note that the implementation is
+   * responsible for avoiding naming collisions.
+   * 
+   * @param configuration The configuration of the job for which the map output
+   *          is being written
+   * @param outputDirectory The directory where the output will be written
+   * @return The filename for the output of the map task
+   * @throws IOException if an exception occurs while accessing the output file
+   *           system
+   */
+  String getMapOutputName(Configuration configuration, Path outputDirectory) throws IOException;
+
+  /**
+   * Get the output file name for a reduce task. Note that the implementation is
+   * responsible for avoiding naming collisions.
+   * 
+   * @param configuration The configuration of the job for which output is being
+   *          written
+   * @param outputDirectory The directory where the file will be written
+   * @param partitionId The partition of the reduce task being output
+   * @return The filename for the output of the reduce task
+   * @throws IOException if an exception occurs while accessing output file
+   *           system
+   */
+  String getReduceOutputName(Configuration configuration, Path outputDirectory, int partitionId) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/FileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/FileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/FileReaderFactory.java
new file mode 100644
index 0000000..5cccb7b
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/FileReaderFactory.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import java.util.Iterator;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public interface FileReaderFactory<T> {
+  Iterator<T> read(FileSystem fs, Path path);
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java b/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
new file mode 100644
index 0000000..d969009
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.OutputFormat;
+
+import com.google.common.collect.Maps;
+
+/**
+ * A combination of an {@link InputFormat} or {@link OutputFormat} and any extra 
+ * configuration information that format class needs to run.
+ * 
+ * <p>The {@code FormatBundle} allow us to let different formats act as
+ * if they are the only format that exists in a particular MapReduce job, even
+ * when we have multiple types of inputs and outputs within a single job.
+ */
+public class FormatBundle<K> implements Serializable {
+
+  private Class<K> formatClass;
+  private Map<String, String> extraConf;
+
+  public static <T> FormatBundle<T> fromSerialized(String serialized, Class<T> clazz) {
+    ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serialized));
+    try {
+      ObjectInputStream ois = new ObjectInputStream(bais);
+      FormatBundle<T> bundle = (FormatBundle<T>) ois.readObject();
+      ois.close();
+      return bundle;
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static <T extends InputFormat<?, ?>> FormatBundle<T> forInput(Class<T> inputFormatClass) {
+    return new FormatBundle<T>(inputFormatClass);
+  }
+  
+  public static <T extends OutputFormat<?, ?>> FormatBundle<T> forOutput(Class<T> inputFormatClass) {
+    return new FormatBundle<T>(inputFormatClass);
+  }
+  
+  private FormatBundle(Class<K> formatClass) {
+    this.formatClass = formatClass;
+    this.extraConf = Maps.newHashMap();
+  }
+
+  public FormatBundle<K> set(String key, String value) {
+    this.extraConf.put(key, value);
+    return this;
+  }
+
+  public Class<K> getFormatClass() {
+    return formatClass;
+  }
+
+  public Configuration configure(Configuration conf) {
+    for (Map.Entry<String, String> e : extraConf.entrySet()) {
+      conf.set(e.getKey(), e.getValue());
+    }
+    return conf;
+  }
+
+  public String serialize() {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try {
+      ObjectOutputStream oos = new ObjectOutputStream(baos);
+      oos.writeObject(this);
+      oos.close();
+      return Base64.encodeBase64String(baos.toByteArray());
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public String getName() {
+    return formatClass.getSimpleName();
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(formatClass).append(extraConf).toHashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null || !(other instanceof FormatBundle)) {
+      return false;
+    }
+    FormatBundle<K> oib = (FormatBundle<K>) other;
+    return formatClass.equals(oib.formatClass) && extraConf.equals(oib.extraConf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/From.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/From.java b/crunch-core/src/main/java/org/apache/crunch/io/From.java
new file mode 100644
index 0000000..e4cfb6a
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/From.java
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.avro.specific.SpecificRecord;
+import org.apache.crunch.Source;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.io.avro.AvroFileSource;
+import org.apache.crunch.io.impl.FileTableSourceImpl;
+import org.apache.crunch.io.seq.SeqFileSource;
+import org.apache.crunch.io.seq.SeqFileTableSource;
+import org.apache.crunch.io.text.TextFileSource;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.PTypeFamily;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+/**
+ * <p>Static factory methods for creating common {@link Source} types.</p>
+ * 
+ * <p>The {@code From} class is intended to provide a literate API for creating
+ * Crunch pipelines from common input file types.
+ * 
+ * <code>
+ *   Pipeline pipeline = new MRPipeline(this.getClass());
+ *   
+ *   // Reference the lines of a text file by wrapping the TextInputFormat class.
+ *   PCollection<String> lines = pipeline.read(From.textFile("/path/to/myfiles"));
+ *   
+ *   // Reference entries from a sequence file where the key is a LongWritable and the
+ *   // value is a custom Writable class.
+ *   PTable<LongWritable, MyWritable> table = pipeline.read(From.sequenceFile(
+ *       "/path/to/seqfiles", LongWritable.class, MyWritable.class));
+ *   
+ *   // Reference the records from an Avro file, where MyAvroObject implements Avro's
+ *   // SpecificRecord interface.
+ *   PCollection<MyAvroObject> myObjects = pipeline.read(From.avroFile("/path/to/avrofiles",
+ *       MyAvroObject.class));
+ *       
+ *   // References the key-value pairs from a custom extension of FileInputFormat:
+ *   PTable<KeyWritable, ValueWritable> custom = pipeline.read(From.formattedFile(
+ *       "/custom", MyFileInputFormat.class, KeyWritable.class, ValueWritable.class));
+ * </code>
+ * </p>
+ */
+public class From {
+
+  /**
+   * Creates a {@code TableSource<K, V>} for reading data from files that have custom
+   * {@code FileInputFormat<K, V>} implementations not covered by the provided {@code TableSource}
+   * and {@code Source} factory methods.
+   * 
+   * @param pathName The name of the path to the data on the filesystem
+   * @param formatClass The {@code FileInputFormat} implementation
+   * @param keyClass The {@code Writable} to use for the key
+   * @param valueClass The {@code Writable} to use for the value
+   * @return A new {@code TableSource<K, V>} instance
+   */
+  public static <K extends Writable, V extends Writable> TableSource<K, V> formattedFile(
+      String pathName, Class<? extends FileInputFormat<K, V>> formatClass,
+      Class<K> keyClass, Class<V> valueClass) {
+    return formattedFile(new Path(pathName), formatClass, keyClass, valueClass);
+  }
+
+  /**
+   * Creates a {@code TableSource<K, V>} for reading data from files that have custom
+   * {@code FileInputFormat<K, V>} implementations not covered by the provided {@code TableSource}
+   * and {@code Source} factory methods.
+   * 
+   * @param  The {@code Path} to the data
+   * @param formatClass The {@code FileInputFormat} implementation
+   * @param keyClass The {@code Writable} to use for the key
+   * @param valueClass The {@code Writable} to use for the value
+   * @return A new {@code TableSource<K, V>} instance
+   */
+  public static <K extends Writable, V extends Writable> TableSource<K, V> formattedFile(
+      Path path, Class<? extends FileInputFormat<K, V>> formatClass,
+      Class<K> keyClass, Class<V> valueClass) {
+    return formattedFile(path, formatClass, Writables.writables(keyClass),
+        Writables.writables(valueClass));
+  }
+
+  /**
+   * Creates a {@code TableSource<K, V>} for reading data from files that have custom
+   * {@code FileInputFormat} implementations not covered by the provided {@code TableSource}
+   * and {@code Source} factory methods.
+   * 
+   * @param pathName The name of the path to the data on the filesystem
+   * @param formatClass The {@code FileInputFormat} implementation
+   * @param keyType The {@code PType} to use for the key
+   * @param valueType The {@code PType} to use for the value
+   * @return A new {@code TableSource<K, V>} instance
+   */
+  public static <K, V> TableSource<K, V> formattedFile(String pathName,
+      Class<? extends FileInputFormat<?, ?>> formatClass,
+      PType<K> keyType, PType<V> valueType) {
+    return formattedFile(new Path(pathName), formatClass, keyType, valueType);
+  }
+
+  /**
+   * Creates a {@code TableSource<K, V>} for reading data from files that have custom
+   * {@code FileInputFormat} implementations not covered by the provided {@code TableSource}
+   * and {@code Source} factory methods.
+   * 
+   * @param  The {@code Path} to the data
+   * @param formatClass The {@code FileInputFormat} implementation
+   * @param keyType The {@code PType} to use for the key
+   * @param valueType The {@code PType} to use for the value
+   * @return A new {@code TableSource<K, V>} instance
+   */
+  public static <K, V> TableSource<K, V> formattedFile(Path path,
+      Class<? extends FileInputFormat<?, ?>> formatClass,
+      PType<K> keyType, PType<V> valueType) {
+    PTableType<K, V> tableType = keyType.getFamily().tableOf(keyType, valueType);
+    return new FileTableSourceImpl<K, V>(path, tableType, formatClass);
+  }
+
+  /**
+   * Creates a {@code Source<T>} instance from the Avro file(s) at the given path name.
+   * 
+   * @param pathName The name of the path to the data on the filesystem
+   * @param avroClass The subclass of {@code SpecificRecord} to use for the Avro file
+   * @return A new {@code Source<T>} instance
+   */
+  public static <T extends SpecificRecord> Source<T> avroFile(String pathName, Class<T> avroClass) {
+    return avroFile(new Path(pathName), avroClass);  
+  }
+
+  /**
+   * Creates a {@code Source<T>} instance from the Avro file(s) at the given {@code Path}.
+   * 
+   * @param path The {@code Path} to the data
+   * @param avroClass The subclass of {@code SpecificRecord} to use for the Avro file
+   * @return A new {@code Source<T>} instance
+   */
+  public static <T extends SpecificRecord> Source<T> avroFile(Path path, Class<T> avroClass) {
+    return avroFile(path, Avros.specifics(avroClass));  
+  }
+  
+  /**
+   * Creates a {@code Source<T>} instance from the Avro file(s) at the given path name.
+   * 
+   * @param pathName The name of the path to the data on the filesystem
+   * @param avroType The {@code AvroType} for the Avro records
+   * @return A new {@code Source<T>} instance
+   */
+  public static <T> Source<T> avroFile(String pathName, AvroType<T> avroType) {
+    return avroFile(new Path(pathName), avroType);
+  }
+
+  /**
+   * Creates a {@code Source<T>} instance from the Avro file(s) at the given {@code Path}.
+   * 
+   * @param path The {@code Path} to the data
+   * @param avroType The {@code AvroType} for the Avro records
+   * @return A new {@code Source<T>} instance
+   */
+  public static <T> Source<T> avroFile(Path path, AvroType<T> avroType) {
+    return new AvroFileSource<T>(path, avroType);
+  }
+
+  /**
+   * Creates a {@code Source<T>} instance from the SequenceFile(s) at the given path name
+   * from the value field of each key-value pair in the SequenceFile(s).
+   * 
+   * @param pathName The name of the path to the data on the filesystem
+   * @param valueClass The {@code Writable} type for the value of the SequenceFile entry
+   * @return A new {@code Source<T>} instance
+   */
+  public static <T extends Writable> Source<T> sequenceFile(String pathName, Class<T> valueClass) {
+    return sequenceFile(new Path(pathName), valueClass);
+  }
+  
+  /**
+   * Creates a {@code Source<T>} instance from the SequenceFile(s) at the given {@code Path}
+   * from the value field of each key-value pair in the SequenceFile(s).
+   * 
+   * @param path The {@code Path} to the data
+   * @param valueClass The {@code Writable} type for the value of the SequenceFile entry
+   * @return A new {@code Source<T>} instance
+   */
+  public static <T extends Writable> Source<T> sequenceFile(Path path, Class<T> valueClass) {
+    return sequenceFile(path, Writables.writables(valueClass));
+  }
+  
+  /**
+   * Creates a {@code Source<T>} instance from the SequenceFile(s) at the given path name
+   * from the value field of each key-value pair in the SequenceFile(s).
+   * 
+   * @param pathName The name of the path to the data on the filesystem
+   * @param ptype The {@code PType} for the value of the SequenceFile entry
+   * @return A new {@code Source<T>} instance
+   */
+  public static <T> Source<T> sequenceFile(String pathName, PType<T> ptype) {
+    return sequenceFile(new Path(pathName), ptype);
+  }
+
+  /**
+   * Creates a {@code Source<T>} instance from the SequenceFile(s) at the given {@code Path}
+   * from the value field of each key-value pair in the SequenceFile(s).
+   * 
+   * @param path The {@code Path} to the data
+   * @param ptype The {@code PType} for the value of the SequenceFile entry
+   * @return A new {@code Source<T>} instance
+   */
+  public static <T> Source<T> sequenceFile(Path path, PType<T> ptype) {
+    return new SeqFileSource<T>(path, ptype);
+  }
+
+  /**
+   * Creates a {@code TableSource<K, V>} instance for the SequenceFile(s) at the given path name.
+   * 
+   * @param pathName The name of the path to the data on the filesystem
+   * @param keyClass The {@code Writable} subclass for the key of the SequenceFile entry
+   * @param valueClass The {@code Writable} subclass for the value of the SequenceFile entry
+   * @return A new {@code SourceTable<K, V>} instance
+   */
+  public static <K extends Writable, V extends Writable> TableSource<K, V> sequenceFile(
+      String pathName, Class<K> keyClass, Class<V> valueClass) {
+    return sequenceFile(new Path(pathName), keyClass, valueClass);
+  }
+
+  /**
+   * Creates a {@code TableSource<K, V>} instance for the SequenceFile(s) at the given {@code Path}.
+   * 
+   * @param path The {@code Path} to the data
+   * @param keyClass The {@code Writable} subclass for the key of the SequenceFile entry
+   * @param valueClass The {@code Writable} subclass for the value of the SequenceFile entry
+   * @return A new {@code SourceTable<K, V>} instance
+   */
+  public static <K extends Writable, V extends Writable> TableSource<K, V> sequenceFile(
+      Path path, Class<K> keyClass, Class<V> valueClass) {
+    return sequenceFile(path, Writables.writables(keyClass), Writables.writables(valueClass));
+  }
+  
+  /**
+   * Creates a {@code TableSource<K, V>} instance for the SequenceFile(s) at the given path name.
+   * 
+   * @param pathName The name of the path to the data on the filesystem
+   * @param keyType The {@code PType} for the key of the SequenceFile entry
+   * @param valueType The {@code PType} for the value of the SequenceFile entry
+   * @return A new {@code SourceTable<K, V>} instance
+   */
+  public static <K, V> TableSource<K, V> sequenceFile(String pathName, PType<K> keyType, PType<V> valueType) {
+    return sequenceFile(new Path(pathName), keyType, valueType);
+  }
+
+  /**
+   * Creates a {@code TableSource<K, V>} instance for the SequenceFile(s) at the given {@code Path}.
+   * 
+   * @param path The {@code Path} to the data
+   * @param keyType The {@code PType} for the key of the SequenceFile entry
+   * @param valueType The {@code PType} for the value of the SequenceFile entry
+   * @return A new {@code SourceTable<K, V>} instance
+   */
+  public static <K, V> TableSource<K, V> sequenceFile(Path path, PType<K> keyType, PType<V> valueType) {
+    PTypeFamily ptf = keyType.getFamily();
+    return new SeqFileTableSource<K, V>(path, ptf.tableOf(keyType, valueType));
+  }
+
+  /**
+   * Creates a {@code Source<String>} instance for the text file(s) at the given path name.
+   * 
+   * @param pathName The name of the path to the data on the filesystem
+   * @return A new {@code Source<String>} instance
+   */
+  public static Source<String> textFile(String pathName) {
+    return textFile(new Path(pathName));
+  }
+
+  /**
+   * Creates a {@code Source<String>} instance for the text file(s) at the given {@code Path}.
+   * 
+   * @param path The {@code Path} to the data
+   * @return A new {@code Source<String>} instance
+   */
+  public static Source<String> textFile(Path path) {
+    return textFile(path, Writables.strings());
+  }
+
+  /**
+   * Creates a {@code Source<T>} instance for the text file(s) at the given path name using
+   * the provided {@code PType<T>} to convert the input text.
+   * 
+   * @param pathName The name of the path to the data on the filesystem
+   * @param ptype The {@code PType<T>} to use to process the input text
+   * @return A new {@code Source<T>} instance
+   */
+  public static <T> Source<T> textFile(String pathName, PType<T> ptype) {
+    return textFile(new Path(pathName), ptype);
+  }
+
+  /**
+   * Creates a {@code Source<T>} instance for the text file(s) at the given {@code Path} using
+   * the provided {@code PType<T>} to convert the input text.
+   * 
+   * @param path The {@code Path} to the data
+   * @param ptype The {@code PType<T>} to use to process the input text
+   * @return A new {@code Source<T>} instance
+   */
+  public static <T> Source<T> textFile(Path path, PType<T> ptype) {
+    return new TextFileSource<T>(path, ptype);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/MapReduceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/MapReduceTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/MapReduceTarget.java
new file mode 100644
index 0000000..b484103
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/MapReduceTarget.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.crunch.Target;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+public interface MapReduceTarget extends Target {
+  void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name);
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/OutputHandler.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/OutputHandler.java b/crunch-core/src/main/java/org/apache/crunch/io/OutputHandler.java
new file mode 100644
index 0000000..01d7f99
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/OutputHandler.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.crunch.Target;
+import org.apache.crunch.types.PType;
+
+public interface OutputHandler {
+  boolean configure(Target target, PType<?> ptype);
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java
new file mode 100644
index 0000000..7a35209
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/PathTarget.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * A target whose output goes to a given path on a file system.
+ */
+public interface PathTarget extends MapReduceTarget {
+
+  Path getPath();
+
+  /**
+   * Get the naming scheme to be used for outputs being written to an output
+   * path.
+   * 
+   * @return the naming scheme to be used
+   */
+  FileNamingScheme getFileNamingScheme();
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/PathTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/PathTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/PathTargetImpl.java
new file mode 100644
index 0000000..0be3f9a
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/PathTargetImpl.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+public abstract class PathTargetImpl implements PathTarget {
+
+  private final Path path;
+  private final Class<OutputFormat> outputFormatClass;
+  private final Class keyClass;
+  private final Class valueClass;
+
+  public PathTargetImpl(String path, Class<OutputFormat> outputFormatClass, Class keyClass, Class valueClass) {
+    this(new Path(path), outputFormatClass, keyClass, valueClass);
+  }
+
+  public PathTargetImpl(Path path, Class<OutputFormat> outputFormatClass, Class keyClass, Class valueClass) {
+    this.path = path;
+    this.outputFormatClass = outputFormatClass;
+    this.keyClass = keyClass;
+    this.valueClass = valueClass;
+  }
+
+  @Override
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
+    try {
+      FileOutputFormat.setOutputPath(job, path);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    if (name == null) {
+      job.setOutputFormatClass(outputFormatClass);
+      job.setOutputKeyClass(keyClass);
+      job.setOutputValueClass(valueClass);
+    } else {
+      CrunchOutputs.addNamedOutput(job, name, outputFormatClass, keyClass, valueClass);
+    }
+  }
+
+  @Override
+  public Path getPath() {
+    return path;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/ReadableSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/ReadableSource.java b/crunch-core/src/main/java/org/apache/crunch/io/ReadableSource.java
new file mode 100644
index 0000000..0407167
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/ReadableSource.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import java.io.IOException;
+
+import org.apache.crunch.Source;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * An extension of the {@code Source} interface that indicates that a
+ * {@code Source} instance may be read as a series of records by the client
+ * code. This is used to determine whether a {@code PCollection} instance can be
+ * materialized.
+ */
+public interface ReadableSource<T> extends Source<T> {
+
+  /**
+   * Returns an {@code Iterable} that contains the contents of this source.
+   * 
+   * @param conf The current {@code Configuration} instance
+   * @return the contents of this {@code Source} as an {@code Iterable} instance
+   * @throws IOException
+   */
+  Iterable<T> read(Configuration conf) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
new file mode 100644
index 0000000..95c90aa
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.crunch.SourceTarget;
+
+/**
+ * An interface that indicates that a {@code SourceTarget} instance can be read
+ * into the local client.
+ * 
+ * @param <T>
+ *          The type of data read.
+ */
+public interface ReadableSourceTarget<T> extends ReadableSource<T>, SourceTarget<T> {
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java b/crunch-core/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java
new file mode 100644
index 0000000..bdda8e6
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Default {@link FileNamingScheme} that uses an incrementing sequence number in
+ * order to generate unique file names.
+ */
+public class SequentialFileNamingScheme implements FileNamingScheme {
+
+  @Override
+  public String getMapOutputName(Configuration configuration, Path outputDirectory) throws IOException {
+    return getSequentialFileName(configuration, outputDirectory, "m");
+  }
+
+  @Override
+  public String getReduceOutputName(Configuration configuration, Path outputDirectory, int partitionId)
+      throws IOException {
+    return getSequentialFileName(configuration, outputDirectory, "r");
+  }
+
+  private String getSequentialFileName(Configuration configuration, Path outputDirectory, String jobTypeName)
+      throws IOException {
+    FileSystem fileSystem = outputDirectory.getFileSystem(configuration);
+    int fileSequenceNumber = fileSystem.listStatus(outputDirectory).length;
+
+    return String.format("part-%s-%05d", jobTypeName, fileSequenceNumber);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/SourceTargetHelper.java b/crunch-core/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
new file mode 100644
index 0000000..f4400de
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Functions for configuring the inputs/outputs of MapReduce jobs.
+ * 
+ */
+public class SourceTargetHelper {
+
+  public static long getPathSize(Configuration conf, Path path) throws IOException {
+    return getPathSize(path.getFileSystem(conf), path);
+  }
+
+  public static long getPathSize(FileSystem fs, Path path) throws IOException {
+    FileStatus[] stati = fs.globStatus(path);
+    if (stati == null || stati.length == 0) {
+      return -1L;
+    }
+    long size = 0;
+    for (FileStatus status : stati) {
+      size += fs.getContentSummary(status.getPath()).getLength();
+    }
+    return size;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/To.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/To.java b/crunch-core/src/main/java/org/apache/crunch/io/To.java
new file mode 100644
index 0000000..d62d294
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/To.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.crunch.Target;
+import org.apache.crunch.io.avro.AvroFileTarget;
+import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.crunch.io.seq.SeqFileTarget;
+import org.apache.crunch.io.text.TextFileTarget;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+/**
+ * <p>Static factory methods for creating common {@link Target} types.</p>
+ * 
+ * <p>The {@code To} class is intended to be used as part of a literate API
+ * for writing the output of Crunch pipelines to common file types. We can use
+ * the {@code Target} objects created by the factory methods in the {@code To}
+ * class with either the {@code write} method on the {@code Pipeline} class or
+ * the convenience {@code write} method on {@code PCollection} and {@code PTable}
+ * instances.
+ * 
+ * <code>
+ *   Pipeline pipeline = new MRPipeline(this.getClass());
+ *   ...
+ *   // Write a PCollection<String> to a text file:
+ *   PCollection<String> words = ...;
+ *   pipeline.write(words, To.textFile("/put/my/words/here"));
+ *   
+ *   // Write a PTable<Text, Text> to a sequence file:
+ *   PTable<Text, Text> textToText = ...;
+ *   textToText.write(To.sequenceFile("/words/to/words"));
+ *   
+ *   // Write a PCollection<MyAvroObject> to an Avro data file:
+ *   PCollection<MyAvroObject> objects = ...;
+ *   objects.write(To.avroFile("/my/avro/files"));
+ *   
+ *   // Write a PTable to a custom FileOutputFormat:
+ *   PTable<KeyWritable, ValueWritable> custom = ...;
+ *   pipeline.write(custom, To.formattedFile("/custom", MyFileFormat.class));
+ * </code>
+ * </p>
+ */
+public class To {
+
+  /**
+   * Creates a {@code Target} at the given path name that writes data to
+   * a custom {@code FileOutputFormat}.
+   * 
+   * @param pathName The name of the path to write the data to on the filesystem
+   * @param formatClass The {@code FileOutputFormat<K, V>} to write the data to
+   * @return A new {@code Target} instance
+   */
+  public static <K extends Writable, V extends Writable> Target formattedFile(
+      String pathName, Class<? extends FileOutputFormat<K, V>> formatClass) {
+    return formattedFile(new Path(pathName), formatClass);
+  }
+
+  /**
+   * Creates a {@code Target} at the given {@code Path} that writes data to
+   * a custom {@code FileOutputFormat}.
+   * 
+   * @param path The {@code Path} to write the data to
+   * @param formatClass The {@code FileOutputFormat} to write the data to
+   * @return A new {@code Target} instance
+   */
+  public static <K extends Writable, V extends Writable> Target formattedFile(
+      Path path, Class<? extends FileOutputFormat<K, V>> formatClass) {
+    return new FileTargetImpl(path, formatClass, new SequentialFileNamingScheme());
+  }
+
+  /**
+   * Creates a {@code Target} at the given path name that writes data to
+   * Avro files. The {@code PType} for the written data must be for Avro records.
+   * 
+   * @param pathName The name of the path to write the data to on the filesystem
+   * @return A new {@code Target} instance
+   */
+  public static Target avroFile(String pathName) {
+    return avroFile(new Path(pathName));
+  }
+
+  /**
+   * Creates a {@code Target} at the given {@code Path} that writes data to
+   * Avro files. The {@code PType} for the written data must be for Avro records.
+   * 
+   * @param path The {@code Path} to write the data to
+   * @return A new {@code Target} instance
+   */
+  public static Target avroFile(Path path) {
+    return new AvroFileTarget(path);
+  }
+
+  /**
+   * Creates a {@code Target} at the given path name that writes data to
+   * SequenceFiles.
+   * 
+   * @param pathName The name of the path to write the data to on the filesystem
+   * @return A new {@code Target} instance
+   */
+  public static Target sequenceFile(String pathName) {
+    return sequenceFile(new Path(pathName));
+  }
+
+  /**
+   * Creates a {@code Target} at the given {@code Path} that writes data to
+   * SequenceFiles.
+   * 
+   * @param path The {@code Path} to write the data to
+   * @return A new {@code Target} instance
+   */
+  public static Target sequenceFile(Path path) {
+    return new SeqFileTarget(path);
+  }
+
+  /**
+   * Creates a {@code Target} at the given path name that writes data to
+   * text files.
+   * 
+   * @param pathName The name of the path to write the data to on the filesystem
+   * @return A new {@code Target} instance
+   */
+  public static Target textFile(String pathName) {
+    return textFile(new Path(pathName));
+  }
+
+  /**
+   * Creates a {@code Target} at the given {@code Path} that writes data to
+   * text files.
+   * 
+   * @param path The {@code Path} to write the data to
+   * @return A new {@code Target} instance
+   */
+  public static Target textFile(Path path) {
+    return new TextFileTarget(path);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
new file mode 100644
index 0000000..c8fe23a
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.fn.IdentityFn;
+import org.apache.crunch.io.FileReaderFactory;
+import org.apache.crunch.io.impl.AutoClosingIterator;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.UnmodifiableIterator;
+
+public class AvroFileReaderFactory<T> implements FileReaderFactory<T> {
+
+  private static final Log LOG = LogFactory.getLog(AvroFileReaderFactory.class);
+
+  private final DatumReader<T> recordReader;
+  private final MapFn<T, T> mapFn;
+
+  public AvroFileReaderFactory(AvroType<T> atype) {
+    this.recordReader = createDatumReader(atype);
+    this.mapFn = (MapFn<T, T>) atype.getInputMapFn();
+  }
+
+  public AvroFileReaderFactory(Schema schema) {
+    this.recordReader = new GenericDatumReader<T>(schema);
+    this.mapFn = IdentityFn.<T>getInstance();
+  }
+  
+  static <T> DatumReader<T> createDatumReader(AvroType<T> avroType) {
+    if (avroType.hasReflect()) {
+      if (avroType.hasSpecific()) {
+        Avros.checkCombiningSpecificAndReflectionSchemas();
+      }
+      return new ReflectDatumReader<T>(avroType.getSchema());
+    } else if (avroType.hasSpecific()) {
+      return new SpecificDatumReader<T>(avroType.getSchema());
+    } else {
+      return new GenericDatumReader<T>(avroType.getSchema());
+    }
+  }
+
+  @Override
+  public Iterator<T> read(FileSystem fs, final Path path) {
+    this.mapFn.initialize();
+    try {
+      FsInput fsi = new FsInput(path, fs.getConf());
+      final DataFileReader<T> reader = new DataFileReader<T>(fsi, recordReader);
+      return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() {
+        @Override
+        public boolean hasNext() {
+          return reader.hasNext();
+        }
+
+        @Override
+        public T next() {
+          return mapFn.map(reader.next());
+        }
+      });
+    } catch (IOException e) {
+      LOG.info("Could not read avro file at path: " + path, e);
+      return Iterators.emptyIterator();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
new file mode 100644
index 0000000..15792bf
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import java.io.IOException;
+
+import org.apache.avro.mapred.AvroJob;
+import org.apache.crunch.io.CompositePathIterable;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.impl.FileSourceImpl;
+import org.apache.crunch.types.avro.AvroInputFormat;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
+
+  private static <S> FormatBundle getBundle(AvroType<S> ptype) {
+    FormatBundle bundle = FormatBundle.forInput(AvroInputFormat.class)
+        .set(AvroJob.INPUT_IS_REFLECT, String.valueOf(ptype.hasReflect()))
+        .set(AvroJob.INPUT_SCHEMA, ptype.getSchema().toString())
+        .set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName());
+    return bundle;
+  }
+  
+  public AvroFileSource(Path path, AvroType<T> ptype) {
+    super(path, ptype, getBundle(ptype));
+  }
+
+  @Override
+  public String toString() {
+    return "Avro(" + path.toString() + ")";
+  }
+
+  @Override
+  public Iterable<T> read(Configuration conf) throws IOException {
+    FileSystem fs = path.getFileSystem(conf);
+    return CompositePathIterable.create(fs, path, new AvroFileReaderFactory<T>((AvroType<T>) ptype));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
new file mode 100644
index 0000000..76103e5
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.SequentialFileNamingScheme;
+import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.hadoop.fs.Path;
+
+public class AvroFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> {
+  public AvroFileSourceTarget(Path path, AvroType<T> atype) {
+    this(path, atype, new SequentialFileNamingScheme());
+  }
+
+  public AvroFileSourceTarget(Path path, AvroType<T> atype, FileNamingScheme fileNamingScheme) {
+    super(new AvroFileSource<T>(path, atype), new AvroFileTarget(path), fileNamingScheme);
+  }
+
+  @Override
+  public String toString() {
+    return target.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
new file mode 100644
index 0000000..3a9e42c
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.avro;
+
+import org.apache.avro.mapred.AvroWrapper;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.io.SequentialFileNamingScheme;
+import org.apache.crunch.io.impl.FileTargetImpl;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.avro.AvroOutputFormat;
+import org.apache.crunch.types.avro.AvroType;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Job;
+
+public class AvroFileTarget extends FileTargetImpl {
+
+  public AvroFileTarget(String path) {
+    this(new Path(path));
+  }
+
+  public AvroFileTarget(Path path) {
+    this(path, new SequentialFileNamingScheme());
+  }
+
+  public AvroFileTarget(Path path, FileNamingScheme fileNamingScheme) {
+    super(path, AvroOutputFormat.class, fileNamingScheme);
+  }
+
+  @Override
+  public String toString() {
+    return "Avro(" + path.toString() + ")";
+  }
+
+  @Override
+  public boolean accept(OutputHandler handler, PType<?> ptype) {
+    if (!(ptype instanceof AvroType)) {
+      return false;
+    }
+    handler.configure(this, ptype);
+    return true;
+  }
+
+  @Override
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
+    AvroType<?> atype = (AvroType<?>) ptype;
+    Configuration conf = job.getConfiguration();
+    String schemaParam = null;
+    if (name == null) {
+      schemaParam = "avro.output.schema";
+    } else {
+      schemaParam = "avro.output.schema." + name;
+    }
+    String outputSchema = conf.get(schemaParam);
+    if (outputSchema == null) {
+      conf.set(schemaParam, atype.getSchema().toString());
+    } else if (!outputSchema.equals(atype.getSchema().toString())) {
+      throw new IllegalStateException("Avro targets must use the same output schema");
+    }
+    Avros.configureReflectDataFactory(conf);
+    configureForMapReduce(job, AvroWrapper.class, NullWritable.class, AvroOutputFormat.class,
+        outputPath, name);
+  }
+
+  @Override
+  public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
+    if (ptype instanceof AvroType) {
+      return new AvroFileSourceTarget<T>(path, (AvroType<T>) ptype);
+    }
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java
new file mode 100644
index 0000000..3bd802e
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+
+import com.google.common.collect.UnmodifiableIterator;
+import com.google.common.io.Closeables;
+
+/**
+ * Closes the wrapped {@code Closeable} when {@link #hasNext()} returns false.  As long a client loops through to
+ * completion (doesn't abort early due to an exception, short circuit, etc.) resources will be closed automatically.
+ */
+public class AutoClosingIterator<T> extends UnmodifiableIterator<T> implements Closeable {
+  private final Iterator<T> iter;
+  private Closeable closeable;
+
+  public AutoClosingIterator(Closeable closeable, Iterator<T> iter) {
+    this.closeable = closeable;
+    this.iter = iter;
+  }
+
+  @Override
+  public boolean hasNext() {
+    if (!iter.hasNext()) {
+      Closeables.closeQuietly(this);
+      return false;
+    } else {
+      return true;
+    }
+  }
+
+  @Override
+  public T next() {
+    return iter.next();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (closeable != null) {
+      closeable.close();
+      closeable = null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
new file mode 100644
index 0000000..688c801
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.Source;
+import org.apache.crunch.io.CrunchInputs;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.SourceTargetHelper;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+public class FileSourceImpl<T> implements Source<T> {
+
+  private static final Log LOG = LogFactory.getLog(FileSourceImpl.class);
+
+  protected final Path path;
+  protected final PType<T> ptype;
+  protected final FormatBundle<? extends InputFormat> inputBundle;
+
+  public FileSourceImpl(Path path, PType<T> ptype, Class<? extends InputFormat> inputFormatClass) {
+    this.path = path;
+    this.ptype = ptype;
+    this.inputBundle = FormatBundle.forInput(inputFormatClass);
+  }
+
+  public FileSourceImpl(Path path, PType<T> ptype, FormatBundle<? extends InputFormat> inputBundle) {
+    this.path = path;
+    this.ptype = ptype;
+    this.inputBundle = inputBundle;
+  }
+
+  public Path getPath() {
+    return path;
+  }
+  
+  @Override
+  public void configureSource(Job job, int inputId) throws IOException {
+    if (inputId == -1) {
+      FileInputFormat.addInputPath(job, path);
+      job.setInputFormatClass(inputBundle.getFormatClass());
+      inputBundle.configure(job.getConfiguration());
+    } else {
+      CrunchInputs.addInputPath(job, path, inputBundle, inputId);
+    }
+  }
+
+  @Override
+  public PType<T> getType() {
+    return ptype;
+  }
+
+  @Override
+  public long getSize(Configuration configuration) {
+    try {
+      return SourceTargetHelper.getPathSize(configuration, path);
+    } catch (IOException e) {
+      LOG.warn(String.format("Exception thrown looking up size of: %s", path), e);
+      throw new IllegalStateException("Failed to get the file size of:" + path, e);
+    }
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null || !getClass().equals(other.getClass())) {
+      return false;
+    }
+    FileSourceImpl o = (FileSourceImpl) other;
+    return ptype.equals(o.ptype) && path.equals(o.path) && inputBundle.equals(o.inputBundle);
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(ptype).append(path).append(inputBundle).toHashCode();
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder().append(inputBundle.getName()).append("(").append(path).append(")").toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
new file mode 100644
index 0000000..295edb5
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.types.PTableType;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+
+public class FileTableSourceImpl<K, V> extends FileSourceImpl<Pair<K, V>> implements TableSource<K, V> {
+
+  public FileTableSourceImpl(Path path, PTableType<K, V> tableType, Class<? extends FileInputFormat> formatClass) {
+    super(path, tableType, formatClass);
+  }
+
+  public FileTableSourceImpl(Path path, PTableType<K, V> tableType, FormatBundle bundle) {
+    super(path, tableType, bundle);
+  }
+  
+  @Override
+  public PTableType<K, V> getTableType() {
+    return (PTableType<K, V>) getType();
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
new file mode 100644
index 0000000..c1c29e4
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.SourceTarget;
+import org.apache.crunch.io.CrunchOutputs;
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.OutputHandler;
+import org.apache.crunch.io.PathTarget;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+public class FileTargetImpl implements PathTarget {
+
+  private static final Log LOG = LogFactory.getLog(FileTargetImpl.class);
+  
+  protected final Path path;
+  private final Class<? extends FileOutputFormat> outputFormatClass;
+  private final FileNamingScheme fileNamingScheme;
+
+  public FileTargetImpl(Path path, Class<? extends FileOutputFormat> outputFormatClass,
+      FileNamingScheme fileNamingScheme) {
+    this.path = path;
+    this.outputFormatClass = outputFormatClass;
+    this.fileNamingScheme = fileNamingScheme;
+  }
+
+  @Override
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
+    Converter converter = ptype.getConverter();
+    Class keyClass = converter.getKeyClass();
+    Class valueClass = converter.getValueClass();
+    configureForMapReduce(job, keyClass, valueClass, outputFormatClass, outputPath, name);
+  }
+
+  protected void configureForMapReduce(Job job, Class keyClass, Class valueClass,
+      Class outputFormatClass, Path outputPath, String name) {
+    try {
+      FileOutputFormat.setOutputPath(job, outputPath);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+    if (name == null) {
+      job.setOutputFormatClass(outputFormatClass);
+      job.setOutputKeyClass(keyClass);
+      job.setOutputValueClass(valueClass);
+    } else {
+      CrunchOutputs.addNamedOutput(job, name, outputFormatClass, keyClass, valueClass);
+    }
+  }
+
+  @Override
+  public boolean accept(OutputHandler handler, PType<?> ptype) {
+    handler.configure(this, ptype);
+    return true;
+  }
+
+  @Override
+  public Path getPath() {
+    return path;
+  }
+
+  @Override
+  public FileNamingScheme getFileNamingScheme() {
+    return fileNamingScheme;
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (other == null || !getClass().equals(other.getClass())) {
+      return false;
+    }
+    FileTargetImpl o = (FileTargetImpl) other;
+    return path.equals(o.path);
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(path).toHashCode();
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder().append(outputFormatClass.getSimpleName()).append("(").append(path).append(")")
+        .toString();
+  }
+
+  @Override
+  public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
+    // By default, assume that we cannot do this.
+    return null;
+  }
+
+  @Override
+  public void handleExisting(WriteMode strategy, Configuration conf) {
+    FileSystem fs = null;
+    try {
+      fs = FileSystem.get(conf);
+    } catch (IOException e) {
+      LOG.error("Could not retrieve FileSystem object to check for existing path", e);
+      throw new CrunchRuntimeException(e);
+    }
+    
+    boolean exists = false;
+    try {
+      exists = fs.exists(path);
+    } catch (IOException e) {
+      LOG.error("Exception checking existence of path: " + path, e);
+      throw new CrunchRuntimeException(e);
+    }
+    
+    if (exists) {
+      switch (strategy) {
+      case DEFAULT:
+        LOG.error("Path " + path + " already exists!");
+        throw new CrunchRuntimeException("Path already exists: " + path);
+      case OVERWRITE:
+        LOG.info("Removing data at existing path: " + path);
+        try {
+          fs.delete(path, true);
+        } catch (IOException e) {
+          LOG.error("Exception thrown removing data at path: " + path, e);
+        }
+        break;
+      case APPEND:
+        LOG.info("Adding output files to existing path: " + path);
+        break;
+      default:
+        throw new CrunchRuntimeException("Unknown WriteMode:  " + strategy);
+      }
+    } else {
+      LOG.info("Will write output files to new path: " + path);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java
new file mode 100644
index 0000000..6506816
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import java.io.IOException;
+
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.PathTarget;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.hadoop.conf.Configuration;
+
+public class ReadableSourcePathTargetImpl<T> extends SourcePathTargetImpl<T> implements ReadableSourceTarget<T> {
+
+  public ReadableSourcePathTargetImpl(ReadableSource<T> source, PathTarget target, FileNamingScheme fileNamingScheme) {
+    super(source, target, fileNamingScheme);
+  }
+
+  @Override
+  public Iterable<T> read(Configuration conf) throws IOException {
+    return ((ReadableSource<T>) source).read(conf);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java
new file mode 100644
index 0000000..f435b3b
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import java.io.IOException;
+
+import org.apache.crunch.Target;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.io.ReadableSourceTarget;
+import org.apache.hadoop.conf.Configuration;
+
+public class ReadableSourceTargetImpl<T> extends SourceTargetImpl<T> implements ReadableSourceTarget<T> {
+
+  public ReadableSourceTargetImpl(ReadableSource<T> source, Target target) {
+    super(source, target);
+  }
+
+  @Override
+  public Iterable<T> read(Configuration conf) throws IOException {
+    return ((ReadableSource<T>) source).read(conf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
new file mode 100644
index 0000000..c0d7ce0
--- /dev/null
+++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io.impl;
+
+import org.apache.crunch.Source;
+import org.apache.crunch.io.FileNamingScheme;
+import org.apache.crunch.io.PathTarget;
+import org.apache.crunch.types.PType;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.Job;
+
+public class SourcePathTargetImpl<T> extends SourceTargetImpl<T> implements PathTarget {
+
+  private final FileNamingScheme fileNamingScheme;
+
+  public SourcePathTargetImpl(Source<T> source, PathTarget target, FileNamingScheme fileNamingScheme) {
+    super(source, target);
+    this.fileNamingScheme = fileNamingScheme;
+  }
+
+  @Override
+  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
+    ((PathTarget) target).configureForMapReduce(job, ptype, outputPath, name);
+  }
+
+  @Override
+  public Path getPath() {
+    return ((PathTarget) target).getPath();
+  }
+
+  @Override
+  public FileNamingScheme getFileNamingScheme() {
+    return fileNamingScheme;
+  }
+}