Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/10/09 09:26:56 UTC

[GitHub] [spark] mridulm commented on a diff in pull request #37922: [SPARK-40480][SHUFFLE] Remove push-based shuffle data after query finished

mridulm commented on code in PR #37922:
URL: https://github.com/apache/spark/pull/37922#discussion_r990751884


##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java:
##########
@@ -224,6 +224,12 @@ protected void handleMessage(
       } finally {
         responseDelayContext.stop();
       }
+    } else if (msgObj instanceof RemoveShuffleMerge) {
+      RemoveShuffleMerge msg = (RemoveShuffleMerge) msgObj;
+      checkAuth(client, msg.appId);
+      logger.info("Removing shuffle merge data for application {} shuffle {} shuffleMerge {}",
+          msg.appId, msg.shuffleId, msg.shuffleMergeId);
+      mergeManager.removeShuffleMerge(msg.appId, msg.shuffleId, msg.shuffleMergeId);

Review Comment:
   We need to pass in `appAttemptId` as well for `RemoveShuffleMerge`; I had left that comment earlier.
   Take a look at how `FinalizeShuffleMerge` is processed and handle this similarly? (We can pass the `RemoveShuffleMerge` message to `mergeManager.removeShuffleMerge` and look up the fields there.)
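   A minimal sketch of the suggested shape, assuming simplified stand-ins (`RemoveShuffleMergeMsg` and `SketchMergeManager` are hypothetical names, not Spark's classes): the handler passes the whole message down, and the merge manager reads `appAttemptId` from it and rejects requests that do not match the attempt currently registered for the application.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical stand-in for the RemoveShuffleMerge message, carrying appAttemptId.
   final class RemoveShuffleMergeMsg {
     final String appId;
     final int appAttemptId;
     final int shuffleId;
     final int shuffleMergeId;

     RemoveShuffleMergeMsg(String appId, int appAttemptId, int shuffleId, int shuffleMergeId) {
       this.appId = appId;
       this.appAttemptId = appAttemptId;
       this.shuffleId = shuffleId;
       this.shuffleMergeId = shuffleMergeId;
     }
   }

   // Hypothetical stand-in for the merge manager; not Spark's RemoteBlockPushResolver.
   final class SketchMergeManager {
     // appId -> attempt id currently registered with the service
     private final Map<String, Integer> currentAttempts = new ConcurrentHashMap<>();
     final List<String> removed = new ArrayList<>();

     void registerAttempt(String appId, int appAttemptId) {
       currentAttempts.put(appId, appAttemptId);
     }

     // Receives the whole message, so appAttemptId is available for validation.
     void removeShuffleMerge(RemoveShuffleMergeMsg msg) {
       Integer attempt = currentAttempts.get(msg.appId);
       if (attempt == null) {
         throw new IllegalArgumentException("Application " + msg.appId + " is not registered");
       }
       if (attempt.intValue() != msg.appAttemptId) {
         // Request from a different attempt: reject instead of touching this attempt's data.
         throw new IllegalArgumentException("Stale attempt " + msg.appAttemptId
             + " for application " + msg.appId + "; current attempt is " + attempt);
       }
       removed.add(msg.shuffleId + ":" + msg.shuffleMergeId);
     }
   }
   ```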



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -393,6 +393,22 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
     }
   }
 
+  @Override
+  public void removeShuffleMerge(String appId, int shuffleId, int shuffleMergeId) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);

Review Comment:
   Add validation for `attemptId` here.
   See `finalizeShuffleMerge` for an example.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -393,6 +393,22 @@ public void applicationRemoved(String appId, boolean cleanupLocalDirs) {
     }
   }
 
+  @Override
+  public void removeShuffleMerge(String appId, int shuffleId, int shuffleMergeId) {
+    AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(appId);
+    AppShuffleMergePartitionsInfo partitionsInfo = appShuffleInfo.shuffles.remove(shuffleId);

Review Comment:
   We should not remove it directly: the value within the map could be for a different `shuffleMergeId` (a newer one, for example).
   Take a look at `finalizeShuffleMerge` to see how the corner cases are handled.
   
   A rough sketch:
   
   ```
     public void removeShuffleMerge(RemoveShuffleMerge msg) {
       AppShuffleInfo appShuffleInfo = validateAndGetAppShuffleInfo(msg.appId);
       if (appShuffleInfo.attemptId != msg.appAttemptId) {
         // incoming request for older app attempt - exception
         throw new IllegalArgumentException("appropriate msg for invalid app attempt");
       }
   
       appShuffleInfo.shuffles.compute(msg.shuffleId, (shuffleId, mergePartitionsInfo) -> {
   
         if (null != mergePartitionsInfo) {
           // where DELETE_CURRENT == -1
           // merge id will be set to -1 when we are cleaning up shuffle, and there is no chance of its reuse -
           // else it will be set to an explicit value.
           boolean deleteAny = msg.shuffleMergeId == DELETE_CURRENT;
   
           // Looks like there is a bug in finalizeShuffleMerge, let us fix it here anyway
           // and handle it for finalizeShuffleMerge in a different PR.
           AppAttemptShuffleMergeId currentAppAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
                   msg.appId, msg.appAttemptId,
                   msg.shuffleId, mergePartitionsInfo.shuffleMergeId);
           if (!deleteAny && msg.shuffleMergeId < mergePartitionsInfo.shuffleMergeId) {
   
             // throw exception - request for an older shuffle merge id
             throw new RuntimeException("appropriate msg for delete of old merge id");
           } else if (!deleteAny && msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
   
             // cleanup request for newer shuffle - remove the outdated data we have.
             submitCleanupTask(() ->
                     closeAndDeleteOutdatedPartitions(
                             currentAppAttemptShuffleMergeId,
                             mergePartitionsInfo.shuffleMergePartitions));
           } else {
   
             // request to cleanup shuffle we are currently hosting
   
             // Not yet finalized - use the existing cleanup mechanism
             if (!mergePartitionsInfo.isFinalized()) {
               submitCleanupTask(() ->
                       closeAndDeleteOutdatedPartitions(
                               currentAppAttemptShuffleMergeId,
                               mergePartitionsInfo.shuffleMergePartitions));
             } else {
   
               if (! mergePartitionsInfo.getReduceIds().isEmpty()) {
                 // Introduce new method which deletes the files for shuffleMergeId
                 submitCleanupTask(() ->
                       // Use the hosted merge id: msg.shuffleMergeId may be DELETE_CURRENT (-1)
                       deleteMergedFiles(msg.appId, msg.appAttemptId,
                                     msg.shuffleId, mergePartitionsInfo.shuffleMergeId,
                                     // To be introduced - see below
                                     mergePartitionsInfo.getReduceIds()));
   
               }
               // Return the existing entry unchanged; the DB does not need updating. As an
               // optimization, the reduce ids could be dropped here.
               return mergePartitionsInfo;
             }
           }
         }
         // Keep track of the latest merge id and mark it as finalized (immutable), since it
         // has already been marked for deletion or deleted.
         AppAttemptShuffleMergeId appAttemptShuffleMergeId = new AppAttemptShuffleMergeId(
           msg.appId, msg.appAttemptId, msg.shuffleId, msg.shuffleMergeId);
         writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
         // No reduce ids.
         return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
       });
     }
   ```
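   The branching in the sketch above can be exercised in isolation. The following is a minimal, runnable reduction of it, assuming a hypothetical `MergeState` class in place of `AppShuffleMergePartitionsInfo` and a `cleanupLog` list in place of `submitCleanupTask`; it only shows which branch `ConcurrentHashMap.compute` takes for older, newer, matching, and `DELETE_CURRENT` requests.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical, simplified model of the per-shuffle merge state; not Spark's class.
   final class MergeState {
     final int shuffleMergeId;
     final boolean finalized;

     MergeState(int shuffleMergeId, boolean finalized) {
       this.shuffleMergeId = shuffleMergeId;
       this.finalized = finalized;
     }
   }

   final class RemoveMergeSketch {
     static final int DELETE_CURRENT = -1;
     final Map<Integer, MergeState> shuffles = new ConcurrentHashMap<>();
     // Stands in for submitCleanupTask(...): records what would be cleaned up.
     final List<String> cleanupLog = new ArrayList<>();

     void removeShuffleMerge(int shuffleId, int requestedMergeId) {
       shuffles.compute(shuffleId, (id, existing) -> {
         if (existing != null) {
           boolean deleteAny = requestedMergeId == DELETE_CURRENT;
           if (!deleteAny && requestedMergeId < existing.shuffleMergeId) {
             // Older merge id than the one hosted: reject, mapping stays unchanged.
             throw new IllegalStateException("Stale removal request for merge id " + requestedMergeId);
           } else if (!deleteAny && requestedMergeId > existing.shuffleMergeId) {
             cleanupLog.add("outdated:" + existing.shuffleMergeId);
           } else if (!existing.finalized) {
             cleanupLog.add("unfinalized:" + existing.shuffleMergeId);
           } else {
             cleanupLog.add("merged-files:" + existing.shuffleMergeId);
             return existing; // already finalized: keep the entry as-is
           }
         }
         // Record the requested merge id as finalized so late pushes for it are rejected.
         return new MergeState(requestedMergeId, true);
       });
     }
   }
   ```

   Throwing from inside `compute` leaves the existing mapping untouched, which is what makes the stale-request branch safe.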
   
   To implement `deleteMergedFiles`, the only missing piece is the set of valid reduce ids (`getReduceIds` above).
   We can keep track of that by modifying `finalizeShuffleMerge` as follows:
   
   a) Keep a reference to the result of `appShuffleInfo.shuffles.compute()`.
   b) Before returning `mergeStatuses`, update this reference with `reduceIds`.
   c) For efficiency, we can convert the ids to a bitmap to save space, but that is an implementation detail.
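   Steps a) to c) can be sketched as follows, assuming a hypothetical `FinalizedReduceIds` holder keyed by shuffle id; `java.util.BitSet` stands in here for whatever bitmap the implementation would actually use, only to keep the sketch dependency-free.

   ```java
   import java.util.BitSet;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   // Hypothetical: remembers which reduce ids had merged data when a shuffle merge was finalized.
   final class FinalizedReduceIds {
     private final Map<Integer, BitSet> byShuffle = new ConcurrentHashMap<>();

     // Called at the end of finalization with the reduce ids about to be reported.
     void recordFinalized(int shuffleId, int[] reduceIds) {
       BitSet bits = new BitSet();
       for (int reduceId : reduceIds) {
         bits.set(reduceId);
       }
       byShuffle.put(shuffleId, bits);
     }

     // Returns the reduce ids whose merged files should be deleted on removal, in ascending order.
     int[] reduceIdsFor(int shuffleId) {
       BitSet bits = byShuffle.get(shuffleId);
       return bits == null ? new int[0] : bits.stream().toArray();
     }
   }
   ```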
   
   Thoughts?
   
   +CC @otterc, @zhouyejoe: please take a look at what I believe is a bug in `finalizeShuffleMerge`; I have sketched a fix for it above.
   Unless I am missing something, we should fix `finalizeShuffleMerge` in a separate PR.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -702,7 +722,8 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
                 "finalizing shuffle partition {}", msg.appId, msg.appAttemptId, msg.shuffleId,
                 msg.shuffleMergeId, partition.reduceId);
           } finally {
-            partition.closeAllFilesAndDeleteIfNeeded(false);
+            Boolean deleteFile = partition.mapTracker.getCardinality() == 0;
+            partition.closeAllFilesAndDeleteIfNeeded(deleteFile);

Review Comment:
   Revert?



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -650,24 +666,28 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
         } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
           // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return
           // empty MergeStatuses but cleanup the older shuffleMergeId files.
+          Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions =
+              mergePartitionsInfo.shuffleMergePartitions;
           submitCleanupTask(() ->
-              closeAndDeleteOutdatedPartitions(
-                  appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
+              closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId, shuffleMergePartitions));
         } else {

Review Comment:
   Revert this?



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -650,24 +666,28 @@ public MergeStatuses finalizeShuffleMerge(FinalizeShuffleMerge msg) {
         } else if (msg.shuffleMergeId > mergePartitionsInfo.shuffleMergeId) {
           // If no blocks pushed for the finalizeShuffleMerge shuffleMergeId then return
           // empty MergeStatuses but cleanup the older shuffleMergeId files.
+          Map<Integer, AppShufflePartitionInfo> shuffleMergePartitions =
+              mergePartitionsInfo.shuffleMergePartitions;
           submitCleanupTask(() ->
-              closeAndDeleteOutdatedPartitions(
-                  appAttemptShuffleMergeId, mergePartitionsInfo.shuffleMergePartitions));
+              closeAndDeleteOutdatedPartitions(appAttemptShuffleMergeId, shuffleMergePartitions));
         } else {
           // This block covers:
           //  1. finalization of determinate stage
           //  2. finalization of indeterminate stage if the shuffleMergeId related to it is the one
           //  for which the message is received.
           shuffleMergePartitionsRef.set(mergePartitionsInfo.shuffleMergePartitions);
         }
+      } else {
+        mergePartitionsInfo = new AppShuffleMergePartitionsInfo(shuffleId, true);
       }
+      mergePartitionsInfo.setFinalized(true);
       // Update the DB for the finalized shuffle
       writeAppAttemptShuffleMergeInfoToDB(appAttemptShuffleMergeId);
       // Even when the mergePartitionsInfo is null, we mark the shuffle as finalized but the results
       // sent to the driver will be empty. This can happen when the service didn't receive any
       // blocks for the shuffle yet and the driver didn't wait for enough time to finalize the
       // shuffle.
-      return new AppShuffleMergePartitionsInfo(msg.shuffleMergeId, true);
+      return mergePartitionsInfo;

Review Comment:
   Revert the changes to this method?



##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2543,16 +2541,13 @@ private[spark] class DAGScheduler(
       shuffleIdToMapStage.filter { case (_, stage) =>
         stage.shuffleDep.shuffleMergeAllowed && stage.shuffleDep.getMergerLocs.isEmpty &&
           runningStages.contains(stage)
-      }.foreach { case(_, stage: ShuffleMapStage) =>
-          if (getAndSetShufflePushMergerLocations(stage).nonEmpty) {
-            logInfo(s"Shuffle merge enabled adaptively for $stage with shuffle" +
-              s" ${stage.shuffleDep.shuffleId} and shuffle merge" +
-              s" ${stage.shuffleDep.shuffleMergeId} with ${stage.shuffleDep.getMergerLocs.size}" +
-              s" merger locations")
-            mapOutputTracker.registerShufflePushMergerLocations(stage.shuffleDep.shuffleId,
-              stage.shuffleDep.getMergerLocs)
-          }
-        }
+      }.foreach { case (_, stage: ShuffleMapStage) =>
+        configureShufflePushMergerLocations(stage)
+        logInfo(s"Shuffle merge enabled adaptively for $stage with shuffle" +

Review Comment:
   Surround the `logInfo` with `if (stage.shuffleDep.getMergerLocs.nonEmpty)`; otherwise we will print that log line even when no mergers were set.



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1639,9 +1661,9 @@ void closeAllFilesAndDeleteIfNeeded(boolean delete) {
       try {
         if (dataChannel.isOpen()) {
           dataChannel.close();
-          if (delete) {
-            dataFile.delete();
-          }
+        }
+        if (delete) {
+          dataFile.delete();

Review Comment:
   This looks like an orthogonal bug fix.
   +CC @otterc 
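   The change moves `dataFile.delete()` out of the `isOpen()` check, so a file whose channel was already closed is still deleted when `delete` is true. A minimal standalone sketch of that ordering, using a hypothetical `MergedFileCloser` helper that covers only a single data file (not the actual `closeAllFilesAndDeleteIfNeeded`):

   ```java
   import java.io.File;
   import java.io.IOException;
   import java.nio.channels.FileChannel;

   // Hypothetical helper illustrating the close-then-delete ordering.
   final class MergedFileCloser {
     // Closes the channel if it is still open, then deletes the file when requested.
     // The delete sits outside the isOpen() check: a channel closed earlier must not
     // prevent the file from being removed. Returns true if the file was deleted.
     static boolean closeAndDeleteIfNeeded(FileChannel channel, File file, boolean delete)
         throws IOException {
       if (channel.isOpen()) {
         channel.close();
       }
       return delete && file.delete();
     }
   }
   ```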



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockStoreClient.java:
##########
@@ -256,6 +256,20 @@ public void onFailure(Throwable e) {
     }
   }
 
+  @Override
+  public boolean removeShuffleMerge(String host, int port, int shuffleId) {
+    checkInit();
+    try {
+      TransportClient client = clientFactory.createClient(host, port);

Review Comment:
   `send` should be fine here, since the ESS does not send a response to the driver for `RemoveShuffleMerge`.
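   A minimal sketch of the fire-and-forget pattern, using a hypothetical `SketchTransport` interface and `RemoveMergeClient` class in place of the real client and transport: since no reply is expected, the client writes the request once and returns without waiting on a response.

   ```java
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;

   // Hypothetical transport offering both one-way messages and request/response calls.
   interface SketchTransport {
     void send(ByteBuffer message);                               // fire-and-forget
     ByteBuffer sendRpcSync(ByteBuffer message, long timeoutMs);  // waits for a reply
   }

   // Hypothetical client; not Spark's ExternalBlockStoreClient.
   final class RemoveMergeClient {
     private final SketchTransport transport;

     RemoveMergeClient(SketchTransport transport) {
       this.transport = transport;
     }

     // The service never replies to a removal request, so a one-way send is enough;
     // a request/response call would wait for a reply that never arrives.
     boolean removeShuffleMerge(String appId, int appAttemptId, int shuffleId, int shuffleMergeId) {
       // Hypothetical wire format; a real message class would do its own encoding.
       String payload = appId + "," + appAttemptId + "," + shuffleId + "," + shuffleMergeId;
       transport.send(ByteBuffer.wrap(payload.getBytes(StandardCharsets.UTF_8)));
       return true;
     }
   }
   ```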



##########
common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java:
##########
@@ -1410,26 +1431,27 @@ public String toString() {
    * required for the shuffles of indeterminate stages.
    */
   public static class AppShuffleMergePartitionsInfo {

Review Comment:
   Revert the changes to this class?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


For additional commands, e-mail: reviews-help@spark.apache.org