You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/11 06:49:46 UTC

[1/4] TEZ-417. Change Shuffle Input/Output to work with the new APIs (part of TEZ-398). (sseth)

Updated Branches:
  refs/heads/TEZ-398 e5919fa75 -> 1cf7f197d


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
index 79787b7..29a4b02 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/OnFileSortedOutput.java
@@ -18,52 +18,103 @@
 package org.apache.tez.engine.lib.output;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.RunningTaskContext;
-import org.apache.tez.common.TezEngineTaskContext;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.api.Output;
-import org.apache.tez.engine.common.sort.SortingOutput;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.DataMovementEventPayloadProto;
 import org.apache.tez.engine.common.sort.impl.ExternalSorter;
 import org.apache.tez.engine.common.sort.impl.dflt.DefaultSorter;
-import org.apache.tez.engine.records.OutputContext;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.KVWriter;
+import org.apache.tez.engine.newapi.LogicalOutput;
+import org.apache.tez.engine.newapi.TezOutputContext;
+import org.apache.tez.engine.newapi.Writer;
+import org.apache.tez.engine.newapi.events.DataMovementEvent;
+import org.apache.tez.engine.shuffle.common.ShuffleUtils;
+
+import com.google.common.collect.Lists;
 
 /**
- * {@link OnFileSortedOutput} is an {@link Output} which sorts key/value pairs 
+ * <code>OnFileSortedOutput</code> is an {@link LogicalOutput} which sorts key/value pairs 
  * written to it and persists it to a file.
  */
-public class OnFileSortedOutput implements SortingOutput {
+public class OnFileSortedOutput implements LogicalOutput {
   
   protected ExternalSorter sorter;
+  protected Configuration conf;
+  protected int numOutputs;
+  protected TezOutputContext outputContext;
+  private long startTime;
+  private long endTime;
   
-  public OnFileSortedOutput(TezEngineTaskContext task) throws IOException {
-    sorter = new DefaultSorter(task);
-  }
   
-  public void initialize(Configuration conf, Master master) 
-      throws IOException, InterruptedException {
-    sorter.initialize(conf, master);
+  @Override
+  public List<Event> initialize(TezOutputContext outputContext)
+      throws IOException {
+    this.startTime = System.nanoTime();
+    this.outputContext = outputContext;
+    sorter = new DefaultSorter();
+    this.conf = TezUtils.createConfFromUserPayload(outputContext.getUserPayload());
+    // Initializing this parametr in this conf since it is used in multiple
+    // places (wherever LocalDirAllocator is used) - TezTaskOutputFiles,
+    // TezMerger, etc.
+    this.conf.setStrings(TezJobConfig.LOCAL_DIRS, outputContext.getWorkDirs());
+    sorter.initialize(outputContext, conf, numOutputs);
+    return Collections.emptyList();
   }
 
   @Override
-  public void setTask(RunningTaskContext task) {
-    sorter.setTask(task);
-  }
-  
-  public void write(Object key, Object value) throws IOException,
-      InterruptedException {
-    sorter.write(key, value);
+  public Writer getWriter() throws IOException {
+    return new KVWriter() {
+      @Override
+      public void write(Object key, Object value) throws IOException {
+        sorter.write(key, value);
+      }
+    };
   }
 
-  public void close() throws IOException, InterruptedException {
-    sorter.flush();
-    sorter.close();
+  @Override
+  public void handleEvents(List<Event> outputEvents) {
+    // Not expecting any events.
   }
 
   @Override
-  public OutputContext getOutputContext() {
-    return null;
+  public void setNumPhysicalOutputs(int numOutputs) {
+    this.numOutputs = numOutputs;
   }
 
+  @Override
+  public List<Event> close() throws IOException {
+    sorter.flush();
+    sorter.close();
+    this.endTime = System.nanoTime();
+
+    String host = System.getenv(ApplicationConstants.Environment.NM_HOST
+        .toString());
+    ByteBuffer shuffleMetadata = outputContext
+        .getServiceProviderMetaData(ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID);
+    int shufflePort = ShuffleUtils.deserializeShuffleMetaData(shuffleMetadata);
+
+    DataMovementEventPayloadProto.Builder payloadBuilder = DataMovementEventPayloadProto
+        .newBuilder();
+    payloadBuilder.setHost(host);
+    payloadBuilder.setPort(shufflePort);
+    payloadBuilder.setPathComponent(outputContext.getUniqueIdentifier());
+    payloadBuilder.setRunDuration((int) ((endTime - startTime) / 1000));
+    DataMovementEventPayloadProto payloadProto = payloadBuilder.build();
+
+    List<Event> events = Lists.newArrayListWithCapacity(numOutputs);
+
+    for (int i = 0; i < numOutputs; i++) {
+      DataMovementEvent event = new DataMovementEvent(i,
+          payloadProto.toByteArray());
+      events.add(event);
+    }
+    return events;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java
index 68e0f47..bd0e933 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVReader.java
@@ -22,33 +22,57 @@ import java.io.IOException;
 
 /**
  * A key/value(s) pair based {@link Reader}.
+ * 
+ * Example usage
+ * <code>
+ * while (kvReader.moveToNext()) {
+ *   KVRecord kvRecord = getCurrentKV();
+ *   Object key =  kvRecord.getKey();
+ *   Iterable values = kvRecord.getValues();
+ * </code>
+ *
  */
 public interface KVReader extends Reader {
 
   /**
-   * Check if there is another key/value(s) pair
+   * Moves to the next key/values(s) pair
    * 
-   * @return true if another key/value(s) pair exists
+   * @return true if another key/value(s) pair exists, false if there are no more.
    * @throws IOException
    *           if an error occurs
    */
-  public boolean hasNext() throws IOException;
+  public boolean moveToNext() throws IOException;
 
   /**
-   * Gets the next key.
-   * 
-   * @return the next key, or null if none exists
+   * Return the current key/value(s) pair. Use moveToNext() to advance.
+   * @return
    * @throws IOException
-   *           if an error occurs
    */
-  public Object getNextKey() throws IOException;
+  public KVRecord getCurrentKV() throws IOException;
+  
+  
 
+  
   /**
-   * Get the next values.
-   * 
-   * @return an <code>Iterable</code> view of the values for the current key
-   * @throws IOException
-   *           if an error occurs
+   * Represents a key and an associated set of values
+   *
    */
-  public Iterable<Object> getNextValues() throws IOException;
+  public static class KVRecord {
+
+    private Object key;
+    private Iterable<Object> values;
+
+    public KVRecord(Object key, Iterable<Object> values) {
+      this.key = key;
+      this.values = values;
+    }
+
+    public Object getKey() {
+      return this.key;
+    }
+
+    public Iterable<Object> getValues() {
+      return this.values;
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVWriter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVWriter.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVWriter.java
index f945b63..ad48912 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVWriter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/KVWriter.java
@@ -23,7 +23,7 @@ import java.io.IOException;
 /**
  * A key/value(s) pair based {@link Writer}
  */
-public interface KVWriter {
+public interface KVWriter extends Writer {
   /**
    * Writes a key/value pair.
    * 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
index b4558d0..1d76d86 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezInputContextImpl.java
@@ -49,6 +49,9 @@ public class TezInputContextImpl extends TezTaskContextImpl
     this.sourceInfo = new EventMetaData(
         EventGenerator.INPUT, taskVertexName, sourceVertexName,
         taskAttemptID);
+    this.uniqueIdentifier = String.format("%s_%s_%6d_%2d_%s", taskAttemptID
+        .getTaskID().getVertexID().getDAGId().toString(), taskVertexName,
+        getTaskIndex(), getAttemptNumber(), sourceVertexName); 
   }
 
   @Override
@@ -70,5 +73,4 @@ public class TezInputContextImpl extends TezTaskContextImpl
   public String getSourceVertexName() {
     return sourceVertexName;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
index ba632db..e5b81d0 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezOutputContextImpl.java
@@ -49,6 +49,9 @@ public class TezOutputContextImpl extends TezTaskContextImpl
     this.tezUmbilical = tezUmbilical;
     this.sourceInfo = new EventMetaData(EventGenerator.OUTPUT, taskVertexName,
         destinationVertexName, taskAttemptID);
+    this.uniqueIdentifier = String.format("%s_%s_%6d_%2d_%s", taskAttemptID
+        .getTaskID().getVertexID().getDAGId().toString(), taskVertexName,
+        getTaskIndex(), getAttemptNumber(), destinationVertexName);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
index 4e0f061..73c4a54 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezProcessorContextImpl.java
@@ -44,6 +44,9 @@ public class TezProcessorContextImpl extends TezTaskContextImpl
     this.tezUmbilical = tezUmbilical;
     this.sourceInfo = new EventMetaData(EventGenerator.PROCESSOR,
         taskVertexName, "", taskAttemptID);
+    this.uniqueIdentifier = String.format("%s_%s_%6d_%2d", taskAttemptID
+        .getTaskID().getVertexID().getDAGId().toString(), taskVertexName,
+        getTaskIndex(), getAttemptNumber());
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
index 712eec3..b77bcdd 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/newapi/impl/TezTaskContextImpl.java
@@ -18,18 +18,27 @@
 
 package org.apache.tez.engine.newapi.impl;
 
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
+import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.counters.TezCounters;
 import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.newapi.TezTaskContext;
+import org.apache.tez.engine.shuffle.common.ShuffleUtils;
 
 public abstract class TezTaskContextImpl implements TezTaskContext {
 
-  protected final Configuration conf;
+  private final Configuration conf;
   protected final String taskVertexName;
-  protected final TezTaskAttemptID taskAttemptID;
-  protected final TezCounters counters;
+  private final TezTaskAttemptID taskAttemptID;
+  private final TezCounters counters;
+  private String[] workDirs;
+  protected String uniqueIdentifier;
 
   @Private
   public TezTaskContextImpl(Configuration conf,
@@ -39,9 +48,18 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
     this.taskVertexName = taskVertexName;
     this.taskAttemptID = taskAttemptID;
     this.counters = counters;
+    // TODO Maybe change this to be task id specific at some point. For now
+    // Shuffle code relies on this being a path specified by YARN
+    this.workDirs = this.conf.getStrings(TezJobConfig.LOCAL_DIRS); 
   }
 
   @Override
+  public ApplicationId getApplicationId() {
+    return taskAttemptID.getTaskID().getVertexID().getDAGId()
+        .getApplicationId();
+  }
+  
+  @Override
   public int getTaskIndex() {
     return taskAttemptID.getTaskID().getId();
   }
@@ -52,8 +70,14 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
   }
 
   @Override
+  public String getDAGName() {
+    // TODO NEWTEZ Change to some form of the DAG name, for now using dagId as
+    // the unique identifier.
+    return taskAttemptID.getTaskID().getVertexID().getDAGId().toString();
+  }
+  
+  @Override
   public String getTaskVertexName() {
-    // TODO Auto-generated method stub
     return taskVertexName;
   }
 
@@ -63,5 +87,30 @@ public abstract class TezTaskContextImpl implements TezTaskContext {
     return counters;
   }
 
-  // TODO Add a method to get working dir
+  @Override
+  public String[] getWorkDirs() {
+    return Arrays.copyOf(workDirs, workDirs.length);
+  }
+  
+  @Override
+  public String getUniqueIdentifier() {
+    return uniqueIdentifier;
+  }
+  
+  @Override
+  public void fatalError(Throwable exception, String message) {
+    // TODO NEWTEZ Implement once the TezContext communication is setup.
+  }
+  
+  @Override
+  public ByteBuffer getServiceConsumerMetaData(String serviceName) {
+    // TODO NEWTEZ Make sure this data is set by the AM for the Shuffle service name.
+    return null;
+  }
+  
+  @Override
+  public ByteBuffer getServiceProviderMetaData(String serviceName) {
+    return AuxiliaryServiceHelper.getServiceDataFromEnv(
+        ShuffleUtils.SHUFFLE_HANDLER_SERVICE_ID, System.getenv());
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java
new file mode 100644
index 0000000..3a6b2e4
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/shuffle/common/ShuffleUtils.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.shuffle.common;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.io.DataInputByteBuffer;
+import org.apache.hadoop.security.token.Token;
+import org.apache.tez.engine.common.security.JobTokenIdentifier;
+import org.apache.tez.engine.common.security.JobTokenSecretManager;
+
+public class ShuffleUtils {
+
+  public static String SHUFFLE_HANDLER_SERVICE_ID = "mapreduce.shuffle";
+
+  public static SecretKey getJobTokenSecretFromTokenBytes(ByteBuffer meta)
+      throws IOException {
+    DataInputByteBuffer in = new DataInputByteBuffer();
+    in.reset(meta);
+    Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
+    jt.readFields(in);
+    SecretKey sk = JobTokenSecretManager.createSecretKey(jt.getPassword());
+    return sk;
+  }
+
+  public static int deserializeShuffleMetaData(ByteBuffer meta)
+      throws IOException {
+    DataInputByteBuffer in = new DataInputByteBuffer();
+    try {
+      in.reset(meta);
+      int port = in.readInt();
+      return port;
+    } finally {
+      in.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
index b2f1318..9bc430b 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/MRRSleepJob.java
@@ -80,8 +80,8 @@ import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.engine.common.objectregistry.ObjectLifeCycle;
 import org.apache.tez.engine.common.objectregistry.ObjectRegistry;
 import org.apache.tez.engine.common.objectregistry.ObjectRegistryFactory;
-import org.apache.tez.engine.lib.input.ShuffledMergedInput;
-import org.apache.tez.engine.lib.output.OnFileSortedOutput;
+import org.apache.tez.engine.lib.oldinput.OldShuffledMergedInput;
+import org.apache.tez.engine.lib.oldoutput.OldOnFileSortedOutput;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
@@ -611,9 +611,9 @@ public class MRRSleepJob extends Configured implements Tool {
                 DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
                 SchedulingType.SEQUENTIAL, 
                 new OutputDescriptor(
-                    OnFileSortedOutput.class.getName()),
+                    OldOnFileSortedOutput.class.getName()),
                 new InputDescriptor(
-                    ShuffledMergedInput.class.getName()))));
+                    OldShuffledMergedInput.class.getName()))));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
index 7fae4a3..016fbda 100644
--- a/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
+++ b/tez-mapreduce-examples/src/main/java/org/apache/tez/mapreduce/examples/OrderedWordCount.java
@@ -70,8 +70,8 @@ import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
 import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.engine.lib.input.ShuffledMergedInput;
-import org.apache.tez.engine.lib.output.OnFileSortedOutput;
+import org.apache.tez.engine.lib.oldinput.OldShuffledMergedInput;
+import org.apache.tez.engine.lib.oldoutput.OldOnFileSortedOutput;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@@ -312,9 +312,9 @@ public class OrderedWordCount {
                 DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
                 SchedulingType.SEQUENTIAL, 
                 new OutputDescriptor(
-                    OnFileSortedOutput.class.getName()),
+                    OldOnFileSortedOutput.class.getName()),
                 new InputDescriptor(
-                    ShuffledMergedInput.class.getName()))));
+                    OldShuffledMergedInput.class.getName()))));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
index 701ca87..12953e4 100644
--- a/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
+++ b/tez-mapreduce-tests/src/test/java/org/apache/tez/mapreduce/TestMRRJobsDAGApi.java
@@ -68,8 +68,8 @@ import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
 import org.apache.tez.dag.api.client.DAGStatus.State;
-import org.apache.tez.engine.lib.input.ShuffledMergedInput;
-import org.apache.tez.engine.lib.output.OnFileSortedOutput;
+import org.apache.tez.engine.lib.oldinput.OldShuffledMergedInput;
+import org.apache.tez.engine.lib.oldoutput.OldOnFileSortedOutput;
 import org.apache.tez.mapreduce.examples.MRRSleepJob;
 import org.apache.tez.mapreduce.examples.MRRSleepJob.ISleepReducer;
 import org.apache.tez.mapreduce.examples.MRRSleepJob.MRRSleepJobPartitioner;
@@ -396,23 +396,23 @@ public class TestMRRJobsDAGApi {
     Edge edge1 = new Edge(stage1Vertex, stage2Vertex, new EdgeProperty(
         DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
         SchedulingType.SEQUENTIAL, new OutputDescriptor(
-        OnFileSortedOutput.class.getName()), new InputDescriptor(
-                ShuffledMergedInput.class.getName())));
+        OldOnFileSortedOutput.class.getName()), new InputDescriptor(
+                OldShuffledMergedInput.class.getName())));
     Edge edge11 = new Edge(stage11Vertex, stage22Vertex, new EdgeProperty(
         DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, 
         SchedulingType.SEQUENTIAL, new OutputDescriptor(
-        OnFileSortedOutput.class.getName()), new InputDescriptor(
-                ShuffledMergedInput.class.getName())));
+        OldOnFileSortedOutput.class.getName()), new InputDescriptor(
+                OldShuffledMergedInput.class.getName())));
     Edge edge2 = new Edge(stage2Vertex, stage3Vertex, new EdgeProperty(
         DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, 
         SchedulingType.SEQUENTIAL, new OutputDescriptor(
-        OnFileSortedOutput.class.getName()), new InputDescriptor(
-                ShuffledMergedInput.class.getName())));
+        OldOnFileSortedOutput.class.getName()), new InputDescriptor(
+                OldShuffledMergedInput.class.getName())));
     Edge edge3 = new Edge(stage22Vertex, stage3Vertex, new EdgeProperty(
         DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED, 
         SchedulingType.SEQUENTIAL, new OutputDescriptor(
-        OnFileSortedOutput.class.getName()), new InputDescriptor(
-                ShuffledMergedInput.class.getName())));
+        OldOnFileSortedOutput.class.getName()), new InputDescriptor(
+                OldShuffledMergedInput.class.getName())));
 
     dag.addEdge(edge1);
     dag.addEdge(edge11);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
index 204f517..7df783b 100644
--- a/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
+++ b/tez-mapreduce/src/main/java/org/apache/hadoop/mapred/LocalJobRunnerTez.java
@@ -76,8 +76,8 @@ import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.api.Task;
 import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
 import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-import org.apache.tez.engine.lib.input.LocalMergedInput;
-import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
+import org.apache.tez.engine.lib.oldinput.LocalMergedInput;
+import org.apache.tez.engine.lib.oldoutput.OldLocalOnFileSorterOutput;
 import org.apache.tez.engine.newapi.impl.TezEvent;
 import org.apache.tez.engine.newapi.impl.TezHeartbeatRequest;
 import org.apache.tez.engine.newapi.impl.TezHeartbeatResponse;
@@ -257,7 +257,7 @@ public class LocalJobRunnerTez implements ClientProtocol {
                   Collections.singletonList(new InputSpec("srcVertex", 0,
                       SimpleInput.class.getName())),
                   Collections.singletonList(new OutputSpec("tgtVertex", 0,
-                      LocalOnFileSorterOutput.class.getName())));
+                      OldLocalOnFileSorterOutput.class.getName())));
 
           TezTaskOutput mapOutput = new TezLocalTaskOutputFiles();
           mapOutput.setConf(localConf);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
index 5793104..4fb1876 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/processor/reduce/ReduceProcessor.java
@@ -48,7 +48,7 @@ import org.apache.tez.engine.api.Processor;
 import org.apache.tez.engine.common.ConfigUtils;
 import org.apache.tez.engine.common.sort.SortingOutput;
 import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-import org.apache.tez.engine.lib.input.ShuffledMergedInput;
+import org.apache.tez.engine.lib.oldinput.OldShuffledMergedInput;
 import org.apache.tez.mapreduce.hadoop.mapred.TaskAttemptContextImpl;
 import org.apache.tez.mapreduce.input.SimpleInput;
 import org.apache.tez.mapreduce.output.SimpleOutput;
@@ -108,12 +108,12 @@ implements Processor {
 
     if (in instanceof SimpleInput) {
       ((SimpleInput)in).setTask(this);
-    } else if (in instanceof ShuffledMergedInput) {
-      ((ShuffledMergedInput)in).setTask(this);
+    } else if (in instanceof OldShuffledMergedInput) {
+      ((OldShuffledMergedInput)in).setTask(this);
     }
     
     if(ins.length > 1) {
-      if (!(in instanceof ShuffledMergedInput)) {
+      if (!(in instanceof OldShuffledMergedInput)) {
         throw new IOException(
             "Only ShuffledMergedInput can support multiple inputs"
                 + ". inputCount=" + ins.length);
@@ -124,15 +124,15 @@ implements Processor {
                 + ins.length + " From contex:" + inputs.size());
       }
       // initialize and merge the remaining
-      ShuffledMergedInput s0 = ((ShuffledMergedInput)in);
+      OldShuffledMergedInput s0 = ((OldShuffledMergedInput)in);
       for(int i=1; i<ins.length; ++i) {
         Input inputi = ins[i];
-        if (!(inputi instanceof ShuffledMergedInput)) {
+        if (!(inputi instanceof OldShuffledMergedInput)) {
           throw new IOException(
               "Only ShuffledMergedInput can support multiple inputs"
                   + ". inputCount=" + ins.length);
         }      
-        ShuffledMergedInput si = ((ShuffledMergedInput)inputi);
+        OldShuffledMergedInput si = ((OldShuffledMergedInput)inputi);
         s0.mergeWith(si);
       }
     }
@@ -162,10 +162,10 @@ implements Processor {
         reporter.getCounter(TaskCounter.REDUCE_INPUT_RECORDS);
         
     // Sanity check
-    if (!(in instanceof ShuffledMergedInput)) {
+    if (!(in instanceof OldShuffledMergedInput)) {
       throw new IOException("Illegal input to reduce: " + in.getClass());
     }
-    ShuffledMergedInput shuffleInput = (ShuffledMergedInput)in;
+    OldShuffledMergedInput shuffleInput = (OldShuffledMergedInput)in;
 
     if (useNewApi) {
       try {
@@ -194,7 +194,7 @@ implements Processor {
   void runOldReducer(JobConf job,
       TezTaskUmbilicalProtocol umbilical,
       final MRTaskReporter reporter,
-      ShuffledMergedInput input,
+      OldShuffledMergedInput input,
       RawComparator comparator,
       Class keyClass,
       Class valueClass,
@@ -265,7 +265,7 @@ implements Processor {
     private Counter reduceInputValueCounter;
     private Progress reducePhase;
 
-    public ReduceValuesIterator (ShuffledMergedInput in,
+    public ReduceValuesIterator (OldShuffledMergedInput in,
         RawComparator<KEY> comparator, 
         Class<KEY> keyClass,
         Class<VALUE> valClass,
@@ -297,7 +297,7 @@ implements Processor {
   void runNewReducer(JobConf job,
       final TezTaskUmbilicalProtocol umbilical,
       final MRTaskReporter reporter,
-      ShuffledMergedInput input,
+      OldShuffledMergedInput input,
       RawComparator comparator,
       Class keyClass,
       Class valueClass,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index ae62251..3610f9f 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -38,8 +38,7 @@ import org.apache.tez.engine.api.Task;
 import org.apache.tez.engine.common.sort.impl.IFile;
 import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
 import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-import org.apache.tez.engine.lib.output.InMemorySortedOutput;
-import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
+import org.apache.tez.engine.lib.oldoutput.OldLocalOnFileSorterOutput;
 import org.apache.tez.mapreduce.TestUmbilicalProtocol;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfToTezTranslator;
 import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil;
@@ -47,14 +46,9 @@ import org.apache.tez.mapreduce.hadoop.mapreduce.TezNullOutputCommitter;
 import org.apache.tez.mapreduce.input.SimpleInput;
 import org.apache.tez.mapreduce.processor.MRTask;
 import org.apache.tez.mapreduce.processor.MapUtils;
-import org.jboss.netty.buffer.BigEndianHeapChannelBuffer;
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.TruncatedChannelBuffer;
-import org.jboss.netty.handler.stream.ChunkedStream;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 
 @SuppressWarnings("deprecation")
@@ -122,7 +116,7 @@ public class TestMapProcessor {
         Collections.singletonList(new InputSpec("NullVertex", 0,
             SimpleInput.class.getName())),
         Collections.singletonList(new OutputSpec("FakeVertex", 1,
-            LocalOnFileSorterOutput.class.getName())));
+            OldLocalOnFileSorterOutput.class.getName())));
 
     MRTask mrTask = (MRTask)t.getProcessor();
     Assert.assertEquals(TezNullOutputCommitter.class.getName(), mrTask
@@ -151,76 +145,76 @@ public class TestMapProcessor {
     reader.close();
   }
 
-  @Test
-  @Ignore
-  public void testMapProcessorWithInMemSort() throws Exception {
-    
-    String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
-    
-    final int partitions = 2;
-    JobConf jobConf = new JobConf(defaultConf);
-    jobConf.setNumReduceTasks(partitions);
-    setUpJobConf(jobConf);
-    TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles();
-    mapOutputs.setConf(jobConf);
-    
-    Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
-    Configuration stageConf = MultiStageMRConfigUtil.getConfForVertex(conf,
-        vertexName);
-    
-    JobConf job = new JobConf(stageConf);
-
-    job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
-        "localized-resources").toUri().toString());
-    localFs.delete(workDir, true);
-    Task t =
-        MapUtils.runMapProcessor(
-            localFs, workDir, job, 0, new Path(workDir, "map0"), 
-            new TestUmbilicalProtocol(true), vertexName, 
-            Collections.singletonList(new InputSpec("NullVertex", 0,
-                SimpleInput.class.getName())),
-            Collections.singletonList(new OutputSpec("FakeVertex", 1,
-                InMemorySortedOutput.class.getName()))
-            );
-    InMemorySortedOutput[] outputs = (InMemorySortedOutput[])t.getOutputs();
-    
-    verifyInMemSortedStream(outputs[0], 0, 4096);
-    int i = 0;
-    for (i = 2; i < 256; i <<= 1) {
-      verifyInMemSortedStream(outputs[0], 0, i);
-    }
-    verifyInMemSortedStream(outputs[0], 1, 4096);
-    for (i = 2; i < 256; i <<= 1) {
-      verifyInMemSortedStream(outputs[0], 1, i);
-    }
-
-    t.close();
-  }
-  
-  private void verifyInMemSortedStream(
-      InMemorySortedOutput output, int partition, int chunkSize) 
-          throws Exception {
-    ChunkedStream cs = 
-        new ChunkedStream(
-            output.getSorter().getSortedStream(partition), chunkSize);
-    int actualBytes = 0;
-    ChannelBuffer b = null;
-    while ((b = (ChannelBuffer)cs.nextChunk()) != null) {
-      LOG.info("b = " + b);
-      actualBytes += 
-          (b instanceof TruncatedChannelBuffer) ? 
-              ((TruncatedChannelBuffer)b).capacity() :
-              ((BigEndianHeapChannelBuffer)b).readableBytes();
-    }
-    
-    LOG.info("verifyInMemSortedStream" +
-    		" partition=" + partition + 
-    		" chunkSize=" + chunkSize +
-        " expected=" + 
-    		output.getSorter().getShuffleHeader(partition).getCompressedLength() + 
-        " actual=" + actualBytes);
-    Assert.assertEquals(
-        output.getSorter().getShuffleHeader(partition).getCompressedLength(), 
-        actualBytes);
-  }
+//  @Test
+//  @Ignore
+//  public void testMapProcessorWithInMemSort() throws Exception {
+//    
+//    String vertexName = MultiStageMRConfigUtil.getInitialMapVertexName();
+//    
+//    final int partitions = 2;
+//    JobConf jobConf = new JobConf(defaultConf);
+//    jobConf.setNumReduceTasks(partitions);
+//    setUpJobConf(jobConf);
+//    TezTaskOutput mapOutputs = new TezLocalTaskOutputFiles();
+//    mapOutputs.setConf(jobConf);
+//    
+//    Configuration conf = MultiStageMRConfToTezTranslator.convertMRToLinearTez(jobConf);
+//    Configuration stageConf = MultiStageMRConfigUtil.getConfForVertex(conf,
+//        vertexName);
+//    
+//    JobConf job = new JobConf(stageConf);
+//
+//    job.set(TezJobConfig.TASK_LOCAL_RESOURCE_DIR, new Path(workDir,
+//        "localized-resources").toUri().toString());
+//    localFs.delete(workDir, true);
+//    Task t =
+//        MapUtils.runMapProcessor(
+//            localFs, workDir, job, 0, new Path(workDir, "map0"), 
+//            new TestUmbilicalProtocol(true), vertexName, 
+//            Collections.singletonList(new InputSpec("NullVertex", 0,
+//                SimpleInput.class.getName())),
+//            Collections.singletonList(new OutputSpec("FakeVertex", 1,
+//                OldInMemorySortedOutput.class.getName()))
+//            );
+//    OldInMemorySortedOutput[] outputs = (OldInMemorySortedOutput[])t.getOutputs();
+//    
+//    verifyInMemSortedStream(outputs[0], 0, 4096);
+//    int i = 0;
+//    for (i = 2; i < 256; i <<= 1) {
+//      verifyInMemSortedStream(outputs[0], 0, i);
+//    }
+//    verifyInMemSortedStream(outputs[0], 1, 4096);
+//    for (i = 2; i < 256; i <<= 1) {
+//      verifyInMemSortedStream(outputs[0], 1, i);
+//    }
+//
+//    t.close();
+//  }
+//  
+//  private void verifyInMemSortedStream(
+//      OldInMemorySortedOutput output, int partition, int chunkSize) 
+//          throws Exception {
+//    ChunkedStream cs = 
+//        new ChunkedStream(
+//            output.getSorter().getSortedStream(partition), chunkSize);
+//    int actualBytes = 0;
+//    ChannelBuffer b = null;
+//    while ((b = (ChannelBuffer)cs.nextChunk()) != null) {
+//      LOG.info("b = " + b);
+//      actualBytes += 
+//          (b instanceof TruncatedChannelBuffer) ? 
+//              ((TruncatedChannelBuffer)b).capacity() :
+//              ((BigEndianHeapChannelBuffer)b).readableBytes();
+//    }
+//    
+//    LOG.info("verifyInMemSortedStream" +
+//    		" partition=" + partition + 
+//    		" chunkSize=" + chunkSize +
+//        " expected=" + 
+//    		output.getSorter().getShuffleHeader(partition).getCompressedLength() + 
+//        " actual=" + actualBytes);
+//    Assert.assertEquals(
+//        output.getSorter().getShuffleHeader(partition).getCompressedLength(), 
+//        actualBytes);
+//  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
index 2428000..2a121a6 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/reduce/TestReduceProcessor.java
@@ -40,8 +40,8 @@ import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.engine.api.Task;
 import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
 import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-import org.apache.tez.engine.lib.input.LocalMergedInput;
-import org.apache.tez.engine.lib.output.LocalOnFileSorterOutput;
+import org.apache.tez.engine.lib.oldinput.LocalMergedInput;
+import org.apache.tez.engine.lib.oldoutput.OldLocalOnFileSorterOutput;
 import org.apache.tez.engine.runtime.RuntimeUtils;
 import org.apache.tez.mapreduce.TestUmbilicalProtocol;
 import org.apache.tez.mapreduce.TezTestUtils;
@@ -122,7 +122,7 @@ public class TestReduceProcessor {
         Collections.singletonList(new InputSpec("NullVertex", 0,
             SimpleInput.class.getName())),
         Collections.singletonList(new OutputSpec("FakeVertex", 1,
-            LocalOnFileSorterOutput.class.getName())));
+            OldLocalOnFileSorterOutput.class.getName())));
 
     LOG.info("Starting reduce...");
     

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
----------------------------------------------------------------------
diff --git a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
index 0fb823f..2d59b18 100644
--- a/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
+++ b/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java
@@ -95,8 +95,8 @@ import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
-import org.apache.tez.engine.lib.input.ShuffledMergedInput;
-import org.apache.tez.engine.lib.output.OnFileSortedOutput;
+import org.apache.tez.engine.lib.oldinput.OldShuffledMergedInput;
+import org.apache.tez.engine.lib.oldoutput.OldOnFileSortedOutput;
 import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
 import org.apache.tez.mapreduce.hadoop.MRHelpers;
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
@@ -463,8 +463,8 @@ public class YARNRunner implements ClientProtocol {
         EdgeProperty edgeProperty = new EdgeProperty(
             DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
             SchedulingType.SEQUENTIAL, 
-            new OutputDescriptor(OnFileSortedOutput.class.getName()),
-            new InputDescriptor(ShuffledMergedInput.class.getName()));
+            new OutputDescriptor(OldOnFileSortedOutput.class.getName()),
+            new InputDescriptor(OldShuffledMergedInput.class.getName()));
 
         Edge edge = null;
         edge = new Edge(vertices[i - 1], vertices[i], edgeProperty);


[3/4] TEZ-417. Change Shuffle Input/Output to work with the new APIs (part of TEZ-398). (sseth)

Posted by ss...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
index 16ded35..a5401fa 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
@@ -25,10 +25,12 @@ import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.ChecksumFileSystem;
 import org.apache.hadoop.fs.FileSystem;
@@ -39,26 +41,21 @@ import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.Constants;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTaskReporter;
 import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
-import org.apache.tez.engine.api.Input;
-import org.apache.tez.engine.api.Output;
-import org.apache.tez.engine.api.Processor;
 import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.combine.CombineInput;
-import org.apache.tez.engine.common.combine.CombineOutput;
 import org.apache.tez.engine.common.sort.impl.IFile;
 import org.apache.tez.engine.common.sort.impl.TezMerger;
 import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
 import org.apache.tez.engine.common.sort.impl.IFile.Writer;
 import org.apache.tez.engine.common.sort.impl.TezMerger.Segment;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
+import org.apache.tez.engine.hadoop.compat.NullProgressable;
+import org.apache.tez.engine.newapi.Processor;
+import org.apache.tez.engine.newapi.TezInputContext;
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -66,15 +63,15 @@ import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
 public class MergeManager {
   
   private static final Log LOG = LogFactory.getLog(MergeManager.class);
-  
-  private final TezTaskAttemptID taskAttemptId;
-  
+
   private final Configuration conf;
   private final FileSystem localFS;
   private final FileSystem rfs;
   private final LocalDirAllocator localDirAllocator;
   
   private final  TezTaskOutputFiles mapOutputFile;
+  private final Progressable nullProgressable = new NullProgressable();
+  private final Processor combineProcessor = null; // TODO NEWTEZ Fix CombineProcessor  
   
   Set<MapOutput> inMemoryMergedMapOutputs = 
     new TreeSet<MapOutput>(new MapOutput.MapOutputComparator());
@@ -97,13 +94,15 @@ public class MergeManager {
   
   private final int ioSortFactor;
 
-  private final TezTaskReporter reporter;
   private final ExceptionReporter exceptionReporter;
   
+  private final TezInputContext inputContext;
+  
   /**
    * Combiner processor to run during in-memory merge, if defined.
    */
-  private final Processor combineProcessor;
+  // TODO NEWTEZ Fix Combiner
+  //private final Processor combineProcessor;
 
   private final TezCounter spilledRecordsCounter;
 
@@ -113,31 +112,28 @@ public class MergeManager {
   
   private final CompressionCodec codec;
   
-  private final Progress mergePhase;
+  private volatile boolean finalMergeComplete = false;
 
-  public MergeManager(TezTaskAttemptID taskAttemptId, 
-                      Configuration conf, 
+  public MergeManager(Configuration conf, 
                       FileSystem localFS,
                       LocalDirAllocator localDirAllocator,  
-                      TezTaskReporter reporter,
+                      TezInputContext inputContext,
                       Processor combineProcessor,
                       TezCounter spilledRecordsCounter,
                       TezCounter reduceCombineInputCounter,
                       TezCounter mergedMapOutputsCounter,
-                      ExceptionReporter exceptionReporter,
-                      Progress mergePhase) {
-    this.taskAttemptId = taskAttemptId;
+                      ExceptionReporter exceptionReporter) {
+    // TODO NEWTEZ Change to include Combiner
+    this.inputContext = inputContext;
     this.conf = conf;
     this.localDirAllocator = localDirAllocator;
     this.exceptionReporter = exceptionReporter;
     
-    this.reporter = reporter;
-    this.combineProcessor = combineProcessor;
+    //this.combineProcessor = combineProcessor;
     this.reduceCombineInputCounter = reduceCombineInputCounter;
     this.spilledRecordsCounter = spilledRecordsCounter;
     this.mergedMapOutputsCounter = mergedMapOutputsCounter;
-    this.mapOutputFile = new TezTaskOutputFiles();
-    this.mapOutputFile.setConf(conf);
+    this.mapOutputFile = new TezTaskOutputFiles(conf, inputContext.getUniqueIdentifier());
     
     this.localFS = localFS;
     this.rfs = ((LocalFileSystem)localFS).getRaw();
@@ -224,13 +220,6 @@ public class MergeManager {
     
     this.onDiskMerger = new OnDiskMerger(this);
     this.onDiskMerger.start();
-    
-    this.mergePhase = mergePhase;
-  }
-  
-
-  TezTaskAttemptID getReduceId() {
-    return taskAttemptId;
   }
 
   public void waitForInMemoryMerge() throws InterruptedException {
@@ -240,18 +229,18 @@ public class MergeManager {
   private boolean canShuffleToMemory(long requestedSize) {
     return (requestedSize < maxSingleShuffleLimit); 
   }
-  
+
   final private MapOutput stallShuffle = new MapOutput(null);
 
-  public synchronized MapOutput reserve(TezTaskAttemptID mapId, 
+  public synchronized MapOutput reserve(TaskAttemptIdentifier srcAttemptIdentifier, 
                                              long requestedSize,
                                              int fetcher
                                              ) throws IOException {
     if (!canShuffleToMemory(requestedSize)) {
-      LOG.info(mapId + ": Shuffling to disk since " + requestedSize + 
+      LOG.info(srcAttemptIdentifier + ": Shuffling to disk since " + requestedSize + 
                " is greater than maxSingleShuffleLimit (" + 
                maxSingleShuffleLimit + ")");
-      return new MapOutput(mapId, this, requestedSize, conf, 
+      return new MapOutput(srcAttemptIdentifier, this, requestedSize, conf, 
                                 localDirAllocator, fetcher, true,
                                 mapOutputFile);
     }
@@ -272,17 +261,17 @@ public class MergeManager {
     // all the stalled threads
     
     if (usedMemory > memoryLimit) {
-      LOG.debug(mapId + ": Stalling shuffle since usedMemory (" + usedMemory
+      LOG.debug(srcAttemptIdentifier + ": Stalling shuffle since usedMemory (" + usedMemory
           + ") is greater than memoryLimit (" + memoryLimit + ")." + 
           " CommitMemory is (" + commitMemory + ")"); 
       return stallShuffle;
     }
     
     // Allow the in-memory shuffle to progress
-    LOG.debug(mapId + ": Proceeding with shuffle since usedMemory ("
+    LOG.debug(srcAttemptIdentifier + ": Proceeding with shuffle since usedMemory ("
         + usedMemory + ") is lesser than memoryLimit (" + memoryLimit + ")."
         + "CommitMemory is (" + commitMemory + ")"); 
-    return unconditionalReserve(mapId, requestedSize, true);
+    return unconditionalReserve(srcAttemptIdentifier, requestedSize, true);
   }
   
   /**
@@ -290,9 +279,9 @@ public class MergeManager {
    * @return
    */
   private synchronized MapOutput unconditionalReserve(
-      TezTaskAttemptID mapId, long requestedSize, boolean primaryMapOutput) {
+      TaskAttemptIdentifier srcAttemptIdentifier, long requestedSize, boolean primaryMapOutput) {
     usedMemory += requestedSize;
-    return new MapOutput(mapId, this, (int)requestedSize, 
+    return new MapOutput(srcAttemptIdentifier, this, (int)requestedSize, 
         primaryMapOutput);
   }
   
@@ -349,6 +338,18 @@ public class MergeManager {
       }
     }
   }
+
+  /**
+   * Should <b>only</b> be used after the Shuffle phaze is complete, otherwise can
+   * return an invalid state since a merge may not be in progress dur to
+   * inadequate inputs
+   * 
+   * @return true if the merge process is complete, otherwise false
+   */
+  @Private
+  public boolean isMergeComplete() {
+    return finalMergeComplete;
+  }
   
   public TezRawKeyValueIterator close() throws Throwable {
     // Wait for on-going merges to complete
@@ -362,28 +363,32 @@ public class MergeManager {
       new ArrayList<MapOutput>(inMemoryMergedMapOutputs);
     memory.addAll(inMemoryMapOutputs);
     List<Path> disk = new ArrayList<Path>(onDiskMapOutputs);
-    return finalMerge(conf, rfs, memory, disk);
+    TezRawKeyValueIterator kvIter = finalMerge(conf, rfs, memory, disk);
+    this.finalMergeComplete = true;
+    return kvIter;
   }
    
   void runCombineProcessor(TezRawKeyValueIterator kvIter, Writer writer)
   throws IOException, InterruptedException {
 
-    CombineInput combineIn = new CombineInput(kvIter);
-    combineIn.initialize(conf, reporter);
+    // TODO NEWTEZ Fix CombineProcessor
     
-    CombineOutput combineOut = new CombineOutput(writer);
-    combineOut.initialize(conf, reporter);
-
-    try {
-      combineProcessor.process(new Input[] {combineIn},
-          new Output[] {combineOut});
-    } catch (IOException ioe) {
-      try {
-        combineProcessor.close();
-      } catch (IOException ignoredException) {}
-
-      throw ioe;
-    }
+//    CombineInput combineIn = new CombineInput(kvIter);
+//    combineIn.initialize(conf, reporter);
+//    
+//    CombineOutput combineOut = new CombineOutput(writer);
+//    combineOut.initialize(conf, reporter);
+//
+//    try {
+//      combineProcessor.process(new Input[] {combineIn},
+//          new Output[] {combineOut});
+//    } catch (IOException ioe) {
+//      try {
+//        combineProcessor.close();
+//      } catch (IOException ignoredException) {}
+//
+//      throw ioe;
+//    }
   
   }
 
@@ -404,7 +409,7 @@ public class MergeManager {
         return;
       }
 
-      TezTaskAttemptID dummyMapId = inputs.get(0).getMapId(); 
+      TaskAttemptIdentifier dummyMapId = inputs.get(0).getAttemptIdentifier(); 
       List<Segment> inMemorySegments = new ArrayList<Segment>();
       long mergeOutputSize = 
         createInMemorySegments(inputs, inMemorySegments, 0);
@@ -424,13 +429,13 @@ public class MergeManager {
                        ConfigUtils.getIntermediateInputKeyClass(conf),
                        ConfigUtils.getIntermediateInputValueClass(conf),
                        inMemorySegments, inMemorySegments.size(),
-                       new Path(taskAttemptId.toString()),
+                       new Path(inputContext.getUniqueIdentifier()),
                        (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
-                       reporter, null, null, null);
-      TezMerger.writeFile(rIter, writer, reporter, conf);
+                       nullProgressable, null, null, null);
+      TezMerger.writeFile(rIter, writer, nullProgressable, conf);
       writer.close();
 
-      LOG.info(taskAttemptId +  
+      LOG.info(inputContext.getUniqueIdentifier() +  
                " Memory-to-Memory merge of the " + noInMemorySegments +
                " files in-memory complete.");
 
@@ -463,8 +468,7 @@ public class MergeManager {
       //in the merge method)
 
       //figure out the mapId 
-      TezTaskAttemptID mapId = inputs.get(0).getMapId();
-      TezTaskID mapTaskId = mapId.getTaskID();
+      TaskAttemptIdentifier srcTaskIdentifier = inputs.get(0).getAttemptIdentifier();
 
       List<Segment> inMemorySegments = new ArrayList<Segment>();
       long mergeOutputSize = 
@@ -472,7 +476,7 @@ public class MergeManager {
       int noInMemorySegments = inMemorySegments.size();
 
       Path outputPath = 
-        mapOutputFile.getInputFileForWrite(mapTaskId,
+        mapOutputFile.getInputFileForWrite(srcTaskIdentifier.getTaskIndex(),
                                            mergeOutputSize).suffix(
                                                Constants.MERGED_OUTPUT_PREFIX);
 
@@ -492,19 +496,19 @@ public class MergeManager {
             (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
             (Class)ConfigUtils.getIntermediateInputValueClass(conf),
             inMemorySegments, inMemorySegments.size(),
-            new Path(taskAttemptId.toString()),
+            new Path(inputContext.getUniqueIdentifier()),
             (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
-            reporter, spilledRecordsCounter, null, null);
+            nullProgressable, spilledRecordsCounter, null, null);
 
         if (null == combineProcessor) {
-          TezMerger.writeFile(rIter, writer, reporter, conf);
+          TezMerger.writeFile(rIter, writer, nullProgressable, conf);
         } else {
           runCombineProcessor(rIter, writer);
         }
         writer.close();
         writer = null;
 
-        LOG.info(taskAttemptId +  
+        LOG.info(inputContext.getUniqueIdentifier() +  
             " Merge of the " + noInMemorySegments +
             " files in-memory complete." +
             " Local file is " + outputPath + " of size " + 
@@ -568,7 +572,7 @@ public class MergeManager {
                         (Class)ConfigUtils.getIntermediateInputValueClass(conf),
                         codec, null);
       TezRawKeyValueIterator iter  = null;
-      Path tmpDir = new Path(taskAttemptId.toString());
+      Path tmpDir = new Path(inputContext.getUniqueIdentifier());
       try {
         iter = TezMerger.merge(conf, rfs,
                             (Class)ConfigUtils.getIntermediateInputKeyClass(conf), 
@@ -576,10 +580,10 @@ public class MergeManager {
                             codec, inputs.toArray(new Path[inputs.size()]), 
                             true, ioSortFactor, tmpDir, 
                             (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf), 
-                            reporter, spilledRecordsCounter, null, 
+                            nullProgressable, spilledRecordsCounter, null, 
                             mergedMapOutputsCounter, null);
 
-        TezMerger.writeFile(iter, writer, reporter, conf);
+        TezMerger.writeFile(iter, writer, nullProgressable, conf);
         writer.close();
       } catch (IOException e) {
         localFS.delete(outputPath, true);
@@ -588,7 +592,7 @@ public class MergeManager {
 
       closeOnDiskFile(outputPath);
 
-      LOG.info(taskAttemptId +
+      LOG.info(inputContext.getUniqueIdentifier() +
           " Finished merging " + inputs.size() + 
           " map output files on disk of total-size " + 
           approxOutputSize + "." + 
@@ -615,7 +619,7 @@ public class MergeManager {
       totalSize += size;
       fullSize -= size;
       IFile.Reader reader = new InMemoryReader(MergeManager.this, 
-                                                   mo.getMapId(),
+                                                   mo.getAttemptIdentifier(),
                                                    data, 0, (int)size);
       inMemorySegments.add(new Segment(reader, true, 
                                             (mo.isPrimaryMapOutput() ? 
@@ -683,7 +687,7 @@ public class MergeManager {
     // merge config params
     Class keyClass = (Class)ConfigUtils.getIntermediateInputKeyClass(job);
     Class valueClass = (Class)ConfigUtils.getIntermediateInputValueClass(job);
-    final Path tmpDir = new Path(taskAttemptId.toString());
+    final Path tmpDir = new Path(inputContext.getUniqueIdentifier());
     final RawComparator comparator =
       (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(job);
 
@@ -692,7 +696,7 @@ public class MergeManager {
     long inMemToDiskBytes = 0;
     boolean mergePhaseFinished = false;
     if (inMemoryMapOutputs.size() > 0) {
-      TezTaskID mapId = inMemoryMapOutputs.get(0).getMapId().getTaskID();
+      int srcTaskId = inMemoryMapOutputs.get(0).getAttemptIdentifier().getTaskIndex();
       inMemToDiskBytes = createInMemorySegments(inMemoryMapOutputs, 
                                                 memDiskSegments,
                                                 maxInMemReduce);
@@ -710,17 +714,16 @@ public class MergeManager {
         mergePhaseFinished = true;
         // must spill to disk, but can't retain in-mem for intermediate merge
         final Path outputPath = 
-          mapOutputFile.getInputFileForWrite(mapId,
+          mapOutputFile.getInputFileForWrite(srcTaskId,
                                              inMemToDiskBytes).suffix(
                                                  Constants.MERGED_OUTPUT_PREFIX);
         final TezRawKeyValueIterator rIter = TezMerger.merge(job, fs,
             keyClass, valueClass, memDiskSegments, numMemDiskSegments,
-            tmpDir, comparator, reporter, spilledRecordsCounter, null, 
-            mergePhase);
+            tmpDir, comparator, nullProgressable, spilledRecordsCounter, null, null);
         final Writer writer = new Writer(job, fs, outputPath,
             keyClass, valueClass, codec, null);
         try {
-          TezMerger.writeFile(rIter, writer, reporter, job);
+          TezMerger.writeFile(rIter, writer, nullProgressable, job);
           // add to list of final disk outputs.
           onDiskMapOutputs.add(outputPath);
         } catch (IOException e) {
@@ -784,13 +787,10 @@ public class MergeManager {
       final int numInMemSegments = memDiskSegments.size();
       diskSegments.addAll(0, memDiskSegments);
       memDiskSegments.clear();
-      // Pass mergePhase only if there is a going to be intermediate
-      // merges. See comment where mergePhaseFinished is being set
-      Progress thisPhase = (mergePhaseFinished) ? null : mergePhase; 
       TezRawKeyValueIterator diskMerge = TezMerger.merge(
           job, fs, keyClass, valueClass, diskSegments,
           ioSortFactor, numInMemSegments, tmpDir, comparator,
-          reporter, false, spilledRecordsCounter, null, thisPhase);
+          nullProgressable, false, spilledRecordsCounter, null, null);
       diskSegments.clear();
       if (0 == finalSegments.size()) {
         return diskMerge;
@@ -800,7 +800,7 @@ public class MergeManager {
     }
     return TezMerger.merge(job, fs, keyClass, valueClass,
                  finalSegments, finalSegments.size(), tmpDir,
-                 comparator, reporter, spilledRecordsCounter, null,
+                 comparator, nullProgressable, spilledRecordsCounter, null,
                  null);
   
   }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
index 69dd036..9dd213e 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Shuffle.java
@@ -18,6 +18,10 @@
 package org.apache.tez.engine.common.shuffle.impl;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
@@ -27,18 +31,16 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.util.Progress;
-import org.apache.tez.common.RunningTaskContext;
-import org.apache.tez.common.TezEngineTaskContext;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTaskReporter;
-import org.apache.tez.common.TezTaskStatus;
-import org.apache.tez.common.TezUtils;
-import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.dag.api.TezUncheckedException;
-import org.apache.tez.engine.api.Processor;
 import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.TezInputContext;
+
+import com.google.common.base.Preconditions;
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -46,160 +48,176 @@ public class Shuffle implements ExceptionReporter {
   
   private static final Log LOG = LogFactory.getLog(Shuffle.class);
   private static final int PROGRESS_FREQUENCY = 2000;
-  private static final int MAX_EVENTS_TO_FETCH = 10000;
-  private static final int MIN_EVENTS_TO_FETCH = 100;
-  private static final int MAX_RPC_OUTSTANDING_EVENTS = 3000000;
-
-  private final TezEngineTaskContext taskContext;
-  private final RunningTaskContext runningTaskContext;
+  
   private final Configuration conf;
-  private final TezTaskReporter reporter;
+  private final TezInputContext inputContext;
   private final ShuffleClientMetrics metrics;
-  
+
+  private final ShuffleInputEventHandler eventHandler;
   private final ShuffleScheduler scheduler;
   private final MergeManager merger;
   private Throwable throwable = null;
   private String throwingThreadName = null;
-  private final Progress copyPhase;
-  private final Progress mergePhase;
-  private final int tasksInDegree;
+  private final int numInputs;
   private final AtomicInteger reduceStartId;
   private AtomicInteger reduceRange = new AtomicInteger(
       TezJobConfig.TEZ_ENGINE_SHUFFLE_PARTITION_RANGE_DEFAULT);
   
-  public Shuffle(TezEngineTaskContext taskContext,
-                 RunningTaskContext runningTaskContext,
-                 Configuration conf,
-                 int tasksInDegree,
-                 TezTaskReporter reporter,
-                 Processor combineProcessor
-                 ) throws IOException {
-    this.taskContext = taskContext;
-    this.runningTaskContext = runningTaskContext;
+  private FutureTask<TezRawKeyValueIterator> runShuffleFuture;
+
+  public Shuffle(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
+    // TODO NEWTEZ Handle Combiner
+    this.inputContext = inputContext;
     this.conf = conf;
-    this.reporter = reporter;
-    this.metrics = 
-        new ShuffleClientMetrics(
-            taskContext.getTaskAttemptId(), this.conf, 
-            this.taskContext.getUser(), this.taskContext.getJobName());
-    this.tasksInDegree = tasksInDegree;
+    this.metrics = new ShuffleClientMetrics(inputContext.getDAGName(),
+        inputContext.getTaskVertexName(), inputContext.getTaskIndex(),
+        this.conf, UserGroupInformation.getCurrentUser().getShortUserName());
+            
+    this.numInputs = numInputs;
     
     FileSystem localFS = FileSystem.getLocal(this.conf);
     LocalDirAllocator localDirAllocator = 
         new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
-    
-    copyPhase = this.runningTaskContext.getProgress().addPhase("copy");
-    mergePhase = this.runningTaskContext.getProgress().addPhase("merge");
 
     // TODO TEZ Get rid of Map / Reduce references.
     TezCounter shuffledMapsCounter = 
-        reporter.getCounter(TaskCounter.SHUFFLED_MAPS);
+        inputContext.getCounters().findCounter(TaskCounter.SHUFFLED_MAPS);
     TezCounter reduceShuffleBytes =
-        reporter.getCounter(TaskCounter.REDUCE_SHUFFLE_BYTES);
+        inputContext.getCounters().findCounter(TaskCounter.REDUCE_SHUFFLE_BYTES);
     TezCounter failedShuffleCounter =
-        reporter.getCounter(TaskCounter.FAILED_SHUFFLE);
+        inputContext.getCounters().findCounter(TaskCounter.FAILED_SHUFFLE);
     TezCounter spilledRecordsCounter = 
-        reporter.getCounter(TaskCounter.SPILLED_RECORDS);
+        inputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
     TezCounter reduceCombineInputCounter =
-        reporter.getCounter(TaskCounter.COMBINE_INPUT_RECORDS);
+        inputContext.getCounters().findCounter(TaskCounter.COMBINE_INPUT_RECORDS);
     TezCounter mergedMapOutputsCounter =
-        reporter.getCounter(TaskCounter.MERGED_MAP_OUTPUTS);
+        inputContext.getCounters().findCounter(TaskCounter.MERGED_MAP_OUTPUTS);
     
-    reduceStartId = new AtomicInteger( 
-        taskContext.getTaskAttemptId().getTaskID().getId()); 
+    reduceStartId = new AtomicInteger(inputContext.getTaskIndex());
     LOG.info("Shuffle assigned reduce start id: " + reduceStartId.get()
         + " with default reduce range: " + reduceRange.get());
 
-    scheduler = 
-      new ShuffleScheduler(this.conf, tasksInDegree,
-                                runningTaskContext.getStatus(), 
-                                this, copyPhase, 
-                                shuffledMapsCounter, 
-                                reduceShuffleBytes, 
-                                failedShuffleCounter);
-    merger = new MergeManager(this.taskContext.getTaskAttemptId(), 
-                                    this.conf, localFS, 
-                                    localDirAllocator, reporter, 
-                                    combineProcessor, 
-                                    spilledRecordsCounter, 
-                                    reduceCombineInputCounter, 
-                                    mergedMapOutputsCounter, 
-                                    this, mergePhase);
+    scheduler = new ShuffleScheduler(
+          this.inputContext,
+          this.conf,
+          this.numInputs,
+          this,
+          shuffledMapsCounter,
+          reduceShuffleBytes,
+          failedShuffleCounter);
+    eventHandler= new ShuffleInputEventHandler(
+          inputContext,
+          this,
+          scheduler);
+    merger = new MergeManager(
+          this.conf,
+          localFS,
+          localDirAllocator,
+          inputContext,
+          null, // TODO NEWTEZ Fix Combiner
+          spilledRecordsCounter,
+          reduceCombineInputCounter,
+          mergedMapOutputsCounter,
+          this);
   }
 
-  public TezRawKeyValueIterator run() throws IOException, InterruptedException {
-    // Scale the maximum events we fetch per RPC call to mitigate OOM issues
-    // on the ApplicationMaster when a thundering herd of reducers fetch events
-    // TODO: This should not be necessary after HADOOP-8942
-    int eventsPerReducer = Math.max(MIN_EVENTS_TO_FETCH,
-        MAX_RPC_OUTSTANDING_EVENTS / tasksInDegree);
-    int maxEventsToFetch = Math.min(MAX_EVENTS_TO_FETCH, eventsPerReducer);
+  public void handleEvents(List<Event> events) {
+    eventHandler.handleEvents(events);
+  }
+  
+  /**
+   * Indicates whether the Shuffle and Merge processing is complete.
+   * @return false if not complete, true if complete or if an error occurred.
+   */
+  public boolean isInputReady() {
+    if (runShuffleFuture == null) {
+      return false;
+    }
+    return runShuffleFuture.isDone();
+    //return scheduler.isDone() && merger.isMergeComplete();
+  }
 
-    // Start the map-completion events fetcher thread
-    final EventFetcher eventFetcher = 
-      new EventFetcher(taskContext.getTaskAttemptId(), reporter, scheduler, this,
-          maxEventsToFetch);
-    eventFetcher.start();
-    
-    // Start the map-output fetcher threads
-    final int numFetchers = 
-        conf.getInt(
-            TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES, 
-            TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
-    Fetcher[] fetchers = new Fetcher[numFetchers];
-    for (int i=0; i < numFetchers; ++i) {
-      fetchers[i] = new Fetcher(conf, scheduler,
-          merger, reporter, metrics, this,
-          runningTaskContext.getJobTokenSecret());
-      fetchers[i].start();
+  /**
+   * Waits for the Shuffle and Merge to complete, and returns an iterator over the input.
+   * @return an iterator over the fetched input.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public TezRawKeyValueIterator waitForInput() throws IOException, InterruptedException {
+    Preconditions.checkState(runShuffleFuture != null,
+        "waitForInput can only be called after run");
+    TezRawKeyValueIterator kvIter;
+    try {
+      kvIter = runShuffleFuture.get();
+    } catch (ExecutionException e) {
+      Throwable cause = e.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      } else if (cause instanceof InterruptedException) {
+        throw (InterruptedException) cause;
+      } else {
+        throw new TezUncheckedException(
+            "Unexpected exception type while running Shuffle and Merge", cause);
+      }
     }
-    
-    // Wait for shuffle to complete successfully
-    while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
-      reporter.progress();
+    return kvIter;
+  }
+
+  public void run() {
+    RunShuffleCallable runShuffle = new RunShuffleCallable();
+    runShuffleFuture = new FutureTask<TezRawKeyValueIterator>(runShuffle);
+    new Thread(runShuffleFuture, "ShuffleMergeRunner").start();
+  }
+  
+  private class RunShuffleCallable implements Callable<TezRawKeyValueIterator> {
+    @Override
+    public TezRawKeyValueIterator call() throws IOException, InterruptedException {
+      final int numFetchers = 
+          conf.getInt(
+              TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES, 
+              TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
+      Fetcher[] fetchers = new Fetcher[numFetchers];
+      for (int i = 0; i < numFetchers; ++i) {
+        fetchers[i] = new Fetcher(conf, scheduler, merger, metrics, Shuffle.this, inputContext);
+        fetchers[i].start();
+      }
       
-      synchronized (this) {
-        if (throwable != null) {
-          throw new ShuffleError("error in shuffle in " + throwingThreadName,
-                                 throwable);
+      while (!scheduler.waitUntilDone(PROGRESS_FREQUENCY)) {
+        synchronized (this) {
+          if (throwable != null) {
+            throw new ShuffleError("error in shuffle in " + throwingThreadName,
+                                   throwable);
+          }
         }
       }
-    }
-
-    // Stop the event-fetcher thread
-    eventFetcher.shutDown();
-    
-    // Stop the map-output fetcher threads
-    for (Fetcher fetcher : fetchers) {
-      fetcher.shutDown();
-    }
-    fetchers = null;
-    
-    // stop the scheduler
-    scheduler.close();
+      
+      // Stop the map-output fetcher threads
+      for (Fetcher fetcher : fetchers) {
+        fetcher.shutDown();
+      }
+      fetchers = null;
+      
+      // stop the scheduler
+      scheduler.close();
 
-    copyPhase.complete(); // copy is already complete
-    runningTaskContext.getStatus().setPhase(TezTaskStatus.Phase.SORT);
-    
-    runningTaskContext.statusUpdate();
-    
-    // Finish the on-going merges...
-    TezRawKeyValueIterator kvIter = null;
-    try {
-      kvIter = merger.close();
-    } catch (Throwable e) {
-      throw new ShuffleError("Error while doing final merge " , e);
-    }
 
-    // Sanity check
-    synchronized (this) {
-      if (throwable != null) {
-        throw new ShuffleError("error in shuffle in " + throwingThreadName,
-                               throwable);
+      // Finish the on-going merges...
+      TezRawKeyValueIterator kvIter = null;
+      try {
+        kvIter = merger.close();
+      } catch (Throwable e) {
+        throw new ShuffleError("Error while doing final merge " , e);
+      }
+      
+      // Sanity check
+      synchronized (Shuffle.this) {
+        if (throwable != null) {
+          throw new ShuffleError("error in shuffle in " + throwingThreadName,
+                                 throwable);
+        }
       }
+      return kvIter;
     }
-    
-    return kvIter;
   }
   
   public int getReduceStartId() {
@@ -229,19 +247,8 @@ public class Shuffle implements ExceptionReporter {
       super(msg, t);
     }
   }
-  
-  public void updateUserPayload(byte[] userPayload) throws IOException {
-    if(userPayload == null) {
-      return;
-    }
-    Configuration conf = TezUtils.createConfFromUserPayload(userPayload);
-    int reduceRange = conf.getInt(
-        TezJobConfig.TEZ_ENGINE_SHUFFLE_PARTITION_RANGE,
-        TezJobConfig.TEZ_ENGINE_SHUFFLE_PARTITION_RANGE_DEFAULT);
-    setReduceRange(reduceRange);
-  }
-  
-  private void setReduceRange(int range) {
+
+  public void setPartitionRange(int range) {
     if (range == reduceRange.get()) {
       return;
     }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleClientMetrics.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleClientMetrics.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleClientMetrics.java
index 34b26c4..850dbeb 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleClientMetrics.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleClientMetrics.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.Updater;
 import org.apache.tez.common.Constants;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.engine.common.TezEngineUtils;
 
 class ShuffleClientMetrics implements Updater {
 
@@ -35,10 +35,10 @@ class ShuffleClientMetrics implements Updater {
   private int numThreadsBusy = 0;
   private final int numCopiers;
   
-  ShuffleClientMetrics(TezTaskAttemptID reduceId, Configuration jobConf, 
-      String user, String jobName) {
+  ShuffleClientMetrics(String dagName, String vertexName, int taskIndex, Configuration conf, 
+      String user) {
     this.numCopiers = 
-        jobConf.getInt(
+        conf.getInt(
             TezJobConfig.TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES, 
             TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_PARALLEL_COPIES);
 
@@ -46,12 +46,10 @@ class ShuffleClientMetrics implements Updater {
     this.shuffleMetrics = 
       MetricsUtil.createRecord(metricsContext, "shuffleInput");
     this.shuffleMetrics.setTag("user", user);
-    this.shuffleMetrics.setTag("jobName", jobName);
-    this.shuffleMetrics.setTag("jobId", 
-        reduceId.getTaskID().getVertexID().getDAGId().toString());
-    this.shuffleMetrics.setTag("taskId", reduceId.toString());
+    this.shuffleMetrics.setTag("dagName", dagName);
+    this.shuffleMetrics.setTag("taskId", TezEngineUtils.getTaskIdentifier(vertexName, taskIndex));
     this.shuffleMetrics.setTag("sessionId", 
-        jobConf.get(
+        conf.get(
             TezJobConfig.TEZ_ENGINE_METRICS_SESSION_ID, 
             TezJobConfig.DEFAULT_TEZ_ENGINE_METRICS_SESSION_ID));
     metricsContext.registerUpdater(this);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java
new file mode 100644
index 0000000..012103f
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleInputEventHandler.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.shuffle.impl;
+
+import java.net.URI;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.dag.api.TezUncheckedException;
+import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.DataMovementEventPayloadProto;
+import org.apache.tez.engine.common.shuffle.newimpl.ShuffleUserPayloads.InputInformationEventPayloadProto;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.engine.newapi.events.DataMovementEvent;
+import org.apache.tez.engine.newapi.events.InputFailedEvent;
+import org.apache.tez.engine.newapi.events.InputInformationEvent;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+
+public class ShuffleInputEventHandler {
+  
+  private static final Log LOG = LogFactory.getLog(ShuffleInputEventHandler.class);
+
+  private final ShuffleScheduler scheduler;
+  private final TezInputContext inputContext;
+  private final Shuffle shuffle;
+
+  private int maxMapRuntime = 0;
+  private boolean shuffleRangeSet = false;
+  
+  public ShuffleInputEventHandler(TezInputContext inputContext,
+      Shuffle shuffle, ShuffleScheduler scheduler) {
+    this.inputContext = inputContext;
+    this.shuffle = shuffle;
+    this.scheduler = scheduler;
+  }
+
+  public void handleEvents(List<Event> events) {
+    for (Event event : events) {
+      handleEvent(event);
+    }
+  }
+  
+  
+  private void handleEvent(Event event) {
+    if (event instanceof InputInformationEvent) {
+      processInputInformationEvent((InputInformationEvent) event);
+    }
+    else if (event instanceof DataMovementEvent) {
+      processDataMovementEvent((DataMovementEvent) event);      
+    } else if (event instanceof InputFailedEvent) {
+      processTaskFailedEvent((InputFailedEvent) event);
+    }
+  }
+
+  private void processInputInformationEvent(InputInformationEvent iiEvent) {
+    InputInformationEventPayloadProto inputInfoPayload;
+    try {
+      inputInfoPayload = InputInformationEventPayloadProto.parseFrom(iiEvent.getUserPayload());
+    } catch (InvalidProtocolBufferException e) {
+      throw new TezUncheckedException("Unable to parse InputInformationEvent payload", e);
+    }
+    int partitionRange = inputInfoPayload.getPartitionRange();
+    shuffle.setPartitionRange(partitionRange);
+    this.shuffleRangeSet = true;
+  }
+
+  private void processDataMovementEvent(DataMovementEvent dmEvent) {
+    Preconditions.checkState(shuffleRangeSet == true, "Shuffle Range must be set before a DataMovementEvent is processed");
+    DataMovementEventPayloadProto shufflePayload;
+    try {
+      shufflePayload = DataMovementEventPayloadProto.parseFrom(dmEvent.getUserPayload());
+    } catch (InvalidProtocolBufferException e) {
+      throw new TezUncheckedException("Unable to parse DataMovementEvent payload", e);
+    } 
+    int partitionId = dmEvent.getSourceIndex();
+    URI baseUri = getBaseURI(shufflePayload.getHost(), shufflePayload.getPort(), partitionId);
+
+    TaskAttemptIdentifier srcAttemptIdentifier = new TaskAttemptIdentifier(dmEvent.getTargetIndex(), dmEvent.getVersion(), shufflePayload.getPathComponent());
+    scheduler.addKnownMapOutput(shufflePayload.getHost(), partitionId, baseUri.toString(), srcAttemptIdentifier);
+    
+    // TODO NEWTEZ See if this duration hack can be removed.
+    int duration = shufflePayload.getRunDuration();
+    if (duration > maxMapRuntime) {
+      maxMapRuntime = duration;
+      scheduler.informMaxMapRunTime(maxMapRuntime);
+    }
+  }
+  
+  private void processTaskFailedEvent(InputFailedEvent ifEvent) {
+    TaskAttemptIdentifier taIdentifier = new TaskAttemptIdentifier(ifEvent.getSourceIndex(), ifEvent.getVersion());
+    scheduler.obsoleteMapOutput(taIdentifier);
+    LOG.info("Obsoleting output of src-task: " + taIdentifier);
+  }
+
+  // TODO NEWTEZ Handle encrypted shuffle
+  private URI getBaseURI(String host, int port, int partitionId) {
+    StringBuilder sb = new StringBuilder("http://");
+    sb.append(host);
+    sb.append(":");
+    sb.append(String.valueOf(port));
+    sb.append("/");
+    
+    sb.append("mapOutput?job=");
+    // Required to use the existing ShuffleHandler
+    sb.append(inputContext.getApplicationId().toString().replace("application", "job"));
+    
+    sb.append("&reduce=");
+    sb.append(partitionId);
+    sb.append("&map=");
+    URI u = URI.create(sb.toString());
+    return u;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java
index 6bd18ef..964533d 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/ShuffleScheduler.java
@@ -27,6 +27,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
@@ -36,12 +38,14 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.util.Progress;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTaskStatus;
 import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.engine.common.TezEngineUtils;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.engine.newapi.events.InputReadErrorEvent;
+
+import com.google.common.collect.Lists;
 
 class ShuffleScheduler {
   static ThreadLocal<Long> shuffleStart = new ThreadLocal<Long>() {
@@ -55,24 +59,26 @@ class ShuffleScheduler {
   private static final long INITIAL_PENALTY = 10000;
   private static final float PENALTY_GROWTH_RATE = 1.3f;
   
-  private final Map<TezTaskID, MutableInt> finishedMaps;
-  private final int tasksInDegree;
+  // TODO NEWTEZ May need to be a string if attempting to fetch from multiple inputs.
+  private final Map<Integer, MutableInt> finishedMaps;
+  private final int numInputs;
   private int remainingMaps;
-  private Map<String, MapHost> mapLocations = new HashMap<String, MapHost>();
+  private Map<TaskAttemptIdentifier, MapHost> mapLocations = new HashMap<TaskAttemptIdentifier, MapHost>();
+  //TODO NEWTEZ Clean this and other maps at some point
+  private ConcurrentMap<String, TaskAttemptIdentifier> pathToIdentifierMap = new ConcurrentHashMap<String, TaskAttemptIdentifier>(); 
   private Set<MapHost> pendingHosts = new HashSet<MapHost>();
-  private Set<TezTaskAttemptID> obsoleteMaps = new HashSet<TezTaskAttemptID>();
+  private Set<TaskAttemptIdentifier> obsoleteMaps = new HashSet<TaskAttemptIdentifier>();
   
   private final Random random = new Random(System.currentTimeMillis());
   private final DelayQueue<Penalty> penalties = new DelayQueue<Penalty>();
   private final Referee referee = new Referee();
-  private final Map<TezTaskAttemptID,IntWritable> failureCounts =
-    new HashMap<TezTaskAttemptID,IntWritable>();
+  private final Map<TaskAttemptIdentifier, IntWritable> failureCounts =
+    new HashMap<TaskAttemptIdentifier,IntWritable>(); 
   private final Map<String,IntWritable> hostFailures = 
     new HashMap<String,IntWritable>();
-  private final TezTaskStatus status;
+  private final TezInputContext inputContext;
   private final Shuffle shuffle;
   private final int abortFailureLimit;
-  private final Progress progress;
   private final TezCounter shuffledMapsCounter;
   private final TezCounter reduceShuffleBytes;
   private final TezCounter failedShuffleCounter;
@@ -89,26 +95,25 @@ class ShuffleScheduler {
 
   private boolean reportReadErrorImmediately = true;
   
-  public ShuffleScheduler(Configuration conf,
+  public ShuffleScheduler(TezInputContext inputContext,
+                          Configuration conf,
                           int tasksInDegree,
-                          TezTaskStatus status,
                           Shuffle shuffle,
-                          Progress progress,
                           TezCounter shuffledMapsCounter,
                           TezCounter reduceShuffleBytes,
                           TezCounter failedShuffleCounter) {
-    this.tasksInDegree = tasksInDegree;
+    this.inputContext = inputContext;
+    this.numInputs = tasksInDegree;
     abortFailureLimit = Math.max(30, tasksInDegree / 10);
     remainingMaps = tasksInDegree;
-    finishedMaps = new HashMap<TezTaskID, MutableInt>(remainingMaps);
+  //TODO NEWTEZ May need to be a string or a more usable construct if attempting to fetch from multiple inputs. Define a taskId / taskAttemptId pair
+    finishedMaps = new HashMap<Integer, MutableInt>(remainingMaps);
     this.shuffle = shuffle;
-    this.status = status;
-    this.progress = progress;
     this.shuffledMapsCounter = shuffledMapsCounter;
     this.reduceShuffleBytes = reduceShuffleBytes;
     this.failedShuffleCounter = failedShuffleCounter;
     this.startTime = System.currentTimeMillis();
-    lastProgressTime = startTime;
+    this.lastProgressTime = startTime;
     referee.start();
     this.maxFailedUniqueFetches = Math.min(tasksInDegree,
         this.maxFailedUniqueFetches);
@@ -122,19 +127,19 @@ class ShuffleScheduler {
             TezJobConfig.DEFAULT_TEZ_ENGINE_SHUFFLE_NOTIFY_READERROR);
   }
 
-  public synchronized void copySucceeded(TezTaskAttemptID mapId, 
+  public synchronized void copySucceeded(TaskAttemptIdentifier srcAttemptIdentifier, 
                                          MapHost host,
                                          long bytes,
-                                         long millis,
+                                         long milis,
                                          MapOutput output
                                          ) throws IOException {
-    failureCounts.remove(mapId);
+    String taskIdentifier = TezEngineUtils.getTaskAttemptIdentifier(srcAttemptIdentifier.getTaskIndex(), srcAttemptIdentifier.getAttemptNumber());
+    failureCounts.remove(taskIdentifier);
     hostFailures.remove(host.getHostName());
-    TezTaskID taskId = mapId.getTaskID();
     
-    if (!isFinishedTaskTrue(taskId)) {
+    if (!isFinishedTaskTrue(srcAttemptIdentifier.getTaskIndex())) {
       output.commit();
-      if(incrementTaskCopyAndCheckCompletion(taskId)) {
+      if(incrementTaskCopyAndCheckCompletion(srcAttemptIdentifier.getTaskIndex())) {
         shuffledMapsCounter.increment(1);
         if (--remainingMaps == 0) {
           notifyAll();
@@ -142,38 +147,40 @@ class ShuffleScheduler {
       }
 
       // update the status
+      lastProgressTime = System.currentTimeMillis();
       totalBytesShuffledTillNow += bytes;
-      updateStatus();
+      logProgress();
       reduceShuffleBytes.increment(bytes);
-      lastProgressTime = System.currentTimeMillis();
-      LOG.debug("map " + mapId + " done " + status.getStateString());
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("src task: "
+            + TezEngineUtils.getTaskAttemptIdentifier(
+                inputContext.getSourceVertexName(), srcAttemptIdentifier.getTaskIndex(),
+                srcAttemptIdentifier.getAttemptNumber()) + " done");
+      }
     }
   }
-  
-  private void updateStatus() {
+
+  private void logProgress() {
     float mbs = (float) totalBytesShuffledTillNow / (1024 * 1024);
-    int mapsDone = tasksInDegree - remainingMaps;
+    int mapsDone = numInputs - remainingMaps;
     long secsSinceStart = (System.currentTimeMillis() - startTime) / 1000 + 1;
 
     float transferRate = mbs / secsSinceStart;
-    progress.set((float) mapsDone / tasksInDegree);
-    String statusString = mapsDone + " / " + tasksInDegree + " copied.";
-    status.setStateString(statusString);
-
-    progress.setStatus("copy(" + mapsDone + " of " + tasksInDegree + " at "
+    LOG.info("copy(" + mapsDone + " of " + numInputs + " at "
         + mbpsFormat.format(transferRate) + " MB/s)");
   }
 
-  public synchronized void copyFailed(TezTaskAttemptID mapId, MapHost host,
+  public synchronized void copyFailed(TaskAttemptIdentifier srcAttempt,
+                                      MapHost host,
                                       boolean readError) {
     host.penalize();
     int failures = 1;
-    if (failureCounts.containsKey(mapId)) {
-      IntWritable x = failureCounts.get(mapId);
+    if (failureCounts.containsKey(srcAttempt)) {
+      IntWritable x = failureCounts.get(srcAttempt);
       x.set(x.get() + 1);
       failures = x.get();
     } else {
-      failureCounts.put(mapId, new IntWritable(1));      
+      failureCounts.put(srcAttempt, new IntWritable(1));      
     }
     String hostname = host.getHostName();
     if (hostFailures.containsKey(hostname)) {
@@ -184,13 +191,17 @@ class ShuffleScheduler {
     }
     if (failures >= abortFailureLimit) {
       try {
-        throw new IOException(failures + " failures downloading " + mapId);
+        throw new IOException(failures
+            + " failures downloading "
+            + TezEngineUtils.getTaskAttemptIdentifier(
+                inputContext.getSourceVertexName(), srcAttempt.getTaskIndex(),
+                srcAttempt.getAttemptNumber()));
       } catch (IOException ie) {
         shuffle.reportException(ie);
       }
     }
     
-    checkAndInformJobTracker(failures, mapId, readError);
+    checkAndInformJobTracker(failures, srcAttempt, readError);
 
     checkReducerHealth();
     
@@ -206,11 +217,23 @@ class ShuffleScheduler {
   // after every read error, if 'reportReadErrorImmediately' is true or
   // after every 'maxFetchFailuresBeforeReporting' failures
   private void checkAndInformJobTracker(
-      int failures, TezTaskAttemptID mapId, boolean readError) {
+      int failures, TaskAttemptIdentifier srcAttempt, boolean readError) {
     if ((reportReadErrorImmediately && readError)
         || ((failures % maxFetchFailuresBeforeReporting) == 0)) {
-      LOG.info("Reporting fetch failure for " + mapId + " to jobtracker.");
-      status.addFailedDependency(mapId);
+      LOG.info("Reporting fetch failure for "
+          + TezEngineUtils.getTaskAttemptIdentifier(
+              inputContext.getSourceVertexName(), srcAttempt.getTaskIndex(),
+              srcAttempt.getAttemptNumber()) + " to jobtracker.");
+
+      List<Event> failedEvents = Lists.newArrayListWithCapacity(1);
+      failedEvents.add(new InputReadErrorEvent("Fetch failure for "
+          + TezEngineUtils.getTaskAttemptIdentifier(
+              inputContext.getSourceVertexName(), srcAttempt.getTaskIndex(),
+              srcAttempt.getAttemptNumber()) + " to jobtracker.", srcAttempt
+          .getTaskIndex(), srcAttempt.getAttemptNumber()));
+
+      inputContext.sendEvents(failedEvents);      
+      //status.addFailedDependency(mapId);
     }
   }
     
@@ -220,7 +243,7 @@ class ShuffleScheduler {
     final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;
 
     long totalFailures = failedShuffleCounter.getValue();
-    int doneMaps = tasksInDegree - remainingMaps;
+    int doneMaps = numInputs - remainingMaps;
     
     boolean reducerHealthy =
       (((float)totalFailures / (totalFailures + doneMaps))
@@ -228,7 +251,7 @@ class ShuffleScheduler {
     
     // check if the reducer has progressed enough
     boolean reducerProgressedEnough =
-      (((float)doneMaps / tasksInDegree)
+      (((float)doneMaps / numInputs)
           >= MIN_REQUIRED_PROGRESS_PERCENT);
 
     // check if the reducer is stalled for a long time
@@ -252,7 +275,7 @@ class ShuffleScheduler {
 
     // kill if not healthy and has insufficient progress
     if ((failureCounts.size() >= maxFailedUniqueFetches ||
-        failureCounts.size() == (tasksInDegree - doneMaps))
+        failureCounts.size() == (numInputs - doneMaps))
         && !reducerHealthy
         && (!reducerProgressedEnough || reducerStalled)) {
       LOG.fatal("Shuffle failed with too many fetch failures " +
@@ -263,28 +286,29 @@ class ShuffleScheduler {
 
   }
   
-  public synchronized void tipFailed(TezTaskID taskId) {
-    if (!isFinishedTaskTrue(taskId)) {
-      setFinishedTaskTrue(taskId);
+  public synchronized void tipFailed(int srcTaskIndex) {
+    if (!isFinishedTaskTrue(srcTaskIndex)) {
+      setFinishedTaskTrue(srcTaskIndex);
       if (--remainingMaps == 0) {
         notifyAll();
       }
-      updateStatus();
+      logProgress();
     }
   }
   
   public synchronized void addKnownMapOutput(String hostName,
                                              int partitionId,
                                              String hostUrl,
-                                             TezTaskAttemptID mapId) {
+                                             TaskAttemptIdentifier srcAttempt) {
     String identifier = MapHost.createIdentifier(hostName, partitionId);
     MapHost host = mapLocations.get(identifier);
     if (host == null) {
       host = new MapHost(partitionId, hostName, hostUrl);
       assert identifier.equals(host.getIdentifier());
-      mapLocations.put(identifier, host);
+      mapLocations.put(srcAttempt, host);
     }
-    host.addKnownMap(mapId);
+    host.addKnownMap(srcAttempt);
+    pathToIdentifierMap.put(srcAttempt.getPathComponent(), srcAttempt);
 
     // Mark the host as pending
     if (host.getState() == MapHost.State.PENDING) {
@@ -293,13 +317,14 @@ class ShuffleScheduler {
     }
   }
   
-  public synchronized void obsoleteMapOutput(TezTaskAttemptID mapId) {
-    obsoleteMaps.add(mapId);
+  public synchronized void obsoleteMapOutput(TaskAttemptIdentifier srcAttempt) {
+    // The incoming srcAttempt does not contain a path component.
+    obsoleteMaps.add(srcAttempt);
   }
   
-  public synchronized void putBackKnownMapOutput(MapHost host, 
-                                                 TezTaskAttemptID mapId) {
-    host.addKnownMap(mapId);
+  public synchronized void putBackKnownMapOutput(MapHost host,
+                                                 TaskAttemptIdentifier srcAttempt) {
+    host.addKnownMap(srcAttempt);
   }
 
   public synchronized MapHost getHost() throws InterruptedException {
@@ -324,16 +349,20 @@ class ShuffleScheduler {
       return host;
   }
   
-  public synchronized List<TezTaskAttemptID> getMapsForHost(MapHost host) {
-    List<TezTaskAttemptID> list = host.getAndClearKnownMaps();
-    Iterator<TezTaskAttemptID> itr = list.iterator();
-    List<TezTaskAttemptID> result = new ArrayList<TezTaskAttemptID>();
+  public TaskAttemptIdentifier getIdentifierForPathComponent(String pathComponent) {
+    return pathToIdentifierMap.get(pathComponent);
+  }
+  
+  public synchronized List<TaskAttemptIdentifier> getMapsForHost(MapHost host) {
+    List<TaskAttemptIdentifier> list = host.getAndClearKnownMaps();
+    Iterator<TaskAttemptIdentifier> itr = list.iterator();
+    List<TaskAttemptIdentifier> result = new ArrayList<TaskAttemptIdentifier>();
     int includedMaps = 0;
     int totalSize = list.size();
     // find the maps that we still need, up to the limit
     while (itr.hasNext()) {
-      TezTaskAttemptID id = itr.next();
-      if (!obsoleteMaps.contains(id) && !isFinishedTaskTrue(id.getTaskID())) {
+      TaskAttemptIdentifier id = itr.next();
+      if (!obsoleteMaps.contains(id) && !isFinishedTaskTrue(id.getTaskIndex())) {
         result.add(id);
         if (++includedMaps >= MAX_MAPS_AT_ONCE) {
           break;
@@ -342,8 +371,8 @@ class ShuffleScheduler {
     }
     // put back the maps left after the limit
     while (itr.hasNext()) {
-      TezTaskAttemptID id = itr.next();
-      if (!obsoleteMaps.contains(id) && !isFinishedTaskTrue(id.getTaskID())) {
+      TaskAttemptIdentifier id = itr.next();
+      if (!obsoleteMaps.contains(id) && !isFinishedTaskTrue(id.getTaskIndex())) {
         host.addKnownMap(id);
       }
     }
@@ -367,8 +396,17 @@ class ShuffleScheduler {
     mapLocations.clear();
     obsoleteMaps.clear();
     pendingHosts.clear();
+    pathToIdentifierMap.clear();
   }
-  
+
+  /**
+   * Utility method to check if the Shuffle data fetch is complete.
+   * @return
+   */
+  public synchronized boolean isDone() {
+    return remainingMaps == 0;
+  }
+
   /**
    * Wait until the shuffle finishes or until the timeout.
    * @param millis maximum wait time
@@ -448,27 +486,27 @@ class ShuffleScheduler {
     }
   }
   
-  void setFinishedTaskTrue(TezTaskID taskId) {
+  void setFinishedTaskTrue(int srcTaskIndex) {
     synchronized(finishedMaps) {
-      finishedMaps.put(taskId, new MutableInt(shuffle.getReduceRange()));
+      finishedMaps.put(srcTaskIndex, new MutableInt(shuffle.getReduceRange()));
     }
   }
   
-  boolean incrementTaskCopyAndCheckCompletion(TezTaskID mapTaskId) {
+  boolean incrementTaskCopyAndCheckCompletion(int srcTaskIndex) {
     synchronized(finishedMaps) {
-      MutableInt result = finishedMaps.get(mapTaskId);
+      MutableInt result = finishedMaps.get(srcTaskIndex);
       if(result == null) {
         result = new MutableInt(0);
-        finishedMaps.put(mapTaskId, result);
+        finishedMaps.put(srcTaskIndex, result);
       }
       result.increment();
-      return isFinishedTaskTrue(mapTaskId);
+      return isFinishedTaskTrue(srcTaskIndex);
     }
   }
   
-  boolean isFinishedTaskTrue(TezTaskID taskId) {
+  boolean isFinishedTaskTrue(int srcTaskIndex) {
     synchronized (finishedMaps) {
-      MutableInt result = finishedMaps.get(taskId);
+      MutableInt result = finishedMaps.get(srcTaskIndex);
       if(result == null) {
         return false;
       }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/TaskAttemptIdentifier.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/TaskAttemptIdentifier.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/TaskAttemptIdentifier.java
new file mode 100644
index 0000000..f77166d
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/TaskAttemptIdentifier.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.shuffle.impl;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+
+/**
+ * Container for a task number and an attempt number for the task.
+ */
+@Private
+public class TaskAttemptIdentifier {
+
+  private final int taskIndex;
+  private final int attemptNumber;
+  private String pathComponent;
+  
+  public TaskAttemptIdentifier(int taskIndex, int attemptNumber) {
+    this.taskIndex = taskIndex;
+    this.attemptNumber = attemptNumber;
+  }
+  
+  public TaskAttemptIdentifier(int taskIndex, int attemptNumber, String pathComponent) {
+    this.taskIndex = taskIndex;
+    this.attemptNumber = attemptNumber;
+    this.pathComponent = pathComponent;
+  }
+
+  public int getTaskIndex() {
+    return taskIndex;
+  }
+
+  public int getAttemptNumber() {
+    return attemptNumber;
+  }
+  
+  public String getPathComponent() {
+    return pathComponent;
+  }
+
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + attemptNumber;
+    result = prime * result
+        + ((pathComponent == null) ? 0 : pathComponent.hashCode());
+    result = prime * result + taskIndex;
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    TaskAttemptIdentifier other = (TaskAttemptIdentifier) obj;
+    if (attemptNumber != other.attemptNumber)
+      return false;
+    if (pathComponent == null) {
+      if (other.pathComponent != null)
+        return false;
+    } else if (!pathComponent.equals(other.pathComponent))
+      return false;
+    if (taskIndex != other.taskIndex)
+      return false;
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "TaskAttemptIdentifier [taskIndex=" + taskIndex + ", attemptNumber="
+        + attemptNumber + ", pathComponent=" + pathComponent + "]";
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java
index 0befaa8..f61670e 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/server/ShuffleHandler.java
@@ -63,13 +63,14 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
 import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
 import org.apache.hadoop.yarn.server.api.AuxiliaryService;
-import org.apache.tez.common.RunningTaskContext;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.engine.common.security.JobTokenIdentifier;
 import org.apache.tez.engine.common.security.JobTokenSecretManager;
 import org.apache.tez.engine.common.security.SecureShuffleUtils;
 import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader;
 import org.apache.tez.engine.common.sort.impl.ExternalSorter;
+import org.apache.tez.engine.newapi.TezOutputContext;
+import org.apache.tez.engine.shuffle.common.ShuffleUtils;
 import org.jboss.netty.bootstrap.ServerBootstrap;
 import org.jboss.netty.buffer.ChannelBuffers;
 import org.jboss.netty.channel.Channel;
@@ -245,9 +246,9 @@ public class ShuffleHandler extends AuxiliaryService {
     userRsrc.remove(appId.toString());
   }
 
-  public synchronized void init(Configuration conf, RunningTaskContext task) {
+  public void initialize(TezOutputContext outputContext, Configuration conf) throws IOException {
     this.init(new Configuration(conf));
-    tokenSecret = task.getJobTokenSecret();
+    tokenSecret = ShuffleUtils.getJobTokenSecretFromTokenBytes(outputContext.getServiceConsumerMetaData(MAPREDUCE_SHUFFLE_SERVICEID));
   }
 
   @Override
@@ -441,14 +442,16 @@ public class ShuffleHandler extends AuxiliaryService {
       for (String mapId : mapIds) {
         try {
           // TODO: Error handling - validate mapId via TezTaskAttemptId.forName
-          if (!mapId.equals(sorter.getTaskAttemptId().toString())) {
-            String errorMessage =
-                "Illegal shuffle request mapId: " + mapId
-                    + " while actual mapId is " + sorter.getTaskAttemptId(); 
-            LOG.warn(errorMessage);
-            sendError(ctx, errorMessage, BAD_REQUEST);
-            return;
-          }
+          
+          // TODO NEWTEZ Fix this. TaskAttemptId is no longer valid. mapId validation will not work anymore.
+//          if (!mapId.equals(sorter.getTaskAttemptId().toString())) {
+//            String errorMessage =
+//                "Illegal shuffle request mapId: " + mapId
+//                    + " while actual mapId is " + sorter.getTaskAttemptId(); 
+//            LOG.warn(errorMessage);
+//            sendError(ctx, errorMessage, BAD_REQUEST);
+//            return;
+//          }
 
           lastMap =
             sendMapOutput(ctx, ch, userRsrc.get(jobId), jobId, mapId, reduceId);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
index b90682e..4df1c01 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
@@ -21,6 +21,7 @@ package org.apache.tez.engine.common.sort.impl;
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.lang.reflect.Constructor;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,47 +37,41 @@ import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.io.serializer.Serializer;
 import org.apache.hadoop.util.IndexedSorter;
-import org.apache.hadoop.util.Progress;
+import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.QuickSort;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.Constants;
-import org.apache.tez.common.RunningTaskContext;
-import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.Input;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.api.Output;
+import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.engine.api.Partitioner;
 import org.apache.tez.engine.api.Processor;
 import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.combine.CombineInput;
-import org.apache.tez.engine.common.combine.CombineOutput;
 import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader;
-import org.apache.tez.engine.common.sort.impl.IFile.Writer;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput;
+import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
+import org.apache.tez.engine.hadoop.compat.NullProgressable;
+import org.apache.tez.engine.newapi.TezOutputContext;
 import org.apache.tez.engine.records.OutputContext;
+import org.apache.tez.engine.common.sort.impl.IFile.Writer;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
 public abstract class ExternalSorter {
 
   private static final Log LOG = LogFactory.getLog(ExternalSorter.class);
 
-  public abstract void close() throws IOException, InterruptedException;
+  public abstract void close() throws IOException;
 
-  public abstract void flush() throws IOException, InterruptedException;
+  public abstract void flush() throws IOException;
 
-  public abstract void write(Object key, Object value) throws IOException,
-      InterruptedException;
+  public abstract void write(Object key, Object value) throws IOException;
 
+  protected Progressable nullProgressable = new NullProgressable();
+  protected TezOutputContext outputContext;
   protected Processor combineProcessor;
   protected Partitioner partitioner;
-  protected TezEngineTaskContext task;
-  protected RunningTaskContext runningTaskContext;
-  protected Configuration job;
+  protected Configuration conf;
   protected FileSystem rfs;
   protected TezTaskOutput mapOutputFile;
   protected int partitions;
@@ -92,69 +87,68 @@ public abstract class ExternalSorter {
   // Compression for map-outputs
   protected CompressionCodec codec;
 
+  // TODO NEWTEZ Setup CombineProcessor
+  // TODO NEWTEZ Setup Partitioner in SimpleOutput
+
   // Counters
+  // TODO TEZ Rename all counter variables [Mapping of counter to MR for compatibility in the MR layer]
   protected TezCounter mapOutputByteCounter;
   protected TezCounter mapOutputRecordCounter;
   protected TezCounter fileOutputByteCounter;
   protected TezCounter spilledRecordsCounter;
-  protected Progress sortPhase;
 
-  public void initialize(Configuration conf, Master master)
-      throws IOException, InterruptedException {
+  public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
+    this.outputContext = outputContext;
+    this.conf = conf;
+    this.partitions = numOutputs;
 
-    this.job = conf;
-    LOG.info("TEZ_ENGINE_TASK_ATTEMPT_ID: " +
-        job.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
-
-    partitions = task.getOutputSpecList().get(0).getNumOutputs();
-//    partitions =
-//        job.getInt(
-//            TezJobConfig.TEZ_ENGINE_TASK_OUTDEGREE,
-//            TezJobConfig.DEFAULT_TEZ_ENGINE_TASK_OUTDEGREE);
-    rfs = ((LocalFileSystem)FileSystem.getLocal(job)).getRaw();
+    rfs = ((LocalFileSystem)FileSystem.getLocal(this.conf)).getRaw();
 
     // sorter
-    sorter = ReflectionUtils.newInstance(job.getClass(
+    sorter = ReflectionUtils.newInstance(this.conf.getClass(
         TezJobConfig.TEZ_ENGINE_INTERNAL_SORTER_CLASS, QuickSort.class,
-        IndexedSorter.class), job);
+        IndexedSorter.class), this.conf);
 
-    comparator = ConfigUtils.getIntermediateOutputKeyComparator(job);
+    comparator = ConfigUtils.getIntermediateOutputKeyComparator(this.conf);
 
     // k/v serialization
-    keyClass = ConfigUtils.getIntermediateOutputKeyClass(job);
-    valClass = ConfigUtils.getIntermediateOutputValueClass(job);
-    serializationFactory = new SerializationFactory(job);
+    keyClass = ConfigUtils.getIntermediateOutputKeyClass(this.conf);
+    valClass = ConfigUtils.getIntermediateOutputValueClass(this.conf);
+    serializationFactory = new SerializationFactory(this.conf);
     keySerializer = serializationFactory.getSerializer(keyClass);
     valSerializer = serializationFactory.getSerializer(valClass);
 
     //    counters
     mapOutputByteCounter =
-        runningTaskContext.getTaskReporter().getCounter(TaskCounter.MAP_OUTPUT_BYTES);
+        outputContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_BYTES);
     mapOutputRecordCounter =
-      runningTaskContext.getTaskReporter().getCounter(TaskCounter.MAP_OUTPUT_RECORDS);
+        outputContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_RECORDS);
     fileOutputByteCounter =
-        runningTaskContext.getTaskReporter().
-            getCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
+        outputContext.getCounters().findCounter(TaskCounter.MAP_OUTPUT_MATERIALIZED_BYTES);
     spilledRecordsCounter =
-        runningTaskContext.getTaskReporter().getCounter(TaskCounter.SPILLED_RECORDS);
+        outputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
     // compression
-    if (ConfigUtils.shouldCompressIntermediateOutput(job)) {
+    if (ConfigUtils.shouldCompressIntermediateOutput(this.conf)) {
       Class<? extends CompressionCodec> codecClass =
-          ConfigUtils.getIntermediateOutputCompressorClass(job, DefaultCodec.class);
-      codec = ReflectionUtils.newInstance(codecClass, job);
+          ConfigUtils.getIntermediateOutputCompressorClass(this.conf, DefaultCodec.class);
+      codec = ReflectionUtils.newInstance(codecClass, this.conf);
     } else {
       codec = null;
     }
 
     // Task outputs
-    mapOutputFile =
-        (TezTaskOutput) ReflectionUtils.newInstance(
-            conf.getClass(
-                Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
-                TezTaskOutputFiles.class), conf);
-
-    // sortPhase
-    sortPhase  = runningTaskContext.getProgress().addPhase("sort");
+    mapOutputFile = instantiateTaskOutputManager(this.conf, outputContext);
+  }
+
+  // TODO NEWTEZ Add an interface (! Processor) for CombineProcessor, which MR tasks can initialize and set.
+  // Alternately add a config key with a classname, which is easy to initialize.
+  public void setCombiner(Processor combineProcessor) {
+    this.combineProcessor = combineProcessor;
+  }
+  
+  // TODO NEWTEZ Setup a config value for the Partitioner class, from where it can be initialized.
+  public void setPartitioner(Partitioner partitioner) {
+    this.partitioner = partitioner;
   }
 
   /**
@@ -168,42 +162,33 @@ public abstract class ExternalSorter {
     }
   }
 
-  public void setTask(RunningTaskContext task) {
-    this.runningTaskContext = task;
-    this.combineProcessor = task.getCombineProcessor();
-    this.partitioner = task.getPartitioner();
-  }
-
-  public TezTaskAttemptID getTaskAttemptId() {
-    return task.getTaskAttemptId();
-  }
-
   @Private
   public TezTaskOutput getMapOutput() {
     return mapOutputFile;
   }
 
   protected void runCombineProcessor(TezRawKeyValueIterator kvIter,
-      Writer writer) throws IOException, InterruptedException {
-
-    CombineInput combineIn = new CombineInput(kvIter);
-    combineIn.initialize(job, runningTaskContext.getTaskReporter());
-
-    CombineOutput combineOut = new CombineOutput(writer);
-    combineOut.initialize(job, runningTaskContext.getTaskReporter());
-
-    try {
-      combineProcessor.process(new Input[] {combineIn},
-          new Output[] {combineOut});
-    } catch (IOException ioe) {
-      try {
-        combineProcessor.close();
-      } catch (IOException ignored) {}
-
-      // Do not close output here as the sorter should close the combine output
-
-      throw ioe;
-    }
+      Writer writer) throws IOException {
+
+    // TODO NEWTEZ Fix Combiner.
+//    CombineInput combineIn = new CombineInput(kvIter);
+//    combineIn.initialize(job, runningTaskContext.getTaskReporter());
+//
+//    CombineOutput combineOut = new CombineOutput(writer);
+//    combineOut.initialize(job, runningTaskContext.getTaskReporter());
+//
+//    try {
+//      combineProcessor.process(new Input[] {combineIn},
+//          new Output[] {combineOut});
+//    } catch (IOException ioe) {
+//      try {
+//        combineProcessor.close();
+//      } catch (IOException ignored) {}
+//
+//      // Do not close output here as the sorter should close the combine output
+//
+//      throw ioe;
+//    }
 
   }
 
@@ -228,10 +213,6 @@ public abstract class ExternalSorter {
     }
   }
 
-  public ExternalSorter(TezEngineTaskContext tezEngineTask) {
-    this.task = tezEngineTask;
-  }
-
   public InputStream getSortedStream(int partition) {
     throw new UnsupportedOperationException("getSortedStream isn't supported!");
   }
@@ -243,4 +224,23 @@ public abstract class ExternalSorter {
   public OutputContext getOutputContext() {
     return null;
   }
+  
+  
+
+  private TezTaskOutput instantiateTaskOutputManager(Configuration conf, TezOutputContext outputContext) {
+    Class<?> clazz = conf.getClass(Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
+        TezTaskOutputFiles.class);
+    try {
+      Constructor<?> ctor = clazz.getConstructor(Configuration.class, String.class);
+      ctor.setAccessible(true);
+      TezTaskOutput instance = (TezTaskOutput) ctor.newInstance(conf, outputContext.getUniqueIdentifier());
+      return instance;
+    } catch (Exception e) {
+      throw new TezUncheckedException(
+          "Unable to instantiate configured TezOutputFileManager: "
+              + conf.get(Constants.TEZ_ENGINE_TASK_OUTPUT_MANAGER,
+                  TezTaskOutputFiles.class.getName()), e);
+    }
+  }
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
index 00b8958..3b39900 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/IFileOutputStream.java
@@ -22,8 +22,6 @@ import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.util.DataChecksum;


[2/4] TEZ-417. Change Shuffle Input/Output to work with the new APIs (part of TEZ-398). (sseth)

Posted by ss...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
index 4da5ffa..bafbd4d 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/PipelinedSorter.java
@@ -43,18 +43,15 @@ import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.IndexedSorter;
 import org.apache.hadoop.util.Progress;
-import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.Master;
 import org.apache.tez.engine.common.ConfigUtils;
-import org.apache.tez.engine.common.sort.SortingOutput;
+import org.apache.tez.engine.newapi.TezOutputContext;
+import org.apache.tez.engine.records.OutputContext;
 import org.apache.tez.engine.common.sort.impl.IFile.Writer;
 import org.apache.tez.engine.common.sort.impl.TezMerger.Segment;
-import org.apache.tez.engine.records.OutputContext;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
-public class PipelinedSorter extends ExternalSorter implements SortingOutput {
+public class PipelinedSorter extends ExternalSorter {
   
   private static final Log LOG = LogFactory.getLog(PipelinedSorter.class);
   
@@ -92,32 +89,21 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
   private int totalIndexCacheMemory;
   private int indexCacheMemoryLimit;
 
-
-  public PipelinedSorter(TezEngineTaskContext task) throws IOException {
-    super(task);
-  }
-
-  public void initialize(Configuration conf, Master master) 
-      throws IOException, InterruptedException {
-    
-    if (task == null) {
-      LOG.info("Bailing!", new IOException());
-      return;
-    }
-    super.initialize(conf, master);
+  public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
+    super.initialize(outputContext, conf, numOutputs);
     
     partitionBits = bitcount(partitions)+1;
    
     //sanity checks
     final float spillper =
-      job.getFloat(
+      this.conf.getFloat(
           TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT, 
           TezJobConfig.DEFAULT_TEZ_ENGINE_SORT_SPILL_PERCENT);
     final int sortmb = 
-        job.getInt(
+        this.conf.getInt(
             TezJobConfig.TEZ_ENGINE_IO_SORT_MB, 
             TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_MB);
-    indexCacheMemoryLimit = job.getInt(TezJobConfig.TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES,
+    indexCacheMemoryLimit = this.conf.getInt(TezJobConfig.TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES,
                                        TezJobConfig.DEFAULT_TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES);
     if (spillper > (float)1.0 || spillper <= (float)0.0) {
       throw new IOException("Invalid \"" + TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT +
@@ -137,7 +123,7 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
     span = new SortSpan(largeBuffer, 1024*1024, 16);
     merger = new SpanMerger(comparator);
     final int sortThreads = 
-            job.getInt(
+            this.conf.getInt(
                 TezJobConfig.TEZ_ENGINE_SORT_THREADS, 
                 TezJobConfig.DEFAULT_TEZ_ENGINE_SORT_THREADS);
     sortmaster = Executors.newFixedThreadPool(sortThreads);
@@ -151,7 +137,7 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
     }    
     valSerializer.open(span.out);
     keySerializer.open(span.out);
-    minSpillsForCombine = job.getInt(TezJobConfig.TEZ_ENGINE_COMBINE_MIN_SPILLS, 3);
+    minSpillsForCombine = this.conf.getInt(TezJobConfig.TEZ_ENGINE_COMBINE_MIN_SPILLS, 3);
   }
 
   private int bitcount(int n) {
@@ -193,8 +179,9 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
     keySerializer.open(span.out);
   }
 
+  @Override
   public void write(Object key, Object value) 
-      throws IOException, InterruptedException {
+      throws IOException {
     collect(
         key, value, partitioner.getPartition(key, value, partitions));
   }
@@ -206,7 +193,6 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
    */
   synchronized void collect(Object key, Object value, final int partition
                                    ) throws IOException {
-    runningTaskContext.getTaskReporter().progress();
     if (key.getClass() != keyClass) {
       throw new IOException("Type mismatch in key from map: expected "
                             + keyClass.getName() + ", received "
@@ -262,7 +248,6 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
     }
     mapOutputRecordCounter.increment(1);
     mapOutputByteCounter.increment(valend - keystart);
-    runningTaskContext.getTaskReporter().progress();
   }
 
   public void spill() throws IOException { 
@@ -282,7 +267,7 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
         //write merged output to disk
         long segmentStart = out.getPos();
         Writer writer =
-          new Writer(job, out, keyClass, valClass, codec,
+          new Writer(conf, out, keyClass, valClass, codec,
               spilledRecordsCounter);
         writer.setRLE(merger.needsRLE());
         if (combineProcessor == null) {
@@ -308,7 +293,7 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
         mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
             * MAP_OUTPUT_INDEX_RECORD_LENGTH);
       // TODO: cache
-      spillRec.writeToFile(indexFilename, job);
+      spillRec.writeToFile(indexFilename, conf);
       ++numSpills;
     } catch(InterruptedException ie) {
       // TODO:the combiner has been interrupted
@@ -318,8 +303,8 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
   }
 
   @Override
-  public void flush() throws IOException, InterruptedException {
-    final TezTaskAttemptID mapId = task.getTaskAttemptId();
+  public void flush() throws IOException {
+    final String uniqueIdentifier = outputContext.getUniqueIdentifier();
     Path finalOutputFile =
         mapOutputFile.getOutputFileForWrite(0); //TODO
     Path finalIndexFile =
@@ -347,8 +332,7 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
     
     //The output stream for the final single output file
     FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
-    
-    sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
+
     TezMerger.considerFinalMergeForProgress();
 
     final TezSpillRecord spillRec = new TezSpillRecord(partitions);
@@ -357,7 +341,7 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
     for(int i = 0; i < numSpills; i++) {
       // TODO: build this cache before
       Path indexFilename = mapOutputFile.getSpillIndexFile(i);
-      TezSpillRecord spillIndex = new TezSpillRecord(indexFilename, job);
+      TezSpillRecord spillIndex = new TezSpillRecord(indexFilename, conf);
       indexCacheList.add(spillIndex);
     }
     
@@ -370,34 +354,34 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
         TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
 
         Segment s =
-            new Segment(job, rfs, spillFilename, indexRecord.getStartOffset(),
+            new Segment(conf, rfs, spillFilename, indexRecord.getStartOffset(),
                              indexRecord.getPartLength(), codec, true);
         segmentList.add(i, s);
       }
 
       int mergeFactor = 
-              job.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR, 
+              this.conf.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR, 
                   TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_FACTOR);
       // sort the segments only if there are intermediate merges
       boolean sortSegments = segmentList.size() > mergeFactor;
       //merge
-      @SuppressWarnings("unchecked")
-      TezRawKeyValueIterator kvIter = TezMerger.merge(job, rfs,
+      TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs,
                      keyClass, valClass, codec,
                      segmentList, mergeFactor,
-                     new Path(mapId.toString()),
-                     (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(job), 
-                     runningTaskContext.getTaskReporter(), sortSegments,
-                     null, spilledRecordsCounter, sortPhase.phase());
+                     new Path(uniqueIdentifier),
+                     (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf), 
+                     nullProgressable, sortSegments,
+                     null, spilledRecordsCounter,
+                     null); // Not using any Progress in TezMerger. Should just work.
 
       //write merged output to disk
       long segmentStart = finalOut.getPos();
       Writer writer =
-          new Writer(job, finalOut, keyClass, valClass, codec,
+          new Writer(conf, finalOut, keyClass, valClass, codec,
                            spilledRecordsCounter);
       writer.setRLE(merger.needsRLE());
       if (combineProcessor == null || numSpills < minSpillsForCombine) {
-        TezMerger.writeFile(kvIter, writer, runningTaskContext.getTaskReporter(), job);
+        TezMerger.writeFile(kvIter, writer, nullProgressable, conf);
       } else {
     	runCombineProcessor(kvIter, writer);
       }
@@ -405,8 +389,6 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
       //close
       writer.close();
 
-      sortPhase.startNextPhase();
-      
       // record offsets
       final TezIndexRecord rec = 
           new TezIndexRecord(
@@ -416,7 +398,7 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
       spillRec.putIndex(rec, parts);
     }
 
-    spillRec.writeToFile(finalIndexFile, job);
+    spillRec.writeToFile(finalIndexFile, conf);
     finalOut.close();
     for(int i = 0; i < numSpills; i++) {
       Path indexFilename = mapOutputFile.getSpillIndexFile(i);
@@ -520,7 +502,7 @@ public class PipelinedSorter extends ExternalSorter implements SortingOutput {
       kj = new byte[keymax];
       LOG.info("begin sorting Span"+index + " ("+length()+")");
       if(length() > 1) {
-        sorter.sort(this, 0, length(), runningTaskContext.getTaskReporter());
+        sorter.sort(this, 0, length(), nullProgressable);
       }
       LOG.info("done sorting Span"+index);
       return new SpanIterator(this);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java
index eeea764..7815569 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/TezMerger.java
@@ -55,6 +55,7 @@ import org.apache.tez.engine.common.sort.impl.IFile.Writer;
 public class TezMerger {  
   private static final Log LOG = LogFactory.getLog(TezMerger.class);
 
+  
   // Local directories
   private static LocalDirAllocator lDirAlloc = 
     new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
index adbff22..b1e17e7 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/DefaultSorter.java
@@ -39,11 +39,7 @@ import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.util.IndexedSortable;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTaskContext;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.Master;
 import org.apache.tez.engine.common.ConfigUtils;
 import org.apache.tez.engine.common.sort.impl.ExternalSorter;
 import org.apache.tez.engine.common.sort.impl.IFile;
@@ -53,13 +49,15 @@ import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
 import org.apache.tez.engine.common.sort.impl.TezSpillRecord;
 import org.apache.tez.engine.common.sort.impl.IFile.Writer;
 import org.apache.tez.engine.common.sort.impl.TezMerger.Segment;
-import org.apache.tez.engine.records.OutputContext;
+import org.apache.tez.engine.newapi.TezOutputContext;
 
 @SuppressWarnings({"unchecked", "rawtypes"})
 public class DefaultSorter extends ExternalSorter implements IndexedSortable {
-
+  
   private static final Log LOG = LogFactory.getLog(DefaultSorter.class);
 
+  // TODO NEWTEZ Progress reporting to Tez framework. (making progress vs %complete)
+  
   /**
    * The size of each record in the index file for the map-outputs.
    */
@@ -112,26 +110,15 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
   private int totalIndexCacheMemory;
   private int indexCacheMemoryLimit;
 
-  public DefaultSorter(TezTaskContext task) throws IOException {
-    // Does this assisted inject work ?
-    super((TezEngineTaskContext)task);
-  }
-
   @Override
-  public void initialize(Configuration conf, Master master)
-      throws IOException, InterruptedException {
-    if (task == null) {
-      LOG.info("Bailing!", new IOException());
-      return;
-    }
-
-    super.initialize(conf, master);
+  public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException { 
+    super.initialize(outputContext, conf, numOutputs);
 
     // sanity checks
-    final float spillper = job.getFloat(
+    final float spillper = this.conf.getFloat(
         TezJobConfig.TEZ_ENGINE_SORT_SPILL_PERCENT,
         TezJobConfig.DEFAULT_TEZ_ENGINE_SORT_SPILL_PERCENT);
-    final int sortmb = job.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_MB,
+    final int sortmb = this.conf.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_MB,
         TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_MB);
     if (spillper > (float) 1.0 || spillper <= (float) 0.0) {
       throw new IOException("Invalid \""
@@ -142,7 +129,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
           + "\": " + sortmb);
     }
 
-    indexCacheMemoryLimit = job.getInt(TezJobConfig.TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES,
+    indexCacheMemoryLimit = this.conf.getInt(TezJobConfig.TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES,
                                        TezJobConfig.DEFAULT_TEZ_ENGINE_INDEX_CACHE_MEMORY_LIMIT_BYTES);
 
     // buffers and accounting
@@ -172,7 +159,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
     keySerializer.open(bb);
 
     spillInProgress = false;
-    minSpillsForCombine = job.getInt(TezJobConfig.TEZ_ENGINE_COMBINE_MIN_SPILLS, 3);
+    minSpillsForCombine = this.conf.getInt(TezJobConfig.TEZ_ENGINE_COMBINE_MIN_SPILLS, 3);
     spillThread.setDaemon(true);
     spillThread.setName("SpillThread");
     spillLock.lock();
@@ -194,7 +181,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
 
   @Override
   public void write(Object key, Object value)
-      throws IOException, InterruptedException {
+      throws IOException {
     collect(
         key, value, partitioner.getPartition(key, value, partitions));
   }
@@ -206,7 +193,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
    */
   synchronized void collect(Object key, Object value, final int partition
                                    ) throws IOException {
-    runningTaskContext.getTaskReporter().progress();
+
     if (key.getClass() != keyClass) {
       throw new IOException("Type mismatch in key from map: expected "
                             + keyClass.getName() + ", received "
@@ -571,7 +558,6 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
               // wait for spill
               try {
                 while (spillInProgress) {
-                  runningTaskContext.getTaskReporter().progress();
                   spillDone.await();
                 }
               } catch (InterruptedException e) {
@@ -598,12 +584,11 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
   }
 
   @Override
-  public void flush() throws IOException, InterruptedException {
+  public void flush() throws IOException {
     LOG.info("Starting flush of map output");
     spillLock.lock();
     try {
       while (spillInProgress) {
-        runningTaskContext.getTaskReporter().progress();
         spillDone.await();
       }
       checkSpillException();
@@ -655,7 +640,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
   }
 
   @Override
-  public void close() throws IOException, InterruptedException { }
+  public void close() throws IOException { }
 
   protected class SpillThread extends Thread {
 
@@ -698,10 +683,9 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
     final Throwable lspillException = sortSpillException;
     if (lspillException != null) {
       if (lspillException instanceof Error) {
-        final String logMsg = "Task " + task.getTaskAttemptId() + " failed : " +
-          StringUtils.stringifyException(lspillException);
-        runningTaskContext.getTaskReporter().reportFatalError(
-            task.getTaskAttemptId(), lspillException, logMsg);
+        final String logMsg = "Task " + outputContext.getUniqueIdentifier()
+            + " failed : " + StringUtils.stringifyException(lspillException);
+        outputContext.fatalError(lspillException, logMsg);
       }
       throw new IOException("Spill failed", lspillException);
     }
@@ -739,7 +723,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
       throws IOException, InterruptedException {
     final int mstart = getMetaStart();
     final int mend = getMetaEnd();
-    sorter.sort(this, mstart, mend, runningTaskContext.getTaskReporter());
+    sorter.sort(this, mstart, mend, nullProgressable);
     spill(mstart, mend);
   }
 
@@ -766,7 +750,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
         IFile.Writer writer = null;
         try {
           long segmentStart = out.getPos();
-          writer = new Writer(job, out, keyClass, valClass, codec,
+          writer = new Writer(conf, out, keyClass, valClass, codec,
                                     spilledRecordsCounter);
           if (combineProcessor == null) {
             // spill directly
@@ -824,7 +808,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
         Path indexFilename =
             mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                 * MAP_OUTPUT_INDEX_RECORD_LENGTH);
-        spillRec.writeToFile(indexFilename, job);
+        spillRec.writeToFile(indexFilename, conf);
       } else {
         indexCacheList.add(spillRec);
         totalIndexCacheMemory +=
@@ -859,7 +843,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
         try {
           long segmentStart = out.getPos();
           // Create a new codec, don't care!
-          writer = new IFile.Writer(job, out, keyClass, valClass, codec,
+          writer = new IFile.Writer(conf, out, keyClass, valClass, codec,
                                           spilledRecordsCounter);
 
           if (i == partition) {
@@ -890,7 +874,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
         Path indexFilename =
             mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                 * MAP_OUTPUT_INDEX_RECORD_LENGTH);
-        spillRec.writeToFile(indexFilename, job);
+        spillRec.writeToFile(indexFilename, conf);
       } else {
         indexCacheList.add(spillRec);
         totalIndexCacheMemory +=
@@ -988,12 +972,12 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
     public void close() { }
   }
 
-  private void mergeParts() throws IOException, InterruptedException {
+  private void mergeParts() throws IOException {
     // get the approximate size of the final output/index files
     long finalOutFileSize = 0;
     long finalIndexFileSize = 0;
     final Path[] filename = new Path[numSpills];
-    final TezTaskAttemptID mapId = task.getTaskAttemptId();
+    final String taskIdentifier = outputContext.getUniqueIdentifier();
 
     for(int i = 0; i < numSpills; i++) {
       filename[i] = mapOutputFile.getSpillFile(i);
@@ -1007,16 +991,15 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
           mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));
       } else {
         indexCacheList.get(0).writeToFile(
-          mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), job);
+          mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), conf);
       }
-      sortPhase.complete();
       return;
     }
 
     // read in paged indices
     for (int i = indexCacheList.size(); i < numSpills; ++i) {
       Path indexFileName = mapOutputFile.getSpillIndexFile(i);
-      indexCacheList.add(new TezSpillRecord(indexFileName, job));
+      indexCacheList.add(new TezSpillRecord(indexFileName, conf));
     }
 
     //make correction in the length to include the sequence file header
@@ -1039,7 +1022,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
         for (int i = 0; i < partitions; i++) {
           long segmentStart = finalOut.getPos();
           Writer writer =
-            new Writer(job, finalOut, keyClass, valClass, codec, null);
+            new Writer(conf, finalOut, keyClass, valClass, codec, null);
           writer.close();
 
           TezIndexRecord rec =
@@ -1049,15 +1032,13 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
                   writer.getCompressedLength());
           sr.putIndex(rec, i);
         }
-        sr.writeToFile(finalIndexFile, job);
+        sr.writeToFile(finalIndexFile, conf);
       } finally {
         finalOut.close();
       }
-      sortPhase.complete();
       return;
     }
     else {
-      sortPhase.addPhases(partitions); // Divide sort phase into sub-phases
       TezMerger.considerFinalMergeForProgress();
 
       final TezSpillRecord spillRec = new TezSpillRecord(partitions);
@@ -1069,12 +1050,12 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
           TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
 
           Segment s =
-            new Segment(job, rfs, filename[i], indexRecord.getStartOffset(),
+            new Segment(conf, rfs, filename[i], indexRecord.getStartOffset(),
                              indexRecord.getPartLength(), codec, true);
           segmentList.add(i, s);
 
           if (LOG.isDebugEnabled()) {
-            LOG.debug("MapId=" + mapId + " Reducer=" + parts +
+            LOG.debug("TaskIdentifier=" + taskIdentifier + " Partition=" + parts +
                 "Spill =" + i + "(" + indexRecord.getStartOffset() + "," +
                 indexRecord.getRawLength() + ", " +
                 indexRecord.getPartLength() + ")");
@@ -1082,34 +1063,33 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
         }
 
         int mergeFactor =
-            job.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR,
+            this.conf.getInt(TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR,
                 TezJobConfig.DEFAULT_TEZ_ENGINE_IO_SORT_FACTOR);
         // sort the segments only if there are intermediate merges
         boolean sortSegments = segmentList.size() > mergeFactor;
         //merge
-        TezRawKeyValueIterator kvIter = TezMerger.merge(job, rfs,
+        TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs,
                        keyClass, valClass, codec,
                        segmentList, mergeFactor,
-                       new Path(mapId.toString()),
-                       (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(job),
-                       runningTaskContext.getTaskReporter(), sortSegments,
+                       new Path(taskIdentifier),
+                       (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf),
+                       nullProgressable, sortSegments,
                        null, spilledRecordsCounter,
-                       sortPhase.phase());
+                       null); // Not using any Progress in TezMerger. Should just work.
 
         //write merged output to disk
         long segmentStart = finalOut.getPos();
         Writer writer =
-            new Writer(job, finalOut, keyClass, valClass, codec,
+            new Writer(conf, finalOut, keyClass, valClass, codec,
                 spilledRecordsCounter);
         if (combineProcessor == null || numSpills < minSpillsForCombine) {
           TezMerger.writeFile(kvIter, writer,
-              runningTaskContext.getTaskReporter(), job);
+              nullProgressable, conf);
         } else {
           runCombineProcessor(kvIter, writer);
         }
         writer.close();
 
-        sortPhase.startNextPhase();
         // record offsets
         final TezIndexRecord rec =
             new TezIndexRecord(
@@ -1118,17 +1098,11 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
                 writer.getCompressedLength());
         spillRec.putIndex(rec, parts);
       }
-      spillRec.writeToFile(finalIndexFile, job);
+      spillRec.writeToFile(finalIndexFile, conf);
       finalOut.close();
       for(int i = 0; i < numSpills; i++) {
         rfs.delete(filename[i],true);
       }
     }
   }
-
-  @Override
-  public OutputContext getOutputContext() {
-    return null;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java
index 6af6dab..53e6003 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/InMemoryShuffleSorter.java
@@ -29,12 +29,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.util.DataChecksum;
-import org.apache.tez.common.TezTaskContext;
-import org.apache.tez.engine.api.Master;
 import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader;
 import org.apache.tez.engine.common.shuffle.server.ShuffleHandler;
 import org.apache.tez.engine.common.sort.impl.IFile;
-import org.apache.tez.engine.records.OutputContext;
+import org.apache.tez.engine.newapi.TezOutputContext;
 
 public class InMemoryShuffleSorter extends DefaultSorter {
 
@@ -51,16 +49,11 @@ public class InMemoryShuffleSorter extends DefaultSorter {
   
   byte[] kvbuffer;
   IntBuffer kvmeta;
-  
-  public InMemoryShuffleSorter(TezTaskContext task) throws IOException {
-    super(task);
-  }
 
   @Override
-  public void initialize(Configuration conf, Master master) throws IOException,
-      InterruptedException {
-    super.initialize(conf, master);
-    shuffleHandler.init(conf, runningTaskContext);
+  public void initialize(TezOutputContext outputContext, Configuration conf, int numOutputs) throws IOException {
+    super.initialize(outputContext, conf, numOutputs);
+    shuffleHandler.initialize(outputContext, conf);
   }
 
   @Override
@@ -98,7 +91,7 @@ public class InMemoryShuffleSorter extends DefaultSorter {
       
       shuffleHeaders.add( 
           new ShuffleHeader(
-              task.getTaskAttemptId().toString(), 
+              outputContext.getUniqueIdentifier(), // TODO Verify that this is correct. 
               length + IFILE_CHECKSUM_LENGTH, length, i)
           );
       LOG.info("shuffleHeader[" + i + "]:" +
@@ -116,7 +109,7 @@ public class InMemoryShuffleSorter extends DefaultSorter {
   }
 
   @Override
-  public void close() throws IOException, InterruptedException{
+  public void close() throws IOException {
     // FIXME
     //shuffleHandler.stop();
   }
@@ -130,9 +123,4 @@ public class InMemoryShuffleSorter extends DefaultSorter {
     return spillIndices.get(partition);
   }
 
-  @Override
-  public OutputContext getOutputContext() {
-    return new OutputContext(shuffleHandler.getPort());
-  }
-  
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java
index 47e6234..d74e159 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/dflt/SortBufferInputStream.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.io.WritableUtils;
 import org.apache.tez.engine.common.shuffle.impl.InMemoryWriter;
 import org.apache.tez.engine.common.sort.impl.dflt.DefaultSorter.InMemValBytes;
 
-class SortBufferInputStream extends InputStream {
+  public class SortBufferInputStream extends InputStream {
 
   private static final Log LOG = LogFactory.getLog(SortBufferInputStream.class);
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezLocalTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezLocalTaskOutputFiles.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezLocalTaskOutputFiles.java
new file mode 100644
index 0000000..7f165eb
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezLocalTaskOutputFiles.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.task.local.newoutput;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.Constants;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.engine.common.shuffle.impl.TaskAttemptIdentifier;
+
+/**
+ * Manipulate the working area for the transient store for maps and reduces.
+ *
+ * This class is used by map and reduce tasks to identify the directories that
+ * they need to write to/read from for intermediate files. The callers of
+ * these methods are from the Child running the Task.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TezLocalTaskOutputFiles extends TezTaskOutput {
+
+  public TezLocalTaskOutputFiles(Configuration conf, String uniqueId) {
+    super(conf, uniqueId);
+  }
+
+  private LocalDirAllocator lDirAlloc =
+    new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
+
+
+  /**
+   * Return the path to local map output file created earlier
+   *
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getOutputFile()
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
+        + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING, conf);
+  }
+
+  /**
+   * Create a local map output file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getOutputFileForWrite(long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
+        + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING, size, conf);
+  }
+
+  /**
+   * Create a local map output file name on the same volume.
+   */
+  @Override
+  public Path getOutputFileForWriteInVolume(Path existing) {
+    return new Path(existing.getParent(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
+  }
+
+  /**
+   * Return the path to a local map output index file created earlier
+   *
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getOutputIndexFile()
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
+        + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING,
+        conf);
+  }
+
+  /**
+   * Create a local map output index file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getOutputIndexFileForWrite(long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
+        + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING,
+        size, conf);
+  }
+
+  /**
+   * Create a local map output index file name on the same volume.
+   */
+  @Override
+  public Path getOutputIndexFileForWriteInVolume(Path existing) {
+    return new Path(existing.getParent(),
+        Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+  }
+
+  /**
+   * Return a local map spill file created earlier.
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getSpillFile(int spillNumber)
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + "/spill"
+        + spillNumber + ".out", conf);
+  }
+
+  /**
+   * Create a local map spill file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getSpillFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + "/spill"
+        + spillNumber + ".out", size, conf);
+  }
+
+  /**
+   * Return a local map spill index file created earlier
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getSpillIndexFile(int spillNumber)
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + "/spill"
+        + spillNumber + ".out.index", conf);
+  }
+
+  /**
+   * Create a local map spill index file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getSpillIndexFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + "/spill"
+        + spillNumber + ".out.index", size, conf);
+  }
+
+  /**
+   * Return a local reduce input file created earlier
+   *
+   * @param mapId a map task id
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getInputFile(TaskAttemptIdentifier mapId)
+      throws IOException {
+    return lDirAlloc.getLocalPathToRead(String.format(
+        Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING, 
+        Constants.TASK_OUTPUT_DIR, Integer.valueOf(mapId.getTaskIndex())), conf);
+  }
+
+  /**
+   * Create a local reduce input file name.
+   *
+   * @param mapId a map task id
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getInputFileForWrite(int taskId,
+                                   long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(String.format(
+        Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING, Constants.TASK_OUTPUT_DIR, taskId),
+        size, conf);
+  }
+
+  /** Removes all of the files related to a task. */
+  @Override
+  public void removeAll()
+      throws IOException {
+    deleteLocalFiles(Constants.TASK_OUTPUT_DIR);
+  }
+
+  private String[] getLocalDirs() throws IOException {
+    return conf.getStrings(TezJobConfig.LOCAL_DIRS);
+  }
+
+  @SuppressWarnings("deprecation")
+  private void deleteLocalFiles(String subdir) throws IOException {
+    String[] localDirs = getLocalDirs();
+    for (int i = 0; i < localDirs.length; i++) {
+      FileSystem.getLocal(conf).delete(new Path(localDirs[i], subdir));
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutput.java
new file mode 100644
index 0000000..6779d32
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutput.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.task.local.newoutput;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.engine.common.shuffle.impl.TaskAttemptIdentifier;
+
+/**
+ * Manipulate the working area for the transient store for maps and reduces.
+ *
+ * This class is used by map and reduce tasks to identify the directories that
+ * they need to write to/read from for intermediate files. The callers of
+ * these methods are from child space and see mapreduce.cluster.local.dir as
+ * taskTracker/jobCache/jobId/attemptId
+ * This class should not be used from TaskTracker space.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public abstract class TezTaskOutput {
+
+  protected Configuration conf;
+  protected String uniqueId;
+
+  public TezTaskOutput(Configuration conf, String uniqueId) {
+    this.conf = conf;
+    this.uniqueId = uniqueId;
+  }
+
+  /**
+   * Return the path to local map output file created earlier
+   *
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getOutputFile() throws IOException;
+
+  /**
+   * Create a local map output file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getOutputFileForWrite(long size) throws IOException;
+
+  /**
+   * Create a local map output file name on the same volume.
+   */
+  public abstract Path getOutputFileForWriteInVolume(Path existing);
+
+  /**
+   * Return the path to a local map output index file created earlier
+   *
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getOutputIndexFile() throws IOException;
+
+  /**
+   * Create a local map output index file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getOutputIndexFileForWrite(long size) throws IOException;
+
+  /**
+   * Create a local map output index file name on the same volume.
+   */
+  public abstract Path getOutputIndexFileForWriteInVolume(Path existing);
+
+  /**
+   * Return a local map spill file created earlier.
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getSpillFile(int spillNumber) throws IOException;
+
+  /**
+   * Create a local map spill file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getSpillFileForWrite(int spillNumber, long size)
+      throws IOException;
+
+  /**
+   * Return a local map spill index file created earlier
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getSpillIndexFile(int spillNumber) throws IOException;
+
+  /**
+   * Create a local map spill index file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getSpillIndexFileForWrite(int spillNumber, long size)
+      throws IOException;
+
+  /**
+   * Return a local reduce input file created earlier
+   *
+   * @param attemptIdentifier The identifier for the source task
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getInputFile(TaskAttemptIdentifier attemptIdentifier) throws IOException;
+
+  /**
+   * Create a local reduce input file name.
+   *
+   * @param taskIdentifier The identifier for the source task
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getInputFileForWrite(
+      int taskIdentifier, long size) throws IOException;
+
+  /** Removes all of the files related to a task. */
+  public abstract void removeAll() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutputFiles.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutputFiles.java
new file mode 100644
index 0000000..ae9c6d8
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutputFiles.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.common.task.local.newoutput;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.Path;
+import org.apache.tez.common.Constants;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.engine.common.shuffle.impl.TaskAttemptIdentifier;
+
+/**
+ * Manipulate the working area for the transient store for maps and reduces.
+ *
+ * This class is used by map and reduce tasks to identify the directories that
+ * they need to write to/read from for intermediate files. The callers of
+ * these methods are from child space and see mapreduce.cluster.local.dir as
+ * taskTracker/jobCache/jobId/attemptId
+ * This class should not be used from TaskTracker space.
+ */
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class TezTaskOutputFiles extends TezTaskOutput {
+  
+  public TezTaskOutputFiles(Configuration conf, String uniqueId) {
+    super(conf, uniqueId);
+  }
+
+  private static final Log LOG = LogFactory.getLog(TezTaskOutputFiles.class);
+
+  private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out";
+  private static final String SPILL_INDEX_FILE_PATTERN = SPILL_FILE_PATTERN
+      + ".index";
+
+  
+
+  // assume configured to $localdir/usercache/$user/appcache/$appId
+  private LocalDirAllocator lDirAlloc =
+    new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
+  
+
+  private Path getAttemptOutputDir() {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getAttemptOutputDir: "
+          + Constants.TASK_OUTPUT_DIR + "/"
+          + uniqueId);
+    }
+    return new Path(Constants.TASK_OUTPUT_DIR, uniqueId);
+  }
+
+  /**
+   * Return the path to local map output file created earlier
+   *
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputFile() throws IOException {
+    Path attemptOutput =
+      new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
+    return lDirAlloc.getLocalPathToRead(attemptOutput.toString(), conf);
+  }
+
+  /**
+   * Create a local map output file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputFileForWrite(long size) throws IOException {
+    Path attemptOutput =
+      new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
+    return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), size, conf);
+  }
+
+  /**
+   * Create a local map output file name on the same volume.
+   */
+  public Path getOutputFileForWriteInVolume(Path existing) {
+    Path outputDir = new Path(existing.getParent(), Constants.TASK_OUTPUT_DIR);
+    Path attemptOutputDir = new Path(outputDir, uniqueId);
+    return new Path(attemptOutputDir, Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
+  }
+
+  /**
+   * Return the path to a local map output index file created earlier
+   *
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputIndexFile() throws IOException {
+    Path attemptIndexOutput =
+      new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING +
+                                      Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+    return lDirAlloc.getLocalPathToRead(attemptIndexOutput.toString(), conf);
+  }
+
+  /**
+   * Create a local map output index file name.
+   *
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputIndexFileForWrite(long size) throws IOException {
+    Path attemptIndexOutput =
+      new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING +
+                                      Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+    return lDirAlloc.getLocalPathForWrite(attemptIndexOutput.toString(),
+        size, conf);
+  }
+
+  /**
+   * Create a local map output index file name on the same volume.
+   */
+  public Path getOutputIndexFileForWriteInVolume(Path existing) {
+    Path outputDir = new Path(existing.getParent(), Constants.TASK_OUTPUT_DIR);
+    Path attemptOutputDir = new Path(outputDir, uniqueId);
+    return new Path(attemptOutputDir, Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING +
+                                      Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
+  }
+
+  /**
+   * Return a local map spill file created earlier.
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(
+        String.format(SPILL_FILE_PATTERN,
+            uniqueId, spillNumber), conf);
+  }
+
+  /**
+   * Create a local map spill file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(
+        String.format(String.format(SPILL_FILE_PATTERN,
+            uniqueId, spillNumber)), size, conf);
+  }
+
+  /**
+   * Return a local map spill index file created earlier
+   *
+   * @param spillNumber the number
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillIndexFile(int spillNumber) throws IOException {
+    return lDirAlloc.getLocalPathToRead(
+        String.format(SPILL_INDEX_FILE_PATTERN,
+            uniqueId, spillNumber), conf);
+  }
+
+  /**
+   * Create a local map spill index file name.
+   *
+   * @param spillNumber the number
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getSpillIndexFileForWrite(int spillNumber, long size)
+      throws IOException {
+    return lDirAlloc.getLocalPathForWrite(
+        String.format(SPILL_INDEX_FILE_PATTERN,
+            uniqueId, spillNumber), size, conf);
+  }
+
+  /**
+   * Return a local reduce input file created earlier
+   *
+   * @param attemptIdentifier an identifier for a task. The attempt information is ignored.
+   * @return path
+   * @throws IOException
+   */
+  public Path getInputFile(TaskAttemptIdentifier attemptIdentifier) throws IOException {
+    throw new UnsupportedOperationException("Incompatible with LocalRunner");
+  }
+
+  /**
+   * Create a local reduce input file name.
+   *
+   * @param attemptIdentifier an identifier for a task. The attempt information is ignored.
+   * @param size the size of the file
+   * @return path
+   * @throws IOException
+   */
+  public Path getInputFileForWrite(int srcTaskId,
+      long size) throws IOException {
+    return lDirAlloc.getLocalPathForWrite(String.format(
+        uniqueId, getAttemptOutputDir().toString(), srcTaskId),
+        size, conf);
+  }
+
+  /** Removes all of the files related to a task. */
+  public void removeAll() throws IOException {
+    throw new UnsupportedOperationException("Incompatible with LocalRunner");
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java
index eb1fe5f..50d270b 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java
@@ -40,7 +40,8 @@ import org.apache.tez.dag.records.TezTaskID;
 @InterfaceStability.Unstable
 public abstract class TezTaskOutput implements Configurable {
 
-  private Configuration conf;
+  protected Configuration conf;
+  protected String uniqueId;
 
   public TezTaskOutput() {
   }
@@ -152,6 +153,14 @@ public abstract class TezTaskOutput implements Configurable {
   /** Removes all of the files related to a task. */
   public abstract void removeAll() throws IOException;
 
+  public void setUniqueIdentifier(String uniqueId) {
+    this.uniqueId = uniqueId;
+  }
+  
+  public String getUniqueIdentifier() {
+    return this.uniqueId;
+  }
+  
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/hadoop/compat/NullProgressable.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/hadoop/compat/NullProgressable.java b/tez-engine/src/main/java/org/apache/tez/engine/hadoop/compat/NullProgressable.java
new file mode 100644
index 0000000..5071dd2
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/hadoop/compat/NullProgressable.java
@@ -0,0 +1,33 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.hadoop.compat;
+
+import org.apache.hadoop.util.Progressable;
+
+public class NullProgressable implements Progressable {
+
+  public NullProgressable() {
+    // TODO Auto-generated constructor stub
+  }
+
+  @Override
+  public void progress() {
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java
index 6b1eb10..8ae6bfe 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/LocalMergedInput.java
@@ -18,65 +18,39 @@
 package org.apache.tez.engine.lib.input;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezEngineTaskContext;
-import org.apache.tez.common.TezTaskReporter;
-import org.apache.tez.engine.api.Input;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.common.combine.CombineInput;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.engine.common.localshuffle.LocalShuffle;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.LogicalInput;
+import org.apache.tez.engine.newapi.TezInputContext;
 
 /**
- * {@link LocalMergedInput} in an {@link Input} which shuffles intermediate
+ * <code>LocalMergedInput</code> in an {@link LogicalInput} which shuffles intermediate
  * sorted data, merges them and provides key/<values> to the consumer. 
  */
 public class LocalMergedInput extends ShuffledMergedInput {
 
-  TezRawKeyValueIterator rIter = null;
-  
-  private Configuration conf;
-  private CombineInput raw;
 
-  public LocalMergedInput(TezEngineTaskContext task, int index) {
-    super(task, index);
-  }
-
-  public void initialize(Configuration conf, Master master) throws IOException,
-      InterruptedException {
-    this.conf = conf;
-
-    LocalShuffle shuffle =
-        new LocalShuffle(task, runningTaskContext, this.conf, (TezTaskReporter)master);
-    rIter = shuffle.run();
-    raw = new CombineInput(rIter);
-  }
-
-  public boolean hasNext() throws IOException, InterruptedException {
-    return raw.hasNext();
-  }
+  // TODO NEWTEZ Fix CombineProcessor
+  //private CombineInput raw;
 
-  public Object getNextKey() throws IOException, InterruptedException {
-    return raw.getNextKey();
-  }
-
-  @SuppressWarnings({ "unchecked", "rawtypes" })
-  public Iterable getNextValues() 
-      throws IOException, InterruptedException {
-    return raw.getNextValues();
-  }
-
-  public float getProgress() throws IOException, InterruptedException {
-    return raw.getProgress();
-  }
+  @Override
+  public List<Event> initialize(TezInputContext inputContext) throws IOException {
+    this.inputContext = inputContext;
+    this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
 
-  public void close() throws IOException {
-    raw.close();
+    LocalShuffle localShuffle = new LocalShuffle(inputContext, conf, numInputs);
+    // TODO NEWTEZ async run and checkIfComplete methods
+    rawIter = localShuffle.run();
+    return Collections.emptyList();
   }
 
-  public TezRawKeyValueIterator getIterator() {
-    return rIter;
+  @Override
+  public List<Event> close() throws IOException {
+    rawIter.close();
+    return Collections.emptyList();
   }
-  
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
index b11e009..fa7054a 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
@@ -18,90 +18,160 @@
 package org.apache.tez.engine.lib.input;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.RunningTaskContext;
-import org.apache.tez.common.TezEngineTaskContext;
-import org.apache.tez.common.TezTaskReporter;
-import org.apache.tez.engine.api.Input;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.common.combine.CombineInput;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TaskCounter;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.common.ConfigUtils;
+import org.apache.tez.engine.common.ValuesIterator;
 import org.apache.tez.engine.common.shuffle.impl.Shuffle;
 import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.KVReader;
+import org.apache.tez.engine.newapi.LogicalInput;
+import org.apache.tez.engine.newapi.TezInputContext;
 
 /**
- * {@link ShuffledMergedInput} in an {@link Input} which shuffles intermediate
- * sorted data, merges them and provides key/<values> to the consumer. 
+ * <code>ShuffleMergedInput</code> in a {@link LogicalInput} which shuffles
+ * intermediate sorted data, merges them and provides key/<values> to the
+ * consumer.
+ * 
+ * The Copy and Merge will be triggered by the initialization - which is handled
+ * by the Tez framework. Input is not consumable until the Copy and Merge are
+ * complete. Methods are provided to check for this, as well as to wait for
+ * completion. Attempting to get a reader on a non-complete input will block.
+ * 
  */
-public class ShuffledMergedInput implements Input {
+public class ShuffledMergedInput implements LogicalInput {
 
   static final Log LOG = LogFactory.getLog(ShuffledMergedInput.class);
-  TezRawKeyValueIterator rIter = null;
 
-  protected TezEngineTaskContext task;
-  protected int index;
-  protected RunningTaskContext runningTaskContext;
+  protected TezInputContext inputContext;
+  protected TezRawKeyValueIterator rawIter = null;
+  protected Configuration conf;
+  protected int numInputs = 0;
+  protected Shuffle shuffle;
+  @SuppressWarnings("rawtypes")
+  protected ValuesIterator vIter;
   
-  private Configuration conf;
-  private CombineInput raw;
-  private int taskIndegree = 0;
-
-  public ShuffledMergedInput(TezEngineTaskContext task, int index) {
-    this.task = task;
-    this.index = index;
-    this.taskIndegree = this.task.getInputSpecList().get(this.index)
-        .getNumInputs();
-  }
-
-  public void mergeWith(ShuffledMergedInput other) {
-    this.taskIndegree += other.taskIndegree;
-  }
-  
-  public void setTask(RunningTaskContext runningTaskContext) {
-    this.runningTaskContext = runningTaskContext;
-  }
+  private TezCounter inputKeyCounter;
+  private TezCounter inputValueCounter;
   
-  public void initialize(Configuration conf, Master master) throws IOException,
-      InterruptedException {
-    this.conf = conf;
-        
-    Shuffle shuffle = 
-      new Shuffle(
-          task, runningTaskContext, this.conf, 
-          taskIndegree,
-          (TezTaskReporter)master, 
-          runningTaskContext.getCombineProcessor());
-    rIter = shuffle.run();
+  @Override
+  public List<Event> initialize(TezInputContext inputContext) throws IOException {
+    this.inputContext = inputContext;
+    this.conf = TezUtils.createConfFromUserPayload(inputContext.getUserPayload());
     
-    raw = new CombineInput(rIter);
+    this.inputKeyCounter = inputContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_GROUPS);
+    this.inputValueCounter = inputContext.getCounters().findCounter(TaskCounter.REDUCE_INPUT_RECORDS);
+    
+    // Start the shuffle - copy and merge.
+    shuffle = new Shuffle(inputContext, this.conf, numInputs);
+    shuffle.run();
+    
+    return Collections.emptyList();
   }
 
-  public boolean hasNext() throws IOException, InterruptedException {
-    return raw.hasNext();
+  /**
+   * Check if the input is ready for consumption
+   * 
+   * @return true if the input is ready for consumption, or if an error occurred
+   *         processing fetching the input. false if the shuffle and merge are
+   *         still in progress
+   */
+  public boolean isInputReady() {
+    return shuffle.isInputReady();
   }
 
-  public Object getNextKey() throws IOException, InterruptedException {
-    return raw.getNextKey();
+  /**
+   * Waits for the input to become ready for consumption
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  public void waitForInputReady() throws IOException, InterruptedException {
+    rawIter = shuffle.waitForInput();
+    createValuesIteartor();
   }
 
-  @SuppressWarnings({ "unchecked", "rawtypes" })
-  public Iterable getNextValues() 
-      throws IOException, InterruptedException {
-    return raw.getNextValues();
+  @Override
+  public List<Event> close() throws IOException {
+    rawIter.close();
+    return Collections.emptyList();
   }
 
-  public float getProgress() throws IOException, InterruptedException {
-    return raw.getProgress();
+  /**
+   * Get a KVReader for the Input.</p> This method will block until the input is
+   * ready - i.e. the copy and merge stages are complete. Users can use the
+   * isInputReady method to check if the input is ready, which gives an
+   * indication of whether this method will block or not.
+   * 
+   * NOTE: All values for the current K-V pair must be read prior to invoking
+   * moveToNext. Once moveToNext() is called, the valueIterator from the
+   * previous K-V pair will throw an Exception
+   * 
+   * @return a KVReader over the sorted input.
+   */
+  @Override
+  public KVReader getReader() throws IOException {
+    if (rawIter != null) {
+      try {
+        waitForInputReady();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new IOException("Interrupted while waiting for input ready", e);
+      }
+    }
+    return new KVReader() {
+      
+      @Override
+      public boolean moveToNext() throws IOException {
+        return vIter.moveToNext();
+      }
+      
+      @SuppressWarnings("unchecked")
+      @Override
+      public KVRecord getCurrentKV() {
+        return new KVRecord(vIter.getKey(), vIter.getValues());
+      }
+    };
   }
 
-  public void close() throws IOException {
-    raw.close();
+  @Override
+  public void handleEvents(List<Event> inputEvents) {
+    shuffle.handleEvents(inputEvents);
   }
 
-  public TezRawKeyValueIterator getIterator() {
-    return rIter;
+  @Override
+  public void setNumPhysicalInputs(int numInputs) {
+    this.numInputs = numInputs;
   }
   
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  private void createValuesIteartor()
+      throws IOException {
+    vIter = new ValuesIterator(rawIter,
+        (RawComparator) ConfigUtils.getIntermediateInputKeyComparator(conf),
+        ConfigUtils.getIntermediateInputKeyClass(conf),
+        ConfigUtils.getIntermediateInputValueClass(conf), conf, inputKeyCounter, inputValueCounter);
+            
+  }
+
+
+  // This functionality is currently broken. If there's inputs which need to be
+  // written to disk, there's a possibility that inputs from the different
+  // sources could clobber each others' output. Also the current structures do
+  // not have adequate information to de-dupe these (vertex name)
+//  public void mergeWith(ShuffledMergedInput other) {
+//    this.numInputs += other.getNumPhysicalInputs();
+//  }
+//  
+//  public int getNumPhysicalInputs() {
+//    return this.numInputs;
+//  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/LocalMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/LocalMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/LocalMergedInput.java
new file mode 100644
index 0000000..269fe81
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/LocalMergedInput.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.lib.oldinput;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezEngineTaskContext;
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+
+/**
+ * {@link LocalMergedInput} in an {@link Input} which shuffles intermediate
+ * sorted data, merges them and provides key/<values> to the consumer. 
+ */
+public class LocalMergedInput extends OldShuffledMergedInput {
+
+  public LocalMergedInput(TezEngineTaskContext task, int index) {
+    super(task, index);
+  }
+
+  public void initialize(Configuration conf, Master master) throws IOException,
+      InterruptedException {
+  }
+
+  public boolean hasNext() throws IOException, InterruptedException {
+    return false;
+  }
+
+  public Object getNextKey() throws IOException, InterruptedException {
+    return null;
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public Iterable getNextValues() 
+      throws IOException, InterruptedException {
+    return null;
+  }
+
+  public float getProgress() throws IOException, InterruptedException {
+    return 0f;
+  }
+
+  public void close() throws IOException {
+  }
+
+  public TezRawKeyValueIterator getIterator() {
+    return null;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/OldShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/OldShuffledMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/OldShuffledMergedInput.java
new file mode 100644
index 0000000..c046a27
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/OldShuffledMergedInput.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.lib.oldinput;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.RunningTaskContext;
+import org.apache.tez.common.TezEngineTaskContext;
+import org.apache.tez.engine.api.Input;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+
+/**
+ * {@link OldShuffledMergedInput} in an {@link Input} which shuffles intermediate
+ * sorted data, merges them and provides key/<values> to the consumer. 
+ */
+public class OldShuffledMergedInput implements Input {
+
+
+  public OldShuffledMergedInput(TezEngineTaskContext task, int index) {
+  }
+
+  public void mergeWith(OldShuffledMergedInput other) {
+  }
+  
+  public void setTask(RunningTaskContext runningTaskContext) {
+  }
+  
+  public void initialize(Configuration conf, Master master) throws IOException,
+      InterruptedException {
+  }
+
+  public boolean hasNext() throws IOException, InterruptedException {
+    return false;
+  }
+
+  public Object getNextKey() throws IOException, InterruptedException {
+    return null;
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public Iterable getNextValues() 
+      throws IOException, InterruptedException {
+    return null;
+  }
+
+  public float getProgress() throws IOException, InterruptedException {
+    return 0f;
+  }
+
+  public void close() throws IOException {
+  }
+
+  public TezRawKeyValueIterator getIterator() {
+    return null;
+  }
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldInMemorySortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldInMemorySortedOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldInMemorySortedOutput.java
new file mode 100644
index 0000000..9ac92ba
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldInMemorySortedOutput.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.lib.oldoutput;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.RunningTaskContext;
+import org.apache.tez.common.TezEngineTaskContext;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.common.sort.SortingOutput;
+import org.apache.tez.engine.records.OutputContext;
+
+/**
+ * {@link OldInMemorySortedOutput} is an {@link Output} which sorts key/value pairs 
+ * written to it and persists it to a file.
+ */
+public class OldInMemorySortedOutput implements SortingOutput {
+  
+  public OldInMemorySortedOutput(TezEngineTaskContext task) throws IOException {
+  }
+  
+  public void initialize(Configuration conf, Master master) 
+      throws IOException, InterruptedException {
+  }
+
+  public void setTask(RunningTaskContext task) {
+  }
+  
+  public void write(Object key, Object value) throws IOException,
+      InterruptedException {
+  }
+
+  public void close() throws IOException, InterruptedException {
+  }
+
+  @Override
+  public OutputContext getOutputContext() {
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldLocalOnFileSorterOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldLocalOnFileSorterOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldLocalOnFileSorterOutput.java
new file mode 100644
index 0000000..b7f913c
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldLocalOnFileSorterOutput.java
@@ -0,0 +1,38 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.lib.oldoutput;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.common.TezEngineTaskContext;
+
+public class OldLocalOnFileSorterOutput extends OldOnFileSortedOutput {
+
+  private static final Log LOG = LogFactory.getLog(OldLocalOnFileSorterOutput.class);
+
+  public OldLocalOnFileSorterOutput(TezEngineTaskContext task) throws IOException {
+    super(task);
+  }
+
+  @Override
+  public void close() throws IOException, InterruptedException {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldOnFileSortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldOnFileSortedOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldOnFileSortedOutput.java
new file mode 100644
index 0000000..f259df9
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldoutput/OldOnFileSortedOutput.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.lib.oldoutput;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.RunningTaskContext;
+import org.apache.tez.common.TezEngineTaskContext;
+import org.apache.tez.engine.api.Master;
+import org.apache.tez.engine.api.Output;
+import org.apache.tez.engine.common.sort.SortingOutput;
+import org.apache.tez.engine.records.OutputContext;
+
+/**
+ * {@link OldOnFileSortedOutput} is an {@link Output} which sorts key/value pairs 
+ * written to it and persists it to a file.
+ */
+public class OldOnFileSortedOutput implements SortingOutput {
+
+  public OldOnFileSortedOutput(TezEngineTaskContext task) throws IOException {
+  }
+  
+  @Override
+  public void initialize(Configuration conf, Master master) 
+      throws IOException, InterruptedException {
+  }
+
+  @Override
+  public void setTask(RunningTaskContext task) {
+  }
+  
+  @Override
+  public void write(Object key, Object value) throws IOException,
+      InterruptedException {
+  }
+
+  @Override
+  public void close() throws IOException, InterruptedException {
+  }
+
+  @Override
+  public OutputContext getOutputContext() {
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
index 99486be..5d2a2ba 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/InMemorySortedOutput.java
@@ -18,53 +18,64 @@
 package org.apache.tez.engine.lib.output;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.RunningTaskContext;
-import org.apache.tez.common.TezEngineTaskContext;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.api.Output;
-import org.apache.tez.engine.common.sort.SortingOutput;
+import org.apache.tez.common.TezUtils;
 import org.apache.tez.engine.common.sort.impl.dflt.InMemoryShuffleSorter;
-import org.apache.tez.engine.records.OutputContext;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.KVWriter;
+import org.apache.tez.engine.newapi.LogicalOutput;
+import org.apache.tez.engine.newapi.Output;
+import org.apache.tez.engine.newapi.TezOutputContext;
+import org.apache.tez.engine.newapi.Writer;
 
 /**
  * {@link InMemorySortedOutput} is an {@link Output} which sorts key/value pairs 
  * written to it and persists it to a file.
  */
-public class InMemorySortedOutput implements SortingOutput {
+public class InMemorySortedOutput implements LogicalOutput {
   
   protected InMemoryShuffleSorter sorter;
+  protected int numTasks;
+  protected TezOutputContext outputContext;
   
-  public InMemorySortedOutput(TezEngineTaskContext task) throws IOException {
-    sorter = new InMemoryShuffleSorter(task);
-  }
-  
-  public void initialize(Configuration conf, Master master) 
-      throws IOException, InterruptedException {
-    sorter.initialize(conf, master);
-  }
 
-  public void setTask(RunningTaskContext task) {
-    sorter.setTask(task);
-  }
-  
-  public void write(Object key, Object value) throws IOException,
-      InterruptedException {
-    sorter.write(key, value);
+  @Override
+  public List<Event> initialize(TezOutputContext outputContext)
+      throws IOException {
+    this.outputContext = outputContext;
+    this.sorter = new InMemoryShuffleSorter();
+    sorter.initialize(outputContext, TezUtils.createConfFromUserPayload(outputContext.getUserPayload()), numTasks);
+    return Collections.emptyList();
   }
 
-  public void close() throws IOException, InterruptedException {
-    sorter.flush();
-    sorter.close();
+  @Override
+  public Writer getWriter() throws IOException {
+    return new KVWriter() {
+      
+      @Override
+      public void write(Object key, Object value) throws IOException {
+        sorter.write(key, value);
+      }
+    };
   }
 
   @Override
-  public OutputContext getOutputContext() {
-    return sorter.getOutputContext();
+  public void handleEvents(List<Event> outputEvents) {
+    // No events expected.
   }
 
-  public InMemoryShuffleSorter getSorter()  {
-    return sorter;
+  @Override
+  public void setNumPhysicalOutputs(int numOutputs) {
+    this.numTasks = numOutputs;
+  }
+  
+  @Override
+  public List<Event> close() throws IOException {
+    sorter.flush();
+    sorter.close();
+    // TODO NEWTEZ Event generation
+    return null;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
index a954f6e..d23ac1e 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/output/LocalOnFileSorterOutput.java
@@ -19,39 +19,40 @@
 package org.apache.tez.engine.lib.output;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.tez.common.TezEngineTaskContext;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
+import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput;
+import org.apache.tez.engine.newapi.Event;
 
 public class LocalOnFileSorterOutput extends OnFileSortedOutput {
 
   private static final Log LOG = LogFactory.getLog(LocalOnFileSorterOutput.class);
 
-  public LocalOnFileSorterOutput(TezEngineTaskContext task) throws IOException {
-    super(task);
-  }
+  
 
   @Override
-  public void close() throws IOException, InterruptedException {
+  public List<Event> close() throws IOException {
     LOG.debug("Closing LocalOnFileSorterOutput");
     super.close();
 
     TezTaskOutput mapOutputFile = sorter.getMapOutput();
-    FileSystem localFs = FileSystem.getLocal(mapOutputFile.getConf());
+    FileSystem localFs = FileSystem.getLocal(conf);
 
     Path src = mapOutputFile.getOutputFile();
     Path dst =
         mapOutputFile.getInputFileForWrite(
-            sorter.getTaskAttemptId().getTaskID(),
+            outputContext.getTaskIndex(),
             localFs.getFileStatus(src).getLen());
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Renaming src = " + src + ", dst = " + dst);
     }
     localFs.rename(src, dst);
+    // TODO NEWTEZ Event generation.
+    return null;
   }
 }


[4/4] git commit: TEZ-417. Change Shuffle Input/Output to work with the new APIs (part of TEZ-398). (sseth)

Posted by ss...@apache.org.
TEZ-417. Change Shuffle Input/Output to work with the new APIs (part of
TEZ-398). (sseth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/1cf7f197
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/1cf7f197
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/1cf7f197

Branch: refs/heads/TEZ-398
Commit: 1cf7f197dd71d5a01bb457bad1f3c79e93a3afbf
Parents: e5919fa
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Sep 10 21:49:10 2013 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Sep 10 21:49:10 2013 -0700

----------------------------------------------------------------------
 .../java/org/apache/tez/common/Constants.java   |   3 +
 .../java/org/apache/tez/common/IDUtils.java     |   3 +-
 .../org/apache/tez/common/TezJobConfig.java     |   1 +
 .../apache/tez/common/counters/TaskCounter.java |   1 +
 .../org/apache/tez/engine/api/Partitioner.java  |  35 --
 .../tez/engine/newapi/TezTaskContext.java       |  61 +++-
 .../org/apache/tez/engine/api/Partitioner.java  |  35 ++
 .../tez/engine/common/TezEngineUtils.java       |  39 +++
 .../tez/engine/common/ValuesIterator.java       | 192 +++++++++++
 .../common/localshuffle/LocalShuffle.java       |  52 ++-
 .../common/security/DelegationTokenRenewal.java | 318 -------------------
 .../common/shuffle/impl/EventFetcher.java       | 212 -------------
 .../tez/engine/common/shuffle/impl/Fetcher.java | 121 ++++---
 .../common/shuffle/impl/InMemoryReader.java     |   6 +-
 .../tez/engine/common/shuffle/impl/MapHost.java |  18 +-
 .../engine/common/shuffle/impl/MapOutput.java   |  28 +-
 .../common/shuffle/impl/MergeManager.java       | 172 +++++-----
 .../tez/engine/common/shuffle/impl/Shuffle.java | 281 ++++++++--------
 .../shuffle/impl/ShuffleClientMetrics.java      |  16 +-
 .../shuffle/impl/ShuffleInputEventHandler.java  | 132 ++++++++
 .../common/shuffle/impl/ShuffleScheduler.java   | 194 ++++++-----
 .../shuffle/impl/TaskAttemptIdentifier.java     |  95 ++++++
 .../common/shuffle/server/ShuffleHandler.java   |  25 +-
 .../engine/common/sort/impl/ExternalSorter.java | 176 +++++-----
 .../common/sort/impl/IFileOutputStream.java     |   2 -
 .../common/sort/impl/PipelinedSorter.java       |  78 ++---
 .../tez/engine/common/sort/impl/TezMerger.java  |   1 +
 .../common/sort/impl/dflt/DefaultSorter.java    | 104 +++---
 .../sort/impl/dflt/InMemoryShuffleSorter.java   |  24 +-
 .../sort/impl/dflt/SortBufferInputStream.java   |   2 +-
 .../newoutput/TezLocalTaskOutputFiles.java      | 234 ++++++++++++++
 .../task/local/newoutput/TezTaskOutput.java     | 156 +++++++++
 .../local/newoutput/TezTaskOutputFiles.java     | 232 ++++++++++++++
 .../common/task/local/output/TezTaskOutput.java |  11 +-
 .../engine/hadoop/compat/NullProgressable.java  |  33 ++
 .../tez/engine/lib/input/LocalMergedInput.java  |  68 ++--
 .../engine/lib/input/ShuffledMergedInput.java   | 184 +++++++----
 .../engine/lib/oldinput/LocalMergedInput.java   |  67 ++++
 .../lib/oldinput/OldShuffledMergedInput.java    |  74 +++++
 .../lib/oldoutput/OldInMemorySortedOutput.java  |  58 ++++
 .../oldoutput/OldLocalOnFileSorterOutput.java   |  38 +++
 .../lib/oldoutput/OldOnFileSortedOutput.java    |  62 ++++
 .../engine/lib/output/InMemorySortedOutput.java |  71 +++--
 .../lib/output/LocalOnFileSorterOutput.java     |  17 +-
 .../engine/lib/output/OnFileSortedOutput.java   | 103 ++++--
 .../org/apache/tez/engine/newapi/KVReader.java  |  52 ++-
 .../org/apache/tez/engine/newapi/KVWriter.java  |   2 +-
 .../engine/newapi/impl/TezInputContextImpl.java |   4 +-
 .../newapi/impl/TezOutputContextImpl.java       |   3 +
 .../newapi/impl/TezProcessorContextImpl.java    |   3 +
 .../engine/newapi/impl/TezTaskContextImpl.java  |  59 +++-
 .../tez/engine/shuffle/common/ShuffleUtils.java |  56 ++++
 .../tez/mapreduce/examples/MRRSleepJob.java     |   8 +-
 .../mapreduce/examples/OrderedWordCount.java    |   8 +-
 .../apache/tez/mapreduce/TestMRRJobsDAGApi.java |  20 +-
 .../apache/hadoop/mapred/LocalJobRunnerTez.java |   6 +-
 .../processor/reduce/ReduceProcessor.java       |  24 +-
 .../processor/map/TestMapProcessor.java         | 154 +++++----
 .../processor/reduce/TestReduceProcessor.java   |   6 +-
 .../org/apache/tez/mapreduce/YARNRunner.java    |   8 +-
 60 files changed, 2708 insertions(+), 1540 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-common/src/main/java/org/apache/tez/common/Constants.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/Constants.java b/tez-common/src/main/java/org/apache/tez/common/Constants.java
index 9f1b20a..8ea2909 100644
--- a/tez-common/src/main/java/org/apache/tez/common/Constants.java
+++ b/tez-common/src/main/java/org/apache/tez/common/Constants.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
 
 public class Constants {
 
+  // TODO NEWTEZ Check which of these constants are expecting specific pieces of information which are being removed - like taskAttemptId
+  
   public static final String TEZ = "tez";
 
   public static final String MAP_OUTPUT_FILENAME_STRING = "file.out";
@@ -31,6 +33,7 @@ public class Constants {
   public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
   public static String MERGED_OUTPUT_PREFIX = ".merged";
   
+  // TODO NEWTEZ Remove this constant once the old code is removed.
   public static final String TEZ_ENGINE_TASK_ATTEMPT_ID = 
       "tez.engine.task.attempt.id";
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-common/src/main/java/org/apache/tez/common/IDUtils.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/IDUtils.java b/tez-common/src/main/java/org/apache/tez/common/IDUtils.java
index e94d939..1270e5a 100644
--- a/tez-common/src/main/java/org/apache/tez/common/IDUtils.java
+++ b/tez-common/src/main/java/org/apache/tez/common/IDUtils.java
@@ -56,7 +56,7 @@ public class IDUtils {
     }
     throw new IllegalArgumentException(exceptionMsg);
   }
-
+  
   /** Construct a TaskAttemptID object from given string 
    * @return constructed TaskAttemptID object or null if the given String is null
    * @throws IllegalArgumentException if the given string is malformed
@@ -89,5 +89,4 @@ public class IDUtils {
     }
     throw new IllegalArgumentException(exceptionMsg);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
index 5a847f1..7d8730e 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -157,6 +157,7 @@ public class TezJobConfig {
       "tez.engine.shuffle.use.in-memory";
   public static final boolean DEFAULT_TEZ_ENGINE_SHUFFLE_USE_IN_MEMORY = false;
 
+  // TODO NEWTEZ Remove these config parameters. Will be part of an event.
   @Private
   public static final String TEZ_ENGINE_SHUFFLE_PARTITION_RANGE = 
       "tez.engine.shuffle.partition-range";

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-common/src/main/java/org/apache/tez/common/counters/TaskCounter.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/counters/TaskCounter.java b/tez-common/src/main/java/org/apache/tez/common/counters/TaskCounter.java
index 60ad1c9..b6fca27 100644
--- a/tez-common/src/main/java/org/apache/tez/common/counters/TaskCounter.java
+++ b/tez-common/src/main/java/org/apache/tez/common/counters/TaskCounter.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public enum TaskCounter {
+  // TODO Eventually, rename counters to be non-MR specific and map them to MR equivalent.
   MAP_INPUT_RECORDS, 
   MAP_OUTPUT_RECORDS,
   MAP_SKIPPED_RECORDS,

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine-api/src/main/java/org/apache/tez/engine/api/Partitioner.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Partitioner.java b/tez-engine-api/src/main/java/org/apache/tez/engine/api/Partitioner.java
deleted file mode 100644
index cbef463..0000000
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/api/Partitioner.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.api;
-
-/**
- * {@link Partitioner} is used by the TEZ framework to partition 
- * output key/value pairs.
- */
-public interface Partitioner {
-  
-  /**
-   * Get partition for given key/value.
-   * @param key key
-   * @param value value
-   * @param numPartitions number of partitions
-   * @return
-   */
-  int getPartition(Object key, Object value, int numPartitions);
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
----------------------------------------------------------------------
diff --git a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
index 4cc5668..341377a 100644
--- a/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
+++ b/tez-engine-api/src/main/java/org/apache/tez/engine/newapi/TezTaskContext.java
@@ -18,8 +18,10 @@
 
 package org.apache.tez.engine.newapi;
 
+import java.nio.ByteBuffer;
 import java.util.List;
 
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.counters.TezCounters;
 
 /**
@@ -28,8 +30,18 @@ import org.apache.tez.common.counters.TezCounters;
  */
 public interface TezTaskContext {
 
+  // TODO NEWTEZ
+  // Scale the maximum events we fetch per RPC call to mitigate OOM issues
+  // on the ApplicationMaster when a thundering herd of reducers fetch events
+  // This should not be necessary after HADOOP-8942
 
   /**
+   * Get the {@link ApplicationId} for the running app
+   * @return the {@link ApplicationId}
+   */
+  public ApplicationId getApplicationId();
+  
+  /**
    * Get the index of this Task
    * @return Task Index
    */
@@ -42,12 +54,17 @@ public interface TezTaskContext {
   public int getAttemptNumber();
 
   /**
+   * Get the name of the DAG
+   * @return the DAG name
+   */
+  public String getDAGName();
+  
+  /**
    * Get the name of the Vertex in which the task is running
    * @return Vertex Name
    */
   public String getTaskVertexName();
 
-
   public TezCounters getCounters();
 
   /**
@@ -62,4 +79,46 @@ public interface TezTaskContext {
    */
   public byte[] getUserPayload();
 
+  /**
+   * Get the work diectories for the Input/Output/Processor
+   * @return an array of work dirs
+   */
+  public String[] getWorkDirs();
+  
+  /**
+   * Returns an identifier which is unique to the specific Input, Processor or
+   * Output
+   * 
+   * @return
+   */
+  public String getUniqueIdentifier();
+  
+  /**
+   * Report a fatal error to the framework. This will cause the entire task to
+   * fail and should not be used for reporting temporary or recoverable errors
+   * 
+   * @param exception an exception representing the error
+   */
+  public void fatalError(Throwable exception, String message);
+  
+  /**
+   * Returns meta-data for the specified service. As an example, when the MR
+   * ShuffleHandler is used - this would return the jobToken serialized as bytes
+   * 
+   * @param serviceName
+   *          the name of the service for which meta-data is required
+   * @return a ByteBuffer representing the meta-data
+   */
+  public ByteBuffer getServiceConsumerMetaData(String serviceName);
+
+  /**
+   * Return Provider meta-data for the specified service As an example, when the
+   * MR ShuffleHandler is used - this would return the shuffle port serialized
+   * as bytes
+   * 
+   * @param serviceName
+   *          the name of the service for which provider meta-data is required
+   * @return a ByteBuffer representing the meta-data
+   */
+  public ByteBuffer getServiceProviderMetaData(String serviceName);
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/api/Partitioner.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/Partitioner.java b/tez-engine/src/main/java/org/apache/tez/engine/api/Partitioner.java
new file mode 100644
index 0000000..cbef463
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/Partitioner.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.engine.api;
+
+/**
+ * {@link Partitioner} is used by the TEZ framework to partition 
+ * output key/value pairs.
+ */
+public interface Partitioner {
+  
+  /**
+   * Get partition for given key/value.
+   * @param key key
+   * @param value value
+   * @param numPartitions number of partitions
+   * @return
+   */
+  int getPartition(Object key, Object value, int numPartitions);
+  
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
new file mode 100644
index 0000000..b3287c9
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
@@ -0,0 +1,39 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.common;
+
+public class TezEngineUtils {
+
+  public static String getTaskIdentifier(String vertexName, int taskIndex) {
+    return String.format("%s_%06d", vertexName, taskIndex);
+  }
+
+  public static String getTaskAttemptIdentifier(int taskIndex,
+      int taskAttemptNumber) {
+    return String.format("%d_%d", taskIndex, taskAttemptNumber);
+  }
+
+  // TODO Maybe include a dag name in this.
+  public static String getTaskAttemptIdentifier(String vertexName,
+      int taskIndex, int taskAttemptNumber) {
+    return String.format("%s_%06d_%02d", vertexName, taskIndex,
+        taskAttemptNumber);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java b/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java
new file mode 100644
index 0000000..a33d00b
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/ValuesIterator.java
@@ -0,0 +1,192 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.engine.common;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.serializer.Deserializer;
+import org.apache.hadoop.io.serializer.SerializationFactory;
+import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Iterates values while keys match in sorted input.
+ * 
+ * This class is not thread safe. Accessing methods from multiple threads will
+ * lead to corrupt data.
+ * 
+ */
+public class ValuesIterator<KEY,VALUE> {
+  protected TezRawKeyValueIterator in; //input iterator
+  private KEY key;               // current key
+  private KEY nextKey;
+  private VALUE value;             // current value
+  //private boolean hasNext;                      // more w/ this key
+  private boolean more;                         // more in file
+  private RawComparator<KEY> comparator;
+  private Deserializer<KEY> keyDeserializer;
+  private Deserializer<VALUE> valDeserializer;
+  private DataInputBuffer keyIn = new DataInputBuffer();
+  private DataInputBuffer valueIn = new DataInputBuffer();
+  private TezCounter inputKeyCounter;
+  private TezCounter inputValueCounter;
+  
+  private int keyCtr = 0;
+  private boolean hasMoreValues; // For the current key.
+  private boolean isFirstRecord = true;
+  
+  public ValuesIterator (TezRawKeyValueIterator in, 
+                         RawComparator<KEY> comparator, 
+                         Class<KEY> keyClass,
+                         Class<VALUE> valClass, Configuration conf,
+                         TezCounter inputKeyCounter,
+                         TezCounter inputValueCounter)
+    throws IOException {
+    this.in = in;
+    this.comparator = comparator;
+    this.inputKeyCounter = inputKeyCounter;
+    this.inputValueCounter = inputValueCounter;
+    SerializationFactory serializationFactory = new SerializationFactory(conf);
+    this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
+    this.keyDeserializer.open(keyIn);
+    this.valDeserializer = serializationFactory.getDeserializer(valClass);
+    this.valDeserializer.open(this.valueIn);
+  }
+
+  TezRawKeyValueIterator getRawIterator() { return in; }
+
+  /**
+   * Move to the next K-Vs pair
+   * @return true if another pair exists, otherwise false.
+   * @throws IOException 
+   */
+  public boolean moveToNext() throws IOException {
+    if (isFirstRecord) {
+      readNextKey();
+      key = nextKey;
+      nextKey = null;
+      hasMoreValues = more;
+      isFirstRecord = false;
+    } else {
+      nextKey();
+    }
+    return more;
+  }
+  
+  /** The current key. */
+  public KEY getKey() { 
+    return key; 
+  }
+  
+  public Iterable<VALUE> getValues() {
+    return new Iterable<VALUE>() {
+
+      @Override
+      public Iterator<VALUE> iterator() {
+        
+        return new Iterator<VALUE>() {
+
+          private final int keyNumber = keyCtr;
+          
+          @Override
+          public boolean hasNext() {
+            return hasMoreValues;
+          }
+
+          @Override
+          public VALUE next() {
+            if (!hasMoreValues) {
+              throw new NoSuchElementException("iterate past last value");
+            }
+            Preconditions
+                .checkState(
+                    keyNumber == keyCtr,
+                    "Cannot use values iterator on the previous K-V pair after moveToNext has been invoked to move to the next K-V pair");
+            
+            try {
+              readNextValue();
+              readNextKey();
+            } catch (IOException ie) {
+              throw new RuntimeException("problem advancing post rec#"+keyCtr, ie);
+            }
+            inputValueCounter.increment(1);
+            return value;
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException("Cannot remove elements");
+          }
+        };
+      }
+    };
+  }
+  
+  
+
+  /** Start processing next unique key. */
+  private void nextKey() throws IOException {
+    // read until we find a new key
+    while (hasMoreValues) { 
+      readNextKey();
+    }
+    if (more) {
+      inputKeyCounter.increment(1);
+      ++keyCtr;
+    }
+    
+    // move the next key to the current one
+    KEY tmpKey = key;
+    key = nextKey;
+    nextKey = tmpKey;
+    hasMoreValues = more;
+  }
+
+  /** 
+   * read the next key - which may be the same as the current key.
+   */
+  private void readNextKey() throws IOException {
+    more = in.next();
+    if (more) {      
+      DataInputBuffer nextKeyBytes = in.getKey();
+      keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength());
+      nextKey = keyDeserializer.deserialize(nextKey);
+      hasMoreValues = key != null && (comparator.compare(key, nextKey) == 0);
+    } else {
+      hasMoreValues = false;
+    }
+  }
+
+  /**
+   * Read the next value
+   * @throws IOException
+   */
+  private void readNextValue() throws IOException {
+    DataInputBuffer nextValueBytes = in.getValue();
+    valueIn.reset(nextValueBytes.getData(), nextValueBytes.getPosition(), nextValueBytes.getLength());
+    value = valDeserializer.deserialize(value);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java b/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
index e19e2c8..38b04d3 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
@@ -29,25 +29,24 @@ import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.RunningTaskContext;
-import org.apache.tez.common.TezEngineTaskContext;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTaskReporter;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.engine.common.ConfigUtils;
 import org.apache.tez.engine.common.sort.impl.TezMerger;
 import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
+import org.apache.tez.engine.common.task.local.newoutput.TezLocalTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput;
+import org.apache.tez.engine.newapi.TezInputContext;
 
 @SuppressWarnings({"rawtypes"})
 public class LocalShuffle {
 
-  private final TezEngineTaskContext taskContext;
-  private final RunningTaskContext runningTaskContext;
+  // TODO NEWTEZ This is broken.
+
+  private final TezInputContext inputContext;
   private final Configuration conf;
-  private final int tasksInDegree;
+  private final int numInputs;
 
   private final Class keyClass;
   private final Class valClass;
@@ -60,18 +59,15 @@ public class LocalShuffle {
   private final CompressionCodec codec;
   private final TezTaskOutput mapOutputFile;
 
-  public LocalShuffle(TezEngineTaskContext taskContext, 
-      RunningTaskContext runningTaskContext, 
-      Configuration conf,
-      TezTaskReporter reporter
-      ) throws IOException {
-    this.taskContext = taskContext;
-    this.runningTaskContext = runningTaskContext;
+  public LocalShuffle(TezInputContext inputContext, Configuration conf, int numInputs) throws IOException {
+    this.inputContext = inputContext;
     this.conf = conf;
+    this.numInputs = numInputs;
+    
     this.keyClass = ConfigUtils.getIntermediateInputKeyClass(conf);
     this.valClass = ConfigUtils.getIntermediateInputValueClass(conf);
     this.comparator = ConfigUtils.getIntermediateInputKeyComparator(conf);
-
+    
     this.sortFactor =
         conf.getInt(
             TezJobConfig.TEZ_ENGINE_IO_SORT_FACTOR, 
@@ -79,10 +75,9 @@ public class LocalShuffle {
     
     this.rfs = FileSystem.getLocal(conf).getRaw();
 
-    this.spilledRecordsCounter = 
-        reporter.getCounter(TaskCounter.SPILLED_RECORDS);
+    this.spilledRecordsCounter = inputContext.getCounters().findCounter(TaskCounter.SPILLED_RECORDS);
     
-    // compression
+ // compression
     if (ConfigUtils.isIntermediateInputCompressed(conf)) {
       Class<? extends CompressionCodec> codecClass =
           ConfigUtils.getIntermediateInputCompressorClass(conf, DefaultCodec.class);
@@ -90,19 +85,16 @@ public class LocalShuffle {
     } else {
       this.codec = null;
     }
-
-    this.tasksInDegree = taskContext.getInputSpecList().get(0).getNumInputs();
-
+    
     // Always local
-    this.mapOutputFile = new TezLocalTaskOutputFiles();
-    this.mapOutputFile.setConf(conf);
-
+    this.mapOutputFile = new TezLocalTaskOutputFiles(conf, inputContext.getUniqueIdentifier());
   }
+ 
   
   public TezRawKeyValueIterator run() throws IOException {
     // Copy is complete, obviously! 
-    this.runningTaskContext.getProgress().addPhase("copy").complete();
 
+    
     // Merge
     return TezMerger.merge(conf, rfs, 
         keyClass, valClass,
@@ -110,17 +102,17 @@ public class LocalShuffle {
         getMapFiles(),
         false, 
         sortFactor,
-        new Path(taskContext.getTaskAttemptId().toString()), 
+        new Path(inputContext.getUniqueIdentifier()), // TODO NEWTEZ This is likely broken 
         comparator,
-        runningTaskContext.getTaskReporter(), spilledRecordsCounter, null, null);
+        null, spilledRecordsCounter, null, null);
   }
   
   private Path[] getMapFiles() 
   throws IOException {
     List<Path> fileList = new ArrayList<Path>();
       // for local jobs
-      for(int i = 0; i < tasksInDegree; ++i) {
-        fileList.add(mapOutputFile.getInputFile(i));
+      for(int i = 0; i < numInputs; ++i) {
+        //fileList.add(mapOutputFile.getInputFile(i));
       }
       
     return fileList.toArray(new Path[0]);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/security/DelegationTokenRenewal.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/security/DelegationTokenRenewal.java b/tez-engine/src/main/java/org/apache/tez/engine/common/security/DelegationTokenRenewal.java
deleted file mode 100644
index a3ac968..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/security/DelegationTokenRenewal.java
+++ /dev/null
@@ -1,318 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.security;
-
-import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.LinkedBlockingQueue;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.Credentials;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class DelegationTokenRenewal {
-  private static final Log LOG = LogFactory.getLog(DelegationTokenRenewal.class);
-  public static final String SCHEME = "hdfs";
-  
-  /**
-   * class that is used for keeping tracks of DT to renew
-   *
-   */
-  private static class DelegationTokenToRenew {
-    public final Token<?> token;
-    public final ApplicationId jobId;
-    public final Configuration conf;
-    public long expirationDate;
-    public TimerTask timerTask;
-    
-    public DelegationTokenToRenew(
-        ApplicationId jId, Token<?> t, 
-        Configuration newConf, long newExpirationDate) {
-      token = t;
-      jobId = jId;
-      conf = newConf;
-      expirationDate = newExpirationDate;
-      timerTask = null;
-      if(token==null || jobId==null || conf==null) {
-        throw new IllegalArgumentException("invalid params for Renew Token" +
-            ";t="+token+";j="+jobId+";c="+conf);
-      }
-    }
-    public void setTimerTask(TimerTask tTask) {
-      timerTask = tTask;
-    }
-    @Override
-    public String toString() {
-      return token + ";exp="+expirationDate;
-    }
-    @Override
-    public boolean equals (Object obj) {
-      if (obj == this) {
-        return true;
-      } else if (obj == null || getClass() != obj.getClass()) {
-        return false;
-      } else {
-        return token.equals(((DelegationTokenToRenew)obj).token);
-      }
-    }
-    @Override
-    public int hashCode() {
-      return token.hashCode();
-    }
-  }
-  
-  // global single timer (daemon)
-  private static Timer renewalTimer = new Timer(true);
-  
-  //delegation token canceler thread
-  private static DelegationTokenCancelThread dtCancelThread =
-    new DelegationTokenCancelThread();
-  static {
-    dtCancelThread.start();
-  }
-
-  
-  //managing the list of tokens using Map
-  // jobId=>List<tokens>
-  private static Set<DelegationTokenToRenew> delegationTokens = 
-    Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>());
-  
-  private static class DelegationTokenCancelThread extends Thread {
-    private static class TokenWithConf {
-      Token<?> token;
-      Configuration conf;
-      TokenWithConf(Token<?> token, Configuration conf) {
-        this.token = token;
-        this.conf = conf;
-      }
-    }
-    private LinkedBlockingQueue<TokenWithConf> queue =  
-      new LinkedBlockingQueue<TokenWithConf>();
-     
-    public DelegationTokenCancelThread() {
-      super("Delegation Token Canceler");
-      setDaemon(true);
-    }
-    public void cancelToken(Token<?> token,  
-        Configuration conf) {
-      TokenWithConf tokenWithConf = new TokenWithConf(token, conf);
-      while (!queue.offer(tokenWithConf)) {
-        LOG.warn("Unable to add token " + token + " for cancellation. " +
-        		 "Will retry..");
-        try {
-          Thread.sleep(100);
-        } catch (InterruptedException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    }
-
-    public void run() {
-      while (true) {
-        TokenWithConf tokenWithConf = null;
-        try {
-          tokenWithConf = queue.take();
-          final TokenWithConf current = tokenWithConf;
-          
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Canceling token " + tokenWithConf.token.getService());
-          }
-          // need to use doAs so that http can find the kerberos tgt
-          UserGroupInformation.getLoginUser().doAs(
-              new PrivilegedExceptionAction<Void>() {
-
-                @Override
-                public Void run() throws Exception {
-                  current.token.cancel(current.conf);
-                  return null;
-                }
-              });
-        } catch (IOException e) {
-          LOG.warn("Failed to cancel token " + tokenWithConf.token + " " +  
-              StringUtils.stringifyException(e));
-        } catch (InterruptedException ie) {
-          return;
-        } catch (Throwable t) {
-          LOG.warn("Got exception " + StringUtils.stringifyException(t) + 
-                   ". Exiting..");
-          System.exit(-1);
-        }
-      }
-    }
-  }
-  //adding token
-  private static void addTokenToList(DelegationTokenToRenew t) {
-    delegationTokens.add(t);
-  }
-  
-  public static synchronized void registerDelegationTokensForRenewal(
-      ApplicationId jobId, Credentials ts, Configuration conf) throws IOException {
-    if(ts==null)
-      return; //nothing to add
-    
-    Collection <Token<?>> tokens = ts.getAllTokens();
-    long now = System.currentTimeMillis();
-
-    for (Token<?> t : tokens) {
-      // first renew happens immediately
-      if (t.isManaged()) {
-        DelegationTokenToRenew dtr = new DelegationTokenToRenew(jobId, t, conf,
-            now);
-
-        addTokenToList(dtr);
-
-        setTimerForTokenRenewal(dtr, true);
-        LOG.info("registering token for renewal for service =" + t.getService()
-            + " and jobID = " + jobId);
-      }
-    }
-  }
-    
-  /**
-   * Task - to renew a token
-   *
-   */
-  private static class RenewalTimerTask extends TimerTask {
-    private DelegationTokenToRenew dttr;
-    
-    RenewalTimerTask(DelegationTokenToRenew t) {  dttr = t;  }
-    
-    @Override
-    public void run() {
-      Token<?> token = dttr.token;
-      long newExpirationDate=0;
-      try {
-        // need to use doAs so that http can find the kerberos tgt
-        dttr.expirationDate = UserGroupInformation.getLoginUser().doAs(
-            new PrivilegedExceptionAction<Long>() {
-
-              @Override
-              public Long run() throws Exception {
-                return dttr.token.renew(dttr.conf);
-              }
-            });
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("renewing for:" + token.getService() + ";newED="
-              + dttr.expirationDate);
-        }
-        setTimerForTokenRenewal(dttr, false);// set the next one
-      } catch (Exception e) {
-        LOG.error("Exception renewing token" + token + ". Not rescheduled", e);
-        removeFailedDelegationToken(dttr);
-      }
-    }
-  }
-  
-  /**
-   * find the soonest expiring token and set it for renew
-   */
-  private static void setTimerForTokenRenewal(
-      DelegationTokenToRenew token, boolean firstTime) {
-      
-    // calculate timer time
-    long now = System.currentTimeMillis();
-    long renewIn;
-    if(firstTime) {
-      renewIn = now;
-    } else {
-      long expiresIn = (token.expirationDate - now); 
-      renewIn = now + expiresIn - expiresIn/10; // little before expiration
-    }
-    
-    // need to create new timer every time
-    TimerTask tTask = new RenewalTimerTask(token);
-    token.setTimerTask(tTask); // keep reference to the timer
-
-    renewalTimer.schedule(token.timerTask, new Date(renewIn));
-  }
-
-  /**
-   * removing all tokens renewals
-   */
-  static public void close() {
-    renewalTimer.cancel();
-    delegationTokens.clear();
-  }
-  
-  // cancel a token
-  private static void cancelToken(DelegationTokenToRenew t) {
-    dtCancelThread.cancelToken(t.token, t.conf);
-  }
-  
-  /**
-   * removing failed DT
-   * @param jobId
-   */
-  private static void removeFailedDelegationToken(DelegationTokenToRenew t) {
-    ApplicationId jobId = t.jobId;
-    if (LOG.isDebugEnabled())
-      LOG.debug("removing failed delegation token for jobid=" + jobId + 
-          ";t=" + t.token.getService());
-    delegationTokens.remove(t);
-    // cancel the timer
-    if(t.timerTask!=null)
-      t.timerTask.cancel();
-  }
-  
-  /**
-   * removing DT for completed jobs
-   * @param jobId
-   */
-  public static void removeDelegationTokenRenewalForJob(ApplicationId jobId) {
-    synchronized (delegationTokens) {
-      Iterator<DelegationTokenToRenew> it = delegationTokens.iterator();
-      while(it.hasNext()) {
-        DelegationTokenToRenew dttr = it.next();
-        if (dttr.jobId.equals(jobId)) {
-          if (LOG.isDebugEnabled())
-            LOG.debug("removing delegation token for jobid=" + jobId + 
-                ";t=" + dttr.token.getService());
-
-          // cancel the timer
-          if(dttr.timerTask!=null)
-            dttr.timerTask.cancel();
-
-          // cancel the token
-          cancelToken(dttr);
-
-          it.remove();
-        }
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/EventFetcher.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/EventFetcher.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/EventFetcher.java
deleted file mode 100644
index 51e05af..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/EventFetcher.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.shuffle.impl;
-
-import java.io.IOException;
-import java.net.URI;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.records.TezDependentTaskCompletionEvent;
-import org.apache.tez.engine.records.TezTaskDependencyCompletionEventsUpdate;
-
-class EventFetcher extends Thread {
-  private static final long SLEEP_TIME = 1000;
-  private static final int MAX_RETRIES = 10;
-  private static final int RETRY_PERIOD = 5000;
-  private static final Log LOG = LogFactory.getLog(EventFetcher.class);
-
-  private final TezTaskAttemptID reduce;
-  private final Master umbilical;
-  private final ShuffleScheduler scheduler;
-  private int fromEventIdx = 0;
-  private int maxEventsToFetch;
-  private Shuffle shuffle = null;
-  
-  private int maxMapRuntime = 0;
-
-  private volatile boolean stopped = false;
-  
-  public EventFetcher(TezTaskAttemptID reduce,
-                      Master umbilical,
-                      ShuffleScheduler scheduler,
-                      Shuffle shuffle,
-                      int maxEventsToFetch) {
-    setName("EventFetcher for fetching Map Completion Events");
-    setDaemon(true);    
-    this.reduce = reduce;
-    this.umbilical = umbilical;
-    this.scheduler = scheduler;
-    this.shuffle = shuffle;
-    this.maxEventsToFetch = maxEventsToFetch;
-  }
-
-  @Override
-  public void run() {
-    int failures = 0;
-    LOG.info(reduce + " Thread started: " + getName());
-    
-    try {
-      while (!stopped && !Thread.currentThread().isInterrupted()) {
-        try {
-          int numNewMaps = getMapCompletionEvents();
-          failures = 0;
-          if (numNewMaps > 0) {
-            LOG.info(reduce + ": " + "Got " + numNewMaps + " new map-outputs");
-          }
-          LOG.debug("GetMapEventsThread about to sleep for " + SLEEP_TIME);
-          if (!Thread.currentThread().isInterrupted()) {
-            Thread.sleep(SLEEP_TIME);
-          }
-        } catch (InterruptedException e) {
-          LOG.info("EventFetcher is interrupted.. Returning");
-          return;
-        } catch (IOException ie) {
-          LOG.info("Exception in getting events", ie);
-          // check to see whether to abort
-          if (++failures >= MAX_RETRIES) {
-            throw new IOException("too many failures downloading events", ie);
-          }
-          // sleep for a bit
-          if (!Thread.currentThread().isInterrupted()) {
-            Thread.sleep(RETRY_PERIOD);
-          }
-        }
-      }
-    } catch (InterruptedException e) {
-      return;
-    } catch (Throwable t) {
-      shuffle.reportException(t);
-      return;
-    }
-  }
-
-  public void shutDown() {
-    this.stopped = true;
-    interrupt();
-    try {
-      join(5000);
-    } catch(InterruptedException ie) {
-      LOG.warn("Got interrupted while joining " + getName(), ie);
-    }
-  }
-  
-  /** 
-   * Queries the {@link TaskTracker} for a set of map-completion events 
-   * from a given event ID.
-   * @throws IOException
-   */  
-  protected int getMapCompletionEvents() throws IOException {
-    
-    int numNewMaps = 0;
-    TezDependentTaskCompletionEvent events[] = null;
-
-    do {
-      TezTaskDependencyCompletionEventsUpdate update =
-          umbilical.getDependentTasksCompletionEvents(
-              fromEventIdx,
-              maxEventsToFetch,
-              reduce);
-      events = update.getDependentTaskCompletionEvents();
-      LOG.debug("Got " + events.length + " map completion events from " +
-               fromEventIdx);
-      // Check if the reset is required.
-      // Since there is no ordering of the task completion events at the
-      // reducer, the only option to sync with the new jobtracker is to reset
-      // the events index
-      if (update.shouldReset()) {
-        fromEventIdx = 0;
-        scheduler.resetKnownMaps();
-      }
-
-      // Update the last seen event ID
-      fromEventIdx += events.length;
-
-      // Process the TaskCompletionEvents:
-      // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
-      // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop
-      //    fetching from those maps.
-      // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
-      //    outputs at all.
-      for (TezDependentTaskCompletionEvent event : events) {
-        byte[] userPayload = event.getUserPayload();
-        if(userPayload != null) {
-          shuffle.updateUserPayload(userPayload);
-        }
-        switch (event.getStatus()) {
-        case SUCCEEDED:
-          addMapHosts(event);
-          numNewMaps ++;
-          int duration = event.getTaskRunTime();
-          if (duration > maxMapRuntime) {
-            maxMapRuntime = duration;
-            scheduler.informMaxMapRunTime(maxMapRuntime);
-          }
-          break;
-        case FAILED:
-        case KILLED:
-        case OBSOLETE:
-          scheduler.obsoleteMapOutput(event.getTaskAttemptID());
-          LOG.info("Ignoring obsolete output of " + event.getStatus() + 
-              " map-task: '" + event.getTaskAttemptID() + "'");
-          break;
-        case TIPFAILED:
-          scheduler.tipFailed(event.getTaskAttemptID().getTaskID());
-          LOG.info("Ignoring output of failed map TIP: '" +  
-              event.getTaskAttemptID() + "'");
-          break;
-        }
-      }
-    } while (events.length == maxEventsToFetch);
-
-    return numNewMaps;
-  }
-  
-  private void addMapHosts(TezDependentTaskCompletionEvent event) {
-    int reduceRange = shuffle.getReduceRange();
-    for(int i=0; i<reduceRange; ++i) {
-      int partitionId = reduce.getTaskID().getId()+i;
-      URI u = getBaseURI(event.getTaskTrackerHttp(), partitionId);
-      scheduler.addKnownMapOutput(u.getHost() + ":" + u.getPort(),
-          partitionId,
-          u.toString(),
-          event.getTaskAttemptID());
-    }
-  }
-  
-  private URI getBaseURI(String url, int reduceId) {
-    StringBuffer baseUrl = new StringBuffer(url);
-    if (!url.endsWith("/")) {
-      baseUrl.append("/");
-    }
-    baseUrl.append("mapOutput?job=");
-    // TODO TEZ HACK to get shuffle working. ApplicationId vs JobId shuffle handler.
-    // FIXME dag or application or ???
-    String jobID = reduce.getTaskID().getVertexID().getDAGId().
-        getApplicationId().toString().replace("application", "job");
-
-    baseUrl.append(jobID);
-    baseUrl.append("&reduce=");
-    baseUrl.append(reduceId);
-    baseUrl.append("&map=");
-    URI u = URI.create(baseUrl.toString());
-    return u;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
index 0acceaf..86e5b56 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/Fetcher.java
@@ -43,17 +43,16 @@ import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.security.ssl.SSLFactory;
-import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.tez.common.IDUtils;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.common.TezTaskReporter;
 import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.common.ConfigUtils;
 import org.apache.tez.engine.common.security.SecureShuffleUtils;
 import org.apache.tez.engine.common.shuffle.impl.MapOutput.Type;
+import org.apache.tez.engine.common.shuffle.server.ShuffleHandler;
 import org.apache.tez.engine.common.sort.impl.IFileInputStream;
+import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.engine.shuffle.common.ShuffleUtils;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -63,8 +62,7 @@ class Fetcher extends Thread {
   
   /** Basic/unit connection timeout (in milliseconds) */
   private final static int UNIT_CONNECT_TIMEOUT = 60 * 1000;
-  
-  private final Progressable reporter;
+
   private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
                                     CONNECTION, WRONG_REDUCE}
   
@@ -99,27 +97,28 @@ class Fetcher extends Thread {
 
   public Fetcher(Configuration job, 
       ShuffleScheduler scheduler, MergeManager merger,
-      TezTaskReporter reporter, ShuffleClientMetrics metrics,
-      Shuffle shuffle, SecretKey jobTokenSecret) {
+      ShuffleClientMetrics metrics,
+      Shuffle shuffle, TezInputContext inputContext) throws IOException {
     this.job = job;
-    this.reporter = reporter;
     this.scheduler = scheduler;
     this.merger = merger;
     this.metrics = metrics;
     this.shuffle = shuffle;
     this.id = ++nextId;
-    this.jobTokenSecret = jobTokenSecret;
-    ioErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+    this.jobTokenSecret = ShuffleUtils
+        .getJobTokenSecretFromTokenBytes(inputContext
+            .getServiceConsumerMetaData(ShuffleHandler.MAPREDUCE_SHUFFLE_SERVICEID));
+    ioErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
         ShuffleErrors.IO_ERROR.toString());
-    wrongLengthErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+    wrongLengthErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
         ShuffleErrors.WRONG_LENGTH.toString());
-    badIdErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+    badIdErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
         ShuffleErrors.BAD_ID.toString());
-    wrongMapErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+    wrongMapErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
         ShuffleErrors.WRONG_MAP.toString());
-    connectionErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+    connectionErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
         ShuffleErrors.CONNECTION.toString());
-    wrongReduceErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME,
+    wrongReduceErrs = inputContext.getCounters().findCounter(SHUFFLE_ERR_GRP_NAME,
         ShuffleErrors.WRONG_REDUCE.toString());
 
     if (ConfigUtils.isIntermediateInputCompressed(job)) {
@@ -156,6 +155,7 @@ class Fetcher extends Thread {
       }
     }
   }
+  
   public void run() {
     try {
       while (!stopped && !Thread.currentThread().isInterrupted()) {
@@ -221,28 +221,28 @@ class Fetcher extends Thread {
   @VisibleForTesting
   protected void copyFromHost(MapHost host) throws IOException {
     // Get completed maps on 'host'
-    List<TezTaskAttemptID> maps = scheduler.getMapsForHost(host);
+    List<TaskAttemptIdentifier> srcAttempts = scheduler.getMapsForHost(host);
     
     // Sanity check to catch hosts with only 'OBSOLETE' maps, 
     // especially at the tail of large jobs
-    if (maps.size() == 0) {
+    if (srcAttempts.size() == 0) {
       return;
     }
     
     if(LOG.isDebugEnabled()) {
       LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
-        + maps);
+        + srcAttempts);
     }
     
     // List of maps to be fetched yet
-    Set<TezTaskAttemptID> remaining = new HashSet<TezTaskAttemptID>(maps);
+    Set<TaskAttemptIdentifier> remaining = new HashSet<TaskAttemptIdentifier>(srcAttempts);
     
     // Construct the url and connect
     DataInputStream input;
     boolean connectSucceeded = false;
     
     try {
-      URL url = getMapOutputURL(host, maps);
+      URL url = getMapOutputURL(host, srcAttempts);
       HttpURLConnection connection = openConnection(url);
       
       // generate hash of the url
@@ -294,19 +294,19 @@ class Fetcher extends Thread {
       // If connect did not succeed, just mark all the maps as failed,
       // indirectly penalizing the host
       if (!connectSucceeded) {
-        for(TezTaskAttemptID left: remaining) {
+        for(TaskAttemptIdentifier left: remaining) {
           scheduler.copyFailed(left, host, connectSucceeded);
         }
       } else {
         // If we got a read error at this stage, it implies there was a problem
         // with the first map, typically lost map. So, penalize only that map
         // and add the rest
-        TezTaskAttemptID firstMap = maps.get(0);
+        TaskAttemptIdentifier firstMap = srcAttempts.get(0);
         scheduler.copyFailed(firstMap, host, connectSucceeded);
       }
       
       // Add back all the remaining maps, WITHOUT marking them as failed
-      for(TezTaskAttemptID left: remaining) {
+      for(TaskAttemptIdentifier left: remaining) {
         scheduler.putBackKnownMapOutput(host, left);
       }
       
@@ -318,14 +318,14 @@ class Fetcher extends Thread {
       // On any error, faildTasks is not null and we exit
       // after putting back the remaining maps to the 
       // yet_to_be_fetched list and marking the failed tasks.
-      TezTaskAttemptID[] failedTasks = null;
+      TaskAttemptIdentifier[] failedTasks = null;
       while (!remaining.isEmpty() && failedTasks == null) {
         failedTasks = copyMapOutput(host, input, remaining);
       }
       
       if(failedTasks != null && failedTasks.length > 0) {
         LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
-        for(TezTaskAttemptID left: failedTasks) {
+        for(TaskAttemptIdentifier left: failedTasks) {
           scheduler.copyFailed(left, host, true);
         }
       }
@@ -338,19 +338,19 @@ class Fetcher extends Thread {
             + remaining.size() + " left.");
       }
     } finally {
-      for (TezTaskAttemptID left : remaining) {
+      for (TaskAttemptIdentifier left : remaining) {
         scheduler.putBackKnownMapOutput(host, left);
       }
     }
   }
   
-  private static TezTaskAttemptID[] EMPTY_ATTEMPT_ID_ARRAY = new TezTaskAttemptID[0];
+  private static TaskAttemptIdentifier[] EMPTY_ATTEMPT_ID_ARRAY = new TaskAttemptIdentifier[0];
   
-  private TezTaskAttemptID[] copyMapOutput(MapHost host,
+  private TaskAttemptIdentifier[] copyMapOutput(MapHost host,
                                 DataInputStream input,
-                                Set<TezTaskAttemptID> remaining) {
+                                Set<TaskAttemptIdentifier> remaining) {
     MapOutput mapOutput = null;
-    TezTaskAttemptID mapId = null;
+    TaskAttemptIdentifier srcAttemptId = null;
     long decompressedLength = -1;
     long compressedLength = -1;
     
@@ -361,7 +361,8 @@ class Fetcher extends Thread {
       try {
         ShuffleHeader header = new ShuffleHeader();
         header.readFields(input);
-        mapId = IDUtils.toTaskAttemptId(header.mapId);
+        String pathComponent = header.mapId;
+        srcAttemptId = scheduler.getIdentifierForPathComponent(pathComponent);
         compressedLength = header.compressedLength;
         decompressedLength = header.uncompressedLength;
         forReduce = header.forReduce;
@@ -369,23 +370,23 @@ class Fetcher extends Thread {
         badIdErrs.increment(1);
         LOG.warn("Invalid map id ", e);
         //Don't know which one was bad, so consider all of them as bad
-        return remaining.toArray(new TezTaskAttemptID[remaining.size()]);
+        return remaining.toArray(new TaskAttemptIdentifier[remaining.size()]);
       }
 
  
       // Do some basic sanity verification
       if (!verifySanity(compressedLength, decompressedLength, forReduce,
-          remaining, mapId)) {
-        return new TezTaskAttemptID[] {mapId};
+          remaining, srcAttemptId)) {
+        return new TaskAttemptIdentifier[] {srcAttemptId};
       }
       
       if(LOG.isDebugEnabled()) {
-        LOG.debug("header: " + mapId + ", len: " + compressedLength + 
+        LOG.debug("header: " + srcAttemptId + ", len: " + compressedLength + 
             ", decomp len: " + decompressedLength);
       }
       
       // Get the location for the map output - either in-memory or on-disk
-      mapOutput = merger.reserve(mapId, decompressedLength, id);
+      mapOutput = merger.reserve(srcAttemptId, decompressedLength, id);
       
       // Check if we can shuffle *now* ...
       if (mapOutput.getType() == Type.WAIT) {
@@ -396,7 +397,7 @@ class Fetcher extends Thread {
       
       // Go!
       LOG.info("fetcher#" + id + " about to shuffle output of map " + 
-               mapOutput.getMapId() + " decomp: " +
+               mapOutput.getAttemptIdentifier() + " decomp: " +
                decompressedLength + " len: " + compressedLength + " to " +
                mapOutput.getType());
       if (mapOutput.getType() == Type.MEMORY) {
@@ -408,32 +409,32 @@ class Fetcher extends Thread {
       
       // Inform the shuffle scheduler
       long endTime = System.currentTimeMillis();
-      scheduler.copySucceeded(mapId, host, compressedLength, 
+      scheduler.copySucceeded(srcAttemptId, host, compressedLength, 
                               endTime - startTime, mapOutput);
       // Note successful shuffle
-      remaining.remove(mapId);
+      remaining.remove(srcAttemptId);
       metrics.successFetch();
       return null;
     } catch (IOException ioe) {
       ioErrs.increment(1);
-      if (mapId == null || mapOutput == null) {
+      if (srcAttemptId == null || mapOutput == null) {
         LOG.info("fetcher#" + id + " failed to read map header" + 
-                 mapId + " decomp: " + 
+                 srcAttemptId + " decomp: " + 
                  decompressedLength + ", " + compressedLength, ioe);
-        if(mapId == null) {
-          return remaining.toArray(new TezTaskAttemptID[remaining.size()]);
+        if(srcAttemptId == null) {
+          return remaining.toArray(new TaskAttemptIdentifier[remaining.size()]);
         } else {
-          return new TezTaskAttemptID[] {mapId};
+          return new TaskAttemptIdentifier[] {srcAttemptId};
         }
       }
       
-      LOG.warn("Failed to shuffle output of " + mapId + 
+      LOG.warn("Failed to shuffle output of " + srcAttemptId + 
                " from " + host.getHostName(), ioe); 
 
       // Inform the shuffle-scheduler
       mapOutput.abort();
       metrics.failedFetch();
-      return new TezTaskAttemptID[] {mapId};
+      return new TaskAttemptIdentifier[] {srcAttemptId};
     }
 
   }
@@ -448,11 +449,11 @@ class Fetcher extends Thread {
    * @return true/false, based on if the verification succeeded or not
    */
   private boolean verifySanity(long compressedLength, long decompressedLength,
-      int forReduce, Set<TezTaskAttemptID> remaining, TezTaskAttemptID mapId) {
+      int forReduce, Set<TaskAttemptIdentifier> remaining, TaskAttemptIdentifier srcAttemptId) {
     if (compressedLength < 0 || decompressedLength < 0) {
       wrongLengthErrs.increment(1);
       LOG.warn(getName() + " invalid lengths in map output header: id: " +
-               mapId + " len: " + compressedLength + ", decomp len: " + 
+          srcAttemptId + " len: " + compressedLength + ", decomp len: " + 
                decompressedLength);
       return false;
     }
@@ -462,15 +463,15 @@ class Fetcher extends Thread {
     if (forReduce < reduceStartId || forReduce >= reduceStartId+reduceRange) {
       wrongReduceErrs.increment(1);
       LOG.warn(getName() + " data for the wrong reduce map: " +
-               mapId + " len: " + compressedLength + " decomp len: " +
+               srcAttemptId + " len: " + compressedLength + " decomp len: " +
                decompressedLength + " for reduce " + forReduce);
       return false;
     }
 
     // Sanity check
-    if (!remaining.contains(mapId)) {
+    if (!remaining.contains(srcAttemptId)) {
       wrongMapErrs.increment(1);
-      LOG.warn("Invalid map-output! Received output for " + mapId);
+      LOG.warn("Invalid map-output! Received output for " + srcAttemptId);
       return false;
     }
     
@@ -485,17 +486,17 @@ class Fetcher extends Thread {
    * @return
    * @throws MalformedURLException
    */
-  private URL getMapOutputURL(MapHost host, List<TezTaskAttemptID> maps
+  private URL getMapOutputURL(MapHost host, List<TaskAttemptIdentifier> srcAttempts
                               )  throws MalformedURLException {
     // Get the base url
     StringBuffer url = new StringBuffer(host.getBaseUrl());
     
     boolean first = true;
-    for (TezTaskAttemptID mapId : maps) {
+    for (TaskAttemptIdentifier mapId : srcAttempts) {
       if (!first) {
         url.append(",");
       }
-      url.append(mapId);
+      url.append(mapId.getPathComponent());
       first = false;
     }
    
@@ -566,9 +567,8 @@ class Fetcher extends Thread {
     try {
       IOUtils.readFully(input, shuffleData, 0, shuffleData.length);
       metrics.inputBytes(shuffleData.length);
-      reporter.progress();
       LOG.info("Read " + shuffleData.length + " bytes from map-output for " +
-               mapOutput.getMapId());
+               mapOutput.getAttemptIdentifier());
     } catch (IOException ioe) {      
       // Close the streams
       IOUtils.cleanup(LOG, input);
@@ -593,17 +593,16 @@ class Fetcher extends Thread {
         int n = input.read(buf, 0, (int) Math.min(bytesLeft, BYTES_TO_READ));
         if (n < 0) {
           throw new IOException("read past end of stream reading " + 
-                                mapOutput.getMapId());
+                                mapOutput.getAttemptIdentifier());
         }
         output.write(buf, 0, n);
         bytesLeft -= n;
         metrics.inputBytes(n);
-        reporter.progress();
       }
 
       LOG.info("Read " + (compressedLength - bytesLeft) + 
                " bytes from map-output for " +
-               mapOutput.getMapId());
+               mapOutput.getAttemptIdentifier());
 
       output.close();
     } catch (IOException ioe) {
@@ -617,7 +616,7 @@ class Fetcher extends Thread {
     // Sanity check
     if (bytesLeft != 0) {
       throw new IOException("Incomplete map output received for " +
-                            mapOutput.getMapId() + " from " +
+                            mapOutput.getAttemptIdentifier() + " from " +
                             host.getHostName() + " (" + 
                             bytesLeft + " bytes missing of " + 
                             compressedLength + ")"

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java
index 7cea558..d10ebaa 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/InMemoryReader.java
@@ -25,8 +25,6 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.apache.tez.engine.common.sort.impl.IFile;
 import org.apache.tez.engine.common.sort.impl.IFile.Reader;
 
@@ -36,14 +34,14 @@ import org.apache.tez.engine.common.sort.impl.IFile.Reader;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class InMemoryReader extends Reader {
-  private final TezTaskAttemptID taskAttemptId;
+  private final TaskAttemptIdentifier taskAttemptId;
   private final MergeManager merger;
   DataInputBuffer memDataIn = new DataInputBuffer();
   private int start;
   private int length;
   private int prevKeyPos;
 
-  public InMemoryReader(MergeManager merger, TezTaskAttemptID taskAttemptId,
+  public InMemoryReader(MergeManager merger, TaskAttemptIdentifier taskAttemptId,
                         byte[] data, int start, int length)
   throws IOException {
     super(null, null, length - start, null, null);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java
index 24f7635..cd644de 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapHost.java
@@ -20,9 +20,6 @@ package org.apache.tez.engine.common.shuffle.impl;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-
 class MapHost {
   
   public static enum State {
@@ -37,7 +34,8 @@ class MapHost {
   private final int partitionId;
   private final String baseUrl;
   private final String identifier;
-  private List<TezTaskAttemptID> maps = new ArrayList<TezTaskAttemptID>();
+  // Tracks attempt IDs
+  private List<TaskAttemptIdentifier> maps = new ArrayList<TaskAttemptIdentifier>();
   
   public MapHost(int partitionId, String hostName, String baseUrl) {
     this.partitionId = partitionId;
@@ -70,16 +68,16 @@ class MapHost {
     return baseUrl;
   }
 
-  public synchronized void addKnownMap(TezTaskAttemptID mapId) {
-    maps.add(mapId);
+  public synchronized void addKnownMap(TaskAttemptIdentifier srcAttempt) {
+    maps.add(srcAttempt);
     if (state == State.IDLE) {
       state = State.PENDING;
     }
   }
-  
-  public synchronized List<TezTaskAttemptID> getAndClearKnownMaps() {
-    List<TezTaskAttemptID> currentKnownMaps = maps;
-    maps = new ArrayList<TezTaskAttemptID>();
+
+  public synchronized List<TaskAttemptIdentifier> getAndClearKnownMaps() {
+    List<TaskAttemptIdentifier> currentKnownMaps = maps;
+    maps = new ArrayList<TaskAttemptIdentifier>();
     return currentKnownMaps;
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/1cf7f197/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
index 272709e..f0b48fd 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
@@ -29,8 +29,8 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BoundedByteArrayOutputStream;
-import org.apache.tez.dag.records.TezTaskAttemptID;
-import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
+
 
 class MapOutput {
   private static final Log LOG = LogFactory.getLog(MapOutput.class);
@@ -42,10 +42,10 @@ class MapOutput {
     DISK
   }
   
+  private TaskAttemptIdentifier attemptIdentifier;
   private final int id;
   
   private final MergeManager merger;
-  private final TezTaskAttemptID mapId;
   
   private final long size;
   
@@ -61,13 +61,13 @@ class MapOutput {
   
   private final boolean primaryMapOutput;
   
-  MapOutput(TezTaskAttemptID mapId, MergeManager merger, long size, 
+  MapOutput(TaskAttemptIdentifier attemptIdentifier, MergeManager merger, long size, 
             Configuration conf, LocalDirAllocator localDirAllocator,
             int fetcher, boolean primaryMapOutput, 
             TezTaskOutputFiles mapOutputFile)
          throws IOException {
     this.id = ID.incrementAndGet();
-    this.mapId = mapId;
+    this.attemptIdentifier = attemptIdentifier;
     this.merger = merger;
 
     type = Type.DISK;
@@ -79,7 +79,7 @@ class MapOutput {
     
     this.localFS = FileSystem.getLocal(conf);
     outputPath =
-      mapOutputFile.getInputFileForWrite(mapId.getTaskID(),size);
+      mapOutputFile.getInputFileForWrite(this.attemptIdentifier.getTaskIndex(), size);
     tmpOutputPath = outputPath.suffix(String.valueOf(fetcher));
 
     disk = localFS.create(tmpOutputPath);
@@ -87,10 +87,10 @@ class MapOutput {
     this.primaryMapOutput = primaryMapOutput;
   }
   
-  MapOutput(TezTaskAttemptID mapId, MergeManager merger, int size, 
+  MapOutput(TaskAttemptIdentifier attemptIdentifier, MergeManager merger, int size, 
             boolean primaryMapOutput) {
     this.id = ID.incrementAndGet();
-    this.mapId = mapId;
+    this.attemptIdentifier = attemptIdentifier;
     this.merger = merger;
 
     type = Type.MEMORY;
@@ -107,10 +107,10 @@ class MapOutput {
     this.primaryMapOutput = primaryMapOutput;
   }
 
-  public MapOutput(TezTaskAttemptID mapId) {
+  public MapOutput(TaskAttemptIdentifier attemptIdentifier) {
     this.id = ID.incrementAndGet();
-    this.mapId = mapId;
-    
+    this.attemptIdentifier = attemptIdentifier;
+
     type = Type.WAIT;
     merger = null;
     memory = null;
@@ -159,8 +159,8 @@ class MapOutput {
     return disk;
   }
 
-  public TezTaskAttemptID getMapId() {
-    return mapId;
+  public TaskAttemptIdentifier getAttemptIdentifier() {
+    return this.attemptIdentifier;
   }
 
   public Type getType() {
@@ -198,7 +198,7 @@ class MapOutput {
   }
   
   public String toString() {
-    return "MapOutput(" + mapId + ", " + type + ")";
+    return "MapOutput( AttemptIdentifier: " + attemptIdentifier + ", Type: " + type + ")";
   }
   
   public static class MapOutputComparator