You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/04/16 01:21:16 UTC

git commit: TEZ-695. TEZ-988. Enable KeepAlive in Tez Fetcher (Rajesh Balamohan via bikas)

Repository: incubator-tez
Updated Branches:
  refs/heads/master 7083f0119 -> 36e940c6f


TEZ-695. TEZ-988. Enable KeepAlive in Tez Fetcher (Rajesh Balamohan via bikas)


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/36e940c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/36e940c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/36e940c6

Branch: refs/heads/master
Commit: 36e940c6ff10f36df66072eb16393c9bbdd8e275
Parents: 7083f01
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Apr 15 16:21:16 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Apr 15 16:21:16 2014 -0700

----------------------------------------------------------------------
 .../org/apache/tez/common/TezJobConfig.java     | 16 +++++-
 .../library/common/shuffle/impl/Fetcher.java    | 60 ++++++++++++++++++--
 2 files changed, 69 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36e940c6/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
index e5e409b..ae122ff 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -226,10 +226,24 @@ public class TezJobConfig {
   /**
    * 
    */
+  public static final String TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED =
+      "tez.runtime.shuffle.keep-alive.enabled";
+  public static final boolean DEFAULT_TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED = false;
+
+  /**
+   * Value set as the JVM http.maxConnections property by the shuffle Fetcher if keep-alive is on.
+   */
+  public static final String TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS =
+      "tez.runtime.shuffle.keep-alive.max.connections";
+  public static final int DEFAULT_TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS = 20;
+
+  /**
+   *
+   */
   public static final String TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT = "tez.runtime.shuffle.read.timeout";
   public final static int DEFAULT_TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT = 
       3 * 60 * 1000;
-  
+
   /**
    * 
    */

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36e940c6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index 9462d95..32fad76 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -38,6 +38,7 @@ import javax.net.ssl.HttpsURLConnection;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -64,6 +65,9 @@ class Fetcher extends Thread {
   private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
                                     CONNECTION, WRONG_REDUCE}
   
+  private final static String URL_CONNECTION_ERROR_STREAM_BUFFER_ENABLED =
+      "sun.net.http.errorstream.enableBuffering";
+  private final static String URL_CONNECTION_MAX_CONNECTIONS = "http.maxConnections";
   private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
   private final TezCounter connectionErrs;
   private final TezCounter ioErrs;
@@ -98,6 +102,8 @@ class Fetcher extends Thread {
   
   private LinkedHashSet<InputAttemptIdentifier> remaining;
 
+  private boolean keepAlive = false;
+
   public Fetcher(Configuration job, 
       ShuffleScheduler scheduler, MergeManager merger,
       ShuffleClientMetrics metrics,
@@ -132,6 +138,19 @@ class Fetcher extends Thread {
       this.decompressor = null;
     }
 
+    this.keepAlive =
+        job.getBoolean(TezJobConfig.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED,
+          TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED);
+    int keepAliveMaxConnections =
+        job.getInt(TezJobConfig.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS,
+          TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS);
+
+    if (keepAlive) {
+      System.setProperty(URL_CONNECTION_ERROR_STREAM_BUFFER_ENABLED, "true");
+      System.setProperty(URL_CONNECTION_MAX_CONNECTIONS, String.valueOf(keepAliveMaxConnections));
+      LOG.info("Set keepAlive max connections: " + keepAliveMaxConnections);
+    }
+
     this.connectionTimeout = 
         job.getInt(TezJobConfig.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT,
             TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_STALLED_COPY_TIMEOUT);
@@ -245,16 +264,17 @@ class Fetcher extends Thread {
     
     // Construct the url and connect
     DataInputStream input;
+    HttpURLConnection connection = null;
     boolean connectSucceeded = false;
     
     try {
       URL url = getMapOutputURL(host, srcAttempts);
-      HttpURLConnection connection = openConnection(url);
-      
+      connection = openConnection(url);
+
       // generate hash of the url
       String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
       String encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecret);
-      
+
       // put url hash into http header
       connection.addRequestProperty(
           SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
@@ -293,6 +313,11 @@ class Fetcher extends Thread {
       SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
       LOG.info("for url="+msgToEncode+" sent hash and receievd reply");
     } catch (IOException ie) {
+      if (keepAlive && connection != null) {
+        //Read the response body in case of error. This helps in connection reuse.
+        //Refer: http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
+        readErrorStream(connection.getErrorStream());
+      }
       ioErrs.increment(1);
       if (!connectSucceeded) {
         LOG.warn("Failed to connect to " + host + " with " + remaining.size() + " inputs", ie);
@@ -304,7 +329,6 @@ class Fetcher extends Thread {
 
       // At this point, either the connection failed, or the initial header verification failed.
       // The error does not relate to any specific Input. Report all of them as failed.
-      
       // This ends up indirectly penalizing the host (multiple failures reported on the single host)
 
       for(InputAttemptIdentifier left: remaining) {
@@ -338,6 +362,10 @@ class Fetcher extends Thread {
         for(InputAttemptIdentifier left: failedTasks) {
           scheduler.copyFailed(left, host, true, false);
         }
+        //Being defensive: cleanup the error stream in case of keepAlive
+        if (keepAlive && connection != null) {
+          readErrorStream(connection.getErrorStream());
+        }
       }
       
       IOUtils.cleanup(LOG, input);
@@ -351,7 +379,23 @@ class Fetcher extends Thread {
       putBackRemainingMapOutputs(host);
     }
   }
-  
+
+  /**
+   * Drain and close the error stream, if any, so a keepAlive connection can be reused.
+   *
+   * @param errorStream error stream to drain; may be null, in which case nothing is done
+   */
+  private void readErrorStream(InputStream errorStream) {
+    if (errorStream == null) {
+      return; // avoids an NPE in copyBytes, which the IOException handler would not catch
+    }
+    try {
+      IOUtils.copyBytes(errorStream, new DataOutputBuffer(), 4096, true); // true: also closes
+    } catch (IOException ioe) {
+      // Best effort only: the drain exists to help connection reuse, so a failure is ignored.
+    }
+  }
+
   private void putBackRemainingMapOutputs(MapHost host) {
     // Cycle through remaining MapOutputs
     boolean isFirst = true;
@@ -550,7 +594,11 @@ class Fetcher extends Thread {
       url.append(mapId.getPathComponent());
       first = false;
     }
-   
+    //It is possible to override keep-alive setting in cluster by adding keepAlive in url.
+    //Refer MAPREDUCE-5787 to enable/disable keep-alive in the cluster.
+    if (keepAlive) {
+      url.append("&keepAlive=true");
+    }
     if (LOG.isDebugEnabled()) {
       LOG.debug("MapOutput URL for " + host + " -> " + url.toString());
     }