You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2014/11/17 07:30:19 UTC
[17/45] hadoop git commit: MAPREDUCE-6156. Fetcher - connect() doesn't handle connection refused correctly. Contributed by Junping Du
MAPREDUCE-6156. Fetcher - connect() doesn't handle connection refused correctly. Contributed by Junping Du
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/177e8090
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/177e8090
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/177e8090
Branch: refs/heads/HDFS-EC
Commit: 177e8090f5809beb3ebcb656cd0affbb3f487de8
Parents: 7dae5b5
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Nov 13 15:42:25 2014 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Nov 13 15:42:25 2014 +0000
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 ++
.../hadoop/mapreduce/task/reduce/Fetcher.java | 39 +++++++++++++++-----
2 files changed, 33 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/177e8090/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 96bb690..c0105e6 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -465,6 +465,9 @@ Release 2.6.0 - 2014-11-15
MAPREDUCE-5958. Wrong reduce task progress if map output is compressed
(Emilio Coppa and jlowe via kihwal)
+ MAPREDUCE-6156. Fetcher - connect() doesn't handle connection refused
+ correctly (Junping Du via jlowe)
+
Release 2.5.2 - 2014-11-10
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/177e8090/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index 796394f..3f40853 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -407,7 +407,7 @@ class Fetcher<K,V> extends Thread {
}
if ((Time.monotonicNow() - startTime) >= this.fetchRetryTimeout) {
LOG.warn("Failed to connect to host: " + url + "after "
- + fetchRetryTimeout + "milliseconds.");
+ + fetchRetryTimeout + " milliseconds.");
throw e;
}
try {
@@ -596,7 +596,7 @@ class Fetcher<K,V> extends Thread {
} else {
// timeout, prepare to be failed.
LOG.warn("Timeout for copying MapOutput with retry on host " + host
- + "after " + fetchRetryTimeout + "milliseconds.");
+ + "after " + fetchRetryTimeout + " milliseconds.");
}
}
@@ -678,28 +678,49 @@ class Fetcher<K,V> extends Thread {
} else if (connectionTimeout > 0) {
unit = Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout);
}
+ long startTime = Time.monotonicNow();
+ long lastTime = startTime;
+ int attempts = 0;
// set the connect timeout to the unit-connect-timeout
connection.setConnectTimeout(unit);
while (true) {
try {
+ attempts++;
connection.connect();
break;
} catch (IOException ioe) {
- // update the total remaining connect-timeout
- connectionTimeout -= unit;
-
+ long currentTime = Time.monotonicNow();
+ long retryTime = currentTime - startTime;
+ long leftTime = connectionTimeout - retryTime;
+ long timeSinceLastIteration = currentTime - lastTime;
// throw an exception if we have waited for timeout amount of time
// note that the updated value if timeout is used here
- if (connectionTimeout == 0) {
+ if (leftTime <= 0) {
+ int retryTimeInSeconds = (int) retryTime/1000;
+ LOG.error("Connection retry failed with " + attempts +
+ " attempts in " + retryTimeInSeconds + " seconds");
throw ioe;
}
-
// reset the connect timeout for the last try
- if (connectionTimeout < unit) {
- unit = connectionTimeout;
+ if (leftTime < unit) {
+ unit = (int)leftTime;
// reset the connect time out for the final connect
connection.setConnectTimeout(unit);
}
+
+ if (timeSinceLastIteration < unit) {
+ try {
+ // sleep the left time of unit
+ sleep(unit - timeSinceLastIteration);
+ } catch (InterruptedException e) {
+ LOG.warn("Sleep in connection retry get interrupted.");
+ if (stopped) {
+ return;
+ }
+ }
+ }
+ // update the total remaining connect-timeout
+ lastTime = Time.monotonicNow();
}
}
}