You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/04/23 22:41:12 UTC

[10/43] CRUNCH-196: crunch -> crunch-core rename to fix build issues

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/CrunchOutputs.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch/src/main/java/org/apache/crunch/io/CrunchOutputs.java
deleted file mode 100644
index ccf4fb5..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/CrunchOutputs.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-/**
- * An analogue of {@link CrunchInputs} for handling multiple {@code OutputFormat} instances
- * writing to multiple files within a single MapReduce job.
- */
-public class CrunchOutputs<K, V> {
-  public static final String CRUNCH_OUTPUTS = "crunch.outputs.dir";
-  
-  private static final char RECORD_SEP = ',';
-  private static final char FIELD_SEP = ';';
-  private static final Joiner JOINER = Joiner.on(FIELD_SEP);
-  private static final Splitter SPLITTER = Splitter.on(FIELD_SEP);
-
-  public static void addNamedOutput(Job job, String name,
-      Class<? extends OutputFormat> outputFormatClass,
-      Class keyClass, Class valueClass) {
-    addNamedOutput(job, name, FormatBundle.forOutput(outputFormatClass), keyClass, valueClass);
-  }
-  
-  public static void addNamedOutput(Job job, String name,
-      FormatBundle<? extends OutputFormat> outputBundle,
-      Class keyClass, Class valueClass) {
-    Configuration conf = job.getConfiguration();
-    String inputs = JOINER.join(name, outputBundle.serialize(), keyClass.getName(), valueClass.getName());
-    String existing = conf.get(CRUNCH_OUTPUTS);
-    conf.set(CRUNCH_OUTPUTS, existing == null ? inputs : existing + RECORD_SEP + inputs);
-  }
-  
-  private static class OutputConfig<K, V> {
-    public FormatBundle<OutputFormat<K, V>> bundle;
-    public Class<K> keyClass;
-    public Class<V> valueClass;
-    
-    public OutputConfig(FormatBundle<OutputFormat<K, V>> bundle,
-        Class<K> keyClass, Class<V> valueClass) {
-      this.bundle = bundle;
-      this.keyClass = keyClass;
-      this.valueClass = valueClass;
-    }
-  }
-  
-  private static Map<String, OutputConfig> getNamedOutputs(
-      TaskInputOutputContext<?, ?, ?, ?> context) {
-    Map<String, OutputConfig> out = Maps.newHashMap();
-    Configuration conf = context.getConfiguration();
-    for (String input : Splitter.on(RECORD_SEP).split(conf.get(CRUNCH_OUTPUTS))) {
-      List<String> fields = Lists.newArrayList(SPLITTER.split(input));
-      String name = fields.get(0);
-      FormatBundle<OutputFormat> bundle = FormatBundle.fromSerialized(fields.get(1),
-          OutputFormat.class);
-      try {
-        Class<?> keyClass = Class.forName(fields.get(2));
-        Class<?> valueClass = Class.forName(fields.get(3));
-        out.put(name, new OutputConfig(bundle, keyClass, valueClass));
-      } catch (ClassNotFoundException e) {
-        throw new CrunchRuntimeException(e);
-      }
-    }
-    return out;
-  }
-  
-  private static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
-  private static final String COUNTERS_GROUP = CrunchOutputs.class.getName();
-
-  private TaskInputOutputContext<?, ?, K, V> baseContext;
-  private Map<String, OutputConfig> namedOutputs;
-  private Map<String, RecordWriter<K, V>> recordWriters;
-  private Map<String, TaskAttemptContext> taskContextCache;
-  
-  /**
-   * Creates and initializes multiple-output support.
-   * It should be instantiated in the Mapper/Reducer setup method.
-   *
-   * @param context the TaskInputOutputContext object
-   */
-  public CrunchOutputs(TaskInputOutputContext<?, ?, K, V> context) {
-    this.baseContext = context;
-    namedOutputs = getNamedOutputs(context);
-    recordWriters = Maps.newHashMap();
-    taskContextCache = Maps.newHashMap();
-  }
-  
-  @SuppressWarnings("unchecked")
-  public void write(String namedOutput, K key, V value)
-      throws IOException, InterruptedException {
-    if (!namedOutputs.containsKey(namedOutput)) {
-      throw new IllegalArgumentException("Undefined named output '" +
-        namedOutput + "'");
-    }
-    TaskAttemptContext taskContext = getContext(namedOutput);
-    baseContext.getCounter(COUNTERS_GROUP, namedOutput).increment(1);
-    getRecordWriter(taskContext, namedOutput).write(key, value);
-  }
-  
-  public void close() throws IOException, InterruptedException {
-    for (RecordWriter<?, ?> writer : recordWriters.values()) {
-      writer.close(baseContext);
-    }
-  }
-  
-  private TaskAttemptContext getContext(String nameOutput) throws IOException {
-    TaskAttemptContext taskContext = taskContextCache.get(nameOutput);
-    if (taskContext != null) {
-      return taskContext;
-    }
-
-    // The following trick leverages the instantiation of a record writer via
-    // the job thus supporting arbitrary output formats.
-    OutputConfig outConfig = namedOutputs.get(nameOutput);
-    Configuration conf = new Configuration(baseContext.getConfiguration());
-    Job job = new Job(conf);
-    job.getConfiguration().set("crunch.namedoutput", nameOutput);
-    job.setOutputFormatClass(outConfig.bundle.getFormatClass());
-    job.setOutputKeyClass(outConfig.keyClass);
-    job.setOutputValueClass(outConfig.valueClass);
-    outConfig.bundle.configure(job.getConfiguration());
-    taskContext = TaskAttemptContextFactory.create(
-      job.getConfiguration(), baseContext.getTaskAttemptID());
-
-    taskContextCache.put(nameOutput, taskContext);
-    return taskContext;
-  }
-  
-  private synchronized RecordWriter<K, V> getRecordWriter(
-      TaskAttemptContext taskContext, String namedOutput) 
-      throws IOException, InterruptedException {
-    // look for record-writer in the cache
-    RecordWriter<K, V> writer = recordWriters.get(namedOutput);
-    
-    // If not in cache, create a new one
-    if (writer == null) {
-      // get the record writer from context output format
-      taskContext.getConfiguration().set(BASE_OUTPUT_NAME, namedOutput);
-      try {
-        OutputFormat format = ReflectionUtils.newInstance(
-            taskContext.getOutputFormatClass(),
-            taskContext.getConfiguration());
-        writer = format.getRecordWriter(taskContext);
-      } catch (ClassNotFoundException e) {
-        throw new IOException(e);
-      }
-      recordWriters.put(namedOutput, writer);
-    }
-    
-    return writer;
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/FileNamingScheme.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/FileNamingScheme.java b/crunch/src/main/java/org/apache/crunch/io/FileNamingScheme.java
deleted file mode 100644
index cf93651..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/FileNamingScheme.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Encapsulates rules for naming output files. It is the responsibility of
- * implementors to avoid file name collisions.
- */
-public interface FileNamingScheme {
-
-  /**
-   * Get the output file name for a map task. Note that the implementation is
-   * responsible for avoiding naming collisions.
-   * 
-   * @param configuration The configuration of the job for which the map output
-   *          is being written
-   * @param outputDirectory The directory where the output will be written
-   * @return The filename for the output of the map task
-   * @throws IOException if an exception occurs while accessing the output file
-   *           system
-   */
-  String getMapOutputName(Configuration configuration, Path outputDirectory) throws IOException;
-
-  /**
-   * Get the output file name for a reduce task. Note that the implementation is
-   * responsible for avoiding naming collisions.
-   * 
-   * @param configuration The configuration of the job for which output is being
-   *          written
-   * @param outputDirectory The directory where the file will be written
-   * @param partitionId The partition of the reduce task being output
-   * @return The filename for the output of the reduce task
-   * @throws IOException if an exception occurs while accessing the output file
-   *           system
-   */
-  String getReduceOutputName(Configuration configuration, Path outputDirectory, int partitionId) throws IOException;
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/FileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/FileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/FileReaderFactory.java
deleted file mode 100644
index 5cccb7b..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/FileReaderFactory.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import java.util.Iterator;
-
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-public interface FileReaderFactory<T> {
-  Iterator<T> read(FileSystem fs, Path path);
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/FormatBundle.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/FormatBundle.java b/crunch/src/main/java/org/apache/crunch/io/FormatBundle.java
deleted file mode 100644
index d969009..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/FormatBundle.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.util.Map;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.OutputFormat;
-
-import com.google.common.collect.Maps;
-
-/**
- * A combination of an {@link InputFormat} or {@link OutputFormat} and any extra 
- * configuration information that format class needs to run.
- * 
- * <p>The {@code FormatBundle} allows us to let different formats act as
- * if they are the only format that exists in a particular MapReduce job, even
- * when we have multiple types of inputs and outputs within a single job.
- */
-public class FormatBundle<K> implements Serializable {
-
-  private Class<K> formatClass;
-  private Map<String, String> extraConf;
-
-  public static <T> FormatBundle<T> fromSerialized(String serialized, Class<T> clazz) {
-    ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serialized));
-    try {
-      ObjectInputStream ois = new ObjectInputStream(bais);
-      FormatBundle<T> bundle = (FormatBundle<T>) ois.readObject();
-      ois.close();
-      return bundle;
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    } catch (ClassNotFoundException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public static <T extends InputFormat<?, ?>> FormatBundle<T> forInput(Class<T> inputFormatClass) {
-    return new FormatBundle<T>(inputFormatClass);
-  }
-  
-  public static <T extends OutputFormat<?, ?>> FormatBundle<T> forOutput(Class<T> inputFormatClass) {
-    return new FormatBundle<T>(inputFormatClass);
-  }
-  
-  private FormatBundle(Class<K> formatClass) {
-    this.formatClass = formatClass;
-    this.extraConf = Maps.newHashMap();
-  }
-
-  public FormatBundle<K> set(String key, String value) {
-    this.extraConf.put(key, value);
-    return this;
-  }
-
-  public Class<K> getFormatClass() {
-    return formatClass;
-  }
-
-  public Configuration configure(Configuration conf) {
-    for (Map.Entry<String, String> e : extraConf.entrySet()) {
-      conf.set(e.getKey(), e.getValue());
-    }
-    return conf;
-  }
-
-  public String serialize() {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    try {
-      ObjectOutputStream oos = new ObjectOutputStream(baos);
-      oos.writeObject(this);
-      oos.close();
-      return Base64.encodeBase64String(baos.toByteArray());
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public String getName() {
-    return formatClass.getSimpleName();
-  }
-
-  @Override
-  public int hashCode() {
-    return new HashCodeBuilder().append(formatClass).append(extraConf).toHashCode();
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other == null || !(other instanceof FormatBundle)) {
-      return false;
-    }
-    FormatBundle<K> oib = (FormatBundle<K>) other;
-    return formatClass.equals(oib.formatClass) && extraConf.equals(oib.extraConf);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/From.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/From.java b/crunch/src/main/java/org/apache/crunch/io/From.java
deleted file mode 100644
index e4cfb6a..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/From.java
+++ /dev/null
@@ -1,324 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import org.apache.avro.specific.SpecificRecord;
-import org.apache.crunch.Source;
-import org.apache.crunch.TableSource;
-import org.apache.crunch.io.avro.AvroFileSource;
-import org.apache.crunch.io.impl.FileTableSourceImpl;
-import org.apache.crunch.io.seq.SeqFileSource;
-import org.apache.crunch.io.seq.SeqFileTableSource;
-import org.apache.crunch.io.text.TextFileSource;
-import org.apache.crunch.types.PTableType;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.PTypeFamily;
-import org.apache.crunch.types.avro.AvroType;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.crunch.types.writable.Writables;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-/**
- * <p>Static factory methods for creating common {@link Source} types.</p>
- * 
- * <p>The {@code From} class is intended to provide a literate API for creating
- * Crunch pipelines from common input file types.</p>
- * 
- * <pre>{@code
- *   Pipeline pipeline = new MRPipeline(this.getClass());
- *   
- *   // Reference the lines of a text file by wrapping the TextInputFormat class.
- *   PCollection<String> lines = pipeline.read(From.textFile("/path/to/myfiles"));
- *   
- *   // Reference entries from a sequence file where the key is a LongWritable and the
- *   // value is a custom Writable class.
- *   PTable<LongWritable, MyWritable> table = pipeline.read(From.sequenceFile(
- *       "/path/to/seqfiles", LongWritable.class, MyWritable.class));
- *   
- *   // Reference the records from an Avro file, where MyAvroObject implements Avro's
- *   // SpecificRecord interface.
- *   PCollection<MyAvroObject> myObjects = pipeline.read(From.avroFile("/path/to/avrofiles",
- *       MyAvroObject.class));
- *       
- *   // References the key-value pairs from a custom extension of FileInputFormat:
- *   PTable<KeyWritable, ValueWritable> custom = pipeline.read(From.formattedFile(
- *       "/custom", MyFileInputFormat.class, KeyWritable.class, ValueWritable.class));
- * }</pre>
- *
- */
-public class From {
-
-  /**
-   * Creates a {@code TableSource<K, V>} for reading data from files that have custom
-   * {@code FileInputFormat<K, V>} implementations not covered by the provided {@code TableSource}
-   * and {@code Source} factory methods.
-   * 
-   * @param pathName The name of the path to the data on the filesystem
-   * @param formatClass The {@code FileInputFormat} implementation
-   * @param keyClass The {@code Writable} to use for the key
-   * @param valueClass The {@code Writable} to use for the value
-   * @return A new {@code TableSource<K, V>} instance
-   */
-  public static <K extends Writable, V extends Writable> TableSource<K, V> formattedFile(
-      String pathName, Class<? extends FileInputFormat<K, V>> formatClass,
-      Class<K> keyClass, Class<V> valueClass) {
-    return formattedFile(new Path(pathName), formatClass, keyClass, valueClass);
-  }
-
-  /**
-   * Creates a {@code TableSource<K, V>} for reading data from files that have custom
-   * {@code FileInputFormat<K, V>} implementations not covered by the provided {@code TableSource}
-   * and {@code Source} factory methods.
-   * 
-   * @param path The {@code Path} to the data
-   * @param formatClass The {@code FileInputFormat} implementation
-   * @param keyClass The {@code Writable} to use for the key
-   * @param valueClass The {@code Writable} to use for the value
-   * @return A new {@code TableSource<K, V>} instance
-   */
-  public static <K extends Writable, V extends Writable> TableSource<K, V> formattedFile(
-      Path path, Class<? extends FileInputFormat<K, V>> formatClass,
-      Class<K> keyClass, Class<V> valueClass) {
-    return formattedFile(path, formatClass, Writables.writables(keyClass),
-        Writables.writables(valueClass));
-  }
-
-  /**
-   * Creates a {@code TableSource<K, V>} for reading data from files that have custom
-   * {@code FileInputFormat} implementations not covered by the provided {@code TableSource}
-   * and {@code Source} factory methods.
-   * 
-   * @param pathName The name of the path to the data on the filesystem
-   * @param formatClass The {@code FileInputFormat} implementation
-   * @param keyType The {@code PType} to use for the key
-   * @param valueType The {@code PType} to use for the value
-   * @return A new {@code TableSource<K, V>} instance
-   */
-  public static <K, V> TableSource<K, V> formattedFile(String pathName,
-      Class<? extends FileInputFormat<?, ?>> formatClass,
-      PType<K> keyType, PType<V> valueType) {
-    return formattedFile(new Path(pathName), formatClass, keyType, valueType);
-  }
-
-  /**
-   * Creates a {@code TableSource<K, V>} for reading data from files that have custom
-   * {@code FileInputFormat} implementations not covered by the provided {@code TableSource}
-   * and {@code Source} factory methods.
-   * 
-   * @param path The {@code Path} to the data
-   * @param formatClass The {@code FileInputFormat} implementation
-   * @param keyType The {@code PType} to use for the key
-   * @param valueType The {@code PType} to use for the value
-   * @return A new {@code TableSource<K, V>} instance
-   */
-  public static <K, V> TableSource<K, V> formattedFile(Path path,
-      Class<? extends FileInputFormat<?, ?>> formatClass,
-      PType<K> keyType, PType<V> valueType) {
-    PTableType<K, V> tableType = keyType.getFamily().tableOf(keyType, valueType);
-    return new FileTableSourceImpl<K, V>(path, tableType, formatClass);
-  }
-
-  /**
-   * Creates a {@code Source<T>} instance from the Avro file(s) at the given path name.
-   * 
-   * @param pathName The name of the path to the data on the filesystem
-   * @param avroClass The subclass of {@code SpecificRecord} to use for the Avro file
-   * @return A new {@code Source<T>} instance
-   */
-  public static <T extends SpecificRecord> Source<T> avroFile(String pathName, Class<T> avroClass) {
-    return avroFile(new Path(pathName), avroClass);  
-  }
-
-  /**
-   * Creates a {@code Source<T>} instance from the Avro file(s) at the given {@code Path}.
-   * 
-   * @param path The {@code Path} to the data
-   * @param avroClass The subclass of {@code SpecificRecord} to use for the Avro file
-   * @return A new {@code Source<T>} instance
-   */
-  public static <T extends SpecificRecord> Source<T> avroFile(Path path, Class<T> avroClass) {
-    return avroFile(path, Avros.specifics(avroClass));  
-  }
-  
-  /**
-   * Creates a {@code Source<T>} instance from the Avro file(s) at the given path name.
-   * 
-   * @param pathName The name of the path to the data on the filesystem
-   * @param avroType The {@code AvroType} for the Avro records
-   * @return A new {@code Source<T>} instance
-   */
-  public static <T> Source<T> avroFile(String pathName, AvroType<T> avroType) {
-    return avroFile(new Path(pathName), avroType);
-  }
-
-  /**
-   * Creates a {@code Source<T>} instance from the Avro file(s) at the given {@code Path}.
-   * 
-   * @param path The {@code Path} to the data
-   * @param avroType The {@code AvroType} for the Avro records
-   * @return A new {@code Source<T>} instance
-   */
-  public static <T> Source<T> avroFile(Path path, AvroType<T> avroType) {
-    return new AvroFileSource<T>(path, avroType);
-  }
-
-  /**
-   * Creates a {@code Source<T>} instance from the SequenceFile(s) at the given path name
-   * from the value field of each key-value pair in the SequenceFile(s).
-   * 
-   * @param pathName The name of the path to the data on the filesystem
-   * @param valueClass The {@code Writable} type for the value of the SequenceFile entry
-   * @return A new {@code Source<T>} instance
-   */
-  public static <T extends Writable> Source<T> sequenceFile(String pathName, Class<T> valueClass) {
-    return sequenceFile(new Path(pathName), valueClass);
-  }
-  
-  /**
-   * Creates a {@code Source<T>} instance from the SequenceFile(s) at the given {@code Path}
-   * from the value field of each key-value pair in the SequenceFile(s).
-   * 
-   * @param path The {@code Path} to the data
-   * @param valueClass The {@code Writable} type for the value of the SequenceFile entry
-   * @return A new {@code Source<T>} instance
-   */
-  public static <T extends Writable> Source<T> sequenceFile(Path path, Class<T> valueClass) {
-    return sequenceFile(path, Writables.writables(valueClass));
-  }
-  
-  /**
-   * Creates a {@code Source<T>} instance from the SequenceFile(s) at the given path name
-   * from the value field of each key-value pair in the SequenceFile(s).
-   * 
-   * @param pathName The name of the path to the data on the filesystem
-   * @param ptype The {@code PType} for the value of the SequenceFile entry
-   * @return A new {@code Source<T>} instance
-   */
-  public static <T> Source<T> sequenceFile(String pathName, PType<T> ptype) {
-    return sequenceFile(new Path(pathName), ptype);
-  }
-
-  /**
-   * Creates a {@code Source<T>} instance from the SequenceFile(s) at the given {@code Path}
-   * from the value field of each key-value pair in the SequenceFile(s).
-   * 
-   * @param path The {@code Path} to the data
-   * @param ptype The {@code PType} for the value of the SequenceFile entry
-   * @return A new {@code Source<T>} instance
-   */
-  public static <T> Source<T> sequenceFile(Path path, PType<T> ptype) {
-    return new SeqFileSource<T>(path, ptype);
-  }
-
-  /**
-   * Creates a {@code TableSource<K, V>} instance for the SequenceFile(s) at the given path name.
-   * 
-   * @param pathName The name of the path to the data on the filesystem
-   * @param keyClass The {@code Writable} subclass for the key of the SequenceFile entry
-   * @param valueClass The {@code Writable} subclass for the value of the SequenceFile entry
-   * @return A new {@code TableSource<K, V>} instance
-   */
-  public static <K extends Writable, V extends Writable> TableSource<K, V> sequenceFile(
-      String pathName, Class<K> keyClass, Class<V> valueClass) {
-    return sequenceFile(new Path(pathName), keyClass, valueClass);
-  }
-
-  /**
-   * Creates a {@code TableSource<K, V>} instance for the SequenceFile(s) at the given {@code Path}.
-   * 
-   * @param path The {@code Path} to the data
-   * @param keyClass The {@code Writable} subclass for the key of the SequenceFile entry
-   * @param valueClass The {@code Writable} subclass for the value of the SequenceFile entry
-   * @return A new {@code TableSource<K, V>} instance
-   */
-  public static <K extends Writable, V extends Writable> TableSource<K, V> sequenceFile(
-      Path path, Class<K> keyClass, Class<V> valueClass) {
-    return sequenceFile(path, Writables.writables(keyClass), Writables.writables(valueClass));
-  }
-  
-  /**
-   * Creates a {@code TableSource<K, V>} instance for the SequenceFile(s) at the given path name.
-   * 
-   * @param pathName The name of the path to the data on the filesystem
-   * @param keyType The {@code PType} for the key of the SequenceFile entry
-   * @param valueType The {@code PType} for the value of the SequenceFile entry
-   * @return A new {@code TableSource<K, V>} instance
-   */
-  public static <K, V> TableSource<K, V> sequenceFile(String pathName, PType<K> keyType, PType<V> valueType) {
-    return sequenceFile(new Path(pathName), keyType, valueType);
-  }
-
-  /**
-   * Creates a {@code TableSource<K, V>} instance for the SequenceFile(s) at the given {@code Path}.
-   * 
-   * @param path The {@code Path} to the data
-   * @param keyType The {@code PType} for the key of the SequenceFile entry
-   * @param valueType The {@code PType} for the value of the SequenceFile entry
-   * @return A new {@code TableSource<K, V>} instance
-   */
-  public static <K, V> TableSource<K, V> sequenceFile(Path path, PType<K> keyType, PType<V> valueType) {
-    PTypeFamily ptf = keyType.getFamily();
-    return new SeqFileTableSource<K, V>(path, ptf.tableOf(keyType, valueType));
-  }
-
-  /**
-   * Creates a {@code Source<String>} instance for the text file(s) at the given path name.
-   * 
-   * @param pathName The name of the path to the data on the filesystem
-   * @return A new {@code Source<String>} instance
-   */
-  public static Source<String> textFile(String pathName) {
-    return textFile(new Path(pathName));
-  }
-
-  /**
-   * Creates a {@code Source<String>} instance for the text file(s) at the given {@code Path}.
-   * 
-   * @param path The {@code Path} to the data
-   * @return A new {@code Source<String>} instance
-   */
-  public static Source<String> textFile(Path path) {
-    return textFile(path, Writables.strings());
-  }
-
-  /**
-   * Creates a {@code Source<T>} instance for the text file(s) at the given path name using
-   * the provided {@code PType<T>} to convert the input text.
-   * 
-   * @param pathName The name of the path to the data on the filesystem
-   * @param ptype The {@code PType<T>} to use to process the input text
-   * @return A new {@code Source<T>} instance
-   */
-  public static <T> Source<T> textFile(String pathName, PType<T> ptype) {
-    return textFile(new Path(pathName), ptype);
-  }
-
-  /**
-   * Creates a {@code Source<T>} instance for the text file(s) at the given {@code Path} using
-   * the provided {@code PType<T>} to convert the input text.
-   * 
-   * @param path The {@code Path} to the data
-   * @param ptype The {@code PType<T>} to use to process the input text
-   * @return A new {@code Source<T>} instance
-   */
-  public static <T> Source<T> textFile(Path path, PType<T> ptype) {
-    return new TextFileSource<T>(path, ptype);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/MapReduceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/MapReduceTarget.java b/crunch/src/main/java/org/apache/crunch/io/MapReduceTarget.java
deleted file mode 100644
index b484103..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/MapReduceTarget.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import org.apache.crunch.Target;
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * A {@link Target} that can set itself up as an output of a Hadoop MapReduce {@link Job}.
- */
-public interface MapReduceTarget extends Target {
-
-  /**
-   * Configures {@code job} so that it writes the output destined for this target.
-   *
-   * @param job the MapReduce job to configure
-   * @param ptype the {@code PType} of the data being written
-   * @param outputPath the path the output should be written to
-   * @param name the name of the output; NOTE(review): implementations in this package
-   *          appear to treat {@code null} as "the job's primary output" and any other
-   *          value as a named output -- confirm against each implementation
-   */
-  void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name);
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/OutputHandler.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/OutputHandler.java b/crunch/src/main/java/org/apache/crunch/io/OutputHandler.java
deleted file mode 100644
index 01d7f99..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/OutputHandler.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import org.apache.crunch.Target;
-import org.apache.crunch.types.PType;
-
-/**
- * Callback through which a {@link Target} and the {@link PType} of its data are
- * offered to a handler.
- */
-public interface OutputHandler {
-
-  /**
-   * Offers the given target to this handler.
-   *
-   * @param target the target being offered
-   * @param ptype the {@code PType} of the data destined for {@code target}
-   * @return a flag reported by the handler; presumably {@code true} when the handler
-   *         accepted the target -- TODO confirm against the implementations
-   */
-  boolean configure(Target target, PType<?> ptype);
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/PathTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/PathTarget.java b/crunch/src/main/java/org/apache/crunch/io/PathTarget.java
deleted file mode 100644
index 7a35209..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/PathTarget.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import org.apache.hadoop.fs.Path;
-
-/**
- * A {@link MapReduceTarget} that writes its output underneath a path on a file system.
- */
-public interface PathTarget extends MapReduceTarget {
-
-  /**
-   * Get the path this target writes to.
-   *
-   * @return the output path
-   */
-  Path getPath();
-
-  /**
-   * Get the {@link FileNamingScheme} that decides how the outputs written to the
-   * output path are named.
-   *
-   * @return the naming scheme to be used
-   */
-  FileNamingScheme getFileNamingScheme();
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/PathTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/PathTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/PathTargetImpl.java
deleted file mode 100644
index 0be3f9a..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/PathTargetImpl.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-/**
- * Skeleton {@link PathTarget} that remembers an output path together with the output
- * format, key class and value class to register on a MapReduce {@link Job}.
- * {@code getFileNamingScheme()} is left to subclasses.
- */
-public abstract class PathTargetImpl implements PathTarget {
-
-  private final Path path;
-  private final Class<OutputFormat> outputFormatClass;
-  private final Class keyClass;
-  private final Class valueClass;
-
-  /**
-   * Convenience constructor that wraps {@code path} in a {@link Path}.
-   *
-   * @param path the output location, as a string
-   * @param outputFormatClass the output format to register on the job
-   * @param keyClass the output key class
-   * @param valueClass the output value class
-   */
-  public PathTargetImpl(String path, Class<OutputFormat> outputFormatClass, Class keyClass, Class valueClass) {
-    this(new Path(path), outputFormatClass, keyClass, valueClass);
-  }
-
-  /**
-   * @param path the output location
-   * @param outputFormatClass the output format to register on the job
-   * @param keyClass the output key class
-   * @param valueClass the output value class
-   */
-  public PathTargetImpl(Path path, Class<OutputFormat> outputFormatClass, Class keyClass, Class valueClass) {
-    this.valueClass = valueClass;
-    this.keyClass = keyClass;
-    this.outputFormatClass = outputFormatClass;
-    this.path = path;
-  }
-
-  /**
-   * Points the job's file output at this target's path and registers the output format
-   * and key/value classes, either directly on the job ({@code name == null}) or as a
-   * named output through {@code CrunchOutputs}.
-   *
-   * <p>NOTE(review): {@code ptype} and {@code outputPath} are not read here; the path
-   * given at construction time is always used -- confirm that this is intended.</p>
-   *
-   * @param job the job to configure
-   * @param ptype not used by this implementation
-   * @param outputPath not used by this implementation
-   * @param name the named output to register, or {@code null} for the job's own output
-   * @throws RuntimeException wrapping any exception raised while setting the output path
-   */
-  @Override
-  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
-    try {
-      FileOutputFormat.setOutputPath(job, path);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-
-    if (name != null) {
-      CrunchOutputs.addNamedOutput(job, name, outputFormatClass, keyClass, valueClass);
-      return;
-    }
-
-    // No name supplied: configure the job's own output.
-    job.setOutputFormatClass(outputFormatClass);
-    job.setOutputKeyClass(keyClass);
-    job.setOutputValueClass(valueClass);
-  }
-
-  /**
-   * @return the path supplied at construction time
-   */
-  @Override
-  public Path getPath() {
-    return path;
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java b/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java
deleted file mode 100644
index 0407167..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/ReadableSource.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import java.io.IOException;
-
-import org.apache.crunch.Source;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * A {@code Source} whose records can also be read directly by client code.
- * Whether a source implements this interface is used to decide if a
- * {@code PCollection} instance can be materialized.
- */
-public interface ReadableSource<T> extends Source<T> {
-
-  /**
-   * Exposes the contents of this source as an {@code Iterable}.
-   *
-   * @param conf The current {@code Configuration} instance
-   * @return the contents of this {@code Source} as an {@code Iterable} instance
-   * @throws IOException if the underlying data cannot be accessed
-   */
-  Iterable<T> read(Configuration conf) throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
deleted file mode 100644
index 95c90aa..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import org.apache.crunch.SourceTarget;
-
-/**
- * Marker interface combining {@code ReadableSource} and {@code SourceTarget}: a
- * {@code SourceTarget} whose contents can also be read into the local client.
- * It declares no methods of its own.
- *
- * @param <T>
- *          The type of data read.
- */
-public interface ReadableSourceTarget<T> extends ReadableSource<T>, SourceTarget<T> {
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java b/crunch/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java
deleted file mode 100644
index bdda8e6..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/SequentialFileNamingScheme.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Default {@link FileNamingScheme} that uses an incrementing sequence number in
- * order to generate unique file names.
- *
- * <p>The sequence number is the number of entries currently present in the output
- * directory, so a name is only unique as long as nothing else adds entries to that
- * directory between the listing and the creation of the new file.</p>
- */
-public class SequentialFileNamingScheme implements FileNamingScheme {
-
-  /**
-   * Returns the next sequential name for a map-side output file ({@code part-m-NNNNN}).
-   *
-   * @param configuration the configuration used to resolve the file system
-   * @param outputDirectory the directory the file will be written to
-   * @return the generated file name
-   * @throws IOException if the file system cannot be accessed
-   */
-  @Override
-  public String getMapOutputName(Configuration configuration, Path outputDirectory) throws IOException {
-    return getSequentialFileName(configuration, outputDirectory, "m");
-  }
-
-  /**
-   * Returns the next sequential name for a reduce-side output file ({@code part-r-NNNNN}).
-   *
-   * @param configuration the configuration used to resolve the file system
-   * @param outputDirectory the directory the file will be written to
-   * @param partitionId not used by this scheme; the name depends only on the directory contents
-   * @return the generated file name
-   * @throws IOException if the file system cannot be accessed
-   */
-  @Override
-  public String getReduceOutputName(Configuration configuration, Path outputDirectory, int partitionId)
-      throws IOException {
-    return getSequentialFileName(configuration, outputDirectory, "r");
-  }
-
-  /**
-   * Builds {@code part-<jobTypeName>-<sequence>} where the sequence number is the count
-   * of entries already in {@code outputDirectory}, zero-padded to five digits.
-   */
-  private String getSequentialFileName(Configuration configuration, Path outputDirectory, String jobTypeName)
-      throws IOException {
-    FileSystem fileSystem = outputDirectory.getFileSystem(configuration);
-
-    // Listing a directory that does not exist yet either returns null or throws,
-    // depending on the Hadoop version. A missing directory simply has no entries,
-    // so numbering starts at zero instead of failing.
-    int fileSequenceNumber = 0;
-    if (fileSystem.exists(outputDirectory)) {
-      fileSequenceNumber = fileSystem.listStatus(outputDirectory).length;
-    }
-
-    return String.format("part-%s-%05d", jobTypeName, fileSequenceNumber);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java b/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
deleted file mode 100644
index f4400de..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Functions for configuring the inputs/outputs of MapReduce jobs.
- * 
- */
-public class SourceTargetHelper {
-
-  public static long getPathSize(Configuration conf, Path path) throws IOException {
-    return getPathSize(path.getFileSystem(conf), path);
-  }
-
-  public static long getPathSize(FileSystem fs, Path path) throws IOException {
-    FileStatus[] stati = fs.globStatus(path);
-    if (stati == null || stati.length == 0) {
-      return -1L;
-    }
-    long size = 0;
-    for (FileStatus status : stati) {
-      size += fs.getContentSummary(status.getPath()).getLength();
-    }
-    return size;
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/To.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/To.java b/crunch/src/main/java/org/apache/crunch/io/To.java
deleted file mode 100644
index d62d294..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/To.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import org.apache.crunch.Target;
-import org.apache.crunch.io.avro.AvroFileTarget;
-import org.apache.crunch.io.impl.FileTargetImpl;
-import org.apache.crunch.io.seq.SeqFileTarget;
-import org.apache.crunch.io.text.TextFileTarget;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-/**
- * <p>Static factory methods for creating common {@link Target} types.</p>
- * 
- * <p>The {@code To} class is intended to be used as part of a literate API
- * for writing the output of Crunch pipelines to common file types. We can use
- * the {@code Target} objects created by the factory methods in the {@code To}
- * class with either the {@code write} method on the {@code Pipeline} class or
- * the convenience {@code write} method on {@code PCollection} and {@code PTable}
- * instances.
- * 
- * <code>
- *   Pipeline pipeline = new MRPipeline(this.getClass());
- *   ...
- *   // Write a PCollection<String> to a text file:
- *   PCollection<String> words = ...;
- *   pipeline.write(words, To.textFile("/put/my/words/here"));
- *   
- *   // Write a PTable<Text, Text> to a sequence file:
- *   PTable<Text, Text> textToText = ...;
- *   textToText.write(To.sequenceFile("/words/to/words"));
- *   
- *   // Write a PCollection<MyAvroObject> to an Avro data file:
- *   PCollection<MyAvroObject> objects = ...;
- *   objects.write(To.avroFile("/my/avro/files"));
- *   
- *   // Write a PTable to a custom FileOutputFormat:
- *   PTable<KeyWritable, ValueWritable> custom = ...;
- *   pipeline.write(custom, To.formattedFile("/custom", MyFileFormat.class));
- * </code>
- * </p>
- */
-public class To {
-
-  /**
-   * Creates a {@code Target} at the given path name that writes data to
-   * a custom {@code FileOutputFormat}.
-   * 
-   * @param pathName The name of the path to write the data to on the filesystem
-   * @param formatClass The {@code FileOutputFormat<K, V>} to write the data to
-   * @return A new {@code Target} instance
-   */
-  public static <K extends Writable, V extends Writable> Target formattedFile(
-      String pathName, Class<? extends FileOutputFormat<K, V>> formatClass) {
-    return formattedFile(new Path(pathName), formatClass);
-  }
-
-  /**
-   * Creates a {@code Target} at the given {@code Path} that writes data to
-   * a custom {@code FileOutputFormat}.
-   * 
-   * @param path The {@code Path} to write the data to
-   * @param formatClass The {@code FileOutputFormat} to write the data to
-   * @return A new {@code Target} instance
-   */
-  public static <K extends Writable, V extends Writable> Target formattedFile(
-      Path path, Class<? extends FileOutputFormat<K, V>> formatClass) {
-    return new FileTargetImpl(path, formatClass, new SequentialFileNamingScheme());
-  }
-
-  /**
-   * Creates a {@code Target} at the given path name that writes data to
-   * Avro files. The {@code PType} for the written data must be for Avro records.
-   * 
-   * @param pathName The name of the path to write the data to on the filesystem
-   * @return A new {@code Target} instance
-   */
-  public static Target avroFile(String pathName) {
-    return avroFile(new Path(pathName));
-  }
-
-  /**
-   * Creates a {@code Target} at the given {@code Path} that writes data to
-   * Avro files. The {@code PType} for the written data must be for Avro records.
-   * 
-   * @param path The {@code Path} to write the data to
-   * @return A new {@code Target} instance
-   */
-  public static Target avroFile(Path path) {
-    return new AvroFileTarget(path);
-  }
-
-  /**
-   * Creates a {@code Target} at the given path name that writes data to
-   * SequenceFiles.
-   * 
-   * @param pathName The name of the path to write the data to on the filesystem
-   * @return A new {@code Target} instance
-   */
-  public static Target sequenceFile(String pathName) {
-    return sequenceFile(new Path(pathName));
-  }
-
-  /**
-   * Creates a {@code Target} at the given {@code Path} that writes data to
-   * SequenceFiles.
-   * 
-   * @param path The {@code Path} to write the data to
-   * @return A new {@code Target} instance
-   */
-  public static Target sequenceFile(Path path) {
-    return new SeqFileTarget(path);
-  }
-
-  /**
-   * Creates a {@code Target} at the given path name that writes data to
-   * text files.
-   * 
-   * @param pathName The name of the path to write the data to on the filesystem
-   * @return A new {@code Target} instance
-   */
-  public static Target textFile(String pathName) {
-    return textFile(new Path(pathName));
-  }
-
-  /**
-   * Creates a {@code Target} at the given {@code Path} that writes data to
-   * text files.
-   * 
-   * @param path The {@code Path} to write the data to
-   * @return A new {@code Target} instance
-   */
-  public static Target textFile(Path path) {
-    return new TextFileTarget(path);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
deleted file mode 100644
index c8fe23a..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileReaderFactory.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.avro;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.avro.Schema;
-import org.apache.avro.file.DataFileReader;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.mapred.FsInput;
-import org.apache.avro.reflect.ReflectDatumReader;
-import org.apache.avro.specific.SpecificDatumReader;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.MapFn;
-import org.apache.crunch.fn.IdentityFn;
-import org.apache.crunch.io.FileReaderFactory;
-import org.apache.crunch.io.impl.AutoClosingIterator;
-import org.apache.crunch.types.avro.AvroType;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.collect.Iterators;
-import com.google.common.collect.UnmodifiableIterator;
-
-/**
- * A {@link FileReaderFactory} that reads the records of an Avro data file and passes
- * each one through a {@link MapFn} before handing it to the caller.
- *
- * @param <T> the type of the records produced
- */
-public class AvroFileReaderFactory<T> implements FileReaderFactory<T> {
-
-  private static final Log LOG = LogFactory.getLog(AvroFileReaderFactory.class);
-
-  private final DatumReader<T> recordReader;
-  private final MapFn<T, T> mapFn;
-
-  /**
-   * Creates a factory whose datum reader is chosen from the capabilities of
-   * {@code atype} and whose records are post-processed by the type's input map function.
-   *
-   * @param atype the Avro type describing the records to read
-   */
-  public AvroFileReaderFactory(AvroType<T> atype) {
-    this.recordReader = createDatumReader(atype);
-    this.mapFn = (MapFn<T, T>) atype.getInputMapFn();
-  }
-
-  /**
-   * Creates a factory that reads generic records for {@code schema} and returns them
-   * unchanged.
-   *
-   * @param schema the schema of the records to read
-   */
-  public AvroFileReaderFactory(Schema schema) {
-    this.recordReader = new GenericDatumReader<T>(schema);
-    this.mapFn = IdentityFn.<T>getInstance();
-  }
-  
-  /**
-   * Picks the datum reader for {@code avroType}: reflect if the type has reflect data
-   * (after calling {@code Avros.checkCombiningSpecificAndReflectionSchemas()} when it
-   * also has specific data), otherwise specific, otherwise generic.
-   *
-   * @param avroType the Avro type to read
-   * @return a datum reader built from the type's schema
-   */
-  static <T> DatumReader<T> createDatumReader(AvroType<T> avroType) {
-    if (avroType.hasReflect()) {
-      if (avroType.hasSpecific()) {
-        Avros.checkCombiningSpecificAndReflectionSchemas();
-      }
-      return new ReflectDatumReader<T>(avroType.getSchema());
-    } else if (avroType.hasSpecific()) {
-      return new SpecificDatumReader<T>(avroType.getSchema());
-    } else {
-      return new GenericDatumReader<T>(avroType.getSchema());
-    }
-  }
-
-  /**
-   * Opens the Avro file at {@code path} and returns an iterator over its mapped records.
-   *
-   * <p>If the file cannot be opened, the failure is logged and an empty iterator is
-   * returned; the input stream opened for the attempt is closed first so that it is not
-   * leaked.</p>
-   *
-   * @param fs the file system whose configuration is used to open the file
-   * @param path the Avro file to read
-   * @return an iterator over the records, or an empty iterator if the file could not be opened
-   */
-  @Override
-  public Iterator<T> read(FileSystem fs, final Path path) {
-    this.mapFn.initialize();
-    FsInput fsi = null;
-    try {
-      fsi = new FsInput(path, fs.getConf());
-      final DataFileReader<T> reader = new DataFileReader<T>(fsi, recordReader);
-      return new AutoClosingIterator<T>(reader, new UnmodifiableIterator<T>() {
-        @Override
-        public boolean hasNext() {
-          return reader.hasNext();
-        }
-
-        @Override
-        public T next() {
-          return mapFn.map(reader.next());
-        }
-      });
-    } catch (IOException e) {
-      LOG.info("Could not read avro file at path: " + path, e);
-      // The reader never took ownership of the stream (or was never built), so nothing
-      // else will close it.
-      closeQuietly(fsi, path);
-      return Iterators.emptyIterator();
-    }
-  }
-
-  /** Closes {@code input} if it was opened, logging rather than propagating a close failure. */
-  private static void closeQuietly(FsInput input, Path path) {
-    if (input == null) {
-      return;
-    }
-    try {
-      input.close();
-    } catch (IOException ignored) {
-      // Best effort: the read has already failed and been reported.
-      LOG.debug("Could not close input for avro file at path: " + path, ignored);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
deleted file mode 100644
index 15792bf..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.avro;
-
-import java.io.IOException;
-
-import org.apache.avro.mapred.AvroJob;
-import org.apache.crunch.io.CompositePathIterable;
-import org.apache.crunch.io.FormatBundle;
-import org.apache.crunch.io.ReadableSource;
-import org.apache.crunch.io.impl.FileSourceImpl;
-import org.apache.crunch.types.avro.AvroInputFormat;
-import org.apache.crunch.types.avro.AvroType;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-/**
- * A file-based source of Avro records that can also be read directly by client code.
- *
- * @param <T> the type of the records read
- */
-public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
-
-  /**
-   * Builds the input {@link FormatBundle} for {@code ptype}: Avro input format, the
-   * reflect flag, the schema and the class name of the reflect data factory.
-   */
-  private static <S> FormatBundle getBundle(AvroType<S> ptype) {
-    FormatBundle bundle = FormatBundle.forInput(AvroInputFormat.class);
-    bundle = bundle.set(AvroJob.INPUT_IS_REFLECT, String.valueOf(ptype.hasReflect()));
-    bundle = bundle.set(AvroJob.INPUT_SCHEMA, ptype.getSchema().toString());
-    bundle = bundle.set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName());
-    return bundle;
-  }
-  
-  /**
-   * @param path the location of the Avro file(s)
-   * @param ptype the Avro type of the records
-   */
-  public AvroFileSource(Path path, AvroType<T> ptype) {
-    super(path, ptype, getBundle(ptype));
-  }
-
-  /**
-   * @return {@code Avro(<path>)}
-   */
-  @Override
-  public String toString() {
-    StringBuilder label = new StringBuilder("Avro(");
-    label.append(path.toString());
-    label.append(")");
-    return label.toString();
-  }
-
-  /**
-   * Reads the records under this source's path through an {@link AvroFileReaderFactory}.
-   *
-   * @param conf the configuration used to resolve the file system of the path
-   * @return the records of this source
-   * @throws IOException if the file system cannot be accessed
-   */
-  @Override
-  public Iterable<T> read(Configuration conf) throws IOException {
-    FileSystem fs = path.getFileSystem(conf);
-    AvroType<T> avroType = (AvroType<T>) ptype;
-    AvroFileReaderFactory<T> readerFactory = new AvroFileReaderFactory<T>(avroType);
-    return CompositePathIterable.create(fs, path, readerFactory);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
deleted file mode 100644
index 76103e5..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSourceTarget.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.avro;
-
-import org.apache.crunch.io.FileNamingScheme;
-import org.apache.crunch.io.SequentialFileNamingScheme;
-import org.apache.crunch.io.impl.ReadableSourcePathTargetImpl;
-import org.apache.crunch.types.avro.AvroType;
-import org.apache.hadoop.fs.Path;
-
-/**
- * Pairs an {@link AvroFileSource} and an {@link AvroFileTarget} on the same path.
- *
- * @param <T> the type of the records read and written
- */
-public class AvroFileSourceTarget<T> extends ReadableSourcePathTargetImpl<T> {
-
-  /**
-   * Uses a {@link SequentialFileNamingScheme} for the files that are written.
-   *
-   * @param path the location shared by the source and the target
-   * @param atype the Avro type of the records
-   */
-  public AvroFileSourceTarget(Path path, AvroType<T> atype) {
-    this(path, atype, new SequentialFileNamingScheme());
-  }
-
-  /**
-   * @param path the location shared by the source and the target
-   * @param atype the Avro type of the records
-   * @param fileNamingScheme the naming scheme passed on to the superclass
-   */
-  public AvroFileSourceTarget(Path path, AvroType<T> atype, FileNamingScheme fileNamingScheme) {
-    super(new AvroFileSource<T>(path, atype), new AvroFileTarget(path), fileNamingScheme);
-  }
-
-  /**
-   * @return the string form of the wrapped target
-   */
-  @Override
-  public String toString() {
-    return target.toString();
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
deleted file mode 100644
index 3a9e42c..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.avro;
-
-import org.apache.avro.mapred.AvroWrapper;
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.io.FileNamingScheme;
-import org.apache.crunch.io.OutputHandler;
-import org.apache.crunch.io.SequentialFileNamingScheme;
-import org.apache.crunch.io.impl.FileTargetImpl;
-import org.apache.crunch.types.PType;
-import org.apache.crunch.types.avro.AvroOutputFormat;
-import org.apache.crunch.types.avro.AvroType;
-import org.apache.crunch.types.avro.Avros;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * A file target that writes Avro data using {@link AvroOutputFormat}. Only {@link AvroType}
- * instances are accepted as output types.
- */
-public class AvroFileTarget extends FileTargetImpl {
-
-  /** Creates a target for the given path string using a {@link SequentialFileNamingScheme}. */
-  public AvroFileTarget(String path) {
-    this(new Path(path));
-  }
-
-  /** Creates a target for the given path using a {@link SequentialFileNamingScheme}. */
-  public AvroFileTarget(Path path) {
-    this(path, new SequentialFileNamingScheme());
-  }
-
-  /** Creates a target for the given path with an explicit file naming scheme. */
-  public AvroFileTarget(Path path, FileNamingScheme fileNamingScheme) {
-    super(path, AvroOutputFormat.class, fileNamingScheme);
-  }
-
-  @Override
-  public String toString() {
-    final String location = path.toString();
-    return "Avro(" + location + ")";
-  }
-
-  /**
-   * Accepts the output only when {@code ptype} is an {@link AvroType}; in that case the handler
-   * is configured with this target.
-   *
-   * @return {@code true} if the type was accepted, {@code false} otherwise
-   */
-  @Override
-  public boolean accept(OutputHandler handler, PType<?> ptype) {
-    if (ptype instanceof AvroType) {
-      handler.configure(this, ptype);
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * Records the output schema in the job configuration and then configures the job (or the named
-   * output, when {@code name} is non-null) to write Avro data.
-   *
-   * @throws IllegalStateException if a different schema is already registered under the same key
-   */
-  @Override
-  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
-    final AvroType<?> avroType = (AvroType<?>) ptype;
-    final Configuration jobConf = job.getConfiguration();
-
-    // The default output uses the plain key; named outputs get a per-name suffix.
-    final String schemaKey = (name == null) ? "avro.output.schema" : "avro.output.schema." + name;
-
-    final String registered = jobConf.get(schemaKey);
-    final String wanted = avroType.getSchema().toString();
-    if (registered == null) {
-      jobConf.set(schemaKey, wanted);
-    } else if (!registered.equals(wanted)) {
-      throw new IllegalStateException("Avro targets must use the same output schema");
-    }
-
-    Avros.configureReflectDataFactory(jobConf);
-    configureForMapReduce(job, AvroWrapper.class, NullWritable.class, AvroOutputFormat.class,
-        outputPath, name);
-  }
-
-  /**
-   * Returns an {@link AvroFileSourceTarget} over this target's path when {@code ptype} is an
-   * {@link AvroType}, and {@code null} otherwise.
-   */
-  @Override
-  public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
-    if (!(ptype instanceof AvroType)) {
-      return null;
-    }
-    return new AvroFileSourceTarget<T>(path, (AvroType<T>) ptype);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java b/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java
deleted file mode 100644
index 3bd802e..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/impl/AutoClosingIterator.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.impl;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Iterator;
-
-import com.google.common.collect.UnmodifiableIterator;
-import com.google.common.io.Closeables;
-
-/**
- * An iterator that closes the wrapped {@code Closeable} as soon as {@link #hasNext()} returns false.
- * As long as a client loops through to completion (i.e. doesn't abort early due to an exception, a
- * short circuit, etc.) the resource is closed automatically; a client that stops early can call
- * {@link #close()} directly.
- */
-public class AutoClosingIterator<T> extends UnmodifiableIterator<T> implements Closeable {
-  private final Iterator<T> iter;
-  private Closeable closeable;
-
-  /**
-   * @param closeable the resource to close once {@code iter} is exhausted
-   * @param iter the iterator whose elements are returned
-   */
-  public AutoClosingIterator(Closeable closeable, Iterator<T> iter) {
-    this.iter = iter;
-    this.closeable = closeable;
-  }
-
-  @Override
-  public boolean hasNext() {
-    final boolean more = iter.hasNext();
-    if (!more) {
-      // Exhausted: release the resource via close(), using Guava's closeQuietly helper.
-      Closeables.closeQuietly(this);
-    }
-    return more;
-  }
-
-  @Override
-  public T next() {
-    return iter.next();
-  }
-
-  /**
-   * Closes the wrapped resource if it has not been closed yet. The reference is cleared only
-   * after a successful close, so later calls are no-ops.
-   */
-  @Override
-  public void close() throws IOException {
-    if (closeable == null) {
-      return;
-    }
-    closeable.close();
-    closeable = null;
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
deleted file mode 100644
index 688c801..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.impl;
-
-import java.io.IOException;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.Source;
-import org.apache.crunch.io.CrunchInputs;
-import org.apache.crunch.io.FormatBundle;
-import org.apache.crunch.io.SourceTargetHelper;
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-/**
- * A {@link Source} backed by a file system {@link Path}, read through the input format described
- * by a {@link FormatBundle}.
- *
- * @param <T> the element type produced by this source
- */
-public class FileSourceImpl<T> implements Source<T> {
-
-  private static final Log LOG = LogFactory.getLog(FileSourceImpl.class);
-
-  protected final Path path;
-  protected final PType<T> ptype;
-  protected final FormatBundle<? extends InputFormat> inputBundle;
-
-  /** Creates a source whose format bundle is derived from {@code inputFormatClass}. */
-  public FileSourceImpl(Path path, PType<T> ptype, Class<? extends InputFormat> inputFormatClass) {
-    this(path, ptype, FormatBundle.forInput(inputFormatClass));
-  }
-
-  /** Creates a source that uses the supplied format bundle as-is. */
-  public FileSourceImpl(Path path, PType<T> ptype, FormatBundle<? extends InputFormat> inputBundle) {
-    this.inputBundle = inputBundle;
-    this.ptype = ptype;
-    this.path = path;
-  }
-
-  public Path getPath() {
-    return path;
-  }
-
-  /**
-   * Registers this source's path on the job. An {@code inputId} of -1 configures the job's own
-   * input path and input format; any other id registers the path through {@link CrunchInputs}.
-   */
-  @Override
-  public void configureSource(Job job, int inputId) throws IOException {
-    if (inputId != -1) {
-      CrunchInputs.addInputPath(job, path, inputBundle, inputId);
-      return;
-    }
-    FileInputFormat.addInputPath(job, path);
-    job.setInputFormatClass(inputBundle.getFormatClass());
-    inputBundle.configure(job.getConfiguration());
-  }
-
-  @Override
-  public PType<T> getType() {
-    return ptype;
-  }
-
-  /**
-   * Returns the size of the data at this source's path.
-   *
-   * @throws IllegalStateException if the size lookup fails with an {@link IOException}
-   */
-  @Override
-  public long getSize(Configuration configuration) {
-    final long size;
-    try {
-      size = SourceTargetHelper.getPathSize(configuration, path);
-    } catch (IOException e) {
-      LOG.warn(String.format("Exception thrown looking up size of: %s", path), e);
-      throw new IllegalStateException("Failed to get the file size of:" + path, e);
-    }
-    return size;
-  }
-
-  /** Two sources are equal when they have the same class, type, path and format bundle. */
-  @Override
-  public boolean equals(Object other) {
-    if (other == null) {
-      return false;
-    }
-    if (!getClass().equals(other.getClass())) {
-      return false;
-    }
-    final FileSourceImpl<?> that = (FileSourceImpl<?>) other;
-    if (!ptype.equals(that.ptype)) {
-      return false;
-    }
-    if (!path.equals(that.path)) {
-      return false;
-    }
-    return inputBundle.equals(that.inputBundle);
-  }
-
-  @Override
-  public int hashCode() {
-    final HashCodeBuilder builder = new HashCodeBuilder();
-    builder.append(ptype);
-    builder.append(path);
-    builder.append(inputBundle);
-    return builder.toHashCode();
-  }
-
-  /** Renders as {@code bundleName(path)}. */
-  @Override
-  public String toString() {
-    return inputBundle.getName() + "(" + path + ")";
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
deleted file mode 100644
index 295edb5..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.impl;
-
-import org.apache.crunch.Pair;
-import org.apache.crunch.TableSource;
-import org.apache.crunch.io.FormatBundle;
-import org.apache.crunch.types.PTableType;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
-
-/**
- * A file-backed {@link TableSource}: a {@link FileSourceImpl} whose elements are key/value
- * {@link Pair}s described by a {@link PTableType}.
- *
- * @param <K> the key type
- * @param <V> the value type
- */
-public class FileTableSourceImpl<K, V> extends FileSourceImpl<Pair<K, V>> implements TableSource<K, V> {
-
-  /**
-   * Creates a table source that reads {@code path} with the given input format class.
-   *
-   * @param path the location of the data
-   * @param tableType the table type passed to the superclass as this source's type
-   * @param formatClass the {@link FileInputFormat} subclass passed to the superclass
-   */
-  public FileTableSourceImpl(Path path, PTableType<K, V> tableType, Class<? extends FileInputFormat> formatClass) {
-    super(path, tableType, formatClass);
-  }
-
-  /**
-   * Creates a table source that reads {@code path} with the given format bundle.
-   *
-   * @param path the location of the data
-   * @param tableType the table type passed to the superclass as this source's type
-   * @param bundle the format bundle passed to the superclass (declared here as a raw type)
-   */
-  public FileTableSourceImpl(Path path, PTableType<K, V> tableType, FormatBundle bundle) {
-    super(path, tableType, bundle);
-  }
-  
-  /**
-   * Returns this source's type as a {@link PTableType}. The cast relies on both constructors
-   * passing a {@code PTableType} to the superclass.
-   */
-  @Override
-  public PTableType<K, V> getTableType() {
-    return (PTableType<K, V>) getType();
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
deleted file mode 100644
index c1c29e4..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.impl;
-
-import java.io.IOException;
-
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.crunch.CrunchRuntimeException;
-import org.apache.crunch.SourceTarget;
-import org.apache.crunch.io.CrunchOutputs;
-import org.apache.crunch.io.FileNamingScheme;
-import org.apache.crunch.io.OutputHandler;
-import org.apache.crunch.io.PathTarget;
-import org.apache.crunch.types.Converter;
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-
-public class FileTargetImpl implements PathTarget {
-
-  private static final Log LOG = LogFactory.getLog(FileTargetImpl.class);
-  
-  protected final Path path;
-  private final Class<? extends FileOutputFormat> outputFormatClass;
-  private final FileNamingScheme fileNamingScheme;
-
-  public FileTargetImpl(Path path, Class<? extends FileOutputFormat> outputFormatClass,
-      FileNamingScheme fileNamingScheme) {
-    this.path = path;
-    this.outputFormatClass = outputFormatClass;
-    this.fileNamingScheme = fileNamingScheme;
-  }
-
-  @Override
-  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
-    Converter converter = ptype.getConverter();
-    Class keyClass = converter.getKeyClass();
-    Class valueClass = converter.getValueClass();
-    configureForMapReduce(job, keyClass, valueClass, outputFormatClass, outputPath, name);
-  }
-
-  protected void configureForMapReduce(Job job, Class keyClass, Class valueClass,
-      Class outputFormatClass, Path outputPath, String name) {
-    try {
-      FileOutputFormat.setOutputPath(job, outputPath);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-    if (name == null) {
-      job.setOutputFormatClass(outputFormatClass);
-      job.setOutputKeyClass(keyClass);
-      job.setOutputValueClass(valueClass);
-    } else {
-      CrunchOutputs.addNamedOutput(job, name, outputFormatClass, keyClass, valueClass);
-    }
-  }
-
-  @Override
-  public boolean accept(OutputHandler handler, PType<?> ptype) {
-    handler.configure(this, ptype);
-    return true;
-  }
-
-  @Override
-  public Path getPath() {
-    return path;
-  }
-
-  @Override
-  public FileNamingScheme getFileNamingScheme() {
-    return fileNamingScheme;
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other == null || !getClass().equals(other.getClass())) {
-      return false;
-    }
-    FileTargetImpl o = (FileTargetImpl) other;
-    return path.equals(o.path);
-  }
-
-  @Override
-  public int hashCode() {
-    return new HashCodeBuilder().append(path).toHashCode();
-  }
-
-  @Override
-  public String toString() {
-    return new StringBuilder().append(outputFormatClass.getSimpleName()).append("(").append(path).append(")")
-        .toString();
-  }
-
-  @Override
-  public <T> SourceTarget<T> asSourceTarget(PType<T> ptype) {
-    // By default, assume that we cannot do this.
-    return null;
-  }
-
-  @Override
-  public void handleExisting(WriteMode strategy, Configuration conf) {
-    FileSystem fs = null;
-    try {
-      fs = FileSystem.get(conf);
-    } catch (IOException e) {
-      LOG.error("Could not retrieve FileSystem object to check for existing path", e);
-      throw new CrunchRuntimeException(e);
-    }
-    
-    boolean exists = false;
-    try {
-      exists = fs.exists(path);
-    } catch (IOException e) {
-      LOG.error("Exception checking existence of path: " + path, e);
-      throw new CrunchRuntimeException(e);
-    }
-    
-    if (exists) {
-      switch (strategy) {
-      case DEFAULT:
-        LOG.error("Path " + path + " already exists!");
-        throw new CrunchRuntimeException("Path already exists: " + path);
-      case OVERWRITE:
-        LOG.info("Removing data at existing path: " + path);
-        try {
-          fs.delete(path, true);
-        } catch (IOException e) {
-          LOG.error("Exception thrown removing data at path: " + path, e);
-        }
-        break;
-      case APPEND:
-        LOG.info("Adding output files to existing path: " + path);
-        break;
-      default:
-        throw new CrunchRuntimeException("Unknown WriteMode:  " + strategy);
-      }
-    } else {
-      LOG.info("Will write output files to new path: " + path);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java
deleted file mode 100644
index 6506816..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourcePathTargetImpl.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.impl;
-
-import java.io.IOException;
-
-import org.apache.crunch.io.FileNamingScheme;
-import org.apache.crunch.io.PathTarget;
-import org.apache.crunch.io.ReadableSource;
-import org.apache.crunch.io.ReadableSourceTarget;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * A {@link SourcePathTargetImpl} whose source is a {@link ReadableSource}, so its contents can
- * be read directly via {@link #read(Configuration)}.
- *
- * @param <T> the element type of the source
- */
-public class ReadableSourcePathTargetImpl<T> extends SourcePathTargetImpl<T> implements ReadableSourceTarget<T> {
-
-  /**
-   * @param source the readable source passed to the superclass
-   * @param target the path target passed to the superclass
-   * @param fileNamingScheme the naming scheme passed to the superclass
-   */
-  public ReadableSourcePathTargetImpl(ReadableSource<T> source, PathTarget target, FileNamingScheme fileNamingScheme) {
-    super(source, target, fileNamingScheme);
-  }
-
-  /**
-   * Reads the data by delegating to the inherited {@code source} field, cast to
-   * {@link ReadableSource} (presumably the instance given to the constructor -- confirm in the
-   * superclass).
-   */
-  @Override
-  public Iterable<T> read(Configuration conf) throws IOException {
-    return ((ReadableSource<T>) source).read(conf);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java
deleted file mode 100644
index f435b3b..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/impl/ReadableSourceTargetImpl.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.impl;
-
-import java.io.IOException;
-
-import org.apache.crunch.Target;
-import org.apache.crunch.io.ReadableSource;
-import org.apache.crunch.io.ReadableSourceTarget;
-import org.apache.hadoop.conf.Configuration;
-
-/**
- * A {@link SourceTargetImpl} whose source is a {@link ReadableSource}, so its contents can be
- * read directly via {@link #read(Configuration)}.
- *
- * @param <T> the element type of the source
- */
-public class ReadableSourceTargetImpl<T> extends SourceTargetImpl<T> implements ReadableSourceTarget<T> {
-
-  /**
-   * @param source the readable source passed to the superclass
-   * @param target the target passed to the superclass
-   */
-  public ReadableSourceTargetImpl(ReadableSource<T> source, Target target) {
-    super(source, target);
-  }
-
-  /**
-   * Reads the data by delegating to the inherited {@code source} field, cast to
-   * {@link ReadableSource} (presumably the instance given to the constructor -- confirm in the
-   * superclass).
-   */
-  @Override
-  public Iterable<T> read(Configuration conf) throws IOException {
-    return ((ReadableSource<T>) source).read(conf);
-  }
-}

http://git-wip-us.apache.org/repos/asf/crunch/blob/890e0086/crunch/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
deleted file mode 100644
index c0d7ce0..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/impl/SourcePathTargetImpl.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io.impl;
-
-import org.apache.crunch.Source;
-import org.apache.crunch.io.FileNamingScheme;
-import org.apache.crunch.io.PathTarget;
-import org.apache.crunch.types.PType;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-
-/**
- * A {@link SourceTargetImpl} whose target is a {@link PathTarget}. Path-related calls are
- * forwarded to the inherited {@code target} field, while the file naming scheme is held by this
- * class itself.
- *
- * @param <T> the element type of the source
- */
-public class SourcePathTargetImpl<T> extends SourceTargetImpl<T> implements PathTarget {
-
-  // Naming scheme returned by getFileNamingScheme(); kept separately from the wrapped target.
-  private final FileNamingScheme fileNamingScheme;
-
-  /**
-   * @param source the source passed to the superclass
-   * @param target the path target passed to the superclass
-   * @param fileNamingScheme the naming scheme reported by {@link #getFileNamingScheme()}
-   */
-  public SourcePathTargetImpl(Source<T> source, PathTarget target, FileNamingScheme fileNamingScheme) {
-    super(source, target);
-    this.fileNamingScheme = fileNamingScheme;
-  }
-
-  /** Forwards the job configuration to the inherited {@code target}, cast to {@link PathTarget}. */
-  @Override
-  public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
-    ((PathTarget) target).configureForMapReduce(job, ptype, outputPath, name);
-  }
-
-  /** Returns the path reported by the inherited {@code target}, cast to {@link PathTarget}. */
-  @Override
-  public Path getPath() {
-    return ((PathTarget) target).getPath();
-  }
-
-  /** Returns the naming scheme given to this instance's constructor. */
-  @Override
-  public FileNamingScheme getFileNamingScheme() {
-    return fileNamingScheme;
-  }
-}