You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mr...@apache.org on 2022/07/26 02:03:38 UTC

[spark] branch master updated: [SPARK-33236][SHUFFLE] Enable Push-based shuffle service to store state in NM level DB for work preserving restart

This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bb1b218f17 [SPARK-33236][SHUFFLE] Enable Push-based shuffle service to store state in NM level DB for work preserving restart
7bb1b218f17 is described below

commit 7bb1b218f171aa7fcfd804d1eaedd2ff375fc035
Author: Ye Zhou <ye...@linkedin.com>
AuthorDate: Mon Jul 25 21:03:22 2022 -0500

    [SPARK-33236][SHUFFLE] Enable Push-based shuffle service to store state in NM level DB for work preserving restart
    
    ### What changes were proposed in this pull request?
    This PR adds the capability of storing the required information into LevelDB for push based shuffle.
    
    ### Why are the changes needed?
    Without this PR, all the push-based shuffle information in the shuffle services is stored only in memory, so it is lost whenever a NodeManager restarts. After such a restart, the previously merged shuffle data can no longer serve fetch requests, and the shuffle services cannot merge any new pushed blocks from existing applications. After this patch, this information is stored in LevelDB and is fully recovered when the NodeManager restarts.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Unit testing.
    Deployed to clusters and, while a Spark shell application was running, restarted NMs in the magnet partition in three different scenarios:
    1. After spark-shell starts but before any scripts run; this tests an NM restart after the application/executors have registered with the NMs.
    2. While shuffle blocks are being pushed to the shuffle services.
    3. While merged shuffle data is being fetched from the shuffle services.
    The results of the large shuffle test scripts are identical to those of runs without an NM restart.
    
    Closes #35906 from zhouyejoe/SPARK-33236.
    
    Authored-by: Ye Zhou <ye...@linkedin.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../network/shuffle/ExternalBlockHandler.java      |   5 +-
 .../network/shuffle/MergedShuffleFileManager.java  |   6 +
 .../shuffle/NoOpMergedShuffleFileManager.java      |   3 +-
 .../network/shuffle/RemoteBlockPushResolver.java   | 636 ++++++++++++++++---
 .../shuffle/RemoteBlockPushResolverSuite.java      |  93 ++-
 .../spark/network/yarn/YarnShuffleService.java     |  39 +-
 .../network/shuffle/ShuffleTestAccessor.scala      | 157 ++++-
 .../network/yarn/YarnShuffleServiceSuite.scala     | 690 +++++++++++++++++++--
 8 files changed, 1484 insertions(+), 145 deletions(-)

diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
index 9b2e7a46caa..4e40090b065 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java
@@ -80,7 +80,7 @@ public class ExternalBlockHandler extends RpcHandler
     throws IOException {
     this(new OneForOneStreamManager(),
       new ExternalShuffleBlockResolver(conf, registeredExecutorFile),
-      new NoOpMergedShuffleFileManager(conf));
+      new NoOpMergedShuffleFileManager(conf, null));
   }
 
   public ExternalBlockHandler(
@@ -101,7 +101,7 @@ public class ExternalBlockHandler extends RpcHandler
   public ExternalBlockHandler(
       OneForOneStreamManager streamManager,
       ExternalShuffleBlockResolver blockManager) {
-    this(streamManager, blockManager, new NoOpMergedShuffleFileManager(null));
+    this(streamManager, blockManager, new NoOpMergedShuffleFileManager(null, null));
   }
 
   /** Enables mocking out the StreamManager, BlockManager, and MergeManager. */
@@ -295,6 +295,7 @@ public class ExternalBlockHandler extends RpcHandler
 
   public void close() {
     blockManager.close();
+    mergeManager.close();
   }
 
   private void checkAuth(TransportClient client, String appId) {
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java
index 630386d97da..051684a92d0 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/MergedShuffleFileManager.java
@@ -120,4 +120,10 @@ public interface MergedShuffleFileManager {
    * @param appId application ID
    */
   String[] getMergedBlockDirs(String appId);
+
+  /**
+   * Optionally close any resources associated with the MergedShuffleFileManager, such as the
+   * leveldb for state persistence.
+   */
+  default void close() {}
 }
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java
index f47bfc3077e..876b1009593 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/NoOpMergedShuffleFileManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.network.shuffle;
 
+import java.io.File;
 import java.io.IOException;
 
 import org.apache.spark.network.buffer.ManagedBuffer;
@@ -38,7 +39,7 @@ public class NoOpMergedShuffleFileManager implements MergedShuffleFileManager {
   // This constructor is needed because we use this constructor to instantiate an implementation
   // of MergedShuffleFileManager using reflection.
   // See YarnShuffleService#newMergedShuffleFileManagerInstance.
-  public NoOpMergedShuffleFileManager(TransportConf transportConf) {}
+  public NoOpMergedShuffleFileManager(TransportConf transportConf, File recoveryFile) {}
 
   @Override
   public StreamCallbackWithID receiveBlockDataAsStream(PushBlockStream msg) {
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
index f4de75e3fc6..677adc76bff 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java
@@ -23,6 +23,7 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
@@ -31,6 +32,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -39,6 +41,9 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
 
 
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -50,6 +55,9 @@ import com.google.common.cache.LoadingCache;
 import com.google.common.cache.Weigher;
 import com.google.common.primitives.Ints;
 import com.google.common.primitives.Longs;
+
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBIterator;
 import org.roaringbitmap.RoaringBitmap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,6 +73,7 @@ import org.apache.spark.network.shuffle.protocol.FinalizeShuffleMerge;
 import org.apache.spark.network.shuffle.protocol.MergeStatuses;
 import org.apache.spark.network.shuffle.protocol.PushBlockStream;
 import org.apache.spark.network.util.JavaUtils;
+import org.apache.spark.network.util.LevelDBProvider;
 import org.apache.spark.network.util.NettyUtils;
 import org.apache.spark.network.util.TransportConf;
 
@@ -83,18 +92,32 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
   public static final String MERGE_DIR_KEY = "mergeDir";
   public static final String ATTEMPT_ID_KEY = "attemptId";
   private static final int UNDEFINED_ATTEMPT_ID = -1;
+  private static final String DB_KEY_DELIMITER = ";";
   private static final ErrorHandler.BlockPushErrorHandler ERROR_HANDLER = createErrorHandler();
   // ByteBuffer to respond to client upon a successful merge of a pushed block
   private static final ByteBuffer SUCCESS_RESPONSE =
     new BlockPushReturnCode(ReturnCode.SUCCESS.id(), "").toByteBuffer().asReadOnlyBuffer();
 
+  private static final ObjectMapper mapper = new ObjectMapper();
+
+  /**
+   * This is a common prefix to the key for each app shuffle partition we add to leveldb, so they
+   * are easy to find, since leveldb lets you search based on prefix.
+   */
+  private static final String APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX =
+      "AppAttemptShuffleFinalized";
+  private static final String APP_ATTEMPT_PATH_KEY_PREFIX = "AppAttemptPathInfo";
+  private static final LevelDBProvider.StoreVersion
+      CURRENT_VERSION = new LevelDBProvider.StoreVersion(1, 0);
+
   /**
    * A concurrent hashmap where the key is the applicationId, and the value includes
    * all the merged shuffle information for this application. AppShuffleInfo stores
    * the application attemptId, merged shuffle local directories and the metadata
    * for actively being merged shuffle partitions.
    */
-  private final ConcurrentMap<String, AppShuffleInfo> appsShuffleInfo;
+  @VisibleForTesting
+  final ConcurrentMap<String, AppShuffleInfo> appsShuffleInfo;
 
   private final Executor mergedShuffleCleaner;
   private final TransportConf conf;
@@ -104,8 +127,14 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
   @SuppressWarnings("UnstableApiUsage")
   private final LoadingCache<String, ShuffleIndexInformation> indexCache;
 
+  @VisibleForTesting
+  final File recoveryFile;
+
+  @VisibleForTesting
+  final DB db;
+
   @SuppressWarnings("UnstableApiUsage")
-  public RemoteBlockPushResolver(TransportConf conf) {
+  public RemoteBlockPushResolver(TransportConf conf, File recoveryFile) throws IOException {
     this.conf = conf;
     this.appsShuffleInfo = new ConcurrentHashMap<>();
     this.mergedShuffleCleaner = Executors.newSingleThreadExecutor(
@@ -125,6 +154,11 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       .weigher((Weigher<String, ShuffleIndexInformation>)
         (filePath, indexInfo) -> indexInfo.getRetainedMemorySize())
       .build(indexCacheLoader);
+    this.recoveryFile = recoveryFile;
+    db = LevelDBProvider.initLevelDB(this.recoveryFile, CURRENT_VERSION, mapper);
+    if (db != null) {
+      reloadAndCleanUpAppShuffleInfo(db);
+    }
   }
 
   @VisibleForTesting
@@ -154,7 +188,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
    * a given shuffle partition of an application, retrieves the associated metadata. If not
    * present and the corresponding merged shuffle does not exist, initializes the metadata.
    */
-  private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
+  @VisibleForTesting
+  AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
       AppShuffleInfo appShuffleInfo,
       int shuffleId,
       int shuffleMergeId,
@@ -180,12 +215,15 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
             // Higher shuffleMergeId seen for the shuffle ID meaning new stage attempt is being
             // run for the shuffle ID. Close and clean up old shuffleMergeId files,
             // happens in the indeterminate stage retries
-            logger.info("{} attempt {} shuffle {} shuffleMerge {}: creating a new shuffle " +
-                "merge metadata since received shuffleMergeId is higher than latest " +
-                "shuffleMergeId {}", appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId,
-                shuffleMergeId, latestShuffleMergeId);
-            mergedShuffleCleaner.execute(() ->
-                closeAndDeletePartitionFiles(mergePartitionsInfo.shuffleMergePartitions));
+            AppAttemptShuffleMergeId appAttemptShuffleMergeId =
+                new AppAttemptShuffleMergeId(
+                    appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId, shuffleMergeId);
+            logger.info("{}: creating a new shuffle merge metadata since received " +
+                "shuffleMergeId is higher than latest shuffleMergeId {}",
+                appAttemptShuffleMergeId, latestShuffleMergeId);
+            submitCleanupTask(() ->
+                closeAndDeleteOutdatedPartitions(
+                    appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
             return new AppShuffleMergePartitionsInfo(shuffleMergeId, false);
           } else {
             // The request is for block with same shuffleMergeId as the latest shuffleMergeId
@@ -211,8 +249,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       File metaFile =
         appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId, reduceId);
       try {
-        return newAppShufflePartitionInfo(appShuffleInfo.appId, shuffleId, shuffleMergeId,
-          reduceId, dataFile, indexFile, metaFile);
+        return newAppShufflePartitionInfo(appShuffleInfo, shuffleId, shuffleMergeId, reduceId,
+            dataFile, indexFile, metaFile);
       } catch (IOException e) {
         logger.error("{} attempt {} shuffle {} shuffleMerge {}: cannot create merged shuffle " +
             "partition with data file {}, index file {}, and meta file {}", appShuffleInfo.appId,
@@ -228,15 +266,16 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
 
   @VisibleForTesting
   AppShufflePartitionInfo newAppShufflePartitionInfo(
-      String appId,
+      AppShuffleInfo appShuffleInfo,
       int shuffleId,
       int shuffleMergeId,
       int reduceId,
       File dataFile,
       File indexFile,
       File metaFile) throws IOException {
-    return new AppShufflePartitionInfo(appId, shuffleId, shuffleMergeId, reduceId, dataFile,
-      new MergeShuffleFile(indexFile), new MergeShuffleFile(metaFile));
+    return new AppShufflePartitionInfo(new AppAttemptShuffleMergeId(
+        appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId, shuffleMergeId),
+        reduceId, dataFile, new MergeShuffleFile(indexFile), new MergeShuffleFile(metaFile));
   }
 
   @Override
@@ -313,25 +352,46 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     return appShuffleInfo.appPathsInfo.activeLocalDirs;
   }
 
+  private void removeOldApplicationAttemptsFromDb(AppShuffleInfo info) {
+    if (info.attemptId != UNDEFINED_ATTEMPT_ID) {
+      for (int formerAttemptId = 0; formerAttemptId < info.attemptId; formerAttemptId++) {
+        removeAppAttemptPathInfoFromDB(info.appId, formerAttemptId);
+      }
+    }
+  }
+
   @Override
   public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
     logger.info("Application {} removed, cleanupLocalDirs = {}", appId, cleanupLocalDirs);
-    AppShuffleInfo appShuffleInfo = appsShuffleInfo.remove(appId);
+    // Cleanup the DB within critical section to gain the consistency between
+    // DB and in-memory hashmap.
+    AtomicReference<AppShuffleInfo> ref = new AtomicReference<>(null);
+    appsShuffleInfo.compute(appId, (id, info) -> {
+      if (null != info) {
+        // Try cleaning up this application attempt local paths information
+        // and also the local paths information from former attempts in DB.
+        removeAppAttemptPathInfoFromDB(info.appId, info.attemptId);
+        removeOldApplicationAttemptsFromDb(info);
+        ref.set(info);
+      }
+      // Return null to remove the entry
+      return null;
+    });
+    AppShuffleInfo appShuffleInfo = ref.get();
     if (null != appShuffleInfo) {
-      mergedShuffleCleaner.execute(
-        () -> closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, cleanupLocalDirs));
+      submitCleanupTask(
+        () -> closeAndDeletePartitionsIfNeeded(appShuffleInfo, cleanupLocalDirs));
     }
   }
 
-
   /**
    * Clean up the AppShufflePartitionInfo for a specific AppShuffleInfo.
    * If cleanupLocalDirs is true, the merged shuffle files will also be deleted.
-   * The cleanup will be executed in a separate thread.
+   * The cleanup will be executed in the mergedShuffleCleaner thread.
    */
   @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
   @VisibleForTesting
-  void closeAndDeletePartitionFilesIfNeeded(
+  void closeAndDeletePartitionsIfNeeded(
       AppShuffleInfo appShuffleInfo,
       boolean cleanupLocalDirs) {
     appShuffleInfo.shuffles.forEach((shuffleId, shuffleInfo) -> shuffleInfo.shuffleMergePartitions
@@ -343,15 +403,55 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     if (cleanupLocalDirs) {
       deleteExecutorDirs(appShuffleInfo);
     }
+    removeAppShuffleInfoFromDB(appShuffleInfo);
   }
 
   /**
-   * Clean up all the AppShufflePartitionInfo for a specific shuffleMergeId. This is done
-   * since there is a higher shuffleMergeId request made for a shuffleId, therefore clean
-   * up older shuffleMergeId partitions. The cleanup will be executed in a separate thread.
+   * Remove the application attempt local paths information from the DB. This method is being
+   * invoked within the lock from the ConcurrentHashmap appsShuffleInfo on the specific
+   * applicationId.
    */
   @VisibleForTesting
-  void closeAndDeletePartitionFiles(Map<Integer, AppShufflePartitionInfo> partitions) {
+  void removeAppAttemptPathInfoFromDB(String appId, int attemptId) {
+    AppAttemptId appAttemptId = new AppAttemptId(appId, attemptId);
+    if (db != null) {
+      try {
+        byte[] key = getDbAppAttemptPathsKey(appAttemptId);
+        if (db.get(key) != null) {
+          db.delete(key);
+        }
+      } catch (Exception e) {
+        logger.error("Failed to remove the application attempt {} local path in DB",
+            appAttemptId, e);
+      }
+    }
+  }
+
+  /**
+   * Remove the finalized shuffle partitions information for an application attempt from the DB
+   */
+  @VisibleForTesting
+  void removeAppShuffleInfoFromDB(AppShuffleInfo appShuffleInfo) {
+    if (db != null) {
+      appShuffleInfo.shuffles.forEach((shuffleId, shuffleInfo) ->
+          removeAppShufflePartitionInfoFromDB(
+              new AppAttemptShuffleMergeId(
+                  appShuffleInfo.appId, appShuffleInfo.attemptId,
+                  shuffleId, shuffleInfo.shuffleMergeId)));
+    }
+  }
+
+  /**
+   * Clean up all the AppShufflePartitionInfo and the finalized shuffle partitions in DB for
+   * a specific shuffleMergeId. This is done since there is a higher shuffleMergeId request made
+   * for a shuffleId, therefore clean up older shuffleMergeId partitions. The cleanup will be
+   * executed in the mergedShuffleCleaner thread.
+   */
+  @VisibleForTesting
+  void closeAndDeleteOutdatedPartitions(
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId,
+      Map<Integer, AppShufflePartitionInfo> partitions) {
+    removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId);
     partitions
       .forEach((partitionId, partitionInfo) -> {
         synchronized (partitionInfo) {
@@ -360,6 +460,21 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       });
   }
 
+  /**
+   * Remove the finalized shuffle partition information for a specific appAttemptShuffleMergeId
+   * @param appAttemptShuffleMergeId the app attempt and shuffle merge whose DB entry is deleted
+   */
+  void removeAppShufflePartitionInfoFromDB(AppAttemptShuffleMergeId appAttemptShuffleMergeId) {
+    if (db != null) {
+      try {
+        db.delete(getDbAppAttemptShufflePartitionKey(appAttemptShuffleMergeId));
+      } catch (Exception e) {
+        logger.error("Error deleting {} from application shuffle merged partition info in DB",
+            appAttemptShuffleMergeId, e);
+      }
+    }
+  }
+
   /**
    * Serially delete local dirs.
    */
@@ -513,6 +628,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
             + "with the current attempt id %s stored in shuffle service for application %s",
           msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
     }
+    AppAttemptShuffleMergeId appAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
+        msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
     AtomicReference<Map<Integer, AppShufflePartitionInfo>> shuffleMergePartitionsRef =
       new AtomicReference<>(null);
     appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> {
@@ -526,8 +643,9 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
         } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
           // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return
           // empty MergeStatuses but cleanup the older shuffleMergeId files.
-          mergedShuffleCleaner.execute(() ->
-              closeAndDeletePartitionFiles(mergePartitionsInfo.shuffleMergePartitions));
+          submitCleanupTask(() ->
+              closeAndDeleteOutdatedPartitions(
+                  appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
         } else {
           // This block covers:
           //  1. finalization of determinate stage
@@ -536,8 +654,10 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
           shuffleMergePartitionsRef.set(mergePartitionsInfo.shuffleMergePartitions);
         }
       }
+      // Update the DB for the finalized shuffle
+      writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
       // Even when the mergePartitionsInfo is null, we mark the shuffle as finalized but the results
-      // sent to the driver will be empty. This cam happen when the service didn't receive any
+      // sent to the driver will be empty. This can happen when the service didn't receive any
       // blocks for the shuffle yet and the driver didn't wait for enough time to finalize the
       // shuffle.
       return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
@@ -614,13 +734,14 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
         }
         if (attemptId == UNDEFINED_ATTEMPT_ID) {
           // When attemptId is -1, there is no attemptId stored in the ExecutorShuffleInfo.
-          // Only the first ExecutorRegister message can register the merge dirs
-          appsShuffleInfo.computeIfAbsent(appId, id ->
-            new AppShuffleInfo(
-              appId, UNDEFINED_ATTEMPT_ID,
-              new AppPathsInfo(appId, executorInfo.localDirs,
-                mergeDir, executorInfo.subDirsPerLocalDir)
-            ));
+          // Only the first ExecutorRegister message can register the merge dirs.
+          // DB will also get updated with the registered local path information.
+          appsShuffleInfo.computeIfAbsent(appId, id -> {
+            AppPathsInfo appPathsInfo = new AppPathsInfo(appId, executorInfo.localDirs,
+                mergeDir, executorInfo.subDirsPerLocalDir);
+            writeAppPathsInfoToDb(appId, UNDEFINED_ATTEMPT_ID, appPathsInfo);
+            return new AppShuffleInfo(appId, UNDEFINED_ATTEMPT_ID, appPathsInfo);
+          });
         } else {
           // If attemptId is not -1, there is attemptId stored in the ExecutorShuffleInfo.
           // The first ExecutorRegister message from the same application attempt wil register
@@ -632,6 +753,14 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
           appsShuffleInfo.compute(appId, (id, appShuffleInfo) -> {
             if (appShuffleInfo == null || attemptId > appShuffleInfo.attemptId) {
               originalAppShuffleInfo.set(appShuffleInfo);
+              AppPathsInfo appPathsInfo = new AppPathsInfo(appId, executorInfo.localDirs,
+                  mergeDir, executorInfo.subDirsPerLocalDir);
+              // Clean up the outdated App Attempt local path info in the DB and
+              // put the newly registered local path info from newer attempt into the DB.
+              if (appShuffleInfo != null) {
+                removeAppAttemptPathInfoFromDB(appId, appShuffleInfo.attemptId);
+              }
+              writeAppPathsInfoToDb(appId, attemptId, appPathsInfo);
               appShuffleInfo =
                 new AppShuffleInfo(
                   appId, attemptId,
@@ -644,8 +773,10 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
             AppShuffleInfo appShuffleInfo = originalAppShuffleInfo.get();
             logger.warn("Cleanup shuffle info and merged shuffle files for {}_{} as new " +
                 "application attempt registered", appId, appShuffleInfo.attemptId);
-            mergedShuffleCleaner.execute(
-              () -> closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, true));
+            // Clean up all the merge shuffle related information in the DB for the former attempt
+            submitCleanupTask(
+              () -> closeAndDeletePartitionsIfNeeded(appShuffleInfo, true)
+            );
           }
         }
       } catch (JsonProcessingException e) {
@@ -656,6 +787,238 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     }
   }
 
+  /**
+   * Close the DB during shutdown
+   */
+  @Override
+  public void close() {
+    if (db != null) {
+      try {
+        db.close();
+      } catch (IOException e) {
+        logger.error("Exception closing leveldb with registered app paths info and "
+            + "shuffle partition info", e);
+      }
+    }
+  }
+
+  /**
+   * Write the application attempt's local path information to the DB
+   */
+  private void writeAppPathsInfoToDb(String appId, int attemptId, AppPathsInfo appPathsInfo) {
+    if (db != null) {
+      AppAttemptId appAttemptId = new AppAttemptId(appId, attemptId);
+      try {
+        byte[] key = getDbAppAttemptPathsKey(appAttemptId);
+        String valueStr = mapper.writeValueAsString(appPathsInfo);
+        byte[] value = valueStr.getBytes(StandardCharsets.UTF_8);
+        db.put(key, value);
+      } catch (Exception e) {
+        logger.error("Error saving registered app paths info for {}", appAttemptId, e);
+      }
+    }
+  }
+
+  /**
+   * Write the finalized shuffle merge partition information into the DB
+   */
+  private void writeAppAttemptShuffleMergeInfoToDB(
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId) {
+    if (db != null) {
+      // Write AppAttemptShuffleMergeId into LevelDB for finalized shuffles
+      try{
+        byte[] dbKey = getDbAppAttemptShufflePartitionKey(appAttemptShuffleMergeId);
+        db.put(dbKey, new byte[0]);
+      } catch (Exception e) {
+        logger.error("Error saving active app shuffle partition {}", appAttemptShuffleMergeId, e);
+      }
+    }
+  }
+
+  /**
+   * Parse the DB key with the prefix and the expected return value type
+   */
+  private <T> T parseDbKey(String key, String prefix, Class<T> valueType) throws IOException {
+    String json = key.substring(prefix.length() + 1);
+    return mapper.readValue(json, valueType);
+  }
+
+  /**
+   * Generate AppAttemptId from the DB key
+   */
+  private AppAttemptId parseDbAppAttemptPathsKey(String key) throws IOException {
+    return parseDbKey(key, APP_ATTEMPT_PATH_KEY_PREFIX, AppAttemptId.class);
+  }
+
+  /**
+   * Generate AppAttemptShuffleMergeId from the DB key
+   */
+  private AppAttemptShuffleMergeId parseDbAppAttemptShufflePartitionKey(
+      String key) throws IOException {
+    return parseDbKey(
+        key, APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX, AppAttemptShuffleMergeId.class);
+  }
+
+  /**
+   * Generate the DB key with the key object and the specified string prefix
+   */
+  private byte[] getDbKey(Object key, String prefix) throws IOException {
+    // We add a common prefix on all the keys so we can find them in the DB
+    String keyJsonString = prefix + DB_KEY_DELIMITER + mapper.writeValueAsString(key);
+    return keyJsonString.getBytes(StandardCharsets.UTF_8);
+  }
+
+  /**
+   * Generate the DB key from AppAttemptShuffleMergeId object
+   */
+  private byte[] getDbAppAttemptShufflePartitionKey(
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId) throws IOException {
+    return getDbKey(appAttemptShuffleMergeId, APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX);
+  }
+
+  /**
+   * Generate the DB key from AppAttemptId object
+   */
+  private byte[] getDbAppAttemptPathsKey(AppAttemptId appAttemptId) throws IOException {
+    return getDbKey(appAttemptId, APP_ATTEMPT_PATH_KEY_PREFIX);
+  }
+
+  /**
+   * Reload the DB to recover the meta data stored in the hashmap for merged shuffles.
+   * The application attempts local paths information will be firstly reloaded, and then
+   * the finalized shuffle merges will be updated.
+   * This method will also try deleting dangling key/values in DB, which includes:
+   * 1) Outdated application attempt local paths information left over due to DB deletion failures
+   * 2) The deletion of finalized shuffle merges are triggered asynchronously, there can be cases
+   * that deletions miss the execution during restart. These finalized shuffle merges should have
+   * no relevant application attempts local paths information registered in the DB and the hashmap.
+   */
+  @VisibleForTesting
+  void reloadAndCleanUpAppShuffleInfo(DB db) throws IOException {
+    logger.info("Reload applications merged shuffle information from DB");
+    List<byte[]> dbKeysToBeRemoved = new ArrayList<>();
+    dbKeysToBeRemoved.addAll(reloadActiveAppAttemptsPathInfo(db));
+    dbKeysToBeRemoved.addAll(reloadFinalizedAppAttemptsShuffleMergeInfo(db));
+    removeOutdatedKeyValuesInDB(dbKeysToBeRemoved);
+  }
+
+  /**
+   * Reload application attempts local paths information.
+   *
+   * <p>Every DB entry whose key starts with {@code APP_ATTEMPT_PATH_KEY_PREFIX} is decoded and
+   * registered in {@code appsShuffleInfo}. When the DB holds entries for more than one attempt
+   * of the same application, only the highest attempt id is kept. The keys of the lower
+   * attempts are collected for deletion.
+   *
+   * @param db the recovery DB to read from; when {@code null} nothing is reloaded
+   * @return the DB keys of outdated application attempt path entries
+   * @throws IOException if a stored value cannot be read, or closing the iterator fails
+   */
+  @VisibleForTesting
+  List<byte[]> reloadActiveAppAttemptsPathInfo(DB db) throws IOException {
+    List<byte[]> dbKeysToBeRemoved = new ArrayList<>();
+    if (db != null) {
+      // DBIterator is Closeable. try-with-resources releases the underlying LevelDB iterator,
+      // including when deserializing an entry throws.
+      try (DBIterator itr = db.iterator()) {
+        itr.seek(APP_ATTEMPT_PATH_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+        while (itr.hasNext()) {
+          Map.Entry<byte[], byte[]> entry = itr.next();
+          String key = new String(entry.getKey(), StandardCharsets.UTF_8);
+          // seek() positioned us at the prefix; the first key without it ends the scan.
+          if (!key.startsWith(APP_ATTEMPT_PATH_KEY_PREFIX)) {
+            break;
+          }
+          AppAttemptId appAttemptId = parseDbAppAttemptPathsKey(key);
+          AppPathsInfo appPathsInfo = mapper.readValue(entry.getValue(), AppPathsInfo.class);
+          logger.debug("Reloading Application paths info for application {}", appAttemptId);
+          appsShuffleInfo.compute(appAttemptId.appId,
+              (appId, existingAppShuffleInfo) -> {
+                if (existingAppShuffleInfo == null ||
+                    existingAppShuffleInfo.attemptId < appAttemptId.attemptId) {
+                  if (existingAppShuffleInfo != null) {
+                    AppAttemptId existingAppAttemptId = new AppAttemptId(
+                        existingAppShuffleInfo.appId, existingAppShuffleInfo.attemptId);
+                    try {
+                      // Add the former outdated DB key to deletion list
+                      dbKeysToBeRemoved.add(getDbAppAttemptPathsKey(existingAppAttemptId));
+                    } catch (IOException e) {
+                      logger.error("Failed to get the DB key for {}", existingAppAttemptId, e);
+                    }
+                  }
+                  return new AppShuffleInfo(
+                      appAttemptId.appId, appAttemptId.attemptId, appPathsInfo);
+                } else {
+                  // Add the current DB key to deletion list as it is outdated
+                  dbKeysToBeRemoved.add(entry.getKey());
+                  return existingAppShuffleInfo;
+                }
+              });
+        }
+      }
+    }
+    return dbKeysToBeRemoved;
+  }
+
+  /**
+   * Reload the finalized shuffle merges.
+   *
+   * <p>Each DB entry under {@code APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX} is attached to
+   * the matching application attempt in {@code appsShuffleInfo}. This relies on that map having
+   * been populated already; {@code reloadAndCleanUpAppShuffleInfo} reloads the attempt paths
+   * first. An entry's key is collected for deletion in three cases:
+   * <ul>
+   *   <li>its application attempt is not registered;</li>
+   *   <li>a different attempt of that application is registered;</li>
+   *   <li>it is superseded by a higher shuffleMergeId of the same shuffle.</li>
+   * </ul>
+   *
+   * @param db the recovery DB to read from; when {@code null} nothing is reloaded
+   * @return the DB keys of dangling or outdated finalized shuffle merge entries
+   * @throws IOException if a key cannot be parsed, or closing the iterator fails
+   */
+  @VisibleForTesting
+  List<byte[]> reloadFinalizedAppAttemptsShuffleMergeInfo(DB db) throws IOException {
+    List<byte[]> dbKeysToBeRemoved = new ArrayList<>();
+    if (db != null) {
+      // DBIterator is Closeable; try-with-resources releases it even if parsing a key throws.
+      try (DBIterator itr = db.iterator()) {
+        itr.seek(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX.getBytes(StandardCharsets.UTF_8));
+        while (itr.hasNext()) {
+          Map.Entry<byte[], byte[]> entry = itr.next();
+          String key = new String(entry.getKey(), StandardCharsets.UTF_8);
+          // seek() positioned us at the prefix; the first key without it ends the scan.
+          if (!key.startsWith(APP_ATTEMPT_SHUFFLE_FINALIZE_STATUS_KEY_PREFIX)) {
+            break;
+          }
+          AppAttemptShuffleMergeId partitionId = parseDbAppAttemptShufflePartitionKey(key);
+          logger.debug("Reloading finalized shuffle info for partitionId {}", partitionId);
+          AppShuffleInfo appShuffleInfo = appsShuffleInfo.get(partitionId.appId);
+          if (appShuffleInfo != null && appShuffleInfo.attemptId == partitionId.attemptId) {
+            appShuffleInfo.shuffles.compute(partitionId.shuffleId,
+                (shuffleId, existingMergePartitionInfo) -> {
+                  if (existingMergePartitionInfo == null ||
+                      existingMergePartitionInfo.shuffleMergeId < partitionId.shuffleMergeId) {
+                    if (existingMergePartitionInfo != null) {
+                      // A lower shuffleMergeId was reloaded earlier; its DB entry is outdated.
+                      AppAttemptShuffleMergeId appAttemptShuffleMergeId =
+                          new AppAttemptShuffleMergeId(appShuffleInfo.appId,
+                              appShuffleInfo.attemptId, shuffleId,
+                              existingMergePartitionInfo.shuffleMergeId);
+                      try {
+                        dbKeysToBeRemoved.add(
+                            getDbAppAttemptShufflePartitionKey(appAttemptShuffleMergeId));
+                      } catch (Exception e) {
+                        logger.error("Error getting the DB key for {}",
+                            appAttemptShuffleMergeId, e);
+                      }
+                    }
+                    return new AppShuffleMergePartitionsInfo(partitionId.shuffleMergeId, true);
+                  } else {
+                    // An equal or higher shuffleMergeId is already registered for this shuffle.
+                    dbKeysToBeRemoved.add(entry.getKey());
+                    return existingMergePartitionInfo;
+                  }
+                });
+          } else {
+            // No paths info is registered for this exact application attempt: dangling entry.
+            dbKeysToBeRemoved.add(entry.getKey());
+          }
+        }
+      }
+    }
+    return dbKeysToBeRemoved;
+  }
+
+  /**
+   * Clean up DB with a list of outdated keys collected during DB reload.
+   *
+   * <p>Deletion is best effort. A failure to delete one key is logged, and the remaining keys
+   * are still processed. Keys are deleted from this resolver's {@code db} field.
+   *
+   * @param dbKeysToBeRemoved raw DB keys to delete; may be empty
+   */
+  @VisibleForTesting
+  void removeOutdatedKeyValuesInDB(List<byte[]> dbKeysToBeRemoved) {
+    if (db == null) {
+      // Without a DB there is nothing to delete. This guard also avoids logging one
+      // NullPointerException per key.
+      return;
+    }
+    for (byte[] key : dbKeysToBeRemoved) {
+      try {
+        db.delete(key);
+      } catch (Exception e) {
+        // The reload methods decode DB keys as UTF-8 strings. Log the decoded form rather than
+        // an unreadable byte array.
+        logger.error("Error deleting dangling key {} in DB",
+            new String(key, StandardCharsets.UTF_8), e);
+      }
+    }
+  }
+
+  /**
+   * Hands a task to the single thread merged-shuffle cleanup executor service.
+   *
+   * @param task the cleanup work to run on that executor
+   */
+  @VisibleForTesting
+  void submitCleanupTask(Runnable task) {
+    this.mergedShuffleCleaner.execute(task);
+  }
+
   /**
    * Callback for push stream that handles blocks which are not already merged.
    */
@@ -819,8 +1182,9 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       // to disk as well. This way, we avoid having to buffer the entirety of every blocks in
       // memory, while still providing the necessary guarantee.
       synchronized (partitionInfo) {
-        AppShuffleMergePartitionsInfo info = appShuffleInfo.shuffles.get(partitionInfo.shuffleId);
-        if (isStale(info, partitionInfo.shuffleMergeId) ||
+        AppShuffleMergePartitionsInfo info =
+            appShuffleInfo.shuffles.get(partitionInfo.appAttemptShuffleMergeId.shuffleId);
+        if (isStale(info, partitionInfo.appAttemptShuffleMergeId.shuffleMergeId) ||
             isTooLate(info, partitionInfo.reduceId)) {
           deferredBufs = null;
           return;
@@ -893,14 +1257,15 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
         // generating shuffle output for the shuffle ID. By the time we finish reading this
         // message, the block request is either stale or too late. We should thus respond
         // the error code to the client.
-        AppShuffleMergePartitionsInfo info = appShuffleInfo.shuffles.get(partitionInfo.shuffleId);
+        AppShuffleMergePartitionsInfo info =
+            appShuffleInfo.shuffles.get(partitionInfo.appAttemptShuffleMergeId.shuffleId);
         if (isTooLate(info, partitionInfo.reduceId)) {
           deferredBufs = null;
           throw new BlockPushNonFatalFailure(
             new BlockPushReturnCode(ReturnCode.TOO_LATE_BLOCK_PUSH.id(), streamId).toByteBuffer(),
             BlockPushNonFatalFailure.getErrorMsg(streamId, ReturnCode.TOO_LATE_BLOCK_PUSH));
         }
-        if (isStale(info, partitionInfo.shuffleMergeId)) {
+        if (isStale(info, partitionInfo.appAttemptShuffleMergeId.shuffleMergeId)) {
           deferredBufs = null;
           throw new BlockPushNonFatalFailure(
             new BlockPushReturnCode(ReturnCode.STALE_BLOCK_PUSH.id(), streamId).toByteBuffer(),
@@ -976,9 +1341,9 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       if (isWriting) {
         synchronized (partitionInfo) {
           AppShuffleMergePartitionsInfo info =
-            appShuffleInfo.shuffles.get(partitionInfo.shuffleId);
+            appShuffleInfo.shuffles.get(partitionInfo.appAttemptShuffleMergeId.shuffleId);
           if (!isTooLate(info, partitionInfo.reduceId) &&
-              !isStale(info, partitionInfo.shuffleMergeId)) {
+              !isStale(info, partitionInfo.appAttemptShuffleMergeId.shuffleMergeId)) {
             logger.debug("{} encountered failure", partitionInfo);
             partitionInfo.setCurrentMapIndex(-1);
           }
@@ -993,6 +1358,42 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     }
   }
 
+  /**
+   * Identifies one attempt of an application: the application id plus its attempt number.
+   * Jackson serializes and deserializes it through the annotated constructor, using the JSON
+   * properties "appId" and "attemptId".
+   */
+  public static class AppAttemptId {
+    public final String appId;
+    public final int attemptId;
+
+    @JsonCreator
+    public AppAttemptId(
+        @JsonProperty("appId") String appId,
+        @JsonProperty("attemptId") int attemptId) {
+      this.appId = appId;
+      this.attemptId = attemptId;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other == this) {
+        return true;
+      }
+      if (other == null || other.getClass() != getClass()) {
+        return false;
+      }
+      AppAttemptId that = (AppAttemptId) other;
+      return Objects.equals(appId, that.appId) && attemptId == that.attemptId;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(appId, attemptId);
+    }
+
+    @Override
+    public String toString() {
+      return String.format("Application %s_%s", appId, attemptId);
+    }
+  }
+
   /**
    * Wrapper class to hold merged Shuffle related information for a specific shuffleMergeId
    * required for the shuffles of indeterminate stages.
@@ -1021,12 +1422,55 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     }
   }
 
+  /**
+   * Identifies one shuffle merge of an application attempt. It is made of the application id,
+   * the attempt number, the shuffle id and the shuffle merge id. Jackson serializes and
+   * deserializes it through the annotated constructor, which rejects a null {@code appId}.
+   */
+  public static class AppAttemptShuffleMergeId {
+    public final String appId;
+    public final int attemptId;
+    public final int shuffleId;
+    public final int shuffleMergeId;
+
+    @JsonCreator
+    public AppAttemptShuffleMergeId(
+        @JsonProperty("appId") String appId,
+        @JsonProperty("attemptId") int attemptId,
+        @JsonProperty("shuffleId") int shuffleId,
+        @JsonProperty("shuffleMergeId") int shuffleMergeId) {
+      Preconditions.checkArgument(appId != null, "app id is null");
+      this.appId = appId;
+      this.attemptId = attemptId;
+      this.shuffleId = shuffleId;
+      this.shuffleMergeId = shuffleMergeId;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (other == this) {
+        return true;
+      }
+      if (other == null || other.getClass() != getClass()) {
+        return false;
+      }
+      AppAttemptShuffleMergeId that = (AppAttemptShuffleMergeId) other;
+      return Objects.equals(appId, that.appId) &&
+          attemptId == that.attemptId &&
+          shuffleId == that.shuffleId &&
+          shuffleMergeId == that.shuffleMergeId;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(appId, attemptId, shuffleId, shuffleMergeId);
+    }
+
+    @Override
+    public String toString() {
+      return String.format("Application %s_%s shuffleId %s shuffleMergeId %s",
+          appId, attemptId, shuffleId, shuffleMergeId);
+    }
+  }
+
   /** Metadata tracked for an actively merged shuffle partition */
   public static class AppShufflePartitionInfo {
-
-    private final String appId;
-    private final int shuffleId;
-    private final int shuffleMergeId;
+    private final AppAttemptShuffleMergeId appAttemptShuffleMergeId;
     private final int reduceId;
     private final File dataFile;
     // The merged shuffle data file channel
@@ -1051,18 +1495,17 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     private boolean indexMetaUpdateFailed;
 
     AppShufflePartitionInfo(
-        String appId,
-        int shuffleId,
-        int shuffleMergeId,
+        AppAttemptShuffleMergeId appAttemptShuffleMergeId,
         int reduceId,
         File dataFile,
         MergeShuffleFile indexFile,
         MergeShuffleFile metaFile) throws IOException {
-      Preconditions.checkArgument(appId != null, "app id is null");
-      this.appId = appId;
-      this.shuffleId = shuffleId;
-      this.shuffleMergeId = shuffleMergeId;
+      this.appAttemptShuffleMergeId = appAttemptShuffleMergeId;
       this.reduceId = reduceId;
+      // Create FileOutputStream with append mode set to false by default.
+      // This ensures that the file is always overwritten and not appended to even after the
+      // service is restarted. This is required as non-finalized merged shuffle blocks will be
+      // discarded during service restart.
       this.dataChannel = new FileOutputStream(dataFile).getChannel();
       this.dataFile = dataFile;
       this.indexFile = indexFile;
@@ -1131,8 +1574,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
         this.lastChunkOffset = chunkOffset;
         indexMetaUpdateFailed = false;
       } catch (IOException ioe) {
-        logger.warn("{} shuffleId {} reduceId {} update to index/meta failed", appId,
-          shuffleId, reduceId);
+        logger.warn("{} reduceId {} update to index/meta failed",
+            appAttemptShuffleMergeId, reduceId);
         indexMetaUpdateFailed = true;
         // Any exception here is propagated to the caller and the caller can decide whether to
         // abort or not.
@@ -1173,6 +1616,9 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       // Get rid of any partial block data at the end of the file. This could either
       // be due to failure, or a request still being processed when the shuffle
       // merge gets finalized, or any exceptions while updating index/meta files.
+      logger.trace("{} reduceId {} truncating files data {} index {} meta {}",
+          appAttemptShuffleMergeId, reduceId, lastChunkOffset,
+          indexFile.getPos(), metaFile.getPos());
       dataChannel.truncate(lastChunkOffset);
       indexFile.getChannel().truncate(indexFile.getPos());
       metaFile.getChannel().truncate(metaFile.getPos());
@@ -1187,8 +1633,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
           }
         }
       } catch (IOException ioe) {
-        logger.warn("Error closing data channel for {} shuffleId {} shuffleMergeId {}"
-          + " reduceId {}", appId, shuffleId, shuffleMergeId, reduceId);
+        logger.warn("Error closing data channel for {} reduceId {}",
+            appAttemptShuffleMergeId, reduceId);
       }
       try {
         metaFile.close();
@@ -1196,8 +1642,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
           metaFile.delete();
         }
       } catch (IOException ioe) {
-        logger.warn("Error closing meta file for {} shuffleId {} shuffleMergeId {}"
-          + " reduceId {}", appId, shuffleId, shuffleMergeId, reduceId);
+        logger.warn("Error closing meta file for {} reduceId {}",
+            appAttemptShuffleMergeId, reduceId);
         }
       try {
         indexFile.close();
@@ -1205,15 +1651,17 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
           indexFile.delete();
         }
       } catch (IOException ioe) {
-        logger.warn("Error closing index file for {} shuffleId {} shuffleMergeId {}"
-          + " reduceId {}", appId, shuffleId, shuffleMergeId, reduceId);
+        logger.warn("Error closing index file for {} reduceId {}",
+            appAttemptShuffleMergeId, reduceId);
       }
     }
 
     @Override
     public String toString() {
-      return String.format("Application %s shuffleId %s shuffleMergeId %s reduceId %s",
-        appId, shuffleId, shuffleMergeId, reduceId);
+      return String.format("Application %s_%s shuffleId %s shuffleMergeId %s reduceId %s",
+          appAttemptShuffleMergeId.appId, appAttemptShuffleMergeId.attemptId,
+          appAttemptShuffleMergeId.shuffleId, appAttemptShuffleMergeId.shuffleMergeId,
+          reduceId);
     }
 
     @Override
@@ -1236,6 +1684,11 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       return dataChannel;
     }
 
+    /** Returns this partition's {@code mapTracker} bitmap; exposed only for tests. */
+    @VisibleForTesting
+    public RoaringBitmap getMapTracker() {
+      return this.mapTracker;
+    }
+
     @VisibleForTesting
     int getNumIOExceptions() {
       return numIOExceptions;
@@ -1245,11 +1698,25 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
   /**
    * Wraps all the information related to the merge directory of an application.
    */
-  private static class AppPathsInfo {
+  @VisibleForTesting
+  public static class AppPathsInfo {
 
+    @JsonFormat(shape = JsonFormat.Shape.ARRAY)
+    @JsonProperty("activeLocalDirs")
     private final String[] activeLocalDirs;
+    @JsonProperty("subDirsPerLocalDir")
     private final int subDirsPerLocalDir;
 
+    /**
+     * Jackson creator used when reading paths info back from the recovery DB. Tests can also
+     * call it directly. The directories and sub-directory count are stored as given.
+     *
+     * @param activeLocalDirs the local directories, serialized as a JSON array
+     * @param subDirsPerLocalDir number of sub-directories per local directory
+     */
+    @JsonCreator
+    public AppPathsInfo(
+        @JsonFormat(shape = JsonFormat.Shape.ARRAY)
+        @JsonProperty("activeLocalDirs") String[] activeLocalDirs,
+        @JsonProperty("subDirsPerLocalDir") int subDirsPerLocalDir) {
+      this.subDirsPerLocalDir = subDirsPerLocalDir;
+      this.activeLocalDirs = activeLocalDirs;
+    }
+
     private AppPathsInfo(
         String appId,
         String[] localDirs,
@@ -1270,13 +1737,30 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
           Arrays.toString(activeLocalDirs),subDirsPerLocalDir, appId);
       }
     }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      AppPathsInfo appPathsInfo = (AppPathsInfo) o;
+      return subDirsPerLocalDir == appPathsInfo.subDirsPerLocalDir &&
+          Arrays.equals(activeLocalDirs, appPathsInfo.activeLocalDirs);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(subDirsPerLocalDir) * 41 + Arrays.hashCode(activeLocalDirs);
+    }
   }
 
   /** Merged Shuffle related information tracked for a specific application attempt */
   public static class AppShuffleInfo {
 
-    private final String appId;
-    private final int attemptId;
+    @VisibleForTesting
+    final String appId;
+    @VisibleForTesting
+    final int attemptId;
     private final AppPathsInfo appPathsInfo;
     /**
      * 1. Key tracks shuffleId for an application
@@ -1295,6 +1779,11 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       shuffles = new ConcurrentHashMap<>();
     }
 
+    /** Returns the merge directory information of this application attempt; for tests. */
+    @VisibleForTesting
+    public AppPathsInfo getAppPathsInfo() {
+      return this.appPathsInfo;
+    }
+
     @VisibleForTesting
     public ConcurrentMap<Integer, AppShuffleMergePartitionsInfo> getShuffles() {
       return shuffles;
@@ -1305,8 +1794,8 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
      * @see [[org.apache.spark.storage.DiskBlockManager#getMergedShuffleFile(
      *      org.apache.spark.storage.BlockId, scala.Option)]]
      */
-    private String getFilePath(String filename) {
-      // TODO: [SPARK-33236] Change the message when this service is able to handle NM restart
+    @VisibleForTesting
+    String getFilePath(String filename) {
       String targetFile =
         ExecutorDiskUtils.getFilePath(
           appPathsInfo.activeLocalDirs,
@@ -1355,7 +1844,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
   }
 
   @VisibleForTesting
-  static class MergeShuffleFile {
+  public static class MergeShuffleFile {
     private final FileChannel channel;
     private final DataOutputStream dos;
     private long pos;
@@ -1369,13 +1858,6 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
       this.file = file;
     }
 
-    @VisibleForTesting
-    MergeShuffleFile(FileChannel channel, DataOutputStream dos) {
-      this.channel = channel;
-      this.dos = dos;
-      this.file = null;
-    }
-
     private void updatePos(long numBytes) {
       pos += numBytes;
     }
@@ -1397,7 +1879,7 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
     }
 
     @VisibleForTesting
-    DataOutputStream getDos() {
+    public DataOutputStream getDos() {
       return dos;
     }
 
diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
index 5d153ae6cde..eb2c1d9fa5c 100644
--- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
+++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadLocalRandom;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableMap;
 
 import org.apache.commons.io.FileUtils;
@@ -88,7 +89,7 @@ public class RemoteBlockPushResolverSuite {
     MapConfigProvider provider = new MapConfigProvider(
       ImmutableMap.of("spark.shuffle.push.server.minChunkSizeInMergedShuffleFile", "4"));
     conf = new TransportConf("shuffle", provider);
-    pushResolver = new RemoteBlockPushResolver(conf);
+    pushResolver = new RemoteBlockPushResolver(conf, null);
     registerExecutor(TEST_APP, prepareLocalDirs(localDirs, MERGE_DIRECTORY), MERGE_DIRECTORY_META);
   }
 
@@ -488,7 +489,7 @@ public class RemoteBlockPushResolverSuite {
   public void testCleanUpDirectory() throws IOException, InterruptedException {
     String testApp = "cleanUpDirectory";
     Semaphore deleted = new Semaphore(0);
-    pushResolver = new RemoteBlockPushResolver(conf) {
+    pushResolver = new RemoteBlockPushResolver(conf, null) {
       @Override
       void deleteExecutorDirs(AppShuffleInfo appShuffleInfo) {
         super.deleteExecutorDirs(appShuffleInfo);
@@ -886,12 +887,12 @@ public class RemoteBlockPushResolverSuite {
   public void testPushBlockFromPreviousAttemptIsRejected()
       throws IOException, InterruptedException {
     Semaphore closed = new Semaphore(0);
-    pushResolver = new RemoteBlockPushResolver(conf) {
+    pushResolver = new RemoteBlockPushResolver(conf, null) {
       @Override
-      void closeAndDeletePartitionFilesIfNeeded(
-        AppShuffleInfo appShuffleInfo,
-        boolean cleanupLocalDirs) {
-        super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, cleanupLocalDirs);
+      void closeAndDeletePartitionsIfNeeded(
+          AppShuffleInfo appShuffleInfo,
+          boolean cleanupLocalDirs) {
+        super.closeAndDeletePartitionsIfNeeded(appShuffleInfo, cleanupLocalDirs);
         closed.release();
       }
     };
@@ -983,12 +984,12 @@ public class RemoteBlockPushResolverSuite {
   public void testOngoingMergeOfBlockFromPreviousAttemptIsAborted()
       throws IOException, InterruptedException {
     Semaphore closed = new Semaphore(0);
-    pushResolver = new RemoteBlockPushResolver(conf) {
+    pushResolver = new RemoteBlockPushResolver(conf, null) {
       @Override
-      void closeAndDeletePartitionFilesIfNeeded(
+      void closeAndDeletePartitionsIfNeeded(
           AppShuffleInfo appShuffleInfo,
           boolean cleanupLocalDirs) {
-        super.closeAndDeletePartitionFilesIfNeeded(appShuffleInfo, cleanupLocalDirs);
+        super.closeAndDeletePartitionsIfNeeded(appShuffleInfo, cleanupLocalDirs);
         closed.release();
       }
     };
@@ -1145,10 +1146,12 @@ public class RemoteBlockPushResolverSuite {
   @Test
   public void testCleanupOlderShuffleMergeId() throws IOException, InterruptedException {
     Semaphore closed = new Semaphore(0);
-    pushResolver = new RemoteBlockPushResolver(conf) {
+    pushResolver = new RemoteBlockPushResolver(conf, null) {
       @Override
-      void closeAndDeletePartitionFiles(Map<Integer, AppShufflePartitionInfo> partitions) {
-        super.closeAndDeletePartitionFiles(partitions);
+      void closeAndDeleteOutdatedPartitions(
+          AppAttemptShuffleMergeId appAttemptShuffleMergeId,
+          Map<Integer, AppShufflePartitionInfo> partitions) {
+        super.closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId, partitions);
         closed.release();
       }
     };
@@ -1273,23 +1276,66 @@ public class RemoteBlockPushResolverSuite {
     removeApplication(TEST_APP);
   }
 
+
+  @Test
+  public void testJsonSerializationOfPushShufflePartitionInfo() throws IOException {
+    ObjectMapper objectMapper = new ObjectMapper();
+
+    // Round trip each recovery type through JSON and check that equality is preserved.
+    RemoteBlockPushResolver.AppAttemptId attemptId =
+      new RemoteBlockPushResolver.AppAttemptId("foo", 1);
+    String attemptIdJson = objectMapper.writeValueAsString(attemptId);
+    assertEquals(attemptId,
+      objectMapper.readValue(attemptIdJson, RemoteBlockPushResolver.AppAttemptId.class));
+
+    RemoteBlockPushResolver.AppPathsInfo pathsInfo =
+      new RemoteBlockPushResolver.AppPathsInfo(new String[]{"/foo", "/bar"}, 64);
+    String pathsInfoJson = objectMapper.writeValueAsString(pathsInfo);
+    assertEquals(pathsInfo,
+      objectMapper.readValue(pathsInfoJson, RemoteBlockPushResolver.AppPathsInfo.class));
+
+    RemoteBlockPushResolver.AppAttemptShuffleMergeId shuffleMergeId =
+      new RemoteBlockPushResolver.AppAttemptShuffleMergeId("foo", 1, 1, 1);
+    String shuffleMergeIdJson = objectMapper.writeValueAsString(shuffleMergeId);
+    assertEquals(shuffleMergeId, objectMapper.readValue(
+      shuffleMergeIdJson, RemoteBlockPushResolver.AppAttemptShuffleMergeId.class));
+
+    // The hard-coded JSON strings below pin the stored format for backwards-compatibility.
+    // They are not legacy yet, but they guard against accidental format changes. Numeric
+    // fields appear as quoted strings, which also exercises Jackson's string-to-int coercion.
+    String legacyAppAttemptIdJson = "{\"appId\": \"foo\", \"attemptId\":\"1\"}";
+    assertEquals(attemptId, objectMapper.readValue(
+      legacyAppAttemptIdJson, RemoteBlockPushResolver.AppAttemptId.class));
+    String legacyAppPathInfoJson =
+      "{\"activeLocalDirs\": [\"/foo\", \"/bar\"], \"subDirsPerLocalDir\":\"64\"}";
+    assertEquals(pathsInfo, objectMapper.readValue(
+      legacyAppPathInfoJson, RemoteBlockPushResolver.AppPathsInfo.class));
+    String legacyPartitionIdJson = "{\"appId\":\"foo\", \"attemptId\":\"1\", "
+      + "\"shuffleId\":\"1\", \"shuffleMergeId\":\"1\"}";
+    assertEquals(shuffleMergeId, objectMapper.readValue(
+      legacyPartitionIdJson, RemoteBlockPushResolver.AppAttemptShuffleMergeId.class));
+  }
+
   private void useTestFiles(boolean useTestIndexFile, boolean useTestMetaFile) throws IOException {
-    pushResolver = new RemoteBlockPushResolver(conf) {
+    pushResolver = new RemoteBlockPushResolver(conf, null) {
       @Override
       AppShufflePartitionInfo newAppShufflePartitionInfo(
-          String appId,
+          AppShuffleInfo appShuffleInfo,
           int shuffleId,
           int shuffleMergeId,
           int reduceId,
           File dataFile,
           File indexFile,
           File metaFile) throws IOException {
-        MergeShuffleFile mergedIndexFile = useTestIndexFile ? new TestMergeShuffleFile(indexFile)
-          : new MergeShuffleFile(indexFile);
-        MergeShuffleFile mergedMetaFile = useTestMetaFile ? new TestMergeShuffleFile(metaFile) :
-          new MergeShuffleFile(metaFile);
-        return new AppShufflePartitionInfo(appId, shuffleId, shuffleMergeId, reduceId, dataFile,
-          mergedIndexFile, mergedMetaFile);
+        MergeShuffleFile mergedIndexFile = useTestIndexFile ?
+          new TestMergeShuffleFile(indexFile)
+            : new MergeShuffleFile(indexFile);
+        MergeShuffleFile mergedMetaFile = useTestMetaFile ?
+          new TestMergeShuffleFile(metaFile) :
+            new MergeShuffleFile(metaFile);
+        return new AppShufflePartitionInfo(new AppAttemptShuffleMergeId(
+            appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId, shuffleMergeId), reduceId,
+            dataFile, mergedIndexFile, mergedMetaFile);
       }
     };
     registerExecutor(TEST_APP, prepareLocalDirs(localDirs, MERGE_DIRECTORY), MERGE_DIRECTORY_META);
@@ -1345,6 +1391,7 @@ public class RemoteBlockPushResolverSuite {
     assertEquals("num of bitmaps", meta.getNumChunks(), bitmaps.length);
     for (int i = 0; i < meta.getNumChunks(); i++) {
       RoaringBitmap chunkBitmap = bitmaps[i];
+      assertEquals("cardinality", expectedMapsPerChunk[i].length, chunkBitmap.getCardinality());
       Arrays.stream(expectedMapsPerChunk[i]).forEach(x -> assertTrue(chunkBitmap.contains(x)));
     }
     for (int i = 0; i < meta.getNumChunks(); i++) {
@@ -1389,7 +1436,7 @@ public class RemoteBlockPushResolverSuite {
     private FileChannel channel;
 
     private TestMergeShuffleFile(File file) throws IOException {
-      super(null, null);
+      super(file);
       this.file = file;
       FileOutputStream fos = new FileOutputStream(file);
       channel = fos.getChannel();
@@ -1397,7 +1444,7 @@ public class RemoteBlockPushResolverSuite {
     }
 
     @Override
-    DataOutputStream getDos() {
+    public DataOutputStream getDos() {
       return activeDos;
     }
 
diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
index f1b89413914..9295239f996 100644
--- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
+++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java
@@ -120,6 +120,8 @@ public class YarnShuffleService extends AuxiliaryService {
 
   private static final String RECOVERY_FILE_NAME = "registeredExecutors.ldb";
   private static final String SECRETS_RECOVERY_FILE_NAME = "sparkShuffleRecovery.ldb";
+  @VisibleForTesting
+  static final String SPARK_SHUFFLE_MERGE_RECOVERY_FILE_NAME = "sparkShuffleMergeRecovery.ldb";
 
   // Whether failure during service initialization should stop the NM.
   @VisibleForTesting
@@ -156,7 +158,8 @@ public class YarnShuffleService extends AuxiliaryService {
 
   private TransportContext transportContext = null;
 
-  private Configuration _conf = null;
+  @VisibleForTesting
+  Configuration _conf = null;
 
   // The recovery path used to shuffle service recovery
   @VisibleForTesting
@@ -166,6 +169,10 @@ public class YarnShuffleService extends AuxiliaryService {
   @VisibleForTesting
   ExternalBlockHandler blockHandler;
 
+  // Handles merged shuffle registration, push blocks and finalization
+  @VisibleForTesting
+  MergedShuffleFileManager shuffleMergeManager;
+
   // Where to store & reload executor info for recovering state after an NM restart
   @VisibleForTesting
   File registeredExecutorFile;
@@ -174,6 +181,10 @@ public class YarnShuffleService extends AuxiliaryService {
   @VisibleForTesting
   File secretsFile;
 
+  // Where to store & reload merge manager info for recovering state after an NM restart
+  @VisibleForTesting
+  File mergeManagerFile;
+
   private DB db;
 
   public YarnShuffleService() {
@@ -230,11 +241,16 @@ public class YarnShuffleService extends AuxiliaryService {
       // when it comes back
       if (_recoveryPath != null) {
         registeredExecutorFile = initRecoveryDb(RECOVERY_FILE_NAME);
+        mergeManagerFile = initRecoveryDb(SPARK_SHUFFLE_MERGE_RECOVERY_FILE_NAME);
       }
 
       TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(_conf));
-      MergedShuffleFileManager shuffleMergeManager = newMergedShuffleFileManagerInstance(
-        transportConf);
+      // Create a new MergedShuffleFileManager only if shuffleMergeManager is null. Unit tests
+      // may have already installed a customized MergedShuffleFileManager through the
+      // setShuffleMergeManager method.
+      if (shuffleMergeManager == null) {
+        shuffleMergeManager = newMergedShuffleFileManagerInstance(transportConf, mergeManagerFile);
+      }
       blockHandler = new ExternalBlockHandler(
         transportConf, registeredExecutorFile, shuffleMergeManager);
 
@@ -286,8 +302,18 @@ public class YarnShuffleService extends AuxiliaryService {
     }
   }
 
+  /**
+   * Installs a customized {@link MergedShuffleFileManager}; intended for unit tests only.
+   * When a manager is set before the service initializes its block handler, initialization
+   * uses it rather than creating one from the configured implementation class.
+   *
+   * @param mergeManager the merge manager the service should use
+   */
+  @VisibleForTesting
+  void setShuffleMergeManager(MergedShuffleFileManager mergeManager) {
+    shuffleMergeManager = mergeManager;
+  }
+
   @VisibleForTesting
-  static MergedShuffleFileManager newMergedShuffleFileManagerInstance(TransportConf conf) {
+  static MergedShuffleFileManager newMergedShuffleFileManagerInstance(
+      TransportConf conf, File mergeManagerFile) {
     String mergeManagerImplClassName = conf.mergedShuffleFileManagerImpl();
     try {
       Class<?> mergeManagerImplClazz = Class.forName(
@@ -296,10 +322,11 @@ public class YarnShuffleService extends AuxiliaryService {
         mergeManagerImplClazz.asSubclass(MergedShuffleFileManager.class);
       // The assumption is that all the custom implementations just like the RemoteBlockPushResolver
       // will also need the transport configuration.
-      return mergeManagerSubClazz.getConstructor(TransportConf.class).newInstance(conf);
+      return mergeManagerSubClazz.getConstructor(TransportConf.class, File.class)
+        .newInstance(conf, mergeManagerFile);
     } catch (Exception e) {
       defaultLogger.error("Unable to create an instance of {}", mergeManagerImplClassName);
-      return new NoOpMergedShuffleFileManager(conf);
+      return new NoOpMergedShuffleFileManager(conf, mergeManagerFile);
     }
   }
 
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala
index 77ee09de926..d67d372db2b 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/shuffle/ShuffleTestAccessor.scala
@@ -17,6 +17,8 @@
 package org.apache.spark.network.shuffle
 
 import java.io.File
+import java.nio.channels.FileChannel
+import java.util.List
 import java.util.concurrent.ConcurrentMap
 
 import org.apache.hadoop.yarn.api.records.ApplicationId
@@ -24,7 +26,9 @@ import org.fusesource.leveldbjni.JniDBFactory
 import org.iq80.leveldb.{DB, Options}
 
 import org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.AppExecId
-import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
+import org.apache.spark.network.shuffle.RemoteBlockPushResolver._
+import org.apache.spark.network.shuffle.protocol.{ExecutorShuffleInfo, FinalizeShuffleMerge}
+import org.apache.spark.network.util.TransportConf
 
 /**
  * just a cheat to get package-visible members in tests
@@ -44,14 +48,165 @@ object ShuffleTestAccessor {
     Option(resolver.executors.get(id))
   }
 
+  def getAppPathsInfo(
+      appId: String,
+      mergeManager: RemoteBlockPushResolver): Option[AppPathsInfo] = {
+    Option(mergeManager.appsShuffleInfo.get(appId)).flatMap(v => Option(v.getAppPathsInfo))
+  }
+
+  def getAppsShuffleInfo(
+      mergeManager: RemoteBlockPushResolver
+  ): ConcurrentMap[String, AppShuffleInfo] = {
+    mergeManager.appsShuffleInfo
+  }
+
   def registeredExecutorFile(resolver: ExternalShuffleBlockResolver): File = {
     resolver.registeredExecutorFile
   }
 
+  def recoveryFile(mergeManager: RemoteBlockPushResolver): File = {
+    mergeManager.recoveryFile
+  }
+
   def shuffleServiceLevelDB(resolver: ExternalShuffleBlockResolver): DB = {
     resolver.db
   }
 
+  def mergeManagerLevelDB(mergeManager: RemoteBlockPushResolver): DB = {
+    mergeManager.db
+  }
+
+  def createMergeManagerWithSynchronizedCleanup(
+      transportConf: TransportConf,
+      file: File): MergedShuffleFileManager = {
+    new RemoteBlockPushResolver(transportConf, file) {
+      override private[shuffle] def submitCleanupTask(task: Runnable): Unit = {
+        task.run()
+      }
+    }
+  }
+
+  def createMergeManagerWithNoOpAppShuffleDBCleanup(
+      transportConf: TransportConf,
+      file: File): MergedShuffleFileManager = {
+    new RemoteBlockPushResolver(transportConf, file) {
+      override private[shuffle] def removeAppShuffleInfoFromDB(
+          appShuffleInfo: RemoteBlockPushResolver.AppShuffleInfo): Unit = {
+        // NoOp
+      }
+      override private[shuffle] def submitCleanupTask(task: Runnable): Unit = {
+        task.run()
+      }
+    }
+  }
+
+  def createMergeManagerWithNoDBCleanup(
+      transportConf: TransportConf,
+      file: File): MergedShuffleFileManager = {
+    new RemoteBlockPushResolver(transportConf, file) {
+      override private[shuffle] def removeAppAttemptPathInfoFromDB(
+          appId: String, attemptId: Int): Unit = {
+        // NoOp: app attempt path entries are intentionally left in the DB
+      }
+      override private[shuffle] def removeAppShuffleInfoFromDB(
+          appShuffleInfo: RemoteBlockPushResolver.AppShuffleInfo): Unit = {
+        // NoOp: app shuffle entries are intentionally left in the DB
+      }
+      override private[shuffle] def submitCleanupTask(task: Runnable): Unit = {
+        task.run()
+      }
+    }
+  }
+
+  def createMergeManagerWithNoCleanupAfterReload(
+      transportConf: TransportConf,
+      file: File): MergedShuffleFileManager = {
+    new RemoteBlockPushResolver(transportConf, file) {
+      override private[shuffle] def removeOutdatedKeyValuesInDB(
+          dbKeysToBeRemoved: List[Array[Byte]]): Unit = {
+        // NoOp
+      }
+    }
+  }
+
+  def getOrCreateAppShufflePartitionInfo(
+      mergeManager: RemoteBlockPushResolver,
+      appShufflePartitionId: AppAttemptShuffleMergeId,
+      reduceId: Int,
+      blockId: String): AppShufflePartitionInfo = {
+    mergeManager.getOrCreateAppShufflePartitionInfo(
+      mergeManager.appsShuffleInfo.get(appShufflePartitionId.appId),
+      appShufflePartitionId.shuffleId, appShufflePartitionId.shuffleMergeId,
+      reduceId, blockId)
+  }
+
+  def finalizeShuffleMerge(
+      mergeManager: RemoteBlockPushResolver,
+      appAttemptShuffleMergeId: AppAttemptShuffleMergeId): Unit = {
+    mergeManager.finalizeShuffleMerge(
+      new FinalizeShuffleMerge(
+        appAttemptShuffleMergeId.appId, appAttemptShuffleMergeId.attemptId,
+        appAttemptShuffleMergeId.shuffleId, appAttemptShuffleMergeId.shuffleMergeId))
+  }
+
+  def getMergedShuffleDataFile(
+      mergeManager: RemoteBlockPushResolver,
+      appShufflePartitionId: AppAttemptShuffleMergeId,
+      reduceId: Int): File = {
+    mergeManager.appsShuffleInfo.get(appShufflePartitionId.appId)
+      .getMergedShuffleDataFile(appShufflePartitionId.shuffleId,
+        appShufflePartitionId.shuffleMergeId, reduceId)
+  }
+
+  def getMergedShuffleIndexFile(
+      mergeManager: RemoteBlockPushResolver,
+      appShufflePartitionId: AppAttemptShuffleMergeId,
+      reduceId: Int): File = {
+    new File(mergeManager.appsShuffleInfo.get(appShufflePartitionId.appId)
+      .getMergedShuffleIndexFilePath(appShufflePartitionId.shuffleId,
+        appShufflePartitionId.shuffleMergeId, reduceId))
+  }
+
+  def getMergedShuffleMetaFile(
+      mergeManager: RemoteBlockPushResolver,
+      appShufflePartitionId: AppAttemptShuffleMergeId,
+      reduceId: Int): File = {
+    mergeManager.appsShuffleInfo.get(appShufflePartitionId.appId)
+      .getMergedShuffleMetaFile(appShufflePartitionId.shuffleId,
+        appShufflePartitionId.shuffleMergeId, reduceId)
+  }
+
+  def getPartitionFileHandlers(
+      partitionInfo: AppShufflePartitionInfo):
+      (FileChannel, MergeShuffleFile, MergeShuffleFile) = {
+    (partitionInfo.getDataChannel, partitionInfo.getMetaFile, partitionInfo.getIndexFile)
+  }
+
+  def closePartitionFiles(partitionInfo: AppShufflePartitionInfo): Unit = {
+    partitionInfo.closeAllFilesAndDeleteIfNeeded(false)
+  }
+
+  def clearAppShuffleInfo(mergeMgr: RemoteBlockPushResolver): Unit = {
+    mergeMgr.appsShuffleInfo.clear()
+  }
+
+  def reloadAppShuffleInfo(
+      mergeMgr: RemoteBlockPushResolver, db: DB): ConcurrentMap[String, AppShuffleInfo] = {
+    mergeMgr.appsShuffleInfo.clear()
+    mergeMgr.reloadAndCleanUpAppShuffleInfo(db)
+    mergeMgr.appsShuffleInfo
+  }
+
+  def getOutdatedAppPathInfoCountDuringDBReload(
+      mergeMgr: RemoteBlockPushResolver, db: DB): Int = {
+    mergeMgr.reloadActiveAppAttemptsPathInfo(db).size()
+  }
+
+  def getOutdatedFinalizedShuffleCountDuringDBReload(
+      mergeMgr: RemoteBlockPushResolver, db: DB): Int = {
+    mergeMgr.reloadFinalizedAppAttemptsShuffleMergeInfo(db).size()
+  }
+
   def reloadRegisteredExecutors(
     file: File): ConcurrentMap[ExternalShuffleBlockResolver.AppExecId, ExecutorShuffleInfo] = {
     val options: Options = new Options
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
index dded1c77390..a23be6c6e70 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.network.yarn
 
 import java.io.{DataOutputStream, File, FileOutputStream, IOException}
 import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
 import java.nio.file.Files
 import java.nio.file.attribute.PosixFilePermission._
 import java.util.EnumSet
@@ -35,6 +36,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.server.api.{ApplicationInitializationContext, ApplicationTerminationContext}
 import org.mockito.Mockito.{mock, when}
+import org.roaringbitmap.RoaringBitmap
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.matchers.must.Matchers
 import org.scalatest.matchers.should.Matchers._
@@ -42,9 +44,12 @@ import org.scalatest.matchers.should.Matchers._
 import org.apache.spark.SecurityManager
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.config._
-import org.apache.spark.network.shuffle.{NoOpMergedShuffleFileManager, RemoteBlockPushResolver, ShuffleTestAccessor}
+import org.apache.spark.network.server.BlockPushNonFatalFailure
+import org.apache.spark.network.shuffle.{MergedShuffleFileManager, NoOpMergedShuffleFileManager, RemoteBlockPushResolver, ShuffleTestAccessor}
+import org.apache.spark.network.shuffle.RemoteBlockPushResolver._
 import org.apache.spark.network.shuffle.protocol.ExecutorShuffleInfo
 import org.apache.spark.network.util.TransportConf
+import org.apache.spark.network.yarn.util.HadoopConfigProvider
 import org.apache.spark.tags.ExtendedLevelDBTest
 import org.apache.spark.util.Utils
 
@@ -52,8 +57,18 @@ import org.apache.spark.util.Utils
 class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
   private[yarn] var yarnConfig: YarnConfiguration = null
   private[yarn] val SORT_MANAGER = "org.apache.spark.shuffle.sort.SortShuffleManager"
+  private[yarn] val SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1 =
+    "org.apache.spark.shuffle.sort.SortShuffleManager:" +
+      "{\"mergeDir\": \"merge_manager_1\", \"attemptId\": \"1\"}"
+  private[yarn] val SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID2 =
+    "org.apache.spark.shuffle.sort.SortShuffleManager:" +
+      "{\"mergeDir\": \"merge_manager_2\", \"attemptId\": \"2\"}"
+  private[yarn] val SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithNoAttemptID =
+    "org.apache.spark.shuffle.sort.SortShuffleManager:{\"mergeDir\": \"merge_manager\"}"
+  private val DUMMY_BLOCK_DATA = "dummyBlockData".getBytes(StandardCharsets.UTF_8)
 
   private var recoveryLocalDir: File = _
+  protected var tempDir: File = _
 
   override def beforeEach(): Unit = {
     super.beforeEach()
@@ -68,8 +83,11 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
     yarnConfig.setBoolean(YarnShuffleService.STOP_ON_FAILURE_KEY, true)
     val localDir = Utils.createTempDir()
     yarnConfig.set(YarnConfiguration.NM_LOCAL_DIRS, localDir.getAbsolutePath)
+    yarnConfig.set("spark.shuffle.push.server.mergedShuffleFileManagerImpl",
+      "org.apache.spark.network.shuffle.RemoteBlockPushResolver")
 
     recoveryLocalDir = Utils.createTempDir()
+    tempDir = Utils.createTempDir()
   }
 
   var s1: YarnShuffleService = null
@@ -95,36 +113,132 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
     }
   }
 
-  test("executor state kept across NM restart") {
-    s1 = new YarnShuffleService
-    s1.setRecoveryPath(new Path(recoveryLocalDir.toURI))
+  private def prepareAppShufflePartition(
+      mergeManager: RemoteBlockPushResolver,
+      partitionId: AppAttemptShuffleMergeId,
+      reduceId: Int,
+      blockId: String): AppShufflePartitionInfo = {
+    val dataFile = ShuffleTestAccessor.getMergedShuffleDataFile(mergeManager, partitionId, reduceId)
+    dataFile.getParentFile.mkdirs()
+    val indexFile =
+      ShuffleTestAccessor.getMergedShuffleIndexFile(mergeManager, partitionId, reduceId)
+    indexFile.getParentFile.mkdirs()
+    val metaFile = ShuffleTestAccessor.getMergedShuffleMetaFile(mergeManager, partitionId, reduceId)
+    metaFile.getParentFile.mkdirs()
+    val partitionInfo = ShuffleTestAccessor.getOrCreateAppShufflePartitionInfo(
+      mergeManager, partitionId, reduceId, blockId)
+
+    val (dataChannel, mergeMetaFile, mergeIndexFile) =
+      ShuffleTestAccessor.getPartitionFileHandlers(partitionInfo)
+    for (chunkId <- 1 to 5) {
+      (0 until 4).foreach(_ => dataChannel.write(ByteBuffer.wrap(DUMMY_BLOCK_DATA)))
+      mergeIndexFile.getDos.writeLong(chunkId * 4 * DUMMY_BLOCK_DATA.length - 1)
+      val bitmap = new RoaringBitmap
+      for (j <- (chunkId - 1) * 10 until chunkId * 10) {
+        bitmap.add(j)
+      }
+      bitmap.serialize(mergeMetaFile.getDos())
+    }
+    dataChannel.write(ByteBuffer.wrap(DUMMY_BLOCK_DATA))
+    ShuffleTestAccessor.closePartitionFiles(partitionInfo)
+
+    partitionInfo
+  }
+
+  private def createYarnShuffleService(init: Boolean = true): YarnShuffleService = {
+    val shuffleService = new YarnShuffleService
+    shuffleService.setRecoveryPath(new Path(recoveryLocalDir.toURI))
+    shuffleService._conf = yarnConfig
+    if (init) {
+      shuffleService.init(yarnConfig)
+    }
+    shuffleService
+  }
+
+  private def createYarnShuffleServiceWithCustomMergeManager(
+      createMergeManager: (TransportConf, File) => MergedShuffleFileManager): YarnShuffleService = {
+    val shuffleService = createYarnShuffleService(init = false)
+    val transportConf = new TransportConf("shuffle", new HadoopConfigProvider(yarnConfig))
+    val testShuffleMergeManager = createMergeManager(
+      transportConf,
+      shuffleService.initRecoveryDb(YarnShuffleService.SPARK_SHUFFLE_MERGE_RECOVERY_FILE_NAME))
+    shuffleService.setShuffleMergeManager(testShuffleMergeManager)
+    shuffleService.init(yarnConfig)
+    shuffleService
+  }
+
+  test("executor and merged shuffle state kept across NM restart") {
     // set auth to true to test the secrets recovery
     yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, true)
-    s1.init(yarnConfig)
+    s1 = createYarnShuffleService()
     val app1Id = ApplicationId.newInstance(0, 1)
     val app1Data = makeAppInfo("user", app1Id)
     s1.initializeApplication(app1Data)
     val app2Id = ApplicationId.newInstance(0, 2)
     val app2Data = makeAppInfo("user", app2Id)
     s1.initializeApplication(app2Data)
+    val app3Id = ApplicationId.newInstance(0, 3)
+    val app3Data = makeAppInfo("user", app3Id)
+    s1.initializeApplication(app3Data)
+    val app4Id = ApplicationId.newInstance(0, 4)
+    val app4Data = makeAppInfo("user", app4Id)
+    s1.initializeApplication(app4Data)
 
     val execStateFile = s1.registeredExecutorFile
     execStateFile should not be (null)
     val secretsFile = s1.secretsFile
     secretsFile should not be (null)
+    val mergeMgrFile = s1.mergeManagerFile
+    mergeMgrFile should not be (null)
     val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
     val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
+    val mergedShuffleInfo3 =
+      new ExecutorShuffleInfo(
+        Array(new File(tempDir, "foo/foo").getAbsolutePath,
+          new File(tempDir, "bar/bar").getAbsolutePath), 3,
+        SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+    val mergedShuffleInfo4 =
+      new ExecutorShuffleInfo(Array(new File(tempDir, "bippy/bippy").getAbsolutePath),
+        5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
 
     val blockHandler = s1.blockHandler
     val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
     ShuffleTestAccessor.registeredExecutorFile(blockResolver) should be (execStateFile)
 
+    val mergeManager = s1.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
+    ShuffleTestAccessor.recoveryFile(mergeManager) should be (mergeMgrFile)
+
     blockResolver.registerExecutor(app1Id.toString, "exec-1", shuffleInfo1)
     blockResolver.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2)
+    blockResolver.registerExecutor(app3Id.toString, "exec-3", mergedShuffleInfo3)
+    blockResolver.registerExecutor(app4Id.toString, "exec-4", mergedShuffleInfo4)
     ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", blockResolver) should
       be (Some(shuffleInfo1))
     ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", blockResolver) should
       be (Some(shuffleInfo2))
+    ShuffleTestAccessor.getExecutorInfo(app3Id, "exec-3", blockResolver) should
+      be (Some(mergedShuffleInfo3))
+    ShuffleTestAccessor.getExecutorInfo(app4Id, "exec-4", blockResolver) should
+      be (Some(mergedShuffleInfo4))
+
+    mergeManager.registerExecutor(app3Id.toString, mergedShuffleInfo3)
+    mergeManager.registerExecutor(app4Id.toString, mergedShuffleInfo4)
+
+    val localDirs3 = Array(new File(tempDir, "foo/merge_manager_1").getAbsolutePath,
+      new File(tempDir, "bar/merge_manager_1").getAbsolutePath)
+    val localDirs4 = Array(new File(tempDir, "bippy/merge_manager_1").getAbsolutePath)
+    val appPathsInfo3 = new AppPathsInfo(localDirs3, 3)
+    val appPathsInfo4 = new AppPathsInfo(localDirs4, 5)
+
+    ShuffleTestAccessor.getAppPathsInfo(app3Id.toString, mergeManager) should
+      be (Some(appPathsInfo3))
+    ShuffleTestAccessor.getAppPathsInfo(app4Id.toString, mergeManager) should
+      be (Some(appPathsInfo4))
+
+    val partitionId3 = new AppAttemptShuffleMergeId(app3Id.toString, 1, 1, 1)
+    val partitionId4 = new AppAttemptShuffleMergeId(app4Id.toString, 1, 2, 1)
+    prepareAppShufflePartition(mergeManager, partitionId3, 1, "3")
+    prepareAppShufflePartition(mergeManager, partitionId4, 2, "4")
 
     if (!execStateFile.exists()) {
       @tailrec def findExistingParent(file: File): File = {
@@ -136,48 +250,80 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
       assert(false, s"$execStateFile does not exist -- closest existing parent is $existingParent")
     }
     assert(execStateFile.exists(), s"$execStateFile did not exist")
+    assert(mergeMgrFile.exists(), s"$mergeMgrFile did not exist")
 
     // now we pretend the shuffle service goes down, and comes back up
     s1.stop()
-    s2 = new YarnShuffleService
-    s2.setRecoveryPath(new Path(recoveryLocalDir.toURI))
-    s2.init(yarnConfig)
+    s2 = createYarnShuffleService()
     s2.secretsFile should be (secretsFile)
     s2.registeredExecutorFile should be (execStateFile)
+    s2.mergeManagerFile should be (mergeMgrFile)
 
     val handler2 = s2.blockHandler
     val resolver2 = ShuffleTestAccessor.getBlockResolver(handler2)
+    val mergeManager2 = s2.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
 
-    // now we reinitialize only one of the apps, and expect yarn to tell us that app2 was stopped
-    // during the restart
+    // now we reinitialize only two of the apps, and expect yarn to tell us that the other two apps
+    // were stopped during the restart
     s2.initializeApplication(app1Data)
+    s2.initializeApplication(app3Data)
     s2.stopApplication(new ApplicationTerminationContext(app2Id))
+    s2.stopApplication(new ApplicationTerminationContext(app4Id))
     ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", resolver2) should be (Some(shuffleInfo1))
     ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver2) should be (None)
+    ShuffleTestAccessor
+      .getExecutorInfo(app3Id, "exec-3", resolver2) should be (Some(mergedShuffleInfo3))
+    ShuffleTestAccessor.getExecutorInfo(app4Id, "exec-4", resolver2) should be (None)
+    ShuffleTestAccessor
+      .getAppPathsInfo(app3Id.toString, mergeManager2) should be (Some(appPathsInfo3))
+    ShuffleTestAccessor.getAppPathsInfo(app4Id.toString, mergeManager2) should be (None)
+
+    val dataFileReload3 =
+      ShuffleTestAccessor.getMergedShuffleDataFile(mergeManager2, partitionId3, 1)
+    dataFileReload3.length() should be ((4 * 5 + 1) * DUMMY_BLOCK_DATA.length)
+
+    // Regenerate the merge partitions as it was not finalized before the restart
+    prepareAppShufflePartition(mergeManager2, partitionId3, 1, "3")
+    // Finalize shuffle merge for partitionId3
+    ShuffleTestAccessor.finalizeShuffleMerge(mergeManager2, partitionId3)
 
     // Act like the NM restarts one more time
     s2.stop()
-    s3 = new YarnShuffleService
-    s3.setRecoveryPath(new Path(recoveryLocalDir.toURI))
-    s3.init(yarnConfig)
+    s3 = createYarnShuffleService()
     s3.registeredExecutorFile should be (execStateFile)
     s3.secretsFile should be (secretsFile)
+    s3.mergeManagerFile should be (mergeMgrFile)
 
     val handler3 = s3.blockHandler
     val resolver3 = ShuffleTestAccessor.getBlockResolver(handler3)
+    val mergeManager3 = s3.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
 
-    // app1 is still running
+    // app1 and app3 are still running
     s3.initializeApplication(app1Data)
     ShuffleTestAccessor.getExecutorInfo(app1Id, "exec-1", resolver3) should be (Some(shuffleInfo1))
     ShuffleTestAccessor.getExecutorInfo(app2Id, "exec-2", resolver3) should be (None)
+    ShuffleTestAccessor
+      .getExecutorInfo(app3Id, "exec-3", resolver3) should be (Some(mergedShuffleInfo3))
+    ShuffleTestAccessor.getExecutorInfo(app4Id, "exec-4", resolver3) should be (None)
+    ShuffleTestAccessor
+      .getAppPathsInfo(app3Id.toString, mergeManager3) should be (Some(appPathsInfo3))
+    ShuffleTestAccessor.getAppPathsInfo(app4Id.toString, mergeManager3) should be (None)
+
+    val error = intercept[BlockPushNonFatalFailure] {
+      ShuffleTestAccessor.getOrCreateAppShufflePartitionInfo(
+        mergeManager3, partitionId3, 2, "3")
+    }
+    assert(error.getMessage.contains("is finalized"))
+
+    val dataFileReload3Again =
+      ShuffleTestAccessor.getMergedShuffleDataFile(mergeManager3, partitionId3, 1)
+    dataFileReload3Again.length() should be ((4 * 5 + 1) * DUMMY_BLOCK_DATA.length)
     s3.stop()
   }
 
-  test("removed applications should not be in registered executor file") {
-    s1 = new YarnShuffleService
-    s1.setRecoveryPath(new Path(recoveryLocalDir.toURI))
-    yarnConfig.setBoolean(SecurityManager.SPARK_AUTH_CONF, false)
-    s1.init(yarnConfig)
+  test("removed applications should not be in registered executor file and merged shuffle file") {
+    s1 = createYarnShuffleServiceWithCustomMergeManager(
+      ShuffleTestAccessor.createMergeManagerWithSynchronizedCleanup)
     val secretsFile = s1.secretsFile
     secretsFile should be (null)
     val app1Id = ApplicationId.newInstance(0, 1)
@@ -186,32 +332,63 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
     val app2Id = ApplicationId.newInstance(0, 2)
     val app2Data = makeAppInfo("user", app2Id)
     s1.initializeApplication(app2Data)
+    val app3Id = ApplicationId.newInstance(0, 3)
+    val app3Data = makeAppInfo("user", app3Id)
+    s1.initializeApplication(app3Data)
+    val app4Id = ApplicationId.newInstance(0, 4)
+    val app4Data = makeAppInfo("user", app4Id)
+    s1.initializeApplication(app4Data)
 
     val execStateFile = s1.registeredExecutorFile
     execStateFile should not be (null)
     val shuffleInfo1 = new ExecutorShuffleInfo(Array("/foo", "/bar"), 3, SORT_MANAGER)
     val shuffleInfo2 = new ExecutorShuffleInfo(Array("/bippy"), 5, SORT_MANAGER)
+    val mergedShuffleInfo3 =
+      new ExecutorShuffleInfo(
+        Array(new File(tempDir, "foo/foo").getAbsolutePath,
+          new File(tempDir, "bar/bar").getAbsolutePath),
+      3, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+    val mergedShuffleInfo4 =
+      new ExecutorShuffleInfo(Array(new File(tempDir, "bippy/bippy").getAbsolutePath),
+        5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
 
     val blockHandler = s1.blockHandler
     val blockResolver = ShuffleTestAccessor.getBlockResolver(blockHandler)
     ShuffleTestAccessor.registeredExecutorFile(blockResolver) should be (execStateFile)
 
+    val mergeMgrFile = s1.mergeManagerFile
+    val mergeManager = s1.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
+    ShuffleTestAccessor.recoveryFile(mergeManager) should be (mergeMgrFile)
+
     blockResolver.registerExecutor(app1Id.toString, "exec-1", shuffleInfo1)
     blockResolver.registerExecutor(app2Id.toString, "exec-2", shuffleInfo2)
+    blockResolver.registerExecutor(app3Id.toString, "exec-3", mergedShuffleInfo3)
+    blockResolver.registerExecutor(app4Id.toString, "exec-4", mergedShuffleInfo4)
+    mergeManager.registerExecutor(app3Id.toString, mergedShuffleInfo3)
+    mergeManager.registerExecutor(app4Id.toString, mergedShuffleInfo4)
+
+    val partitionId3 = new AppAttemptShuffleMergeId(app3Id.toString, 1, 1, 1)
+    val partitionId4 = new AppAttemptShuffleMergeId(app4Id.toString, 1, 2, 1)
+    prepareAppShufflePartition(mergeManager, partitionId3, 1, "3")
+    prepareAppShufflePartition(mergeManager, partitionId4, 2, "4")
 
-    val db = ShuffleTestAccessor.shuffleServiceLevelDB(blockResolver)
-    ShuffleTestAccessor.reloadRegisteredExecutors(db) should not be empty
+    val blockResolverDB = ShuffleTestAccessor.shuffleServiceLevelDB(blockResolver)
+    ShuffleTestAccessor.reloadRegisteredExecutors(blockResolverDB) should not be empty
+    val mergeManagerDB = ShuffleTestAccessor.mergeManagerLevelDB(mergeManager)
+    ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager, mergeManagerDB) should not be empty
 
     s1.stopApplication(new ApplicationTerminationContext(app1Id))
-    ShuffleTestAccessor.reloadRegisteredExecutors(db) should not be empty
+    ShuffleTestAccessor.reloadRegisteredExecutors(blockResolverDB) should not be empty
+    ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager, mergeManagerDB) should not be empty
     s1.stopApplication(new ApplicationTerminationContext(app2Id))
-    ShuffleTestAccessor.reloadRegisteredExecutors(db) shouldBe empty
+    s1.stopApplication(new ApplicationTerminationContext(app3Id))
+    s1.stopApplication(new ApplicationTerminationContext(app4Id))
+    ShuffleTestAccessor.reloadRegisteredExecutors(blockResolverDB) shouldBe empty
+    ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager, mergeManagerDB) shouldBe empty
   }
 
   test("shuffle service should be robust to corrupt registered executor file") {
-    s1 = new YarnShuffleService
-    s1.setRecoveryPath(new Path(recoveryLocalDir.toURI))
-    s1.init(yarnConfig)
+    s1 = createYarnShuffleService()
     val app1Id = ApplicationId.newInstance(0, 1)
     val app1Data = makeAppInfo("user", app1Id)
     s1.initializeApplication(app1Data)
@@ -235,9 +412,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
     out.writeInt(42)
     out.close()
 
-    s2 = new YarnShuffleService
-    s2.setRecoveryPath(new Path(recoveryLocalDir.toURI))
-    s2.init(yarnConfig)
+    s2 = createYarnShuffleService()
     s2.registeredExecutorFile should be (execStateFile)
 
     val handler2 = s2.blockHandler
@@ -255,9 +430,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
     s2.stop()
 
     // another stop & restart should be fine though (e.g., we recover from previous corruption)
-    s3 = new YarnShuffleService
-    s3.setRecoveryPath(new Path(recoveryLocalDir.toURI))
-    s3.init(yarnConfig)
+    s3 = createYarnShuffleService()
     s3.registeredExecutorFile should be (execStateFile)
     val handler3 = s3.blockHandler
     val resolver3 = ShuffleTestAccessor.getBlockResolver(handler3)
@@ -380,6 +553,453 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
     }
   }
 
+  test("Consistency in AppPathInfo between in-memory hashmap and the DB") {
+    s1 = createYarnShuffleService()
+    // Every register/remove is checked in the in-memory map and again after reloading from DB.
+    val app1Id = ApplicationId.newInstance(0, 1)
+    val app1Data = makeAppInfo("user", app1Id)
+    s1.initializeApplication(app1Data)
+    val app2Attempt1Id = ApplicationId.newInstance(0, 2)
+    val app2Attempt1Data = makeAppInfo("user", app2Attempt1Id)
+    s1.initializeApplication(app2Attempt1Data)
+    val app2Attempt2Id = ApplicationId.newInstance(0, 2) // same appId as app2Attempt1Id
+    val app2Attempt2Data = makeAppInfo("user", app2Attempt2Id)
+    s1.initializeApplication(app2Attempt2Data)
+    val app3IdNoAttemptId = ApplicationId.newInstance(0, 3)
+    val app3NoAttemptIdData = makeAppInfo("user", app3IdNoAttemptId)
+    s1.initializeApplication(app3NoAttemptIdData)
+
+    val mergeMgrFile = s1.mergeManagerFile
+    mergeMgrFile should not be (null)
+    val mergedShuffleInfo1 =
+      new ExecutorShuffleInfo(
+        Array(new File(tempDir, "foo/foo").getAbsolutePath,
+          new File(tempDir, "bar/bar").getAbsolutePath), 3,
+        SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+    val mergedShuffleInfo2Attempt1 =
+      new ExecutorShuffleInfo(Array(new File(tempDir, "bippy1/bippy1").getAbsolutePath),
+        5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+    val mergedShuffleInfo2Attempt2 =
+      new ExecutorShuffleInfo(Array(new File(tempDir, "bippy2/bippy2").getAbsolutePath),
+        5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID2)
+    val mergedShuffleInfo3NoAttemptId =
+      new ExecutorShuffleInfo(
+        Array(new File(tempDir, "foo/foo").getAbsolutePath,
+          new File(tempDir, "bar/bar").getAbsolutePath),
+        4, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithNoAttemptID)
+
+    val localDirs1 = Array(new File(tempDir, "foo/merge_manager_1").getAbsolutePath,
+      new File(tempDir, "bar/merge_manager_1").getAbsolutePath)
+    val localDirs2Attempt1 = Array(new File(tempDir, "bippy1/merge_manager_1").getAbsolutePath)
+    val localDirs2Attempt2 = Array(new File(tempDir, "bippy2/merge_manager_2").getAbsolutePath)
+    val localDirs3NoAttempt = Array(new File(tempDir, "foo/merge_manager").getAbsolutePath,
+      new File(tempDir, "bar/merge_manager").getAbsolutePath)
+    val appPathsInfo1 = new AppPathsInfo(localDirs1, 3)
+    val appPathsInfo2Attempt1 = new AppPathsInfo(localDirs2Attempt1, 5)
+    val appPathsInfo2Attempt2 = new AppPathsInfo(localDirs2Attempt2, 5)
+    val appPathsInfo3NoAttempt = new AppPathsInfo(localDirs3NoAttempt, 4)
+
+    val mergeManager1 = s1.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
+    val mergeManager1DB = ShuffleTestAccessor.mergeManagerLevelDB(mergeManager1)
+    ShuffleTestAccessor.recoveryFile(mergeManager1) should be (mergeMgrFile)
+
+    ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1).size() shouldBe 0
+    ShuffleTestAccessor.reloadAppShuffleInfo(
+      mergeManager1, mergeManager1DB).size() shouldBe 0
+
+    mergeManager1.registerExecutor(app1Id.toString, mergedShuffleInfo1)
+    var appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() shouldBe 1
+    appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be (appPathsInfo1)
+    var appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    appShuffleInfoAfterReload.size() shouldBe 1
+    appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be (appPathsInfo1)
+
+    mergeManager1.registerExecutor(app2Attempt1Id.toString, mergedShuffleInfo2Attempt1)
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() shouldBe 2
+    appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be (appPathsInfo1)
+    appShuffleInfo.get(
+      app2Attempt1Id.toString).getAppPathsInfo should be (appPathsInfo2Attempt1)
+    appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    appShuffleInfoAfterReload.size() shouldBe 2
+    appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be (appPathsInfo1)
+    appShuffleInfoAfterReload.get(
+      app2Attempt1Id.toString).getAppPathsInfo should be (appPathsInfo2Attempt1)
+
+    mergeManager1.registerExecutor(app3IdNoAttemptId.toString, mergedShuffleInfo3NoAttemptId)
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() shouldBe 3
+    appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be (appPathsInfo1)
+    appShuffleInfo.get(
+      app2Attempt1Id.toString).getAppPathsInfo should be (appPathsInfo2Attempt1)
+    appShuffleInfo.get(
+      app3IdNoAttemptId.toString).getAppPathsInfo should be (appPathsInfo3NoAttempt)
+    appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    appShuffleInfoAfterReload.size() shouldBe 3
+    appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be (appPathsInfo1)
+    appShuffleInfoAfterReload.get(
+      app2Attempt1Id.toString).getAppPathsInfo should be (appPathsInfo2Attempt1)
+    appShuffleInfoAfterReload.get(
+      app3IdNoAttemptId.toString).getAppPathsInfo should be (appPathsInfo3NoAttempt)
+    // app2 attempt 2 shares app2's appId, so it replaces attempt 1 and the count stays at 3.
+    mergeManager1.registerExecutor(app2Attempt2Id.toString, mergedShuffleInfo2Attempt2)
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() shouldBe 3
+    appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be (appPathsInfo1)
+    appShuffleInfo.get(
+      app2Attempt2Id.toString).getAppPathsInfo should be (appPathsInfo2Attempt2)
+    appShuffleInfo.get(
+      app3IdNoAttemptId.toString).getAppPathsInfo should be (appPathsInfo3NoAttempt)
+    appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    appShuffleInfoAfterReload.size() shouldBe 3
+    appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be (appPathsInfo1)
+    appShuffleInfoAfterReload.get(
+      app2Attempt2Id.toString).getAppPathsInfo should be (appPathsInfo2Attempt2)
+    appShuffleInfoAfterReload.get(
+      app3IdNoAttemptId.toString).getAppPathsInfo should be (appPathsInfo3NoAttempt)
+
+    mergeManager1.applicationRemoved(app2Attempt2Id.toString, true)
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() shouldBe 2
+    appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be (appPathsInfo1)
+    assert(!appShuffleInfo.containsKey(app2Attempt2Id.toString))
+    appShuffleInfo.get(
+      app3IdNoAttemptId.toString).getAppPathsInfo should be (appPathsInfo3NoAttempt)
+    appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    appShuffleInfoAfterReload.size() shouldBe 2
+    appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be (appPathsInfo1)
+    assert(!appShuffleInfoAfterReload.containsKey(app2Attempt2Id.toString))
+    appShuffleInfoAfterReload.get(
+      app3IdNoAttemptId.toString).getAppPathsInfo should be (appPathsInfo3NoAttempt)
+
+    s1.stop()
+  }
+
+  test("Finalized merged shuffle are written into DB and cleaned up after application stopped") {
+    s1 = createYarnShuffleService()
+    // Finalized shuffle merges must survive a DB reload and be dropped once the app is removed.
+    val app1Id = ApplicationId.newInstance(0, 1)
+    val app1Data = makeAppInfo("user", app1Id)
+    s1.initializeApplication(app1Data)
+    val app2Attempt1Id = ApplicationId.newInstance(0, 2)
+    val app2Attempt1Data = makeAppInfo("user", app2Attempt1Id)
+    s1.initializeApplication(app2Attempt1Data)
+
+    val mergeMgrFile = s1.mergeManagerFile
+    mergeMgrFile should not be (null)
+    val mergedShuffleInfo1 =
+      new ExecutorShuffleInfo(
+        Array(new File(tempDir, "foo/foo").getAbsolutePath,
+          new File(tempDir, "bar/bar").getAbsolutePath), 3,
+        SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+    val mergedShuffleInfo2Attempt1 =
+      new ExecutorShuffleInfo(Array(new File(tempDir, "bippy1/bippy1").getAbsolutePath),
+        5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+
+    val localDirs1 = Array(new File(tempDir, "foo/merge_manager_1").getAbsolutePath,
+      new File(tempDir, "bar/merge_manager_1").getAbsolutePath)
+    val localDirs2Attempt1 = Array(new File(tempDir, "bippy1/merge_manager_1").getAbsolutePath)
+    val appPathsInfo1 = new AppPathsInfo(localDirs1, 3)
+    val appPathsInfo2Attempt1 = new AppPathsInfo(localDirs2Attempt1, 5)
+
+    val mergeManager1 = s1.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
+    val mergeManager1DB = ShuffleTestAccessor.mergeManagerLevelDB(mergeManager1)
+    ShuffleTestAccessor.recoveryFile(mergeManager1) should be (mergeMgrFile)
+
+    ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1).size() shouldBe 0
+    ShuffleTestAccessor.reloadAppShuffleInfo(
+      mergeManager1, mergeManager1DB).size() shouldBe 0
+
+    mergeManager1.registerExecutor(app1Id.toString, mergedShuffleInfo1)
+    mergeManager1.registerExecutor(app2Attempt1Id.toString, mergedShuffleInfo2Attempt1)
+    val partitionId1 = new AppAttemptShuffleMergeId(app1Id.toString, 1, 1, 1)
+    val partitionId2 = new AppAttemptShuffleMergeId(app2Attempt1Id.toString, 1, 2, 1)
+    prepareAppShufflePartition(mergeManager1, partitionId1, 1, "3")
+    prepareAppShufflePartition(mergeManager1, partitionId2, 2, "4")
+
+    var appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() shouldBe 2
+    appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be (appPathsInfo1)
+    appShuffleInfo.get(
+      app2Attempt1Id.toString).getAppPathsInfo should be (appPathsInfo2Attempt1)
+    assert(!appShuffleInfo.get(app1Id.toString).getShuffles.get(1).isFinalized)
+    assert(!appShuffleInfo.get(app2Attempt1Id.toString).getShuffles.get(2).isFinalized)
+    var appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    appShuffleInfoAfterReload.size() shouldBe 2
+    appShuffleInfoAfterReload.get(app1Id.toString).getAppPathsInfo should be (appPathsInfo1)
+    appShuffleInfoAfterReload.get(
+      app2Attempt1Id.toString).getAppPathsInfo should be (appPathsInfo2Attempt1)
+    assert(appShuffleInfoAfterReload.get(app1Id.toString).getShuffles.isEmpty)
+    assert(appShuffleInfoAfterReload.get(app2Attempt1Id.toString).getShuffles.isEmpty)
+    // Only after finalization should the shuffle merge info be reloadable from the DB.
+    ShuffleTestAccessor.finalizeShuffleMerge(mergeManager1, partitionId1)
+    ShuffleTestAccessor.finalizeShuffleMerge(mergeManager1, partitionId2)
+
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    assert(appShuffleInfo.get(app1Id.toString).getShuffles.get(1).isFinalized)
+    assert(appShuffleInfo.get(app2Attempt1Id.toString).getShuffles.get(2).isFinalized)
+    appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    assert(appShuffleInfoAfterReload.get(app1Id.toString).getShuffles.get(1).isFinalized)
+    assert(appShuffleInfoAfterReload.get(app2Attempt1Id.toString).getShuffles.get(2).isFinalized)
+
+    mergeManager1.applicationRemoved(app1Id.toString, true)
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() shouldBe 1
+    assert(!appShuffleInfo.containsKey(app1Id.toString))
+    assert(appShuffleInfo.get(app2Attempt1Id.toString).getShuffles.get(2).isFinalized)
+    appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    appShuffleInfoAfterReload.size() shouldBe 1
+    assert(!appShuffleInfoAfterReload.containsKey(app1Id.toString))
+    assert(appShuffleInfoAfterReload.get(app2Attempt1Id.toString).getShuffles.get(2).isFinalized)
+
+    s1.stop()
+  }
+
+  test("Dangling finalized merged partition info in DB will be removed during restart") {
+    s1 = createYarnShuffleServiceWithCustomMergeManager(
+      ShuffleTestAccessor.createMergeManagerWithNoOpAppShuffleDBCleanup)
+
+    val app1Id = ApplicationId.newInstance(0, 1)
+    val app1Data = makeAppInfo("user", app1Id)
+    s1.initializeApplication(app1Data)
+    val app2Id = ApplicationId.newInstance(0, 2)
+    val app2Attempt1Data = makeAppInfo("user", app2Id)
+    s1.initializeApplication(app2Attempt1Data)
+
+    val mergeMgrFile = s1.mergeManagerFile
+    mergeMgrFile should not be (null)
+    val mergedShuffleInfo1 =
+      new ExecutorShuffleInfo(
+        Array(new File(tempDir, "foo/foo").getAbsolutePath,
+          new File(tempDir, "bar/bar").getAbsolutePath), 3,
+        SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+    val mergedShuffleInfo2Attempt1 =
+      new ExecutorShuffleInfo(Array(new File(tempDir, "bippy1/bippy1").getAbsolutePath),
+        5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+
+    val localDirs1 = Array(new File(tempDir, "foo/merge_manager_1").getAbsolutePath,
+      new File(tempDir, "bar/merge_manager_1").getAbsolutePath)
+    val localDirs2Attempt1 = Array(new File(tempDir, "bippy1/merge_manager_1").getAbsolutePath)
+    // Only attempt 1 of app2 is registered in this test.
+    val appPathsInfo1 = new AppPathsInfo(localDirs1, 3)
+    val appPathsInfo2Attempt1 = new AppPathsInfo(localDirs2Attempt1, 5)
+
+    val mergeManager1 = s1.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
+    val mergeManager1DB = ShuffleTestAccessor.mergeManagerLevelDB(mergeManager1)
+    ShuffleTestAccessor.recoveryFile(mergeManager1) should be (mergeMgrFile)
+
+    mergeManager1.registerExecutor(app1Id.toString, mergedShuffleInfo1)
+    mergeManager1.registerExecutor(app2Id.toString, mergedShuffleInfo2Attempt1)
+    val partitionId1 = new AppAttemptShuffleMergeId(app1Id.toString, 1, 1, 1)
+    val partitionId2 = new AppAttemptShuffleMergeId(app2Id.toString, 1, 2, 1)
+    prepareAppShufflePartition(mergeManager1, partitionId1, 1, "3")
+    prepareAppShufflePartition(mergeManager1, partitionId2, 2, "4")
+
+    var appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() shouldBe 2
+    appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be (appPathsInfo1)
+    appShuffleInfo.get(
+      app2Id.toString).getAppPathsInfo should be (appPathsInfo2Attempt1)
+    assert(!appShuffleInfo.get(app1Id.toString).getShuffles.get(1).isFinalized)
+    assert(!appShuffleInfo.get(app2Id.toString).getShuffles.get(2).isFinalized)
+
+    ShuffleTestAccessor.finalizeShuffleMerge(mergeManager1, partitionId1)
+    ShuffleTestAccessor.finalizeShuffleMerge(mergeManager1, partitionId2)
+
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    assert(appShuffleInfo.get(app1Id.toString).getShuffles.get(1).isFinalized)
+    assert(appShuffleInfo.get(app2Id.toString).getShuffles.get(2).isFinalized)
+    var appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    assert(appShuffleInfoAfterReload.get(app1Id.toString).getShuffles.get(1).isFinalized)
+    assert(appShuffleInfoAfterReload.get(app2Id.toString).getShuffles.get(2).isFinalized)
+
+    // applicationRemoved will not clean up the finalized merged shuffle partitions in the DB
+    // because this merge manager's removeAppShuffleInfoFromDB is a no-op
+    mergeManager1.applicationRemoved(app1Id.toString, true)
+
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() shouldBe 1
+    assert(!appShuffleInfo.containsKey(app1Id.toString))
+    assert(appShuffleInfo.get(app2Id.toString).getShuffles.get(2).isFinalized)
+    // Clear the AppsShuffleInfo hashmap and reload the hashmap from DB
+    appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    appShuffleInfoAfterReload.size() shouldBe 1
+    assert(!appShuffleInfoAfterReload.containsKey(app1Id.toString))
+    assert(appShuffleInfoAfterReload.get(app2Id.toString).getShuffles.get(2).isFinalized)
+
+    // Register application app1Id again and reload the DB again
+    mergeManager1.registerExecutor(app1Id.toString, mergedShuffleInfo1)
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() shouldBe 2
+    appShuffleInfo.get(app1Id.toString).getAppPathsInfo should be (appPathsInfo1)
+    assert(appShuffleInfo.get(app1Id.toString).getShuffles.isEmpty)
+    assert(appShuffleInfo.get(app2Id.toString).getShuffles.get(2).isFinalized)
+    appShuffleInfoAfterReload =
+      ShuffleTestAccessor.reloadAppShuffleInfo(mergeManager1, mergeManager1DB)
+    // App1's merged partition info should be empty since its dangling DB entries were removed
+    assert(appShuffleInfoAfterReload.get(app1Id.toString).getShuffles.isEmpty)
+    assert(appShuffleInfoAfterReload.get(app2Id.toString).getShuffles.get(2).isFinalized)
+
+    s1.stop()
+  }
+
+  test("Dangling application path or shuffle information in DB will be removed during restart") {
+    s1 = createYarnShuffleServiceWithCustomMergeManager(
+      ShuffleTestAccessor.createMergeManagerWithNoDBCleanup)
+    // DB cleanup is disabled so outdated attempt info accumulates until a normal restart.
+    val app1Id = ApplicationId.newInstance(0, 2)
+    val app1Attempt1Data = makeAppInfo("user", app1Id)
+    s1.initializeApplication(app1Attempt1Data)
+
+    val mergeMgrFile = s1.mergeManagerFile
+    mergeMgrFile should not be (null)
+
+    val mergedShuffleInfo1Attempt1 =
+      new ExecutorShuffleInfo(Array(new File(tempDir, "bippy1/bippy1").getAbsolutePath),
+        5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+    val mergedShuffleInfo1Attempt2 =
+      new ExecutorShuffleInfo(Array(new File(tempDir, "bippy2/bippy2").getAbsolutePath),
+        5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID2)
+
+    val localDirs1Attempt1 = Array(new File(tempDir, "bippy1/merge_manager_1").getAbsolutePath)
+    val localDirs1Attempt2 = Array(new File(tempDir, "bippy2/merge_manager_2").getAbsolutePath)
+    val appPathsInfo1Attempt1 = new AppPathsInfo(localDirs1Attempt1, 5)
+    val appPathsInfo1Attempt2 = new AppPathsInfo(localDirs1Attempt2, 5)
+
+    val mergeManager1 = s1.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
+    ShuffleTestAccessor.recoveryFile(mergeManager1) should be (mergeMgrFile)
+
+    mergeManager1.registerExecutor(app1Id.toString, mergedShuffleInfo1Attempt1)
+    val partitionId1 = new AppAttemptShuffleMergeId(app1Id.toString, 1, 2, 1)
+    prepareAppShufflePartition(mergeManager1, partitionId1, 2, "4")
+
+    var appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() shouldBe 1
+    appShuffleInfo.get(
+      app1Id.toString).getAppPathsInfo should be (appPathsInfo1Attempt1)
+    assert(!appShuffleInfo.get(app1Id.toString).getShuffles.get(2).isFinalized)
+    ShuffleTestAccessor.finalizeShuffleMerge(mergeManager1, partitionId1)
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    assert(appShuffleInfo.get(app1Id.toString).getShuffles.get(2).isFinalized)
+
+    // Register Attempt 2
+    mergeManager1.registerExecutor(app1Id.toString, mergedShuffleInfo1Attempt2)
+    val partitionId2 = new AppAttemptShuffleMergeId(app1Id.toString, 2, 2, 1)
+    prepareAppShufflePartition(mergeManager1, partitionId2, 2, "4")
+
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() shouldBe 1
+    appShuffleInfo.get(
+      app1Id.toString).getAppPathsInfo should be (appPathsInfo1Attempt2)
+    assert(!appShuffleInfo.get(app1Id.toString).getShuffles.get(2).isFinalized)
+    ShuffleTestAccessor.finalizeShuffleMerge(mergeManager1, partitionId2)
+    assert(appShuffleInfo.get(app1Id.toString).getShuffles.get(2).isFinalized)
+
+    val partitionId2Attempt2 = new AppAttemptShuffleMergeId(app1Id.toString, 2, 2, 2)
+    prepareAppShufflePartition(mergeManager1, partitionId2Attempt2, 2, "4")
+    assert(!appShuffleInfo.get(app1Id.toString).getShuffles.get(2).isFinalized)
+    ShuffleTestAccessor.finalizeShuffleMerge(mergeManager1, partitionId2Attempt2)
+    assert(appShuffleInfo.get(app1Id.toString).getShuffles.get(2).isFinalized)
+
+    // now we pretend the shuffle service goes down, since DB deletions are no-ops,
+    // the DB should still hold path info for multiple attempts and their finalized merge info
+    s1.stop()
+    // Yarn shuffle service with custom mergeManager to confirm that DB has outdated data
+    s2 = createYarnShuffleServiceWithCustomMergeManager(
+      ShuffleTestAccessor.createMergeManagerWithNoCleanupAfterReload)
+    val mergeManager2 = s2.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
+    val mergeManager2DB = ShuffleTestAccessor.mergeManagerLevelDB(mergeManager2)
+    ShuffleTestAccessor.clearAppShuffleInfo(mergeManager2)
+    assert(ShuffleTestAccessor.getOutdatedAppPathInfoCountDuringDBReload(
+      mergeManager2, mergeManager2DB) == 1)
+    assert(ShuffleTestAccessor.getOutdatedFinalizedShuffleCountDuringDBReload(
+      mergeManager2, mergeManager2DB) == 2)
+    s2.stop()
+
+    // Yarn Shuffle service comes back up without custom mergeManager
+    s3 = createYarnShuffleService()
+    s3.mergeManagerFile should be (mergeMgrFile)
+
+    val mergeManager3 = s3.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
+    val mergeManager3DB = ShuffleTestAccessor.mergeManagerLevelDB(mergeManager3)
+    appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager3)
+    appShuffleInfo.size() shouldBe 1
+    appShuffleInfo.get(
+      app1Id.toString).getAppPathsInfo should be (appPathsInfo1Attempt2)
+    assert(appShuffleInfo.get(app1Id.toString).getShuffles.get(2).isFinalized)
+    ShuffleTestAccessor.clearAppShuffleInfo(mergeManager3)
+    assert(ShuffleTestAccessor.getOutdatedAppPathInfoCountDuringDBReload(
+      mergeManager3, mergeManager3DB) == 0)
+    assert(ShuffleTestAccessor.getOutdatedFinalizedShuffleCountDuringDBReload(
+      mergeManager3, mergeManager3DB) == 0)
+
+    s3.stop()
+  }
+
+  test("Cleanup for former attempts local path info should be triggered in applicationRemoved") {
+    s1 = createYarnShuffleServiceWithCustomMergeManager(
+      ShuffleTestAccessor.createMergeManagerWithNoDBCleanup)
+    // DB cleanup is disabled at first, so the outdated attempt-1 path info stays in the DB.
+    val app1Id = ApplicationId.newInstance(0, 1)
+    val app1Attempt1Data = makeAppInfo("user", app1Id)
+    s1.initializeApplication(app1Attempt1Data)
+
+    val mergeMgrFile = s1.mergeManagerFile
+    mergeMgrFile should not be (null)
+
+    val mergedShuffleInfo1Attempt1 =
+      new ExecutorShuffleInfo(Array(new File(tempDir, "bippy1/bippy1").getAbsolutePath),
+        5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID1)
+    val mergedShuffleInfo1Attempt2 =
+      new ExecutorShuffleInfo(Array(new File(tempDir, "bippy2/bippy2").getAbsolutePath),
+        5, SORT_MANAGER_WITH_MERGE_SHUFFLE_META_WithAttemptID2)
+
+    val localDirs1Attempt2 = Array(new File(tempDir, "bippy2/merge_manager_2").getAbsolutePath)
+    val appPathsInfo1Attempt2 = new AppPathsInfo(localDirs1Attempt2, 5)
+
+    val mergeManager1 = s1.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
+    mergeManager1.registerExecutor(app1Id.toString, mergedShuffleInfo1Attempt1)
+
+    // Register Attempt 2
+    mergeManager1.registerExecutor(app1Id.toString, mergedShuffleInfo1Attempt2)
+
+    val appShuffleInfo = ShuffleTestAccessor.getAppsShuffleInfo(mergeManager1)
+    appShuffleInfo.size() shouldBe 1
+    appShuffleInfo.get(
+      app1Id.toString).getAppPathsInfo should be (appPathsInfo1Attempt2)
+
+    // now we pretend the shuffle service goes down, since DB deletions are no-ops,
+    // the DB should still hold local path info for multiple app attempts
+    s1.stop()
+    // Restart with a custom mergeManager that leaves outdated DB entries in place on reload
+    s2 = createYarnShuffleServiceWithCustomMergeManager(
+      ShuffleTestAccessor.createMergeManagerWithNoCleanupAfterReload)
+
+    val mergeManager2 = s2.shuffleMergeManager.asInstanceOf[RemoteBlockPushResolver]
+    val mergeManager2DB = ShuffleTestAccessor.mergeManagerLevelDB(mergeManager2)
+    ShuffleTestAccessor.clearAppShuffleInfo(mergeManager2)
+    assert(ShuffleTestAccessor.getOutdatedAppPathInfoCountDuringDBReload(
+      mergeManager2, mergeManager2DB) == 1)
+
+    // ApplicationRemove should trigger DB cleanup
+    mergeManager2.applicationRemoved(app1Id.toString, true)
+    assert(ShuffleTestAccessor.getOutdatedAppPathInfoCountDuringDBReload(
+      mergeManager2, mergeManager2DB) == 0)
+
+    s2.stop()
+  }
+
   private def makeAppInfo(user: String, appId: ApplicationId): ApplicationInitializationContext = {
     val secret = ByteBuffer.wrap(new Array[Byte](0))
     new ApplicationInitializationContext(user, appId, secret)
@@ -436,7 +1056,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
     val mockConf = mock(classOf[TransportConf])
     when(mockConf.mergedShuffleFileManagerImpl).thenReturn(
       "org.apache.spark.network.shuffle.NoOpMergedShuffleFileManager")
-    val mergeMgr = YarnShuffleService.newMergedShuffleFileManagerInstance(mockConf)
+    val mergeMgr = YarnShuffleService.newMergedShuffleFileManagerInstance(mockConf, null)
     assert(mergeMgr.isInstanceOf[NoOpMergedShuffleFileManager])
   }
 
@@ -444,7 +1064,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
     val mockConf = mock(classOf[TransportConf])
     when(mockConf.mergedShuffleFileManagerImpl).thenReturn(
       "org.apache.spark.network.shuffle.RemoteBlockPushResolver")
-    val mergeMgr = YarnShuffleService.newMergedShuffleFileManagerInstance(mockConf)
+    val mergeMgr = YarnShuffleService.newMergedShuffleFileManagerInstance(mockConf, null)
     assert(mergeMgr.isInstanceOf[RemoteBlockPushResolver])
   }
 
@@ -452,7 +1072,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers {
     val mockConf = mock(classOf[TransportConf])
     when(mockConf.mergedShuffleFileManagerImpl).thenReturn(
       "org.apache.spark.network.shuffle.NotExistent")
-    val mergeMgr = YarnShuffleService.newMergedShuffleFileManagerInstance(mockConf)
+    val mergeMgr = YarnShuffleService.newMergedShuffleFileManagerInstance(mockConf, null)
     assert(mergeMgr.isInstanceOf[NoOpMergedShuffleFileManager])
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org