You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/05/02 23:03:02 UTC
git commit: TEZ-1091. Respect keepAlive when shutting down Fetchers.
Contributed by Rajesh Balamohan.
Repository: incubator-tez
Updated Branches:
refs/heads/master 06ee16a33 -> d90dbd8dc
TEZ-1091. Respect keepAlive when shutting down Fetchers. Contributed by
Rajesh Balamohan.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/d90dbd8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/d90dbd8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/d90dbd8d
Branch: refs/heads/master
Commit: d90dbd8dc4638784de702caa26bdd66a38005afb
Parents: 06ee16a
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri May 2 14:02:26 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri May 2 14:02:26 2014 -0700
----------------------------------------------------------------------
.../library/common/shuffle/impl/Fetcher.java | 23 ++++++++++----------
1 file changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d90dbd8d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index a761184..950c3cc 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -202,7 +202,7 @@ class Fetcher extends Thread {
// Shuffle
copyFromHost(host);
} finally {
- cleanupCurrentConnection();
+ cleanupCurrentConnection(!keepAlive);
if (host != null) {
scheduler.freeHost(host);
metrics.threadFree();
@@ -219,7 +219,7 @@ class Fetcher extends Thread {
public void shutDown() throws InterruptedException {
this.stopped = true;
interrupt();
- cleanupCurrentConnection();
+ cleanupCurrentConnection(true);
try {
join(5000);
} catch (InterruptedException ie) {
@@ -231,7 +231,7 @@ class Fetcher extends Thread {
}
private Object cleanupLock = new Object();
- private void cleanupCurrentConnection() {
+ private void cleanupCurrentConnection(boolean disconnect) {
// Synchronizing on cleanupLock to ensure we don't run into a parallel close
// Can't synchronize on the main class itself since that would cause the
// shutdown request to block
@@ -241,7 +241,7 @@ class Fetcher extends Thread {
LOG.info("Closing input on " + logIdentifier);
input.close();
}
- if (connection != null) {
+ if (connection != null && disconnect) {
LOG.info("Closing connection on " + logIdentifier);
connection.disconnect();
}
@@ -321,7 +321,7 @@ class Fetcher extends Thread {
if (stopped) {
LOG.info("Detected fetcher has been shutdown after connection establishment. Returning");
- cleanupCurrentConnection();
+ cleanupCurrentConnection(true);
putBackRemainingMapOutputs(host);
return;
}
@@ -354,7 +354,7 @@ class Fetcher extends Thread {
} catch (IOException ie) {
if (stopped) {
LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown");
- cleanupCurrentConnection();
+ cleanupCurrentConnection(true);
putBackRemainingMapOutputs(host);
return;
}
@@ -413,7 +413,7 @@ class Fetcher extends Thread {
}
}
- cleanupCurrentConnection();
+ cleanupCurrentConnection(!keepAlive); //do not disconnect if keepAlive is on
// Sanity check
if (failedTasks == null && !remaining.isEmpty()) {
@@ -522,14 +522,13 @@ class Fetcher extends Thread {
return EMPTY_ATTEMPT_ID_ARRAY;
}
- // Check if we can shuffle *now* ...
if (mapOutput.getType() == Type.WAIT) {
// TODO Review: Does this cause a tight loop ?
LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
- //Not an error but wait to process data.
+ //Not an error but wait to process data.
return EMPTY_ATTEMPT_ID_ARRAY;
- }
-
+ }
+
// Go!
LOG.info("fetcher#" + id + " about to shuffle output of map " +
mapOutput.getAttemptIdentifier() + " decomp: " +
@@ -554,7 +553,7 @@ class Fetcher extends Thread {
if (stopped) {
LOG.info("Not reporting fetch failure for exception during data copy: ["
+ ioe.getClass().getName() + ", " + ioe.getMessage() + "]");
- cleanupCurrentConnection();
+ cleanupCurrentConnection(true);
if (mapOutput != null) {
mapOutput.abort(); // Release resources
}