You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/04/16 01:21:16 UTC
git commit: TEZ-695. TEZ-988. Enable KeepAlive in Tez Fetcher (Rajesh
Balamohan via bikas)
Repository: incubator-tez
Updated Branches:
refs/heads/master 7083f0119 -> 36e940c6f
TEZ-695. TEZ-988. Enable KeepAlive in Tez Fetcher (Rajesh Balamohan via bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/36e940c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/36e940c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/36e940c6
Branch: refs/heads/master
Commit: 36e940c6ff10f36df66072eb16393c9bbdd8e275
Parents: 7083f01
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Apr 15 16:21:16 2014 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Apr 15 16:21:16 2014 -0700
----------------------------------------------------------------------
.../org/apache/tez/common/TezJobConfig.java | 16 +++++-
.../library/common/shuffle/impl/Fetcher.java | 60 ++++++++++++++++++--
2 files changed, 69 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36e940c6/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
index e5e409b..ae122ff 100644
--- a/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
+++ b/tez-api/src/main/java/org/apache/tez/common/TezJobConfig.java
@@ -226,10 +226,24 @@ public class TezJobConfig {
/**
*
*/
+ public static final String TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED =
+ "tez.runtime.shuffle.keep-alive.enabled";
+ public static final boolean DEFAULT_TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED = false;
+
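+ /**
+ * Value applied to the JVM "http.maxConnections" system property by the shuffle Fetcher when keep-alive is enabled.
+ */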
+ public static final String TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS =
+ "tez.runtime.shuffle.keep-alive.max.connections";
+ public static final int DEFAULT_TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS = 20;
+
+ /**
+ *
+ */
public static final String TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT = "tez.runtime.shuffle.read.timeout";
public final static int DEFAULT_TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT =
3 * 60 * 1000;
-
+
/**
*
*/
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/36e940c6/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
index 9462d95..32fad76 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/Fetcher.java
@@ -38,6 +38,7 @@ import javax.net.ssl.HttpsURLConnection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
@@ -64,6 +65,9 @@ class Fetcher extends Thread {
private static enum ShuffleErrors{IO_ERROR, WRONG_LENGTH, BAD_ID, WRONG_MAP,
CONNECTION, WRONG_REDUCE}
+ private final static String URL_CONNECTION_ERROR_STREAM_BUFFER_ENABLED =
+ "sun.net.http.errorstream.enableBuffering";
+ private final static String URL_CONNECTION_MAX_CONNECTIONS = "http.maxConnections";
private final static String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";
private final TezCounter connectionErrs;
private final TezCounter ioErrs;
@@ -98,6 +102,8 @@ class Fetcher extends Thread {
private LinkedHashSet<InputAttemptIdentifier> remaining;
+ private boolean keepAlive = false;
+
public Fetcher(Configuration job,
ShuffleScheduler scheduler, MergeManager merger,
ShuffleClientMetrics metrics,
@@ -132,6 +138,19 @@ class Fetcher extends Thread {
this.decompressor = null;
}
+ this.keepAlive =
+ job.getBoolean(TezJobConfig.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED,
+ TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED);
+ int keepAliveMaxConnections =
+ job.getInt(TezJobConfig.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS,
+ TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS);
+
+ if (keepAlive) {
+ System.setProperty(URL_CONNECTION_ERROR_STREAM_BUFFER_ENABLED, "true");
+ System.setProperty(URL_CONNECTION_MAX_CONNECTIONS, String.valueOf(keepAliveMaxConnections));
+ LOG.info("Set keepAlive max connections: " + keepAliveMaxConnections);
+ }
+
this.connectionTimeout =
job.getInt(TezJobConfig.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT,
TezJobConfig.DEFAULT_TEZ_RUNTIME_SHUFFLE_STALLED_COPY_TIMEOUT);
@@ -245,16 +264,17 @@ class Fetcher extends Thread {
// Construct the url and connect
DataInputStream input;
+ HttpURLConnection connection = null;
boolean connectSucceeded = false;
try {
URL url = getMapOutputURL(host, srcAttempts);
- HttpURLConnection connection = openConnection(url);
-
+ connection = openConnection(url);
+
// generate hash of the url
String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
String encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecret);
-
+
// put url hash into http header
connection.addRequestProperty(
SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
@@ -293,6 +313,11 @@ class Fetcher extends Thread {
SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecret);
LOG.info("for url="+msgToEncode+" sent hash and receievd reply");
} catch (IOException ie) {
+ if (keepAlive && connection != null) {
+ //Read the response body in case of error. This helps in connection reuse.
+ //Refer: http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
+ readErrorStream(connection.getErrorStream());
+ }
ioErrs.increment(1);
if (!connectSucceeded) {
LOG.warn("Failed to connect to " + host + " with " + remaining.size() + " inputs", ie);
@@ -304,7 +329,6 @@ class Fetcher extends Thread {
// At this point, either the connection failed, or the initial header verification failed.
// The error does not relate to any specific Input. Report all of them as failed.
-
// This ends up indirectly penalizing the host (multiple failures reported on the single host)
for(InputAttemptIdentifier left: remaining) {
@@ -338,6 +362,10 @@ class Fetcher extends Thread {
for(InputAttemptIdentifier left: failedTasks) {
scheduler.copyFailed(left, host, true, false);
}
+ //Being defensive: cleanup the error stream in case of keepAlive
+ if (keepAlive && connection != null) {
+ readErrorStream(connection.getErrorStream());
+ }
}
IOUtils.cleanup(LOG, input);
@@ -351,7 +379,23 @@ class Fetcher extends Thread {
putBackRemainingMapOutputs(host);
}
}
-
+
+ /**
+  * Drains and closes the error stream of a keepAlive connection so that the
+  * underlying connection can be reused.
+  *
+  * @param errorStream stream obtained from the connection's getErrorStream();
+  *                    may be null, in which case there is nothing to drain
+  */
+ private void readErrorStream(InputStream errorStream) {
+   if (errorStream == null) {
+     // getErrorStream() yields null when there is no error body to read;
+     // copying from a null stream would throw an NPE that the IOException
+     // handler below does not catch.
+     return;
+   }
+   DataOutputBuffer errorBuffer = new DataOutputBuffer();
+   try {
+     IOUtils.copyBytes(errorStream, errorBuffer, 4096);
+   } catch (IOException ioe) {
+     // Best-effort drain: a failure here only costs connection reuse.
+     LOG.debug("Failed to drain error stream", ioe);
+   } finally {
+     // Close on every path so the stream is not leaked when the copy fails.
+     IOUtils.closeStream(errorBuffer);
+     IOUtils.closeStream(errorStream);
+   }
+ }
+
private void putBackRemainingMapOutputs(MapHost host) {
// Cycle through remaining MapOutputs
boolean isFirst = true;
@@ -550,7 +594,11 @@ class Fetcher extends Thread {
url.append(mapId.getPathComponent());
first = false;
}
-
+ //It is possible to override keep-alive setting in cluster by adding keepAlive in url.
+ //Refer MAPREDUCE-5787 to enable/disable keep-alive in the cluster.
+ if (keepAlive) {
+ url.append("&keepAlive=true");
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("MapOutput URL for " + host + " -> " + url.toString());
}