You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@crunch.apache.org by jw...@apache.org on 2013/01/10 22:22:54 UTC

[2/2] git commit: CRUNCH-127: Support writing multiple HBaseTargets in a single job, which required renaming InputBundles to FormatBundles, creating a CrunchOutputs to go along with CrunchInputs and deleting the old MultipleOutputs code, and creating our own TableOutputFormat for HBase that is compatible with multiple tables.

Updated Branches:
  refs/heads/master 2bc04f98c -> 522a691c9


CRUNCH-127: Support writing multiple HBaseTargets in a single job, which required
renaming InputBundles to FormatBundles, creating a CrunchOutputs to go along with
CrunchInputs and deleting the old MultipleOutputs code, and creating our own
TableOutputFormat for HBase that is compatible with multiple tables. In the process,
remove the circular dependency between the org.apache.crunch.io and org.apache.crunch.impl.mr packages.


Project: http://git-wip-us.apache.org/repos/asf/incubator-crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-crunch/commit/522a691c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-crunch/tree/522a691c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-crunch/diff/522a691c

Branch: refs/heads/master
Commit: 522a691c967606963b8bfc13d3f92bc1e071ddef
Parents: 2bc04f9
Author: Josh Wills <jw...@apache.org>
Authored: Tue Dec 11 15:13:12 2012 -0800
Committer: Josh Wills <jw...@apache.org>
Committed: Wed Jan 9 14:35:50 2013 -0800

----------------------------------------------------------------------
 .../apache/crunch/io/hbase/WordCountHBaseIT.java   |   11 +-
 .../apache/crunch/io/hbase/HBaseSourceTarget.java  |   10 +-
 .../org/apache/crunch/io/hbase/HBaseTarget.java    |   13 +-
 .../apache/crunch/io/hbase/TableOutputFormat.java  |  130 ++++
 .../mapreduce/TaskAttemptContextFactory.java       |   70 +++
 .../lib/output/CrunchMultipleOutputs.java          |  474 ---------------
 .../crunch/impl/mr/emit/MultipleOutputEmitter.java |   15 +-
 .../crunch/impl/mr/run/CrunchInputFormat.java      |   20 +-
 .../crunch/impl/mr/run/CrunchInputSplit.java       |   43 +-
 .../apache/crunch/impl/mr/run/CrunchInputs.java    |   66 --
 .../crunch/impl/mr/run/CrunchRecordReader.java     |    1 +
 .../crunch/impl/mr/run/CrunchTaskContext.java      |    8 +-
 .../java/org/apache/crunch/impl/mr/run/RTNode.java |    6 +-
 .../crunch/impl/mr/run/RuntimeParameters.java      |    3 -
 .../impl/mr/run/TaskAttemptContextFactory.java     |   68 --
 .../java/org/apache/crunch/io/CrunchInputs.java    |   71 +++
 .../java/org/apache/crunch/io/CrunchOutputs.java   |  184 ++++++
 .../java/org/apache/crunch/io/FormatBundle.java    |  121 ++++
 .../java/org/apache/crunch/io/InputBundle.java     |  118 ----
 .../java/org/apache/crunch/io/PathTargetImpl.java  |    3 +-
 .../org/apache/crunch/io/ReadableSourceTarget.java |    1 -
 .../org/apache/crunch/io/SourceTargetHelper.java   |    4 -
 .../org/apache/crunch/io/avro/AvroFileSource.java  |    6 +-
 .../org/apache/crunch/io/impl/FileSourceImpl.java  |   12 +-
 .../apache/crunch/io/impl/FileTableSourceImpl.java |    4 +-
 .../org/apache/crunch/io/impl/FileTargetImpl.java  |    4 +-
 .../org/apache/crunch/io/text/NLineFileSource.java |    6 +-
 .../apache/crunch/io/text/TextFileTableSource.java |    6 +-
 28 files changed, 661 insertions(+), 817 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/522a691c/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
index a46369e..4271fad 100644
--- a/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
+++ b/crunch-hbase/src/it/java/org/apache/crunch/io/hbase/WordCountHBaseIT.java
@@ -26,7 +26,6 @@ import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.util.List;
 import java.util.Random;
 import java.util.jar.JarEntry;
 import java.util.jar.JarOutputStream;
@@ -216,13 +215,15 @@ public class WordCountHBaseIT {
     int postFix = Math.abs(rand.nextInt());
     String inputTableName = "crunch_words_" + postFix;
     String outputTableName = "crunch_counts_" + postFix;
+    String otherTableName = "crunch_other_" + postFix;
     String joinTableName = "crunch_join_words_" + postFix;
     
     try {
 
       HTable inputTable = hbaseTestUtil.createTable(Bytes.toBytes(inputTableName), WORD_COLFAM);
       HTable outputTable = hbaseTestUtil.createTable(Bytes.toBytes(outputTableName), COUNTS_COLFAM);
-
+      HTable otherTable = hbaseTestUtil.createTable(Bytes.toBytes(otherTableName), COUNTS_COLFAM);
+      
       int key = 0;
       key = put(inputTable, key, "cat");
       key = put(inputTable, key, "cat");
@@ -231,11 +232,15 @@ public class WordCountHBaseIT {
       scan.addColumn(WORD_COLFAM, null);
       HBaseSourceTarget source = new HBaseSourceTarget(inputTableName, scan);
       PTable<ImmutableBytesWritable, Result> words = pipeline.read(source);
-      pipeline.write(wordCount(words), new HBaseTarget(outputTableName));
+      PCollection<Put> puts = wordCount(words);
+      pipeline.write(puts, new HBaseTarget(outputTableName));
+      pipeline.write(puts, new HBaseTarget(otherTableName));
       pipeline.done();
 
       assertIsLong(outputTable, "cat", 2);
       assertIsLong(outputTable, "dog", 1);
+      assertIsLong(otherTable, "cat", 2);
+      assertIsLong(otherTable, "dog", 1);
       
       // verify we can do joins.
       HTable joinTable = hbaseTestUtil.createTable(Bytes.toBytes(joinTableName), WORD_COLFAM);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/522a691c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
index 8e6a3fb..230c701 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java
@@ -25,9 +25,9 @@ import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.crunch.Pair;
 import org.apache.crunch.SourceTarget;
 import org.apache.crunch.TableSource;
-import org.apache.crunch.impl.mr.run.CrunchInputs;
 import org.apache.crunch.impl.mr.run.CrunchMapper;
-import org.apache.crunch.io.InputBundle;
+import org.apache.crunch.io.CrunchInputs;
+import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.types.PTableType;
 import org.apache.crunch.types.PType;
 import org.apache.crunch.types.writable.Writables;
@@ -49,13 +49,13 @@ public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair<
       Writables.writables(ImmutableBytesWritable.class), Writables.writables(Result.class));
 
   protected Scan scan;
-  private InputBundle<TableInputFormat> inputBundle;
+  private FormatBundle<TableInputFormat> inputBundle;
   
   public HBaseSourceTarget(String table, Scan scan) {
     super(table);
     this.scan = scan;
     try {
-      this.inputBundle = new InputBundle<TableInputFormat>(TableInputFormat.class)
+      this.inputBundle = FormatBundle.forInput(TableInputFormat.class)
           .set(TableInputFormat.INPUT_TABLE, table)
           .set(TableInputFormat.SCAN, convertScanToString(scan));
     } catch (IOException e) {
@@ -100,7 +100,7 @@ public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair<
     TableMapReduceUtil.addDependencyJars(job);
     if (inputId == -1) {
       job.setMapperClass(CrunchMapper.class);
-      job.setInputFormatClass(inputBundle.getInputFormatClass());
+      job.setInputFormatClass(inputBundle.getFormatClass());
       inputBundle.configure(job.getConfiguration());
     } else {
       Path dummy = new Path("/hbase/" + table);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/522a691c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
index 35a74fa..44864e8 100644
--- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java
@@ -22,7 +22,8 @@ import java.io.IOException;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.SourceTarget;
-import org.apache.crunch.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
+import org.apache.crunch.io.CrunchOutputs;
+import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.MapReduceTarget;
 import org.apache.crunch.io.OutputHandler;
 import org.apache.crunch.types.PType;
@@ -33,7 +34,6 @@ import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 
@@ -81,7 +81,6 @@ public class HBaseTarget implements MapReduceTarget {
   public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) {
     final Configuration conf = job.getConfiguration();
     HBaseConfiguration.addHbaseResources(conf);
-    conf.set(TableOutputFormat.OUTPUT_TABLE, table);
     Class<?> typeClass = ptype.getTypeClass(); // Either Put or Delete
     
     try {
@@ -95,9 +94,13 @@ public class HBaseTarget implements MapReduceTarget {
       job.setOutputFormatClass(TableOutputFormat.class);
       job.setOutputKeyClass(ImmutableBytesWritable.class);
       job.setOutputValueClass(typeClass);
+      conf.set(TableOutputFormat.OUTPUT_TABLE, table);
     } else {
-      CrunchMultipleOutputs.addNamedOutput(job, name,
-          TableOutputFormat.class,
+      FormatBundle<TableOutputFormat> bundle = FormatBundle.forOutput(
+          TableOutputFormat.class);
+      bundle.set(TableOutputFormat.OUTPUT_TABLE, table);
+      CrunchOutputs.addNamedOutput(job, name,
+          bundle,
           ImmutableBytesWritable.class,
           typeClass);
     }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/522a691c/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/TableOutputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/TableOutputFormat.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/TableOutputFormat.java
new file mode 100644
index 0000000..4493d65
--- /dev/null
+++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/TableOutputFormat.java
@@ -0,0 +1,130 @@
+/**
+ * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
+ *
+ * Cloudera, Inc. licenses this file to you under the Apache License,
+ * Version 2.0 (the "License"). You may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
+ * CONDITIONS OF ANY KIND, either express or implied. See the License for
+ * the specific language governing permissions and limitations under the
+ * License.
+ */
+package org.apache.crunch.io.hbase;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.mapreduce.TableOutputCommitter;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import com.google.common.collect.Maps;
+
+class TableOutputFormat<K> extends OutputFormat<K, Writable> {
+
+  private final Log LOG = LogFactory.getLog(TableOutputFormat.class);
+
+  /** Job parameter that specifies the output table. */
+  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
+
+  /**
+   * Optional job parameter to specify a peer cluster.
+   * Used for specifying a remote cluster when copying between HBase clusters (the
+   * source is picked up from <code>hbase-site.xml</code>).
+   * @see TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job, Class, String, String, String)
+   */
+  public static final String QUORUM_ADDRESS = "hbase.mapred.output.quorum";
+
+  /** Optional specification of the rs class name of the peer cluster */
+  public static final String
+      REGION_SERVER_CLASS = "hbase.mapred.output.rs.class";
+  /** Optional specification of the rs impl name of the peer cluster */
+  public static final String
+      REGION_SERVER_IMPL = "hbase.mapred.output.rs.impl";
+  
+  
+  private final Map<String, HTable> tables = Maps.newHashMap();
+  
+  private static class TableRecordWriter<K> extends RecordWriter<K, Writable> {
+
+    private HTable table;
+
+    public TableRecordWriter(HTable table) {
+      this.table = table;
+    }
+
+    @Override
+    public void close(TaskAttemptContext context) throws IOException {
+      table.close();
+    }
+
+    @Override
+    public void write(K key, Writable value)
+    throws IOException {
+      if (value instanceof Put) this.table.put(new Put((Put)value));
+      else if (value instanceof Delete) this.table.delete(new Delete((Delete)value));
+      else throw new IOException("Pass a Delete or a Put");
+    }
+  }
+  
+  @Override
+  public void checkOutputSpecs(JobContext jc) throws IOException, InterruptedException {
+    // No-op for now
+  }
+
+  @Override
+  public OutputCommitter getOutputCommitter(TaskAttemptContext ctxt) throws IOException,
+      InterruptedException {
+    return new TableOutputCommitter();
+  }
+
+  @Override
+  public RecordWriter<K, Writable> getRecordWriter(TaskAttemptContext ctxt) throws IOException,
+      InterruptedException {
+    Configuration conf = ctxt.getConfiguration();
+    String tableName = conf.get(OUTPUT_TABLE);
+    if(tableName == null || tableName.length() <= 0) {
+      throw new IllegalArgumentException("Must specify table name");
+    }
+    HTable table = tables.get(tableName);
+    if (table == null) {
+      conf = HBaseConfiguration.create(conf);
+      String address = conf.get(QUORUM_ADDRESS);
+      String serverClass = conf.get(REGION_SERVER_CLASS);
+      String serverImpl = conf.get(REGION_SERVER_IMPL);
+      try {
+        if (address != null) {
+          ZKUtil.applyClusterKeyToConf(conf, address);
+        }
+        if (serverClass != null) {
+          conf.set(HConstants.REGION_SERVER_CLASS, serverClass);
+          conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
+        }
+        table = new HTable(conf, tableName);
+        table.setAutoFlush(false);
+        tables.put(tableName, table);
+      } catch (IOException e) {
+        LOG.error(e);
+        throw new RuntimeException(e);
+      }
+    }
+    return new TableRecordWriter<K>(table);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/522a691c/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/TaskAttemptContextFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/TaskAttemptContextFactory.java b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/TaskAttemptContextFactory.java
new file mode 100644
index 0000000..887c051
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/TaskAttemptContextFactory.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.hadoop.mapreduce;
+
+import java.lang.reflect.Constructor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+
+/**
+ * A factory class that allows us to hide the fact that {@code TaskAttemptContext} is a class in
+ * Hadoop 1.x.x and an interface in Hadoop 2.x.x.
+ */
+@SuppressWarnings("unchecked")
+public class TaskAttemptContextFactory {
+
+  private static final Log LOG = LogFactory.getLog(TaskAttemptContextFactory.class);
+
+  private static final TaskAttemptContextFactory INSTANCE = new TaskAttemptContextFactory();
+
+  public static TaskAttemptContext create(Configuration conf, TaskAttemptID taskAttemptId) {
+    return INSTANCE.createInternal(conf, taskAttemptId);
+  }
+
+  private Constructor<TaskAttemptContext> taskAttemptConstructor;
+
+  private TaskAttemptContextFactory() {
+    Class<TaskAttemptContext> implClass = TaskAttemptContext.class;
+    if (implClass.isInterface()) {
+      try {
+        implClass = (Class<TaskAttemptContext>) Class.forName(
+            "org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
+      } catch (ClassNotFoundException e) {
+        LOG.fatal("Could not find TaskAttemptContextImpl class, exiting", e);
+      }
+    }
+    try {
+      this.taskAttemptConstructor = implClass.getConstructor(Configuration.class, TaskAttemptID.class);
+    } catch (Exception e) {
+      LOG.fatal("Could not access TaskAttemptContext constructor, exiting", e);
+    }
+  }
+
+  private TaskAttemptContext createInternal(Configuration conf, TaskAttemptID taskAttemptId) {
+    try {
+      return (TaskAttemptContext) taskAttemptConstructor.newInstance(conf, taskAttemptId);
+    } catch (Exception e) {
+      LOG.error("Could not construct a TaskAttemptContext instance", e);
+      return null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/522a691c/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java b/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java
deleted file mode 100644
index 5d0863d..0000000
--- a/crunch/src/main/java/org/apache/crunch/hadoop/mapreduce/lib/output/CrunchMultipleOutputs.java
+++ /dev/null
@@ -1,474 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.hadoop.mapreduce.lib.output;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.StringTokenizer;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.RecordWriter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.hadoop.util.ReflectionUtils;
-
-import org.apache.crunch.impl.mr.run.TaskAttemptContextFactory;
-
-/**
- * The MultipleOutputs class simplifies writing output data 
- * to multiple outputs
- * 
- * <p> 
- * Case one: writing to additional outputs other than the job default output.
- *
- * Each additional output, or named output, may be configured with its own
- * <code>OutputFormat</code>, with its own key class and with its own value
- * class.
- * 
- * <p>
- * Case two: to write data to different files provided by user
- * </p>
- * 
- * <p>
- * MultipleOutputs supports counters, by default they are disabled. The 
- * counters group is the {@link CrunchMultipleOutputs} class name. The names of the 
- * counters are the same as the output name. These count the number records 
- * written to each output name.
- * </p>
- * 
- * Usage pattern for job submission:
- * <pre>
- *
- * Job job = new Job();
- *
- * FileInputFormat.setInputPath(job, inDir);
- * FileOutputFormat.setOutputPath(job, outDir);
- *
- * job.setMapperClass(MOMap.class);
- * job.setReducerClass(MOReduce.class);
- * ...
- *
- * // Defines additional single text based output 'text' for the job
- * MultipleOutputs.addNamedOutput(job, "text", TextOutputFormat.class,
- * LongWritable.class, Text.class);
- *
- * // Defines additional sequence-file based output 'sequence' for the job
- * MultipleOutputs.addNamedOutput(job, "seq",
- *   SequenceFileOutputFormat.class,
- *   LongWritable.class, Text.class);
- * ...
- *
- * job.waitForCompletion(true);
- * ...
- * </pre>
- * <p>
- * Usage in Reducer:
- * <pre>
- * <K, V> String generateFileName(K k, V v) {
- *   return k.toString() + "_" + v.toString();
- * }
- * 
- * public class MOReduce extends
- *   Reducer&lt;WritableComparable, Writable,WritableComparable, Writable&gt; {
- * private MultipleOutputs mos;
- * public void setup(Context context) {
- * ...
- * mos = new MultipleOutputs(context);
- * }
- *
- * public void reduce(WritableComparable key, Iterator&lt;Writable&gt; values,
- * Context context)
- * throws IOException {
- * ...
- * mos.write("text", , key, new Text("Hello"));
- * mos.write("seq", LongWritable(1), new Text("Bye"), "seq_a");
- * mos.write("seq", LongWritable(2), key, new Text("Chau"), "seq_b");
- * mos.write(key, new Text("value"), generateFileName(key, new Text("value")));
- * ...
- * }
- *
- * public void cleanup(Context) throws IOException {
- * mos.close();
- * ...
- * }
- *
- * }
- * </pre>
- */
-public class CrunchMultipleOutputs<KEYOUT, VALUEOUT> {
-
-  private static final String MULTIPLE_OUTPUTS = "mapreduce.multipleoutputs";
-
-  private static final String MO_PREFIX = 
-    "mapreduce.multipleoutputs.namedOutput.";
-
-  private static final String PART = "part";
-  private static final String FORMAT = ".format";
-  private static final String KEY = ".key";
-  private static final String VALUE = ".value";
-  private static final String COUNTERS_ENABLED = 
-    "mapreduce.multipleoutputs.counters";
-
-  private static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
-  
-  /**
-   * Counters group used by the counters of MultipleOutputs.
-   */
-  private static final String COUNTERS_GROUP = CrunchMultipleOutputs.class.getName();
-  
-  /**
-   * Cache for the taskContexts
-   */
-  private Map<String, TaskAttemptContext> taskContexts = new HashMap<String, TaskAttemptContext>();
-
-  /**
-   * Checks if a named output name is valid token.
-   *
-   * @param namedOutput named output Name
-   * @throws IllegalArgumentException if the output name is not valid.
-   */
-  private static void checkTokenName(String namedOutput) {
-    if (namedOutput == null || namedOutput.length() == 0) {
-      throw new IllegalArgumentException(
-        "Name cannot be NULL or emtpy");
-    }
-    for (char ch : namedOutput.toCharArray()) {
-      if ((ch >= 'A') && (ch <= 'Z')) {
-        continue;
-      }
-      if ((ch >= 'a') && (ch <= 'z')) {
-        continue;
-      }
-      if ((ch >= '0') && (ch <= '9')) {
-        continue;
-      }
-      throw new IllegalArgumentException(
-        "Name cannot be have a '" + ch + "' char");
-    }
-  }
-
-  /**
-   * Checks if output name is valid.
-   *
-   * name cannot be the name used for the default output
-   * @param outputPath base output Name
-   * @throws IllegalArgumentException if the output name is not valid.
-   */
-  private static void checkBaseOutputPath(String outputPath) {
-    if (outputPath.equals(PART)) {
-      throw new IllegalArgumentException("output name cannot be 'part'");
-    }
-  }
-  
-  /**
-   * Checks if a named output name is valid.
-   *
-   * @param namedOutput named output Name
-   * @throws IllegalArgumentException if the output name is not valid.
-   */
-  private static void checkNamedOutputName(JobContext job,
-      String namedOutput, boolean alreadyDefined) {
-    checkTokenName(namedOutput);
-    checkBaseOutputPath(namedOutput);
-    List<String> definedChannels = getNamedOutputsList(job);
-    if (alreadyDefined && definedChannels.contains(namedOutput)) {
-      throw new IllegalArgumentException("Named output '" + namedOutput +
-        "' already alreadyDefined");
-    } else if (!alreadyDefined && !definedChannels.contains(namedOutput)) {
-      throw new IllegalArgumentException("Named output '" + namedOutput +
-        "' not defined");
-    }
-  }
-
-  // Returns list of channel names.
-  private static List<String> getNamedOutputsList(JobContext job) {
-    List<String> names = new ArrayList<String>();
-    StringTokenizer st = new StringTokenizer(
-      job.getConfiguration().get(MULTIPLE_OUTPUTS, ""), " ");
-    while (st.hasMoreTokens()) {
-      names.add(st.nextToken());
-    }
-    return names;
-  }
-
-  // Returns the named output OutputFormat.
-  @SuppressWarnings("unchecked")
-  private static Class<? extends OutputFormat<?, ?>> getNamedOutputFormatClass(
-    JobContext job, String namedOutput) {
-    return (Class<? extends OutputFormat<?, ?>>)
-      job.getConfiguration().getClass(MO_PREFIX + namedOutput + FORMAT, null,
-      OutputFormat.class);
-  }
-
-  // Returns the key class for a named output.
-  private static Class<?> getNamedOutputKeyClass(JobContext job,
-                                                String namedOutput) {
-    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + KEY, null,
-      Object.class);
-  }
-
-  // Returns the value class for a named output.
-  private static Class<?> getNamedOutputValueClass(
-      JobContext job, String namedOutput) {
-    return job.getConfiguration().getClass(MO_PREFIX + namedOutput + VALUE,
-      null, Object.class);
-  }
-
-  /**
-   * Adds a named output for the job.
-   * <p/>
-   *
-   * @param job               job to add the named output
-   * @param namedOutput       named output name, it has to be a word, letters
-   *                          and numbers only, cannot be the word 'part' as
-   *                          that is reserved for the default output.
-   * @param outputFormatClass OutputFormat class.
-   * @param keyClass          key class
-   * @param valueClass        value class
-   */
-  public static void addNamedOutput(Job job, String namedOutput,
-      Class<? extends OutputFormat> outputFormatClass,
-      Class<?> keyClass, Class<?> valueClass) {
-    checkNamedOutputName(job, namedOutput, true);
-    Configuration conf = job.getConfiguration();
-    conf.set(MULTIPLE_OUTPUTS,
-      conf.get(MULTIPLE_OUTPUTS, "") + " " + namedOutput);
-    conf.setClass(MO_PREFIX + namedOutput + FORMAT, outputFormatClass,
-      OutputFormat.class);
-    conf.setClass(MO_PREFIX + namedOutput + KEY, keyClass, Object.class);
-    conf.setClass(MO_PREFIX + namedOutput + VALUE, valueClass, Object.class);
-  }
-
-  /**
-   * Enables or disables counters for the named outputs.
-   * 
-   * The counters group is the {@link CrunchMultipleOutputs} class name.
-   * The names of the counters are the same as the named outputs. These
-   * counters count the number records written to each output name.
-   * By default these counters are disabled.
-   *
-   * @param job    job  to enable counters
-   * @param enabled indicates if the counters will be enabled or not.
-   */
-  public static void setCountersEnabled(Job job, boolean enabled) {
-    job.getConfiguration().setBoolean(COUNTERS_ENABLED, enabled);
-  }
-
-  /**
-   * Returns if the counters for the named outputs are enabled or not.
-   * By default these counters are disabled.
-   *
-   * @param job    the job 
-   * @return TRUE if the counters are enabled, FALSE if they are disabled.
-   */
-  public static boolean getCountersEnabled(JobContext job) {
-    return job.getConfiguration().getBoolean(COUNTERS_ENABLED, false);
-  }
-
-  /**
-   * Wraps RecordWriter to increment counters. 
-   */
-  @SuppressWarnings("unchecked")
-  private static class RecordWriterWithCounter extends RecordWriter {
-    private RecordWriter writer;
-    private String counterName;
-    private TaskInputOutputContext context;
-
-    public RecordWriterWithCounter(RecordWriter writer, String counterName,
-                                   TaskInputOutputContext context) {
-      this.writer = writer;
-      this.counterName = counterName;
-      this.context = context;
-    }
-
-    @SuppressWarnings({"unchecked"})
-    public void write(Object key, Object value) 
-        throws IOException, InterruptedException {
-      context.getCounter(COUNTERS_GROUP, counterName).increment(1);
-      writer.write(key, value);
-    }
-
-    public void close(TaskAttemptContext context) 
-        throws IOException, InterruptedException {
-      writer.close(context);
-    }
-  }
-
-  // instance code, to be used from Mapper/Reducer code
-
-  private TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context;
-  private Set<String> namedOutputs;
-  private Map<String, RecordWriter<?, ?>> recordWriters;
-  private boolean countersEnabled;
-  
-  /**
-   * Creates and initializes multiple outputs support,
-   * it should be instantiated in the Mapper/Reducer setup method.
-   *
-   * @param context the TaskInputOutputContext object
-   */
-  public CrunchMultipleOutputs(
-      TaskInputOutputContext<?, ?, KEYOUT, VALUEOUT> context) {
-    this.context = context;
-    namedOutputs = Collections.unmodifiableSet(
-      new HashSet<String>(CrunchMultipleOutputs.getNamedOutputsList(context)));
-    recordWriters = new HashMap<String, RecordWriter<?, ?>>();
-    countersEnabled = getCountersEnabled(context);
-  }
-
-  /**
-   * Write key and value to the namedOutput.
-   *
-   * Output path is a unique file generated for the namedOutput.
-   * For example, {namedOutput}-(m|r)-{part-number}
-   * 
-   * @param namedOutput the named output name
-   * @param key         the key
-   * @param value       the value
-   */
-  @SuppressWarnings("unchecked")
-  public <K, V> void write(String namedOutput, K key, V value)
-      throws IOException, InterruptedException {
-    write(namedOutput, key, value, namedOutput);
-  }
-
-  /**
-   * Write key and value to baseOutputPath using the namedOutput.
-   * 
-   * @param namedOutput    the named output name
-   * @param key            the key
-   * @param value          the value
-   * @param baseOutputPath base-output path to write the record to.
-   * Note: Framework will generate unique filename for the baseOutputPath
-   */
-  @SuppressWarnings("unchecked")
-  public <K, V> void write(String namedOutput, K key, V value,
-      String baseOutputPath) throws IOException, InterruptedException {
-    checkNamedOutputName(context, namedOutput, false);
-    checkBaseOutputPath(baseOutputPath);
-    if (!namedOutputs.contains(namedOutput)) {
-      throw new IllegalArgumentException("Undefined named output '" +
-        namedOutput + "'");
-    }
-    TaskAttemptContext taskContext = getContext(namedOutput);
-    getRecordWriter(taskContext, baseOutputPath).write(key, value);
-  }
-
-  /**
-   * Write key value to an output file name.
-   * 
-   * Gets the record writer from job's output format.  
-   * Job's output format should be a FileOutputFormat.
-   * 
-   * @param key       the key
-   * @param value     the value
-   * @param baseOutputPath base-output path to write the record to.
-   * Note: Framework will generate unique filename for the baseOutputPath
-   */
-  @SuppressWarnings("unchecked")
-  public void write(KEYOUT key, VALUEOUT value, String baseOutputPath) 
-      throws IOException, InterruptedException {
-    checkBaseOutputPath(baseOutputPath);
-    TaskAttemptContext taskContext = TaskAttemptContextFactory.create(
-      context.getConfiguration(), context.getTaskAttemptID());
-    getRecordWriter(taskContext, baseOutputPath).write(key, value);
-  }
-
-  // by being synchronized MultipleOutputTask can be use with a
-  // MultithreadedMapper.
-  @SuppressWarnings("unchecked")
-  private synchronized RecordWriter getRecordWriter(
-      TaskAttemptContext taskContext, String baseFileName) 
-      throws IOException, InterruptedException {
-    
-    // look for record-writer in the cache
-    RecordWriter writer = recordWriters.get(baseFileName);
-    
-    // If not in cache, create a new one
-    if (writer == null) {
-      // get the record writer from context output format
-      taskContext.getConfiguration().set(BASE_OUTPUT_NAME, baseFileName);
-      try {
-        writer = ((OutputFormat) ReflectionUtils.newInstance(
-          taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
-          .getRecordWriter(taskContext);
-      } catch (ClassNotFoundException e) {
-        throw new IOException(e);
-      }
- 
-      // if counters are enabled, wrap the writer with context 
-      // to increment counters 
-      if (countersEnabled) {
-        writer = new RecordWriterWithCounter(writer, baseFileName, context);
-      }
-      
-      // add the record-writer to the cache
-      recordWriters.put(baseFileName, writer);
-    }
-    return writer;
-  }
-
-   // Create a taskAttemptContext for the named output with 
-   // output format and output key/value types put in the context
-  private TaskAttemptContext getContext(String nameOutput) throws IOException {
-
-    TaskAttemptContext taskContext = taskContexts.get(nameOutput);
-
-    if (taskContext != null) {
-      return taskContext;
-    }
-
-    // The following trick leverages the instantiation of a record writer via
-    // the job thus supporting arbitrary output formats.
-    Job job = new Job(context.getConfiguration());
-    job.getConfiguration().set("crunch.namedoutput", nameOutput);
-    job.setOutputFormatClass(getNamedOutputFormatClass(context, nameOutput));
-    job.setOutputKeyClass(getNamedOutputKeyClass(context, nameOutput));
-    job.setOutputValueClass(getNamedOutputValueClass(context, nameOutput));
-    taskContext = TaskAttemptContextFactory.create(
-      job.getConfiguration(), context.getTaskAttemptID());
-    
-    taskContexts.put(nameOutput, taskContext);
-    
-    return taskContext;
-  }
-  
-  /**
-   * Closes all the opened outputs.
-   * 
-   * This should be called from cleanup method of map/reduce task.
-   * If overridden subclasses must invoke <code>super.close()</code> at the
-   * end of their <code>close()</code>
-   * 
-   */
-  @SuppressWarnings("unchecked")
-  public void close() throws IOException, InterruptedException {
-    for (RecordWriter writer : recordWriters.values()) {
-      writer.close(context);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/522a691c/crunch/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java b/crunch/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java
index d271112..2e58fed 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/emit/MultipleOutputEmitter.java
@@ -21,16 +21,17 @@ import java.io.IOException;
 
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.Emitter;
+import org.apache.crunch.io.CrunchOutputs;
 import org.apache.crunch.types.Converter;
-import org.apache.crunch.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
 
 public class MultipleOutputEmitter<T, K, V> implements Emitter<T> {
 
   private final Converter converter;
-  private final CrunchMultipleOutputs<K, V> outputs;
+  private final CrunchOutputs<K, V> outputs;
   private final String outputName;
 
-  public MultipleOutputEmitter(Converter converter, CrunchMultipleOutputs<K, V> outputs, String outputName) {
+  public MultipleOutputEmitter(Converter converter, CrunchOutputs<K, V> outputs,
+      String outputName) {
     this.converter = converter;
     this.outputs = outputs;
     this.outputName = outputName;
@@ -39,10 +40,10 @@ public class MultipleOutputEmitter<T, K, V> implements Emitter<T> {
   @Override
   public void emit(T emitted) {
     try {
-      this.outputs.write(outputName, converter.outputKey(emitted), converter.outputValue(emitted));
-    } catch (IOException e) {
-      throw new CrunchRuntimeException(e);
-    } catch (InterruptedException e) {
+      this.outputs.write(outputName,
+          (K) converter.outputKey(emitted),
+          (V) converter.outputValue(emitted));
+    } catch (Exception e) {
       throw new CrunchRuntimeException(e);
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/522a691c/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
index bca7770..d39f67f 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputFormat.java
@@ -21,7 +21,9 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.crunch.io.InputBundle;
+import org.apache.crunch.io.CrunchInputs;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -41,15 +43,18 @@ public class CrunchInputFormat<K, V> extends InputFormat<K, V> {
   public List<InputSplit> getSplits(JobContext job) throws IOException, InterruptedException {
     List<InputSplit> splits = Lists.newArrayList();
     Configuration conf = job.getConfiguration();
-    Map<InputBundle, Map<Integer, List<Path>>> formatNodeMap = CrunchInputs.getFormatNodeMap(job);
+    Map<FormatBundle, Map<Integer, List<Path>>> formatNodeMap = CrunchInputs.getFormatNodeMap(job);
 
     // First, build a map of InputFormats to Paths
-    for (Map.Entry<InputBundle, Map<Integer, List<Path>>> entry : formatNodeMap.entrySet()) {
-      InputBundle inputBundle = entry.getKey();
+    for (Map.Entry<FormatBundle, Map<Integer, List<Path>>> entry : formatNodeMap.entrySet()) {
+      FormatBundle inputBundle = entry.getKey();
       Job jobCopy = new Job(conf);
       inputBundle.configure(jobCopy.getConfiguration());
-      InputFormat<?, ?> format = (InputFormat<?, ?>) ReflectionUtils.newInstance(inputBundle.getInputFormatClass(),
+      InputFormat<?, ?> format = (InputFormat<?, ?>) ReflectionUtils.newInstance(inputBundle.getFormatClass(),
           jobCopy.getConfiguration());
+      if (format instanceof Configurable) {
+        ((Configurable) format).setConf(jobCopy.getConfiguration());
+      }
       for (Map.Entry<Integer, List<Path>> nodeEntry : entry.getValue().entrySet()) {
         Integer nodeIndex = nodeEntry.getKey();
         List<Path> paths = nodeEntry.getValue();
@@ -59,8 +64,8 @@ public class CrunchInputFormat<K, V> extends InputFormat<K, V> {
         // and Mapper types by wrapping in a TaggedInputSplit.
         List<InputSplit> pathSplits = format.getSplits(jobCopy);
         for (InputSplit pathSplit : pathSplits) {
-          splits.add(new CrunchInputSplit(pathSplit, inputBundle.getInputFormatClass(), inputBundle
-              .getExtraConfiguration(), nodeIndex, jobCopy.getConfiguration()));
+          splits.add(new CrunchInputSplit(pathSplit, inputBundle.getFormatClass(),
+              nodeIndex, jobCopy.getConfiguration()));
         }
       }
     }
@@ -72,5 +77,4 @@ public class CrunchInputFormat<K, V> extends InputFormat<K, V> {
       InterruptedException {
     return new CrunchRecordReader<K, V>(inputSplit, context);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/522a691c/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
index 6dc99b6..b41062b 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputSplit.java
@@ -22,9 +22,7 @@ import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
-import java.util.Map;
 
-import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -35,11 +33,10 @@ import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.util.ReflectionUtils;
 
-class CrunchInputSplit extends InputSplit implements Configurable, Writable {
+class CrunchInputSplit extends InputSplit implements Writable {
 
   private InputSplit inputSplit;
-  private Class<? extends InputFormat> inputFormatClass;
-  private Map<String, String> extraConf;
+  private Class<? extends InputFormat<?, ?>> inputFormatClass;
   private int nodeIndex;
   private Configuration conf;
 
@@ -47,15 +44,21 @@ class CrunchInputSplit extends InputSplit implements Configurable, Writable {
     // default constructor
   }
 
-  public CrunchInputSplit(InputSplit inputSplit, Class<? extends InputFormat> inputFormatClass,
-      Map<String, String> extraConf, int nodeIndex, Configuration conf) {
+  public CrunchInputSplit(
+      InputSplit inputSplit,
+      Class<? extends InputFormat<?, ?>> inputFormatClass,
+      int nodeIndex,
+      Configuration conf) {
     this.inputSplit = inputSplit;
     this.inputFormatClass = inputFormatClass;
-    this.extraConf = extraConf;
     this.nodeIndex = nodeIndex;
     this.conf = conf;
   }
 
+  public Configuration getConf() {
+    return conf;
+  }
+  
   public int getNodeIndex() {
     return nodeIndex;
   }
@@ -64,7 +67,7 @@ class CrunchInputSplit extends InputSplit implements Configurable, Writable {
     return inputSplit;
   }
 
-  public Class<? extends InputFormat> getInputFormatClass() {
+  public Class<? extends InputFormat<?, ?>> getInputFormatClass() {
     return inputFormatClass;
   }
 
@@ -80,12 +83,8 @@ class CrunchInputSplit extends InputSplit implements Configurable, Writable {
 
   public void readFields(DataInput in) throws IOException {
     nodeIndex = in.readInt();
-    int extraConfSize = in.readInt();
-    if (extraConfSize > 0) {
-      for (int i = 0; i < extraConfSize; i++) {
-        conf.set(in.readUTF(), in.readUTF());
-      }
-    }
+    conf = new Configuration();
+    conf.readFields(in);
     inputFormatClass = (Class<? extends InputFormat<?, ?>>) readClass(in);
     Class<? extends InputSplit> inputSplitClass = (Class<? extends InputSplit>) readClass(in);
     inputSplit = (InputSplit) ReflectionUtils.newInstance(inputSplitClass, conf);
@@ -106,11 +105,7 @@ class CrunchInputSplit extends InputSplit implements Configurable, Writable {
 
   public void write(DataOutput out) throws IOException {
     out.writeInt(nodeIndex);
-    out.writeInt(extraConf.size());
-    for (Map.Entry<String, String> e : extraConf.entrySet()) {
-      out.writeUTF(e.getKey());
-      out.writeUTF(e.getValue());
-    }
+    conf.write(out);
     Text.writeString(out, inputFormatClass.getName());
     Text.writeString(out, inputSplit.getClass().getName());
     SerializationFactory factory = new SerializationFactory(conf);
@@ -118,12 +113,4 @@ class CrunchInputSplit extends InputSplit implements Configurable, Writable {
     serializer.open((DataOutputStream) out);
     serializer.serialize(inputSplit);
   }
-
-  public Configuration getConf() {
-    return conf;
-  }
-
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/522a691c/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java
deleted file mode 100644
index 63eba61..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchInputs.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.run;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.crunch.io.InputBundle;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-
-import com.google.common.base.Joiner;
-import com.google.common.base.Splitter;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
-public class CrunchInputs {
-
-  private static final char RECORD_SEP = ',';
-  private static final char FIELD_SEP = ';';
-  private static final Joiner JOINER = Joiner.on(FIELD_SEP);
-  private static final Splitter SPLITTER = Splitter.on(FIELD_SEP);
-
-  public static void addInputPath(Job job, Path path, InputBundle inputBundle, int nodeIndex) {
-    Configuration conf = job.getConfiguration();
-    String inputs = JOINER.join(inputBundle.serialize(), String.valueOf(nodeIndex), path.toString());
-    String existing = conf.get(RuntimeParameters.MULTI_INPUTS);
-    conf.set(RuntimeParameters.MULTI_INPUTS, existing == null ? inputs : existing + RECORD_SEP + inputs);
-  }
-
-  public static Map<InputBundle, Map<Integer, List<Path>>> getFormatNodeMap(JobContext job) {
-    Map<InputBundle, Map<Integer, List<Path>>> formatNodeMap = Maps.newHashMap();
-    Configuration conf = job.getConfiguration();
-    for (String input : Splitter.on(RECORD_SEP).split(conf.get(RuntimeParameters.MULTI_INPUTS))) {
-      List<String> fields = Lists.newArrayList(SPLITTER.split(input));
-      InputBundle inputBundle = InputBundle.fromSerialized(fields.get(0));
-      if (!formatNodeMap.containsKey(inputBundle)) {
-        formatNodeMap.put(inputBundle, Maps.<Integer, List<Path>> newHashMap());
-      }
-      Integer nodeIndex = Integer.valueOf(fields.get(1));
-      if (!formatNodeMap.get(inputBundle).containsKey(nodeIndex)) {
-        formatNodeMap.get(inputBundle).put(nodeIndex, Lists.<Path> newLinkedList());
-      }
-      formatNodeMap.get(inputBundle).get(nodeIndex).add(new Path(fields.get(2)));
-    }
-    return formatNodeMap;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/522a691c/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
index c3575a5..fc8fb32 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchRecordReader.java
@@ -19,6 +19,7 @@ package org.apache.crunch.impl.mr.run;
 
 import java.io.IOException;
 
+import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/522a691c/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
index 3846e36..c4f2873 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/CrunchTaskContext.java
@@ -22,17 +22,17 @@ import java.util.List;
 
 import org.apache.crunch.CrunchRuntimeException;
 import org.apache.crunch.impl.mr.plan.PlanningParameters;
+import org.apache.crunch.io.CrunchOutputs;
 import org.apache.crunch.util.DistCache;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.TaskInputOutputContext;
-import org.apache.crunch.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
 
 class CrunchTaskContext {
 
   private final TaskInputOutputContext<Object, Object, Object, Object> taskContext;
   private final NodeContext nodeContext;
-  private CrunchMultipleOutputs<Object, Object> multipleOutputs;
+  private CrunchOutputs<Object, Object> multipleOutputs;
 
   public CrunchTaskContext(TaskInputOutputContext<Object, Object, Object, Object> taskContext, NodeContext nodeContext) {
     this.taskContext = taskContext;
@@ -77,9 +77,9 @@ class CrunchTaskContext {
     }
   }
 
-  public CrunchMultipleOutputs<Object, Object> getMultipleOutputs() {
+  public CrunchOutputs<Object, Object> getMultipleOutputs() {
     if (multipleOutputs == null) {
-      multipleOutputs = new CrunchMultipleOutputs<Object, Object>(taskContext);
+      multipleOutputs = new CrunchOutputs<Object, Object>(taskContext);
     }
     return multipleOutputs;
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/522a691c/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
index ead1d9e..e30980d 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RTNode.java
@@ -70,12 +70,14 @@ public class RTNode implements Serializable {
 
     if (outputConverter != null) {
       if (outputName != null) {
-        this.emitter = new MultipleOutputEmitter(outputConverter, ctxt.getMultipleOutputs(), outputName);
+        this.emitter = new MultipleOutputEmitter(outputConverter, ctxt.getMultipleOutputs(),
+            outputName);
       } else {
         this.emitter = new OutputEmitter(outputConverter, ctxt.getContext());
       }
     } else if (!children.isEmpty()) {
-      this.emitter = new IntermediateEmitter(outputPType, children, ctxt.getContext().getConfiguration());
+      this.emitter = new IntermediateEmitter(outputPType, children,
+          ctxt.getContext().getConfiguration());
     } else {
       throw new CrunchRuntimeException("Invalid RTNode config: no emitter for: " + nodeName);
     }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/522a691c/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
index 1dcabb3..27a8402 100644
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
+++ b/crunch/src/main/java/org/apache/crunch/impl/mr/run/RuntimeParameters.java
@@ -19,14 +19,11 @@ package org.apache.crunch.impl.mr.run;
 
 /**
  * Parameters used during the runtime execution.
- * 
  */
 public class RuntimeParameters {
 
   public static final String AGGREGATOR_BUCKETS = "crunch.aggregator.buckets";
 
-  public static final String MULTI_INPUTS = "crunch.inputs.dir";
-
   public static final String DEBUG = "crunch.debug";
 
   public static final String TMP_DIR = "crunch.tmp.dir";

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/522a691c/crunch/src/main/java/org/apache/crunch/impl/mr/run/TaskAttemptContextFactory.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/impl/mr/run/TaskAttemptContextFactory.java b/crunch/src/main/java/org/apache/crunch/impl/mr/run/TaskAttemptContextFactory.java
deleted file mode 100644
index b196f89..0000000
--- a/crunch/src/main/java/org/apache/crunch/impl/mr/run/TaskAttemptContextFactory.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.impl.mr.run;
-
-import java.lang.reflect.Constructor;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
-
-/**
- *
- */
-@SuppressWarnings("unchecked")
-public class TaskAttemptContextFactory {
-
-  private static final Log LOG = LogFactory.getLog(TaskAttemptContextFactory.class);
-
-  private static final TaskAttemptContextFactory INSTANCE = new TaskAttemptContextFactory();
-
-  public static TaskAttemptContext create(Configuration conf, TaskAttemptID taskAttemptId) {
-    return INSTANCE.createInternal(conf, taskAttemptId);
-  }
-
-  private Constructor taskAttemptConstructor;
-
-  private TaskAttemptContextFactory() {
-    Class implClass = TaskAttemptContext.class;
-    if (implClass.isInterface()) {
-      try {
-        implClass = Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
-      } catch (ClassNotFoundException e) {
-        LOG.fatal("Could not find TaskAttemptContextImpl class, exiting", e);
-      }
-    }
-    try {
-      this.taskAttemptConstructor = implClass.getConstructor(Configuration.class, TaskAttemptID.class);
-    } catch (Exception e) {
-      LOG.fatal("Could not access TaskAttemptContext constructor, exiting", e);
-    }
-  }
-
-  private TaskAttemptContext createInternal(Configuration conf, TaskAttemptID taskAttemptId) {
-    try {
-      return (TaskAttemptContext) taskAttemptConstructor.newInstance(conf, taskAttemptId);
-    } catch (Exception e) {
-      LOG.error("Could not construct a TaskAttemptContext instance", e);
-      return null;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/522a691c/crunch/src/main/java/org/apache/crunch/io/CrunchInputs.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/CrunchInputs.java b/crunch/src/main/java/org/apache/crunch/io/CrunchInputs.java
new file mode 100644
index 0000000..d154db2
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/CrunchInputs.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Helper functions for configuring multiple {@code InputFormat} instances within a single
+ * Crunch MapReduce job.
+ */
+public class CrunchInputs {
+  public static final String CRUNCH_INPUTS = "crunch.inputs.dir";
+
+  private static final char RECORD_SEP = ',';
+  private static final char FIELD_SEP = ';';
+  private static final Joiner JOINER = Joiner.on(FIELD_SEP);
+  private static final Splitter SPLITTER = Splitter.on(FIELD_SEP);
+
+  public static void addInputPath(Job job, Path path, FormatBundle inputBundle, int nodeIndex) {
+    Configuration conf = job.getConfiguration();
+    String inputs = JOINER.join(inputBundle.serialize(), String.valueOf(nodeIndex), path.toString());
+    String existing = conf.get(CRUNCH_INPUTS);
+    conf.set(CRUNCH_INPUTS, existing == null ? inputs : existing + RECORD_SEP + inputs);
+  }
+
+  public static Map<FormatBundle, Map<Integer, List<Path>>> getFormatNodeMap(JobContext job) {
+    Map<FormatBundle, Map<Integer, List<Path>>> formatNodeMap = Maps.newHashMap();
+    Configuration conf = job.getConfiguration();
+    for (String input : Splitter.on(RECORD_SEP).split(conf.get(CRUNCH_INPUTS))) {
+      List<String> fields = Lists.newArrayList(SPLITTER.split(input));
+      FormatBundle<InputFormat> inputBundle = FormatBundle.fromSerialized(fields.get(0), InputFormat.class);
+      if (!formatNodeMap.containsKey(inputBundle)) {
+        formatNodeMap.put(inputBundle, Maps.<Integer, List<Path>> newHashMap());
+      }
+      Integer nodeIndex = Integer.valueOf(fields.get(1));
+      if (!formatNodeMap.get(inputBundle).containsKey(nodeIndex)) {
+        formatNodeMap.get(inputBundle).put(nodeIndex, Lists.<Path> newLinkedList());
+      }
+      formatNodeMap.get(inputBundle).get(nodeIndex).add(new Path(fields.get(2)));
+    }
+    return formatNodeMap;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/522a691c/crunch/src/main/java/org/apache/crunch/io/CrunchOutputs.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch/src/main/java/org/apache/crunch/io/CrunchOutputs.java
new file mode 100644
index 0000000..ccf4fb5
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/CrunchOutputs.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.hadoop.mapreduce.TaskAttemptContextFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An analogue of {@link CrunchInputs} for handling multiple {@code OutputFormat} instances
+ * writing to multiple files within a single MapReduce job.
+ *
+ * <p>Named outputs are registered through {@code addNamedOutput}, which appends a record to
+ * the {@link #CRUNCH_OUTPUTS} property of the job configuration. An instance of this class
+ * parses that property when it is constructed and creates one {@code RecordWriter} per named
+ * output the first time a record is written to that output.
+ */
+public class CrunchOutputs<K, V> {
+  /** Configuration property that holds the serialized named-output records. */
+  public static final String CRUNCH_OUTPUTS = "crunch.outputs.dir";
+
+  private static final char RECORD_SEP = ',';
+  private static final char FIELD_SEP = ';';
+  private static final Joiner JOINER = Joiner.on(FIELD_SEP);
+  private static final Splitter SPLITTER = Splitter.on(FIELD_SEP);
+
+  /**
+   * Registers a named output that uses {@code outputFormatClass} without any extra
+   * format-specific configuration.
+   *
+   * @param job the job whose configuration is updated
+   * @param name the name later passed to {@link #write(String, Object, Object)}
+   * @param outputFormatClass the output format used for this named output
+   * @param keyClass the output key class
+   * @param valueClass the output value class
+   */
+  public static void addNamedOutput(Job job, String name,
+      Class<? extends OutputFormat> outputFormatClass,
+      Class keyClass, Class valueClass) {
+    addNamedOutput(job, name, FormatBundle.forOutput(outputFormatClass), keyClass, valueClass);
+  }
+
+  /**
+   * Registers a named output described by {@code outputBundle}.
+   *
+   * <p>The output is appended to the {@link #CRUNCH_OUTPUTS} property as one record made of
+   * the name, the serialized bundle, the key class name and the value class name, joined by
+   * {@code ';'}. Records already present in the property are kept and separated by {@code ','}.
+   *
+   * @param job the job whose configuration is updated
+   * @param name the name later passed to {@link #write(String, Object, Object)}
+   * @param outputBundle the output format plus its extra configuration
+   * @param keyClass the output key class
+   * @param valueClass the output value class
+   */
+  public static void addNamedOutput(Job job, String name,
+      FormatBundle<? extends OutputFormat> outputBundle,
+      Class keyClass, Class valueClass) {
+    Configuration conf = job.getConfiguration();
+    String record = JOINER.join(name, outputBundle.serialize(), keyClass.getName(), valueClass.getName());
+    String existing = conf.get(CRUNCH_OUTPUTS);
+    conf.set(CRUNCH_OUTPUTS, existing == null ? record : existing + RECORD_SEP + record);
+  }
+
+  /** The parsed form of one named-output record: its format bundle and key/value classes. */
+  private static class OutputConfig<K, V> {
+    public final FormatBundle<OutputFormat<K, V>> bundle;
+    public final Class<K> keyClass;
+    public final Class<V> valueClass;
+
+    public OutputConfig(FormatBundle<OutputFormat<K, V>> bundle,
+        Class<K> keyClass, Class<V> valueClass) {
+      this.bundle = bundle;
+      this.keyClass = keyClass;
+      this.valueClass = valueClass;
+    }
+  }
+
+  /**
+   * Parses the {@link #CRUNCH_OUTPUTS} property of the context's configuration into a map
+   * from output name to its configuration.
+   *
+   * @return the parsed named outputs; an empty map if the property is unset or empty
+   * @throws CrunchRuntimeException if a key or value class named in a record cannot be loaded
+   */
+  @SuppressWarnings("unchecked")
+  private static Map<String, OutputConfig> getNamedOutputs(
+      TaskInputOutputContext<?, ?, ?, ?> context) {
+    Map<String, OutputConfig> out = Maps.newHashMap();
+    Configuration conf = context.getConfiguration();
+    String serialized = conf.get(CRUNCH_OUTPUTS);
+    // Splitter.split(null) throws a NullPointerException and an empty string yields a single
+    // empty record, so treat both as "no named outputs registered".
+    if (serialized == null || serialized.isEmpty()) {
+      return out;
+    }
+    for (String record : Splitter.on(RECORD_SEP).split(serialized)) {
+      List<String> fields = Lists.newArrayList(SPLITTER.split(record));
+      String name = fields.get(0);
+      FormatBundle<OutputFormat> bundle = FormatBundle.fromSerialized(fields.get(1),
+          OutputFormat.class);
+      try {
+        Class<?> keyClass = Class.forName(fields.get(2));
+        Class<?> valueClass = Class.forName(fields.get(3));
+        out.put(name, new OutputConfig(bundle, keyClass, valueClass));
+      } catch (ClassNotFoundException e) {
+        throw new CrunchRuntimeException(e);
+      }
+    }
+    return out;
+  }
+
+  private static final String BASE_OUTPUT_NAME = "mapreduce.output.basename";
+  private static final String COUNTERS_GROUP = CrunchOutputs.class.getName();
+
+  private final TaskInputOutputContext<?, ?, K, V> baseContext;
+  private final Map<String, OutputConfig> namedOutputs;
+  // Both caches are keyed by named-output name and filled on first use.
+  private final Map<String, RecordWriter<K, V>> recordWriters;
+  private final Map<String, TaskAttemptContext> taskContextCache;
+
+  /**
+   * Creates and initializes multiple outputs support,
+   * it should be instantiated in the Mapper/Reducer setup method.
+   *
+   * @param context the TaskInputOutputContext object
+   */
+  public CrunchOutputs(TaskInputOutputContext<?, ?, K, V> context) {
+    this.baseContext = context;
+    this.namedOutputs = getNamedOutputs(context);
+    this.recordWriters = Maps.newHashMap();
+    this.taskContextCache = Maps.newHashMap();
+  }
+
+  /**
+   * Writes a key/value pair to the given named output and increments the counter named after
+   * that output in this class's counter group.
+   *
+   * @param namedOutput the name the output was registered under
+   * @param key the key to write
+   * @param value the value to write
+   * @throws IllegalArgumentException if no output was registered under {@code namedOutput}
+   */
+  public void write(String namedOutput, K key, V value)
+      throws IOException, InterruptedException {
+    if (!namedOutputs.containsKey(namedOutput)) {
+      throw new IllegalArgumentException("Undefined named output '" +
+        namedOutput + "'");
+    }
+    TaskAttemptContext taskContext = getContext(namedOutput);
+    baseContext.getCounter(COUNTERS_GROUP, namedOutput).increment(1);
+    getRecordWriter(taskContext, namedOutput).write(key, value);
+  }
+
+  /**
+   * Closes every record writer that was created by {@link #write(String, Object, Object)}.
+   * Each writer is closed with the base context this instance was constructed with.
+   */
+  public void close() throws IOException, InterruptedException {
+    for (RecordWriter<?, ?> writer : recordWriters.values()) {
+      writer.close(baseContext);
+    }
+  }
+
+  /**
+   * Returns the task attempt context for a named output, creating and caching it on first use.
+   *
+   * <p>The context is built from a copy of the base configuration, so the output format,
+   * key/value classes and bundle settings applied here do not modify the base configuration.
+   */
+  @SuppressWarnings("unchecked")
+  private TaskAttemptContext getContext(String nameOutput) throws IOException {
+    TaskAttemptContext taskContext = taskContextCache.get(nameOutput);
+    if (taskContext != null) {
+      return taskContext;
+    }
+
+    // The following trick leverages the instantiation of a record writer via
+    // the job thus supporting arbitrary output formats.
+    OutputConfig outConfig = namedOutputs.get(nameOutput);
+    Configuration conf = new Configuration(baseContext.getConfiguration());
+    Job job = new Job(conf);
+    job.getConfiguration().set("crunch.namedoutput", nameOutput);
+    job.setOutputFormatClass(outConfig.bundle.getFormatClass());
+    job.setOutputKeyClass(outConfig.keyClass);
+    job.setOutputValueClass(outConfig.valueClass);
+    outConfig.bundle.configure(job.getConfiguration());
+    taskContext = TaskAttemptContextFactory.create(
+      job.getConfiguration(), baseContext.getTaskAttemptID());
+
+    taskContextCache.put(nameOutput, taskContext);
+    return taskContext;
+  }
+
+  /**
+   * Returns the record writer for a named output, creating and caching it on first use.
+   *
+   * @param taskContext the context previously obtained from {@code getContext(namedOutput)}
+   * @param namedOutput the name the output was registered under
+   * @throws IOException if the writer cannot be created, including when the configured output
+   *         format class cannot be found
+   */
+  @SuppressWarnings("unchecked")
+  private synchronized RecordWriter<K, V> getRecordWriter(
+      TaskAttemptContext taskContext, String namedOutput)
+      throws IOException, InterruptedException {
+    // look for record-writer in the cache
+    RecordWriter<K, V> writer = recordWriters.get(namedOutput);
+
+    // If not in cache, create a new one
+    if (writer == null) {
+      // Set the base output name to the named output before the format creates its writer.
+      taskContext.getConfiguration().set(BASE_OUTPUT_NAME, namedOutput);
+      try {
+        OutputFormat format = ReflectionUtils.newInstance(
+            taskContext.getOutputFormatClass(),
+            taskContext.getConfiguration());
+        writer = format.getRecordWriter(taskContext);
+      } catch (ClassNotFoundException e) {
+        throw new IOException(e);
+      }
+      recordWriters.put(namedOutput, writer);
+    }
+
+    return writer;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/522a691c/crunch/src/main/java/org/apache/crunch/io/FormatBundle.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/FormatBundle.java b/crunch/src/main/java/org/apache/crunch/io/FormatBundle.java
new file mode 100644
index 0000000..d969009
--- /dev/null
+++ b/crunch/src/main/java/org/apache/crunch/io/FormatBundle.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.io;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Map;
+
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.OutputFormat;
+
+import com.google.common.collect.Maps;
+
+/**
+ * A combination of an {@link InputFormat} or {@link OutputFormat} and any extra
+ * configuration information that format class needs to run.
+ *
+ * <p>The {@code FormatBundle} allows us to let different formats act as
+ * if they are the only format that exists in a particular MapReduce job, even
+ * when we have multiple types of inputs and outputs within a single job.
+ *
+ * <p>A bundle can be turned into a string with {@link #serialize()} (Java serialization
+ * followed by Base64 encoding) and restored with {@link #fromSerialized(String, Class)}.
+ */
+public class FormatBundle<K> implements Serializable {
+
+  private Class<K> formatClass;
+  private Map<String, String> extraConf;
+
+  /**
+   * Restores a bundle from the string produced by {@link #serialize()}.
+   *
+   * @param serialized the Base64-encoded, Java-serialized bundle
+   * @param clazz the expected format type; it only fixes the generic type of the result and
+   *        is not checked against the deserialized bundle
+   * @return the deserialized bundle
+   * @throws RuntimeException if the data cannot be read or a class it refers to is missing
+   */
+  @SuppressWarnings("unchecked")
+  public static <T> FormatBundle<T> fromSerialized(String serialized, Class<T> clazz) {
+    ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serialized));
+    try {
+      ObjectInputStream ois = new ObjectInputStream(bais);
+      try {
+        return (FormatBundle<T>) ois.readObject();
+      } finally {
+        // Close the stream even when readObject fails.
+        ois.close();
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Could not deserialize FormatBundle", e);
+    } catch (ClassNotFoundException e) {
+      throw new RuntimeException("Could not deserialize FormatBundle", e);
+    }
+  }
+
+  /** Creates a bundle for the given input format class with no extra configuration. */
+  public static <T extends InputFormat<?, ?>> FormatBundle<T> forInput(Class<T> inputFormatClass) {
+    return new FormatBundle<T>(inputFormatClass);
+  }
+
+  /** Creates a bundle for the given output format class with no extra configuration. */
+  public static <T extends OutputFormat<?, ?>> FormatBundle<T> forOutput(Class<T> outputFormatClass) {
+    return new FormatBundle<T>(outputFormatClass);
+  }
+
+  private FormatBundle(Class<K> formatClass) {
+    this.formatClass = formatClass;
+    this.extraConf = Maps.newHashMap();
+  }
+
+  /**
+   * Adds (or overwrites) an extra configuration setting carried by this bundle.
+   *
+   * @return this bundle, so calls can be chained
+   */
+  public FormatBundle<K> set(String key, String value) {
+    this.extraConf.put(key, value);
+    return this;
+  }
+
+  /** Returns the format class this bundle was created for. */
+  public Class<K> getFormatClass() {
+    return formatClass;
+  }
+
+  /**
+   * Copies every extra setting of this bundle into {@code conf}.
+   *
+   * @param conf the configuration to update in place
+   * @return the same {@code conf} instance that was passed in
+   */
+  public Configuration configure(Configuration conf) {
+    for (Map.Entry<String, String> e : extraConf.entrySet()) {
+      conf.set(e.getKey(), e.getValue());
+    }
+    return conf;
+  }
+
+  /**
+   * Serializes this bundle with Java serialization and returns the bytes Base64-encoded.
+   *
+   * @throws RuntimeException if the bundle cannot be written
+   */
+  public String serialize() {
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    try {
+      ObjectOutputStream oos = new ObjectOutputStream(baos);
+      try {
+        oos.writeObject(this);
+      } finally {
+        // Closing also flushes the buffered object data into baos before it is read below.
+        oos.close();
+      }
+      return Base64.encodeBase64String(baos.toByteArray());
+    } catch (IOException e) {
+      throw new RuntimeException("Could not serialize FormatBundle", e);
+    }
+  }
+
+  /** Returns the simple name of the format class. */
+  public String getName() {
+    return formatClass.getSimpleName();
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().append(formatClass).append(extraConf).toHashCode();
+  }
+
+  /** Two bundles are equal when their format class and extra settings are both equal. */
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    // instanceof is false for null, so no separate null check is needed.
+    if (!(other instanceof FormatBundle)) {
+      return false;
+    }
+    FormatBundle<?> that = (FormatBundle<?>) other;
+    return formatClass.equals(that.formatClass) && extraConf.equals(that.extraConf);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/522a691c/crunch/src/main/java/org/apache/crunch/io/InputBundle.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/InputBundle.java b/crunch/src/main/java/org/apache/crunch/io/InputBundle.java
deleted file mode 100644
index ed737d7..0000000
--- a/crunch/src/main/java/org/apache/crunch/io/InputBundle.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.crunch.io;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.util.Map;
-
-import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.lang.builder.HashCodeBuilder;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputFormat;
-
-import com.google.common.collect.Maps;
-
-/**
- * A combination of an InputFormat and any configuration information that
- * InputFormat needs to run properly. InputBundles allow us to let different
- * InputFormats act as if they are the only InputFormat that exists in a
- * particular MapReduce job.
- */
-public class InputBundle<K extends InputFormat> implements Serializable {
-
-  private Class<K> inputFormatClass;
-  private Map<String, String> extraConf;
-
-  public static <T extends InputFormat> InputBundle<T> fromSerialized(String serialized) {
-    ByteArrayInputStream bais = new ByteArrayInputStream(Base64.decodeBase64(serialized));
-    try {
-      ObjectInputStream ois = new ObjectInputStream(bais);
-      InputBundle<T> bundle = (InputBundle<T>) ois.readObject();
-      ois.close();
-      return bundle;
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    } catch (ClassNotFoundException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public static <T extends InputFormat> InputBundle<T> of(Class<T> inputFormatClass) {
-    return new InputBundle<T>(inputFormatClass);
-  }
-  
-  public InputBundle(Class<K> inputFormatClass) {
-    this.inputFormatClass = inputFormatClass;
-    this.extraConf = Maps.newHashMap();
-  }
-
-  public InputBundle<K> set(String key, String value) {
-    this.extraConf.put(key, value);
-    return this;
-  }
-
-  public Class<K> getInputFormatClass() {
-    return inputFormatClass;
-  }
-
-  public Map<String, String> getExtraConfiguration() {
-    return extraConf;
-  }
-
-  public Configuration configure(Configuration conf) {
-    for (Map.Entry<String, String> e : extraConf.entrySet()) {
-      conf.set(e.getKey(), e.getValue());
-    }
-    return conf;
-  }
-
-  public String serialize() {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    try {
-      ObjectOutputStream oos = new ObjectOutputStream(baos);
-      oos.writeObject(this);
-      oos.close();
-      return Base64.encodeBase64String(baos.toByteArray());
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public String getName() {
-    return inputFormatClass.getSimpleName();
-  }
-
-  @Override
-  public int hashCode() {
-    return new HashCodeBuilder().append(inputFormatClass).append(extraConf).toHashCode();
-  }
-
-  @Override
-  public boolean equals(Object other) {
-    if (other == null || !(other instanceof InputBundle)) {
-      return false;
-    }
-    InputBundle<K> oib = (InputBundle<K>) other;
-    return inputFormatClass.equals(oib.inputFormatClass) && extraConf.equals(oib.extraConf);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/522a691c/crunch/src/main/java/org/apache/crunch/io/PathTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/PathTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/PathTargetImpl.java
index beb9ce8..0be3f9a 100644
--- a/crunch/src/main/java/org/apache/crunch/io/PathTargetImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/PathTargetImpl.java
@@ -18,7 +18,6 @@
 package org.apache.crunch.io;
 
 import org.apache.crunch.types.PType;
-import org.apache.crunch.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -54,7 +53,7 @@ public abstract class PathTargetImpl implements PathTarget {
       job.setOutputKeyClass(keyClass);
       job.setOutputValueClass(valueClass);
     } else {
-      CrunchMultipleOutputs.addNamedOutput(job, name, outputFormatClass, keyClass, valueClass);
+      CrunchOutputs.addNamedOutput(job, name, outputFormatClass, keyClass, valueClass);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/522a691c/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java b/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
index ac979c3..95c90aa 100644
--- a/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
+++ b/crunch/src/main/java/org/apache/crunch/io/ReadableSourceTarget.java
@@ -18,7 +18,6 @@
 package org.apache.crunch.io;
 
 import org.apache.crunch.SourceTarget;
-import org.apache.hadoop.fs.Path;
 
 /**
  * An interface that indicates that a {@code SourceTarget} instance can be read

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/522a691c/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java b/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
index 65fb35a..f4400de 100644
--- a/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
+++ b/crunch/src/main/java/org/apache/crunch/io/SourceTargetHelper.java
@@ -19,8 +19,6 @@ package org.apache.crunch.io;
 
 import java.io.IOException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -32,8 +30,6 @@ import org.apache.hadoop.fs.Path;
  */
 public class SourceTargetHelper {
 
-  private static final Log LOG = LogFactory.getLog(SourceTargetHelper.class);
-
   public static long getPathSize(Configuration conf, Path path) throws IOException {
     return getPathSize(path.getFileSystem(conf), path);
   }

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/522a691c/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
index 0e9a6ee..15792bf 100644
--- a/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
+++ b/crunch/src/main/java/org/apache/crunch/io/avro/AvroFileSource.java
@@ -21,7 +21,7 @@ import java.io.IOException;
 
 import org.apache.avro.mapred.AvroJob;
 import org.apache.crunch.io.CompositePathIterable;
-import org.apache.crunch.io.InputBundle;
+import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.impl.FileSourceImpl;
 import org.apache.crunch.types.avro.AvroInputFormat;
@@ -33,8 +33,8 @@ import org.apache.hadoop.fs.Path;
 
 public class AvroFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
 
-  private static <S> InputBundle getBundle(AvroType<S> ptype) {
-    InputBundle bundle = new InputBundle(AvroInputFormat.class)
+  private static <S> FormatBundle getBundle(AvroType<S> ptype) {
+    FormatBundle bundle = FormatBundle.forInput(AvroInputFormat.class)
         .set(AvroJob.INPUT_IS_REFLECT, String.valueOf(ptype.hasReflect()))
         .set(AvroJob.INPUT_SCHEMA, ptype.getSchema().toString())
         .set(Avros.REFLECT_DATA_FACTORY_CLASS, Avros.REFLECT_DATA_FACTORY.getClass().getName());

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/522a691c/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
index e67845a..964c6a0 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java
@@ -23,8 +23,8 @@ import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.crunch.Source;
-import org.apache.crunch.impl.mr.run.CrunchInputs;
-import org.apache.crunch.io.InputBundle;
+import org.apache.crunch.io.CrunchInputs;
+import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.SourceTargetHelper;
 import org.apache.crunch.types.PType;
 import org.apache.hadoop.conf.Configuration;
@@ -39,15 +39,15 @@ public class FileSourceImpl<T> implements Source<T> {
 
   protected final Path path;
   protected final PType<T> ptype;
-  protected final InputBundle<?> inputBundle;
+  protected final FormatBundle<? extends InputFormat> inputBundle;
 
   public FileSourceImpl(Path path, PType<T> ptype, Class<? extends InputFormat> inputFormatClass) {
     this.path = path;
     this.ptype = ptype;
-    this.inputBundle = InputBundle.of(inputFormatClass);
+    this.inputBundle = FormatBundle.forInput(inputFormatClass);
   }
 
-  public FileSourceImpl(Path path, PType<T> ptype, InputBundle<?> inputBundle) {
+  public FileSourceImpl(Path path, PType<T> ptype, FormatBundle<? extends InputFormat> inputBundle) {
     this.path = path;
     this.ptype = ptype;
     this.inputBundle = inputBundle;
@@ -57,7 +57,7 @@ public class FileSourceImpl<T> implements Source<T> {
   public void configureSource(Job job, int inputId) throws IOException {
     if (inputId == -1) {
       FileInputFormat.addInputPath(job, path);
-      job.setInputFormatClass(inputBundle.getInputFormatClass());
+      job.setInputFormatClass(inputBundle.getFormatClass());
       inputBundle.configure(job.getConfiguration());
     } else {
       CrunchInputs.addInputPath(job, path, inputBundle, inputId);

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/522a691c/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
index 7d63cc0..295edb5 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/FileTableSourceImpl.java
@@ -19,7 +19,7 @@ package org.apache.crunch.io.impl;
 
 import org.apache.crunch.Pair;
 import org.apache.crunch.TableSource;
-import org.apache.crunch.io.InputBundle;
+import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.types.PTableType;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -30,7 +30,7 @@ public class FileTableSourceImpl<K, V> extends FileSourceImpl<Pair<K, V>> implem
     super(path, tableType, formatClass);
   }
 
-  public FileTableSourceImpl(Path path, PTableType<K, V> tableType, InputBundle bundle) {
+  public FileTableSourceImpl(Path path, PTableType<K, V> tableType, FormatBundle bundle) {
     super(path, tableType, bundle);
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/522a691c/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
index 00df45e..46a6386 100644
--- a/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
+++ b/crunch/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java
@@ -19,7 +19,7 @@ package org.apache.crunch.io.impl;
 
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.crunch.SourceTarget;
-import org.apache.crunch.hadoop.mapreduce.lib.output.CrunchMultipleOutputs;
+import org.apache.crunch.io.CrunchOutputs;
 import org.apache.crunch.io.FileNamingScheme;
 import org.apache.crunch.io.OutputHandler;
 import org.apache.crunch.io.PathTarget;
@@ -62,7 +62,7 @@ public class FileTargetImpl implements PathTarget {
       job.setOutputKeyClass(keyClass);
       job.setOutputValueClass(valueClass);
     } else {
-      CrunchMultipleOutputs.addNamedOutput(job, name, outputFormatClass, keyClass, valueClass);
+      CrunchOutputs.addNamedOutput(job, name, outputFormatClass, keyClass, valueClass);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-crunch/blob/522a691c/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
----------------------------------------------------------------------
diff --git a/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java b/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
index ad3414a..40e2dbd 100644
--- a/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
+++ b/crunch/src/main/java/org/apache/crunch/io/text/NLineFileSource.java
@@ -20,7 +20,7 @@ package org.apache.crunch.io.text;
 import java.io.IOException;
 
 import org.apache.crunch.io.CompositePathIterable;
-import org.apache.crunch.io.InputBundle;
+import org.apache.crunch.io.FormatBundle;
 import org.apache.crunch.io.ReadableSource;
 import org.apache.crunch.io.impl.FileSourceImpl;
 import org.apache.crunch.types.PType;
@@ -36,8 +36,8 @@ import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
  */
 public class NLineFileSource<T> extends FileSourceImpl<T> implements ReadableSource<T> {
 
-  private static InputBundle getBundle(int linesPerTask) {
-    InputBundle bundle = new InputBundle(NLineInputFormat.class);
+  private static FormatBundle getBundle(int linesPerTask) {
+    FormatBundle bundle = FormatBundle.forInput(NLineInputFormat.class);
     bundle.set(NLineInputFormat.LINES_PER_MAP, String.valueOf(linesPerTask));
     return bundle;
   }