You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/05/06 00:13:51 UTC

tez git commit: TEZ-1752. Inputs / Outputs in the Runtime library should be interruptable (rbalamohan)

Repository: tez
Updated Branches:
  refs/heads/master 8c44f2484 -> 146ab0702


TEZ-1752. Inputs / Outputs in the Runtime library should be interruptable (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/146ab070
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/146ab070
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/146ab070

Branch: refs/heads/master
Commit: 146ab0702a25cda7020de936e270e291ca567e3c
Parents: 8c44f24
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Wed May 6 03:43:49 2015 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Wed May 6 03:43:49 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../runtime/api/AbstractLogicalIOProcessor.java |   4 +
 .../api/ProcessorFrameworkInterface.java        |  11 +-
 .../library/api/TezRuntimeConfiguration.java    |  11 +
 .../common/readers/UnorderedKVReader.java       |   1 +
 .../runtime/library/common/shuffle/Fetcher.java |   1 +
 .../common/shuffle/impl/ShuffleManager.java     |   5 +
 .../orderedgrouped/FetcherOrderedGrouped.java   |   7 +-
 .../shuffle/orderedgrouped/MergeManager.java    | 124 +++++++--
 .../shuffle/orderedgrouped/MergeThread.java     |  18 +-
 .../common/shuffle/orderedgrouped/Shuffle.java  |  16 +-
 .../orderedgrouped/ShuffleScheduler.java        |   1 +
 .../common/sort/impl/ExternalSorter.java        |  38 +++
 .../common/sort/impl/PipelinedSorter.java       | 261 +++++++++++--------
 .../library/common/sort/impl/TezMerger.java     |  31 ++-
 .../common/sort/impl/dflt/DefaultSorter.java    |  70 +++--
 .../library/input/OrderedGroupedKVInput.java    |   1 +
 .../runtime/library/input/UnorderedKVInput.java |   1 +
 .../output/OrderedPartitionedKVOutput.java      |   1 +
 .../library/output/UnorderedKVOutput.java       |   1 +
 .../output/UnorderedPartitionedKVOutput.java    |   1 +
 .../library/common/TestValuesIterator.java      |  20 +-
 .../orderedgrouped/TestMergeManager.java        |  87 +++++--
 .../library/common/sort/impl/TestTezMerger.java |   3 +-
 24 files changed, 518 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d7a1e1f..7ba8021 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
 
 ALL CHANGES:
+  TEZ-1752. Inputs / Outputs in the Runtime library should be interruptable.
   TEZ-2401. Tez UI: All-dag page has duration keep counting for KILLED dag.
   TEZ-2392. Have all readers throw an Exception on incorrect next() usage.
   TEZ-2408. TestTaskAttempt fails to compile against hadoop-2.4 and hadoop-2.2.

http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java
index 7714321..5a4cbe8 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java
@@ -49,4 +49,8 @@ public abstract class AbstractLogicalIOProcessor implements LogicalIOProcessor,
     return context;
   }
 
+  @Override
+  public void abort() {
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorFrameworkInterface.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorFrameworkInterface.java b/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorFrameworkInterface.java
index f0ba9c9..89d4e3c 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorFrameworkInterface.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorFrameworkInterface.java
@@ -18,9 +18,11 @@
 
 package org.apache.tez.runtime.api;
 
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
 import java.util.List;
 
-import org.apache.hadoop.classification.InterfaceAudience.Public;
 
 /**
  * Represents the Tez framework part of an {@link org.apache.tez.runtime.api.Processor}.
@@ -56,4 +58,11 @@ public interface ProcessorFrameworkInterface {
    *           if an error occurs
    */
   public void close() throws Exception;
+
+  /**
+   * Signals the <code>Processor</code> to abort. Implementations may perform cleanup here.
+   *
+   */
+  @Unstable
+  public void abort();
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index a818de8..3d9a701 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -311,6 +311,16 @@ public class TezRuntimeConfiguration {
    */
   public static final boolean TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH_DEFAULT = false;
 
+  /**
+   * Used only for internal testing. Strictly not recommended to be used elsewhere. This
+   * parameter could be changed/dropped later.
+   */
+  @Unstable
+  @Private
+  public static final String TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT = TEZ_RUNTIME_PREFIX
+      + "cleanup.files.on.interrupt";
+  public static final boolean TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT_DEFAULT = false;
+
   // TODO TEZ-1233 - allow this property to be set per vertex
   // TODO TEZ-1231 - move these properties out since they are not relevant for Inputs / Outputs
 
@@ -374,6 +384,7 @@ public class TezRuntimeConfiguration {
     tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH);
     tezRuntimeKeys.add(TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
     tezRuntimeKeys.add(TEZ_RUNTIME_SORTER_CLASS);
+    tezRuntimeKeys.add(TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
 
     defaultConf.addResource("core-default.xml");
     defaultConf.addResource("core-site.xml");

http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
index b14a461..fc2e312 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
@@ -184,6 +184,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
       currentFetchedInput = shuffleManager.getNextInput();
     } catch (InterruptedException e) {
       LOG.warn("Interrupted while waiting for next available input", e);
+      Thread.currentThread().interrupt();
       throw new IOException(e);
     }
     if (currentFetchedInput == null) {

http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 3154943..48fe0f2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -376,6 +376,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
       // fall back to HTTP fetch below
       LOG.warn("Double locking detected for " + host);
     } catch (InterruptedException sleepInterrupted) {
+      Thread.currentThread().interrupt();
       // fall back to HTTP fetch below
       LOG.warn("Lock was interrupted for " + host);
     } finally {

http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 749143a..d47e652 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -748,6 +748,11 @@ public class ShuffleManager implements FetcherCallback {
   /////////////////// End of Methods from FetcherCallbackHandler
 
   public void shutdown() throws InterruptedException {
+    if (Thread.currentThread().isInterrupted()) {
+      //TODO: need to clean up all FetchedInput (DiskFetchedInput, LocalDiskFetchedInput), lockFile
+      //As of now relying on job cleanup (when all directories would be cleared)
+      LOG.info("Thread interrupted. Need to cleanup the local dirs");
+    }
     if (!isShutdown.getAndSet(true)) {
       // Shut down any pending fetchers
       LOG.info("Shutting down pending fetchers on source" + srcNameTrimmed + ": "

http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 8d20aa7..fbaabff 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -177,6 +177,9 @@ class FetcherOrderedGrouped extends Thread {
         }
       }
     } catch (InterruptedException ie) {
+      //TODO: might not be respected when fetcher is in progress / server is busy.  TEZ-711
+      //Set the status back
+      Thread.currentThread().interrupt();
       return;
     } catch (Throwable t) {
       shuffle.reportException(t);
@@ -191,7 +194,9 @@ class FetcherOrderedGrouped extends Thread {
     try {
       join(5000);
     } catch (InterruptedException ie) {
-      LOG.warn("Got interrupt while joining " + getName(), ie);
+      //Reset the status
+      Thread.currentThread().interrupt();
+      LOG.warn("Got interrupt while joining " + getName());
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 2e6ebd9..5a35f2f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -19,6 +19,7 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
@@ -137,6 +138,8 @@ public class MergeManager {
 
   private AtomicInteger mergeFileSequenceId = new AtomicInteger(0);
 
+  private final boolean cleanup;
+
   /**
    * Construct the MergeManager. Must call start before it becomes usable.
    */
@@ -174,6 +177,9 @@ public class MergeManager {
     this.additionalBytesWritten = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
     this.additionalBytesRead = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
 
+    this.cleanup = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT_DEFAULT);
+
     this.codec = codec;
     this.ifileReadAhead = ifileReadAheadEnabled;
     if (this.ifileReadAhead) {
@@ -514,27 +520,61 @@ public class MergeManager {
   public boolean isMergeComplete() {
     return finalMergeComplete;
   }
-  
+
   public TezRawKeyValueIterator close() throws Throwable {
     // Wait for on-going merges to complete
-    if (memToMemMerger != null) { 
+    if (memToMemMerger != null) {
       memToMemMerger.close();
     }
     inMemoryMerger.close();
     onDiskMerger.close();
-    
-    List<MapOutput> memory = 
+
+    List<MapOutput> memory =
       new ArrayList<MapOutput>(inMemoryMergedMapOutputs);
     inMemoryMergedMapOutputs.clear();
     memory.addAll(inMemoryMapOutputs);
     inMemoryMapOutputs.clear();
     List<FileChunk> disk = new ArrayList<FileChunk>(onDiskMapOutputs);
     onDiskMapOutputs.clear();
-    TezRawKeyValueIterator kvIter = finalMerge(conf, rfs, memory, disk);
-    this.finalMergeComplete = true;
-    return kvIter;
+    try {
+      TezRawKeyValueIterator kvIter = finalMerge(conf, rfs, memory, disk);
+      this.finalMergeComplete = true;
+      return kvIter;
+    } catch(InterruptedException e) {
+      //Cleanup the disk segments
+      if (cleanup) {
+        cleanup(localFS, disk);
+        cleanup(localFS, onDiskMapOutputs);
+      }
+      Thread.currentThread().interrupt(); //reset interrupt status
+      throw e;
+    }
+  }
+
+
+  static void cleanup(FileSystem fs, Collection<FileChunk> fileChunkList) {
+    for (FileChunk fileChunk : fileChunkList) {
+      cleanup(fs, fileChunk.getPath());
+    }
   }
-   
+
+  static void cleanup(FileSystem fs, Path path) {
+    if (path == null) {
+      return;
+    }
+
+    try {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Deleting " + path);
+      }
+      fs.delete(path, true);
+    } catch (IOException e) {
+      LOG.info("Error in deleting " + path);
+    }
+  }
+
+
+
   void runCombineProcessor(TezRawKeyValueIterator kvIter, Writer writer)
       throws IOException, InterruptedException {
     combiner.combine(kvIter, writer);
@@ -555,7 +595,7 @@ public class MergeManager {
     }
 
     @Override
-    public void merge(List<MapOutput> inputs) throws IOException {
+    public void merge(List<MapOutput> inputs) throws IOException, InterruptedException {
       if (inputs == null || inputs.size() == 0) {
         return;
       }
@@ -597,13 +637,28 @@ public class MergeManager {
       // Note the output of the merge
       closeInMemoryMergedFile(mergedMapOutputs);
     }
+
+    @Override
+    public void cleanup(List<MapOutput> inputs, boolean deleteData) throws IOException,
+        InterruptedException {
+      //No OP
+    }
   }
   
   /**
    * Merges multiple in-memory segment to a disk segment
    */
   private class InMemoryMerger extends MergeThread<MapOutput> {
-    
+
+    @VisibleForTesting
+    volatile InputAttemptIdentifier srcTaskIdentifier;
+
+    @VisibleForTesting
+    volatile Path outputPath;
+
+    @VisibleForTesting
+    volatile Path tmpDir;
+
     public InMemoryMerger(MergeManager manager) {
       super(manager, Integer.MAX_VALUE, exceptionReporter);
       setName("MemtoDiskMerger [" + TezUtilsInternal
@@ -628,7 +683,7 @@ public class MergeManager {
       //in the merge method)
 
       //figure out the mapId 
-      InputAttemptIdentifier srcTaskIdentifier = inputs.get(0).getAttemptIdentifier();
+      srcTaskIdentifier = inputs.get(0).getAttemptIdentifier();
 
       List<Segment> inMemorySegments = new ArrayList<Segment>();
       long mergeOutputSize = 
@@ -639,7 +694,7 @@ public class MergeManager {
       
       // All disk writes done by this merge are overhead - due to the lac of
       // adequate memory to keep all segments in memory.
-      Path outputPath = mapOutputFile.getInputFileForWrite(
+      outputPath = mapOutputFile.getInputFileForWrite(
           srcTaskIdentifier.getInputIdentifier().getInputIndex(), srcTaskIdentifier.getSpillEventId(),
           mergeOutputSize).suffix(Constants.MERGED_OUTPUT_PREFIX);
       LOG.info("Patch..InMemoryMerger outputPath: " + outputPath);
@@ -657,13 +712,13 @@ public class MergeManager {
         LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
             " segments...");
 
+        tmpDir = new Path(inputContext.getUniqueIdentifier());
         // Nothing actually materialized to disk - controlled by setting sort-factor to #segments.
         rIter = TezMerger.merge(conf, rfs,
             (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
             (Class)ConfigUtils.getIntermediateInputValueClass(conf),
             inMemorySegments, inMemorySegments.size(),
-            new Path(inputContext.getUniqueIdentifier()),
-            (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
+            tmpDir, (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
             nullProgressable, spilledRecordsCounter, null, additionalBytesRead, null);
         // spilledRecordsCounter is tracking the number of keys that will be
         // read from each of the segments being merged - which is essentially
@@ -700,6 +755,18 @@ public class MergeManager {
       closeOnDiskFile(new FileChunk(outputPath, 0, outFileLen));
     }
 
+    @Override
+    public void cleanup(List<MapOutput> inputs, boolean deleteData)
+        throws IOException, InterruptedException {
+      if (deleteData) {
+        //Additional check at task level
+        if (cleanup) {
+          LOG.info("Try deleting stale data");
+          MergeManager.cleanup(localFS, outputPath);
+          MergeManager.cleanup(localFS, tmpDir);
+        }
+      }
+    }
   }
 
   /**
@@ -708,6 +775,11 @@ public class MergeManager {
   @VisibleForTesting
   class OnDiskMerger extends MergeThread<FileChunk> {
 
+    @VisibleForTesting
+    volatile Path outputPath;
+    @VisibleForTesting
+    volatile Path tmpDir;
+
     public OnDiskMerger(MergeManager manager) {
       super(manager, ioSortFactor, exceptionReporter);
       setName("DiskToDiskMerger [" + TezUtilsInternal
@@ -716,7 +788,7 @@ public class MergeManager {
     }
     
     @Override
-    public void merge(List<FileChunk> inputs) throws IOException {
+    public void merge(List<FileChunk> inputs) throws IOException, InterruptedException {
       // sanity check
       if (inputs == null || inputs.isEmpty()) {
         LOG.info("No ondisk files to merge...");
@@ -768,7 +840,7 @@ public class MergeManager {
 
       // namePart includes the suffix of the file. We need to remove it.
       namePart = FilenameUtils.removeExtension(namePart);
-      Path outputPath = localDirAllocator.getLocalPathForWrite(namePart, approxOutputSize, conf);
+      outputPath = localDirAllocator.getLocalPathForWrite(namePart, approxOutputSize, conf);
       outputPath = outputPath.suffix(Constants.MERGED_OUTPUT_PREFIX + mergeFileSequenceId.getAndIncrement());
 
       Writer writer =
@@ -776,7 +848,7 @@ public class MergeManager {
                         (Class)ConfigUtils.getIntermediateInputKeyClass(conf), 
                         (Class)ConfigUtils.getIntermediateInputValueClass(conf),
                         codec, null, null);
-      Path tmpDir = new Path(inputContext.getUniqueIdentifier());
+      tmpDir = new Path(inputContext.getUniqueIdentifier());
       try {
         TezRawKeyValueIterator iter = TezMerger.merge(conf, rfs,
             (Class)ConfigUtils.getIntermediateInputKeyClass(conf),
@@ -808,6 +880,20 @@ public class MergeManager {
           " Local output file is " + outputPath + " of size " +
           outputLen);
     }
+
+    @Override
+    public void cleanup(List<FileChunk> inputs, boolean deleteData) throws IOException,
+        InterruptedException {
+      if (deleteData) {
+        //Additional check at task level
+        if (cleanup) {
+          LOG.info("Try deleting stale data");
+          MergeManager.cleanup(localFS, inputs);
+          MergeManager.cleanup(localFS, outputPath);
+          MergeManager.cleanup(localFS, tmpDir);
+        }
+      }
+    }
   }
   
   private long createInMemorySegments(List<MapOutput> inMemoryMapOutputs,
@@ -821,7 +907,7 @@ public class MergeManager {
     for (MapOutput mo : inMemoryMapOutputs) {
       fullSize += mo.getMemory().length;
     }
-    while(fullSize > leaveBytes) {
+    while((fullSize > leaveBytes) && !Thread.currentThread().isInterrupted()) {
       MapOutput mo = inMemoryMapOutputs.remove(0);
       byte[] data = mo.getMemory();
       long size = data.length;
@@ -878,7 +964,7 @@ public class MergeManager {
   private TezRawKeyValueIterator finalMerge(Configuration job, FileSystem fs,
                                        List<MapOutput> inMemoryMapOutputs,
                                        List<FileChunk> onDiskMapOutputs
-                                       ) throws IOException {
+                                       ) throws IOException, InterruptedException {
     LOG.info("finalMerge called with " + 
              inMemoryMapOutputs.size() + " in-memory map-outputs and " + 
              onDiskMapOutputs.size() + " on-disk map-outputs");

http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java
index d4faf51..52b4c5b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java
@@ -46,8 +46,18 @@ abstract class MergeThread<T> extends Thread {
   
   public synchronized void close() throws InterruptedException {
     closed = true;
-    waitForMerge();
-    interrupt();
+    if (!Thread.currentThread().isInterrupted()) {
+      waitForMerge();
+      interrupt();
+    } else {
+      try {
+        interrupt();
+        cleanup(inputs, Thread.currentThread().isInterrupted());
+      } catch (IOException e) {
+        //ignore
+        LOG.warn("Error cleaning up", e);
+      }
+    }
   }
 
   public synchronized boolean isInProgress() {
@@ -89,6 +99,7 @@ abstract class MergeThread<T> extends Thread {
         merge(inputs);
       } catch (InterruptedException ie) {
         // Meant to handle a shutdown of the entire fetch/merge process
+        Thread.currentThread().interrupt();
         return;
       } catch(Throwable t) {
         reporter.reportException(t);
@@ -106,4 +117,7 @@ abstract class MergeThread<T> extends Thread {
 
   public abstract void merge(List<T> inputs) 
       throws IOException, InterruptedException;
+
+  public abstract void cleanup(List<T> inputs, boolean deleteData)
+      throws IOException, InterruptedException;
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index f98aa3a..442f032 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -357,6 +357,7 @@ public class Shuffle implements ExceptionReporter {
       shufflePhaseTime.setValue(System.currentTimeMillis() - startTime);
 
       // Stop the map-output fetcher threads
+      LOG.info("Cleaning up fetchers");
       cleanupFetchers(false);
       
       // stop the scheduler
@@ -393,8 +394,7 @@ public class Shuffle implements ExceptionReporter {
         for (FetcherOrderedGrouped fetcher : fetchers) {
           try {
             fetcher.shutDown();
-            LOG.info("Shutdown.." + fetcher.getName() + ", status:" + fetcher.isAlive() + ", "
-                + "isInterrupted:" + fetcher.isInterrupted());
+            LOG.info("Shutdown.." + fetcher.getName());
           } catch (InterruptedException e) {
             if (ignoreErrors) {
               LOG.info("Interrupted while shutting down fetchers. Ignoring.");
@@ -425,6 +425,8 @@ public class Shuffle implements ExceptionReporter {
         scheduler.close();
       } catch (InterruptedException e) {
         if (ignoreErrors) {
+          //Reset the status
+          Thread.currentThread().interrupt();
           LOG.info("Interrupted while attempting to close the scheduler during cleanup. Ignoring");
         } else {
           throw e;
@@ -437,6 +439,14 @@ public class Shuffle implements ExceptionReporter {
     if (!mergerClosed.getAndSet(true)) {
       try {
         merger.close();
+      } catch (InterruptedException e) {
+        if (ignoreErrors) {
+          //Reset the status
+          Thread.currentThread().interrupt();
+          LOG.info("Interrupted while attempting to close the merger during cleanup. Ignoring");
+        } else {
+          throw e;
+        }
       } catch (Throwable e) {
         if (ignoreErrors) {
           LOG.info("Exception while trying to shutdown merger, Ignoring", e);
@@ -493,7 +503,7 @@ public class Shuffle implements ExceptionReporter {
     @Override
     public void onFailure(Throwable t) {
       if (isShutDown.get()) {
-        LOG.info("Already shutdown. Ignoring error: ",  t);
+        LOG.info("Already shutdown. Ignoring error");
       } else {
         LOG.error("ShuffleRunner failed with error", t);
         inputContext.fatalError(t, "Shuffle Runner Failed");

http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index a3d79ae..c54b005 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -765,6 +765,7 @@ class ShuffleScheduler {
           }
         }
       } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
         // This handles shutdown of the entire fetch / merge process.
         return;
       } catch (Throwable t) {

http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index c0445c9..ca4d889 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -115,6 +115,8 @@ public abstract class ExternalSorter {
   protected Path finalIndexFile;
   protected int numSpills;
 
+  protected final boolean cleanup;
+
   // Counters
   // MR compatilbity layer needs to rename counters back to what MR requries.
 
@@ -148,6 +150,9 @@ public abstract class ExternalSorter {
     this.conf = conf;
     this.partitions = numOutputs;
 
+    cleanup = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT_DEFAULT);
+
     rfs = ((LocalFileSystem)FileSystem.getLocal(this.conf)).getRaw();
 
     LOG.info("Initial Mem : " + initialMemoryAvailable + ", assignedMb=" + ((initialMemoryAvailable >> 20)));
@@ -261,6 +266,7 @@ public abstract class ExternalSorter {
     try {
       combiner.combine(kvIter, writer);
     } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
       throw new IOException(e);
     }
   }
@@ -314,4 +320,36 @@ public abstract class ExternalSorter {
   public int getNumSpills() {
     return numSpills;
   }
+
+  protected synchronized void cleanup() throws IOException {
+    if (!cleanup) {
+      return;
+    }
+    cleanup(spillFilePaths);
+    cleanup(spillFileIndexPaths);
+    //TODO: What if a same-volume rename happens? (have to rely on job completion cleanup)
+    cleanup(finalOutputFile);
+    cleanup(finalIndexFile);
+  }
+
+  protected synchronized void cleanup(Path path) {
+    if (path == null || !cleanup) {
+      return;
+    }
+    try {
+      LOG.info("Deleting " + path);
+      rfs.delete(path, true);
+    } catch(IOException ioe) {
+      LOG.warn("Error in deleting "  + path);
+    }
+  }
+
+  protected synchronized void cleanup(Map<Integer, Path> spillMap) {
+    if (!cleanup) {
+      return;
+    }
+    for(Map.Entry<Integer, Path> entry : spillMap.entrySet()) {
+      cleanup(entry.getValue());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 661f54c..030440e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -33,7 +33,6 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
-
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
@@ -356,6 +355,9 @@ public class PipelinedSorter extends ExternalSorter {
       merger.ready(); // wait for all the future results from sort threads
       LOG.info("Spilling to " + filename.toString());
       for (int i = 0; i < partitions; ++i) {
+        if (isThreadInterrupted()) {
+          return;
+        }
         TezRawKeyValueIterator kvIter = merger.filter(i);
         //write merged output to disk
         long segmentStart = out.getPos();
@@ -391,147 +393,182 @@ public class PipelinedSorter extends ExternalSorter {
       ++numSpills;
     } catch(InterruptedException ie) {
       // TODO:the combiner has been interrupted
+      Thread.currentThread().interrupt();
     } finally {
       out.close();
     }
   }
 
+
+
+
+
+  private boolean isThreadInterrupted() throws IOException {
+    if (Thread.currentThread().isInterrupted()) {
+      if (cleanup) {
+        cleanup();
+      }
+      sortmaster.shutdownNow();
+      LOG.info("Thread interrupted, cleaned up stale data, sorter threads shutdown=" + sortmaster
+          .isShutdown() + ", terminated=" + sortmaster.isTerminated());
+      return true;
+    }
+    return false;
+  }
+
   @Override
   public void flush() throws IOException {
     final String uniqueIdentifier = outputContext.getUniqueIdentifier();
 
-    LOG.info("Starting flush of map output");
-    span.end();
-    merger.add(span.sort(sorter));
-    spill();
-    sortmaster.shutdown();
+    /**
+     * Possible that the thread got interrupted when flush was happening or when the flush was
+     * never invoked. As a part of cleanup activity in TezTaskRunner, it would invoke close()
+     * on all I/O. At that time, it is safe to clean up.
+     */
+    if (isThreadInterrupted()) {
+      return;
+    }
+
+    try {
+      LOG.info("Starting flush of map output");
+      span.end();
+      merger.add(span.sort(sorter));
+      spill();
+      sortmaster.shutdown();
 
-    //safe to clean up
-    bufferList.clear();
+      //safe to clean up
+      bufferList.clear();
 
-    numAdditionalSpills.increment(numSpills - 1);
+      numAdditionalSpills.increment(numSpills - 1);
 
-    if (!finalMergeEnabled) {
-      //Generate events for all spills
-      List<Event> events = Lists.newLinkedList();
+      if (!finalMergeEnabled) {
+        //Generate events for all spills
+        List<Event> events = Lists.newLinkedList();
 
-      //For pipelined shuffle, previous events are already sent. Just generate the last event alone
-      int startIndex = (pipelinedShuffle) ? (numSpills - 1) : 0;
-      int endIndex = numSpills;
+        //For pipelined shuffle, previous events are already sent. Just generate the last event alone
+        int startIndex = (pipelinedShuffle) ? (numSpills - 1) : 0;
+        int endIndex = numSpills;
 
-      for (int i = startIndex; i < endIndex; i++) {
-        boolean isLastEvent = (i == numSpills - 1);
+        for (int i = startIndex; i < endIndex; i++) {
+          boolean isLastEvent = (i == numSpills - 1);
 
-        String pathComponent = (outputContext.getUniqueIdentifier() + "_" + i);
-        ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent,
-            outputContext, i, indexCacheList.get(i), partitions,
-            sendEmptyPartitionDetails, pathComponent);
-        LOG.info("Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i);
+          String pathComponent = (outputContext.getUniqueIdentifier() + "_" + i);
+          ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent,
+              outputContext, i, indexCacheList.get(i), partitions,
+              sendEmptyPartitionDetails, pathComponent);
+          LOG.info("Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i);
+        }
+        outputContext.sendEvents(events);
+        //No need to generate final merge
+        return;
       }
-      outputContext.sendEvents(events);
-      //No need to generate final merge
-      return;
-    }
 
-    //In case final merge is required, the following code path is executed.
-    if(numSpills == 1) {
-      // someday be able to pass this directly to shuffle
-      // without writing to disk
-      final Path filename = spillFilePaths.get(0);
-      final Path indexFilename = spillFileIndexPaths.get(0);
-      finalOutputFile = mapOutputFile.getOutputFileForWriteInVolume(filename);
-      finalIndexFile = mapOutputFile.getOutputIndexFileForWriteInVolume(indexFilename);
-
-      sameVolRename(filename, finalOutputFile);
-      sameVolRename(indexFilename, finalIndexFile);
-      if (LOG.isInfoEnabled()) {
-        LOG.info("numSpills=" + numSpills + ", finalOutputFile=" + finalOutputFile + ", "
-            + "finalIndexFile=" + finalIndexFile + ", filename=" + filename + ", indexFilename=" +
-            indexFilename);
+      //In case final merge is required, the following code path is executed.
+      if (numSpills == 1) {
+        // someday be able to pass this directly to shuffle
+        // without writing to disk
+        final Path filename = spillFilePaths.get(0);
+        final Path indexFilename = spillFileIndexPaths.get(0);
+        finalOutputFile = mapOutputFile.getOutputFileForWriteInVolume(filename);
+        finalIndexFile = mapOutputFile.getOutputIndexFileForWriteInVolume(indexFilename);
+
+        sameVolRename(filename, finalOutputFile);
+        sameVolRename(indexFilename, finalIndexFile);
+        if (LOG.isInfoEnabled()) {
+          LOG.info("numSpills=" + numSpills + ", finalOutputFile=" + finalOutputFile + ", "
+              + "finalIndexFile=" + finalIndexFile + ", filename=" + filename + ", indexFilename=" +
+              indexFilename);
+        }
+        return;
       }
-      return;
-    }
 
-    finalOutputFile =
-        mapOutputFile.getOutputFileForWrite(0); //TODO
-    finalIndexFile =
-        mapOutputFile.getOutputIndexFileForWrite(0); //TODO
+      finalOutputFile =
+          mapOutputFile.getOutputFileForWrite(0); //TODO
+      finalIndexFile =
+          mapOutputFile.getOutputIndexFileForWrite(0); //TODO
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("numSpills: " + numSpills + ", finalOutputFile:" + finalOutputFile + ", finalIndexFile:"
-              + finalIndexFile);
-    }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "numSpills: " + numSpills + ", finalOutputFile:" + finalOutputFile + ", finalIndexFile:"
+                + finalIndexFile);
+      }
 
-    //The output stream for the final single output file
-    FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
+      //The output stream for the final single output file
+      FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
 
-    final TezSpillRecord spillRec = new TezSpillRecord(partitions);
+      final TezSpillRecord spillRec = new TezSpillRecord(partitions);
 
+      for (int parts = 0; parts < partitions; parts++) {
+        //create the segments to be merged
+        List<Segment> segmentList =
+            new ArrayList<Segment>(numSpills);
+        for (int i = 0; i < numSpills; i++) {
+          Path spillFilename = spillFilePaths.get(i);
+          TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
 
-    for (int parts = 0; parts < partitions; parts++) {
-      //create the segments to be merged
-      List<Segment> segmentList =
-          new ArrayList<Segment>(numSpills);
-      for(int i = 0; i < numSpills; i++) {
-        Path spillFilename = spillFilePaths.get(i);
-        TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
+          Segment s =
+              new Segment(rfs, spillFilename, indexRecord.getStartOffset(),
+                  indexRecord.getPartLength(), codec, ifileReadAhead,
+                  ifileReadAheadLength, ifileBufferSize, true);
+          segmentList.add(i, s);
+        }
 
-        Segment s =
-            new Segment(rfs, spillFilename, indexRecord.getStartOffset(),
-                             indexRecord.getPartLength(), codec, ifileReadAhead,
-                             ifileReadAheadLength, ifileBufferSize, true);
-        segmentList.add(i, s);
-      }
+        int mergeFactor =
+            this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR,
+                TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR_DEFAULT);
+        // sort the segments only if there are intermediate merges
+        boolean sortSegments = segmentList.size() > mergeFactor;
+        //merge
+        TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs,
+            keyClass, valClass, codec,
+            segmentList, mergeFactor,
+            new Path(uniqueIdentifier),
+            (RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf),
+            nullProgressable, sortSegments, true,
+            null, spilledRecordsCounter, null,
+            null); // Not using any Progress in TezMerger. Should just work.
 
-      int mergeFactor = 
-              this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR, 
-                  TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR_DEFAULT);
-      // sort the segments only if there are intermediate merges
-      boolean sortSegments = segmentList.size() > mergeFactor;
-      //merge
-      TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs,
-                     keyClass, valClass, codec,
-                     segmentList, mergeFactor,
-                     new Path(uniqueIdentifier),
-                     (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf), 
-                     nullProgressable, sortSegments, true,
-                     null, spilledRecordsCounter, null,
-                     null); // Not using any Progress in TezMerger. Should just work.
-
-      //write merged output to disk
-      long segmentStart = finalOut.getPos();
-      Writer writer =
-          new Writer(conf, finalOut, keyClass, valClass, codec,
-                           spilledRecordsCounter, null, merger.needsRLE());
-      if (combiner == null || numSpills < minSpillsForCombine) {
-        TezMerger.writeFile(kvIter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
-      } else {
-        runCombineProcessor(kvIter, writer);
-      }
+        //write merged output to disk
+        long segmentStart = finalOut.getPos();
+        Writer writer =
+            new Writer(conf, finalOut, keyClass, valClass, codec,
+                spilledRecordsCounter, null, merger.needsRLE());
+        if (combiner == null || numSpills < minSpillsForCombine) {
+          TezMerger.writeFile(kvIter, writer, nullProgressable,
+              TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
+        } else {
+          runCombineProcessor(kvIter, writer);
+        }
 
-      //close
-      writer.close();
+        //close
+        writer.close();
 
-      // record offsets
-      final TezIndexRecord rec = 
-          new TezIndexRecord(
-              segmentStart, 
-              writer.getRawLength(), 
-              writer.getCompressedLength());
-      spillRec.putIndex(rec, parts);
-    }
+        // record offsets
+        final TezIndexRecord rec =
+            new TezIndexRecord(
+                segmentStart,
+                writer.getRawLength(),
+                writer.getCompressedLength());
+        spillRec.putIndex(rec, parts);
+      }
 
-    spillRec.writeToFile(finalIndexFile, conf);
-    finalOut.close();
-    for(int i = 0; i < numSpills; i++) {
-      Path indexFilename = spillFileIndexPaths.get(i);
-      Path spillFilename = spillFilePaths.get(i);
-      rfs.delete(indexFilename,true);
-      rfs.delete(spillFilename,true);
-    }
+      spillRec.writeToFile(finalIndexFile, conf);
+      finalOut.close();
+      for (int i = 0; i < numSpills; i++) {
+        Path indexFilename = spillFileIndexPaths.get(i);
+        Path spillFilename = spillFilePaths.get(i);
+        rfs.delete(indexFilename, true);
+        rfs.delete(spillFilename, true);
+      }
 
-    spillFileIndexPaths.clear();
-    spillFilePaths.clear();
+      spillFileIndexPaths.clear();
+      spillFilePaths.clear();
+    } catch(InterruptedException ie) {
+      if (cleanup) {
+        cleanup();
+      }
+      Thread.currentThread().interrupt();
+    }
   }
 
 

http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
index 758e9c7..3b7bf05 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -76,7 +76,7 @@ public class TezMerger {
                             TezCounter writesCounter,
                             TezCounter bytesReadCounter,
                             Progress mergePhase)
-  throws IOException {
+      throws IOException, InterruptedException {
     return 
       new MergeQueue(conf, fs, inputs, deleteInputs, codec, ifileReadAhead,
                            ifileReadAheadLength, ifileBufferSize, false, comparator, 
@@ -101,7 +101,7 @@ public class TezMerger {
                             TezCounter mergedMapOutputsCounter,
                             TezCounter bytesReadCounter,
                             Progress mergePhase)
-  throws IOException {
+      throws IOException, InterruptedException {
     return 
       new MergeQueue(conf, fs, inputs, deleteInputs, codec, ifileReadAhead,
                            ifileReadAheadLength, ifileBufferSize, false, comparator, 
@@ -124,7 +124,7 @@ public class TezMerger {
                             TezCounter writesCounter,
                             TezCounter bytesReadCounter,
                             Progress mergePhase)
-      throws IOException {
+      throws IOException, InterruptedException {
     // Get rid of this ?
     return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir,
                  comparator, reporter, false, readsCounter, writesCounter, bytesReadCounter,
@@ -142,7 +142,7 @@ public class TezMerger {
                             TezCounter writesCounter,
                             TezCounter bytesReadCounter,
                             Progress mergePhase)
-      throws IOException {
+      throws IOException, InterruptedException {
     return new MergeQueue(conf, fs, segments, comparator, reporter,
                            sortSegments, false).merge(keyClass, valueClass,
                                                mergeFactor, tmpDir,
@@ -163,7 +163,7 @@ public class TezMerger {
                             TezCounter writesCounter,
                             TezCounter bytesReadCounter,
                             Progress mergePhase)
-      throws IOException {
+      throws IOException, InterruptedException {
     return new MergeQueue(conf, fs, segments, comparator, reporter,
                            sortSegments, codec, considerFinalMergeForProgress).
                                          merge(keyClass, valueClass,
@@ -185,7 +185,7 @@ public class TezMerger {
                           TezCounter writesCounter,
                           TezCounter bytesReadCounter,
                           Progress mergePhase)
-    throws IOException {
+      throws IOException, InterruptedException {
   return new MergeQueue(conf, fs, segments, comparator, reporter,
                          sortSegments, codec, false).merge(keyClass, valueClass,
                                              mergeFactor, inMemSegments,
@@ -196,9 +196,9 @@ public class TezMerger {
 }
 
   public static <K extends Object, V extends Object>
-  void writeFile(TezRawKeyValueIterator records, Writer writer, 
-                 Progressable progressable, long recordsBeforeProgress) 
-  throws IOException {
+  void writeFile(TezRawKeyValueIterator records, Writer writer,
+      Progressable progressable, long recordsBeforeProgress)
+      throws IOException, InterruptedException {
     long recordCtr = 0;
     long count = 0;
     while(records.next()) {
@@ -211,6 +211,15 @@ public class TezMerger {
       
       if (((recordCtr++) % recordsBeforeProgress) == 0) {
         progressable.progress();
+        if (Thread.currentThread().isInterrupted()) {
+          /**
+           * Takes care of DefaultSorter.mergeParts, MergeManager's merger threads, and
+           * PipelinedSorter's flush(). This is not an expensive check as it is carried out every
+           * 10000 records or so.
+           */
+          throw new InterruptedException("Current thread=" + Thread.currentThread().getName() + " got "
+              + "interrupted");
+        }
       }
     }
     if ((count > 0) && LOG.isDebugEnabled()) {
@@ -614,7 +623,7 @@ public class TezMerger {
                                      TezCounter writesCounter,
                                      TezCounter bytesReadCounter,
                                      Progress mergePhase)
-        throws IOException {
+        throws IOException, InterruptedException {
       return merge(keyClass, valueClass, factor, 0, tmpDir,
                    readsCounter, writesCounter, bytesReadCounter, mergePhase);
     }
@@ -625,7 +634,7 @@ public class TezMerger {
                                      TezCounter writesCounter,
                                      TezCounter bytesReadCounter,
                                      Progress mergePhase)
-        throws IOException {
+        throws IOException, InterruptedException {
       LOG.info("Merging " + segments.size() + " sorted segments");
       if (segments.size() == 0) {
         LOG.info("Nothing to merge. Returning an empty iterator");

http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 2cbb70a..9783c79 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -193,6 +193,9 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
         spillDone.await();
       }
     } catch (InterruptedException e) {
+      //interrupt spill thread
+      spillThread.interrupt();
+      Thread.currentThread().interrupt();
       throw new IOException("Spill thread failed to initialize", e);
     } finally {
       spillLock.unlock();
@@ -603,6 +606,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
                   spillDone.await();
                 }
               } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
                   throw new IOException(
                       "Buffer interrupted while waiting for the writer", e);
               }
@@ -625,9 +629,45 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
     }
   }
 
+  void interruptSpillThread() throws IOException {
+    assert !spillLock.isHeldByCurrentThread();
+    // shut down spill thread and wait for it to exit. Since the preceding
+    // ensures that it is finished with its work (and sortAndSpill did not
+    // throw), we elect to use an interrupt instead of setting a flag.
+    // Spilling simultaneously from this thread while the spill thread
+    // finishes its work might be both a useful way to extend this and also
+    // sufficient motivation for the latter approach.
+    try {
+      spillThread.interrupt();
+      spillThread.join();
+    } catch (InterruptedException e) {
+      LOG.info("Spill thread interrupted");
+      //Reset status
+      Thread.currentThread().interrupt();
+      throw new IOException("Spill failed", e);
+    }
+  }
+
   @Override
   public void flush() throws IOException {
     LOG.info("Starting flush of map output");
+    if (Thread.currentThread().isInterrupted()) {
+      /**
+       * Possible that the thread got interrupted when flush was happening or when the flush was
+       * never invoked. As a part of cleanup activity in TezTaskRunner, it would invoke close()
+       * on all I/O. At that time, it is safe to clean up.
+       */
+      if (cleanup) {
+        cleanup();
+      }
+      try {
+        interruptSpillThread();
+      } catch(IOException e) {
+        //safe to ignore
+      }
+      return;
+    }
+
     spillLock.lock();
     try {
       while (spillInProgress) {
@@ -656,28 +696,25 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
         sortAndSpill();
       }
     } catch (InterruptedException e) {
+      //Reset status
+      Thread.currentThread().interrupt();
+      interruptSpillThread();
       throw new IOException("Interrupted while waiting for the writer", e);
     } finally {
       spillLock.unlock();
     }
-    assert !spillLock.isHeldByCurrentThread();
-    // shut down spill thread and wait for it to exit. Since the preceding
-    // ensures that it is finished with its work (and sortAndSpill did not
-    // throw), we elect to use an interrupt instead of setting a flag.
-    // Spilling simultaneously from this thread while the spill thread
-    // finishes its work might be both a useful way to extend this and also
-    // sufficient motivation for the latter approach.
-    try {
-      spillThread.interrupt();
-      spillThread.join();
-    } catch (InterruptedException e) {
-      throw new IOException("Spill failed", e);
-    }
-    // release sort buffer before the merge
+
+    interruptSpillThread();
+    // release sort buffer before the merge
     //FIXME
     //kvbuffer = null;
 
-    mergeParts();
+    try {
+      mergeParts();
+    } catch (InterruptedException e) {
+      cleanup();
+      Thread.currentThread().interrupt();
+    }
     if (finalMergeEnabled) {
       fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen());
     }
@@ -715,6 +752,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
           }
         }
       } catch (InterruptedException e) {
+        LOG.info("Spill thread interrupted");
         Thread.currentThread().interrupt();
       } finally {
         spillLock.unlock();
@@ -1085,7 +1123,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
     outputContext.sendEvents(events);
   }
 
-  private void mergeParts() throws IOException {
+  private void mergeParts() throws IOException, InterruptedException {
     // get the approximate size of the final output/index files
     long finalOutFileSize = 0;
     long finalIndexFileSize = 0;

http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index d784fcd..49cf102 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -351,6 +351,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
     confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
   }
 
   // TODO Maybe add helper methods to extract keys

http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index 62fa9a5..7fc9317 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -267,6 +267,7 @@ public class UnorderedKVInput extends AbstractLogicalInput {
     confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
   }
 
   // TODO Maybe add helper methods to extract keys

http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 6227fb9..53abc17 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -248,6 +248,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
     confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS);
     confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
   }
 
   // TODO Maybe add helper methods to extract keys

http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
index 08e6ec0..b50f17d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
@@ -171,6 +171,7 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
     confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
   }
 
   // TODO Maybe add helper methods to extract keys

http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
index 38450ee..7498627 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
@@ -144,6 +144,7 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput {
     confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);
     confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS);
+    confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
   }
 
   // TODO Maybe add helper methods to extract keys

http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
index edb9b15..f62179a 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
@@ -169,28 +169,28 @@ public class TestValuesIterator {
   }
 
   @Test(timeout = 20000)
-  public void testIteratorWithInMemoryReader() throws IOException {
+  public void testIteratorWithInMemoryReader() throws IOException, InterruptedException {
     ValuesIterator iterator = createIterator(true);
     verifyIteratorData(iterator);
   }
 
   @Test(timeout = 20000)
-  public void testIteratorWithIFileReader() throws IOException {
+  public void testIteratorWithIFileReader() throws IOException, InterruptedException {
     ValuesIterator iterator = createIterator(false);
     verifyIteratorData(iterator);
   }
 
   @Test(timeout = 20000)
-  public void testCountedIteratorWithInmemoryReader() throws IOException {
+  public void testCountedIteratorWithInmemoryReader() throws IOException, InterruptedException {
     verifyCountedIteratorReader(true);
   }
 
   @Test(timeout = 20000)
-  public void testCountedIteratorWithIFileReader() throws IOException {
+  public void testCountedIteratorWithIFileReader() throws IOException, InterruptedException {
     verifyCountedIteratorReader(false);
   }
 
-  private void verifyCountedIteratorReader(boolean inMemory) throws IOException {
+  private void verifyCountedIteratorReader(boolean inMemory) throws IOException, InterruptedException {
     TezCounter keyCounter = new GenericCounter("inputKeyCounter", "y3");
     TezCounter tupleCounter = new GenericCounter("inputValuesCounter", "y4");
     ValuesIterator iterator = createCountedIterator(inMemory, keyCounter,
@@ -207,7 +207,7 @@ public class TestValuesIterator {
   }
 
   @Test(timeout = 20000)
-  public void testIteratorWithIFileReaderEmptyPartitions() throws IOException {
+  public void testIteratorWithIFileReaderEmptyPartitions() throws IOException, InterruptedException {
     ValuesIterator iterator = createEmptyIterator(false);
     assertTrue(iterator.moveToNext() == false);
 
@@ -224,7 +224,8 @@ public class TestValuesIterator {
     }
   }
 
-  private ValuesIterator createEmptyIterator(boolean inMemory) throws IOException {
+  private ValuesIterator createEmptyIterator(boolean inMemory)
+      throws IOException, InterruptedException {
     if (!inMemory) {
       streamPaths = new Path[0];
       //This will return EmptyIterator
@@ -323,7 +324,7 @@ public class TestValuesIterator {
    * @return ValuesIterator
    * @throws IOException
    */
-  private ValuesIterator createIterator(boolean inMemory) throws IOException {
+  private ValuesIterator createIterator(boolean inMemory) throws IOException, InterruptedException {
     if (!inMemory) {
       streamPaths = createFiles();
       //Merge all files to get KeyValueIterator
@@ -353,7 +354,8 @@ public class TestValuesIterator {
    * @return ValuesIterator
    * @throws IOException
    */
-  private ValuesIterator createCountedIterator(boolean inMemory, TezCounter keyCounter, TezCounter tupleCounter) throws IOException {
+  private ValuesIterator createCountedIterator(boolean inMemory, TezCounter keyCounter, TezCounter tupleCounter)
+      throws IOException, InterruptedException {
     if (!inMemory) {
       streamPaths = createFiles();
       //Merge all files to get KeyValueIterator

http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
index 094237a..0faa22a 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -28,6 +28,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.UUID;
 
+import com.google.common.collect.Sets;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -166,9 +167,32 @@ public class TestMergeManager {
     Assert.assertTrue(mergeManager.postMergeMemLimit == initialMemoryAvailable);
   }
 
+  class InterruptingThread implements Runnable {
+    // Test helper: busy-waits until mergeThread.tmpDir is non-null, then interrupts mergeThread.
+    MergeManager.OnDiskMerger mergeThread;
+
+    public InterruptingThread(MergeManager.OnDiskMerger mergeThread) {
+      this.mergeThread = mergeThread;
+    }
+    // NOTE(review): run() has no timeout, sleep or yield; it spins for as long as tmpDir stays null.
+    @Override public void run() {
+        while(this.mergeThread.tmpDir == null) {
+          // Tight loop: keep re-reading tmpDir until it becomes non-null.
+        }
+        // tmpDir is non-null here (presumably the merge has begun - confirm); interrupt the merger.
+        this.mergeThread.interrupt();
+    }
+  }
+
   @Test(timeout = 10000)
-  public void testLocalDiskMergeMultipleTasks() throws IOException {
+  public void testLocalDiskMergeMultipleTasks() throws IOException, InterruptedException {
+    testLocalDiskMergeMultipleTasks(false); // interruptInMiddle = false: on-disk merge is invoked directly
+    testLocalDiskMergeMultipleTasks(true); // interruptInMiddle = true: an InterruptingThread interrupts the merger
+  }
 
+
+  void testLocalDiskMergeMultipleTasks(boolean interruptInMiddle)
+      throws IOException, InterruptedException {
     Configuration conf = new TezConfiguration(defaultConf);
     conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false);
     conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
@@ -194,6 +218,7 @@ public class TestMergeManager {
         new MergeManager(conf, localFs, localDirAllocator, t0inputContext, null, null, null, null,
             t0exceptionReporter, 2000000, null, false, -1);
     MergeManager t0mergeManager = spy(t0mergeManagerReal);
+    t0mergeManager.configureAndStart();
 
     MergeManager t1mergeManagerReal =
         new MergeManager(conf, localFs, localDirAllocator, t1inputContext, null, null, null, null,
@@ -249,30 +274,48 @@ public class TestMergeManager {
     List<FileChunk> t0MergeFiles = new LinkedList<FileChunk>();
     t0MergeFiles.addAll(t0mergeManager.onDiskMapOutputs);
     t0mergeManager.onDiskMapOutputs.clear();
-    t0mergeManager.onDiskMerger.merge(t0MergeFiles);
-    Assert.assertEquals(1, t0mergeManager.onDiskMapOutputs.size());
-
-
-    t1MapOutput0.commit();
-    t1MapOutput1.commit();
-    verify(t1mergeManager).closeOnDiskFile(t1MapOutput0.getOutputPath());
-    verify(t1mergeManager).closeOnDiskFile(t1MapOutput1.getOutputPath());
-    // Run the OnDiskMerge via MergeManager
-    // Simulate the thread invocation - remove files, and invoke merge
-    List<FileChunk> t1MergeFiles = new LinkedList<FileChunk>();
-    t1MergeFiles.addAll(t1mergeManager.onDiskMapOutputs);
-    t1mergeManager.onDiskMapOutputs.clear();
-    t1mergeManager.onDiskMerger.merge(t1MergeFiles);
-    Assert.assertEquals(1, t1mergeManager.onDiskMapOutputs.size());
 
-    Assert.assertNotEquals(t0mergeManager.onDiskMapOutputs.iterator().next().getPath(),
-        t1mergeManager.onDiskMapOutputs.iterator().next().getPath());
+    if (!interruptInMiddle) {
+      t0mergeManager.onDiskMerger.merge(t0MergeFiles);
+      Assert.assertEquals(1, t0mergeManager.onDiskMapOutputs.size());
+    } else {
+
+      //Start Interrupting thread
+      Thread interruptingThread = new Thread(new InterruptingThread(t0mergeManager.onDiskMerger));
+      interruptingThread.start();
+      try {
+        Thread.sleep(1000);
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+      }
 
-    Assert.assertTrue(t0mergeManager.onDiskMapOutputs.iterator().next().getPath().toString()
-        .contains(t0inputContext.getUniqueIdentifier()));
-    Assert.assertTrue(t1mergeManager.onDiskMapOutputs.iterator().next().getPath().toString()
-        .contains(t1inputContext.getUniqueIdentifier()));
+      //Will be interrupted in the middle by interruptingThread.
+      t0mergeManager.onDiskMerger.startMerge(Sets.newHashSet(t0MergeFiles));
+      t0mergeManager.onDiskMerger.waitForMerge();
+      Assert.assertNotEquals(1, t0mergeManager.onDiskMapOutputs.size());
+    }
 
+    if (!interruptInMiddle) {
+      t1MapOutput0.commit();
+      t1MapOutput1.commit();
+      verify(t1mergeManager).closeOnDiskFile(t1MapOutput0.getOutputPath());
+      verify(t1mergeManager).closeOnDiskFile(t1MapOutput1.getOutputPath());
+      // Run the OnDiskMerge via MergeManager
+      // Simulate the thread invocation - remove files, and invoke merge
+      List<FileChunk> t1MergeFiles = new LinkedList<FileChunk>();
+      t1MergeFiles.addAll(t1mergeManager.onDiskMapOutputs);
+      t1mergeManager.onDiskMapOutputs.clear();
+      t1mergeManager.onDiskMerger.merge(t1MergeFiles);
+      Assert.assertEquals(1, t1mergeManager.onDiskMapOutputs.size());
+
+      Assert.assertNotEquals(t0mergeManager.onDiskMapOutputs.iterator().next().getPath(),
+          t1mergeManager.onDiskMapOutputs.iterator().next().getPath());
+
+      Assert.assertTrue(t0mergeManager.onDiskMapOutputs.iterator().next().getPath().toString()
+          .contains(t0inputContext.getUniqueIdentifier()));
+      Assert.assertTrue(t1mergeManager.onDiskMapOutputs.iterator().next().getPath().toString()
+          .contains(t1inputContext.getUniqueIdentifier()));
+    }
   }
 
   private InputContext createMockInputContext(String uniqueId) {

http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
index bb932f2..b86d054 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
@@ -557,7 +557,8 @@ public class TestTezMerger {
    * @return
    * @throws IOException
    */
-  private TezRawKeyValueIterator merge(List<Path> pathList, RawComparator rc) throws IOException {
+  private TezRawKeyValueIterator merge(List<Path> pathList, RawComparator rc)
+      throws IOException, InterruptedException {
     TezMerger merger = new TezMerger();
     TezRawKeyValueIterator records = merger.merge(defaultConf, localFs, IntWritable.class,
         LongWritable.class, null, false, 0, 1024, pathList.toArray(new Path[pathList.size()]),