Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/12/21 08:50:47 UTC

[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r1054099320


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -396,6 +403,56 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
     }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+    if (appShuffleInfo.attemptId != msg.appAttemptId) {
+      throw new IllegalArgumentException(
+          String.format("The attempt id %s in this RemoveShuffleMerge message does not match "
+                  + "with the current attempt id %s stored in shuffle service for application %s",
+              msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+    }
+    appShuffleInfo.shuffles.computeIfPresent(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> {
+      boolean deleteCurrent =
+          msg.shuffleMergeId == DELETE_CURRENT_MERGED_SHUFFLE_ID ||
+              msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+      AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+          new AppAttemptShuffleMergeId(
+              msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
+          msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
+      if(deleteCurrent) {
+        // request to clean up shuffle we are currently hosting
+        if (!mergePartitionsInfo.isFinalized()) {
+          submitCleanupTask(() -> {
+            closeAndDeleteOutdatedPartitions(
+                currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions);
+            writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+          });
+        } else {
+          submitCleanupTask(() -> {
+            deleteMergedFiles(currentAppAttemptShuffleMergeId,
+                mergePartitionsInfo.getReduceIds());
+            writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+            mergePartitionsInfo.setReduceIds(new int[0]);
+          });
+        }
+      } else if(msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId) {
+        throw new RuntimeException(String.format("Asked to remove old shuffle merged data for " +
+                "application %s shuffleId %s shuffleMergeId %s, but current shuffleMergeId %s ",
+            msg.appId, msg.shuffleId, msg.shuffleMergeId, mergePartitionsInfo.shuffleMergeId));
+      } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
+        // cleanup request for newer shuffle - remove the outdated data we have.
+        submitCleanupTask(() -> {
+          closeAndDeleteOutdatedPartitions(
+              currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions);
+          writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);

Review Comment:
   As mentioned above, `writeAppAttemptShuffleMergeInfoToDB` should be called within the critical section.
   We expect this table to be consistent with `appShuffleInfo.shuffles`.
   
   Let us move the db update to the bottom: it is required for all the code paths (since we error out if the message is outdated).
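
A minimal sketch of that shape, assuming simplified stand-in types: `MergeInfoSketch`, `CriticalSectionSketch`, the in-memory `db` map and the `pendingCleanup` list are hypothetical and are not the real `RemoteBlockPushResolver` members. The remapping function of `ConcurrentHashMap.computeIfPresent` runs atomically for the key, so a single db write at the bottom of it changes together with the map entry, while the slow file deletion stays in the deferred task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for AppShuffleMergePartitionsInfo; not the real Spark class.
final class MergeInfoSketch {
  final int shuffleMergeId;

  MergeInfoSketch(int shuffleMergeId) {
    this.shuffleMergeId = shuffleMergeId;
  }
}

final class CriticalSectionSketch {
  static final int DELETE_CURRENT = -1;

  // In-memory state, playing the role of appShuffleInfo.shuffles.
  final ConcurrentMap<Integer, MergeInfoSketch> shuffles = new ConcurrentHashMap<>();
  // Stand-in for the db table: shuffleId -> recorded shuffleMergeId.
  final Map<Integer, Integer> db = new ConcurrentHashMap<>();
  // Stand-in for the cleanup executor: tasks are queued here and run later.
  final List<Runnable> pendingCleanup = new ArrayList<>();
  // Records which shuffleMergeIds had their files "deleted" by a cleanup task.
  final List<Integer> deletedMergeIds = new ArrayList<>();

  void removeShuffleMerge(int shuffleId, int msgMergeId) {
    shuffles.computeIfPresent(shuffleId, (id, current) -> {
      if (msgMergeId != DELETE_CURRENT && msgMergeId < current.shuffleMergeId) {
        // Outdated request: fail before anything is written.
        throw new IllegalStateException("stale shuffleMergeId " + msgMergeId);
      }
      // Slow file deletion is deferred to the cleanup task.
      pendingCleanup.add(() -> deletedMergeIds.add(current.shuffleMergeId));
      // -1 is a sentinel, so fall back to the id we currently host.
      int recorded = msgMergeId == DELETE_CURRENT ? current.shuffleMergeId : msgMergeId;
      // Single db write at the bottom, still inside the remapping function,
      // so the table and the map change together.
      db.put(shuffleId, recorded);
      // Assumption: what the entry becomes afterwards is up to the real code.
      return new MergeInfoSketch(recorded);
    });
  }
}
```

In this sketch an outdated request throws before the db write, which is why one write at the bottom covers every remaining path.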



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -470,6 +527,39 @@ void closeAndDeleteOutdatedPartitions(
       });
   }
 
+  void deleteMergedFiles(AppAttemptShuffleMergeId appAttemptShuffleMergeId, int[] reduceIds) {
+    removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId);
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appAttemptShuffleMergeId.appId);

Review Comment:
   This call runs in a thread pool and can be delayed, so `appShuffleInfo` may have been updated to a different attempt id by the time it executes.
   Pass `appShuffleInfo` in from the caller instead.
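
To illustrate the race, here is a small sketch with hypothetical stand-ins (`AppInfoSketch`, `PassFromCallerSketch`, the `apps` map and a plain queue in place of the cleanup thread pool; none of these are the real Spark types). A task that looks the app info up when it runs sees whatever attempt is registered at that moment, whereas a task that was handed the reference at submit time keeps operating on the attempt the request was validated against.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for AppShuffleInfo; not the real Spark class.
final class AppInfoSketch {
  final String appId;
  final int attemptId;

  AppInfoSketch(String appId, int attemptId) {
    this.appId = appId;
    this.attemptId = attemptId;
  }

  // Made-up path layout, only used to tell the attempts apart.
  String mergedDataFile(int shuffleId) {
    return appId + "_" + attemptId + "/shuffle_" + shuffleId + ".data";
  }
}

final class PassFromCallerSketch {
  final ConcurrentMap<String, AppInfoSketch> apps = new ConcurrentHashMap<>();
  // Stand-in for the cleanup thread pool: tasks run only when runPending() is called.
  final Queue<Runnable> cleanupQueue = new ArrayDeque<>();
  final List<String> deletedPaths = new ArrayList<>();

  // Fragile: the lookup happens when the task runs, possibly after a new attempt registered.
  void submitWithLateLookup(String appId, int shuffleId) {
    cleanupQueue.add(() -> deleteMergedFiles(apps.get(appId), shuffleId));
  }

  // Suggested shape: resolve once in the caller and pass the reference into the task.
  void submitWithCapturedInfo(String appId, int shuffleId) {
    AppInfoSketch captured = apps.get(appId);
    cleanupQueue.add(() -> deleteMergedFiles(captured, shuffleId));
  }

  void deleteMergedFiles(AppInfoSketch info, int shuffleId) {
    deletedPaths.add(info.mergedDataFile(shuffleId));
  }

  void runPending() {
    Runnable task;
    while ((task = cleanupQueue.poll()) != null) {
      task.run();
    }
  }
}
```

If attempt 1 is replaced by attempt 2 between submission and `runPending()`, the late-lookup task resolves paths for attempt 2, while the captured one still targets attempt 1.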



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -396,6 +403,56 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
     }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+    if (appShuffleInfo.attemptId != msg.appAttemptId) {
+      throw new IllegalArgumentException(
+          String.format("The attempt id %s in this RemoveShuffleMerge message does not match "
+                  + "with the current attempt id %s stored in shuffle service for application %s",
+              msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+    }
+    appShuffleInfo.shuffles.computeIfPresent(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> {
+      boolean deleteCurrent =
+          msg.shuffleMergeId == DELETE_CURRENT_MERGED_SHUFFLE_ID ||
+              msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+      AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+          new AppAttemptShuffleMergeId(
+              msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
+          msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);

Review Comment:
   ```suggestion
             msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
   ```
   
   `msg.shuffleMergeId` can be `-1`



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -396,6 +403,56 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
     }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+    if (appShuffleInfo.attemptId != msg.appAttemptId) {
+      throw new IllegalArgumentException(
+          String.format("The attempt id %s in this RemoveShuffleMerge message does not match "
+                  + "with the current attempt id %s stored in shuffle service for application %s",
+              msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+    }
+    appShuffleInfo.shuffles.computeIfPresent(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> {

Review Comment:
   We have to keep track of the shuffle even if it is absent, so that a late push does not end up creating or updating it.
   [This comment](https://github.com/apache/spark/pull/37922#discussion_r990753031) sketches the details (though writing to the db should be done only when `shuffleMergeId != -1` and absent)
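
One way to read this, sketched with hypothetical stand-ins (`TrackedShuffleSketch`, `TrackAbsentSketch`, the in-memory `db` map and `acceptPush` are not the real Spark code): use `compute` rather than `computeIfPresent`, and when the shuffle is absent and the request carries a concrete `shuffleMergeId`, leave a marker entry behind. The sketch takes the parenthetical to mean that an absent shuffle with `-1` has no id worth recording, so nothing is written in that case.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical marker entry; not the real AppShuffleMergePartitionsInfo.
final class TrackedShuffleSketch {
  final int shuffleMergeId;
  final boolean removed;

  TrackedShuffleSketch(int shuffleMergeId, boolean removed) {
    this.shuffleMergeId = shuffleMergeId;
    this.removed = removed;
  }
}

final class TrackAbsentSketch {
  static final int DELETE_CURRENT = -1;

  final ConcurrentMap<Integer, TrackedShuffleSketch> shuffles = new ConcurrentHashMap<>();
  // Stand-in for the db table: shuffleId -> recorded shuffleMergeId.
  final Map<Integer, Integer> db = new ConcurrentHashMap<>();

  void removeShuffleMerge(int shuffleId, int msgMergeId) {
    // compute() also runs when the key is absent, unlike computeIfPresent().
    shuffles.compute(shuffleId, (id, current) -> {
      if (current == null) {
        if (msgMergeId == DELETE_CURRENT) {
          return null; // nothing hosted and no concrete id: leave the map and db untouched
        }
        db.put(shuffleId, msgMergeId);
        return new TrackedShuffleSketch(msgMergeId, true); // marker for late pushes
      }
      if (msgMergeId != DELETE_CURRENT && msgMergeId < current.shuffleMergeId) {
        throw new IllegalStateException("stale shuffleMergeId " + msgMergeId);
      }
      // Present case, simplified: file cleanup is omitted in this sketch.
      int recorded = msgMergeId == DELETE_CURRENT ? current.shuffleMergeId : msgMergeId;
      db.put(shuffleId, recorded);
      return new TrackedShuffleSketch(recorded, true);
    });
  }

  // Simplified read-only check: a push is refused if its merge id was already removed.
  boolean acceptPush(int shuffleId, int pushMergeId) {
    TrackedShuffleSketch tracked = shuffles.get(shuffleId);
    return tracked == null
        || pushMergeId > tracked.shuffleMergeId
        || (pushMergeId == tracked.shuffleMergeId && !tracked.removed);
  }
}
```

With `computeIfPresent`, the absent case would leave no trace, and a push arriving afterwards for the same `shuffleMergeId` would look like the first block of a fresh shuffle.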



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1465,10 +1557,13 @@ public static class AppShuffleMergePartitionsInfo {
     private final int shuffleMergeId;
     private final Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions;
 
+    private int[] reduceIds;

Review Comment:
   ```suggestion
       private final AtomicReference<int[]> reduceIds = new AtomicReference<>(new int[0]);
   ```
   
   and modify the getter and setter accordingly.
   Otherwise it is not thread safe.
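
For reference, the accessors could look roughly like this. It is only a sketch: `ReduceIdsSketch` is a hypothetical holder class, and the real method names on `AppShuffleMergePartitionsInfo` may differ.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical holder; stands in for the relevant part of AppShuffleMergePartitionsInfo.
final class ReduceIdsSketch {
  private final AtomicReference<int[]> reduceIds = new AtomicReference<>(new int[0]);

  // The new array reference is published atomically to all threads.
  void setReduceIds(int[] ids) {
    reduceIds.set(ids);
  }

  // Readers see either the old array or the new one, never a stale cached reference.
  int[] getReduceIds() {
    return reduceIds.get();
  }
}
```

The `AtomicReference` only makes the swap of the array reference visible across threads; the array contents are still mutable, so callers should treat the returned array as read-only.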



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -95,6 +96,12 @@ public class RemoteBlockPushResolver implements MergedShuffleFileManager {
   public static final String MERGE_DIR_KEY = "mergeDir";
   public static final String ATTEMPT_ID_KEY = "attemptId";
   private static final int UNDEFINED_ATTEMPT_ID = -1;
+
+  /**
+   * The flag for deleting the current merged shuffle data.
+   */
+  public static final int DELETE_CURRENT_MERGED_SHUFFLE_ID = -1;

Review Comment:
   nit: Rename this as `DELETE_ALL_MERGED_SHUFFLE` or some such ?
   That is the behavior we actually are looking for - in case the implementation evolves.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -396,6 +403,56 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
     }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+    if (appShuffleInfo.attemptId != msg.appAttemptId) {
+      throw new IllegalArgumentException(
+          String.format("The attempt id %s in this RemoveShuffleMerge message does not match "
+                  + "with the current attempt id %s stored in shuffle service for application %s",
+              msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+    }
+    appShuffleInfo.shuffles.computeIfPresent(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> {
+      boolean deleteCurrent =
+          msg.shuffleMergeId == DELETE_CURRENT_MERGED_SHUFFLE_ID ||
+              msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+      AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+          new AppAttemptShuffleMergeId(
+              msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
+          msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
+      if(deleteCurrent) {
+        // request to clean up shuffle we are currently hosting
+        if (!mergePartitionsInfo.isFinalized()) {
+          submitCleanupTask(() -> {
+            closeAndDeleteOutdatedPartitions(
+                currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions);
+            writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+          });
+        } else {
+          submitCleanupTask(() -> {
+            deleteMergedFiles(currentAppAttemptShuffleMergeId,
+                mergePartitionsInfo.getReduceIds());

Review Comment:
   Not sure I follow ... this is in the `else` block of `!mergePartitionsInfo.isFinalized()` - so it is finalized ?
   Or is this a stale comment that has already been addressed @yabola ?



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -702,7 +722,8 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
                 "finalizing shuffle partition {}", msg.appId, msg.appAttemptId, msg.shuffleId,
                 msg.shuffleMergeId, partition.reduceId);
           } finally {
-            partition.closeAllFilesAndDeleteIfNeeded(false);
+            Boolean deleteFile = partition.mapTracker.getCardinality() == 0;
+            partition.closeAllFilesAndDeleteIfNeeded(deleteFile);

Review Comment:
   The actual deletion will be handled by either application termination, or during shuffle cleanup (this PR).
   Making this change here might have other issues we will need to think through.
   
   If we do want to do this, can you file a follow up jira and we can investigate it there ?
   I want to limit this PR specifically to changes required for this feature.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -396,6 +403,56 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
     }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+    if (appShuffleInfo.attemptId != msg.appAttemptId) {
+      throw new IllegalArgumentException(
+          String.format("The attempt id %s in this RemoveShuffleMerge message does not match "
+                  + "with the current attempt id %s stored in shuffle service for application %s",
+              msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+    }
+    appShuffleInfo.shuffles.computeIfPresent(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> {
+      boolean deleteCurrent =
+          msg.shuffleMergeId == DELETE_CURRENT_MERGED_SHUFFLE_ID ||
+              msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+      AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+          new AppAttemptShuffleMergeId(
+              msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
+          msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
+      if(deleteCurrent) {
+        // request to clean up shuffle we are currently hosting
+        if (!mergePartitionsInfo.isFinalized()) {
+          submitCleanupTask(() -> {
+            closeAndDeleteOutdatedPartitions(
+                currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions);
+            writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);

Review Comment:
   Yes, we should update the db.
   Also, `writeAppAttemptShuffleMergeInfoToDB` must be done from within the critical section (should be pulled outside the cleanup task).
   
   Spark thrift server is an interesting corner case - can you file a follow up jira for that please @wankunde ?
   It actually applies to all caches we are maintaining in ESS (not just this one) - for example, `ExternalShuffleBlockResolver.executors`.
   
   In practice, the number of executors would have to run into the millions before this becomes a problem, but I can see that potentially happening.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -396,6 +403,56 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
     }
   }
 
+  @Override
+  public void removeShuffleMerge(RemoveShuffleMerge msg) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
+    if (appShuffleInfo.attemptId != msg.appAttemptId) {
+      throw new IllegalArgumentException(
+          String.format("The attempt id %s in this RemoveShuffleMerge message does not match "
+                  + "with the current attempt id %s stored in shuffle service for application %s",
+              msg.appAttemptId, appShuffleInfo.attemptId, msg.appId));
+    }
+    appShuffleInfo.shuffles.computeIfPresent(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> {
+      boolean deleteCurrent =
+          msg.shuffleMergeId == DELETE_CURRENT_MERGED_SHUFFLE_ID ||
+              msg.shuffleMergeId == mergePartitionsInfo.shuffleMergeId;
+      AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId =
+          new AppAttemptShuffleMergeId(
+              msg.appId, msg.appAttemptId, msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
+      AppAttemptShuffleMergeId appAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
+          msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
+      if(deleteCurrent) {
+        // request to clean up shuffle we are currently hosting
+        if (!mergePartitionsInfo.isFinalized()) {
+          submitCleanupTask(() -> {
+            closeAndDeleteOutdatedPartitions(
+                currentAppAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions);
+            writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
+          });
+        } else {
+          submitCleanupTask(() -> {
+            deleteMergedFiles(currentAppAttemptShuffleMergeId,

Review Comment:
   ```suggestion
               deleteMergedFiles(currentAppAttemptShuffleMergeId, appShuffleInfo,
   ```
   
   See comment in `deleteMergedFiles`



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -470,6 +527,39 @@ void closeAndDeleteOutdatedPartitions(
       });
   }
 
+  void deleteMergedFiles(AppAttemptShuffleMergeId appAttemptShuffleMergeId, int[] reduceIds) {
+    removeAppShufflePartitionInfoFromDB(appAttemptShuffleMergeId);
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appAttemptShuffleMergeId.appId);
+    int shuffleId = appAttemptShuffleMergeId.shuffleId;
+    int shuffleMergeId = appAttemptShuffleMergeId.shuffleMergeId;
+    for (int reduceId : reduceIds) {
+      try {
+        File dataFile =
+            appShuffleInfo.getMergedShuffleDataFile(shuffleId, shuffleMergeId, reduceId);
+        dataFile.delete();
+      } catch (Exception e) {

Review Comment:
   `File.delete()` does not throw an `Exception` when deletion fails (there is no security manager here).
   It returns a `boolean` indicating whether the delete succeeded.
   
   We can keep track of that, and emit a consolidated log message at the end if `dataFileDeleteCount > 0 || indexFileDeleteCount > 0 || metaDeleteCount > 0`
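
A rough sketch of that bookkeeping, assuming the counters track failed deletions. `DeleteSummarySketch`, its method names and the message format are hypothetical; the real code would emit the message through its logger rather than return it.

```java
import java.io.File;
import java.util.List;
import java.util.Optional;

final class DeleteSummarySketch {
  // File.delete() reports failure through its return value, so count the false results.
  static int countFailedDeletes(List<File> files) {
    int failed = 0;
    for (File file : files) {
      if (!file.delete()) {
        failed++;
      }
    }
    return failed;
  }

  // Returns one consolidated message, or empty if every delete succeeded.
  static Optional<String> summarize(
      List<File> dataFiles, List<File> indexFiles, List<File> metaFiles) {
    int dataFailed = countFailedDeletes(dataFiles);
    int indexFailed = countFailedDeletes(indexFiles);
    int metaFailed = countFailedDeletes(metaFiles);
    if (dataFailed > 0 || indexFailed > 0 || metaFailed > 0) {
      return Optional.of(String.format(
          "Failed to delete merged files: data=%d index=%d meta=%d",
          dataFailed, indexFailed, metaFailed));
    }
    return Optional.empty();
  }
}
```

This replaces the `try`/`catch` around each `delete()` call with a check of the return value and a single message after the loop.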



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

