You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/09 03:42:44 UTC

[06/43] tez git commit: TEZ-2419. Inputs/Outputs should inform the Processor about Interrupts when interrupted during a blocking Op. (sseth)

TEZ-2419. Inputs/Outputs should inform the Processor about Interrupts
when interrupted during a blocking Op. (sseth)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/55308630
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/55308630
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/55308630

Branch: refs/heads/TEZ-2003
Commit: 55308630b6354ce070550d1ea4efbedbbae8e13a
Parents: 7476fae
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed May 6 14:39:08 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed May 6 14:39:08 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |   1 +
 .../tez/mapreduce/lib/MRReaderMapReduce.java    |   3 +-
 .../tez/mapreduce/lib/MRReaderMapred.java       |   3 +
 .../apache/tez/mapreduce/output/MROutput.java   |   3 +-
 .../library/api/IOInterruptedException.java     |  40 +++++++
 .../tez/runtime/library/api/KeyValueReader.java |   2 +
 .../tez/runtime/library/api/KeyValueWriter.java |   2 +
 .../runtime/library/api/KeyValuesReader.java    |   1 +
 .../runtime/library/api/KeyValuesWriter.java    |   2 +
 .../common/readers/UnorderedKVReader.java       |   4 +-
 .../common/shuffle/orderedgrouped/Shuffle.java  |   6 +-
 .../common/sort/impl/ExternalSorter.java        |   3 +-
 .../common/sort/impl/PipelinedSorter.java       |  20 ++--
 .../common/sort/impl/dflt/DefaultSorter.java    |  11 +-
 .../writers/UnorderedPartitionedKVWriter.java   |   3 +-
 .../input/ConcatenatedMergedKeyValueInput.java  |   8 +-
 .../input/ConcatenatedMergedKeyValuesInput.java |   8 +-
 .../library/input/OrderedGroupedKVInput.java    |  12 +-
 .../common/readers/TestUnorderedKVReader.java   |  22 ++++
 .../input/TestOrderedGroupedKVInput.java        | 113 +++++++++++++++++++
 20 files changed, 246 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index fd45454..c865f12 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.8.0: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-2419. Inputs/Outputs should inform the Processor about Interrupts when interrupted during a blocking Op.
   TEZ-1752. Inputs / Outputs in the Runtime library should be interruptable.
 
 Release 0.7.0: Unreleased

http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
index 0495751..5fc3e49 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
@@ -20,6 +20,7 @@ package org.apache.tez.mapreduce.lib;
 
 import java.io.IOException;
 
+import org.apache.tez.runtime.library.api.IOInterruptedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.mapred.JobConf;
@@ -116,7 +117,7 @@ public class MRReaderMapReduce extends MRReader {
       hasNext = recordReader.nextKeyValue();
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
-      throw new IOException("Interrupted while checking for next key-value", e);
+      throw new IOInterruptedException("Interrupted while checking for next key-value", e);
     }
     if (hasNext) {
       inputRecordCounter.increment(1);

http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
index 366e7a7..1bf71f6 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
@@ -117,6 +117,9 @@ public class MRReaderMapred extends MRReader {
       hasCompletedProcessing();
       completedProcessing = true;
     }
+    // The underlying reader does not throw InterruptedExceptions. Cannot convert to an
+    // IOInterruptedException without checking the interrupt flag on each request, which is also
+    // not guaranteed. Relying on the user to ensure Interrupts are handled correctly.
     return hasNext;
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index d19f707..a3b19ed 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.commons.lang.StringUtils;
+import org.apache.tez.runtime.library.api.IOInterruptedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -499,7 +500,7 @@ public class MROutput extends AbstractLogicalOutput {
             newRecordWriter.write(key, value);
           } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
-            throw new IOException("Interrupted while writing next key-value",e);
+            throw new IOInterruptedException("Interrupted while writing next key-value",e);
           }
         } else {
           oldRecordWriter.write(key, value);

http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/IOInterruptedException.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/IOInterruptedException.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/IOInterruptedException.java
new file mode 100644
index 0000000..776b2a3
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/IOInterruptedException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.api;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Indicates that a blocking IO operation was interrupted.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class IOInterruptedException extends IOException {
+
+  public IOInterruptedException(String message) {
+    super(message);
+  }
+
+  public IOInterruptedException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public IOInterruptedException(Throwable cause) {
+    super(cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java
index d504d08..47f335b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java
@@ -49,6 +49,7 @@ public abstract class KeyValueReader extends Reader {
    * @return true if another key/value(s) pair exists, false if there are no more.
    * @throws IOException
    *           if an error occurs
+   * @throws IOInterruptedException if IO was performing a blocking operation and was interrupted
    */
   public abstract boolean next() throws IOException;
 
@@ -63,6 +64,7 @@ public abstract class KeyValueReader extends Reader {
   /**
    * Returns the current value
    * @return the current value
+   *
    * @throws IOException
    */
   public abstract Object getCurrentValue() throws IOException;

http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriter.java
index 6acb24b..b5c4294 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriter.java
@@ -39,6 +39,8 @@ public abstract class KeyValueWriter extends Writer {
    *          the value to write
    * @throws IOException
    *           if an error occurs
+   * @throws IOInterruptedException if IO was performing a blocking operation and was
+   *           interrupted
    */
   public abstract void write(Object key, Object value) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java
index 510f4b7..7760818 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java
@@ -49,6 +49,7 @@ public abstract class KeyValuesReader extends Reader {
    * @return true if another key/value(s) pair exists, false if there are no more.
    * @throws IOException
    *           if an error occurs
+   * @throws IOInterruptedException if IO was performing a blocking operation and was interrupted
    */
   public abstract boolean next() throws IOException;
 

http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesWriter.java
index 50fc2d6..9cdde43 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesWriter.java
@@ -38,6 +38,8 @@ public abstract class KeyValuesWriter extends KeyValueWriter {
    * @param values
    *          values to write
    * @throws java.io.IOException
+   * @throws IOInterruptedException if IO was performing a blocking operation and was
+   *           interrupted
    */
   public abstract void write(Object key, Iterable<Object> values) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
index fc2e312..a8dd1b2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
@@ -20,6 +20,7 @@ package org.apache.tez.runtime.library.common.readers;
 
 import java.io.IOException;
 
+import org.apache.tez.runtime.library.api.IOInterruptedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -168,7 +169,6 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
    * 
    * @return true if the next input exists, false otherwise
    * @throws IOException
-   * @throws InterruptedException
    */
   private boolean moveToNextInput() throws IOException {
     if (currentReader != null) { // Close the current reader.
@@ -185,7 +185,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
     } catch (InterruptedException e) {
       LOG.warn("Interrupted while waiting for next available input", e);
       Thread.currentThread().interrupt();
-      throw new IOException(e);
+      throw new IOInterruptedException(e);
     }
     if (currentFetchedInput == null) {
       hasCompletedProcessing();

http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index ee05378..cb12a63 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -305,6 +305,7 @@ public class Shuffle implements ExceptionReporter {
       kvIter = runShuffleFuture.get();
     } catch (ExecutionException e) {
       Throwable cause = e.getCause();
+      // Processor interrupted while waiting for errors, will see an InterruptedException.
       handleThrowable(cause);
     }
     if (isShutDown.get()) {
@@ -375,7 +376,9 @@ public class Shuffle implements ExceptionReporter {
       try {
         kvIter = merger.close();
       } catch (Throwable e) {
-        throw new ShuffleError("Error while doing final merge " , e);
+        // Set the throwable so that future.get() sees the reported error.
+        throwable.set(e);
+        throw new ShuffleError("Error while doing final merge ", e);
       }
       mergePhaseTime.setValue(System.currentTimeMillis() - startTime);
 
@@ -513,6 +516,7 @@ public class Shuffle implements ExceptionReporter {
         LOG.info("Already shutdown. Ignoring error");
       } else {
         LOG.error("ShuffleRunner failed with error", t);
+        // In case of an abort / Interrupt - the runtime makes sure that this is ignored.
         inputContext.fatalError(t, "Shuffle Runner Failed");
         cleanupIgnoreErrors();
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index ca4d889..40d22fe 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
 import java.util.Map;
 
 import com.google.common.collect.Maps;
+import org.apache.tez.runtime.library.api.IOInterruptedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -267,7 +268,7 @@ public abstract class ExternalSorter {
       combiner.combine(kvIter, writer);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
-      throw new IOException(e);
+      throw new IOInterruptedException("Combiner interrupted", e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 030440e..d9de921 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -29,6 +29,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.ListIterator;
 import java.util.PriorityQueue;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -37,6 +38,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
+import org.apache.tez.runtime.library.api.IOInterruptedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -341,7 +343,7 @@ public class PipelinedSorter extends ExternalSorter {
     mapOutputByteCounter.increment(valend - keystart);
   }
 
-  public void spill() throws IOException { 
+  public void spill() throws IOException {
     // create spill file
     final long size = capacity +
         + (partitions * APPROX_HEADER_LENGTH);
@@ -352,7 +354,13 @@ public class PipelinedSorter extends ExternalSorter {
     FSDataOutputStream out = rfs.create(filename, true, 4096);
 
     try {
-      merger.ready(); // wait for all the future results from sort threads
+      try {
+        merger.ready(); // wait for all the future results from sort threads
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        LOG.info("Interrupted while waiting for mergers to complete");
+        throw new IOInterruptedException("Interrupted while waiting for mergers to complete", e);
+      }
       LOG.info("Spilling to " + filename.toString());
       for (int i = 0; i < partitions; ++i) {
         if (isThreadInterrupted()) {
@@ -391,9 +399,6 @@ public class PipelinedSorter extends ExternalSorter {
       //TODO: honor cache limits
       indexCacheList.add(spillRec);
       ++numSpills;
-    } catch(InterruptedException ie) {
-      // TODO:the combiner has been interrupted
-      Thread.currentThread().interrupt();
     } finally {
       out.close();
     }
@@ -568,6 +573,7 @@ public class PipelinedSorter extends ExternalSorter {
         cleanup();
       }
       Thread.currentThread().interrupt();
+      throw new IOInterruptedException("Interrupted while closing Output", ie);
     }
   }
 
@@ -1046,7 +1052,7 @@ public class PipelinedSorter extends ExternalSorter {
           iter = futureIter.get();
           this.add(iter);
         }
-        
+
         StringBuilder sb = new StringBuilder();
         for(SpanIterator sp: heap) {
             sb.append(sp.toString());
@@ -1056,7 +1062,7 @@ public class PipelinedSorter extends ExternalSorter {
         }
         LOG.info("Heap = " + sb.toString());
         return true;
-      } catch(Exception e) {
+      } catch(ExecutionException e) {
         LOG.info(e.toString());
         return false;
       }

http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 9783c79..afe07f0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -32,6 +32,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.runtime.library.api.IOInterruptedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -607,7 +608,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
                 }
               } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
-                  throw new IOException(
+                  throw new IOInterruptedException(
                       "Buffer interrupted while waiting for the writer", e);
               }
             }
@@ -644,7 +645,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
       LOG.info("Spill thread interrupted");
       //Reset status
       Thread.currentThread().interrupt();
-      throw new IOException("Spill failed", e);
+      throw new IOInterruptedException("Spill failed", e);
     }
   }
 
@@ -769,7 +770,11 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
             + " failed : " + ExceptionUtils.getStackTrace(lspillException);
         outputContext.fatalError(lspillException, logMsg);
       }
-      throw new IOException("Spill failed", lspillException);
+      if (lspillException instanceof InterruptedException) {
+        throw new IOInterruptedException("Spill failed", lspillException);
+      } else {
+        throw new IOException("Spill failed", lspillException);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 37d8be6..9a98cd1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -54,6 +54,7 @@ import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.OutputContext;
 import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.library.api.IOInterruptedException;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.Constants;
 import org.apache.tez.runtime.library.common.sort.impl.IFile;
@@ -354,7 +355,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
           return availableBuffers.take();
         } catch (InterruptedException e) {
           Thread.currentThread().interrupt();
-          throw new IOException("Interrupted while waiting for next buffer", e);
+          throw new IOInterruptedException("Interrupted while waiting for next buffer", e);
         }
       }
     } else {

http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
index 14b1e2c..45784d9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
@@ -64,7 +64,13 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput {
           currentReader = (KeyValueReader) reader;
           currentReaderIndex++;
         } catch (Exception e) {
-          throw new IOException(e);
+          // An InterruptedException is not expected here since this works off of
+          // underlying readers which take care of throwing IOInterruptedExceptions
+          if (e instanceof IOException) {
+            throw (IOException) e;
+          } else {
+            throw new IOException(e);
+          }
         }
       }
       return true;

http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
index 2a1e4c6..27ff324 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
@@ -65,7 +65,13 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
           currentReader = (KeyValuesReader) reader;
           currentReaderIndex++;
         } catch (Exception e) {
-          throw new IOException(e);
+          // An InterruptedException is not expected here since this works off of
+          // underlying readers which take care of throwing IOInterruptedExceptions
+          if (e instanceof IOException) {
+            throw (IOException)e;
+          } else {
+            throw new IOException(e);
+          }
         }
       }
       return true;

http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index 49cf102..12a5955 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -27,6 +27,8 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.tez.runtime.library.api.IOInterruptedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -121,7 +123,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
     if (!isStarted.get()) {
       memoryUpdateCallbackHandler.validateUpdateReceived();
       // Start the shuffle - copy and merge
-      shuffle = new Shuffle(getContext(), conf, getNumPhysicalInputs(), memoryUpdateCallbackHandler.getMemoryAssigned());
+      shuffle = createShuffle();
       shuffle.run();
       if (LOG.isDebugEnabled()) {
         LOG.debug("Initialized the handlers in shuffle..Safe to start processing..");
@@ -137,6 +139,11 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
     }
   }
 
+  @VisibleForTesting
+  Shuffle createShuffle() throws IOException {
+    return new Shuffle(getContext(), conf, getNumPhysicalInputs(), memoryUpdateCallbackHandler.getMemoryAssigned());
+  }
+
   /**
    * Check if the input is ready for consumption
    *
@@ -207,6 +214,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
    * previous K-V pair will throw an Exception
    *
    * @return a KVReader over the sorted input.
+   * @throws IOInterruptedException if IO was performing a blocking operation and was interrupted
    */
   @Override
   public KeyValuesReader getReader() throws IOException, TezException {
@@ -240,7 +248,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
         waitForInputReady();
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
-        throw new IOException("Interrupted while waiting for input ready", e);
+        throw new IOInterruptedException("Interrupted while waiting for input ready", e);
       }
     }
     @SuppressWarnings("rawtypes")

http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
index 51ea42d..80bdc42 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.io.Text;
 import org.apache.tez.common.counters.TaskCounter;
 import org.apache.tez.common.counters.TezCounter;
 import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.library.api.IOInterruptedException;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
 import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
@@ -48,6 +49,7 @@ import java.util.LinkedList;
 import static junit.framework.TestCase.fail;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 
@@ -165,4 +167,24 @@ public class TestUnorderedKVReader {
     }
   }
 
+  @Test(timeout = 5000)
+  public void testInterruptOnNext() throws IOException, InterruptedException {
+    // Every attempt to fetch the next input behaves as if the blocked thread was interrupted.
+    ShuffleManager interruptedManager = mock(ShuffleManager.class);
+    doThrow(new InterruptedException()).when(interruptedManager).getNextInput();
+
+    TezCounter recordCounter =
+        new TezCounters().findCounter(TaskCounter.INPUT_RECORDS_PROCESSED);
+    UnorderedKVReader<Text, Text> kvReader = new UnorderedKVReader<Text, Text>(
+        interruptedManager, defaultConf, null, false, -1, -1, recordCounter);
+
+    try {
+      kvReader.next();
+      fail("No data available to reader. Should not be able to access any record");
+    } catch (IOInterruptedException expected) {
+      // The interrupt is expected to surface as IOInterruptedException; any other
+      // exception is not caught here and therefore fails the test.
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/tez/blob/55308630/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestOrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestOrderedGroupedKVInput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestOrderedGroupedKVInput.java
new file mode 100644
index 0000000..d4be802
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestOrderedGroupedKVInput.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.input;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.library.api.IOInterruptedException;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.Shuffle;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+/** Verifies that OrderedGroupedKVInput surfaces interrupts as IOInterruptedException. */
+public class TestOrderedGroupedKVInput {
+
+  /** getReader() must throw IOInterruptedException when the wait for input is interrupted. */
+  @Test(timeout = 5000)
+  public void testInterruptWhileAwaitingInput() throws IOException, TezException {
+    InputContext inputContext = createMockInputContext();
+    OrderedGroupedKVInput kvInput = new OrderedGroupedKVInputForTest(inputContext, 10);
+    kvInput.initialize();
+    kvInput.start();
+
+    try {
+      kvInput.getReader();
+      Assert.fail("getReader should not return since underlying inputs are not ready");
+    } catch (IOInterruptedException e) {
+      // Expected. Any other IOException propagates (with its stack trace) and fails the test.
+    }
+  }
+
+  /** Builds a mocked InputContext that answers any initial memory request via its callback. */
+  private InputContext createMockInputContext() throws IOException {
+    InputContext inputContext = mock(InputContext.class);
+    Configuration conf = new TezConfiguration();
+    UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
+    String[] workingDirs = new String[]{"workDir1"};
+    TezCounters counters = new TezCounters();
+
+    doReturn(payLoad).when(inputContext).getUserPayload();
+    doReturn(workingDirs).when(inputContext).getWorkDirs();
+    doReturn(200 * 1024 * 1024L).when(inputContext).getTotalMemoryAvailableToTask();
+    doReturn(counters).when(inputContext).getCounters();
+
+    // Answer the initial memory request by assigning 200 * 1024 * 1024 to the given callback.
+    doAnswer(new Answer<Void>() {
+      @Override
+      public Void answer(InvocationOnMock invocation) throws Throwable {
+        Object[] args = invocation.getArguments();
+        if (args[1] instanceof MemoryUpdateCallbackHandler) {
+          MemoryUpdateCallbackHandler memUpdateCallbackHandler =
+              (MemoryUpdateCallbackHandler) args[1];
+          memUpdateCallbackHandler.memoryAssigned(200 * 1024 * 1024);
+        } else {
+          Assert.fail("Unexpected memory callback argument: " + args[1]);
+        }
+        return null;
+      }
+    }).when(inputContext).requestInitialMemory(any(long.class),
+        any(MemoryUpdateCallbackHandler.class));
+
+    return inputContext;
+  }
+
+  /** OrderedGroupedKVInput whose Shuffle is a mock that is interrupted in waitForInput(). */
+  static class OrderedGroupedKVInputForTest extends OrderedGroupedKVInput {
+
+    public OrderedGroupedKVInputForTest(InputContext inputContext, int numPhysicalInputs) {
+      super(inputContext, numPhysicalInputs);
+    }
+
+    @Override
+    Shuffle createShuffle() throws IOException {
+      Shuffle shuffle = mock(Shuffle.class);
+      try {
+        // Stubbing call only; the catch blocks exist because waitForInput() declares them.
+        doThrow(new InterruptedException()).when(shuffle).waitForInput();
+      } catch (InterruptedException e) {
+        Assert.fail("Unexpected exception while stubbing waitForInput(): " + e);
+      } catch (TezException e) {
+        Assert.fail("Unexpected exception while stubbing waitForInput(): " + e);
+      }
+      return shuffle;
+    }
+  }
+}