You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/05/27 02:01:42 UTC
[2/2] tez git commit: TEZ-2450. support async http clients in ordered
& unordered inputs (rbalamohan)
TEZ-2450. support async http clients in ordered & unordered inputs (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9dabf947
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9dabf947
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9dabf947
Branch: refs/heads/master
Commit: 9dabf94767480750f31d8f3e24d17a89bc036331
Parents: 7be325e
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Wed May 27 05:32:08 2015 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Wed May 27 05:32:08 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
pom.xml | 5 +
tez-runtime-library/findbugs-exclude.xml | 12 +
tez-runtime-library/pom.xml | 4 +
.../org/apache/tez/http/BaseHttpConnection.java | 63 +++
.../org/apache/tez/http/HttpConnection.java | 318 ++++++++++++++
.../apache/tez/http/HttpConnectionParams.java | 82 ++++
.../java/org/apache/tez/http/SSLFactory.java | 238 +++++++++++
.../http/async/netty/AsyncHttpConnection.java | 231 ++++++++++
.../netty/TezBodyDeferringAsyncHandler.java | 256 +++++++++++
.../library/api/TezRuntimeConfiguration.java | 4 +
.../runtime/library/common/shuffle/Fetcher.java | 34 +-
.../library/common/shuffle/HttpConnection.java | 428 -------------------
.../library/common/shuffle/ShuffleUtils.java | 117 +++--
.../common/shuffle/impl/ShuffleManager.java | 12 +-
.../orderedgrouped/FetcherOrderedGrouped.java | 23 +-
.../orderedgrouped/ShuffleScheduler.java | 11 +-
.../library/input/OrderedGroupedKVInput.java | 1 +
.../runtime/library/input/UnorderedKVInput.java | 1 +
.../org/apache/tez/http/TestHttpConnection.java | 202 +++++++++
.../library/common/shuffle/TestFetcher.java | 12 +-
.../shuffle/orderedgrouped/TestFetcher.java | 71 ++-
.../apache/tez/test/TestPipelinedShuffle.java | 13 +-
.../org/apache/tez/test/TestSecureShuffle.java | 21 +-
24 files changed, 1636 insertions(+), 524 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a14e9da..5f5dd48 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
TEZ-2468. Change the minimum Java version to Java 7.
ALL CHANGES:
+ TEZ-2450. support async http clients in ordered & unordered inputs.
TEZ-2454. Change FetcherOrderedGroup to work as Callables instead of blocking threads.
TEZ-2466. tez-history-parser breaks hadoop 2.2 compatability.
TEZ-2463. Update site for 0.7.0 release
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 44592fa..2922cab 100644
--- a/pom.xml
+++ b/pom.xml
@@ -182,6 +182,11 @@
<version>1.7.5</version>
</dependency>
<dependency>
+ <groupId>com.ning</groupId>
+ <artifactId>async-http-client</artifactId>
+ <version>1.8.16</version>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/findbugs-exclude.xml b/tez-runtime-library/findbugs-exclude.xml
index 489e243..919e1e3 100644
--- a/tez-runtime-library/findbugs-exclude.xml
+++ b/tez-runtime-library/findbugs-exclude.xml
@@ -73,6 +73,18 @@
<Bug pattern="UWF_UNWRITTEN_PUBLIC_OR_PROTECTED_FIELD"/>
</Match>
+ <Match>
+ <Class name="org.apache.tez.runtime.library.common.shuffle.orderedgrouped.FetcherOrderedGrouped"/>
+ <Method name="setupConnection" params="org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost, java.util.List" returns="boolean"/>
+ <Bug pattern="BC_VACUOUS_INSTANCEOF"/>
+ </Match>
+
+ <Match>
+ <Class name="org.apache.tez.runtime.library.common.shuffle.Fetcher"/>
+ <Method name="setupConnection" params="java.util.List" returns="org.apache.tez.runtime.library.common.shuffle.Fetcher$HostFetchResult"/>
+ <Bug pattern="BC_VACUOUS_INSTANCEOF"/>
+ </Match>
+
<!-- TODO This needs more looking into -->
<Match>
<Class name="org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter"/>
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/pom.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/pom.xml b/tez-runtime-library/pom.xml
index 03e0ec3..4433a02 100644
--- a/tez-runtime-library/pom.xml
+++ b/tez-runtime-library/pom.xml
@@ -26,6 +26,10 @@
<dependencies>
<dependency>
+ <groupId>com.ning</groupId>
+ <artifactId>async-http-client</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.tez</groupId>
<artifactId>tez-api</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/http/BaseHttpConnection.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/BaseHttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/http/BaseHttpConnection.java
new file mode 100644
index 0000000..dd642ae
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/http/BaseHttpConnection.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.http;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+/**
+ * Base class for HTTP connections. It declares the lifecycle operations
+ * (connect, validate, getInputStream, cleanup) that concrete connections implement.
+ */
+public abstract class BaseHttpConnection {
+ /**
+ * Basic/unit connection timeout (in milliseconds); evaluates to 60 seconds.
+ */
+ protected final static int UNIT_CONNECT_TIMEOUT = 60 * 1000;
+
+ /**
+ * Connect to url
+ *
+ * @return boolean result of the connection attempt; the exact meaning of false is
+ * defined by the implementation
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract boolean connect() throws IOException, InterruptedException;
+
+ /**
+ * Validate established connection
+ *
+ * @throws IOException
+ */
+ public abstract void validate() throws IOException;
+
+ /**
+ * Get inputstream
+ *
+ * @return DataInputStream
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract DataInputStream getInputStream() throws IOException, InterruptedException;
+
+ /**
+ * Clean up connection
+ *
+ * @param disconnect flag passed by the caller; presumably true requests that the
+ * underlying connection be closed rather than kept for reuse (confirm per implementation)
+ * @throws IOException
+ */
+ public abstract void cleanup(boolean disconnect) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java
new file mode 100644
index 0000000..4732354
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.http;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.concurrent.TimeUnit;
+
+public class HttpConnection extends BaseHttpConnection {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HttpConnection.class);
+
+ private URL url;
+ private final String logIdentifier;
+
+ @VisibleForTesting
+ protected volatile HttpURLConnection connection;
+ private volatile DataInputStream input;
+ private volatile boolean connectionSucceeed;
+ private volatile boolean cleanup;
+
+ private final JobTokenSecretManager jobTokenSecretMgr;
+ private String encHash;
+ private String msgToEncode;
+
+ private final HttpConnectionParams httpConnParams;
+ private final Stopwatch stopWatch;
+
+  /**
+   * Creates a connection wrapper for the given URL. Nothing is opened here; the underlying
+   * connection is set up on the first connect call.
+   *
+   * @param url URL to fetch from
+   * @param connParams connection settings (timeouts, keep-alive, SSL, buffer size)
+   * @param logIdentifier identifier included in the log messages written during cleanup
+   * @param jobTokenSecretManager secret manager used to hash the URL and verify the reply
+   * @throws IOException declared for callers; the current body does not throw it
+   */
+  public HttpConnection(URL url, HttpConnectionParams connParams,
+      String logIdentifier, JobTokenSecretManager jobTokenSecretManager) throws IOException {
+    this.url = url;
+    this.httpConnParams = connParams;
+    this.jobTokenSecretMgr = jobTokenSecretManager;
+    this.logIdentifier = logIdentifier;
+    this.stopWatch = new Stopwatch();
+    if (!LOG.isDebugEnabled()) {
+      return;
+    }
+    LOG.debug("MapOutput URL :" + url.toString());
+  }
+
+  /**
+   * Builds the message for the current URL and stores both the message and its hash,
+   * computed with the job token secret manager, for later use in the request header
+   * and reply verification.
+   *
+   * @throws IOException if the message or hash cannot be computed
+   */
+  @VisibleForTesting
+  public void computeEncHash() throws IOException {
+    // The message is stored first so it stays available even if hashing fails.
+    this.msgToEncode = SecureShuffleUtils.buildMsgFrom(this.url);
+    final String urlHash =
+        SecureShuffleUtils.hashFromString(this.msgToEncode, this.jobTokenSecretMgr);
+    this.encHash = urlHash;
+  }
+
+ private void setupConnection() throws IOException {
+ connection = (HttpURLConnection) url.openConnection();
+ if (httpConnParams.isSslShuffle()) {
+ //Configure for SSL
+ SSLFactory sslFactory = httpConnParams.getSslFactory();
+ Preconditions.checkArgument(sslFactory != null, "SSLFactory can not be null");
+ sslFactory.configure(connection);
+ }
+
+ computeEncHash();
+
+ // put url hash into http header
+ connection.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
+ // set the read timeout
+ connection.setReadTimeout(httpConnParams.getReadTimeout());
+ // put shuffle version into http header
+ connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+ connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+ }
+
+  /**
+   * Connects to the source using the connection timeout from the connection params.
+   *
+   * @return true if connection was successful
+   *         false if connection was previously cleaned up
+   * @throws IOException upon connection failure
+   */
+  @Override
+  public boolean connect() throws IOException {
+    final int configuredTimeout = httpConnParams.getConnectionTimeout();
+    return connect(configuredTimeout);
+  }
+
+  /**
+   * Connect to source with specific timeout.
+   * <p/>
+   * The timeout budget is consumed in slices of at most UNIT_CONNECT_TIMEOUT ms. When an
+   * attempt fails before its slice has elapsed, the thread sleeps for the rest of the slice
+   * and then retries, until the budget is exhausted.
+   *
+   * @param connectionTimeout total connect budget in ms; must not be negative. With 0 a
+   *                          single attempt is made using a connect timeout of 0
+   * @return true if connection was successful
+   *         false if connection was previously cleaned up
+   * @throws IOException upon connection failure, on a negative timeout, or when the
+   *                     retry sleep is interrupted (the interrupt status is restored)
+   */
+  private boolean connect(int connectionTimeout) throws IOException {
+    stopWatch.reset().start();
+    if (connection == null) {
+      setupConnection();
+    }
+    int unit = 0;
+    if (connectionTimeout < 0) {
+      throw new IOException("Invalid timeout " + "[timeout = " + connectionTimeout + " ms]");
+    } else if (connectionTimeout > 0) {
+      unit = Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout);
+    }
+    // set the connect timeout to the unit-connect-timeout
+    connection.setConnectTimeout(unit);
+    int connectionFailures = 0;
+    while (true) {
+      long connectStartTime = System.currentTimeMillis();
+      try {
+        connection.connect();
+        connectionSucceeed = true;
+        break;
+      } catch (IOException ioe) {
+        // Don't attempt another connect if already cleanedup.
+        connectionFailures++;
+        if (cleanup) {
+          LOG.info("Cleanup is set to true. Not attempting to"
+              + " connect again. Last exception was: ["
+              + ioe.getClass().getName() + ", " + ioe.getMessage() + "]");
+          return false;
+        }
+        // update the total remaining connect-timeout
+        connectionTimeout -= unit;
+        // throw an exception if we have waited for timeout amount of time
+        // note that the updated value of timeout is used here
+        if (connectionTimeout <= 0) {
+          throw new IOException(
+              "Failed to connect to " + url + ", #connectionFailures=" + connectionFailures, ioe);
+        }
+        long elapsed = System.currentTimeMillis() - connectStartTime;
+        if (elapsed < unit) {
+          try {
+            long sleepTime = unit - elapsed;
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Sleeping for " + sleepTime + " while establishing connection to " + url +
+                  ", since connectAttempt returned in " + elapsed + " ms");
+            }
+            Thread.sleep(sleepTime);
+          } catch (InterruptedException e) {
+            // The interrupt is converted into an IOException below; restore the flag so
+            // code further up the stack can still detect that the thread was interrupted.
+            Thread.currentThread().interrupt();
+            throw new IOException(
+                "Connection establishment sleep interrupted, #connectionFailures=" +
+                    connectionFailures, e);
+          }
+        }
+
+        // reset the connect timeout for the last try
+        if (connectionTimeout < unit) {
+          unit = connectionTimeout;
+          // reset the connect time out for the final connect
+          connection.setConnectTimeout(unit);
+        }
+
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Time taken to connect to " + url.toString() +
+          " " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms; connectionFailures="
+          + connectionFailures);
+    }
+    return true;
+  }
+
+  /**
+   * Checks the response of the established connection: HTTP status must be 200, the
+   * shuffle name/version headers must match the expected defaults, and the reply hash
+   * header must be present and verify against the hash that was sent.
+   *
+   * @throws IOException if any of the checks fails
+   */
+  @Override
+  public void validate() throws IOException {
+    stopWatch.reset().start();
+    final int responseCode = connection.getResponseCode();
+    if (responseCode != HttpURLConnection.HTTP_OK) {
+      throw new IOException("Got invalid response code " + responseCode + " from " + url
+          + ": " + connection.getResponseMessage());
+    }
+
+    // Shuffle name is checked first; the version header is only read when the name matches.
+    final boolean headerNameMatches = ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
+        connection.getHeaderField(ShuffleHeader.HTTP_HEADER_NAME));
+    final boolean shuffleVersionOk = headerNameMatches
+        && ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
+            connection.getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION));
+    if (!shuffleVersionOk) {
+      throw new IOException("Incompatible shuffle response version");
+    }
+
+    // The reply hash is the HMac of the encHash that was sent to the server.
+    final String replyHash =
+        connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
+    if (replyHash == null) {
+      throw new IOException("security validation of TT Map output failed");
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("url=" + msgToEncode + ";encHash=" + encHash + ";replyHash="
+          + replyHash);
+    }
+
+    SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecretMgr);
+    // Following log statement will be used by tez-tool perf-analyzer for mapping attempt to
+    // NM host, so its text must stay exactly as it is.
+    final long elapsedMs = stopWatch.elapsedTime(TimeUnit.MILLISECONDS);
+    LOG.info("for url=" + url +
+        " sent hash and receievd reply " + elapsedMs + " ms");
+  }
+
+  /**
+   * Returns the input stream of the connection. A new buffered stream (sized by the
+   * configured buffer size) is created only when the connection was established; otherwise
+   * whatever stream is currently held, possibly null, is returned.
+   *
+   * @return DataInputStream
+   * @throws IOException if the underlying stream cannot be obtained
+   */
+  @Override
+  public DataInputStream getInputStream() throws IOException {
+    stopWatch.reset().start();
+    if (connectionSucceeed) {
+      final InputStream raw = connection.getInputStream();
+      final BufferedInputStream buffered =
+          new BufferedInputStream(raw, httpConnParams.getBufferSize());
+      input = new DataInputStream(buffered);
+    }
+    if (LOG.isDebugEnabled()) {
+      final long elapsedMs = stopWatch.elapsedTime(TimeUnit.MILLISECONDS);
+      LOG.debug("Time taken to getInputStream (connect) " + url +
+          " " + elapsedMs + " ms");
+    }
+    return input;
+  }
+
+  /**
+   * Cleanup the connection. Marks the connection as cleaned up (which stops further connect
+   * retries), closes the input stream, drains the error stream for keep-alive connections
+   * and optionally disconnects. IOExceptions raised while doing so are logged, not rethrown.
+   * <p/>
+   * Calling this more than once is safe: once the connection has been disconnected the
+   * error-stream drain is skipped.
+   *
+   * @param disconnect Close the connection if this is true; otherwise respect keepalive
+   * @throws IOException declared by the base class
+   */
+  @Override
+  public void cleanup(boolean disconnect) throws IOException {
+    cleanup = true;
+    stopWatch.reset().start();
+    try {
+      if (input != null) {
+        LOG.info("Closing input on " + logIdentifier);
+        input.close();
+        input = null;
+      }
+      // connection is null after an earlier cleanup disconnected it; guard against that so a
+      // repeated cleanup does not fail with a NullPointerException.
+      if (connection != null && httpConnParams.isKeepAlive() && connectionSucceeed) {
+        // Refer:
+        // http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
+        readErrorStream(connection.getErrorStream());
+      }
+      if (connection != null && (disconnect || !httpConnParams.isKeepAlive())) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Closing connection on " + logIdentifier);
+        }
+        connection.disconnect();
+        connection = null;
+      }
+    } catch (IOException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Exception while shutting down fetcher " + logIdentifier, e);
+      } else {
+        LOG.info("Exception while shutting down fetcher " + logIdentifier
+            + ": " + e.getMessage());
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Time taken to cleanup connection to " + url +
+          " " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms");
+    }
+  }
+
+  /**
+   * Cleanup the error stream if any, for keepAlive connections. The stream content is read
+   * into a throw-away buffer and both streams are closed, even when reading fails.
+   *
+   * @param errorStream error stream of the connection; null is ignored
+   */
+  private void readErrorStream(InputStream errorStream) {
+    if (errorStream == null) {
+      return;
+    }
+    DataOutputBuffer errorBuffer = new DataOutputBuffer();
+    try {
+      IOUtils.copyBytes(errorStream, errorBuffer, 4096);
+    } catch (IOException ignored) {
+      // Best effort: the content is discarded anyway, so a failed read is not an error.
+    } finally {
+      // Close in finally so the streams are released when copyBytes throws as well.
+      IOUtils.closeStream(errorBuffer);
+      IOUtils.closeStream(errorStream);
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnectionParams.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnectionParams.java b/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnectionParams.java
new file mode 100644
index 0000000..aac4bb3
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnectionParams.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.http;
+
+/**
+ * Immutable holder for the settings of an HTTP connection: keep-alive, timeouts,
+ * buffer size and SSL.
+ */
+public class HttpConnectionParams {
+  private final boolean keepAlive;
+  private final int keepAliveMaxConnections;
+  private final int connectionTimeout;
+  private final int readTimeout;
+  private final int bufferSize;
+
+  private final boolean sslShuffle;
+  private final SSLFactory sslFactory;
+
+  /**
+   * @param keepAlive whether keep-alive is enabled
+   * @param keepAliveMaxConnections keep-alive max connections setting
+   * @param connectionTimeout connection timeout
+   * @param readTimeout read timeout
+   * @param bufferSize buffer size used when wrapping the input stream
+   * @param sslShuffle whether SSL is enabled
+   * @param sslFactory factory used to configure SSL connections
+   */
+  public HttpConnectionParams(boolean keepAlive, int keepAliveMaxConnections, int
+      connectionTimeout, int readTimeout, int bufferSize, boolean sslShuffle, SSLFactory
+      sslFactory) {
+    this.keepAlive = keepAlive;
+    this.keepAliveMaxConnections = keepAliveMaxConnections;
+    this.connectionTimeout = connectionTimeout;
+    this.readTimeout = readTimeout;
+    this.bufferSize = bufferSize;
+    this.sslShuffle = sslShuffle;
+    this.sslFactory = sslFactory;
+  }
+
+  public int getBufferSize() {
+    return bufferSize;
+  }
+
+  public int getConnectionTimeout() {
+    return connectionTimeout;
+  }
+
+  public boolean isKeepAlive() {
+    return keepAlive;
+  }
+
+  public int getKeepAliveMaxConnections() {
+    return keepAliveMaxConnections;
+  }
+
+  public int getReadTimeout() {
+    return readTimeout;
+  }
+
+  public boolean isSslShuffle() {
+    return sslShuffle;
+  }
+
+  public SSLFactory getSslFactory() {
+    return sslFactory;
+  }
+
+  /**
+   * Returns the scalar settings as a comma separated list of name=value pairs. The SSL
+   * factory itself is not included.
+   */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("keepAlive=").append(keepAlive).append(", ");
+    sb.append("keepAliveMaxConnections=").append(keepAliveMaxConnections).append(", ");
+    sb.append("connectionTimeout=").append(connectionTimeout).append(", ");
+    sb.append("readTimeout=").append(readTimeout).append(", ");
+    sb.append("bufferSize=").append(bufferSize).append(", ");
+    // Previously bufferSize was appended a second time here and sslShuffle was never shown.
+    sb.append("sslShuffle=").append(sslShuffle);
+    return sb.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/http/SSLFactory.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/SSLFactory.java b/tez-runtime-library/src/main/java/org/apache/tez/http/SSLFactory.java
new file mode 100644
index 0000000..f23739b
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/http/SSLFactory.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.http;
+
+import com.ning.http.client.AsyncHttpClientConfig;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
+import org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory;
+import org.apache.hadoop.security.ssl.KeyStoresFactory;
+import org.apache.hadoop.security.ssl.SSLFactory.Mode;
+import org.apache.hadoop.security.ssl.SSLHostnameVerifier;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocketFactory;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.security.GeneralSecurityException;
+
+import static org.apache.hadoop.security.ssl.SSLFactory.DEFAULT_SSL_ENABLED_PROTOCOLS;
+import static org.apache.hadoop.security.ssl.SSLFactory.DEFAULT_SSL_REQUIRE_CLIENT_CERT;
+import static org.apache.hadoop.security.ssl.SSLFactory.KEYSTORES_FACTORY_CLASS_KEY;
+import static org.apache.hadoop.security.ssl.SSLFactory.SSL_CLIENT_CONF_KEY;
+import static org.apache.hadoop.security.ssl.SSLFactory.SSL_ENABLED_PROTOCOLS;
+import static org.apache.hadoop.security.ssl.SSLFactory.SSL_HOSTNAME_VERIFIER_KEY;
+import static org.apache.hadoop.security.ssl.SSLFactory.SSL_REQUIRE_CLIENT_CERT_KEY;
+import static org.apache.hadoop.security.ssl.SSLFactory.SSL_SERVER_CONF_KEY;
+
+/**
+ * Factory that creates SSLEngine and SSLSocketFactory instances using
+ * Hadoop configuration information.
+ * <p/>
+ * This SSLFactory uses a {@link org.apache.hadoop.security.ssl.ReloadingX509TrustManager} instance,
+ * which reloads public keys if the truststore file changes.
+ * <p/>
+ * This factory is used to configure HTTPS in Hadoop HTTP based endpoints, both
+ * client and server.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class SSLFactory implements ConnectionConfigurator {
+
+ private Configuration conf;
+ private Mode mode;
+ private boolean requireClientCert;
+ private SSLContext context;
+ private HostnameVerifier hostnameVerifier;
+ private KeyStoresFactory keystoresFactory;
+
+ private String[] enabledProtocols = null;
+
+ /**
+ * Creates an SSLFactory.
+ *
+ * @param mode SSLFactory mode, client or server.
+ * @param conf Hadoop configuration from where the SSLFactory configuration
+ * will be read.
+ */
+ public SSLFactory(Mode mode, Configuration conf) {
+ this.conf = conf;
+ if (mode == null) {
+ throw new IllegalArgumentException("mode cannot be NULL");
+ }
+ this.mode = mode;
+ requireClientCert = conf.getBoolean(SSL_REQUIRE_CLIENT_CERT_KEY,
+ DEFAULT_SSL_REQUIRE_CLIENT_CERT);
+ Configuration sslConf = readSSLConfiguration(mode);
+
+ Class<? extends KeyStoresFactory> klass
+ = conf.getClass(KEYSTORES_FACTORY_CLASS_KEY,
+ FileBasedKeyStoresFactory.class, KeyStoresFactory.class);
+ keystoresFactory = ReflectionUtils.newInstance(klass, sslConf);
+
+ enabledProtocols = conf.getStrings(SSL_ENABLED_PROTOCOLS, DEFAULT_SSL_ENABLED_PROTOCOLS);
+ }
+
+ private Configuration readSSLConfiguration(Mode mode) {
+ Configuration sslConf = new Configuration(false);
+ sslConf.setBoolean(SSL_REQUIRE_CLIENT_CERT_KEY, requireClientCert);
+ String sslConfResource;
+ if (mode == Mode.CLIENT) {
+ sslConfResource = conf.get(SSL_CLIENT_CONF_KEY, "ssl-client.xml");
+ } else {
+ sslConfResource = conf.get(SSL_SERVER_CONF_KEY, "ssl-server.xml");
+ }
+ sslConf.addResource(sslConfResource);
+ return sslConf;
+ }
+
+ /**
+ * Initializes the factory: initializes the keystores factory for the current mode,
+ * creates a "TLS" SSLContext from its key and trust managers, and resolves the
+ * hostname verifier from the configuration.
+ *
+ * @throws GeneralSecurityException thrown if an SSL initialization error
+ * happened.
+ * @throws IOException thrown if an IO error happened while reading the SSL
+ * configuration.
+ */
+ public void init() throws GeneralSecurityException, IOException {
+ keystoresFactory.init(mode);
+ context = SSLContext.getInstance("TLS");
+ context.init(keystoresFactory.getKeyManagers(),
+ keystoresFactory.getTrustManagers(), null);
+ // NOTE(review): SSLContext.getDefaultSSLParameters() is documented as returning a copy,
+ // so this call presumably does not restrict the protocols used by sockets created from
+ // the context - confirm, and apply enabledProtocols where sockets/engines are created.
+ context.getDefaultSSLParameters().setProtocols(enabledProtocols);
+ hostnameVerifier = getHostnameVerifier(conf);
+ }
+
+  /**
+   * Resolves the hostname verifier named by SSL_HOSTNAME_VERIFIER_KEY in the given
+   * configuration ("DEFAULT" when unset). The configured name is trimmed and matched
+   * case-insensitively.
+   *
+   * @param conf configuration to read the verifier name from
+   * @return the matching hostname verifier
+   * @throws GeneralSecurityException if the configured name is not a known verifier
+   * @throws IOException declared by the delegate
+   */
+  private HostnameVerifier getHostnameVerifier(Configuration conf)
+      throws GeneralSecurityException, IOException {
+    // Upper-case with a fixed locale: with the default locale, e.g. Turkish, 'i' maps to a
+    // dotted capital I and a value such as "strict_ie6" would not match "STRICT_IE6".
+    return getHostnameVerifier(conf.get(SSL_HOSTNAME_VERIFIER_KEY, "DEFAULT").
+        trim().toUpperCase(java.util.Locale.ROOT));
+  }
+
+ public static HostnameVerifier getHostnameVerifier(String verifier)
+ throws GeneralSecurityException, IOException {
+ HostnameVerifier hostnameVerifier;
+ if (verifier.equals("DEFAULT")) {
+ hostnameVerifier = SSLHostnameVerifier.DEFAULT;
+ } else if (verifier.equals("DEFAULT_AND_LOCALHOST")) {
+ hostnameVerifier = SSLHostnameVerifier.DEFAULT_AND_LOCALHOST;
+ } else if (verifier.equals("STRICT")) {
+ hostnameVerifier = SSLHostnameVerifier.STRICT;
+ } else if (verifier.equals("STRICT_IE6")) {
+ hostnameVerifier = SSLHostnameVerifier.STRICT_IE6;
+ } else if (verifier.equals("ALLOW_ALL")) {
+ hostnameVerifier = SSLHostnameVerifier.ALLOW_ALL;
+ } else {
+ throw new GeneralSecurityException("Invalid hostname verifier: " +
+ verifier);
+ }
+ return hostnameVerifier;
+ }
+
+ /**
+ * Releases any resources being used by delegating to the keystores factory's
+ * destroy method.
+ */
+ public void destroy() {
+ keystoresFactory.destroy();
+ }
+
+ /**
+ * Returns the SSLFactory KeyStoresFactory instance.
+ *
+ * @return the KeyStoresFactory created by the constructor from the class configured
+ * under KEYSTORES_FACTORY_CLASS_KEY (FileBasedKeyStoresFactory when not configured).
+ */
+ public KeyStoresFactory getKeystoresFactory() {
+ return keystoresFactory;
+ }
+
+
+  /**
+   * Returns a configured SSLSocketFactory, obtained from the SSLContext that
+   * {@link #init()} creates. Only available in CLIENT mode.
+   *
+   * @return the configured SSLSocketFactory.
+   * @throws IllegalStateException if the factory is not in CLIENT mode.
+   * @throws GeneralSecurityException thrown if the SSLSocketFactory could not
+   *                                  be initialized.
+   * @throws IOException thrown if an IO error occurred while loading
+   *                     the server keystore.
+   */
+  public SSLSocketFactory createSSLSocketFactory() throws GeneralSecurityException, IOException {
+    if (mode != Mode.CLIENT) {
+      // The message used to claim the factory *is* in CLIENT mode, the opposite of the
+      // condition that triggers it.
+      throw new IllegalStateException("Factory is not in CLIENT mode");
+    }
+    return context.getSocketFactory();
+  }
+
+  /**
+   * Returns the hostname verifier that should be used in HttpsURLConnections. Only
+   * available in CLIENT mode; the value is resolved by {@link #init()}.
+   *
+   * @return the hostname verifier.
+   * @throws IllegalStateException if the factory is not in CLIENT mode.
+   */
+  public HostnameVerifier getHostnameVerifier() {
+    if (mode != Mode.CLIENT) {
+      // The message used to claim the factory *is* in CLIENT mode, the opposite of the
+      // condition that triggers it.
+      throw new IllegalStateException("Factory is not in CLIENT mode");
+    }
+    return hostnameVerifier;
+  }
+
+
+
+  /**
+   * Applies this factory's {@link SSLSocketFactory} and {@link HostnameVerifier} to the
+   * given connection when it is an {@link HttpsURLConnection}; any other
+   * {@link HttpURLConnection} is returned untouched.
+   *
+   * @param conn the {@link HttpURLConnection} instance to configure.
+   * @return the configured {@link HttpURLConnection} instance.
+   * @throws IOException if an IO error occurred, or wrapping the
+   *                     GeneralSecurityException raised while creating the socket factory.
+   */
+  @Override
+  public HttpURLConnection configure(HttpURLConnection conn) throws IOException {
+    if (!(conn instanceof HttpsURLConnection)) {
+      return conn;
+    }
+    final HttpsURLConnection httpsConn = (HttpsURLConnection) conn;
+    final SSLSocketFactory socketFactory;
+    try {
+      socketFactory = createSSLSocketFactory();
+    } catch (GeneralSecurityException ex) {
+      throw new IOException(ex);
+    }
+    httpsConn.setSSLSocketFactory(socketFactory);
+    httpsConn.setHostnameVerifier(getHostnameVerifier());
+    return httpsConn;
+  }
+
+  /**
+   * Set ssl context and hostname verifier for
+   * {@link com.ning.http.client.AsyncHttpClientConfig.Builder}. A null builder is ignored.
+   *
+   * @param asyncNingBuilder {@link com.ning.http.client.AsyncHttpClientConfig.Builder}
+   *                         instance to configure.
+   * @throws IOException if an IO error occurred.
+   */
+  public void configure(AsyncHttpClientConfig.Builder asyncNingBuilder) throws IOException {
+    if (asyncNingBuilder == null) {
+      return;
+    }
+    asyncNingBuilder.setSSLContext(context);
+    final HostnameVerifier verifier = getHostnameVerifier();
+    asyncNingBuilder.setHostnameVerifier(verifier);
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/AsyncHttpConnection.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/AsyncHttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/AsyncHttpConnection.java
new file mode 100644
index 0000000..f46939d
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/AsyncHttpConnection.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.http.async.netty;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.ning.http.client.AsyncHttpClient;
+import com.ning.http.client.AsyncHttpClientConfig;
+import com.ning.http.client.ListenableFuture;
+import com.ning.http.client.Request;
+import com.ning.http.client.RequestBuilder;
+import com.ning.http.client.Response;
+import org.apache.commons.io.IOUtils;
+import org.apache.tez.http.BaseHttpConnection;
+import org.apache.tez.http.HttpConnectionParams;
+import org.apache.tez.http.SSLFactory;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+public class AsyncHttpConnection extends BaseHttpConnection {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AsyncHttpConnection.class);
+
+ private final JobTokenSecretManager jobTokenSecretMgr;
+ private String encHash;
+ private String msgToEncode;
+
+ private final HttpConnectionParams httpConnParams;
+ private final Stopwatch stopWatch;
+ private final URL url;
+
+ private static volatile AsyncHttpClient httpAsyncClient;
+
+ private final TezBodyDeferringAsyncHandler handler;
+ private final PipedOutputStream pos; //handler would write to this as and when it receives chunks
+ private final PipedInputStream pis; //connected to pos, which can be used by fetchers
+
+ private Response response;
+ private ListenableFuture<Response> responseFuture;
+ private TezBodyDeferringAsyncHandler.BodyDeferringInputStream dis;
+
+ private void initClient(HttpConnectionParams httpConnParams) throws IOException {
+ if (httpAsyncClient != null) {
+ return;
+ }
+
+ if (httpAsyncClient == null) {
+ synchronized (AsyncHttpConnection.class) {
+ if (httpAsyncClient == null) {
+ LOG.info("Initializing AsyncClient (TezBodyDeferringAsyncHandler)");
+ AsyncHttpClientConfig.Builder builder = new AsyncHttpClientConfig.Builder();
+ if (httpConnParams.isSslShuffle()) {
+ //Configure SSL
+ SSLFactory sslFactory = httpConnParams.getSslFactory();
+ Preconditions.checkArgument(sslFactory != null, "SSLFactory can not be null");
+ sslFactory.configure(builder);
+ }
+
+ /**
+ * TODO : following settings need fine tuning.
+ * Change following config to accept common thread pool later.
+ * Change max connections based on the total inputs (ordered & unordered). Need to tune
+ * setMaxConnections & addRequestFilter.
+ */
+ builder
+ .setAllowPoolingConnection(httpConnParams.isKeepAlive())
+ .setAllowSslConnectionPool(httpConnParams.isKeepAlive())
+ .setCompressionEnabled(false)
+ //.setExecutorService(applicationThreadPool)
+ //.addRequestFilter(new ThrottleRequestFilter())
+ .setMaximumConnectionsPerHost(1)
+ .setConnectionTimeoutInMs(httpConnParams.getConnectionTimeout())
+ .setRequestTimeoutInMs(httpConnParams.getReadTimeout())
+ .setUseRawUrl(true)
+ .build();
+ httpAsyncClient = new AsyncHttpClient(builder.build());
+ }
+ }
+ }
+ }
+
+ /**
+ * Creates a connection object for the given URL. No request is sent here; that happens
+ * in connect(). The constructor makes sure the shared async client exists and sets up
+ * the piped stream pair through which the handler hands response body bytes to readers.
+ *
+ * @param url URL to fetch from
+ * @param connParams connection settings; also supplies the pipe buffer size
+ * @param logIdentifier not used by this constructor
+ * @param jobTokenSecretManager used later to compute and verify the URL hash
+ * @throws IOException if the shared client or the piped streams cannot be created
+ */
+ public AsyncHttpConnection(URL url, HttpConnectionParams connParams,
+ String logIdentifier, JobTokenSecretManager jobTokenSecretManager) throws IOException {
+ this.jobTokenSecretMgr = jobTokenSecretManager;
+ this.httpConnParams = connParams;
+ this.url = url;
+ this.stopWatch = new Stopwatch();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("MapOutput URL :" + url.toString());
+ }
+
+ // The client must exist before any request can be issued on this connection.
+ initClient(httpConnParams);
+ pos = new PipedOutputStream();
+ // pis is connected to pos: whatever the handler writes to pos becomes readable from pis.
+ pis = new PipedInputStream(pos, httpConnParams.getBufferSize());
+ // NOTE(review): UNIT_CONNECT_TIMEOUT is not declared in this class; presumably inherited
+ // from BaseHttpConnection. The handler uses it as its header-wait timeout.
+ handler = new TezBodyDeferringAsyncHandler(pos, url, UNIT_CONNECT_TIMEOUT);
+ }
+
+ /**
+ * Builds the message to sign from the connection URL and computes its hash with the job
+ * token secret manager. The results are stored in msgToEncode and encHash for use by
+ * connect() and validate().
+ *
+ * @throws IOException if the hash cannot be computed
+ */
+ @VisibleForTesting
+ public void computeEncHash() throws IOException {
+ // generate hash of the url
+ msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
+ encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecretMgr);
+ }
+
+  /**
+   * Connect to source: sends the shuffle request and waits for the response status and
+   * headers. The response body is not consumed here; it is read later through
+   * {@link #getInputStream()}.
+   *
+   * @return true if the source replied with HTTP 200 (this method never returns false;
+   *         every failure is reported as an exception)
+   * @throws IOException if no response is available, the request failed, or the status
+   *         code is not 200
+   * @throws InterruptedException if the thread is interrupted while waiting for the headers
+   */
+  public boolean connect() throws IOException, InterruptedException {
+    // Start timing here so the latency logged by validate() is measured from the moment
+    // the request is prepared; a stopwatch that was never started always reads 0 ms.
+    stopWatch.reset().start();
+    computeEncHash();
+
+    RequestBuilder rb = new RequestBuilder();
+    rb.setHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
+    rb.setHeader(ShuffleHeader.HTTP_HEADER_NAME, ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    rb.setHeader(ShuffleHeader.HTTP_HEADER_VERSION, ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+    Request request = rb.setUrl(url.toString()).build();
+
+    //for debugging
+    LOG.debug("Request url={}, encHash={}", url, encHash);
+
+    // Submit the request; the handler writes body chunks into the piped output stream.
+    responseFuture = httpAsyncClient.executeRequest(request, handler);
+
+    //BodyDeferringAsyncHandler would automatically manage producer and consumer frequency mismatch
+    dis = new TezBodyDeferringAsyncHandler.BodyDeferringInputStream(responseFuture, handler, pis);
+
+    // Blocks until the headers arrive (or the handler's timeout expires, which yields null).
+    response = dis.getAsapResponse();
+    if (response == null) {
+      throw new IOException("Response is null");
+    }
+
+    //verify the response
+    int rc = response.getStatusCode();
+    if (rc != HttpURLConnection.HTTP_OK) {
+      LOG.debug("Request url={}", response.getUri());
+      throw new IOException("Got invalid response code " + rc + " from "
+          + url + ": " + response.getStatusText());
+    }
+    return true;
+  }
+
+ public void validate() throws IOException {
+ // get the shuffle version
+ if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME
+ .equals(response.getHeader(ShuffleHeader.HTTP_HEADER_NAME))
+ || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION
+ .equals(response.getHeader(ShuffleHeader.HTTP_HEADER_VERSION))) {
+ throw new IOException("Incompatible shuffle response version");
+ }
+
+ // get the replyHash which is HMac of the encHash we sent to the server
+ String replyHash = response.getHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
+ if (replyHash == null) {
+ throw new IOException("security validation of TT Map output failed");
+ }
+ LOG.debug("url={};encHash={};replyHash={}", msgToEncode, encHash, replyHash);
+
+ // verify that replyHash is HMac of encHash
+ SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecretMgr);
+ //Following log statement will be used by tez-tool perf-analyzer for mapping attempt to NM host
+ LOG.info("for url={} sent hash and receievd reply {} ms", url, stopWatch.elapsedMillis());
+ }
+
+ /**
+ * Get the inputstream from the connection. Must be called after connect() has obtained
+ * a response; the returned stream wraps the body-deferring stream created in connect().
+ *
+ * @return DataInputStream over the response body
+ * @throws IOException declared by the signature; not thrown by the code in this method
+ * @throws InterruptedException declared by the signature; not thrown by the code in this method
+ * @throws IllegalStateException if no response is available yet
+ */
+ public DataInputStream getInputStream() throws IOException, InterruptedException {
+ Preconditions.checkState(response != null, "Response can not be null");
+ return new DataInputStream(dis);
+ }
+
+  /**
+   * Closes and discards the static async client shared by all instances of this class.
+   * Safe to call when no client exists (never created, or already closed); a later
+   * connection will create a fresh client.
+   */
+  @VisibleForTesting
+  public void close() {
+    // Use the same lock as initClient() so a concurrent initialisation cannot observe,
+    // or be handed, a client that is in the middle of being closed.
+    synchronized (AsyncHttpConnection.class) {
+      if (httpAsyncClient != null) {
+        httpAsyncClient.close();
+        httpAsyncClient = null;
+      }
+    }
+  }
+  /**
+   * Cleanup the connection: closes the response stream (which also waits for the async
+   * request to finish) and both ends of the pipe, then forgets the response.
+   *
+   * @param disconnect not used by this implementation
+   * @throws IOException if closing the response stream or joining the request fails; the
+   *         piped streams are closed and the response is cleared even in that case
+   */
+  public void cleanup(boolean disconnect) throws IOException {
+    // Netty internally has its own connection management and takes care of it.
+    try {
+      if (response != null) {
+        dis.close();
+      }
+    } finally {
+      // Always release the pipe, even when dis.close() reports a failed request.
+      IOUtils.closeQuietly(pos);
+      IOUtils.closeQuietly(pis);
+      response = null;
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/TezBodyDeferringAsyncHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/TezBodyDeferringAsyncHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/TezBodyDeferringAsyncHandler.java
new file mode 100644
index 0000000..8e83eac
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/TezBodyDeferringAsyncHandler.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.http.async.netty;
+
+import com.ning.http.client.AsyncHandler;
+import com.ning.http.client.HttpResponseBodyPart;
+import com.ning.http.client.HttpResponseHeaders;
+import com.ning.http.client.HttpResponseStatus;
+import com.ning.http.client.Response;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URL;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Same as {@link com.ning.http.client.BodyDeferringAsyncHandler} with additional checks to
+ * handle errors in getResponse(). Based on testing, at very high load
+ * {@link com.ning.http.client.BodyDeferringAsyncHandler} gets into a hung state in
+ * getResponse() as it waits indefinitely for headers to arrive. This class tries to fix the
+ * problem by waiting only for the connection timeout.
+ */
+@InterfaceAudience.Private
+class TezBodyDeferringAsyncHandler implements AsyncHandler<Response> {
+ private static final Logger LOG = LoggerFactory.getLogger(TezBodyDeferringAsyncHandler.class);
+
+ private final Response.ResponseBuilder responseBuilder = new Response.ResponseBuilder();
+ private final CountDownLatch headersArrived = new CountDownLatch(1);
+ private final OutputStream output;
+
+ private volatile boolean responseSet;
+ private volatile boolean statusReceived;
+ private volatile Response response;
+ private volatile Throwable throwable;
+
+ private final Semaphore semaphore = new Semaphore(1);
+
+ private final URL url;
+ private final int headerReceiveTimeout;
+
+ /**
+ * @param os stream that receives the response body parts as they arrive
+ * @param url request URL, used only in log messages
+ * @param timeout maximum time in milliseconds that getResponse() waits for headers
+ */
+ TezBodyDeferringAsyncHandler(final OutputStream os, final URL url, final int timeout) {
+ this.output = os;
+ this.responseSet = false;
+ this.url = url;
+ this.headerReceiveTimeout = timeout;
+ }
+
+ public void onThrowable(Throwable t) {
+ this.throwable = t;
+ // Counting down to handle error cases too.
+ // In "premature exceptions" cases, the onBodyPartReceived() and
+ // onCompleted()
+ // methods will never be invoked, leaving caller of getResponse() method
+ // blocked forever.
+ try {
+ semaphore.acquire();
+ } catch (InterruptedException e) {
+ // Ignore
+ } finally {
+ LOG.error("Error in asyncHandler ", t);
+ headersArrived.countDown();
+ semaphore.release();
+ }
+ try {
+ closeOut();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+
+ /**
+ * Starts a fresh response: resets the builder, records the status line and notes that a
+ * status was received (reported in the timeout log message of getResponse()).
+ *
+ * @param responseStatus status line of the response
+ * @return always CONTINUE
+ */
+ public AsyncHandler.STATE onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
+ responseBuilder.reset();
+ responseBuilder.accumulate(responseStatus);
+ statusReceived = true;
+ return AsyncHandler.STATE.CONTINUE;
+ }
+
+ /**
+ * Adds the received headers to the response being built. The response itself is only
+ * built later, in onBodyPartReceived() or onCompleted().
+ *
+ * @param headers headers of the response
+ * @return always CONTINUE
+ */
+ public AsyncHandler.STATE onHeadersReceived(HttpResponseHeaders headers) throws Exception {
+ responseBuilder.accumulate(headers);
+ return AsyncHandler.STATE.CONTINUE;
+ }
+
+ /**
+ * Writes a chunk of the response body to the output stream. On the first chunk the
+ * status/headers response is built and callers waiting in getResponse() are released.
+ *
+ * @param bodyPart chunk of the response body
+ * @return always CONTINUE
+ */
+ public AsyncHandler.STATE onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
+ // body arrived, flush headers
+ if (!responseSet) {
+ response = responseBuilder.build();
+ responseSet = true;
+ headersArrived.countDown();
+ }
+ bodyPart.writeTo(output);
+ return AsyncHandler.STATE.CONTINUE;
+ }
+
+ /**
+ * Flushes and closes the output stream. The close is attempted even when the flush fails.
+ *
+ * @throws IOException if flushing or closing the stream fails
+ */
+ protected void closeOut() throws IOException {
+ try {
+ output.flush();
+ } finally {
+ output.close();
+ }
+ }
+
+  /**
+   * Finishes the request: makes sure the headers-only response is set, releases callers
+   * waiting in getResponse(), closes the output stream and returns the response.
+   *
+   * @return the response built from everything accumulated so far, or null if the thread
+   *         was interrupted while waiting for the internal semaphore
+   * @throws IOException if closing the output fails or an error was recorded by onThrowable()
+   */
+  public Response onCompleted() throws IOException {
+    if (!responseSet) {
+      response = responseBuilder.build();
+      responseSet = true;
+    }
+    // Counting down to handle error cases too.
+    // In "normal" cases, latch is already at 0 here
+    // But in other cases, for example when because of some error
+    // onBodyPartReceived() is never called, the caller
+    // of getResponse() would remain blocked infinitely.
+    // By contract, onCompleted() is always invoked, even in case of errors
+    headersArrived.countDown();
+    closeOut();
+    // Acquire outside the try/finally so the permit is released only if it was taken.
+    try {
+      semaphore.acquire();
+    } catch (InterruptedException e) {
+      return null;
+    }
+    try {
+      if (throwable != null) {
+        throw new IOException(throwable.getMessage(), throwable);
+      }
+      // sending out current response
+      return responseBuilder.build();
+    } finally {
+      semaphore.release();
+    }
+  }
+
+  /**
+   * This method -- unlike {@code Future<Response>.get()} -- blocks only until the
+   * headers arrive. This is useful for large transfers, to examine headers
+   * ASAP, and defer body streaming to its final destination and prevent
+   * unneeded bandwidth consumption. The response here will contain the very
+   * 1st response from server, so status code and headers, but it might be
+   * incomplete in case of broken servers sending trailing headers. In that
+   * case, the "usual" {@code Future<Response>.get()} method will return complete
+   * headers, but multiple invocations of getResponse() will always return the
+   * 1st cached, probably incomplete one. Note: the response returned by this
+   * method will contain everything <em>except</em> the response body itself,
+   * so invoking any method like Response.getResponseBodyXXX() will result in
+   * error! Also, please note that this method returns <code>null</code> when the
+   * headers do not arrive within the configured timeout.
+   *
+   * @return a {@link Response}, or <code>null</code> if the headers did not arrive in time
+   * @throws InterruptedException if interrupted while waiting for the headers
+   * @throws IOException if the request failed; the recorded error is attached as the cause
+   */
+  public Response getResponse() throws InterruptedException, IOException {
+    /**
+     * Based on testing, it is possible that it is in connected state, but the headers are not
+     * received. Instead of waiting forever, close after timeout for next retry.
+     */
+    boolean result = headersArrived.await(headerReceiveTimeout, TimeUnit.MILLISECONDS);
+    if (!result) {
+      LOG.error("Breaking after timeout={}, url={}, responseSet={} statusReceived={}",
+          headerReceiveTimeout, url, responseSet, statusReceived);
+      return null;
+    }
+    // Acquire before entering try/finally: if acquire() is interrupted, no permit was
+    // taken, so none must be released.
+    semaphore.acquire();
+    try {
+      if (throwable != null) {
+        throw new IOException(throwable.getMessage(), throwable);
+      }
+      return response;
+    } finally {
+      semaphore.release();
+    }
+  }
+
+ /**
+ * A simple helper class that is used to perform automatic "join" for async
+ * download and the error checking of the Future of the request.
+ */
+ static class BodyDeferringInputStream extends FilterInputStream {
+ private final Future<Response> future;
+ private final TezBodyDeferringAsyncHandler bdah;
+
+ /**
+ * @param future future of the async request, joined when this stream is closed
+ * @param bdah handler of the same request, used to obtain the early (headers-only) response
+ * @param in stream from which the response body is read
+ */
+ public BodyDeferringInputStream(final Future<Response> future,
+ final TezBodyDeferringAsyncHandler bdah, final InputStream in) {
+ super(in);
+ this.future = future;
+ this.bdah = bdah;
+ }
+
+    /**
+     * Closes the input stream, and "joins" (waits for complete execution,
+     * together with any exception thrown) the async request.
+     *
+     * @throws IOException if closing the stream fails, the request failed, or the thread
+     *         was interrupted while waiting; in the last case the interrupt flag is restored
+     */
+    @Override
+    public void close() throws IOException {
+      // close
+      super.close();
+      // "join" async request
+      try {
+        getLastResponse();
+      } catch (InterruptedException e) {
+        // Keep the interrupt visible to the caller instead of silently consuming it.
+        Thread.currentThread().interrupt();
+        throw new IOException(e.getMessage(), e);
+      } catch (Exception e) {
+        throw new IOException(e.getMessage(), e);
+      }
+    }
+
+ /**
+ * Delegates to {@link TezBodyDeferringAsyncHandler#getResponse()}. Blocks
+ * only until the headers arrive. Might return
+ * <code>null</code>. See
+ * {@link TezBodyDeferringAsyncHandler#getResponse()} method for details.
+ *
+ * @return a {@link Response}, possibly <code>null</code>
+ * @throws InterruptedException if interrupted while waiting for the headers
+ * @throws IOException if the handler recorded an error for the request
+ */
+ public Response getAsapResponse() throws InterruptedException, IOException {
+ return bdah.getResponse();
+ }
+
+ /**
+ * Delegates to the {@code get()} method of the request's {@code Future<Response>}.
+ * Blocks until the complete response has arrived.
+ *
+ * @return a {@link Response}
+ * @throws InterruptedException if interrupted while waiting for the request to finish
+ * @throws java.util.concurrent.ExecutionException if the request failed
+ */
+ public Response getLastResponse() throws InterruptedException, ExecutionException {
+ return future.get();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index 3d9a701..fc94347 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -210,6 +210,9 @@ public class TezRuntimeConfiguration {
public final static int TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE_DEFAULT =
8 * 1024;
+ public static final String TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP = TEZ_RUNTIME_PREFIX +
+ "shuffle.use.async.http";
+ public static final boolean TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP_DEFAULT = false;
public static final String TEZ_RUNTIME_SHUFFLE_ENABLE_SSL = TEZ_RUNTIME_PREFIX +
"shuffle.ssl.enable";
@@ -352,6 +355,7 @@ public class TezRuntimeConfiguration {
tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES);
tezRuntimeKeys.add(TEZ_RUNTIME_PARTITIONER_CLASS);
tezRuntimeKeys.add(TEZ_RUNTIME_COMBINER_CLASS);
+ tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE);
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 61e0151..e7c98b7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -38,6 +38,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.tez.http.BaseHttpConnection;
+import org.apache.tez.http.HttpConnectionParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang.StringUtils;
@@ -59,7 +61,6 @@ import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.exceptions.FetcherReadTimeoutException;
import org.apache.tez.runtime.library.common.shuffle.FetchedInput.Type;
-import org.apache.tez.runtime.library.common.shuffle.HttpConnection.HttpConnectionParams;
import com.google.common.base.Preconditions;
@@ -108,7 +109,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
private URL url;
private volatile DataInputStream input;
- private HttpConnection httpConnection;
+ BaseHttpConnection httpConnection;
private HttpConnectionParams httpConnectionParams;
private final boolean localDiskFetchEnabled;
@@ -121,6 +122,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
// Initiative value is 0, which means it hasn't retried yet.
private long retryStartTime = 0;
+ private final boolean asyncHttp;
+
private final boolean isDebugEnabled = LOG.isDebugEnabled();
private Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params,
@@ -132,7 +135,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
boolean localDiskFetchEnabled,
boolean sharedFetchEnabled,
String localHostname,
- int shufflePort) {
+ int shufflePort, boolean asyncHttp) {
+ this.asyncHttp = asyncHttp;
this.fetcherCallback = fetcherCallback;
this.inputManager = inputManager;
this.jobTokenSecretMgr = jobTokenSecretManager;
@@ -402,13 +406,17 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
private HostFetchResult setupConnection(List<InputAttemptIdentifier> attempts) {
try {
StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host,
- port, partition, appId.toString(), httpConnectionParams.isSSLShuffleEnabled());
+ port, partition, appId.toString(), httpConnectionParams.isSslShuffle());
this.url = ShuffleUtils.constructInputURL(baseURI.toString(), attempts,
- httpConnectionParams.getKeepAlive());
+ httpConnectionParams.isKeepAlive());
- httpConnection = new HttpConnection(url, httpConnectionParams, logIdentifier, jobTokenSecretMgr);
+ httpConnection = ShuffleUtils.getHttpConnection(asyncHttp, url, httpConnectionParams,
+ logIdentifier, jobTokenSecretMgr);
httpConnection.connect();
- } catch (IOException e) {
+ } catch (IOException | InterruptedException e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
// ioErrs.increment(1);
// If connect did not succeed, just mark all the maps as failed,
// indirectly penalizing the host
@@ -449,6 +457,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
return new HostFetchResult(new FetchResult(host, port, partition, remaining),
new InputAttemptIdentifier[] { firstAttempt }, false);
}
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); //reset status
+ return null;
}
return null;
}
@@ -903,10 +914,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
public FetcherBuilder(FetcherCallback fetcherCallback,
HttpConnectionParams params, FetchedInputAllocator inputManager,
ApplicationId appId, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
- Configuration conf, boolean localDiskFetchEnabled, String localHostname, int shufflePort) {
+ Configuration conf, boolean localDiskFetchEnabled, String localHostname, int shufflePort,
+ boolean asyncHttp) {
this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
jobTokenSecretMgr, srcNameTrimmed, conf, null, null, null, localDiskFetchEnabled,
- false, localHostname, shufflePort);
+ false, localHostname, shufflePort, asyncHttp);
}
public FetcherBuilder(FetcherCallback fetcherCallback,
@@ -915,10 +927,10 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
Configuration conf, RawLocalFileSystem localFs,
LocalDirAllocator localDirAllocator, Path lockPath,
boolean localDiskFetchEnabled, boolean sharedFetchEnabled,
- String localHostname, int shufflePort) {
+ String localHostname, int shufflePort, boolean asyncHttp) {
this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
- lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort);
+ lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp);
}
public FetcherBuilder setHttpConnectionParameters(HttpConnectionParams httpParams) {
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
deleted file mode 100644
index 7827f0a..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
+++ /dev/null
@@ -1,428 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.library.common.shuffle;
-
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.security.GeneralSecurityException;
-import java.util.concurrent.TimeUnit;
-
-import javax.net.ssl.HttpsURLConnection;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.security.ssl.SSLFactory;
-import org.apache.tez.common.security.JobTokenSecretManager;
-import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
-import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
-
-import com.google.common.base.Stopwatch;
-
-/**
- * HttpConnection which can be used for Unordered / Ordered shuffle.
- */
-public class HttpConnection {
-
- private static final Logger LOG = LoggerFactory.getLogger(HttpConnection.class);
-
- /** Basic/unit connection timeout (in milliseconds) */
- private final static int UNIT_CONNECT_TIMEOUT = 60 * 1000;
-
- private URL url;
- private final String logIdentifier;
-
- //Shared by many threads
- private static SSLFactory sslFactory;
-
- @VisibleForTesting
- protected volatile HttpURLConnection connection;
- private volatile DataInputStream input;
-
- private volatile boolean connectionSucceeed;
- private volatile boolean cleanup;
-
- private final JobTokenSecretManager jobTokenSecretMgr;
- private String encHash;
- private String msgToEncode;
-
- private final HttpConnectionParams httpConnParams;
- private final Stopwatch stopWatch;
-
- /**
- * HttpConnection
- *
- * @param url
- * @param connParams
- * @param logIdentifier
- * @param jobTokenSecretManager
- * @throws IOException
- */
- public HttpConnection(URL url, HttpConnectionParams connParams,
- String logIdentifier, JobTokenSecretManager jobTokenSecretManager) throws IOException {
- this.logIdentifier = logIdentifier;
- this.jobTokenSecretMgr = jobTokenSecretManager;
- this.httpConnParams = connParams;
- this.url = url;
- this.stopWatch = new Stopwatch();
- if (LOG.isDebugEnabled()) {
- LOG.debug("MapOutput URL :" + url.toString());
- }
- }
-
- private void setupConnection() throws IOException {
- connection = (HttpURLConnection) url.openConnection();
- if (sslFactory != null && httpConnParams.sslShuffle) {
- try {
- ((HttpsURLConnection) connection).setSSLSocketFactory(sslFactory
- .createSSLSocketFactory());
- ((HttpsURLConnection) connection).setHostnameVerifier(sslFactory
- .getHostnameVerifier());
- } catch (GeneralSecurityException ex) {
- throw new IOException(ex);
- }
- }
- // generate hash of the url
- msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
- encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecretMgr);
-
- // put url hash into http header
- connection.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH,
- encHash);
- // set the read timeout
- connection.setReadTimeout(httpConnParams.readTimeout);
- // put shuffle version into http header
- connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
- ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
- connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
- ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
- }
-
- /**
- * Connect to source
- *
- * @return true if connection was successful
- * false if connection was previously cleaned up
- * @throws IOException upon connection failure
- */
- public boolean connect() throws IOException {
- return connect(httpConnParams.connectionTimeout);
- }
-
- /**
- * Connect to source with specific timeout
- *
- * @param connectionTimeout
- * @return true if connection was successful
- * false if connection was previously cleaned up
- * @throws IOException upon connection failure
- */
- public boolean connect(int connectionTimeout) throws IOException {
- stopWatch.reset().start();
- if (connection == null) {
- setupConnection();
- }
- int unit = 0;
- if (connectionTimeout < 0) {
- throw new IOException("Invalid timeout " + "[timeout = "
- + connectionTimeout + " ms]");
- } else if (connectionTimeout > 0) {
- unit = Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout);
- }
- // set the connect timeout to the unit-connect-timeout
- connection.setConnectTimeout(unit);
- int connectionFailures = 0;
- while (true) {
- long connectStartTime = System.currentTimeMillis();
- try {
- connection.connect();
- connectionSucceeed = true;
- break;
- } catch (IOException ioe) {
- // Don't attempt another connect if already cleanedup.
- connectionFailures++;
- if (cleanup) {
- LOG.info("Cleanup is set to true. Not attempting to"
- + " connect again. Last exception was: ["
- + ioe.getClass().getName() + ", " + ioe.getMessage() + "]");
- return false;
- }
- // update the total remaining connect-timeout
- connectionTimeout -= unit;
- // throw an exception if we have waited for timeout amount of time
- // note that the updated value if timeout is used here
- if (connectionTimeout <= 0) {
- throw new IOException(
- "Failed to connect to " + url + ", #connectionFailures=" + connectionFailures, ioe);
- }
- long elapsed = System.currentTimeMillis() - connectStartTime;
- if (elapsed < unit) {
- try {
- long sleepTime = unit - elapsed;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sleeping for " + sleepTime + " while establishing connection to " + url +
- ", since connectAttempt returned in " + elapsed + " ms");
- }
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- throw new IOException(
- "Connection establishment sleep interrupted, #connectionFailures=" +
- connectionFailures, e);
- }
- }
-
- // reset the connect timeout for the last try
- if (connectionTimeout < unit) {
- unit = connectionTimeout;
- // reset the connect time out for the final connect
- connection.setConnectTimeout(unit);
- }
-
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Time taken to connect to " + url.toString() +
- " " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms; connectionFailures="+ connectionFailures);
- }
- return true;
- }
-
- public void validate() throws IOException {
- stopWatch.reset().start();
- int rc = connection.getResponseCode();
- if (rc != HttpURLConnection.HTTP_OK) {
- throw new IOException("Got invalid response code " + rc + " from " + url
- + ": " + connection.getResponseMessage());
- }
- // get the shuffle version
- if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(connection
- .getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
- || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(connection
- .getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))) {
- throw new IOException("Incompatible shuffle response version");
- }
- // get the replyHash which is HMac of the encHash we sent to the server
- String replyHash =
- connection
- .getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
- if (replyHash == null) {
- throw new IOException("security validation of TT Map output failed");
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("url=" + msgToEncode + ";encHash=" + encHash + ";replyHash="
- + replyHash);
- }
- // verify that replyHash is HMac of encHash
- SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecretMgr);
- //Following log statement will be used by tez-tool perf-analyzer for mapping attempt to NM host
- LOG.info("for url=" + url +
- " sent hash and receievd reply " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms");
- }
-
- /**
- * Get the inputstream from the connection
- *
- * @return DataInputStream
- * @throws IOException
- */
- public DataInputStream getInputStream() throws IOException {
- stopWatch.reset().start();
- if (connectionSucceeed) {
- input =
- new DataInputStream(new BufferedInputStream(
- connection.getInputStream(), httpConnParams.bufferSize));
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Time taken to getInputStream (connect) " + url +
- " " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms");
- }
- return input;
- }
-
- /**
- * Cleanup the connection.
- *
- * @param disconnect
- * Close the connection if this is true; otherwise respect keepalive
- * @throws IOException
- */
- public void cleanup(boolean disconnect) throws IOException {
- cleanup = true;
- stopWatch.reset().start();
- try {
- if (input != null) {
- LOG.info("Closing input on " + logIdentifier);
- input.close();
- input = null;
- }
- if (httpConnParams.keepAlive && connectionSucceeed) {
- // Refer:
- // http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
- readErrorStream(connection.getErrorStream());
- }
- if (connection != null && (disconnect || !httpConnParams.keepAlive)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Closing connection on " + logIdentifier);
- }
- connection.disconnect();
- connection = null;
- }
- } catch (IOException e) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Exception while shutting down fetcher " + logIdentifier, e);
- } else {
- LOG.info("Exception while shutting down fetcher " + logIdentifier
- + ": " + e.getMessage());
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Time taken to cleanup connection to " + url +
- " " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms");
- }
- }
-
- /**
- * Cleanup the error stream if any, for keepAlive connections
- *
- * @param errorStream
- */
- private void readErrorStream(InputStream errorStream) {
- if (errorStream == null) {
- return;
- }
- try {
- DataOutputBuffer errorBuffer = new DataOutputBuffer();
- IOUtils.copyBytes(errorStream, errorBuffer, 4096);
- IOUtils.closeStream(errorBuffer);
- IOUtils.closeStream(errorStream);
- } catch (IOException ioe) {
- // ignore
- }
- }
-
- public static class HttpConnectionParams {
- private boolean keepAlive;
- private int keepAliveMaxConnections;
- private int connectionTimeout;
- private int readTimeout;
- private int bufferSize;
- private boolean sslShuffle;
-
- public boolean getKeepAlive() {
- return keepAlive;
- }
-
- public int getKeepAliveMaxConnections() {
- return keepAliveMaxConnections;
- }
-
- public int getConnectionTimeout() {
- return connectionTimeout;
- }
-
- public int getReadTimeout() {
- return readTimeout;
- }
-
- public void setReadTimeout(int readTimeout) {
- this.readTimeout = readTimeout;
- }
-
- public int getBufferSize() {
- return bufferSize;
- }
-
- public boolean isSSLShuffleEnabled() {
- return sslShuffle;
- }
-
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("keepAlive=").append(keepAlive).append(", ");
- sb.append("keepAliveMaxConnections=").append(keepAliveMaxConnections).append(", ");
- sb.append("connectionTimeout=").append(connectionTimeout).append(", ");
- sb.append("readTimeout=").append(readTimeout).append(", ");
- sb.append("bufferSize=").append(bufferSize).append(", ");
- sb.append("sslShuffle=").append(sslShuffle);
- return sb.toString();
- }
- }
-
- public static class HttpConnectionParamsBuilder {
- private HttpConnectionParams params;
-
- public HttpConnectionParamsBuilder() {
- params = new HttpConnectionParams();
- }
-
- public HttpConnectionParamsBuilder setKeepAlive(boolean keepAlive,
- int keepAliveMaxConnections) {
- params.keepAlive = keepAlive;
- params.keepAliveMaxConnections = keepAliveMaxConnections;
- return this;
- }
-
- public HttpConnectionParamsBuilder setTimeout(int connectionTimeout,
- int readTimeout) {
- params.connectionTimeout = connectionTimeout;
- params.readTimeout = readTimeout;
- return this;
- }
-
- public synchronized HttpConnectionParamsBuilder setSSL(boolean sslEnabled,
- Configuration conf) {
- synchronized (HttpConnectionParamsBuilder.class) {
- params.sslShuffle = sslEnabled;
- if (sslEnabled) {
- //Create sslFactory if it is null or if it was destroyed earlier
- if (sslFactory == null || sslFactory.getKeystoresFactory()
- .getTrustManagers() == null) {
- LOG.info("Initializing SSL factory in HttpConnection");
- sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
- try {
- sslFactory.init();
- } catch (Exception ex) {
- sslFactory.destroy();
- sslFactory = null;
- throw new RuntimeException(ex);
- }
- }
- }
- }
- return this;
- }
-
- public HttpConnectionParamsBuilder setBufferSize(int bufferSize) {
- params.bufferSize = bufferSize;
- return this;
- }
-
- public HttpConnectionParams build() {
- return params;
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 46489ed..8b6e847 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -33,13 +33,16 @@ import javax.crypto.SecretKey;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.http.BaseHttpConnection;
+import org.apache.tez.http.HttpConnection;
+import org.apache.tez.http.HttpConnectionParams;
+import org.apache.tez.http.SSLFactory;
+import org.apache.tez.http.async.netty.AsyncHttpConnection;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.token.Token;
import org.apache.tez.common.TezCommonUtils;
@@ -54,8 +57,6 @@ import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
-import org.apache.tez.runtime.library.common.shuffle.HttpConnection.HttpConnectionParams;
-import org.apache.tez.runtime.library.common.shuffle.HttpConnection.HttpConnectionParamsBuilder;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
@@ -66,6 +67,9 @@ public class ShuffleUtils {
private static final Logger LOG = LoggerFactory.getLogger(ShuffleUtils.class);
public static final String SHUFFLE_HANDLER_SERVICE_ID = "mapreduce_shuffle";
+ //Shared by multiple threads
+ private static volatile SSLFactory sslFactory;
+
static final ThreadLocal<DecimalFormat> MBPS_FORMAT =
new ThreadLocal<DecimalFormat>() {
@Override
@@ -213,45 +217,15 @@ public class ShuffleUtils {
return new URL(url.toString());
}
- public static HttpConnectionParams constructHttpShuffleConnectionParams(
- Configuration conf) {
- HttpConnectionParamsBuilder builder = new HttpConnectionParamsBuilder();
-
- int connectionTimeout =
- conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT,
- TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_STALLED_COPY_TIMEOUT_DEFAULT);
-
- int readTimeout =
- conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT,
- TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT_DEFAULT);
-
- int bufferSize =
- conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE,
- TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE_DEFAULT);
-
- boolean keepAlive =
- conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED,
- TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED_DEFAULT);
- int keepAliveMaxConnections =
- conf.getInt(
- TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS,
- TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS_DEFAULT);
- if (keepAlive) {
- System.setProperty("sun.net.http.errorstream.enableBuffering", "true");
- System.setProperty("http.maxConnections",
- String.valueOf(keepAliveMaxConnections));
- LOG.info("Set keepAlive max connections: " + keepAliveMaxConnections);
+ public static BaseHttpConnection getHttpConnection(boolean asyncHttp, URL url,
+ HttpConnectionParams params, String logIdentifier, JobTokenSecretManager jobTokenSecretManager)
+ throws IOException {
+ if (asyncHttp) {
+ //TODO: support other async packages? httpclient-async?
+ return new AsyncHttpConnection(url, params, logIdentifier, jobTokenSecretManager);
+ } else {
+ return new HttpConnection(url, params, logIdentifier, jobTokenSecretManager);
}
-
- builder.setTimeout(connectionTimeout, readTimeout)
- .setBufferSize(bufferSize)
- .setKeepAlive(keepAlive, keepAliveMaxConnections);
-
- boolean sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
- TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT);
- builder.setSSL(sslShuffle, conf);
-
- return builder.build();
}
public static String stringify(DataMovementEventPayloadProto dmProto) {
@@ -473,5 +447,62 @@ public class ShuffleUtils {
", EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" +
MBPS_FORMAT.get().format(rate) + " MB/s");
}
+
+  /**
+   * Build {@link org.apache.tez.http.HttpConnectionParams} from configuration.
+   *
+   * Reads the shuffle connect/read timeouts, buffer size, keep-alive settings and the
+   * SSL flag from {@code conf}. When keep-alive is enabled, two JVM-wide system properties
+   * ({@code sun.net.http.errorstream.enableBuffering} and {@code http.maxConnections})
+   * are set as a side effect. When SSL shuffle is enabled, the shared client-mode
+   * {@link SSLFactory} is created on demand and handed to the returned params.
+   *
+   * @param conf configuration to read the shuffle settings from
+   * @return HttpConnectionParams populated from {@code conf}
+   * @throws RuntimeException if the SSL factory could not be initialized
+   */
+  public static HttpConnectionParams getHttpConnectionParams(Configuration conf) {
+    // NOTE(review): the fallback for the connect timeout is the stalled-copy-timeout
+    // default constant; kept as-is, confirm that this is intentional.
+    int connectionTimeout =
+        conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT,
+            TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_STALLED_COPY_TIMEOUT_DEFAULT);
+
+    int readTimeout = conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT_DEFAULT);
+
+    int bufferSize = conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE_DEFAULT);
+
+    boolean keepAlive = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED_DEFAULT);
+
+    int keepAliveMaxConnections = conf.getInt(
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS_DEFAULT);
+
+    if (keepAlive) {
+      // System properties are process-wide: this affects every HttpURLConnection in the JVM.
+      System.setProperty("sun.net.http.errorstream.enableBuffering", "true");
+      System.setProperty("http.maxConnections", String.valueOf(keepAliveMaxConnections));
+    }
+
+    boolean sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
+        TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT);
+
+    if (sslShuffle) {
+      // The null/destroyed check is done under the lock on every call. Guarding it with an
+      // outer "sslFactory == null" test would make the re-creation branch unreachable for a
+      // factory that is non-null but no longer has trust managers.
+      synchronized (HttpConnectionParams.class) {
+        SSLFactory current = sslFactory;
+        //Create sslFactory if it is null or if it was destroyed earlier
+        if (current == null || current.getKeystoresFactory().getTrustManagers() == null) {
+          SSLFactory created =
+              new SSLFactory(org.apache.hadoop.security.ssl.SSLFactory.Mode.CLIENT, conf);
+          try {
+            created.init();
+          } catch (Exception ex) {
+            created.destroy();
+            sslFactory = null;
+            throw new RuntimeException(ex);
+          }
+          // Publish only after init() succeeded, so threads reading the shared field
+          // never pick up a factory whose initialization has not completed.
+          sslFactory = created;
+        }
+      }
+    }
+
+    // As before, the shared factory is passed even when SSL is disabled for this call;
+    // it is then whatever an earlier call left in the field (possibly null).
+    HttpConnectionParams httpConnParams = new HttpConnectionParams(keepAlive,
+        keepAliveMaxConnections, connectionTimeout, readTimeout, bufferSize, sslShuffle,
+        sslFactory);
+    return httpConnParams;
+  }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index f354920..b7c0742 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -26,7 +26,6 @@ import java.text.DecimalFormat;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -46,7 +45,7 @@ import java.util.concurrent.locks.ReentrantLock;
import javax.crypto.SecretKey;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
+import org.apache.tez.http.HttpConnectionParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -55,7 +54,6 @@ import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.net.NetUtils;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.TezUtilsInternal;
@@ -76,7 +74,6 @@ import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
import org.apache.tez.runtime.library.common.shuffle.Fetcher;
import org.apache.tez.runtime.library.common.shuffle.FetcherCallback;
-import org.apache.tez.runtime.library.common.shuffle.HttpConnection.HttpConnectionParams;
import org.apache.tez.runtime.library.common.shuffle.InputHost;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.FetchedInput.Type;
@@ -132,6 +129,7 @@ public class ShuffleManager implements FetcherCallback {
private final Condition wakeLoop = lock.newCondition();
private final int numFetchers;
+ private final boolean asyncHttp;
// Parameters required by Fetchers
private final JobTokenSecretManager jobTokenSecretMgr;
@@ -241,8 +239,8 @@ public class ShuffleManager implements FetcherCallback {
.getJobTokenSecretFromTokenBytes(inputContext
.getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
this.jobTokenSecretMgr = new JobTokenSecretManager(shuffleSecret);
- httpConnectionParams =
- ShuffleUtils.constructHttpShuffleConnectionParams(conf);
+ this.asyncHttp = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP, false);
+ httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
this.localFs = (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
@@ -398,7 +396,7 @@ public class ShuffleManager implements FetcherCallback {
httpConnectionParams, inputManager, inputContext.getApplicationId(),
jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
lockDisk, localDiskFetchEnabled, sharedFetchEnabled,
- localhostName, shufflePort);
+ localhostName, shufflePort, asyncHttp);
if (codec != null) {
fetcherBuilder.setCompressionParameters(codec);