You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/09/25 09:31:21 UTC

[14/50] [abbrv] Rename *.new* packages back to what they should be, remove dead code from the old packages - mapreduce module - tez-engine module (part of TEZ-398). (sseth)

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezInputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezInputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezInputContextImpl.java
new file mode 100644
index 0000000..2e10a93
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezInputContextImpl.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.engine.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.TezInputContext;
+import org.apache.tez.engine.newruntime.RuntimeTask;
+
+/**
+ * {@link TezInputContext} implementation layered on top of
+ * {@link TezTaskContextImpl}.
+ *
+ * Events and fatal errors reported through this context are tagged with an
+ * {@link EventMetaData} created with {@link EventProducerConsumerType#INPUT},
+ * the task vertex name, the source vertex name and the task attempt id.
+ */
+public class TezInputContextImpl extends TezTaskContextImpl
+    implements TezInputContext {
+
+  private final byte[] userPayload;
+  private final String sourceVertexName;
+  private final EventMetaData sourceInfo;
+
+  /**
+   * Creates the context and derives its unique identifier, formatted as
+   * {@code <dagId>_<taskVertexName>_<taskIndex>_<attemptNumber>_<sourceVertexName>}
+   * with the task index zero-padded to six digits and the attempt number
+   * zero-padded to two digits.
+   */
+  @Private
+  public TezInputContextImpl(Configuration conf, int appAttemptNumber,
+      TezUmbilical tezUmbilical, String taskVertexName,
+      String sourceVertexName, TezTaskAttemptID taskAttemptID,
+      TezCounters counters, byte[] userPayload,
+      RuntimeTask runtimeTask, Map<String, ByteBuffer> serviceConsumerMetadata) {
+    super(conf, appAttemptNumber, taskVertexName, taskAttemptID,
+        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
+    this.userPayload = userPayload;
+    this.sourceVertexName = sourceVertexName;
+    this.sourceInfo = new EventMetaData(
+        EventProducerConsumerType.INPUT, taskVertexName, sourceVertexName,
+        taskAttemptID);
+    String dagId =
+        taskAttemptID.getTaskID().getVertexID().getDAGId().toString();
+    this.uniqueIdentifier = String.format("%s_%s_%06d_%02d_%s", dagId,
+        taskVertexName, getTaskIndex(), getTaskAttemptNumber(),
+        sourceVertexName);
+  }
+
+  /**
+   * Wraps every event in a {@link TezEvent} carrying this input's
+   * {@link EventMetaData} and passes the whole batch to the umbilical.
+   */
+  @Override
+  public void sendEvents(List<Event> events) {
+    List<TezEvent> wrapped = new ArrayList<TezEvent>(events.size());
+    for (Event event : events) {
+      wrapped.add(new TezEvent(event, sourceInfo));
+    }
+    tezUmbilical.addEvents(wrapped);
+  }
+
+  /**
+   * Returns the payload array given to the constructor; no copy is made.
+   */
+  @Override
+  public byte[] getUserPayload() {
+    return userPayload;
+  }
+
+  /**
+   * Returns the source vertex name given to the constructor.
+   */
+  @Override
+  public String getSourceVertexName() {
+    return sourceVertexName;
+  }
+
+  /**
+   * Reports a fatal error through the base class, attributing it to this
+   * input's {@link EventMetaData}.
+   */
+  @Override
+  public void fatalError(Throwable exception, String message) {
+    super.signalFatalError(exception, message, sourceInfo);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezOutputContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezOutputContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezOutputContextImpl.java
new file mode 100644
index 0000000..ef58de2
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezOutputContextImpl.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.impl;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.engine.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.TezOutputContext;
+import org.apache.tez.engine.newruntime.RuntimeTask;
+
+/**
+ * {@link TezOutputContext} implementation layered on top of
+ * {@link TezTaskContextImpl}.
+ *
+ * Events and fatal errors reported through this context are tagged with an
+ * {@link EventMetaData} created with {@link EventProducerConsumerType#OUTPUT},
+ * the task vertex name, the destination vertex name and the task attempt id.
+ */
+public class TezOutputContextImpl extends TezTaskContextImpl
+    implements TezOutputContext {
+
+  private final byte[] userPayload;
+  private final String destinationVertexName;
+  private final EventMetaData sourceInfo;
+
+  /**
+   * Creates the context and derives its unique identifier, formatted as
+   * {@code <dagId>_<taskVertexName>_<taskIndex>_<attemptNumber>_<destinationVertexName>}
+   * with the task index zero-padded to six digits and the attempt number
+   * zero-padded to two digits.
+   */
+  @Private
+  public TezOutputContextImpl(Configuration conf, int appAttemptNumber,
+      TezUmbilical tezUmbilical, String taskVertexName,
+      String destinationVertexName,
+      TezTaskAttemptID taskAttemptID, TezCounters counters,
+      byte[] userPayload, RuntimeTask runtimeTask,
+      Map<String, ByteBuffer> serviceConsumerMetadata) {
+    super(conf, appAttemptNumber, taskVertexName, taskAttemptID,
+        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
+    this.userPayload = userPayload;
+    this.destinationVertexName = destinationVertexName;
+    this.sourceInfo = new EventMetaData(EventProducerConsumerType.OUTPUT,
+        taskVertexName, destinationVertexName, taskAttemptID);
+    String dagId =
+        taskAttemptID.getTaskID().getVertexID().getDAGId().toString();
+    this.uniqueIdentifier = String.format("%s_%s_%06d_%02d_%s", dagId,
+        taskVertexName, getTaskIndex(), getTaskAttemptNumber(),
+        destinationVertexName);
+  }
+
+  /**
+   * Wraps every event in a {@link TezEvent} carrying this output's
+   * {@link EventMetaData} and passes the whole batch to the umbilical.
+   */
+  @Override
+  public void sendEvents(List<Event> events) {
+    List<TezEvent> wrapped = new ArrayList<TezEvent>(events.size());
+    for (Event event : events) {
+      wrapped.add(new TezEvent(event, sourceInfo));
+    }
+    tezUmbilical.addEvents(wrapped);
+  }
+
+  /**
+   * Returns the payload array given to the constructor; no copy is made.
+   */
+  @Override
+  public byte[] getUserPayload() {
+    return userPayload;
+  }
+
+  /**
+   * Returns the destination vertex name given to the constructor.
+   */
+  @Override
+  public String getDestinationVertexName() {
+    return destinationVertexName;
+  }
+
+  /**
+   * Reports a fatal error through the base class, attributing it to this
+   * output's {@link EventMetaData}.
+   */
+  @Override
+  public void fatalError(Throwable exception, String message) {
+    super.signalFatalError(exception, message, sourceInfo);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezProcessorContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezProcessorContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezProcessorContextImpl.java
new file mode 100644
index 0000000..3f20d5c
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezProcessorContextImpl.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.impl;
+
+import java.nio.ByteBuffer;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.engine.api.impl.EventMetaData.EventProducerConsumerType;
+import org.apache.tez.engine.newapi.Event;
+import org.apache.tez.engine.newapi.TezProcessorContext;
+import org.apache.tez.engine.newruntime.RuntimeTask;
+
+/**
+ * {@link TezProcessorContext} implementation layered on top of
+ * {@link TezTaskContextImpl}.
+ *
+ * Events and fatal errors reported through this context are tagged with an
+ * {@link EventMetaData} created with
+ * {@link EventProducerConsumerType#PROCESSOR}, the task vertex name, an
+ * empty edge vertex name and the task attempt id.
+ */
+public class TezProcessorContextImpl extends TezTaskContextImpl
+  implements TezProcessorContext {
+
+  private final byte[] userPayload;
+  private final EventMetaData sourceInfo;
+
+  /**
+   * Creates the context and derives its unique identifier, formatted as
+   * {@code <dagId>_<taskVertexName>_<taskIndex>_<attemptNumber>} with the
+   * task index zero-padded to six digits and the attempt number zero-padded
+   * to two digits.
+   */
+  public TezProcessorContextImpl(Configuration conf, int appAttemptNumber,
+      TezUmbilical tezUmbilical, String vertexName,
+      TezTaskAttemptID taskAttemptID, TezCounters counters,
+      byte[] userPayload, RuntimeTask runtimeTask,
+      Map<String, ByteBuffer> serviceConsumerMetadata) {
+    super(conf, appAttemptNumber, vertexName, taskAttemptID,
+        counters, runtimeTask, tezUmbilical, serviceConsumerMetadata);
+    this.userPayload = userPayload;
+    // taskVertexName below is the inherited field, which the superclass
+    // constructor has just populated from vertexName.
+    this.sourceInfo = new EventMetaData(EventProducerConsumerType.PROCESSOR,
+        taskVertexName, "", taskAttemptID);
+    String dagId =
+        taskAttemptID.getTaskID().getVertexID().getDAGId().toString();
+    this.uniqueIdentifier = String.format("%s_%s_%06d_%02d", dagId,
+        taskVertexName, getTaskIndex(), getTaskAttemptNumber());
+  }
+
+  /**
+   * Wraps every event in a {@link TezEvent} carrying this processor's
+   * {@link EventMetaData} and passes the whole batch to the umbilical.
+   */
+  @Override
+  public void sendEvents(List<Event> events) {
+    List<TezEvent> wrapped = new ArrayList<TezEvent>(events.size());
+    for (Event event : events) {
+      wrapped.add(new TezEvent(event, sourceInfo));
+    }
+    tezUmbilical.addEvents(wrapped);
+  }
+
+  /**
+   * Returns the payload array given to the constructor; no copy is made.
+   */
+  @Override
+  public byte[] getUserPayload() {
+    return userPayload;
+  }
+
+  /**
+   * Forwards the progress value to the owning {@link RuntimeTask}.
+   */
+  @Override
+  public void setProgress(float progress) {
+    runtimeTask.setProgress(progress);
+  }
+
+  /**
+   * Reports a fatal error through the base class, attributing it to this
+   * processor's {@link EventMetaData}.
+   */
+  @Override
+  public void fatalError(Throwable exception, String message) {
+    super.signalFatalError(exception, message, sourceInfo);
+  }
+
+  /**
+   * Asks the umbilical whether this task attempt may commit.
+   *
+   * @throws IOException if the umbilical call fails
+   */
+  @Override
+  public boolean canCommit() throws IOException {
+    return tezUmbilical.canCommit(this.taskAttemptID);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezTaskContextImpl.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezTaskContextImpl.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezTaskContextImpl.java
new file mode 100644
index 0000000..2312c49
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezTaskContextImpl.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.impl;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.AuxiliaryServiceHelper;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.engine.newapi.TezTaskContext;
+import org.apache.tez.engine.newruntime.RuntimeTask;
+
+public abstract class TezTaskContextImpl implements TezTaskContext {
+
+  private final Configuration conf;
+  protected final String taskVertexName;
+  protected final TezTaskAttemptID taskAttemptID;
+  private final TezCounters counters;
+  private String[] workDirs;
+  protected String uniqueIdentifier;
+  protected final RuntimeTask runtimeTask;
+  protected final TezUmbilical tezUmbilical;
+  private final Map<String, ByteBuffer> serviceConsumerMetadata;
+  private final int appAttemptNumber;
+
+  @Private
+  public TezTaskContextImpl(Configuration conf, int appAttemptNumber,
+      String taskVertexName, TezTaskAttemptID taskAttemptID,
+      TezCounters counters, RuntimeTask runtimeTask,
+      TezUmbilical tezUmbilical, Map<String, ByteBuffer> serviceConsumerMetadata) {
+    this.conf = conf;
+    this.taskVertexName = taskVertexName;
+    this.taskAttemptID = taskAttemptID;
+    this.counters = counters;
+    // TODO Maybe change this to be task id specific at some point. For now
+    // Shuffle code relies on this being a path specified by YARN
+    this.workDirs = this.conf.getStrings(TezJobConfig.LOCAL_DIRS);
+    this.runtimeTask = runtimeTask;
+    this.tezUmbilical = tezUmbilical;
+    this.serviceConsumerMetadata = serviceConsumerMetadata;
+    // TODO NEWTEZ at some point dag attempt should not map to app attempt
+    this.appAttemptNumber = appAttemptNumber;
+  }
+
+  @Override
+  public ApplicationId getApplicationId() {
+    return taskAttemptID.getTaskID().getVertexID().getDAGId()
+        .getApplicationId();
+  }
+
+  @Override
+  public int getTaskIndex() {
+    return taskAttemptID.getTaskID().getId();
+  }
+
+  @Override
+  public int getDAGAttemptNumber() {
+    return appAttemptNumber;
+  }
+
+  @Override
+  public int getTaskAttemptNumber() {
+    return taskAttemptID.getId();
+  }
+
+  @Override
+  public String getDAGName() {
+    // TODO NEWTEZ Change to some form of the DAG name, for now using dagId as
+    // the unique identifier.
+    return taskAttemptID.getTaskID().getVertexID().getDAGId().toString();
+  }
+
+  @Override
+  public String getTaskVertexName() {
+    return taskVertexName;
+  }
+
+
+  @Override
+  public TezCounters getCounters() {
+    return counters;
+  }
+
+  @Override
+  public String[] getWorkDirs() {
+    return Arrays.copyOf(workDirs, workDirs.length);
+  }
+
+  @Override
+  public String getUniqueIdentifier() {
+    return uniqueIdentifier;
+  }
+
+  @Override
+  public ByteBuffer getServiceConsumerMetaData(String serviceName) {
+    return (ByteBuffer) serviceConsumerMetadata.get(serviceName)
+        .asReadOnlyBuffer().rewind();
+  }
+
+  @Override
+  public ByteBuffer getServiceProviderMetaData(String serviceName) {
+    return AuxiliaryServiceHelper.getServiceDataFromEnv(
+        serviceName, System.getenv());
+  }
+
+  protected void signalFatalError(Throwable t, String message,
+      EventMetaData sourceInfo) {
+    runtimeTask.setFatalError(t, message);
+    String diagnostics;
+    if (t != null && message != null) {
+      diagnostics = "exceptionThrown=" + StringUtils.stringifyException(t)
+          + ", errorMessage=" + message;
+    } else if (t == null && message == null) {
+      diagnostics = "Unknown error";
+    } else {
+      diagnostics = t != null ?
+          "exceptionThrown=" + StringUtils.stringifyException(t)
+          : " errorMessage=" + message;
+    }
+    tezUmbilical.signalFatalError(taskAttemptID, diagnostics, sourceInfo);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezUmbilical.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezUmbilical.java b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezUmbilical.java
new file mode 100644
index 0000000..925d87b
--- /dev/null
+++ b/tez-engine/src/main/java/org/apache/tez/engine/api/impl/TezUmbilical.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.engine.api.impl;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+/**
+ * Callback interface used by the task context implementations in this
+ * package to hand over events, report fatal errors and ask whether a task
+ * attempt may commit.
+ */
+public interface TezUmbilical {
+
+  /**
+   * Accepts a batch of events produced by a task component.
+   *
+   * @param events the events to hand over
+   */
+  void addEvents(Collection<TezEvent> events);
+
+  /**
+   * Reports a fatal error for a task attempt.
+   *
+   * @param taskAttemptID the attempt that failed
+   * @param diagnostics text describing the failure
+   * @param sourceInfo identifies the component reporting the error
+   */
+  void signalFatalError(TezTaskAttemptID taskAttemptID, String diagnostics,
+      EventMetaData sourceInfo);
+
+  /**
+   * Asks whether the given task attempt may commit.
+   *
+   * @param taskAttemptID the attempt asking for permission
+   * @return {@code true} if the attempt may commit
+   * @throws IOException if the check cannot be carried out
+   */
+  boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastInputManager.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastInputManager.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastInputManager.java
index 78d2e0c..927f0ad 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastInputManager.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastInputManager.java
@@ -26,7 +26,7 @@ import org.apache.tez.common.Constants;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
 import org.apache.tez.engine.newapi.TezInputContext;
 import org.apache.tez.engine.shuffle.common.DiskFetchedInput;
 import org.apache.tez.engine.shuffle.common.FetchedInput;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
index b36c240..0b86a8e 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/input/BroadcastKVReader.java
@@ -30,10 +30,10 @@ import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.serializer.Deserializer;
 import org.apache.hadoop.io.serializer.SerializationFactory;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.tez.engine.api.KVReader;
 import org.apache.tez.engine.common.ConfigUtils;
 import org.apache.tez.engine.common.shuffle.impl.InMemoryReader;
 import org.apache.tez.engine.common.sort.impl.IFile;
-import org.apache.tez.engine.newapi.KVReader;
 import org.apache.tez.engine.shuffle.common.FetchedInput;
 import org.apache.tez.engine.shuffle.common.FetchedInput.Type;
 import org.apache.tez.engine.shuffle.common.MemoryFetchedInput;

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/broadcast/output/FileBasedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/output/FileBasedKVWriter.java b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/output/FileBasedKVWriter.java
index 9f3dbbe..84ddd28 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/broadcast/output/FileBasedKVWriter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/broadcast/output/FileBasedKVWriter.java
@@ -29,13 +29,13 @@ import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezUtils;
+import org.apache.tez.engine.api.KVWriter;
 import org.apache.tez.engine.common.ConfigUtils;
 import org.apache.tez.engine.common.TezEngineUtils;
 import org.apache.tez.engine.common.sort.impl.IFile;
 import org.apache.tez.engine.common.sort.impl.TezIndexRecord;
 import org.apache.tez.engine.common.sort.impl.TezSpillRecord;
-import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput;
-import org.apache.tez.engine.newapi.KVWriter;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
 import org.apache.tez.engine.newapi.TezOutputContext;
 
 public class FileBasedKVWriter implements KVWriter {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
index 3920ce6..ab78d82 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/TezEngineUtils.java
@@ -30,8 +30,8 @@ import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.engine.api.Partitioner;
 import org.apache.tez.engine.common.combine.Combiner;
-import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput;
-import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
 import org.apache.tez.engine.newapi.TezOutputContext;
 import org.apache.tez.engine.newapi.TezTaskContext;
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java
deleted file mode 100644
index bf504bb..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineInput.java
+++ /dev/null
@@ -1,176 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.combine;
-
-import java.io.IOException;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.hadoop.io.serializer.Deserializer;
-import org.apache.hadoop.util.Progressable;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.api.Input;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-
-@SuppressWarnings({ "unchecked", "rawtypes" })
-public class CombineInput implements Input {
-
-  private final TezRawKeyValueIterator input;
-  private TezCounter inputValueCounter;
-  private TezCounter inputKeyCounter;
-  private RawComparator<Object> comparator;
-  private Object key;                                  // current key
-  private Object value;                              // current value
-  private boolean firstValue = false;                 // first value in key
-  private boolean nextKeyIsSame = false;              // more w/ this key
-  private boolean hasMore;                            // more in file
-  protected Progressable reporter;
-  private Deserializer keyDeserializer;
-  private Deserializer valueDeserializer;
-  private DataInputBuffer buffer = new DataInputBuffer();
-  private BytesWritable currentRawKey = new BytesWritable();
-  private ValueIterable iterable = new ValueIterable();
-  
-  public CombineInput(TezRawKeyValueIterator kvIter) {
-    this.input = kvIter;
-  }
-
-  public void initialize(Configuration conf, Master master) throws IOException,
-      InterruptedException {
-  }
-
-  public boolean hasNext() throws IOException, InterruptedException {
-    while (hasMore && nextKeyIsSame) {
-      nextKeyValue();
-    }
-    if (hasMore) {
-      if (inputKeyCounter != null) {
-        inputKeyCounter.increment(1);
-      }
-      return nextKeyValue();
-    } else {
-      return false;
-    }
-  }
-
-  private boolean nextKeyValue() throws IOException, InterruptedException {
-    if (!hasMore) {
-      key = null;
-      value = null;
-      return false;
-    }
-    firstValue = !nextKeyIsSame;
-    DataInputBuffer nextKey = input.getKey();
-    currentRawKey.set(nextKey.getData(), nextKey.getPosition(), 
-                      nextKey.getLength() - nextKey.getPosition());
-    buffer.reset(currentRawKey.getBytes(), 0, currentRawKey.getLength());
-    key = keyDeserializer.deserialize(key);
-    DataInputBuffer nextVal = input.getValue();
-    buffer.reset(nextVal.getData(), nextVal.getPosition(), nextVal.getLength());
-    value = valueDeserializer.deserialize(value);
-
-    hasMore = input.next();
-    if (hasMore) {
-      nextKey = input.getKey();
-      nextKeyIsSame = comparator.compare(currentRawKey.getBytes(), 0, 
-                                     currentRawKey.getLength(),
-                                     nextKey.getData(),
-                                     nextKey.getPosition(),
-                                     nextKey.getLength() - nextKey.getPosition()
-                                         ) == 0;
-    } else {
-      nextKeyIsSame = false;
-    }
-    inputValueCounter.increment(1);
-    return true;
-  }
-
-  public Object getNextKey() throws IOException, InterruptedException {
-    return key;
-  }
-
-  public Iterable getNextValues() throws IOException,
-      InterruptedException {
-    return iterable;
-  }
-
-  public float getProgress() throws IOException, InterruptedException {
-    return input.getProgress().getProgress();
-  }
-
-  public void close() throws IOException {
-    input.close();
-  }
-
-  public TezRawKeyValueIterator getIterator() {
-    return this.input;
-  }
-
-  protected class ValueIterator implements Iterator<Object> {
-
-
-    public boolean hasNext() {
-      return firstValue || nextKeyIsSame;
-    }
-
-    public Object next() {
-
-      // if this is the first record, we don't need to advance
-      if (firstValue) {
-        firstValue = false;
-        return value;
-      }
-      // if this isn't the first record and the next key is different, they
-      // can't advance it here.
-      if (!nextKeyIsSame) {
-        throw new NoSuchElementException("iterate past last value");
-      }
-      // otherwise, go to the next key/value pair
-      try {
-        nextKeyValue();
-        return value;
-      } catch (IOException ie) {
-        throw new RuntimeException("next value iterator failed", ie);
-      } catch (InterruptedException ie) {
-        // this is bad, but we can't modify the exception list of java.util
-        throw new RuntimeException("next value iterator interrupted", ie);        
-      }
-    }
-
-    public void remove() {
-      throw new UnsupportedOperationException("remove not implemented");
-    }
-  }
-
-
-  
-  protected class ValueIterable implements Iterable<Object> {
-    private ValueIterator iterator = new ValueIterator();
-    public Iterator<Object> iterator() {
-      return iterator;
-    } 
-  }
-  
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineOutput.java
deleted file mode 100644
index 10a1b90..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/combine/CombineOutput.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.combine;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.api.Output;
-import org.apache.tez.engine.common.sort.impl.IFile.Writer;
-import org.apache.tez.engine.records.OutputContext;
-
-public class CombineOutput implements Output {
-
-  private final Writer writer;
-  
-  public CombineOutput(Writer writer) {
-    this.writer = writer;
-  }
-
-  public void initialize(Configuration conf, Master master) throws IOException,
-      InterruptedException {
-    // TODO Auto-generated method stub
-
-  }
-
-  public void write(Object key, Object value) throws IOException,
-      InterruptedException {
-    writer.append(key, value);
-  }
-
-  @Override
-  public OutputContext getOutputContext() {
-    return null;
-  }
-  
-  public void close() throws IOException, InterruptedException {
-    writer.close();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java b/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
index 38b04d3..1cb89a7 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/localshuffle/LocalShuffle.java
@@ -35,8 +35,8 @@ import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.engine.common.ConfigUtils;
 import org.apache.tez.engine.common.sort.impl.TezMerger;
 import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-import org.apache.tez.engine.common.task.local.newoutput.TezLocalTaskOutputFiles;
-import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput;
+import org.apache.tez.engine.common.task.local.output.TezLocalTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
 import org.apache.tez.engine.newapi.TezInputContext;
 
 @SuppressWarnings({"rawtypes"})

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
index 46851c7..b2a0b54 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MapOutput.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.BoundedByteArrayOutputStream;
 import org.apache.tez.engine.common.InputAttemptIdentifier;
-import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
 
 
 class MapOutput {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
index ad9bb5f..bf2be4e 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/shuffle/impl/MergeManager.java
@@ -54,7 +54,7 @@ import org.apache.tez.engine.common.sort.impl.TezMerger;
 import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
 import org.apache.tez.engine.common.sort.impl.IFile.Writer;
 import org.apache.tez.engine.common.sort.impl.TezMerger.Segment;
-import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutputFiles;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutputFiles;
 import org.apache.tez.engine.hadoop.compat.NullProgressable;
 import org.apache.tez.engine.newapi.TezInputContext;
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/sort/SortingOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/SortingOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/SortingOutput.java
deleted file mode 100644
index 35d7723..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/SortingOutput.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.common.sort;
-
-import org.apache.tez.common.RunningTaskContext;
-import org.apache.tez.engine.api.Output;
-
-/**
- * {@link SortingOutput} is an {@link Output} which sorts incoming key/value
- * pairs.
- */
-public interface SortingOutput extends Output {
-  
-  // TODO PreCommit rename
-  public void setTask(RunningTaskContext runningTaskContext);
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
index 1b5e015..8b4bd4e 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/sort/impl/ExternalSorter.java
@@ -48,7 +48,7 @@ import org.apache.tez.engine.common.TezEngineUtils;
 import org.apache.tez.engine.common.combine.Combiner;
 import org.apache.tez.engine.common.shuffle.impl.ShuffleHeader;
 import org.apache.tez.engine.common.sort.impl.IFile.Writer;
-import org.apache.tez.engine.common.task.local.newoutput.TezTaskOutput;
+import org.apache.tez.engine.common.task.local.output.TezTaskOutput;
 import org.apache.tez.engine.hadoop.compat.NullProgressable;
 import org.apache.tez.engine.newapi.TezOutputContext;
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/CombineValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/CombineValuesIterator.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/CombineValuesIterator.java
deleted file mode 100644
index ae6a371..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/impl/CombineValuesIterator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements.  See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership.  The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.engine.common.task.impl;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.RawComparator;
-import org.apache.tez.common.TezTaskReporter;
-import org.apache.tez.common.counters.TezCounter;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-
-/** Iterator to return Combined values */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class CombineValuesIterator<KEY,VALUE>
-extends ValuesIterator<KEY,VALUE> {
-
-  private final TezCounter combineInputCounter;
-
-  public CombineValuesIterator(TezRawKeyValueIterator in,
-      RawComparator<KEY> comparator, Class<KEY> keyClass,
-      Class<VALUE> valClass, Configuration conf, TezTaskReporter reporter,
-      TezCounter combineInputCounter) throws IOException {
-    super(in, comparator, keyClass, valClass, conf, reporter);
-    this.combineInputCounter = combineInputCounter;
-  }
-
-  public VALUE next() {
-    combineInputCounter.increment(1);
-    return super.next();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezLocalTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezLocalTaskOutputFiles.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezLocalTaskOutputFiles.java
deleted file mode 100644
index bbe4e34..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezLocalTaskOutputFiles.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.task.local.newoutput;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.tez.common.Constants;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-
-/**
- * Manipulate the working area for the transient store for maps and reduces.
- *
- * This class is used by map and reduce tasks to identify the directories that
- * they need to write to/read from for intermediate files. The callers of
- * these methods are from the Child running the Task.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class TezLocalTaskOutputFiles extends TezTaskOutput {
-
-  public TezLocalTaskOutputFiles(Configuration conf, String uniqueId) {
-    super(conf, uniqueId);
-  }
-
-  private LocalDirAllocator lDirAlloc =
-    new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
-
-
-  /**
-   * Return the path to local map output file created earlier
-   *
-   * @return path
-   * @throws IOException
-   */
-  @Override
-  public Path getOutputFile()
-      throws IOException {
-    return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
-        + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING, conf);
-  }
-
-  /**
-   * Create a local map output file name.
-   *
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  @Override
-  public Path getOutputFileForWrite(long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
-        + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING, size, conf);
-  }
-  
-  /**
-   * Create a local map output file name. This should *only* be used if the size
-   * of the file is not known. Otherwise use the equivalent which accepts a size
-   * parameter.
-   * 
-   * @return path
-   * @throws IOException
-   */
-  @Override
-  public Path getOutputFileForWrite() throws IOException {
-    return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR
-        + Path.SEPARATOR + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING,
-        conf);
-  }
-
-  /**
-   * Create a local map output file name on the same volume.
-   */
-  @Override
-  public Path getOutputFileForWriteInVolume(Path existing) {
-    return new Path(existing.getParent(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
-  }
-
-  /**
-   * Return the path to a local map output index file created earlier
-   *
-   * @return path
-   * @throws IOException
-   */
-  @Override
-  public Path getOutputIndexFile()
-      throws IOException {
-    return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
-        + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING,
-        conf);
-  }
-
-  /**
-   * Create a local map output index file name.
-   *
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  @Override
-  public Path getOutputIndexFileForWrite(long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
-        + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING,
-        size, conf);
-  }
-
-  /**
-   * Create a local map output index file name on the same volume.
-   */
-  @Override
-  public Path getOutputIndexFileForWriteInVolume(Path existing) {
-    return new Path(existing.getParent(),
-        Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
-  }
-
-  /**
-   * Return a local map spill file created earlier.
-   *
-   * @param spillNumber the number
-   * @return path
-   * @throws IOException
-   */
-  @Override
-  public Path getSpillFile(int spillNumber)
-      throws IOException {
-    return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + "/spill"
-        + spillNumber + ".out", conf);
-  }
-
-  /**
-   * Create a local map spill file name.
-   *
-   * @param spillNumber the number
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  @Override
-  public Path getSpillFileForWrite(int spillNumber, long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + "/spill"
-        + spillNumber + ".out", size, conf);
-  }
-
-  /**
-   * Return a local map spill index file created earlier
-   *
-   * @param spillNumber the number
-   * @return path
-   * @throws IOException
-   */
-  @Override
-  public Path getSpillIndexFile(int spillNumber)
-      throws IOException {
-    return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + "/spill"
-        + spillNumber + ".out.index", conf);
-  }
-
-  /**
-   * Create a local map spill index file name.
-   *
-   * @param spillNumber the number
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  @Override
-  public Path getSpillIndexFileForWrite(int spillNumber, long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + "/spill"
-        + spillNumber + ".out.index", size, conf);
-  }
-
-  /**
-   * Return a local reduce input file created earlier
-   *
-   * @param mapId a map task id
-   * @return path
-   * @throws IOException
-   */
-  @Override
-  public Path getInputFile(InputAttemptIdentifier mapId)
-      throws IOException {
-    return lDirAlloc.getLocalPathToRead(String.format(
-        Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING, 
-        Constants.TASK_OUTPUT_DIR, Integer.valueOf(mapId.getInputIdentifier().getSrcTaskIndex())), conf);
-  }
-
-  /**
-   * Create a local reduce input file name.
-   *
-   * @param mapId a map task id
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  @Override
-  public Path getInputFileForWrite(int taskId,
-                                   long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(String.format(
-        Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING, Constants.TASK_OUTPUT_DIR, taskId),
-        size, conf);
-  }
-
-  /** Removes all of the files related to a task. */
-  @Override
-  public void removeAll()
-      throws IOException {
-    deleteLocalFiles(Constants.TASK_OUTPUT_DIR);
-  }
-
-  private String[] getLocalDirs() throws IOException {
-    return conf.getStrings(TezJobConfig.LOCAL_DIRS);
-  }
-
-  @SuppressWarnings("deprecation")
-  private void deleteLocalFiles(String subdir) throws IOException {
-    String[] localDirs = getLocalDirs();
-    for (int i = 0; i < localDirs.length; i++) {
-      FileSystem.getLocal(conf).delete(new Path(localDirs[i], subdir));
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutput.java
deleted file mode 100644
index 87a5aec..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutput.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.task.local.newoutput;
-
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-
-/**
- * Manipulate the working area for the transient store for maps and reduces.
- *
- * This class is used by map and reduce tasks to identify the directories that
- * they need to write to/read from for intermediate files. The callers of
- * these methods are from child space and see mapreduce.cluster.local.dir as
- * taskTracker/jobCache/jobId/attemptId
- * This class should not be used from TaskTracker space.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public abstract class TezTaskOutput {
-
-  protected Configuration conf;
-  protected String uniqueId;
-
-  public TezTaskOutput(Configuration conf, String uniqueId) {
-    this.conf = conf;
-    this.uniqueId = uniqueId;
-  }
-
-  /**
-   * Return the path to local map output file created earlier
-   *
-   * @return path
-   * @throws IOException
-   */
-  public abstract Path getOutputFile() throws IOException;
-
-  /**
-   * Create a local map output file name.
-   *
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public abstract Path getOutputFileForWrite(long size) throws IOException;
-
-  /**
-   * Create a local output file name. This method is meant to be used *only* if
-   * the size of the file is not know up front.
-   * 
-   * @return path
-   * @throws IOException
-   */
-  public abstract Path getOutputFileForWrite() throws IOException;
-  
-  /**
-   * Create a local map output file name on the same volume.
-   */
-  public abstract Path getOutputFileForWriteInVolume(Path existing);
-
-  /**
-   * Return the path to a local map output index file created earlier
-   *
-   * @return path
-   * @throws IOException
-   */
-  public abstract Path getOutputIndexFile() throws IOException;
-
-  /**
-   * Create a local map output index file name.
-   *
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public abstract Path getOutputIndexFileForWrite(long size) throws IOException;
-
-  /**
-   * Create a local map output index file name on the same volume.
-   */
-  public abstract Path getOutputIndexFileForWriteInVolume(Path existing);
-
-  /**
-   * Return a local map spill file created earlier.
-   *
-   * @param spillNumber the number
-   * @return path
-   * @throws IOException
-   */
-  public abstract Path getSpillFile(int spillNumber) throws IOException;
-
-  /**
-   * Create a local map spill file name.
-   *
-   * @param spillNumber the number
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public abstract Path getSpillFileForWrite(int spillNumber, long size)
-      throws IOException;
-
-  /**
-   * Return a local map spill index file created earlier
-   *
-   * @param spillNumber the number
-   * @return path
-   * @throws IOException
-   */
-  public abstract Path getSpillIndexFile(int spillNumber) throws IOException;
-
-  /**
-   * Create a local map spill index file name.
-   *
-   * @param spillNumber the number
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public abstract Path getSpillIndexFileForWrite(int spillNumber, long size)
-      throws IOException;
-
-  /**
-   * Return a local reduce input file created earlier
-   *
-   * @param attemptIdentifier The identifier for the source task
-   * @return path
-   * @throws IOException
-   */
-  public abstract Path getInputFile(InputAttemptIdentifier attemptIdentifier) throws IOException;
-
-  /**
-   * Create a local reduce input file name.
-   *
-   * @param taskIdentifier The identifier for the source task
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public abstract Path getInputFileForWrite(
-      int taskIdentifier, long size) throws IOException;
-
-  /** Removes all of the files related to a task. */
-  public abstract void removeAll() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutputFiles.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutputFiles.java
deleted file mode 100644
index a37f05f..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/newoutput/TezTaskOutputFiles.java
+++ /dev/null
@@ -1,246 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.engine.common.task.local.newoutput;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
-import org.apache.tez.common.Constants;
-import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.engine.common.InputAttemptIdentifier;
-
-/**
- * Manipulate the working area for the transient store for maps and reduces.
- *
- * This class is used by map and reduce tasks to identify the directories that
- * they need to write to/read from for intermediate files. The callers of
- * these methods are from child space and see mapreduce.cluster.local.dir as
- * taskTracker/jobCache/jobId/attemptId
- * This class should not be used from TaskTracker space.
- */
-
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class TezTaskOutputFiles extends TezTaskOutput {
-  
-  public TezTaskOutputFiles(Configuration conf, String uniqueId) {
-    super(conf, uniqueId);
-  }
-
-  private static final Log LOG = LogFactory.getLog(TezTaskOutputFiles.class);
-
-  private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out";
-  private static final String SPILL_INDEX_FILE_PATTERN = SPILL_FILE_PATTERN
-      + ".index";
-
-  
-
-  // assume configured to $localdir/usercache/$user/appcache/$appId
-  private LocalDirAllocator lDirAlloc =
-    new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
-  
-
-  private Path getAttemptOutputDir() {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("getAttemptOutputDir: "
-          + Constants.TASK_OUTPUT_DIR + "/"
-          + uniqueId);
-    }
-    return new Path(Constants.TASK_OUTPUT_DIR, uniqueId);
-  }
-
-  /**
-   * Return the path to local map output file created earlier
-   *
-   * @return path
-   * @throws IOException
-   */
-  public Path getOutputFile() throws IOException {
-    Path attemptOutput =
-      new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
-    return lDirAlloc.getLocalPathToRead(attemptOutput.toString(), conf);
-  }
-
-  /**
-   * Create a local map output file name.
-   *
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public Path getOutputFileForWrite(long size) throws IOException {
-    Path attemptOutput =
-      new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
-    return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), size, conf);
-  }
-
-  /**
-   * Create a local map output file name. This should *only* be used if the size
-   * of the file is not known. Otherwise use the equivalent which accepts a size
-   * parameter.
-   * 
-   * @return path
-   * @throws IOException
-   */
-  public Path getOutputFileForWrite() throws IOException {
-    Path attemptOutput =
-      new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
-    return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), conf);
-  }
-
-  /**
-   * Create a local map output file name on the same volume.
-   */
-  public Path getOutputFileForWriteInVolume(Path existing) {
-    Path outputDir = new Path(existing.getParent(), Constants.TASK_OUTPUT_DIR);
-    Path attemptOutputDir = new Path(outputDir, uniqueId);
-    return new Path(attemptOutputDir, Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
-  }
-
-  /**
-   * Return the path to a local map output index file created earlier
-   *
-   * @return path
-   * @throws IOException
-   */
-  public Path getOutputIndexFile() throws IOException {
-    Path attemptIndexOutput =
-      new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING +
-                                      Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
-    return lDirAlloc.getLocalPathToRead(attemptIndexOutput.toString(), conf);
-  }
-
-  /**
-   * Create a local map output index file name.
-   *
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public Path getOutputIndexFileForWrite(long size) throws IOException {
-    Path attemptIndexOutput =
-      new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING +
-                                      Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
-    return lDirAlloc.getLocalPathForWrite(attemptIndexOutput.toString(),
-        size, conf);
-  }
-
-  /**
-   * Create a local map output index file name on the same volume.
-   */
-  public Path getOutputIndexFileForWriteInVolume(Path existing) {
-    Path outputDir = new Path(existing.getParent(), Constants.TASK_OUTPUT_DIR);
-    Path attemptOutputDir = new Path(outputDir, uniqueId);
-    return new Path(attemptOutputDir, Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING +
-                                      Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
-  }
-
-  /**
-   * Return a local map spill file created earlier.
-   *
-   * @param spillNumber the number
-   * @return path
-   * @throws IOException
-   */
-  public Path getSpillFile(int spillNumber) throws IOException {
-    return lDirAlloc.getLocalPathToRead(
-        String.format(SPILL_FILE_PATTERN,
-            uniqueId, spillNumber), conf);
-  }
-
-  /**
-   * Create a local map spill file name.
-   *
-   * @param spillNumber the number
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public Path getSpillFileForWrite(int spillNumber, long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(
-        String.format(String.format(SPILL_FILE_PATTERN,
-            uniqueId, spillNumber)), size, conf);
-  }
-
-  /**
-   * Return a local map spill index file created earlier
-   *
-   * @param spillNumber the number
-   * @return path
-   * @throws IOException
-   */
-  public Path getSpillIndexFile(int spillNumber) throws IOException {
-    return lDirAlloc.getLocalPathToRead(
-        String.format(SPILL_INDEX_FILE_PATTERN,
-            uniqueId, spillNumber), conf);
-  }
-
-  /**
-   * Create a local map spill index file name.
-   *
-   * @param spillNumber the number
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public Path getSpillIndexFileForWrite(int spillNumber, long size)
-      throws IOException {
-    return lDirAlloc.getLocalPathForWrite(
-        String.format(SPILL_INDEX_FILE_PATTERN,
-            uniqueId, spillNumber), size, conf);
-  }
-
-  /**
-   * Return a local reduce input file created earlier
-   *
-   * @param attemptIdentifier an identifier for a task. The attempt information is ignored.
-   * @return path
-   * @throws IOException
-   */
-  public Path getInputFile(InputAttemptIdentifier attemptIdentifier) throws IOException {
-    throw new UnsupportedOperationException("Incompatible with LocalRunner");
-  }
-
-  /**
-   * Create a local reduce input file name.
-   *
-   * @param attemptIdentifier an identifier for a task. The attempt information is ignored.
-   * @param size the size of the file
-   * @return path
-   * @throws IOException
-   */
-  public Path getInputFileForWrite(int srcTaskId,
-      long size) throws IOException {
-    return lDirAlloc.getLocalPathForWrite(String.format(
-        uniqueId, getAttemptOutputDir().toString(), srcTaskId),
-        size, conf);
-  }
-
-  /** Removes all of the files related to a task. */
-  public void removeAll() throws IOException {
-    throw new UnsupportedOperationException("Incompatible with LocalRunner");
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java
index 69484af..40e6b1a 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezLocalTaskOutputFiles.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.tez.common.Constants;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.engine.common.InputAttemptIdentifier;
 
 /**
  * Manipulate the working area for the transient store for maps and reduces.
@@ -41,11 +41,13 @@ import org.apache.tez.dag.records.TezTaskID;
 @InterfaceStability.Unstable
 public class TezLocalTaskOutputFiles extends TezTaskOutput {
 
+  public TezLocalTaskOutputFiles(Configuration conf, String uniqueId) {
+    super(conf, uniqueId);
+  }
+
   private LocalDirAllocator lDirAlloc =
     new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
 
-  public TezLocalTaskOutputFiles() {
-  }
 
   /**
    * Return the path to local map output file created earlier
@@ -57,7 +59,7 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
   public Path getOutputFile()
       throws IOException {
     return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
-        + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING, getConf());
+        + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING, conf);
   }
 
   /**
@@ -71,7 +73,22 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
   public Path getOutputFileForWrite(long size)
       throws IOException {
     return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
-        + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING, size, getConf());
+        + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING, size, conf);
+  }
+  
+  /**
+   * Create a local map output file name. This should *only* be used if the size
+   * of the file is not known. Otherwise use the equivalent which accepts a size
+   * parameter.
+   * 
+   * @return path
+   * @throws IOException
+   */
+  @Override
+  public Path getOutputFileForWrite() throws IOException {
+    return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR
+        + Path.SEPARATOR + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING,
+        conf);
   }
 
   /**
@@ -93,7 +110,7 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
       throws IOException {
     return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
         + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING,
-        getConf());
+        conf);
   }
 
   /**
@@ -108,7 +125,7 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
       throws IOException {
     return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + Path.SEPARATOR
         + Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING + Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING,
-        size, getConf());
+        size, conf);
   }
 
   /**
@@ -131,7 +148,7 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
   public Path getSpillFile(int spillNumber)
       throws IOException {
     return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + "/spill"
-        + spillNumber + ".out", getConf());
+        + spillNumber + ".out", conf);
   }
 
   /**
@@ -146,7 +163,7 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
   public Path getSpillFileForWrite(int spillNumber, long size)
       throws IOException {
     return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + "/spill"
-        + spillNumber + ".out", size, getConf());
+        + spillNumber + ".out", size, conf);
   }
 
   /**
@@ -160,7 +177,7 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
   public Path getSpillIndexFile(int spillNumber)
       throws IOException {
     return lDirAlloc.getLocalPathToRead(Constants.TASK_OUTPUT_DIR + "/spill"
-        + spillNumber + ".out.index", getConf());
+        + spillNumber + ".out.index", conf);
   }
 
   /**
@@ -175,7 +192,7 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
   public Path getSpillIndexFileForWrite(int spillNumber, long size)
       throws IOException {
     return lDirAlloc.getLocalPathForWrite(Constants.TASK_OUTPUT_DIR + "/spill"
-        + spillNumber + ".out.index", size, getConf());
+        + spillNumber + ".out.index", size, conf);
   }
 
   /**
@@ -186,11 +203,11 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
    * @throws IOException
    */
   @Override
-  public Path getInputFile(int mapId)
+  public Path getInputFile(InputAttemptIdentifier mapId)
       throws IOException {
     return lDirAlloc.getLocalPathToRead(String.format(
         Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING, 
-        Constants.TASK_OUTPUT_DIR, Integer.valueOf(mapId)), getConf());
+        Constants.TASK_OUTPUT_DIR, Integer.valueOf(mapId.getInputIdentifier().getSrcTaskIndex())), conf);
   }
 
   /**
@@ -202,12 +219,12 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
    * @throws IOException
    */
   @Override
-  public Path getInputFileForWrite(TezTaskID mapId,
+  public Path getInputFileForWrite(int taskId,
                                    long size)
       throws IOException {
     return lDirAlloc.getLocalPathForWrite(String.format(
-        Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING, Constants.TASK_OUTPUT_DIR, mapId.getId()),
-        size, getConf());
+        Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING, Constants.TASK_OUTPUT_DIR, taskId),
+        size, conf);
   }
 
   /** Removes all of the files related to a task. */
@@ -217,20 +234,15 @@ public class TezLocalTaskOutputFiles extends TezTaskOutput {
     deleteLocalFiles(Constants.TASK_OUTPUT_DIR);
   }
 
-  @Override
-  public void setConf(Configuration conf) {
-    super.setConf(conf);
-  }
-
   private String[] getLocalDirs() throws IOException {
-    return getConf().getStrings(TezJobConfig.LOCAL_DIRS);
+    return conf.getStrings(TezJobConfig.LOCAL_DIRS);
   }
 
   @SuppressWarnings("deprecation")
   private void deleteLocalFiles(String subdir) throws IOException {
     String[] localDirs = getLocalDirs();
     for (int i = 0; i < localDirs.length; i++) {
-      FileSystem.getLocal(getConf()).delete(new Path(localDirs[i], subdir));
+      FileSystem.getLocal(conf).delete(new Path(localDirs[i], subdir));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java
index 50d270b..e1d83ad 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutput.java
@@ -23,9 +23,8 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.fs.Path;
-import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.engine.common.InputAttemptIdentifier;
 
 /**
  * Manipulate the working area for the transient store for maps and reduces.
@@ -38,12 +37,14 @@ import org.apache.tez.dag.records.TezTaskID;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
-public abstract class TezTaskOutput implements Configurable {
+public abstract class TezTaskOutput {
 
   protected Configuration conf;
   protected String uniqueId;
 
-  public TezTaskOutput() {
+  public TezTaskOutput(Configuration conf, String uniqueId) {
+    this.conf = conf;
+    this.uniqueId = uniqueId;
   }
 
   /**
@@ -64,6 +65,15 @@ public abstract class TezTaskOutput implements Configurable {
   public abstract Path getOutputFileForWrite(long size) throws IOException;
 
   /**
+   * Create a local output file name. This method is meant to be used *only* if
+   * the size of the file is not known up front.
+   * 
+   * @return path
+   * @throws IOException
+   */
+  public abstract Path getOutputFileForWrite() throws IOException;
+  
+  /**
    * Create a local map output file name on the same volume.
    */
   public abstract Path getOutputFileForWriteInVolume(Path existing);
@@ -133,42 +143,23 @@ public abstract class TezTaskOutput implements Configurable {
   /**
    * Return a local reduce input file created earlier
    *
-   * @param mapId a map task id
+   * @param attemptIdentifier The identifier for the source task
    * @return path
    * @throws IOException
    */
-  public abstract Path getInputFile(int mapId) throws IOException;
+  public abstract Path getInputFile(InputAttemptIdentifier attemptIdentifier) throws IOException;
 
   /**
    * Create a local reduce input file name.
    *
-   * @param mapId a map task id
+   * @param taskIdentifier The identifier for the source task
    * @param size the size of the file
    * @return path
    * @throws IOException
    */
   public abstract Path getInputFileForWrite(
-      TezTaskID mapId, long size) throws IOException;
+      int taskIdentifier, long size) throws IOException;
 
   /** Removes all of the files related to a task. */
   public abstract void removeAll() throws IOException;
-
-  public void setUniqueIdentifier(String uniqueId) {
-    this.uniqueId = uniqueId;
-  }
-  
-  public String getUniqueIdentifier() {
-    return this.uniqueId;
-  }
-  
-  @Override
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-
-  @Override
-  public Configuration getConf() {
-    return conf;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
index b7874f0..b8f051b 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/common/task/local/output/TezTaskOutputFiles.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.tez.common.Constants;
 import org.apache.tez.common.TezJobConfig;
-import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.engine.common.InputAttemptIdentifier;
 
 /**
  * Manipulate the working area for the transient store for maps and reduces.
@@ -40,32 +40,35 @@ import org.apache.tez.dag.records.TezTaskID;
  * taskTracker/jobCache/jobId/attemptId
  * This class should not be used from TaskTracker space.
  */
+
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class TezTaskOutputFiles extends TezTaskOutput {
+  
+  public TezTaskOutputFiles(Configuration conf, String uniqueId) {
+    super(conf, uniqueId);
+  }
 
   private static final Log LOG = LogFactory.getLog(TezTaskOutputFiles.class);
-  private Configuration conf;
 
   private static final String SPILL_FILE_PATTERN = "%s_spill_%d.out";
   private static final String SPILL_INDEX_FILE_PATTERN = SPILL_FILE_PATTERN
       + ".index";
 
-  public TezTaskOutputFiles() {
-  }
+  
 
   // assume configured to $localdir/usercache/$user/appcache/$appId
   private LocalDirAllocator lDirAlloc =
     new LocalDirAllocator(TezJobConfig.LOCAL_DIRS);
+  
 
   private Path getAttemptOutputDir() {
     if (LOG.isDebugEnabled()) {
       LOG.debug("getAttemptOutputDir: "
           + Constants.TASK_OUTPUT_DIR + "/"
-          + conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
+          + uniqueId);
     }
-    return new Path(Constants.TASK_OUTPUT_DIR,
-        conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
+    return new Path(Constants.TASK_OUTPUT_DIR, uniqueId);
   }
 
   /**
@@ -94,12 +97,25 @@ public class TezTaskOutputFiles extends TezTaskOutput {
   }
 
   /**
+   * Create a local map output file name. This should *only* be used if the size
+   * of the file is not known. Otherwise use the equivalent which accepts a size
+   * parameter.
+   * 
+   * @return path
+   * @throws IOException
+   */
+  public Path getOutputFileForWrite() throws IOException {
+    Path attemptOutput =
+      new Path(getAttemptOutputDir(), Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
+    return lDirAlloc.getLocalPathForWrite(attemptOutput.toString(), conf);
+  }
+
+  /**
    * Create a local map output file name on the same volume.
    */
   public Path getOutputFileForWriteInVolume(Path existing) {
     Path outputDir = new Path(existing.getParent(), Constants.TASK_OUTPUT_DIR);
-    Path attemptOutputDir = new Path(outputDir,
-        conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
+    Path attemptOutputDir = new Path(outputDir, uniqueId);
     return new Path(attemptOutputDir, Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING);
   }
 
@@ -136,8 +152,7 @@ public class TezTaskOutputFiles extends TezTaskOutput {
    */
   public Path getOutputIndexFileForWriteInVolume(Path existing) {
     Path outputDir = new Path(existing.getParent(), Constants.TASK_OUTPUT_DIR);
-    Path attemptOutputDir = new Path(outputDir,
-        conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID));
+    Path attemptOutputDir = new Path(outputDir, uniqueId);
     return new Path(attemptOutputDir, Constants.TEZ_ENGINE_TASK_OUTPUT_FILENAME_STRING +
                                       Constants.TEZ_ENGINE_TASK_OUTPUT_INDEX_SUFFIX_STRING);
   }
@@ -152,7 +167,7 @@ public class TezTaskOutputFiles extends TezTaskOutput {
   public Path getSpillFile(int spillNumber) throws IOException {
     return lDirAlloc.getLocalPathToRead(
         String.format(SPILL_FILE_PATTERN,
-            conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID), spillNumber), conf);
+            uniqueId, spillNumber), conf);
   }
 
   /**
@@ -167,7 +182,7 @@ public class TezTaskOutputFiles extends TezTaskOutput {
       throws IOException {
     return lDirAlloc.getLocalPathForWrite(
         String.format(String.format(SPILL_FILE_PATTERN,
-            conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID), spillNumber)), size, conf);
+            uniqueId, spillNumber)), size, conf);
   }
 
   /**
@@ -180,7 +195,7 @@ public class TezTaskOutputFiles extends TezTaskOutput {
   public Path getSpillIndexFile(int spillNumber) throws IOException {
     return lDirAlloc.getLocalPathToRead(
         String.format(SPILL_INDEX_FILE_PATTERN,
-            conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID), spillNumber), conf);
+            uniqueId, spillNumber), conf);
   }
 
   /**
@@ -195,33 +210,32 @@ public class TezTaskOutputFiles extends TezTaskOutput {
       throws IOException {
     return lDirAlloc.getLocalPathForWrite(
         String.format(SPILL_INDEX_FILE_PATTERN,
-            conf.get(Constants.TEZ_ENGINE_TASK_ATTEMPT_ID), spillNumber), size, conf);
+            uniqueId, spillNumber), size, conf);
   }
 
   /**
    * Return a local reduce input file created earlier
    *
-   * @param mapId a map task id
+   * @param attemptIdentifier an identifier for a task. The attempt information is ignored.
    * @return path
    * @throws IOException
    */
-  public Path getInputFile(int mapId) throws IOException {
+  public Path getInputFile(InputAttemptIdentifier attemptIdentifier) throws IOException {
     throw new UnsupportedOperationException("Incompatible with LocalRunner");
   }
 
   /**
    * Create a local reduce input file name.
    *
-   * @param mapId a map task id
+   * @param srcTaskId The identifier for the source task
    * @param size the size of the file
    * @return path
    * @throws IOException
    */
-  public Path getInputFileForWrite(TezTaskID mapId,
+  public Path getInputFileForWrite(int srcTaskId,
       long size) throws IOException {
     return lDirAlloc.getLocalPathForWrite(String.format(
-        Constants.TEZ_ENGINE_TASK_INPUT_FILE_FORMAT_STRING,
-        getAttemptOutputDir().toString(), mapId.getId()),
+        uniqueId, getAttemptOutputDir().toString(), srcTaskId),
         size, conf);
   }
 
@@ -229,13 +243,4 @@ public class TezTaskOutputFiles extends TezTaskOutput {
   public void removeAll() throws IOException {
     throw new UnsupportedOperationException("Incompatible with LocalRunner");
   }
-
-  public void setConf(Configuration conf) {
-    this.conf = conf;
-  }
-
-  public Configuration getConf() {
-    return conf;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
index eccd119..c719fba 100644
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
+++ b/tez-engine/src/main/java/org/apache/tez/engine/lib/input/ShuffledMergedInput.java
@@ -29,12 +29,12 @@ import org.apache.tez.common.TezJobConfig;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
+import org.apache.tez.engine.api.KVReader;
 import org.apache.tez.engine.common.ConfigUtils;
 import org.apache.tez.engine.common.ValuesIterator;
 import org.apache.tez.engine.common.shuffle.impl.Shuffle;
 import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
 import org.apache.tez.engine.newapi.Event;
-import org.apache.tez.engine.newapi.KVReader;
 import org.apache.tez.engine.newapi.LogicalInput;
 import org.apache.tez.engine.newapi.TezInputContext;
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/LocalMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/LocalMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/LocalMergedInput.java
deleted file mode 100644
index 269fe81..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/LocalMergedInput.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.lib.oldinput;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.TezEngineTaskContext;
-import org.apache.tez.engine.api.Input;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-
-/**
- * {@link LocalMergedInput} in an {@link Input} which shuffles intermediate
- * sorted data, merges them and provides key/<values> to the consumer. 
- */
-public class LocalMergedInput extends OldShuffledMergedInput {
-
-  public LocalMergedInput(TezEngineTaskContext task, int index) {
-    super(task, index);
-  }
-
-  public void initialize(Configuration conf, Master master) throws IOException,
-      InterruptedException {
-  }
-
-  public boolean hasNext() throws IOException, InterruptedException {
-    return false;
-  }
-
-  public Object getNextKey() throws IOException, InterruptedException {
-    return null;
-  }
-
-  @SuppressWarnings({ "unchecked", "rawtypes" })
-  public Iterable getNextValues() 
-      throws IOException, InterruptedException {
-    return null;
-  }
-
-  public float getProgress() throws IOException, InterruptedException {
-    return 0f;
-  }
-
-  public void close() throws IOException {
-  }
-
-  public TezRawKeyValueIterator getIterator() {
-    return null;
-  }
-  
-}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/b4950f98/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/OldShuffledMergedInput.java
----------------------------------------------------------------------
diff --git a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/OldShuffledMergedInput.java b/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/OldShuffledMergedInput.java
deleted file mode 100644
index c046a27..0000000
--- a/tez-engine/src/main/java/org/apache/tez/engine/lib/oldinput/OldShuffledMergedInput.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.tez.engine.lib.oldinput;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.tez.common.RunningTaskContext;
-import org.apache.tez.common.TezEngineTaskContext;
-import org.apache.tez.engine.api.Input;
-import org.apache.tez.engine.api.Master;
-import org.apache.tez.engine.common.sort.impl.TezRawKeyValueIterator;
-
-/**
- * {@link OldShuffledMergedInput} in an {@link Input} which shuffles intermediate
- * sorted data, merges them and provides key/<values> to the consumer. 
- */
-public class OldShuffledMergedInput implements Input {
-
-
-  public OldShuffledMergedInput(TezEngineTaskContext task, int index) {
-  }
-
-  public void mergeWith(OldShuffledMergedInput other) {
-  }
-  
-  public void setTask(RunningTaskContext runningTaskContext) {
-  }
-  
-  public void initialize(Configuration conf, Master master) throws IOException,
-      InterruptedException {
-  }
-
-  public boolean hasNext() throws IOException, InterruptedException {
-    return false;
-  }
-
-  public Object getNextKey() throws IOException, InterruptedException {
-    return null;
-  }
-
-  @SuppressWarnings({ "unchecked", "rawtypes" })
-  public Iterable getNextValues() 
-      throws IOException, InterruptedException {
-    return null;
-  }
-
-  public float getProgress() throws IOException, InterruptedException {
-    return 0f;
-  }
-
-  public void close() throws IOException {
-  }
-
-  public TezRawKeyValueIterator getIterator() {
-    return null;
-  }
-  
-}