You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by hi...@apache.org on 2013/04/19 01:54:28 UTC

svn commit: r1469642 [34/36] - in /incubator/tez/branches/TEZ-1: ./ example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/ example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/ tez-common/src/main/ tez-common/src/main/java/ t...

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,111 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.processor;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.tez.common.TezTaskReporter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.mapreduce.hadoop.mapred.MRCounters;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable 
+public class MRTaskReporter 
+    extends org.apache.hadoop.mapreduce.StatusReporter
+    implements Reporter {
+
+  private final TezTaskReporterImpl reporter;
+  
+  private InputSplit split = null;
+
+  public MRTaskReporter(TezTaskReporter reporter) {
+    this.reporter =  (TezTaskReporterImpl)reporter;
+  }
+
+  // getters and setters for flag
+  void setProgressFlag() {
+    reporter.setProgressFlag();
+  }
+  boolean resetProgressFlag() {
+    return reporter.resetProgressFlag();
+  }
+  public void setStatus(String status) {
+    reporter.setStatus(status);
+  }
+  public void setProgress(float progress) {
+    reporter.setProgress(progress);
+  }
+  
+  public float getProgress() {
+    return reporter.getProgress();
+  };
+  
+  public void progress() {
+    reporter.progress();
+  }
+  
+  public Counters.Counter getCounter(String group, String name) {
+    TezCounter counter = reporter.getCounter(group, name);
+    MRCounters.MRCounter mrCounter = null;
+    if (counter != null) {
+      mrCounter = new MRCounters.MRCounter(counter);
+    }
+    return mrCounter;
+  }
+  
+  public Counters.Counter getCounter(Enum<?> name) {
+    TezCounter counter = reporter.getCounter(name);
+    MRCounters.MRCounter mrCounter = null;
+    if (counter != null) {
+      mrCounter = new MRCounters.MRCounter(counter);
+    }
+    return mrCounter;
+  }
+  
+  public void incrCounter(Enum<?> key, long amount) {
+    reporter.incrCounter(key, amount);
+  }
+  
+  public void incrCounter(String group, String counter, long amount) {
+    reporter.incrCounter(group, counter, amount);
+  }
+  
+  public void setInputSplit(InputSplit split) {
+    this.split = split;
+  }
+  
+  public InputSplit getInputSplit() throws UnsupportedOperationException {
+    if (split == null) {
+      throw new UnsupportedOperationException("Input only available on map");
+    } else {
+      return split;
+    }
+  }  
+  
+  public void startCommunicationThread() {
+    reporter.startCommunicationThread();
+  }
+  
+  public void stopCommunicationThread() throws InterruptedException {
+    reporter.stopCommunicationThread();
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/MRTaskReporter.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,271 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.processor;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
+import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable 
+class TezTaskReporterImpl 
+    implements org.apache.tez.common.TezTaskReporter, Runnable {
+
+  private static final Log LOG = LogFactory.getLog(TezTaskReporterImpl.class);
+  
+  private final MRTask mrTask;
+  private final TezTaskUmbilicalProtocol umbilical;
+  private final Progress taskProgress;
+  
+  private Thread pingThread = null;
+  private boolean done = true;
+  private Object lock = new Object();
+
+  /**
+   * flag that indicates whether progress update needs to be sent to parent.
+   * If true, it has been set. If false, it has been reset. 
+   * Using AtomicBoolean since we need an atomic read & reset method. 
+   */  
+  private AtomicBoolean progressFlag = new AtomicBoolean(false);
+  
+  TezTaskReporterImpl(MRTask mrTask, TezTaskUmbilicalProtocol umbilical) {
+    this.mrTask = mrTask;
+    this.umbilical = umbilical;
+    this.taskProgress = mrTask.getProgress();
+  }
+
+  // getters and setters for flag
+  void setProgressFlag() {
+    progressFlag.set(true);
+  }
+  
+  boolean resetProgressFlag() {
+    return progressFlag.getAndSet(false);
+  }
+  
+  public void setStatus(String status) {
+    // FIXME - BADLY
+    if (true) {
+      return;
+    }
+    taskProgress.setStatus(
+        MRTask.normalizeStatus(status, this.mrTask.jobConf));
+    // indicate that progress update needs to be sent
+    setProgressFlag();
+  }
+  
+  public void setProgress(float progress) {
+    // set current phase progress.
+    // This method assumes that task has phases.
+    taskProgress.phase().set(progress);
+    // indicate that progress update needs to be sent
+    setProgressFlag();
+  }
+  
+  public float getProgress() {
+    return taskProgress.getProgress();
+  };
+  
+  public void progress() {
+    // indicate that progress update needs to be sent
+    setProgressFlag();
+  }
+  
+  public TezCounter getCounter(String group, String name) {
+    return this.mrTask.counters == null ? 
+        null : 
+        this.mrTask.counters.findCounter(group, name);
+  }
+  
+  public TezCounter getCounter(Enum<?> name) {
+    return this.mrTask.counters == null ? 
+        null : 
+        this.mrTask.counters.findCounter(name);
+  }
+  
+  public void incrCounter(Enum<?> key, long amount) {
+    if (this.mrTask.counters != null) {
+      this.mrTask.counters.findCounter(key).increment(amount);
+    }
+    setProgressFlag();
+  }
+  
+  public void incrCounter(String group, String counter, long amount) {
+    if (this.mrTask.counters != null) {
+      this.mrTask.counters.findCounter(group, counter).increment(amount);
+    }
+    setProgressFlag();
+  }
+  
+  /** 
+   * The communication thread handles communication with the parent (Task Tracker). 
+   * It sends progress updates if progress has been made or if the task needs to 
+   * let the parent know that it's alive. It also pings the parent to see if it's alive. 
+   */
+  public void run() {
+    final int MAX_RETRIES = 3;
+    int remainingRetries = MAX_RETRIES;
+    // get current flag value and reset it as well
+    boolean sendProgress = resetProgressFlag();
+    while (!this.mrTask.taskDone.get()) {
+      synchronized (lock) {
+        done = false;
+      }
+      try {
+        boolean taskFound = true; // whether TT knows about this task
+        // sleep for a bit
+        synchronized(lock) {
+          if (this.mrTask.taskDone.get()) {
+            break;
+          }
+          lock.wait(MRTask.PROGRESS_INTERVAL);
+        }
+        if (this.mrTask.taskDone.get()) {
+          break;
+        }
+
+        if (sendProgress) {
+          // we need to send progress update
+          this.mrTask.updateCounters();
+          this.mrTask.getStatus().statusUpdate(
+              taskProgress.get(),
+              taskProgress.toString(), 
+              this.mrTask.counters);
+          taskFound = 
+              umbilical.statusUpdate(
+                  this.mrTask.getTaskAttemptId(), this.mrTask.getStatus());
+          this.mrTask.getStatus().clearStatus();
+        }
+        else {
+          // send ping 
+          taskFound = umbilical.ping(this.mrTask.getTaskAttemptId());
+        }
+
+        // if Task Tracker is not aware of our task ID (probably because it died and 
+        // came back up), kill ourselves
+        if (!taskFound) {
+          MRTask.LOG.warn("Parent died.  Exiting " + this.mrTask.getTaskAttemptId());
+          resetDoneFlag();
+          System.exit(66);
+        }
+
+        sendProgress = resetProgressFlag(); 
+        remainingRetries = MAX_RETRIES;
+      } 
+      catch (Throwable t) {
+        MRTask.LOG.info("Communication exception: " + StringUtils.stringifyException(t));
+        remainingRetries -=1;
+        if (remainingRetries == 0) {
+          ReflectionUtils.logThreadInfo(MRTask.LOG, "Communication exception", 0);
+          MRTask.LOG.warn("Last retry, killing " + this.mrTask.getTaskAttemptId());
+          resetDoneFlag();
+          System.exit(65);
+        }
+      }
+    }
+    //Notify that we are done with the work
+    resetDoneFlag();
+  }
+  void resetDoneFlag() {
+    synchronized (lock) {
+      done = true;
+      lock.notify();
+    }
+  }
+  public void startCommunicationThread() {
+    if (pingThread == null) {
+      pingThread = new Thread(this, "communication thread");
+      pingThread.setDaemon(true);
+      pingThread.start();
+    }
+  }
+  public void stopCommunicationThread() throws InterruptedException {
+    if (pingThread != null) {
+      // Intent of the lock is to not send an interupt in the middle of an
+      // umbilical.ping or umbilical.statusUpdate
+      synchronized(lock) {
+      //Interrupt if sleeping. Otherwise wait for the RPC call to return.
+        lock.notify(); 
+      }
+
+      synchronized (lock) { 
+        while (!done) {
+          lock.wait();
+        }
+      }
+      pingThread.interrupt();
+      pingThread.join();
+    }
+  }
+
+  @Override
+  public TezTaskDependencyCompletionEventsUpdate getDependentTasksCompletionEvents(
+      int fromEventIdx, int maxEventsToFetch,
+      TezTaskAttemptID reduce) {
+    return umbilical.getDependentTasksCompletionEvents(
+        fromEventIdx, maxEventsToFetch, reduce);
+  }
+
+  @Override
+  public void reportFatalError(TezTaskAttemptID taskAttemptId,
+      Throwable throwable, String logMsg) {
+    LOG.fatal(logMsg);
+    Throwable tCause = throwable.getCause();
+    String cause = tCause == null 
+                   ? StringUtils.stringifyException(throwable)
+                   : StringUtils.stringifyException(tCause);
+    try {
+      umbilical.fatalError(mrTask.getTaskAttemptId(), cause);
+    } catch (IOException ioe) {
+      LOG.fatal("Failed to contact the tasktracker", ioe);
+      System.exit(-1);
+    }
+  }
+
+  public TezTaskUmbilicalProtocol getUmbilical() {
+    return umbilical;
+  }
+
+  @Override
+  public long getProtocolVersion(String protocol, long clientVersion)
+      throws IOException {
+    // TODO TEZAM3
+    return 1;
+  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(String protocol,
+      long clientVersion, int clientMethodsHash) throws IOException {
+    return ProtocolSignature.getProtocolSignature(this, protocol,
+        clientVersion, clientMethodsHash);
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/TezTaskReporterImpl.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,377 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.processor.map;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.MapRunnable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.map.WrappedMapper;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitIndex;
+import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.split.SplitMetaInfoReaderTez;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezTaskStatus;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.common.sort.SortingOutput;
+import org.apache.tez.mapreduce.hadoop.IDConverter;
+import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.processor.MRTask;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class MapProcessor extends MRTask implements Processor {
+
+  private static final Log LOG = LogFactory.getLog(MapProcessor.class);
+
+  private Progress mapPhase;
+
+  @Inject
+  public MapProcessor(
+      @Assisted TezTask context
+      ) throws IOException {
+    super(context);
+  }
+  
+
+
+  @Override
+  public void initialize(Configuration conf, Master master) throws IOException,
+  InterruptedException {
+    super.initialize(conf, master);
+    TaskSplitMetaInfo[] allMetaInfo = readSplits();
+    TaskSplitMetaInfo thisTaskMetaInfo = allMetaInfo[tezTaskContext
+        .getTaskAttemptId().getTaskID().getId()];
+    splitMetaInfo = new TaskSplitIndex(thisTaskMetaInfo.getSplitLocation(),
+        thisTaskMetaInfo.getStartOffset());
+  }
+
+  @Override
+  public void process(
+      final Input in, 
+      final Output out)
+          throws IOException, InterruptedException {
+    MRTaskReporter reporter = new MRTaskReporter(getTaskReporter());
+    boolean useNewApi = jobConf.getUseNewMapper();
+    initTask(jobConf, getDAGID(), reporter, useNewApi);
+
+    if (in instanceof SimpleInput) {
+      ((SimpleInput)in).setTask(this);
+    }
+    
+    if (out instanceof SimpleOutput) {
+      ((SimpleOutput)out).setTask(this);
+    } else if (out instanceof SortingOutput) {
+      ((SortingOutput)out).setTask(this);
+    }
+    
+    
+    in.initialize(jobConf, getTaskReporter());
+    out.initialize(jobConf, getTaskReporter());
+
+    // If there are no reducers then there won't be any sort. Hence the map 
+    // phase will govern the entire attempt's progress.
+    if (jobConf.getNumReduceTasks() == 0) {
+      mapPhase = getProgress().addPhase("map", 1.0f);
+    } else {
+      // If there are reducers then the entire attempt's progress will be 
+      // split between the map phase (67%) and the sort phase (33%).
+      mapPhase = getProgress().addPhase("map", 0.667f);
+    }
+
+    // Sanity check
+    if (!(in instanceof SimpleInput)) {
+      throw new IOException("Unknown input! - " + in.getClass());
+    }
+    SimpleInput input = (SimpleInput)in;
+    
+    if (useNewApi) {
+      runNewMapper(jobConf, reporter, input, out, getTaskReporter());
+    } else {
+      runOldMapper(jobConf, reporter, input, out, getTaskReporter());
+    }
+
+    done(out.getOutputContext(), reporter);
+  }
+
+  public void close() throws IOException, InterruptedException {
+    // TODO Auto-generated method stub
+
+  }
+  
+  void runOldMapper(
+      final JobConf job,
+      final MRTaskReporter reporter,
+      final SimpleInput input,
+      final Output output,
+      final Master master
+      ) throws IOException, InterruptedException {
+    
+    RecordReader in = new OldRecordReader(input);
+        
+    int numReduceTasks = job.getNumReduceTasks();
+    LOG.info("numReduceTasks: " + numReduceTasks);
+
+    OutputCollector collector = new OldOutputCollector(output);
+
+    MapRunnable runner =
+        (MapRunnable)ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
+
+    try {
+      runner.run(in, collector, (Reporter)reporter);
+      mapPhase.complete();
+      // start the sort phase only if there are reducers
+      if (numReduceTasks > 0) {
+        setPhase(TezTaskStatus.Phase.SORT);
+      }
+      this.statusUpdate();
+    } finally {
+      //close
+      in.close();                               // close input
+      output.close();
+    }
+  }
+
+  private void runNewMapper(final JobConf job,
+      MRTaskReporter reporter,
+      final SimpleInput in,
+      Output out,
+      final Master master
+      ) throws IOException, InterruptedException {
+    // make a task context so we can get the classes
+    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
+        new TaskAttemptContextImpl(job, getTaskAttemptId(), reporter);
+
+    // make a mapper
+    org.apache.hadoop.mapreduce.Mapper mapper;
+    try {
+      mapper = (org.apache.hadoop.mapreduce.Mapper)
+          ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
+    } catch (ClassNotFoundException cnfe) {
+      throw new IOException(cnfe);
+    }
+
+    if (!(in instanceof SimpleInput)) {
+      throw new IOException("Unknown input! - " + in.getClass());
+    }
+
+    org.apache.hadoop.mapreduce.RecordReader input =
+        new NewRecordReader(in);
+
+    org.apache.hadoop.mapreduce.RecordWriter output = 
+        new NewOutputCollector(out);
+
+    org.apache.hadoop.mapreduce.InputSplit split = in.getNewInputSplit();
+    
+    org.apache.hadoop.mapreduce.MapContext 
+    mapContext = 
+    new org.apache.tez.mapreduce.hadoop.mapreduce.MapContextImpl(
+        job, IDConverter.toMRTaskAttemptId(getTaskAttemptId()), 
+        input, output, 
+        getCommitter(), 
+        reporter, split);
+
+    org.apache.hadoop.mapreduce.Mapper.Context mapperContext = 
+        new WrappedMapper().getMapContext(mapContext);
+
+    input.initialize(split, mapperContext);
+    mapper.run(mapperContext);
+    mapPhase.complete();
+    setPhase(TezTaskStatus.Phase.SORT);
+    this.statusUpdate();
+    input.close();
+    output.close(mapperContext);
+  }
+
+  private static class NewRecordReader extends
+      org.apache.hadoop.mapreduce.RecordReader {
+    private final SimpleInput in;
+
+    private NewRecordReader(SimpleInput in) {
+      this.in = in;
+    }
+
+    @Override
+    public void initialize(org.apache.hadoop.mapreduce.InputSplit split,
+        TaskAttemptContext context) throws IOException,
+        InterruptedException {
+      in.initializeNewRecordReader(split, context);
+    }
+
+    @Override
+    public boolean nextKeyValue() throws IOException,
+        InterruptedException {
+      return in.hasNext();
+    }
+
+    @Override
+    public Object getCurrentKey() throws IOException,
+        InterruptedException {
+      return in.getNextKey();
+    }
+
+    @Override
+    public Object getCurrentValue() throws IOException,
+        InterruptedException {
+      return in.getNextValues().iterator().next();
+    }
+
+    @Override
+    public float getProgress() throws IOException, InterruptedException {
+      return in.getProgress();
+    }
+
+    @Override
+    public void close() throws IOException {
+      in.close();
+    }
+  }
+
+  private static class OldRecordReader implements RecordReader {
+    private final SimpleInput simpleInput;
+
+    private OldRecordReader(SimpleInput simpleInput) {
+      this.simpleInput = simpleInput;
+    }
+
+    @Override
+    public boolean next(Object key, Object value) throws IOException {
+      simpleInput.setKey(key);
+      simpleInput.setValue(value);
+      try {
+        return simpleInput.hasNext();
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      }
+    }
+
+    @Override
+    public Object createKey() {
+      return simpleInput.getOldRecordReader().createKey();
+    }
+
+    @Override
+    public Object createValue() {
+      return simpleInput.getOldRecordReader().createValue();
+    }
+
+    @Override
+    public long getPos() throws IOException {
+      return simpleInput.getOldRecordReader().getPos();
+    }
+
+    @Override
+    public void close() throws IOException {
+      simpleInput.close();
+    }
+
+    @Override
+    public float getProgress() throws IOException {
+      try {
+        return simpleInput.getProgress();
+      } catch (InterruptedException ie) {
+        throw new IOException(ie);
+      }
+    }
+  }
+
+  private static class OldOutputCollector 
+  implements OutputCollector {
+    private final Output output;
+    
+    OldOutputCollector(Output output) {
+      this.output = output;
+    }
+
+    public void collect(Object key, Object value) throws IOException {
+      try {
+        output.write(key, value);
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        throw new IOException("interrupt exception", ie);
+      }
+    }
+  }
+
+  private class NewOutputCollector
+    extends org.apache.hadoop.mapreduce.RecordWriter {
+    private final Output out;
+
+    NewOutputCollector(Output out) throws IOException {
+      this.out = out;
+    }
+
+    @Override
+    public void write(Object key, Object value) throws IOException, InterruptedException {
+      out.write(key, value);
+    }
+
+    @Override
+    public void close(TaskAttemptContext context
+                      ) throws IOException, InterruptedException {
+      out.close();
+    }
+  }
+
+  @Override
+  public void localizeConfiguration(JobConf jobConf) 
+      throws IOException, InterruptedException {
+    super.localizeConfiguration(jobConf);
+    jobConf.setBoolean(JobContext.TASK_ISMAP, true);
+  }
+  
+  @Override
+  public TezCounter getOutputRecordsCounter() {
+    return reporter.getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
+  }
+
+  @Override
+  public TezCounter getInputRecordsCounter() {
+    return reporter.getCounter(TaskCounter.MAP_INPUT_RECORDS);
+
+  }
+  
+  protected TaskSplitMetaInfo[] readSplits() throws IOException {
+    TaskSplitMetaInfo[] allTaskSplitMetaInfo;
+    allTaskSplitMetaInfo = SplitMetaInfoReaderTez.readSplitMetaInfo(getConf(),
+        FileSystem.getLocal(getConf()));
+    return allTaskSplitMetaInfo;
+  }
+  
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/map/MapProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,347 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.processor.reduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.mapred.Counters.Counter;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobContext;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.common.TezEngineTask;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezTask;
+import org.apache.tez.common.TezTaskStatus;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.sort.SortingOutput;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.engine.lib.input.ShuffledMergedInput;
+import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
+import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.processor.MRTask;
+import org.apache.tez.mapreduce.processor.MRTaskReporter;
+
+import com.google.common.base.Preconditions;
+import com.google.inject.Inject;
+import com.google.inject.assistedinject.Assisted;
+
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class ReduceProcessor
+extends MRTask
+implements Processor {
+
+  private static final Log LOG = LogFactory.getLog(ReduceProcessor.class);
+  
+  private Progress sortPhase;
+  private Progress reducePhase;
+
+  private Counter reduceInputKeyCounter;
+  private Counter reduceInputValueCounter;
+  private int numMapTasks;
+
+  @Inject
+  public ReduceProcessor(
+      @Assisted TezTask context
+      ) {
+    super(context);
+    TezEngineTask tezEngineContext = (TezEngineTask) context;
+    Preconditions.checkNotNull(tezEngineContext.getInputSpecList(),
+        "InputSpecList should not be null");
+    Preconditions.checkArgument(
+        tezEngineContext.getInputSpecList().size() == 1,
+        "Expected exactly one input, found : "
+            + tezEngineContext.getInputSpecList().size());
+    this.numMapTasks = tezEngineContext.getInputSpecList().get(0)
+        .getNumInputs();
+  }
+  
+  @Override
+  public void initialize(Configuration conf, Master master) throws IOException,
+      InterruptedException {
+    super.initialize(conf, master);
+    
+  }
+
+  @Override
+  public void process(Input in, Output out)
+      throws IOException, InterruptedException {
+    MRTaskReporter reporter = new MRTaskReporter(getTaskReporter());
+    boolean useNewApi = jobConf.getUseNewMapper();
+    initTask(jobConf, getDAGID(), reporter, useNewApi);
+
+    if (in instanceof SimpleInput) {
+      ((SimpleInput)in).setTask(this);
+    } else if (in instanceof ShuffledMergedInput) {
+      ((ShuffledMergedInput)in).setTask(this);
+    }
+    
+    if (out instanceof SimpleOutput) {
+      ((SimpleOutput)out).setTask(this);
+    } else if (out instanceof SortingOutput) {
+      ((SortingOutput)out).setTask(this);
+    }
+
+    in.initialize(jobConf, getTaskReporter());
+    out.initialize(jobConf, getTaskReporter());
+
+    sortPhase  = getProgress().addPhase("sort");
+    reducePhase = getProgress().addPhase("reduce");
+    sortPhase.complete();                         // sort is complete
+    setPhase(TezTaskStatus.Phase.REDUCE); 
+
+    this.statusUpdate();
+    
+    Class keyClass = ConfigUtils.getMapOutputKeyClass(jobConf);
+    Class valueClass = ConfigUtils.getMapOutputValueClass(jobConf);
+    RawComparator comparator = 
+        ConfigUtils.getOutputValueGroupingComparator(jobConf);
+
+    reduceInputKeyCounter = 
+        reporter.getCounter(TaskCounter.REDUCE_INPUT_GROUPS);
+    reduceInputValueCounter = 
+        reporter.getCounter(TaskCounter.REDUCE_INPUT_RECORDS);
+        
+    // Sanity check
+    if (!(in instanceof ShuffledMergedInput)) {
+      throw new IOException("Illegal input to reduce: " + in.getClass());
+    }
+    ShuffledMergedInput shuffleInput = (ShuffledMergedInput)in;
+
+    if (useNewApi) {
+      try {
+        runNewReducer(
+            jobConf, 
+            (TezTaskUmbilicalProtocol)getUmbilical(), reporter, 
+            shuffleInput, comparator,  keyClass, valueClass, 
+            out);
+      } catch (ClassNotFoundException cnfe) {
+        throw new IOException(cnfe);
+      }
+    } else {
+      runOldReducer(
+          jobConf, (TezTaskUmbilicalProtocol)getUmbilical(), reporter, 
+          shuffleInput, comparator, keyClass, valueClass, out);
+    }
+    
+    done(out.getOutputContext(), reporter);
+  }
+
+  public void close() throws IOException, InterruptedException {
+    // TODO Auto-generated method stub
+    
+  }
+
+  void runOldReducer(JobConf job,
+      TezTaskUmbilicalProtocol umbilical,
+      final MRTaskReporter reporter,
+      ShuffledMergedInput input,
+      RawComparator comparator,
+      Class keyClass,
+      Class valueClass,
+      final Output output) throws IOException, InterruptedException {
+    
+    Reducer reducer = 
+        ReflectionUtils.newInstance(job.getReducerClass(), job);
+    
+    // make output collector
+
+    OutputCollector collector = 
+        new OutputCollector() {
+      public void collect(Object key, Object value)
+          throws IOException {
+        try {
+          output.write(key, value);
+        } catch (InterruptedException ie) {
+          throw new IOException(ie);
+        }
+      }
+    };
+
+    // apply reduce function
+    try {
+      ReduceValuesIterator values = 
+          new ReduceValuesIterator(
+              input, 
+              job.getOutputValueGroupingComparator(), keyClass, valueClass, 
+              job, reporter, reduceInputValueCounter, reducePhase);
+      
+      values.informReduceProgress();
+      while (values.more()) {
+        reduceInputKeyCounter.increment(1);
+        reducer.reduce(values.getKey(), values, collector, reporter);
+        values.nextKey();
+        values.informReduceProgress();
+      }
+
+      //Clean up: repeated in catch block below
+      reducer.close();
+      output.close();
+      //End of clean up.
+    } catch (IOException ioe) {
+      try {
+        reducer.close();
+      } catch (IOException ignored) {}
+
+      try {
+        output.close();
+      } catch (IOException ignored) {}
+
+      throw ioe;
+    }
+  }
+  
+  private static class ReduceValuesIterator<KEY,VALUE> 
+  extends org.apache.tez.engine.common.task.impl.ValuesIterator<KEY,VALUE> {
+    private Counter reduceInputValueCounter;
+    private Progress reducePhase;
+
+    public ReduceValuesIterator (ShuffledMergedInput in,
+        RawComparator<KEY> comparator, 
+        Class<KEY> keyClass,
+        Class<VALUE> valClass,
+        Configuration conf, Progressable reporter,
+        Counter reduceInputValueCounter,
+        Progress reducePhase)
+            throws IOException {
+      super(in.getIterator(), comparator, keyClass, valClass, conf, reporter);
+      this.reduceInputValueCounter = reduceInputValueCounter;
+      this.reducePhase = reducePhase;
+    }
+
+    @Override
+    public VALUE next() {
+      reduceInputValueCounter.increment(1);
+      return moveToNext();
+    }
+
+    protected VALUE moveToNext() {
+      return super.next();
+    }
+
+    public void informReduceProgress() {
+      reducePhase.set(super.in.getProgress().getProgress()); // update progress
+      reporter.progress();
+    }
+  }
+
+  void runNewReducer(JobConf job,
+      final TezTaskUmbilicalProtocol umbilical,
+      final MRTaskReporter reporter,
+      ShuffledMergedInput input,
+      RawComparator comparator,
+      Class keyClass,
+      Class valueClass,
+      final Output out
+      ) throws IOException,InterruptedException, 
+      ClassNotFoundException {
+    // wrap value iterator to report progress.
+    final TezRawKeyValueIterator rawIter = input.getIterator();
+    TezRawKeyValueIterator rIter = new TezRawKeyValueIterator() {
+      public void close() throws IOException {
+        rawIter.close();
+      }
+      public DataInputBuffer getKey() throws IOException {
+        return rawIter.getKey();
+      }
+      public Progress getProgress() {
+        return rawIter.getProgress();
+      }
+      public DataInputBuffer getValue() throws IOException {
+        return rawIter.getValue();
+      }
+      public boolean next() throws IOException {
+        boolean ret = rawIter.next();
+        reporter.setProgress(rawIter.getProgress().getProgress());
+        return ret;
+      }
+    };
+    
+    // make a task context so we can get the classes
+    org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
+        new TaskAttemptContextImpl(job, getTaskAttemptId(), reporter);
+    
+    // make a reducer
+    org.apache.hadoop.mapreduce.Reducer reducer =
+        (org.apache.hadoop.mapreduce.Reducer)
+        ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
+    
+    org.apache.hadoop.mapreduce.RecordWriter trackedRW = 
+        new org.apache.hadoop.mapreduce.RecordWriter() {
+
+          @Override
+          public void write(Object key, Object value) throws IOException,
+              InterruptedException {
+            out.write(key, value);
+          }
+
+          @Override
+          public void close(TaskAttemptContext context) throws IOException,
+              InterruptedException {
+            out.close();
+          }
+        };
+
+    org.apache.hadoop.mapreduce.Reducer.Context reducerContext = 
+        createReduceContext(
+            reducer, job, getTaskAttemptId(),
+            rIter, reduceInputKeyCounter, 
+            reduceInputValueCounter, 
+            trackedRW,
+            committer,
+            reporter, comparator, keyClass,
+            valueClass);
+    reducer.run(reducerContext);
+    trackedRW.close(reducerContext);
+  }
+
+  @Override
+  public void localizeConfiguration(JobConf jobConf) 
+      throws IOException, InterruptedException {
+    super.localizeConfiguration(jobConf);
+    jobConf.setBoolean(JobContext.TASK_ISMAP, false);
+    jobConf.setInt(TezJobConfig.TEZ_ENGINE_TASK_INDEGREE, numMapTasks); 
+  }
+
+  @Override
+  public TezCounter getOutputRecordsCounter() {
+    return reporter.getCounter(TaskCounter.REDUCE_OUTPUT_RECORDS);
+  }
+
+  @Override
+  public TezCounter getInputRecordsCounter() {
+    return reporter.getCounter(TaskCounter.REDUCE_INPUT_GROUPS);
+  }
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/FinalTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/FinalTask.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/FinalTask.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/FinalTask.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.task;
+
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.lib.input.ShuffledMergedInput;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+public class FinalTask extends AbstractModule {
+
+  @Override
+  protected void configure() {    
+    install(
+        new FactoryModuleBuilder().implement(
+            Output.class, SimpleOutput.class).
+        build(OutputFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Input.class, ShuffledMergedInput.class).
+        build(InputFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Processor.class, ReduceProcessor.class).
+        build(ProcessorFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Task.class, RuntimeTask.class).
+        build(TaskFactory.class)
+        );
+    
+    bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/FinalTask.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTask.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTask.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTask.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.task;
+
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.lib.output.OnFileSortedOutput;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+public class InitialTask extends AbstractModule {
+
+  @Override
+  protected void configure() {    
+    install(
+        new FactoryModuleBuilder().implement(
+            Input.class, SimpleInput.class).
+        build(InputFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Output.class, OnFileSortedOutput.class).
+        build(OutputFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Processor.class, MapProcessor.class).
+        build(ProcessorFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Task.class, RuntimeTask.class).
+        build(TaskFactory.class)
+        );
+    
+    bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTask.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithInMemSort.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithInMemSort.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithInMemSort.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithInMemSort.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.task;
+
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.lib.output.InMemorySortedOutput;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+public class InitialTaskWithInMemSort extends AbstractModule {
+// TODO EVENTUALLY - have all types subclass a single parent instead of AbstractModule.
+  
+  @Override
+  protected void configure() {    
+    install(
+        new FactoryModuleBuilder().implement(
+            Input.class, SimpleInput.class).
+        build(InputFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Output.class, InMemorySortedOutput.class).
+        build(OutputFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Processor.class, MapProcessor.class).
+        build(ProcessorFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Task.class, RuntimeTask.class).
+        build(TaskFactory.class)
+        );
+    
+    bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithInMemSort.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithLocalSort.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithLocalSort.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithLocalSort.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithLocalSort.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.task;
+
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+public class InitialTaskWithLocalSort extends AbstractModule {
+// TODO EVENTUALLY - have all types subclass a single parent instead of AbstractModule.
+  
+  @Override
+  protected void configure() {    
+    install(
+        new FactoryModuleBuilder().implement(
+            Input.class, SimpleInput.class).
+        build(InputFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Output.class, LocalOnFileSorterOutput.class).
+        build(OutputFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Processor.class, MapProcessor.class).
+        build(ProcessorFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Task.class, RuntimeTask.class).
+        build(TaskFactory.class)
+        );
+    
+    bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/InitialTaskWithLocalSort.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/IntermediateTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/IntermediateTask.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/IntermediateTask.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/IntermediateTask.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.task;
+
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.lib.input.ShuffledMergedInput;
+import org.apache.tez.engine.lib.output.OnFileSortedOutput;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+
+public class IntermediateTask extends AbstractModule {
+
+  @Override
+  protected void configure() {
+    install(
+        new FactoryModuleBuilder().implement(
+            Input.class, ShuffledMergedInput.class).
+        build(InputFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Output.class, OnFileSortedOutput.class).
+        build(OutputFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Processor.class, ReduceProcessor.class).
+        build(ProcessorFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Task.class, RuntimeTask.class).
+        build(TaskFactory.class)
+        );
+
+    bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/IntermediateTask.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/LocalFinalTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/LocalFinalTask.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/LocalFinalTask.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/LocalFinalTask.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.task;
+
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.lib.input.LocalMergedInput;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+public class LocalFinalTask extends AbstractModule {
+
+  @Override
+  protected void configure() {    
+    install(
+        new FactoryModuleBuilder().implement(
+            Output.class, SimpleOutput.class).
+        build(OutputFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Input.class, LocalMergedInput.class).
+        build(InputFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Processor.class, ReduceProcessor.class).
+        build(ProcessorFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Task.class, RuntimeTask.class).
+        build(TaskFactory.class)
+        );
+    
+    bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/LocalFinalTask.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MapOnlyTask.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MapOnlyTask.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MapOnlyTask.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MapOnlyTask.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce.task;
+
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.task.RuntimeTask;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+
+import com.google.inject.AbstractModule;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+public class MapOnlyTask extends AbstractModule {
+
+  @Override
+  protected void configure() {    
+    install(
+        new FactoryModuleBuilder().implement(
+            Input.class, SimpleInput.class).
+        build(InputFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Output.class, SimpleOutput.class).
+        build(OutputFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Processor.class, MapProcessor.class).
+        build(ProcessorFactory.class)
+        );
+    install(
+        new FactoryModuleBuilder().implement(
+            Task.class, RuntimeTask.class).
+        build(TaskFactory.class)
+        );
+    
+    bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/MapOnlyTask.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,239 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce.task.impl;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapOutputFile;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.tez.common.Constants;
+
+/**
+ * Manipulate the working area for the transient store for maps and reduces.
+ *
+ * This class is used by map and reduce tasks to identify the directories that
+ * they need to write to/read from for intermediate files. The callers of
+ * these methods are from child space.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class YarnOutputFiles extends MapOutputFile {
+
+  private JobConf conf;
+
+  private static final String JOB_OUTPUT_DIR = "output";
+  private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out";
+  private static final String SPILL_INDEX_FILE_PATTERN = SPILL_FILE_PATTERN
+      + ".index";
+
+  public YarnOutputFiles() {
+  }
+
+  // assume configured to $localdir/usercache/$user/appcache/$appId
+  private LocalDirAllocator lDirAlloc = 
+    new LocalDirAllocator(MRConfig.LOCAL_DIR);
+
+  private Path getAttemptOutputDir() {
+    return new Path(JOB_OUTPUT_DIR, conf.get(JobContext.TASK_ATTEMPT_ID));
+  }
+  
+  /**
+   * Return the path to local map output file created earlier
+   * 
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputFile() throws IOException {
+    Path attemptOutput =
+      new Path(getAttemptOutputDir(), Constants.MAP_OUTPUT_FILENAME_STRING);
+    return lDirAlloc.getLocalPathToRead(attemptOutput.toString(), conf);
+  }
+
+  /**
+   * Create a local map output file name.
+   * 
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputFileForWrite(long size) throws IOException {
+    Path attemptOutput = 
+      new Path(getAttemptOutputDir(), Constants.MAP_OUTPUT_FILENAME_STRING);
+    return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), size, conf);
+  }
+
+  /**
+   * Create a local map output file name on the same volume.
+   */
+  public Path getOutputFileForWriteInVolume(Path existing) {
+    Path outputDir = new Path(existing.getParent(), JOB_OUTPUT_DIR);
+    Path attemptOutputDir = new Path(outputDir,
+        conf.get(JobContext.TASK_ATTEMPT_ID));
+    return new Path(attemptOutputDir, Constants.MAP_OUTPUT_FILENAME_STRING);
+  }
+
+  /**
+   * Return the path to a local map output index file created earlier
+   * 
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputIndexFile() throws IOException {
+    Path attemptIndexOutput =
+      new Path(getAttemptOutputDir(), Constants.MAP_OUTPUT_FILENAME_STRING +
+          Constants.MAP_OUTPUT_INDEX_SUFFIX_STRING);
+    return lDirAlloc.getLocalPathToRead(attemptIndexOutput.toString(), conf);
+  }
+
+  /**
+   * Create a local map output index file name.
+   * 
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputIndexFileForWrite(long size) throws IOException {
+    Path attemptIndexOutput =
+      new Path(getAttemptOutputDir(), Constants.MAP_OUTPUT_FILENAME_STRING +
+          Constants.MAP_OUTPUT_INDEX_SUFFIX_STRING);
+    return lDirAlloc.getLocalPathForWrite(attemptIndexOutput.toString(),
+        size, conf);
+  }
+
+  /**
+   * Create a local map output index file name on the same volume.
+   */
+  public Path getOutputIndexFileForWriteInVolume(Path existing) {
+    Path outputDir = new Path(existing.getParent(), JOB_OUTPUT_DIR);
+    Path attemptOutputDir = new Path(outputDir,
+        conf.get(JobContext.TASK_ATTEMPT_ID));
+    return new Path(attemptOutputDir, Constants.MAP_OUTPUT_FILENAME_STRING +
+        Constants.MAP_OUTPUT_INDEX_SUFFIX_STRING);
+  }
+
+  /**
+   * Return a local map spill file created earlier.
+   * 
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(
+        String.format(SPILL_FILE_PATTERN,
+            conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), conf);
+  }
+
+  /**
+   * Create a local map spill file name.
+   * 
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(
+        String.format(String.format(SPILL_FILE_PATTERN,
+            conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber)), size, conf);
+  }
+
+  /**
+   * Return a local map spill index file created earlier
+   * 
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillIndexFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(
+        String.format(SPILL_INDEX_FILE_PATTERN,
+            conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), conf);
+  }
+
+  /**
+   * Create a local map spill index file name.
+   * 
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillIndexFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(
+        String.format(SPILL_INDEX_FILE_PATTERN,
+            conf.get(JobContext.TASK_ATTEMPT_ID), spillNumber), size, conf);
+  }
+
+  /**
+   * Return a local reduce input file created earlier
+   * 
+   * @param mapId a map task id
+   * @return path
+   * @throws IOException 
+   */
+  public Path getInputFile(int mapId) throws IOException {
+    throw new UnsupportedOperationException("Incompatible with LocalRunner");
+  }
+
+  /**
+   * Create a local reduce input file name.
+   * 
+   * @param mapId a map task id
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getInputFileForWrite(org.apache.hadoop.mapreduce.TaskID mapId,
+      long size) throws IOException {
+    return lDirAlloc.getLocalPathForWrite(String.format(
+        Constants.REDUCE_INPUT_FILE_FORMAT_STRING,
+        getAttemptOutputDir().toString(), mapId.getId()),
+        size, conf);
+  }
+
+  /** Removes all of the files related to a task. */
+  public void removeAll() throws IOException {
+    throw new UnsupportedOperationException("Incompatible with LocalRunner");
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    if (conf instanceof JobConf) {
+      this.conf = (JobConf) conf;
+    } else {
+      this.conf = new JobConf(conf);
+    }
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+  
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/task/impl/YarnOutputFiles.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.tez.common.TezEngineTask;
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.api.Processor;
+import org.apache.tez.engine.api.Task;
+import org.apache.tez.engine.lib.input.ShuffledMergedInput;
+import org.apache.tez.engine.lib.output.OnFileSortedOutput;
+import org.apache.tez.engine.runtime.InputFactory;
+import org.apache.tez.engine.runtime.OutputFactory;
+import org.apache.tez.engine.runtime.ProcessorFactory;
+import org.apache.tez.engine.runtime.TaskFactory;
+import org.apache.tez.engine.runtime.TezEngineFactory;
+import org.apache.tez.engine.runtime.TezEngineFactoryImpl;
+import org.apache.tez.mapreduce.hadoop.MRTaskType;
+import org.apache.tez.mapreduce.input.SimpleInput;
+import org.apache.tez.mapreduce.output.SimpleOutput;
+import org.apache.tez.mapreduce.processor.map.MapProcessor;
+import org.apache.tez.mapreduce.processor.reduce.ReduceProcessor;
+import org.apache.tez.mapreduce.task.FinalTask;
+import org.apache.tez.mapreduce.task.InitialTask;
+import org.apache.tez.mapreduce.task.IntermediateTask;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.inject.Guice;
+import com.google.inject.Inject;
+import com.google.inject.Injector;
+import com.google.inject.assistedinject.Assisted;
+import com.google.inject.assistedinject.FactoryModuleBuilder;
+
+public class TestTaskModules {
+  
+  private static final Log LOG = LogFactory.getLog(TestTaskModules.class);
+
+  TezEngineTask taskContext;
+  JobConf job;
+  
+  @Before
+  public void setUp() {
+    taskContext = new TezEngineTask(TezTestUtils.getMockTaskAttemptId(0, 0, 0,
+        MRTaskType.REDUCE), "tez", "tez", "TODO_vertexName",
+        TestInitialModule.class.getName(), null, null);
+    job = new JobConf();
+  }
+  
+  @Test
+  public void testInitialTask() throws Exception {
+    Injector injector = Guice.createInjector(new TestInitialModule());
+    TezEngineFactory factory = injector.getInstance(TezEngineFactory.class);
+    Task t = factory.createTask(taskContext);
+    t.initialize(job, null);
+  }
+
+  @Test
+  public void testIntermediateTask() throws Exception {
+    Injector injector = Guice.createInjector(new TestIntermediateModule());
+    TezEngineFactory factory = injector.getInstance(TezEngineFactory.class);
+    Task t = factory.createTask(taskContext);
+    t.initialize(job, null);
+  }
+
+  @Test
+  public void testFinalTask() throws Exception {
+    Injector injector = Guice.createInjector(new TestFinalModule());
+    TezEngineFactory factory = injector.getInstance(TezEngineFactory.class);
+    Task task = factory.createTask(taskContext);
+    LOG.info("task = " + task.getClass());
+    task.initialize(job, null);
+  }
+
+  static class TestTask implements Task {
+
+    private final Input in;
+    private final Output out;
+    private final Processor processor;
+    
+    @Inject
+    public TestTask(
+        @Assisted Processor processor, 
+        @Assisted Input in, 
+        @Assisted Output out) {
+      this.in = in;
+      this.processor = processor;
+      this.out = out;
+    }
+    
+    @Override
+    public void initialize(Configuration conf, Master master)
+        throws IOException, InterruptedException {
+      LOG.info("in = " + in.getClass());
+      LOG.info("processor = " + processor.getClass());
+      LOG.info("out = " + out.getClass());
+    }
+
+    @Override
+    public Input getInput() {
+      return in;
+    }
+
+    @Override
+    public Output getOutput() {
+      return out;
+    }
+
+    @Override
+    public Processor getProcessor() {
+      return processor;
+    }
+
+    @Override
+    public void run() throws IOException, InterruptedException {
+      // TODO Auto-generated method stub
+      
+    }
+
+    @Override
+    public void close() throws IOException, InterruptedException {
+      // TODO Auto-generated method stub
+      
+    }
+    
+  }
+  
+  static class TestInitialModule extends InitialTask {
+
+    @Override
+    protected void configure() {    
+      install(
+          new FactoryModuleBuilder().implement(
+              Input.class, SimpleInput.class).
+          build(InputFactory.class)
+          );
+      install(
+          new FactoryModuleBuilder().implement(
+              Output.class, OnFileSortedOutput.class).
+          build(OutputFactory.class)
+          );
+      install(
+          new FactoryModuleBuilder().implement(
+              Processor.class, MapProcessor.class).
+          build(ProcessorFactory.class)
+          );
+      install(
+          new FactoryModuleBuilder().implement(
+              Task.class, TestTask.class).
+          build(TaskFactory.class)
+          );
+      
+      bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+    }
+    
+  }
+  
+
+  static class TestIntermediateModule extends IntermediateTask {
+
+    @Override
+    protected void configure() {
+      install(
+          new FactoryModuleBuilder().implement(
+              Input.class, ShuffledMergedInput.class).
+          build(InputFactory.class)
+          );
+      install(
+          new FactoryModuleBuilder().implement(
+              Output.class, OnFileSortedOutput.class).
+          build(OutputFactory.class)
+          );
+      install(
+          new FactoryModuleBuilder().implement(
+              Processor.class, ReduceProcessor.class).
+          build(ProcessorFactory.class)
+          );
+      install(
+          new FactoryModuleBuilder().implement(
+              Task.class, TestTask.class).
+          build(TaskFactory.class)
+          );
+
+      bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+    }
+    
+  }
+  
+
+  static class TestFinalModule extends FinalTask {
+
+    @Override
+    protected void configure() {    
+      install(
+          new FactoryModuleBuilder().implement(
+              Output.class, SimpleOutput.class).
+          build(OutputFactory.class)
+          );
+      install(
+          new FactoryModuleBuilder().implement(
+              Input.class, ShuffledMergedInput.class).
+          build(InputFactory.class)
+          );
+      install(
+          new FactoryModuleBuilder().implement(
+              Processor.class, ReduceProcessor.class).
+          build(ProcessorFactory.class)
+          );
+      install(
+          new FactoryModuleBuilder().implement(
+              Task.class, TestTask.class).
+          build(TaskFactory.class)
+          );
+      
+      bind(TezEngineFactory.class).to(TezEngineFactoryImpl.class);
+    }
+    
+  }
+  
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestTaskModules.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java?rev=1469642&view=auto
==============================================================================
--- incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java (added)
+++ incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java Thu Apr 18 23:54:18 2013
@@ -0,0 +1,148 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.mapreduce;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.tez.common.ContainerTask;
+import org.apache.tez.common.TezTaskStatus;
+import org.apache.tez.engine.records.OutputContext;
+import org.apache.tez.engine.records.TezTaskAttemptID;
+import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
+import org.apache.tez.mapreduce.hadoop.ContainerContext;
+import org.apache.tez.mapreduce.hadoop.TezTaskUmbilicalProtocol;
+import org.apache.tez.mapreduce.hadoop.records.ProceedToCompletionResponse;
+
+public class TestUmbilicalProtocol implements TezTaskUmbilicalProtocol {
+
+  private static final Log LOG = LogFactory.getLog(TestUmbilicalProtocol.class);
+  private ProceedToCompletionResponse proceedToCompletionResponse;
+  
+  
+  public TestUmbilicalProtocol() {
+    proceedToCompletionResponse = new ProceedToCompletionResponse(false, true);
+  }
+  
+  public TestUmbilicalProtocol(boolean shouldLinger) {
+    if (shouldLinger) {
+      proceedToCompletionResponse = new ProceedToCompletionResponse(false, false);
+    } else {
+      proceedToCompletionResponse = new ProceedToCompletionResponse(false, true);
+    }
+  }
+
+  @Override
+  public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2)
+      throws IOException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public long getProtocolVersion(String arg0, long arg1) throws IOException {
+    // TODO Auto-generated method stub
+    return 0;
+  }
+
+  @Override
+  public TezTaskDependencyCompletionEventsUpdate getDependentTasksCompletionEvents(
+      int fromEventIdx, int maxEventsToFetch,
+      TezTaskAttemptID reduce) {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public ContainerTask getTask(ContainerContext containerContext)
+      throws IOException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
+  @Override
+  public boolean statusUpdate(TezTaskAttemptID taskId, TezTaskStatus taskStatus)
+      throws IOException, InterruptedException {
+    LOG.info("Got 'status-update' from " + taskId + ": status=" + taskStatus);
+    return true;
+  }
+
+  @Override
+  public void reportDiagnosticInfo(TezTaskAttemptID taskid, String trace)
+      throws IOException {
+    LOG.info("Got 'diagnostic-info' from " + taskid + ": trace=" + trace);
+  }
+
+  @Override
+  public boolean ping(TezTaskAttemptID taskid) throws IOException {
+    LOG.info("Got 'ping' from " + taskid);
+    return true;
+  }
+
+  @Override
+  public void done(TezTaskAttemptID taskid) throws IOException {
+    LOG.info("Got 'done' from " + taskid);
+  }
+
+  @Override
+  public void commitPending(TezTaskAttemptID taskId, TezTaskStatus taskStatus)
+      throws IOException, InterruptedException {
+    LOG.info("Got 'commit-pending' from " + taskId + ": status=" + taskStatus);
+  }
+
+  @Override
+  public boolean canCommit(TezTaskAttemptID taskid) throws IOException {
+    LOG.info("Got 'can-commit' from " + taskid);
+    return false;
+  }
+
+  @Override
+  public void shuffleError(TezTaskAttemptID taskId, String message)
+      throws IOException {
+    LOG.info("Got 'shuffle-error' from " + taskId + ": message=" + message);
+  }
+
+  @Override
+  public void fsError(TezTaskAttemptID taskId, String message)
+      throws IOException {
+    LOG.info("Got 'fs-error' from " + taskId + ": message=" + message);
+  }
+
+  @Override
+  public void fatalError(TezTaskAttemptID taskId, String message)
+      throws IOException {
+    LOG.info("Got 'fatal-error' from " + taskId + ": message=" + message);
+  }
+
+  @Override
+  public void outputReady(TezTaskAttemptID taskAttemptId,
+      OutputContext outputContext) throws IOException {
+    // TODO Auto-generated method stub
+    
+  }
+
+  @Override
+  public ProceedToCompletionResponse proceedToCompletion(
+      TezTaskAttemptID taskAttemptId) throws IOException {
+    return proceedToCompletionResponse;
+  }
+
+}

Propchange: incubator/tez/branches/TEZ-1/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/TestUmbilicalProtocol.java
------------------------------------------------------------------------------
    svn:eol-style = native