You are viewing a plain-text version of this content. The canonical link (a hyperlink in the original page) is not preserved in plain text; see the Commit, Tree, and Diff URLs below.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/05/06 00:13:51 UTC
tez git commit: TEZ-1752. Inputs / Outputs in the Runtime library should be interruptable (rbalamohan)
Repository: tez
Updated Branches:
refs/heads/master 8c44f2484 -> 146ab0702
TEZ-1752. Inputs / Outputs in the Runtime library should be interruptable (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/146ab070
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/146ab070
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/146ab070
Branch: refs/heads/master
Commit: 146ab0702a25cda7020de936e270e291ca567e3c
Parents: 8c44f24
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Wed May 6 03:43:49 2015 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Wed May 6 03:43:49 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../runtime/api/AbstractLogicalIOProcessor.java | 4 +
.../api/ProcessorFrameworkInterface.java | 11 +-
.../library/api/TezRuntimeConfiguration.java | 11 +
.../common/readers/UnorderedKVReader.java | 1 +
.../runtime/library/common/shuffle/Fetcher.java | 1 +
.../common/shuffle/impl/ShuffleManager.java | 5 +
.../orderedgrouped/FetcherOrderedGrouped.java | 7 +-
.../shuffle/orderedgrouped/MergeManager.java | 124 +++++++--
.../shuffle/orderedgrouped/MergeThread.java | 18 +-
.../common/shuffle/orderedgrouped/Shuffle.java | 16 +-
.../orderedgrouped/ShuffleScheduler.java | 1 +
.../common/sort/impl/ExternalSorter.java | 38 +++
.../common/sort/impl/PipelinedSorter.java | 261 +++++++++++--------
.../library/common/sort/impl/TezMerger.java | 31 ++-
.../common/sort/impl/dflt/DefaultSorter.java | 70 +++--
.../library/input/OrderedGroupedKVInput.java | 1 +
.../runtime/library/input/UnorderedKVInput.java | 1 +
.../output/OrderedPartitionedKVOutput.java | 1 +
.../library/output/UnorderedKVOutput.java | 1 +
.../output/UnorderedPartitionedKVOutput.java | 1 +
.../library/common/TestValuesIterator.java | 20 +-
.../orderedgrouped/TestMergeManager.java | 87 +++++--
.../library/common/sort/impl/TestTezMerger.java | 3 +-
24 files changed, 518 insertions(+), 197 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index d7a1e1f..7ba8021 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
TEZ-1993. Implement a pluggable InputSizeEstimator for grouping fairly
ALL CHANGES:
+ TEZ-1752. Inputs / Outputs in the Runtime library should be interruptable.
TEZ-2401. Tez UI: All-dag page has duration keep counting for KILLED dag.
TEZ-2392. Have all readers throw an Exception on incorrect next() usage.
TEZ-2408. TestTaskAttempt fails to compile against hadoop-2.4 and hadoop-2.2.
http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java
index 7714321..5a4cbe8 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/AbstractLogicalIOProcessor.java
@@ -49,4 +49,8 @@ public abstract class AbstractLogicalIOProcessor implements LogicalIOProcessor,
return context;
}
+ @Override
+ public void abort() {
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorFrameworkInterface.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorFrameworkInterface.java b/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorFrameworkInterface.java
index f0ba9c9..89d4e3c 100644
--- a/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorFrameworkInterface.java
+++ b/tez-api/src/main/java/org/apache/tez/runtime/api/ProcessorFrameworkInterface.java
@@ -18,9 +18,11 @@
package org.apache.tez.runtime.api;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
import java.util.List;
-import org.apache.hadoop.classification.InterfaceAudience.Public;
/**
* Represents the Tez framework part of an {@link org.apache.tez.runtime.api.Processor}.
@@ -56,4 +58,11 @@ public interface ProcessorFrameworkInterface {
* if an error occurs
*/
public void close() throws Exception;
+
+ /**
+ * Indicates <code>Processor</code> to abort. Cleanup can be done.
+ *
+ */
+ @Unstable
+ public void abort();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index a818de8..3d9a701 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -311,6 +311,16 @@ public class TezRuntimeConfiguration {
*/
public static final boolean TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH_DEFAULT = false;
+ /**
+ * Used only for internal testing. Strictly not recommended to be used elsewhere. This
+ * parameter could be changed/dropped later.
+ */
+ @Unstable
+ @Private
+ public static final String TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT = TEZ_RUNTIME_PREFIX
+ + "cleanup.files.on.interrupt";
+ public static final boolean TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT_DEFAULT = false;
+
// TODO TEZ-1233 - allow this property to be set per vertex
// TODO TEZ-1231 - move these properties out since they are not relevant for Inputs / Outputs
@@ -374,6 +384,7 @@ public class TezRuntimeConfiguration {
tezRuntimeKeys.add(TEZ_RUNTIME_OPTIMIZE_SHARED_FETCH);
tezRuntimeKeys.add(TEZ_RUNTIME_CONVERT_USER_PAYLOAD_TO_HISTORY_TEXT);
tezRuntimeKeys.add(TEZ_RUNTIME_SORTER_CLASS);
+ tezRuntimeKeys.add(TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
defaultConf.addResource("core-default.xml");
defaultConf.addResource("core-site.xml");
http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
index b14a461..fc2e312 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/readers/UnorderedKVReader.java
@@ -184,6 +184,7 @@ public class UnorderedKVReader<K, V> extends KeyValueReader {
currentFetchedInput = shuffleManager.getNextInput();
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for next available input", e);
+ Thread.currentThread().interrupt();
throw new IOException(e);
}
if (currentFetchedInput == null) {
http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 3154943..48fe0f2 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -376,6 +376,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
// fall back to HTTP fetch below
LOG.warn("Double locking detected for " + host);
} catch (InterruptedException sleepInterrupted) {
+ Thread.currentThread().interrupt();
// fall back to HTTP fetch below
LOG.warn("Lock was interrupted for " + host);
} finally {
http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index 749143a..d47e652 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -748,6 +748,11 @@ public class ShuffleManager implements FetcherCallback {
/////////////////// End of Methods from FetcherCallbackHandler
public void shutdown() throws InterruptedException {
+ if (Thread.currentThread().isInterrupted()) {
+ //TODO: need to cleanup all FetchedInput (DiskFetchedInput, LocalDisFetchedInput), lockFile
+ //As of now relying on job cleanup (when all directories would be cleared)
+ LOG.info("Thread interrupted. Need to cleanup the local dirs");
+ }
if (!isShutdown.getAndSet(true)) {
// Shut down any pending fetchers
LOG.info("Shutting down pending fetchers on source" + srcNameTrimmed + ": "
http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 8d20aa7..fbaabff 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -177,6 +177,9 @@ class FetcherOrderedGrouped extends Thread {
}
}
} catch (InterruptedException ie) {
+ //TODO: might not be respected when fetcher is in progress / server is busy. TEZ-711
+ //Set the status back
+ Thread.currentThread().interrupt();
return;
} catch (Throwable t) {
shuffle.reportException(t);
@@ -191,7 +194,9 @@ class FetcherOrderedGrouped extends Thread {
try {
join(5000);
} catch (InterruptedException ie) {
- LOG.warn("Got interrupt while joining " + getName(), ie);
+ //Reset the status
+ Thread.currentThread().interrupt();
+ LOG.warn("Got interrupt while joining " + getName());
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
index 2e6ebd9..5a35f2f 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeManager.java
@@ -19,6 +19,7 @@ package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
@@ -137,6 +138,8 @@ public class MergeManager {
private AtomicInteger mergeFileSequenceId = new AtomicInteger(0);
+ private final boolean cleanup;
+
/**
* Construct the MergeManager. Must call start before it becomes usable.
*/
@@ -174,6 +177,9 @@ public class MergeManager {
this.additionalBytesWritten = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_WRITTEN);
this.additionalBytesRead = inputContext.getCounters().findCounter(TaskCounter.ADDITIONAL_SPILLS_BYTES_READ);
+ this.cleanup = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT,
+ TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT_DEFAULT);
+
this.codec = codec;
this.ifileReadAhead = ifileReadAheadEnabled;
if (this.ifileReadAhead) {
@@ -514,27 +520,61 @@ public class MergeManager {
public boolean isMergeComplete() {
return finalMergeComplete;
}
-
+
public TezRawKeyValueIterator close() throws Throwable {
// Wait for on-going merges to complete
- if (memToMemMerger != null) {
+ if (memToMemMerger != null) {
memToMemMerger.close();
}
inMemoryMerger.close();
onDiskMerger.close();
-
- List<MapOutput> memory =
+
+ List<MapOutput> memory =
new ArrayList<MapOutput>(inMemoryMergedMapOutputs);
inMemoryMergedMapOutputs.clear();
memory.addAll(inMemoryMapOutputs);
inMemoryMapOutputs.clear();
List<FileChunk> disk = new ArrayList<FileChunk>(onDiskMapOutputs);
onDiskMapOutputs.clear();
- TezRawKeyValueIterator kvIter = finalMerge(conf, rfs, memory, disk);
- this.finalMergeComplete = true;
- return kvIter;
+ try {
+ TezRawKeyValueIterator kvIter = finalMerge(conf, rfs, memory, disk);
+ this.finalMergeComplete = true;
+ return kvIter;
+ } catch(InterruptedException e) {
+ //Cleanup the disk segments
+ if (cleanup) {
+ cleanup(localFS, disk);
+ cleanup(localFS, onDiskMapOutputs);
+ }
+ Thread.currentThread().interrupt(); //reset interrupt status
+ throw e;
+ }
+ }
+
+
+ static void cleanup(FileSystem fs, Collection<FileChunk> fileChunkList) {
+ for (FileChunk fileChunk : fileChunkList) {
+ cleanup(fs, fileChunk.getPath());
+ }
}
-
+
+ static void cleanup(FileSystem fs, Path path) {
+ if (path == null) {
+ return;
+ }
+
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Deleting " + path);
+ }
+ fs.delete(path, true);
+ } catch (IOException e) {
+ LOG.info("Error in deleting " + path);
+ }
+ }
+
+
+
void runCombineProcessor(TezRawKeyValueIterator kvIter, Writer writer)
throws IOException, InterruptedException {
combiner.combine(kvIter, writer);
@@ -555,7 +595,7 @@ public class MergeManager {
}
@Override
- public void merge(List<MapOutput> inputs) throws IOException {
+ public void merge(List<MapOutput> inputs) throws IOException, InterruptedException {
if (inputs == null || inputs.size() == 0) {
return;
}
@@ -597,13 +637,28 @@ public class MergeManager {
// Note the output of the merge
closeInMemoryMergedFile(mergedMapOutputs);
}
+
+ @Override
+ public void cleanup(List<MapOutput> inputs, boolean deleteData) throws IOException,
+ InterruptedException {
+ //No OP
+ }
}
/**
* Merges multiple in-memory segment to a disk segment
*/
private class InMemoryMerger extends MergeThread<MapOutput> {
-
+
+ @VisibleForTesting
+ volatile InputAttemptIdentifier srcTaskIdentifier;
+
+ @VisibleForTesting
+ volatile Path outputPath;
+
+ @VisibleForTesting
+ volatile Path tmpDir;
+
public InMemoryMerger(MergeManager manager) {
super(manager, Integer.MAX_VALUE, exceptionReporter);
setName("MemtoDiskMerger [" + TezUtilsInternal
@@ -628,7 +683,7 @@ public class MergeManager {
//in the merge method)
//figure out the mapId
- InputAttemptIdentifier srcTaskIdentifier = inputs.get(0).getAttemptIdentifier();
+ srcTaskIdentifier = inputs.get(0).getAttemptIdentifier();
List<Segment> inMemorySegments = new ArrayList<Segment>();
long mergeOutputSize =
@@ -639,7 +694,7 @@ public class MergeManager {
// All disk writes done by this merge are overhead - due to the lac of
// adequate memory to keep all segments in memory.
- Path outputPath = mapOutputFile.getInputFileForWrite(
+ outputPath = mapOutputFile.getInputFileForWrite(
srcTaskIdentifier.getInputIdentifier().getInputIndex(), srcTaskIdentifier.getSpillEventId(),
mergeOutputSize).suffix(Constants.MERGED_OUTPUT_PREFIX);
LOG.info("Patch..InMemoryMerger outputPath: " + outputPath);
@@ -657,13 +712,13 @@ public class MergeManager {
LOG.info("Initiating in-memory merge with " + noInMemorySegments +
" segments...");
+ tmpDir = new Path(inputContext.getUniqueIdentifier());
// Nothing actually materialized to disk - controlled by setting sort-factor to #segments.
rIter = TezMerger.merge(conf, rfs,
(Class)ConfigUtils.getIntermediateInputKeyClass(conf),
(Class)ConfigUtils.getIntermediateInputValueClass(conf),
inMemorySegments, inMemorySegments.size(),
- new Path(inputContext.getUniqueIdentifier()),
- (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
+ tmpDir, (RawComparator)ConfigUtils.getIntermediateInputKeyComparator(conf),
nullProgressable, spilledRecordsCounter, null, additionalBytesRead, null);
// spilledRecordsCounter is tracking the number of keys that will be
// read from each of the segments being merged - which is essentially
@@ -700,6 +755,18 @@ public class MergeManager {
closeOnDiskFile(new FileChunk(outputPath, 0, outFileLen));
}
+ @Override
+ public void cleanup(List<MapOutput> inputs, boolean deleteData)
+ throws IOException, InterruptedException {
+ if (deleteData) {
+ //Additional check at task level
+ if (cleanup) {
+ LOG.info("Try deleting stale data");
+ MergeManager.cleanup(localFS, outputPath);
+ MergeManager.cleanup(localFS, tmpDir);
+ }
+ }
+ }
}
/**
@@ -708,6 +775,11 @@ public class MergeManager {
@VisibleForTesting
class OnDiskMerger extends MergeThread<FileChunk> {
+ @VisibleForTesting
+ volatile Path outputPath;
+ @VisibleForTesting
+ volatile Path tmpDir;
+
public OnDiskMerger(MergeManager manager) {
super(manager, ioSortFactor, exceptionReporter);
setName("DiskToDiskMerger [" + TezUtilsInternal
@@ -716,7 +788,7 @@ public class MergeManager {
}
@Override
- public void merge(List<FileChunk> inputs) throws IOException {
+ public void merge(List<FileChunk> inputs) throws IOException, InterruptedException {
// sanity check
if (inputs == null || inputs.isEmpty()) {
LOG.info("No ondisk files to merge...");
@@ -768,7 +840,7 @@ public class MergeManager {
// namePart includes the suffix of the file. We need to remove it.
namePart = FilenameUtils.removeExtension(namePart);
- Path outputPath = localDirAllocator.getLocalPathForWrite(namePart, approxOutputSize, conf);
+ outputPath = localDirAllocator.getLocalPathForWrite(namePart, approxOutputSize, conf);
outputPath = outputPath.suffix(Constants.MERGED_OUTPUT_PREFIX + mergeFileSequenceId.getAndIncrement());
Writer writer =
@@ -776,7 +848,7 @@ public class MergeManager {
(Class)ConfigUtils.getIntermediateInputKeyClass(conf),
(Class)ConfigUtils.getIntermediateInputValueClass(conf),
codec, null, null);
- Path tmpDir = new Path(inputContext.getUniqueIdentifier());
+ tmpDir = new Path(inputContext.getUniqueIdentifier());
try {
TezRawKeyValueIterator iter = TezMerger.merge(conf, rfs,
(Class)ConfigUtils.getIntermediateInputKeyClass(conf),
@@ -808,6 +880,20 @@ public class MergeManager {
" Local output file is " + outputPath + " of size " +
outputLen);
}
+
+ @Override
+ public void cleanup(List<FileChunk> inputs, boolean deleteData) throws IOException,
+ InterruptedException {
+ if (deleteData) {
+ //Additional check at task level
+ if (cleanup) {
+ LOG.info("Try deleting stale data");
+ MergeManager.cleanup(localFS, inputs);
+ MergeManager.cleanup(localFS, outputPath);
+ MergeManager.cleanup(localFS, tmpDir);
+ }
+ }
+ }
}
private long createInMemorySegments(List<MapOutput> inMemoryMapOutputs,
@@ -821,7 +907,7 @@ public class MergeManager {
for (MapOutput mo : inMemoryMapOutputs) {
fullSize += mo.getMemory().length;
}
- while(fullSize > leaveBytes) {
+ while((fullSize > leaveBytes) && !Thread.currentThread().isInterrupted()) {
MapOutput mo = inMemoryMapOutputs.remove(0);
byte[] data = mo.getMemory();
long size = data.length;
@@ -878,7 +964,7 @@ public class MergeManager {
private TezRawKeyValueIterator finalMerge(Configuration job, FileSystem fs,
List<MapOutput> inMemoryMapOutputs,
List<FileChunk> onDiskMapOutputs
- ) throws IOException {
+ ) throws IOException, InterruptedException {
LOG.info("finalMerge called with " +
inMemoryMapOutputs.size() + " in-memory map-outputs and " +
onDiskMapOutputs.size() + " on-disk map-outputs");
http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java
index d4faf51..52b4c5b 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/MergeThread.java
@@ -46,8 +46,18 @@ abstract class MergeThread<T> extends Thread {
public synchronized void close() throws InterruptedException {
closed = true;
- waitForMerge();
- interrupt();
+ if (!Thread.currentThread().isInterrupted()) {
+ waitForMerge();
+ interrupt();
+ } else {
+ try {
+ interrupt();
+ cleanup(inputs, Thread.currentThread().isInterrupted());
+ } catch (IOException e) {
+ //ignore
+ LOG.warn("Error cleaning up", e);
+ }
+ }
}
public synchronized boolean isInProgress() {
@@ -89,6 +99,7 @@ abstract class MergeThread<T> extends Thread {
merge(inputs);
} catch (InterruptedException ie) {
// Meant to handle a shutdown of the entire fetch/merge process
+ Thread.currentThread().interrupt();
return;
} catch(Throwable t) {
reporter.reportException(t);
@@ -106,4 +117,7 @@ abstract class MergeThread<T> extends Thread {
public abstract void merge(List<T> inputs)
throws IOException, InterruptedException;
+
+ public abstract void cleanup(List<T> inputs, boolean deleteData)
+ throws IOException, InterruptedException;
}
http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
index f98aa3a..442f032 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/Shuffle.java
@@ -357,6 +357,7 @@ public class Shuffle implements ExceptionReporter {
shufflePhaseTime.setValue(System.currentTimeMillis() - startTime);
// Stop the map-output fetcher threads
+ LOG.info("Cleaning up fetchers");
cleanupFetchers(false);
// stop the scheduler
@@ -393,8 +394,7 @@ public class Shuffle implements ExceptionReporter {
for (FetcherOrderedGrouped fetcher : fetchers) {
try {
fetcher.shutDown();
- LOG.info("Shutdown.." + fetcher.getName() + ", status:" + fetcher.isAlive() + ", "
- + "isInterrupted:" + fetcher.isInterrupted());
+ LOG.info("Shutdown.." + fetcher.getName());
} catch (InterruptedException e) {
if (ignoreErrors) {
LOG.info("Interrupted while shutting down fetchers. Ignoring.");
@@ -425,6 +425,8 @@ public class Shuffle implements ExceptionReporter {
scheduler.close();
} catch (InterruptedException e) {
if (ignoreErrors) {
+ //Reset the status
+ Thread.currentThread().interrupt();
LOG.info("Interrupted while attempting to close the scheduler during cleanup. Ignoring");
} else {
throw e;
@@ -437,6 +439,14 @@ public class Shuffle implements ExceptionReporter {
if (!mergerClosed.getAndSet(true)) {
try {
merger.close();
+ } catch (InterruptedException e) {
+ if (ignoreErrors) {
+ //Reset the status
+ Thread.currentThread().interrupt();
+ LOG.info("Interrupted while attempting to close the merger during cleanup. Ignoring");
+ } else {
+ throw e;
+ }
} catch (Throwable e) {
if (ignoreErrors) {
LOG.info("Exception while trying to shutdown merger, Ignoring", e);
@@ -493,7 +503,7 @@ public class Shuffle implements ExceptionReporter {
@Override
public void onFailure(Throwable t) {
if (isShutDown.get()) {
- LOG.info("Already shutdown. Ignoring error: ", t);
+ LOG.info("Already shutdown. Ignoring error");
} else {
LOG.error("ShuffleRunner failed with error", t);
inputContext.fatalError(t, "Shuffle Runner Failed");
http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index a3d79ae..c54b005 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -765,6 +765,7 @@ class ShuffleScheduler {
}
}
} catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
// This handles shutdown of the entire fetch / merge process.
return;
} catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
index c0445c9..ca4d889 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/ExternalSorter.java
@@ -115,6 +115,8 @@ public abstract class ExternalSorter {
protected Path finalIndexFile;
protected int numSpills;
+ protected final boolean cleanup;
+
// Counters
// MR compatilbity layer needs to rename counters back to what MR requries.
@@ -148,6 +150,9 @@ public abstract class ExternalSorter {
this.conf = conf;
this.partitions = numOutputs;
+ cleanup = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT,
+ TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT_DEFAULT);
+
rfs = ((LocalFileSystem)FileSystem.getLocal(this.conf)).getRaw();
LOG.info("Initial Mem : " + initialMemoryAvailable + ", assignedMb=" + ((initialMemoryAvailable >> 20)));
@@ -261,6 +266,7 @@ public abstract class ExternalSorter {
try {
combiner.combine(kvIter, writer);
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new IOException(e);
}
}
@@ -314,4 +320,36 @@ public abstract class ExternalSorter {
public int getNumSpills() {
return numSpills;
}
+
+ protected synchronized void cleanup() throws IOException {
+ if (!cleanup) {
+ return;
+ }
+ cleanup(spillFilePaths);
+ cleanup(spillFileIndexPaths);
+ //TODO: What if when same volume rename happens (have to rely on job completion cleanup)
+ cleanup(finalOutputFile);
+ cleanup(finalIndexFile);
+ }
+
+ protected synchronized void cleanup(Path path) {
+ if (path == null || !cleanup) {
+ return;
+ }
+ try {
+ LOG.info("Deleting " + path);
+ rfs.delete(path, true);
+ } catch(IOException ioe) {
+ LOG.warn("Error in deleting " + path);
+ }
+ }
+
+ protected synchronized void cleanup(Map<Integer, Path> spillMap) {
+ if (!cleanup) {
+ return;
+ }
+ for(Map.Entry<Integer, Path> entry : spillMap.entrySet()) {
+ cleanup(entry.getValue());
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
index 661f54c..030440e 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/PipelinedSorter.java
@@ -33,7 +33,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
@@ -356,6 +355,9 @@ public class PipelinedSorter extends ExternalSorter {
merger.ready(); // wait for all the future results from sort threads
LOG.info("Spilling to " + filename.toString());
for (int i = 0; i < partitions; ++i) {
+ if (isThreadInterrupted()) {
+ return;
+ }
TezRawKeyValueIterator kvIter = merger.filter(i);
//write merged output to disk
long segmentStart = out.getPos();
@@ -391,147 +393,182 @@ public class PipelinedSorter extends ExternalSorter {
++numSpills;
} catch(InterruptedException ie) {
// TODO:the combiner has been interrupted
+ Thread.currentThread().interrupt();
} finally {
out.close();
}
}
+
+
+
+
+ private boolean isThreadInterrupted() throws IOException {
+ if (Thread.currentThread().isInterrupted()) {
+ if (cleanup) {
+ cleanup();
+ }
+ sortmaster.shutdownNow();
+ LOG.info("Thread interrupted, cleaned up stale data, sorter threads shutdown=" + sortmaster
+ .isShutdown() + ", terminated=" + sortmaster.isTerminated());
+ return true;
+ }
+ return false;
+ }
+
@Override
public void flush() throws IOException {
final String uniqueIdentifier = outputContext.getUniqueIdentifier();
- LOG.info("Starting flush of map output");
- span.end();
- merger.add(span.sort(sorter));
- spill();
- sortmaster.shutdown();
+ /**
+ * Possible that the thread got interrupted when flush was happening or when the flush was
+ * never invoked. As a part of cleanup activity in TezTaskRunner, it would invoke close()
+ * on all I/O. At that time, this is safe to cleanup
+ */
+ if (isThreadInterrupted()) {
+ return;
+ }
+
+ try {
+ LOG.info("Starting flush of map output");
+ span.end();
+ merger.add(span.sort(sorter));
+ spill();
+ sortmaster.shutdown();
- //safe to clean up
- bufferList.clear();
+ //safe to clean up
+ bufferList.clear();
- numAdditionalSpills.increment(numSpills - 1);
+ numAdditionalSpills.increment(numSpills - 1);
- if (!finalMergeEnabled) {
- //Generate events for all spills
- List<Event> events = Lists.newLinkedList();
+ if (!finalMergeEnabled) {
+ //Generate events for all spills
+ List<Event> events = Lists.newLinkedList();
- //For pipelined shuffle, previous events are already sent. Just generate the last event alone
- int startIndex = (pipelinedShuffle) ? (numSpills - 1) : 0;
- int endIndex = numSpills;
+ //For pipelined shuffle, previous events are already sent. Just generate the last event alone
+ int startIndex = (pipelinedShuffle) ? (numSpills - 1) : 0;
+ int endIndex = numSpills;
- for (int i = startIndex; i < endIndex; i++) {
- boolean isLastEvent = (i == numSpills - 1);
+ for (int i = startIndex; i < endIndex; i++) {
+ boolean isLastEvent = (i == numSpills - 1);
- String pathComponent = (outputContext.getUniqueIdentifier() + "_" + i);
- ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent,
- outputContext, i, indexCacheList.get(i), partitions,
- sendEmptyPartitionDetails, pathComponent);
- LOG.info("Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i);
+ String pathComponent = (outputContext.getUniqueIdentifier() + "_" + i);
+ ShuffleUtils.generateEventOnSpill(events, finalMergeEnabled, isLastEvent,
+ outputContext, i, indexCacheList.get(i), partitions,
+ sendEmptyPartitionDetails, pathComponent);
+ LOG.info("Adding spill event for spill (final update=" + isLastEvent + "), spillId=" + i);
+ }
+ outputContext.sendEvents(events);
+ //No need to generate final merge
+ return;
}
- outputContext.sendEvents(events);
- //No need to generate final merge
- return;
- }
- //In case final merge is required, the following code path is executed.
- if(numSpills == 1) {
- // someday be able to pass this directly to shuffle
- // without writing to disk
- final Path filename = spillFilePaths.get(0);
- final Path indexFilename = spillFileIndexPaths.get(0);
- finalOutputFile = mapOutputFile.getOutputFileForWriteInVolume(filename);
- finalIndexFile = mapOutputFile.getOutputIndexFileForWriteInVolume(indexFilename);
-
- sameVolRename(filename, finalOutputFile);
- sameVolRename(indexFilename, finalIndexFile);
- if (LOG.isInfoEnabled()) {
- LOG.info("numSpills=" + numSpills + ", finalOutputFile=" + finalOutputFile + ", "
- + "finalIndexFile=" + finalIndexFile + ", filename=" + filename + ", indexFilename=" +
- indexFilename);
+ //In case final merge is required, the following code path is executed.
+ if (numSpills == 1) {
+ // someday be able to pass this directly to shuffle
+ // without writing to disk
+ final Path filename = spillFilePaths.get(0);
+ final Path indexFilename = spillFileIndexPaths.get(0);
+ finalOutputFile = mapOutputFile.getOutputFileForWriteInVolume(filename);
+ finalIndexFile = mapOutputFile.getOutputIndexFileForWriteInVolume(indexFilename);
+
+ sameVolRename(filename, finalOutputFile);
+ sameVolRename(indexFilename, finalIndexFile);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("numSpills=" + numSpills + ", finalOutputFile=" + finalOutputFile + ", "
+ + "finalIndexFile=" + finalIndexFile + ", filename=" + filename + ", indexFilename=" +
+ indexFilename);
+ }
+ return;
}
- return;
- }
- finalOutputFile =
- mapOutputFile.getOutputFileForWrite(0); //TODO
- finalIndexFile =
- mapOutputFile.getOutputIndexFileForWrite(0); //TODO
+ finalOutputFile =
+ mapOutputFile.getOutputFileForWrite(0); //TODO
+ finalIndexFile =
+ mapOutputFile.getOutputIndexFileForWrite(0); //TODO
- if (LOG.isDebugEnabled()) {
- LOG.debug("numSpills: " + numSpills + ", finalOutputFile:" + finalOutputFile + ", finalIndexFile:"
- + finalIndexFile);
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "numSpills: " + numSpills + ", finalOutputFile:" + finalOutputFile + ", finalIndexFile:"
+ + finalIndexFile);
+ }
- //The output stream for the final single output file
- FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
+ //The output stream for the final single output file
+ FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
- final TezSpillRecord spillRec = new TezSpillRecord(partitions);
+ final TezSpillRecord spillRec = new TezSpillRecord(partitions);
+ for (int parts = 0; parts < partitions; parts++) {
+ //create the segments to be merged
+ List<Segment> segmentList =
+ new ArrayList<Segment>(numSpills);
+ for (int i = 0; i < numSpills; i++) {
+ Path spillFilename = spillFilePaths.get(i);
+ TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
- for (int parts = 0; parts < partitions; parts++) {
- //create the segments to be merged
- List<Segment> segmentList =
- new ArrayList<Segment>(numSpills);
- for(int i = 0; i < numSpills; i++) {
- Path spillFilename = spillFilePaths.get(i);
- TezIndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
+ Segment s =
+ new Segment(rfs, spillFilename, indexRecord.getStartOffset(),
+ indexRecord.getPartLength(), codec, ifileReadAhead,
+ ifileReadAheadLength, ifileBufferSize, true);
+ segmentList.add(i, s);
+ }
- Segment s =
- new Segment(rfs, spillFilename, indexRecord.getStartOffset(),
- indexRecord.getPartLength(), codec, ifileReadAhead,
- ifileReadAheadLength, ifileBufferSize, true);
- segmentList.add(i, s);
- }
+ int mergeFactor =
+ this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR,
+ TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR_DEFAULT);
+ // sort the segments only if there are intermediate merges
+ boolean sortSegments = segmentList.size() > mergeFactor;
+ //merge
+ TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs,
+ keyClass, valClass, codec,
+ segmentList, mergeFactor,
+ new Path(uniqueIdentifier),
+ (RawComparator) ConfigUtils.getIntermediateOutputKeyComparator(conf),
+ nullProgressable, sortSegments, true,
+ null, spilledRecordsCounter, null,
+ null); // Not using any Progress in TezMerger. Should just work.
- int mergeFactor =
- this.conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR,
- TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR_DEFAULT);
- // sort the segments only if there are intermediate merges
- boolean sortSegments = segmentList.size() > mergeFactor;
- //merge
- TezRawKeyValueIterator kvIter = TezMerger.merge(conf, rfs,
- keyClass, valClass, codec,
- segmentList, mergeFactor,
- new Path(uniqueIdentifier),
- (RawComparator)ConfigUtils.getIntermediateOutputKeyComparator(conf),
- nullProgressable, sortSegments, true,
- null, spilledRecordsCounter, null,
- null); // Not using any Progress in TezMerger. Should just work.
-
- //write merged output to disk
- long segmentStart = finalOut.getPos();
- Writer writer =
- new Writer(conf, finalOut, keyClass, valClass, codec,
- spilledRecordsCounter, null, merger.needsRLE());
- if (combiner == null || numSpills < minSpillsForCombine) {
- TezMerger.writeFile(kvIter, writer, nullProgressable, TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
- } else {
- runCombineProcessor(kvIter, writer);
- }
+ //write merged output to disk
+ long segmentStart = finalOut.getPos();
+ Writer writer =
+ new Writer(conf, finalOut, keyClass, valClass, codec,
+ spilledRecordsCounter, null, merger.needsRLE());
+ if (combiner == null || numSpills < minSpillsForCombine) {
+ TezMerger.writeFile(kvIter, writer, nullProgressable,
+ TezRuntimeConfiguration.TEZ_RUNTIME_RECORDS_BEFORE_PROGRESS_DEFAULT);
+ } else {
+ runCombineProcessor(kvIter, writer);
+ }
- //close
- writer.close();
+ //close
+ writer.close();
- // record offsets
- final TezIndexRecord rec =
- new TezIndexRecord(
- segmentStart,
- writer.getRawLength(),
- writer.getCompressedLength());
- spillRec.putIndex(rec, parts);
- }
+ // record offsets
+ final TezIndexRecord rec =
+ new TezIndexRecord(
+ segmentStart,
+ writer.getRawLength(),
+ writer.getCompressedLength());
+ spillRec.putIndex(rec, parts);
+ }
- spillRec.writeToFile(finalIndexFile, conf);
- finalOut.close();
- for(int i = 0; i < numSpills; i++) {
- Path indexFilename = spillFileIndexPaths.get(i);
- Path spillFilename = spillFilePaths.get(i);
- rfs.delete(indexFilename,true);
- rfs.delete(spillFilename,true);
- }
+ spillRec.writeToFile(finalIndexFile, conf);
+ finalOut.close();
+ for (int i = 0; i < numSpills; i++) {
+ Path indexFilename = spillFileIndexPaths.get(i);
+ Path spillFilename = spillFilePaths.get(i);
+ rfs.delete(indexFilename, true);
+ rfs.delete(spillFilename, true);
+ }
- spillFileIndexPaths.clear();
- spillFilePaths.clear();
+ spillFileIndexPaths.clear();
+ spillFilePaths.clear();
+ } catch(InterruptedException ie) {
+ if (cleanup) {
+ cleanup();
+ }
+ Thread.currentThread().interrupt();
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
index 758e9c7..3b7bf05 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/TezMerger.java
@@ -76,7 +76,7 @@ public class TezMerger {
TezCounter writesCounter,
TezCounter bytesReadCounter,
Progress mergePhase)
- throws IOException {
+ throws IOException, InterruptedException {
return
new MergeQueue(conf, fs, inputs, deleteInputs, codec, ifileReadAhead,
ifileReadAheadLength, ifileBufferSize, false, comparator,
@@ -101,7 +101,7 @@ public class TezMerger {
TezCounter mergedMapOutputsCounter,
TezCounter bytesReadCounter,
Progress mergePhase)
- throws IOException {
+ throws IOException, InterruptedException {
return
new MergeQueue(conf, fs, inputs, deleteInputs, codec, ifileReadAhead,
ifileReadAheadLength, ifileBufferSize, false, comparator,
@@ -124,7 +124,7 @@ public class TezMerger {
TezCounter writesCounter,
TezCounter bytesReadCounter,
Progress mergePhase)
- throws IOException {
+ throws IOException, InterruptedException {
// Get rid of this ?
return merge(conf, fs, keyClass, valueClass, segments, mergeFactor, tmpDir,
comparator, reporter, false, readsCounter, writesCounter, bytesReadCounter,
@@ -142,7 +142,7 @@ public class TezMerger {
TezCounter writesCounter,
TezCounter bytesReadCounter,
Progress mergePhase)
- throws IOException {
+ throws IOException, InterruptedException {
return new MergeQueue(conf, fs, segments, comparator, reporter,
sortSegments, false).merge(keyClass, valueClass,
mergeFactor, tmpDir,
@@ -163,7 +163,7 @@ public class TezMerger {
TezCounter writesCounter,
TezCounter bytesReadCounter,
Progress mergePhase)
- throws IOException {
+ throws IOException, InterruptedException {
return new MergeQueue(conf, fs, segments, comparator, reporter,
sortSegments, codec, considerFinalMergeForProgress).
merge(keyClass, valueClass,
@@ -185,7 +185,7 @@ public class TezMerger {
TezCounter writesCounter,
TezCounter bytesReadCounter,
Progress mergePhase)
- throws IOException {
+ throws IOException, InterruptedException {
return new MergeQueue(conf, fs, segments, comparator, reporter,
sortSegments, codec, false).merge(keyClass, valueClass,
mergeFactor, inMemSegments,
@@ -196,9 +196,9 @@ public class TezMerger {
}
public static <K extends Object, V extends Object>
- void writeFile(TezRawKeyValueIterator records, Writer writer,
- Progressable progressable, long recordsBeforeProgress)
- throws IOException {
+ void writeFile(TezRawKeyValueIterator records, Writer writer,
+ Progressable progressable, long recordsBeforeProgress)
+ throws IOException, InterruptedException {
long recordCtr = 0;
long count = 0;
while(records.next()) {
@@ -211,6 +211,15 @@ public class TezMerger {
if (((recordCtr++) % recordsBeforeProgress) == 0) {
progressable.progress();
+ if (Thread.currentThread().isInterrupted()) {
+ /**
+ * Takes care DefaultSorter.mergeParts, MergeManager's merger threads,
+ * PipelinedSorter's flush(). This is not expensive check as it is carried out every
+ * 10000 records or so.
+ */
+ throw new InterruptedException("Current thread=" + Thread.currentThread().getName() + " got "
+ + "interrupted");
+ }
}
}
if ((count > 0) && LOG.isDebugEnabled()) {
@@ -614,7 +623,7 @@ public class TezMerger {
TezCounter writesCounter,
TezCounter bytesReadCounter,
Progress mergePhase)
- throws IOException {
+ throws IOException, InterruptedException {
return merge(keyClass, valueClass, factor, 0, tmpDir,
readsCounter, writesCounter, bytesReadCounter, mergePhase);
}
@@ -625,7 +634,7 @@ public class TezMerger {
TezCounter writesCounter,
TezCounter bytesReadCounter,
Progress mergePhase)
- throws IOException {
+ throws IOException, InterruptedException {
LOG.info("Merging " + segments.size() + " sorted segments");
if (segments.size() == 0) {
LOG.info("Nothing to merge. Returning an empty iterator");
http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
index 2cbb70a..9783c79 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/sort/impl/dflt/DefaultSorter.java
@@ -193,6 +193,9 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
spillDone.await();
}
} catch (InterruptedException e) {
+ //interrupt spill thread
+ spillThread.interrupt();
+ Thread.currentThread().interrupt();
throw new IOException("Spill thread failed to initialize", e);
} finally {
spillLock.unlock();
@@ -603,6 +606,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
spillDone.await();
}
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
throw new IOException(
"Buffer interrupted while waiting for the writer", e);
}
@@ -625,9 +629,45 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
}
}
+ void interruptSpillThread() throws IOException {
+ assert !spillLock.isHeldByCurrentThread();
+ // shut down spill thread and wait for it to exit. Since the preceding
+ // ensures that it is finished with its work (and sortAndSpill did not
+ // throw), we elect to use an interrupt instead of setting a flag.
+ // Spilling simultaneously from this thread while the spill thread
+ // finishes its work might be both a useful way to extend this and also
+ // sufficient motivation for the latter approach.
+ try {
+ spillThread.interrupt();
+ spillThread.join();
+ } catch (InterruptedException e) {
+ LOG.info("Spill thread interrupted");
+ //Reset status
+ Thread.currentThread().interrupt();
+ throw new IOException("Spill failed", e);
+ }
+ }
+
@Override
public void flush() throws IOException {
LOG.info("Starting flush of map output");
+ if (Thread.currentThread().isInterrupted()) {
+ /**
+ * Possible that the thread got interrupted when flush was happening or when the flush was
+ * never invoked. As a part of cleanup activity in TezTaskRunner, it would invoke close()
+ * on all I/O. At that time, this is safe to cleanup
+ */
+ if (cleanup) {
+ cleanup();
+ }
+ try {
+ interruptSpillThread();
+ } catch(IOException e) {
+ //safe to ignore
+ }
+ return;
+ }
+
spillLock.lock();
try {
while (spillInProgress) {
@@ -656,28 +696,25 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
sortAndSpill();
}
} catch (InterruptedException e) {
+ //Reset status
+ Thread.currentThread().interrupt();
+ interruptSpillThread();
throw new IOException("Interrupted while waiting for the writer", e);
} finally {
spillLock.unlock();
}
- assert !spillLock.isHeldByCurrentThread();
- // shut down spill thread and wait for it to exit. Since the preceding
- // ensures that it is finished with its work (and sortAndSpill did not
- // throw), we elect to use an interrupt instead of setting a flag.
- // Spilling simultaneously from this thread while the spill thread
- // finishes its work might be both a useful way to extend this and also
- // sufficient motivation for the latter approach.
- try {
- spillThread.interrupt();
- spillThread.join();
- } catch (InterruptedException e) {
- throw new IOException("Spill failed", e);
- }
- // release sort buffer before the merge
+
+ interruptSpillThread();
+ // release sort buffer before the mergecl
//FIXME
//kvbuffer = null;
- mergeParts();
+ try {
+ mergeParts();
+ } catch (InterruptedException e) {
+ cleanup();
+ Thread.currentThread().interrupt();
+ }
if (finalMergeEnabled) {
fileOutputByteCounter.increment(rfs.getFileStatus(finalOutputFile).getLen());
}
@@ -715,6 +752,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
}
}
} catch (InterruptedException e) {
+ LOG.info("Spill thread interrupted");
Thread.currentThread().interrupt();
} finally {
spillLock.unlock();
@@ -1085,7 +1123,7 @@ public class DefaultSorter extends ExternalSorter implements IndexedSortable {
outputContext.sendEvents(events);
}
- private void mergeParts() throws IOException {
+ private void mergeParts() throws IOException, InterruptedException {
// get the approximate size of the final output/index files
long finalOutFileSize = 0;
long finalIndexFileSize = 0;
http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index d784fcd..49cf102 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -351,6 +351,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH);
confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);
confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS);
+ confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
}
// TODO Maybe add helper methods to extract keys
http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index 62fa9a5..7fc9317 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -267,6 +267,7 @@ public class UnorderedKVInput extends AbstractLogicalInput {
confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH);
confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);
confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS);
+ confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
}
// TODO Maybe add helper methods to extract keys
http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
index 6227fb9..53abc17 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/OrderedPartitionedKVOutput.java
@@ -248,6 +248,7 @@ public class OrderedPartitionedKVOutput extends AbstractLogicalOutput {
confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);
confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SORTER_CLASS);
+ confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
}
// TODO Maybe add helper methods to extract keys
http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
index 08e6ec0..b50f17d 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedKVOutput.java
@@ -171,6 +171,7 @@ public class UnorderedKVOutput extends AbstractLogicalOutput {
confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH);
confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);
confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS);
+ confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
}
// TODO Maybe add helper methods to extract keys
http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
index 38450ee..7498627 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/output/UnorderedPartitionedKVOutput.java
@@ -144,6 +144,7 @@ public class UnorderedPartitionedKVOutput extends AbstractLogicalOutput {
confKeys.add(TezConfiguration.TEZ_COUNTERS_GROUP_NAME_MAX_LENGTH);
confKeys.add(TezConfiguration.TEZ_COUNTERS_COUNTER_NAME_MAX_LENGTH);
confKeys.add(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS);
+ confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_CLEANUP_FILES_ON_INTERRUPT);
}
// TODO Maybe add helper methods to extract keys
http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
index edb9b15..f62179a 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/TestValuesIterator.java
@@ -169,28 +169,28 @@ public class TestValuesIterator {
}
@Test(timeout = 20000)
- public void testIteratorWithInMemoryReader() throws IOException {
+ public void testIteratorWithInMemoryReader() throws IOException, InterruptedException {
ValuesIterator iterator = createIterator(true);
verifyIteratorData(iterator);
}
@Test(timeout = 20000)
- public void testIteratorWithIFileReader() throws IOException {
+ public void testIteratorWithIFileReader() throws IOException, InterruptedException {
ValuesIterator iterator = createIterator(false);
verifyIteratorData(iterator);
}
@Test(timeout = 20000)
- public void testCountedIteratorWithInmemoryReader() throws IOException {
+ public void testCountedIteratorWithInmemoryReader() throws IOException, InterruptedException {
verifyCountedIteratorReader(true);
}
@Test(timeout = 20000)
- public void testCountedIteratorWithIFileReader() throws IOException {
+ public void testCountedIteratorWithIFileReader() throws IOException, InterruptedException {
verifyCountedIteratorReader(false);
}
- private void verifyCountedIteratorReader(boolean inMemory) throws IOException {
+ private void verifyCountedIteratorReader(boolean inMemory) throws IOException, InterruptedException {
TezCounter keyCounter = new GenericCounter("inputKeyCounter", "y3");
TezCounter tupleCounter = new GenericCounter("inputValuesCounter", "y4");
ValuesIterator iterator = createCountedIterator(inMemory, keyCounter,
@@ -207,7 +207,7 @@ public class TestValuesIterator {
}
@Test(timeout = 20000)
- public void testIteratorWithIFileReaderEmptyPartitions() throws IOException {
+ public void testIteratorWithIFileReaderEmptyPartitions() throws IOException, InterruptedException {
ValuesIterator iterator = createEmptyIterator(false);
assertTrue(iterator.moveToNext() == false);
@@ -224,7 +224,8 @@ public class TestValuesIterator {
}
}
- private ValuesIterator createEmptyIterator(boolean inMemory) throws IOException {
+ private ValuesIterator createEmptyIterator(boolean inMemory)
+ throws IOException, InterruptedException {
if (!inMemory) {
streamPaths = new Path[0];
//This will return EmptyIterator
@@ -323,7 +324,7 @@ public class TestValuesIterator {
* @return ValuesIterator
* @throws IOException
*/
- private ValuesIterator createIterator(boolean inMemory) throws IOException {
+ private ValuesIterator createIterator(boolean inMemory) throws IOException, InterruptedException {
if (!inMemory) {
streamPaths = createFiles();
//Merge all files to get KeyValueIterator
@@ -353,7 +354,8 @@ public class TestValuesIterator {
* @return ValuesIterator
* @throws IOException
*/
- private ValuesIterator createCountedIterator(boolean inMemory, TezCounter keyCounter, TezCounter tupleCounter) throws IOException {
+ private ValuesIterator createCountedIterator(boolean inMemory, TezCounter keyCounter, TezCounter tupleCounter)
+ throws IOException, InterruptedException {
if (!inMemory) {
streamPaths = createFiles();
//Merge all files to get KeyValueIterator
http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
index 094237a..0faa22a 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestMergeManager.java
@@ -28,6 +28,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.UUID;
+import com.google.common.collect.Sets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -166,9 +167,32 @@ public class TestMergeManager {
Assert.assertTrue(mergeManager.postMergeMemLimit == initialMemoryAvailable);
}
+ class InterruptingThread implements Runnable {
+
+ MergeManager.OnDiskMerger mergeThread;
+
+ public InterruptingThread(MergeManager.OnDiskMerger mergeThread) {
+ this.mergeThread = mergeThread;
+ }
+
+ @Override public void run() {
+ while(this.mergeThread.tmpDir == null) {
+ //this is tight loop
+ }
+
+ this.mergeThread.interrupt();
+ }
+ }
+
@Test(timeout = 10000)
- public void testLocalDiskMergeMultipleTasks() throws IOException {
+ public void testLocalDiskMergeMultipleTasks() throws IOException, InterruptedException {
+ testLocalDiskMergeMultipleTasks(false);
+ testLocalDiskMergeMultipleTasks(true);
+ }
+
+ void testLocalDiskMergeMultipleTasks(boolean interruptInMiddle)
+ throws IOException, InterruptedException {
Configuration conf = new TezConfiguration(defaultConf);
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_COMPRESS, false);
conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_KEY_CLASS, IntWritable.class.getName());
@@ -194,6 +218,7 @@ public class TestMergeManager {
new MergeManager(conf, localFs, localDirAllocator, t0inputContext, null, null, null, null,
t0exceptionReporter, 2000000, null, false, -1);
MergeManager t0mergeManager = spy(t0mergeManagerReal);
+ t0mergeManager.configureAndStart();
MergeManager t1mergeManagerReal =
new MergeManager(conf, localFs, localDirAllocator, t1inputContext, null, null, null, null,
@@ -249,30 +274,48 @@ public class TestMergeManager {
List<FileChunk> t0MergeFiles = new LinkedList<FileChunk>();
t0MergeFiles.addAll(t0mergeManager.onDiskMapOutputs);
t0mergeManager.onDiskMapOutputs.clear();
- t0mergeManager.onDiskMerger.merge(t0MergeFiles);
- Assert.assertEquals(1, t0mergeManager.onDiskMapOutputs.size());
-
-
- t1MapOutput0.commit();
- t1MapOutput1.commit();
- verify(t1mergeManager).closeOnDiskFile(t1MapOutput0.getOutputPath());
- verify(t1mergeManager).closeOnDiskFile(t1MapOutput1.getOutputPath());
- // Run the OnDiskMerge via MergeManager
- // Simulate the thread invocation - remove files, and invoke merge
- List<FileChunk> t1MergeFiles = new LinkedList<FileChunk>();
- t1MergeFiles.addAll(t1mergeManager.onDiskMapOutputs);
- t1mergeManager.onDiskMapOutputs.clear();
- t1mergeManager.onDiskMerger.merge(t1MergeFiles);
- Assert.assertEquals(1, t1mergeManager.onDiskMapOutputs.size());
- Assert.assertNotEquals(t0mergeManager.onDiskMapOutputs.iterator().next().getPath(),
- t1mergeManager.onDiskMapOutputs.iterator().next().getPath());
+ if (!interruptInMiddle) {
+ t0mergeManager.onDiskMerger.merge(t0MergeFiles);
+ Assert.assertEquals(1, t0mergeManager.onDiskMapOutputs.size());
+ } else {
+
+ //Start Interrupting thread
+ Thread interruptingThread = new Thread(new InterruptingThread(t0mergeManager.onDiskMerger));
+ interruptingThread.start();
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
- Assert.assertTrue(t0mergeManager.onDiskMapOutputs.iterator().next().getPath().toString()
- .contains(t0inputContext.getUniqueIdentifier()));
- Assert.assertTrue(t1mergeManager.onDiskMapOutputs.iterator().next().getPath().toString()
- .contains(t1inputContext.getUniqueIdentifier()));
+ //Will be interrupted in the middle by interruptingThread.
+ t0mergeManager.onDiskMerger.startMerge(Sets.newHashSet(t0MergeFiles));
+ t0mergeManager.onDiskMerger.waitForMerge();
+ Assert.assertNotEquals(1, t0mergeManager.onDiskMapOutputs.size());
+ }
+ if (!interruptInMiddle) {
+ t1MapOutput0.commit();
+ t1MapOutput1.commit();
+ verify(t1mergeManager).closeOnDiskFile(t1MapOutput0.getOutputPath());
+ verify(t1mergeManager).closeOnDiskFile(t1MapOutput1.getOutputPath());
+ // Run the OnDiskMerge via MergeManager
+ // Simulate the thread invocation - remove files, and invoke merge
+ List<FileChunk> t1MergeFiles = new LinkedList<FileChunk>();
+ t1MergeFiles.addAll(t1mergeManager.onDiskMapOutputs);
+ t1mergeManager.onDiskMapOutputs.clear();
+ t1mergeManager.onDiskMerger.merge(t1MergeFiles);
+ Assert.assertEquals(1, t1mergeManager.onDiskMapOutputs.size());
+
+ Assert.assertNotEquals(t0mergeManager.onDiskMapOutputs.iterator().next().getPath(),
+ t1mergeManager.onDiskMapOutputs.iterator().next().getPath());
+
+ Assert.assertTrue(t0mergeManager.onDiskMapOutputs.iterator().next().getPath().toString()
+ .contains(t0inputContext.getUniqueIdentifier()));
+ Assert.assertTrue(t1mergeManager.onDiskMapOutputs.iterator().next().getPath().toString()
+ .contains(t1inputContext.getUniqueIdentifier()));
+ }
}
private InputContext createMockInputContext(String uniqueId) {
http://git-wip-us.apache.org/repos/asf/tez/blob/146ab070/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
index bb932f2..b86d054 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/sort/impl/TestTezMerger.java
@@ -557,7 +557,8 @@ public class TestTezMerger {
* @return
* @throws IOException
*/
- private TezRawKeyValueIterator merge(List<Path> pathList, RawComparator rc) throws IOException {
+ private TezRawKeyValueIterator merge(List<Path> pathList, RawComparator rc)
+ throws IOException, InterruptedException {
TezMerger merger = new TezMerger();
TezRawKeyValueIterator records = merger.merge(defaultConf, localFs, IntWritable.class,
LongWritable.class, null, false, 0, 1024, pathList.toArray(new Path[pathList.size()]),