Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/23 19:45:17 UTC

[03/20] TEZ-444. Rename *.new* packages back to what they should be, remove dead code from the old packages - mapreduce module (part of TEZ-398). (sseth)

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java
deleted file mode 100644
index d71dba0..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTask.java
+++ /dev/null
@@ -1,731 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.mapreduce.newprocessor;
-
-import java.io.IOException;
-import java.net.URI;
-import java.text.NumberFormat;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.crypto.SecretKey;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.FileSystem.Statistics;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.mapred.FileAlreadyExistsException;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.MapOutputFile;
-import org.apache.hadoop.mapred.RawKeyValueIterator;
-import org.apache.hadoop.mapred.TaskAttemptContext;
-import org.apache.hadoop.mapred.TaskAttemptID;
-import org.apache.hadoop.mapred.TaskID;
-import org.apache.hadoop.mapreduce.OutputCommitter;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
-import org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer;
-import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
-import org.apache.hadoop.mapreduce.task.ReduceContextImpl;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.Progress;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
-import org.apache.hadoop.yarn.util.ResourceCalculatorProcessTree;
-import org.apache.tez.common.Constants;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTaskStatus.State;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.common.counters.TezCounters;
-import org.apache.tez.dag.records.TezDAGID;
-import org.apache.tez.engine.common.security.JobTokenIdentifier;
-import org.apache.tez.engine.common.security.TokenCache;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-import org.apache.tez.engine.newapi.LogicalOutput;
-import org.apache.tez.engine.newapi.TezProcessorContext;
-import org.apache.tez.engine.records.OutputContext;
-import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
-import org.apache.tez.mapreduce.hadoop.IDConverter;
-import org.apache.tez.mapreduce.hadoop.MRConfig;
-import org.apache.tez.mapreduce.hadoop.MRJobConfig;
-import org.apache.tez.mapreduce.hadoop.newmapred.TaskAttemptContextImpl;
-import org.apache.tez.mapreduce.hadoop.mapreduce.JobContextImpl;
-import org.apache.tez.mapreduce.newoutput.SimpleOutput;
-//import org.apache.tez.mapreduce.partition.MRPartitioner;
-import org.apache.tez.mapreduce.task.impl.YarnOutputFiles;
-
-@SuppressWarnings("deprecation")
-public abstract class MRTask {
-
-  static final Log LOG = LogFactory.getLog(MRTask.class);
-
-  protected JobConf jobConf;
-  protected JobContext jobContext;
-  protected TaskAttemptContext taskAttemptContext;
-  protected OutputCommitter committer;
-
-  // Current counters
-  transient TezCounters counters;
-  protected GcTimeUpdater gcUpdater;
-  private ResourceCalculatorProcessTree pTree;
-  private long initCpuCumulativeTime = 0;
-  protected TezProcessorContext processorContext;
-  protected TaskAttemptID taskAttemptId;
-  protected Progress progress = new Progress();
-  protected SecretKey jobTokenSecret;
-
-  boolean isMap;
-
-  /* flag to track whether task is done */
-  AtomicBoolean taskDone = new AtomicBoolean(false);
-
-  /** Construct output file names so that, when an output directory listing is
-   * sorted lexicographically, positions correspond to output partitions.*/
-  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
-  static {
-    NUMBER_FORMAT.setMinimumIntegerDigits(5);
-    NUMBER_FORMAT.setGroupingUsed(false);
-  }
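-  // e.g. partition 7 of this task's output is named "part-00007" by
-  // getOutputName() below.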
-
-  protected MRTaskReporter mrReporter;
-  protected boolean useNewApi;
-
-  /**
-   * A map from URI scheme to the FileSystemStatisticUpdater for that scheme.
-   */
-  private Map<String, FileSystemStatisticUpdater> statisticUpdaters =
-     new HashMap<String, FileSystemStatisticUpdater>();
-
-  public MRTask(boolean isMap) {
-    this.isMap = isMap;
-  }
-
-  // TODO how to update progress
-  public void initialize(TezProcessorContext context) throws IOException,
-  InterruptedException {
-
-    DeprecatedKeys.init();
-
-    processorContext = context;
-    counters = context.getCounters();
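-    // Synthesize an MR TaskAttemptID from the Tez context: the application id
-    // stands in for the MR job id, the Tez task index becomes the MR task id,
-    // and the task attempt number carries over directly.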
-    this.taskAttemptId = new TaskAttemptID(
-        new TaskID(
-            Long.toString(context.getApplicationId().getClusterTimestamp()),
-            context.getApplicationId().getId(),
-            (isMap ? TaskType.MAP : TaskType.REDUCE),
-            context.getTaskIndex()),
-          context.getTaskAttemptNumber());
-    // TODO TEZAM4 Figure out initialization / run sequence of Input, Process,
-    // Output. Phase is MR specific.
-    gcUpdater = new GcTimeUpdater(counters);
-
-    byte[] userPayload = context.getUserPayload();
-    Configuration conf = TezUtils.createConfFromUserPayload(userPayload);
-    if (conf instanceof JobConf) {
-      this.jobConf = (JobConf)conf;
-    } else {
-      this.jobConf = new JobConf(conf);
-    }
-    jobConf.set(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID,
-        taskAttemptId.toString());
-    jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
-        context.getDAGAttemptNumber());
-
-    initResourceCalculatorPlugin();
-
-    LOG.info("MRTask.inited: taskAttemptId = " + taskAttemptId.toString());
-
-    // TODO Post MRR
-    // A single file per vertex will likely be a better solution. Does not
-    // require translation - client can take care of this. Will work independent
-    // of whether the configuration is for intermediate tasks or not. Has the
-    // overhead of localizing multiple files per job - i.e. the client would
-    // need to write these files to hdfs, add them as local resources per
-    // vertex. A solution like this may be more practical once it's possible to
-    // submit configuration parameters to the AM and effectively tasks via RPC.
-
-    jobConf.set(MRJobConfig.VERTEX_NAME, processorContext.getTaskVertexName());
-
-    if (LOG.isDebugEnabled() && userPayload != null) {
-      Iterator<Entry<String, String>> iter = jobConf.iterator();
-      String taskIdStr = taskAttemptId.getTaskID().toString();
-      while (iter.hasNext()) {
-        Entry<String, String> confEntry = iter.next();
-        LOG.debug("TaskConf Entry"
-            + ", taskId=" + taskIdStr
-            + ", key=" + confEntry.getKey()
-            + ", value=" + confEntry.getValue());
-      }
-    }
-
-    configureMRTask();
-  }
-
-  private void configureMRTask()
-      throws IOException, InterruptedException {
-
-    Credentials credentials = UserGroupInformation.getCurrentUser()
-        .getCredentials();
-    jobConf.setCredentials(credentials);
-    // TODO Can this be avoided altogether. Have the MRTezOutputCommitter use
-    // the Tez parameter.
-    // TODO This could be fetched from the env if YARN is setting it for all
-    // Containers.
-    // Set it in the conf so that it can be used by the OutputCommitter.
-
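-    // MapOutputFile decides where map-side intermediate files live;
-    // YarnOutputFiles resolves them under the container's local directories.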
-    jobConf.setClass(MRConfig.TASK_LOCAL_OUTPUT_CLASS, YarnOutputFiles.class,
-        MapOutputFile.class); // MR
-
-    // Not needed. This is probably being set via the source/consumer metadata.
-    Token<JobTokenIdentifier> jobToken = TokenCache.getJobToken(credentials);
-    if (jobToken != null) {
-      // Will MR ever run without a job token?
-      SecretKey sk = JobTokenSecretManager.createSecretKey(jobToken
-          .getPassword());
-      this.jobTokenSecret = sk;
-    } else {
-      LOG.warn("No job token set");
-    }
-
-    configureLocalDirs();
-
-    if (jobConf.get(TezJobConfig.DAG_CREDENTIALS_BINARY) != null) {
-      jobConf.set(MRJobConfig.MAPREDUCE_JOB_CREDENTIALS_BINARY,
-          jobConf.get(TezJobConfig.DAG_CREDENTIALS_BINARY));
-    }
-
-    // Set up the DistributedCache related configs
-    setupDistributedCacheConfig(jobConf);
-  }
-
-  private void configureLocalDirs() throws IOException {
-    // TODO NEWTEZ Is most of this functionality required?
-    jobConf.setStrings(TezJobConfig.LOCAL_DIRS, processorContext.getWorkDirs());
-    jobConf.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, System.getenv(Environment.PWD.name()));
-
-    jobConf.setStrings(MRConfig.LOCAL_DIR, processorContext.getWorkDirs());
-
-    LocalDirAllocator lDirAlloc = new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
-    Path workDir = null;
-    // First, try to find the JOB_LOCAL_DIR on this host.
-    try {
-      workDir = lDirAlloc.getLocalPathToRead("work", jobConf);
-    } catch (DiskErrorException e) {
-      // DiskErrorException means dir not found. If not found, it will
-      // be created below.
-    }
-    if (workDir == null) {
-      // JOB_LOCAL_DIR doesn't exist on this host -- Create it.
-      workDir = lDirAlloc.getLocalPathForWrite("work", jobConf);
-      FileSystem lfs = FileSystem.getLocal(jobConf).getRaw();
-      boolean madeDir = false;
-      try {
-        madeDir = lfs.mkdirs(workDir);
-      } catch (FileAlreadyExistsException e) {
-        // Since all tasks will be running in their own JVM, the race condition
-        // exists where multiple tasks could be trying to create this directory
-        // at the same time. If this task loses the race, it's okay because
-        // the directory already exists.
-        madeDir = true;
-        workDir = lDirAlloc.getLocalPathToRead("work", jobConf);
-      }
-      if (!madeDir) {
-          throw new IOException("Mkdirs failed to create "
-              + workDir.toString());
-      }
-    }
-    // TODO NEWTEZ Is this required?
-    jobConf.set(TezJobConfig.JOB_LOCAL_DIR, workDir.toString());
-    jobConf.set(MRJobConfig.JOB_LOCAL_DIR, workDir.toString());
-  }
-
-  /**
-   * Set up the DistributedCache related configs to make
-   * {@link DistributedCache#getLocalCacheFiles(Configuration)} and
-   * {@link DistributedCache#getLocalCacheArchives(Configuration)} work.
-   *
-   * @param job the job configuration to update
-   * @throws IOException
-   */
-  private static void setupDistributedCacheConfig(final JobConf job)
-      throws IOException {
-
-    // All symlinks are created in the current work-dir.
-    String localWorkDir = job.get(TezJobConfig.TASK_LOCAL_RESOURCE_DIR);
-
-    // Update the configuration object with localized archives.
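-    // A URI fragment, when present, names the localized symlink; e.g. an
-    // archive "hdfs:///libs/a.jar#dep.jar" appears as "dep.jar" under the
-    // work dir.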
-    URI[] cacheArchives = DistributedCache.getCacheArchives(job);
-    if (cacheArchives != null) {
-      List<String> localArchives = new ArrayList<String>();
-      for (int i = 0; i < cacheArchives.length; ++i) {
-        URI u = cacheArchives[i];
-        Path p = new Path(u);
-        Path name = new Path((null == u.getFragment()) ? p.getName()
-            : u.getFragment());
-        String linkName = name.toUri().getPath();
-        localArchives.add(new Path(localWorkDir, linkName).toUri().getPath());
-      }
-      if (!localArchives.isEmpty()) {
-        job.set(MRJobConfig.CACHE_LOCALARCHIVES, StringUtils
-            .arrayToString(localArchives.toArray(new String[localArchives
-                .size()])));
-      }
-    }
-
-    // Update the configuration object with localized files.
-    URI[] cacheFiles = DistributedCache.getCacheFiles(job);
-    if (cacheFiles != null) {
-      List<String> localFiles = new ArrayList<String>();
-      for (int i = 0; i < cacheFiles.length; ++i) {
-        URI u = cacheFiles[i];
-        Path p = new Path(u);
-        Path name = new Path((null == u.getFragment()) ? p.getName()
-            : u.getFragment());
-        String linkName = name.toUri().getPath();
-        localFiles.add(new Path(localWorkDir, linkName).toUri().getPath());
-      }
-      if (!localFiles.isEmpty()) {
-        job.set(MRJobConfig.CACHE_LOCALFILES, StringUtils
-            .arrayToString(localFiles.toArray(new String[localFiles.size()])));
-      }
-    }
-  }
-
-
-  private void initResourceCalculatorPlugin() {
-    Class<? extends ResourceCalculatorProcessTree> clazz =
-        this.jobConf.getClass(MRConfig.RESOURCE_CALCULATOR_PROCESS_TREE,
-            null, ResourceCalculatorProcessTree.class);
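-    // If no class is configured (and none can be auto-detected), the factory
-    // returns null; the callers below guard against a null pTree.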
-    pTree = ResourceCalculatorProcessTree
-        .getResourceCalculatorProcessTree(System.getenv().get("JVM_PID"), clazz, this.jobConf);
-    LOG.info(" Using ResourceCalculatorProcessTree : " + pTree);
-    if (pTree != null) {
-      pTree.updateProcessTree();
-      initCpuCumulativeTime = pTree.getCumulativeCpuTime();
-    }
-  }
-
-  public TezProcessorContext getUmbilical() {
-    return this.processorContext;
-  }
-
-  public void initTask() throws IOException,
-                                InterruptedException {
-    this.mrReporter = new MRTaskReporter(processorContext);
-    this.useNewApi = jobConf.getUseNewMapper();
-    TezDAGID dagId = IDConverter.fromMRTaskAttemptId(taskAttemptId).getTaskID()
-        .getVertexID().getDAGId();
-
-    this.jobContext = new JobContextImpl(jobConf, dagId, mrReporter);
-    this.taskAttemptContext =
-        new TaskAttemptContextImpl(jobConf, taskAttemptId, mrReporter);
-
-    if (getState() == State.UNASSIGNED) {
-      setState(State.RUNNING);
-    }
-
-//    combineProcessor = null;
-//    boolean useCombiner = false;
-//    if (useNewApi) {
-//      try {
-//        useCombiner = (taskAttemptContext.getCombinerClass() != null);
-//      } catch (ClassNotFoundException e) {
-//        throw new IOException("Could not find combiner class", e);
-//      }
-//    } else {
-//      useCombiner = (job.getCombinerClass() != null);
-//    }
-//    if (useCombiner) {
-//      combineProcessor = new MRCombiner(this);
-//      combineProcessor.initialize(job, getTaskReporter());
-//    } else {
-//    }
-
-    localizeConfiguration(jobConf);
-  }
-
-//  public void initPartitioner(JobConf job) throws IOException,
-//      InterruptedException {
-//    partitioner = new MRPartitioner(this);
-//    ((MRPartitioner) partitioner).initialize(job, getTaskReporter());
-//  }
-
-  public MRTaskReporter getMRReporter() {
-    return mrReporter;
-  }
-
-  public void setState(State state) {
-    // TODO Auto-generated method stub
-
-  }
-
-  public State getState() {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  public OutputCommitter getCommitter() {
-    return committer;
-  }
-
-  public void setCommitter(OutputCommitter committer) {
-    this.committer = committer;
-  }
-
-  public TezCounters getCounters() { return counters; }
-
-  public void setConf(JobConf jobConf) {
-    this.jobConf = jobConf;
-  }
-
-  public JobConf getConf() {
-    return this.jobConf;
-  }
-
-  /**
-   * Gets a handle to the Statistics instance based on the scheme associated
-   * with path.
-   *
-   * @param path the path.
-   * @param conf the configuration to extract the scheme from if not part of
-   *   the path.
-   * @return a Statistics instance, or null if none is found for the scheme.
-   */
-  @Private
-  public static List<Statistics> getFsStatistics(Path path, Configuration conf) throws IOException {
-    List<Statistics> matchedStats = new ArrayList<FileSystem.Statistics>();
-    path = path.getFileSystem(conf).makeQualified(path);
-    String scheme = path.toUri().getScheme();
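-    // More than one FileSystem instance can register Statistics for the same
-    // scheme, hence the list of matches.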
-    for (Statistics stats : FileSystem.getAllStatistics()) {
-      if (stats.getScheme().equals(scheme)) {
-        matchedStats.add(stats);
-      }
-    }
-    return matchedStats;
-  }
-
-  @Private
-  public synchronized String getOutputName() {
-    return "part-" + NUMBER_FORMAT.format(taskAttemptId.getTaskID().getId());
-  }
-
-  public void waitBeforeCompletion(MRTaskReporter reporter) throws IOException,
-      InterruptedException {
-  }
-
-  public void outputReady(MRTaskReporter reporter, OutputContext outputContext)
-      throws IOException,
-      InterruptedException {
-    LOG.info("Task: " + taskAttemptId + " reporting outputReady");
-    updateCounters();
-    statusUpdate();
-  }
-
-  public void done(LogicalOutput output) throws IOException, InterruptedException {
-    updateCounters();
-
-    LOG.info("Task:" + taskAttemptId + " is done."
-        + " And is in the process of committing");
-    // TODO change this to use the new context
-    // TODO TEZ Interaction between Commit and OutputReady. Merge?
-    if (output instanceof SimpleOutput) {
-      SimpleOutput sOut = (SimpleOutput)output;
-      if (sOut.isCommitRequired()) {
-        //wait for commit approval and commit
-        // TODO EVENTUALLY - Commit is not required for map tasks.
-        // skip a couple of RPCs before exiting.
-        commit(sOut);
-      }
-    }
-    taskDone.set(true);
-    // Make sure we send at least one set of counter increments. It's
-    // ok to call updateCounters() in this thread after comm thread stopped.
-    updateCounters();
-    sendLastUpdate();
-    //signal the tasktracker that we are done
-    //sendDone(umbilical);
-  }
-
-  /**
-   * Send a status update to the task tracker.
-   * @throws IOException
-   */
-  public void statusUpdate() throws IOException, InterruptedException {
-    // TODO call progress update here if not being called within Map/Reduce
-  }
-
-  /**
-   * Sends last status update before sending umbilical.done();
-   */
-  private void sendLastUpdate()
-      throws IOException, InterruptedException {
-    statusUpdate();
-  }
-
-  private void commit(SimpleOutput output) throws IOException {
-    int retries = 3;
-    while (true) {
-      // This will loop until the AM asks for the task to be killed, as
-      // opposed to the AM sending a signal to the task to kill itself
-      // gracefully.
-      try {
-        if (processorContext.canCommit()) {
-          break;
-        }
-        Thread.sleep(1000);
-      } catch(InterruptedException ie) {
-        //ignore
-      } catch (IOException ie) {
-        LOG.warn("Failure sending canCommit: "
-            + StringUtils.stringifyException(ie));
-        if (--retries == 0) {
-          throw ie;
-        }
-      }
-    }
-
-    // task can commit now
-    try {
-      LOG.info("Task " + taskAttemptId + " is allowed to commit now");
-      output.commit();
-      return;
-    } catch (IOException iee) {
-      LOG.warn("Failure committing: " +
-          StringUtils.stringifyException(iee));
-      // if it couldn't commit successfully then delete the output
-      discardOutput(output);
-      throw iee;
-    }
-  }
-
-  private void discardOutput(SimpleOutput output) {
-    try {
-      output.abort();
-    } catch (IOException ioe)  {
-      LOG.warn("Failure cleaning up: " +
-               StringUtils.stringifyException(ioe));
-    }
-  }
-
-
-  public void updateCounters() {
-    // TODO Auto-generated method stub
-    // TODO TEZAM Implement.
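-    // Group all FileSystem statistics by URI scheme, then fold each group's
-    // numbers into the Tez counters via one updater per scheme.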
-    Map<String, List<FileSystem.Statistics>> map = new
-        HashMap<String, List<FileSystem.Statistics>>();
-    for(Statistics stat: FileSystem.getAllStatistics()) {
-      String uriScheme = stat.getScheme();
-      if (map.containsKey(uriScheme)) {
-        List<FileSystem.Statistics> list = map.get(uriScheme);
-        list.add(stat);
-      } else {
-        List<FileSystem.Statistics> list = new ArrayList<FileSystem.Statistics>();
-        list.add(stat);
-        map.put(uriScheme, list);
-      }
-    }
-    for (Map.Entry<String, List<FileSystem.Statistics>> entry: map.entrySet()) {
-      FileSystemStatisticUpdater updater = statisticUpdaters.get(entry.getKey());
-      if (updater == null) { // new FileSystem has been found in the cache
-        updater =
-            new FileSystemStatisticUpdater(counters, entry.getValue(),
-                entry.getKey());
-        statisticUpdaters.put(entry.getKey(), updater);
-      }
-      updater.updateCounters();
-    }
-
-    gcUpdater.incrementGcCounter();
-    updateResourceCounters();
-  }
-
-  /**
-   * Updates the {@link TaskCounter#COMMITTED_HEAP_BYTES} counter to reflect the
-   * current total committed heap space usage of this JVM.
-   */
-  private void updateHeapUsageCounter() {
-    long currentHeapUsage = Runtime.getRuntime().totalMemory();
-    counters.findCounter(TaskCounter.COMMITTED_HEAP_BYTES)
-            .setValue(currentHeapUsage);
-  }
-
-  /**
-   * Update resource information counters
-   */
-  void updateResourceCounters() {
-    // Update generic resource counters
-    updateHeapUsageCounter();
-
-    // Updating resources specified in ResourceCalculatorPlugin
-    if (pTree == null) {
-      return;
-    }
-    pTree.updateProcessTree();
-    long cpuTime = pTree.getCumulativeCpuTime();
-    long pMem = pTree.getCumulativeRssmem();
-    long vMem = pTree.getCumulativeVmem();
-    // Remove the CPU time consumed previously by JVM reuse
-    cpuTime -= initCpuCumulativeTime;
-    counters.findCounter(TaskCounter.CPU_MILLISECONDS).setValue(cpuTime);
-    counters.findCounter(TaskCounter.PHYSICAL_MEMORY_BYTES).setValue(pMem);
-    counters.findCounter(TaskCounter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
-  }
-
-
-  public static String normalizeStatus(String status, Configuration conf) {
-    // Check to see if the status string is too long
-    // and truncate it if needed.
-    int progressStatusLength = conf.getInt(
-        MRConfig.PROGRESS_STATUS_LEN_LIMIT_KEY,
-        MRConfig.PROGRESS_STATUS_LEN_LIMIT_DEFAULT);
-    if (status.length() > progressStatusLength) {
-      LOG.warn("Task status: \"" + status + "\" truncated to max limit ("
-          + progressStatusLength + " characters)");
-      status = status.substring(0, progressStatusLength);
-    }
-    return status;
-  }
-
-  protected static <INKEY,INVALUE,OUTKEY,OUTVALUE>
-  org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
-  createReduceContext(org.apache.hadoop.mapreduce.Reducer
-                        <INKEY,INVALUE,OUTKEY,OUTVALUE> reducer,
-                      Configuration job,
-                      TaskAttemptID taskId,
-                      final TezRawKeyValueIterator rIter,
-                      org.apache.hadoop.mapreduce.Counter inputKeyCounter,
-                      org.apache.hadoop.mapreduce.Counter inputValueCounter,
-                      org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output,
-                      org.apache.hadoop.mapreduce.OutputCommitter committer,
-                      org.apache.hadoop.mapreduce.StatusReporter reporter,
-                      RawComparator<INKEY> comparator,
-                      Class<INKEY> keyClass, Class<INVALUE> valueClass
-  ) throws IOException, InterruptedException {
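-    // Adapt Tez's TezRawKeyValueIterator to the mapred RawKeyValueIterator
-    // expected by ReduceContextImpl; each method simply delegates.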
-    RawKeyValueIterator r =
-        new RawKeyValueIterator() {
-
-          @Override
-          public boolean next() throws IOException {
-            return rIter.next();
-          }
-
-          @Override
-          public DataInputBuffer getValue() throws IOException {
-            return rIter.getValue();
-          }
-
-          @Override
-          public Progress getProgress() {
-            return rIter.getProgress();
-          }
-
-          @Override
-          public DataInputBuffer getKey() throws IOException {
-            return rIter.getKey();
-          }
-
-          @Override
-          public void close() throws IOException {
-            rIter.close();
-          }
-        };
-    org.apache.hadoop.mapreduce.ReduceContext<INKEY, INVALUE, OUTKEY, OUTVALUE>
-        reduceContext =
-      new ReduceContextImpl<INKEY, INVALUE, OUTKEY, OUTVALUE>(
-          job,
-          taskId,
-          r,
-          inputKeyCounter,
-          inputValueCounter,
-          output,
-          committer,
-          reporter,
-          comparator,
-          keyClass,
-          valueClass);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Using key class: " + keyClass
-          + ", valueClass: " + valueClass);
-    }
-
-    org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
-        reducerContext =
-          new WrappedReducer<INKEY, INVALUE, OUTKEY, OUTVALUE>().getReducerContext(
-              reduceContext);
-
-    return reducerContext;
-  }
-
-  public void taskCleanup()
-      throws IOException, InterruptedException {
-    // set phase for this task
-    statusUpdate();
-    LOG.info("Runnning cleanup for the task");
-    // do the cleanup
-    committer.abortTask(taskAttemptContext);
-  }
-
-  public void localizeConfiguration(JobConf jobConf)
-      throws IOException, InterruptedException {
-    jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
-    jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
-    jobConf.setInt(JobContext.TASK_PARTITION,
-        taskAttemptId.getTaskID().getId());
-    jobConf.set(JobContext.ID, taskAttemptId.getJobID().toString());
-  }
-
-  public abstract TezCounter getOutputRecordsCounter();
-
-  public abstract TezCounter getInputRecordsCounter();
-
-  public org.apache.hadoop.mapreduce.TaskAttemptContext getTaskAttemptContext() {
-    return taskAttemptContext;
-  }
-
-  public JobContext getJobContext() {
-    return jobContext;
-  }
-
-  public TaskAttemptID getTaskAttemptId() {
-    return taskAttemptId;
-  }
-
-  public TezProcessorContext getTezEngineTaskContext() {
-    return processorContext;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTaskReporter.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTaskReporter.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTaskReporter.java
deleted file mode 100644
index c7c9567..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/MRTaskReporter.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.mapreduce.newprocessor;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.mapred.Counters;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.newapi.TezInputContext;
-import org.apache.tez.engine.newapi.TezOutputContext;
-import org.apache.tez.engine.newapi.TezProcessorContext;
-import org.apache.tez.engine.newapi.TezTaskContext;
-import org.apache.tez.mapreduce.hadoop.mapred.MRCounters;
-import org.apache.tez.mapreduce.hadoop.newmapred.MRReporter;
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class MRTaskReporter
-    extends org.apache.hadoop.mapreduce.StatusReporter
-    implements Reporter {
-
-  private final TezTaskContext context;
-  private final boolean isProcessorContext;
-  private final Reporter reporter;
-
-  private InputSplit split = null;
-
-  public MRTaskReporter(TezProcessorContext context) {
-    this.context = context;
-    this.reporter = new MRReporter(context);
-    this.isProcessorContext = true;
-  }
-
-  public MRTaskReporter(TezOutputContext context) {
-    this.context = context;
-    this.reporter = new MRReporter(context);
-    this.isProcessorContext = false;
-  }
-  
-  public MRTaskReporter(TezInputContext context) {
-    this.context = context;
-    this.reporter = new MRReporter(context);
-    this.isProcessorContext = false;
-  }
-
-  public void setProgress(float progress) {
-    if (isProcessorContext) {
-      ((TezProcessorContext)context).setProgress(progress);
-    } else {
-      // TODO FIXME NEWTEZ - will simpleoutput's reporter use this api?
-    }
-  }
-
-  public void setStatus(String status) {
-    reporter.setStatus(status);
-  }
-
-  public float getProgress() {
-    return reporter.getProgress();
-  }
-
-  public void progress() {
-    reporter.progress();
-  }
-
-  public Counters.Counter getCounter(String group, String name) {
-    TezCounter counter = context.getCounters().findCounter(group, name);
-    MRCounters.MRCounter mrCounter = null;
-    if (counter != null) {
-      mrCounter = new MRCounters.MRCounter(counter);
-    }
-    return mrCounter;
-  }
-
-  public Counters.Counter getCounter(Enum<?> name) {
-    TezCounter counter = context.getCounters().findCounter(name);
-    MRCounters.MRCounter mrCounter = null;
-    if (counter != null) {
-      mrCounter = new MRCounters.MRCounter(counter);
-    }
-    return mrCounter;
-  }
-
-  public void incrCounter(Enum<?> key, long amount) {
-    reporter.incrCounter(key, amount);
-  }
-
-  public void incrCounter(String group, String counter, long amount) {
-    reporter.incrCounter(group, counter, amount);
-  }
-
-  public void setInputSplit(InputSplit split) {
-    this.split = split;
-  }
-
-  public InputSplit getInputSplit() throws UnsupportedOperationException {
-    if (split == null) {
-      throw new UnsupportedOperationException("Input only available on map");
-    } else {
-      return split;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/map/MapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/map/MapProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/map/MapProcessor.java
deleted file mode 100644
index 21df743..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/map/MapProcessor.java
+++ /dev/null
@@ -1,341 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.mapreduce.newprocessor.map;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.MapRunnable;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.dag.api.TezException;
-import org.apache.tez.engine.lib.output.OnFileSortedOutput;
-import org.apache.tez.engine.newapi.Event;
-import org.apache.tez.engine.newapi.KVReader;
-import org.apache.tez.engine.newapi.KVWriter;
-import org.apache.tez.engine.newapi.LogicalIOProcessor;
-import org.apache.tez.engine.newapi.LogicalInput;
-import org.apache.tez.engine.newapi.LogicalOutput;
-import org.apache.tez.engine.newapi.TezProcessorContext;
-import org.apache.tez.mapreduce.hadoop.newmapreduce.MapContextImpl;
-import org.apache.tez.mapreduce.newinput.SimpleInput;
-import org.apache.tez.mapreduce.newinput.SimpleInputLegacy;
-import org.apache.tez.mapreduce.newoutput.SimpleOutput;
-import org.apache.tez.mapreduce.newprocessor.MRTask;
-import org.apache.tez.mapreduce.newprocessor.MRTaskReporter;
-
-@SuppressWarnings({ "unchecked", "rawtypes" })
-public class MapProcessor extends MRTask implements LogicalIOProcessor {
-
-  private static final Log LOG = LogFactory.getLog(MapProcessor.class);
-
-  public MapProcessor(){
-    super(true);
-  }
-
-  @Override
-  public void initialize(TezProcessorContext processorContext)
-      throws IOException {
-    try {
-      super.initialize(processorContext);
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    }
-  }
-
-
-  @Override
-  public void handleEvents(List<Event> processorEvents) {
-    // TODO Auto-generated method stub
-
-  }
-
-  public void close() throws IOException {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public void run(Map<String, LogicalInput> inputs,
-      Map<String, LogicalOutput> outputs) throws Exception {
-
-    LOG.info("Running map: " + processorContext.getUniqueIdentifier());
-
-    initTask();
-
-    if (inputs.size() != 1
-        || outputs.size() != 1) {
-      throw new IOException("Cannot handle multiple inputs or outputs"
-          + ", inputCount=" + inputs.size()
-          + ", outputCount=" + outputs.size());
-    }
-    LogicalInput in = inputs.values().iterator().next();
-    LogicalOutput out = outputs.values().iterator().next();
-
-    // Sanity check
-    if (!(in instanceof SimpleInputLegacy)) {
-      throw new IOException(new TezException(
-          "Only Simple Input supported. Input: " + in.getClass()));
-    }
-    SimpleInputLegacy input = (SimpleInputLegacy)in;
-
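-    // The map either writes final output directly (SimpleOutput) or feeds a
-    // sorted intermediate output for a shuffle edge (OnFileSortedOutput).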
-    KVWriter kvWriter = null;
-    if (!(out instanceof OnFileSortedOutput)) {
-      kvWriter = ((SimpleOutput)out).getWriter();
-    } else {
-      kvWriter = ((OnFileSortedOutput)out).getWriter();
-    }
-
-    if (useNewApi) {
-      runNewMapper(jobConf, mrReporter, input, kvWriter);
-    } else {
-      runOldMapper(jobConf, mrReporter, input, kvWriter);
-    }
-
-    done(out);
-  }
-
-  void runOldMapper(
-      final JobConf job,
-      final MRTaskReporter reporter,
-      final SimpleInputLegacy input,
-      final KVWriter output
-      ) throws IOException, InterruptedException {
-
-    // Initialize input in-line since it sets parameters which may be used by the processor.
-    // Done only for SimpleInput.
-    // TODO use new method in SimpleInput to get required info
-    //input.initialize(job, master);
-
-    RecordReader in = new OldRecordReader(input);
-
-    OutputCollector collector = new OldOutputCollector(output);
-
-    MapRunnable runner =
-        (MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
-
-    runner.run(in, collector, (Reporter)reporter);
-    // start the sort phase only if there are reducers
-    this.statusUpdate();
-  }
-
-  private void runNewMapper(final JobConf job,
-      MRTaskReporter reporter,
-      final SimpleInputLegacy in,
-      KVWriter out
-      ) throws IOException, InterruptedException {
-
-    // Initialize input in-line since it sets parameters which may be used by the processor.
-    // Done only for SimpleInput.
-    // TODO use new method in SimpleInput to get required info
-    //in.initialize(job, master);
-
-    // make a task context so we can get the classes
-    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
-        getTaskAttemptContext();
-
-    // make a mapper
-    org.apache.hadoop.mapreduce.Mapper mapper;
-    try {
-      mapper = (org.apache.hadoop.mapreduce.Mapper)
-          ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
-    } catch (ClassNotFoundException cnfe) {
-      throw new IOException(cnfe);
-    }
-
-    org.apache.hadoop.mapreduce.RecordReader input =
-        new NewRecordReader(in);
-
-    org.apache.hadoop.mapreduce.RecordWriter output =
-        new NewOutputCollector(out);
-
-    org.apache.hadoop.mapreduce.InputSplit split = in.getNewInputSplit();
-
-    org.apache.hadoop.mapreduce.MapContext mapContext =
-        new MapContextImpl(
-            job, taskAttemptId,
-            input, output,
-            getCommitter(),
-            processorContext, split);
-
-    org.apache.hadoop.mapreduce.Mapper.Context mapperContext =
-        new WrappedMapper().getMapContext(mapContext);
-
-    input.initialize(split, mapperContext);
-    mapper.run(mapperContext);
-    this.statusUpdate();
-    input.close();
-    output.close(mapperContext);
-  }
-
-  private static class NewRecordReader extends
-      org.apache.hadoop.mapreduce.RecordReader {
-    private final SimpleInput in;
-    private KVReader reader;
-
-    private NewRecordReader(SimpleInput in) throws IOException {
-      this.in = in;
-      this.reader = in.getReader();
-    }
-
-    @Override
-    public void initialize(org.apache.hadoop.mapreduce.InputSplit split,
-        TaskAttemptContext context) throws IOException,
-        InterruptedException {
-      //in.initializeNewRecordReader(split, context);
-    }
-
-    @Override
-    public boolean nextKeyValue() throws IOException,
-        InterruptedException {
-      return reader.next();
-    }
-
-    @Override
-    public Object getCurrentKey() throws IOException,
-        InterruptedException {
-      return reader.getCurrentKV().getKey();
-    }
-
-    @Override
-    public Object getCurrentValue() throws IOException,
-        InterruptedException {
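-      // Map-side input is expected to carry a single value per key, so the
-      // first element of the values iterable is the current value.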
-      return reader.getCurrentKV().getValues().iterator().next();
-    }
-
-    @Override
-    public float getProgress() throws IOException, InterruptedException {
-      return in.getProgress();
-    }
-
-    @Override
-    public void close() throws IOException {
-    }
-  }
-
-  private static class OldRecordReader implements RecordReader {
-    private final SimpleInputLegacy simpleInput;
-
-    private OldRecordReader(SimpleInputLegacy simpleInput) {
-      this.simpleInput = simpleInput;
-    }
-
-    @Override
-    public boolean next(Object key, Object value) throws IOException {
-      // TODO broken
-//      simpleInput.setKey(key);
-//      simpleInput.setValue(value);
-//      try {
-//        return simpleInput.hasNext();
-//      } catch (InterruptedException ie) {
-//        throw new IOException(ie);
-//      }
-      return simpleInput.getOldRecordReader().next(key, value);
-    }
-
-    @Override
-    public Object createKey() {
-      return simpleInput.getOldRecordReader().createKey();
-    }
-
-    @Override
-    public Object createValue() {
-      return simpleInput.getOldRecordReader().createValue();
-    }
-
-    @Override
-    public long getPos() throws IOException {
-      return simpleInput.getOldRecordReader().getPos();
-    }
-
-    @Override
-    public void close() throws IOException {
-    }
-
-    @Override
-    public float getProgress() throws IOException {
-      try {
-        return simpleInput.getProgress();
-      } catch (InterruptedException ie) {
-        throw new IOException(ie);
-      }
-    }
-  }
-
-  private static class OldOutputCollector
-  implements OutputCollector {
-    private final KVWriter output;
-
-    OldOutputCollector(KVWriter output) {
-      this.output = output;
-    }
-
-    public void collect(Object key, Object value) throws IOException {
-        output.write(key, value);
-    }
-  }
-
-  private class NewOutputCollector
-    extends org.apache.hadoop.mapreduce.RecordWriter {
-    private final KVWriter out;
-
-    NewOutputCollector(KVWriter out) throws IOException {
-      this.out = out;
-    }
-
-    @Override
-    public void write(Object key, Object value) throws IOException, InterruptedException {
-      out.write(key, value);
-    }
-
-    @Override
-    public void close(TaskAttemptContext context
-                      ) throws IOException, InterruptedException {
-    }
-  }
-
-  @Override
-  public void localizeConfiguration(JobConf jobConf)
-      throws IOException, InterruptedException {
-    super.localizeConfiguration(jobConf);
-    jobConf.setBoolean(JobContext.TASK_ISMAP, true);
-  }
-
-  @Override
-  public TezCounter getOutputRecordsCounter() {
-    return processorContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS);
-  }
-
-  @Override
-  public TezCounter getInputRecordsCounter() {
-    return processorContext.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/reduce/ReduceProcessor.java
deleted file mode 100644
index cedcdd6..0000000
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/newprocessor/reduce/ReduceProcessor.java
+++ /dev/null
@@ -1,353 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.mapreduce.newprocessor.reduce;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobContext;
-import org.apache.hadoop.mapred.OutputCollector;
-import org.apache.hadoop.mapred.Reducer;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.util.Progress;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.counters.TaskCounter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-import org.apache.tez.engine.lib.output.OnFileSortedOutput;
-import org.apache.tez.engine.newapi.Event;
-import org.apache.tez.engine.newapi.KVReader;
-import org.apache.tez.engine.newapi.KVWriter;
-import org.apache.tez.engine.newapi.LogicalIOProcessor;
-import org.apache.tez.engine.newapi.LogicalInput;
-import org.apache.tez.engine.newapi.LogicalOutput;
-import org.apache.tez.engine.newapi.TezProcessorContext;
-import org.apache.tez.mapreduce.newinput.ShuffledMergedInputLegacy;
-import org.apache.tez.mapreduce.newoutput.SimpleOutput;
-import org.apache.tez.mapreduce.newprocessor.MRTask;
-import org.apache.tez.mapreduce.newprocessor.MRTaskReporter;
-
-
-@SuppressWarnings({ "unchecked", "rawtypes" })
-public class ReduceProcessor extends MRTask implements LogicalIOProcessor {
-
-  private static final Log LOG = LogFactory.getLog(ReduceProcessor.class);
-
-  private Counter reduceInputKeyCounter;
-  private Counter reduceInputValueCounter;
-
-  public ReduceProcessor() {
-    super(false);
-  }
-
-  @Override
-  public void initialize(TezProcessorContext processorContext)
-      throws IOException {
-    try {
-      super.initialize(processorContext);
-    } catch (InterruptedException e) {
-      throw new IOException(e);
-    }
-  }
-
-
-  @Override
-  public void handleEvents(List<Event> processorEvents) {
-    // TODO Auto-generated method stub
-
-  }
-
-  public void close() throws IOException {
-    // TODO Auto-generated method stub
-
-  }
-
-  @Override
-  public void run(Map<String, LogicalInput> inputs,
-      Map<String, LogicalOutput> outputs) throws Exception {
-
-    LOG.info("Running reduce: " + processorContext.getUniqueIdentifier());
-
-    initTask();
-
-    if (outputs.size() <= 0 || outputs.size() > 1) {
-      throw new IOException("Invalid number of outputs"
-          + ", outputCount=" + outputs.size());
-    }
-
-    if (inputs.size() <= 0 || inputs.size() > 1) {
-      throw new IOException("Invalid number of inputs"
-          + ", inputCount=" + inputs.size());
-    }
-
-    LogicalInput in = inputs.values().iterator().next();
-    LogicalOutput out = outputs.values().iterator().next();
-
-    this.statusUpdate();
-
-    Class keyClass = ConfigUtils.getIntermediateInputKeyClass(jobConf);
-    Class valueClass = ConfigUtils.getIntermediateInputValueClass(jobConf);
-    LOG.info("Using keyClass: " + keyClass);
-    LOG.info("Using valueClass: " + valueClass);
-    RawComparator comparator =
-        ConfigUtils.getInputKeySecondaryGroupingComparator(jobConf);
-    LOG.info("Using comparator: " + comparator);
-
-    reduceInputKeyCounter =
-        mrReporter.getCounter(TaskCounter.REDUCE_INPUT_GROUPS);
-    reduceInputValueCounter =
-        mrReporter.getCounter(TaskCounter.REDUCE_INPUT_RECORDS);
-
-    // Sanity check
-    if (!(in instanceof ShuffledMergedInputLegacy)) {
-      throw new IOException("Illegal input to reduce: " + in.getClass());
-    }
-    ShuffledMergedInputLegacy shuffleInput = (ShuffledMergedInputLegacy)in;
-    KVReader kvReader = shuffleInput.getReader();
-
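-    // As with the map side, the reduce writes either final output
-    // (SimpleOutput) or a sorted intermediate output (OnFileSortedOutput).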
-    KVWriter kvWriter = null;
-    if (out instanceof SimpleOutput) {
-      kvWriter = ((SimpleOutput) out).getWriter();
-    } else if (out instanceof OnFileSortedOutput) {
-      kvWriter = ((OnFileSortedOutput) out).getWriter();
-    } else {
-      throw new IOException("Illegal output to reduce: " + out.getClass());
-    }
-
-    if (useNewApi) {
-      try {
-        runNewReducer(
-            jobConf,
-            mrReporter,
-            shuffleInput, comparator, keyClass, valueClass,
-            kvWriter);
-      } catch (ClassNotFoundException cnfe) {
-        throw new IOException(cnfe);
-      }
-    } else {
-      runOldReducer(
-          jobConf, mrReporter,
-          kvReader, comparator, keyClass, valueClass, kvWriter);
-    }
-
-    done(out);
-  }
-
-  void runOldReducer(JobConf job,
-      final MRTaskReporter reporter,
-      KVReader input,
-      RawComparator comparator,
-      Class keyClass,
-      Class valueClass,
-      final KVWriter output) throws IOException, InterruptedException {
-
-    Reducer reducer =
-        ReflectionUtils.newInstance(job.getReducerClass(), job);
-
-    // make output collector
-
-    OutputCollector collector =
-        new OutputCollector() {
-      public void collect(Object key, Object value)
-          throws IOException {
-        output.write(key, value);
-      }
-    };
-
-    // apply reduce function
-    try {
-      ReduceValuesIterator values =
-          new ReduceValuesIterator(
-              input, reporter, reduceInputValueCounter);
-
-      values.informReduceProgress();
-      while (values.more()) {
-        reduceInputKeyCounter.increment(1);
-        reducer.reduce(values.getKey(), values, collector, reporter);
-        values.informReduceProgress();
-      }
-
-      //Clean up: repeated in catch block below
-      reducer.close();
-      //End of clean up.
-    } catch (IOException ioe) {
-      try {
-        reducer.close();
-      } catch (IOException ignored) {
-      }
-
-      throw ioe;
-    }
-  }
-
-  private static class ReduceValuesIterator<KEY,VALUE>
-  implements Iterator<VALUE> {
-    private Counter reduceInputValueCounter;
-    private KVReader in;
-    private Progressable reporter;
-    private Object currentKey;
-    private Iterator<Object> currentValues;
-
-    public ReduceValuesIterator(KVReader in,
-        Progressable reporter,
-        Counter reduceInputValueCounter)
-            throws IOException {
-      this.reduceInputValueCounter = reduceInputValueCounter;
-      this.in = in;
-      this.reporter = reporter;
-    }
-
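-    // more() advances to the next key group; hasNext()/next() then walk the
-    // values within that group, mirroring the old-API reduce contract.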
-    public boolean more() throws IOException {
-      boolean more = in.next();
-      if(more) {
-        currentKey = in.getCurrentKV().getKey();
-        currentValues = in.getCurrentKV().getValues().iterator();
-      } else {
-        currentKey = null;
-        currentValues = null;
-      }
-      return more;
-    }
-
-    public KEY getKey() throws IOException {
-      return (KEY) currentKey;
-    }
-
-    public void informReduceProgress() {
-      reporter.progress();
-    }
-
-    @Override
-    public boolean hasNext() {
-      return currentValues.hasNext();
-    }
-
-    @Override
-    public VALUE next() {
-      reduceInputValueCounter.increment(1);
-      return (VALUE) currentValues.next();
-    }
-
-    @Override
-    public void remove() {
-      throw new UnsupportedOperationException();
-    }
-
-  }
-
-  void runNewReducer(JobConf job,
-      final MRTaskReporter reporter,
-      ShuffledMergedInputLegacy input,
-      RawComparator comparator,
-      Class keyClass,
-      Class valueClass,
-      final KVWriter out
-      ) throws IOException, InterruptedException,
-      ClassNotFoundException {
-
-    // make a task context so we can get the classes
-    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext = getTaskAttemptContext();
-
-    // make a reducer
-    org.apache.hadoop.mapreduce.Reducer reducer =
-        (org.apache.hadoop.mapreduce.Reducer)
-        ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
-
-    // wrap value iterator to report progress.
-    final TezRawKeyValueIterator rawIter = input.getIterator();
-    TezRawKeyValueIterator rIter = new TezRawKeyValueIterator() {
-      public void close() throws IOException {
-        rawIter.close();
-      }
-      public DataInputBuffer getKey() throws IOException {
-        return rawIter.getKey();
-      }
-      public Progress getProgress() {
-        return rawIter.getProgress();
-      }
-      public DataInputBuffer getValue() throws IOException {
-        return rawIter.getValue();
-      }
-      public boolean next() throws IOException {
-        boolean ret = rawIter.next();
-        reporter.setProgress(rawIter.getProgress().getProgress());
-        return ret;
-      }
-    };
-
-    org.apache.hadoop.mapreduce.RecordWriter trackedRW =
-        new org.apache.hadoop.mapreduce.RecordWriter() {
-
-      @Override
-      public void write(Object key, Object value) throws IOException,
-      InterruptedException {
-        out.write(key, value);
-      }
-
-      @Override
-      public void close(TaskAttemptContext context) throws IOException,
-      InterruptedException {
-      }
-    };
-
-    org.apache.hadoop.mapreduce.Reducer.Context reducerContext =
-        createReduceContext(
-            reducer, job, taskAttemptId,
-            rIter, reduceInputKeyCounter,
-            reduceInputValueCounter,
-            trackedRW,
-            committer,
-            reporter, comparator, keyClass,
-            valueClass);
-
-    reducer.run(reducerContext);
-    trackedRW.close(reducerContext);
-  }
-
-  @Override
-  public void localizeConfiguration(JobConf jobConf)
-      throws IOException, InterruptedException {
-    super.localizeConfiguration(jobConf);
-    jobConf.setBoolean(JobContext.TASK_ISMAP, false);
-  }
-
-  @Override
-  public TezCounter getOutputRecordsCounter() {
-    return processorContext.getCounters().findCounter(TaskCounter.REDUCE_OUTPUT_RECORDS);
-  }
-
-  @Override
-  public TezCounter getInputRecordsCounter() {
-    return processorContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java
index 060e28c..91fb8cc 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/SimpleOutput.java
@@ -1,181 +1,226 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
 package org.apache.tez.mapreduce.output;
 
 import java.io.IOException;
+import java.text.NumberFormat;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem.Statistics;
+import org.apache.hadoop.mapred.FileOutputCommitter;
+import org.apache.hadoop.mapred.FileOutputFormat;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.TaskAttemptID;
+import org.apache.hadoop.mapred.TaskID;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormatCounter;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.TezEngineTaskContext;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.api.Output;
-import org.apache.tez.engine.records.OutputContext;
-import org.apache.tez.mapreduce.processor.MRTask;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.KVWriter;
+import org.apache.tez.engine.newapi.LogicalOutput;
+import org.apache.tez.engine.newapi.TezOutputContext;
+import org.apache.tez.mapreduce.common.Utils;
+import org.apache.tez.mapreduce.hadoop.MRConfig;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+import org.apache.tez.mapreduce.hadoop.mapred.MRReporter;
+import org.apache.tez.mapreduce.hadoop.mapreduce.TaskAttemptContextImpl;
 import org.apache.tez.mapreduce.processor.MRTaskReporter;
 
-/**
- * {@link SimpleOutput} is an {@link Output} which persists key/value pairs
- * written to it. 
- * 
- * It is compatible with all standard Apache Hadoop MapReduce 
- * {@link OutputFormat} implementations. 
- */
-@SuppressWarnings({ "unchecked", "rawtypes" })
-public class SimpleOutput implements Output {
-
-  private MRTask task;
-  
+public class SimpleOutput implements LogicalOutput {
+
+  private static final Log LOG = LogFactory.getLog(SimpleOutput.class);
+
+  private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
+  static {
+    NUMBER_FORMAT.setMinimumIntegerDigits(5);
+    NUMBER_FORMAT.setGroupingUsed(false);
+  }
+
+  private TezOutputContext outputContext;
+  private JobConf jobConf;
   boolean useNewApi;
-  JobConf jobConf;
-  
-  org.apache.hadoop.mapreduce.TaskAttemptContext taskAttemptContext;
+  private AtomicBoolean closed = new AtomicBoolean(false);
+
+  @SuppressWarnings("rawtypes")
   org.apache.hadoop.mapreduce.OutputFormat newOutputFormat;
+  @SuppressWarnings("rawtypes")
   org.apache.hadoop.mapreduce.RecordWriter newRecordWriter;
-  
+
+  @SuppressWarnings("rawtypes")
   org.apache.hadoop.mapred.OutputFormat oldOutputFormat;
+  @SuppressWarnings("rawtypes")
   org.apache.hadoop.mapred.RecordWriter oldRecordWriter;
-  
+
   private TezCounter outputRecordCounter;
-  private TezCounter fileOutputByteCounter; 
+  private TezCounter fileOutputByteCounter;
   private List<Statistics> fsStats;
-  private MRTaskReporter reporter;
-  
-  public SimpleOutput(TezEngineTaskContext task)
-  {}
-  
-  public void setTask(MRTask task) {
-    this.task = task;
-  }
-  
-  public void initialize(Configuration conf, Master master) throws IOException,
-      InterruptedException {
 
-    if (task == null) {
-      return;
-    }
-    
-    if (conf instanceof JobConf) {
-      jobConf = (JobConf)conf;
-    } else {
-      jobConf = new JobConf(conf);
-    }
-    
-    useNewApi = jobConf.getUseNewMapper();
-    taskAttemptContext = task.getTaskAttemptContext();
-    
-    outputRecordCounter = task.getOutputRecordsCounter();
-    fileOutputByteCounter = task.getFileOutputBytesCounter();
-
-    reporter = task.getMRReporter();
-    
+  private TaskAttemptContext newApiTaskAttemptContext;
+  private org.apache.hadoop.mapred.TaskAttemptContext oldApiTaskAttemptContext;
+
+  private boolean isMapperOutput;
+
+  private OutputCommitter committer;
+
+  @Override
+  public List<Event> initialize(TezOutputContext outputContext)
+      throws IOException, InterruptedException {
+    LOG.info("Initializing Simple Output");
+    this.outputContext = outputContext;
+    Configuration conf = TezUtils.createConfFromUserPayload(
+        outputContext.getUserPayload());
+    this.jobConf = new JobConf(conf);
+    this.useNewApi = this.jobConf.getUseNewMapper();
+    this.isMapperOutput = jobConf.getBoolean(MRConfig.IS_MAP_PROCESSOR,
+        false);
+    jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID,
+        outputContext.getDAGAttemptNumber());
+
+    outputRecordCounter = outputContext.getCounters().findCounter(
+        TaskCounter.MAP_OUTPUT_RECORDS);
+    fileOutputByteCounter = outputContext.getCounters().findCounter(
+        FileOutputFormatCounter.BYTES_WRITTEN);
+
     if (useNewApi) {
+      newApiTaskAttemptContext = createTaskAttemptContext();
       try {
         newOutputFormat =
             ReflectionUtils.newInstance(
-                taskAttemptContext.getOutputFormatClass(), jobConf);
+                newApiTaskAttemptContext.getOutputFormatClass(), jobConf);
       } catch (ClassNotFoundException cnfe) {
         throw new IOException(cnfe);
       }
-      
+
       List<Statistics> matchedStats = null;
-      if (newOutputFormat instanceof 
+      if (newOutputFormat instanceof
           org.apache.hadoop.mapreduce.lib.output.FileOutputFormat) {
-        matchedStats = 
-            MRTask.getFsStatistics(
+        matchedStats =
+            Utils.getFsStatistics(
                 org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
-                    .getOutputPath(taskAttemptContext), 
+                    .getOutputPath(newApiTaskAttemptContext),
                 jobConf);
       }
       fsStats = matchedStats;
 
       long bytesOutPrev = getOutputBytes();
-      newRecordWriter = 
-          newOutputFormat.getRecordWriter(this.taskAttemptContext);
+      try {
+        newRecordWriter =
+            newOutputFormat.getRecordWriter(newApiTaskAttemptContext);
+      } catch (InterruptedException e) {
+        throw new IOException("Interrupted while creating record writer", e);
+      }
       long bytesOutCurr = getOutputBytes();
       fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
     } else {
+      TaskAttemptID taskAttemptId = new TaskAttemptID(new TaskID(Long.toString(
+          outputContext.getApplicationId().getClusterTimestamp()),
+          outputContext.getApplicationId().getId(),
+          (isMapperOutput ? TaskType.MAP : TaskType.REDUCE),
+          outputContext.getTaskIndex()),
+          outputContext.getTaskAttemptNumber());
+      jobConf.set(JobContext.TASK_ATTEMPT_ID, taskAttemptId.toString());
+      jobConf.set(JobContext.TASK_ID, taskAttemptId.getTaskID().toString());
+      jobConf.setBoolean(JobContext.TASK_ISMAP, isMapperOutput);
+      jobConf.setInt(JobContext.TASK_PARTITION,
+          taskAttemptId.getTaskID().getId());
+      jobConf.set(JobContext.ID, taskAttemptId.getJobID().toString());
+
+      oldApiTaskAttemptContext =
+          new org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl(
+              jobConf, taskAttemptId,
+              new MRTaskReporter(outputContext));
       oldOutputFormat = jobConf.getOutputFormat();
-      
+
       List<Statistics> matchedStats = null;
-      if (oldOutputFormat instanceof org.apache.hadoop.mapred.FileOutputFormat) {
-        matchedStats = 
-            MRTask.getFsStatistics(
+      if (oldOutputFormat
+          instanceof org.apache.hadoop.mapred.FileOutputFormat) {
+        matchedStats =
+            Utils.getFsStatistics(
                 org.apache.hadoop.mapred.FileOutputFormat.getOutputPath(
-                    jobConf), 
+                    jobConf),
                 jobConf);
       }
       fsStats = matchedStats;
 
       FileSystem fs = FileSystem.get(jobConf);
-      String finalName = task.getOutputName();
+      String finalName = getOutputName();
 
       long bytesOutPrev = getOutputBytes();
-      oldRecordWriter = 
+      oldRecordWriter =
           oldOutputFormat.getRecordWriter(
-              fs, jobConf, finalName, reporter);
+              fs, jobConf, finalName, new MRReporter(outputContext));
       long bytesOutCurr = getOutputBytes();
       fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
     }
+    initCommitter(jobConf, useNewApi);
+
+    LOG.info("Initialized Simple Output"
+        + ", using_new_api: " + useNewApi);
+    return null;
   }
-  
-  public void write(Object key, Object value) 
+
+  public void initCommitter(JobConf job, boolean useNewApi)
       throws IOException, InterruptedException {
 
-    reporter.progress();
-    long bytesOutPrev = getOutputBytes();
-  
     if (useNewApi) {
-      newRecordWriter.write(key, value);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Using new API for output committer");
+      }
+
+      OutputFormat<?, ?> outputFormat = null;
+      try {
+        outputFormat = ReflectionUtils.newInstance(
+            newApiTaskAttemptContext.getOutputFormatClass(), job);
+      } catch (ClassNotFoundException cnfe) {
+        throw new IOException("Unknown OutputFormat", cnfe);
+      }
+      this.committer = outputFormat.getOutputCommitter(
+          newApiTaskAttemptContext);
     } else {
-      oldRecordWriter.write(key, value);
+      this.committer = job.getOutputCommitter();
     }
-    
-    long bytesOutCurr = getOutputBytes();
-    fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
-    outputRecordCounter.increment(1);
 
+    Path outputPath = FileOutputFormat.getOutputPath(job);
+    if (outputPath != null) {
+      if ((this.committer instanceof FileOutputCommitter)) {
+        FileOutputFormat.setWorkOutputPath(job,
+            ((FileOutputCommitter) this.committer).getTaskAttemptPath(
+                oldApiTaskAttemptContext));
+      } else {
+        FileOutputFormat.setWorkOutputPath(job, outputPath);
+      }
+    }
+    if (useNewApi) {
+      this.committer.setupTask(newApiTaskAttemptContext);
+    } else {
+      this.committer.setupTask(oldApiTaskAttemptContext);
+    }
   }
 
-  public void close() throws IOException, InterruptedException {
-    reporter.progress();
-    long bytesOutPrev = getOutputBytes();
+  public boolean isCommitRequired() throws IOException {
     if (useNewApi) {
-      newRecordWriter.close(taskAttemptContext);
+      return committer.needsTaskCommit(newApiTaskAttemptContext);
     } else {
-      oldRecordWriter.close(null);
+      return committer.needsTaskCommit(oldApiTaskAttemptContext);
     }
-    long bytesOutCurr = getOutputBytes();
-    fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
   }
 
-  public org.apache.hadoop.mapreduce.OutputFormat getNewOutputFormat() {
-    return newOutputFormat;
-  }
-  
-  public org.apache.hadoop.mapred.OutputFormat getOldOutputFormat() {
-    return oldOutputFormat;
+  private TaskAttemptContext createTaskAttemptContext() {
+    return new TaskAttemptContextImpl(this.jobConf, outputContext,
+        isMapperOutput);
   }
-  
+
   private long getOutputBytes() {
     if (fsStats == null) return 0;
     long bytesWritten = 0;
@@ -185,9 +230,97 @@ public class SimpleOutput implements Output {
     return bytesWritten;
   }
 
+  private String getOutputName() {
+    return "part-" + NUMBER_FORMAT.format(outputContext.getTaskIndex());
+  }
+
+  @Override
+  public KVWriter getWriter() throws IOException {
+    return new KVWriter() {
+      private final boolean useNewWriter = useNewApi;
+
+      @SuppressWarnings("unchecked")
+      @Override
+      public void write(Object key, Object value) throws IOException {
+        long bytesOutPrev = getOutputBytes();
+        if (useNewWriter) {
+          try {
+            newRecordWriter.write(key, value);
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new IOException("Interrupted while writing next key-value",e);
+          }
+        } else {
+          oldRecordWriter.write(key, value);
+        }
+
+        long bytesOutCurr = getOutputBytes();
+        fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+        outputRecordCounter.increment(1);
+      }
+    };
+  }
+
+  @Override
+  public void handleEvents(List<Event> outputEvents) {
+    // Not expecting any events at the moment.
+  }
+
   @Override
-  public OutputContext getOutputContext() {
+  public synchronized List<Event> close() throws IOException {
+    if (closed.getAndSet(true)) {
+      return null;
+    }
+
+    LOG.info("Closing Simple Output");
+    long bytesOutPrev = getOutputBytes();
+    if (useNewApi) {
+      try {
+        newRecordWriter.close(newApiTaskAttemptContext);
+      } catch (InterruptedException e) {
+        throw new IOException("Interrupted while closing record writer", e);
+      }
+    } else {
+      oldRecordWriter.close(null);
+    }
+    long bytesOutCurr = getOutputBytes();
+    fileOutputByteCounter.increment(bytesOutCurr - bytesOutPrev);
+    LOG.info("Closed Simple Output");
     return null;
   }
 
+  @Override
+  public void setNumPhysicalOutputs(int numOutputs) {
+    // Nothing to do for now
+  }
+
+  /**
+   * SimpleOutput expects the Processor to call commit prior to the
+   * Processor's completion.
+   * @throws IOException
+   */
+  public void commit() throws IOException {
+    close();
+    if (useNewApi) {
+      committer.commitTask(newApiTaskAttemptContext);
+    } else {
+      committer.commitTask(oldApiTaskAttemptContext);
+    }
+  }
+
+
+  /**
+   * SimpleOutput expects the Processor to call abort on any error
+   * (including an error during commit) prior to the Processor's completion.
+   * @throws IOException
+   */
+  public void abort() throws IOException {
+    close();
+    if (useNewApi) {
+      committer.abortTask(newApiTaskAttemptContext);
+    } else {
+      committer.abortTask(oldApiTaskAttemptContext);
+    }
+  }
+
 }
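
The commit/abort javadocs above define the caller-side contract for
SimpleOutput: the owning Processor writes through getWriter(), then calls
commit() on success (guarded by isCommitRequired()) or abort() on any
failure; both paths close() the output first. A minimal caller sketch
honoring that contract (SimpleOutputCallerSketch and writeAll are
illustrative names, not part of the patch):

    import java.io.IOException;
    import org.apache.tez.engine.newapi.KVWriter;
    import org.apache.tez.mapreduce.output.SimpleOutput;

    final class SimpleOutputCallerSketch {
      // Writes the given key/value pairs, then commits or aborts the attempt.
      static void writeAll(SimpleOutput output, Iterable<Object[]> records)
          throws IOException {
        KVWriter writer = output.getWriter();
        try {
          for (Object[] kv : records) {
            writer.write(kv[0], kv[1]);
          }
          if (output.isCommitRequired()) {
            output.commit();  // close()s the output, then commits the attempt
          } else {
            output.close();
          }
        } catch (IOException e) {
          output.abort();     // close()s the output, then aborts the attempt
          throw e;
        }
      }
    }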

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d609458/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
index 61dfcd1..d061ad5 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/partition/MRPartitioner.java
@@ -1,70 +1,54 @@
 /**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 package org.apache.tez.mapreduce.partition;
 
-import java.io.IOException;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.mapreduce.processor.MRTask;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 
-@SuppressWarnings({"rawtypes", "unchecked"})
+@SuppressWarnings({ "rawtypes", "unchecked" })
 public class MRPartitioner implements org.apache.tez.engine.api.Partitioner {
 
   static final Log LOG = LogFactory.getLog(MRPartitioner.class);
-  private final MRTask task;
-  
-  JobConf jobConf;
-  boolean useNewApi;
-  
-  org.apache.hadoop.mapred.Partitioner oldPartitioner;
-  org.apache.hadoop.mapreduce.Partitioner newPartitioner;
 
-  public MRPartitioner(MRTask task) {
-    this.task = task;
-  }
-  
-  public void initialize(Configuration conf, Master master) 
-      throws IOException, InterruptedException {
-    if (conf instanceof JobConf) {
-      jobConf = (JobConf)conf;
-    } else {
-      jobConf = new JobConf(conf);
-    }
-    
-    useNewApi = jobConf.getUseNewMapper();
-    final int partitions = this.task.getTezEngineTaskContext()
-        .getOutputSpecList().get(0).getNumOutputs();
+  private final boolean useNewApi;
+  private int partitions = 1;
+
+  private org.apache.hadoop.mapreduce.Partitioner newPartitioner;
+  private org.apache.hadoop.mapred.Partitioner oldPartitioner;
+
+  public MRPartitioner(Configuration conf) {
+    this.useNewApi = ConfigUtils.useNewApi(conf);
+    this.partitions = conf.getInt(TezJobConfig.TEZ_ENGINE_NUM_EXPECTED_PARTITIONS, 1);
+
     if (useNewApi) {
       if (partitions > 1) {
-        try {
-          newPartitioner = (org.apache.hadoop.mapreduce.Partitioner)
-            ReflectionUtils.newInstance(
-                task.getJobContext().getPartitionerClass(), jobConf);
-        } catch (ClassNotFoundException cnfe) {
-          throw new IOException(cnfe);
-        }
+        newPartitioner = (org.apache.hadoop.mapreduce.Partitioner) ReflectionUtils
+            .newInstance(
+                (Class<? extends org.apache.hadoop.mapreduce.Partitioner<?, ?>>) conf
+                    .getClass(MRJobConfig.PARTITIONER_CLASS_ATTR,
+                        org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class), conf);
       } else {
         newPartitioner = new org.apache.hadoop.mapreduce.Partitioner() {
           @Override
@@ -75,24 +59,24 @@ public class MRPartitioner implements org.apache.tez.engine.api.Partitioner {
       }
     } else {
       if (partitions > 1) {
-        oldPartitioner = (Partitioner)
-          ReflectionUtils.newInstance(jobConf.getPartitionerClass(), jobConf);
+        oldPartitioner = (org.apache.hadoop.mapred.Partitioner) ReflectionUtils.newInstance(
+            (Class<? extends org.apache.hadoop.mapred.Partitioner>) conf.getClass(
+                "mapred.partitioner.class", org.apache.hadoop.mapred.lib.HashPartitioner.class), conf);
       } else {
-        oldPartitioner = new Partitioner() {
+        oldPartitioner = new org.apache.hadoop.mapred.Partitioner() {
           @Override
-          public void configure(JobConf job) {}
-          
+          public void configure(JobConf job) {
+          }
+
           @Override
           public int getPartition(Object key, Object value, int numPartitions) {
             return numPartitions - 1;
           }
         };
       }
-
     }
-
   }
-  
+
   @Override
   public int getPartition(Object key, Object value, int numPartitions) {
     if (useNewApi) {
@@ -101,5 +85,4 @@ public class MRPartitioner implements org.apache.tez.engine.api.Partitioner {
       return oldPartitioner.getPartition(key, value, numPartitions);
     }
   }
-
-}
+}
\ No newline at end of file
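
The rewritten MRPartitioner is configuration-driven: it reads the expected
partition count from TezJobConfig.TEZ_ENGINE_NUM_EXPECTED_PARTITIONS,
resolves the partitioner class from MRJobConfig.PARTITIONER_CLASS_ATTR
(new API) or "mapred.partitioner.class" (old API), and short-circuits to a
constant partitioner when only one partition is expected. A minimal usage
sketch (the key names come from the patch; the values and the
MRPartitionerSketch wrapper are illustrative, and which API branch runs is
decided by ConfigUtils.useNewApi(conf)):

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.tez.common.TezJobConfig;
    import org.apache.tez.mapreduce.hadoop.MRJobConfig;
    import org.apache.tez.mapreduce.partition.MRPartitioner;

    final class MRPartitionerSketch {
      static int partitionOf(Text key, Text value) {
        JobConf conf = new JobConf();
        conf.setInt(TezJobConfig.TEZ_ENGINE_NUM_EXPECTED_PARTITIONS, 4);
        conf.setClass(MRJobConfig.PARTITIONER_CLASS_ATTR,
            org.apache.hadoop.mapreduce.lib.partition.HashPartitioner.class,
            org.apache.hadoop.mapreduce.Partitioner.class);
        MRPartitioner partitioner = new MRPartitioner(conf);
        return partitioner.getPartition(key, value, 4);  // in [0, 4)
      }
    }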