You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2014/05/02 23:03:02 UTC

git commit: TEZ-1091. Respect keepAlive when shutting down Fetchers. Contributed by Rajesh Balamohan.

Repository: incubator-tez
Updated Branches:
  refs/heads/master 06ee16a33 -> d90dbd8dc


TEZ-1091. Respect keepAlive when shutting down Fetchers. Contributed by
Rajesh Balamohan.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/d90dbd8d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/d90dbd8d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/d90dbd8d

Branch: refs/heads/master
Commit: d90dbd8dc4638784de702caa26bdd66a38005afb
Parents: 06ee16a
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri May 2 14:02:26 2014 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri May 2 14:02:26 2014 -0700

----------------------------------------------------------------------
 .../library/common/shuffle/impl/Fetcher.java    | 23 ++++++++++----------
 1 file changed, 11 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/d90dbd8d/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index a761184..950c3cc 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -202,7 +202,7 @@ class Fetcher extends Thread {
           // Shuffle
           copyFromHost(host);
         } finally {
-          cleanupCurrentConnection();
+          cleanupCurrentConnection(!keepAlive);
           if (host != null) {
             scheduler.freeHost(host);
             metrics.threadFree();            
@@ -219,7 +219,7 @@ class Fetcher extends Thread {
   public void shutDown() throws InterruptedException {
     this.stopped = true;
     interrupt();
-    cleanupCurrentConnection();
+    cleanupCurrentConnection(true);
     try {
       join(5000);
     } catch (InterruptedException ie) {
@@ -231,7 +231,7 @@ class Fetcher extends Thread {
   }
 
   private Object cleanupLock = new Object();
-  private void cleanupCurrentConnection() {
+  private void cleanupCurrentConnection(boolean disconnect) {
     // Synchronizing on cleanupLock to ensure we don't run into a parallel close
     // Can't synchronize on the main class itself since that would cause the
     // shutdown request to block
@@ -241,7 +241,7 @@ class Fetcher extends Thread {
           LOG.info("Closing input on " + logIdentifier);
           input.close();
         }
-        if (connection != null) {
+        if (connection != null && disconnect) {
           LOG.info("Closing connection on " + logIdentifier);
           connection.disconnect();
         }
@@ -321,7 +321,7 @@ class Fetcher extends Thread {
       
       if (stopped) {
         LOG.info("Detected fetcher has been shutdown after connection establishment. Returning");
-        cleanupCurrentConnection();
+        cleanupCurrentConnection(true);
         putBackRemainingMapOutputs(host);
         return;
       }
@@ -354,7 +354,7 @@ class Fetcher extends Thread {
     } catch (IOException ie) {
       if (stopped) {
         LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown");
-        cleanupCurrentConnection();
+        cleanupCurrentConnection(true);
         putBackRemainingMapOutputs(host);
         return;
       }
@@ -413,7 +413,7 @@ class Fetcher extends Thread {
         }
       }
 
-      cleanupCurrentConnection();
+      cleanupCurrentConnection(!keepAlive); //do not disconnect if keepAlive is on
 
       // Sanity check
       if (failedTasks == null && !remaining.isEmpty()) {
@@ -522,14 +522,13 @@ class Fetcher extends Thread {
         return EMPTY_ATTEMPT_ID_ARRAY;
       }
       
-      // Check if we can shuffle *now* ...
       if (mapOutput.getType() == Type.WAIT) {
         // TODO Review: Does this cause a tight loop ?
         LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
-        //Not an error but wait to process data.
+        //Not an error but wait to process data.  
         return EMPTY_ATTEMPT_ID_ARRAY;
-      } 
-      
+      }
+
       // Go!
       LOG.info("fetcher#" + id + " about to shuffle output of map " + 
                mapOutput.getAttemptIdentifier() + " decomp: " +
@@ -554,7 +553,7 @@ class Fetcher extends Thread {
       if (stopped) {
         LOG.info("Not reporting fetch failure for exception during data copy: ["
             + ioe.getClass().getName() + ", " + ioe.getMessage() + "]");
-        cleanupCurrentConnection();
+        cleanupCurrentConnection(true);
         if (mapOutput != null) {
           mapOutput.abort(); // Release resources
         }