Posted to commits@spark.apache.org by mr...@apache.org on 2021/07/20 05:04:52 UTC

[spark] branch branch-3.2 updated: [SPARK-35546][SHUFFLE] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the state in a better way

This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 1907f0a  [SPARK-35546][SHUFFLE] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the state in a better way
1907f0a is described below

commit 1907f0ac572eb209ee7c8f9b364f1bdd8d615dcd
Author: Ye Zhou <ye...@linkedin.com>
AuthorDate: Tue Jul 20 00:03:30 2021 -0500

    [SPARK-35546][SHUFFLE] Enable push-based shuffle when multiple app attempts are enabled and manage concurrent access to the state in a better way
    
    ### What changes were proposed in this pull request?
    This is one of the patches for SPIP SPARK-30602, which is needed for push-based shuffle.
    
    ### Summary of the change:
    When an Executor registers with the Shuffle Service, it encodes the merged shuffle dir it created and the application attemptId into the ShuffleManagerMeta as JSON. The Shuffle Service then decodes the JSON string to get the correct merged shuffle dir and attemptId. If the registration comes from a newer attempt, the merged shuffle information is updated to store the information from that newer attempt.
    
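    For illustration, here is a minimal sketch of that round trip, assuming Jackson for the JSON handling (the service-side decoding mirrors the patch below); the directory and class values are made up and checked-exception handling is elided:

        import java.util.HashMap;
        import java.util.Map;
        import com.fasterxml.jackson.core.type.TypeReference;
        import com.fasterxml.jackson.databind.ObjectMapper;

        ObjectMapper mapper = new ObjectMapper();
        // Executor side: append the JSON metadata after the shuffle manager name.
        Map<String, String> meta = new HashMap<>();
        meta.put("mergeDir", "merge_directory_1");   // illustrative directory name
        meta.put("attemptId", "1");
        String shuffleManagerMeta =
          "org.apache.spark.shuffle.sort.SortShuffleManager:"
            + mapper.writeValueAsString(meta);

        // Shuffle Service side: split on the first ':' and decode the JSON suffix.
        String json = shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(':') + 1);
        Map<String, String> decoded =
          mapper.readValue(json, new TypeReference<Map<String, String>>() {});
        // decoded.get("mergeDir") is "merge_directory_1"; "attemptId" parses to 1
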
    This PR also refactored the management of the merged shuffle information to avoid concurrency issues.

    ### Why are the changes needed?
    Refer to the SPIP in SPARK-30602.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Added unit tests.
    The reference PR with the consolidated changes covering the complete implementation is also provided in SPARK-30602.
    We have already verified the functionality and the improved performance as documented in the SPIP doc.
    
    Closes #33078 from zhouyejoe/SPARK-35546.
    
    Authored-by: Ye Zhou <ye...@linkedin.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
    (cherry picked from commit c77acf0bbc25341de2636649fdd76f9bb4bdf4ed)
    Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
---
 .../apache/spark/network/util/TransportConf.java   |   7 +
 .../network/shuffle/ExternalBlockStoreClient.java  |   7 +-
 .../network/shuffle/OneForOneBlockPusher.java      |   8 +-
 .../network/shuffle/RemoteBlockPushResolver.java   | 546 ++++++++++++---------
 .../shuffle/protocol/ExecutorShuffleInfo.java      |   7 +-
 .../shuffle/protocol/FinalizeShuffleMerge.java     |  13 +-
 .../network/shuffle/protocol/PushBlockStream.java  |  21 +-
 .../network/shuffle/ExternalBlockHandlerSuite.java |   2 +-
 .../network/shuffle/OneForOneBlockPusherSuite.java |  22 +-
 .../shuffle/RemoteBlockPushResolverSuite.java      | 466 +++++++++++++-----
 .../main/scala/org/apache/spark/SparkContext.scala |   1 +
 .../org/apache/spark/internal/config/package.scala |  10 +
 .../org/apache/spark/storage/BlockManager.scala    |   9 +-
 .../apache/spark/storage/DiskBlockManager.scala    |  34 +-
 .../main/scala/org/apache/spark/util/Utils.scala   |  23 +-
 .../spark/storage/DiskBlockManagerSuite.scala      |  23 +-
 .../scala/org/apache/spark/util/UtilsSuite.scala   |   2 +-
 17 files changed, 810 insertions(+), 391 deletions(-)

diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
index f051042..8e7ecf5 100644
--- a/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
+++ b/common/network-common/src/main/java/org/apache/spark/network/util/TransportConf.java
@@ -419,4 +419,11 @@ public class TransportConf {
   public int ioExceptionsThresholdDuringMerge() {
     return conf.getInt("spark.shuffle.server.ioExceptionsThresholdDuringMerge", 4);
   }
+
+  /**
+   * The application attempt ID assigned by Hadoop YARN.
+   */
+  public int appAttemptId() {
+    return conf.getInt("spark.app.attempt.id", -1);
+  }
 }
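
A quick sketch of the new getter in isolation, assuming the simple map-backed MapConfigProvider from network-common (in practice the value would be set from the YARN attempt id). When the key is absent, appAttemptId() falls back to -1, which the resolver treats as "no attempt id":

    import com.google.common.collect.ImmutableMap;
    import org.apache.spark.network.util.MapConfigProvider;
    import org.apache.spark.network.util.TransportConf;

    TransportConf conf = new TransportConf("shuffle",
      new MapConfigProvider(ImmutableMap.of("spark.app.attempt.id", "2")));
    assert conf.appAttemptId() == 2;   // -1 when the key is not set
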
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
index f44140b1..63bf787 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java
@@ -141,8 +141,8 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
       RetryingBlockFetcher.BlockFetchStarter blockPushStarter =
           (inputBlockId, inputListener) -> {
             TransportClient client = clientFactory.createClient(host, port);
-            new OneForOneBlockPusher(client, appId, inputBlockId, inputListener, buffersWithId)
-              .start();
+            new OneForOneBlockPusher(client, appId, conf.appAttemptId(), inputBlockId,
+              inputListener, buffersWithId).start();
           };
       int maxRetries = conf.maxIORetries();
       if (maxRetries > 0) {
@@ -168,7 +168,8 @@ public class ExternalBlockStoreClient extends BlockStoreClient {
     checkInit();
     try {
       TransportClient client = clientFactory.createClient(host, port);
-      ByteBuffer finalizeShuffleMerge = new FinalizeShuffleMerge(appId, shuffleId).toByteBuffer();
+      ByteBuffer finalizeShuffleMerge =
+        new FinalizeShuffleMerge(appId, conf.appAttemptId(), shuffleId).toByteBuffer();
       client.sendRpc(finalizeShuffleMerge, new RpcResponseCallback() {
         @Override
         public void onSuccess(ByteBuffer response) {
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
index 6ee95ef..b8b32e2 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/OneForOneBlockPusher.java
@@ -45,6 +45,7 @@ public class OneForOneBlockPusher {
 
   private final TransportClient client;
   private final String appId;
+  private final int appAttemptId;
   private final String[] blockIds;
   private final BlockFetchingListener listener;
   private final Map<String, ManagedBuffer> buffers;
@@ -52,11 +53,13 @@ public class OneForOneBlockPusher {
   public OneForOneBlockPusher(
       TransportClient client,
       String appId,
+      int appAttemptId,
       String[] blockIds,
       BlockFetchingListener listener,
       Map<String, ManagedBuffer> buffers) {
     this.client = client;
     this.appId = appId;
+    this.appAttemptId = appAttemptId;
     this.blockIds = blockIds;
     this.listener = listener;
     this.buffers = buffers;
@@ -123,8 +126,9 @@ public class OneForOneBlockPusher {
         throw new IllegalArgumentException(
           "Unexpected shuffle push block id format: " + blockIds[i]);
       }
-      ByteBuffer header = new PushBlockStream(appId, Integer.parseInt(blockIdParts[1]),
-        Integer.parseInt(blockIdParts[2]), Integer.parseInt(blockIdParts[3]) , i).toByteBuffer();
+      ByteBuffer header =
+        new PushBlockStream(appId, appAttemptId, Integer.parseInt(blockIdParts[1]),
+          Integer.parseInt(blockIdParts[2]), Integer.parseInt(blockIdParts[3]), i).toByteBuffer();
       client.uploadStream(new NioManagedBuffer(header), buffers.get(blockIds[i]),
         new BlockPushCallback(i, blockIds[i]));
     }
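
To make the header construction concrete, a hedged sketch of how one pushed block id turns into the new PushBlockStream header (the "shufflePush" prefix value is assumed from SHUFFLE_PUSH_BLOCK_PREFIX, and all ids are made up):

    import java.nio.ByteBuffer;
    import org.apache.spark.network.shuffle.protocol.PushBlockStream;

    String blockId = "shufflePush_5_7_11";    // shuffleId=5, mapIndex=7, reduceId=11
    String[] blockIdParts = blockId.split("_");
    ByteBuffer header = new PushBlockStream(
        "application_1626757200000_0001",     // appId (illustrative)
        1,                                    // appAttemptId, the newly added field
        Integer.parseInt(blockIdParts[1]),    // shuffleId
        Integer.parseInt(blockIdParts[2]),    // mapIndex
        Integer.parseInt(blockIdParts[3]),    // reduceId
        0                                     // index within this batch of pushes
      ).toByteBuffer();
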
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index 47d2547..f88cfee 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -28,27 +28,26 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicReference;
 
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.cache.Weigher;
-import com.google.common.collect.Maps;
 import com.google.common.primitives.Ints;
 import com.google.common.primitives.Longs;
-import org.apache.commons.lang3.builder.ToStringBuilder;
-import org.apache.commons.lang3.builder.ToStringStyle;
 import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,14 +72,22 @@ import org.apache.spark.network.util.TransportConf;
 public class RemoteBlockPushResolver implements MergedShuffleFileManager {
 
   private static final Logger logger = LoggerFactory.getLogger(RemoteBlockPushResolver.class);
-  @VisibleForTesting
-  static final String MERGE_MANAGER_DIR = "merge_manager";
+
   public static final String MERGED_SHUFFLE_FILE_NAME_PREFIX = "shuffleMerged";
+  public static final String SHUFFLE_META_DELIMITER = ":";
+  public static final String MERGE_DIR_KEY = "mergeDir";
+  public static final String ATTEMPT_ID_KEY = "attemptId";
+  private static final int UNDEFINED_ATTEMPT_ID = -1;
 
-  private final ConcurrentMap<String, AppPathsInfo> appsPathInfo;
-  private final ConcurrentMap<AppShuffleId, Map<Integer, AppShufflePartitionInfo>> partitions;
+  /**
+   * A concurrent hashmap where the key is the applicationId, and the value includes
+   * all the merged shuffle information for this application. AppShuffleInfo stores
+   * the application attemptId, merged shuffle local directories and the metadata
+   * for actively being merged shuffle partitions.
+   */
+  private final ConcurrentMap<String, AppShuffleInfo> appsShuffleInfo;
 
-  private final Executor directoryCleaner;
+  private final Executor mergedShuffleCleaner;
   private final TransportConf conf;
   private final int minChunkSize;
   private final int ioExceptionsThresholdDuringMerge;
@@ -92,9 +99,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
   @SuppressWarnings("UnstableApiUsage")
   public RemoteBlockPushResolver(TransportConf conf) {
     this.conf = conf;
-    this.partitions = Maps.newConcurrentMap();
-    this.appsPathInfo = Maps.newConcurrentMap();
-    this.directoryCleaner = Executors.newSingleThreadExecutor(
+    this.appsShuffleInfo = new ConcurrentHashMap<>();
+    this.mergedShuffleCleaner = Executors.newSingleThreadExecutor(
       // Add `spark` prefix because it will run in NM in Yarn mode.
       NettyUtils.createThreadFactory("spark-shuffle-merged-shuffle-directory-cleaner"));
     this.minChunkSize = conf.minChunkSizeInMergedShuffleFile();
@@ -112,34 +118,59 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     this.errorHandler = new ErrorHandler.BlockPushErrorHandler();
   }
 
+  @VisibleForTesting
+  protected AppShuffleInfo validateAndGetAppShuffleInfo(String appId) {
+    // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart
+    AppShuffleInfo appShuffleInfo = appsShuffleInfo.get(appId);
+    Preconditions.checkArgument(appShuffleInfo != null,
+      "application " + appId + " is not registered or NM was restarted.");
+    return appShuffleInfo;
+  }
+
   /**
-   * Given the appShuffleId and reduceId that uniquely identifies a given shuffle partition of an
-   * application, retrieves the associated metadata. If not present and the corresponding merged
-   * shuffle does not exist, initializes the metadata.
+   * Given the appShuffleInfo, shuffleId and reduceId that uniquely identifies a given shuffle
+   * partition of an application, retrieves the associated metadata. If not present and the
+   * corresponding merged shuffle does not exist, initializes the metadata.
    */
   private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
-      AppShuffleId appShuffleId,
+      AppShuffleInfo appShuffleInfo,
+      int shuffleId,
       int reduceId) {
-    File dataFile = getMergedShuffleDataFile(appShuffleId, reduceId);
-    if (!partitions.containsKey(appShuffleId) && dataFile.exists()) {
-      // If this partition is already finalized then the partitions map will not contain
-      // the appShuffleId but the data file would exist. In that case the block is considered late.
+    File dataFile = appShuffleInfo.getMergedShuffleDataFile(shuffleId, reduceId);
+    ConcurrentMap<Integer, Map<Integer, AppShufflePartitionInfo>> partitions =
+      appShuffleInfo.partitions;
+    Map<Integer, AppShufflePartitionInfo> shufflePartitions =
+      partitions.compute(shuffleId, (id, map) -> {
+        if (map == null) {
+          // If this partition is already finalized then the partitions map will not contain the
+          // shuffleId but the data file would exist. In that case the block is considered late.
+          if (dataFile.exists()) {
+            return null;
+          }
+          return new ConcurrentHashMap<>();
+        } else {
+          return map;
+        }
+      });
+    if (shufflePartitions == null) {
       return null;
     }
-    Map<Integer, AppShufflePartitionInfo> shufflePartitions =
-      partitions.computeIfAbsent(appShuffleId, id -> Maps.newConcurrentMap());
+
     return shufflePartitions.computeIfAbsent(reduceId, key -> {
       // It only gets here when the key is not present in the map. This could either
       // be the first time the merge manager receives a pushed block for a given application
       // shuffle partition, or after the merged shuffle file is finalized. We handle these
       // two cases accordingly by checking if the file already exists.
-      File indexFile = getMergedShuffleIndexFile(appShuffleId, reduceId);
-      File metaFile = getMergedShuffleMetaFile(appShuffleId, reduceId);
+      File indexFile =
+        appShuffleInfo.getMergedShuffleIndexFile(shuffleId, reduceId);
+      File metaFile =
+        appShuffleInfo.getMergedShuffleMetaFile(shuffleId, reduceId);
       try {
         if (dataFile.exists()) {
           return null;
         } else {
-          return newAppShufflePartitionInfo(appShuffleId, reduceId, dataFile, indexFile, metaFile);
+          return newAppShufflePartitionInfo(
+            appShuffleInfo.appId, shuffleId, reduceId, dataFile, indexFile, metaFile);
         }
       } catch (IOException e) {
         logger.error(
@@ -148,26 +179,28 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
             indexFile.getAbsolutePath(), metaFile.getAbsolutePath());
         throw new RuntimeException(
           String.format("Cannot initialize merged shuffle partition for appId %s shuffleId %s "
-          + "reduceId %s", appShuffleId.appId, appShuffleId.shuffleId, reduceId), e);
+            + "reduceId %s", appShuffleInfo.appId, shuffleId, reduceId), e);
       }
     });
   }
 
   @VisibleForTesting
   AppShufflePartitionInfo newAppShufflePartitionInfo(
-      AppShuffleId appShuffleId,
+      String appId,
+      int shuffleId,
       int reduceId,
       File dataFile,
       File indexFile,
       File metaFile) throws IOException {
-    return new AppShufflePartitionInfo(appShuffleId, reduceId, dataFile,
+    return new AppShufflePartitionInfo(appId, shuffleId, reduceId, dataFile,
       new MergeShuffleFile(indexFile), new MergeShuffleFile(metaFile));
   }
 
   @Override
   public MergedBlockMeta getMergedBlockMeta(String appId, int shuffleId, int reduceId) {
-    AppShuffleId appShuffleId = new AppShuffleId(appId, shuffleId);
-    File indexFile = getMergedShuffleIndexFile(appShuffleId, reduceId);
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+    File indexFile =
+      appShuffleInfo.getMergedShuffleIndexFile(shuffleId, reduceId);
     if (!indexFile.exists()) {
       throw new RuntimeException(String.format(
         "Merged shuffle index file %s not found", indexFile.getPath()));
@@ -175,7 +208,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     int size = (int) indexFile.length();
     // First entry is the zero offset
     int numChunks = (size / Long.BYTES) - 1;
-    File metaFile = getMergedShuffleMetaFile(appShuffleId, reduceId);
+    File metaFile = appShuffleInfo.getMergedShuffleMetaFile(shuffleId, reduceId);
     if (!metaFile.exists()) {
       throw new RuntimeException(String.format("Merged shuffle meta file %s not found",
         metaFile.getPath()));
@@ -190,13 +223,14 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
   @SuppressWarnings("UnstableApiUsage")
   @Override
   public ManagedBuffer getMergedBlockData(String appId, int shuffleId, int reduceId, int chunkId) {
-    AppShuffleId appShuffleId = new AppShuffleId(appId, shuffleId);
-    File dataFile = getMergedShuffleDataFile(appShuffleId, reduceId);
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+    File dataFile = appShuffleInfo.getMergedShuffleDataFile(shuffleId, reduceId);
     if (!dataFile.exists()) {
       throw new RuntimeException(String.format("Merged shuffle data file %s not found",
         dataFile.getPath()));
     }
-    File indexFile = getMergedShuffleIndexFile(appShuffleId, reduceId);
+    File indexFile =
+      appShuffleInfo.getMergedShuffleIndexFile(shuffleId, reduceId);
     try {
       // If we get here, the merged shuffle file should have been properly finalized. Thus we can
       // use the file length to determine the size of the merged shuffle block.
@@ -210,76 +244,51 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     }
   }
 
-  /**
-   * The logic here is consistent with
-   * @see [[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile(
-   *      org.apache.spark.storage.BlockId, scala.Option)]]
-   */
-  private File getFile(String appId, String filename) {
-    // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart
-    AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId),
-      "application " + appId + " is not registered or NM was restarted.");
-    File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs,
-      appPathsInfo.subDirsPerLocalDir, filename);
-    logger.debug("Get merged file {}", targetFile.getAbsolutePath());
-    return targetFile;
-  }
-
-  private File getMergedShuffleDataFile(AppShuffleId appShuffleId, int reduceId) {
-    String fileName = String.format("%s.data", generateFileName(appShuffleId, reduceId));
-    return getFile(appShuffleId.appId, fileName);
-  }
-
-  private File getMergedShuffleIndexFile(AppShuffleId appShuffleId, int reduceId) {
-    String indexName = String.format("%s.index", generateFileName(appShuffleId, reduceId));
-    return getFile(appShuffleId.appId, indexName);
-  }
-
-  private File getMergedShuffleMetaFile(AppShuffleId appShuffleId, int reduceId) {
-    String metaName = String.format("%s.meta", generateFileName(appShuffleId, reduceId));
-    return getFile(appShuffleId.appId, metaName);
-  }
-
   @Override
   public String[] getMergedBlockDirs(String appId) {
-    AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.get(appId),
-      "application " + appId + " is not registered or NM was restarted.");
-    String[] activeLocalDirs = Preconditions.checkNotNull(appPathsInfo.activeLocalDirs,
-      "application " + appId
-      + " active local dirs list has not been updated by any executor registration");
-    return activeLocalDirs;
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+    return appShuffleInfo.appPathsInfo.activeLocalDirs;
   }
 
   @Override
   public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
     logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs);
-    // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart
-    AppPathsInfo appPathsInfo = Preconditions.checkNotNull(appsPathInfo.remove(appId),
-      "application " + appId + " is not registered or NM was restarted.");
-    Iterator<Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>>> iterator =
-      partitions.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Map.Entry<AppShuffleId, Map<Integer, AppShufflePartitionInfo>> entry = iterator.next();
-      AppShuffleId appShuffleId = entry.getKey();
-      if (appId.equals(appShuffleId.appId)) {
-        iterator.remove();
-        for (AppShufflePartitionInfo partitionInfo : entry.getValue().values()) {
+    AppShuffleInfo appShuffleInfo = appsShuffleInfo.remove(appId);
+    if (null != appShuffleInfo) {
+      mergedShuffleCleaner.execute(
+        () -> closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, cleanupLocalDirs));
+    }
+  }
+
+  /**
+   * Clean up the AppShufflePartitionInfo for a specific AppShuffleInfo.
+   * If cleanupLocalDirs is true, the merged shuffle files will also be deleted.
+   * The cleanup will be executed in a separate thread.
+   */
+  @VisibleForTesting
+  void closeAndDeletePartitionFilesIfNeeded(
+      AppShuffleInfo appShuffleInfo,
+      boolean cleanupLocalDirs) {
+    for (Map<Integer, AppShufflePartitionInfo> partitionMap : appShuffleInfo.partitions.values()) {
+      for (AppShufflePartitionInfo partitionInfo : partitionMap.values()) {
+        synchronized (partitionInfo) {
           partitionInfo.closeAllFiles();
         }
       }
     }
     if (cleanupLocalDirs) {
-      Path[] dirs = Arrays.stream(appPathsInfo.activeLocalDirs)
-        .map(dir -> Paths.get(dir)).toArray(Path[]::new);
-      directoryCleaner.execute(() -> deleteExecutorDirs(dirs));
+      deleteExecutorDirs(appShuffleInfo);
     }
   }
 
   /**
-   * Serially delete local dirs, executed in a separate thread.
+   * Serially delete local dirs.
    */
   @VisibleForTesting
-  void deleteExecutorDirs(Path[] dirs) {
+  void deleteExecutorDirs(AppShuffleInfo appShuffleInfo) {
+    Path[] dirs = Arrays.stream(appShuffleInfo.appPathsInfo.activeLocalDirs)
+      .map(dir -> Paths.get(dir)).toArray(Path[]::new);
     for (Path localDir : dirs) {
       try {
         if (Files.exists(localDir)) {
@@ -294,10 +303,22 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
 
   @Override
   public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+    final String streamId = String.format("%s_%d_%d_%d",
+      OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, msg.shuffleId, msg.mapIndex,
+      msg.reduceId);
+    if (appShuffleInfo.attemptId != msg.appAttemptId) {
+      // If this Block belongs to a former application attempt, it is considered late,
+      // as only the blocks from the current application attempt will be merged
+      // TODO: [SPARK-35548] Client should be updated to handle this error.
+      throw new IllegalArgumentException(
+        String.format("The attempt id %s in this PushBlockStream message does not match "
+          + "with the current attempt id %s stored in shuffle service for application %s",
+          msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+    }
     // Retrieve merged shuffle file metadata
-    AppShuffleId appShuffleId = new AppShuffleId(msg.appId, msg.shuffleId);
     AppShufflePartitionInfo partitionInfoBeforeCheck =
-      getOrCreateAppShufflePartitionInfo(appShuffleId, msg.reduceId);
+      getOrCreateAppShufflePartitionInfo(appShuffleInfo, msg.shuffleId, msg.reduceId);
     // Here partitionInfo will be null in 2 cases:
     // 1) The request is received for a block that has already been merged, this is possible due
     // to the retry logic.
@@ -338,11 +359,9 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     final AppShufflePartitionInfo partitionInfo = partitionInfoBeforeCheck != null
       && partitionInfoBeforeCheck.mapTracker.contains(msg.mapIndex) ? null
         : partitionInfoBeforeCheck;
-    final String streamId = String.format("%s_%d_%d_%d",
-      OneForOneBlockPusher.SHUFFLE_PUSH_BLOCK_PREFIX, appShuffleId.shuffleId, msg.mapIndex,
-      msg.reduceId);
     if (partitionInfo != null) {
-      return new PushBlockStreamCallback(this, streamId, partitionInfo, msg.mapIndex);
+      return new PushBlockStreamCallback(
+        this, appShuffleInfo, streamId, partitionInfo, msg.mapIndex);
     } else {
       // For a duplicate block or a block which is late, respond back with a callback that handles
       // them differently.
@@ -377,24 +396,31 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     }
   }
 
-  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
   @Override
   public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) throws IOException {
-    logger.info("Finalizing shuffle {} from Application {}.", msg.shuffleId, msg.appId);
-    AppShuffleId appShuffleId = new AppShuffleId(msg.appId, msg.shuffleId);
-    Map<Integer, AppShufflePartitionInfo> shufflePartitions = partitions.get(appShuffleId);
+    logger.info("Finalizing shuffle {} from Application {}_{}.",
+      msg.shuffleId, msg.appId, msg.appAttemptId);
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+    if (appShuffleInfo.attemptId != msg.appAttemptId) {
+      // If this Block belongs to a former application attempt, it is considered late,
+      // as only the blocks from the current application attempt will be merged
+      // TODO: [SPARK-35548] Client should be updated to handle this error.
+      throw new IllegalArgumentException(
+        String.format("The attempt id %s in this FinalizeShuffleMerge message does not match "
+          + "with the current attempt id %s stored in shuffle service for application %s",
+          msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+    }
+    Map<Integer, AppShufflePartitionInfo> shufflePartitions =
+      appShuffleInfo.partitions.remove(msg.shuffleId);
     MergeStatuses mergeStatuses;
     if (shufflePartitions == null || shufflePartitions.isEmpty()) {
       mergeStatuses =
         new MergeStatuses(msg.shuffleId, new RoaringBitmap[0], new int[0], new long[0]);
     } else {
-      Collection<AppShufflePartitionInfo> partitionsToFinalize = shufflePartitions.values();
-      List<RoaringBitmap> bitmaps = new ArrayList<>(partitionsToFinalize.size());
-      List<Integer> reduceIds = new ArrayList<>(partitionsToFinalize.size());
-      List<Long> sizes = new ArrayList<>(partitionsToFinalize.size());
-      Iterator<AppShufflePartitionInfo> partitionsIter = partitionsToFinalize.iterator();
-      while (partitionsIter.hasNext()) {
-        AppShufflePartitionInfo partition = partitionsIter.next();
+      List<RoaringBitmap> bitmaps = new ArrayList<>(shufflePartitions.size());
+      List<Integer> reduceIds = new ArrayList<>(shufflePartitions.size());
+      List<Long> sizes = new ArrayList<>(shufflePartitions.size());
+      for (AppShufflePartitionInfo partition: shufflePartitions.values()) {
         synchronized (partition) {
           try {
            // This can throw IOException which will mark this shuffle partition as not merged.
@@ -403,13 +429,10 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
             reduceIds.add(partition.reduceId);
             sizes.add(partition.getLastChunkOffset());
           } catch (IOException ioe) {
-            logger.warn("Exception while finalizing shuffle partition {} {} {}", msg.appId,
-              msg.shuffleId, partition.reduceId, ioe);
+            logger.warn("Exception while finalizing shuffle partition {}_{} {} {}", msg.appId,
+              msg.appAttemptId, msg.shuffleId, partition.reduceId, ioe);
           } finally {
             partition.closeAllFiles();
-            // The partition should be removed after the files are written so that any new stream
-            // for the same reduce partition will see that the data file exists.
-            partitionsIter.remove();
           }
         }
       }
@@ -417,8 +440,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
         bitmaps.toArray(new RoaringBitmap[bitmaps.size()]), Ints.toArray(reduceIds),
         Longs.toArray(sizes));
     }
-    partitions.remove(appShuffleId);
-    logger.info("Finalized shuffle {} from Application {}.", msg.shuffleId, msg.appId);
+    logger.info("Finalized shuffle {} from Application {}_{}.",
+      msg.shuffleId, msg.appId, msg.appAttemptId);
     return mergeStatuses;
   }
 
@@ -426,15 +449,68 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
   public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
     if (logger.isDebugEnabled()) {
       logger.debug("register executor with RemoteBlockPushResolver {} local-dirs {} "
-        + "num sub-dirs {}", appId, Arrays.toString(executorInfo.localDirs),
-          executorInfo.subDirsPerLocalDir);
+        + "num sub-dirs {} shuffleManager {}", appId, Arrays.toString(executorInfo.localDirs),
+        executorInfo.subDirsPerLocalDir, executorInfo.shuffleManager);
+    }
+    String shuffleManagerMeta = executorInfo.shuffleManager;
+    if (shuffleManagerMeta.contains(SHUFFLE_META_DELIMITER)) {
+      String mergeDirInfo =
+        shuffleManagerMeta.substring(shuffleManagerMeta.indexOf(SHUFFLE_META_DELIMITER) + 1);
+      try {
+        ObjectMapper mapper = new ObjectMapper();
+        TypeReference<Map<String, String>> typeRef
+          = new TypeReference<Map<String, String>>(){};
+        Map<String, String> metaMap = mapper.readValue(mergeDirInfo, typeRef);
+        String mergeDir = metaMap.get(MERGE_DIR_KEY);
+        int attemptId = Integer.valueOf(
+          metaMap.getOrDefault(ATTEMPT_ID_KEY, String.valueOf(UNDEFINED_ATTEMPT_ID)));
+        if (mergeDir == null) {
+          throw new IllegalArgumentException(
+            String.format("Failed to get the merge directory information from the " +
+              "shuffleManagerMeta %s in executor registration message", shuffleManagerMeta));
+        }
+        if (attemptId == UNDEFINED_ATTEMPT_ID) {
+          // When attemptId is -1, there is no attemptId stored in the ExecutorShuffleInfo.
+          // Only the first ExecutorRegister message can register the merge dirs
+          appsShuffleInfo.computeIfAbsent(appId, id ->
+            new AppShuffleInfo(
+              appId, UNDEFINED_ATTEMPT_ID,
+              new AppPathsInfo(appId, executorInfo.localDirs,
+                mergeDir, executorInfo.subDirsPerLocalDir)
+            ));
+        } else {
+          // If attemptId is not -1, there is attemptId stored in the ExecutorShuffleInfo.
+          // The first ExecutorRegister message from the same application attempt will register
+          // the merge dirs in External Shuffle Service. Any later ExecutorRegister message
+          // from the same application attempt will not override the merge dirs. But it can
+          // be overridden by ExecutorRegister message from newer application attempt,
+          // and former attempts' shuffle partitions information will also be cleaned up.
+          AtomicReference<AppShuffleInfo> originalAppShuffleInfo = new AtomicReference<>();
+          appsShuffleInfo.compute(appId, (id, appShuffleInfo) -> {
+            if (appShuffleInfo == null || attemptId > appShuffleInfo.attemptId) {
+              originalAppShuffleInfo.set(appShuffleInfo);
+              appShuffleInfo =
+                new AppShuffleInfo(
+                  appId, attemptId,
+                  new AppPathsInfo(appId, executorInfo.localDirs,
+                    mergeDir, executorInfo.subDirsPerLocalDir));
+            }
+            return appShuffleInfo;
+          });
+          if (originalAppShuffleInfo.get() != null) {
+            AppShuffleInfo appShuffleInfo = originalAppShuffleInfo.get();
+            logger.warn("Cleanup shuffle info and merged shuffle files for {}_{} as new " +
+                "application attempt registered", appId, appShuffleInfo.attemptId);
+            mergedShuffleCleaner.execute(
+              () -> closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, true));
+          }
+        }
+      } catch (JsonProcessingException e) {
+        logger.warn("Failed to get the merge directory information from ExecutorShuffleInfo: ", e);
+      }
+    } else {
+      logger.warn("ExecutorShuffleInfo does not have the expected merge directory information");
     }
-    appsPathInfo.computeIfAbsent(appId, id -> new AppPathsInfo(appId, executorInfo.localDirs,
-      executorInfo.subDirsPerLocalDir));
-  }
-  private static String generateFileName(AppShuffleId appShuffleId, int reduceId) {
-    return String.format("%s_%s_%d_%d", MERGED_SHUFFLE_FILE_NAME_PREFIX, appShuffleId.appId,
-      appShuffleId.shuffleId, reduceId);
   }
 
   /**
@@ -443,6 +519,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
   static class PushBlockStreamCallback implements StreamCallbackWithID {
 
     private final RemoteBlockPushResolver mergeManager;
+    private final AppShuffleInfo appShuffleInfo;
     private final String streamId;
     private final int mapIndex;
     private final AppShufflePartitionInfo partitionInfo;
@@ -457,12 +534,17 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
 
     private PushBlockStreamCallback(
         RemoteBlockPushResolver mergeManager,
+        AppShuffleInfo appShuffleInfo,
         String streamId,
         AppShufflePartitionInfo partitionInfo,
         int mapIndex) {
-      this.mergeManager = Preconditions.checkNotNull(mergeManager);
+      Preconditions.checkArgument(mergeManager != null);
+      this.mergeManager = mergeManager;
+      Preconditions.checkArgument(appShuffleInfo != null);
+      this.appShuffleInfo = appShuffleInfo;
       this.streamId = streamId;
-      this.partitionInfo = Preconditions.checkNotNull(partitionInfo);
+      Preconditions.checkArgument(partitionInfo != null);
+      this.partitionInfo = partitionInfo;
       this.mapIndex = mapIndex;
       abortIfNecessary();
     }
@@ -482,7 +564,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       while (buf.hasRemaining()) {
         long updatedPos = partitionInfo.getDataFilePos() + length;
         logger.debug("{} shuffleId {} reduceId {} current pos {} updated pos {}",
-          partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId,
+          partitionInfo.appId, partitionInfo.shuffleId,
           partitionInfo.reduceId, partitionInfo.getDataFilePos(), updatedPos);
         length += partitionInfo.dataChannel.write(buf, updatedPos);
       }
@@ -567,7 +649,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       // memory, while still providing the necessary guarantee.
       synchronized (partitionInfo) {
         Map<Integer, AppShufflePartitionInfo> shufflePartitions =
-          mergeManager.partitions.get(partitionInfo.appShuffleId);
+          appShuffleInfo.partitions.get(partitionInfo.shuffleId);
         // If the partitionInfo corresponding to (appId, shuffleId, reduceId) is no longer present
         // then it means that the shuffle merge has already been finalized. We should thus ignore
         // the data and just drain the remaining bytes of this message. This check should be
@@ -587,7 +669,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
           }
           abortIfNecessary();
           logger.trace("{} shuffleId {} reduceId {} onData writable",
-            partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId,
+            partitionInfo.appId, partitionInfo.shuffleId,
             partitionInfo.reduceId);
           if (partitionInfo.getCurrentMapIndex() < 0) {
             partitionInfo.setCurrentMapIndex(mapIndex);
@@ -609,7 +691,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
           }
         } else {
           logger.trace("{} shuffleId {} reduceId {} onData deferred",
-            partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId,
+            partitionInfo.appId, partitionInfo.shuffleId,
             partitionInfo.reduceId);
           // If we cannot write to disk, we buffer the current block chunk in memory so it could
           // potentially be written to disk later. We take our best effort without guarantee
@@ -644,10 +726,10 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     public void onComplete(String streamId) throws IOException {
       synchronized (partitionInfo) {
         logger.trace("{} shuffleId {} reduceId {} onComplete invoked",
-          partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId,
+          partitionInfo.appId, partitionInfo.shuffleId,
           partitionInfo.reduceId);
         Map<Integer, AppShufflePartitionInfo> shufflePartitions =
-          mergeManager.partitions.get(partitionInfo.appShuffleId);
+          appShuffleInfo.partitions.get(partitionInfo.shuffleId);
         // When this request initially got to the server, the shuffle merge finalize request
         // was not received yet. By the time we finish reading this message, the shuffle merge
         // however is already finalized. We should thus respond RpcFailure to the client.
@@ -724,10 +806,10 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       if (isWriting) {
         synchronized (partitionInfo) {
           Map<Integer, AppShufflePartitionInfo> shufflePartitions =
-            mergeManager.partitions.get(partitionInfo.appShuffleId);
+            appShuffleInfo.partitions.get(partitionInfo.shuffleId);
           if (shufflePartitions != null && shufflePartitions.containsKey(partitionInfo.reduceId)) {
             logger.debug("{} shuffleId {} reduceId {} encountered failure",
-              partitionInfo.appShuffleId.appId, partitionInfo.appShuffleId.shuffleId,
+              partitionInfo.appId, partitionInfo.shuffleId,
               partitionInfo.reduceId);
             partitionInfo.setCurrentMapIndex(-1);
           }
@@ -742,63 +824,25 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     }
   }
 
-  /**
-   * ID that uniquely identifies a shuffle for an application. This is used as a key in
-   * {@link #partitions}.
-   */
-  public static class AppShuffleId {
-    public final String appId;
-    public final int shuffleId;
-
-    AppShuffleId(String appId, int shuffleId) {
-      this.appId = appId;
-      this.shuffleId = shuffleId;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-      AppShuffleId that = (AppShuffleId) o;
-      return shuffleId == that.shuffleId && Objects.equal(appId, that.appId);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hashCode(appId, shuffleId);
-    }
-
-    @Override
-    public String toString() {
-      return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
-        .append("appId", appId)
-        .append("shuffleId", shuffleId)
-        .toString();
-    }
-  }
-
   /** Metadata tracked for an actively merged shuffle partition */
   public static class AppShufflePartitionInfo {
 
-    private final AppShuffleId appShuffleId;
+    private final String appId;
+    private final int shuffleId;
     private final int reduceId;
     // The merged shuffle data file channel
-    public FileChannel dataChannel;
+    public final FileChannel dataChannel;
+    // The index file for a particular merged shuffle contains the chunk offsets.
+    private final MergeShuffleFile indexFile;
+    // The meta file for a particular merged shuffle contains all the map indices that belong to
+    // every chunk. The entry per chunk is a serialized bitmap.
+    private final MergeShuffleFile metaFile;
     // Location offset of the last successfully merged block for this shuffle partition
     private long dataFilePos;
     // Track the map index whose block is being merged for this shuffle partition
     private int currentMapIndex;
     // Bitmap tracking which mapper's blocks have been merged for this shuffle partition
     private RoaringBitmap mapTracker;
-    // The index file for a particular merged shuffle contains the chunk offsets.
-    private MergeShuffleFile indexFile;
-    // The meta file for a particular merged shuffle contains all the map indices that belong to
-    // every chunk. The entry per chunk is a serialized bitmap.
-    private MergeShuffleFile metaFile;
     // The offset for the last chunk tracked in the index file for this shuffle partition
     private long lastChunkOffset;
     private int lastMergedMapIndex = -1;
@@ -808,12 +852,15 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     private boolean indexMetaUpdateFailed;
 
     AppShufflePartitionInfo(
-        AppShuffleId appShuffleId,
+        String appId,
+        int shuffleId,
         int reduceId,
         File dataFile,
         MergeShuffleFile indexFile,
         MergeShuffleFile metaFile) throws IOException {
-      this.appShuffleId = Preconditions.checkNotNull(appShuffleId, "app shuffle id");
+      Preconditions.checkArgument(appId != null, "app id is null");
+      this.appId = appId;
+      this.shuffleId = shuffleId;
       this.reduceId = reduceId;
       this.dataChannel = new FileOutputStream(dataFile).getChannel();
       this.indexFile = indexFile;
@@ -831,8 +878,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     }
 
     public void setDataFilePos(long dataFilePos) {
-      logger.trace("{} shuffleId {} reduceId {} current pos {} update pos {}", appShuffleId.appId,
-        appShuffleId.shuffleId, reduceId, this.dataFilePos, dataFilePos);
+      logger.trace("{} shuffleId {} reduceId {} current pos {} update pos {}", appId,
+        shuffleId, reduceId, this.dataFilePos, dataFilePos);
       this.dataFilePos = dataFilePos;
     }
 
@@ -842,7 +889,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
 
     void setCurrentMapIndex(int mapIndex) {
       logger.trace("{} shuffleId {} reduceId {} updated mapIndex {} current mapIndex {}",
-        appShuffleId.appId, appShuffleId.shuffleId, reduceId, currentMapIndex, mapIndex);
+        appId, shuffleId, reduceId, currentMapIndex, mapIndex);
       this.currentMapIndex = mapIndex;
     }
 
@@ -851,8 +898,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     }
 
     void blockMerged(int mapIndex) {
-      logger.debug("{} shuffleId {} reduceId {} updated merging mapIndex {}", appShuffleId.appId,
-        appShuffleId.shuffleId, reduceId, mapIndex);
+      logger.debug("{} shuffleId {} reduceId {} updated merging mapIndex {}", appId,
+        shuffleId, reduceId, mapIndex);
       mapTracker.add(mapIndex);
       chunkTracker.add(mapIndex);
       lastMergedMapIndex = mapIndex;
@@ -871,7 +918,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     void updateChunkInfo(long chunkOffset, int mapIndex) throws IOException {
       try {
         logger.trace("{} shuffleId {} reduceId {} index current {} updated {}",
-          appShuffleId.appId, appShuffleId.shuffleId, reduceId, this.lastChunkOffset, chunkOffset);
+          appId, shuffleId, reduceId, this.lastChunkOffset, chunkOffset);
         if (indexMetaUpdateFailed) {
           indexFile.getChannel().position(indexFile.getPos());
         }
@@ -885,8 +932,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
         this.lastChunkOffset = chunkOffset;
         indexMetaUpdateFailed = false;
       } catch (IOException ioe) {
-        logger.warn("{} shuffleId {} reduceId {} update to index/meta failed", appShuffleId.appId,
-          appShuffleId.shuffleId, reduceId);
+        logger.warn("{} shuffleId {} reduceId {} update to index/meta failed", appId,
+          shuffleId, reduceId);
         indexMetaUpdateFailed = true;
         // Any exception here is propagated to the caller and the caller can decide whether to
         // abort or not.
@@ -900,7 +947,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       }
       chunkTracker.add(mapIndex);
       logger.trace("{} shuffleId {} reduceId {} mapIndex {} write chunk to meta file",
-        appShuffleId.appId, appShuffleId.shuffleId, reduceId, mapIndex);
+        appId, shuffleId, reduceId, mapIndex);
       if (indexMetaUpdateFailed) {
         metaFile.getChannel().position(metaFile.getPos());
       }
@@ -934,35 +981,25 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     }
 
     void closeAllFiles() {
-      if (dataChannel != null) {
-        try {
+      try {
+        if (dataChannel.isOpen()) {
           dataChannel.close();
-        } catch (IOException ioe) {
-          logger.warn("Error closing data channel for {} shuffleId {} reduceId {}",
-            appShuffleId.appId, appShuffleId.shuffleId, reduceId);
-        } finally {
-          dataChannel = null;
         }
+      } catch (IOException ioe) {
+        logger.warn("Error closing data channel for {} shuffleId {} reduceId {}",
+          appId, shuffleId, reduceId);
       }
-      if (metaFile != null) {
-        try {
-          metaFile.close();
-        } catch (IOException ioe) {
-          logger.warn("Error closing meta file for {} shuffleId {} reduceId {}",
-            appShuffleId.appId, appShuffleId.shuffleId, reduceId);
-        } finally {
-          metaFile = null;
-        }
+      try {
+        metaFile.close();
+      } catch (IOException ioe) {
+        logger.warn("Error closing meta file for {} shuffleId {} reduceId {}",
+          appId, shuffleId, reduceId);
       }
-      if (indexFile != null) {
-        try {
-          indexFile.close();
-        } catch (IOException ioe) {
-          logger.warn("Error closing index file for {} shuffleId {} reduceId {}",
-            appShuffleId.appId, appShuffleId.shuffleId, reduceId);
-        } finally {
-          indexFile = null;
-        }
+      try {
+        indexFile.close();
+      } catch (IOException ioe) {
+        logger.warn("Error closing index file for {} shuffleId {} reduceId {}",
+          appId, shuffleId, reduceId);
       }
     }
 
@@ -1003,14 +1040,16 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     private AppPathsInfo(
         String appId,
         String[] localDirs,
+        String mergeDirectory,
         int subDirsPerLocalDir) {
       activeLocalDirs = Arrays.stream(localDirs)
         .map(localDir ->
           // Merge directory is created at the same level as block-manager directory. The list of
-          // local directories that we get from executorShuffleInfo are paths of each
-          // block-manager directory. To find out the merge directory location, we first find the
-          // parent dir and then append the "merger_manager" directory to it.
-          Paths.get(localDir).getParent().resolve(MERGE_MANAGER_DIR).toFile().getPath())
+          // local directories that we get from ExecutorShuffleInfo are paths of each
+          // block-manager directory. The mergeDirectory is the merge directory name that we get
+          // from ExecutorShuffleInfo. To find out the merge directory location, we first find the
+          // parent dir of the block-manager directory and then append merge directory name to it.
+          Paths.get(localDir).getParent().resolve(mergeDirectory).toFile().getPath())
         .toArray(String[]::new);
       this.subDirsPerLocalDir = subDirsPerLocalDir;
       if (logger.isInfoEnabled()) {
@@ -1020,10 +1059,76 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     }
   }
 
+  /** Merged Shuffle related information tracked for a specific application attempt */
+  public static class AppShuffleInfo {
+
+    private final String appId;
+    private final int attemptId;
+    private final AppPathsInfo appPathsInfo;
+    private final ConcurrentMap<Integer, Map<Integer, AppShufflePartitionInfo>> partitions;
+
+    AppShuffleInfo(
+        String appId,
+        int attemptId,
+        AppPathsInfo appPathsInfo) {
+      this.appId = appId;
+      this.attemptId = attemptId;
+      this.appPathsInfo = appPathsInfo;
+      partitions = new ConcurrentHashMap<>();
+    }
+
+    @VisibleForTesting
+    public ConcurrentMap<Integer, Map<Integer, AppShufflePartitionInfo>> getPartitions() {
+      return partitions;
+    }
+
+    /**
+     * The logic here is consistent with
+     * @see [[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile(
+     *      org.apache.spark.storage.BlockId, scala.Option)]]
+     */
+    private File getFile(String filename) {
+      // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart
+      File targetFile = ExecutorDiskUtils.getFile(appPathsInfo.activeLocalDirs,
+        appPathsInfo.subDirsPerLocalDir, filename);
+      logger.debug("Get merged file {}", targetFile.getAbsolutePath());
+      return targetFile;
+    }
+
+    private String generateFileName(
+        String appId,
+        int shuffleId,
+        int reduceId) {
+      return String.format(
+        "%s_%s_%d_%d", MERGED_SHUFFLE_FILE_NAME_PREFIX, appId, shuffleId, reduceId);
+    }
+
+    public File getMergedShuffleDataFile(
+        int shuffleId,
+        int reduceId) {
+      String fileName = String.format("%s.data", generateFileName(appId, shuffleId, reduceId));
+      return getFile(fileName);
+    }
+
+    public File getMergedShuffleIndexFile(
+        int shuffleId,
+        int reduceId) {
+      String indexName = String.format("%s.index", generateFileName(appId, shuffleId, reduceId));
+      return getFile(indexName);
+    }
+
+    public File getMergedShuffleMetaFile(
+        int shuffleId,
+        int reduceId) {
+      String metaName = String.format("%s.meta", generateFileName(appId, shuffleId, reduceId));
+      return getFile(metaName);
+    }
+  }
+
   @VisibleForTesting
   static class MergeShuffleFile {
-    private FileChannel channel;
-    private DataOutputStream dos;
+    private final FileChannel channel;
+    private final DataOutputStream dos;
     private long pos;
 
     @VisibleForTesting
@@ -1044,11 +1149,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     }
 
     void close() throws IOException {
-      try {
+      if (channel.isOpen()) {
         dos.close();
-      } finally {
-        dos = null;
-        channel = null;
       }
     }
 
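The refactored registerExecutor() above leans on two idioms worth spelling out: ConcurrentHashMap.compute() makes the "replace only if the attempt is newer" decision atomic, and an AtomicReference carries the displaced AppShuffleInfo out of the lambda so the superseded attempt's files can be cleaned up afterwards. A condensed, self-contained sketch of the same pattern, with plain Integer attempt ids standing in for AppShuffleInfo:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicReference;

    ConcurrentHashMap<String, Integer> attempts = new ConcurrentHashMap<>();
    AtomicReference<Integer> displaced = new AtomicReference<>();
    int newAttemptId = 2;
    attempts.compute("app_1", (id, current) -> {
      if (current == null || newAttemptId > current) {
        displaced.set(current);   // remember the superseded attempt, if any
        return newAttemptId;      // atomically install the newer attempt
      }
      return current;             // same or older attempt: keep existing state
    });
    if (displaced.get() != null) {
      // this is where the resolver schedules cleanup of the old attempt's files
    }
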
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
index b4e7bc4..f123ccb 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/ExecutorShuffleInfo.java
@@ -35,7 +35,12 @@ public class ExecutorShuffleInfo implements Encodable {
   public final String[] localDirs;
   /** Number of subdirectories created within each localDir. */
   public final int subDirsPerLocalDir;
-  /** Shuffle manager (SortShuffleManager) that the executor is using. */
+  /**
+   * Shuffle manager (SortShuffleManager) that the executor is using.
+   * If this string contains a colon, it will also include the meta information
+   * for push-based shuffle in JSON format. Example of the string with a colon would be:
+   * SortShuffleManager:{"mergeDir": "mergeDirectory_1", "attemptId": 1}
+   */
   public final String shuffleManager;
 
   @JsonCreator
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java
index 31efbb7..f6ab78b 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/FinalizeShuffleMerge.java
@@ -32,12 +32,15 @@ import org.apache.spark.network.protocol.Encoders;
  */
 public class FinalizeShuffleMerge extends BlockTransferMessage {
   public final String appId;
+  public final int appAttemptId;
   public final int shuffleId;
 
   public FinalizeShuffleMerge(
       String appId,
+      int appAttemptId,
       int shuffleId) {
     this.appId = appId;
+    this.appAttemptId = appAttemptId;
     this.shuffleId = shuffleId;
   }
 
@@ -48,13 +51,14 @@ public class FinalizeShuffleMerge extends BlockTransferMessage {
 
   @Override
   public int hashCode() {
-    return Objects.hashCode(appId, shuffleId);
+    return Objects.hashCode(appId, appAttemptId, shuffleId);
   }
 
   @Override
   public String toString() {
     return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
       .append("appId", appId)
+      .append("attemptId", appAttemptId)
       .append("shuffleId", shuffleId)
       .toString();
   }
@@ -64,6 +68,7 @@ public class FinalizeShuffleMerge extends BlockTransferMessage {
     if (other != null && other instanceof FinalizeShuffleMerge) {
       FinalizeShuffleMerge o = (FinalizeShuffleMerge) other;
       return Objects.equal(appId, o.appId)
+        && appAttemptId == o.appAttemptId
         && shuffleId == o.shuffleId;
     }
     return false;
@@ -71,18 +76,20 @@ public class FinalizeShuffleMerge extends BlockTransferMessage {
 
   @Override
   public int encodedLength() {
-    return Encoders.Strings.encodedLength(appId) + 4;
+    return Encoders.Strings.encodedLength(appId) + 4 + 4;
   }
 
   @Override
   public void encode(ByteBuf buf) {
     Encoders.Strings.encode(buf, appId);
+    buf.writeInt(appAttemptId);
     buf.writeInt(shuffleId);
   }
 
   public static FinalizeShuffleMerge decode(ByteBuf buf) {
     String appId = Encoders.Strings.decode(buf);
+    int attemptId = buf.readInt();
     int shuffleId = buf.readInt();
-    return new FinalizeShuffleMerge(appId, shuffleId);
+    return new FinalizeShuffleMerge(appId, attemptId, shuffleId);
   }
 }
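
Because the attempt id is now part of the wire format (encodedLength() grew by four bytes), both ends must run the updated protocol. A minimal encode/decode round-trip sketch, assuming Netty's Unpooled buffers and illustrative ids:

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;

    FinalizeShuffleMerge msg = new FinalizeShuffleMerge("app0", 1, 0);
    ByteBuf buf = Unpooled.buffer(msg.encodedLength());
    msg.encode(buf);
    FinalizeShuffleMerge decoded = FinalizeShuffleMerge.decode(buf);
    assert decoded.appAttemptId == 1 && decoded.shuffleId == 0;
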
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java
index 559f88f..d5e1cf2 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/PushBlockStream.java
@@ -19,6 +19,7 @@ package org.apache.spark.network.shuffle.protocol;
 
 import com.google.common.base.Objects;
 import io.netty.buffer.ByteBuf;
+
 import org.apache.commons.lang3.builder.ToStringBuilder;
 import org.apache.commons.lang3.builder.ToStringStyle;
 
@@ -34,6 +35,7 @@ import org.apache.spark.network.protocol.Encoders;
  */
 public class PushBlockStream extends BlockTransferMessage {
   public final String appId;
+  public final int appAttemptId;
   public final int shuffleId;
   public final int mapIndex;
   public final int reduceId;
@@ -41,8 +43,15 @@ public class PushBlockStream extends BlockTransferMessage {
   // blocks to be pushed.
   public final int index;
 
-  public PushBlockStream(String appId, int shuffleId, int mapIndex, int reduceId, int index) {
+  public PushBlockStream(
+      String appId,
+      int appAttemptId,
+      int shuffleId,
+      int mapIndex,
+      int reduceId,
+      int index) {
     this.appId = appId;
+    this.appAttemptId = appAttemptId;
     this.shuffleId = shuffleId;
     this.mapIndex = mapIndex;
     this.reduceId = reduceId;
@@ -56,13 +65,14 @@ public class PushBlockStream extends BlockTransferMessage {
 
   @Override
   public int hashCode() {
-    return Objects.hashCode(appId, shuffleId, mapIndex , reduceId, index);
+    return Objects.hashCode(appId, appAttemptId, shuffleId, mapIndex, reduceId, index);
   }
 
   @Override
   public String toString() {
     return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
       .append("appId", appId)
+      .append("attemptId", appAttemptId)
       .append("shuffleId", shuffleId)
       .append("mapIndex", mapIndex)
       .append("reduceId", reduceId)
@@ -75,6 +85,7 @@ public class PushBlockStream extends BlockTransferMessage {
     if (other != null && other instanceof PushBlockStream) {
       PushBlockStream o = (PushBlockStream) other;
       return Objects.equal(appId, o.appId)
+        && appAttemptId == o.appAttemptId
         && shuffleId == o.shuffleId
         && mapIndex == o.mapIndex
         && reduceId == o.reduceId
@@ -85,12 +96,13 @@ public class PushBlockStream extends BlockTransferMessage {
 
   @Override
   public int encodedLength() {
-    return Encoders.Strings.encodedLength(appId) + 16;
+    return Encoders.Strings.encodedLength(appId) + 4 + 4 + 4 + 4 + 4;
   }
 
   @Override
   public void encode(ByteBuf buf) {
     Encoders.Strings.encode(buf, appId);
+    buf.writeInt(appAttemptId);
     buf.writeInt(shuffleId);
     buf.writeInt(mapIndex);
     buf.writeInt(reduceId);
@@ -99,10 +111,11 @@ public class PushBlockStream extends BlockTransferMessage {
 
   public static PushBlockStream decode(ByteBuf buf) {
     String appId = Encoders.Strings.decode(buf);
+    int attemptId = buf.readInt();
     int shuffleId = buf.readInt();
     int mapIdx = buf.readInt();
     int reduceId = buf.readInt();
     int index = buf.readInt();
-    return new PushBlockStream(appId, shuffleId, mapIdx, reduceId, index);
+    return new PushBlockStream(appId, attemptId, shuffleId, mapIdx, reduceId, index);
   }
 }
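
Similarly, a quick illustrative sketch of the updated PushBlockStream header
(same imports as the sketch above): the appId is now followed by five ints
(attempt id, shuffle id, map index, reduce id, block index), four bytes more
than before:

    static void pushHeaderRoundTrip() {
      // Header for attempt 2, shuffle 0, map index 5, reduce 0, block index 0.
      PushBlockStream header = new PushBlockStream("app_1", 2, 0, 5, 0, 0);
      ByteBuf buf = Unpooled.buffer(header.encodedLength());
      header.encode(buf);
      assert PushBlockStream.decode(buf).appAttemptId == 2;
    }
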
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
index dc41e95..00756b1 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java
@@ -243,7 +243,7 @@ public class ExternalBlockHandlerSuite {
   public void testFinalizeShuffleMerge() throws IOException {
     RpcResponseCallback callback = mock(RpcResponseCallback.class);
 
-    FinalizeShuffleMerge req = new FinalizeShuffleMerge("app0", 0);
+    FinalizeShuffleMerge req = new FinalizeShuffleMerge("app0", 1, 0);
     RoaringBitmap bitmap = RoaringBitmap.bitmapOf(0, 1, 2);
     MergeStatuses statuses = new MergeStatuses(0, new RoaringBitmap[]{bitmap},
       new int[]{3}, new long[]{30});
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java
index 46a0f6c..e41198f 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/OneForOneBlockPusherSuite.java
@@ -51,7 +51,7 @@ public class OneForOneBlockPusherSuite {
     BlockFetchingListener listener = pushBlocks(
       blocks,
       blockIds,
-      Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0)));
+      Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0)));
 
     verify(listener).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any());
   }
@@ -67,9 +67,9 @@ public class OneForOneBlockPusherSuite {
     BlockFetchingListener listener = pushBlocks(
       blocks,
       blockIds,
-      Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0),
-        new PushBlockStream("app-id", 0, 1, 0, 1),
-        new PushBlockStream("app-id", 0, 2, 0, 2)));
+      Arrays.asList(new PushBlockStream("app-id",0,  0, 0, 0, 0),
+        new PushBlockStream("app-id", 0, 0, 1, 0, 1),
+        new PushBlockStream("app-id", 0, 0, 2, 0, 2)));
 
     verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any());
     verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_1_0"), any());
@@ -87,9 +87,9 @@ public class OneForOneBlockPusherSuite {
     BlockFetchingListener listener = pushBlocks(
       blocks,
       blockIds,
-      Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0),
-        new PushBlockStream("app-id", 0, 1, 0, 1),
-        new PushBlockStream("app-id", 0, 2, 0, 2)));
+      Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0),
+        new PushBlockStream("app-id", 0, 0, 1, 0, 1),
+        new PushBlockStream("app-id", 0, 0, 2, 0, 2)));
 
     verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any());
     verify(listener, times(1)).onBlockFetchFailure(eq("shufflePush_0_1_0"), any());
@@ -107,9 +107,9 @@ public class OneForOneBlockPusherSuite {
     BlockFetchingListener listener = pushBlocks(
       blocks,
       blockIds,
-      Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0),
-        new PushBlockStream("app-id", 0, 1, 0, 1),
-        new PushBlockStream("app-id", 0, 2, 0, 2)));
+      Arrays.asList(new PushBlockStream("app-id", 0, 0, 0, 0, 0),
+        new PushBlockStream("app-id", 0, 0, 1, 0, 1),
+        new PushBlockStream("app-id", 0, 0, 2, 0, 2)));
 
     verify(listener, times(1)).onBlockFetchSuccess(eq("shufflePush_0_0_0"), any());
     verify(listener, times(0)).onBlockFetchSuccess(not(eq("shufflePush_0_0_0")), any());
@@ -130,7 +130,7 @@ public class OneForOneBlockPusherSuite {
     TransportClient client = mock(TransportClient.class);
     BlockFetchingListener listener = mock(BlockFetchingListener.class);
     OneForOneBlockPusher pusher =
-      new OneForOneBlockPusher(client, "app-id", blockIds, listener, blocks);
+      new OneForOneBlockPusher(client, "app-id", 0, blockIds, listener, blocks);
 
     Iterator<Map.Entry<String, ManagedBuffer>> blockIterator = blocks.entrySet().iterator();
     Iterator<BlockTransferMessage> msgIterator = expectMessages.iterator();
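
As the updated test setup above suggests, callers now hand the app attempt id
to the pusher, which stamps it on every PushBlockStream it sends. A minimal
mock-based sketch in the style of this suite (illustrative only):

    import static org.mockito.Mockito.mock;
    import java.util.LinkedHashMap;
    import org.apache.spark.network.buffer.ManagedBuffer;
    import org.apache.spark.network.client.TransportClient;

    LinkedHashMap<String, ManagedBuffer> blocks = new LinkedHashMap<>();
    // The third argument (0) is the app attempt id.
    OneForOneBlockPusher pusher = new OneForOneBlockPusher(
      mock(TransportClient.class), "app-id", 0,
      blocks.keySet().toArray(new String[0]),
      mock(BlockFetchingListener.class), blocks);
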
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index 565d433..2a73aa5 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -22,11 +22,13 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
 import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.Arrays;
+import java.util.Map;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -61,6 +63,17 @@ public class RemoteBlockPushResolverSuite {
 
   private static final Logger log = LoggerFactory.getLogger(RemoteBlockPushResolverSuite.class);
   private final String TEST_APP = "testApp";
+  private final String MERGE_DIRECTORY = "merge_manager";
+  private final int NO_ATTEMPT_ID = -1;
+  private final int ATTEMPT_ID_1 = 1;
+  private final int ATTEMPT_ID_2 = 2;
+  private final String MERGE_DIRECTORY_META = "shuffleManager:{\"mergeDir\": \"merge_manager\"}";
+  private final String MERGE_DIRECTORY_META_1 =
+    "shuffleManager:{\"mergeDir\": \"merge_manager_1\", \"attemptId\": \"1\"}";
+  private final String MERGE_DIRECTORY_META_2 =
+    "shuffleManager:{\"mergeDir\": \"merge_manager_2\", \"attemptId\": \"2\"}";
+  private final String INVALID_MERGE_DIRECTORY_META =
+          "shuffleManager:{\"mergeDirInvalid\": \"merge_manager_2\", \"attemptId\": \"2\"}";
   private final String BLOCK_MANAGER_DIR = "blockmgr-193d8401";
 
   private TransportConf conf;
@@ -74,7 +87,7 @@ public class RemoteBlockPushResolverSuite {
       ImmutableMap.of("spark.shuffle.server.minChunkSizeInMergedShuffleFile", "4"));
     conf = new TransportConf("shuffle", provider);
     pushResolver = new RemoteBlockPushResolver(conf);
-    registerExecutor(TEST_APP, prepareLocalDirs(localDirs));
+    registerExecutor(TEST_APP, prepareLocalDirs(localDirs, MERGE_DIRECTORY), MERGE_DIRECTORY_META);
   }
 
   @After
@@ -106,9 +119,9 @@ public class RemoteBlockPushResolverSuite {
       new PushBlock(0, 0, 0, ByteBuffer.wrap(new byte[4])),
       new PushBlock(0, 1, 0, ByteBuffer.wrap(new byte[5]))
     };
-    pushBlockHelper(TEST_APP, pushBlocks);
+    pushBlockHelper(TEST_APP, NO_ATTEMPT_ID, pushBlocks);
     MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
-      new FinalizeShuffleMerge(TEST_APP, 0));
+      new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0));
     validateMergeStatuses(statuses, new int[] {0}, new long[] {9});
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
     validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4, 5}, new int[][]{{0}, {1}});
@@ -122,9 +135,9 @@ public class RemoteBlockPushResolverSuite {
       new PushBlock(0, 2, 0, ByteBuffer.wrap(new byte[5])),
       new PushBlock(0, 3, 0, ByteBuffer.wrap(new byte[3]))
     };
-    pushBlockHelper(TEST_APP, pushBlocks);
+    pushBlockHelper(TEST_APP, NO_ATTEMPT_ID, pushBlocks);
     MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
-      new FinalizeShuffleMerge(TEST_APP, 0));
+      new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0));
     validateMergeStatuses(statuses, new int[] {0}, new long[] {13});
     MergedBlockMeta meta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
     validateChunks(TEST_APP, 0, 0, meta, new int[]{5, 5, 3}, new int[][]{{0, 1}, {2}, {3}});
@@ -138,9 +151,9 @@ public class RemoteBlockPushResolverSuite {
       new PushBlock(0, 0, 1, ByteBuffer.wrap(new byte[5])),
       new PushBlock(0, 1, 1, ByteBuffer.wrap(new byte[3]))
     };
-    pushBlockHelper(TEST_APP, pushBlocks);
+    pushBlockHelper(TEST_APP, NO_ATTEMPT_ID, pushBlocks);
     MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
-      new FinalizeShuffleMerge(TEST_APP, 0));
+      new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0));
     validateMergeStatuses(statuses, new int[] {0, 1}, new long[] {5, 8});
     MergedBlockMeta meta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
     validateChunks(TEST_APP, 0, 0, meta, new int[]{5}, new int[][]{{0, 1}});
@@ -149,10 +162,12 @@ public class RemoteBlockPushResolverSuite {
   @Test
   public void testDeferredBufsAreWrittenDuringOnData() throws IOException {
     StreamCallbackWithID stream1 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
     StreamCallbackWithID stream2 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0));
     // This should be deferred
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3]));
     // stream 1 now completes
@@ -161,7 +176,7 @@ public class RemoteBlockPushResolverSuite {
     // stream 2 has more data and then completes
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3]));
     stream2.onComplete(stream2.getID());
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
     validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4, 6}, new int[][]{{0}, {1}});
   }
@@ -169,10 +184,12 @@ public class RemoteBlockPushResolverSuite {
   @Test
   public void testDeferredBufsAreWrittenDuringOnComplete() throws IOException {
     StreamCallbackWithID stream1 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
     StreamCallbackWithID stream2 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0));
     // This should be deferred
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3]));
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[3]));
@@ -181,7 +198,7 @@ public class RemoteBlockPushResolverSuite {
     stream1.onComplete(stream1.getID());
     // stream 2 now completes
     stream2.onComplete(stream2.getID());
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
     validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4, 6}, new int[][]{{0}, {1}});
   }
@@ -189,17 +206,19 @@ public class RemoteBlockPushResolverSuite {
   @Test
   public void testDuplicateBlocksAreIgnoredWhenPrevStreamHasCompleted() throws IOException {
     StreamCallbackWithID stream1 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
     stream1.onComplete(stream1.getID());
     StreamCallbackWithID stream2 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     // This should be ignored
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2]));
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2]));
     stream2.onComplete(stream2.getID());
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
     validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}});
   }
@@ -207,10 +226,12 @@ public class RemoteBlockPushResolverSuite {
   @Test
   public void testDuplicateBlocksAreIgnoredWhenPrevStreamIsInProgress() throws IOException {
     StreamCallbackWithID stream1 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
     StreamCallbackWithID stream2 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     // This should be ignored
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2]));
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[2]));
@@ -219,7 +240,7 @@ public class RemoteBlockPushResolverSuite {
     stream1.onComplete(stream1.getID());
     // stream 2 now completes
     stream2.onComplete(stream2.getID());
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
     validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}});
   }
@@ -227,10 +248,11 @@ public class RemoteBlockPushResolverSuite {
   @Test
   public void testFailureAfterData() throws IOException {
     StreamCallbackWithID stream =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
     stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
     assertEquals("num-chunks", 0, blockMeta.getNumChunks());
   }
@@ -238,12 +260,13 @@ public class RemoteBlockPushResolverSuite {
   @Test
   public void testFailureAfterMultipleDataBlocks() throws IOException {
     StreamCallbackWithID stream =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     stream.onData(stream.getID(), ByteBuffer.wrap(new byte[2]));
     stream.onData(stream.getID(), ByteBuffer.wrap(new byte[3]));
     stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
     stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
     assertEquals("num-chunks", 0, blockMeta.getNumChunks());
   }
@@ -251,39 +274,39 @@ public class RemoteBlockPushResolverSuite {
   @Test
   public void testFailureAfterComplete() throws IOException {
     StreamCallbackWithID stream =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     stream.onData(stream.getID(), ByteBuffer.wrap(new byte[2]));
     stream.onData(stream.getID(), ByteBuffer.wrap(new byte[3]));
     stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
     stream.onComplete(stream.getID());
     stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
     validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{9}, new int[][]{{0}});
   }
 
-  @Test (expected = RuntimeException.class)
-  public void testTooLateArrival() throws IOException {
+  @Test(expected = RuntimeException.class)
+  public void testBlockReceivedAfterMergeFinalize() throws IOException {
     ByteBuffer[] blocks = new ByteBuffer[]{
       ByteBuffer.wrap(new byte[4]),
       ByteBuffer.wrap(new byte[5])
     };
     StreamCallbackWithID stream = pushResolver.receiveBlockDataAsStream(
-      new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+      new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     for (ByteBuffer block : blocks) {
       stream.onData(stream.getID(), block);
     }
     stream.onComplete(stream.getID());
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0));
     StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
-      new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+      new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0));
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[4]));
     try {
       stream1.onComplete(stream1.getID());
     } catch (RuntimeException re) {
       assertEquals(
-        "Block shufflePush_0_1_0 received after merged shuffle is finalized",
-          re.getMessage());
+        "Block shufflePush_0_1_0 received after merged shuffle is finalized", re.getMessage());
       MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
       validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{9}, new int[][]{{0}});
       throw re;
@@ -292,28 +315,31 @@ public class RemoteBlockPushResolverSuite {
 
   @Test
   public void testIncompleteStreamsAreOverwritten() throws IOException {
-    registerExecutor(TEST_APP, prepareLocalDirs(localDirs));
+    registerExecutor(TEST_APP, prepareLocalDirs(localDirs, MERGE_DIRECTORY), MERGE_DIRECTORY_META);
     byte[] expectedBytes = new byte[4];
     ThreadLocalRandom.current().nextBytes(expectedBytes);
 
     StreamCallbackWithID stream1 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     byte[] data = new byte[10];
     ThreadLocalRandom.current().nextBytes(data);
     stream1.onData(stream1.getID(), ByteBuffer.wrap(data));
     // There is a failure
     stream1.onFailure(stream1.getID(), new RuntimeException("forced error"));
     StreamCallbackWithID stream2 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0));
     ByteBuffer nextBuf = ByteBuffer.wrap(expectedBytes, 0, 2);
     stream2.onData(stream2.getID(), nextBuf);
     stream2.onComplete(stream2.getID());
     StreamCallbackWithID stream3 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 2, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0));
     nextBuf = ByteBuffer.wrap(expectedBytes, 2, 2);
     stream3.onData(stream3.getID(), nextBuf);
     stream3.onComplete(stream3.getID());
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
     validateChunks(TEST_APP, 0, 0, blockMeta, new int[]{4}, new int[][]{{1, 2}});
     FileSegmentManagedBuffer mb =
@@ -321,13 +347,15 @@ public class RemoteBlockPushResolverSuite {
     assertArrayEquals(expectedBytes, mb.nioByteBuffer().array());
   }
 
-  @Test (expected = RuntimeException.class)
+  @Test(expected = RuntimeException.class)
   public void testCollision() throws IOException {
     StreamCallbackWithID stream1 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
     StreamCallbackWithID stream2 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0));
     // This should be deferred
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[5]));
     // Since stream2 didn't get any opportunity, it will throw a "couldn't find opportunity" error
@@ -341,17 +369,20 @@ public class RemoteBlockPushResolverSuite {
     }
   }
 
-  @Test (expected = RuntimeException.class)
+  @Test(expected = RuntimeException.class)
   public void testFailureInAStreamDoesNotInterfereWithStreamWhichIsWriting() throws IOException {
     StreamCallbackWithID stream1 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
     StreamCallbackWithID stream2 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0));
     // There is a failure with stream2
     stream2.onFailure(stream2.getID(), new RuntimeException("forced error"));
     StreamCallbackWithID stream3 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 2, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0));
     // This should be deferred
     stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[5]));
     // Since this stream didn't get any opportunity, it will throw a "couldn't find opportunity" error
@@ -368,7 +399,7 @@ public class RemoteBlockPushResolverSuite {
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
     stream1.onComplete(stream1.getID());
 
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
     validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4}, new int[][] {{0}});
     if (failedEx != null) {
@@ -376,28 +407,83 @@ public class RemoteBlockPushResolverSuite {
     }
   }
 
-  @Test(expected = NullPointerException.class)
+  @Test(expected = IllegalArgumentException.class)
   public void testUpdateLocalDirsOnlyOnce() throws IOException {
     String testApp = "updateLocalDirsOnlyOnceTest";
     Path[] activeLocalDirs = createLocalDirs(1);
-    registerExecutor(testApp, prepareLocalDirs(activeLocalDirs));
+    registerExecutor(testApp, prepareLocalDirs(activeLocalDirs, MERGE_DIRECTORY),
+      MERGE_DIRECTORY_META);
     assertEquals(pushResolver.getMergedBlockDirs(testApp).length, 1);
     assertTrue(pushResolver.getMergedBlockDirs(testApp)[0].contains(
       activeLocalDirs[0].toFile().getPath()));
-    // Any later executor register from the same application should not change the active local
-    // dirs list
+    // Any later executor register from the same application attempt should not change the active
+    // local dirs list
     Path[] updatedLocalDirs = localDirs;
-    registerExecutor(testApp, prepareLocalDirs(updatedLocalDirs));
+    registerExecutor(testApp, prepareLocalDirs(updatedLocalDirs, MERGE_DIRECTORY),
+      MERGE_DIRECTORY_META);
     assertEquals(pushResolver.getMergedBlockDirs(testApp).length, 1);
     assertTrue(pushResolver.getMergedBlockDirs(testApp)[0].contains(
       activeLocalDirs[0].toFile().getPath()));
     removeApplication(testApp);
     try {
       pushResolver.getMergedBlockDirs(testApp);
-    } catch (Throwable e) {
-      assertTrue(e.getMessage()
-        .startsWith("application " + testApp + " is not registered or NM was restarted."));
-      Throwables.propagate(e);
+    } catch (IllegalArgumentException e) {
+      assertEquals(e.getMessage(),
+        "application " + testApp + " is not registered or NM was restarted.");
+      throw e;
+    }
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testExecutorRegisterWithInvalidJsonForPushShuffle() throws IOException {
+    String testApp = "executorRegisterWithInvalidShuffleManagerMeta";
+    Path[] activeLocalDirs = createLocalDirs(1);
+    try {
+      registerExecutor(testApp, prepareLocalDirs(activeLocalDirs, MERGE_DIRECTORY),
+        INVALID_MERGE_DIRECTORY_META);
+    } catch (IllegalArgumentException re) {
+      assertEquals(
+        "Failed to get the merge directory information from the shuffleManagerMeta " +
+          "shuffleManager:{\"mergeDirInvalid\": \"merge_manager_2\", \"attemptId\": \"2\"} in " +
+          "executor registration message", re.getMessage());
+      throw re;
+    }
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testExecutorRegistrationFromTwoAppAttempts() throws IOException {
+    String testApp = "testExecutorRegistrationFromTwoAppAttempts";
+    Path[] attempt1LocalDirs = createLocalDirs(1);
+    registerExecutor(testApp,
+      prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1),
+      MERGE_DIRECTORY_META_1);
+    assertEquals(pushResolver.getMergedBlockDirs(testApp).length, 1);
+    assertTrue(pushResolver.getMergedBlockDirs(testApp)[0].contains(
+      attempt1LocalDirs[0].toFile().getPath()));
+    // Any later executor register from the same application attempt should not change the active
+    // local dirs list
+    Path[] attempt1UpdatedLocalDirs = localDirs;
+    registerExecutor(testApp,
+      prepareLocalDirs(attempt1UpdatedLocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1),
+      MERGE_DIRECTORY_META_1);
+    assertEquals(pushResolver.getMergedBlockDirs(testApp).length, 1);
+    assertTrue(pushResolver.getMergedBlockDirs(testApp)[0].contains(
+      attempt1LocalDirs[0].toFile().getPath()));
+    // But a new attempt from the same application can change the active local dirs list
+    Path[] attempt2LocalDirs = createLocalDirs(2);
+    registerExecutor(testApp,
+      prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_2),
+      MERGE_DIRECTORY_META_2);
+    assertEquals(pushResolver.getMergedBlockDirs(testApp).length, 2);
+    assertTrue(pushResolver.getMergedBlockDirs(testApp)[0].contains(
+      attempt2LocalDirs[0].toFile().getPath()));
+    removeApplication(testApp);
+    try {
+      pushResolver.getMergedBlockDirs(testApp);
+    } catch (IllegalArgumentException e) {
+      assertEquals(e.getMessage(),
+        "application " + testApp + " is not registered or NM was restarted.");
+      throw e;
     }
   }
 
@@ -407,17 +493,18 @@ public class RemoteBlockPushResolverSuite {
     Semaphore deleted = new Semaphore(0);
     pushResolver = new RemoteBlockPushResolver(conf) {
       @Override
-      void deleteExecutorDirs(Path[] dirs) {
-        super.deleteExecutorDirs(dirs);
+      void deleteExecutorDirs(AppShuffleInfo appShuffleInfo) {
+        super.deleteExecutorDirs(appShuffleInfo);
         deleted.release();
       }
     };
+
     Path[] activeDirs = createLocalDirs(1);
-    registerExecutor(testApp, prepareLocalDirs(activeDirs));
+    registerExecutor(testApp, prepareLocalDirs(activeDirs, MERGE_DIRECTORY), MERGE_DIRECTORY_META);
     PushBlock[] pushBlocks = new PushBlock[] {
       new PushBlock(0, 0, 0, ByteBuffer.wrap(new byte[4]))};
-    pushBlockHelper(testApp, pushBlocks);
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, 0));
+    pushBlockHelper(testApp, NO_ATTEMPT_ID, pushBlocks);
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, NO_ATTEMPT_ID, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(testApp, 0, 0);
     validateChunks(testApp, 0, 0, blockMeta, new int[]{4}, new int[][]{{0}});
     String[] mergeDirs = pushResolver.getMergedBlockDirs(testApp);
@@ -435,7 +522,7 @@ public class RemoteBlockPushResolverSuite {
     useTestFiles(true, false);
     RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
       (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
-        new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4]));
     callback1.onComplete(callback1.getID());
     RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback1.getPartitionInfo();
@@ -443,7 +530,7 @@ public class RemoteBlockPushResolverSuite {
     TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile();
     testIndexFile.close();
     StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream(
-      new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+      new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0));
     callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5]));
     // This will complete without any IOExceptions because the number of IOExceptions is
     // below the threshold, but the update to the index file will be unsuccessful.
@@ -452,12 +539,12 @@ public class RemoteBlockPushResolverSuite {
     // Restore the index stream so it can write successfully again.
     testIndexFile.restore();
     StreamCallbackWithID callback3 = pushResolver.receiveBlockDataAsStream(
-      new PushBlockStream(TEST_APP, 0, 2, 0, 0));
+      new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0));
     callback3.onData(callback3.getID(), ByteBuffer.wrap(new byte[2]));
     callback3.onComplete(callback3.getID());
     assertEquals("index position", 24, testIndexFile.getPos());
     MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
-      new FinalizeShuffleMerge(TEST_APP, 0));
+      new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0));
     validateMergeStatuses(statuses, new int[] {0}, new long[] {11});
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
     validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4, 7}, new int[][] {{0}, {1, 2}});
@@ -468,7 +555,7 @@ public class RemoteBlockPushResolverSuite {
     useTestFiles(true, false);
     RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
       (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
-        new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4]));
     callback1.onComplete(callback1.getID());
     RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback1.getPartitionInfo();
@@ -476,7 +563,7 @@ public class RemoteBlockPushResolverSuite {
     TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile();
     testIndexFile.close();
     StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream(
-      new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+      new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0));
     callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5]));
     // This will complete without any IOExceptions because the number of IOExceptions is
     // below the threshold, but the update to the index file will be unsuccessful.
@@ -486,7 +573,7 @@ public class RemoteBlockPushResolverSuite {
     // Restore the index stream so it can write successfully again.
     testIndexFile.restore();
     MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
-      new FinalizeShuffleMerge(TEST_APP, 0));
+      new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0));
     assertEquals("index position", 24, testIndexFile.getPos());
     validateMergeStatuses(statuses, new int[] {0}, new long[] {9});
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
@@ -498,7 +585,7 @@ public class RemoteBlockPushResolverSuite {
     useTestFiles(false, true);
     RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
       (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
-        new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4]));
     callback1.onComplete(callback1.getID());
     RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback1.getPartitionInfo();
@@ -507,7 +594,7 @@ public class RemoteBlockPushResolverSuite {
     long metaPosBeforeClose = testMetaFile.getPos();
     testMetaFile.close();
     StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream(
-      new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+      new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0));
     callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5]));
     // This will complete without any IOExceptions because the number of IOExceptions is
     // below the threshold, but the updates to the index and meta files will be unsuccessful.
@@ -517,13 +604,13 @@ public class RemoteBlockPushResolverSuite {
     // Restore the meta stream so it can write successfully again.
     testMetaFile.restore();
     StreamCallbackWithID callback3 = pushResolver.receiveBlockDataAsStream(
-      new PushBlockStream(TEST_APP, 0, 2, 0, 0));
+      new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0));
     callback3.onData(callback3.getID(), ByteBuffer.wrap(new byte[2]));
     callback3.onComplete(callback3.getID());
     assertEquals("index position", 24, partitionInfo.getIndexFile().getPos());
     assertTrue("meta position", testMetaFile.getPos() > metaPosBeforeClose);
     MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
-      new FinalizeShuffleMerge(TEST_APP, 0));
+      new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0));
     validateMergeStatuses(statuses, new int[] {0}, new long[] {11});
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
     validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4, 7}, new int[][] {{0}, {1, 2}});
@@ -534,7 +621,7 @@ public class RemoteBlockPushResolverSuite {
     useTestFiles(false, true);
     RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
       (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
-        new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[4]));
     callback1.onComplete(callback1.getID());
     RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback1.getPartitionInfo();
@@ -543,7 +630,7 @@ public class RemoteBlockPushResolverSuite {
     long metaPosBeforeClose = testMetaFile.getPos();
     testMetaFile.close();
     StreamCallbackWithID callback2 = pushResolver.receiveBlockDataAsStream(
-      new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+      new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0));
     callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5]));
     // This will complete without any IOExceptions because the number of IOExceptions is
     // below the threshold, but the updates to the index and meta files will be unsuccessful.
@@ -554,7 +641,7 @@ public class RemoteBlockPushResolverSuite {
     // Restore the meta stream so it can write successfully again.
     testMetaFile.restore();
     MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
-      new FinalizeShuffleMerge(TEST_APP, 0));
+      new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0));
     assertEquals("index position", 24, indexFile.getPos());
     assertTrue("meta position", testMetaFile.getPos() > metaPosBeforeClose);
     validateMergeStatuses(statuses, new int[] {0}, new long[] {9});
@@ -562,11 +649,11 @@ public class RemoteBlockPushResolverSuite {
     validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {4, 5}, new int[][] {{0}, {1}});
   }
 
-  @Test (expected = RuntimeException.class)
+  @Test(expected = RuntimeException.class)
   public void testIOExceptionsExceededThreshold() throws IOException {
     RemoteBlockPushResolver.PushBlockStreamCallback callback =
       (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
-        new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo();
     callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4]));
     callback.onComplete(callback.getID());
@@ -575,7 +662,7 @@ public class RemoteBlockPushResolverSuite {
     for (int i = 1; i < 5; i++) {
       RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
         (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
-          new PushBlockStream(TEST_APP, 0, i, 0, 0));
+          new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, i, 0, 0));
       try {
         callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[2]));
       } catch (IOException ioe) {
@@ -588,7 +675,7 @@ public class RemoteBlockPushResolverSuite {
     try {
       RemoteBlockPushResolver.PushBlockStreamCallback callback2 =
         (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
-          new PushBlockStream(TEST_APP, 0, 5, 0, 0));
+          new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 5, 0, 0));
       callback2.onData(callback.getID(), ByteBuffer.wrap(new byte[1]));
     } catch (Throwable t) {
       assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_5_0",
@@ -597,12 +684,12 @@ public class RemoteBlockPushResolverSuite {
     }
   }
 
-  @Test (expected = RuntimeException.class)
+  @Test(expected = RuntimeException.class)
   public void testIOExceptionsDuringMetaUpdateIncreasesExceptionCount() throws IOException {
     useTestFiles(true, false);
     RemoteBlockPushResolver.PushBlockStreamCallback callback =
       (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
-        new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo();
     callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4]));
     callback.onComplete(callback.getID());
@@ -611,7 +698,7 @@ public class RemoteBlockPushResolverSuite {
     for (int i = 1; i < 5; i++) {
       RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
         (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
-          new PushBlockStream(TEST_APP, 0, i, 0, 0));
+          new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, i, 0, 0));
       callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[5]));
       // This will complete without any exceptions but the exception count is increased.
       callback1.onComplete(callback1.getID());
@@ -622,7 +709,7 @@ public class RemoteBlockPushResolverSuite {
     try {
       RemoteBlockPushResolver.PushBlockStreamCallback callback2 =
       (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
-        new PushBlockStream(TEST_APP, 0, 5, 0, 0));
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 5, 0, 0));
       callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[4]));
       callback2.onComplete(callback2.getID());
     } catch (Throwable t) {
@@ -632,7 +719,7 @@ public class RemoteBlockPushResolverSuite {
     }
   }
 
-  @Test (expected = RuntimeException.class)
+  @Test(expected = RuntimeException.class)
   public void testRequestForAbortedShufflePartitionThrowsException() {
     try {
       testIOExceptionsDuringMetaUpdateIncreasesExceptionCount();
@@ -641,7 +728,7 @@ public class RemoteBlockPushResolverSuite {
     }
     try {
       pushResolver.receiveBlockDataAsStream(
-        new PushBlockStream(TEST_APP, 0, 10, 0, 0));
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 10, 0, 0));
     } catch (Throwable t) {
       assertEquals("IOExceptions exceeded the threshold when merging shufflePush_0_10_0",
         t.getMessage());
@@ -649,19 +736,19 @@ public class RemoteBlockPushResolverSuite {
     }
   }
 
-  @Test (expected = RuntimeException.class)
+  @Test(expected = RuntimeException.class)
   public void testPendingBlockIsAbortedImmediately() throws IOException {
     useTestFiles(true, false);
     RemoteBlockPushResolver.PushBlockStreamCallback callback =
       (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
-        new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo();
     TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile();
     testIndexFile.close();
     for (int i = 1; i < 6; i++) {
       RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
         (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
-          new PushBlockStream(TEST_APP, 0, i, 0, 0));
+          new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, i, 0, 0));
       try {
         callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[5]));
         // This will complete without any exceptions but the exception count is increased.
@@ -682,19 +769,19 @@ public class RemoteBlockPushResolverSuite {
     }
   }
 
-  @Test (expected = RuntimeException.class)
+  @Test(expected = RuntimeException.class)
   public void testWritingPendingBufsIsAbortedImmediatelyDuringComplete() throws IOException {
     useTestFiles(true, false);
     RemoteBlockPushResolver.PushBlockStreamCallback callback =
       (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
-        new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo();
     TestMergeShuffleFile testIndexFile = (TestMergeShuffleFile) partitionInfo.getIndexFile();
     testIndexFile.close();
     for (int i = 1; i < 5; i++) {
       RemoteBlockPushResolver.PushBlockStreamCallback callback1 =
         (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
-          new PushBlockStream(TEST_APP, 0, i, 0, 0));
+          new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, i, 0, 0));
       try {
         callback1.onData(callback1.getID(), ByteBuffer.wrap(new byte[5]));
         // This will complete without any exceptions but the exception count is increased.
@@ -706,7 +793,7 @@ public class RemoteBlockPushResolverSuite {
     assertEquals(4, partitionInfo.getNumIOExceptions());
     RemoteBlockPushResolver.PushBlockStreamCallback callback2 =
       (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
-        new PushBlockStream(TEST_APP, 0, 5, 0, 0));
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 5, 0, 0));
     callback2.onData(callback2.getID(), ByteBuffer.wrap(new byte[5]));
     // This is deferred
     callback.onData(callback.getID(), ByteBuffer.wrap(new byte[4]));
@@ -738,10 +825,10 @@ public class RemoteBlockPushResolverSuite {
       new PushBlock(0, 0, 1, ByteBuffer.wrap(new byte[5])),
       new PushBlock(0, 1, 1, ByteBuffer.wrap(new byte[3]))
     };
-    pushBlockHelper(TEST_APP, pushBlocks);
+    pushBlockHelper(TEST_APP, NO_ATTEMPT_ID, pushBlocks);
     RemoteBlockPushResolver.PushBlockStreamCallback callback =
       (RemoteBlockPushResolver.PushBlockStreamCallback) pushResolver.receiveBlockDataAsStream(
-        new PushBlockStream(TEST_APP, 0, 2, 0, 0));
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0));
     callback.onData(callback.getID(), ByteBuffer.wrap(new byte[2]));
     callback.onComplete(callback.getID());
     RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo = callback.getPartitionInfo();
@@ -749,7 +836,7 @@ public class RemoteBlockPushResolverSuite {
     // Close the index file so truncate throws IOException
     testIndexFile.close();
     MergeStatuses statuses = pushResolver.finalizeShuffleMerge(
-      new FinalizeShuffleMerge(TEST_APP, 0));
+      new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0));
     validateMergeStatuses(statuses, new int[] {1}, new long[] {8});
     MergedBlockMeta meta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 1);
     validateChunks(TEST_APP, 0, 1, meta, new int[]{5, 3}, new int[][]{{0},{1}});
@@ -758,46 +845,53 @@ public class RemoteBlockPushResolverSuite {
   @Test
   public void testOnFailureInvokedMoreThanOncePerBlock() throws IOException {
     StreamCallbackWithID stream1 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
     stream1.onFailure(stream1.getID(), new RuntimeException("forced error"));
     StreamCallbackWithID stream2 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0));
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[5]));
     // onFailure on stream1 gets invoked again and should cause no interference
     stream1.onFailure(stream1.getID(), new RuntimeException("2nd forced error"));
     StreamCallbackWithID stream3 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 3, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 3, 0, 0));
     // This should be deferred as stream 2 is still the active stream
     stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[2]));
     // Stream 2 writes more and completes
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[4]));
     stream2.onComplete(stream2.getID());
     stream3.onComplete(stream3.getID());
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
     validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {9, 2}, new int[][] {{1},{3}});
     removeApplication(TEST_APP);
   }
 
-  @Test (expected = RuntimeException.class)
+  @Test(expected = RuntimeException.class)
   public void testFailureAfterDuplicateBlockDoesNotInterfereActiveStream() throws IOException {
     StreamCallbackWithID stream1 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     StreamCallbackWithID stream1Duplicate =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 0, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0));
     stream1.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
     stream1.onComplete(stream1.getID());
     stream1Duplicate.onData(stream1.getID(), ByteBuffer.wrap(new byte[2]));
 
     StreamCallbackWithID stream2 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 1, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 1, 0, 0));
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[5]));
     // Should not change the current map id of the reduce partition
     stream1Duplicate.onFailure(stream2.getID(), new RuntimeException("forced error"));
 
     StreamCallbackWithID stream3 =
-      pushResolver.receiveBlockDataAsStream(new PushBlockStream(TEST_APP, 0, 2, 0, 0));
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 2, 0, 0));
     // This should be deferred as stream 2 is still the active stream
     stream3.onData(stream3.getID(), ByteBuffer.wrap(new byte[2]));
     RuntimeException failedEx = null;
@@ -812,7 +906,7 @@ public class RemoteBlockPushResolverSuite {
     // Stream 2 writes more and completes
     stream2.onData(stream2.getID(), ByteBuffer.wrap(new byte[4]));
     stream2.onComplete(stream2.getID());
-    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, 0));
+    pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0));
     MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0);
     validateChunks(TEST_APP, 0, 0, blockMeta, new int[] {11}, new int[][] {{0, 1}});
     removeApplication(TEST_APP);
@@ -821,20 +915,165 @@ public class RemoteBlockPushResolverSuite {
     }
   }
 
+  @Test(expected = IllegalArgumentException.class)
+  public void testPushBlockFromPreviousAttemptIsRejected()
+      throws IOException, InterruptedException {
+    Semaphore closed = new Semaphore(0);
+    pushResolver = new RemoteBlockPushResolver(conf) {
+      @Override
+      void closeAndDeletePartitionFilesIfNeeded(
+        AppShuffleInfo appShuffleInfo,
+        boolean cleanupLocalDirs) {
+        super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, cleanupLocalDirs);
+        closed.release();
+      }
+    };
+    String testApp = "testPushBlockFromPreviousAttemptIsRejected";
+    Path[] attempt1LocalDirs = createLocalDirs(1);
+    registerExecutor(testApp,
+      prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1),
+      MERGE_DIRECTORY_META_1);
+    ByteBuffer[] blocks = new ByteBuffer[]{
+      ByteBuffer.wrap(new byte[4]),
+      ByteBuffer.wrap(new byte[5])
+    };
+    StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
+      new PushBlockStream(testApp, 1, 0, 0, 0, 0));
+    for (ByteBuffer block : blocks) {
+      stream1.onData(stream1.getID(), block);
+    }
+    stream1.onComplete(stream1.getID());
+    RemoteBlockPushResolver.AppShuffleInfo appShuffleInfo =
+      pushResolver.validateAndGetAppShuffleInfo(testApp);
+    Map<Integer, Map<Integer, RemoteBlockPushResolver.AppShufflePartitionInfo>> partitions =
+      appShuffleInfo.getPartitions();
+    for (Map<Integer, RemoteBlockPushResolver.AppShufflePartitionInfo> partitionMap :
+        partitions.values()) {
+      for (RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo : partitionMap.values()) {
+        assertTrue(partitionInfo.getDataChannel().isOpen());
+        assertTrue(partitionInfo.getMetaFile().getChannel().isOpen());
+        assertTrue(partitionInfo.getIndexFile().getChannel().isOpen());
+      }
+    }
+    Path[] attempt2LocalDirs = createLocalDirs(2);
+    registerExecutor(testApp,
+      prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_2),
+      MERGE_DIRECTORY_META_2);
+    StreamCallbackWithID stream2 = pushResolver.receiveBlockDataAsStream(
+      new PushBlockStream(testApp, 2, 0, 1, 0, 0));
+    for (ByteBuffer block : blocks) {
+      stream2.onData(stream2.getID(), block);
+    }
+    stream2.onComplete(stream2.getID());
+    closed.acquire();
+    // Check if all the file channels created for the first attempt are safely closed.
+    for (Map<Integer, RemoteBlockPushResolver.AppShufflePartitionInfo> partitionMap :
+        partitions.values()) {
+      for (RemoteBlockPushResolver.AppShufflePartitionInfo partitionInfo : partitionMap.values()) {
+        assertFalse(partitionInfo.getDataChannel().isOpen());
+        assertFalse(partitionInfo.getMetaFile().getChannel().isOpen());
+        assertFalse(partitionInfo.getIndexFile().getChannel().isOpen());
+      }
+    }
+    try {
+      pushResolver.receiveBlockDataAsStream(
+        new PushBlockStream(testApp, 1, 0, 1, 0, 0));
+    } catch (IllegalArgumentException re) {
+      assertEquals(
+        "The attempt id 1 in this PushBlockStream message does not match " +
+          "with the current attempt id 2 stored in shuffle service for application " +
+          testApp, re.getMessage());
+      throw re;
+    }
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testFinalizeShuffleMergeFromPreviousAttemptIsAborted()
+    throws IOException, InterruptedException {
+    String testApp = "testFinalizeShuffleMergeFromPreviousAttemptIsAborted";
+    Path[] attempt1LocalDirs = createLocalDirs(1);
+    registerExecutor(testApp,
+      prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1),
+      MERGE_DIRECTORY_META_1);
+    ByteBuffer[] blocks = new ByteBuffer[]{
+      ByteBuffer.wrap(new byte[4]),
+      ByteBuffer.wrap(new byte[5])
+    };
+    StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
+      new PushBlockStream(testApp, 1, 0, 0, 0, 0));
+    for (ByteBuffer block : blocks) {
+      stream1.onData(stream1.getID(), block);
+    }
+    stream1.onComplete(stream1.getID());
+    Path[] attempt2LocalDirs = createLocalDirs(2);
+    registerExecutor(testApp,
+      prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_2),
+      MERGE_DIRECTORY_META_2);
+    try {
+      pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(testApp, ATTEMPT_ID_1, 0));
+    } catch (IllegalArgumentException e) {
+      assertEquals(
+        String.format("The attempt id %s in this FinalizeShuffleMerge message does not " +
+          "match with the current attempt id %s stored in shuffle service for application %s",
+          ATTEMPT_ID_1, ATTEMPT_ID_2, testApp),
+        e.getMessage());
+      throw e;
+    }
+  }
+
+  @Test(expected = ClosedChannelException.class)
+  public void testOngoingMergeOfBlockFromPreviousAttemptIsAborted()
+    throws IOException, InterruptedException {
+    Semaphore closed = new Semaphore(0);
+    pushResolver = new RemoteBlockPushResolver(conf) {
+      @Override
+      void closeAndDeletePartitionFilesIfNeeded(
+        AppShuffleInfo appShuffleInfo,
+        boolean cleanupLocalDirs) {
+        super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, cleanupLocalDirs);
+        closed.release();
+      }
+    };
+    String testApp = "testOngoingMergeOfBlockFromPreviousAttemptIsAborted";
+    Path[] attempt1LocalDirs = createLocalDirs(1);
+    registerExecutor(testApp,
+      prepareLocalDirs(attempt1LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_1),
+      MERGE_DIRECTORY_META_1);
+    ByteBuffer[] blocks = new ByteBuffer[]{
+      ByteBuffer.wrap(new byte[4]),
+      ByteBuffer.wrap(new byte[5]),
+      ByteBuffer.wrap(new byte[6]),
+      ByteBuffer.wrap(new byte[7])
+    };
+    StreamCallbackWithID stream1 = pushResolver.receiveBlockDataAsStream(
+      new PushBlockStream(testApp, 1, 0, 0, 0, 0));
+    // The onData callback should be called 4 times here before the onComplete callback. But a
+    // register executor message arrives in the shuffle service after the 2nd onData callback.
+    // Any onData callback after that should throw a ClosedChannelException, as the channels
+    // for the previous attempt have been closed.
+    stream1.onData(stream1.getID(), blocks[0]);
+    stream1.onData(stream1.getID(), blocks[1]);
+    Path[] attempt2LocalDirs = createLocalDirs(2);
+    registerExecutor(testApp,
+      prepareLocalDirs(attempt2LocalDirs, MERGE_DIRECTORY + "_" + ATTEMPT_ID_2),
+      MERGE_DIRECTORY_META_2);
+    closed.acquire();
+    // Should throw ClosedChannelException here.
+    stream1.onData(stream1.getID(), blocks[3]);
+  }
+
   private void useTestFiles(boolean useTestIndexFile, boolean useTestMetaFile) throws IOException {
     pushResolver = new RemoteBlockPushResolver(conf) {
       @Override
-      AppShufflePartitionInfo newAppShufflePartitionInfo(AppShuffleId appShuffleId, int reduceId,
-        File dataFile, File indexFile, File metaFile) throws IOException {
+      AppShufflePartitionInfo newAppShufflePartitionInfo(String appId, int shuffleId,
+          int reduceId, File dataFile, File indexFile, File metaFile) throws IOException {
         MergeShuffleFile mergedIndexFile = useTestIndexFile ? new TestMergeShuffleFile(indexFile)
           : new MergeShuffleFile(indexFile);
         MergeShuffleFile mergedMetaFile = useTestMetaFile ? new TestMergeShuffleFile(metaFile) :
           new MergeShuffleFile(metaFile);
-        return new AppShufflePartitionInfo(appShuffleId, reduceId, dataFile, mergedIndexFile,
+        return new AppShufflePartitionInfo(appId, shuffleId, reduceId, dataFile, mergedIndexFile,
           mergedMetaFile);
       }
     };
-    registerExecutor(TEST_APP, prepareLocalDirs(localDirs));
+    registerExecutor(TEST_APP, prepareLocalDirs(localDirs, MERGE_DIRECTORY), MERGE_DIRECTORY_META);
   }
 
   private Path[] createLocalDirs(int numLocalDirs) throws IOException {
@@ -846,16 +1085,15 @@ public class RemoteBlockPushResolverSuite {
     return localDirs;
   }
 
-  private void registerExecutor(String appId, String[] localDirs) throws IOException {
-    ExecutorShuffleInfo shuffleInfo = new ExecutorShuffleInfo(localDirs, 1, "mergedShuffle");
+  private void registerExecutor(String appId, String[] localDirs, String shuffleManagerMeta) {
+    ExecutorShuffleInfo shuffleInfo = new ExecutorShuffleInfo(localDirs, 1, shuffleManagerMeta);
     pushResolver.registerExecutor(appId, shuffleInfo);
   }
 
-  private String[] prepareLocalDirs(Path[] localDirs) throws IOException {
+  private String[] prepareLocalDirs(Path[] localDirs, String mergeDir) throws IOException {
     String[] blockMgrDirs = new String[localDirs.length];
    for (int i = 0; i < localDirs.length; i++) {
-      Files.createDirectories(localDirs[i].resolve(
-        RemoteBlockPushResolver.MERGE_MANAGER_DIR + File.separator + "00"));
+      Files.createDirectories(localDirs[i].resolve(mergeDir + File.separator + "00"));
       blockMgrDirs[i] = localDirs[i].toFile().getPath() + File.separator + BLOCK_MANAGER_DIR;
     }
     return blockMgrDirs;
@@ -898,10 +1136,12 @@ public class RemoteBlockPushResolverSuite {
 
   private void pushBlockHelper(
       String appId,
+      int attemptId,
       PushBlock[] blocks) throws IOException {
     for (int i = 0; i < blocks.length; i++) {
       StreamCallbackWithID stream = pushResolver.receiveBlockDataAsStream(
-        new PushBlockStream(appId, blocks[i].shuffleId, blocks[i].mapIndex, blocks[i].reduceId, 0));
+        new PushBlockStream(
+          appId, attemptId, blocks[i].shuffleId, blocks[i].mapIndex, blocks[i].reduceId, 0));
       stream.onData(stream.getID(), blocks[i].buffer);
       stream.onComplete(stream.getID());
     }
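
For reference, the rejection asserted in testPushBlockFromPreviousAttemptIsRejected and
testFinalizeShuffleMergeFromPreviousAttemptIsAborted boils down to comparing the attempt id
carried by the message against the latest one registered for the application. A minimal Scala
sketch, not the actual RemoteBlockPushResolver code (which is Java); the method name is
illustrative:

    // Sketch only: mirrors the error message asserted in the tests above.
    def validateAttemptId(msgAttemptId: Int, storedAttemptId: Int, appId: String): Unit = {
      if (msgAttemptId != storedAttemptId) {
        throw new IllegalArgumentException(
          s"The attempt id $msgAttemptId in this PushBlockStream message does not match " +
            s"with the current attempt id $storedAttemptId stored in shuffle service " +
            s"for application $appId")
      }
    }
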
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ef47252..d11fa55 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -583,6 +583,7 @@ class SparkContext(config: SparkConf) extends Logging {
     _applicationId = _taskScheduler.applicationId()
     _applicationAttemptId = _taskScheduler.applicationAttemptId()
     _conf.set("spark.app.id", _applicationId)
+    _applicationAttemptId.foreach(attemptId => _conf.set(APP_ATTEMPT_ID, attemptId))
     if (_conf.get(UI_REVERSE_PROXY)) {
       val proxyUrl = _conf.get(UI_REVERSE_PROXY_URL.key, "").stripSuffix("/") +
         "/proxy/" + _applicationId
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 3ef964f..39c526c 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2244,4 +2244,14 @@ package object config {
       .stringConf
       .toSequence
       .createWithDefault(Nil)
+
+  private[spark] val APP_ATTEMPT_ID =
+    ConfigBuilder("spark.app.attempt.id")
+      .internal()
+      .doc("The application attempt Id assigned from Hadoop YARN. " +
+        "When the application runs in cluster mode on YARN, there can be " +
+        "multiple attempts before failing the application")
+      .version("3.2.0")
+      .stringConf
+      .createOptional
 }
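
The new internal config feeds directly into the merge directory naming (see the
DiskBlockManager change further below). A minimal sketch of the derivation, assuming an
attempt id of "1":

    // Illustrative only; mirrors the mergeDirName logic added to DiskBlockManager.
    val attemptId: Option[String] = Some("1")  // value of spark.app.attempt.id, if set
    val mergeDirName = s"merge_manager${attemptId.map(id => s"_$id").getOrElse("")}"
    // mergeDirName == "merge_manager_1"; with no attempt id it stays "merge_manager"
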
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 98d0949..43c7baf 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -535,10 +535,17 @@ private[spark] class BlockManager(
 
   private def registerWithExternalShuffleServer(): Unit = {
     logInfo("Registering executor with local external shuffle service.")
+    val shuffleManagerMeta =
+      if (Utils.isPushBasedShuffleEnabled(conf)) {
+        s"${shuffleManager.getClass.getName}:" +
+          s"${diskBlockManager.getMergeDirectoryAndAttemptIDJsonString()}}}"
+      } else {
+        shuffleManager.getClass.getName
+      }
     val shuffleConfig = new ExecutorShuffleInfo(
       diskBlockManager.localDirsString,
       diskBlockManager.subDirsPerLocalDir,
-      shuffleManager.getClass.getName)
+      shuffleManagerMeta)
 
     val MAX_ATTEMPTS = conf.get(config.SHUFFLE_REGISTRATION_MAX_ATTEMPTS)
     val SLEEP_TIME_SECS = 5
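
With push-based shuffle enabled, the shuffleManager field of ExecutorShuffleInfo now carries
both the shuffle manager class name and the JSON-encoded merge metadata, joined by a colon. A
hedged sketch of the resulting string; the class name and attempt id below are examples, not
the only possible values:

    // Sketch only: the exact value depends on the configured shuffle manager
    // and the optional attempt id.
    val shuffleMgrClass = "org.apache.spark.shuffle.sort.SortShuffleManager"
    val mergeMetaJson = """{"mergeDir":"merge_manager_1","attemptId":"1"}"""
    val shuffleManagerMeta = s"$shuffleMgrClass:$mergeMetaJson"
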
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index d49f43f..d92f686 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -21,11 +21,18 @@ import java.io.{File, IOException}
 import java.nio.file.Files
 import java.util.UUID
 
+import scala.collection.mutable.HashMap
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
 import org.apache.spark.SparkConf
 import org.apache.spark.executor.ExecutorExitCode
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.network.shuffle.ExecutorDiskUtils
-import org.apache.spark.storage.DiskBlockManager.MERGE_MANAGER_DIR
+import org.apache.spark.storage.DiskBlockManager.ATTEMPT_ID_KEY
+import org.apache.spark.storage.DiskBlockManager.MERGE_DIR_KEY
+import org.apache.spark.storage.DiskBlockManager.MERGE_DIRECTORY
 import org.apache.spark.util.{ShutdownHookManager, Utils}
 
 /**
@@ -57,6 +64,10 @@ private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Bo
   // of subDirs(i) is protected by the lock of subDirs(i)
   private val subDirs = Array.fill(localDirs.length)(new Array[File](subDirsPerLocalDir))
 
+  // Get the merge directory name, appending the attemptId if there is one
+  private val mergeDirName =
+    s"$MERGE_DIRECTORY${conf.get(config.APP_ATTEMPT_ID).map(id => s"_$id").getOrElse("")}"
+
   // Create merge directories
   createLocalDirsForMergedShuffleBlocks()
 
@@ -200,12 +211,12 @@ private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Bo
       // Will create the merge_manager directory only if it doesn't exist under the local dir.
       Utils.getConfiguredLocalDirs(conf).foreach { rootDir =>
         try {
-          val mergeDir = new File(rootDir, MERGE_MANAGER_DIR)
+          val mergeDir = new File(rootDir, mergeDirName)
           if (!mergeDir.exists()) {
             // If this executor doesn't find the merge directory, it will try to
             // create it along with its sub directories.
             logDebug(s"Try to create $mergeDir and its sub dirs since the " +
-              s"$MERGE_MANAGER_DIR dir does not exist")
+              s"$mergeDirName dir does not exist")
             for (dirNum <- 0 until subDirsPerLocalDir) {
               val subDir = new File(mergeDir, "%02x".format(dirNum))
               if (!subDir.exists()) {
@@ -219,7 +230,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Bo
         } catch {
           case e: IOException =>
             logError(
-              s"Failed to create $MERGE_MANAGER_DIR dir in $rootDir. Ignoring this directory.", e)
+              s"Failed to create $mergeDirName dir in $rootDir. Ignoring this directory.", e)
         }
       }
     }
@@ -264,6 +275,17 @@ private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Bo
     }
   }
 
+  def getMergeDirectoryAndAttemptIDJsonString(): String = {
+    val mergedMetaMap: HashMap[String, String] = new HashMap[String, String]()
+    mergedMetaMap.put(MERGE_DIR_KEY, mergeDirName)
+    conf.get(config.APP_ATTEMPT_ID).foreach(
+      attemptId => mergedMetaMap.put(ATTEMPT_ID_KEY, attemptId))
+    val mapper = new ObjectMapper()
+    mapper.registerModule(DefaultScalaModule)
+    mapper.writeValueAsString(mergedMetaMap)
+  }
+
   private def addShutdownHook(): AnyRef = {
     logDebug("Adding shutdown hook") // force eager creation of logger
     ShutdownHookManager.addShutdownHook(ShutdownHookManager.TEMP_DIR_SHUTDOWN_PRIORITY + 1) { () =>
@@ -303,5 +325,7 @@ private[spark] class DiskBlockManager(conf: SparkConf, var deleteFilesOnStop: Bo
 }
 
 private[spark] object DiskBlockManager {
-  private[spark] val MERGE_MANAGER_DIR = "merge_manager"
+  val MERGE_DIRECTORY = "merge_manager"
+  val MERGE_DIR_KEY = "mergeDir"
+  val ATTEMPT_ID_KEY = "attemptId"
 }
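
A self-contained sketch of what getMergeDirectoryAndAttemptIDJsonString returns for attempt
"1", using the same Jackson setup as the method above (key order in the output may vary):

    import scala.collection.mutable.HashMap
    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    // Same keys as MERGE_DIR_KEY and ATTEMPT_ID_KEY above.
    val meta = HashMap("mergeDir" -> "merge_manager_1", "attemptId" -> "1")
    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)
    println(mapper.writeValueAsString(meta))
    // e.g. {"mergeDir":"merge_manager_1","attemptId":"1"}
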
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index b1df7bd..89aa299 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2592,32 +2592,13 @@ private[spark] object Utils extends Logging {
 
   /**
    * Push based shuffle can only be enabled when the application is submitted
-   * to run in YARN mode, with external shuffle service enabled and
-   * spark.yarn.maxAttempts or the yarn cluster default max attempts is set to 1.
-   * TODO: Remove the requirement on spark.yarn.maxAttempts after SPARK-35546
-   * Support push based shuffle with multiple app attempts
+   * to run in YARN mode with the external shuffle service enabled.
    */
   def isPushBasedShuffleEnabled(conf: SparkConf): Boolean = {
     conf.get(PUSH_BASED_SHUFFLE_ENABLED) &&
       (conf.get(IS_TESTING).getOrElse(false) ||
         (conf.get(SHUFFLE_SERVICE_ENABLED) &&
-          conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn" &&
-          getYarnMaxAttempts(conf) == 1))
-  }
-
-  /**
-   * Returns the maximum number of attempts to register the AM in YARN mode.
-   * TODO: Remove this method after SPARK-35546 Support push based shuffle
-   * with multiple app attempts
-   */
-  def getYarnMaxAttempts(conf: SparkConf): Int = {
-    val sparkMaxAttempts = conf.getOption("spark.yarn.maxAttempts").map(_.toInt)
-    val yarnMaxAttempts = getSparkOrYarnConfig(conf, YarnConfiguration.RM_AM_MAX_ATTEMPTS,
-      YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS.toString).toInt
-    sparkMaxAttempts match {
-      case Some(x) => if (x <= yarnMaxAttempts) x else yarnMaxAttempts
-      case None => yarnMaxAttempts
-    }
+          conf.get(SparkLauncher.SPARK_MASTER, null) == "yarn"))
   }
 
   /**
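
After this change, push-based shuffle no longer requires a single-attempt YARN application. A
minimal sketch of a conf that now satisfies isPushBasedShuffleEnabled (config keys per the
existing constants; IS_TESTING unset):

    import org.apache.spark.SparkConf
    import org.apache.spark.util.Utils

    val conf = new SparkConf()
      .setMaster("yarn")
      .set("spark.shuffle.push.enabled", "true")
      .set("spark.shuffle.service.enabled", "true")
    assert(Utils.isPushBasedShuffleEnabled(conf))
    // spark.yarn.maxAttempts no longer affects the result (see the UtilsSuite change below)
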
diff --git a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
index 6397c96..0443c40 100644
--- a/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/DiskBlockManagerSuite.scala
@@ -20,7 +20,10 @@ package org.apache.spark.storage
 import java.io.{File, FileWriter}
 import java.nio.file.{Files, Paths}
 import java.nio.file.attribute.PosixFilePermissions
+import java.util.HashMap
 
+import com.fasterxml.jackson.core.`type`.TypeReference
+import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.commons.io.FileUtils
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
@@ -91,11 +94,11 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
   }
 
   test("should still create merge directories if one already exists under a local dir") {
-    val mergeDir0 = new File(rootDir0, DiskBlockManager.MERGE_MANAGER_DIR)
+    val mergeDir0 = new File(rootDir0, DiskBlockManager.MERGE_DIRECTORY)
     if (!mergeDir0.exists()) {
       Files.createDirectories(mergeDir0.toPath)
     }
-    val mergeDir1 = new File(rootDir1, DiskBlockManager.MERGE_MANAGER_DIR)
+    val mergeDir1 = new File(rootDir1, DiskBlockManager.MERGE_DIRECTORY)
     if (mergeDir1.exists()) {
       Utils.deleteRecursively(mergeDir1)
     }
@@ -104,7 +107,7 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
     testConf.set(config.Tests.IS_TESTING, true)
     diskBlockManager = new DiskBlockManager(testConf, deleteFilesOnStop = true)
     assert(Utils.getConfiguredLocalDirs(testConf).map(
-      rootDir => new File(rootDir, DiskBlockManager.MERGE_MANAGER_DIR))
+      rootDir => new File(rootDir, DiskBlockManager.MERGE_DIRECTORY))
       .filter(mergeDir => mergeDir.exists()).length === 2)
     // mergeDir0 will be skipped as it already exists
     assert(mergeDir0.list().length === 0)
@@ -124,6 +127,20 @@ class DiskBlockManagerSuite extends SparkFunSuite with BeforeAndAfterEach with B
     FileUtils.deleteQuietly(testDir)
   }
 
+  test("Encode merged directory name and attemptId in shuffleManager field") {
+    testConf.set(config.APP_ATTEMPT_ID, "1");
+    diskBlockManager = new DiskBlockManager(testConf, deleteFilesOnStop = true)
+    val mergedShuffleMeta = diskBlockManager.getMergeDirectoryAndAttemptIDJsonString();
+    val mapper: ObjectMapper = new ObjectMapper
+    val typeRef: TypeReference[HashMap[String, String]] =
+      new TypeReference[HashMap[String, String]]() {}
+    val metaMap: HashMap[String, String] = mapper.readValue(mergedShuffleMeta, typeRef)
+    val mergeDir = metaMap.get(DiskBlockManager.MERGE_DIR_KEY)
+    assert(mergeDir.equals(DiskBlockManager.MERGE_DIRECTORY + "_1"))
+    val attemptId = metaMap.get(DiskBlockManager.ATTEMPT_ID_KEY)
+    assert(attemptId.equals("1"))
+  }
+
   def writeToFile(file: File, numBytes: Int): Unit = {
     val writer = new FileWriter(file, true)
     for (i <- 0 until numBytes) writer.write(i)
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index dba7e39..095dbef 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -1450,7 +1450,7 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     conf.set("spark.yarn.maxAttempts", "1")
     assert(Utils.isPushBasedShuffleEnabled(conf) === true)
     conf.set("spark.yarn.maxAttempts", "2")
-    assert(Utils.isPushBasedShuffleEnabled(conf) === false)
+    assert(Utils.isPushBasedShuffleEnabled(conf) === true)
   }
 }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org