You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/05/06 23:37:34 UTC
tez git commit: TEZ-2419. Inputs/Outputs should inform the Processor
about Interrupts when interrupted during a blocking Op. (sseth)
Repository: tez
Updated Branches:
refs/heads/TEZ-2003 27dc21ed5 -> f694356c6
TEZ-2419. Inputs/Outputs should inform the Processor about Interrupts
when interrupted during a blocking Op. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/f694356c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/f694356c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/f694356c
Branch: refs/heads/TEZ-2003
Commit: f694356c6953beb47fa9504c45dd6a38d03060ed
Parents: 27dc21e
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed May 6 14:37:15 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed May 6 14:37:15 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../tez/mapreduce/lib/MRReaderMapReduce.java | 3 +-
.../tez/mapreduce/lib/MRReaderMapred.java | 3 +
.../apache/tez/mapreduce/output/MROutput.java | 3 +-
.../library/api/IOInterruptedException.java | 40 +++++++
.../tez/runtime/library/api/KeyValueReader.java | 2 +
.../tez/runtime/library/api/KeyValueWriter.java | 2 +
.../runtime/library/api/KeyValuesReader.java | 1 +
.../runtime/library/api/KeyValuesWriter.java | 2 +
.../common/readers/UnorderedKVReader.java | 4 +-
.../common/shuffle/orderedgrouped/Shuffle.java | 6 +-
.../common/sort/impl/ExternalSorter.java | 3 +-
.../common/sort/impl/PipelinedSorter.java | 20 ++--
.../common/sort/impl/dflt/DefaultSorter.java | 11 +-
.../writers/UnorderedPartitionedKVWriter.java | 3 +-
.../input/ConcatenatedMergedKeyValueInput.java | 8 +-
.../input/ConcatenatedMergedKeyValuesInput.java | 8 +-
.../library/input/OrderedGroupedKVInput.java | 12 +-
.../common/readers/TestUnorderedKVReader.java | 22 ++++
.../input/TestOrderedGroupedKVInput.java | 113 +++++++++++++++++++
20 files changed, 246 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/f694356c/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7b6ab81..02c9468 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.8.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-2419. Inputs/Outputs should inform the Processor about Interrupts when interrupted during a blocking Op.
TEZ-1752. Inputs / Outputs in the Runtime library should be interruptable.
Release 0.7.0: Unreleased
http://git-wip-us.apache.org/repos/asf/tez/blob/f694356c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
index 0495751..5fc3e49 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapReduce.java
@@ -20,6 +20,7 @@ package org.apache.tez.mapreduce.lib;
import java.io.IOException;
+import org.apache.tez.runtime.library.api.IOInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.mapred.JobConf;
@@ -116,7 +117,7 @@ public class MRReaderMapReduce extends MRReader {
hasNext = recordReader.nextKeyValue();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- throw new IOException("Interrupted while checking for next key-value", e);
+ throw new IOInterruptedException("Interrupted while checking for next key-value", e);
}
if (hasNext) {
inputRecordCounter.increment(1);
http://git-wip-us.apache.org/repos/asf/tez/blob/f694356c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
index 366e7a7..1bf71f6 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/lib/MRReaderMapred.java
@@ -117,6 +117,9 @@ public class MRReaderMapred extends MRReader {
hasCompletedProcessing();
completedProcessing = true;
}
+ // The underlying reader does not throw InterruptedExceptions. Cannot convert to an
+ // IOInterruptedException without checking the interrupt flag on each request, which is also
+ // not guaranteed. Relying on the user to ensure Interrupts are handled correctly.
return hasNext;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f694356c/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
----------------------------------------------------------------------
diff --git a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
index d19f707..a3b19ed 100644
--- a/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
+++ b/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/output/MROutput.java
@@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
+import org.apache.tez.runtime.library.api.IOInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -499,7 +500,7 @@ public class MROutput extends AbstractLogicalOutput {
newRecordWriter.write(key, value);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- throw new IOException("Interrupted while writing next key-value",e);
+ throw new IOInterruptedException("Interrupted while writing next key-value",e);
}
} else {
oldRecordWriter.write(key, value);
http://git-wip-us.apache.org/repos/asf/tez/blob/f694356c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/IOInterruptedException.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/IOInterruptedException.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/IOInterruptedException.java
new file mode 100644
index 0000000..776b2a3
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/IOInterruptedException.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.api;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Indicates that an IOOperation was interrupted
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class IOInterruptedException extends IOException {
+
+ public IOInterruptedException(String message) {
+ super(message);
+ }
+
+ public IOInterruptedException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public IOInterruptedException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/f694356c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java
index d504d08..47f335b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueReader.java
@@ -49,6 +49,7 @@ public abstract class KeyValueReader extends Reader {
* @return true if another key/value(s) pair exists, false if there are no more.
* @throws IOException
* if an error occurs
+ * @throws {@link IOInterruptedException} if IO was performing a blocking operation and was interrupted
*/
public abstract boolean next() throws IOException;
@@ -63,6 +64,7 @@ public abstract class KeyValueReader extends Reader {
/**
* Returns the current value
* @return the current value
+ *
* @throws IOException
*/
public abstract Object getCurrentValue() throws IOException;
http://git-wip-us.apache.org/repos/asf/tez/blob/f694356c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriter.java
index 6acb24b..b5c4294 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValueWriter.java
@@ -39,6 +39,8 @@ public abstract class KeyValueWriter extends Writer {
* the value to write
* @throws IOException
* if an error occurs
+ * @throws {@link IOInterruptedException} if IO was interrupted
+ * @throws {@link IOInterruptedException} if IO was performing a blocking operation and was interrupted
*/
public abstract void write(Object key, Object value) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f694356c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java
index 510f4b7..7760818 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesReader.java
@@ -49,6 +49,7 @@ public abstract class KeyValuesReader extends Reader {
* @return true if another key/value(s) pair exists, false if there are no more.
* @throws IOException
* if an error occurs
+ * @throws {@link IOInterruptedException} if IO was performing a blocking operation and was interrupted
*/
public abstract boolean next() throws IOException;
http://git-wip-us.apache.org/repos/asf/tez/blob/f694356c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesWriter.java
index 50fc2d6..9cdde43 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/KeyValuesWriter.java
@@ -38,6 +38,8 @@ public abstract class KeyValuesWriter extends KeyValueWriter {
* @param values
* values to write
* @throws java.io.IOException
+ * @throws {@link IOInterruptedException} if IO was interrupted
+ * @throws {@link IOInterruptedException} if IO was performing a blocking operation and was interrupted
*/
public abstract void write(Object key, Iterable<Object> values) throws IOException;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f694356c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
index fc2e312..a8dd1b2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
@@ -20,6 +20,7 @@ package org.apache.tez.runtime.library.common.readers;
import java.io.IOException;
+import org.apache.tez.runtime.library.api.IOInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -168,7 +169,6 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
*
* @return true if the next input exists, false otherwise
* @throws IOException
- * @throws InterruptedException
*/
private boolean moveToNextInput() throws IOException {
if (currentReader != null) { // Close the current reader.
@@ -185,7 +185,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for next available input", e);
Thread.currentThread().interrupt();
- throw new IOException(e);
+ throw new IOInterruptedException(e);
}
if (currentFetchedInput == null) {
hasCompletedProcessing();
http://git-wip-us.apache.org/repos/asf/tez/blob/f694356c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index 442f032..76c50e0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -297,6 +297,7 @@ public class Shuffle implements ExceptionReporter {
kvIter = runShuffleFuture.get();
} catch (ExecutionException e) {
Throwable cause = e.getCause();
+ // Processor interrupted while waiting for errors, will see an InterruptedException.
handleThrowable(cause);
}
if (isShutDown.get()) {
@@ -368,7 +369,9 @@ public class Shuffle implements ExceptionReporter {
try {
kvIter = merger.close();
} catch (Throwable e) {
- throw new ShuffleError("Error while doing final merge " , e);
+ // Set the throwable so that future.get() sees the reported errror.
+ throwable.set(e);
+ throw new ShuffleError("Error while doing final merge ", e);
}
mergePhaseTime.setValue(System.currentTimeMillis() - startTime);
@@ -506,6 +509,7 @@ public class Shuffle implements ExceptionReporter {
LOG.info("Already shutdown. Ignoring error");
} else {
LOG.error("ShuffleRunner failed with error", t);
+ // In case of an abort / Interrupt - the runtime makes sure that this is ignored.
inputContext.fatalError(t, "Shuffle Runner Failed");
cleanupIgnoreErrors();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f694356c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index ca4d889..40d22fe 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -25,6 +25,7 @@ import java.util.Iterator;
import java.util.Map;
import com.google.common.collect.Maps;
+import org.apache.tez.runtime.library.api.IOInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -267,7 +268,7 @@ public abstract class ExternalSorter {
combiner.combine(kvIter, writer);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- throw new IOException(e);
+ throw new IOInterruptedException("Combiner interrupted", e);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f694356c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 030440e..d9de921 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -29,6 +29,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.PriorityQueue;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -37,6 +38,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
+import org.apache.tez.runtime.library.api.IOInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -341,7 +343,7 @@ public class PipelinedSorter extends ExternalSorter {
mapOutputByteCounter.increment(valend - keystart);
}
- public void spill() throws IOException {
+ public void spill() throws IOException {
// create spill file
final long size = capacity +
+ (partitions * APPROX_HEADER_LENGTH);
@@ -352,7 +354,13 @@ public class PipelinedSorter extends ExternalSorter {
FSDataOutputStream out = rfs.create(filename, true, 4096);
try {
- merger.ready(); // wait for all the future results from sort threads
+ try {
+ merger.ready(); // wait for all the future results from sort threads
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOG.info("Interrupted while waiting for mergers to complete");
+ throw new IOInterruptedException("Interrupted while waiting for mergers to complete", e);
+ }
LOG.info("Spilling to " + filename.toString());
for (int i = 0; i < partitions; ++i) {
if (isThreadInterrupted()) {
@@ -391,9 +399,6 @@ public class PipelinedSorter extends ExternalSorter {
//TODO: honor cache limits
indexCacheList.add(spillRec);
++numSpills;
- } catch(InterruptedException ie) {
- // TODO:the combiner has been interrupted
- Thread.currentThread().interrupt();
} finally {
out.close();
}
@@ -568,6 +573,7 @@ public class PipelinedSorter extends ExternalSorter {
cleanup();
}
Thread.currentThread().interrupt();
+ throw new IOInterruptedException("Interrupted while closing Output", ie);
}
}
@@ -1046,7 +1052,7 @@ public class PipelinedSorter extends ExternalSorter {
iter = futureIter.get();
this.add(iter);
}
-
+
StringBuilder sb = new StringBuilder();
for(SpanIterator sp: heap) {
sb.append(sp.toString());
@@ -1056,7 +1062,7 @@ public class PipelinedSorter extends ExternalSorter {
}
LOG.info("Heap = " + sb.toString());
return true;
- } catch(Exception e) {
+ } catch(ExecutionException e) {
LOG.info(e.toString());
return false;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f694356c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 9783c79..afe07f0 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -32,6 +32,7 @@ import java.util.concurrent.locks.ReentrantLock;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import org.apache.commons.lang.exception.ExceptionUtils;
+import org.apache.tez.runtime.library.api.IOInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -607,7 +608,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- throw new IOException(
+ throw new IOInterruptedException(
"Buffer interrupted while waiting for the writer", e);
}
}
@@ -644,7 +645,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
LOG.info("Spill thread interrupted");
//Reset status
Thread.currentThread().interrupt();
- throw new IOException("Spill failed", e);
+ throw new IOInterruptedException("Spill failed", e);
}
}
@@ -769,7 +770,11 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
+ " failed : " + ExceptionUtils.getStackTrace(lspillException);
outputContext.fatalError(lspillException, logMsg);
}
- throw new IOException("Spill failed", lspillException);
+ if (lspillException instanceof InterruptedException) {
+ throw new IOInterruptedException("Spill failed", lspillException);
+ } else {
+ throw new IOException("Spill failed", lspillException);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f694356c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
index 37d8be6..9a98cd1 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/writers/UnorderedPartitionedKVWriter.java
@@ -54,6 +54,7 @@ import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
+import org.apache.tez.runtime.library.api.IOInterruptedException;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.Constants;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
@@ -354,7 +355,7 @@ public class UnorderedPartitionedKVWriter extends BaseUnorderedPartitionedKVWrit
return availableBuffers.take();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- throw new IOException("Interrupted while waiting for next buffer", e);
+ throw new IOInterruptedException("Interrupted while waiting for next buffer", e);
}
}
} else {
http://git-wip-us.apache.org/repos/asf/tez/blob/f694356c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
index 14b1e2c..45784d9 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValueInput.java
@@ -64,7 +64,13 @@ public class ConcatenatedMergedKeyValueInput extends MergedLogicalInput {
currentReader = (KeyValueReader) reader;
currentReaderIndex++;
} catch (Exception e) {
- throw new IOException(e);
+ // An InterruptedException is not expected here since this works off of
+ // underlying readers which take care of throwing IOInterruptedExceptions
+ if (e instanceof IOException) {
+ throw (IOException) e;
+ } else {
+ throw new IOException(e);
+ }
}
}
return true;
http://git-wip-us.apache.org/repos/asf/tez/blob/f694356c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
index 2a1e4c6..27ff324 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/ConcatenatedMergedKeyValuesInput.java
@@ -65,7 +65,13 @@ public class ConcatenatedMergedKeyValuesInput extends MergedLogicalInput {
currentReader = (KeyValuesReader) reader;
currentReaderIndex++;
} catch (Exception e) {
- throw new IOException(e);
+ // An InterruptedException is not expected here since this works off of
+ // underlying readers which take care of throwing IOInterruptedExceptions
+ if (e instanceof IOException) {
+ throw (IOException)e;
+ } else {
+ throw new IOException(e);
+ }
}
}
return true;
http://git-wip-us.apache.org/repos/asf/tez/blob/f694356c/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index 49cf102..12a5955 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -27,6 +27,8 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.tez.runtime.library.api.IOInterruptedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -121,7 +123,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
if (!isStarted.get()) {
memoryUpdateCallbackHandler.validateUpdateReceived();
// Start the shuffle - copy and merge
- shuffle = new Shuffle(getContext(), conf, getNumPhysicalInputs(), memoryUpdateCallbackHandler.getMemoryAssigned());
+ shuffle = createShuffle();
shuffle.run();
if (LOG.isDebugEnabled()) {
LOG.debug("Initialized the handlers in shuffle..Safe to start processing..");
@@ -137,6 +139,11 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
}
}
+ @VisibleForTesting
+ Shuffle createShuffle() throws IOException {
+ return new Shuffle(getContext(), conf, getNumPhysicalInputs(), memoryUpdateCallbackHandler.getMemoryAssigned());
+ }
+
/**
* Check if the input is ready for consumption
*
@@ -207,6 +214,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
* previous K-V pair will throw an Exception
*
* @return a KVReader over the sorted input.
+ * @throws {@link IOInterruptedException} if IO was performing a blocking operation and was interrupted
*/
@Override
public KeyValuesReader getReader() throws IOException, TezException {
@@ -240,7 +248,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
waitForInputReady();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- throw new IOException("Interrupted while waiting for input ready", e);
+ throw new IOInterruptedException("Interrupted while waiting for input ready", e);
}
}
@SuppressWarnings("rawtypes")
http://git-wip-us.apache.org/repos/asf/tez/blob/f694356c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
index 51ea42d..80bdc42 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/readers/TestUnorderedKVReader.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.io.Text;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.runtime.library.api.IOInterruptedException;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
@@ -48,6 +49,7 @@ import java.util.LinkedList;
import static junit.framework.TestCase.fail;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -165,4 +167,24 @@ public class TestUnorderedKVReader {
}
}
+ @Test(timeout = 5000)
+ public void testInterruptOnNext() throws IOException, InterruptedException {
+ ShuffleManager shuffleManager = mock(ShuffleManager.class);
+
+ // Simulate an interrupt while waiting for the next fetched input.
+ doThrow(new InterruptedException()).when(shuffleManager).getNextInput();
+ TezCounters counters = new TezCounters();
+ TezCounter inputRecords = counters.findCounter(TaskCounter.INPUT_RECORDS_PROCESSED);
+ UnorderedKVReader<Text, Text> reader =
+ new UnorderedKVReader<Text, Text>(shuffleManager, defaultConf, null, false, -1, -1,
+ inputRecords);
+
+ try {
+ reader.next();
+ fail("No data available to reader. Should not be able to access any record");
+ } catch (IOInterruptedException e) {
+ // Expected exception. Any other should fail the test.
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/f694356c/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestOrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestOrderedGroupedKVInput.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestOrderedGroupedKVInput.java
new file mode 100644
index 0000000..d4be802
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/input/TestOrderedGroupedKVInput.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.runtime.library.input;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.common.TezUtils;
+import org.apache.tez.common.counters.TezCounters;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.UserPayload;
+import org.apache.tez.runtime.api.InputContext;
+import org.apache.tez.runtime.library.api.IOInterruptedException;
+import org.apache.tez.runtime.library.common.MemoryUpdateCallbackHandler;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.Shuffle;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+public class TestOrderedGroupedKVInput {
+
+ @Test(timeout = 5000)
+ public void testInterruptWhileAwaitingInput() throws IOException, TezException {
+
+ InputContext inputContext = createMockInputContext();
+ OrderedGroupedKVInput kvInput = new OrderedGroupedKVInputForTest(inputContext, 10);
+ kvInput.initialize();
+
+ kvInput.start();
+
+ try {
+ kvInput.getReader();
+ Assert.fail("getReader should not return since underlying inputs are not ready");
+ } catch (IOException e) {
+ Assert.assertTrue(e instanceof IOInterruptedException);
+ }
+
+ }
+
+
+ private InputContext createMockInputContext() throws IOException {
+ InputContext inputContext = mock(InputContext.class);
+ Configuration conf = new TezConfiguration();
+ UserPayload payLoad = TezUtils.createUserPayloadFromConf(conf);
+ String[] workingDirs = new String[]{"workDir1"};
+ TezCounters counters = new TezCounters();
+
+
+ doReturn(payLoad).when(inputContext).getUserPayload();
+ doReturn(workingDirs).when(inputContext).getWorkDirs();
+ doReturn(200 * 1024 * 1024l).when(inputContext).getTotalMemoryAvailableToTask();
+ doReturn(counters).when(inputContext).getCounters();
+
+ doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ Object[] args = invocation.getArguments();
+
+ if (args[1] instanceof MemoryUpdateCallbackHandler) {
+ MemoryUpdateCallbackHandler memUpdateCallbackHandler =
+ (MemoryUpdateCallbackHandler) args[1];
+ memUpdateCallbackHandler.memoryAssigned(200 * 1024 * 1024);
+ } else {
+ Assert.fail();
+ }
+ return null;
+ }
+ }).when(inputContext).requestInitialMemory(any(long.class),
+ any(MemoryUpdateCallbackHandler.class));
+
+ return inputContext;
+ }
+
+ static class OrderedGroupedKVInputForTest extends OrderedGroupedKVInput {
+
+ public OrderedGroupedKVInputForTest(InputContext inputContext, int numPhysicalInputs) {
+ super(inputContext, numPhysicalInputs);
+ }
+
+ Shuffle createShuffle() throws IOException {
+ Shuffle shuffle = mock(Shuffle.class);
+ try {
+ doThrow(new InterruptedException()).when(shuffle).waitForInput();
+ } catch (InterruptedException e) {
+ Assert.fail();
+ } catch (TezException e) {
+ Assert.fail();
+ }
+ return shuffle;
+ }
+ }
+
+}