You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this copy.)
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2014/11/17 07:30:19 UTC

[17/45] hadoop git commit: MAPREDUCE-6156. Fetcher - connect() doesn't handle connection refused correctly. Contributed by Junping Du

MAPREDUCE-6156. Fetcher - connect() doesn't handle connection refused correctly. Contributed by Junping Du


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/177e8090
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/177e8090
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/177e8090

Branch: refs/heads/HDFS-EC
Commit: 177e8090f5809beb3ebcb656cd0affbb3f487de8
Parents: 7dae5b5
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Nov 13 15:42:25 2014 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Nov 13 15:42:25 2014 +0000

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  3 ++
 .../hadoop/mapreduce/task/reduce/Fetcher.java   | 39 +++++++++++++++-----
 2 files changed, 33 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/177e8090/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 96bb690..c0105e6 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -465,6 +465,9 @@ Release 2.6.0 - 2014-11-15
     MAPREDUCE-5958. Wrong reduce task progress if map output is compressed
     (Emilio Coppa and jlowe via kihwal)
 
+    MAPREDUCE-6156. Fetcher - connect() doesn't handle connection refused
+    correctly (Junping Du via jlowe)
+
 Release 2.5.2 - 2014-11-10
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/177e8090/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
index 796394f..3f40853 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
@@ -407,7 +407,7 @@ class Fetcher<K,V> extends Thread {
         }
         if ((Time.monotonicNow() - startTime) >= this.fetchRetryTimeout) {
           LOG.warn("Failed to connect to host: " + url + "after " 
-              + fetchRetryTimeout + "milliseconds.");
+              + fetchRetryTimeout + " milliseconds.");
           throw e;
         }
         try {
@@ -596,7 +596,7 @@ class Fetcher<K,V> extends Thread {
     } else {
       // timeout, prepare to be failed.
       LOG.warn("Timeout for copying MapOutput with retry on host " + host 
-          + "after " + fetchRetryTimeout + "milliseconds.");
+          + "after " + fetchRetryTimeout + " milliseconds.");
       
     }
   }
@@ -678,28 +678,49 @@ class Fetcher<K,V> extends Thread {
     } else if (connectionTimeout > 0) {
       unit = Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout);
     }
+    long startTime = Time.monotonicNow();
+    long lastTime = startTime;
+    int attempts = 0;
     // set the connect timeout to the unit-connect-timeout
     connection.setConnectTimeout(unit);
     while (true) {
       try {
+        attempts++;
         connection.connect();
         break;
       } catch (IOException ioe) {
-        // update the total remaining connect-timeout
-        connectionTimeout -= unit;
-
+        long currentTime = Time.monotonicNow();
+        long retryTime = currentTime - startTime;
+        long leftTime = connectionTimeout - retryTime;
+        long timeSinceLastIteration = currentTime - lastTime;
         // throw an exception if we have waited for timeout amount of time
         // note that the updated value if timeout is used here
-        if (connectionTimeout == 0) {
+        if (leftTime <= 0) {
+          int retryTimeInSeconds = (int) retryTime/1000;
+          LOG.error("Connection retry failed with " + attempts + 
+              " attempts in " + retryTimeInSeconds + " seconds");
           throw ioe;
         }
-
         // reset the connect timeout for the last try
-        if (connectionTimeout < unit) {
-          unit = connectionTimeout;
+        if (leftTime < unit) {
+          unit = (int)leftTime;
           // reset the connect time out for the final connect
           connection.setConnectTimeout(unit);
         }
+        
+        if (timeSinceLastIteration < unit) {
+          try {
+            // sleep the left time of unit
+            sleep(unit - timeSinceLastIteration);
+          } catch (InterruptedException e) {
+            LOG.warn("Sleep in connection retry get interrupted.");
+            if (stopped) {
+              return;
+            }
+          }
+        }
+        // update the total remaining connect-timeout
+        lastTime = Time.monotonicNow();
       }
     }
   }