Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/05/27 16:22:56 UTC

[GitHub] [spark] mridulm commented on a diff in pull request #35906: [SPARK-33236][shuffle] Enable Push-based shuffle service to store state in NM level DB for work preserving restart

mridulm commented on code in PR #35906:
URL: https://github.com/apache/spark/pull/35906#discussion_r883744010


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -342,6 +389,29 @@ void closeAndDeletePartitionFilesIfNeeded(
     if (cleanupLocalDirs) {
       deleteExecutorDirs(appShuffleInfo);
     }
+    cleanUpAppShuffleInfoInDB(appShuffleInfo);

Review Comment:
   This method is called asynchronously to application completion, and can be delayed.
   If the NM shuts down in the meantime, some of these entries won't be cleaned up (`close` will close the db, so all subsequent deletes will fail).
   
   Do we want to delete the app attempt paths immediately, and do the shuffle deletes async (along with the path deletes, as here)?
   At reload time, if shuffle info is present for missing attempt paths, we can remove it from the db.
   
   Thoughts?
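
The reload-time cleanup suggested above could look roughly like the sketch below. It is only an illustration under assumptions: a plain `Map` stands in for the NM level DB, and the class name, key prefixes and key layout (`<appId>_<attemptId>;<shuffleId>_<shuffleMergeId>`) are hypothetical, not taken from the PR.

```java
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: drops shuffle entries whose app attempt paths entry is gone.
final class OrphanedShuffleInfoCleaner {
  // Assumed key prefixes, chosen for illustration only.
  static final String ATTEMPT_PREFIX = "AppAttemptPathInfo;";
  static final String SHUFFLE_PREFIX = "AppAttemptShuffleFinalized;";

  /** Removes orphaned shuffle entries from the map and returns how many were removed. */
  static int removeOrphans(Map<String, String> db) {
    // First pass: collect every app attempt that still has a paths entry.
    Set<String> liveAttempts = new HashSet<>();
    for (String key : db.keySet()) {
      if (key.startsWith(ATTEMPT_PREFIX)) {
        liveAttempts.add(key.substring(ATTEMPT_PREFIX.length()));
      }
    }
    // Second pass: delete shuffle entries that belong to a missing attempt.
    int removed = 0;
    Iterator<String> it = db.keySet().iterator();
    while (it.hasNext()) {
      String key = it.next();
      if (!key.startsWith(SHUFFLE_PREFIX)) {
        continue;
      }
      String rest = key.substring(SHUFFLE_PREFIX.length());
      int sep = rest.indexOf(';');
      String attempt = sep < 0 ? rest : rest.substring(0, sep);
      if (!liveAttempts.contains(attempt)) {
        it.remove();
        removed++;
      }
    }
    return removed;
  }
}
```

With this shape, the app attempt paths entry acts as the source of truth: once it is deleted synchronously, any shuffle entries that the async cleanup did not reach before shutdown are dropped on the next reload.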



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -209,9 +246,16 @@ private AppShufflePartitionInfo getOrCreateAppShufflePartitionInfo(
         appShuffleInfo.getMergedShuffleIndexFilePath(shuffleId, shuffleMergeId, reduceId));
       File metaFile =
         appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId, reduceId);
+      // Make sure unuseful non-finalized merged data/index/meta files get cleaned up
+      // during service restart
+      if (dataFile.exists()) dataFile.delete();
+      if (indexFile.exists()) indexFile.delete();
+      if (metaFile.exists()) metaFile.delete();

Review Comment:
   We immediately open all of these files with append = false, so why do we need to delete them?
   Note: if we remove the `delete()` calls here, please add a comment in `MergeShuffleFile`, etc., at `FileOutputStream` creation stating that it must use append = false.
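
For reference, a small standalone sketch of the behaviour this comment relies on: opening a `FileOutputStream` with append = false truncates an existing file, so stale content does not survive the reopen. The class and method names are made up for the demo and are not part of the PR.

```java
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;

// Hypothetical demo class, not part of RemoteBlockPushResolver.
final class TruncateOnOpenDemo {
  /** Writes stale bytes, reopens the file with append = false, writes fresh bytes, returns the result. */
  static String overwrite(String stale, String fresh) throws IOException {
    File file = File.createTempFile("merged-shuffle", ".data");
    try {
      Files.write(file.toPath(), stale.getBytes(StandardCharsets.UTF_8));
      // append = false: the existing file is truncated to zero length on open.
      try (FileOutputStream out = new FileOutputStream(file, false)) {
        out.write(fresh.getBytes(StandardCharsets.UTF_8));
      }
      return new String(Files.readAllBytes(file.toPath()), StandardCharsets.UTF_8);
    } finally {
      file.delete();
    }
  }
}
```

The guarantee holds only as long as every open site keeps append = false, which is why a comment at the `FileOutputStream` creation is worth having if the explicit deletes go away.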



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -342,6 +389,29 @@ void closeAndDeletePartitionFilesIfNeeded(
     if (cleanupLocalDirs) {
       deleteExecutorDirs(appShuffleInfo);
     }
+    cleanUpAppShuffleInfoInDB(appShuffleInfo);
+  }
+
+  private void cleanUpAppShuffleInfoInDB(AppShuffleInfo appShuffleInfo) {
+    if (db != null) {
+      try {
+        db.delete(
+          getDbAppAttemptPathsKey(
+            new AppAttemptId(appShuffleInfo.appId, appShuffleInfo.attemptId)));
+        appShuffleInfo.shuffles
+          .forEach((shuffleId, shuffleInfo) -> shuffleInfo.shuffleMergePartitions
+            .forEach((shuffleMergeId, partitionInfo) -> {
+              synchronized (partitionInfo) {
+                cleanUpAppShufflePartitionInfoInDB(
+                  new AppAttemptShuffleMergeId(
+                    appShuffleInfo.appId, appShuffleInfo.attemptId, shuffleId, shuffleMergeId));
+              }
+            }));
+      } catch (Exception e) {
+        logger.error("Error deleting {}_{} from application paths info db",
+          appShuffleInfo.appId, appShuffleInfo.attemptId, e);

Review Comment:
   This failure could be for either the application paths info or the shuffle partitions. Update the message?



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -709,9 +948,9 @@ public ByteBuffer getCompletionResponse() {
      */
     private void writeBuf(ByteBuffer buf) throws IOException {

Review Comment:
   Revert the changes to this method?



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -565,8 +650,8 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
               sizes.add(partition.getLastChunkOffset());
               logger.debug("{} attempt {} shuffle {} shuffleMerge {}: finalization results " +
                   "added for partition {} data size {} index size {} meta size {}",
-                  msg.appId, msg.appAttemptId, msg.shuffleId,
-                  msg.shuffleMergeId, partition.reduceId, partition.getLastChunkOffset(),
+                  msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId,
+                  partition.reduceId, partition.getLastChunkOffset(),

Review Comment:
   revert ?



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -576,6 +661,7 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
           } finally {
             partition.closeAllFilesAndDeleteIfNeeded(false);
           }
+          cleanUpAppShufflePartitionInfoInDB(partition.appAttemptShuffleMergeId);

Review Comment:
   Why are we doing this? We just inserted it in the `compute` above.
   We should have a test that validates that, after successful finalization, the db contains the partition info.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -614,12 +700,12 @@ public void registerExecutor(String appId, ExecutorShuffleInfo executorInfo) {
         if (attemptId == UNDEFINED_ATTEMPT_ID) {
           // When attemptId is -1, there is no attemptId stored in the ExecutorShuffleInfo.
           // Only the first ExecutorRegister message can register the merge dirs
-          appsShuffleInfo.computeIfAbsent(appId, id ->
-            new AppShuffleInfo(
-              appId, UNDEFINED_ATTEMPT_ID,
-              new AppPathsInfo(appId, executorInfo.localDirs,
-                mergeDir, executorInfo.subDirsPerLocalDir)
-            ));
+          appsShuffleInfo.computeIfAbsent(appId, id -> {
+            AppPathsInfo appPathsInfo = new AppPathsInfo(appId, attemptId, executorInfo.localDirs,
+                mergeDir, executorInfo.subDirsPerLocalDir);
+            writeAppPathsInfoToDb(appId, attemptId, appPathsInfo);
+            return new AppShuffleInfo(appId, UNDEFINED_ATTEMPT_ID, appPathsInfo);

Review Comment:
   nit: either use `UNDEFINED_ATTEMPT_ID` or `attemptId` in both cases.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -928,7 +1169,7 @@ public void onComplete(String streamId) throws IOException {
               throw ioe;
             }
           }
-          long updatedPos = partitionInfo.getDataFilePos() + length;
+          long updatedPos = partitionInfo.dataFilePos + length;
           boolean indexUpdated = false;

Review Comment:
   revert



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -992,6 +1233,45 @@ AppShufflePartitionInfo getPartitionInfo() {
     }
   }
 
+  /**
+   * Simply encodes an application attempt ID.
+   */
+  public static class AppAttemptId {

Review Comment:
   For the various JSON beans we have (`AppAttemptId`, `AppAttemptShuffleMergeId`, `AppPathsInfo`), can we make sure that `equals`, `toString` and `hashCode` follow similar patterns?
   
   For example, `AppAttemptId`/`AppPathsInfo` allow subclasses to be passed in, while `AppAttemptShuffleMergeId` checks whether the arg is specifically of the same class.
   
   Also, in `equals`, go progressively from the cheapest checks to the more expensive ones.
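
One possible shape for a consistent pattern is sketched below, assuming the strict same-class check is the one to standardize on. `ShuffleKey` and its fields are hypothetical stand-ins, not the actual beans from the PR.

```java
import java.util.Objects;

// Hypothetical bean used only to illustrate the pattern.
class ShuffleKey {
  final String appId;
  final int attemptId;
  final int shuffleId;

  ShuffleKey(String appId, int attemptId, int shuffleId) {
    this.appId = appId;
    this.attemptId = attemptId;
    this.shuffleId = shuffleId;
  }

  @Override
  public boolean equals(Object o) {
    // Cheapest checks first: identity, then null and exact class.
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    ShuffleKey that = (ShuffleKey) o;
    // Primitive comparisons before the more expensive string comparison.
    return attemptId == that.attemptId
      && shuffleId == that.shuffleId
      && Objects.equals(appId, that.appId);
  }

  @Override
  public int hashCode() {
    // Uses exactly the fields that equals compares.
    return Objects.hash(appId, attemptId, shuffleId);
  }

  @Override
  public String toString() {
    return "ShuffleKey{appId=" + appId + ", attemptId=" + attemptId
      + ", shuffleId=" + shuffleId + "}";
  }
}
```

Using `getClass() != o.getClass()` rejects subclasses, whereas an `instanceof` check would accept them; either can be valid, but all the beans should make the same choice.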



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

