You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/01/21 00:16:52 UTC

tez git commit: TEZ-1855. Avoid scanning for previously written files within Inputs / Outputs (Rajesh Balamohan)

Repository: tez
Updated Branches:
  refs/heads/master a5579c4ab -> ecf1ff58f


TEZ-1855. Avoid scanning for previously written files within Inputs / Outputs (Rajesh Balamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/ecf1ff58
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/ecf1ff58
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/ecf1ff58

Branch: refs/heads/master
Commit: ecf1ff58f85d350d539755c05625a7ffc2388d94
Parents: a5579c4
Author: Rajesh Balamohan <rb...@hortonworks.com>
Authored: Wed Jan 21 04:46:42 2015 +0530
Committer: Rajesh Balamohan <rb...@hortonworks.com>
Committed: Wed Jan 21 04:46:42 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../processor/map/TestMapProcessor.java         | 13 +++-
 .../common/sort/impl/ExternalSorter.java        | 18 ++++-
 .../common/sort/impl/PipelinedSorter.java       | 34 ++++----
 .../common/sort/impl/dflt/DefaultSorter.java    | 28 ++++---
 .../common/task/local/output/TezTaskOutput.java | 52 -------------
 .../task/local/output/TezTaskOutputFiles.java   | 81 --------------------
 .../writers/UnorderedPartitionedKVWriter.java   | 14 +++-
 .../output/OrderedPartitionedKVOutput.java      |  2 +-
 .../TestUnorderedPartitionedKVWriter.java       | 59 ++++----------
 10 files changed, 91 insertions(+), 211 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/ecf1ff58/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index ed13462..3ec7002 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.7.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-1855. Avoid scanning for previously written files within Inputs / Outputs.
   TEZ-1902. Fix findbugs warnings in tez-mapreduce.
   TEZ-1963. Fix post memory merge to be > 2 GB.
   TEZ-1901. Fix findbugs warnings in tez-examples.

http://git-wip-us.apache.org/repos/asf/tez/blob/ecf1ff58/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
index 3b62dfc..68d8cef 100644
--- a/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
+++ b/tez-mapreduce/src/test/java/org/apache/tez/mapreduce/processor/map/TestMapProcessor.java
@@ -24,7 +24,9 @@ import java.util.Collections;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputBuffer;
 import org.apache.hadoop.io.LongWritable;
@@ -96,6 +98,15 @@ public class TestMapProcessor {
     job.setNumReduceTasks(1);
   }
 
+  private Path getMapOutputFile(Configuration jobConf, OutputContext outputContext)
+      throws IOException {
+    LocalDirAllocator lDirAlloc = new LocalDirAllocator(TezRuntimeFrameworkConfigs.LOCAL_DIRS);
+    Path attemptOutput = new Path(new Path(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR, outputContext.getUniqueIdentifier()),
+        Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING);
+    Path mapOutputFile = lDirAlloc.getLocalPathToRead(attemptOutput.toString(), jobConf);
+    return  mapOutputFile;
+  }
+
   @Before
   @After
   public void cleanup() throws Exception {
@@ -152,7 +163,7 @@ public class TestMapProcessor {
 //        .getCommitter().getClass().getName());
 //    t.close();
 
-    Path mapOutputFile = mapOutputs.getOutputFile();
+    Path mapOutputFile = getMapOutputFile(jobConf, outputContext);
     LOG.info("mapOutputFile = " + mapOutputFile);
     IFile.Reader reader =
         new IFile.Reader(localFs, mapOutputFile, null, null, null, false, 0, -1);

http://git-wip-us.apache.org/repos/asf/tez/blob/ecf1ff58/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index 9aee53a..19d60e4 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -22,7 +22,9 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Iterator;
+import java.util.Map;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -63,7 +65,10 @@ public abstract class ExternalSorter {
 
   private static final Log LOG = LogFactory.getLog(ExternalSorter.class);
 
-  public abstract void close() throws IOException;
+  public void close() throws IOException {
+    spillFileIndexPaths.clear();
+    spillFilePaths.clear();
+  }
 
   public abstract void flush() throws IOException;
 
@@ -103,6 +108,12 @@ public abstract class ExternalSorter {
   // Compression for map-outputs
   protected final CompressionCodec codec;
 
+  protected final Map<Integer, Path> spillFilePaths = Maps.newHashMap();
+  protected final Map<Integer, Path> spillFileIndexPaths = Maps.newHashMap();
+
+  protected Path finalOutputFile;
+  protected Path finalIndexFile;
+
   // Counters
   // MR compatilbity layer needs to rename counters back to what MR requries.
 
@@ -243,6 +254,11 @@ public abstract class ExternalSorter {
     return mapOutputFile;
   }
 
+  @Private
+  public Path getFinalIndexFile() {
+    return finalIndexFile;
+  }
+
   protected void runCombineProcessor(TezRawKeyValueIterator kvIter,
       Writer writer) throws IOException {
     try {

http://git-wip-us.apache.org/repos/asf/tez/blob/ecf1ff58/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 35ea954..9b171ab 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -246,7 +246,8 @@ public class PipelinedSorter extends ExternalSorter {
         + (partitions * APPROX_HEADER_LENGTH);
     final TezSpillRecord spillRec = new TezSpillRecord(partitions);
     final Path filename =
-      mapOutputFile.getSpillFileForWrite(numSpills, size);    
+      mapOutputFile.getSpillFileForWrite(numSpills, size);
+    spillFilePaths.put(numSpills, filename);
     FSDataOutputStream out = rfs.create(filename, true, 4096);
 
     try {
@@ -281,6 +282,7 @@ public class PipelinedSorter extends ExternalSorter {
       Path indexFilename =
         mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
             * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+      spillFileIndexPaths.put(numSpills, indexFilename);
       // TODO: cache
       spillRec.writeToFile(indexFilename, conf);
       ++numSpills;
@@ -294,9 +296,9 @@ public class PipelinedSorter extends ExternalSorter {
   @Override
   public void flush() throws IOException {
     final String uniqueIdentifier = outputContext.getUniqueIdentifier();
-    Path finalOutputFile =
+    finalOutputFile =
         mapOutputFile.getOutputFileForWrite(0); //TODO
-    Path finalIndexFile =
+    finalIndexFile =
         mapOutputFile.getOutputIndexFileForWrite(0); //TODO
 
     LOG.info("Starting flush of map output");
@@ -312,12 +314,13 @@ public class PipelinedSorter extends ExternalSorter {
     if(numSpills == 1) {
       // someday be able to pass this directly to shuffle
       // without writing to disk
-      final Path filename =
-          mapOutputFile.getSpillFile(0);
-      Path indexFilename =
-              mapOutputFile.getSpillIndexFile(0);
-      sameVolRename(filename, mapOutputFile.getOutputFileForWriteInVolume(filename));
-      sameVolRename(indexFilename, mapOutputFile.getOutputIndexFileForWriteInVolume(indexFilename));
+      final Path filename = spillFilePaths.get(0);
+      final Path indexFilename = spillFileIndexPaths.get(0);
+      finalOutputFile = mapOutputFile.getOutputFileForWriteInVolume(filename);
+      finalIndexFile = mapOutputFile.getOutputIndexFileForWriteInVolume(indexFilename);
+
+      sameVolRename(filename, finalOutputFile);
+      sameVolRename(indexFilename, finalIndexFile);
       return;
     }
     
@@ -325,11 +328,10 @@ public class PipelinedSorter extends ExternalSorter {
     FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
 
     final TezSpillRecord spillRec = new TezSpillRecord(partitions);
-    final ArrayList<TezSpillRecord> indexCacheList = new ArrayList<TezSpillRecord>();
 
     for(int i = 0; i < numSpills; i++) {
       // TODO: build this cache before
-      Path indexFilename = mapOutputFile.getSpillIndexFile(i);
+      Path indexFilename = spillFileIndexPaths.get(i);
       TezSpillRecord spillIndex = new TezSpillRecord(indexFilename, conf);
       indexCacheList.add(spillIndex);
     }
@@ -339,7 +341,7 @@ public class PipelinedSorter extends ExternalSorter {
       List<Segment> segmentList =
           new ArrayList<Segment>(numSpills);
       for(int i = 0; i < numSpills; i++) {
-        Path spillFilename = mapOutputFile.getSpillFile(i);
+        Path spillFilename = spillFilePaths.get(i);
         TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
 
         Segment s =
@@ -390,14 +392,16 @@ public class PipelinedSorter extends ExternalSorter {
     spillRec.writeToFile(finalIndexFile, conf);
     finalOut.close();
     for(int i = 0; i < numSpills; i++) {
-      Path indexFilename = mapOutputFile.getSpillIndexFile(i);
-      Path spillFilename = mapOutputFile.getSpillFile(i);
+      Path indexFilename = spillFileIndexPaths.get(i);
+      Path spillFilename = spillFilePaths.get(i);
       rfs.delete(indexFilename,true);
       rfs.delete(spillFilename,true);
     }
+
+    spillFileIndexPaths.clear();
+    spillFilePaths.clear();
   }
 
-  public void close() { }
 
   private interface PartitionedRawKeyValueIterator extends TezRawKeyValueIterator {
     int getPartition();

http://git-wip-us.apache.org/repos/asf/tez/blob/ecf1ff58/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index f9e6935..b99f319 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -636,7 +636,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
     //FIXME
     //kvbuffer = null;
     mergeParts();
-    Path outputPath = mapOutputFile.getOutputFile();
+    Path outputPath = finalOutputFile;
     fileOutputByteCounter.increment(rfs.getFileStatus(outputPath).getLen());
   }
 
@@ -747,6 +747,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
       final TezSpillRecord spillRec = new TezSpillRecord(partitions);
       final Path filename =
           mapOutputFile.getSpillFileForWrite(numSpills, size);
+      spillFilePaths.put(numSpills, filename);
       out = rfs.create(filename);
 
       int spindex = mstart;
@@ -820,6 +821,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
         Path indexFilename =
             mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                 * MAP_OUTPUT_INDEX_RECORD_LENGTH);
+        spillFileIndexPaths.put(numSpills, indexFilename);
         spillRec.writeToFile(indexFilename, conf);
       } else {
         indexCacheList.add(spillRec);
@@ -847,6 +849,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
       final TezSpillRecord spillRec = new TezSpillRecord(partitions);
       final Path filename =
           mapOutputFile.getSpillFileForWrite(numSpills, size);
+      spillFilePaths.put(numSpills, filename);
       out = rfs.create(filename);
 
       // we don't run the combiner for a single record
@@ -895,7 +898,8 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
         Path indexFilename =
             mapOutputFile.getSpillIndexFileForWrite(numSpills, partitions
                 * MAP_OUTPUT_INDEX_RECORD_LENGTH);
-        spillRec.writeToFile(indexFilename, conf);
+        spillFileIndexPaths.put(numSpills, indexFilename);
+         spillRec.writeToFile(indexFilename, conf);
       } else {
         indexCacheList.add(spillRec);
         totalIndexCacheMemory +=
@@ -1001,25 +1005,25 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
     final String taskIdentifier = outputContext.getUniqueIdentifier();
 
     for(int i = 0; i < numSpills; i++) {
-      filename[i] = mapOutputFile.getSpillFile(i);
+      filename[i] = spillFilePaths.get(i);
       finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
     }
     if (numSpills == 1) { //the spill is the final output
-      sameVolRename(filename[0],
-          mapOutputFile.getOutputFileForWriteInVolume(filename[0]));
+      finalOutputFile = mapOutputFile.getOutputFileForWriteInVolume(filename[0]);
+      finalIndexFile = mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]);
+
+      sameVolRename(filename[0], finalOutputFile);
       if (indexCacheList.size() == 0) {
-        sameVolRename(mapOutputFile.getSpillIndexFile(0),
-            mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]));
+        sameVolRename(spillFileIndexPaths.get(0), finalIndexFile);
       } else {
-        indexCacheList.get(0).writeToFile(
-            mapOutputFile.getOutputIndexFileForWriteInVolume(filename[0]), conf);
+        indexCacheList.get(0).writeToFile(finalIndexFile, conf);
       }
       return;
     }
 
     // read in paged indices
     for (int i = indexCacheList.size(); i < numSpills; ++i) {
-      Path indexFileName = mapOutputFile.getSpillIndexFile(i);
+      Path indexFileName = spillFileIndexPaths.get(i);
       indexCacheList.add(new TezSpillRecord(indexFileName, conf));
     }
 
@@ -1027,9 +1031,9 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
     //lengths for each partition
     finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
     finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
-    Path finalOutputFile =
+    finalOutputFile =
         mapOutputFile.getOutputFileForWrite(finalOutFileSize);
-    Path finalIndexFile =
+    finalIndexFile =
         mapOutputFile.getOutputIndexFileForWrite(finalIndexFileSize);
 
     //The output stream for the final single output file

http://git-wip-us.apache.org/repos/asf/tez/blob/ecf1ff58/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
index 1c08f28..e9f33af 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutput.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 
 /**
  * Manipulate the working area for the transient store for components in tez-runtime-library
@@ -53,16 +52,6 @@ public abstract class TezTaskOutput {
   }
 
   /**
-   * Return the path to local output file created earlier.
-   *
-   * TODO TEZ-1855: Remove this. Leads to an extra localdir scan just to update counters.
-   *
-   * @return path the path of the local output file
-   * @throws IOException
-   */
-  public abstract Path getOutputFile() throws IOException;
-
-  /**
    * Create a local output file name.
    *
    * @param size the size of the file
@@ -90,16 +79,6 @@ public abstract class TezTaskOutput {
   public abstract Path getOutputFileForWriteInVolume(Path existing);
 
   /**
-   * Return the path to a local output index file created earlier
-   *
-   * TODO TEZ-1855: Remove this. Leads to an additional scan to find empty partitions.
-   *
-   * @return path the path of the index file
-   * @throws IOException
-   */
-  public abstract Path getOutputIndexFile() throws IOException;
-
-  /**
    * Create a local output index file name.
    *
    * @param size the size of the file
@@ -117,16 +96,6 @@ public abstract class TezTaskOutput {
   public abstract Path getOutputIndexFileForWriteInVolume(Path existing);
 
   /**
-   * Return a local output spill file created earlier.
-   *
-   * @param spillNumber the spill number
-   * @return path the path of the previously written spill file corresponding to the spillNumber
-   * @throws IOException
-   * // KKK Try removing this. Unnecessary file scans - can be stored in memory instead.
-   */
-  public abstract Path getSpillFile(int spillNumber) throws IOException;
-
-  /**
    * Create a local output spill file name.
    *
    * @param spillNumber the spill number
@@ -137,16 +106,6 @@ public abstract class TezTaskOutput {
   public abstract Path getSpillFileForWrite(int spillNumber, long size)
       throws IOException;
 
-  /**
-   * Return a local output spill index file created earlier
-   *
-   * TODO TEZ-1855: Remove this. Should be possible to cache this instead of requiring a directory scan.
-   *
-   * @param spillNumber the spill number
-   * @return path the path of the previously written spill index file corresponding to the spillNumber
-   * @throws IOException
-   */
-  public abstract Path getSpillIndexFile(int spillNumber) throws IOException;
 
   /**
    * Create a local output spill index file name.
@@ -160,17 +119,6 @@ public abstract class TezTaskOutput {
       throws IOException;
 
   /**
-   * Return a local input file created earlier
-   *
-   * TODO: TEZ-1855. Remove this.
-   *
-   * @param attemptIdentifier The identifier for the source
-   * @return path the path to the input file
-   * @throws IOException
-   */
-  public abstract Path getInputFile(InputAttemptIdentifier attemptIdentifier) throws IOException;
-
-  /**
    * Create a local input file name.
    *
    * @param srcIdentifier The identifier for the source

http://git-wip-us.apache.org/repos/asf/tez/blob/ecf1ff58/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
index be7e4ab..59aab17 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/task/local/output/TezTaskOutputFiles.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
 import org.apache.tez.runtime.library.common.Constants;
-import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 
 /**
  * Manipulate the working area for the transient store for components in tez-runtime-library
@@ -71,23 +70,6 @@ public class TezTaskOutputFiles extends TezTaskOutput {
     return new Path(Constants.TEZ_RUNTIME_TASK_OUTPUT_DIR, uniqueId);
   }
 
-  /**
-   * Return the path to local output file created earlier.
-   *
-   * ${appDir}/output/${uniqueId}/file.out
-   * e.g. application_1418684642047_0006/output/attempt_1418684642047_0006_1_00_000000_0_10003/file.out
-   *
-   * The structure of this file name is critical, to be served by the MapReduce ShuffleHandler.
-   *
-   * @return path the path of the local output file
-   * @throws IOException
-   */
-  @Override
-  public Path getOutputFile() throws IOException {
-    Path attemptOutput =
-      new Path(getAttemptOutputDir(), Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING);
-    return lDirAlloc.getLocalPathToRead(attemptOutput.toString(), conf);
-  }
 
   /**
    * Create a local output file name.
@@ -147,24 +129,6 @@ public class TezTaskOutputFiles extends TezTaskOutput {
     return new Path(attemptOutputDir, Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING);
   }
 
-  /**
-   * Return the path to a local output index file created earlier
-   *
-   * ${appDir}/output/${uniqueId}/file.out.index
-   * e.g. application_1418684642047_0006/output/attempt_1418684642047_0006_1_00_000000_0_10003/file.out.index
-   *
-   * The structure of this file name is critical, to be served by the MapReduce ShuffleHandler.
-   *
-   * @return path the path of the index file
-   * @throws IOException
-   */
-  @Override
-  public Path getOutputIndexFile() throws IOException {
-    Path attemptIndexOutput =
-      new Path(getAttemptOutputDir(), Constants.TEZ_RUNTIME_TASK_OUTPUT_FILENAME_STRING +
-                                      Constants.TEZ_RUNTIME_TASK_OUTPUT_INDEX_SUFFIX_STRING);
-    return lDirAlloc.getLocalPathToRead(attemptIndexOutput.toString(), conf);
-  }
 
   /**
    * Create a local output index file name.
@@ -206,23 +170,6 @@ public class TezTaskOutputFiles extends TezTaskOutput {
   }
 
   /**
-   * Return a local output spill file created earlier.
-   *
-   * ${appDir}/${uniqueId}_spill_${spillNumber}.out
-   * e.g. application_1418684642047_0006/attempt_1418684642047_0006_1_00_000000_0_10003_spill_0.out
-   *
-   * @param spillNumber the spill number
-   * @return path the path of the previously written spill file corresponding to the spillNumber
-   * @throws IOException
-   */
-  @Override
-  public Path getSpillFile(int spillNumber) throws IOException {
-    return lDirAlloc.getLocalPathToRead(
-        String.format(SPILL_FILE_PATTERN,
-            uniqueId, spillNumber), conf);
-  }
-
-  /**
    * Create a local spill file name.
    *
    * ${appDir}/${uniqueId}_spill_${spillNumber}.out
@@ -242,20 +189,6 @@ public class TezTaskOutputFiles extends TezTaskOutput {
   }
 
   /**
-   * Return a local map spill index file created earlier
-   *
-   * @param spillNumber the number
-   * @return path the path of the previously written spill index file corresponding to the spillNumber
-   * @throws IOException
-   */
-  @Override
-  public Path getSpillIndexFile(int spillNumber) throws IOException {
-    return lDirAlloc.getLocalPathToRead(
-        String.format(SPILL_INDEX_FILE_PATTERN,
-            uniqueId, spillNumber), conf);
-  }
-
-  /**
    * Create a local output spill index file name.
    *
    * ${appDir}/${uniqueId}_spill_${spillNumber}.out.index
@@ -274,20 +207,6 @@ public class TezTaskOutputFiles extends TezTaskOutput {
             uniqueId, spillNumber), size, conf);
   }
 
-  /**
-   * Return a local input file created earlier
-   *
-   * ${appDir}/${uniqueId}_spill_${spillNumber}.out
-   * e.g. application_1418684642047_0006/attempt_1418684642047_0006_1_00_000000_0_10004_spill_0.out
-   *
-   * @param attemptIdentifier an identifier for a task. The attempt information is ignored.
-   * @return path the path to the input file
-   * @throws IOException
-   */
-  @Override
-  public Path getInputFile(InputAttemptIdentifier attemptIdentifier) throws IOException {
-    throw new UnsupportedOperationException("Incompatible with LocalRunner");
-  }
 
   /**
    * Create a local input file name.

http://git-wip-us.apache.org/repos/asf/tez/blob/ecf1ff58/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 2a1df40..33c2122 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -126,6 +126,11 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
   final AtomicInteger numSpills = new AtomicInteger(0);
   private final AtomicInteger pendingSpillCount = new AtomicInteger(0);
 
+  @VisibleForTesting
+  Path finalIndexPath;
+  @VisibleForTesting
+  Path finalOutPath;
+
   private final ReentrantLock spillLock = new ReentrantLock();
   private final Condition spillInProgress = spillLock.newCondition();
 
@@ -334,6 +339,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       Path outPath = null;
       if (isFinalSpill) {
         outPath = outputFileHandler.getOutputFileForWrite(spillSize);
+        finalOutPath = outPath;
       } else {
         outPath = outputFileHandler.getSpillFileForWrite(spillNumber, spillSize);
       }
@@ -370,8 +376,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
       }
       if (isFinalSpill) {
         long indexFileSizeEstimate = numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
-        Path finalSpillFile = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate);
-        spillRecord.writeToFile(finalSpillFile, conf);
+        finalIndexPath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate);
+        spillRecord.writeToFile(finalIndexPath, conf);
         fileOutputBytesCounter.increment(indexFileSizeEstimate);
         LOG.info("Finished final and only spill");
       } else {
@@ -516,8 +522,8 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
     }
 
     long indexFileSizeEstimate = numPartitions * Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH;
-    Path finalOutPath = outputFileHandler.getOutputFileForWrite(expectedSize);
-    Path finalIndexPath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate);
+    finalOutPath = outputFileHandler.getOutputFileForWrite(expectedSize);
+    finalIndexPath = outputFileHandler.getOutputIndexFileForWrite(indexFileSizeEstimate);
 
     TezSpillRecord finalSpillRecord = new TezSpillRecord(numPartitions);
 

http://git-wip-us.apache.org/repos/asf/tez/blob/ecf1ff58/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 5af9f86..de9e877 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -158,7 +158,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
 
     boolean outputGenerated = true;
     if (sendEmptyPartitionDetails) {
-      Path indexFile = sorter.getMapOutput().getOutputIndexFile();
+      Path indexFile = sorter.getFinalIndexFile();
       TezSpillRecord spillRecord = new TezSpillRecord(indexFile, conf);
       BitSet emptyPartitionDetails = new BitSet();
       int emptyPartitions = 0;

http://git-wip-us.apache.org/repos/asf/tez/blob/ecf1ff58/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
index 9950d30..cb385ea 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/writers/TestUnorderedPartitionedKVWriter.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doReturn;
@@ -56,7 +55,6 @@ import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
-import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.tez.common.TezCommonUtils;
 import org.apache.tez.common.TezRuntimeFrameworkConfigs;
@@ -335,27 +333,13 @@ public class TestUnorderedPartitionedKVWriter {
     // Verify the data
     // Verify the actual data
     TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId);
-    Path outputFilePath = null;
-    Path spillFilePath = null;
-    try {
-      outputFilePath = taskOutput.getOutputFile();
-    } catch (DiskErrorException e) {
-      if (numRecordsWritten > 0) {
-        fail();
-      } else {
-        // Record checking not required.
-        return;
-      }
-    }
-    try {
-      spillFilePath = taskOutput.getOutputIndexFile();
-    } catch (DiskErrorException e) {
-      if (numRecordsWritten > 0) {
-        fail();
-      } else {
-        // Record checking not required.
-        return;
-      }
+    Path outputFilePath = kvWriter.finalOutPath;
+    Path spillFilePath = kvWriter.finalIndexPath;
+    if (numRecordsWritten > 0) {
+      assertTrue(localFs.exists(outputFilePath));
+      assertTrue(localFs.exists(spillFilePath));
+    } else {
+      return;
     }
 
     // Special case for 0 records.
@@ -533,27 +517,14 @@ public class TestUnorderedPartitionedKVWriter {
 
     // Verify the actual data
     TezTaskOutput taskOutput = new TezTaskOutputFiles(conf, uniqueId);
-    Path outputFilePath = null;
-    Path spillFilePath = null;
-    try {
-      outputFilePath = taskOutput.getOutputFile();
-    } catch (DiskErrorException e) {
-      if (numRecordsWritten > 0) {
-        fail();
-      } else {
-        // Record checking not required.
-        return;
-      }
-    }
-    try {
-      spillFilePath = taskOutput.getOutputIndexFile();
-    } catch (DiskErrorException e) {
-      if (numRecordsWritten > 0) {
-        fail();
-      } else {
-        // Record checking not required.
-        return;
-      }
+    Path outputFilePath = kvWriter.finalOutPath;
+    Path spillFilePath = kvWriter.finalIndexPath;
+
+    if (numRecordsWritten > 0) {
+      assertTrue(localFs.exists(outputFilePath));
+      assertTrue(localFs.exists(spillFilePath));
+    } else {
+      return;
     }
 
     // Special case for 0 records.