You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by bt...@apache.org on 2023/02/10 16:40:42 UTC

[hadoop] branch trunk updated: MAPREDUCE-7431. ShuffleHandler refactor and fix after Netty4 upgrade. (#5311)

This is an automated email from the ASF dual-hosted git repository.

bteke pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 151b71d7aff MAPREDUCE-7431. ShuffleHandler refactor and fix after Netty4 upgrade. (#5311)
151b71d7aff is described below

commit 151b71d7affbbaadab5af7943f824f6ae6a6f47b
Author: Tamas Domok <td...@cloudera.com>
AuthorDate: Fri Feb 10 17:40:21 2023 +0100

    MAPREDUCE-7431. ShuffleHandler refactor and fix after Netty4 upgrade. (#5311)
---
 .../hadoop-mapreduce-client-shuffle/pom.xml        |    6 +
 .../hadoop/mapred/ShuffleChannelHandler.java       |  715 ++++++++
 .../mapred/ShuffleChannelHandlerContext.java       |  140 ++
 .../hadoop/mapred/ShuffleChannelInitializer.java   |   74 +
 .../org/apache/hadoop/mapred/ShuffleHandler.java   | 1049 ++---------
 .../hadoop/mapred/TestShuffleChannelHandler.java   |  562 ++++++
 .../apache/hadoop/mapred/TestShuffleHandler.java   | 1857 ++------------------
 .../hadoop/mapred/TestShuffleHandlerBase.java      |  172 ++
 .../src/test/resources/cert.pem                    |   27 +
 .../src/test/resources/key.pem                     |   52 +
 .../src/test/resources/log4j.properties            |    4 +-
 11 files changed, 2060 insertions(+), 2598 deletions(-)

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/pom.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/pom.xml
index 4e24a3d25cd..7117b4d9770 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/pom.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/pom.xml
@@ -55,6 +55,12 @@
       <groupId>${leveldbjni.group}</groupId>
       <artifactId>leveldbjni-all</artifactId>
     </dependency>
+    <dependency>
+      <groupId>ch.qos.logback</groupId>
+      <artifactId>logback-classic</artifactId>
+      <version>1.1.2</version>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleChannelHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleChannelHandler.java
new file mode 100644
index 00000000000..49c0bb288b5
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleChannelHandler.java
@@ -0,0 +1,715 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.codec.http.QueryStringDecoder;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.util.CharsetUtil;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.URL;
+import java.nio.channels.ClosedChannelException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.crypto.SecretKey;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.SecureIOUtils;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
+import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
+import org.eclipse.jetty.http.HttpHeader;
+
+import static io.netty.buffer.Unpooled.wrappedBuffer;
+import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpMethod.GET;
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
+import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static org.apache.hadoop.mapred.ShuffleHandler.AUDITLOG;
+import static org.apache.hadoop.mapred.ShuffleHandler.CONNECTION_CLOSE;
+import static org.apache.hadoop.mapred.ShuffleHandler.FETCH_RETRY_DELAY;
+import static org.apache.hadoop.mapred.ShuffleHandler.IGNORABLE_ERROR_MESSAGE;
+import static org.apache.hadoop.mapred.ShuffleHandler.RETRY_AFTER_HEADER;
+import static org.apache.hadoop.mapred.ShuffleHandler.TIMEOUT_HANDLER;
+import static org.apache.hadoop.mapred.ShuffleHandler.TOO_MANY_REQ_STATUS;
+import static org.apache.hadoop.mapred.ShuffleHandler.LOG;
+
+/**
+ * ShuffleChannelHandler verifies the map request then servers the attempts in a http stream.
+ * Before each attempt a serialised ShuffleHeader object is written with the details.
+ *
+ * <pre>
+ * Example Request
+ * ===================
+ * GET /mapOutput?job=job_1111111111111_0001&amp;reduce=0&amp;
+ *     map=attempt_1111111111111_0001_m_000001_0,
+ *     attempt_1111111111111_0002_m_000002_0,
+ *     attempt_1111111111111_0003_m_000003_0 HTTP/1.1
+ * name: mapreduce
+ * version: 1.0.0
+ * UrlHash: 9zS++qE0/7/D2l1Rg0TqRoSguAk=
+ *
+ * Example Response
+ * ===================
+ * HTTP/1.1 200 OK
+ * ReplyHash: GcuojWkAxXUyhZHPnwoV/MW2tGA=
+ * name: mapreduce
+ * version: 1.0.0
+ * connection: close
+ * content-length: 138
+ *
+ * +--------+-------------------------------------------------+----------------+
+ * |00000000| 25 61 74 74 65 6d 70 74 5f 31 31 31 31 31 31 31 |%attempt_1111111|
+ * |00000010| 31 31 31 31 31 31 5f 30 30 30 31 5f 6d 5f 30 30 |111111_0001_m_00|
+ * |00000020| 30 30 30 31 5f 30 05 0a 00                      |0001_0...       |
+ * +--------+-------------------------------------------------+----------------+
+ * |00000000| 61 61 61 61 61                                  |aaaaa           |
+ * +--------+-------------------------------------------------+----------------+
+ * |00000000| 25 61 74 74 65 6d 70 74 5f 31 31 31 31 31 31 31 |%attempt_1111111|
+ * |00000010| 31 31 31 31 31 31 5f 30 30 30 32 5f 6d 5f 30 30 |111111_0002_m_00|
+ * |00000020| 30 30 30 32 5f 30 05 0a 00                      |0002_0...       |
+ * +--------+-------------------------------------------------+----------------+
+ * |00000000| 62 62 62 62 62                                  |bbbbb           |
+ * +--------+-------------------------------------------------+----------------+
+ * |00000000| 25 61 74 74 65 6d 70 74 5f 31 31 31 31 31 31 31 |%attempt_1111111|
+ * |00000010| 31 31 31 31 31 31 5f 30 30 30 33 5f 6d 5f 30 30 |111111_0003_m_00|
+ * |00000020| 30 30 30 33 5f 30 05 0a 00                      |0003_0...       |
+ * +--------+-------------------------------------------------+----------------+
+ * |00000000| 63 63 63 63 63                                  |ccccc           |
+ * +--------+-------------------------------------------------+----------------+
+ * </pre>
+ */
+public class ShuffleChannelHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
+  private final ShuffleChannelHandlerContext handlerCtx;
+
+  ShuffleChannelHandler(ShuffleChannelHandlerContext ctx) {
+    handlerCtx = ctx;
+  }
+
+  private List<String> splitMaps(List<String> mapq) {
+    if (null == mapq) {
+      return null;
+    }
+    final List<String> ret = new ArrayList<>();
+    for (String s : mapq) {
+      Collections.addAll(ret, s.split(","));
+    }
+    return ret;
+  }
+
+  @Override
+  public void channelActive(ChannelHandlerContext ctx)
+      throws Exception {
+    LOG.debug("Executing channelActive; channel='{}'", ctx.channel().id());
+    int numConnections = handlerCtx.activeConnections.incrementAndGet();
+    if ((handlerCtx.maxShuffleConnections > 0) &&
+        (numConnections > handlerCtx.maxShuffleConnections)) {
+      LOG.info(String.format("Current number of shuffle connections (%d) is " +
+              "greater than the max allowed shuffle connections (%d)",
+          handlerCtx.allChannels.size(), handlerCtx.maxShuffleConnections));
+
+      Map<String, String> headers = new HashMap<>(1);
+      // notify fetchers to backoff for a while before closing the connection
+      // if the shuffle connection limit is hit. Fetchers are expected to
+      // handle this notification gracefully, that is, not treating this as a
+      // fetch failure.
+      headers.put(RETRY_AFTER_HEADER, String.valueOf(FETCH_RETRY_DELAY));
+      sendError(ctx, "", TOO_MANY_REQ_STATUS, headers);
+    } else {
+      super.channelActive(ctx);
+      handlerCtx.allChannels.add(ctx.channel());
+      LOG.debug("Added channel: {}, channel id: {}. Accepted number of connections={}",
+          ctx.channel(), ctx.channel().id(), handlerCtx.activeConnections.get());
+    }
+  }
+
+  @Override
+  public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+    LOG.debug("Executing channelInactive; channel='{}'", ctx.channel().id());
+    super.channelInactive(ctx);
+    int noOfConnections = handlerCtx.activeConnections.decrementAndGet();
+    LOG.debug("New value of Accepted number of connections={}", noOfConnections);
+  }
+
+  @Override
+  public void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
+    Channel channel = ctx.channel();
+    LOG.debug("Received HTTP request: {}, channel='{}'", request, channel.id());
+
+    if (request.method() != GET) {
+      sendError(ctx, METHOD_NOT_ALLOWED);
+      return;
+    }
+    // Check whether the shuffle version is compatible
+    String shuffleVersion = ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION;
+    String httpHeaderName = ShuffleHeader.DEFAULT_HTTP_HEADER_NAME;
+    if (request.headers() != null) {
+      shuffleVersion = request.headers().get(ShuffleHeader.HTTP_HEADER_VERSION);
+      httpHeaderName = request.headers().get(ShuffleHeader.HTTP_HEADER_NAME);
+      LOG.debug("Received from request header: ShuffleVersion={} header name={}, channel id: {}",
+          shuffleVersion, httpHeaderName, channel.id());
+    }
+    if (request.headers() == null ||
+        !ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(httpHeaderName) ||
+        !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(shuffleVersion)) {
+      sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST);
+      return;
+    }
+    final Map<String, List<String>> q =
+        new QueryStringDecoder(request.uri()).parameters();
+
+    final List<String> keepAliveList = q.get("keepAlive");
+    boolean keepAliveParam = false;
+    if (keepAliveList != null && keepAliveList.size() == 1) {
+      keepAliveParam = Boolean.parseBoolean(keepAliveList.get(0));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("KeepAliveParam: {} : {}, channel id: {}",
+            keepAliveList, keepAliveParam, channel.id());
+      }
+    }
+    final List<String> mapIds = splitMaps(q.get("map"));
+    final List<String> reduceQ = q.get("reduce");
+    final List<String> jobQ = q.get("job");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("RECV: " + request.uri() +
+          "\n  mapId: " + mapIds +
+          "\n  reduceId: " + reduceQ +
+          "\n  jobId: " + jobQ +
+          "\n  keepAlive: " + keepAliveParam +
+          "\n  channel id: " + channel.id());
+    }
+
+    if (mapIds == null || reduceQ == null || jobQ == null) {
+      sendError(ctx, "Required param job, map and reduce", BAD_REQUEST);
+      return;
+    }
+    if (reduceQ.size() != 1 || jobQ.size() != 1) {
+      sendError(ctx, "Too many job/reduce parameters", BAD_REQUEST);
+      return;
+    }
+
+    int reduceId;
+    String jobId;
+    try {
+      reduceId = Integer.parseInt(reduceQ.get(0));
+      jobId = jobQ.get(0);
+    } catch (NumberFormatException e) {
+      sendError(ctx, "Bad reduce parameter", BAD_REQUEST);
+      return;
+    } catch (IllegalArgumentException e) {
+      sendError(ctx, "Bad job parameter", BAD_REQUEST);
+      return;
+    }
+    final String reqUri = request.uri();
+    if (null == reqUri) {
+      // TODO? add upstream?
+      sendError(ctx, FORBIDDEN);
+      return;
+    }
+    HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+    try {
+      verifyRequest(jobId, ctx, request, response,
+          new URL("http", "", handlerCtx.port, reqUri));
+    } catch (IOException e) {
+      LOG.warn("Shuffle failure ", e);
+      sendError(ctx, e.getMessage(), UNAUTHORIZED);
+      return;
+    }
+
+    Map<String, MapOutputInfo> mapOutputInfoMap = new HashMap<>();
+    ChannelPipeline pipeline = channel.pipeline();
+    ShuffleHandler.TimeoutHandler timeoutHandler =
+        (ShuffleHandler.TimeoutHandler)pipeline.get(TIMEOUT_HANDLER);
+    timeoutHandler.setEnabledTimeout(false);
+    String user = handlerCtx.userRsrc.get(jobId);
+
+    try {
+      populateHeaders(mapIds, jobId, user, reduceId,
+          response, keepAliveParam, mapOutputInfoMap);
+    } catch(IOException e) {
+      LOG.error("Shuffle error while populating headers. Channel id: " + channel.id(), e);
+      sendError(ctx, getErrorMessage(e), INTERNAL_SERVER_ERROR);
+      return;
+    }
+
+    channel.write(response);
+
+    //Initialize one ReduceContext object per channelRead call
+    boolean keepAlive = keepAliveParam || handlerCtx.connectionKeepAliveEnabled;
+    ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx,
+        user, mapOutputInfoMap, jobId, keepAlive);
+
+    sendMap(reduceContext);
+  }
+
+  /**
+   * Calls sendMapOutput for the mapId pointed by ReduceContext.mapsToSend
+   * and increments it. This method is first called by messageReceived()
+   * maxSessionOpenFiles times and then on the completion of every
+   * sendMapOutput operation. This limits the number of open files on a node,
+   * which can get really large(exhausting file descriptors on the NM) if all
+   * sendMapOutputs are called in one go, as was done previous to this change.
+   * @param reduceContext used to call sendMapOutput with correct params.
+   */
+  public void sendMap(ReduceContext reduceContext) {
+    LOG.trace("Executing sendMap; channel='{}'", reduceContext.ctx.channel().id());
+    if (reduceContext.getMapsToSend().get() <
+        reduceContext.getMapIds().size()) {
+      int nextIndex = reduceContext.getMapsToSend().getAndIncrement();
+      String mapId = reduceContext.getMapIds().get(nextIndex);
+
+      try {
+        MapOutputInfo info = reduceContext.getInfoMap().get(mapId);
+        if (info == null) {
+          info = getMapOutputInfo(mapId, reduceContext.getReduceId(),
+              reduceContext.getJobId(), reduceContext.getUser());
+        }
+        LOG.trace("Calling sendMapOutput; channel='{}'", reduceContext.ctx.channel().id());
+        ChannelFuture nextMap = sendMapOutput(
+            reduceContext.getCtx().channel(),
+            reduceContext.getUser(), mapId,
+            reduceContext.getReduceId(), info);
+        nextMap.addListener(new ReduceMapFileCount(this, reduceContext));
+      } catch (IOException e) {
+        LOG.error("Shuffle error: {}; channel={}", e, reduceContext.ctx.channel().id());
+
+        // It is not possible to sendError, the success HttpResponse has been already sent
+        reduceContext.ctx.channel().close();
+      }
+    }
+  }
+
+  private String getErrorMessage(Throwable t) {
+    StringBuilder sb = new StringBuilder(t.getMessage());
+    while (t.getCause() != null) {
+      sb.append(t.getCause().getMessage());
+      t = t.getCause();
+    }
+    return sb.toString();
+  }
+
+  protected MapOutputInfo getMapOutputInfo(String mapId, int reduce, String jobId, String user)
+      throws IOException {
+    ShuffleHandler.AttemptPathInfo pathInfo;
+    try {
+      ShuffleHandler.AttemptPathIdentifier identifier = new ShuffleHandler.AttemptPathIdentifier(
+          jobId, user, mapId);
+      pathInfo = handlerCtx.pathCache.get(identifier);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Retrieved pathInfo for " + identifier +
+            " check for corresponding loaded messages to determine whether" +
+            " it was loaded or cached");
+      }
+    } catch (ExecutionException e) {
+      if (e.getCause() instanceof IOException) {
+        throw (IOException) e.getCause();
+      } else {
+        throw new RuntimeException(e.getCause());
+      }
+    }
+
+    IndexRecord info =
+        handlerCtx.indexCache.getIndexInformation(mapId, reduce, pathInfo.indexPath, user);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getMapOutputInfo: jobId=" + jobId + ", mapId=" + mapId +
+          ",dataFile=" + pathInfo.dataPath + ", indexFile=" +
+          pathInfo.indexPath);
+      LOG.debug("getMapOutputInfo: startOffset={}, partLength={} rawLength={}",
+          info.startOffset, info.partLength, info.rawLength);
+    }
+
+    return new MapOutputInfo(pathInfo.dataPath, info);
+  }
+
+  protected void populateHeaders(List<String> mapIds, String jobId,
+                                 String user, int reduce, HttpResponse response,
+                                 boolean keepAliveParam,
+                                 Map<String, MapOutputInfo> mapOutputInfoMap)
+      throws IOException {
+
+    long contentLength = 0;
+    for (String mapId : mapIds) {
+      MapOutputInfo outputInfo = getMapOutputInfo(mapId, reduce, jobId, user);
+      if (mapOutputInfoMap.size() < handlerCtx.mapOutputMetaInfoCacheSize) {
+        mapOutputInfoMap.put(mapId, outputInfo);
+      }
+
+      ShuffleHeader header =
+          new ShuffleHeader(mapId, outputInfo.indexRecord.partLength,
+              outputInfo.indexRecord.rawLength, reduce);
+      DataOutputBuffer dob = new DataOutputBuffer();
+      header.write(dob);
+      contentLength += outputInfo.indexRecord.partLength;
+      contentLength += dob.getLength();
+
+      // verify file access to data file to send an actually correct http error
+      final File spillFile = new File(outputInfo.mapOutputFileName.toString());
+      RandomAccessFile r = SecureIOUtils.openForRandomRead(spillFile, "r", user, null);
+      r.close();
+    }
+
+    // Now set the response headers.
+    setResponseHeaders(response, keepAliveParam, contentLength);
+
+    // this audit log is disabled by default,
+    // to turn it on please enable this audit log
+    // on log4j.properties by uncommenting the setting
+    if (AUDITLOG.isDebugEnabled()) {
+      StringBuilder sb = new StringBuilder("shuffle for ");
+      sb.append(jobId).append(" reducer ").append(reduce);
+      sb.append(" length ").append(contentLength);
+      if (AUDITLOG.isTraceEnabled()) {
+        // For trace level logging, append the list of mappers
+        sb.append(" mappers: ").append(mapIds);
+        AUDITLOG.trace(sb.toString());
+      } else {
+        AUDITLOG.debug(sb.toString());
+      }
+    }
+  }
+
+  protected void setResponseHeaders(HttpResponse response,
+                                    boolean keepAliveParam, long contentLength) {
+    if (!handlerCtx.connectionKeepAliveEnabled && !keepAliveParam) {
+      response.headers().set(HttpHeader.CONNECTION.asString(), CONNECTION_CLOSE);
+    } else {
+      response.headers().set(HttpHeader.CONNECTION.asString(),
+          HttpHeader.KEEP_ALIVE.asString());
+      response.headers().set(HttpHeader.KEEP_ALIVE.asString(),
+          "timeout=" + handlerCtx.connectionKeepAliveTimeOut);
+    }
+
+    // Content length must be set (https://www.rfc-editor.org/rfc/rfc7230#section-3.3.3)
+    HttpUtil.setContentLength(response, contentLength);
+  }
+
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  static class MapOutputInfo {
+    final Path mapOutputFileName;
+    final IndexRecord indexRecord;
+
+    MapOutputInfo(Path mapOutputFileName, IndexRecord indexRecord) {
+      this.mapOutputFileName = mapOutputFileName;
+      this.indexRecord = indexRecord;
+    }
+  }
+
+  protected void verifyRequest(String appid, ChannelHandlerContext ctx,
+                               HttpRequest request, HttpResponse response, URL requestUri)
+      throws IOException {
+    SecretKey tokenSecret = handlerCtx.secretManager.retrieveTokenSecret(appid);
+    if (null == tokenSecret) {
+      LOG.info("Request for unknown token {}, channel id: {}", appid, ctx.channel().id());
+      throw new IOException("Could not find jobid");
+    }
+    // encrypting URL
+    String encryptedURL = SecureShuffleUtils.buildMsgFrom(requestUri);
+    // hash from the fetcher
+    String urlHashStr =
+        request.headers().get(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
+    if (urlHashStr == null) {
+      LOG.info("Missing header hash for {}, channel id: {}", appid, ctx.channel().id());
+      throw new IOException("fetcher cannot be authenticated");
+    }
+    if (LOG.isDebugEnabled()) {
+      int len = urlHashStr.length();
+      LOG.debug("Verifying request. encryptedURL:{}, hash:{}, channel id: " +
+              "{}", encryptedURL,
+          urlHashStr.substring(len - len / 2, len - 1), ctx.channel().id());
+    }
+    // verify - throws exception
+    SecureShuffleUtils.verifyReply(urlHashStr, encryptedURL, tokenSecret);
+    // verification passed - encode the reply
+    String reply = SecureShuffleUtils.generateHash(urlHashStr.getBytes(Charsets.UTF_8),
+        tokenSecret);
+    response.headers().set(
+        SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
+    // Put shuffle version into http header
+    response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
+        ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
+        ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+    if (LOG.isDebugEnabled()) {
+      int len = reply.length();
+      LOG.debug("Fetcher request verified. " +
+              "encryptedURL: {}, reply: {}, channel id: {}",
+          encryptedURL, reply.substring(len - len / 2, len - 1),
+          ctx.channel().id());
+    }
+  }
+
+  public static ByteBuf shuffleHeaderToBytes(ShuffleHeader header) throws IOException {
+    final DataOutputBuffer dob = new DataOutputBuffer();
+    header.write(dob);
+    return wrappedBuffer(dob.getData(), 0, dob.getLength());
+  }
+
+  protected ChannelFuture sendMapOutput(Channel ch, String user, String mapId, int reduce,
+                                        MapOutputInfo mapOutputInfo)
+      throws IOException {
+    final IndexRecord info = mapOutputInfo.indexRecord;
+    ch.write(shuffleHeaderToBytes(
+        new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce)));
+    final File spillFile =
+        new File(mapOutputInfo.mapOutputFileName.toString());
+    RandomAccessFile spill = SecureIOUtils.openForRandomRead(spillFile, "r", user, null);
+    ChannelFuture writeFuture;
+    if (ch.pipeline().get(SslHandler.class) == null) {
+      final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
+          info.startOffset, info.partLength, handlerCtx.manageOsCache, handlerCtx.readaheadLength,
+          handlerCtx.readaheadPool, spillFile.getAbsolutePath(),
+          handlerCtx.shuffleBufferSize, handlerCtx.shuffleTransferToAllowed);
+      writeFuture = ch.writeAndFlush(partition);
+      // TODO error handling; distinguish IO/connection failures,
+      //      attribute to appropriate spill output
+      writeFuture.addListener((ChannelFutureListener) future -> {
+        if (future.isSuccess()) {
+          partition.transferSuccessful();
+        }
+        partition.deallocate();
+      });
+    } else {
+      // HTTPS cannot be done with zero copy.
+      final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
+          info.startOffset, info.partLength, handlerCtx.sslFileBufferSize,
+          handlerCtx.manageOsCache, handlerCtx.readaheadLength, handlerCtx.readaheadPool,
+          spillFile.getAbsolutePath());
+      writeFuture = ch.writeAndFlush(chunk);
+    }
+
+    handlerCtx.metrics.shuffleConnections.incr();
+    handlerCtx.metrics.shuffleOutputBytes.incr(info.partLength); // optimistic
+    return writeFuture;
+  }
+
+  protected void sendError(ChannelHandlerContext ctx,
+                           HttpResponseStatus status) {
+    sendError(ctx, "", status);
+  }
+
+  protected void sendError(ChannelHandlerContext ctx, String message,
+                           HttpResponseStatus status) {
+    sendError(ctx, message, status, Collections.emptyMap());
+  }
+
+  protected void sendError(ChannelHandlerContext ctx, String msg,
+                           HttpResponseStatus status, Map<String, String> headers) {
+    FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status,
+        Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
+    response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
+    // Put shuffle version into http header
+    response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
+        ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
+        ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+    for (Map.Entry<String, String> header : headers.entrySet()) {
+      response.headers().set(header.getKey(), header.getValue());
+    }
+    HttpUtil.setContentLength(response, response.content().readableBytes());
+
+    // Close the connection as soon as the error message is sent.
+    ctx.channel().writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+    // TODO: missing keep-alive handling
+  }
+
+  @Override
+  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
+      throws Exception {
+    Channel ch = ctx.channel();
+    if (cause instanceof TooLongFrameException) {
+      LOG.trace("TooLongFrameException, channel id: {}", ch.id());
+      sendError(ctx, BAD_REQUEST);
+      return;
+    } else if (cause instanceof IOException) {
+      if (cause instanceof ClosedChannelException) {
+        LOG.debug("Ignoring closed channel error, channel id: " + ch.id(), cause);
+        return;
+      }
+      String message = String.valueOf(cause.getMessage());
+      if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
+        LOG.debug("Ignoring client socket close, channel id: " + ch.id(), cause);
+        return;
+      }
+    }
+
+    LOG.error("Shuffle error. Channel id: " + ch.id(), cause);
+    if (ch.isActive()) {
+      sendError(ctx, INTERNAL_SERVER_ERROR);
+    }
+  }
+
+  /**
+   * Maintain parameters per messageReceived() Netty context.
+   * Allows sendMapOutput calls from operationComplete()
+   */
+  public static class ReduceContext {
+    private final List<String> mapIds;
+    private final AtomicInteger mapsToWait;
+    private final AtomicInteger mapsToSend;
+    private final int reduceId;
+    private final ChannelHandlerContext ctx;
+    private final String user;
+    private final Map<String, ShuffleChannelHandler.MapOutputInfo> infoMap;
+    private final String jobId;
+    private final boolean keepAlive;
+
+    ReduceContext(List<String> mapIds, int rId,
+                  ChannelHandlerContext context, String usr,
+                  Map<String, ShuffleChannelHandler.MapOutputInfo> mapOutputInfoMap,
+                  String jobId, boolean keepAlive) {
+
+      this.mapIds = mapIds;
+      this.reduceId = rId;
+      /*
+       * Atomic count for tracking the no. of map outputs that are yet to
+       * complete. Multiple futureListeners' operationComplete() can decrement
+       * this value asynchronously. It is used to decide when the channel should
+       * be closed.
+       */
+      this.mapsToWait = new AtomicInteger(mapIds.size());
+      /*
+       * Atomic count for tracking the no. of map outputs that have been sent.
+       * Multiple sendMap() calls can increment this value
+       * asynchronously. Used to decide which mapId should be sent next.
+       */
+      this.mapsToSend = new AtomicInteger(0);
+      this.ctx = context;
+      this.user = usr;
+      this.infoMap = mapOutputInfoMap;
+      this.jobId = jobId;
+      this.keepAlive = keepAlive;
+    }
+
+    public int getReduceId() {
+      return reduceId;
+    }
+
+    public ChannelHandlerContext getCtx() {
+      return ctx;
+    }
+
+    public String getUser() {
+      return user;
+    }
+
+    public Map<String, ShuffleChannelHandler.MapOutputInfo> getInfoMap() {
+      return infoMap;
+    }
+
+    public String getJobId() {
+      return jobId;
+    }
+
+    public List<String> getMapIds() {
+      return mapIds;
+    }
+
+    public AtomicInteger getMapsToSend() {
+      return mapsToSend;
+    }
+
+    public AtomicInteger getMapsToWait() {
+      return mapsToWait;
+    }
+
+    public boolean getKeepAlive() {
+      return keepAlive;
+    }
+  }
+
+  static class ReduceMapFileCount implements ChannelFutureListener {
+    private final ShuffleChannelHandler handler;
+    private final ReduceContext reduceContext;
+
+    ReduceMapFileCount(ShuffleChannelHandler handler, ReduceContext rc) {
+      this.handler = handler;
+      this.reduceContext = rc;
+    }
+
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      LOG.trace("SendMap operation complete; mapsToWait='{}', channel='{}'",
+          this.reduceContext.getMapsToWait().get(), future.channel().id());
+      if (!future.isSuccess()) {
+        LOG.error("Future is unsuccessful. channel='{}' Cause: ",
+            future.channel().id(), future.cause());
+        future.channel().close();
+        return;
+      }
+      int waitCount = this.reduceContext.getMapsToWait().decrementAndGet();
+      if (waitCount == 0) {
+        ChannelFuture lastContentFuture =
+            future.channel().writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+        handler.handlerCtx.metrics.operationComplete(future);
+
+        // Let the idle timer handler close keep-alive connections
+        if (reduceContext.getKeepAlive()) {
+          LOG.trace("SendMap operation complete, keeping alive the connection; channel='{}'",
+              future.channel().id());
+          ChannelPipeline pipeline = future.channel().pipeline();
+          ShuffleHandler.TimeoutHandler timeoutHandler =
+              (ShuffleHandler.TimeoutHandler)pipeline.get(TIMEOUT_HANDLER);
+          timeoutHandler.setEnabledTimeout(true);
+        } else {
+          LOG.trace("SendMap operation complete, closing connection; channel='{}'",
+              future.channel().id());
+          lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+        }
+      } else {
+        LOG.trace("SendMap operation complete, waitCount > 0, " +
+                "invoking sendMap with reduceContext; channel='{}'",
+            future.channel().id());
+        handler.sendMap(reduceContext);
+      }
+    }
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleChannelHandlerContext.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleChannelHandlerContext.java
new file mode 100644
index 00000000000..fa037e98e83
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleChannelHandlerContext.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import io.netty.channel.group.ChannelGroup;
+
+import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.util.Shell;
+
+import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_MAX_SHUFFLE_CONNECTIONS;
+import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_SHUFFLE_BUFFER_SIZE;
+import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED;
+import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT;
+import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_SHUFFLE_MANAGE_OS_CACHE;
+import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE;
+import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES;
+import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_SHUFFLE_READAHEAD_BYTES;
+import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED;
+import static org.apache.hadoop.mapred.ShuffleHandler.DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE;
+import static org.apache.hadoop.mapred.ShuffleHandler.MAX_SHUFFLE_CONNECTIONS;
+import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_BUFFER_SIZE;
+import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED;
+import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT;
+import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_MANAGE_OS_CACHE;
+import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE;
+import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES;
+import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_READAHEAD_BYTES;
+import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_TRANSFERTO_ALLOWED;
+import static org.apache.hadoop.mapred.ShuffleHandler.SUFFLE_SSL_FILE_BUFFER_SIZE_KEY;
+import static org.apache.hadoop.mapred.ShuffleHandler.WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED;
+
+@SuppressWarnings("checkstyle:VisibilityModifier")
+public class ShuffleChannelHandlerContext {
+
+  public final Configuration conf;
+  public final JobTokenSecretManager secretManager;
+  public final Map<String, String> userRsrc;
+  public final LoadingCache<ShuffleHandler.AttemptPathIdentifier,
+      ShuffleHandler.AttemptPathInfo> pathCache;
+  public final IndexCache indexCache;
+  public final ShuffleHandler.ShuffleMetrics metrics;
+  public final ChannelGroup allChannels;
+
+
+  public final boolean connectionKeepAliveEnabled;
+  public final int sslFileBufferSize;
+  public final int connectionKeepAliveTimeOut;
+  public final int mapOutputMetaInfoCacheSize;
+
+  public final AtomicInteger activeConnections = new AtomicInteger();
+
+  /**
+   * Should the shuffle use posix_fadvise calls to manage the OS cache during
+   * sendfile.
+   */
+  public final boolean manageOsCache;
+  public final int readaheadLength;
+  public final int maxShuffleConnections;
+  public final int shuffleBufferSize;
+  public final boolean shuffleTransferToAllowed;
+  public final int maxSessionOpenFiles;
+  public final ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
+
+  public int port = -1;
+
+  public ShuffleChannelHandlerContext(Configuration conf,
+                                      Map<String, String> userRsrc,
+                                      JobTokenSecretManager secretManager,
+                                      LoadingCache<ShuffleHandler.AttemptPathIdentifier,
+                                          ShuffleHandler.AttemptPathInfo> patCache,
+                                      IndexCache indexCache,
+                                      ShuffleHandler.ShuffleMetrics metrics,
+                                      ChannelGroup allChannels) {
+    this.conf = conf;
+    this.userRsrc = userRsrc;
+    this.secretManager = secretManager;
+    this.pathCache = patCache;
+    this.indexCache = indexCache;
+    this.metrics = metrics;
+    this.allChannels = allChannels;
+
+    sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
+        DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
+    connectionKeepAliveEnabled =
+        conf.getBoolean(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED,
+            DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED);
+    connectionKeepAliveTimeOut =
+        Math.max(1, conf.getInt(SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT,
+            DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT));
+    mapOutputMetaInfoCacheSize =
+        Math.max(1, conf.getInt(SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE,
+            DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE));
+
+    manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
+        DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
+
+    readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
+        DEFAULT_SHUFFLE_READAHEAD_BYTES);
+
+    maxShuffleConnections = conf.getInt(MAX_SHUFFLE_CONNECTIONS,
+        DEFAULT_MAX_SHUFFLE_CONNECTIONS);
+
+    shuffleBufferSize = conf.getInt(SHUFFLE_BUFFER_SIZE,
+        DEFAULT_SHUFFLE_BUFFER_SIZE);
+
+    shuffleTransferToAllowed = conf.getBoolean(SHUFFLE_TRANSFERTO_ALLOWED,
+        (Shell.WINDOWS)?WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED:
+            DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED);
+
+    maxSessionOpenFiles = conf.getInt(SHUFFLE_MAX_SESSION_OPEN_FILES,
+        DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES);
+  }
+
+  void setPort(int port) {
+    this.port = port;
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleChannelInitializer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleChannelInitializer.java
new file mode 100644
index 00000000000..25f01322df9
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleChannelInitializer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedWriteHandler;
+
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+
+import org.apache.hadoop.security.ssl.SSLFactory;
+
+import static org.apache.hadoop.mapred.ShuffleHandler.TIMEOUT_HANDLER;
+import static org.apache.hadoop.mapred.ShuffleHandler.LOG;
+
+public class ShuffleChannelInitializer extends ChannelInitializer<SocketChannel> {
+
+  public static final int MAX_CONTENT_LENGTH = 1 << 16;
+
+  private final ShuffleChannelHandlerContext handlerContext;
+  private final SSLFactory sslFactory;
+
+
+  public ShuffleChannelInitializer(ShuffleChannelHandlerContext ctx, SSLFactory sslFactory) {
+    this.handlerContext = ctx;
+    this.sslFactory = sslFactory;
+  }
+
+  @Override
+  public void initChannel(SocketChannel ch) throws GeneralSecurityException, IOException {
+    LOG.debug("ShuffleChannelInitializer init; channel='{}'", ch.id());
+
+    ChannelPipeline pipeline = ch.pipeline();
+    if (sslFactory != null) {
+      pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
+    }
+    pipeline.addLast("http", new HttpServerCodec());
+    pipeline.addLast("aggregator", new HttpObjectAggregator(MAX_CONTENT_LENGTH));
+    pipeline.addLast("chunking", new ChunkedWriteHandler());
+
+    // An EventExecutorGroup could be specified to run in a
+    // different thread than an I/O thread so that the I/O thread
+    // is not blocked by a time-consuming task:
+    // https://netty.io/4.1/api/io/netty/channel/ChannelPipeline.html
+    pipeline.addLast("shuffle", new ShuffleChannelHandler(handlerContext));
+
+    pipeline.addLast(TIMEOUT_HANDLER,
+        new ShuffleHandler.TimeoutHandler(handlerContext.connectionKeepAliveTimeOut));
+    // TODO factor security manager into pipeline
+    // TODO factor out encode/decode to permit binary shuffle
+    // TODO factor out decode of index to permit alt. models
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
index e4a43f85b94..2fcfbf36db6 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
@@ -1,4 +1,4 @@
-/**
+/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
@@ -18,94 +18,52 @@
 
 package org.apache.hadoop.mapred;
 
-import static io.netty.buffer.Unpooled.wrappedBuffer;
-import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
-import static io.netty.handler.codec.http.HttpMethod.GET;
-import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
-import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
-import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
-import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
-import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
-import static io.netty.handler.codec.http.HttpResponseStatus.OK;
-import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
-import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-import static org.apache.hadoop.mapred.ShuffleHandler.NettyChannelHelper.*;
 import static org.fusesource.leveldbjni.JniDBFactory.asString;
 import static org.fusesource.leveldbjni.JniDBFactory.bytes;
 
 import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.net.InetSocketAddress;
-import java.net.URL;
 import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
-import javax.crypto.SecretKey;
-
 import io.netty.bootstrap.ServerBootstrap;
-import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
-import io.netty.channel.ChannelOutboundHandlerAdapter;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.ChannelPromise;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.group.ChannelGroup;
 import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
-import io.netty.handler.codec.TooLongFrameException;
-import io.netty.handler.codec.http.DefaultFullHttpResponse;
-import io.netty.handler.codec.http.DefaultHttpResponse;
-import io.netty.handler.codec.http.FullHttpResponse;
-import io.netty.handler.codec.http.HttpObjectAggregator;
-import io.netty.handler.codec.http.HttpRequest;
-import io.netty.handler.codec.http.HttpRequestDecoder;
-import io.netty.handler.codec.http.HttpResponse;
-import io.netty.handler.codec.http.HttpResponseEncoder;
 import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.codec.http.LastHttpContent;
-import io.netty.handler.codec.http.QueryStringDecoder;
-import io.netty.handler.ssl.SslHandler;
-import io.netty.handler.stream.ChunkedWriteHandler;
 import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
-import io.netty.util.CharsetUtil;
-import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
+import javax.annotation.Nonnull;
+
+import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
+import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
+import org.apache.hadoop.thirdparty.com.google.common.cache.RemovalListener;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.ReadaheadPool;
-import org.apache.hadoop.io.SecureIOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.proto.ShuffleHandlerRecoveryProtos.JobShuffleInfoProto;
 import org.apache.hadoop.mapreduce.MRConfig;
-import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
-import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.annotation.Metric;
 import org.apache.hadoop.metrics2.annotation.Metrics;
@@ -116,8 +74,6 @@ import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
 import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
 import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
@@ -132,23 +88,17 @@ import org.fusesource.leveldbjni.internal.NativeDB;
 import org.iq80.leveldb.DB;
 import org.iq80.leveldb.DBException;
 import org.iq80.leveldb.Options;
-import org.eclipse.jetty.http.HttpHeader;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hadoop.classification.VisibleForTesting;
-import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
-import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
-import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
-import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
-import org.apache.hadoop.thirdparty.com.google.common.cache.RemovalListener;
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.thirdparty.protobuf.ByteString;
 
 public class ShuffleHandler extends AuxiliaryService {
 
-  private static final org.slf4j.Logger LOG =
+  public static final org.slf4j.Logger LOG =
       LoggerFactory.getLogger(ShuffleHandler.class);
-  private static final org.slf4j.Logger AUDITLOG =
+  public static final org.slf4j.Logger AUDITLOG =
       LoggerFactory.getLogger(ShuffleHandler.class.getName()+".audit");
   public static final String SHUFFLE_MANAGE_OS_CACHE = "mapreduce.shuffle.manage.os.cache";
   public static final boolean DEFAULT_SHUFFLE_MANAGE_OS_CACHE = true;
@@ -170,7 +120,7 @@ public class ShuffleHandler extends AuxiliaryService {
   
   // pattern to identify errors related to the client closing the socket early
   // idea borrowed from Netty SslHandler
-  private static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
+  public static final Pattern IGNORABLE_ERROR_MESSAGE = Pattern.compile(
       "^.*(?:connection.*reset|connection.*closed|broken.*pipe).*$",
       Pattern.CASE_INSENSITIVE);
 
@@ -187,37 +137,21 @@ public class ShuffleHandler extends AuxiliaryService {
   // This should be kept in sync with Fetcher.FETCH_RETRY_DELAY_DEFAULT
   public static final long FETCH_RETRY_DELAY = 1000L;
   public static final String RETRY_AFTER_HEADER = "Retry-After";
-  static final String ENCODER_HANDLER_NAME = "encoder";
 
   private int port;
   private EventLoopGroup bossGroup;
   private EventLoopGroup workerGroup;
-  private ServerBootstrap bootstrap;
-  private Channel ch;
-  private final ChannelGroup accepted =
-      new DefaultChannelGroup(new DefaultEventExecutorGroup(5).next());
-  private final AtomicInteger activeConnections = new AtomicInteger();
-  protected HttpPipelineFactory pipelineFact;
-  private int sslFileBufferSize;
-
-  //TODO snemeth add a config option for these later, this is temporarily disabled for now.
-  private boolean useOutboundExceptionHandler = false;
-  private boolean useOutboundLogger = false;
-  
-  /**
-   * Should the shuffle use posix_fadvise calls to manage the OS cache during
-   * sendfile.
-   */
-  private boolean manageOsCache;
-  private int readaheadLength;
-  private int maxShuffleConnections;
-  private int shuffleBufferSize;
-  private boolean shuffleTransferToAllowed;
-  private int maxSessionOpenFiles;
-  private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
 
-  private Map<String, String> userRsrc;
-  private JobTokenSecretManager secretManager;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected final ChannelGroup allChannels =
+      new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
+
+  private SSLFactory sslFactory;
+
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected JobTokenSecretManager secretManager;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected Map<String, String> userRsrc;
 
   private DB stateDb = null;
 
@@ -276,9 +210,6 @@ public class ShuffleHandler extends AuxiliaryService {
       "mapreduce.shuffle.max.session-open-files";
   public static final int DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES = 3;
 
-  boolean connectionKeepAliveEnabled = false;
-  private int connectionKeepAliveTimeOut;
-  private int mapOutputMetaInfoCacheSize;
 
   @Metrics(about="Shuffle output metrics", context="mapred")
   static class ShuffleMetrics implements ChannelFutureListener {
@@ -302,170 +233,11 @@ public class ShuffleHandler extends AuxiliaryService {
     }
   }
 
-  static class NettyChannelHelper {
-    static ChannelFuture writeToChannel(Channel ch, Object obj) {
-      LOG.debug("Writing {} to channel: {}", obj.getClass().getSimpleName(), ch.id());
-      return ch.writeAndFlush(obj);
-    }
-
-    static ChannelFuture writeToChannelAndClose(Channel ch, Object obj) {
-      return writeToChannel(ch, obj).addListener(ChannelFutureListener.CLOSE);
-    }
-
-    static ChannelFuture writeToChannelAndAddLastHttpContent(Channel ch, HttpResponse obj) {
-      writeToChannel(ch, obj);
-      return writeLastHttpContentToChannel(ch);
-    }
-
-    static ChannelFuture writeLastHttpContentToChannel(Channel ch) {
-      LOG.debug("Writing LastHttpContent, channel id: {}", ch.id());
-      return ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
-    }
-
-    static ChannelFuture closeChannel(Channel ch) {
-      LOG.debug("Closing channel, channel id: {}", ch.id());
-      return ch.close();
-    }
-
-    static void closeChannels(ChannelGroup channelGroup) {
-      channelGroup.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
-    }
-
-    public static ChannelFuture closeAsIdle(Channel ch, int timeout) {
-      LOG.debug("Closing channel as writer was idle for {} seconds", timeout);
-      return closeChannel(ch);
-    }
-
-    public static void channelActive(Channel ch) {
-      LOG.debug("Executing channelActive, channel id: {}", ch.id());
-    }
-
-    public static void channelInactive(Channel ch) {
-      LOG.debug("Executing channelInactive, channel id: {}", ch.id());
-    }
-  }
-
-  private final MetricsSystem ms;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected final MetricsSystem ms;
+  @SuppressWarnings("checkstyle:VisibilityModifier")
   final ShuffleMetrics metrics;
 
-  class ReduceMapFileCount implements ChannelFutureListener {
-
-    private ReduceContext reduceContext;
-
-    ReduceMapFileCount(ReduceContext rc) {
-      this.reduceContext = rc;
-    }
-
-    @Override
-    public void operationComplete(ChannelFuture future) throws Exception {
-      LOG.trace("operationComplete");
-      if (!future.isSuccess()) {
-        LOG.error("Future is unsuccessful. Cause: ", future.cause());
-        closeChannel(future.channel());
-        return;
-      }
-      int waitCount = this.reduceContext.getMapsToWait().decrementAndGet();
-      if (waitCount == 0) {
-        LOG.trace("Finished with all map outputs");
-        //HADOOP-15327: Need to send an instance of LastHttpContent to define HTTP
-        //message boundaries. See details in jira.
-        writeLastHttpContentToChannel(future.channel());
-        metrics.operationComplete(future);
-        // Let the idle timer handler close keep-alive connections
-        if (reduceContext.getKeepAlive()) {
-          ChannelPipeline pipeline = future.channel().pipeline();
-          TimeoutHandler timeoutHandler =
-              (TimeoutHandler)pipeline.get(TIMEOUT_HANDLER);
-          timeoutHandler.setEnabledTimeout(true);
-        } else {
-          closeChannel(future.channel());
-        }
-      } else {
-        LOG.trace("operationComplete, waitCount > 0, invoking sendMap with reduceContext");
-        pipelineFact.getSHUFFLE().sendMap(reduceContext);
-      }
-    }
-  }
-
-  /**
-   * Maintain parameters per messageReceived() Netty context.
-   * Allows sendMapOutput calls from operationComplete()
-   */
-  private static class ReduceContext {
-    private List<String> mapIds;
-    private AtomicInteger mapsToWait;
-    private AtomicInteger mapsToSend;
-    private int reduceId;
-    private ChannelHandlerContext ctx;
-    private String user;
-    private Map<String, Shuffle.MapOutputInfo> infoMap;
-    private String jobId;
-    private final boolean keepAlive;
-
-    ReduceContext(List<String> mapIds, int rId,
-                         ChannelHandlerContext context, String usr,
-                         Map<String, Shuffle.MapOutputInfo> mapOutputInfoMap,
-                         String jobId, boolean keepAlive) {
-
-      this.mapIds = mapIds;
-      this.reduceId = rId;
-      /**
-      * Atomic count for tracking the no. of map outputs that are yet to
-      * complete. Multiple futureListeners' operationComplete() can decrement
-      * this value asynchronously. It is used to decide when the channel should
-      * be closed.
-      */
-      this.mapsToWait = new AtomicInteger(mapIds.size());
-      /**
-      * Atomic count for tracking the no. of map outputs that have been sent.
-      * Multiple sendMap() calls can increment this value
-      * asynchronously. Used to decide which mapId should be sent next.
-      */
-      this.mapsToSend = new AtomicInteger(0);
-      this.ctx = context;
-      this.user = usr;
-      this.infoMap = mapOutputInfoMap;
-      this.jobId = jobId;
-      this.keepAlive = keepAlive;
-    }
-
-    public int getReduceId() {
-      return reduceId;
-    }
-
-    public ChannelHandlerContext getCtx() {
-      return ctx;
-    }
-
-    public String getUser() {
-      return user;
-    }
-
-    public Map<String, Shuffle.MapOutputInfo> getInfoMap() {
-      return infoMap;
-    }
-
-    public String getJobId() {
-      return jobId;
-    }
-
-    public List<String> getMapIds() {
-      return mapIds;
-    }
-
-    public AtomicInteger getMapsToSend() {
-      return mapsToSend;
-    }
-
-    public AtomicInteger getMapsToWait() {
-      return mapsToWait;
-    }
-
-    public boolean getKeepAlive() {
-      return keepAlive;
-    }
-  }
-
   ShuffleHandler(MetricsSystem ms) {
     super(MAPREDUCE_SHUFFLE_SERVICEID);
     this.ms = ms;
@@ -480,18 +252,20 @@ public class ShuffleHandler extends AuxiliaryService {
    * Serialize the shuffle port into a ByteBuffer for use later on.
    * @param port the port to be sent to the ApplciationMaster
    * @return the serialized form of the port.
+   * @throws IOException on failure
    */
   public static ByteBuffer serializeMetaData(int port) throws IOException {
     //TODO these bytes should be versioned
-    DataOutputBuffer port_dob = new DataOutputBuffer();
-    port_dob.writeInt(port);
-    return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
+    DataOutputBuffer portDob = new DataOutputBuffer();
+    portDob.writeInt(port);
+    return ByteBuffer.wrap(portDob.getData(), 0, portDob.getLength());
   }
 
   /**
    * A helper function to deserialize the metadata returned by ShuffleHandler.
    * @param meta the metadata returned by the ShuffleHandler
    * @return the port the Shuffle Handler is listening on to serve shuffle data.
+   * @throws IOException on failure
    */
   public static int deserializeMetaData(ByteBuffer meta) throws IOException {
     //TODO this should be returning a class not just an int
@@ -507,16 +281,18 @@ public class ShuffleHandler extends AuxiliaryService {
    * @param jobToken the job token to be used for authentication of
    * shuffle data requests.
    * @return the serialized version of the jobToken.
+   * @throws IOException on failure
    */
   public static ByteBuffer serializeServiceData(Token<JobTokenIdentifier> jobToken)
       throws IOException {
     //TODO these bytes should be versioned
-    DataOutputBuffer jobToken_dob = new DataOutputBuffer();
-    jobToken.write(jobToken_dob);
-    return ByteBuffer.wrap(jobToken_dob.getData(), 0, jobToken_dob.getLength());
+    DataOutputBuffer jobTokenDob = new DataOutputBuffer();
+    jobToken.write(jobTokenDob);
+    return ByteBuffer.wrap(jobTokenDob.getData(), 0, jobTokenDob.getLength());
   }
 
-  static Token<JobTokenIdentifier> deserializeServiceData(ByteBuffer secret) throws IOException {
+  public static Token<JobTokenIdentifier> deserializeServiceData(ByteBuffer secret)
+      throws IOException {
     DataInputByteBuffer in = new DataInputByteBuffer();
     in.reset(secret);
     Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>();
@@ -556,14 +332,6 @@ public class ShuffleHandler extends AuxiliaryService {
 
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
-    manageOsCache = conf.getBoolean(SHUFFLE_MANAGE_OS_CACHE,
-        DEFAULT_SHUFFLE_MANAGE_OS_CACHE);
-
-    readaheadLength = conf.getInt(SHUFFLE_READAHEAD_BYTES,
-        DEFAULT_SHUFFLE_READAHEAD_BYTES);
-    
-    maxShuffleConnections = conf.getInt(MAX_SHUFFLE_CONNECTIONS, 
-                                        DEFAULT_MAX_SHUFFLE_CONNECTIONS);
     int maxShuffleThreads = conf.getInt(MAX_SHUFFLE_THREADS,
                                         DEFAULT_MAX_SHUFFLE_THREADS);
     // Since Netty 4.x, the value of 0 threads would default to:
@@ -574,16 +342,6 @@ public class ShuffleHandler extends AuxiliaryService {
     if (maxShuffleThreads == 0) {
       maxShuffleThreads = 2 * Runtime.getRuntime().availableProcessors();
     }
-    
-    shuffleBufferSize = conf.getInt(SHUFFLE_BUFFER_SIZE, 
-                                    DEFAULT_SHUFFLE_BUFFER_SIZE);
-        
-    shuffleTransferToAllowed = conf.getBoolean(SHUFFLE_TRANSFERTO_ALLOWED,
-         (Shell.WINDOWS)?WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED:
-                         DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED);
-
-    maxSessionOpenFiles = conf.getInt(SHUFFLE_MAX_SESSION_OPEN_FILES,
-        DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES);
 
     ThreadFactory bossFactory = new ThreadFactoryBuilder()
         .setNameFormat("ShuffleHandler Netty Boss #%d")
@@ -592,66 +350,117 @@ public class ShuffleHandler extends AuxiliaryService {
         .setNameFormat("ShuffleHandler Netty Worker #%d")
         .build();
     
-    bossGroup = new NioEventLoopGroup(maxShuffleThreads, bossFactory);
+    bossGroup = new NioEventLoopGroup(1, bossFactory);
     workerGroup = new NioEventLoopGroup(maxShuffleThreads, workerFactory);
     super.serviceInit(new Configuration(conf));
   }
 
+  protected ShuffleChannelHandlerContext createHandlerContext() {
+    Configuration conf = getConfig();
+
+    final LoadingCache<AttemptPathIdentifier, AttemptPathInfo> pathCache =
+        CacheBuilder.newBuilder().expireAfterAccess(
+                conf.getInt(EXPIRE_AFTER_ACCESS_MINUTES, DEFAULT_EXPIRE_AFTER_ACCESS_MINUTES),
+                TimeUnit.MINUTES).softValues().concurrencyLevel(conf.getInt(CONCURRENCY_LEVEL,
+                DEFAULT_CONCURRENCY_LEVEL)).
+            removalListener(
+                (RemovalListener<AttemptPathIdentifier, AttemptPathInfo>) notification -> {
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug("PathCache Eviction: " + notification.getKey() +
+                        ", Reason=" + notification.getCause());
+                  }
+                }
+            ).maximumWeight(conf.getInt(MAX_WEIGHT, DEFAULT_MAX_WEIGHT)).weigher(
+                (key, value) -> key.jobId.length() + key.user.length() +
+                    key.attemptId.length()+
+                    value.indexPath.toString().length() +
+                    value.dataPath.toString().length()
+            ).build(new CacheLoader<AttemptPathIdentifier, AttemptPathInfo>() {
+              @Override
+              public AttemptPathInfo load(@Nonnull AttemptPathIdentifier key) throws
+                  Exception {
+                String base = getBaseLocation(key.jobId, key.user);
+                String attemptBase = base + key.attemptId;
+                Path indexFileName = getAuxiliaryLocalPathHandler()
+                    .getLocalPathForRead(attemptBase + "/" + INDEX_FILE_NAME);
+                Path mapOutputFileName = getAuxiliaryLocalPathHandler()
+                    .getLocalPathForRead(attemptBase + "/" + DATA_FILE_NAME);
+
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Loaded : " + key + " via loader");
+                }
+                return new AttemptPathInfo(indexFileName, mapOutputFileName);
+              }
+            });
+
+    return new ShuffleChannelHandlerContext(conf,
+        userRsrc,
+        secretManager,
+        pathCache,
+        new IndexCache(new JobConf(conf)),
+        metrics,
+        allChannels
+    );
+  }
+
   // TODO change AbstractService to throw InterruptedException
   @Override
   protected void serviceStart() throws Exception {
     Configuration conf = getConfig();
-    userRsrc = new ConcurrentHashMap<String,String>();
+    userRsrc = new ConcurrentHashMap<>();
     secretManager = new JobTokenSecretManager();
     recoverState(conf);
-    try {
-      pipelineFact = new HttpPipelineFactory(conf);
-    } catch (Exception ex) {
-      throw new RuntimeException(ex);
+
+    if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
+        MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
+      LOG.info("Encrypted shuffle is enabled.");
+      sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
+      sslFactory.init();
     }
 
-    bootstrap = new ServerBootstrap();
+    ShuffleChannelHandlerContext handlerContext = createHandlerContext();
+    ServerBootstrap bootstrap = new ServerBootstrap();
     bootstrap.group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .option(ChannelOption.SO_BACKLOG,
             conf.getInt(SHUFFLE_LISTEN_QUEUE_SIZE,
                 DEFAULT_SHUFFLE_LISTEN_QUEUE_SIZE))
         .childOption(ChannelOption.SO_KEEPALIVE, true)
-        .childHandler(pipelineFact);
+        .childHandler(new ShuffleChannelInitializer(
+            handlerContext,
+            sslFactory)
+        );
     port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
-    ch = bootstrap.bind(new InetSocketAddress(port)).sync().channel();
-    accepted.add(ch);
+    Channel ch = bootstrap.bind(new InetSocketAddress(port)).sync().channel();
     port = ((InetSocketAddress)ch.localAddress()).getPort();
+    allChannels.add(ch);
     conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port));
-    pipelineFact.SHUFFLE.setPort(port);
+    handlerContext.setPort(port);
     LOG.info(getName() + " listening on port " + port);
     super.serviceStart();
-
-    sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
-                                    DEFAULT_SUFFLE_SSL_FILE_BUFFER_SIZE);
-    connectionKeepAliveEnabled =
-        conf.getBoolean(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED,
-          DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED);
-    connectionKeepAliveTimeOut =
-        Math.max(1, conf.getInt(SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT,
-          DEFAULT_SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT));
-    mapOutputMetaInfoCacheSize =
-        Math.max(1, conf.getInt(SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE,
-          DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE));
   }
 
   @Override
   protected void serviceStop() throws Exception {
-    closeChannels(accepted);
+    allChannels.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
 
-    if (pipelineFact != null) {
-      pipelineFact.destroy();
+    if (sslFactory != null) {
+      sslFactory.destroy();
     }
 
     if (stateDb != null) {
       stateDb.close();
     }
     ms.unregisterSource(ShuffleMetrics.class.getSimpleName());
+
+    if (bossGroup != null) {
+      bossGroup.shutdownGracefully();
+    }
+
+    if (workerGroup != null) {
+      workerGroup.shutdownGracefully();
+    }
+
     super.serviceStop();
   }
 
@@ -666,10 +475,6 @@ public class ShuffleHandler extends AuxiliaryService {
     }
   }
 
-  protected Shuffle getShuffle(Configuration conf) {
-    return new Shuffle(conf);
-  }
-
   private void recoverState(Configuration conf) throws IOException {
     Path recoveryRoot = getRecoveryPath();
     if (recoveryRoot != null) {
@@ -845,11 +650,6 @@ public class ShuffleHandler extends AuxiliaryService {
     }
   }
 
-  @VisibleForTesting
-  public void setUseOutboundExceptionHandler(boolean useHandler) {
-    this.useOutboundExceptionHandler = useHandler;
-  }
-
   static class TimeoutHandler extends IdleStateHandler {
     private final int connectionKeepAliveTimeOut;
     private boolean enabledTimeout;
@@ -862,11 +662,6 @@ public class ShuffleHandler extends AuxiliaryService {
       this.connectionKeepAliveTimeOut = connectionKeepAliveTimeOut;
     }
 
-    @VisibleForTesting
-    public int getConnectionKeepAliveTimeOut() {
-      return connectionKeepAliveTimeOut;
-    }
-
     void setEnabledTimeout(boolean enabledTimeout) {
       this.enabledTimeout = enabledTimeout;
     }
@@ -874,607 +669,18 @@ public class ShuffleHandler extends AuxiliaryService {
     @Override
     public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
       if (e.state() == IdleState.WRITER_IDLE && enabledTimeout) {
-        closeAsIdle(ctx.channel(), connectionKeepAliveTimeOut);
-      }
-    }
-  }
-
-  class HttpPipelineFactory extends ChannelInitializer<SocketChannel> {
-    private static final int MAX_CONTENT_LENGTH = 1 << 16;
-
-    final Shuffle SHUFFLE;
-    private SSLFactory sslFactory;
-
-    HttpPipelineFactory(Configuration conf) throws Exception {
-      SHUFFLE = getShuffle(conf);
-      if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
-                          MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
-        LOG.info("Encrypted shuffle is enabled.");
-        sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
-        sslFactory.init();
-      }
-    }
-
-    public Shuffle getSHUFFLE() {
-      return SHUFFLE;
-    }
-
-    public void destroy() {
-      if (sslFactory != null) {
-        sslFactory.destroy();
-      }
-    }
-
-    @Override protected void initChannel(SocketChannel ch) throws Exception {
-      ChannelPipeline pipeline = ch.pipeline();
-      if (sslFactory != null) {
-        pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
-      }
-      pipeline.addLast("decoder", new HttpRequestDecoder());
-      pipeline.addLast("aggregator", new HttpObjectAggregator(MAX_CONTENT_LENGTH));
-      pipeline.addLast(ENCODER_HANDLER_NAME, useOutboundLogger ?
-          new LoggingHttpResponseEncoder(false) : new HttpResponseEncoder());
-      pipeline.addLast("chunking", new ChunkedWriteHandler());
-      pipeline.addLast("shuffle", SHUFFLE);
-      if (useOutboundExceptionHandler) {
-        //https://stackoverflow.com/questions/50612403/catch-all-exception-handling-for-outbound-channelhandler
-        pipeline.addLast("outboundExceptionHandler", new ChannelOutboundHandlerAdapter() {
-          @Override
-          public void write(ChannelHandlerContext ctx, Object msg,
-              ChannelPromise promise) throws Exception {
-            promise.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
-            super.write(ctx, msg, promise);
-          }
-        });
-      }
-      pipeline.addLast(TIMEOUT_HANDLER, new TimeoutHandler(connectionKeepAliveTimeOut));
-      // TODO factor security manager into pipeline
-      // TODO factor out encode/decode to permit binary shuffle
-      // TODO factor out decode of index to permit alt. models
-    }
-  }
-
-  @ChannelHandler.Sharable
-  class Shuffle extends ChannelInboundHandlerAdapter {
-    private final IndexCache indexCache;
-    private final LoadingCache<AttemptPathIdentifier, AttemptPathInfo> pathCache;
-
-    private int port;
-
-    Shuffle(Configuration conf) {
-      this.port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
-      this.indexCache = new IndexCache(new JobConf(conf));
-      this.pathCache = CacheBuilder.newBuilder()
-          .expireAfterAccess(conf.getInt(EXPIRE_AFTER_ACCESS_MINUTES,
-              DEFAULT_EXPIRE_AFTER_ACCESS_MINUTES), TimeUnit.MINUTES)
-          .softValues()
-          .concurrencyLevel(conf.getInt(CONCURRENCY_LEVEL,
-              DEFAULT_CONCURRENCY_LEVEL))
-          .removalListener((RemovalListener<AttemptPathIdentifier,
-              AttemptPathInfo>) notification ->
-              LOG.debug("PathCache Eviction: {}, Reason={}",
-                  notification.getKey(), notification.getCause()))
-          .maximumWeight(conf.getInt(MAX_WEIGHT, DEFAULT_MAX_WEIGHT))
-          .weigher((key, value) -> key.jobId.length() + key.user.length() +
-              key.attemptId.length()+ value.indexPath.toString().length() +
-              value.dataPath.toString().length())
-          .build(new CacheLoader<AttemptPathIdentifier, AttemptPathInfo>() {
-            @Override
-            public AttemptPathInfo load(AttemptPathIdentifier key) throws
-                Exception {
-              String base = getBaseLocation(key.jobId, key.user);
-              String attemptBase = base + key.attemptId;
-              Path indexFileName = getAuxiliaryLocalPathHandler()
-                  .getLocalPathForRead(attemptBase + "/" + INDEX_FILE_NAME);
-              Path mapOutputFileName = getAuxiliaryLocalPathHandler()
-                  .getLocalPathForRead(attemptBase + "/" + DATA_FILE_NAME);
-              LOG.debug("Loaded : {} via loader", key);
-              return new AttemptPathInfo(indexFileName, mapOutputFileName);
-            }
-          });
-    }
-
-    public void setPort(int port) {
-      this.port = port;
-    }
-
-    private List<String> splitMaps(List<String> mapq) {
-      if (null == mapq) {
-        return null;
-      }
-      final List<String> ret = new ArrayList<String>();
-      for (String s : mapq) {
-        Collections.addAll(ret, s.split(","));
-      }
-      return ret;
-    }
-
-    @Override
-    public void channelActive(ChannelHandlerContext ctx)
-        throws Exception {
-      NettyChannelHelper.channelActive(ctx.channel());
-      int numConnections = activeConnections.incrementAndGet();
-      if ((maxShuffleConnections > 0) && (numConnections > maxShuffleConnections)) {
-        LOG.info(String.format("Current number of shuffle connections (%d) is " + 
-            "greater than the max allowed shuffle connections (%d)",
-            accepted.size(), maxShuffleConnections));
-
-        Map<String, String> headers = new HashMap<>(1);
-        // notify fetchers to backoff for a while before closing the connection
-        // if the shuffle connection limit is hit. Fetchers are expected to
-        // handle this notification gracefully, that is, not treating this as a
-        // fetch failure.
-        headers.put(RETRY_AFTER_HEADER, String.valueOf(FETCH_RETRY_DELAY));
-        sendError(ctx, "", TOO_MANY_REQ_STATUS, headers);
-      } else {
-        super.channelActive(ctx);
-        accepted.add(ctx.channel());
-        LOG.debug("Added channel: {}, channel id: {}. Accepted number of connections={}",
-            ctx.channel(), ctx.channel().id(), activeConnections.get());
-      }
-    }
-
-    @Override
-    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
-      NettyChannelHelper.channelInactive(ctx.channel());
-      super.channelInactive(ctx);
-      int noOfConnections = activeConnections.decrementAndGet();
-      LOG.debug("New value of Accepted number of connections={}", noOfConnections);
-    }
-
-    @Override
-    public void channelRead(ChannelHandlerContext ctx, Object msg)
-        throws Exception {
-      Channel channel = ctx.channel();
-      LOG.trace("Executing channelRead, channel id: {}", channel.id());
-      HttpRequest request = (HttpRequest) msg;
-      LOG.debug("Received HTTP request: {}, channel id: {}", request, channel.id());
-      if (request.method() != GET) {
-        sendError(ctx, METHOD_NOT_ALLOWED);
-        return;
-      }
-      // Check whether the shuffle version is compatible
-      String shuffleVersion = ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION;
-      String httpHeaderName = ShuffleHeader.DEFAULT_HTTP_HEADER_NAME;
-      if (request.headers() != null) {
-        shuffleVersion = request.headers().get(ShuffleHeader.HTTP_HEADER_VERSION);
-        httpHeaderName = request.headers().get(ShuffleHeader.HTTP_HEADER_NAME);
-        LOG.debug("Received from request header: ShuffleVersion={} header name={}, channel id: {}",
-            shuffleVersion, httpHeaderName, channel.id());
-      }
-      if (request.headers() == null ||
-          !ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(httpHeaderName) ||
-          !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(shuffleVersion)) {
-        sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST);
-      }
-      final Map<String, List<String>> q =
-          new QueryStringDecoder(request.uri()).parameters();
-      final List<String> keepAliveList = q.get("keepAlive");
-      boolean keepAliveParam = false;
-      if (keepAliveList != null && keepAliveList.size() == 1) {
-        keepAliveParam = Boolean.valueOf(keepAliveList.get(0));
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("KeepAliveParam: {} : {}, channel id: {}",
-              keepAliveList, keepAliveParam, channel.id());
-        }
-      }
-      final List<String> mapIds = splitMaps(q.get("map"));
-      final List<String> reduceQ = q.get("reduce");
-      final List<String> jobQ = q.get("job");
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("RECV: " + request.uri() +
-            "\n  mapId: " + mapIds +
-            "\n  reduceId: " + reduceQ +
-            "\n  jobId: " + jobQ +
-            "\n  keepAlive: " + keepAliveParam +
-            "\n  channel id: " + channel.id());
-      }
-
-      if (mapIds == null || reduceQ == null || jobQ == null) {
-        sendError(ctx, "Required param job, map and reduce", BAD_REQUEST);
-        return;
-      }
-      if (reduceQ.size() != 1 || jobQ.size() != 1) {
-        sendError(ctx, "Too many job/reduce parameters", BAD_REQUEST);
-        return;
-      }
-
-      int reduceId;
-      String jobId;
-      try {
-        reduceId = Integer.parseInt(reduceQ.get(0));
-        jobId = jobQ.get(0);
-      } catch (NumberFormatException e) {
-        sendError(ctx, "Bad reduce parameter", BAD_REQUEST);
-        return;
-      } catch (IllegalArgumentException e) {
-        sendError(ctx, "Bad job parameter", BAD_REQUEST);
-        return;
-      }
-      final String reqUri = request.uri();
-      if (null == reqUri) {
-        // TODO? add upstream?
-        sendError(ctx, FORBIDDEN);
-        return;
-      }
-      HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
-      try {
-        verifyRequest(jobId, ctx, request, response,
-            new URL("http", "", this.port, reqUri));
-      } catch (IOException e) {
-        LOG.warn("Shuffle failure ", e);
-        sendError(ctx, e.getMessage(), UNAUTHORIZED);
-        return;
-      }
-
-      Map<String, MapOutputInfo> mapOutputInfoMap =
-          new HashMap<String, MapOutputInfo>();
-      ChannelPipeline pipeline = channel.pipeline();
-      TimeoutHandler timeoutHandler =
-          (TimeoutHandler)pipeline.get(TIMEOUT_HANDLER);
-      timeoutHandler.setEnabledTimeout(false);
-      String user = userRsrc.get(jobId);
-
-      try {
-        populateHeaders(mapIds, jobId, user, reduceId, request,
-            response, keepAliveParam, mapOutputInfoMap);
-      } catch(IOException e) {
-        //HADOOP-15327
-        // Need to send an instance of LastHttpContent to define HTTP
-        // message boundaries.
-        //Sending a HTTP 200 OK + HTTP 500 later (sendError)
-        // is quite a non-standard way of crafting HTTP responses,
-        // but we need to keep backward compatibility.
-        // See more details in jira.
-        writeToChannelAndAddLastHttpContent(channel, response);
-        LOG.error("Shuffle error while populating headers. Channel id: " + channel.id(), e);
-        sendError(ctx, getErrorMessage(e), INTERNAL_SERVER_ERROR);
-        return;
-      }
-      writeToChannel(channel, response).addListener((ChannelFutureListener) future -> {
-        if (future.isSuccess()) {
-          LOG.debug("Written HTTP response object successfully. Channel id: {}", channel.id());
-        } else {
-          LOG.error("Error while writing HTTP response object: {}. " +
-              "Cause: {}, channel id: {}", response, future.cause(), channel.id());
-        }
-      });
-      //Initialize one ReduceContext object per channelRead call
-      boolean keepAlive = keepAliveParam || connectionKeepAliveEnabled;
-      ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx,
-          user, mapOutputInfoMap, jobId, keepAlive);
-      for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) {
-        ChannelFuture nextMap = sendMap(reduceContext);
-        if(nextMap == null) {
-          return;
-        }
-      }
-    }
-
-    /**
-     * Calls sendMapOutput for the mapId pointed by ReduceContext.mapsToSend
-     * and increments it. This method is first called by messageReceived()
-     * maxSessionOpenFiles times and then on the completion of every
-     * sendMapOutput operation. This limits the number of open files on a node,
-     * which can get really large(exhausting file descriptors on the NM) if all
-     * sendMapOutputs are called in one go, as was done previous to this change.
-     * @param reduceContext used to call sendMapOutput with correct params.
-     * @return the ChannelFuture of the sendMapOutput, can be null.
-     */
-    public ChannelFuture sendMap(ReduceContext reduceContext) {
-      LOG.trace("Executing sendMap");
-      ChannelFuture nextMap = null;
-      if (reduceContext.getMapsToSend().get() <
-          reduceContext.getMapIds().size()) {
-        int nextIndex = reduceContext.getMapsToSend().getAndIncrement();
-        String mapId = reduceContext.getMapIds().get(nextIndex);
-
-        try {
-          MapOutputInfo info = reduceContext.getInfoMap().get(mapId);
-          if (info == null) {
-            info = getMapOutputInfo(mapId, reduceContext.getReduceId(),
-                reduceContext.getJobId(), reduceContext.getUser());
-          }
-          LOG.trace("Calling sendMapOutput");
-          nextMap = sendMapOutput(
-              reduceContext.getCtx(),
-              reduceContext.getCtx().channel(),
-              reduceContext.getUser(), mapId,
-              reduceContext.getReduceId(), info);
-          if (nextMap == null) {
-            //This can only happen if spill file was not found
-            sendError(reduceContext.getCtx(), NOT_FOUND);
-            LOG.trace("Returning nextMap: null");
-            return null;
-          }
-          nextMap.addListener(new ReduceMapFileCount(reduceContext));
-        } catch (IOException e) {
-          if (e instanceof DiskChecker.DiskErrorException) {
-            LOG.error("Shuffle error: " + e);
-          } else {
-            LOG.error("Shuffle error: ", e);
-          }
-          String errorMessage = getErrorMessage(e);
-          sendError(reduceContext.getCtx(), errorMessage,
-              INTERNAL_SERVER_ERROR);
-          return null;
-        }
-      }
-      return nextMap;
-    }
-
-    private String getErrorMessage(Throwable t) {
-      StringBuffer sb = new StringBuffer(t.getMessage());
-      while (t.getCause() != null) {
-        sb.append(t.getCause().getMessage());
-        t = t.getCause();
-      }
-      return sb.toString();
-    }
-
-    private String getBaseLocation(String jobId, String user) {
-      final JobID jobID = JobID.forName(jobId);
-      final ApplicationId appID =
-          ApplicationId.newInstance(Long.parseLong(jobID.getJtIdentifier()),
-            jobID.getId());
-      final String baseStr =
-          ContainerLocalizer.USERCACHE + "/" + user + "/"
-              + ContainerLocalizer.APPCACHE + "/"
-              + appID.toString() + "/output" + "/";
-      return baseStr;
-    }
-
-    protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
-        String jobId, String user) throws IOException {
-      AttemptPathInfo pathInfo;
-      try {
-        AttemptPathIdentifier identifier = new AttemptPathIdentifier(
-            jobId, user, mapId);
-        pathInfo = pathCache.get(identifier);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Retrieved pathInfo for " + identifier +
-              " check for corresponding loaded messages to determine whether" +
-              " it was loaded or cached");
-        }
-      } catch (ExecutionException e) {
-        if (e.getCause() instanceof IOException) {
-          throw (IOException) e.getCause();
-        } else {
-          throw new RuntimeException(e.getCause());
-        }
-      }
-
-      IndexRecord info = indexCache.getIndexInformation(mapId, reduce, pathInfo.indexPath, user);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("getMapOutputInfo: jobId=" + jobId + ", mapId=" + mapId +
-            ",dataFile=" + pathInfo.dataPath + ", indexFile=" +
-            pathInfo.indexPath);
-      }
-
-      MapOutputInfo outputInfo = new MapOutputInfo(pathInfo.dataPath, info);
-      return outputInfo;
-    }
-
-    protected void populateHeaders(List<String> mapIds, String jobId,
-        String user, int reduce, HttpRequest request, HttpResponse response,
-        boolean keepAliveParam, Map<String, MapOutputInfo> mapOutputInfoMap)
-        throws IOException {
-
-      long contentLength = 0;
-      for (String mapId : mapIds) {
-        MapOutputInfo outputInfo = getMapOutputInfo(mapId, reduce, jobId, user);
-        if (mapOutputInfoMap.size() < mapOutputMetaInfoCacheSize) {
-          mapOutputInfoMap.put(mapId, outputInfo);
-        }
-
-        ShuffleHeader header =
-            new ShuffleHeader(mapId, outputInfo.indexRecord.partLength,
-            outputInfo.indexRecord.rawLength, reduce);
-        DataOutputBuffer dob = new DataOutputBuffer();
-        header.write(dob);
-        contentLength += outputInfo.indexRecord.partLength;
-        contentLength += dob.getLength();
-      }
-
-      // Now set the response headers.
-      setResponseHeaders(response, keepAliveParam, contentLength);
-
-      // this audit log is disabled by default,
-      // to turn it on please enable this audit log
-      // on log4j.properties by uncommenting the setting
-      if (AUDITLOG.isDebugEnabled()) {
-        StringBuilder sb = new StringBuilder("shuffle for ");
-        sb.append(jobId).append(" reducer ").append(reduce);
-        sb.append(" length ").append(contentLength);
-        if (AUDITLOG.isTraceEnabled()) {
-          // For trace level logging, append the list of mappers
-          sb.append(" mappers: ").append(mapIds);
-          AUDITLOG.trace(sb.toString());
-        } else {
-          AUDITLOG.debug(sb.toString());
-        }
-      }
-    }
-
-    protected void setResponseHeaders(HttpResponse response,
-        boolean keepAliveParam, long contentLength) {
-      if (!connectionKeepAliveEnabled && !keepAliveParam) {
-        response.headers().set(HttpHeader.CONNECTION.asString(), CONNECTION_CLOSE);
-      } else {
-        response.headers().set(HttpHeader.CONTENT_LENGTH.asString(),
-            String.valueOf(contentLength));
-        response.headers().set(HttpHeader.CONNECTION.asString(),
-            HttpHeader.KEEP_ALIVE.asString());
-        response.headers().set(HttpHeader.KEEP_ALIVE.asString(),
-            "timeout=" + connectionKeepAliveTimeOut);
-        LOG.info("Content Length in shuffle : " + contentLength);
-      }
-    }
-
-    class MapOutputInfo {
-      final Path mapOutputFileName;
-      final IndexRecord indexRecord;
-
-      MapOutputInfo(Path mapOutputFileName, IndexRecord indexRecord) {
-        this.mapOutputFileName = mapOutputFileName;
-        this.indexRecord = indexRecord;
-      }
-    }
-
-    protected void verifyRequest(String appid, ChannelHandlerContext ctx,
-        HttpRequest request, HttpResponse response, URL requestUri)
-        throws IOException {
-      SecretKey tokenSecret = secretManager.retrieveTokenSecret(appid);
-      if (null == tokenSecret) {
-        LOG.info("Request for unknown token {}, channel id: {}", appid, ctx.channel().id());
-        throw new IOException("Could not find jobid");
-      }
-      // encrypting URL
-      String encryptedURL = SecureShuffleUtils.buildMsgFrom(requestUri);
-      // hash from the fetcher
-      String urlHashStr =
-          request.headers().get(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
-      if (urlHashStr == null) {
-        LOG.info("Missing header hash for {}, channel id: {}", appid, ctx.channel().id());
-        throw new IOException("fetcher cannot be authenticated");
-      }
-      if (LOG.isDebugEnabled()) {
-        int len = urlHashStr.length();
-        LOG.debug("Verifying request. encryptedURL:{}, hash:{}, channel id: " +
-                "{}", encryptedURL,
-            urlHashStr.substring(len - len / 2, len - 1), ctx.channel().id());
-      }
-      // verify - throws exception
-      SecureShuffleUtils.verifyReply(urlHashStr, encryptedURL, tokenSecret);
-      // verification passed - encode the reply
-      String reply = SecureShuffleUtils.generateHash(urlHashStr.getBytes(Charsets.UTF_8),
-          tokenSecret);
-      response.headers().set(
-          SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
-      // Put shuffle version into http header
-      response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
-          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
-      response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
-          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
-      if (LOG.isDebugEnabled()) {
-        int len = reply.length();
-        LOG.debug("Fetcher request verified. " +
-            "encryptedURL: {}, reply: {}, channel id: {}",
-            encryptedURL, reply.substring(len - len / 2, len - 1),
-            ctx.channel().id());
-      }
-    }
-
-    protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx, Channel ch,
-        String user, String mapId, int reduce, MapOutputInfo mapOutputInfo)
-        throws IOException {
-      final IndexRecord info = mapOutputInfo.indexRecord;
-      final ShuffleHeader header = new ShuffleHeader(mapId, info.partLength, info.rawLength,
-          reduce);
-      final DataOutputBuffer dob = new DataOutputBuffer();
-      header.write(dob);
-      writeToChannel(ch, wrappedBuffer(dob.getData(), 0, dob.getLength()));
-      final File spillfile =
-          new File(mapOutputInfo.mapOutputFileName.toString());
-      RandomAccessFile spill;
-      try {
-        spill = SecureIOUtils.openForRandomRead(spillfile, "r", user, null);
-      } catch (FileNotFoundException e) {
-        LOG.info("{} not found. Channel id: {}", spillfile, ctx.channel().id());
-        return null;
-      }
-      ChannelFuture writeFuture;
-      if (ch.pipeline().get(SslHandler.class) == null) {
-        final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
-            info.startOffset, info.partLength, manageOsCache, readaheadLength,
-            readaheadPool, spillfile.getAbsolutePath(), 
-            shuffleBufferSize, shuffleTransferToAllowed);
-        writeFuture = writeToChannel(ch, partition);
-        writeFuture.addListener(new ChannelFutureListener() {
-            // TODO error handling; distinguish IO/connection failures,
-            //      attribute to appropriate spill output
-          @Override
-          public void operationComplete(ChannelFuture future) {
-            if (future.isSuccess()) {
-              partition.transferSuccessful();
-            }
-            partition.deallocate();
-          }
-        });
-      } else {
-        // HTTPS cannot be done with zero copy.
-        final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
-            info.startOffset, info.partLength, sslFileBufferSize,
-            manageOsCache, readaheadLength, readaheadPool,
-            spillfile.getAbsolutePath());
-        writeFuture = writeToChannel(ch, chunk);
-      }
-      metrics.shuffleConnections.incr();
-      metrics.shuffleOutputBytes.incr(info.partLength); // optimistic
-      return writeFuture;
-    }
-
-    protected void sendError(ChannelHandlerContext ctx,
-        HttpResponseStatus status) {
-      sendError(ctx, "", status);
-    }
-
-    protected void sendError(ChannelHandlerContext ctx, String message,
-        HttpResponseStatus status) {
-      sendError(ctx, message, status, Collections.emptyMap());
-    }
-
-    protected void sendError(ChannelHandlerContext ctx, String msg,
-        HttpResponseStatus status, Map<String, String> headers) {
-      FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status,
-              Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
-      response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
-      // Put shuffle version into http header
-      response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
-          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
-      response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
-          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
-      for (Map.Entry<String, String> header : headers.entrySet()) {
-        response.headers().set(header.getKey(), header.getValue());
-      }
-
-      // Close the connection as soon as the error message is sent.
-      writeToChannelAndClose(ctx.channel(), response);
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
-        throws Exception {
-      Channel ch = ctx.channel();
-      if (cause instanceof TooLongFrameException) {
-        LOG.trace("TooLongFrameException, channel id: {}", ch.id());
-        sendError(ctx, BAD_REQUEST);
-        return;
-      } else if (cause instanceof IOException) {
-        if (cause instanceof ClosedChannelException) {
-          LOG.debug("Ignoring closed channel error, channel id: " + ch.id(), cause);
-          return;
-        }
-        String message = String.valueOf(cause.getMessage());
-        if (IGNORABLE_ERROR_MESSAGE.matcher(message).matches()) {
-          LOG.debug("Ignoring client socket close, channel id: " + ch.id(), cause);
-          return;
-        }
-      }
-
-      LOG.error("Shuffle error. Channel id: " + ch.id(), cause);
-      if (ch.isActive()) {
-        sendError(ctx, INTERNAL_SERVER_ERROR);
+        LOG.debug("Closing channel as writer was idle for {} seconds", connectionKeepAliveTimeOut);
+        ctx.channel().close();
       }
     }
   }
 
+  @SuppressWarnings("checkstyle:VisibilityModifier")
   static class AttemptPathInfo {
     // TODO Change this over to just store local dir indices, instead of the
     // entire path. Far more efficient.
-    private final Path indexPath;
-    private final Path dataPath;
+    public final Path indexPath;
+    public final Path dataPath;
 
     AttemptPathInfo(Path indexPath, Path dataPath) {
       this.indexPath = indexPath;
@@ -1482,10 +688,11 @@ public class ShuffleHandler extends AuxiliaryService {
     }
   }
 
+  @SuppressWarnings("checkstyle:VisibilityModifier")
   static class AttemptPathIdentifier {
-    private final String jobId;
-    private final String user;
-    private final String attemptId;
+    public final String jobId;
+    public final String user;
+    public final String attemptId;
 
     AttemptPathIdentifier(String jobId, String user, String attemptId) {
       this.jobId = jobId;
@@ -1529,4 +736,14 @@ public class ShuffleHandler extends AuxiliaryService {
           '}';
     }
   }
+
+  private static String getBaseLocation(String jobId, String user) {
+    final JobID jobID = JobID.forName(jobId);
+    final ApplicationId appID =
+        ApplicationId.newInstance(Long.parseLong(jobID.getJtIdentifier()),
+            jobID.getId());
+    return ContainerLocalizer.USERCACHE + "/" + user + "/"
+        + ContainerLocalizer.APPCACHE + "/"
+        + appID + "/output" + "/";
+  }
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleChannelHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleChannelHandler.java
new file mode 100644
index 00000000000..7fedc7bb2dc
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleChannelHandler.java
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import io.netty.buffer.ByteBufUtil;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.FileRegion;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.handler.codec.MessageToMessageEncoder;
+import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpMethod;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpResponseDecoder;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpServerCodec;
+import io.netty.handler.codec.http.HttpUtil;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.nio.charset.StandardCharsets;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import javax.crypto.SecretKey;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
+import org.eclipse.jetty.http.HttpHeader;
+import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_LENGTH;
+import static io.netty.handler.codec.http.HttpHeaderNames.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static org.apache.hadoop.mapred.ShuffleChannelHandler.shuffleHeaderToBytes;
+import static org.apache.hadoop.mapred.ShuffleChannelInitializer.MAX_CONTENT_LENGTH;
+import static org.apache.hadoop.mapred.ShuffleHandler.CONNECTION_CLOSE;
+import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED;
+import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT;
+import static org.apache.hadoop.mapred.ShuffleHandler.TIMEOUT_HANDLER;
+import static org.apache.hadoop.mapreduce.security.SecureShuffleUtils.HTTP_HEADER_URL_HASH;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestShuffleChannelHandler extends TestShuffleHandlerBase {
+  private static final org.slf4j.Logger LOG =
+      LoggerFactory.getLogger(TestShuffleChannelHandler.class);
+
+  @Test
+  public void testGetMapsFileRegion() throws IOException {
+    final ShuffleTest t = createShuffleTest();
+    final EmbeddedChannel shuffle = t.createShuffleHandlerChannelFileRegion();
+    t.testGetAllAttemptsForReduce0NoKeepAlive(shuffle.outboundMessages(), shuffle);
+  }
+
+  @Test
+  public void testGetMapsChunkedFileSSl() throws Exception {
+    final ShuffleTest t = createShuffleTest();
+    final LinkedList<Object> unencryptedMessages = new LinkedList<>();
+    final EmbeddedChannel shuffle = t.createShuffleHandlerSSL(unencryptedMessages);
+    t.testGetAllAttemptsForReduce0NoKeepAlive(unencryptedMessages, shuffle);
+  }
+
+  @Test
+  public void testKeepAlive() throws Exception {
+    // TODO: problems with keep-alive
+    // current behaviour:
+    //  a) mapreduce.shuffle.connection-keep-alive.enable=false
+    //     + client request with &keepAlive=true
+    //     ==> connection is kept
+    //  b) mapreduce.shuffle.connection-keep-alive.enable=true
+    //     ==> connection is kept
+    //
+    // a) seems like a bug
+    // b) might be ok, because it's the default in HTTP/1.1
+    Configuration conf = new Configuration();
+    conf.set(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, "false");
+    conf.set(SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, "15");
+    final ShuffleTest t = createShuffleTest(conf);
+    final EmbeddedChannel shuffle = t.createShuffleHandlerChannelFileRegion();
+    t.testKeepAlive(shuffle.outboundMessages(), shuffle);
+  }
+
+  @Test
+  public void testKeepAliveSSL() throws Exception {
+    Configuration conf = new Configuration();
+    conf.set(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, "false");
+    conf.set(SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, "15");
+    final ShuffleTest t = createShuffleTest(conf);
+    final LinkedList<Object> unencryptedMessages = new LinkedList<>();
+    final EmbeddedChannel shuffle = t.createShuffleHandlerSSL(unencryptedMessages);
+    t.testKeepAlive(unencryptedMessages, shuffle);
+  }
+
+  @Test
+  public void tetKeepAliveTimeout() throws InterruptedException, IOException {
+    Configuration conf = new Configuration();
+    conf.set(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, "true");
+    conf.set(SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, "1");
+    final ShuffleTest t = createShuffleTest(conf);
+    final EmbeddedChannel shuffle = t.createShuffleHandlerChannelFileRegion();
+
+    FullHttpRequest req = t.createRequest(getUri(TEST_JOB_ID, 0,
+        Collections.singletonList(TEST_ATTEMPT_1), true));
+    shuffle.writeInbound(req);
+    t.assertResponse(shuffle.outboundMessages(),
+        t.getExpectedHttpResponse(req, true, 46),
+        t.getAttemptData(new Attempt(TEST_ATTEMPT_1, TEST_DATA_A))
+    );
+    assertTrue("keep-alive", shuffle.isActive());
+
+    TimeUnit.SECONDS.sleep(3);
+    shuffle.runScheduledPendingTasks();
+
+    assertFalse("closed", shuffle.isActive());
+  }
+
+  @Test
+  public void testIncompatibleShuffleVersion() {
+    Configuration conf = new Configuration();
+    conf.set(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, "true");
+    final ShuffleTest t = createShuffleTest(conf);
+    final EmbeddedChannel shuffle = t.createShuffleHandlerChannelFileRegion();
+    FullHttpRequest req = t.createRequest(getUri(TEST_JOB_ID, 0,
+        Collections.singletonList(TEST_ATTEMPT_1), true));
+    req.headers().set(ShuffleHeader.HTTP_HEADER_NAME, "invalid");
+    shuffle.writeInbound(req);
+
+    final EmbeddedChannel decoder = t.createHttpResponseChannel();
+    for (Object obj : shuffle.outboundMessages()) {
+      decoder.writeInbound(obj);
+    }
+    DefaultHttpResponse actual = decoder.readInbound();
+    assertFalse(actual.headers().get(CONTENT_LENGTH).isEmpty());
+    actual.headers().set(CONTENT_LENGTH, 0);
+
+    assertEquals(getExpectedHttpResponse(HttpResponseStatus.BAD_REQUEST).toString(),
+        actual.toString());
+
+    assertFalse("closed", shuffle.isActive()); // known-issue
+  }
+
+  @Test
+  public void testInvalidMapNoIndexFile() {
+    final ShuffleTest t = createShuffleTest();
+    final EmbeddedChannel shuffle = t.createShuffleHandlerChannelFileRegion();
+    FullHttpRequest req = t.createRequest(getUri(TEST_JOB_ID, 0,
+        Arrays.asList(TEST_ATTEMPT_1, "non-existing"), true));
+    shuffle.writeInbound(req);
+
+    final EmbeddedChannel decoder = t.createHttpResponseChannel();
+    for (Object obj : shuffle.outboundMessages()) {
+      decoder.writeInbound(obj);
+    }
+
+    DefaultHttpResponse actual = decoder.readInbound();
+    assertFalse(actual.headers().get(CONTENT_LENGTH).isEmpty());
+    actual.headers().set(CONTENT_LENGTH, 0);
+
+    assertEquals(getExpectedHttpResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR).toString(),
+        actual.toString());
+
+    assertFalse("closed", shuffle.isActive());
+  }
+
+  @Test
+  public void testInvalidMapNoDataFile() {
+    final ShuffleTest t = createShuffleTest();
+    final EmbeddedChannel shuffle = t.createShuffleHandlerChannelFileRegion();
+
+    String dataFile = getDataFile(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_2);
+    assertTrue("should delete", new File(dataFile).delete());
+
+    FullHttpRequest req = t.createRequest(getUri(TEST_JOB_ID, 0,
+        Arrays.asList(TEST_ATTEMPT_1, TEST_ATTEMPT_2), false));
+    shuffle.writeInbound(req);
+
+    final EmbeddedChannel decoder = t.createHttpResponseChannel();
+    for (Object obj : shuffle.outboundMessages()) {
+      decoder.writeInbound(obj);
+    }
+
+    DefaultHttpResponse actual = decoder.readInbound();
+    assertFalse(actual.headers().get(CONTENT_LENGTH).isEmpty());
+    actual.headers().set(CONTENT_LENGTH, 0);
+
+    assertEquals(getExpectedHttpResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR).toString(),
+        actual.toString());
+
+    assertFalse("closed", shuffle.isActive());
+  }
+
+  private DefaultHttpResponse getExpectedHttpResponse(HttpResponseStatus status) {
+    DefaultHttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
+    response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
+    response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
+        ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
+        ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+    response.headers().set(CONTENT_LENGTH, 0);
+    return response;
+  }
+
+  private ShuffleTest createShuffleTest() {
+    return createShuffleTest(new Configuration());
+  }
+
+  private ShuffleTest createShuffleTest(Configuration conf) {
+    return new ShuffleTest(conf);
+  }
+
+  private File getResourceFile(String resourceName) {
+    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+    return new File(Objects.requireNonNull(classLoader.getResource(resourceName)).getFile());
+  }
+
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  static class Attempt {
+    final String id;
+    final String content;
+
+    Attempt(String attempt, String content) {
+      this.id = attempt;
+      this.content = content;
+    }
+  }
+
+  private class ShuffleTest {
+    private final ShuffleChannelHandlerContext ctx;
+    private final SecretKey shuffleSecretKey;
+
+    ShuffleTest(Configuration conf) {
+      JobConf jobConf = new JobConf(conf);
+      MetricsSystem ms = DefaultMetricsSystem.instance();
+      this.ctx = new ShuffleChannelHandlerContext(conf,
+          new ConcurrentHashMap<>(),
+          new JobTokenSecretManager(),
+          createLoadingCache(),
+          new IndexCache(jobConf),
+          ms.register(new ShuffleHandler.ShuffleMetrics()),
+          new DefaultChannelGroup(GlobalEventExecutor.INSTANCE)
+      );
+
+      JobTokenIdentifier tokenId = new JobTokenIdentifier(new Text(TEST_JOB_ID));
+      Token<JobTokenIdentifier> token = new Token<>(tokenId, ctx.secretManager);
+      shuffleSecretKey = JobTokenSecretManager.createSecretKey(token.getPassword());
+
+      ctx.userRsrc.put(TEST_JOB_ID, TEST_USER);
+      ctx.secretManager.addTokenForJob(TEST_JOB_ID, token);
+    }
+
+    public FullHttpRequest createRequest(String uri) {
+      FullHttpRequest request =
+          new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri);
+      request.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+      request.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+      request.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+      try {
+        String msgToEncode = SecureShuffleUtils.buildMsgFrom(new URL("http", "", ctx.port, uri));
+        request.headers().set(HTTP_HEADER_URL_HASH,
+            SecureShuffleUtils.hashFromString(msgToEncode, shuffleSecretKey));
+      } catch (IOException e) {
+        e.printStackTrace();
+        fail("Could not create URL hash for test request");
+      }
+
+      return request;
+    }
+
+    public DefaultHttpResponse getExpectedHttpResponse(
+        FullHttpRequest request, boolean keepAlive, long contentLength) {
+      DefaultHttpResponse response =
+          new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
+      HttpHeaders headers = response.headers();
+      try {
+        SecretKey tokenSecret = ctx.secretManager.retrieveTokenSecret(TEST_JOB_ID);
+        headers.set(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH,
+            SecureShuffleUtils.generateHash(
+                request.headers().get(HTTP_HEADER_URL_HASH).getBytes(Charsets.UTF_8),
+                tokenSecret));
+      } catch (SecretManager.InvalidToken e) {
+        fail("Could not generate reply hash");
+      }
+      headers.set(ShuffleHeader.HTTP_HEADER_NAME, ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+      headers.set(ShuffleHeader.HTTP_HEADER_VERSION, ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+      if (keepAlive) {
+        headers.set(HttpHeader.CONNECTION.asString(), HttpHeader.KEEP_ALIVE.asString());
+        headers.set(HttpHeader.KEEP_ALIVE.asString(), "timeout=" + ctx.connectionKeepAliveTimeOut);
+      } else {
+        response.headers().set(HttpHeader.CONNECTION.asString(), CONNECTION_CLOSE);
+      }
+      HttpUtil.setContentLength(response, contentLength);
+      return response;
+    }
+
+    private void testGetAllAttemptsForReduce0NoKeepAlive(
+        java.util.Queue<Object> outboundMessages, EmbeddedChannel shuffle) throws IOException {
+      final FullHttpRequest request = createRequest(
+          getUri(TEST_JOB_ID, 0,
+              Arrays.asList(TEST_ATTEMPT_1, TEST_ATTEMPT_2, TEST_ATTEMPT_3), false));
+      shuffle.writeInbound(request);
+      assertResponse(outboundMessages,
+          getExpectedHttpResponse(request, false, 138),
+          getAllAttemptsForReduce0()
+      );
+      assertFalse("no keep-alive", shuffle.isActive());
+    }
+
+    private void testKeepAlive(java.util.Queue<Object> messages,
+                               EmbeddedChannel shuffle) throws IOException {
+      final FullHttpRequest req1 = createRequest(
+          getUri(TEST_JOB_ID, 0, Collections.singletonList(TEST_ATTEMPT_1), true));
+      shuffle.writeInbound(req1);
+      assertResponse(messages,
+          getExpectedHttpResponse(req1, true, 46),
+          getAttemptData(new Attempt(TEST_ATTEMPT_1, TEST_DATA_A))
+      );
+      assertTrue("keep-alive", shuffle.isActive());
+      messages.clear();
+
+      final FullHttpRequest req2 = createRequest(
+          getUri(TEST_JOB_ID, 0, Collections.singletonList(TEST_ATTEMPT_2), true));
+      shuffle.writeInbound(req2);
+      assertResponse(messages,
+          getExpectedHttpResponse(req2, true, 46),
+          getAttemptData(new Attempt(TEST_ATTEMPT_2, TEST_DATA_B))
+      );
+      assertTrue("keep-alive", shuffle.isActive());
+      messages.clear();
+
+      final FullHttpRequest req3 = createRequest(
+          getUri(TEST_JOB_ID, 0, Collections.singletonList(TEST_ATTEMPT_3), false));
+      shuffle.writeInbound(req3);
+      assertResponse(messages,
+          getExpectedHttpResponse(req3, false, 46),
+          getAttemptData(new Attempt(TEST_ATTEMPT_3, TEST_DATA_C))
+      );
+      assertFalse("no keep-alive", shuffle.isActive());
+    }
+
+    private ArrayList<ByteBuf> getAllAttemptsForReduce0() throws IOException {
+      return getAttemptData(
+          new Attempt(TEST_ATTEMPT_1, TEST_DATA_A),
+          new Attempt(TEST_ATTEMPT_2, TEST_DATA_B),
+          new Attempt(TEST_ATTEMPT_3, TEST_DATA_C)
+      );
+    }
+
+    private ArrayList<ByteBuf> getAttemptData(Attempt... attempts) throws IOException {
+      ArrayList<ByteBuf> data = new ArrayList<>();
+      for (Attempt attempt : attempts) {
+        data.add(shuffleHeaderToBytes(new ShuffleHeader(attempt.id, attempt.content.length(),
+            attempt.content.length() * 2L, 0)));
+        data.add(Unpooled.copiedBuffer(attempt.content.getBytes(StandardCharsets.UTF_8)));
+      }
+      return data;
+    }
+
+    private void assertResponse(java.util.Queue<Object> outboundMessages,
+                                DefaultHttpResponse response,
+                                List<ByteBuf> content) {
+      final EmbeddedChannel decodeChannel = createHttpResponseChannel();
+
+      content.add(LastHttpContent.EMPTY_LAST_CONTENT.content());
+
+      int i = 0;
+      for (Object outboundMessage : outboundMessages) {
+        ByteBuf actualBytes = ((ByteBuf) outboundMessage);
+        String actualHexdump = ByteBufUtil.prettyHexDump(actualBytes);
+        LOG.info("\n{}", actualHexdump);
+
+        decodeChannel.writeInbound(actualBytes);
+        Object obj = decodeChannel.readInbound();
+        LOG.info("Decoded object: {}", obj);
+
+        if (i == 0) {
+          DefaultHttpResponse resp = (DefaultHttpResponse) obj;
+          assertEquals(response.toString(), resp.toString());
+        }
+        if (i > 0 && i <= content.size()) {
+          assertEquals("data should match",
+              ByteBufUtil.prettyHexDump(content.get(i - 1)), actualHexdump);
+        }
+
+        i++;
+      }
+
+      // This check is done after to have better debug logs on failure.
+      assertEquals("all data should match", content.size() + 1, outboundMessages.size());
+    }
+
+    public EmbeddedChannel createShuffleHandlerChannelFileRegion() {
+      final EmbeddedChannel channel = createShuffleHandlerChannel();
+
+      channel.pipeline().addFirst(
+          new MessageToMessageEncoder<FileRegion>() {
+            @Override
+            protected void encode(
+                ChannelHandlerContext cCtx, FileRegion msg, List<Object> out) throws Exception {
+              ByteArrayOutputStream stream = new ByteArrayOutputStream();
+              WritableByteChannel wbc = Channels.newChannel(stream);
+              msg.transferTo(wbc, msg.position());
+              out.add(Unpooled.wrappedBuffer(stream.toByteArray()));
+            }
+          }
+      );
+
+      return channel;
+    }
+
+    public EmbeddedChannel createSSLClient() throws Exception {
+      final EmbeddedChannel channel = createShuffleHandlerChannel();
+
+      SSLContext sc = SSLContext.getInstance("SSL");
+
+      final TrustManager trm = new X509TrustManager() {
+        public X509Certificate[] getAcceptedIssuers() {
+          return null;
+        }
+
+        public void checkClientTrusted(X509Certificate[] certs, String authType) {
+        }
+
+        public void checkServerTrusted(X509Certificate[] certs, String authType) {
+        }
+      };
+
+      sc.init(null, new TrustManager[]{trm}, null);
+
+      final SSLEngine sslEngine = sc.createSSLEngine();
+      sslEngine.setUseClientMode(true);
+      channel.pipeline().addFirst("ssl", new SslHandler(sslEngine));
+
+      return channel;
+    }
+
+    public EmbeddedChannel createShuffleHandlerSSL(java.util.Queue<Object> unencryptedMessages)
+        throws Exception {
+      final EmbeddedChannel channel = createShuffleHandlerChannel();
+      // SelfSignedCertificate was generated manually with:
+      //  openssl req -x509 -newkey rsa:4096 -keyout key.pem \
+      //    -out cert.pem -sha256 -days 3650 -nodes -subj '/CN=localhost'
+      // Because:
+      //  SelfSignedCertificate ssc = new SelfSignedCertificate();
+      // Throws: Failed to generate a self-signed X.509 certificate using Bouncy Castle
+      final SslContext sslCtx = SslContextBuilder
+          .forServer(getResourceFile("cert.pem"), getResourceFile("key.pem"))
+          .build();
+      final SslHandler sslHandler = sslCtx.newHandler(ByteBufAllocator.DEFAULT);
+      channel.pipeline().addFirst("ssl", sslHandler);
+
+      channel.pipeline().addAfter("ssl", "unencrypted", new MessageToMessageEncoder<ByteBuf>() {
+        @Override
+        protected void encode(ChannelHandlerContext cCtx, ByteBuf msg, List<Object> out) {
+          unencryptedMessages.add(msg.copy());
+          out.add(msg.retain());
+        }
+      });
+
+      channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
+        @Override
+        public void userEventTriggered(ChannelHandlerContext cCtx, Object evt) {
+          LOG.info("EVENT: {}", evt);
+        }
+      });
+
+      // SSLHandshake must be done, otherwise messages are buffered
+      final EmbeddedChannel client = createSSLClient();
+      for (Object obj : client.outboundMessages()) {
+        channel.writeInbound(obj);
+      }
+      client.outboundMessages().clear();
+      for (Object obj : channel.outboundMessages()) {
+        client.writeInbound(obj);
+      }
+      channel.outboundMessages().clear();
+      for (Object obj : client.outboundMessages()) {
+        channel.writeInbound(obj);
+      }
+      client.outboundMessages().clear();
+
+      return channel;
+    }
+
+    public EmbeddedChannel createShuffleHandlerChannel() {
+      final EmbeddedChannel channel = new EmbeddedChannel();
+      channel.pipeline().addLast("http", new HttpServerCodec());
+      channel.pipeline().addLast("aggregator", new HttpObjectAggregator(MAX_CONTENT_LENGTH));
+      channel.pipeline().addLast("chunking", new ChunkedWriteHandler());
+      channel.pipeline().addLast("shuffle", new ShuffleChannelHandler(ctx));
+      channel.pipeline().addLast(TIMEOUT_HANDLER,
+          new ShuffleHandler.TimeoutHandler(ctx.connectionKeepAliveTimeOut));
+      return channel;
+    }
+
+    public EmbeddedChannel createHttpResponseChannel() {
+      return new EmbeddedChannel(
+          new HttpResponseDecoder()
+      );
+    }
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
index 38500032ef3..37a9210286c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
@@ -17,78 +17,50 @@
  */
 package org.apache.hadoop.mapred;
 
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.DefaultFileRegion;
+
 import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
-import io.netty.channel.AbstractChannel;
-import io.netty.channel.Channel;
+
 import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPipeline;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.http.HttpMethod;
-import io.netty.handler.codec.http.HttpRequest;
-import io.netty.handler.codec.http.HttpResponse;
-import io.netty.handler.codec.http.HttpResponseEncoder;
 import io.netty.handler.codec.http.HttpResponseStatus;
-import io.netty.handler.timeout.IdleStateEvent;
-import org.apache.hadoop.test.GenericTestUtils;
 
-import static io.netty.buffer.Unpooled.wrappedBuffer;
-import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.mapred.ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY;
+import static org.apache.hadoop.mapreduce.security.SecureShuffleUtils.HTTP_HEADER_URL_HASH;
 import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
-import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import static org.junit.Assume.assumeTrue;
-import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
-import java.io.ByteArrayOutputStream;
+import java.io.BufferedReader;
 import java.io.DataInputStream;
-import java.io.EOFException;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.net.HttpURLConnection;
-import java.net.InetSocketAddress;
-import java.net.Proxy;
-import java.net.Socket;
+import java.net.MalformedURLException;
 import java.net.URL;
-import java.net.SocketAddress;
-import java.net.URLConnection;
 import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.function.Consumer;
-import java.util.zip.CheckedOutputStream;
-import java.util.zip.Checksum;
+
+import javax.crypto.SecretKey;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.nativeio.NativeIO;
-import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
@@ -100,761 +72,22 @@ import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.ServiceStateException;
-import org.apache.hadoop.util.DiskChecker;
-import org.apache.hadoop.util.PureJavaCrc32;
 import org.apache.hadoop.util.Sets;
-import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
 import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
-import org.apache.hadoop.yarn.server.api.AuxiliaryLocalPathHandler;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
 import org.apache.hadoop.yarn.server.records.Version;
-import org.hamcrest.CoreMatchers;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.TestName;
-import org.mockito.Mockito;
-import org.eclipse.jetty.http.HttpHeader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class TestShuffleHandler {
-  static final long MiB = 1024 * 1024; 
+public class TestShuffleHandler extends TestShuffleHandlerBase {
+  static final long MIB = 1024 * 1024;
   private static final Logger LOG =
       LoggerFactory.getLogger(TestShuffleHandler.class);
-  private static final File ABS_LOG_DIR = GenericTestUtils.getTestDir(
-      TestShuffleHandler.class.getSimpleName() + "LocDir");
-  private static final long ATTEMPT_ID = 12345L;
-  private static final long ATTEMPT_ID_2 = 12346L;
-  private static final HttpResponseStatus OK_STATUS = new HttpResponseStatus(200, "OK");
-
-
-  //Control test execution properties with these flags
-  private static final boolean DEBUG_MODE = false;
-  //WARNING: If this is set to true and proxy server is not running, tests will fail!
-  private static final boolean USE_PROXY = false;
-  private static final int HEADER_WRITE_COUNT = 100000;
-  private static final int ARBITRARY_NEGATIVE_TIMEOUT_SECONDS = -100;
-  private static TestExecution TEST_EXECUTION;
-
-  private static class TestExecution {
-    private static final int DEFAULT_KEEP_ALIVE_TIMEOUT_SECONDS = 1;
-    private static final int DEBUG_KEEP_ALIVE_SECONDS = 1000;
-    private static final int DEFAULT_PORT = 0; //random port
-    private static final int FIXED_PORT = 8088;
-    private static final String PROXY_HOST = "127.0.0.1";
-    private static final int PROXY_PORT = 8888;
-    private static final int CONNECTION_DEBUG_TIMEOUT = 1000000;
-    private final boolean debugMode;
-    private final boolean useProxy;
-
-    TestExecution(boolean debugMode, boolean useProxy) {
-      this.debugMode = debugMode;
-      this.useProxy = useProxy;
-    }
-
-    int getKeepAliveTimeout() {
-      if (debugMode) {
-        return DEBUG_KEEP_ALIVE_SECONDS;
-      }
-      return DEFAULT_KEEP_ALIVE_TIMEOUT_SECONDS;
-    }
-
-    HttpURLConnection openConnection(URL url) throws IOException {
-      HttpURLConnection conn;
-      if (useProxy) {
-        Proxy proxy
-            = new Proxy(Proxy.Type.HTTP, new InetSocketAddress(PROXY_HOST, PROXY_PORT));
-        conn = (HttpURLConnection) url.openConnection(proxy);
-      } else {
-        conn = (HttpURLConnection) url.openConnection();
-      }
-      return conn;
-    }
-
-    int shuffleHandlerPort() {
-      if (debugMode) {
-        return FIXED_PORT;
-      } else {
-        return DEFAULT_PORT;
-      }
-    }
-
-    void parameterizeConnection(URLConnection conn) {
-      if (DEBUG_MODE) {
-        conn.setReadTimeout(CONNECTION_DEBUG_TIMEOUT);
-        conn.setConnectTimeout(CONNECTION_DEBUG_TIMEOUT);
-      }
-    }
-  }
-
-  private static class ResponseConfig {
-    private final int headerWriteCount;
-    private final int mapOutputCount;
-    private final int contentLengthOfOneMapOutput;
-    private long headerSize;
-    public long contentLengthOfResponse;
-
-    ResponseConfig(int headerWriteCount, int mapOutputCount,
-        int contentLengthOfOneMapOutput) {
-      if (mapOutputCount <= 0 && contentLengthOfOneMapOutput > 0) {
-        throw new IllegalStateException("mapOutputCount should be at least 1");
-      }
-      this.headerWriteCount = headerWriteCount;
-      this.mapOutputCount = mapOutputCount;
-      this.contentLengthOfOneMapOutput = contentLengthOfOneMapOutput;
-    }
-
-    private void setHeaderSize(long headerSize) {
-      this.headerSize = headerSize;
-      long contentLengthOfAllHeaders = headerWriteCount * headerSize;
-      this.contentLengthOfResponse = computeContentLengthOfResponse(contentLengthOfAllHeaders);
-      LOG.debug("Content-length of all headers: {}", contentLengthOfAllHeaders);
-      LOG.debug("Content-length of one MapOutput: {}", contentLengthOfOneMapOutput);
-      LOG.debug("Content-length of final HTTP response: {}", contentLengthOfResponse);
-    }
-
-    private long computeContentLengthOfResponse(long contentLengthOfAllHeaders) {
-      int mapOutputCountMultiplier = mapOutputCount;
-      if (mapOutputCount == 0) {
-        mapOutputCountMultiplier = 1;
-      }
-      return (contentLengthOfAllHeaders + contentLengthOfOneMapOutput) * mapOutputCountMultiplier;
-    }
-  }
-
-  private enum ShuffleUrlType {
-    SIMPLE, WITH_KEEPALIVE, WITH_KEEPALIVE_MULTIPLE_MAP_IDS, WITH_KEEPALIVE_NO_MAP_IDS
-  }
-
-  private static class InputStreamReadResult {
-    final String asString;
-    int totalBytesRead;
-
-    InputStreamReadResult(byte[] bytes, int totalBytesRead) {
-      this.asString = new String(bytes, StandardCharsets.UTF_8);
-      this.totalBytesRead = totalBytesRead;
-    }
-  }
-
-  private static abstract class AdditionalMapOutputSenderOperations {
-    public abstract ChannelFuture perform(ChannelHandlerContext ctx, Channel ch) throws IOException;
-  }
-
-  private class ShuffleHandlerForKeepAliveTests extends ShuffleHandler {
-    final LastSocketAddress lastSocketAddress = new LastSocketAddress();
-    final ArrayList<Throwable> failures = new ArrayList<>();
-    final ShuffleHeaderProvider shuffleHeaderProvider;
-    final HeaderPopulator headerPopulator;
-    MapOutputSender mapOutputSender;
-    private Consumer<IdleStateEvent> channelIdleCallback;
-    private CustomTimeoutHandler customTimeoutHandler;
-    private boolean failImmediatelyOnErrors = false;
-    private boolean closeChannelOnError = true;
-    private ResponseConfig responseConfig;
-
-    ShuffleHandlerForKeepAliveTests(long attemptId, ResponseConfig responseConfig,
-        Consumer<IdleStateEvent> channelIdleCallback) throws IOException {
-      this(attemptId, responseConfig);
-      this.channelIdleCallback = channelIdleCallback;
-    }
-
-    ShuffleHandlerForKeepAliveTests(long attemptId, ResponseConfig responseConfig)
-        throws IOException {
-      this.responseConfig = responseConfig;
-      this.shuffleHeaderProvider = new ShuffleHeaderProvider(attemptId);
-      this.responseConfig.setHeaderSize(shuffleHeaderProvider.getShuffleHeaderSize());
-      this.headerPopulator = new HeaderPopulator(this, responseConfig, shuffleHeaderProvider, true);
-      this.mapOutputSender = new MapOutputSender(responseConfig, lastSocketAddress,
-          shuffleHeaderProvider);
-      setUseOutboundExceptionHandler(true);
-    }
-
-    public void setFailImmediatelyOnErrors(boolean failImmediatelyOnErrors) {
-      this.failImmediatelyOnErrors = failImmediatelyOnErrors;
-    }
-
-    public void setCloseChannelOnError(boolean closeChannelOnError) {
-      this.closeChannelOnError = closeChannelOnError;
-    }
-
-    @Override
-    protected Shuffle getShuffle(final Configuration conf) {
-      // replace the shuffle handler with one stubbed for testing
-      return new Shuffle(conf) {
-        @Override
-        protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
-            String jobId, String user) {
-          return null;
-        }
-        @Override
-        protected void verifyRequest(String appid, ChannelHandlerContext ctx,
-            HttpRequest request, HttpResponse response, URL requestUri) {
-        }
-
-        @Override
-        protected void populateHeaders(List<String> mapIds, String jobId,
-            String user, int reduce, HttpRequest request,
-            HttpResponse response, boolean keepAliveParam,
-            Map<String, MapOutputInfo> infoMap) throws IOException {
-          long contentLength = headerPopulator.populateHeaders(
-              keepAliveParam);
-          super.setResponseHeaders(response, keepAliveParam, contentLength);
-        }
-
-        @Override
-        protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
-            Channel ch, String user, String mapId, int reduce,
-            MapOutputInfo info) throws IOException {
-          return mapOutputSender.send(ctx, ch);
-        }
-
-        @Override
-        public void channelActive(ChannelHandlerContext ctx) throws Exception {
-          ctx.pipeline().replace(HttpResponseEncoder.class, ENCODER_HANDLER_NAME,
-              new LoggingHttpResponseEncoder(false));
-          replaceTimeoutHandlerWithCustom(ctx);
-          LOG.debug("Modified pipeline: {}", ctx.pipeline());
-          super.channelActive(ctx);
-        }
-
-        private void replaceTimeoutHandlerWithCustom(ChannelHandlerContext ctx) {
-          TimeoutHandler oldTimeoutHandler =
-              (TimeoutHandler)ctx.pipeline().get(TIMEOUT_HANDLER);
-          int timeoutValue =
-              oldTimeoutHandler.getConnectionKeepAliveTimeOut();
-          customTimeoutHandler = new CustomTimeoutHandler(timeoutValue, channelIdleCallback);
-          ctx.pipeline().replace(TIMEOUT_HANDLER, TIMEOUT_HANDLER, customTimeoutHandler);
-        }
-
-        @Override
-        protected void sendError(ChannelHandlerContext ctx,
-            HttpResponseStatus status) {
-          String message = "Error while processing request. Status: " + status;
-          handleError(ctx, message);
-          if (failImmediatelyOnErrors) {
-            stop();
-          }
-        }
-
-        @Override
-        protected void sendError(ChannelHandlerContext ctx, String message,
-            HttpResponseStatus status) {
-          String errMessage = String.format("Error while processing request. " +
-              "Status: " +
-              "%s, message: %s", status, message);
-          handleError(ctx, errMessage);
-          if (failImmediatelyOnErrors) {
-            stop();
-          }
-        }
-      };
-    }
-
-    private void handleError(ChannelHandlerContext ctx, String message) {
-      LOG.error(message);
-      failures.add(new Error(message));
-      if (closeChannelOnError) {
-        LOG.warn("sendError: Closing channel");
-        ctx.channel().close();
-      }
-    }
-
-    private class CustomTimeoutHandler extends TimeoutHandler {
-      private boolean channelIdle = false;
-      private final Consumer<IdleStateEvent> channelIdleCallback;
-
-      CustomTimeoutHandler(int connectionKeepAliveTimeOut,
-          Consumer<IdleStateEvent> channelIdleCallback) {
-        super(connectionKeepAliveTimeOut);
-        this.channelIdleCallback = channelIdleCallback;
-      }
-
-      @Override
-      public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
-        LOG.debug("Channel idle");
-        this.channelIdle = true;
-        if (channelIdleCallback != null) {
-          LOG.debug("Calling channel idle callback..");
-          channelIdleCallback.accept(e);
-        }
-        super.channelIdle(ctx, e);
-      }
-    }
-  }
-
-  private static class MapOutputSender {
-    private final ResponseConfig responseConfig;
-    private final LastSocketAddress lastSocketAddress;
-    private final ShuffleHeaderProvider shuffleHeaderProvider;
-    private AdditionalMapOutputSenderOperations additionalMapOutputSenderOperations;
-
-    MapOutputSender(ResponseConfig responseConfig, LastSocketAddress lastSocketAddress,
-        ShuffleHeaderProvider shuffleHeaderProvider) {
-      this.responseConfig = responseConfig;
-      this.lastSocketAddress = lastSocketAddress;
-      this.shuffleHeaderProvider = shuffleHeaderProvider;
-    }
-
-    public ChannelFuture send(ChannelHandlerContext ctx, Channel ch) throws IOException {
-      LOG.debug("In MapOutputSender#send");
-      lastSocketAddress.setAddress(ch.remoteAddress());
-      ShuffleHeader header = shuffleHeaderProvider.createNewShuffleHeader();
-      ChannelFuture future = writeHeaderNTimes(ch, header, responseConfig.headerWriteCount);
-      // This is the last operation
-      // It's safe to increment ShuffleHeader counter for better identification
-      shuffleHeaderProvider.incrementCounter();
-      if (additionalMapOutputSenderOperations != null) {
-        return additionalMapOutputSenderOperations.perform(ctx, ch);
-      }
-      return future;
-    }
-
-    private ChannelFuture writeHeaderNTimes(Channel ch, ShuffleHeader header, int iterations)
-        throws IOException {
-      DataOutputBuffer dob = new DataOutputBuffer();
-      for (int i = 0; i < iterations; ++i) {
-        header.write(dob);
-      }
-      LOG.debug("MapOutputSender#writeHeaderNTimes WriteAndFlush big chunk of data, " +
-          "outputBufferSize: " + dob.size());
-      return ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
-    }
-  }
-
-  private static class ShuffleHeaderProvider {
-    private final long attemptId;
-    private int attemptCounter = 0;
-    private int cachedSize = Integer.MIN_VALUE;
-
-    ShuffleHeaderProvider(long attemptId) {
-      this.attemptId = attemptId;
-    }
-
-    ShuffleHeader createNewShuffleHeader() {
-      return new ShuffleHeader(String.format("attempt_%s_1_m_1_0%s", attemptId, attemptCounter),
-          5678, 5678, 1);
-    }
-
-    void incrementCounter() {
-      attemptCounter++;
-    }
-
-    private int getShuffleHeaderSize() throws IOException {
-      if (cachedSize != Integer.MIN_VALUE) {
-        return cachedSize;
-      }
-      DataOutputBuffer dob = new DataOutputBuffer();
-      ShuffleHeader header = createNewShuffleHeader();
-      header.write(dob);
-      cachedSize = dob.size();
-      return cachedSize;
-    }
-  }
-
-  private static class HeaderPopulator {
-    private final ShuffleHandler shuffleHandler;
-    private final boolean disableKeepAliveConfig;
-    private final ShuffleHeaderProvider shuffleHeaderProvider;
-    private final ResponseConfig responseConfig;
-
-    HeaderPopulator(ShuffleHandler shuffleHandler,
-        ResponseConfig responseConfig,
-        ShuffleHeaderProvider shuffleHeaderProvider,
-        boolean disableKeepAliveConfig) {
-      this.shuffleHandler = shuffleHandler;
-      this.responseConfig = responseConfig;
-      this.disableKeepAliveConfig = disableKeepAliveConfig;
-      this.shuffleHeaderProvider = shuffleHeaderProvider;
-    }
-
-    public long populateHeaders(boolean keepAliveParam) throws IOException {
-      // Send some dummy data (populate content length details)
-      DataOutputBuffer dob = new DataOutputBuffer();
-      for (int i = 0; i < responseConfig.headerWriteCount; ++i) {
-        ShuffleHeader header =
-            shuffleHeaderProvider.createNewShuffleHeader();
-        header.write(dob);
-      }
-      // for testing purpose;
-      // disable connectionKeepAliveEnabled if keepAliveParam is available
-      if (keepAliveParam && disableKeepAliveConfig) {
-        shuffleHandler.connectionKeepAliveEnabled = false;
-      }
-      return responseConfig.contentLengthOfResponse;
-    }
-  }
-
-  private static final class HttpConnectionData {
-    private final Map<String, List<String>> headers;
-    private HttpURLConnection conn;
-    private final int payloadLength;
-    private final SocketAddress socket;
-    private int responseCode = -1;
-
-    private HttpConnectionData(HttpURLConnection conn, int payloadLength,
-        SocketAddress socket) {
-      this.headers = conn.getHeaderFields();
-      this.conn = conn;
-      this.payloadLength = payloadLength;
-      this.socket = socket;
-      try {
-        this.responseCode = conn.getResponseCode();
-      } catch (IOException e) {
-        fail("Failed to read response code from connection: " + conn);
-      }
-    }
-
-    static HttpConnectionData create(HttpURLConnection conn, int payloadLength,
-        SocketAddress socket) {
-      return new HttpConnectionData(conn, payloadLength, socket);
-    }
-  }
-
-  private static final class HttpConnectionAssert {
-    private final HttpConnectionData connData;
-
-    private HttpConnectionAssert(HttpConnectionData connData) {
-      this.connData = connData;
-    }
-
-    static HttpConnectionAssert create(HttpConnectionData connData) {
-      return new HttpConnectionAssert(connData);
-    }
-
-    public static void assertKeepAliveConnectionsAreSame(
-        HttpConnectionHelper httpConnectionHelper) {
-      assertTrue("At least two connection data " +
-          "is required to perform this assertion",
-          httpConnectionHelper.connectionData.size() >= 2);
-      SocketAddress firstAddress = httpConnectionHelper.getConnectionData(0).socket;
-      SocketAddress secondAddress = httpConnectionHelper.getConnectionData(1).socket;
-      Assert.assertNotNull("Initial shuffle address should not be null",
-          firstAddress);
-      Assert.assertNotNull("Keep-Alive shuffle address should not be null",
-          secondAddress);
-      assertEquals("Initial shuffle address and keep-alive shuffle "
-          + "address should be the same", firstAddress, secondAddress);
-    }
-
-    public HttpConnectionAssert expectKeepAliveWithTimeout(long timeout) {
-      assertEquals(HttpURLConnection.HTTP_OK, connData.responseCode);
-      assertHeaderValue(HttpHeader.CONNECTION, HttpHeader.KEEP_ALIVE.asString());
-      assertHeaderValue(HttpHeader.KEEP_ALIVE, "timeout=" + timeout);
-      return this;
-    }
-
-    public HttpConnectionAssert expectBadRequest(long timeout) {
-      assertEquals(HttpURLConnection.HTTP_BAD_REQUEST, connData.responseCode);
-      assertHeaderValue(HttpHeader.CONNECTION, HttpHeader.KEEP_ALIVE.asString());
-      assertHeaderValue(HttpHeader.KEEP_ALIVE, "timeout=" + timeout);
-      return this;
-    }
-
-    public HttpConnectionAssert expectResponseContentLength(long size) {
-      assertEquals(size, connData.payloadLength);
-      return this;
-    }
-
-    private void assertHeaderValue(HttpHeader header, String expectedValue) {
-      List<String> headerList = connData.headers.get(header.asString());
-      Assert.assertNotNull("Got null header value for header: " + header, headerList);
-      Assert.assertFalse("Got empty header value for header: " + header, headerList.isEmpty());
-      assertEquals("Unexpected size of header list for header: " + header, 1,
-          headerList.size());
-      assertEquals(expectedValue, headerList.get(0));
-    }
-  }
-
-  private static class HttpConnectionHelper {
-    private final LastSocketAddress lastSocketAddress;
-    List<HttpConnectionData> connectionData = new ArrayList<>();
-
-    HttpConnectionHelper(LastSocketAddress lastSocketAddress) {
-      this.lastSocketAddress = lastSocketAddress;
-    }
-
-    public void connectToUrls(String[] urls, ResponseConfig responseConfig) throws IOException {
-      connectToUrlsInternal(urls, responseConfig, HttpURLConnection.HTTP_OK);
-    }
-
-    public void connectToUrls(String[] urls, ResponseConfig responseConfig, int expectedHttpStatus)
-        throws IOException {
-      connectToUrlsInternal(urls, responseConfig, expectedHttpStatus);
-    }
-
-    private void connectToUrlsInternal(String[] urls, ResponseConfig responseConfig,
-        int expectedHttpStatus) throws IOException {
-      int requests = urls.length;
-      int expectedConnections = urls.length;
-      LOG.debug("Will connect to URLs: {}", Arrays.toString(urls));
-      for (int reqIdx = 0; reqIdx < requests; reqIdx++) {
-        String urlString = urls[reqIdx];
-        LOG.debug("Connecting to URL: {}", urlString);
-        URL url = new URL(urlString);
-        HttpURLConnection conn = TEST_EXECUTION.openConnection(url);
-        conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
-            ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
-        conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
-            ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
-        TEST_EXECUTION.parameterizeConnection(conn);
-        conn.connect();
-        if (expectedHttpStatus == HttpURLConnection.HTTP_BAD_REQUEST) {
-          //Catch exception as error are caught with overridden sendError method
-          //Caught errors will be validated later.
-          try {
-            DataInputStream input = new DataInputStream(conn.getInputStream());
-          } catch (Exception e) {
-            expectedConnections--;
-            continue;
-          }
-        }
-        DataInputStream input = new DataInputStream(conn.getInputStream());
-        LOG.debug("Opened DataInputStream for connection: {}/{}", (reqIdx + 1), requests);
-        ShuffleHeader header = new ShuffleHeader();
-        header.readFields(input);
-        InputStreamReadResult result = readDataFromInputStream(input);
-        result.totalBytesRead += responseConfig.headerSize;
-        int expectedContentLength =
-            Integer.parseInt(conn.getHeaderField(HttpHeader.CONTENT_LENGTH.asString()));
-
-        if (result.totalBytesRead != expectedContentLength) {
-          throw new IOException(String.format("Premature EOF InputStream. " +
-              "Expected content-length: %s, " +
-              "Actual content-length: %s", expectedContentLength, result.totalBytesRead));
-        }
-        connectionData.add(HttpConnectionData
-            .create(conn, result.totalBytesRead, lastSocketAddress.getSocketAddres()));
-        input.close();
-        LOG.debug("Finished all interactions with URL: {}. Progress: {}/{}", url, (reqIdx + 1),
-            requests);
-      }
-      assertEquals(expectedConnections, connectionData.size());
-    }
-
-    void validate(Consumer<HttpConnectionData> connDataValidator) {
-      for (int i = 0; i < connectionData.size(); i++) {
-        LOG.debug("Validating connection data #{}", (i + 1));
-        HttpConnectionData connData = connectionData.get(i);
-        connDataValidator.accept(connData);
-      }
-    }
-
-    HttpConnectionData getConnectionData(int i) {
-      return connectionData.get(i);
-    }
-
-    private static InputStreamReadResult readDataFromInputStream(
-        InputStream input) throws IOException {
-      ByteArrayOutputStream dataStream = new ByteArrayOutputStream();
-      byte[] buffer = new byte[1024];
-      int bytesRead;
-      int totalBytesRead = 0;
-      while ((bytesRead = input.read(buffer)) != -1) {
-        dataStream.write(buffer, 0, bytesRead);
-        totalBytesRead += bytesRead;
-      }
-      LOG.debug("Read total bytes: " + totalBytesRead);
-      dataStream.flush();
-      return new InputStreamReadResult(dataStream.toByteArray(), totalBytesRead);
-    }
-  }
-
-  class ShuffleHandlerForTests extends ShuffleHandler {
-    public final ArrayList<Throwable> failures = new ArrayList<>();
-
-    ShuffleHandlerForTests() {
-      setUseOutboundExceptionHandler(true);
-    }
-
-    ShuffleHandlerForTests(MetricsSystem ms) {
-      super(ms);
-      setUseOutboundExceptionHandler(true);
-    }
-
-    @Override
-    protected Shuffle getShuffle(final Configuration conf) {
-      return new Shuffle(conf) {
-        @Override
-        public void exceptionCaught(ChannelHandlerContext ctx,
-            Throwable cause) throws Exception {
-          LOG.debug("ExceptionCaught");
-          failures.add(cause);
-          super.exceptionCaught(ctx, cause);
-        }
-      };
-    }
-  }
-
-  class MockShuffleHandler extends org.apache.hadoop.mapred.ShuffleHandler {
-    final ArrayList<Throwable> failures = new ArrayList<>();
-
-    private final AuxiliaryLocalPathHandler pathHandler =
-        new TestAuxiliaryLocalPathHandler();
-
-    MockShuffleHandler() {
-      setUseOutboundExceptionHandler(true);
-    }
-
-    MockShuffleHandler(MetricsSystem ms) {
-      super(ms);
-      setUseOutboundExceptionHandler(true);
-    }
 
-    @Override
-    protected Shuffle getShuffle(final Configuration conf) {
-      return new Shuffle(conf) {
-        @Override
-        protected void verifyRequest(String appid, ChannelHandlerContext ctx,
-            HttpRequest request, HttpResponse response, URL requestUri)
-            throws IOException {
-        }
-        @Override
-        protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
-            String jobId, String user) {
-          // Do nothing.
-          return null;
-        }
-        @Override
-        protected void populateHeaders(List<String> mapIds, String jobId,
-            String user, int reduce, HttpRequest request,
-            HttpResponse response, boolean keepAliveParam,
-            Map<String, MapOutputInfo> infoMap) {
-          // Do nothing.
-        }
-        @Override
-        protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
-            Channel ch, String user, String mapId, int reduce,
-            MapOutputInfo info) throws IOException {
-
-          ShuffleHeader header =
-              new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
-          DataOutputBuffer dob = new DataOutputBuffer();
-          header.write(dob);
-          ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
-          dob = new DataOutputBuffer();
-          for (int i = 0; i < 100; ++i) {
-            header.write(dob);
-          }
-          return ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
-        }
-
-        @Override
-        public void exceptionCaught(ChannelHandlerContext ctx,
-            Throwable cause) throws Exception {
-          LOG.debug("ExceptionCaught");
-          failures.add(cause);
-          super.exceptionCaught(ctx, cause);
-        }
-      };
-    }
-
-    @Override
-    public AuxiliaryLocalPathHandler getAuxiliaryLocalPathHandler() {
-      return pathHandler;
-    }
-  }
-
-  private class TestAuxiliaryLocalPathHandler
-      implements AuxiliaryLocalPathHandler {
-    @Override
-    public Path getLocalPathForRead(String path) {
-      return new Path(ABS_LOG_DIR.getAbsolutePath(), path);
-    }
-
-    @Override
-    public Path getLocalPathForWrite(String path) {
-      return new Path(ABS_LOG_DIR.getAbsolutePath());
-    }
-
-    @Override
-    public Path getLocalPathForWrite(String path, long size) {
-      return new Path(ABS_LOG_DIR.getAbsolutePath());
-    }
-
-    @Override
-    public Iterable<Path> getAllLocalPathsForRead(String path) {
-      ArrayList<Path> paths = new ArrayList<>();
-      paths.add(new Path(ABS_LOG_DIR.getAbsolutePath()));
-      return paths;
-    }
-  }
-
-  private static class MockShuffleHandler2 extends
-      org.apache.hadoop.mapred.ShuffleHandler {
-    final ArrayList<Throwable> failures = new ArrayList<>(1);
-    boolean socketKeepAlive = false;
-
-    MockShuffleHandler2() {
-      setUseOutboundExceptionHandler(true);
-    }
-
-    MockShuffleHandler2(MetricsSystem ms) {
-      super(ms);
-      setUseOutboundExceptionHandler(true);
-    }
-
-    @Override
-    protected Shuffle getShuffle(final Configuration conf) {
-      return new Shuffle(conf) {
-        @Override
-        protected void verifyRequest(String appid, ChannelHandlerContext ctx,
-            HttpRequest request, HttpResponse response, URL requestUri) {
-          SocketChannel channel = (SocketChannel)(ctx.channel());
-          socketKeepAlive = channel.config().isKeepAlive();
-        }
-
-        @Override
-        public void exceptionCaught(ChannelHandlerContext ctx,
-            Throwable cause) throws Exception {
-          LOG.debug("ExceptionCaught");
-          failures.add(cause);
-          super.exceptionCaught(ctx, cause);
-        }
-      };
-    }
-
-    protected boolean isSocketKeepAlive() {
-      return socketKeepAlive;
-    }
-  }
-
-  @Rule
-  public TestName name = new TestName();
-
-  @Before
-  public void setup() {
-    TEST_EXECUTION = new TestExecution(DEBUG_MODE, USE_PROXY);
-  }
-
-  @After
-  public void tearDown() {
-    int port = TEST_EXECUTION.shuffleHandlerPort();
-    if (isPortUsed(port)) {
-      String msg = String.format("Port is being used: %d. " +
-          "Current testcase name: %s",
-          port, name.getMethodName());
-      throw new IllegalStateException(msg);
-    }
-  }
-
-  private static boolean isPortUsed(int port) {
-    if (port == 0) {
-      //Don't check if port is 0
-      return false;
-    }
-    try (Socket ignored = new Socket("localhost", port)) {
-      return true;
-    } catch (IOException e) {
-      LOG.error("Port: {}, port check result: {}", port, e.getMessage());
-      return false;
-    }
-  }
+  private static final HttpResponseStatus OK_STATUS = new HttpResponseStatus(200, "OK");
+  private static final ApplicationId TEST_APP_ID = ApplicationId.newInstance(1111111111111L, 1);
 
   /**
    * Test the validation of ShuffleHandler's meta-data's serialization and
@@ -862,8 +95,8 @@ public class TestShuffleHandler {
    *
    * @throws Exception exception
    */
-  @Test (timeout = 10000)
-  public void testSerializeMeta()  throws Exception {
+  @Test(timeout = 10000)
+  public void testSerializeMeta() throws Exception {
     assertEquals(1, ShuffleHandler.deserializeMetaData(
         ShuffleHandler.serializeMetaData(1)));
     assertEquals(-1, ShuffleHandler.deserializeMetaData(
@@ -877,24 +110,24 @@ public class TestShuffleHandler {
    *
    * @throws Exception exception
    */
-  @Test (timeout = 10000)
+  @Test(timeout = 10000)
   public void testShuffleMetrics() throws Exception {
     MetricsSystem ms = new MetricsSystemImpl();
-    ShuffleHandler sh = new ShuffleHandlerForTests(ms);
+    ShuffleHandler sh = new ShuffleHandler(ms);
     ChannelFuture cf = mock(ChannelFuture.class);
     when(cf.isSuccess()).thenReturn(true).thenReturn(false);
 
     sh.metrics.shuffleConnections.incr();
-    sh.metrics.shuffleOutputBytes.incr(MiB);
+    sh.metrics.shuffleOutputBytes.incr(MIB);
     sh.metrics.shuffleConnections.incr();
-    sh.metrics.shuffleOutputBytes.incr(2*MiB);
+    sh.metrics.shuffleOutputBytes.incr(2 * MIB);
 
-    checkShuffleMetrics(ms, 3*MiB, 0, 0, 2);
+    checkShuffleMetrics(ms, 3 * MIB, 0, 0, 2);
 
     sh.metrics.operationComplete(cf);
     sh.metrics.operationComplete(cf);
 
-    checkShuffleMetrics(ms, 3*MiB, 1, 1, 0);
+    checkShuffleMetrics(ms, 3 * MIB, 1, 1, 0);
 
     sh.stop();
   }
@@ -909,447 +142,32 @@ public class TestShuffleHandler {
     assertGauge("ShuffleConnections", connections, rb);
   }
 
-  /**
-   * Verify client prematurely closing a connection.
-   *
-   * @throws Exception exception.
-   */
-  @Test (timeout = 10000)
-  public void testClientClosesConnection() throws Exception {
-    Configuration conf = new Configuration();
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, TEST_EXECUTION.shuffleHandlerPort());
-    ShuffleHandlerForTests shuffleHandler = new ShuffleHandlerForTests() {
-
-      @Override
-      protected Shuffle getShuffle(Configuration conf) {
-        // replace the shuffle handler with one stubbed for testing
-        return new Shuffle(conf) {
-          @Override
-          protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
-              String jobId, String user) {
-            return null;
-          }
-          @Override
-          protected void populateHeaders(List<String> mapIds, String jobId,
-              String user, int reduce, HttpRequest request,
-              HttpResponse response, boolean keepAliveParam,
-              Map<String, MapOutputInfo> infoMap) {
-            // Only set response headers and skip everything else
-            // send some dummy value for content-length
-            super.setResponseHeaders(response, keepAliveParam, 100);
-          }
-          @Override
-          protected void verifyRequest(String appid, ChannelHandlerContext ctx,
-              HttpRequest request, HttpResponse response, URL requestUri) {
-          }
-          @Override
-          protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
-              Channel ch, String user, String mapId, int reduce,
-              MapOutputInfo info)
-                  throws IOException {
-            ShuffleHeader header =
-                new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
-            DataOutputBuffer dob = new DataOutputBuffer();
-            header.write(dob);
-            ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
-            dob = new DataOutputBuffer();
-            for (int i = 0; i < 100000; ++i) {
-              header.write(dob);
-            }
-            return ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
-          }
-          @Override
-          protected void sendError(ChannelHandlerContext ctx,
-              HttpResponseStatus status) {
-            if (failures.size() == 0) {
-              failures.add(new Error());
-              ctx.channel().close();
-            }
-          }
-          @Override
-          protected void sendError(ChannelHandlerContext ctx, String message,
-              HttpResponseStatus status) {
-            if (failures.size() == 0) {
-              failures.add(new Error());
-              ctx.channel().close();
-            }
-          }
-        };
-      }
-    };
-    shuffleHandler.init(conf);
-    shuffleHandler.start();
-
-    // simulate a reducer that closes early by reading a single shuffle header
-    // then closing the connection
-    URL url = new URL("http://127.0.0.1:"
-        + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
-        + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_1_0");
-    HttpURLConnection conn = TEST_EXECUTION.openConnection(url);
-    conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
-        ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
-    conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
-        ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
-    conn.connect();
-    DataInputStream input = new DataInputStream(conn.getInputStream());
-    assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
-    assertEquals("close",
-        conn.getHeaderField(HttpHeader.CONNECTION.asString()));
-    ShuffleHeader header = new ShuffleHeader();
-    header.readFields(input);
-    input.close();
-
-    assertEquals("sendError called when client closed connection", 0,
-        shuffleHandler.failures.size());
-    assertEquals("Should have no caught exceptions", Collections.emptyList(),
-        shuffleHandler.failures);
-
-    shuffleHandler.stop();
-  }
-
-  static class LastSocketAddress {
-    SocketAddress lastAddress;
-    void setAddress(SocketAddress lastAddress) {
-      this.lastAddress = lastAddress;
-    }
-    SocketAddress getSocketAddres() {
-      return lastAddress;
-    }
-  }
-
-  @Test(timeout = 10000)
-  public void testKeepAliveInitiallyEnabled() throws Exception {
-    Configuration conf = new Configuration();
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, TEST_EXECUTION.shuffleHandlerPort());
-    conf.setBoolean(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, true);
-    conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT,
-        TEST_EXECUTION.getKeepAliveTimeout());
-    ResponseConfig responseConfig = new ResponseConfig(HEADER_WRITE_COUNT, 0, 0);
-    ShuffleHandlerForKeepAliveTests shuffleHandler = new ShuffleHandlerForKeepAliveTests(
-        ATTEMPT_ID, responseConfig);
-    testKeepAliveWithHttpOk(conf, shuffleHandler, ShuffleUrlType.SIMPLE,
-        ShuffleUrlType.WITH_KEEPALIVE);
-  }
-
-  @Test(timeout = 1000000)
-  public void testKeepAliveInitiallyEnabledTwoKeepAliveUrls() throws Exception {
-    Configuration conf = new Configuration();
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, TEST_EXECUTION.shuffleHandlerPort());
-    conf.setBoolean(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, true);
-    conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT,
-        TEST_EXECUTION.getKeepAliveTimeout());
-    ResponseConfig responseConfig = new ResponseConfig(HEADER_WRITE_COUNT, 0, 0);
-    ShuffleHandlerForKeepAliveTests shuffleHandler = new ShuffleHandlerForKeepAliveTests(
-        ATTEMPT_ID, responseConfig);
-    testKeepAliveWithHttpOk(conf, shuffleHandler, ShuffleUrlType.WITH_KEEPALIVE,
-        ShuffleUrlType.WITH_KEEPALIVE);
-  }
-
-  //TODO snemeth implement keepalive test that used properly mocked ShuffleHandler
-  @Test(timeout = 10000)
-  public void testKeepAliveInitiallyDisabled() throws Exception {
-    Configuration conf = new Configuration();
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, TEST_EXECUTION.shuffleHandlerPort());
-    conf.setBoolean(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, false);
-    conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT,
-        TEST_EXECUTION.getKeepAliveTimeout());
-    ResponseConfig responseConfig = new ResponseConfig(HEADER_WRITE_COUNT, 0, 0);
-    ShuffleHandlerForKeepAliveTests shuffleHandler = new ShuffleHandlerForKeepAliveTests(
-        ATTEMPT_ID, responseConfig);
-    testKeepAliveWithHttpOk(conf, shuffleHandler, ShuffleUrlType.WITH_KEEPALIVE,
-        ShuffleUrlType.WITH_KEEPALIVE);
-  }
-
-  @Test(timeout = 10000)
-  public void testKeepAliveMultipleMapAttemptIds() throws Exception {
-    final int mapOutputContentLength = 11;
-    final int mapOutputCount = 2;
-
-    Configuration conf = new Configuration();
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, TEST_EXECUTION.shuffleHandlerPort());
-    conf.setBoolean(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, true);
-    conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT,
-        TEST_EXECUTION.getKeepAliveTimeout());
-    ResponseConfig responseConfig = new ResponseConfig(HEADER_WRITE_COUNT,
-        mapOutputCount, mapOutputContentLength);
-    ShuffleHandlerForKeepAliveTests shuffleHandler = new ShuffleHandlerForKeepAliveTests(
-        ATTEMPT_ID, responseConfig);
-    shuffleHandler.mapOutputSender.additionalMapOutputSenderOperations =
-        new AdditionalMapOutputSenderOperations() {
-          @Override
-          public ChannelFuture perform(ChannelHandlerContext ctx, Channel ch) throws IOException {
-            File tmpFile = File.createTempFile("test", ".tmp");
-            Files.write(tmpFile.toPath(),
-                "dummytestcontent123456".getBytes(StandardCharsets.UTF_8));
-            final DefaultFileRegion partition = new DefaultFileRegion(tmpFile, 0,
-                mapOutputContentLength);
-            LOG.debug("Writing response partition: {}, channel: {}",
-                partition, ch.id());
-            return ch.writeAndFlush(partition)
-                .addListener((ChannelFutureListener) future ->
-                    LOG.debug("Finished Writing response partition: {}, channel: " +
-                        "{}", partition, ch.id()));
-          }
-        };
-    testKeepAliveWithHttpOk(conf, shuffleHandler,
-        ShuffleUrlType.WITH_KEEPALIVE_MULTIPLE_MAP_IDS,
-        ShuffleUrlType.WITH_KEEPALIVE_MULTIPLE_MAP_IDS);
-  }
-
-  @Test(timeout = 10000)
-  public void testKeepAliveWithoutMapAttemptIds() throws Exception {
-    Configuration conf = new Configuration();
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, TEST_EXECUTION.shuffleHandlerPort());
-    conf.setBoolean(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, true);
-    conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT,
-        TEST_EXECUTION.getKeepAliveTimeout());
-    ResponseConfig responseConfig = new ResponseConfig(HEADER_WRITE_COUNT, 0, 0);
-    ShuffleHandlerForKeepAliveTests shuffleHandler = new ShuffleHandlerForKeepAliveTests(
-        ATTEMPT_ID, responseConfig);
-    shuffleHandler.setFailImmediatelyOnErrors(true);
-    //Closing channels caused Netty to open another channel
-    // so 1 request was handled with 2 separate channels,
-    // ultimately generating 2 * HTTP 400 errors.
-    // We'd like to avoid this so disabling closing the channel here.
-    shuffleHandler.setCloseChannelOnError(false);
-    testKeepAliveWithHttpBadRequest(conf, shuffleHandler, ShuffleUrlType.WITH_KEEPALIVE_NO_MAP_IDS);
-  }
-
-  private void testKeepAliveWithHttpOk(
-      Configuration conf,
-      ShuffleHandlerForKeepAliveTests shuffleHandler,
-      ShuffleUrlType... shuffleUrlTypes) throws IOException {
-    testKeepAliveWithHttpStatus(conf, shuffleHandler, shuffleUrlTypes, HttpURLConnection.HTTP_OK);
-  }
-
-  private void testKeepAliveWithHttpBadRequest(
-      Configuration conf,
-      ShuffleHandlerForKeepAliveTests shuffleHandler,
-      ShuffleUrlType... shuffleUrlTypes) throws IOException {
-    testKeepAliveWithHttpStatus(conf, shuffleHandler, shuffleUrlTypes,
-        HttpURLConnection.HTTP_BAD_REQUEST);
-  }
-
-  private void testKeepAliveWithHttpStatus(Configuration conf,
-      ShuffleHandlerForKeepAliveTests shuffleHandler,
-      ShuffleUrlType[] shuffleUrlTypes,
-      int expectedHttpStatus) throws IOException {
-    if (expectedHttpStatus != HttpURLConnection.HTTP_BAD_REQUEST) {
-      assertTrue("Expected at least two shuffle URL types ",
-          shuffleUrlTypes.length >= 2);
-    }
-    shuffleHandler.init(conf);
-    shuffleHandler.start();
-
-    String[] urls = new String[shuffleUrlTypes.length];
-    for (int i = 0; i < shuffleUrlTypes.length; i++) {
-      ShuffleUrlType url = shuffleUrlTypes[i];
-      if (url == ShuffleUrlType.SIMPLE) {
-        urls[i] = getShuffleUrl(shuffleHandler, ATTEMPT_ID, ATTEMPT_ID);
-      } else if (url == ShuffleUrlType.WITH_KEEPALIVE) {
-        urls[i] = getShuffleUrlWithKeepAlive(shuffleHandler, ATTEMPT_ID, ATTEMPT_ID);
-      } else if (url == ShuffleUrlType.WITH_KEEPALIVE_MULTIPLE_MAP_IDS) {
-        urls[i] = getShuffleUrlWithKeepAlive(shuffleHandler, ATTEMPT_ID, ATTEMPT_ID, ATTEMPT_ID_2);
-      } else if (url == ShuffleUrlType.WITH_KEEPALIVE_NO_MAP_IDS) {
-        urls[i] = getShuffleUrlWithKeepAlive(shuffleHandler, ATTEMPT_ID);
-      }
-    }
-    HttpConnectionHelper connHelper;
-    try {
-      connHelper = new HttpConnectionHelper(shuffleHandler.lastSocketAddress);
-      connHelper.connectToUrls(urls, shuffleHandler.responseConfig, expectedHttpStatus);
-      if (expectedHttpStatus == HttpURLConnection.HTTP_BAD_REQUEST) {
-        assertEquals(1, shuffleHandler.failures.size());
-        assertThat(shuffleHandler.failures.get(0).getMessage(),
-            CoreMatchers.containsString("Status: 400 Bad Request, " +
-                "message: Required param job, map and reduce"));
-      }
-    } finally {
-      shuffleHandler.stop();
-    }
-
-    //Verify expectations
-    int configuredTimeout = TEST_EXECUTION.getKeepAliveTimeout();
-    int expectedTimeout = configuredTimeout < 0 ? 1 : configuredTimeout;
-
-    connHelper.validate(connData -> {
-      HttpConnectionAssert.create(connData)
-          .expectKeepAliveWithTimeout(expectedTimeout)
-          .expectResponseContentLength(shuffleHandler.responseConfig.contentLengthOfResponse);
-    });
-    if (expectedHttpStatus == HttpURLConnection.HTTP_OK) {
-      HttpConnectionAssert.assertKeepAliveConnectionsAreSame(connHelper);
-      assertEquals("Unexpected ShuffleHandler failure", Collections.emptyList(),
-          shuffleHandler.failures);
-    }
-  }
-
-  @Test(timeout = 10000)
-  public void testSocketKeepAlive() throws Exception {
-    Configuration conf = new Configuration();
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, TEST_EXECUTION.shuffleHandlerPort());
-    conf.setBoolean(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, true);
-    // try setting to negative keep alive timeout.
-    conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT,
-        ARBITRARY_NEGATIVE_TIMEOUT_SECONDS);
-    HttpURLConnection conn = null;
-    MockShuffleHandler2 shuffleHandler = new MockShuffleHandler2();
-    AuxiliaryLocalPathHandler pathHandler =
-        mock(AuxiliaryLocalPathHandler.class);
-    when(pathHandler.getLocalPathForRead(anyString())).thenThrow(
-        new DiskChecker.DiskErrorException("Test"));
-    shuffleHandler.setAuxiliaryLocalPathHandler(pathHandler);
-    try {
-      shuffleHandler.init(conf);
-      shuffleHandler.start();
-
-      String shuffleBaseURL = "http://127.0.0.1:"
-              + shuffleHandler.getConfig().get(
-                ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
-      URL url =
-          new URL(shuffleBaseURL + "/mapOutput?job=job_12345_1&reduce=1&"
-              + "map=attempt_12345_1_m_1_0");
-      conn = TEST_EXECUTION.openConnection(url);
-      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
-          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
-      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
-          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
-      conn.connect();
-      int rc = conn.getResponseCode();
-      conn.getInputStream();
-      assertEquals(HttpURLConnection.HTTP_OK, rc);
-      assertTrue("socket should be set KEEP_ALIVE",
-          shuffleHandler.isSocketKeepAlive());
-    } finally {
-      if (conn != null) {
-        conn.disconnect();
-      }
-      shuffleHandler.stop();
-    }
-    assertEquals("Should have no caught exceptions",
-        Collections.emptyList(), shuffleHandler.failures);
-  }
-
-  /**
-   * Simulate a reducer that sends an invalid shuffle-header - sometimes a wrong
-   * header_name and sometimes a wrong version.
-   * 
-   * @throws Exception exception
-   */
-  @Test (timeout = 10000)
-  public void testIncompatibleShuffleVersion() throws Exception {
-    final int failureNum = 3;
-    Configuration conf = new Configuration();
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, TEST_EXECUTION.shuffleHandlerPort());
-    ShuffleHandler shuffleHandler = new ShuffleHandlerForTests();
-    shuffleHandler.init(conf);
-    shuffleHandler.start();
-
-    // simulate a reducer that closes early by reading a single shuffle header
-    // then closing the connection
-    URL url = new URL("http://127.0.0.1:"
-        + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
-        + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_1_0");
-    for (int i = 0; i < failureNum; ++i) {
-      HttpURLConnection conn = TEST_EXECUTION.openConnection(url);
-      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
-          i == 0 ? "mapreduce" : "other");
-      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
-          i == 1 ? "1.0.0" : "1.0.1");
-      conn.connect();
-      assertEquals(
-          HttpURLConnection.HTTP_BAD_REQUEST, conn.getResponseCode());
-    }
-
-    shuffleHandler.stop();
-    shuffleHandler.close();
-  }
-
   /**
    * Validate the limit on number of shuffle connections.
-   * 
+   *
    * @throws Exception exception
    */
-  @Test (timeout = 10000)
+  @Test(timeout = 10000)
   public void testMaxConnections() throws Exception {
-    final ArrayList<Throwable> failures = new ArrayList<>();
     final int maxAllowedConnections = 3;
     final int notAcceptedConnections = 1;
     final int connAttempts = maxAllowedConnections + notAcceptedConnections;
-    
+
     Configuration conf = new Configuration();
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, TEST_EXECUTION.shuffleHandlerPort());
     conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, maxAllowedConnections);
-    ShuffleHandler shuffleHandler = new ShuffleHandler() {
-      @Override
-      protected Shuffle getShuffle(Configuration conf) {
-        // replace the shuffle handler with one stubbed for testing
-        return new Shuffle(conf) {
-          @Override
-          protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
-              String jobId, String user) {
-            // Do nothing.
-            return null;
-          }
-          @Override
-          protected void populateHeaders(List<String> mapIds, String jobId,
-              String user, int reduce, HttpRequest request,
-              HttpResponse response, boolean keepAliveParam,
-              Map<String, MapOutputInfo> infoMap) {
-            // Do nothing.
-          }
-          @Override
-          protected void verifyRequest(String appid, ChannelHandlerContext ctx,
-              HttpRequest request, HttpResponse response, URL requestUri) {
-            // Do nothing.
-          }
-          @Override
-          protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
-              Channel ch, String user, String mapId, int reduce,
-              MapOutputInfo info)
-                  throws IOException {
-            // send a shuffle header and a lot of data down the channel
-            // to trigger a broken pipe
-            ShuffleHeader header =
-                new ShuffleHeader("dummy_header", 5678, 5678, 1);
-            DataOutputBuffer dob = new DataOutputBuffer();
-            header.write(dob);
-            ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
-            dob = new DataOutputBuffer();
-            for (int i=0; i<100000; ++i) {
-              header.write(dob);
-            }
-            return ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
-          }
-
-          @Override
-          public void exceptionCaught(ChannelHandlerContext ctx,
-              Throwable cause) throws Exception {
-            LOG.debug("ExceptionCaught");
-            failures.add(cause);
-            super.exceptionCaught(ctx, cause);
-          }
-        };
-      }
-    };
-    shuffleHandler.setUseOutboundExceptionHandler(true);
+    ShuffleHandlerMock shuffleHandler = new ShuffleHandlerMock();
     shuffleHandler.init(conf);
     shuffleHandler.start();
+    final String port = shuffleHandler.getConfig().get(SHUFFLE_PORT_CONFIG_KEY);
+    final SecretKey secretKey = shuffleHandler.addTestApp();
 
     // setup connections
     HttpURLConnection[] conns = new HttpURLConnection[connAttempts];
 
     for (int i = 0; i < connAttempts; i++) {
-      String urlString = "http://127.0.0.1:" 
-           + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
-           + "/mapOutput?job=job_12345_1&reduce=1&map=attempt_12345_1_m_"
-           + i + "_0";
-      URL url = new URL(urlString);
-      conns[i] = TEST_EXECUTION.openConnection(url);
-      conns[i].setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
-          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
-      conns[i].setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
-          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+      conns[i] = createRequest(
+          geURL(port, TEST_JOB_ID, 0, Collections.singletonList(TEST_ATTEMPT_1), true),
+          secretKey);
     }
 
     // Try to open numerous connections
@@ -1381,7 +199,7 @@ public class TestShuffleHandler {
             HttpURLConnection.HTTP_OK,
             ShuffleHandler.TOO_MANY_REQ_STATUS.code()),
         mapOfConnections.keySet());
-    
+
     List<HttpURLConnection> successfulConnections =
         mapOfConnections.get(HttpURLConnection.HTTP_OK);
     assertEquals(String.format("Expected exactly %d requests " +
@@ -1405,307 +223,196 @@ public class TestShuffleHandler {
     assertTrue("The backoff value cannot be negative.", backoff > 0);
 
     shuffleHandler.stop();
+  }
+
+  /**
+   * Validate the limit on number of shuffle connections.
+   *
+   * @throws Exception exception
+   */
+  @Test(timeout = 10000)
+  public void testKeepAlive() throws Exception {
+    Configuration conf = new Configuration();
+    ShuffleHandlerMock shuffleHandler = new ShuffleHandlerMock();
+    shuffleHandler.init(conf);
+    shuffleHandler.start();
+    final String port = shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
+    final SecretKey secretKey = shuffleHandler.addTestApp();
+
+    HttpURLConnection conn1 = createRequest(
+        geURL(port, TEST_JOB_ID, 0, Collections.singletonList(TEST_ATTEMPT_1), true),
+        secretKey);
+    conn1.connect();
+    verifyContent(conn1, TEST_DATA_A);
+
+    HttpURLConnection conn2 = createRequest(
+        geURL(port, TEST_JOB_ID, 0, Collections.singletonList(TEST_ATTEMPT_2), true),
+        secretKey);
+    conn2.connect();
+    verifyContent(conn2, TEST_DATA_B);
+
+    HttpURLConnection conn3 = createRequest(
+        geURL(port, TEST_JOB_ID, 0, Collections.singletonList(TEST_ATTEMPT_3), false),
+        secretKey);
+    conn3.connect();
+    verifyContent(conn3, TEST_DATA_C);
+
+    shuffleHandler.stop();
 
-    //It's okay to get a ClosedChannelException.
-    //All other kinds of exceptions means something went wrong
-    assertEquals("Should have no caught exceptions",
-        Collections.emptyList(), failures.stream()
-            .filter(f -> !(f instanceof ClosedChannelException))
-            .collect(toList()));
+    List<String> actual = matchLogs("connections=\\d+");
+    assertEquals("only one connection was used",
+        Arrays.asList("connections=1", "connections=0"), actual);
   }
 
   /**
    * Validate the ownership of the map-output files being pulled in. The
    * local-file-system owner of the file should match the user component in the
    *
-   * @throws Exception exception
+   * @throws IOException exception
    */
   @Test(timeout = 100000)
   public void testMapFileAccess() throws IOException {
-    final ArrayList<Throwable> failures = new ArrayList<>();
     // This will run only in NativeIO is enabled as SecureIOUtils need it
     assumeTrue(NativeIO.isAvailable());
     Configuration conf = new Configuration();
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, TEST_EXECUTION.shuffleHandlerPort());
-    conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
-    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
-        "kerberos");
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos");
     UserGroupInformation.setConfiguration(conf);
-    conf.set(YarnConfiguration.NM_LOCAL_DIRS, ABS_LOG_DIR.getAbsolutePath());
-    ApplicationId appId = ApplicationId.newInstance(12345, 1);
-    LOG.info(appId.toString());
-    String appAttemptId = "attempt_12345_1_m_1_0";
-    String user = "randomUser";
-    String reducerId = "0";
-    List<File> fileMap = new ArrayList<>();
-    createShuffleHandlerFiles(ABS_LOG_DIR, user, appId.toString(), appAttemptId,
-        conf, fileMap);
-    ShuffleHandler shuffleHandler = new ShuffleHandler() {
-      @Override
-      protected Shuffle getShuffle(Configuration conf) {
-        // replace the shuffle handler with one stubbed for testing
-        return new Shuffle(conf) {
-
-          @Override
-          protected void verifyRequest(String appid, ChannelHandlerContext ctx,
-              HttpRequest request, HttpResponse response, URL requestUri) {
-            // Do nothing.
-          }
 
-          @Override
-          public void exceptionCaught(ChannelHandlerContext ctx,
-              Throwable cause) throws Exception {
-            LOG.debug("ExceptionCaught");
-            failures.add(cause);
-            super.exceptionCaught(ctx, cause);
-          }
-
-          @Override
-          public void channelActive(ChannelHandlerContext ctx) throws Exception {
-            ctx.pipeline().replace(HttpResponseEncoder.class,
-                "loggingResponseEncoder",
-                new LoggingHttpResponseEncoder(false));
-            LOG.debug("Modified pipeline: {}", ctx.pipeline());
-            super.channelActive(ctx);
-          }
-        };
-      }
-    };
-    AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler();
-    shuffleHandler.setUseOutboundExceptionHandler(true);
-    shuffleHandler.setAuxiliaryLocalPathHandler(pathHandler);
+    ShuffleHandlerMock shuffleHandler = new ShuffleHandlerMock();
     shuffleHandler.init(conf);
     try {
       shuffleHandler.start();
-      DataOutputBuffer outputBuffer = new DataOutputBuffer();
-      outputBuffer.reset();
-      Token<JobTokenIdentifier> jt =
-          new Token<>("identifier".getBytes(),
-              "password".getBytes(), new Text(user), new Text("shuffleService"));
-      jt.write(outputBuffer);
-      shuffleHandler
-          .initializeApplication(new ApplicationInitializationContext(user,
-              appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
-              outputBuffer.getLength())));
-      URL url =
-          new URL(
-              "http://127.0.0.1:"
-                  + shuffleHandler.getConfig().get(
-                      ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
-                  + "/mapOutput?job=job_12345_0001&reduce=" + reducerId
-                  + "&map=attempt_12345_1_m_1_0");
-      HttpURLConnection conn = TEST_EXECUTION.openConnection(url);
-      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
-          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
-      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
-          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+      final String port = shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
+      final SecretKey secretKey = shuffleHandler.addTestApp();
+
+      HttpURLConnection conn = createRequest(
+          geURL(port, TEST_JOB_ID, 0, Collections.singletonList(TEST_ATTEMPT_1), false),
+          secretKey);
       conn.connect();
-      DataInputStream is = new DataInputStream(conn.getInputStream());
-      InputStreamReadResult result = HttpConnectionHelper.readDataFromInputStream(is);
-      String receivedString = result.asString;
+      BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream()));
+      StringBuilder builder = new StringBuilder();
+      String inputLine;
+      while ((inputLine = in.readLine()) != null) {
+        System.out.println(inputLine);
+        builder.append(inputLine);
+      }
+      String receivedString = builder.toString();
 
       //Retrieve file owner name
-      FileInputStream fis = new FileInputStream(fileMap.get(0));
-      String owner = NativeIO.POSIX.getFstat(fis.getFD()).getOwner();
-      fis.close();
+      String indexFilePath = getIndexFile(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_1);
+      String owner;
+      try (FileInputStream fis = new FileInputStream(indexFilePath)) {
+        owner = NativeIO.POSIX.getFstat(fis.getFD()).getOwner();
+      }
 
       String message =
-          "Owner '" + owner + "' for path " + fileMap.get(0).getAbsolutePath()
-              + " did not match expected owner '" + user + "'";
+          "Owner '" + owner + "' for path " + indexFilePath
+              + " did not match expected owner '" + TEST_USER + "'";
       assertTrue(String.format("Received string '%s' should contain " +
-          "message '%s'", receivedString, message),
+              "message '%s'", receivedString, message),
           receivedString.contains(message));
       assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
       LOG.info("received: " + receivedString);
       assertNotEquals("", receivedString);
     } finally {
       shuffleHandler.stop();
-      FileUtil.fullyDelete(ABS_LOG_DIR);
-    }
-
-    assertEquals("Should have no caught exceptions",
-        Collections.emptyList(), failures);
-  }
-
-  private static void createShuffleHandlerFiles(File logDir, String user,
-      String appId, String appAttemptId, Configuration conf,
-      List<File> fileMap) throws IOException {
-    String attemptDir =
-        StringUtils.join(Path.SEPARATOR,
-            new String[] {logDir.getAbsolutePath(),
-                ContainerLocalizer.USERCACHE, user,
-                ContainerLocalizer.APPCACHE, appId, "output", appAttemptId });
-    File appAttemptDir = new File(attemptDir);
-    appAttemptDir.mkdirs();
-    System.out.println(appAttemptDir.getAbsolutePath());
-    File indexFile = new File(appAttemptDir, "file.out.index");
-    fileMap.add(indexFile);
-    createIndexFile(indexFile, conf);
-    File mapOutputFile = new File(appAttemptDir, "file.out");
-    fileMap.add(mapOutputFile);
-    createMapOutputFile(mapOutputFile, conf);
-  }
-
-  private static void createMapOutputFile(File mapOutputFile, Configuration conf)
-          throws IOException {
-    FileOutputStream out = new FileOutputStream(mapOutputFile);
-    out.write("Creating new dummy map output file. Used only for testing"
-        .getBytes());
-    out.flush();
-    out.close();
-  }
-
-  private static void createIndexFile(File indexFile, Configuration conf)
-      throws IOException {
-    if (indexFile.exists()) {
-      System.out.println("Deleting existing file");
-      indexFile.delete();
     }
-    indexFile.createNewFile();
-    FSDataOutputStream output = FileSystem.getLocal(conf).getRaw().append(
-        new Path(indexFile.getAbsolutePath()));
-    Checksum crc = new PureJavaCrc32();
-    crc.reset();
-    CheckedOutputStream chk = new CheckedOutputStream(output, crc);
-    String msg = "Writing new index file. This file will be used only " +
-        "for the testing.";
-    chk.write(Arrays.copyOf(msg.getBytes(),
-        MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH));
-    output.writeLong(chk.getChecksum().getValue());
-    output.close();
   }
 
   @Test
   public void testRecovery() throws IOException {
-    final String user = "someuser";
-    final ApplicationId appId = ApplicationId.newInstance(12345, 1);
-    final JobID jobId = JobID.downgrade(TypeConverter.fromYarn(appId));
     final File tmpDir = new File(System.getProperty("test.build.data",
         System.getProperty("java.io.tmpdir")),
         TestShuffleHandler.class.getName());
-    ShuffleHandler shuffle = new ShuffleHandlerForTests();
-    AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler();
-    shuffle.setAuxiliaryLocalPathHandler(pathHandler);
+    ShuffleHandlerMock shuffle = new ShuffleHandlerMock();
     Configuration conf = new Configuration();
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, TEST_EXECUTION.shuffleHandlerPort());
     conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
-    conf.set(YarnConfiguration.NM_LOCAL_DIRS,
-        ABS_LOG_DIR.getAbsolutePath());
     // emulate aux services startup with recovery enabled
     shuffle.setRecoveryPath(new Path(tmpDir.toString()));
-    tmpDir.mkdirs();
+    assertTrue(tmpDir.mkdirs());
     try {
       shuffle.init(conf);
       shuffle.start();
-
-      // set up a shuffle token for an application
-      DataOutputBuffer outputBuffer = new DataOutputBuffer();
-      outputBuffer.reset();
-      Token<JobTokenIdentifier> jt = new Token<>(
-          "identifier".getBytes(), "password".getBytes(), new Text(user),
-          new Text("shuffleService"));
-      jt.write(outputBuffer);
-      shuffle.initializeApplication(new ApplicationInitializationContext(user,
-          appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
-            outputBuffer.getLength())));
+      final String port = shuffle.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
+      final SecretKey secretKey = shuffle.addTestApp();
 
       // verify we are authorized to shuffle
-      int rc = getShuffleResponseCode(shuffle, jt);
+      int rc = getShuffleResponseCode(port, secretKey);
       assertEquals(HttpURLConnection.HTTP_OK, rc);
 
       // emulate shuffle handler restart
       shuffle.close();
-      shuffle = new ShuffleHandlerForTests();
-      shuffle.setAuxiliaryLocalPathHandler(pathHandler);
+      shuffle = new ShuffleHandlerMock();
       shuffle.setRecoveryPath(new Path(tmpDir.toString()));
       shuffle.init(conf);
       shuffle.start();
 
       // verify we are still authorized to shuffle to the old application
-      rc = getShuffleResponseCode(shuffle, jt);
+      rc = getShuffleResponseCode(port, secretKey);
       assertEquals(HttpURLConnection.HTTP_OK, rc);
 
       // shutdown app and verify access is lost
-      shuffle.stopApplication(new ApplicationTerminationContext(appId));
-      rc = getShuffleResponseCode(shuffle, jt);
+      shuffle.stopApplication(new ApplicationTerminationContext(TEST_APP_ID));
+      rc = getShuffleResponseCode(port, secretKey);
       assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED, rc);
 
       // emulate shuffle handler restart
       shuffle.close();
-      shuffle = new ShuffleHandlerForTests();
+      shuffle = new ShuffleHandlerMock();
       shuffle.setRecoveryPath(new Path(tmpDir.toString()));
       shuffle.init(conf);
       shuffle.start();
 
       // verify we still don't have access
-      rc = getShuffleResponseCode(shuffle, jt);
+      rc = getShuffleResponseCode(port, secretKey);
       assertEquals(HttpURLConnection.HTTP_UNAUTHORIZED, rc);
     } finally {
-      if (shuffle != null) {
-        shuffle.close();
-      }
+      shuffle.close();
       FileUtil.fullyDelete(tmpDir);
     }
   }
-  
+
   @Test
   public void testRecoveryFromOtherVersions() throws IOException {
-    final String user = "someuser";
-    final ApplicationId appId = ApplicationId.newInstance(12345, 1);
     final File tmpDir = new File(System.getProperty("test.build.data",
         System.getProperty("java.io.tmpdir")),
         TestShuffleHandler.class.getName());
     Configuration conf = new Configuration();
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, TEST_EXECUTION.shuffleHandlerPort());
     conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
-    ShuffleHandler shuffle = new ShuffleHandlerForTests();
-    AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler();
-    shuffle.setAuxiliaryLocalPathHandler(pathHandler);
-    conf.set(YarnConfiguration.NM_LOCAL_DIRS, ABS_LOG_DIR.getAbsolutePath());
+    ShuffleHandlerMock shuffle = new ShuffleHandlerMock();
     // emulate aux services startup with recovery enabled
     shuffle.setRecoveryPath(new Path(tmpDir.toString()));
-    tmpDir.mkdirs();
+    assertTrue(tmpDir.mkdirs());
     try {
       shuffle.init(conf);
       shuffle.start();
-
-      // set up a shuffle token for an application
-      DataOutputBuffer outputBuffer = new DataOutputBuffer();
-      outputBuffer.reset();
-      Token<JobTokenIdentifier> jt = new Token<>(
-          "identifier".getBytes(), "password".getBytes(), new Text(user),
-          new Text("shuffleService"));
-      jt.write(outputBuffer);
-      shuffle.initializeApplication(new ApplicationInitializationContext(user,
-          appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
-              outputBuffer.getLength())));
+      final String port = shuffle.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
+      final SecretKey secretKey = shuffle.addTestApp();
 
       // verify we are authorized to shuffle
-      int rc = getShuffleResponseCode(shuffle, jt);
+      int rc = getShuffleResponseCode(port, secretKey);
       assertEquals(HttpURLConnection.HTTP_OK, rc);
 
       // emulate shuffle handler restart
       shuffle.close();
-      shuffle = new ShuffleHandlerForTests();
-      shuffle.setAuxiliaryLocalPathHandler(pathHandler);
+      shuffle = new ShuffleHandlerMock();
       shuffle.setRecoveryPath(new Path(tmpDir.toString()));
       shuffle.init(conf);
       shuffle.start();
 
       // verify we are still authorized to shuffle to the old application
-      rc = getShuffleResponseCode(shuffle, jt);
+      rc = getShuffleResponseCode(port, secretKey);
       assertEquals(HttpURLConnection.HTTP_OK, rc);
       Version version = Version.newInstance(1, 0);
       assertEquals(version, shuffle.getCurrentVersion());
-    
+
       // emulate shuffle handler restart with compatible version
       Version version11 = Version.newInstance(1, 1);
       // update version info before close shuffle
       shuffle.storeVersion(version11);
       assertEquals(version11, shuffle.loadVersion());
       shuffle.close();
-      shuffle = new ShuffleHandlerForTests();
-      shuffle.setAuxiliaryLocalPathHandler(pathHandler);
+      shuffle = new ShuffleHandlerMock();
       shuffle.setRecoveryPath(new Path(tmpDir.toString()));
       shuffle.init(conf);
       shuffle.start();
@@ -1713,309 +420,99 @@ public class TestShuffleHandler {
       // successfully.
       assertEquals(version, shuffle.loadVersion());
       // verify we are still authorized to shuffle to the old application
-      rc = getShuffleResponseCode(shuffle, jt);
+      rc = getShuffleResponseCode(port, secretKey);
       assertEquals(HttpURLConnection.HTTP_OK, rc);
-    
+
       // emulate shuffle handler restart with incompatible version
       Version version21 = Version.newInstance(2, 1);
       shuffle.storeVersion(version21);
       assertEquals(version21, shuffle.loadVersion());
       shuffle.close();
-      shuffle = new ShuffleHandlerForTests();
-      shuffle.setAuxiliaryLocalPathHandler(pathHandler);
+      shuffle = new ShuffleHandlerMock();
       shuffle.setRecoveryPath(new Path(tmpDir.toString()));
       shuffle.init(conf);
-    
+
       try {
         shuffle.start();
         fail("Incompatible version, should expect fail here.");
       } catch (ServiceStateException e) {
         assertTrue("Exception message mismatch",
             e.getMessage().contains("Incompatible version for state DB schema:"));
-      } 
-    
-    } finally {
-      if (shuffle != null) {
-        shuffle.close();
       }
+
+    } finally {
+      shuffle.close();
       FileUtil.fullyDelete(tmpDir);
     }
   }
 
-  private static int getShuffleResponseCode(ShuffleHandler shuffle,
-      Token<JobTokenIdentifier> jt) throws IOException {
-    URL url = new URL("http://127.0.0.1:"
-        + shuffle.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
-        + "/mapOutput?job=job_12345_0001&reduce=0&map=attempt_12345_1_m_1_0");
-    HttpURLConnection conn = TEST_EXECUTION.openConnection(url);
-    String encHash = SecureShuffleUtils.hashFromString(
-        SecureShuffleUtils.buildMsgFrom(url),
-        JobTokenSecretManager.createSecretKey(jt.getPassword()));
-    conn.addRequestProperty(
-        SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
-    conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
-        ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
-    conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
-        ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+  private static void verifyContent(HttpURLConnection conn,
+                                    String expectedContent) throws IOException {
+    DataInputStream input = new DataInputStream(conn.getInputStream());
+    ShuffleHeader header = new ShuffleHeader();
+    header.readFields(input);
+    byte[] data = new byte[expectedContent.length()];
+    assertEquals(expectedContent.length(), input.read(data));
+    assertEquals(expectedContent, new String(data));
+  }
+
+  private static int getShuffleResponseCode(String port, SecretKey key) throws IOException {
+    HttpURLConnection conn = createRequest(
+        geURL(port, TEST_JOB_ID, 0, Collections.singletonList(TEST_ATTEMPT_1), false),
+        key);
     conn.connect();
     int rc = conn.getResponseCode();
     conn.disconnect();
     return rc;
   }
 
-  @Test(timeout = 100000)
-  public void testGetMapOutputInfo() throws Exception {
-    final ArrayList<Throwable> failures = new ArrayList<>(1);
-    Configuration conf = new Configuration();
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, TEST_EXECUTION.shuffleHandlerPort());
-    conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
-    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
-        "simple");
-    UserGroupInformation.setConfiguration(conf);
-    conf.set(YarnConfiguration.NM_LOCAL_DIRS, ABS_LOG_DIR.getAbsolutePath());
-    ApplicationId appId = ApplicationId.newInstance(12345, 1);
-    String appAttemptId = "attempt_12345_1_m_1_0";
-    String user = "randomUser";
-    String reducerId = "0";
-    List<File> fileMap = new ArrayList<>();
-    createShuffleHandlerFiles(ABS_LOG_DIR, user, appId.toString(), appAttemptId,
-        conf, fileMap);
-    AuxiliaryLocalPathHandler pathHandler = new TestAuxiliaryLocalPathHandler();
-    ShuffleHandler shuffleHandler = new ShuffleHandler() {
-      @Override
-      protected Shuffle getShuffle(Configuration conf) {
-        // replace the shuffle handler with one stubbed for testing
-        return new Shuffle(conf) {
-          @Override
-          protected void populateHeaders(List<String> mapIds,
-              String outputBaseStr, String user, int reduce,
-              HttpRequest request, HttpResponse response,
-              boolean keepAliveParam, Map<String, MapOutputInfo> infoMap)
-              throws IOException {
-            // Only set response headers and skip everything else
-            // send some dummy value for content-length
-            super.setResponseHeaders(response, keepAliveParam, 100);
-          }
-          @Override
-          protected void verifyRequest(String appid,
-              ChannelHandlerContext ctx, HttpRequest request,
-              HttpResponse response, URL requestUri) {
-            // Do nothing.
-          }
-          @Override
-          protected void sendError(ChannelHandlerContext ctx, String message,
-              HttpResponseStatus status) {
-            if (failures.size() == 0) {
-              failures.add(new Error(message));
-              ctx.channel().close();
-            }
-          }
-          @Override
-          protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
-              Channel ch, String user, String mapId, int reduce,
-              MapOutputInfo info) throws IOException {
-            // send a shuffle header
-            ShuffleHeader header =
-                new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
-            DataOutputBuffer dob = new DataOutputBuffer();
-            header.write(dob);
-            return ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
-          }
-        };
-      }
-    };
-    shuffleHandler.setUseOutboundExceptionHandler(true);
-    shuffleHandler.setAuxiliaryLocalPathHandler(pathHandler);
-    shuffleHandler.init(conf);
-    try {
-      shuffleHandler.start();
-      DataOutputBuffer outputBuffer = new DataOutputBuffer();
-      outputBuffer.reset();
-      Token<JobTokenIdentifier> jt =
-          new Token<>("identifier".getBytes(),
-              "password".getBytes(), new Text(user), new Text("shuffleService"));
-      jt.write(outputBuffer);
-      shuffleHandler
-          .initializeApplication(new ApplicationInitializationContext(user,
-          appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
-          outputBuffer.getLength())));
-      URL url =
-          new URL(
-              "http://127.0.0.1:"
-                  + shuffleHandler.getConfig().get(
-                      ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
-                  + "/mapOutput?job=job_12345_0001&reduce=" + reducerId
-                  + "&map=attempt_12345_1_m_1_0");
-      HttpURLConnection conn = TEST_EXECUTION.openConnection(url);
-      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
-          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
-      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
-          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
-      conn.connect();
-      try {
-        DataInputStream is = new DataInputStream(conn.getInputStream());
-        ShuffleHeader header = new ShuffleHeader();
-        header.readFields(is);
-        is.close();
-      } catch (EOFException e) {
-        // ignore
-      }
-      assertEquals("sendError called due to shuffle error",
-          0, failures.size());
-    } finally {
-      shuffleHandler.stop();
-      FileUtil.fullyDelete(ABS_LOG_DIR);
-    }
+  private static URL geURL(String port, String jobId, int reduce, List<String> maps,
+                           boolean keepAlive) throws MalformedURLException {
+    return new URL(getURLString(port, getUri(jobId, reduce, maps, keepAlive)));
   }
 
-  @Test(timeout = 4000)
-  public void testSendMapCount() throws Exception {
-    final List<ShuffleHandler.ReduceMapFileCount> listenerList =
-        new ArrayList<>();
-    int connectionKeepAliveTimeOut = 5; //arbitrary value
-    final ChannelHandlerContext mockCtx =
-        mock(ChannelHandlerContext.class);
-    final Channel mockCh = mock(AbstractChannel.class);
-    final ChannelPipeline mockPipeline = mock(ChannelPipeline.class);
-
-    // Mock HttpRequest and ChannelFuture
-    final HttpRequest mockHttpRequest = createMockHttpRequest();
-    final ChannelFuture mockFuture = createMockChannelFuture(mockCh,
-        listenerList);
-    final ShuffleHandler.TimeoutHandler timerHandler =
-        new ShuffleHandler.TimeoutHandler(connectionKeepAliveTimeOut);
-
-    // Mock Netty Channel Context and Channel behavior
-    Mockito.doReturn(mockCh).when(mockCtx).channel();
-    when(mockCh.pipeline()).thenReturn(mockPipeline);
-    when(mockPipeline.get(
-        Mockito.any(String.class))).thenReturn(timerHandler);
-    when(mockCtx.channel()).thenReturn(mockCh);
-    Mockito.doReturn(mockFuture).when(mockCh).writeAndFlush(Mockito.any(Object.class));
-
-    final MockShuffleHandler sh = new MockShuffleHandler();
-    Configuration conf = new Configuration();
-    sh.init(conf);
-    sh.start();
-    int maxOpenFiles =conf.getInt(ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES,
-        ShuffleHandler.DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES);
-    sh.getShuffle(conf).channelRead(mockCtx, mockHttpRequest);
-    assertTrue("Number of Open files should not exceed the configured " +
-            "value!-Not Expected",
-        listenerList.size() <= maxOpenFiles);
-    while(!listenerList.isEmpty()) {
-      listenerList.remove(0).operationComplete(mockFuture);
-      assertTrue("Number of Open files should not exceed the configured " +
-              "value!-Not Expected",
-          listenerList.size() <= maxOpenFiles);
-    }
-    sh.close();
-    sh.stop();
-
-    assertEquals("Should have no caught exceptions",
-        Collections.emptyList(), sh.failures);
-  }
-
-  @Test(timeout = 10000)
-  public void testIdleStateHandlingSpecifiedTimeout() throws Exception {
-    int timeoutSeconds = 4;
-    int expectedTimeoutSeconds = timeoutSeconds;
-    testHandlingIdleState(timeoutSeconds, expectedTimeoutSeconds);
+  private static String getURLString(String port, String uri) {
+    return String.format("http://127.0.0.1:%s%s", port, uri);
   }
 
-  @Test(timeout = 10000)
-  public void testIdleStateHandlingNegativeTimeoutDefaultsTo1Second() throws Exception {
-    int expectedTimeoutSeconds = 1; //expected by production code
-    testHandlingIdleState(ARBITRARY_NEGATIVE_TIMEOUT_SECONDS, expectedTimeoutSeconds);
+  private static HttpURLConnection createRequest(URL url, SecretKey secretKey) throws IOException {
+    HttpURLConnection connection = (HttpURLConnection) url.openConnection();
+    connection.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+        ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+    connection.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+        ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+    String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
+    connection.setRequestProperty(HTTP_HEADER_URL_HASH,
+        SecureShuffleUtils.hashFromString(msgToEncode, secretKey));
+    return connection;
   }
 
-  private String getShuffleUrlWithKeepAlive(ShuffleHandler shuffleHandler, long jobId,
-      long... attemptIds) {
-    String url = getShuffleUrl(shuffleHandler, jobId, attemptIds);
-    return url + "&keepAlive=true";
-  }
+  class ShuffleHandlerMock extends ShuffleHandler {
 
-  private String getShuffleUrl(ShuffleHandler shuffleHandler, long jobId, long... attemptIds) {
-    String port = shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY);
-    String shuffleBaseURL = "http://127.0.0.1:" + port;
+    public SecretKey addTestApp() throws IOException {
+      DataOutputBuffer outputBuffer = new DataOutputBuffer();
+      outputBuffer.reset();
+      Token<JobTokenIdentifier> jt = new Token<>(
+          "identifier".getBytes(), "password".getBytes(), new Text(TEST_USER),
+          new Text("shuffleService"));
+      jt.write(outputBuffer);
+      initializeApplication(new ApplicationInitializationContext(TEST_USER, TEST_APP_ID,
+          ByteBuffer.wrap(outputBuffer.getData(), 0,
+              outputBuffer.getLength())));
 
-    StringBuilder mapAttemptIds = new StringBuilder();
-    for (int i = 0; i < attemptIds.length; i++) {
-      if (i == 0) {
-        mapAttemptIds.append("&map=");
-      } else {
-        mapAttemptIds.append(",");
-      }
-      mapAttemptIds.append(String.format("attempt_%s_1_m_1_0", attemptIds[i]));
+      return JobTokenSecretManager.createSecretKey(jt.getPassword());
     }
 
-    String location = String.format("/mapOutput" +
-        "?job=job_%s_1" +
-        "&reduce=1" +
-        "%s", jobId, mapAttemptIds);
-    return shuffleBaseURL + location;
-  }
-
-  private void testHandlingIdleState(int configuredTimeoutSeconds, int expectedTimeoutSeconds)
-      throws IOException,
-      InterruptedException {
-    Configuration conf = new Configuration();
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, TEST_EXECUTION.shuffleHandlerPort());
-    conf.setBoolean(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, true);
-    conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, configuredTimeoutSeconds);
-
-    final CountDownLatch countdownLatch = new CountDownLatch(1);
-    ResponseConfig responseConfig = new ResponseConfig(HEADER_WRITE_COUNT, 0, 0);
-    ShuffleHandlerForKeepAliveTests shuffleHandler = new ShuffleHandlerForKeepAliveTests(
-        ATTEMPT_ID, responseConfig,
-        event -> countdownLatch.countDown());
-    shuffleHandler.init(conf);
-    shuffleHandler.start();
-
-    String shuffleUrl = getShuffleUrl(shuffleHandler, ATTEMPT_ID, ATTEMPT_ID);
-    String[] urls = new String[] {shuffleUrl};
-    HttpConnectionHelper httpConnectionHelper = new HttpConnectionHelper(
-        shuffleHandler.lastSocketAddress);
-    long beforeConnectionTimestamp = System.currentTimeMillis();
-    httpConnectionHelper.connectToUrls(urls, shuffleHandler.responseConfig);
-    countdownLatch.await();
-    long channelClosedTimestamp = System.currentTimeMillis();
-    long secondsPassed =
-        TimeUnit.SECONDS.convert(channelClosedTimestamp - beforeConnectionTimestamp,
-            TimeUnit.MILLISECONDS);
-    assertTrue(String.format("Expected at least %s seconds of timeout. " +
-            "Actual timeout seconds: %s", expectedTimeoutSeconds, secondsPassed),
-        secondsPassed >= expectedTimeoutSeconds);
-    shuffleHandler.stop();
-  }
-
-  public ChannelFuture createMockChannelFuture(Channel mockCh,
-      final List<ShuffleHandler.ReduceMapFileCount> listenerList) {
-    final ChannelFuture mockFuture = mock(ChannelFuture.class);
-    when(mockFuture.channel()).thenReturn(mockCh);
-    Mockito.doReturn(true).when(mockFuture).isSuccess();
-    Mockito.doAnswer(invocation -> {
-      //Add ReduceMapFileCount listener to a list
-      if (invocation.getArguments()[0].getClass() == ShuffleHandler.ReduceMapFileCount.class) {
-        listenerList.add((ShuffleHandler.ReduceMapFileCount)
-            invocation.getArguments()[0]);
-      }
-      return null;
-    }).when(mockFuture).addListener(Mockito.any(
-        ShuffleHandler.ReduceMapFileCount.class));
-    return mockFuture;
-  }
-
-  public HttpRequest createMockHttpRequest() {
-    HttpRequest mockHttpRequest = mock(HttpRequest.class);
-    Mockito.doReturn(HttpMethod.GET).when(mockHttpRequest).method();
-    Mockito.doAnswer(invocation -> {
-      String uri = "/mapOutput?job=job_12345_1&reduce=1";
-      for (int i = 0; i < 100; i++) {
-        uri = uri.concat("&map=attempt_12345_1_m_" + i + "_0");
-      }
-      return uri;
-    }).when(mockHttpRequest).uri();
-    return mockHttpRequest;
-  }
-}
+    @Override
+    protected ShuffleChannelHandlerContext createHandlerContext() {
+      return new ShuffleChannelHandlerContext(getConfig(),
+          userRsrc,
+          secretManager,
+          createLoadingCache(),
+          new IndexCache(new JobConf(getConfig())),
+          ms.register(new ShuffleHandler.ShuffleMetrics()),
+          allChannels
+      );
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandlerBase.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandlerBase.java
new file mode 100644
index 00000000000..1bce443381d
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandlerBase.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import io.netty.util.ResourceLeakDetector;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.io.PrintWriter;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.annotation.Nonnull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.thirdparty.com.google.common.cache.CacheBuilder;
+import org.apache.hadoop.thirdparty.com.google.common.cache.CacheLoader;
+import org.apache.hadoop.thirdparty.com.google.common.cache.LoadingCache;
+import org.apache.hadoop.thirdparty.com.google.common.cache.RemovalListener;
+import org.junit.After;
+import org.junit.Before;
+
+import static io.netty.util.ResourceLeakDetector.Level.PARANOID;
+import static org.apache.hadoop.io.MapFile.DATA_FILE_NAME;
+import static org.apache.hadoop.io.MapFile.INDEX_FILE_NAME;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestShuffleHandlerBase {
+  public static final String TEST_ATTEMPT_1 = "attempt_1111111111111_0001_m_000001_0";
+  public static final String TEST_ATTEMPT_2 = "attempt_1111111111111_0002_m_000002_0";
+  public static final String TEST_ATTEMPT_3 = "attempt_1111111111111_0003_m_000003_0";
+  public static final String TEST_JOB_ID = "job_1111111111111_0001";
+  public static final String TEST_USER = "testUser";
+  public static final String TEST_DATA_A = "aaaaa";
+  public static final String TEST_DATA_B = "bbbbb";
+  public static final String TEST_DATA_C = "ccccc";
+
+  private final PrintStream standardOut = System.out;
+  private final ByteArrayOutputStream outputStreamCaptor = new ByteArrayOutputStream();
+  @SuppressWarnings("checkstyle:VisibilityModifier")
+  protected java.nio.file.Path tempDir;
+
+  @Before
+  public void setup() throws IOException {
+    tempDir = Files.createTempDirectory("test-shuffle-channel-handler");
+    tempDir.toFile().deleteOnExit();
+
+    generateMapOutput(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_1,
+        Arrays.asList(TEST_DATA_A, TEST_DATA_B, TEST_DATA_C));
+    generateMapOutput(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_2,
+        Arrays.asList(TEST_DATA_B, TEST_DATA_A, TEST_DATA_C));
+    generateMapOutput(tempDir.toAbsolutePath().toString(), TEST_ATTEMPT_3,
+        Arrays.asList(TEST_DATA_C, TEST_DATA_B, TEST_DATA_A));
+
+    outputStreamCaptor.reset();
+    ResourceLeakDetector.setLevel(PARANOID);
+    System.setOut(new PrintStream(outputStreamCaptor));
+  }
+
+  @After
+  public void teardown() {
+    System.setOut(standardOut);
+    System.out.print(outputStreamCaptor);
+    // For this to work ch.qos.logback.classic is needed for some reason
+    assertFalse(outputStreamCaptor.toString()
+        .contains("LEAK: ByteBuf.release() was not called before"));
+  }
+
+  public List<String> matchLogs(String pattern) {
+    String logs = outputStreamCaptor.toString();
+    Matcher m = Pattern.compile(pattern).matcher(logs);
+    List<String> allMatches = new ArrayList<>();
+    while (m.find()) {
+      allMatches.add(m.group());
+    }
+    return allMatches;
+  }
+
+  public static void generateMapOutput(String tempDir, String attempt, List<String> maps)
+      throws IOException {
+    SpillRecord record = new SpillRecord(maps.size());
+
+    assertTrue(new File(getBasePath(tempDir, attempt)).mkdirs());
+    try (PrintWriter writer = new PrintWriter(getDataFile(tempDir, attempt), "UTF-8")) {
+      long startOffset = 0;
+      int partition = 0;
+      for (String map : maps) {
+        record.putIndex(new IndexRecord(
+                startOffset,
+                map.length() * 2L, // doesn't matter in this test
+                map.length()),
+            partition);
+        startOffset += map.length() + 1;
+        partition++;
+        writer.write(map);
+      }
+      record.writeToFile(new Path(getIndexFile(tempDir, attempt)),
+          new JobConf(new Configuration()));
+    }
+  }
+
+  public static String getIndexFile(String tempDir, String attempt) {
+    return String.format("%s/%s", getBasePath(tempDir, attempt), INDEX_FILE_NAME);
+  }
+
+  public static String getDataFile(String tempDir, String attempt) {
+    return String.format("%s/%s", getBasePath(tempDir, attempt), DATA_FILE_NAME);
+  }
+
+  private static String getBasePath(String tempDir, String attempt) {
+    return String.format("%s/%s/%s/%s", tempDir, TEST_JOB_ID, TEST_USER, attempt);
+  }
+
+  public static String getUri(String jobId, int reduce, List<String> maps, boolean keepAlive) {
+    return String.format("/mapOutput?job=%s&reduce=%d&map=%s%s",
+        jobId, reduce, String.join(",", maps),
+        keepAlive ? "&keepAlive=true" : "");
+  }
+
+  public LoadingCache<ShuffleHandler.AttemptPathIdentifier,
+      ShuffleHandler.AttemptPathInfo> createLoadingCache() {
+    return CacheBuilder.newBuilder().expireAfterAccess(
+            5,
+            TimeUnit.MINUTES).softValues().concurrencyLevel(16).
+        removalListener(
+            (RemovalListener<ShuffleHandler.AttemptPathIdentifier,
+                ShuffleHandler.AttemptPathInfo>) notification -> {
+            }
+        ).maximumWeight(10 * 1024 * 1024).weigher(
+            (key, value) -> key.jobId.length() + key.user.length() +
+                key.attemptId.length() +
+                value.indexPath.toString().length() +
+                value.dataPath.toString().length()
+        ).build(new CacheLoader<ShuffleHandler.AttemptPathIdentifier,
+            ShuffleHandler.AttemptPathInfo>() {
+          @Override
+          public ShuffleHandler.AttemptPathInfo load(
+              @Nonnull ShuffleHandler.AttemptPathIdentifier key) {
+            String base = String.format("%s/%s/%s/", tempDir, key.jobId, key.user);
+            String attemptBase = base + key.attemptId;
+            Path indexFileName = new Path(attemptBase + "/" + INDEX_FILE_NAME);
+            Path mapOutputFileName = new Path(attemptBase + "/" + DATA_FILE_NAME);
+            return new ShuffleHandler.AttemptPathInfo(indexFileName, mapOutputFileName);
+          }
+        });
+  }
+}
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/cert.pem b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/cert.pem
new file mode 100644
index 00000000000..ec32a67152a
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/cert.pem
@@ -0,0 +1,27 @@
+-----BEGIN CERTIFICATE-----
+MIIEpDCCAowCCQDDMEtH5Wp0qTANBgkqhkiG9w0BAQsFADAUMRIwEAYDVQQDDAls
+b2NhbGhvc3QwHhcNMjMwMTE2MTI0NjQ4WhcNMzMwMTEzMTI0NjQ4WjAUMRIwEAYD
+VQQDDAlsb2NhbGhvc3QwggIiMA0GCSqGSIb3DQEBAQUAA4ICDwAwggIKAoICAQDO
+FiF+sfoJYHPMPx4jaU11mCupytAFJzz9igaiaKAZCjVHBVWC31KDxHmRdKD066DO
+clOJORNOe8Oe4aB5Lbu6wgKtlHEtKmqAU2WrYAEl0oXrZKEL0Xgs1KTTChbVSJ/I
+m1WwmEthriQSul0WaEncNpS5NV4PORhiGu0plw+SWSJBFsbl29K6oHE1ClgVjm8j
+iu4Y1NAilOPcjmhCmwRq5eq5H0mJ5LWxfvjLIJ9cPpMLG9eVLQkOIE9I01DJ37WM
+OvljUMpmhxWDq2oZEmeyCJUFSUh1IlcUM1hTmRUzU/Vcf7EhpAYZxphvSIvDQkAw
+cmnn0LQZmORCMP0HurR1o3NnzAVf/ahfpXwvA/BuCsEcW1Le+WATtxa2EvRCnEPa
+I76W35FY69t/WYZNIzPgo9eYD7iDBbqxuBH+GlDuwWU6mjEc0nL11uGtcRPrXzKa
+QhRMqAtwNW5I5S5HgPLbMiu/n+PpX6+S431eLHFHJ6WUvcwOIK4ZqLH4/Piks1fV
+0Svdo47Jymlt6dOvYm85tFsWkYcmldO6aQilRuGBYdXJ06xDyH7EaD0Z2PmPjhl9
+zkt3gpaXxBn0gsJIn++qZ26pXFxVewlJi0m84Xd3x10h9MvpP8AZMhFkLWXR2nqw
+eCfell4jzGNXBDLEcspv6HmuTvP7+gqgRCuFLrjOiQIDAQABMA0GCSqGSIb3DQEB
+CwUAA4ICAQAexU5VTmT5VAqau0TGTGEgStGPWoliV4b+d8AcdJvOd1qmetgFhJ+X
+TSutcFPdascOys0+tUV2GZwew3P8yTQyd35LDUC4OjGt2kISDplGAtTdDD19u5/R
+hQf7VxJImOxsg2lPPRv3RXMbNF64reP9YsM9osWgJOFzrDf9FkP2HByslU2v7ler
+sWQVu+Ei7r3/ZMOI7hHnN8MLqcj+BJwEHCTa8HPmr0Ic3lJ86vUVR4QQE5LgNvSu
+oSOZlALsMNVx2rxmirhC6guLwPh7HylDFMzyVedCzmqwQ0R8SSU6SmJvXNLeBFLw
+F5mZRh1jabiqsMTGnmMQ97GPs0q78M2sw3TjI+nexCcYZ3jQfR+1eFSg4DlSd55x
+BMVfT2kYThzxOw3brtygXjl6gGr8v5M6PzOvbLyDtEa3iDp7Mslw2tJ5OmxxJV9g
+QVvBQL1L2nySFk0ij2bIjD7fdpF/EpxrNf4IATOAf5YvxELUeXnyuqJZBtgC8b3I
+qXHJIpGM7N4jdwhe0sMVH0OWlqzsL14QZCE6YdvXBEksJ/HBVUie6afYAZrUwUP1
+gtcq9uFpPteg9PsBLZ7hGfNt2278EXhPBtlIpeiPE8X19Lr3bCmCO1PbWNCTkweb
+tGfwnH46DmWYUqYrofnKso1mq56yEbbuDy7a2FeHJ2d+18Fh97WnUw==
+-----END CERTIFICATE-----
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/key.pem b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/key.pem
new file mode 100644
index 00000000000..e064e5e8d03
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/key.pem
@@ -0,0 +1,52 @@
+-----BEGIN PRIVATE KEY-----
+MIIJRAIBADANBgkqhkiG9w0BAQEFAASCCS4wggkqAgEAAoICAQDOFiF+sfoJYHPM
+Px4jaU11mCupytAFJzz9igaiaKAZCjVHBVWC31KDxHmRdKD066DOclOJORNOe8Oe
+4aB5Lbu6wgKtlHEtKmqAU2WrYAEl0oXrZKEL0Xgs1KTTChbVSJ/Im1WwmEthriQS
+ul0WaEncNpS5NV4PORhiGu0plw+SWSJBFsbl29K6oHE1ClgVjm8jiu4Y1NAilOPc
+jmhCmwRq5eq5H0mJ5LWxfvjLIJ9cPpMLG9eVLQkOIE9I01DJ37WMOvljUMpmhxWD
+q2oZEmeyCJUFSUh1IlcUM1hTmRUzU/Vcf7EhpAYZxphvSIvDQkAwcmnn0LQZmORC
+MP0HurR1o3NnzAVf/ahfpXwvA/BuCsEcW1Le+WATtxa2EvRCnEPaI76W35FY69t/
+WYZNIzPgo9eYD7iDBbqxuBH+GlDuwWU6mjEc0nL11uGtcRPrXzKaQhRMqAtwNW5I
+5S5HgPLbMiu/n+PpX6+S431eLHFHJ6WUvcwOIK4ZqLH4/Piks1fV0Svdo47Jymlt
+6dOvYm85tFsWkYcmldO6aQilRuGBYdXJ06xDyH7EaD0Z2PmPjhl9zkt3gpaXxBn0
+gsJIn++qZ26pXFxVewlJi0m84Xd3x10h9MvpP8AZMhFkLWXR2nqweCfell4jzGNX
+BDLEcspv6HmuTvP7+gqgRCuFLrjOiQIDAQABAoICAQDAe6UfK2YIugCN5OnmUyUY
+z18AwD/YgFSTzgXyTNwzZvhp9A5xJNpx3eFZvN/Uwfs4t0lUom1o4WnNjJkQdWmg
+vjI4I6wtbi942evcy9dmlyGjwSI14phm7tlfj03SOXmbqZG4VhYaDsb8gvoMwq0x
+s/zmm3TVrRMcFmAqd0ABBaVbu8VbzRweWVpDGv04bQda4BkQMjyQABZu2seAZj8T
+BNldvF44H9igBqKjPj35rywxtPh/CUgq3HyQ3WXYl0x+xFpHq57Pch3jFAgNkMYv
+X5qoDFFTrhY89NPriNBnV2SU12L+s69LBdU8Izr+zXMcjNBjxudf/RA8znqWbIi8
+pbwXOwBUD4XP3coAzipVOJfeXb5OAkq+wjHnKb4YXJ5mNFb7LcRy6MJVqyxPNJGh
+UlfGxsJ441K/9e+aoDEyB0xbjeZ+2yP021P2eObwj46M5sxP2BFSe8E1DUpQ5+ZX
+kKipKDZETLc2e4G37Hziw2Wa5X0AAbKgSh1a5AMd0GUbrmJQzO0dok1ujJNu+zpn
+K0qAG0c/HD+egIKPEc03+81fLzXKxGHOxrTnHPInWLeGjxEL3oM2Tl5QkYSjm8qg
+uIY5up5K//R+fDy45/XRACPOo+yf2RTFnIjfIhxJaB2M7BrBUpWvX1xLJQfDS3Gb
+4Rfo2Qlgh/adrNkr2m0NHQKCAQEA8KZK7eugKV/Gk5L0j5E59qytlVZTUoDWdbAq
+vMnAgU6BGiTy/Av4jPCH5HDYD5ZX5nqD+GVkXSh2fiT8eSpgtIuBEdeiHHZXwCcb
+IK7vKxSePQrs0aA53uk7LY0LGPMRhDheYTItTE+6pRp2HswDgRBw+1fm6Yt1ps32
+oqM7bOUSg6eCKISmAP8UV9ac1l6ZHLdhTIzrVnOc/YqIW4ruNbwpSK1fI7uTWH4i
+5JqfPtTa7anJrt080vVUi6cS22G8QhlW3q6fo1GrH8QM4gInRF/4MwkAAP8p1787
+KlFHXxS0bWnJZhjKvh7naeVJi5EaMCWJ1gKF/OcvQYONrA6zdwKCAQEA2ztvxaPy
+j4Pq2wpYWlHueCPPn5yMDQQqCLlcG50HzPbquSdZe7o0MEWqV8MaXB6fw1xLwCC4
+i5+HnL72KaBu6DVIhMYDmPzhiw4GbCASfR4v/biQ+047KfnQiHPUEhUCxGvHhcDR
+Y3Zhzax6mC79Mfz2gunEx2ZI1MURn/sO+3tQtx+Gzsoj/W4LHpeEQGKsUhcIN48v
+XAmeWqVwwmr0twQygnOQyvgZxtiunnIADVxJJh4AQLWGagDiMjaWJ4fZ7q8aBMLY
+SFBlDqzf5xssraUAiaawsaRL0jliy0y8WXwSJHb4WKebH2QQcUq22c2y8IbKMcsz
+AjLHf1nG0oEN/wKCAQEAypfkutnEEzDbVz+/feIQPzfuRqvPJ8TpR1jREfBzjSiP
+DXiPy1s0m0qfzNSnQLAcPh9kXMjXLooy/02Z81OW6EgNl/zGMFn80/rYHrLHDKil
+8tPwvSW7sor9VALKle2EEKD367G3augwRHC7gn/Ub2JtC1xcPL84g/4fJZpwG+PZ
+q1ZpAD10F6Cm+q/lh59KHV/QnQaB1V0tjFGFLDQRCNHom5PBZa6zhCtkqrn1WIsP
+6EcpUHpWi28YBx3XhTOJrsfwVzYBlRfbDboZ8mdHsYttw2FPTIeELWqDn8OfZ09h
++j6126sBe/8+aOsr+EBnIKNpn+6t6FSkdu4OZgxWTwKCAQEAxjRXWjVeUBgxFfWl
+aYsXcXDW/nP3PrFcG2UR/DkdW1aFYf5MbYssMdRaLFxNEanyQRrJtZsfncQORE11
+mq7cdn3t4XRwvjkq5KA6eDkK3imTs+zQzxOsc8fSm/s0aWCrjs/upGNuK2FVDTD5
+6WraKP6OFE+rQ6ebAxpkU+IUapLTp6wOIhkpLq/1x6OuwtMy/kiqeiiu5aQgkc1v
+Q6aVNn3J+Jzo9EgYbelq/f8NQwcDbz3Cdr5nFqFT35fway7sflm6yUErbz2YEAuF
+ppiv7RH3iXu11fU3Q4n0Yt8ujiyY7nTNFMH7ggbiwrqM1B+fvsvuM9SFemBUczPE
+iH57GwKCAQAdLm1mSeUPn3qEXy/ui7M7GPK43r1l2fn6UJhEGckm4YJ2DOlWisNW
+2ilyzfdlYF1Cq22iKxi3/mZdNojKKL7yFCTwx2evHsSIt2vcyD25sFVh5u9O/xFa
+1Zk3Pzq6XpaAfZCY4OizJb5zraWYWVNAP1DI4qT0Kg6LvNWZ5G9Dh+tptTmB9E05
+5GiBWD3OfWH5AMQ2UmprEivbaqN8Gm/W6m6Hraf+LbP4aFORwElNAZTymeNcW5O5
+ha2XU2TAINmhgPm1IZEGiSah+A+s2uW4Ox4nQJfksy+rtJOPRcnK4aIhURhzwJv/
+8JszrQ2Tq9fN/cO50CDeipqAtKkcWNjE
+-----END PRIVATE KEY-----
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/log4j.properties b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/log4j.properties
index b7d8ad36efc..471993fd590 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/log4j.properties
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/log4j.properties
@@ -17,5 +17,5 @@ log4j.threshold=ALL
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2} (%F:%M(%L)) - %m%n
-log4j.logger.io.netty=INFO
-log4j.logger.org.apache.hadoop.mapred=INFO
\ No newline at end of file
+log4j.logger.io.netty=TRACE
+log4j.logger.org.apache.hadoop.mapred=TRACE
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org