You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/05/27 02:01:41 UTC
[1/2] tez git commit: TEZ-2450. support async http clients in ordered
& unordered inputs (rbalamohan)
Repository: tez
Updated Branches:
refs/heads/master 7be325eab -> 9dabf9476
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
index 0248f13..a4d38ce 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/FetcherOrderedGrouped.java
@@ -27,6 +27,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.tez.http.BaseHttpConnection;
+import org.apache.tez.http.HttpConnectionParams;
import org.apache.tez.common.CallableWithNdc;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,8 +45,6 @@ import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapOutput.Ty
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.exceptions.FetcherReadTimeoutException;
-import org.apache.tez.runtime.library.common.shuffle.HttpConnection;
-import org.apache.tez.runtime.library.common.shuffle.HttpConnection.HttpConnectionParams;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -86,11 +86,13 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
private final boolean ifileReadAhead;
private final int ifileReadAheadLength;
- private LinkedList<InputAttemptIdentifier> remaining;
+ @VisibleForTesting
+ LinkedList<InputAttemptIdentifier> remaining;
volatile DataInputStream input;
- volatile HttpConnection httpConnection;
+ volatile BaseHttpConnection httpConnection;
+ private final boolean asyncHttp;
// Initiative value is 0, which means it hasn't retried yet.
@@ -114,7 +116,8 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
TezCounter badIdErrsCounter,
TezCounter wrongMapErrsCounter,
TezCounter connectionErrsCounter,
- TezCounter wrongReduceErrsCounter) {
+ TezCounter wrongReduceErrsCounter,
+ boolean asyncHttp) {
this.scheduler = scheduler;
this.allocator = allocator;
this.metrics = metrics;
@@ -134,6 +137,7 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
this.ifileReadAhead = ifileReadAhead;
this.ifileReadAheadLength = ifileReadAheadLength;
this.httpConnectionParams = httpConnectionParams;
+ this.asyncHttp = asyncHttp;
if (codec != null) {
this.codec = codec;
} else {
@@ -311,8 +315,8 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
boolean connectSucceeded = false;
try {
URL url = ShuffleUtils.constructInputURL(host.getBaseUrl(), attempts,
- httpConnectionParams.getKeepAlive());
- httpConnection = new HttpConnection(url, httpConnectionParams,
+ httpConnectionParams.isKeepAlive());
+ httpConnection = ShuffleUtils.getHttpConnection(asyncHttp, url, httpConnectionParams,
logIdentifier, jobTokenSecretManager);
connectSucceeded = httpConnection.connect();
@@ -323,7 +327,10 @@ class FetcherOrderedGrouped extends CallableWithNdc<Void> {
input = httpConnection.getInputStream();
httpConnection.validate();
return true;
- } catch (IOException ie) {
+ } catch (IOException | InterruptedException ie) {
+ if (ie instanceof InterruptedException) {
+ Thread.currentThread().interrupt(); //reset status
+ }
if (stopped) {
LOG.info("Not reporting fetch failure, since an Exception was caught after shutdown");
return false;
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 85c3a30..75dca64 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -54,10 +54,10 @@ import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.tez.http.HttpConnectionParams;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezConstants;
-import org.apache.tez.runtime.library.common.shuffle.HttpConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -144,7 +144,7 @@ class ShuffleScheduler {
private final ListeningExecutorService fetcherExecutor;
- private final HttpConnection.HttpConnectionParams httpConnectionParams;
+ private final HttpConnectionParams httpConnectionParams;
private final FetchedInputAllocatorOrderedGrouped allocator;
private final ShuffleClientMetrics shuffleMetrics;
private final Shuffle shuffle;
@@ -157,6 +157,7 @@ class ShuffleScheduler {
private final boolean localDiskFetchEnabled;
private final String localHostname;
private final int shufflePort;
+ private final boolean asyncHttp;
private final TezCounter ioErrsCounter;
private final TezCounter wrongLengthErrsCounter;
@@ -245,8 +246,8 @@ class ShuffleScheduler {
this.startTime = startTime;
this.lastProgressTime = startTime;
- this.httpConnectionParams =
- ShuffleUtils.constructHttpShuffleConnectionParams(conf);
+ this.asyncHttp = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP, false);
+ this.httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
this.shuffleMetrics = new ShuffleClientMetrics(inputContext.getDAGName(),
inputContext.getTaskVertexName(), inputContext.getTaskIndex(),
this.conf, UserGroupInformation.getCurrentUser().getShortUserName());
@@ -1016,7 +1017,7 @@ class ShuffleScheduler {
shuffleMetrics, shuffle, jobTokenSecretManager, ifileReadAhead, ifileReadAheadLength,
codec, conf, localDiskFetchEnabled, localHostname, shufflePort, srcNameTrimmed, mapHost,
ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter, wrongMapErrsCounter,
- connectionErrsCounter, wrongReduceErrsCounter);
+ connectionErrsCounter, wrongReduceErrsCounter, asyncHttp);
}
private class FetchFutureCallback implements FutureCallback<Void> {
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
index 12a5955..7399359 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/OrderedGroupedKVInput.java
@@ -330,6 +330,7 @@ public class OrderedGroupedKVInput extends AbstractLogicalInput {
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINE_MIN_SPILLS);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_COMBINER_CLASS);
+ confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE);
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
index 7fc9317..1016263 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/input/UnorderedKVInput.java
@@ -242,6 +242,7 @@ public class UnorderedKVInput extends AbstractLogicalInput {
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IFILE_READAHEAD_BYTES);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IO_FILE_BUFFER_SIZE);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_FACTOR);
+ confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT);
confKeys.add(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE);
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/test/java/org/apache/tez/http/TestHttpConnection.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/http/TestHttpConnection.java b/tez-runtime-library/src/test/java/org/apache/tez/http/TestHttpConnection.java
new file mode 100644
index 0000000..8f0a7ad
--- /dev/null
+++ b/tez-runtime-library/src/test/java/org/apache/tez/http/TestHttpConnection.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.http;
+
+import com.google.common.base.Throwables;
+import org.apache.tez.http.async.netty.AsyncHttpConnection;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.channels.ClosedByInterruptException;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+public class TestHttpConnection {
+
+ private static int connTimeout = 5000;
+ private static int readTimeout = 5000;
+
+ private static final String NOT_HOSTED_URL = "http://10.255.255.255:10221";
+
+ private static ExecutorService executorService;
+ private static URL url;
+ private static JobTokenSecretManager tokenSecretManager;
+
+ private Thread currentThread;
+
+ @BeforeClass
+ public static void setup() throws IOException, URISyntaxException {
+ executorService = Executors.newFixedThreadPool(1,
+ new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ Thread t = Executors.defaultThreadFactory().newThread(r);
+ t.setDaemon(true);
+ return t;
+ }
+ });
+ url = new URL(NOT_HOSTED_URL);
+ tokenSecretManager = mock(JobTokenSecretManager.class);
+ }
+
+ @AfterClass
+ public static void cleanup() throws Exception {
+ executorService.shutdownNow();
+ }
+
+ public void baseTest(Callable<Void> worker, CountDownLatch latch, String message) throws
+ InterruptedException {
+ long startTime = System.currentTimeMillis();
+ try {
+ Future future = executorService.submit(worker);
+ future.get();
+ } catch (ExecutionException e) {
+ assertTrue(e.getCause().getCause() instanceof IOException);
+ assertTrue(e.getMessage(), e.getMessage().contains(message));
+ long elapsedTime = System.currentTimeMillis() - startTime;
+ assertTrue("elapasedTime=" + elapsedTime + " should be greater than " + connTimeout,
+ elapsedTime > connTimeout);
+ }
+ assertTrue(latch.getCount() == 0);
+ }
+
+ @Test(timeout = 20000)
+ public void testConnectionTimeout() throws IOException, InterruptedException {
+ HttpConnectionParams params = getConnectionParams();
+
+ //For http
+ CountDownLatch latch = new CountDownLatch(1);
+ HttpConnection httpConn = getHttpConnection(params);
+ baseTest(new Worker(latch, httpConn, false), latch, "Failed to connect");
+
+ //For async http
+ latch = new CountDownLatch(1);
+ AsyncHttpConnection asyncHttpConn = getAsyncHttpConnection(params);
+ baseTest(new Worker(latch, asyncHttpConn, false), latch, "connection timed out");
+ }
+
+ @Test(timeout = 20000)
+ @SuppressWarnings("unchecked")
+ //Should be interruptible
+ public void testAsyncHttpConnectionInterrupt()
+ throws IOException, InterruptedException, ExecutionException {
+ CountDownLatch latch = new CountDownLatch(1);
+ HttpConnectionParams params = getConnectionParams();
+ AsyncHttpConnection asyncHttpConn = getAsyncHttpConnection(params);
+ Future future = executorService.submit(new Worker(latch, asyncHttpConn, true));
+
+ while(currentThread == null) {
+ synchronized (this) {
+ wait(100);
+ }
+ }
+
+ assertTrue("currentThread is still null", currentThread != null);
+
+ //Try interrupting the thread (exception verification happens in the worker itself)
+ currentThread.interrupt();
+
+ future.get();
+ assertTrue(latch.getCount() == 0);
+ }
+
+ HttpConnectionParams getConnectionParams() {
+ HttpConnectionParams params = mock(HttpConnectionParams.class);
+ when(params.getBufferSize()).thenReturn(8192);
+ when(params.getKeepAliveMaxConnections()).thenReturn(1);
+ when(params.getConnectionTimeout()).thenReturn(connTimeout);
+ when(params.getReadTimeout()).thenReturn(readTimeout);
+ return params;
+ }
+
+ HttpConnection getHttpConnection(HttpConnectionParams params) throws IOException {
+ HttpConnection realConn = new HttpConnection(url, params, "log", tokenSecretManager);
+ HttpConnection connection = spy(realConn);
+
+ doAnswer(new Answer() {
+ public Void answer(InvocationOnMock invocation) {
+ return null;
+ }
+ }).when(connection).computeEncHash();
+ return connection;
+ }
+
+ AsyncHttpConnection getAsyncHttpConnection(HttpConnectionParams params) throws IOException {
+ AsyncHttpConnection realConn = new AsyncHttpConnection(url, params, "log", tokenSecretManager);
+ AsyncHttpConnection connection = spy(realConn);
+
+ doAnswer(new Answer() {
+ public Void answer(InvocationOnMock invocation) {
+ return null;
+ }
+ }).when(connection).computeEncHash();
+ return connection;
+ }
+
+ class Worker implements Callable<Void> {
+ private CountDownLatch latch;
+ private BaseHttpConnection connection;
+ private boolean expectingInterrupt;
+
+ public Worker(CountDownLatch latch, BaseHttpConnection connection, boolean expectingInterrupt) {
+ this.latch = latch;
+ this.connection = connection;
+ this.expectingInterrupt = expectingInterrupt;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ try {
+ currentThread = Thread.currentThread();
+ connection.connect();
+ fail();
+ } catch(Throwable t) {
+ if (expectingInterrupt) {
+ //ClosedByInterruptException normally; InterruptedException if
+ // TezBodyDeferringAsyncHandler quits otherwise
+ assertTrue((t instanceof InterruptedException) || (t instanceof ClosedByInterruptException));
+ }
+ } finally {
+ latch.countDown();
+ if (connection != null) {
+ connection.cleanup(true);
+ }
+ }
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
index 4ef187d..34c2ca7 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/TestFetcher.java
@@ -70,7 +70,8 @@ public class TestFetcher {
final boolean DISABLE_LOCAL_FETCH = false;
Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
- ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, PORT);
+ ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST,
+ PORT, false);
builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts));
Fetcher fetcher = spy(builder.build());
@@ -88,7 +89,7 @@ public class TestFetcher {
// when enabled and hostname does not match use http fetch.
builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST,
- PORT);
+ PORT, false);
builder.assignWork(HOST + "_OTHER", PORT, 0, Arrays.asList(srcAttempts));
fetcher = spy(builder.build());
@@ -103,7 +104,7 @@ public class TestFetcher {
// when enabled and port does not match use http fetch.
builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
- ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, PORT);
+ ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, ENABLE_LOCAL_FETCH, HOST, PORT, false);
builder.assignWork(HOST, PORT + 1, 0, Arrays.asList(srcAttempts));
fetcher = spy(builder.build());
@@ -119,7 +120,8 @@ public class TestFetcher {
// When disabled use http fetch
conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false);
builder = new Fetcher.FetcherBuilder(fetcherCallback, null, null,
- ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, DISABLE_LOCAL_FETCH, HOST, PORT);
+ ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, DISABLE_LOCAL_FETCH, HOST,
+ PORT, false);
builder.assignWork(HOST, PORT, 0, Arrays.asList(srcAttempts));
fetcher = spy(builder.build());
@@ -152,7 +154,7 @@ public class TestFetcher {
int partition = 42;
FetcherCallback callback = mock(FetcherCallback.class);
Fetcher.FetcherBuilder builder = new Fetcher.FetcherBuilder(callback, null, null,
- ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, true, HOST, PORT);
+ ApplicationId.newInstance(0, 1), null, "fetcherTest", conf, true, HOST, PORT, false);
builder.assignWork(HOST, PORT, partition, Arrays.asList(srcAttempts));
Fetcher fetcher = spy(builder.build());
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
index f77e9a6..385b7b0 100644
--- a/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
+++ b/tez-runtime-library/src/test/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/TestFetcher.java
@@ -18,6 +18,7 @@
package org.apache.tez.runtime.library.common.shuffle.orderedgrouped;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
@@ -44,6 +45,8 @@ import java.util.Arrays;
import java.util.List;
import com.google.common.collect.Lists;
+import org.apache.tez.http.HttpConnection;
+import org.apache.tez.http.HttpConnectionParams;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.runtime.library.common.InputIdentifier;
import org.slf4j.Logger;
@@ -59,7 +62,6 @@ import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.exceptions.FetcherReadTimeoutException;
-import org.apache.tez.runtime.library.common.shuffle.HttpConnection;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.junit.Assert;
import org.junit.Test;
@@ -118,7 +120,7 @@ public class TestFetcher {
new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
null, conf, false, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter);
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false);
fetcher.call();
verify(scheduler).getMapsForHost(mapHost);
@@ -146,7 +148,7 @@ public class TestFetcher {
new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter);
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false);
// when local mode is enabled and host and port matches use local fetch
FetcherOrderedGrouped spyFetcher = spy(fetcher);
@@ -163,7 +165,7 @@ public class TestFetcher {
new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter);
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false);
spyFetcher = spy(fetcher);
doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
@@ -178,7 +180,7 @@ public class TestFetcher {
new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
null, conf, ENABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter);
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false);
spyFetcher = spy(fetcher);
doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
@@ -192,7 +194,7 @@ public class TestFetcher {
fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
null, conf, DISABLE_LOCAL_FETCH, HOST, PORT, "src vertex", mapHost, ioErrsCounter,
wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter);
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false);
spyFetcher = spy(fetcher);
doNothing().when(spyFetcher).setupLocalDiskFetch(mapHost);
@@ -217,7 +219,7 @@ public class TestFetcher {
"http://" + HOST + ":" + PORT + "/mapOutput?job=job_123&&reduce=1&map=");
FetcherOrderedGrouped fetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
null, conf, true, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter);
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false);
FetcherOrderedGrouped spyFetcher = spy(fetcher);
@@ -342,6 +344,7 @@ public class TestFetcher {
}
@Test(timeout = 5000)
+ @SuppressWarnings("unchecked")
public void testWithRetry() throws Exception {
Configuration conf = new TezConfiguration();
conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, 3000);
@@ -355,13 +358,12 @@ public class TestFetcher {
when(inputContext.getCounters()).thenReturn(new TezCounters());
when(inputContext.getSourceVertexName()).thenReturn("");
- HttpConnection.HttpConnectionParams httpConnectionParams =
- ShuffleUtils.constructHttpShuffleConnectionParams(conf);
+ HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
final MapHost host = new MapHost(1, HOST + ":" + PORT,
"http://" + HOST + ":" + PORT + "/mapOutput?job=job_123&&reduce=1&map=");
FetcherOrderedGrouped mockFetcher = new FetcherOrderedGrouped(null, scheduler, merger, metrics, shuffle, null, false, 0,
null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter, wrongLengthErrsCounter, badIdErrsCounter,
- wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter);
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, false);
final FetcherOrderedGrouped fetcher = spy(mockFetcher);
@@ -426,4 +428,53 @@ public class TestFetcher {
}
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testAsyncWithException() throws Exception {
+ Configuration conf = new TezConfiguration();
+ conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT, 3000);
+ conf.setInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT, 3000);
+
+ ShuffleScheduler scheduler = mock(ShuffleScheduler.class);
+ MergeManager merger = mock(MergeManager.class);
+ ShuffleClientMetrics metrics = mock(ShuffleClientMetrics.class);
+ Shuffle shuffle = mock(Shuffle.class);
+
+ TezCounters counters = new TezCounters();
+ InputContext inputContext = mock(InputContext.class);
+ when(inputContext.getCounters()).thenReturn(counters);
+ when(inputContext.getSourceVertexName()).thenReturn("");
+
+ JobTokenSecretManager jobMgr = mock(JobTokenSecretManager.class);
+ doReturn(new byte[10]).when(jobMgr).computeHash(any(byte[].class));
+
+ HttpConnectionParams httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
+ final MapHost host = new MapHost(1, HOST + ":" + PORT,
+ "http://" + HOST + ":" + PORT + "/mapOutput?job=job_123&&reduce=1&map=");
+ FetcherOrderedGrouped mockFetcher =
+ new FetcherOrderedGrouped(httpConnectionParams, scheduler, merger, metrics, shuffle, jobMgr,
+ false, 0,
+ null, conf, false, HOST, PORT, "src vertex", host, ioErrsCounter,
+ wrongLengthErrsCounter, badIdErrsCounter,
+ wrongMapErrsCounter, connectionErrsCounter, wrongReduceErrsCounter, true);
+ final FetcherOrderedGrouped fetcher = spy(mockFetcher);
+ fetcher.remaining = Lists.newLinkedList();
+
+ final List<InputAttemptIdentifier> srcAttempts = Arrays.asList(
+ new InputAttemptIdentifier(0, 1, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_0"),
+ new InputAttemptIdentifier(1, 2, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_1"),
+ new InputAttemptIdentifier(3, 4, InputAttemptIdentifier.PATH_PREFIX + "pathComponent_3")
+ );
+ doReturn(srcAttempts).when(scheduler).getMapsForHost(host);
+
+ try {
+ long currentIOErrors = ioErrsCounter.getValue();
+ boolean connected = fetcher.setupConnection(host, srcAttempts);
+ Assert.assertTrue(connected == false);
+ //Ensure that counters are incremented (i.e it followed the exception codepath)
+ Assert.assertTrue(ioErrsCounter.getValue() > currentIOErrors);
+ } catch (IOException e) {
+ fail();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-tests/src/test/java/org/apache/tez/test/TestPipelinedShuffle.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestPipelinedShuffle.java b/tez-tests/src/test/java/org/apache/tez/test/TestPipelinedShuffle.java
index 2a63293..25c149d 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestPipelinedShuffle.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestPipelinedShuffle.java
@@ -94,6 +94,7 @@ public class TestPipelinedShuffle {
}
}
+ //TODO: Add support for async http clients
@Before
public void setupTezCluster() throws Exception {
//With 1 MB sort buffer and with good amount of dataset, it would spill records
@@ -126,8 +127,18 @@ public class TestPipelinedShuffle {
@Test
public void baseTest() throws Exception {
+ Configuration conf = new Configuration(miniTezCluster.getConfig());
+ conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP, false);
+ test(conf);
+
+ conf = new Configuration(miniTezCluster.getConfig());
+ conf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP, true);
+ test(conf);
+ }
+
+ private void test(Configuration conf) throws Exception {
PipelinedShuffleJob pipelinedShuffle = new PipelinedShuffleJob();
- pipelinedShuffle.setConf(new Configuration(miniTezCluster.getConfig()));
+ pipelinedShuffle.setConf(conf);
String[] args = new String[] { };
assertEquals(0, pipelinedShuffle.run(args));
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-tests/src/test/java/org/apache/tez/test/TestSecureShuffle.java
----------------------------------------------------------------------
diff --git a/tez-tests/src/test/java/org/apache/tez/test/TestSecureShuffle.java b/tez-tests/src/test/java/org/apache/tez/test/TestSecureShuffle.java
index 8da3f08..e3e42d3 100644
--- a/tez-tests/src/test/java/org/apache/tez/test/TestSecureShuffle.java
+++ b/tez-tests/src/test/java/org/apache/tez/test/TestSecureShuffle.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tez.mapreduce.examples.TestOrderedWordCount;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.junit.After;
@@ -65,22 +66,30 @@ public class TestSecureShuffle {
private boolean enableSSLInCluster; //To set ssl config in cluster
private int resultWithTezSSL; //expected result with tez ssl setting
private int resultWithoutTezSSL; //expected result without tez ssl setting
+ private boolean asyncHttp;
- public TestSecureShuffle(boolean sslInCluster, int resultWithTezSSL, int resultWithoutTezSSL) {
+ public TestSecureShuffle(boolean sslInCluster, int resultWithTezSSL, int resultWithoutTezSSL,
+ boolean asyncHttp) {
this.enableSSLInCluster = sslInCluster;
this.resultWithTezSSL = resultWithTezSSL;
this.resultWithoutTezSSL = resultWithoutTezSSL;
+ this.asyncHttp = asyncHttp;
}
- @Parameterized.Parameters(name = "test[sslInCluster:{0}, resultWithTezSSL:{1}, resultWithoutTezSSL:{2}]")
+ @Parameterized.Parameters(name = "test[sslInCluster:{0}, resultWithTezSSL:{1}, "
+ + "resultWithoutTezSSL:{2}, asyncHttp:{3}]")
public static Collection<Object[]> getParameters() {
Collection<Object[]> parameters = new ArrayList<Object[]>();
//enable ssl in cluster, succeed with tez-ssl enabled, fail with tez-ssl disabled
- parameters.add(new Object[] { true, 0, 1 });
+ parameters.add(new Object[] { true, 0, 1, false });
+
+ //With asyncHttp
+ parameters.add(new Object[] { true, 0, 1, true });
+ parameters.add(new Object[] { false, 1, 0, true });
//Negative testcase
- // disable ssl in cluster, fail with tez-ssl enabled, succeed with tez-ssl disabled
- parameters.add(new Object[] { false, 1, 0 });
+ //disable ssl in cluster, fail with tez-ssl enabled, succeed with tez-ssl disabled
+ parameters.add(new Object[] { false, 1, 0, false });
return parameters;
}
@@ -151,7 +160,7 @@ public class TestSecureShuffle {
*
* @throws Exception
*/
- @Test(timeout = 240000)
+ @Test(timeout = 500000)
public void testSecureShuffle() throws Exception {
//With tez-ssl setting
miniTezCluster.getConfig().setBoolean(
[2/2] tez git commit: TEZ-2450. support async http clients in ordered
& unordered inputs (rbalamohan)
Posted by rb...@apache.org.
TEZ-2450. support async http clients in ordered & unordered inputs (rbalamohan)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9dabf947
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9dabf947
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9dabf947
Branch: refs/heads/master
Commit: 9dabf94767480750f31d8f3e24d17a89bc036331
Parents: 7be325e
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Wed May 27 05:32:08 2015 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Wed May 27 05:32:08 2015 +0530
----------------------------------------------------------------------
CHANGES.txt | 1 +
pom.xml | 5 +
tez-runtime-library/findbugs-exclude.xml | 12 +
tez-runtime-library/pom.xml | 4 +
.../org/apache/tez/http/BaseHttpConnection.java | 63 +++
.../org/apache/tez/http/HttpConnection.java | 318 ++++++++++++++
.../apache/tez/http/HttpConnectionParams.java | 82 ++++
.../java/org/apache/tez/http/SSLFactory.java | 238 +++++++++++
.../http/async/netty/AsyncHttpConnection.java | 231 ++++++++++
.../netty/TezBodyDeferringAsyncHandler.java | 256 +++++++++++
.../library/api/TezRuntimeConfiguration.java | 4 +
.../runtime/library/common/shuffle/Fetcher.java | 34 +-
.../library/common/shuffle/HttpConnection.java | 428 -------------------
.../library/common/shuffle/ShuffleUtils.java | 117 +++--
.../common/shuffle/impl/ShuffleManager.java | 12 +-
.../orderedgrouped/FetcherOrderedGrouped.java | 23 +-
.../orderedgrouped/ShuffleScheduler.java | 11 +-
.../library/input/OrderedGroupedKVInput.java | 1 +
.../runtime/library/input/UnorderedKVInput.java | 1 +
.../org/apache/tez/http/TestHttpConnection.java | 202 +++++++++
.../library/common/shuffle/TestFetcher.java | 12 +-
.../shuffle/orderedgrouped/TestFetcher.java | 71 ++-
.../apache/tez/test/TestPipelinedShuffle.java | 13 +-
.../org/apache/tez/test/TestSecureShuffle.java | 21 +-
24 files changed, 1636 insertions(+), 524 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index a14e9da..5f5dd48 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -8,6 +8,7 @@ INCOMPATIBLE CHANGES
TEZ-2468. Change the minimum Java version to Java 7.
ALL CHANGES:
+ TEZ-2450. support async http clients in ordered & unordered inputs.
TEZ-2454. Change FetcherOrderedGroup to work as Callables instead of blocking threads.
TEZ-2466. tez-history-parser breaks hadoop 2.2 compatability.
TEZ-2463. Update site for 0.7.0 release
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 44592fa..2922cab 100644
--- a/pom.xml
+++ b/pom.xml
@@ -182,6 +182,11 @@
<version>1.7.5</version>
</dependency>
<dependency>
+ <groupId>com.ning</groupId>
+ <artifactId>async-http-client</artifactId>
+ <version>1.8.16</version>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.5</version>
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/findbugs-exclude.xml b/tez-runtime-library/findbugs-exclude.xml
index 489e243..919e1e3 100644
--- a/tez-runtime-library/findbugs-exclude.xml
+++ b/tez-runtime-library/findbugs-exclude.xml
@@ -73,6 +73,18 @@
<Bug pattern="UWF_UNWRITTEN_PUBLIC_OR_PROTECTED_FIELD"/>
</Match>
+ <Match>
+ <Class name="org.apache.tez.runtime.library.common.shuffle.orderedgrouped.FetcherOrderedGrouped"/>
+ <Method name="setupConnection" params="org.apache.tez.runtime.library.common.shuffle.orderedgrouped.MapHost, java.util.List" returns="boolean"/>
+ <Bug pattern="BC_VACUOUS_INSTANCEOF"/>
+ </Match>
+
+ <Match>
+ <Class name="org.apache.tez.runtime.library.common.shuffle.Fetcher"/>
+ <Method name="setupConnection" params="java.util.List" returns="org.apache.tez.runtime.library.common.shuffle.Fetcher$HostFetchResult"/>
+ <Bug pattern="BC_VACUOUS_INSTANCEOF"/>
+ </Match>
+
<!-- TODO This needs more looking into -->
<Match>
<Class name="org.apache.tez.runtime.library.common.sort.impl.dflt.DefaultSorter"/>
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/pom.xml
----------------------------------------------------------------------
diff --git a/tez-runtime-library/pom.xml b/tez-runtime-library/pom.xml
index 03e0ec3..4433a02 100644
--- a/tez-runtime-library/pom.xml
+++ b/tez-runtime-library/pom.xml
@@ -26,6 +26,10 @@
<dependencies>
<dependency>
+ <groupId>com.ning</groupId>
+ <artifactId>async-http-client</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.tez</groupId>
<artifactId>tez-api</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/http/BaseHttpConnection.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/BaseHttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/http/BaseHttpConnection.java
new file mode 100644
index 0000000..dd642ae
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/http/BaseHttpConnection.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.http;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+
+/**
+ * Common contract for the HTTP connections declared in this package: open the
+ * connection, validate the response, expose the payload as a stream and finally
+ * release the connection.
+ */
+public abstract class BaseHttpConnection {
+ /**
+ * Basic/unit connection timeout (in milliseconds), i.e. 60 seconds.
+ * NOTE(review): the blocking HttpConnection uses this as the upper bound for a
+ * single connect attempt; other implementations are not visible here.
+ */
+ protected final static int UNIT_CONNECT_TIMEOUT = 60 * 1000;
+
+ /**
+ * Connect to url
+ *
+ * @return implementation-defined success flag. NOTE(review): the blocking
+ * HttpConnection returns true once connected and false when the
+ * connection had already been cleaned up - confirm for other subclasses.
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract boolean connect() throws IOException, InterruptedException;
+
+ /**
+ * Validate established connection
+ *
+ * @throws IOException presumably when the response is not acceptable; the exact
+ * checks are defined by the implementation
+ */
+ public abstract void validate() throws IOException;
+
+ /**
+ * Get inputstream
+ *
+ * @return DataInputStream over the data returned by the connection
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ public abstract DataInputStream getInputStream() throws IOException, InterruptedException;
+
+ /**
+ * Clean up connection
+ *
+ * @param disconnect whether the underlying connection should be closed.
+ * NOTE(review): the blocking HttpConnection otherwise keeps the
+ * connection for keep-alive reuse - confirm for other subclasses.
+ * @throws IOException
+ */
+ public abstract void cleanup(boolean disconnect) throws IOException;
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java
new file mode 100644
index 0000000..4732354
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnection.java
@@ -0,0 +1,318 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.http;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Blocking {@link BaseHttpConnection} backed by {@link HttpURLConnection}.
+ * <p/>
+ * Expected call order is {@link #connect()}, {@link #validate()},
+ * {@link #getInputStream()} and finally {@link #cleanup(boolean)}. The
+ * connection state is kept in volatile fields; presumably cleanup may be
+ * requested by a different thread while connect is still retrying.
+ */
+public class HttpConnection extends BaseHttpConnection {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HttpConnection.class);
+
+  private final URL url;
+  private final String logIdentifier;
+
+  @VisibleForTesting
+  protected volatile HttpURLConnection connection;
+  private volatile DataInputStream input;
+  private volatile boolean connectionSucceeed;
+  // set by cleanup(); makes the retry loop in connect() give up
+  private volatile boolean cleanup;
+
+  private final JobTokenSecretManager jobTokenSecretMgr;
+  private String encHash;
+  private String msgToEncode;
+
+  private final HttpConnectionParams httpConnParams;
+  private final Stopwatch stopWatch;
+
+  /**
+   * HttpConnection
+   *
+   * @param url                   url to fetch from
+   * @param connParams            timeouts, buffer size, keep-alive and SSL settings
+   * @param logIdentifier         identifier used in log messages emitted during cleanup
+   * @param jobTokenSecretManager secret manager used to sign the request and verify the reply
+   * @throws IOException declared for API compatibility; not thrown by this constructor
+   */
+  public HttpConnection(URL url, HttpConnectionParams connParams,
+      String logIdentifier, JobTokenSecretManager jobTokenSecretManager) throws IOException {
+    this.logIdentifier = logIdentifier;
+    this.jobTokenSecretMgr = jobTokenSecretManager;
+    this.httpConnParams = connParams;
+    this.url = url;
+    this.stopWatch = new Stopwatch();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("MapOutput URL :" + url.toString());
+    }
+  }
+
+  /**
+   * Computes the message derived from the url and its hash; both are kept for
+   * the request header and for reply verification in {@link #validate()}.
+   *
+   * @throws IOException if hashing fails
+   */
+  @VisibleForTesting
+  public void computeEncHash() throws IOException {
+    // generate hash of the url
+    msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
+    encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecretMgr);
+  }
+
+  /**
+   * Opens (but does not connect) the url connection, applies the SSL settings
+   * when SSL shuffle is enabled and adds the hash and shuffle version headers.
+   */
+  private void setupConnection() throws IOException {
+    connection = (HttpURLConnection) url.openConnection();
+    if (httpConnParams.isSslShuffle()) {
+      //Configure for SSL
+      SSLFactory sslFactory = httpConnParams.getSslFactory();
+      Preconditions.checkArgument(sslFactory != null, "SSLFactory can not be null");
+      sslFactory.configure(connection);
+    }
+
+    computeEncHash();
+
+    // put url hash into http header
+    connection.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
+    // set the read timeout
+    connection.setReadTimeout(httpConnParams.getReadTimeout());
+    // put shuffle version into http header
+    connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+        ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+        ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+  }
+
+  /**
+   * Connect to source
+   *
+   * @return true if connection was successful
+   * false if connection was previously cleaned up
+   * @throws IOException upon connection failure
+   */
+  @Override
+  public boolean connect() throws IOException {
+    return connect(httpConnParams.getConnectionTimeout());
+  }
+
+  /**
+   * Connect to source with specific timeout. The total timeout is consumed in
+   * slices of at most {@link #UNIT_CONNECT_TIMEOUT} ms; a failed attempt that
+   * returned early sleeps for the remainder of its slice before retrying.
+   *
+   * @param connectionTimeout total time budget in ms; 0 means a single attempt
+   *                          with no connect timeout, negative values are rejected
+   * @return true if connection was successful
+   * false if connection was previously cleaned up
+   * @throws IOException upon connection failure, invalid timeout, or when the
+   *                     thread is interrupted while waiting between attempts
+   */
+  private boolean connect(int connectionTimeout) throws IOException {
+    stopWatch.reset().start();
+    if (connection == null) {
+      setupConnection();
+    }
+    int unit = 0;
+    if (connectionTimeout < 0) {
+      throw new IOException("Invalid timeout " + "[timeout = " + connectionTimeout + " ms]");
+    } else if (connectionTimeout > 0) {
+      unit = Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout);
+    }
+    // set the connect timeout to the unit-connect-timeout
+    connection.setConnectTimeout(unit);
+    int connectionFailures = 0;
+    while (true) {
+      long connectStartTime = System.currentTimeMillis();
+      try {
+        connection.connect();
+        connectionSucceeed = true;
+        break;
+      } catch (IOException ioe) {
+        // Don't attempt another connect if already cleanedup.
+        connectionFailures++;
+        if (cleanup) {
+          LOG.info("Cleanup is set to true. Not attempting to"
+              + " connect again. Last exception was: ["
+              + ioe.getClass().getName() + ", " + ioe.getMessage() + "]");
+          return false;
+        }
+        // update the total remaining connect-timeout
+        connectionTimeout -= unit;
+        // throw an exception if we have waited for timeout amount of time
+        // note that the updated value of timeout is used here
+        if (connectionTimeout <= 0) {
+          throw new IOException(
+              "Failed to connect to " + url + ", #connectionFailures=" + connectionFailures, ioe);
+        }
+        long elapsed = System.currentTimeMillis() - connectStartTime;
+        if (elapsed < unit) {
+          try {
+            long sleepTime = unit - elapsed;
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Sleeping for " + sleepTime + " while establishing connection to " + url +
+                  ", since connectAttempt returned in " + elapsed + " ms");
+            }
+            Thread.sleep(sleepTime);
+          } catch (InterruptedException e) {
+            // The interrupt is reported as an IOException, so restore the flag;
+            // otherwise callers further up never learn the thread was interrupted.
+            Thread.currentThread().interrupt();
+            throw new IOException(
+                "Connection establishment sleep interrupted, #connectionFailures=" +
+                    connectionFailures, e);
+          }
+        }
+
+        // reset the connect timeout for the last try
+        if (connectionTimeout < unit) {
+          unit = connectionTimeout;
+          // reset the connect time out for the final connect
+          connection.setConnectTimeout(unit);
+        }
+
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Time taken to connect to " + url.toString() +
+          " " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms; connectionFailures="
+          + connectionFailures);
+    }
+    return true;
+  }
+
+  /**
+   * Checks the response: status must be 200, the shuffle name/version headers
+   * must match and the reply hash must be present and verify against the hash
+   * that was sent.
+   *
+   * @throws IOException if any of the checks fails
+   */
+  @Override
+  public void validate() throws IOException {
+    stopWatch.reset().start();
+    int rc = connection.getResponseCode();
+    if (rc != HttpURLConnection.HTTP_OK) {
+      throw new IOException("Got invalid response code " + rc + " from " + url
+          + ": " + connection.getResponseMessage());
+    }
+
+    // get the shuffle version
+    if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(connection
+        .getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
+        || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(connection
+        .getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))) {
+      throw new IOException("Incompatible shuffle response version");
+    }
+
+    // get the replyHash which is HMac of the encHash we sent to the server
+    String replyHash =
+        connection
+            .getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
+    if (replyHash == null) {
+      throw new IOException("security validation of TT Map output failed");
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("url=" + msgToEncode + ";encHash=" + encHash + ";replyHash="
+          + replyHash);
+    }
+
+    // verify that replyHash is HMac of encHash
+    SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecretMgr);
+    //Following log statement will be used by tez-tool perf-analyzer for mapping attempt to NM host
+    // (message text, including its spelling, is therefore left untouched)
+    LOG.info("for url=" + url +
+        " sent hash and receievd reply " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms");
+  }
+
+  /**
+   * Get the inputstream from the connection
+   *
+   * @return a buffered DataInputStream when connect succeeded; otherwise the
+   * previously created stream, which is null if none was created
+   * @throws IOException
+   */
+  @Override
+  public DataInputStream getInputStream() throws IOException {
+    stopWatch.reset().start();
+    if (connectionSucceeed) {
+      input = new DataInputStream(new BufferedInputStream(
+          connection.getInputStream(), httpConnParams.getBufferSize()));
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Time taken to getInputStream (connect) " + url +
+          " " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms");
+    }
+    return input;
+  }
+
+  /**
+   * Cleanup the connection. Safe to call more than once and before connect.
+   *
+   * @param disconnect Close the connection if this is true; otherwise respect keepalive
+   * @throws IOException declared by the base class; I/O failures during cleanup
+   *                     are logged and not rethrown
+   */
+  @Override
+  public void cleanup(boolean disconnect) throws IOException {
+    cleanup = true;
+    stopWatch.reset().start();
+    try {
+      DataInputStream in = input;
+      if (in != null) {
+        LOG.info("Closing input on " + logIdentifier);
+        in.close();
+        input = null;
+      }
+      // Work on a snapshot: the field is nulled after a disconnecting cleanup, so
+      // dereferencing it directly made a repeated cleanup fail with an NPE.
+      HttpURLConnection conn = connection;
+      if (conn != null) {
+        if (httpConnParams.isKeepAlive() && connectionSucceeed) {
+          // Refer:
+          // http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
+          readErrorStream(conn.getErrorStream());
+        }
+        if (disconnect || !httpConnParams.isKeepAlive()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Closing connection on " + logIdentifier);
+          }
+          conn.disconnect();
+          connection = null;
+        }
+      }
+    } catch (IOException e) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Exception while shutting down fetcher " + logIdentifier, e);
+      } else {
+        LOG.info("Exception while shutting down fetcher " + logIdentifier
+            + ": " + e.getMessage());
+      }
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Time taken to cleanup connection to " + url +
+          " " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms");
+    }
+  }
+
+  /**
+   * Cleanup the error stream if any, for keepAlive connections
+   *
+   * @param errorStream stream to drain and close; may be null
+   */
+  private void readErrorStream(InputStream errorStream) {
+    if (errorStream == null) {
+      return;
+    }
+    DataOutputBuffer errorBuffer = new DataOutputBuffer();
+    try {
+      IOUtils.copyBytes(errorStream, errorBuffer, 4096);
+    } catch (IOException ioe) {
+      // ignore: draining is best effort and only helps connection reuse
+    } finally {
+      // close even when the copy failed so the stream is not leaked
+      IOUtils.closeStream(errorBuffer);
+      IOUtils.closeStream(errorStream);
+    }
+  }
+}
+
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnectionParams.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnectionParams.java b/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnectionParams.java
new file mode 100644
index 0000000..aac4bb3
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/http/HttpConnectionParams.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.http;
+
+/**
+ * Immutable holder for the settings used when opening an HTTP connection:
+ * keep-alive, timeouts, buffer size and SSL configuration.
+ */
+public class HttpConnectionParams {
+  private final boolean keepAlive;
+  private final int keepAliveMaxConnections;
+  private final int connectionTimeout;
+  private final int readTimeout;
+  private final int bufferSize;
+
+  private final boolean sslShuffle;
+  private final SSLFactory sslFactory;
+
+  /**
+   * Stores the given values as-is; no validation is performed.
+   *
+   * @param keepAlive               whether keep-alive is enabled
+   * @param keepAliveMaxConnections value returned by {@link #getKeepAliveMaxConnections()}
+   * @param connectionTimeout       value returned by {@link #getConnectionTimeout()}
+   * @param readTimeout             value returned by {@link #getReadTimeout()}
+   * @param bufferSize              value returned by {@link #getBufferSize()}
+   * @param sslShuffle              whether SSL shuffle is enabled
+   * @param sslFactory              factory returned by {@link #getSslFactory()}; may be null
+   */
+  public HttpConnectionParams(boolean keepAlive, int keepAliveMaxConnections, int
+      connectionTimeout, int readTimeout, int bufferSize, boolean sslShuffle, SSLFactory
+      sslFactory) {
+    this.keepAlive = keepAlive;
+    this.keepAliveMaxConnections = keepAliveMaxConnections;
+    this.connectionTimeout = connectionTimeout;
+    this.readTimeout = readTimeout;
+    this.bufferSize = bufferSize;
+    this.sslShuffle = sslShuffle;
+    this.sslFactory = sslFactory;
+  }
+
+  public int getBufferSize() {
+    return bufferSize;
+  }
+
+  public int getConnectionTimeout() {
+    return connectionTimeout;
+  }
+
+  public boolean isKeepAlive() {
+    return keepAlive;
+  }
+
+  public int getKeepAliveMaxConnections() {
+    return keepAliveMaxConnections;
+  }
+
+  public int getReadTimeout() {
+    return readTimeout;
+  }
+
+  public boolean isSslShuffle() {
+    return sslShuffle;
+  }
+
+  public SSLFactory getSslFactory() {
+    return sslFactory;
+  }
+
+  /**
+   * Returns the scalar settings as comma separated key=value pairs, for
+   * diagnostics. The SSL factory itself is not included.
+   */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("keepAlive=").append(keepAlive).append(", ");
+    sb.append("keepAliveMaxConnections=").append(keepAliveMaxConnections).append(", ");
+    sb.append("connectionTimeout=").append(connectionTimeout).append(", ");
+    sb.append("readTimeout=").append(readTimeout).append(", ");
+    sb.append("bufferSize=").append(bufferSize).append(", ");
+    // bufferSize used to be appended a second time here, hiding the SSL flag
+    sb.append("sslShuffle=").append(sslShuffle);
+    return sb.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/http/SSLFactory.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/SSLFactory.java b/tez-runtime-library/src/main/java/org/apache/tez/http/SSLFactory.java
new file mode 100644
index 0000000..f23739b
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/http/SSLFactory.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.http;
+
+import com.ning.http.client.AsyncHttpClientConfig;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
+import org.apache.hadoop.security.ssl.FileBasedKeyStoresFactory;
+import org.apache.hadoop.security.ssl.KeyStoresFactory;
+import org.apache.hadoop.security.ssl.SSLFactory.Mode;
+import org.apache.hadoop.security.ssl.SSLHostnameVerifier;
+import org.apache.hadoop.util.ReflectionUtils;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocketFactory;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.security.GeneralSecurityException;
+
+import static org.apache.hadoop.security.ssl.SSLFactory.DEFAULT_SSL_ENABLED_PROTOCOLS;
+import static org.apache.hadoop.security.ssl.SSLFactory.DEFAULT_SSL_REQUIRE_CLIENT_CERT;
+import static org.apache.hadoop.security.ssl.SSLFactory.KEYSTORES_FACTORY_CLASS_KEY;
+import static org.apache.hadoop.security.ssl.SSLFactory.SSL_CLIENT_CONF_KEY;
+import static org.apache.hadoop.security.ssl.SSLFactory.SSL_ENABLED_PROTOCOLS;
+import static org.apache.hadoop.security.ssl.SSLFactory.SSL_HOSTNAME_VERIFIER_KEY;
+import static org.apache.hadoop.security.ssl.SSLFactory.SSL_REQUIRE_CLIENT_CERT_KEY;
+import static org.apache.hadoop.security.ssl.SSLFactory.SSL_SERVER_CONF_KEY;
+
+/**
+ * Factory that hands out the SSLSocketFactory and hostname verifier built from
+ * Hadoop configuration information, and applies them to HTTP(S) connections
+ * and to the async http client builder.
+ * <p/>
+ * This SSLFactory uses a {@link org.apache.hadoop.security.ssl.ReloadingX509TrustManager} instance,
+ * which reloads public keys if the truststore file changes.
+ * <p/>
+ * {@link #init()} must be called before any socket factory or verifier is
+ * requested; {@link #destroy()} releases the keystores.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class SSLFactory implements ConnectionConfigurator {
+
+  private final Configuration conf;
+  private final Mode mode;
+  private final boolean requireClientCert;
+  private SSLContext context;
+  private HostnameVerifier hostnameVerifier;
+  private final KeyStoresFactory keystoresFactory;
+
+  private final String[] enabledProtocols;
+
+  /**
+   * Creates an SSLFactory.
+   *
+   * @param mode SSLFactory mode, client or server.
+   * @param conf Hadoop configuration from where the SSLFactory configuration
+   *             will be read.
+   * @throws IllegalArgumentException if mode is null
+   */
+  public SSLFactory(Mode mode, Configuration conf) {
+    this.conf = conf;
+    if (mode == null) {
+      throw new IllegalArgumentException("mode cannot be NULL");
+    }
+    this.mode = mode;
+    requireClientCert = conf.getBoolean(SSL_REQUIRE_CLIENT_CERT_KEY,
+        DEFAULT_SSL_REQUIRE_CLIENT_CERT);
+    Configuration sslConf = readSSLConfiguration(mode);
+
+    Class<? extends KeyStoresFactory> klass
+        = conf.getClass(KEYSTORES_FACTORY_CLASS_KEY,
+        FileBasedKeyStoresFactory.class, KeyStoresFactory.class);
+    keystoresFactory = ReflectionUtils.newInstance(klass, sslConf);
+
+    enabledProtocols = conf.getStrings(SSL_ENABLED_PROTOCOLS, DEFAULT_SSL_ENABLED_PROTOCOLS);
+  }
+
+  /**
+   * Builds the configuration handed to the keystores factory: the client-cert
+   * flag plus the client or server ssl resource selected by the mode.
+   */
+  private Configuration readSSLConfiguration(Mode mode) {
+    Configuration sslConf = new Configuration(false);
+    sslConf.setBoolean(SSL_REQUIRE_CLIENT_CERT_KEY, requireClientCert);
+    String sslConfResource;
+    if (mode == Mode.CLIENT) {
+      sslConfResource = conf.get(SSL_CLIENT_CONF_KEY, "ssl-client.xml");
+    } else {
+      sslConfResource = conf.get(SSL_SERVER_CONF_KEY, "ssl-server.xml");
+    }
+    sslConf.addResource(sslConfResource);
+    return sslConf;
+  }
+
+  /**
+   * Initializes the factory.
+   *
+   * @throws GeneralSecurityException thrown if an SSL initialization error
+   *                                  happened.
+   * @throws IOException              thrown if an IO error happened while reading the SSL
+   *                                  configuration.
+   */
+  public void init() throws GeneralSecurityException, IOException {
+    keystoresFactory.init(mode);
+    context = SSLContext.getInstance("TLS");
+    context.init(keystoresFactory.getKeyManagers(),
+        keystoresFactory.getTrustManagers(), null);
+    // NOTE(review): getDefaultSSLParameters() is documented to return a copy, so
+    // this call does not restrict the protocols of sockets created from the
+    // context. Kept as-is; enforcing it would change handshake behaviour.
+    context.getDefaultSSLParameters().setProtocols(enabledProtocols);
+    hostnameVerifier = getHostnameVerifier(conf);
+  }
+
+  private HostnameVerifier getHostnameVerifier(Configuration conf)
+      throws GeneralSecurityException, IOException {
+    // Locale.ROOT keeps the lookup independent of the default locale (a
+    // Turkish locale would otherwise upper-case "i" to a dotted capital I).
+    return getHostnameVerifier(conf.get(SSL_HOSTNAME_VERIFIER_KEY, "DEFAULT").
+        trim().toUpperCase(java.util.Locale.ROOT));
+  }
+
+  /**
+   * Maps a verifier name to the matching {@link SSLHostnameVerifier} constant.
+   *
+   * @param verifier one of DEFAULT, DEFAULT_AND_LOCALHOST, STRICT, STRICT_IE6
+   *                 or ALLOW_ALL (exact, upper case)
+   * @return the matching verifier
+   * @throws GeneralSecurityException if the name is not one of the above
+   * @throws IOException              declared for API compatibility
+   */
+  public static HostnameVerifier getHostnameVerifier(String verifier)
+      throws GeneralSecurityException, IOException {
+    HostnameVerifier hostnameVerifier;
+    if (verifier.equals("DEFAULT")) {
+      hostnameVerifier = SSLHostnameVerifier.DEFAULT;
+    } else if (verifier.equals("DEFAULT_AND_LOCALHOST")) {
+      hostnameVerifier = SSLHostnameVerifier.DEFAULT_AND_LOCALHOST;
+    } else if (verifier.equals("STRICT")) {
+      hostnameVerifier = SSLHostnameVerifier.STRICT;
+    } else if (verifier.equals("STRICT_IE6")) {
+      hostnameVerifier = SSLHostnameVerifier.STRICT_IE6;
+    } else if (verifier.equals("ALLOW_ALL")) {
+      hostnameVerifier = SSLHostnameVerifier.ALLOW_ALL;
+    } else {
+      throw new GeneralSecurityException("Invalid hostname verifier: " +
+          verifier);
+    }
+    return hostnameVerifier;
+  }
+
+  /**
+   * Releases any resources being used.
+   */
+  public void destroy() {
+    keystoresFactory.destroy();
+  }
+
+  /**
+   * Returns the SSLFactory KeyStoresFactory instance.
+   *
+   * @return the SSLFactory KeyStoresFactory instance.
+   */
+  public KeyStoresFactory getKeystoresFactory() {
+    return keystoresFactory;
+  }
+
+
+  /**
+   * Returns a configured SSLSocketFactory.
+   *
+   * @return the configured SSLSocketFactory.
+   * @throws GeneralSecurityException thrown if the SSLSocketFactory could not
+   *                                  be initialized.
+   * @throws IOException              thrown if an IO error occurred while loading
+   *                                  the server keystore.
+   * @throws IllegalStateException    if the factory is not in CLIENT mode
+   */
+  public SSLSocketFactory createSSLSocketFactory() throws GeneralSecurityException, IOException {
+    if (mode != Mode.CLIENT) {
+      // the message used to claim the opposite of the failed condition
+      throw new IllegalStateException(
+          "Factory is not in CLIENT mode. Actual mode is " + mode);
+    }
+    return context.getSocketFactory();
+  }
+
+  /**
+   * Returns the hostname verifier that should be used in HttpsURLConnections.
+   *
+   * @return the hostname verifier.
+   * @throws IllegalStateException if the factory is not in CLIENT mode
+   */
+  public HostnameVerifier getHostnameVerifier() {
+    if (mode != Mode.CLIENT) {
+      // the message used to claim the opposite of the failed condition
+      throw new IllegalStateException(
+          "Factory is not in CLIENT mode. Actual mode is " + mode);
+    }
+    return hostnameVerifier;
+  }
+
+
+
+  /**
+   * If the given {@link HttpURLConnection} is an {@link HttpsURLConnection}
+   * configures the connection with the {@link SSLSocketFactory} and
+   * {@link HostnameVerifier} of this SSLFactory, otherwise does nothing.
+   *
+   * @param conn the {@link HttpURLConnection} instance to configure.
+   * @return the configured {@link HttpURLConnection} instance.
+   * @throws IOException if an IO error occurred.
+   */
+  @Override
+  public HttpURLConnection configure(HttpURLConnection conn) throws IOException {
+    if (conn instanceof HttpsURLConnection) {
+      HttpsURLConnection sslConn = (HttpsURLConnection) conn;
+      try {
+        sslConn.setSSLSocketFactory(createSSLSocketFactory());
+      } catch (GeneralSecurityException ex) {
+        throw new IOException(ex);
+      }
+      sslConn.setHostnameVerifier(getHostnameVerifier());
+      conn = sslConn;
+    }
+    return conn;
+  }
+
+  /**
+   * Set ssl context for {@link com.ning.http.client.AsyncHttpClientConfig.Builder}
+   *
+   * @param asyncNingBuilder {@link com.ning.http.client.AsyncHttpClientConfig.Builder} instance to
+   *                         configure; a null builder is ignored.
+   * @throws IOException if an IO error occurred.
+   */
+  public void configure(AsyncHttpClientConfig.Builder asyncNingBuilder) throws IOException {
+    if (asyncNingBuilder != null) {
+      asyncNingBuilder.setSSLContext(context);
+      asyncNingBuilder.setHostnameVerifier(getHostnameVerifier());
+    }
+  }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/AsyncHttpConnection.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/AsyncHttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/AsyncHttpConnection.java
new file mode 100644
index 0000000..f46939d
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/AsyncHttpConnection.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.http.async.netty;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
+import com.ning.http.client.AsyncHttpClient;
+import com.ning.http.client.AsyncHttpClientConfig;
+import com.ning.http.client.ListenableFuture;
+import com.ning.http.client.Request;
+import com.ning.http.client.RequestBuilder;
+import com.ning.http.client.Response;
+import org.apache.commons.io.IOUtils;
+import org.apache.tez.http.BaseHttpConnection;
+import org.apache.tez.http.HttpConnectionParams;
+import org.apache.tez.http.SSLFactory;
+import org.apache.tez.common.security.JobTokenSecretManager;
+import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
+import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
+import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.io.PipedInputStream;
+import java.io.PipedOutputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+public class AsyncHttpConnection extends BaseHttpConnection {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AsyncHttpConnection.class);
+
+ private final JobTokenSecretManager jobTokenSecretMgr;
+ private String encHash;
+ private String msgToEncode;
+
+ private final HttpConnectionParams httpConnParams;
+ private final Stopwatch stopWatch;
+ private final URL url;
+
+ private static volatile AsyncHttpClient httpAsyncClient;
+
+ private final TezBodyDeferringAsyncHandler handler;
+ private final PipedOutputStream pos; //handler would write to this as and when it receives chunks
+ private final PipedInputStream pis; //connected to pos, which can be used by fetchers
+
+ private Response response;
+ private ListenableFuture<Response> responseFuture;
+ private TezBodyDeferringAsyncHandler.BodyDeferringInputStream dis;
+
+ private void initClient(HttpConnectionParams httpConnParams) throws IOException {
+ if (httpAsyncClient != null) {
+ return;
+ }
+
+ if (httpAsyncClient == null) {
+ synchronized (AsyncHttpConnection.class) {
+ if (httpAsyncClient == null) {
+ LOG.info("Initializing AsyncClient (TezBodyDeferringAsyncHandler)");
+ AsyncHttpClientConfig.Builder builder = new AsyncHttpClientConfig.Builder();
+ if (httpConnParams.isSslShuffle()) {
+ //Configure SSL
+ SSLFactory sslFactory = httpConnParams.getSslFactory();
+ Preconditions.checkArgument(sslFactory != null, "SSLFactory can not be null");
+ sslFactory.configure(builder);
+ }
+
+ /**
+ * TODO : following settings need fine tuning.
+ * Change following config to accept common thread pool later.
+ * Change max connections based on the total inputs (ordered & unordered). Need to tune
+ * setMaxConnections & addRequestFilter.
+ */
+ builder
+ .setAllowPoolingConnection(httpConnParams.isKeepAlive())
+ .setAllowSslConnectionPool(httpConnParams.isKeepAlive())
+ .setCompressionEnabled(false)
+ //.setExecutorService(applicationThreadPool)
+ //.addRequestFilter(new ThrottleRequestFilter())
+ .setMaximumConnectionsPerHost(1)
+ .setConnectionTimeoutInMs(httpConnParams.getConnectionTimeout())
+ .setRequestTimeoutInMs(httpConnParams.getReadTimeout())
+ .setUseRawUrl(true)
+ .build();
+ httpAsyncClient = new AsyncHttpClient(builder.build());
+ }
+ }
+ }
+ }
+
+ public AsyncHttpConnection(URL url, HttpConnectionParams connParams,
+ String logIdentifier, JobTokenSecretManager jobTokenSecretManager) throws IOException {
+ this.jobTokenSecretMgr = jobTokenSecretManager;
+ this.httpConnParams = connParams;
+ this.url = url;
+ this.stopWatch = new Stopwatch();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("MapOutput URL :" + url.toString());
+ }
+
+ initClient(httpConnParams);
+ pos = new PipedOutputStream();
+ pis = new PipedInputStream(pos, httpConnParams.getBufferSize());
+ handler = new TezBodyDeferringAsyncHandler(pos, url, UNIT_CONNECT_TIMEOUT);
+ }
+
+ /**
+  * Derives the message for this connection's URL and its hash (computed with the job token
+  * secret manager), storing both in {@code msgToEncode} and {@code encHash}.
+  *
+  * @throws IOException if building the message or hashing it fails
+  */
+ @VisibleForTesting
+ public void computeEncHash() throws IOException {
+   final String message = SecureShuffleUtils.buildMsgFrom(url);
+   this.msgToEncode = message;
+   this.encHash = SecureShuffleUtils.hashFromString(message, jobTokenSecretMgr);
+ }
+
+ /**
+  * Connect to source: sends the shuffle request and waits until the response status and
+  * headers are available. The response body is not consumed here; it is streamed through
+  * {@link #getInputStream()}.
+  *
+  * @return true if a response with status {@code 200 OK} was received; failures are
+  *         reported through exceptions, never through a {@code false} return value
+  * @throws IOException          if no response is received (e.g. the handler timed out while
+  *                              waiting for headers), the request failed, or the status code
+  *                              is not {@code 200 OK}
+  * @throws InterruptedException if the calling thread is interrupted while waiting for headers
+  */
+ public boolean connect() throws IOException, InterruptedException {
+   // validate() logs the elapsed time of this stopwatch; without starting it here the
+   // reported value would always be 0 ms.
+   stopWatch.reset().start();
+   computeEncHash();
+
+   RequestBuilder rb = new RequestBuilder();
+   rb.setHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
+   rb.setHeader(ShuffleHeader.HTTP_HEADER_NAME, ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+   rb.setHeader(ShuffleHeader.HTTP_HEADER_VERSION, ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+   Request request = rb.setUrl(url.toString()).build();
+
+   //for debugging
+   LOG.debug("Request url={}, encHash={}", url, encHash);
+
+   // Fire the request; the handler streams the body into the pipe as chunks arrive.
+   responseFuture = httpAsyncClient.executeRequest(request, handler);
+
+   //BodyDeferringAsyncHandler would automatically manage producer and consumer frequency mismatch
+   dis = new TezBodyDeferringAsyncHandler.BodyDeferringInputStream(responseFuture, handler, pis);
+
+   // Blocks the calling thread until headers are received (or the handler gives up waiting),
+   // while leaving the response body to be read later.
+   response = dis.getAsapResponse();
+   if (response == null) {
+     throw new IOException("Response is null");
+   }
+
+   //verify the response
+   int rc = response.getStatusCode();
+   if (rc != HttpURLConnection.HTTP_OK) {
+     LOG.debug("Request url={}, responseCode={}", response.getUri(), rc);
+     throw new IOException("Got invalid response code " + rc + " from "
+         + url + ": " + response.getStatusText());
+   }
+   return true;
+ }
+
+ /**
+  * Checks the headers of the already received response: the shuffle header name and version
+  * must match the values this client sent, and the reply hash must verify against the hash
+  * of the request URL.
+  *
+  * @throws IOException if the shuffle name/version differs, the reply hash header is missing,
+  *                     or the reply hash verification fails
+  */
+ public void validate() throws IOException {
+   // the shuffle name and version reported by the server have to match ours
+   final boolean compatible =
+       ShuffleHeader.DEFAULT_HTTP_HEADER_NAME
+           .equals(response.getHeader(ShuffleHeader.HTTP_HEADER_NAME))
+       && ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION
+           .equals(response.getHeader(ShuffleHeader.HTTP_HEADER_VERSION));
+   if (!compatible) {
+     throw new IOException("Incompatible shuffle response version");
+   }
+
+   // the server answers with the HMac of the encHash that was sent in the request
+   final String serverHash = response.getHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
+   if (serverHash == null) {
+     throw new IOException("security validation of TT Map output failed");
+   }
+   LOG.debug("url={};encHash={};replyHash={}", msgToEncode, encHash, serverHash);
+
+   SecureShuffleUtils.verifyReply(serverHash, encHash, jobTokenSecretMgr);
+   //Following log statement will be used by tez-tool perf-analyzer for mapping attempt to NM host
+   LOG.info("for url={} sent hash and receievd reply {} ms", url, stopWatch.elapsedMillis());
+ }
+
+ /**
+  * Exposes the response body as a {@link DataInputStream}. A new wrapper around the
+  * body-deferring stream is created on every call.
+  *
+  * @return DataInputStream reading the response body
+  * @throws IllegalStateException if no response is available
+  * @throws IOException declared for callers
+  * @throws InterruptedException declared for callers
+  */
+ public DataInputStream getInputStream() throws IOException, InterruptedException {
+   if (response == null) {
+     throw new IllegalStateException("Response can not be null");
+   }
+   return new DataInputStream(dis);
+ }
+
+ /**
+  * Closes and discards the shared async client so that the next connection creates a fresh
+  * one. Safe to call when no client exists (e.g. called twice); guarded by the same lock
+  * that is used to create the client.
+  */
+ @VisibleForTesting
+ public void close() {
+   synchronized (AsyncHttpConnection.class) {
+     if (httpAsyncClient != null) {
+       httpAsyncClient.close();
+       httpAsyncClient = null;
+     }
+   }
+ }
+ /**
+  * Cleanup the connection: closes the response stream (when a response was received) and
+  * both ends of the pipe, then forgets the response.
+  *
+  * @param disconnect not used by this implementation
+  * @throws IOException if closing the response stream fails; the pipe is still closed and
+  *                     the response still cleared in that case
+  */
+ public void cleanup(boolean disconnect) throws IOException {
+   // Netty internally has its own connection management and takes care of it.
+   try {
+     if (response != null) {
+       dis.close();
+     }
+   } finally {
+     // release the pipe even when closing the response stream failed
+     IOUtils.closeQuietly(pos);
+     IOUtils.closeQuietly(pis);
+     response = null;
+   }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/TezBodyDeferringAsyncHandler.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/TezBodyDeferringAsyncHandler.java b/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/TezBodyDeferringAsyncHandler.java
new file mode 100644
index 0000000..8e83eac
--- /dev/null
+++ b/tez-runtime-library/src/main/java/org/apache/tez/http/async/netty/TezBodyDeferringAsyncHandler.java
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.tez.http.async.netty;
+
+import com.ning.http.client.AsyncHandler;
+import com.ning.http.client.HttpResponseBodyPart;
+import com.ning.http.client.HttpResponseHeaders;
+import com.ning.http.client.HttpResponseStatus;
+import com.ning.http.client.Response;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FilterInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URL;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Same as {@link com.ning.http.client.BodyDeferringAsyncHandler}, with additional checks to
+ * handle errors in getResponse(). Based on testing, at very high load
+ * {@link com.ning.http.client.BodyDeferringAsyncHandler} can hang in getResponse() because it
+ * waits indefinitely for headers to arrive. This class tries to fix the problem by waiting only
+ * for the connection timeout.
+ */
+@InterfaceAudience.Private
+class TezBodyDeferringAsyncHandler implements AsyncHandler<Response> {
+ private static final Logger LOG = LoggerFactory.getLogger(TezBodyDeferringAsyncHandler.class);
+
+ private final Response.ResponseBuilder responseBuilder = new Response.ResponseBuilder();
+ private final CountDownLatch headersArrived = new CountDownLatch(1);
+ private final OutputStream output;
+
+ private volatile boolean responseSet;
+ private volatile boolean statusReceived;
+ private volatile Response response;
+ private volatile Throwable throwable;
+
+ private final Semaphore semaphore = new Semaphore(1);
+
+ private final URL url;
+ private final int headerReceiveTimeout;
+
+ /**
+  * @param os      stream that receives the response body parts
+  * @param url     URL being fetched; only used in log messages
+  * @param timeout maximum time, in milliseconds, that getResponse() waits for headers
+  */
+ TezBodyDeferringAsyncHandler(final OutputStream os, final URL url, final int timeout) {
+   this.url = url;
+   this.headerReceiveTimeout = timeout;
+   this.output = os;
+   this.responseSet = false;
+ }
+
+ public void onThrowable(Throwable t) {
+ this.throwable = t;
+ // Counting down to handle error cases too.
+ // In "premature exceptions" cases, the onBodyPartReceived() and
+ // onCompleted()
+ // methods will never be invoked, leaving caller of getResponse() method
+ // blocked forever.
+ try {
+ semaphore.acquire();
+ } catch (InterruptedException e) {
+ // Ignore
+ } finally {
+ LOG.error("Error in asyncHandler ", t);
+ headersArrived.countDown();
+ semaphore.release();
+ }
+ try {
+ closeOut();
+ } catch (IOException e) {
+ // ignore
+ }
+ }
+
+ /**
+  * Starts accumulating a new response: clears the builder, stores the status line and
+  * remembers that a status has been received.
+  *
+  * @param responseStatus status line of the response
+  * @return always {@code CONTINUE}
+  */
+ public AsyncHandler.STATE onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
+   // drop whatever an earlier response may have left in the builder
+   responseBuilder.reset();
+   responseBuilder.accumulate(responseStatus);
+   this.statusReceived = true;
+   return STATE.CONTINUE;
+ }
+
+ /**
+  * Adds the received headers to the response being built.
+  *
+  * @param headers headers of the response
+  * @return always {@code CONTINUE}
+  */
+ public AsyncHandler.STATE onHeadersReceived(HttpResponseHeaders headers) throws Exception {
+   this.responseBuilder.accumulate(headers);
+   return STATE.CONTINUE;
+ }
+
+ /**
+  * Writes a body chunk to the output stream. On the first chunk the response (status and
+  * headers accumulated so far) is built and callers waiting in getResponse() are released.
+  *
+  * @param bodyPart chunk of the response body
+  * @return always {@code CONTINUE}
+  */
+ public AsyncHandler.STATE onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
+   final boolean firstChunk = !responseSet;
+   if (firstChunk) {
+     // body arrived, so the headers are complete: publish them
+     response = responseBuilder.build();
+     responseSet = true;
+     headersArrived.countDown();
+   }
+   bodyPart.writeTo(output);
+   return STATE.CONTINUE;
+ }
+
+ protected void closeOut() throws IOException {
+ try {
+ output.flush();
+ } finally {
+ output.close();
+ }
+ }
+
+ /**
+  * Finishes the request: publishes the response if no body part did so earlier, releases
+  * callers waiting in getResponse() and closes the output stream.
+  *
+  * @return the response built from everything accumulated so far, or {@code null} if the
+  *         thread was interrupted while waiting for the internal semaphore
+  * @throws IOException if closing the output fails, or wrapping the error recorded by
+  *                     onThrowable()
+  */
+ @Override
+ public Response onCompleted() throws IOException {
+   if (!responseSet) {
+     response = responseBuilder.build();
+     responseSet = true;
+   }
+   // Counting down to handle error cases too.
+   // In "normal" cases, latch is already at 0 here
+   // But in other cases, for example when because of some error
+   // onBodyPartReceived() is never called, the caller
+   // of getResponse() would remain blocked infinitely.
+   // By contract, onCompleted() is always invoked, even in case of errors
+   headersArrived.countDown();
+   closeOut();
+
+   // Acquire outside of the try/finally below: a permit must only be released when it
+   // was actually obtained.
+   try {
+     semaphore.acquire();
+   } catch (InterruptedException e) {
+     Thread.currentThread().interrupt();
+     return null;
+   }
+   try {
+     if (throwable != null) {
+       throw new IOException(throwable.getMessage(), throwable);
+     }
+     // sending out current response
+     return responseBuilder.build();
+   } finally {
+     semaphore.release();
+   }
+ }
+
+ /**
+  * This method -- unlike Future&lt;Response&gt;.get() -- blocks only until the headers
+  * arrive, or until the header timeout given to the constructor expires. This is useful
+  * for large transfers, to examine headers ASAP, and defer body streaming to its final
+  * destination and prevent unneeded bandwidth consumption. The response here will contain
+  * the very 1st response from server, so status code and headers, but it might be
+  * incomplete in case of broken servers sending trailing headers. In that case, the
+  * "usual" Future&lt;Response&gt;.get() method will return complete headers, but multiple
+  * invocations of getResponse() will always return the 1st cached, probably incomplete
+  * one. Note: the response returned by this method will contain everything
+  * <em>except</em> the response body itself, so invoking any method like
+  * Response.getResponseBodyXXX() will result in error! Also, please note that this method
+  * returns <code>null</code> when the headers do not arrive within the timeout.
+  *
+  * @return a {@link Response}, or <code>null</code> if the headers did not arrive in time
+  * @throws InterruptedException if interrupted while waiting
+  * @throws IOException wrapping the error recorded by onThrowable(), if any
+  */
+ public Response getResponse() throws InterruptedException, IOException {
+   /*
+    * Based on testing, it is possible that it is in connected state, but the headers are not
+    * received. Instead of waiting forever, close after timeout for next retry.
+    */
+   boolean result = headersArrived.await(headerReceiveTimeout, TimeUnit.MILLISECONDS);
+   if (!result) {
+     LOG.error("Breaking after timeout={}, url={}, responseSet={} statusReceived={}",
+         headerReceiveTimeout, url, responseSet, statusReceived);
+     return null;
+   }
+   // Acquire before entering the try block: if acquire() is interrupted no permit is held,
+   // so the finally block must not release one.
+   semaphore.acquire();
+   try {
+     if (throwable != null) {
+       throw new IOException(throwable.getMessage(), throwable);
+     }
+     return response;
+   } finally {
+     semaphore.release();
+   }
+ }
+
+ /**
+  * A simple helper class that is used to perform automatic "join" for async
+  * download and the error checking of the Future of the request.
+  */
+ static class BodyDeferringInputStream extends FilterInputStream {
+   private final Future<Response> future;
+   private final TezBodyDeferringAsyncHandler bdah;
+
+   /**
+    * @param future future of the async request, joined on {@link #close()}
+    * @param bdah   handler used to obtain the early (headers only) response
+    * @param in     stream from which the response body is read
+    */
+   public BodyDeferringInputStream(final Future<Response> future,
+       final TezBodyDeferringAsyncHandler bdah, final InputStream in) {
+     super(in);
+     this.future = future;
+     this.bdah = bdah;
+   }
+
+   /**
+    * Closes the input stream, and "joins" (wait for complete execution
+    * together with potential exception thrown) of the async request.
+    *
+    * @throws IOException if closing the stream fails, or wrapping any failure of the async
+    *                     request; when the wait is interrupted the thread's interrupt
+    *                     status is restored before throwing
+    */
+   @Override
+   public void close() throws IOException {
+     // close
+     super.close();
+     // "join" async request
+     try {
+       getLastResponse();
+     } catch (InterruptedException e) {
+       // do not lose the interruption: callers up the stack rely on the flag
+       Thread.currentThread().interrupt();
+       throw new IOException(e.getMessage(), e);
+     } catch (Exception e) {
+       throw new IOException(e.getMessage(), e);
+     }
+   }
+
+   /**
+    * Delegates to {@link TezBodyDeferringAsyncHandler#getResponse()}. Blocks
+    * only until the headers arrive. Might return
+    * <code>null</code>. See
+    * {@link TezBodyDeferringAsyncHandler#getResponse()} method for details.
+    *
+    * @return a {@link Response}
+    * @throws InterruptedException if interrupted while waiting
+    * @throws IOException if the handler recorded an error
+    */
+   public Response getAsapResponse() throws InterruptedException, IOException {
+     return bdah.getResponse();
+   }
+
+   /**
+    * Delegates to {@code Future<Response>#get()} method. Will block
+    * until the complete response arrives.
+    *
+    * @return a {@link Response}
+    * @throws InterruptedException if interrupted while waiting
+    * @throws java.util.concurrent.ExecutionException if the request failed
+    */
+   public Response getLastResponse() throws InterruptedException, ExecutionException {
+     return future.get();
+   }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
index 3d9a701..fc94347 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/api/TezRuntimeConfiguration.java
@@ -210,6 +210,9 @@ public class TezRuntimeConfiguration {
public final static int TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE_DEFAULT =
8 * 1024;
+ public static final String TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP = TEZ_RUNTIME_PREFIX +
+ "shuffle.use.async.http";
+ public static final boolean TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP_DEFAULT = false;
public static final String TEZ_RUNTIME_SHUFFLE_ENABLE_SSL = TEZ_RUNTIME_PREFIX +
"shuffle.ssl.enable";
@@ -352,6 +355,7 @@ public class TezRuntimeConfiguration {
tezRuntimeKeys.add(TEZ_RUNTIME_UNORDERED_OUTPUT_MAX_PER_BUFFER_SIZE_BYTES);
tezRuntimeKeys.add(TEZ_RUNTIME_PARTITIONER_CLASS);
tezRuntimeKeys.add(TEZ_RUNTIME_COMBINER_CLASS);
+ tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_PARALLEL_COPIES);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCH_FAILURES_LIMIT);
tezRuntimeKeys.add(TEZ_RUNTIME_SHUFFLE_FETCH_MAX_TASK_OUTPUT_AT_ONCE);
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
index 61e0151..e7c98b7 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/Fetcher.java
@@ -38,6 +38,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.annotations.VisibleForTesting;
+import org.apache.tez.http.BaseHttpConnection;
+import org.apache.tez.http.HttpConnectionParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang.StringUtils;
@@ -59,7 +61,6 @@ import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.exceptions.FetcherReadTimeoutException;
import org.apache.tez.runtime.library.common.shuffle.FetchedInput.Type;
-import org.apache.tez.runtime.library.common.shuffle.HttpConnection.HttpConnectionParams;
import com.google.common.base.Preconditions;
@@ -108,7 +109,7 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
private URL url;
private volatile DataInputStream input;
- private HttpConnection httpConnection;
+ BaseHttpConnection httpConnection;
private HttpConnectionParams httpConnectionParams;
private final boolean localDiskFetchEnabled;
@@ -121,6 +122,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
// Initiative value is 0, which means it hasn't retried yet.
private long retryStartTime = 0;
+ private final boolean asyncHttp;
+
private final boolean isDebugEnabled = LOG.isDebugEnabled();
private Fetcher(FetcherCallback fetcherCallback, HttpConnectionParams params,
@@ -132,7 +135,8 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
boolean localDiskFetchEnabled,
boolean sharedFetchEnabled,
String localHostname,
- int shufflePort) {
+ int shufflePort, boolean asyncHttp) {
+ this.asyncHttp = asyncHttp;
this.fetcherCallback = fetcherCallback;
this.inputManager = inputManager;
this.jobTokenSecretMgr = jobTokenSecretManager;
@@ -402,13 +406,17 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
private HostFetchResult setupConnection(List<InputAttemptIdentifier> attempts) {
try {
StringBuilder baseURI = ShuffleUtils.constructBaseURIForShuffleHandler(host,
- port, partition, appId.toString(), httpConnectionParams.isSSLShuffleEnabled());
+ port, partition, appId.toString(), httpConnectionParams.isSslShuffle());
this.url = ShuffleUtils.constructInputURL(baseURI.toString(), attempts,
- httpConnectionParams.getKeepAlive());
+ httpConnectionParams.isKeepAlive());
- httpConnection = new HttpConnection(url, httpConnectionParams, logIdentifier, jobTokenSecretMgr);
+ httpConnection = ShuffleUtils.getHttpConnection(asyncHttp, url, httpConnectionParams,
+ logIdentifier, jobTokenSecretMgr);
httpConnection.connect();
- } catch (IOException e) {
+ } catch (IOException | InterruptedException e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
// ioErrs.increment(1);
// If connect did not succeed, just mark all the maps as failed,
// indirectly penalizing the host
@@ -449,6 +457,9 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
return new HostFetchResult(new FetchResult(host, port, partition, remaining),
new InputAttemptIdentifier[] { firstAttempt }, false);
}
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); //reset status
+ return null;
}
return null;
}
@@ -903,10 +914,11 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
public FetcherBuilder(FetcherCallback fetcherCallback,
HttpConnectionParams params, FetchedInputAllocator inputManager,
ApplicationId appId, JobTokenSecretManager jobTokenSecretMgr, String srcNameTrimmed,
- Configuration conf, boolean localDiskFetchEnabled, String localHostname, int shufflePort) {
+ Configuration conf, boolean localDiskFetchEnabled, String localHostname, int shufflePort,
+ boolean asyncHttp) {
this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
jobTokenSecretMgr, srcNameTrimmed, conf, null, null, null, localDiskFetchEnabled,
- false, localHostname, shufflePort);
+ false, localHostname, shufflePort, asyncHttp);
}
public FetcherBuilder(FetcherCallback fetcherCallback,
@@ -915,10 +927,10 @@ public class Fetcher extends CallableWithNdc<FetchResult> {
Configuration conf, RawLocalFileSystem localFs,
LocalDirAllocator localDirAllocator, Path lockPath,
boolean localDiskFetchEnabled, boolean sharedFetchEnabled,
- String localHostname, int shufflePort) {
+ String localHostname, int shufflePort, boolean asyncHttp) {
this.fetcher = new Fetcher(fetcherCallback, params, inputManager, appId,
jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
- lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort);
+ lockPath, localDiskFetchEnabled, sharedFetchEnabled, localHostname, shufflePort, asyncHttp);
}
public FetcherBuilder setHttpConnectionParameters(HttpConnectionParams httpParams) {
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
deleted file mode 100644
index 7827f0a..0000000
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/HttpConnection.java
+++ /dev/null
@@ -1,428 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.tez.runtime.library.common.shuffle;
-
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.security.GeneralSecurityException;
-import java.util.concurrent.TimeUnit;
-
-import javax.net.ssl.HttpsURLConnection;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.security.ssl.SSLFactory;
-import org.apache.tez.common.security.JobTokenSecretManager;
-import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
-import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
-
-import com.google.common.base.Stopwatch;
-
-/**
- * HttpConnection which can be used for Unordered / Ordered shuffle.
- */
-public class HttpConnection {
-
- private static final Logger LOG = LoggerFactory.getLogger(HttpConnection.class);
-
- /** Basic/unit connection timeout (in milliseconds) */
- private final static int UNIT_CONNECT_TIMEOUT = 60 * 1000;
-
- private URL url;
- private final String logIdentifier;
-
- //Shared by many threads
- private static SSLFactory sslFactory;
-
- @VisibleForTesting
- protected volatile HttpURLConnection connection;
- private volatile DataInputStream input;
-
- private volatile boolean connectionSucceeed;
- private volatile boolean cleanup;
-
- private final JobTokenSecretManager jobTokenSecretMgr;
- private String encHash;
- private String msgToEncode;
-
- private final HttpConnectionParams httpConnParams;
- private final Stopwatch stopWatch;
-
- /**
- * HttpConnection
- *
- * @param url
- * @param connParams
- * @param logIdentifier
- * @param jobTokenSecretManager
- * @throws IOException
- */
- public HttpConnection(URL url, HttpConnectionParams connParams,
- String logIdentifier, JobTokenSecretManager jobTokenSecretManager) throws IOException {
- this.logIdentifier = logIdentifier;
- this.jobTokenSecretMgr = jobTokenSecretManager;
- this.httpConnParams = connParams;
- this.url = url;
- this.stopWatch = new Stopwatch();
- if (LOG.isDebugEnabled()) {
- LOG.debug("MapOutput URL :" + url.toString());
- }
- }
-
- private void setupConnection() throws IOException {
- connection = (HttpURLConnection) url.openConnection();
- if (sslFactory != null && httpConnParams.sslShuffle) {
- try {
- ((HttpsURLConnection) connection).setSSLSocketFactory(sslFactory
- .createSSLSocketFactory());
- ((HttpsURLConnection) connection).setHostnameVerifier(sslFactory
- .getHostnameVerifier());
- } catch (GeneralSecurityException ex) {
- throw new IOException(ex);
- }
- }
- // generate hash of the url
- msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
- encHash = SecureShuffleUtils.hashFromString(msgToEncode, jobTokenSecretMgr);
-
- // put url hash into http header
- connection.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH,
- encHash);
- // set the read timeout
- connection.setReadTimeout(httpConnParams.readTimeout);
- // put shuffle version into http header
- connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
- ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
- connection.addRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
- ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
- }
-
- /**
- * Connect to source
- *
- * @return true if connection was successful
- * false if connection was previously cleaned up
- * @throws IOException upon connection failure
- */
- public boolean connect() throws IOException {
- return connect(httpConnParams.connectionTimeout);
- }
-
- /**
- * Connect to source with specific timeout
- *
- * @param connectionTimeout
- * @return true if connection was successful
- * false if connection was previously cleaned up
- * @throws IOException upon connection failure
- */
- public boolean connect(int connectionTimeout) throws IOException {
- // restart the stopwatch; it is only read by the debug log at the end of this method
- stopWatch.reset().start();
- if (connection == null) {
- // presumably (re)creates the connection field - setupConnection() is not visible here
- setupConnection();
- }
- // unit is the connect timeout applied to each individual attempt; it stays 0 when
- // connectionTimeout is 0
- int unit = 0;
- if (connectionTimeout < 0) {
- throw new IOException("Invalid timeout " + "[timeout = "
- + connectionTimeout + " ms]");
- } else if (connectionTimeout > 0) {
- unit = Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout);
- }
- // set the connect timeout to the unit-connect-timeout
- connection.setConnectTimeout(unit);
- int connectionFailures = 0;
- // keep retrying, one unit at a time, until a connect succeeds, cleanup is requested or
- // the overall connectionTimeout budget has been used up
- while (true) {
- long connectStartTime = System.currentTimeMillis();
- try {
- connection.connect();
- connectionSucceeed = true;
- break;
- } catch (IOException ioe) {
- // Don't attempt another connect if already cleaned up.
- connectionFailures++;
- if (cleanup) {
- LOG.info("Cleanup is set to true. Not attempting to"
- + " connect again. Last exception was: ["
- + ioe.getClass().getName() + ", " + ioe.getMessage() + "]");
- return false;
- }
- // update the total remaining connect-timeout
- connectionTimeout -= unit;
- // throw an exception if we have waited for timeout amount of time
- // note that the updated value of connectionTimeout is used here; the last failure is
- // attached as the cause
- if (connectionTimeout <= 0) {
- throw new IOException(
- "Failed to connect to " + url + ", #connectionFailures=" + connectionFailures, ioe);
- }
- long elapsed = System.currentTimeMillis() - connectStartTime;
- // the attempt failed before its unit elapsed; sleep for the remainder of the unit
- // before trying again
- if (elapsed < unit) {
- try {
- long sleepTime = unit - elapsed;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Sleeping for " + sleepTime + " while establishing connection to " + url +
- ", since connectAttempt returned in " + elapsed + " ms");
- }
- Thread.sleep(sleepTime);
- } catch (InterruptedException e) {
- // the interruption is reported to the caller as an IOException
- throw new IOException(
- "Connection establishment sleep interrupted, #connectionFailures=" +
- connectionFailures, e);
- }
- }
-
- // reset the connect timeout for the last try
- if (connectionTimeout < unit) {
- unit = connectionTimeout;
- // reset the connect time out for the final connect
- connection.setConnectTimeout(unit);
- }
-
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Time taken to connect to " + url.toString() +
- " " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms; connectionFailures="+ connectionFailures);
- }
- return true;
- }
-
- /**
- * Validate the response received on the connection.
- *
- * Checks, in order, that the HTTP response code is HTTP_OK, that the shuffle header name
- * and version headers equal the defaults declared in ShuffleHeader, and that a reply-hash
- * header is present. The reply hash is then passed to SecureShuffleUtils.verifyReply
- * together with encHash (presumably this throws when the hash does not match - verifyReply
- * is not visible here).
- *
- * @throws IOException if the response code is not HTTP_OK, the shuffle header name/version
- * does not match, or the reply-hash header is missing
- */
- public void validate() throws IOException {
- // restart the stopwatch; the elapsed time is reported in the info log below
- stopWatch.reset().start();
- int rc = connection.getResponseCode();
- if (rc != HttpURLConnection.HTTP_OK) {
- throw new IOException("Got invalid response code " + rc + " from " + url
- + ": " + connection.getResponseMessage());
- }
- // get the shuffle version
- if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(connection
- .getHeaderField(ShuffleHeader.HTTP_HEADER_NAME))
- || !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(connection
- .getHeaderField(ShuffleHeader.HTTP_HEADER_VERSION))) {
- throw new IOException("Incompatible shuffle response version");
- }
- // get the replyHash which is HMac of the encHash we sent to the server
- String replyHash =
- connection
- .getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
- if (replyHash == null) {
- throw new IOException("security validation of TT Map output failed");
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("url=" + msgToEncode + ";encHash=" + encHash + ";replyHash="
- + replyHash);
- }
- // verify that replyHash is HMac of encHash
- SecureShuffleUtils.verifyReply(replyHash, encHash, jobTokenSecretMgr);
- //Following log statement will be used by tez-tool perf-analyzer for mapping attempt to NM host
- LOG.info("for url=" + url +
- " sent hash and receievd reply " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms");
- }
-
- /**
- * Get the inputstream from the connection.
- *
- * When connectionSucceeed is set, the connection's input stream is wrapped in a
- * BufferedInputStream of httpConnParams.bufferSize bytes and a DataInputStream, and stored
- * in the input field. Otherwise the input field is returned as it currently is, without
- * touching the connection.
- *
- * @return DataInputStream the current value of the input field
- * @throws IOException if the connection's input stream cannot be obtained
- */
- public DataInputStream getInputStream() throws IOException {
- // restart the stopwatch; it is only read by the debug log below
- stopWatch.reset().start();
- if (connectionSucceeed) {
- input =
- new DataInputStream(new BufferedInputStream(
- connection.getInputStream(), httpConnParams.bufferSize));
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Time taken to getInputStream (connect) " + url +
- " " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms");
- }
- return input;
- }
-
- /**
- * Cleanup the connection.
- *
- * Sets the cleanup flag (which connect() checks before retrying), closes the input stream
- * if one is open and, when keep-alive is enabled and the connect succeeded, drains the
- * connection's error stream. The connection is disconnected and dropped when disconnect is
- * true or keep-alive is disabled. An IOException raised by any of these steps is logged
- * here and not rethrown.
- *
- * @param disconnect
- * Close the connection if this is true; otherwise respect keepalive
- * @throws IOException
- * declared in the signature; IOExceptions from the steps above are caught inside
- */
- public void cleanup(boolean disconnect) throws IOException {
- cleanup = true;
- // restart the stopwatch; it is only read by the debug log at the end of this method
- stopWatch.reset().start();
- try {
- if (input != null) {
- LOG.info("Closing input on " + logIdentifier);
- input.close();
- input = null;
- }
- if (httpConnParams.keepAlive && connectionSucceeed) {
- // Refer:
- // http://docs.oracle.com/javase/6/docs/technotes/guides/net/http-keepalive.html
- // NOTE(review): connection is dereferenced here before the null check below, and
- // connectionSucceeed is not reset in this method - confirm cleanup cannot run again
- // after connection has been set to null
- readErrorStream(connection.getErrorStream());
- }
- if (connection != null && (disconnect || !httpConnParams.keepAlive)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Closing connection on " + logIdentifier);
- }
- connection.disconnect();
- connection = null;
- }
- } catch (IOException e) {
- // log the stack trace only at debug level; otherwise just the message
- if (LOG.isDebugEnabled()) {
- LOG.debug("Exception while shutting down fetcher " + logIdentifier, e);
- } else {
- LOG.info("Exception while shutting down fetcher " + logIdentifier
- + ": " + e.getMessage());
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Time taken to cleanup connection to " + url +
- " " + stopWatch.elapsedTime(TimeUnit.MILLISECONDS) + " ms");
- }
- }
-
- /**
- * Cleanup the error stream if any, for keepAlive connections.
- *
- * Copies whatever is left on the stream into a throw-away buffer and then closes both the
- * buffer and the stream. A null stream is ignored, and so is an IOException raised while
- * copying.
- *
- * @param errorStream the error stream to drain and close; may be null
- */
- private void readErrorStream(InputStream errorStream) {
- if (errorStream == null) {
- return;
- }
- try {
- // the copied bytes are never read back; the buffer only serves as a sink
- DataOutputBuffer errorBuffer = new DataOutputBuffer();
- IOUtils.copyBytes(errorStream, errorBuffer, 4096);
- IOUtils.closeStream(errorBuffer);
- IOUtils.closeStream(errorStream);
- } catch (IOException ioe) {
- // ignore
- }
- }
-
- /**
- * Holder for the settings of a shuffle HTTP connection: keep-alive, timeouts, buffer size
- * and whether SSL shuffle is enabled.
- *
- * The fields are private and are assigned directly by HttpConnectionParamsBuilder; the only
- * public mutator on this class is setReadTimeout.
- */
- public static class HttpConnectionParams {
- private boolean keepAlive;
- private int keepAliveMaxConnections;
- private int connectionTimeout;
- private int readTimeout;
- private int bufferSize;
- private boolean sslShuffle;
-
- /** @return whether keep-alive is enabled */
- public boolean getKeepAlive() {
- return keepAlive;
- }
-
- /** @return the configured keep-alive max connections value */
- public int getKeepAliveMaxConnections() {
- return keepAliveMaxConnections;
- }
-
- /** @return the connection timeout (presumably in milliseconds - confirm against callers) */
- public int getConnectionTimeout() {
- return connectionTimeout;
- }
-
- /** @return the read timeout (presumably in milliseconds - confirm against callers) */
- public int getReadTimeout() {
- return readTimeout;
- }
-
- /** @param readTimeout the new read timeout; replaces the value set by the builder */
- public void setReadTimeout(int readTimeout) {
- this.readTimeout = readTimeout;
- }
-
- /** @return the buffer size used when wrapping the connection's input stream */
- public int getBufferSize() {
- return bufferSize;
- }
-
- /** @return whether SSL shuffle is enabled */
- public boolean isSSLShuffleEnabled() {
- return sslShuffle;
- }
-
- /** @return all settings as a comma separated list of name=value pairs */
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("keepAlive=").append(keepAlive).append(", ");
- sb.append("keepAliveMaxConnections=").append(keepAliveMaxConnections).append(", ");
- sb.append("connectionTimeout=").append(connectionTimeout).append(", ");
- sb.append("readTimeout=").append(readTimeout).append(", ");
- sb.append("bufferSize=").append(bufferSize).append(", ");
- sb.append("sslShuffle=").append(sslShuffle);
- return sb.toString();
- }
- }
-
- /**
- * Fluent builder for HttpConnectionParams.
- *
- * A single HttpConnectionParams instance is created in the constructor, mutated by the
- * setters and handed out by build(); build() does not copy it, so repeated calls return the
- * same object.
- */
- public static class HttpConnectionParamsBuilder {
- private HttpConnectionParams params;
-
- public HttpConnectionParamsBuilder() {
- params = new HttpConnectionParams();
- }
-
- /** Sets the keep-alive flag and the keep-alive max connections value. */
- public HttpConnectionParamsBuilder setKeepAlive(boolean keepAlive,
- int keepAliveMaxConnections) {
- params.keepAlive = keepAlive;
- params.keepAliveMaxConnections = keepAliveMaxConnections;
- return this;
- }
-
- /** Sets the connection timeout and the read timeout. */
- public HttpConnectionParamsBuilder setTimeout(int connectionTimeout,
- int readTimeout) {
- params.connectionTimeout = connectionTimeout;
- params.readTimeout = readTimeout;
- return this;
- }
-
- /**
- * Records whether SSL shuffle is enabled and, if it is, makes sure an initialized
- * sslFactory exists.
- *
- * sslFactory is not declared in this class (presumably a static field of the enclosing
- * class - confirm). The method is synchronized on the builder instance and additionally
- * holds the HttpConnectionParamsBuilder class lock while it inspects or replaces
- * sslFactory.
- *
- * @param sslEnabled whether SSL shuffle is enabled
- * @param conf configuration handed to the SSLFactory constructor
- * @throws RuntimeException wrapping any exception thrown by SSLFactory.init()
- */
- public synchronized HttpConnectionParamsBuilder setSSL(boolean sslEnabled,
- Configuration conf) {
- synchronized (HttpConnectionParamsBuilder.class) {
- params.sslShuffle = sslEnabled;
- if (sslEnabled) {
- //Create sslFactory if it is null or if it was destroyed earlier
- if (sslFactory == null || sslFactory.getKeystoresFactory()
- .getTrustManagers() == null) {
- LOG.info("Initializing SSL factory in HttpConnection");
- sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
- try {
- sslFactory.init();
- } catch (Exception ex) {
- // drop the factory that failed to initialize so that a later call starts afresh
- sslFactory.destroy();
- sslFactory = null;
- throw new RuntimeException(ex);
- }
- }
- }
- }
- return this;
- }
-
- /** Sets the buffer size. */
- public HttpConnectionParamsBuilder setBufferSize(int bufferSize) {
- params.bufferSize = bufferSize;
- return this;
- }
-
- /** @return the params instance held by this builder (not a copy) */
- public HttpConnectionParams build() {
- return params;
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
index 46489ed..8b6e847 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/ShuffleUtils.java
@@ -33,13 +33,16 @@ import javax.crypto.SecretKey;
import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.tez.http.BaseHttpConnection;
+import org.apache.tez.http.HttpConnection;
+import org.apache.tez.http.HttpConnectionParams;
+import org.apache.tez.http.SSLFactory;
+import org.apache.tez.http.async.netty.AsyncHttpConnection;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputByteBuffer;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.security.token.Token;
import org.apache.tez.common.TezCommonUtils;
@@ -54,8 +57,6 @@ import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.common.InputAttemptIdentifier;
import org.apache.tez.runtime.library.common.sort.impl.IFile;
-import org.apache.tez.runtime.library.common.shuffle.HttpConnection.HttpConnectionParams;
-import org.apache.tez.runtime.library.common.shuffle.HttpConnection.HttpConnectionParamsBuilder;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
import org.apache.tez.runtime.library.shuffle.impl.ShuffleUserPayloads;
@@ -66,6 +67,9 @@ public class ShuffleUtils {
private static final Logger LOG = LoggerFactory.getLogger(ShuffleUtils.class);
public static final String SHUFFLE_HANDLER_SERVICE_ID = "mapreduce_shuffle";
+ //Shared by multiple threads
+ private static volatile SSLFactory sslFactory;
+
static final ThreadLocal<DecimalFormat> MBPS_FORMAT =
new ThreadLocal<DecimalFormat>() {
@Override
@@ -213,45 +217,15 @@ public class ShuffleUtils {
return new URL(url.toString());
}
- public static HttpConnectionParams constructHttpShuffleConnectionParams(
- Configuration conf) {
- HttpConnectionParamsBuilder builder = new HttpConnectionParamsBuilder();
-
- int connectionTimeout =
- conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT,
- TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_STALLED_COPY_TIMEOUT_DEFAULT);
-
- int readTimeout =
- conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT,
- TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT_DEFAULT);
-
- int bufferSize =
- conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE,
- TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE_DEFAULT);
-
- boolean keepAlive =
- conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED,
- TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED_DEFAULT);
- int keepAliveMaxConnections =
- conf.getInt(
- TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS,
- TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS_DEFAULT);
- if (keepAlive) {
- System.setProperty("sun.net.http.errorstream.enableBuffering", "true");
- System.setProperty("http.maxConnections",
- String.valueOf(keepAliveMaxConnections));
- LOG.info("Set keepAlive max connections: " + keepAliveMaxConnections);
+ /**
+ * Create the connection object used to talk to the given shuffle URL.
+ *
+ * @param asyncHttp when true an AsyncHttpConnection is created, otherwise an HttpConnection
+ * @param url URL to connect to
+ * @param params connection parameters passed through to the connection
+ * @param logIdentifier identifier passed through to the connection
+ * @param jobTokenSecretManager secret manager passed through to the connection
+ * @return the newly created connection
+ * @throws IOException declared by the signature; presumably raised by the connection
+ * constructors - confirm
+ */
+ public static BaseHttpConnection getHttpConnection(boolean asyncHttp, URL url,
+ HttpConnectionParams params, String logIdentifier, JobTokenSecretManager jobTokenSecretManager)
+ throws IOException {
+ if (asyncHttp) {
+ //TODO: support other async packages? httpclient-async?
+ return new AsyncHttpConnection(url, params, logIdentifier, jobTokenSecretManager);
+ } else {
+ return new HttpConnection(url, params, logIdentifier, jobTokenSecretManager);
}
-
- builder.setTimeout(connectionTimeout, readTimeout)
- .setBufferSize(bufferSize)
- .setKeepAlive(keepAlive, keepAliveMaxConnections);
-
- boolean sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
- TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT);
- builder.setSSL(sslShuffle, conf);
-
- return builder.build();
}
public static String stringify(DataMovementEventPayloadProto dmProto) {
@@ -473,5 +447,62 @@ public class ShuffleUtils {
", EndTime=" + System.currentTimeMillis() + ", TimeTaken=" + millis + ", Rate=" +
MBPS_FORMAT.get().format(rate) + " MB/s");
}
+
+ /**
+ * Build {@link org.apache.tez.http.HttpConnectionParams} from configuration.
+ *
+ * <p>Reads the shuffle connect/read timeouts, buffer size, keep-alive and SSL settings from
+ * {@code conf}. When keep-alive is enabled, the JVM-wide system properties
+ * {@code sun.net.http.errorstream.enableBuffering} and {@code http.maxConnections} are set
+ * as a side effect. When SSL shuffle is enabled, the client {@link SSLFactory} shared by all
+ * callers is created on first use, and re-created when the shared instance no longer has
+ * trust managers (i.e. it was destroyed earlier).</p>
+ *
+ * @param conf configuration to read the shuffle HTTP settings from
+ * @return HttpConnectionParams populated from {@code conf}
+ * @throws RuntimeException if the SSL factory cannot be initialized
+ */
+ public static HttpConnectionParams getHttpConnectionParams(Configuration conf) {
+ // NOTE(review): the fallback for the connect timeout is the STALLED_COPY_TIMEOUT default,
+ // not a connect-timeout specific constant - confirm this is intended
+ int connectionTimeout =
+ conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_CONNECT_TIMEOUT,
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_STALLED_COPY_TIMEOUT_DEFAULT);
+
+ int readTimeout = conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT,
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_READ_TIMEOUT_DEFAULT);
+
+ int bufferSize = conf.getInt(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE,
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_BUFFER_SIZE_DEFAULT);
+
+ boolean keepAlive = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED,
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_ENABLED_DEFAULT);
+
+ int keepAliveMaxConnections = conf.getInt(
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS,
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_KEEP_ALIVE_MAX_CONNECTIONS_DEFAULT);
+
+ if (keepAlive) {
+ System.setProperty("sun.net.http.errorstream.enableBuffering", "true");
+ System.setProperty("http.maxConnections", String.valueOf(keepAliveMaxConnections));
+ }
+
+ boolean sslShuffle = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL,
+ TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_ENABLE_SSL_DEFAULT);
+
+ // Read the shared volatile field once, so that the instance checked below is also the one
+ // handed to the params.
+ SSLFactory factory = sslFactory;
+ // The "destroyed earlier" test has to be part of the unsynchronized check as well; testing
+ // only for null there would never rebuild a factory that was destroyed.
+ if (sslShuffle
+ && (factory == null || factory.getKeystoresFactory().getTrustManagers() == null)) {
+ synchronized (HttpConnectionParams.class) {
+ // re-read under the lock: another thread may have rebuilt the factory in the meantime
+ factory = sslFactory;
+ //Create sslFactory if it is null or if it was destroyed earlier
+ if (factory == null || factory.getKeystoresFactory().getTrustManagers() == null) {
+ SSLFactory newFactory =
+ new SSLFactory(org.apache.hadoop.security.ssl.SSLFactory.Mode.CLIENT, conf);
+ try {
+ newFactory.init();
+ } catch (Exception ex) {
+ newFactory.destroy();
+ sslFactory = null;
+ throw new RuntimeException(ex);
+ }
+ // Publish only after init() succeeded, so that threads taking the unsynchronized
+ // path never pick up a factory that is still being initialized.
+ sslFactory = newFactory;
+ factory = newFactory;
+ }
+ }
+ }
+
+ return new HttpConnectionParams(keepAlive, keepAliveMaxConnections, connectionTimeout,
+ readTimeout, bufferSize, sslShuffle, factory);
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/9dabf947/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
----------------------------------------------------------------------
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
index f354920..b7c0742 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/impl/ShuffleManager.java
@@ -26,7 +26,6 @@ import java.text.DecimalFormat;
import java.util.Arrays;
import java.util.BitSet;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -46,7 +45,7 @@ import java.util.concurrent.locks.ReentrantLock;
import javax.crypto.SecretKey;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
+import org.apache.tez.http.HttpConnectionParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
@@ -55,7 +54,6 @@ import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.net.NetUtils;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.TezRuntimeFrameworkConfigs;
import org.apache.tez.common.TezUtilsInternal;
@@ -76,7 +74,6 @@ import org.apache.tez.runtime.library.common.shuffle.FetchedInput;
import org.apache.tez.runtime.library.common.shuffle.FetchedInputAllocator;
import org.apache.tez.runtime.library.common.shuffle.Fetcher;
import org.apache.tez.runtime.library.common.shuffle.FetcherCallback;
-import org.apache.tez.runtime.library.common.shuffle.HttpConnection.HttpConnectionParams;
import org.apache.tez.runtime.library.common.shuffle.InputHost;
import org.apache.tez.runtime.library.common.shuffle.ShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.FetchedInput.Type;
@@ -132,6 +129,7 @@ public class ShuffleManager implements FetcherCallback {
private final Condition wakeLoop = lock.newCondition();
private final int numFetchers;
+ private final boolean asyncHttp;
// Parameters required by Fetchers
private final JobTokenSecretManager jobTokenSecretMgr;
@@ -241,8 +239,8 @@ public class ShuffleManager implements FetcherCallback {
.getJobTokenSecretFromTokenBytes(inputContext
.getServiceConsumerMetaData(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID));
this.jobTokenSecretMgr = new JobTokenSecretManager(shuffleSecret);
- httpConnectionParams =
- ShuffleUtils.constructHttpShuffleConnectionParams(conf);
+ this.asyncHttp = conf.getBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_SHUFFLE_USE_ASYNC_HTTP, false);
+ httpConnectionParams = ShuffleUtils.getHttpConnectionParams(conf);
this.localFs = (RawLocalFileSystem) FileSystem.getLocal(conf).getRaw();
@@ -398,7 +396,7 @@ public class ShuffleManager implements FetcherCallback {
httpConnectionParams, inputManager, inputContext.getApplicationId(),
jobTokenSecretMgr, srcNameTrimmed, conf, localFs, localDirAllocator,
lockDisk, localDiskFetchEnabled, sharedFetchEnabled,
- localhostName, shufflePort);
+ localhostName, shufflePort, asyncHttp);
if (codec != null) {
fetcherBuilder.setCompressionParameters(codec);