You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by mi...@apache.org on 2016/10/30 15:35:13 UTC

tez git commit: TEZ-3215. Support for MultipleOutputs. (mingma)

Repository: tez
Updated Branches:
  refs/heads/master a2f8cc3d9 -> a328d469d


TEZ-3215. Support for MultipleOutputs. (mingma)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/a328d469
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/a328d469
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/a328d469

Branch: refs/heads/master
Commit: a328d469d3de53eae087aee62b10140531a87722
Parents: a2f8cc3
Author: Ming Ma <mi...@twitter.com>
Authored: Sun Oct 30 08:34:48 2016 -0700
Committer: Ming Ma <mi...@twitter.com>
Committed: Sun Oct 30 08:34:48 2016 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../tez/mapreduce/hadoop/MRJobConfig.java       |   6 +
 .../apache/tez/mapreduce/output/MROutput.java   | 138 +++++++++----
 .../tez/mapreduce/output/MultiMROutput.java     | 203 +++++++++++++++++++
 .../tez/mapreduce/output/TestMultiMROutput.java | 193 ++++++++++++++++++
 .../library/api/KeyValueWriterWithBasePath.java |  49 +++++
 6 files changed, 546 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/a328d469/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 83e0b59..b4beb80 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
 
 ALL CHANGES:
 
+  TEZ-3215. Support for MultipleOutputs.
   TEZ-3097. Flaky test: TestCommit.testDAGCommitStartedEventFail_OnDAGSuccess.
   TEZ-3487. Improvements in travis yml file to get builds to work.
   TEZ-3405. Support ability for AM to kill itself if there is no client heartbeating to it.

http://git-wip-us.apache.org/repos/asf/tez/blob/a328d469/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
index 7db98bc..02c74b2 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/hadoop/MRJobConfig.java
@@ -132,6 +132,12 @@ public interface MRJobConfig {
 
   public static final String MAPREDUCE_JOB_USER_CLASSPATH_FIRST = "mapreduce.job.user.classpath.first";
 
+  public static String LAZY_OUTPUTFORMAT_OUTPUTFORMAT =
+      "mapreduce.output.lazyoutputformat.outputformat";
+
+  public static String FILEOUTPUTFORMAT_BASE_OUTPUT_NAME =
+      "mapreduce.output.basename";
+
   public static final String IO_SORT_FACTOR = "mapreduce.task.io.sort.factor";
 
   public static final String IO_SORT_MB = "mapreduce.task.io.sort.mb";

http://git-wip-us.apache.org/repos/asf/tez/blob/a328d469/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index 043085d..6ed70c5 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
 import org.apache.tez.runtime.library.api.IOInterruptedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +45,7 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -73,14 +75,14 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
  * {@link MROutput} is an {@link Output} which allows key/values pairs
  * to be written by a processor.
  *
- * It is compatible with all standard Apache Hadoop MapReduce 
+ * It is compatible with all standard Apache Hadoop MapReduce
  * OutputFormat implementations.
- * 
+ *
  * This class is not meant to be extended by external projects.
  */
 @Public
 public class MROutput extends AbstractLogicalOutput {
-  
+
   /**
    * Helper class to configure {@link MROutput}
    *
@@ -94,18 +96,36 @@ public class MROutput extends AbstractLogicalOutput {
     String outputClassName = MROutput.class.getName();
     String outputPath;
     boolean doCommit = true;
-    
-    private MROutputConfigBuilder(Configuration conf, Class<?> outputFormatParam) {
+
+    private MROutputConfigBuilder(Configuration conf,
+        Class<?> outputFormatParam, boolean useLazyOutputFormat) {
       this.conf = conf;
       if (outputFormatParam != null) {
         outputFormatProvided = true;
-        this.outputFormat = outputFormatParam;
-        if (org.apache.hadoop.mapred.OutputFormat.class.isAssignableFrom(outputFormatParam)) {
+        if (org.apache.hadoop.mapred.OutputFormat.class.isAssignableFrom(
+            outputFormatParam)) {
           useNewApi = false;
-        } else if (org.apache.hadoop.mapreduce.OutputFormat.class.isAssignableFrom(outputFormatParam)) {
+          if (!useLazyOutputFormat) {
+            this.outputFormat = outputFormatParam;
+          } else {
+            conf.setClass(MRJobConfig.LAZY_OUTPUTFORMAT_OUTPUTFORMAT,
+                outputFormatParam,
+                org.apache.hadoop.mapred.OutputFormat.class);
+            this.outputFormat =
+                org.apache.hadoop.mapred.lib.LazyOutputFormat.class;
+          }
+        } else if (OutputFormat.class.isAssignableFrom(outputFormatParam)) {
           useNewApi = true;
+          if (!useLazyOutputFormat) {
+            this.outputFormat = outputFormatParam;
+          } else {
+            conf.setClass(MRJobConfig.LAZY_OUTPUTFORMAT_OUTPUTFORMAT,
+                outputFormatParam, OutputFormat.class);
+            this.outputFormat = LazyOutputFormat.class;
+          }
         } else {
-          throw new TezUncheckedException("outputFormat must be assignable from either " +
+          throw new TezUncheckedException(
+              "outputFormat must be assignable from either " +
               "org.apache.hadoop.mapred.OutputFormat or " +
               "org.apache.hadoop.mapreduce.OutputFormat" +
               " Given: " + outputFormatParam.getName());
@@ -145,8 +165,21 @@ public class MROutput extends AbstractLogicalOutput {
     }
 
     private MROutputConfigBuilder setOutputPath(String outputPath) {
-      if (!(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.class.isAssignableFrom(outputFormat) || 
-          FileOutputFormat.class.isAssignableFrom(outputFormat))) {
+      boolean passNewLazyOutputFormatCheck =
+          (LazyOutputFormat.class.isAssignableFrom(outputFormat)) &&
+          org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.class.
+              isAssignableFrom(conf.getClass(
+                  MRJobConfig.LAZY_OUTPUTFORMAT_OUTPUTFORMAT, null));
+      boolean passOldLazyOutputFormatCheck =
+          (org.apache.hadoop.mapred.lib.LazyOutputFormat.class.
+              isAssignableFrom(outputFormat)) &&
+          FileOutputFormat.class.isAssignableFrom(conf.getClass(
+              MRJobConfig.LAZY_OUTPUTFORMAT_OUTPUTFORMAT, null));
+
+      if (!(org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.class.
+          isAssignableFrom(outputFormat) ||
+          FileOutputFormat.class.isAssignableFrom(outputFormat) ||
+          passNewLazyOutputFormatCheck || passOldLazyOutputFormatCheck)) {
         throw new TezUncheckedException("When setting outputPath the outputFormat must " +
             "be assignable from either org.apache.hadoop.mapred.FileOutputFormat or " +
             "org.apache.hadoop.mapreduce.lib.output.FileOutputFormat. " +
@@ -277,7 +310,12 @@ public class MROutput extends AbstractLogicalOutput {
    */
   public static MROutputConfigBuilder createConfigBuilder(Configuration conf,
                                                           @Nullable Class<?> outputFormat) {
-    return new MROutputConfigBuilder(conf, outputFormat);
+    return createConfigBuilder(conf, outputFormat, false);
+  }
+
+  public static MROutputConfigBuilder createConfigBuilder(Configuration conf,
+      @Nullable Class<?> outputFormat, boolean useLazyOutputFormat) {
+    return new MROutputConfigBuilder(conf, outputFormat, useLazyOutputFormat);
   }
 
   /**
@@ -298,9 +336,14 @@ public class MROutput extends AbstractLogicalOutput {
    * @return {@link org.apache.tez.mapreduce.output.MROutput.MROutputConfigBuilder}
    */
   public static MROutputConfigBuilder createConfigBuilder(Configuration conf,
-                                                          @Nullable Class<?> outputFormat,
-                                                          @Nullable String outputPath) {
-    MROutputConfigBuilder configurer = new MROutputConfigBuilder(conf, outputFormat);
+      @Nullable Class<?> outputFormat, @Nullable String outputPath) {
+    return createConfigBuilder(conf, outputFormat, outputPath, false);
+  }
+
+  public static MROutputConfigBuilder createConfigBuilder(Configuration conf,
+      @Nullable Class<?> outputFormat, @Nullable String outputPath,
+      boolean useLazyOutputFormat) {
+    MROutputConfigBuilder configurer = createConfigBuilder(conf, outputFormat, useLazyOutputFormat);
     if (outputPath != null) {
       configurer.setOutputPath(outputPath);
     }
@@ -312,9 +355,9 @@ public class MROutput extends AbstractLogicalOutput {
   private final NumberFormat taskNumberFormat = NumberFormat.getInstance();
   private final NumberFormat nonTaskNumberFormat = NumberFormat.getInstance();
   
-  private JobConf jobConf;
+  protected JobConf jobConf;
   boolean useNewApi;
-  private AtomicBoolean flushed = new AtomicBoolean(false);
+  protected AtomicBoolean flushed = new AtomicBoolean(false);
 
   @SuppressWarnings("rawtypes")
   org.apache.hadoop.mapreduce.OutputFormat newOutputFormat;
@@ -326,7 +369,7 @@ public class MROutput extends AbstractLogicalOutput {
   @SuppressWarnings("rawtypes")
   org.apache.hadoop.mapred.RecordWriter oldRecordWriter;
 
-  private TezCounter outputRecordCounter;
+  protected TezCounter outputRecordCounter;
 
   @VisibleForTesting
   TaskAttemptContext newApiTaskAttemptContext;
@@ -344,6 +387,12 @@ public class MROutput extends AbstractLogicalOutput {
 
   @Override
   public List<Event> initialize() throws IOException, InterruptedException {
+    List<Event> events = initializeBase();
+    initWriter();
+    return events;
+  }
+
+  protected List<Event> initializeBase() throws IOException, InterruptedException {
     getContext().requestInitialMemory(0l, null); //mandatory call
     taskNumberFormat.setMinimumIntegerDigits(5);
     taskNumberFormat.setGroupingUsed(false);
@@ -373,18 +422,18 @@ public class MROutput extends AbstractLogicalOutput {
       taskAttemptId.getTaskID().getId());
     jobConf.set(JobContext.ID, taskAttemptId.getJobID().toString());
     
-    if (useNewApi) {
-      // set the output part name to have a unique prefix
-      if (jobConf.get("mapreduce.output.basename") == null) {
-        jobConf.set("mapreduce.output.basename", getOutputFileNamePrefix());
-      }
-    }
-
     String outputFormatClassName;
 
-    outputRecordCounter = getContext().getCounters().findCounter(TaskCounter.OUTPUT_RECORDS);    
+    outputRecordCounter = getContext().getCounters().findCounter(
+        TaskCounter.OUTPUT_RECORDS);
 
     if (useNewApi) {
+      // set the output part name to have a unique prefix
+      if (jobConf.get(MRJobConfig.FILEOUTPUTFORMAT_BASE_OUTPUT_NAME) == null) {
+        jobConf.set(MRJobConfig.FILEOUTPUTFORMAT_BASE_OUTPUT_NAME,
+            getOutputFileNamePrefix());
+      }
+
       newApiTaskAttemptContext = createTaskAttemptContext(taskAttemptId);
       try {
         newOutputFormat =
@@ -396,13 +445,6 @@ public class MROutput extends AbstractLogicalOutput {
       }
 
       initCommitter(jobConf, useNewApi);
-
-      try {
-        newRecordWriter =
-            newOutputFormat.getRecordWriter(newApiTaskAttemptContext);
-      } catch (InterruptedException e) {
-        throw new IOException("Interrupted while creating record writer", e);
-      }
     } else {
       oldApiTaskAttemptContext =
           new org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl(
@@ -412,13 +454,6 @@ public class MROutput extends AbstractLogicalOutput {
       outputFormatClassName = oldOutputFormat.getClass().getName();
 
       initCommitter(jobConf, useNewApi);
-
-      FileSystem fs = FileSystem.get(jobConf);
-      String finalName = getOutputName();
-
-      oldRecordWriter =
-          oldOutputFormat.getRecordWriter(
-              fs, jobConf, finalName, new MRReporter(getContext().getCounters()));
     }
 
     LOG.info(getContext().getDestinationVertexName() + ": "
@@ -427,6 +462,22 @@ public class MROutput extends AbstractLogicalOutput {
     return null;
   }
 
+  private void initWriter() throws IOException {
+    if (useNewApi) {
+      try {
+        newRecordWriter =
+            newOutputFormat.getRecordWriter(newApiTaskAttemptContext);
+      } catch (InterruptedException e) {
+        throw new IOException("Interrupted while creating record writer", e);
+      }
+    } else {
+      FileSystem fs = FileSystem.get(jobConf);
+      String finalName = getOutputName(getOutputFileNamePrefix());
+      oldRecordWriter = oldOutputFormat.getRecordWriter(
+          fs, jobConf, finalName, new MRReporter(getContext().getCounters()));
+    }
+  }
+
   @Override
   public void start() {
   }
@@ -475,7 +526,7 @@ public class MROutput extends AbstractLogicalOutput {
         isMapperOutput, null);
   }
 
-  private String getOutputFileNamePrefix() {
+  protected String getOutputFileNamePrefix() {
     String prefix = jobConf.get(MRJobConfig.MROUTPUT_FILE_NAME_PREFIX);
     if (prefix == null) {
       prefix = "part-v" + 
@@ -485,10 +536,9 @@ public class MROutput extends AbstractLogicalOutput {
     return prefix;
   }
 
-  private String getOutputName() {
+  protected String getOutputName(String prefix) {
     // give a unique prefix to the output name
-    return getOutputFileNamePrefix() + 
-        "-" + taskNumberFormat.format(getContext().getTaskIndex());
+    return prefix + "-" + taskNumberFormat.format(getContext().getTaskIndex());
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/tez/blob/a328d469/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MultiMROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MultiMROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MultiMROutput.java
new file mode 100644
index 0000000..0bd573b
--- /dev/null
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MultiMROutput.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.mapreduce.output;
+
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.Output;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.library.api.IOInterruptedException;
+import org.apache.tez.runtime.library.api.KeyValueWriterWithBasePath;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
+
+/**
+ * {@link MultiMROutput} is an {@link Output} which allows key/values pairs
+ * to be written by a processor to different output files.
+ *
+ * It is compatible with all standard Apache Hadoop MapReduce
+ * OutputFormat implementations.
+ *
+ */
+@Public
+public class MultiMROutput extends MROutput {
+
+  Map<String, org.apache.hadoop.mapreduce.RecordWriter<?, ?>>
+      newRecordWriters;
+
+  Map<String, org.apache.hadoop.mapred.RecordWriter<?, ?>>
+      oldRecordWriters;
+
+  public MultiMROutput(OutputContext outputContext, int numPhysicalOutputs) {
+    super(outputContext, numPhysicalOutputs);
+  }
+
+  @Override
+  public List<Event> initialize() throws IOException, InterruptedException {
+    List<Event> events = super.initializeBase();
+    if (useNewApi) {
+      newRecordWriters = new HashMap<>();
+    } else {
+      oldRecordWriters = new HashMap<>();
+    }
+    return events;
+  }
+
+  /**
+   * Create an
+   * {@link org.apache.tez.mapreduce.output.MROutput.MROutputConfigBuilder}
+   *
+   * @param conf         Configuration for the {@link MROutput}
+   * @param outputFormat FileInputFormat derived class
+   * @param outputPath   Output path
+   * @return {@link org.apache.tez.mapreduce.output.MROutput.MROutputConfigBuilder}
+   */
+  public static MROutputConfigBuilder createConfigBuilder(Configuration conf,
+      Class<?> outputFormat, String outputPath, boolean useLazyOutputFormat) {
+    return MROutput.createConfigBuilder(conf, outputFormat, outputPath, useLazyOutputFormat)
+        .setOutputClassName(MultiMROutput.class.getName());
+  }
+
+  @Override
+  public KeyValueWriterWithBasePath getWriter() throws IOException {
+    return new KeyValueWriterWithBasePath() {
+
+      @SuppressWarnings("unchecked")
+      @Override
+      public void write(Object key, Object value) throws IOException {
+        throw new UnsupportedOperationException(
+            "Write without basePath isn't supported.");
+      }
+
+      @SuppressWarnings("unchecked")
+      @Override
+      public void write(Object key, Object value, String basePath)
+          throws IOException {
+        if (basePath == null) {
+          throw new UnsupportedOperationException(
+              "Write without basePath isn't supported.");
+        }
+        if (basePath.length() > 0 && basePath.charAt(0) == '/' ) {
+          // The base path can't be absolute path starting with "/".
+          // Otherwise, it will cause the task temporary files being
+          // written outside the output committer's task work path.
+          throw new UnsupportedOperationException(
+              "Write with absolute basePath isn't supported.");
+        }
+        if (useNewApi) {
+          try {
+            getNewRecordWriter(newApiTaskAttemptContext, basePath).write(
+                key, value);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOInterruptedException(
+                "Interrupted while writing next key-value",e);
+          }
+        } else {
+          getOldRecordWriter(basePath).write(key, value);
+        }
+        outputRecordCounter.increment(1);
+        getContext().notifyProgress();
+      }
+    };
+  }
+
+  /**
+   * Call this in the processor before finishing to ensure outputs that
+   * outputs have been flushed. Must be called before commit.
+   * @throws IOException
+   */
+  @Override
+  public void flush() throws IOException {
+    if (flushed.getAndSet(true)) {
+      return;
+    }
+    try {
+      if (useNewApi) {
+          for (RecordWriter writer : newRecordWriters.values()) {
+            writer.close(newApiTaskAttemptContext);
+          }
+      } else {
+        for (org.apache.hadoop.mapred.RecordWriter writer :
+            oldRecordWriters.values()) {
+          writer.close(null);
+        }
+      }
+    } catch (InterruptedException e) {
+        throw new IOException("Interrupted while closing record writer", e);
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  private synchronized RecordWriter getNewRecordWriter(
+      TaskAttemptContext taskContext, String baseFileName)
+      throws IOException, InterruptedException {
+
+    // look for record-writer in the cache
+    RecordWriter writer = newRecordWriters.get(baseFileName);
+
+    // If not in cache, create a new one
+    if (writer == null) {
+      // get the record writer from context output format
+      taskContext.getConfiguration().set(
+          MRJobConfig.FILEOUTPUTFORMAT_BASE_OUTPUT_NAME, baseFileName);
+      try {
+        writer = ((OutputFormat) ReflectionUtils.newInstance(
+            taskContext.getOutputFormatClass(), taskContext.getConfiguration()))
+            .getRecordWriter(taskContext);
+      } catch (ClassNotFoundException e) {
+        throw new IOException(e);
+      }
+      // add the record-writer to the cache
+      newRecordWriters.put(baseFileName, writer);
+    }
+    return writer;
+  }
+
+  @SuppressWarnings("unchecked")
+  private synchronized org.apache.hadoop.mapred.RecordWriter
+      getOldRecordWriter(String baseFileName) throws IOException {
+
+    // look for record-writer in the cache
+    org.apache.hadoop.mapred.RecordWriter writer =
+        oldRecordWriters.get(baseFileName);
+
+    // If not in cache, create a new one
+    if (writer == null) {
+        FileSystem fs = FileSystem.get(jobConf);
+        String finalName = getOutputName(baseFileName);
+        writer = oldOutputFormat.getRecordWriter(fs, jobConf,
+            finalName, new MRReporter(getContext().getCounters()));
+      // add the record-writer to the cache
+      oldRecordWriters.put(baseFileName, writer);
+    }
+    return writer;
+  }
+};

http://git-wip-us.apache.org/repos/asf/tez/blob/a328d469/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMultiMROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMultiMROutput.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMultiMROutput.java
new file mode 100644
index 0000000..3618e40
--- /dev/null
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/output/TestMultiMROutput.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.output;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.DataSinkDescriptor;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+import org.apache.tez.runtime.api.OutputContext;
+import org.apache.tez.runtime.api.OutputStatisticsReporter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+
+public class TestMultiMROutput {
+
+  @Test(timeout = 5000)
+  public void testNewAPI_TextOutputFormat() throws Exception {
+    validate(true, TextOutputFormat.class, true, FileOutputCommitter.class,
+        false);
+  }
+
+  @Test(timeout = 5000)
+  public void testOldAPI_TextOutputFormat() throws Exception {
+    validate(false, org.apache.hadoop.mapred.TextOutputFormat.class, false,
+        org.apache.hadoop.mapred.FileOutputCommitter.class, false);
+  }
+
+  @Test(timeout = 5000)
+  public void testNewAPI_SequenceFileOutputFormat() throws Exception {
+    validate(true, SequenceFileOutputFormat.class, false,
+        FileOutputCommitter.class, false);
+  }
+
+  @Test(timeout = 5000)
+  public void testOldAPI_SequenceFileOutputFormat() throws Exception {
+    validate(false, org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
+        false, org.apache.hadoop.mapred.FileOutputCommitter.class, false);
+  }
+
+  @Test(timeout = 5000)
+  public void testNewAPI_LazySequenceFileOutputFormat() throws Exception {
+    validate(true, SequenceFileOutputFormat.class, false,
+        FileOutputCommitter.class, true);
+  }
+
+  @Test(timeout = 5000)
+  public void testOldAPI_LazySequenceFileOutputFormat() throws Exception {
+    validate(false, org.apache.hadoop.mapred.SequenceFileOutputFormat.class,
+        false, org.apache.hadoop.mapred.FileOutputCommitter.class, true);
+  }
+
+  @Test(timeout = 5000)
+  public void testNewAPI_LazyTextOutputFormat() throws Exception {
+    validate(true, TextOutputFormat.class, false,
+        FileOutputCommitter.class, true);
+  }
+
+  @Test(timeout = 5000)
+  public void testOldAPI_LazyTextOutputFormat() throws Exception {
+    validate(false, org.apache.hadoop.mapred.TextOutputFormat.class, false,
+        org.apache.hadoop.mapred.FileOutputCommitter.class, true);
+  }
+
+  @Test(timeout = 5000)
+  public void testInvalidBasePath() throws Exception {
+    MultiMROutput outputs = createMROutputs(SequenceFileOutputFormat.class,
+        false, true);
+    try {
+      outputs.getWriter().write(new Text(Integer.toString(0)),
+          new Text("foo"), "/tmp");
+      Assert.assertTrue(false); // should not come here
+    } catch (UnsupportedOperationException uoe) {
+    }
+  }
+
+  private OutputContext createMockOutputContext(UserPayload payload) {
+    OutputContext outputContext = mock(OutputContext.class);
+    ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
+    when(outputContext.getUserPayload()).thenReturn(payload);
+    when(outputContext.getApplicationId()).thenReturn(appId);
+    when(outputContext.getTaskVertexIndex()).thenReturn(1);
+    when(outputContext.getTaskAttemptNumber()).thenReturn(1);
+    when(outputContext.getCounters()).thenReturn(new TezCounters());
+    when(outputContext.getStatisticsReporter()).thenReturn(
+        mock(OutputStatisticsReporter.class));
+    return outputContext;
+  }
+
+  private void validate(boolean expectedUseNewAPIValue, Class outputFormat,
+      boolean isMapper, Class committerClass, boolean useLazyOutputFormat)
+          throws InterruptedException, IOException {
+    MultiMROutput output = createMROutputs(outputFormat, isMapper,
+        useLazyOutputFormat);
+
+    assertEquals(isMapper, output.isMapperOutput);
+    assertEquals(expectedUseNewAPIValue, output.useNewApi);
+    if (expectedUseNewAPIValue) {
+      if (useLazyOutputFormat) {
+        assertEquals(LazyOutputFormat.class,
+            output.newOutputFormat.getClass());
+      } else {
+        assertEquals(outputFormat, output.newOutputFormat.getClass());
+      }
+      assertNotNull(output.newApiTaskAttemptContext);
+      assertNull(output.oldOutputFormat);
+      assertEquals(Text.class,
+          output.newApiTaskAttemptContext.getOutputValueClass());
+      assertEquals(Text.class,
+          output.newApiTaskAttemptContext.getOutputKeyClass());
+      assertNull(output.oldApiTaskAttemptContext);
+      assertNotNull(output.newRecordWriters);
+      assertNull(output.oldRecordWriters);
+    } else {
+      if (!useLazyOutputFormat) {
+        assertEquals(outputFormat, output.oldOutputFormat.getClass());
+      } else {
+        assertEquals(org.apache.hadoop.mapred.lib.LazyOutputFormat.class,
+            output.oldOutputFormat.getClass());
+      }
+      assertNull(output.newOutputFormat);
+      assertNotNull(output.oldApiTaskAttemptContext);
+      assertNull(output.newApiTaskAttemptContext);
+      assertEquals(Text.class,
+          output.oldApiTaskAttemptContext.getOutputValueClass());
+      assertEquals(Text.class,
+          output.oldApiTaskAttemptContext.getOutputKeyClass());
+      assertNotNull(output.oldRecordWriters);
+      assertNull(output.newRecordWriters);
+    }
+
+    assertEquals(committerClass, output.committer.getClass());
+    int numOfUniqueKeys = 3;
+    for (int i=0; i<numOfUniqueKeys; i++) {
+      output.getWriter().write(new Text(Integer.toString(i)),
+          new Text("foo"), Integer.toString(i));
+    }
+    output.close();
+    if (expectedUseNewAPIValue) {
+      assertEquals(numOfUniqueKeys, output.newRecordWriters.size());
+    } else {
+      assertEquals(numOfUniqueKeys, output.oldRecordWriters.size());
+    }
+  }
+
+  private MultiMROutput createMROutputs(Class outputFormat,
+      boolean isMapper, boolean useLazyOutputFormat)
+          throws InterruptedException, IOException {
+    String outputPath = "/tmp/output";
+    JobConf conf = new JobConf();
+    conf.setBoolean(MRConfig.IS_MAP_PROCESSOR, isMapper);
+    conf.setOutputKeyClass(Text.class);
+    conf.setOutputValueClass(Text.class);
+    DataSinkDescriptor dataSink = MultiMROutput.createConfigBuilder(
+        conf, outputFormat, outputPath, useLazyOutputFormat).build();
+
+    OutputContext outputContext = createMockOutputContext(
+        dataSink.getOutputDescriptor().getUserPayload());
+    MultiMROutput output = new MultiMROutput(outputContext, 2);
+    output.initialize();
+    return output;
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/a328d469/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriterWithBasePath.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriterWithBasePath.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriterWithBasePath.java
new file mode 100644
index 0000000..5446ca6
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriterWithBasePath.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.api;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+import org.apache.tez.runtime.api.Writer;
+
+/**
+ * A key/value(s) pair based {@link Writer} that supports
+ * output to different files.
+ */
+@Public
+@Evolving
+public abstract class KeyValueWriterWithBasePath extends KeyValueWriter {
+  /**
+   * Writes a key/value pair.
+   *
+   * @param key
+   *          the key to write
+   * @param value
+   *          the value to write
+   * @param basePath
+   *          the base path of the output file.
+   * @throws IOException
+   *           if an error occurs
+   * @throws {@link IOInterruptedException} if IO was interrupted
+   */
+  public abstract void write(Object key, Object value, String basePath)
+      throws IOException;
+}