You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ab...@apache.org on 2023/02/28 08:36:45 UTC

[tez] branch master updated: TEZ-4460: Read timed out in shuffle handler - incorrect usage of EMPTY_LAST_CONTENT and channel write (#257) (Laszlo Bodor reviewed by Rajesh Balamohan, Syed Shameerur Rahman)

This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bd6f9cdc TEZ-4460: Read timed out in shuffle handler - incorrect usage of EMPTY_LAST_CONTENT and channel write (#257) (Laszlo Bodor reviewed by Rajesh Balamohan, Syed Shameerur Rahman)
6bd6f9cdc is described below

commit 6bd6f9cdcf47176ab78fb5aea555583af50cc2d3
Author: Bodor Laszlo <bo...@gmail.com>
AuthorDate: Tue Feb 28 09:36:40 2023 +0100

    TEZ-4460: Read timed out in shuffle handler - incorrect usage of EMPTY_LAST_CONTENT and channel write (#257) (Laszlo Bodor reviewed by Rajesh Balamohan, Syed Shameerur Rahman)
---
 .../org/apache/tez/auxservices/ShuffleHandler.java |  31 +--
 .../apache/tez/auxservices/TestShuffleHandler.java | 288 +++++++++++++--------
 2 files changed, 196 insertions(+), 123 deletions(-)

diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
index c6657750f..8b1b1aee0 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
@@ -129,7 +129,6 @@ import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.codec.TooLongFrameException;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
 import io.netty.handler.codec.http.DefaultHttpResponse;
-import io.netty.handler.codec.http.FullHttpRequest;
 import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.HttpObjectAggregator;
@@ -306,21 +305,28 @@ public class ShuffleHandler extends AuxiliaryService {
 
     @Override
     public void operationComplete(ChannelFuture future) throws Exception {
+      Channel ch = future.channel();
       if (!future.isSuccess()) {
-        future.channel().close();
+        ch.close();
         return;
       }
       int waitCount = this.reduceContext.getMapsToWait().decrementAndGet();
       if (waitCount == 0) {
+        LOG.debug("Finished with all map outputs");
+        /*
+         * LastHttpContent.EMPTY_LAST_CONTENT can only be written when there are no remaining maps to send;
+         * this is the only point at which we can finish the HTTP response.
+         */
+        ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
         metrics.operationComplete(future);
         // Let the idle timer handler close keep-alive connections
         if (reduceContext.getKeepAlive()) {
-          ChannelPipeline pipeline = future.channel().pipeline();
+          ChannelPipeline pipeline = ch.pipeline();
           TimeoutHandler timeoutHandler =
               (TimeoutHandler) pipeline.get(TIMEOUT_HANDLER);
           timeoutHandler.setEnabledTimeout(true);
         } else {
-          future.channel().close();
+          ch.close();
         }
       } else {
         SHUFFLE.sendMap(reduceContext);
@@ -993,12 +999,11 @@ public class ShuffleHandler extends AuxiliaryService {
     @Override
     public void channelRead(ChannelHandlerContext ctx, Object message)
         throws Exception {
-      FullHttpRequest request = (FullHttpRequest) message;
+      HttpRequest request = (HttpRequest) message;
       handleRequest(ctx, request);
-      request.release();
     }
 
-    private void handleRequest(ChannelHandlerContext ctx, FullHttpRequest request)
+    private void handleRequest(ChannelHandlerContext ctx, HttpRequest request)
         throws IOException, Exception {
       if (request.getMethod() != GET) {
           sendError(ctx, METHOD_NOT_ALLOWED);
@@ -1123,13 +1128,9 @@ public class ShuffleHandler extends AuxiliaryService {
       for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) {
         ChannelFuture nextMap = sendMap(reduceContext);
         if(nextMap == null) {
-          // by this special message flushed, we can make sure the whole response is finished
-          ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
           return;
         }
       }
-      // by this special message flushed, we can make sure the whole response is finished
-      ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
     }
 
     private boolean isNullOrEmpty(List<String> entries) {
@@ -1496,7 +1497,7 @@ public class ShuffleHandler extends AuxiliaryService {
       DataOutputBuffer dobRange = new DataOutputBuffer();
       // Indicate how many record to be written
       WritableUtils.writeVInt(dobRange, reduceRange.getLast() - reduceRange.getFirst() + 1);
-      ch.write(wrappedBuffer(dobRange.getData(), 0, dobRange.getLength()));
+      ch.writeAndFlush(wrappedBuffer(dobRange.getData(), 0, dobRange.getLength()));
       for (int reduce = reduceRange.getFirst(); reduce <= reduceRange.getLast(); reduce++) {
         TezIndexRecord index = outputInfo.getIndex(reduce);
         // Records are only valid if they have a non-zero part length
@@ -1511,7 +1512,7 @@ public class ShuffleHandler extends AuxiliaryService {
         DataOutputBuffer dob = new DataOutputBuffer();
         header.write(dob);
         // Free the memory needed to store the spill and index records
-        ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+        ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
       }
       outputInfo.finish();
 
@@ -1531,14 +1532,14 @@ public class ShuffleHandler extends AuxiliaryService {
             rangeOffset, rangePartLength, manageOsCache, readaheadLength,
             readaheadPool, spillFile.getAbsolutePath(),
             shuffleBufferSize, shuffleTransferToAllowed);
-        writeFuture = ch.write(partition);
+        writeFuture = ch.writeAndFlush(partition);
       } else {
         // HTTPS cannot be done with zero copy.
         final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
             rangeOffset, rangePartLength, sslFileBufferSize,
             manageOsCache, readaheadLength, readaheadPool,
             spillFile.getAbsolutePath());
-        writeFuture = ch.write(chunk);
+        writeFuture = ch.writeAndFlush(chunk);
       }
       metrics.shuffleConnections.incr();
       metrics.shuffleOutputBytes.incr(rangePartLength); // optimistic
diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
index 40b16857a..d0d0a381e 100644
--- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
@@ -48,6 +48,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import java.util.zip.Checksum;
 
 import org.apache.hadoop.conf.Configuration;
@@ -109,6 +111,8 @@ public class TestShuffleHandler {
   private static final File TEST_DIR = new File(System.getProperty("test.build.data"),
       TestShuffleHandler.class.getName()).getAbsoluteFile();
   private static final String HADOOP_TMP_DIR = "hadoop.tmp.dir";
+  private static final String TEST_PARTITION_DATA_STRING = "0123456789";
+
   class MockShuffleHandler extends org.apache.tez.auxservices.ShuffleHandler {
     @Override
     protected Shuffle getShuffle(final Configuration conf) {
@@ -283,9 +287,7 @@ public class TestShuffleHandler {
   @Test (timeout = 10000)
   public void testClientClosesConnection() throws Exception {
     final AtomicBoolean failureEncountered = new AtomicBoolean(false);
-    Configuration conf = new Configuration();
-    conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    Configuration conf = getInitialConf();
     ShuffleHandler shuffleHandler = new ShuffleHandler() {
       @Override
       protected Shuffle getShuffle(Configuration conf) {
@@ -387,9 +389,7 @@ public class TestShuffleHandler {
   @Test(timeout = 10000)
   public void testKeepAlive() throws Exception {
     final AtomicBoolean failureEncountered = new AtomicBoolean(false);
-    Configuration conf = new Configuration();
-    conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    Configuration conf = getInitialConf();
     conf.setBoolean(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, true);
     // try setting to -ve keep alive timeout.
     conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, -100);
@@ -535,9 +535,7 @@ public class TestShuffleHandler {
 
   @Test
   public void testSocketKeepAlive() throws Exception {
-    Configuration conf = new Configuration();
-    conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    Configuration conf = getInitialConf();
     conf.setBoolean(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED, true);
     // try setting to -ve keep alive timeout.
     conf.setInt(ShuffleHandler.SHUFFLE_CONNECTION_KEEP_ALIVE_TIME_OUT, -100);
@@ -579,9 +577,7 @@ public class TestShuffleHandler {
   @Test (timeout = 10000)
   public void testIncompatibleShuffleVersion() throws Exception {
     final int failureNum = 3;
-    Configuration conf = new Configuration();
-    conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    Configuration conf = getInitialConf();
     ShuffleHandler shuffleHandler = new ShuffleHandler();
     shuffleHandler.init(conf);
     shuffleHandler.start();
@@ -613,9 +609,7 @@ public class TestShuffleHandler {
   @Test (timeout = 10000)
   public void testMaxConnections() throws Exception {
 
-    Configuration conf = new Configuration();
-    conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    Configuration conf = getInitialConf();
     conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
     ShuffleHandler shuffleHandler = new ShuffleHandler() {
       @Override
@@ -722,9 +716,7 @@ public class TestShuffleHandler {
    */
   @Test(timeout = 10000)
   public void testRangedFetch() throws IOException {
-    Configuration conf = new Configuration();
-    conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    Configuration conf = getInitialConf();
     conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
     conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
         "simple");
@@ -741,23 +733,7 @@ public class TestShuffleHandler {
     List<File> fileMap = new ArrayList<>();
     createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId,
         conf, fileMap);
-    ShuffleHandler shuffleHandler = new ShuffleHandler() {
-
-      @Override
-      protected Shuffle getShuffle(Configuration conf) {
-        // replace the shuffle handler with one stubbed for testing
-        return new Shuffle(conf) {
-
-          @Override
-          protected void verifyRequest(String appid, ChannelHandlerContext ctx,
-                                       HttpRequest request, HttpResponse response, URL requestUri)
-              throws IOException {
-            // Do nothing.
-          }
-
-        };
-      }
-    };
+    ShuffleHandler shuffleHandler = getShuffleHandlerWithNoVerify();
     shuffleHandler.init(conf);
     try {
       shuffleHandler.start();
@@ -814,6 +790,104 @@ public class TestShuffleHandler {
     }
   }
 
+  /**
+   * Validate that the ranged fetch works as expected for different numbers of map attempts and reduce ranges.
+   */
+  @Test(timeout = 30000)
+  public void testRangedFetchMultipleAttempts() throws IOException {
+    runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/1, /*reduceRange*/1);
+    runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/5, /*reduceRange*/1);
+    runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/10, /*reduceRange*/1);
+    runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/100, /*reduceRange*/1);
+    runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/1, /*reduceRange*/5);
+    runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/5, /*reduceRange*/5);
+    runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/10, /*reduceRange*/5);
+    runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/100, /*reduceRange*/5);
+    runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/1, /*reduceRange*/10);
+    runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/5, /*reduceRange*/10);
+    runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/10, /*reduceRange*/10);
+    runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/100, /*reduceRange*/10);
+    runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/1, /*reduceRange*/100);
+    runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/5, /*reduceRange*/100);
+    runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/10, /*reduceRange*/100);
+    runMultiAttemptMultiRangeShuffleTest(/*attemptRange*/100, /*reduceRange*/100);
+  }
+
+  private void runMultiAttemptMultiRangeShuffleTest(int attemptRange, int reduceRange) throws IOException {
+    Random random = new Random();
+    String user = "randomUser";
+    int firstAttempt = random.nextInt(10);
+    int reducerIdStart = random.nextInt(10);
+    int reducerIdEnd = reducerIdStart + reduceRange - 1;
+
+    Configuration conf = getInitialConf();
+    conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
+    conf.setInt(ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES, 3);
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "simple");
+    UserGroupInformation.setConfiguration(conf);
+    File absLogDir = new File("target", TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile();
+    conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath());
+    ApplicationId appId = ApplicationId.newInstance(12345, 1);
+    LOG.info(appId.toString());
+    List<String> attemptIds = IntStream.range(firstAttempt, firstAttempt + attemptRange)
+        .mapToObj(i -> "attempt_12345_1_m_" + i + "_0").collect(Collectors.toList());
+    List<File> fileMap = new ArrayList<>();
+    for (String attemptId : attemptIds) {
+      createShuffleHandlerFiles(absLogDir, user, appId.toString(), attemptId, conf, fileMap, reducerIdStart,
+          reducerIdEnd);
+    }
+    ShuffleHandler shuffleHandler = getShuffleHandlerWithNoVerify();
+    shuffleHandler.init(conf);
+    try {
+      shuffleHandler.start();
+      DataOutputBuffer outputBuffer = new DataOutputBuffer();
+      outputBuffer.reset();
+      Token<JobTokenIdentifier> jt = new Token<JobTokenIdentifier>("identifier".getBytes(), "password".getBytes(),
+          new Text(user), new Text("shuffleService"));
+      jt.write(outputBuffer);
+      shuffleHandler.initializeApplication(new ApplicationInitializationContext(user, appId,
+          ByteBuffer.wrap(outputBuffer.getData(), 0, outputBuffer.getLength())));
+      URL url = new URL("http://127.0.0.1:" + shuffleHandler.getConfig().get(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+          + "/mapOutput?job=job_12345_0001&dag=1&reduce=" + reducerIdStart + "-" + reducerIdEnd + "&map="
+          + String.join(",", attemptIds));
+      LOG.info("Calling shuffle URL: {}", url);
+      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME, ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION, ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+      conn.connect();
+      boolean succeeded = false;
+      try {
+        DataInputStream is = new DataInputStream(conn.getInputStream());
+        for (String attempt : attemptIds) {
+          int partitionCount = WritableUtils.readVInt(is);
+          List<ShuffleHeader> headers = new ArrayList<>(partitionCount);
+          for (int i = reducerIdStart; i <= reducerIdEnd; i++) {
+            ShuffleHeader header = new ShuffleHeader();
+            header.readFields(is);
+            Assert.assertEquals("Incorrect map id", attempt, header.getMapId());
+            Assert.assertEquals("Incorrect reduce id", i, header.getPartition());
+            headers.add(header);
+          }
+          for (ShuffleHeader header : headers) {
+            byte[] bytes = new byte[(int) header.getCompressedLength()];
+            is.read(bytes);
+            Assert.assertEquals(TEST_PARTITION_DATA_STRING, new String(bytes));
+          }
+        }
+        succeeded = true;
+        // Read one more byte to force EOF
+        is.readByte();
+        Assert.fail("More fetch bytes that expected in stream");
+      } catch (EOFException e) {
+        Assert.assertTrue("Failed to copy ranged fetch", succeeded);
+      }
+
+    } finally {
+      shuffleHandler.close();
+      FileUtil.fullyDelete(absLogDir);
+    }
+  }
+
   /**
    * Validate the ownership of the map-output files being pulled in. The
    * local-file-system owner of the file should match the user component in the
@@ -824,9 +898,7 @@ public class TestShuffleHandler {
   public void testMapFileAccess() throws IOException {
     // This will run only in NativeIO is enabled as SecureIOUtils need it
     assumeTrue(NativeIO.isAvailable());
-    Configuration conf = new Configuration();
-    conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    Configuration conf = getInitialConf();
     conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
     conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
         "kerberos");
@@ -842,23 +914,7 @@ public class TestShuffleHandler {
     List<File> fileMap = new ArrayList<File>();
     createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId,
         conf, fileMap);
-    ShuffleHandler shuffleHandler = new ShuffleHandler() {
-
-      @Override
-      protected Shuffle getShuffle(Configuration conf) {
-        // replace the shuffle handler with one stubbed for testing
-        return new Shuffle(conf) {
-
-          @Override
-          protected void verifyRequest(String appid, ChannelHandlerContext ctx,
-              HttpRequest request, HttpResponse response, URL requestUri)
-              throws IOException {
-            // Do nothing.
-          }
-
-        };
-      }
-    };
+    ShuffleHandler shuffleHandler = getShuffleHandlerWithNoVerify();
     shuffleHandler.init(conf);
     try {
       shuffleHandler.start();
@@ -907,48 +963,55 @@ public class TestShuffleHandler {
     }
   }
 
-  private static void createShuffleHandlerFiles(File logDir, String user,
-      String appId, String appAttemptId, Configuration conf,
-      List<File> fileMap) throws IOException {
-    String attemptDir =
-        StringUtils.join(Path.SEPARATOR,
-            new String[] { logDir.getAbsolutePath(),
-                ShuffleHandler.USERCACHE, user,
-                ShuffleHandler.APPCACHE, appId,"dag_1/" + "output",
-                appAttemptId });
+  private static void createShuffleHandlerFiles(File logDir, String user, String appId, String appAttemptId,
+      Configuration conf, List<File> fileMap) throws IOException {
+    createShuffleHandlerFiles(logDir, user, appId, appAttemptId, conf, fileMap, 0, 1);
+  }
+
+  private static void createShuffleHandlerFiles(File logDir, String user, String appId, String appAttemptId,
+      Configuration conf, List<File> fileMap, int reduceStart, int reduceEnd) throws IOException {
+    String attemptDir = StringUtils.join(Path.SEPARATOR, new String[] { logDir.getAbsolutePath(),
+        ShuffleHandler.USERCACHE, user, ShuffleHandler.APPCACHE, appId, "dag_1/" + "output", appAttemptId });
     File appAttemptDir = new File(attemptDir);
     appAttemptDir.mkdirs();
-    System.out.println(appAttemptDir.getAbsolutePath());
+    LOG.info(appAttemptDir.getAbsolutePath());
     File indexFile = new File(appAttemptDir, "file.out.index");
     fileMap.add(indexFile);
-    createIndexFile(indexFile, conf);
+    createIndexFile(indexFile, conf, reduceStart, reduceEnd);
     File mapOutputFile = new File(appAttemptDir, "file.out");
     fileMap.add(mapOutputFile);
-    createMapOutputFile(mapOutputFile, conf);
+    createMapOutputFile(mapOutputFile, conf, reduceEnd - reduceStart + 1);
   }
 
-  private static void
-    createMapOutputFile(File mapOutputFile, Configuration conf)
-          throws IOException {
+  private static void createMapOutputFile(File mapOutputFile, Configuration conf, int partitionCount)
+      throws IOException {
     FileOutputStream out = new FileOutputStream(mapOutputFile);
-    out.write("Creating new dummy map output file. Used only for testing"
-        .getBytes());
+
+    StringBuilder b = new StringBuilder(partitionCount * TEST_PARTITION_DATA_STRING.length());
+    for (int i = 0; i < partitionCount; i++) {
+      b.append(TEST_PARTITION_DATA_STRING);
+    }
+
+    out.write(b.toString().getBytes());
     out.flush();
     out.close();
   }
 
-  private static void createIndexFile(File indexFile, Configuration conf)
+  private static void createIndexFile(File indexFile, Configuration conf, int reduceStart, int reduceEnd)
       throws IOException {
     if (indexFile.exists()) {
-      System.out.println("Deleting existing file");
+      LOG.info("Deleting existing file");
       indexFile.delete();
     }
     Checksum crc = new PureJavaCrc32();
-    TezSpillRecord tezSpillRecord = new TezSpillRecord(2);
-    tezSpillRecord.putIndex(new TezIndexRecord(0, 10, 10), 0);
-    tezSpillRecord.putIndex(new TezIndexRecord(10, 10, 10), 1);
-    tezSpillRecord.writeToFile(new Path(indexFile.getAbsolutePath()), conf,
-        FileSystem.getLocal(conf).getRaw(), crc);
+    TezSpillRecord tezSpillRecord = new TezSpillRecord(reduceEnd + 1);
+    int offset = 0;
+    for (int i = reduceStart; i <= reduceEnd; i++) {
+      tezSpillRecord.putIndex(
+          new TezIndexRecord(offset, TEST_PARTITION_DATA_STRING.length(), TEST_PARTITION_DATA_STRING.length()), i);
+      offset += TEST_PARTITION_DATA_STRING.length();
+    }
+    tezSpillRecord.writeToFile(new Path(indexFile.getAbsolutePath()), conf, FileSystem.getLocal(conf).getRaw(), crc);
   }
 
   @Test
@@ -958,9 +1021,7 @@ public class TestShuffleHandler {
     final File tmpDir = new File(System.getProperty("test.build.data",
         System.getProperty("java.io.tmpdir")),
         TestShuffleHandler.class.getName());
-    Configuration conf = new Configuration();
-    conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    Configuration conf = getInitialConf();
     conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
     ShuffleHandler shuffle = new ShuffleHandler();
     // emulate aux services startup with recovery enabled
@@ -1026,9 +1087,7 @@ public class TestShuffleHandler {
     final File tmpDir = new File(System.getProperty("test.build.data",
         System.getProperty("java.io.tmpdir")),
         TestShuffleHandler.class.getName());
-    Configuration conf = new Configuration();
-    conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    Configuration conf = getInitialConf();
     conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
     ShuffleHandler shuffle = new ShuffleHandler();
     // emulate aux services startup with recovery enabled
@@ -1133,9 +1192,7 @@ public class TestShuffleHandler {
   @Test(timeout = 100000)
   public void testGetMapOutputInfo() throws Exception {
     final AtomicBoolean failureEncountered = new AtomicBoolean(false);
-    Configuration conf = new Configuration();
-    conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    Configuration conf = getInitialConf();
     conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
     conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
         "simple");
@@ -1237,10 +1294,8 @@ public class TestShuffleHandler {
   @Test(timeout = 5000)
   public void testDagDelete() throws Exception {
     final AtomicBoolean failureEncountered = new AtomicBoolean(false);
-    Configuration conf = new Configuration();
-    conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
+    Configuration conf = getInitialConf();
     conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
     conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
         "simple");
     UserGroupInformation.setConfiguration(conf);
@@ -1318,9 +1373,8 @@ public class TestShuffleHandler {
   @Test
   public void testVertexShuffleDelete() throws Exception {
     final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
-    Configuration conf = new Configuration();
+    Configuration conf = getInitialConf();
     conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
     conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
             "simple");
     UserGroupInformation.setConfiguration(conf);
@@ -1387,7 +1441,7 @@ public class TestShuffleHandler {
         fail("Encountered Exception!" + e.getMessage());
       }
     } finally {
-      shuffleHandler.stop();
+      shuffleHandler.close();
       FileUtil.fullyDelete(absLogDir);
     }
   }
@@ -1395,9 +1449,8 @@ public class TestShuffleHandler {
   @Test(timeout = 5000)
   public void testFailedTaskAttemptDelete() throws Exception {
     final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
-    Configuration conf = new Configuration();
+    Configuration conf = getInitialConf();
     conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
     conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
         "simple");
     UserGroupInformation.setConfiguration(conf);
@@ -1469,7 +1522,7 @@ public class TestShuffleHandler {
       Assert.assertEquals("sendError called due to shuffle error",
           0, failures.size());
     } finally {
-      shuffleHandler.stop();
+      shuffleHandler.close();
       FileUtil.fullyDelete(absLogDir);
     }
   }
@@ -1500,10 +1553,7 @@ public class TestShuffleHandler {
     when(mockCh.writeAndFlush(Object.class)).thenReturn(mockFuture);
 
     final ShuffleHandler sh = new MockShuffleHandler();
-    Configuration conf = new Configuration();
-    conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
-    // The Shuffle handler port associated with the service is bound to but not used.
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    Configuration conf = getInitialConf();
     sh.init(conf);
     sh.start();
     int maxOpenFiles =conf.getInt(ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES,
@@ -1523,8 +1573,7 @@ public class TestShuffleHandler {
 
   @Test
   public void testShuffleHandlerSendsDiskError() throws Exception {
-    Configuration conf = new Configuration();
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    Configuration conf = getInitialConf();
 
     DataInputStream input = null;
     MockShuffleHandlerWithFatalDiskError shuffleHandler =
@@ -1609,22 +1658,45 @@ public class TestShuffleHandler {
       shuffleHandler.serviceStart();
       Assert.assertEquals(port, shuffleHandler.getPort());
     } finally {
-      shuffleHandler.stop();
+      shuffleHandler.close();
     }
   }
 
   @Test
   public void testConfigPortDynamic() throws Exception {
-    Configuration conf = new Configuration();
-    // 0 as config, should be dynamically chosen by netty
-    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    Configuration conf = getInitialConf();
     MockShuffleHandler2 shuffleHandler = new MockShuffleHandler2();
     shuffleHandler.serviceInit(conf);
     try {
       shuffleHandler.serviceStart();
       Assert.assertTrue("ShuffleHandler should use a random chosen port", shuffleHandler.getPort() > 0);
     } finally {
-      shuffleHandler.stop();
+      shuffleHandler.close();
     }
   }
+
+  private Configuration getInitialConf() {
+    Configuration conf = new Configuration();
+    conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
+    // 0 as config, should be dynamically chosen by netty
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    return conf;
+  }
+
+  private ShuffleHandler getShuffleHandlerWithNoVerify() {
+    return new ShuffleHandler() {
+
+      @Override
+      protected Shuffle getShuffle(Configuration conf) {
+        // replace the shuffle handler with one stubbed for testing
+        return new Shuffle(conf) {
+          @Override
+          protected void verifyRequest(String appid, ChannelHandlerContext ctx, HttpRequest request,
+              HttpResponse response, URL requestUri) throws IOException {
+            // Do nothing.
+          }
+        };
+      }
+    };
+  }
 }