Posted to commits@spark.apache.org by mr...@apache.org on 2020/12/23 18:43:14 UTC

[spark] branch branch-3.1 updated: [SPARK-32916][SHUFFLE][TEST-MAVEN][TEST-HADOOP2.7] Ensure the number of chunks in meta file and index file are equal

This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new b174ac7  [SPARK-32916][SHUFFLE][TEST-MAVEN][TEST-HADOOP2.7] Ensure the number of chunks in meta file and index file are equal
b174ac7 is described below

commit b174ac7fa1ca7969ccb4db06da132bea36947942
Author: Chandni Singh <si...@gmail.com>
AuthorDate: Wed Dec 23 12:42:18 2020 -0600

    [SPARK-32916][SHUFFLE][TEST-MAVEN][TEST-HADOOP2.7] Ensure the number of chunks in meta file and index file are equal
    
    ### What changes were proposed in this pull request?
    1. Fixes bugs in `RemoteBlockPushResolver` where the number of chunks in the meta file and the index file can become inconsistent due to exceptions while writing to either file. This Java class was introduced in https://github.com/apache/spark/pull/30062.
    - If writing to the index file fails, the position of the meta file is not reset, so the number of chunks in the meta file becomes inconsistent with the index file.
    - During exception handling while writing to the index/meta file, we only reset the file pointer to the start position. If the files are closed right after this, the extra bytes already written to them are not removed.
    2. Adds an IOException threshold. If `RemoteBlockPushResolver` encounters more IOExceptions than this threshold while updating the data/meta/index file of a shuffle partition, it responds to the client with the exception `IOExceptions exceeded the threshold` so that the client can stop pushing data for this shuffle partition.
    3. When the update to the metadata fails, the exception is not propagated back to the client. This can increase the size of the current chunk. However, with (2) in place, the current chunk still stays at a manageable size.
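The threshold behaviour in items 2 and 3 can be modelled in isolation. The sketch below is a simplified, hypothetical model: `PartitionMergeState`, `onDataWriteFailure` and `onChunkUpdateFailure` are illustrative names, not Spark APIs. Only the strict `>` comparison and the message prefix mirror the diff. A failed data-file write is counted and rethrown so the client can retry the block. A failed index/meta update is counted but swallowed, so the block stays merged and the current chunk simply grows. Once the count exceeds the threshold, every call aborts with a `RuntimeException`.

```java
import java.io.IOException;

// Hypothetical, simplified model of the per-partition IOException accounting.
// Class and method names are illustrative only; they are not Spark APIs.
final class PartitionMergeState {
  // Same text as ErrorHandler.BlockPushErrorHandler.IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX.
  static final String IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX =
      "IOExceptions exceeded the threshold";

  private final int ioExceptionsThreshold;
  private int numIOExceptions = 0;

  PartitionMergeState(int ioExceptionsThreshold) {
    this.ioExceptionsThreshold = ioExceptionsThreshold;
  }

  int getNumIOExceptions() {
    return numIOExceptions;
  }

  // Abort only once the count is strictly greater than the threshold.
  boolean shouldAbort() {
    return numIOExceptions > ioExceptionsThreshold;
  }

  // Can also be called before any write, so an aborted partition rejects new blocks.
  void abortIfNecessary(String streamId) {
    if (shouldAbort()) {
      throw new RuntimeException(String.format("%s when merging %s",
          IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX, streamId));
    }
  }

  // Data-file write failed: count it, abort if over the threshold,
  // otherwise rethrow so the client retries the block.
  void onDataWriteFailure(String streamId, IOException ioe) throws IOException {
    numIOExceptions++;
    abortIfNecessary(streamId);
    throw ioe;
  }

  // Index/meta update failed: count it, abort if over the threshold,
  // otherwise swallow it so the merged block is kept and the chunk grows.
  void onChunkUpdateFailure(String streamId) {
    numIOExceptions++;
    abortIfNecessary(streamId);
  }
}
```

The default threshold in `TransportConf` is 4. Because the comparison is strict, the first four IOExceptions for a partition are tolerated and the fifth one aborts the merge.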
    
    ### Why are the changes needed?
    The changes are needed to fix the bugs mentioned above:
    1. Writing to the meta file now happens after writing to the index file. If writing to the meta file throws, the index file position is not advanced. If writing to the index file throws, the meta file is not touched. Either way, neither file is effectively updated.
    2. The data/index/meta files are truncated to their last successfully updated positions when the partition is finalized.
    3. Once the IOExceptions exceed the threshold, future blocks are likely to hit the same issue, so it is better to tell the clients to stop pushing blocks for that partition.
    4. When only the meta update fails, propagating the exception would make the client retry pushing a block that was already merged into the data file. Letting the chunk grow slightly avoids this.
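Items 1 and 2 amount to keeping a committed length for each file that only advances when both the index write and the meta write succeed. On the next attempt, and again at finalize time, anything past that committed length is overwritten or truncated.

The sketch below models this with plain `java.nio`. It loosely follows `MergeShuffleFile` and `updateChunkInfo` in the diff. `TrackedFile`, `ChunkRecorder` and `readOffsets` are hypothetical names, and a raw `byte[]` stands in for the serialized `RoaringBitmap`. It is an illustration under those assumptions, not the actual implementation.

```java
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;

// Hypothetical stand-in for MergeShuffleFile: `pos` is the committed length,
// i.e. the bytes known to belong to fully recorded chunks.
final class TrackedFile {
  final FileChannel channel;
  final DataOutputStream dos;
  long pos = 0;

  TrackedFile(File file) throws IOException {
    FileOutputStream fos = new FileOutputStream(file);
    // The channel shares its position with the stream, so writes via dos advance it.
    this.channel = fos.getChannel();
    this.dos = new DataOutputStream(fos);
  }
}

// Hypothetical sketch of the index-then-meta update and finalize-time truncation.
final class ChunkRecorder {
  final TrackedFile index;
  final TrackedFile meta;
  boolean lastUpdateFailed = false;

  ChunkRecorder(TrackedFile index, TrackedFile meta) {
    this.index = index;
    this.meta = meta;
  }

  void updateChunkInfo(long chunkOffset, byte[] serializedChunkBitmap) throws IOException {
    try {
      if (lastUpdateFailed) {
        // Overwrite whatever a failed attempt left past the committed positions.
        index.channel.position(index.pos);
        meta.channel.position(meta.pos);
      }
      index.dos.writeLong(chunkOffset);       // 1. index file first
      meta.dos.write(serializedChunkBitmap);  // 2. then the meta file
      // 3. Commit both positions only after both writes succeeded.
      index.pos += 8;
      meta.pos = meta.channel.position();
      lastUpdateFailed = false;
    } catch (IOException ioe) {
      lastUpdateFailed = true;
      throw ioe;
    }
  }

  // Drop any bytes beyond the committed positions (partial writes from failures).
  void finalizeFiles() throws IOException {
    index.channel.truncate(index.pos);
    meta.channel.truncate(meta.pos);
  }

  // Reads back the big-endian longs written by DataOutputStream.writeLong.
  static long[] readOffsets(File indexFile) throws IOException {
    ByteBuffer buf = ByteBuffer.wrap(Files.readAllBytes(indexFile.toPath()));
    long[] offsets = new long[buf.remaining() / 8];
    for (int i = 0; i < offsets.length; i++) {
      offsets[i] = buf.getLong();
    }
    return offsets;
  }
}
```

Consider a sequence of three updates: one successful update, one attempt whose index write landed but whose meta write failed, and then a second successful update. Finalizing after a further dangling partial write leaves the index with offsets `[0, 10]` and the meta file with only the two committed bitmaps' bytes.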
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added unit tests for each of the bugs and for the threshold.
    
    Closes #30433 from otterc/SPARK-32916-followup.
    
    Authored-by: Chandni Singh <si...@gmail.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
    (cherry picked from commit 0677c39009de0830d995da77332f0756c76d6b56)
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../apache/spark/network/util/TransportConf.java   |  10 +
 .../apache/spark/network/shuffle/ErrorHandler.java |   9 +
 .../network/shuffle/RemoteBlockPushResolver.java   | 301 ++++++++++++----
 .../shuffle/RemoteBlockPushResolverSuite.java      | 380 +++++++++++++++++++++
 4 files changed, 629 insertions(+), 71 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index fd287b0..d305dfa 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -398,4 +398,14 @@ public class TransportConf {
     return JavaUtils.byteStringAsBytes(
       conf.get("spark.shuffle.server.mergedIndexCacheSize", "100m"));
   }
+
+  /**
+   * The threshold for number of IOExceptions while merging shuffle blocks to a shuffle partition.
+   * When the number of IOExceptions while writing to merged shuffle data/index/meta file exceeds
+   * this threshold, the shuffle server will respond to the client to stop pushing shuffle
+   * blocks for this shuffle partition.
+   */
+  public int ioExceptionsThresholdDuringMerge() {
+    return conf.getInt("spark.shuffle.server.ioExceptionsThresholdDuringMerge", 4);
+  }
 }
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
index d13a027..968777f 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ErrorHandler.java
@@ -71,6 +71,15 @@ public interface ErrorHandler {
     public static final String BLOCK_APPEND_COLLISION_DETECTED_MSG_PREFIX =
       "Couldn't find an opportunity to write block";
 
+    /**
+     * String constant used for generating exception messages indicating the server encountered
+     * IOExceptions multiple times, greater than the configured threshold, while trying to merge
+     * shuffle blocks of the same shuffle partition. When the client receives this response,
+     * it will stop pushing any more blocks for the same shuffle partition.
+     */
+    public static final String IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX =
+      "IOExceptions exceeded the threshold";
+
     @Override
     public boolean shouldRetryError(Throwable t) {
       // If it is a connection time out or a connection closed exception, no need to retry.
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 76abb05..0e23556 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -17,15 +17,16 @@
 
 package org.apache.spark.network.shuffle;
 
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
@@ -45,6 +46,8 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.cache.Weigher;
 import com.google.common.collect.Maps;
+import com.google.common.primitives.Ints;
+import com.google.common.primitives.Longs;
 import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -78,6 +81,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
   private final Executor directoryCleaner;
   private final TransportConf conf;
   private final int minChunkSize;
+  private final int ioExceptionsThresholdDuringMerge;
   private final ErrorHandler.BlockPushErrorHandler errorHandler;
 
   @SuppressWarnings("UnstableApiUsage")
@@ -92,6 +96,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       // Add `spark` prefix because it will run in NM in Yarn mode.
       NettyUtils.createThreadFactory("spark-shuffle-merged-shuffle-directory-cleaner"));
     this.minChunkSize = conf.minChunkSizeInMergedShuffleFile();
+    this.ioExceptionsThresholdDuringMerge = conf.ioExceptionsThresholdDuringMerge();
     CacheLoader<File, ShuffleIndexInformation> indexCacheLoader =
       new CacheLoader<File, ShuffleIndexInformation>() {
         public ShuffleIndexInformation load(File file) throws IOException {
@@ -132,7 +137,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
         if (dataFile.exists()) {
           return null;
         } else {
-          return new AppShufflePartitionInfo(appShuffleId, reduceId, dataFile, indexFile, metaFile);
+          return newAppShufflePartitionInfo(appShuffleId, reduceId, dataFile, indexFile, metaFile);
         }
       } catch (IOException e) {
         logger.error(
@@ -146,6 +151,17 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     });
   }
 
+  @VisibleForTesting
+  AppShufflePartitionInfo newAppShufflePartitionInfo(
+      AppShuffleId appShuffleId,
+      int reduceId,
+      File dataFile,
+      File indexFile,
+      File metaFile) throws IOException {
+    return new AppShufflePartitionInfo(appShuffleId, reduceId, dataFile,
+      new MergeShuffleFile(indexFile), new MergeShuffleFile(metaFile));
+  }
+
   @Override
   public MergedBlockMeta getMergedBlockMeta(String appId, int shuffleId, int reduceId) {
     AppShuffleId appShuffleId = new AppShuffleId(appId, shuffleId);
@@ -370,26 +386,19 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
         new MergeStatuses(msg.shuffleId, new RoaringBitmap[0], new int[0], new long[0]);
     } else {
       Collection<AppShufflePartitionInfo> partitionsToFinalize = shufflePartitions.values();
-      int totalPartitions = partitionsToFinalize.size();
-      RoaringBitmap[] bitmaps = new RoaringBitmap[totalPartitions];
-      int[] reduceIds = new int[totalPartitions];
-      long[] sizes = new long[totalPartitions];
+      List<RoaringBitmap> bitmaps = new ArrayList<>(partitionsToFinalize.size());
+      List<Integer> reduceIds = new ArrayList<>(partitionsToFinalize.size());
+      List<Long> sizes = new ArrayList<>(partitionsToFinalize.size());
       Iterator<AppShufflePartitionInfo> partitionsIter = partitionsToFinalize.iterator();
-      int idx = 0;
       while (partitionsIter.hasNext()) {
         AppShufflePartitionInfo partition = partitionsIter.next();
         synchronized (partition) {
-          // Get rid of any partial block data at the end of the file. This could either
-          // be due to failure or a request still being processed when the shuffle
-          // merge gets finalized.
           try {
-            partition.dataChannel.truncate(partition.getPosition());
-            if (partition.getPosition() != partition.getLastChunkOffset()) {
-              partition.updateChunkInfo(partition.getPosition(), partition.lastMergedMapIndex);
-            }
-            bitmaps[idx] = partition.mapTracker;
-            reduceIds[idx] = partition.reduceId;
-            sizes[idx++] = partition.getPosition();
+            // This can throw IOException which will mark this shuffle partition as not merged.
+            partition.finalizePartition();
+            bitmaps.add(partition.mapTracker);
+            reduceIds.add(partition.reduceId);
+            sizes.add(partition.getLastChunkOffset());
           } catch (IOException ioe) {
             logger.warn("Exception while finalizing shuffle partition {} {} {}", msg.appId,
               msg.shuffleId, partition.reduceId, ioe);
@@ -401,7 +410,9 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
           }
         }
       }
-      mergeStatuses = new MergeStatuses(msg.shuffleId, bitmaps, reduceIds, sizes);
+      mergeStatuses = new MergeStatuses(msg.shuffleId,
+        bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds),
+        Longs.toArray(sizes));
     }
     partitions.remove(appShuffleId);
     logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, msg.appId);
@@ -450,6 +461,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       this.streamId = streamId;
       this.partitionInfo = Preconditions.checkNotNull(partitionInfo);
       this.mapIndex = mapIndex;
+      abortIfNecessary();
     }
 
     @Override
@@ -466,11 +478,11 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     private void writeBuf(ByteBuffer buf) throws IOException {
       while (buf.hasRemaining()) {
         if (partitionInfo.isEncounteredFailure()) {
-          long updatedPos = partitionInfo.getPosition() + length;
+          long updatedPos = partitionInfo.getDataFilePos() + length;
           logger.debug(
             "{} shuffleId {} reduceId {} encountered failure current pos {} updated pos {}",
             partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId,
-            partitionInfo.reduceId, partitionInfo.getPosition(), updatedPos);
+            partitionInfo.reduceId, partitionInfo.getDataFilePos(), updatedPos);
           length += partitionInfo.dataChannel.write(buf, updatedPos);
         } else {
           length += partitionInfo.dataChannel.write(buf);
@@ -510,15 +522,35 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
      * This is only invoked when the stream is able to write. The stream first writes any deferred
      * block parts buffered in memory.
      */
-    private void writeAnyDeferredBufs() throws IOException {
-      if (deferredBufs != null && !deferredBufs.isEmpty()) {
-        for (ByteBuffer deferredBuf : deferredBufs) {
-          writeBuf(deferredBuf);
-        }
+    private void writeDeferredBufs() throws IOException {
+      for (ByteBuffer deferredBuf : deferredBufs) {
+        writeBuf(deferredBuf);
+      }
+      deferredBufs = null;
+    }
+
+    /**
+     * This throws RuntimeException if the number of IOExceptions has exceeded the threshold.
+     */
+    private void abortIfNecessary() {
+      if (partitionInfo.shouldAbort(mergeManager.ioExceptionsThresholdDuringMerge)) {
         deferredBufs = null;
+        throw new RuntimeException(String.format("%s when merging %s",
+          ErrorHandler.BlockPushErrorHandler.IOEXCEPTIONS_EXCEEDED_THRESHOLD_PREFIX,
+          streamId));
       }
     }
 
+    /**
+     * This increments the number of IOExceptions and throws RuntimeException if it exceeds the
+     * threshold which will abort the merge of a particular shuffle partition.
+     */
+    private void incrementIOExceptionsAndAbortIfNecessary() {
+      // Update the count of IOExceptions
+      partitionInfo.incrementIOExceptions();
+      abortIfNecessary();
+    }
+
     @Override
     public void onData(String streamId, ByteBuffer buf) throws IOException {
       // When handling the block data using StreamInterceptor, it can help to reduce the amount
@@ -556,6 +588,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
             deferredBufs = null;
             return;
           }
+          abortIfNecessary();
           logger.trace("{} shuffleId {} reduceId {} onData writable",
             partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId,
             partitionInfo.reduceId);
@@ -565,8 +598,17 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
 
           // If we got here, it's safe to write the block data to the merged shuffle file. We
           // first write any deferred block.
-          writeAnyDeferredBufs();
-          writeBuf(buf);
+          try {
+            if (deferredBufs != null && !deferredBufs.isEmpty()) {
+              writeDeferredBufs();
+            }
+            writeBuf(buf);
+          } catch (IOException ioe) {
+            incrementIOExceptionsAndAbortIfNecessary();
+            // If the above doesn't throw a RuntimeException, then we propagate the IOException
+            // back to the client so the block could be retried.
+            throw ioe;
+          }
           // If we got here, it means we successfully write the current chunk of block to merged
           // shuffle file. If we encountered failure while writing the previous block, we should
           // reset the file channel position and the status of partitionInfo to indicate that we
@@ -574,7 +616,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
           // position tracked by partitionInfo here. That is only updated while the entire block
           // is successfully written to merged shuffle file.
           if (partitionInfo.isEncounteredFailure()) {
-            partitionInfo.dataChannel.position(partitionInfo.getPosition() + length);
+            partitionInfo.dataChannel.position(partitionInfo.getDataFilePos() + length);
             partitionInfo.setEncounteredFailure(false);
           }
         } else {
@@ -636,15 +678,33 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
             return;
           }
           if (partitionInfo.getCurrentMapIndex() < 0) {
-            writeAnyDeferredBufs();
+            try {
+              if (deferredBufs != null && !deferredBufs.isEmpty()) {
+                abortIfNecessary();
+                writeDeferredBufs();
+              }
+            } catch (IOException ioe) {
+              incrementIOExceptionsAndAbortIfNecessary();
+              // If the above doesn't throw a RuntimeException, then we propagate the IOException
+              // back to the client so the block could be retried.
+              throw ioe;
+            }
           }
-          long updatedPos = partitionInfo.getPosition() + length;
+          long updatedPos = partitionInfo.getDataFilePos() + length;
           boolean indexUpdated = false;
           if (updatedPos - partitionInfo.getLastChunkOffset() >= mergeManager.minChunkSize) {
-            partitionInfo.updateChunkInfo(updatedPos, mapIndex);
-            indexUpdated = true;
+            try {
+              partitionInfo.updateChunkInfo(updatedPos, mapIndex);
+              indexUpdated = true;
+            } catch (IOException ioe) {
+              incrementIOExceptionsAndAbortIfNecessary();
+              // If the above doesn't throw a RuntimeException, then we do not propagate the
+              // IOException to the client. This may increase the chunk size; however, the increase
+              // is still limited because of the limit on the number of IOExceptions for a
+              // particular shuffle partition.
+            }
           }
-          partitionInfo.setPosition(updatedPos);
+          partitionInfo.setDataFilePos(updatedPos);
           partitionInfo.setCurrentMapIndex(-1);
 
           // update merged results
@@ -687,6 +747,11 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
         }
       }
     }
+
+    @VisibleForTesting
+    AppShufflePartitionInfo getPartitionInfo() {
+      return partitionInfo;
+    }
   }
 
   /**
@@ -736,7 +801,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     // The merged shuffle data file channel
     public FileChannel dataChannel;
     // Location offset of the last successfully merged block for this shuffle partition
-    private long position;
+    private long dataFilePos;
     // Indicating whether failure was encountered when merging the previous block
     private boolean encounteredFailure;
     // Track the map index whose block is being merged for this shuffle partition
@@ -744,44 +809,46 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     // Bitmap tracking which mapper's blocks have been merged for this shuffle partition
     private RoaringBitmap mapTracker;
     // The index file for a particular merged shuffle contains the chunk offsets.
-    private RandomAccessFile indexFile;
+    private MergeShuffleFile indexFile;
     // The meta file for a particular merged shuffle contains all the map indices that belong to
     // every chunk. The entry per chunk is a serialized bitmap.
-    private RandomAccessFile metaFile;
+    private MergeShuffleFile metaFile;
     // The offset for the last chunk tracked in the index file for this shuffle partition
     private long lastChunkOffset;
     private int lastMergedMapIndex = -1;
     // Bitmap tracking which mapper's blocks are in the current shuffle chunk
     private RoaringBitmap chunkTracker;
+    private int numIOExceptions = 0;
+    private boolean indexMetaUpdateFailed;
 
     AppShufflePartitionInfo(
         AppShuffleId appShuffleId,
         int reduceId,
         File dataFile,
-        File indexFile,
-        File metaFile) throws IOException {
+        MergeShuffleFile indexFile,
+        MergeShuffleFile metaFile) throws IOException {
       this.appShuffleId = Preconditions.checkNotNull(appShuffleId, "app shuffle id");
       this.reduceId = reduceId;
       this.dataChannel = new FileOutputStream(dataFile).getChannel();
-      this.indexFile = new RandomAccessFile(indexFile, "rw");
-      this.metaFile = new RandomAccessFile(metaFile, "rw");
+      this.indexFile = indexFile;
+      this.metaFile = metaFile;
       this.currentMapIndex = -1;
       // Writing 0 offset so that we can reuse ShuffleIndexInformation.getIndex()
       updateChunkInfo(0L, -1);
-      this.position = 0;
+      this.dataFilePos = 0;
       this.encounteredFailure = false;
       this.mapTracker = new RoaringBitmap();
       this.chunkTracker = new RoaringBitmap();
     }
 
-    public long getPosition() {
-      return position;
+    public long getDataFilePos() {
+      return dataFilePos;
     }
 
-    public void setPosition(long position) {
+    public void setDataFilePos(long dataFilePos) {
       logger.trace("{} shuffleId {} reduceId {} current pos {} update pos {}", appShuffleId.appId,
-        appShuffleId.shuffleId, reduceId, this.position, position);
-      this.position = position;
+        appShuffleId.shuffleId, reduceId, this.dataFilePos, dataFilePos);
+      this.dataFilePos = dataFilePos;
     }
 
     boolean isEncounteredFailure() {
@@ -825,25 +892,29 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
      * @param mapIndex the map index to be added to chunk tracker.
      */
     void updateChunkInfo(long chunkOffset, int mapIndex) throws IOException {
-      long idxStartPos = -1;
       try {
-        // update the chunk tracker to meta file before index file
+        logger.trace("{} shuffleId {} reduceId {} index current {} updated {}",
+          appShuffleId.appId, appShuffleId.shuffleId, reduceId, this.lastChunkOffset, chunkOffset);
+        if (indexMetaUpdateFailed) {
+          indexFile.getChannel().position(indexFile.getPos());
+        }
+        indexFile.getDos().writeLong(chunkOffset);
+        // Chunk bitmap should be written to the meta file after the index file because if there are
+        // any exceptions during writing the offset to the index file, meta file should not be
+        // updated. If the update to the index file is successful but the update to meta file isn't
+        // then the index file position is not updated.
         writeChunkTracker(mapIndex);
-        idxStartPos = indexFile.getFilePointer();
-        logger.trace("{} shuffleId {} reduceId {} updated index current {} updated {}",
-          appShuffleId.appId, appShuffleId.shuffleId, reduceId, this.lastChunkOffset,
-          chunkOffset);
-        indexFile.writeLong(chunkOffset);
+        indexFile.updatePos(8);
+        this.lastChunkOffset = chunkOffset;
+        indexMetaUpdateFailed = false;
       } catch (IOException ioe) {
-        if (idxStartPos != -1) {
-          // reset the position to avoid corrupting index files during exception.
-          logger.warn("{} shuffleId {} reduceId {} reset index to position {}",
-            appShuffleId.appId, appShuffleId.shuffleId, reduceId, idxStartPos);
-          indexFile.seek(idxStartPos);
-        }
+        logger.warn("{} shuffleId {} reduceId {} update to index/meta failed", appShuffleId.appId,
+          appShuffleId.shuffleId, reduceId);
+        indexMetaUpdateFailed = true;
+        // Any exception here is propagated to the caller and the caller can decide whether to
+        // abort or not.
         throw ioe;
       }
-      this.lastChunkOffset = chunkOffset;
     }
 
     private void writeChunkTracker(int mapIndex) throws IOException {
@@ -851,17 +922,38 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
         return;
       }
       chunkTracker.add(mapIndex);
-      long metaStartPos = metaFile.getFilePointer();
-      try {
-        logger.trace("{} shuffleId {} reduceId {} mapIndex {} write chunk to meta file",
-          appShuffleId.appId, appShuffleId.shuffleId, reduceId, mapIndex);
-        chunkTracker.serialize(metaFile);
-      } catch (IOException ioe) {
-        logger.warn("{} shuffleId {} reduceId {} mapIndex {} reset position of meta file to {}",
-          appShuffleId.appId, appShuffleId.shuffleId, reduceId, mapIndex, metaStartPos);
-        metaFile.seek(metaStartPos);
-        throw ioe;
+      logger.trace("{} shuffleId {} reduceId {} mapIndex {} write chunk to meta file",
+        appShuffleId.appId, appShuffleId.shuffleId, reduceId, mapIndex);
+      if (indexMetaUpdateFailed) {
+        metaFile.getChannel().position(metaFile.getPos());
+      }
+      chunkTracker.serialize(metaFile.getDos());
+      metaFile.updatePos(metaFile.getChannel().position() - metaFile.getPos());
+    }
+
+    private void incrementIOExceptions() {
+      numIOExceptions++;
+    }
+
+    private boolean shouldAbort(int ioExceptionsThresholdDuringMerge) {
+      return numIOExceptions > ioExceptionsThresholdDuringMerge;
+    }
+
+    private void finalizePartition() throws IOException {
+      if (dataFilePos != lastChunkOffset) {
+        try {
+          updateChunkInfo(dataFilePos, lastMergedMapIndex);
+        } catch (IOException ioe) {
+          // Any exceptions here while updating the meta files can be ignored. If the files
+          // aren't successfully updated they will be truncated.
+        }
       }
+      // Get rid of any partial block data at the end of the file. This could either
+      // be due to failure, or a request still being processed when the shuffle
+      // merge gets finalized, or any exceptions while updating index/meta files.
+      dataChannel.truncate(lastChunkOffset);
+      indexFile.getChannel().truncate(indexFile.getPos());
+      metaFile.getChannel().truncate(metaFile.getPos());
     }
 
     void closeAllFiles() {
@@ -877,7 +969,6 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       }
       if (metaFile != null) {
         try {
-          // if the stream is closed, channel get's closed as well.
           metaFile.close();
         } catch (IOException ioe) {
           logger.warn("Error closing meta file for {} shuffleId {} reduceId {}",
@@ -902,6 +993,26 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     protected void finalize() throws Throwable {
       closeAllFiles();
     }
+
+    @VisibleForTesting
+    MergeShuffleFile getIndexFile() {
+      return indexFile;
+    }
+
+    @VisibleForTesting
+    MergeShuffleFile getMetaFile() {
+      return metaFile;
+    }
+
+    @VisibleForTesting
+    FileChannel getDataChannel() {
+      return dataChannel;
+    }
+
+    @VisibleForTesting
+    int getNumIOExceptions() {
+      return numIOExceptions;
+    }
   }
 
   /**
@@ -931,4 +1042,52 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       }
     }
   }
+
+  @VisibleForTesting
+  static class MergeShuffleFile {
+    private FileChannel channel;
+    private DataOutputStream dos;
+    private long pos;
+
+    @VisibleForTesting
+    MergeShuffleFile(File file) throws IOException {
+      FileOutputStream fos = new FileOutputStream(file);
+      channel = fos.getChannel();
+      dos = new DataOutputStream(fos);
+    }
+
+    @VisibleForTesting
+    MergeShuffleFile(FileChannel channel, DataOutputStream dos) {
+      this.channel = channel;
+      this.dos = dos;
+    }
+
+    private void updatePos(long numBytes) {
+      pos += numBytes;
+    }
+
+    void close() throws IOException {
+      try {
+        dos.close();
+      } finally {
+        dos = null;
+        channel = null;
+      }
+    }
+
+    @VisibleForTesting
+    DataOutputStream getDos() {
+      return dos;
+    }
+
+    @VisibleForTesting
+    FileChannel getChannel() {
+      return channel;
+    }
+
+    @VisibleForTesting
+    long getPos() {
+      return pos;
+    }
+  }
 }
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index 0f200dc..8c6f743 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -17,9 +17,12 @@
 
 package org.apache.spark.network.shuffle;
 
+import java.io.DataOutputStream;
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -42,6 +45,7 @@ import static org.junit.Assert.*;
 
 import org.apache.spark.network.buffer.FileSegmentManagedBuffer;
 import org.apache.spark.network.client.StreamCallbackWithID;
+import org.apache.spark.network.shuffle.RemoteBlockPushResolver.MergeShuffleFile;
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo;
 import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
 import org.apache.spark.network.shuffle.protocol.MergeStatuses;
@@ -411,6 +415,347 @@ public class RemoteBlockPushResolverSuite {
     }
   }
 
+  @Test
+  public void testRecoverIndexFileAfterIOExceptions() throws IOException {
+    useTestFiles(true, false);
+    RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
+      (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+    callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4]));
+    callback1.onComplete(callback1.getID());
+    RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback1.getPartitionInfo();
+    // Close the index stream so it throws IOException
+    TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile();
+    testIndexFile.close();
+    StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream(
+      new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+    callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5]));
+    // This will complete without any IOExceptions because the number of IOExceptions is less than
+    // the threshold but the update to index file will be unsuccessful.
+    callback2.onComplete(callback2.getID());
+    assertEquals("index position", 16, testIndexFile.getPos());
+    // Restore the index stream so it can write successfully again.
+    testIndexFile.restore();
+    StreamCallbackWithID callback3 = pushResolver.receiveBlockDataAsStream(
+      new PushBlockStream(TEST_APP, 0, 2, 0, 0));
+    callback3.onData(callback3.getID(), ByteBuffer.wrap(new byte[2]));
+    callback3.onComplete(callback3.getID());
+    assertEquals("index position", 24, testIndexFile.getPos());
+    MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
+      new FinalizeShuffleMerge(TEST_APP, 0));
+    validateMergeStatuses(statuses, new int[] {0}, new long[] {11});
+    MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
+    validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4, 7}, new int[][] {{0}, {1, 2}});
+  }
+
+  @Test
+  public void testRecoverIndexFileAfterIOExceptionsInFinalize() throws IOException {
+    useTestFiles(true, false);
+    RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
+      (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+    callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4]));
+    callback1.onComplete(callback1.getID());
+    RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback1.getPartitionInfo();
+    // Close the index stream so it throws IOException
+    TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile();
+    testIndexFile.close();
+    StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream(
+      new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+    callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5]));
+    // This will complete without any IOExceptions because the number of IOExceptions is less than
+    // the threshold, but the update to the index file will be unsuccessful.
+    callback2.onComplete(callback2.getID());
+    assertEquals("index position", 16, testIndexFile.getPos());
+    // The last update to the index was unsuccessful, but any further updates will be successful.
+    // Restore the index stream so it can write successfully again.
+    testIndexFile.restore();
+    MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
+      new FinalizeShuffleMerge(TEST_APP, 0));
+    assertEquals("index position", 24, testIndexFile.getPos());
+    validateMergeStatuses(statuses, new int[] {0}, new long[] {9});
+    MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
+    validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4, 5}, new int[][] {{0}, {1}});
+  }
+
+  @Test
+  public void testRecoverMetaFileAfterIOExceptions() throws IOException {
+    useTestFiles(false, true);
+    RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
+      (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+    callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4]));
+    callback1.onComplete(callback1.getID());
+    RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback1.getPartitionInfo();
+    // Close the meta stream so it throws IOException
+    TestMergeShuffleFile testMetaFile = (TestMergeShuffleFile) partitionInfo.getMetaFile();
+    long metaPosBeforeClose = testMetaFile.getPos();
+    testMetaFile.close();
+    StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream(
+      new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+    callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5]));
+    // This will complete without any IOExceptions because the number of IOExceptions is less than
+    // the threshold, but the updates to the index and meta files will be unsuccessful.
+    callback2.onComplete(callback2.getID());
+    assertEquals("index position", 16, partitionInfo.getIndexFile().getPos());
+    assertEquals("meta position", metaPosBeforeClose, testMetaFile.getPos());
+    // Restore the meta stream so it can write successfully again.
+    testMetaFile.restore();
+    StreamCallbackWithID callback3 = pushResolver.receiveBlockDataAsStream(
+      new PushBlockStream(TEST_APP, 0, 2, 0, 0));
+    callback3.onData(callback3.getID(), ByteBuffer.wrap(new byte[2]));
+    callback3.onComplete(callback3.getID());
+    assertEquals("index position", 24, partitionInfo.getIndexFile().getPos());
+    assertTrue("meta position", testMetaFile.getPos() > metaPosBeforeClose);
+    MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
+      new FinalizeShuffleMerge(TEST_APP, 0));
+    validateMergeStatuses(statuses, new int[] {0}, new long[] {11});
+    MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
+    validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4, 7}, new int[][] {{0}, {1, 2}});
+  }
+
+  @Test
+  public void testRecoverMetaFileAfterIOExceptionsInFinalize() throws IOException {
+    useTestFiles(false, true);
+    RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
+      (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+    callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4]));
+    callback1.onComplete(callback1.getID());
+    RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback1.getPartitionInfo();
+    // Close the meta stream so it throws IOException
+    TestMergeShuffleFile testMetaFile = (TestMergeShuffleFile) partitionInfo.getMetaFile();
+    long metaPosBeforeClose = testMetaFile.getPos();
+    testMetaFile.close();
+    StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream(
+      new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+    callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5]));
+    // This will complete without any IOExceptions because the number of IOExceptions is less than
+    // the threshold, but the updates to the index and meta files will be unsuccessful.
+    callback2.onComplete(callback2.getID());
+    MergeShuffleFile indexFile = partitionInfo.getIndexFile();
+    assertEquals("index position", 16, indexFile.getPos());
+    assertEquals("meta position", metaPosBeforeClose, testMetaFile.getPos());
+    // Restore the meta stream so it can write successfully again.
+    testMetaFile.restore();
+    MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
+      new FinalizeShuffleMerge(TEST_APP, 0));
+    assertEquals("index position", 24, indexFile.getPos());
+    assertTrue("meta position", testMetaFile.getPos() > metaPosBeforeClose);
+    validateMergeStatuses(statuses, new int[] {0}, new long[] {9});
+    MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
+    validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4, 5}, new int[][] {{0}, {1}});
+  }
+
+  @Test (expected = RuntimeException.class)
+  public void testIOExceptionsExceededThreshold() throws IOException {
+    RemoteBlockPushResolver.PushBlockStreamCallback callback =
+      (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+    RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo();
+    callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4]));
+    callback.onComplete(callback.getID());
+    // Close the data stream so that every subsequent write throws an IOException
+    partitionInfo.getDataChannel().close();
+    for (int i = 1; i < 5; i++) {
+      RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
+        (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
+          new PushBlockStream(TEST_APP, 0, i, 0, 0));
+      try {
+        callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[2]));
+      } catch (IOException ioe) {
+        // onData throws an IOException here so that the client can retry.
+        callback1.onFailure(callback1.getID(), ioe);
+      }
+    }
+    assertEquals(4, partitionInfo.getNumIOExceptions());
+    // After 4 IOExceptions, the server responds that IOExceptions exceeded the threshold.
+    try {
+      RemoteBlockPushResolver.PushBlockStreamCallback callback2 =
+        (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
+          new PushBlockStream(TEST_APP, 0, 5, 0, 0));
+      callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[1]));
+    } catch (Throwable t) {
+      assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_5_0",
+        t.getMessage());
+      throw t;
+    }
+  }
+
+  @Test (expected = RuntimeException.class)
+  public void testIOExceptionsDuringMetaUpdateIncreasesExceptionCount() throws IOException {
+    useTestFiles(true, false);
+    RemoteBlockPushResolver.PushBlockStreamCallback callback =
+      (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+    RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo();
+    callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4]));
+    callback.onComplete(callback.getID());
+    TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile();
+    testIndexFile.close();
+    for (int i = 1; i < 5; i++) {
+      RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
+        (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
+          new PushBlockStream(TEST_APP, 0, i, 0, 0));
+      callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[5]));
+      // This completes without throwing, but the IOException count is incremented.
+      callback1.onComplete(callback1.getID());
+    }
+    assertEquals(4, partitionInfo.getNumIOExceptions());
+    // After 4 IOExceptions, the server responds that IOExceptions exceeded the threshold for any
+    // new request for this partition.
+    try {
+      RemoteBlockPushResolver.PushBlockStreamCallback callback2 =
+        (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
+          new PushBlockStream(TEST_APP, 0, 5, 0, 0));
+      callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[4]));
+      callback2.onComplete(callback2.getID());
+    } catch (Throwable t) {
+      assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_5_0",
+        t.getMessage());
+      throw t;
+    }
+  }
+
+  @Test (expected = RuntimeException.class)
+  public void testRequestForAbortedShufflePartitionThrowsException() {
+    try {
+      testIOExceptionsDuringMetaUpdateIncreasesExceptionCount();
+    } catch (Throwable t) {
+      // No more blocks can be merged to this partition.
+    }
+    try {
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, 0, 10, 0, 0));
+    } catch (Throwable t) {
+      assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_10_0",
+        t.getMessage());
+      throw t;
+    }
+  }
+
+  @Test (expected = RuntimeException.class)
+  public void testPendingBlockIsAbortedImmediately() throws IOException {
+    useTestFiles(true, false);
+    RemoteBlockPushResolver.PushBlockStreamCallback callback =
+      (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+    RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo();
+    TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile();
+    testIndexFile.close();
+    for (int i = 1; i < 6; i++) {
+      RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
+        (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
+          new PushBlockStream(TEST_APP, 0, i, 0, 0));
+      try {
+        callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[5]));
+        // This completes without throwing, but the IOException count is incremented.
+        callback1.onComplete(callback1.getID());
+      } catch (Throwable t) {
+        callback1.onFailure(callback1.getID(), t);
+      }
+    }
+    assertEquals(5, partitionInfo.getNumIOExceptions());
+    // The server will respond with IOExceptions exceeded threshold for any additional attempts
+    // to write.
+    try {
+      callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4]));
+    } catch (Throwable t) {
+      assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_0_0",
+        t.getMessage());
+      throw t;
+    }
+  }
+
+  @Test (expected = RuntimeException.class)
+  public void testWritingPendingBufsIsAbortedImmediatelyDuringComplete() throws IOException {
+    useTestFiles(true, false);
+    RemoteBlockPushResolver.PushBlockStreamCallback callback =
+      (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+    RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo();
+    TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile();
+    testIndexFile.close();
+    for (int i = 1; i < 5; i++) {
+      RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
+        (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
+          new PushBlockStream(TEST_APP, 0, i, 0, 0));
+      try {
+        callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[5]));
+        // This completes without throwing, but the IOException count is incremented.
+        callback1.onComplete(callback1.getID());
+      } catch (Throwable t) {
+        callback1.onFailure(callback1.getID(), t);
+      }
+    }
+    assertEquals(4, partitionInfo.getNumIOExceptions());
+    RemoteBlockPushResolver.PushBlockStreamCallback callback2 =
+      (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, 0, 5, 0, 0));
+    callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5]));
+    // callback2 is still writing to this partition, so this write is deferred.
+    callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4]));
+    // Completing callback2 hits another IOException, raising the count to 5.
+    try {
+      callback2.onComplete(callback2.getID());
+    } catch (Throwable t) {
+      callback2.onFailure(callback2.getID(), t);
+    }
+    assertEquals(5, partitionInfo.getNumIOExceptions());
+    // Restore index file so that any further writes to it are successful and any exceptions are
+    // due to IOExceptions exceeding threshold.
+    testIndexFile.restore();
+    try {
+      callback.onComplete(callback.getID());
+    } catch (Throwable t) {
+      assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_0_0",
+        t.getMessage());
+      throw t;
+    }
+  }
+
+  @Test
+  public void testFailureWhileTruncatingFiles() throws IOException {
+    useTestFiles(true, false);
+    PushBlock[] pushBlocks = new PushBlock[] {
+      new PushBlock(0, 0, 0, ByteBuffer.wrap(new byte[2])),
+      new PushBlock(0, 1, 0, ByteBuffer.wrap(new byte[3])),
+      new PushBlock(0, 0, 1, ByteBuffer.wrap(new byte[5])),
+      new PushBlock(0, 1, 1, ByteBuffer.wrap(new byte[3]))
+    };
+    pushBlockHelper(TEST_APP, pushBlocks);
+    RemoteBlockPushResolver.PushBlockStreamCallback callback =
+      (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, 0, 2, 0, 0));
+    callback.onData(callback.getID(), ByteBuffer.wrap(new byte[2]));
+    callback.onComplete(callback.getID());
+    RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo();
+    TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile();
+    // Close the index file so truncate throws IOException
+    testIndexFile.close();
+    MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
+      new FinalizeShuffleMerge(TEST_APP, 0));
+    validateMergeStatuses(statuses, new int[] {1}, new long[] {8});
+    MergedBlockMeta meta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 1);
+    validateChunks(TEST_APP, 0, 1, meta, new int[] {5, 3}, new int[][] {{0}, {1}});
+  }
+
+  private void useTestFiles(boolean useTestIndexFile, boolean useTestMetaFile) throws IOException {
+    pushResolver = new RemoteBlockPushResolver(conf) {
+      @Override
+      AppShufflePartitionInfo newAppShufflePartitionInfo(AppShuffleId appShuffleId, int reduceId,
+        File dataFile, File indexFile, File metaFile) throws IOException {
+        MergeShuffleFile mergedIndexFile = useTestIndexFile ? new TestMergeShuffleFile(indexFile)
+          : new MergeShuffleFile(indexFile);
+        MergeShuffleFile mergedMetaFile = useTestMetaFile ? new TestMergeShuffleFile(metaFile) :
+          new MergeShuffleFile(metaFile);
+        return new AppShufflePartitionInfo(appShuffleId, reduceId, dataFile, mergedIndexFile,
+          mergedMetaFile);
+      }
+    };
+    registerExecutor(TEST_APP, prepareLocalDirs(localDirs));
+  }
+
   private Path[] createLocalDirs(int numLocalDirs) throws IOException {
     Path[] localDirs = new Path[numLocalDirs];
     for (int i = 0; i < localDirs.length; i++) {
@@ -493,4 +838,39 @@ public class RemoteBlockPushResolverSuite {
       this.buffer = buffer;
     }
   }
+
+  private static class TestMergeShuffleFile extends MergeShuffleFile {
+    private DataOutputStream activeDos;
+    private File file;
+    private FileChannel channel;
+
+    private TestMergeShuffleFile(File file) throws IOException {
+      super(null, null);
+      this.file = file;
+      FileOutputStream fos = new FileOutputStream(file);
+      channel = fos.getChannel();
+      activeDos = new DataOutputStream(fos);
+    }
+
+    @Override
+    DataOutputStream getDos() {
+      return activeDos;
+    }
+
+    @Override
+    FileChannel getChannel() {
+      return channel;
+    }
+
+    @Override
+    void close() throws IOException {
+      activeDos.close();
+    }
+
+    void restore() throws IOException {
+      FileOutputStream fos = new FileOutputStream(file, true);
+      channel = fos.getChannel();
+      activeDos = new DataOutputStream(fos);
+    }
+  }
 }
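
The tests above exercise two invariants from the commit description: a failed index or meta update must not leave the two files with different chunk counts, and repeated IOExceptions on one partition eventually stop further merges. The following is a minimal sketch of that bookkeeping, assuming a simplified layout with one 8-byte entry per chunk in each file. `ChunkFile`, `PartitionChunkWriter`, the `failWrites` fault-injection flag and the threshold value of 4 are hypothetical names chosen for illustration; this is not the `RemoteBlockPushResolver` implementation.

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

// Hypothetical file of 8-byte entries (one per chunk) that tracks the end of its
// last committed entry separately from the bytes actually on disk.
class ChunkFile implements AutoCloseable {
  private final FileChannel channel;
  private long pos;    // end of the last committed entry
  boolean failWrites;  // hypothetical fault-injection hook for the demo

  ChunkFile(File file) throws IOException {
    channel = new RandomAccessFile(file, "rw").getChannel();
    pos = channel.size();
  }

  long getPos() {
    return pos;
  }

  // Writes one entry at the committed position, overwriting any stale bytes there.
  void append(long value) throws IOException {
    if (failWrites) {
      throw new IOException("injected write failure");
    }
    ByteBuffer buf = ByteBuffer.allocate(Long.BYTES).putLong(value);
    buf.flip();
    while (buf.hasRemaining()) {
      pos += channel.write(buf, pos); // positional write at the committed offset
    }
  }

  // Forgets entries after p; their bytes stay on disk until truncateToPos().
  void resetTo(long p) {
    pos = p;
  }

  // Drops any stale bytes beyond the committed position.
  void truncateToPos() throws IOException {
    channel.truncate(pos);
  }

  long sizeOnDisk() throws IOException {
    return channel.size();
  }

  @Override
  public void close() throws IOException {
    channel.close();
  }
}

// Hypothetical per-partition writer that keeps the meta and index files in step.
class PartitionChunkWriter {
  // Illustrative limit: updates are rejected once more than this many IOExceptions occur.
  static final int IO_EXCEPTION_THRESHOLD = 4;

  private final ChunkFile meta;
  private final ChunkFile index;
  int numIOExceptions;

  PartitionChunkWriter(ChunkFile meta, ChunkFile index) {
    this.meta = meta;
    this.index = index;
  }

  // Records one chunk in both files or in neither.
  void updateChunk(long metaEntry, long indexEntry) throws IOException {
    if (numIOExceptions > IO_EXCEPTION_THRESHOLD) {
      throw new IllegalStateException("IOExceptions exceeded the threshold");
    }
    long metaStart = meta.getPos();
    long indexStart = index.getPos();
    try {
      meta.append(metaEntry);
      index.append(indexEntry);
    } catch (IOException e) {
      numIOExceptions++;
      // Roll both files back so a meta entry never outlives a failed index entry.
      meta.resetTo(metaStart);
      index.resetTo(indexStart);
      throw e;
    }
  }

  // At finalize, cut both files back to their committed positions.
  void finalizeFiles() throws IOException {
    meta.truncateToPos();
    index.truncateToPos();
  }
}
```

Writing each entry at the last committed offset, rather than wherever the channel happens to be, lets the next successful update overwrite bytes left by a failed one; the truncate at finalize then discards whatever stale bytes remain, which is the gap the commit message describes for files closed right after a position reset.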

