Posted to reviews@spark.apache.org by "wankunde (via GitHub)" <gi...@apache.org> on 2024/03/22 07:30:44 UTC

[PR] [WIP][SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

wankunde opened a new pull request, #45661:
URL: https://github.com/apache/spark/pull/45661

   
   
   ### What changes were proposed in this pull request?
   
   If there is only one spill file, Spark transfers that spill file to the final data file.
   
   ```java
     @Override
     public void transferMapSpillFile(
         File mapSpillFile,
         long[] partitionLengths,
         long[] checksums) throws IOException {
       // The map spill file already has the proper format, and it contains all of the partition data.
       // So just transfer it directly to the destination without any merging.
       File outputFile = blockResolver.getDataFile(shuffleId, mapId);
       File tempFile = Utils.tempFileWith(outputFile);
       Files.move(mapSpillFile.toPath(), tempFile.toPath());
       blockResolver
         .writeMetadataFileAndCommit(shuffleId, mapId, partitionLengths, checksums, tempFile);
     }
   ```
   
   But if that spill file and the final data file are on different disks, the move falls back to copying the file, which is still a heavy data transfer:
   
   ```
   sun.nio.fs.UnixCopyFile.transfer(Native Method)
   sun.nio.fs.UnixCopyFile.copyFile(UnixCopyFile.java:251)
   sun.nio.fs.UnixCopyFile.move(UnixCopyFile.java:471)
   sun.nio.fs.UnixFileSystemProvider.move(UnixFileSystemProvider.java:262)
   java.nio.file.Files.move(Files.java:1395)
   org.apache.spark.shuffle.sort.io.LocalDiskSingleSpillMapOutputWriter.transferMapSpillFile(LocalDiskSingleSpillMapOutputWriter.java:52)
   org.apache.spark.shuffle.sort.UnsafeShuffleWriter.mergeSpills(UnsafeShuffleWriter.java:280)
   org.apache.spark.shuffle.sort.UnsafeShuffleWriter.closeAndWriteOutput(UnsafeShuffleWriter.java:224)
   org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:180)
   org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
   org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
   org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
   org.apache.spark.scheduler.Task.run(Task.scala:131)
   org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:498)
   org.apache.spark.executor.Executor$TaskRunner$$Lambda$453/980524593.apply(Unknown Source)
   org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1465)
   org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
   java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
   java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
   java.lang.Thread.run(Thread.java:748)
   ```
   
   We can optimize this step by writing the final spill file to the disk that the final data file will use.
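   
   As a rough illustration of the condition described above (not the change made in this PR), the sketch below checks whether two paths live on the same `FileStore` before moving. The class `SameDiskCheck` and its helper are hypothetical names; `Files.getFileStore` and `Files.move` are standard `java.nio.file` calls.
   
   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;

   // Hypothetical helper for illustration; not part of Spark.
   class SameDiskCheck {
     // Returns true when both (existing) paths are backed by the same file store.
     static boolean onSameFileStore(Path a, Path b) throws IOException {
       return Files.getFileStore(a).equals(Files.getFileStore(b));
     }

     public static void main(String[] args) throws IOException {
       Path dir = Files.createTempDirectory("spill-demo");
       Path spill = Files.write(dir.resolve("spill.tmp"), new byte[] {1, 2, 3});
       Path target = dir.resolve("shuffle.data");
       // The target does not exist yet, so compare against its parent directory.
       System.out.println("same store: " + onSameFileStore(spill, target.getParent()));
       Files.move(spill, target);
       System.out.println("moved: " + Files.exists(target));
     }
   }
   ```
   
   When the two paths are on different file stores, a plain `Files.move` has to copy the bytes, which is the `UnixCopyFile.copyFile` frame in the stack trace above.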
   
   ### Why are the changes needed?
   
   Optimize Spark shuffle performance.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   Local test
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [WIP][SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #45661:
URL: https://github.com/apache/spark/pull/45661#discussion_r1535861287


##########
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala:
##########
@@ -226,6 +226,24 @@ private[spark] class DiskBlockManager(
     (blockId, getFile(blockId))
   }
 
+  /** Produces a unique block id and File suitable for storing shuffled intermediate results
+   * in the input directory. */

Review Comment:
   nit.
   ```java
     /**
      * Produces a unique block id and File suitable for storing shuffled intermediate results
      * in the input directory.
      */
   ```





Re: [PR] [WIP][SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #45661:
URL: https://github.com/apache/spark/pull/45661#discussion_r1535871592


##########
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala:
##########
@@ -226,6 +226,24 @@ private[spark] class DiskBlockManager(
     (blockId, getFile(blockId))
   }
 
+  /** Produces a unique block id and File suitable for storing shuffled intermediate results
+   * in the input directory. */
+  def createTempShuffleBlockInDir(fileDir: File): (TempShuffleBlockId, File) = {
+    var blockId = TempShuffleBlockId(UUID.randomUUID())
+    var tmpFile = new File(fileDir, blockId.name)
+    while (tmpFile.exists()) {
+      blockId = TempShuffleBlockId(UUID.randomUUID())
+      tmpFile = new File(fileDir, blockId.name)

Review Comment:
   If `fileDir` is invalid, we are in an infinite loop, aren't we? It seems that we need a safeguard to avoid the infinite loop, @wankunde. Also, please add a unit test case for an invalid `fileDir` (maybe a non-existent one).
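   
   One possible shape for such a safeguard, shown as a standalone Java sketch rather than the Scala code under review: validate the directory up front and bound the number of retries. `TempFileInDir`, `MAX_ATTEMPTS`, and the `temp_shuffle_` prefix are assumptions for illustration, not the PR's code. Unlike the method under review, this sketch also creates the file, so that the uniqueness check and the creation happen in one atomic step.
   
   ```java
   import java.io.File;
   import java.io.IOException;
   import java.nio.file.Files;
   import java.util.UUID;

   // Hypothetical stand-in for a createTempShuffleBlockInDir-style helper.
   class TempFileInDir {
     static final int MAX_ATTEMPTS = 10; // assumed bound, not taken from Spark

     static File createUniqueFile(File dir) throws IOException {
       // Fail fast on a missing or non-directory path instead of looping.
       if (dir == null || !dir.isDirectory()) {
         throw new IOException("Not a usable directory: " + dir);
       }
       for (int i = 0; i < MAX_ATTEMPTS; i++) {
         File candidate = new File(dir, "temp_shuffle_" + UUID.randomUUID());
         // createNewFile returns false if a file with this name already exists.
         if (candidate.createNewFile()) {
           return candidate;
         }
       }
       throw new IOException("No unique name found after " + MAX_ATTEMPTS + " attempts");
     }

     public static void main(String[] args) throws IOException {
       File dir = Files.createTempDirectory("blocks").toFile();
       System.out.println(createUniqueFile(dir).getName());
     }
   }
   ```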





Re: [PR] [SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #45661:
URL: https://github.com/apache/spark/pull/45661#issuecomment-2019551339

   A move between disks requires actual data copy, while a mv within the same disk is simply a metadata operation.
   Having said that, if the disk is doing < 1 MBps transfer rates, there are some severe infra issues to be sorted out.
   
   I agree with @cloud-fan - I am not in favor of complicating spark code for handling extremely degenerate cases like this.
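   
   The distinction between a rename and a copy can be observed from Java with `StandardCopyOption.ATOMIC_MOVE`: `Files.move` throws `AtomicMoveNotSupportedException` when the move cannot be performed as an atomic file system operation, which is typically the case across file stores. A minimal sketch; `MoveKind` and `moveReportingRename` are hypothetical names.
   
   ```java
   import java.io.IOException;
   import java.nio.file.AtomicMoveNotSupportedException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.StandardCopyOption;

   // Hypothetical helper for illustration; not part of Spark.
   class MoveKind {
     // Returns true if the move was an atomic rename,
     // false if it had to fall back to a plain move (which may copy the data).
     static boolean moveReportingRename(Path source, Path target) throws IOException {
       try {
         Files.move(source, target, StandardCopyOption.ATOMIC_MOVE);
         return true;
       } catch (AtomicMoveNotSupportedException e) {
         Files.move(source, target);
         return false;
       }
     }

     public static void main(String[] args) throws IOException {
       Path dir = Files.createTempDirectory("move-demo");
       Path source = Files.write(dir.resolve("spill.tmp"), new byte[] {42});
       Path target = dir.resolve("shuffle.data");
       System.out.println("atomic rename: " + moveReportingRename(source, target));
     }
   }
   ```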




Re: [PR] [SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #45661:
URL: https://github.com/apache/spark/pull/45661#discussion_r1540475779


##########
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java:
##########
@@ -198,7 +201,8 @@ private void writeSortedFile(boolean isFinalFile) {
     // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
     // createTempShuffleBlock here; see SPARK-3426 for more details.
     final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
-      blockManager.diskBlockManager().createTempShuffleBlock();
+      finalDataFileDir.map(blockManager.diskBlockManager()::createTempShuffleBlockInDir)
+        .orElseGet(blockManager.diskBlockManager()::createTempShuffleBlock);

Review Comment:
   ```suggestion
         finalDataFileDir.filter(v -> spills.isEmpty()).map(blockManager.diskBlockManager()::createTempShuffleBlockInDir)
           .orElseGet(blockManager.diskBlockManager()::createTempShuffleBlock);
   ```
   
   We need this only when there is a single output file, else we want to spread it.
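   
   To make the effect of the extra `filter` concrete, here is a small self-contained sketch of the same `Optional` chain. Plain `File` values stand in for the `DiskBlockManager` calls, and `SpillFileChoice`, `chooseFile`, and the file names are hypothetical.
   
   ```java
   import java.io.File;
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;
   import java.util.Optional;
   import java.util.function.Supplier;

   // Hypothetical stand-in for the spill file selection; not Spark code.
   class SpillFileChoice {
     static File chooseFile(
         Optional<File> finalDataFileDir,
         List<?> spills,
         String name,
         Supplier<File> fallback) {
       return finalDataFileDir
           // Use the final directory only if nothing has been spilled so far.
           .filter(dir -> spills.isEmpty())
           .map(dir -> new File(dir, name))
           // Otherwise keep the default placement.
           .orElseGet(fallback);
     }

     public static void main(String[] args) {
       Optional<File> finalDir = Optional.of(new File("final-dir"));
       Supplier<File> fallback = () -> new File("hashed-dir", "spill");
       System.out.println(chooseFile(finalDir, Collections.emptyList(), "spill", fallback));
       System.out.println(chooseFile(finalDir, Arrays.asList("earlier-spill"), "spill", fallback));
     }
   }
   ```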



##########
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java:
##########
@@ -198,7 +201,8 @@ private void writeSortedFile(boolean isFinalFile) {
     // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
     // createTempShuffleBlock here; see SPARK-3426 for more details.
     final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
-      blockManager.diskBlockManager().createTempShuffleBlock();

Review Comment:
   That would assume the final output has to go to `blockResolver.getDataFile(shuffleId, mapId)` right @cloud-fan  ?
   Currently at this layer we do not make that assumption ...
   
   I was initially toying with the idea of passing `mapId` and `shuffleId` as constructor params and doing something similar, when I realized this would make assumptions that the code currently does not make; that is why the base directory is being passed around.
   
   (And then of course I thought we could solve it in `LocalDiskSingleSpillMapOutputWriter` [here](https://github.com/apache/spark/pull/45661#issuecomment-2020527368), but was completely wrong :-( ).





Re: [PR] [SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #45661:
URL: https://github.com/apache/spark/pull/45661#issuecomment-2020527368

   `ExecutorDiskUtils.getFilePath` and `DiskBlockManager.getFile` determine the layout of the files ...
   and given this is specific to an optimization in `LocalDiskShuffleExecutorComponents` (which determines the output file), why not simply modify `LocalDiskSingleSpillMapOutputWriter.transferMapSpillFile` to leverage this layout information and 'host' the final output on the same disk?
   
   ```
   
     public void transferMapSpillFile(
         File mapSpillFile,
         long[] partitionLengths,
         long[] checksums) throws IOException {
       // The map spill file already has the proper format, and it contains all of the partition data.
       // So just transfer it directly to the destination without any merging.
       File parent = ExecutorDiskUtils.getLocalDir(mapSpillFile);
       File outputFile = blockResolver.getDataFile(shuffleId, mapId, Some(Array(parent.getAbsolutePath)));
       File tempFile = Utils.tempFileWith(outputFile);
       Files.move(mapSpillFile.toPath(), tempFile.toPath());
       blockResolver
         .writeMetadataFileAndCommit(shuffleId, mapId, partitionLengths, checksums, tempFile);
     }
   ```
   




Re: [PR] [SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #45661:
URL: https://github.com/apache/spark/pull/45661#issuecomment-2020350812

   I think renaming and reading/writing should be faster if the files are co-located on the same disk?




Re: [PR] [WIP][SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "wankunde (via GitHub)" <gi...@apache.org>.
wankunde commented on PR #45661:
URL: https://github.com/apache/spark/pull/45661#issuecomment-2017066187

   > To @wankunde , to be clear, the below two comments are critical to this PR. You had better answer to resolve them.
   > 
   > * https://github.com/apache/spark/pull/45661/files#r1535871592
   > * https://github.com/apache/spark/pull/45661/files#r1535982390
   
   My replies were not submitted, sorry about that.




Re: [PR] [SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "wankunde (via GitHub)" <gi...@apache.org>.
wankunde commented on PR #45661:
URL: https://github.com/apache/spark/pull/45661#issuecomment-2017723213

   > Before this PR, will Spark write any temp files to the final shuffle file directory?
   
   In most cases, Spark manages its internal files as block files, including `ShuffleDataBlockId`, `ShuffleIndexBlockId`, `TempShuffleBlockId` and some other blocks. These block files are located by `hash(block name)`, and the full block file path is `localDirs(hash % localDirs.length)` / `(hash / localDirs.length) % subDirsPerLocalDir` / filename.
   
   If block files have the same hash code, they will be in the same directory.
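   
   A sketch of that layout rule in plain Java. The two index formulas are the ones quoted above; `BlockFileLayout`, the `nonNegativeHash` stand-in, and the two-digit hex sub-directory name are assumptions for illustration and may differ from the actual Spark code.
   
   ```java
   import java.io.File;

   // Hypothetical re-implementation of the layout rule; not Spark's code.
   class BlockFileLayout {
     // Assumed hash: String.hashCode made non-negative.
     static int nonNegativeHash(String s) {
       int h = s.hashCode();
       return h == Integer.MIN_VALUE ? 0 : Math.abs(h);
     }

     static File locate(String[] localDirs, int subDirsPerLocalDir, String filename) {
       int hash = nonNegativeHash(filename);
       int dirId = hash % localDirs.length;
       int subDirId = (hash / localDirs.length) % subDirsPerLocalDir;
       // Assumption: sub-directories are named with two hex digits.
       File subDir = new File(localDirs[dirId], String.format("%02x", subDirId));
       return new File(subDir, filename);
     }

     public static void main(String[] args) {
       String[] localDirs = {"/disk0", "/disk1", "/disk2"};
       // A data file and a temp file have different names, hence different hashes,
       // so they can land on different local dirs.
       System.out.println(locate(localDirs, 64, "shuffle_0_1_0.data"));
       System.out.println(locate(localDirs, 64, "temp_shuffle_example"));
     }
   }
   ```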




Re: [PR] [SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #45661:
URL: https://github.com/apache/spark/pull/45661#issuecomment-2018403978

   To confirm: is the goal to write the last spill file to the same directory as the final shuffle file, so that the following `transfer` operation can be cheap?




Re: [PR] [SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #45661:
URL: https://github.com/apache/spark/pull/45661#issuecomment-2019855591

   I think a more general approach is that `DiskBlockManager#createTempShuffleBlock` should co-locate temp shuffle files and final shuffle files. This also benefits cases with more than one spill file.




Re: [PR] [WIP][SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "wankunde (via GitHub)" <gi...@apache.org>.
wankunde commented on code in PR #45661:
URL: https://github.com/apache/spark/pull/45661#discussion_r1535864859


##########
core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java:
##########
@@ -219,7 +221,15 @@ void closeAndWriteOutput() throws IOException {
     updatePeakMemoryUsed();
     serBuffer = null;
     serOutputStream = null;
-    final SpillInfo[] spills = sorter.closeAndGetSpills();
+    Optional<File> finalDataFileDir;
+    if (shuffleExecutorComponents instanceof LocalDiskShuffleExecutorComponents) {
+      File dataFile =
+        new IndexShuffleBlockResolver(sparkConf, blockManager).getDataFile(shuffleId, mapId);

Review Comment:
   Yes



##########
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala:
##########
@@ -226,6 +226,24 @@ private[spark] class DiskBlockManager(
     (blockId, getFile(blockId))
   }
 
+  /** Produces a unique block id and File suitable for storing shuffled intermediate results
+   * in the input directory. */
+  def createTempShuffleBlockInDir(fileDir: File): (TempShuffleBlockId, File) = {
+    var blockId = TempShuffleBlockId(UUID.randomUUID())
+    var tmpFile = new File(fileDir, blockId.name)
+    while (tmpFile.exists()) {
+      blockId = TempShuffleBlockId(UUID.randomUUID())
+      tmpFile = new File(fileDir, blockId.name)

Review Comment:
   Added a UT. If `fileDir` is invalid, `tmpFile` will not exist, so the loop exits.
   A new block id is generated only when `fileDir` is valid and the first `TempShuffleBlockId` file has already been created by some other task.
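   
   This follows from `java.io.File.exists()` returning `false` for any path under a directory that does not exist. A small standalone check, using a hypothetical `MissingDirCheck` class and an assumed `temp_shuffle_` name prefix:
   
   ```java
   import java.io.File;
   import java.util.UUID;

   // Hypothetical check for illustration; not the unit test added in the PR.
   class MissingDirCheck {
     // Mirrors the loop condition: does a freshly named candidate already exist?
     static boolean candidateExists(File dir) {
       File candidate = new File(dir, "temp_shuffle_" + UUID.randomUUID());
       return candidate.exists();
     }

     public static void main(String[] args) {
       File missing = new File(System.getProperty("java.io.tmpdir"),
           "no-such-dir-" + UUID.randomUUID());
       System.out.println("loop would continue: " + candidateExists(missing));
     }
   }
   ```
   
   The loop therefore terminates for a missing directory, although a later attempt to open the returned file for writing would still fail unless the directory is created first.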



##########
core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala:
##########
@@ -226,6 +226,24 @@ private[spark] class DiskBlockManager(
     (blockId, getFile(blockId))
   }
 
+  /** Produces a unique block id and File suitable for storing shuffled intermediate results
+   * in the input directory. */

Review Comment:
   Fixed



##########
core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java:
##########
@@ -219,7 +221,15 @@ void closeAndWriteOutput() throws IOException {
     updatePeakMemoryUsed();
     serBuffer = null;
     serOutputStream = null;
-    final SpillInfo[] spills = sorter.closeAndGetSpills();
+    Optional<File> finalDataFileDir;
+    if (shuffleExecutorComponents instanceof LocalDiskShuffleExecutorComponents) {

Review Comment:
   Sorry, I'm not familiar with the block storage in KubernetesLocalDiskShuffleExecutorComponents, so I only handle LocalDiskShuffleExecutorComponents here.
   
   Or should I add a new method `getDataFile()` to the trait `ShuffleExecutorComponents`?
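   
   For discussion, one way such a hook could look without an `instanceof` check is a default method that returns `Optional.empty()` unless an implementation overrides it. Everything in this sketch is hypothetical (`ComponentsSketch`, `LocalDiskSketch`, `RemoteSketch`, `finalDataFileDir`); it is not the real `ShuffleExecutorComponents` API.
   
   ```java
   import java.io.File;
   import java.util.Optional;

   class ComponentsDemo {
     public static void main(String[] args) {
       ComponentsSketch local = new LocalDiskSketch(new File("/data/blockmgr"));
       ComponentsSketch remote = new RemoteSketch();
       // The caller no longer needs to know which implementation it has.
       System.out.println(local.finalDataFileDir(0, 1L).isPresent());
       System.out.println(remote.finalDataFileDir(0, 1L).isPresent());
     }
   }

   // Hypothetical interface; NOT Spark's ShuffleExecutorComponents.
   interface ComponentsSketch {
     // By default an implementation gives no hint about the final data file location.
     default Optional<File> finalDataFileDir(int shuffleId, long mapId) {
       return Optional.empty();
     }
   }

   // Hypothetical local-disk implementation that knows its output directory.
   class LocalDiskSketch implements ComponentsSketch {
     private final File dataDir;

     LocalDiskSketch(File dataDir) {
       this.dataDir = dataDir;
     }

     @Override
     public Optional<File> finalDataFileDir(int shuffleId, long mapId) {
       return Optional.of(dataDir);
     }
   }

   // Hypothetical implementation that keeps the default behaviour.
   class RemoteSketch implements ComponentsSketch {}
   ```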





Re: [PR] [WIP][SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #45661:
URL: https://github.com/apache/spark/pull/45661#issuecomment-2015445690

   Thanks. Could you put that into the PR description, @wankunde?




Re: [PR] [SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #45661:
URL: https://github.com/apache/spark/pull/45661#issuecomment-2017078333

   I'm a bit worried about this change, as it changes the assumption that shuffle always has two phases: the first phase only writes to temp files, and the second phase "commits" them to the final destination in a short time. The shuffle and task scheduling process is quite convoluted in Spark, and I can't be 100% sure that this is a safe change.
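   
   For readers less familiar with the pattern, the two phases can be sketched generically as "write to a temp file, then publish with a rename". This is a simplified illustration with hypothetical names (`TwoPhaseWrite`, `writeThenCommit`), not Spark's commit protocol.
   
   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.StandardCopyOption;

   // Hypothetical illustration of a write-then-commit pattern; not Spark code.
   class TwoPhaseWrite {
     // Assumes finalFile has a parent directory that already exists.
     static Path writeThenCommit(Path finalFile, byte[] data) throws IOException {
       // Phase 1: write all data to a temp file next to the destination.
       Path temp = Files.createTempFile(
           finalFile.getParent(), finalFile.getFileName().toString(), ".tmp");
       Files.write(temp, data);
       // Phase 2: publish the result in one short step.
       return Files.move(temp, finalFile, StandardCopyOption.ATOMIC_MOVE);
     }

     public static void main(String[] args) throws IOException {
       Path dir = Files.createTempDirectory("commit-demo");
       Path committed = writeThenCommit(dir.resolve("shuffle.data"), new byte[] {1, 2, 3});
       System.out.println("committed bytes: " + Files.size(committed));
     }
   }
   ```
   
   Because the temp file sits in the destination directory, the second phase is a rename and stays short regardless of file size.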




Re: [PR] [SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #45661:
URL: https://github.com/apache/spark/pull/45661#issuecomment-2020449340

   > I think renaming and reading/writing should be faster if the files are co-located on the same disk?
   
   At these disk speeds, it will be slower if they are on the same disk.




Re: [PR] [SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on PR #45661:
URL: https://github.com/apache/spark/pull/45661#issuecomment-2021934964

   Actually, thinking more, what I suggested will not work - since the local directory is chosen based on the hash of the block id - which is different, sigh.




Re: [PR] [WIP][SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "wankunde (via GitHub)" <gi...@apache.org>.
wankunde commented on PR #45661:
URL: https://github.com/apache/spark/pull/45661#issuecomment-2015444180

   Testing `mv` of a 2.6 G file on a node with high iowait:
   ```
   $ time mv /hadoop/2/yarn/local/yarn-yarn-nodemanager.log.10 /hadoop/3/yarn/local/yarn-yarn-nodemanager.log.10
   
   real	2m19.694s
   user	0m0.029s
   sys	0m6.120s
   $ time mv /hadoop/3/yarn/local/yarn-yarn-nodemanager.log.10 /hadoop/3/yarn/local/yarn-yarn-nodemanager.log.11
   
   real	0m0.003s
   user	0m0.001s
   sys	0m0.001s
   ```
   When we `mv` a file to another disk that has high iowait, it is quite slow.




Re: [PR] [WIP][SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #45661:
URL: https://github.com/apache/spark/pull/45661#discussion_r1535858230


##########
core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java:
##########
@@ -219,7 +221,15 @@ void closeAndWriteOutput() throws IOException {
     updatePeakMemoryUsed();
     serBuffer = null;
     serOutputStream = null;
-    final SpillInfo[] spills = sorter.closeAndGetSpills();
+    Optional<File> finalDataFileDir;
+    if (shuffleExecutorComponents instanceof LocalDiskShuffleExecutorComponents) {
+      File dataFile =
+        new IndexShuffleBlockResolver(sparkConf, blockManager).getDataFile(shuffleId, mapId);

Review Comment:
   Is this only used to invoke `getParentFile`?





Re: [PR] [WIP][SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #45661:
URL: https://github.com/apache/spark/pull/45661#discussion_r1535851898


##########
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java:
##########
@@ -198,7 +201,8 @@ private void writeSortedFile(boolean isFinalFile) {
     // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
     // createTempShuffleBlock here; see SPARK-3426 for more details.
     final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
-      blockManager.diskBlockManager().createTempShuffleBlock();
+      finalDataFileDir.map(blockManager.diskBlockManager()::createTempShuffleBlockInDir)
+        .orElseGet(blockManager.diskBlockManager()::createTempShuffleBlock);

Review Comment:
   This is the main change, right?





Re: [PR] [WIP][SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "viirya (via GitHub)" <gi...@apache.org>.
viirya commented on code in PR #45661:
URL: https://github.com/apache/spark/pull/45661#discussion_r1535982390


##########
core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java:
##########
@@ -219,7 +221,15 @@ void closeAndWriteOutput() throws IOException {
     updatePeakMemoryUsed();
     serBuffer = null;
     serOutputStream = null;
-    final SpillInfo[] spills = sorter.closeAndGetSpills();
+    Optional<File> finalDataFileDir;
+    if (shuffleExecutorComponents instanceof LocalDiskShuffleExecutorComponents) {

Review Comment:
   Hmm, it looks a bit hacky to handle local disk shuffle specially here. 





Re: [PR] [SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "wankunde (via GitHub)" <gi...@apache.org>.
wankunde commented on PR #45661:
URL: https://github.com/apache/spark/pull/45661#issuecomment-2019761838

   > To confirm: is the goal to write the last spill file to the same directory as the final shuffle file, so that the following `transfer` operation can be cheap?
   
   Yes




Re: [PR] [WIP][SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "wankunde (via GitHub)" <gi...@apache.org>.
wankunde commented on PR #45661:
URL: https://github.com/apache/spark/pull/45661#issuecomment-2016767129

   UT `org.apache.spark.util.collection.SorterSuite` failed:
   ```
   2024-03-23T03:43:57.7101735Z [info] - SPARK-5984 TimSort bug (32 seconds, 966 milliseconds)
   2024-03-23T03:43:58.5733341Z [info] org.apache.spark.util.collection.SorterSuite *** ABORTED *** (33 seconds, 954 milliseconds)
   2024-03-23T03:43:58.5736160Z [info]   java.lang.OutOfMemoryError: Java heap space
   2024-03-23T03:43:58.5739209Z [info]   at org.apache.spark.util.collection.SorterSuite.$anonfun$new$11(SorterSuite.scala:145)
   2024-03-23T03:43:58.5747591Z [info]   at org.apache.spark.util.collection.SorterSuite$$Lambda$17217/0x00007f618e8d2610.apply$mcV$sp(Unknown Source)
   2024-03-23T03:43:58.5751054Z [info]   at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.scala:18)
   ```




Re: [PR] [SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "wankunde (via GitHub)" <gi...@apache.org>.
wankunde commented on code in PR #45661:
URL: https://github.com/apache/spark/pull/45661#discussion_r1540284614


##########
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java:
##########
@@ -198,7 +201,8 @@ private void writeSortedFile(boolean isFinalFile) {
     // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
     // createTempShuffleBlock here; see SPARK-3426 for more details.
     final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
-      blockManager.diskBlockManager().createTempShuffleBlock();

Review Comment:
   Do you mean changing the parameter of `createTempShuffleBlockInDir` from **finalDataFileDir** to **Tuple2<ShuffleId, MapId>**?





Re: [PR] [SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #45661:
URL: https://github.com/apache/spark/pull/45661#discussion_r1539281125


##########
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java:
##########
@@ -198,7 +201,8 @@ private void writeSortedFile(boolean isFinalFile) {
     // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
     // createTempShuffleBlock here; see SPARK-3426 for more details.
     final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
-      blockManager.diskBlockManager().createTempShuffleBlock();

Review Comment:
   One idea: if `isFinalFile` is true, we call a special version of `createTempShuffleBlock` that takes the shuffle ID and map ID and returns a file path in the same directory as the final shuffle file. Then we would not need to change other places?
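
   A minimal sketch of that idea, using plain `java.nio` paths rather than Spark's classes. Every name here (`TempShuffleFileChooser`, `finalDataFile`, `chooseSpillFile`, the hash-based directory rule) is hypothetical and only illustrates the shape of the change; it is not the PR's code and not the real `DiskBlockManager` API.

   ```java
   import java.nio.file.Path;
   import java.util.UUID;

   // Hypothetical stand-in for the temp-file allocation logic; not Spark's real API.
   final class TempShuffleFileChooser {
     private final Path[] localDirs;

     TempShuffleFileChooser(Path... localDirs) {
       this.localDirs = localDirs;
     }

     // Assumed layout rule: the file name picks one of the local directories.
     Path finalDataFile(int shuffleId, long mapId) {
       String name = "shuffle_" + shuffleId + "_" + mapId + "_0.data";
       return localDirs[Math.floorMod(name.hashCode(), localDirs.length)].resolve(name);
     }

     // Ordinary spill: the directory depends only on the random temp name.
     Path createTempShuffleFile() {
       String name = "temp_shuffle_" + UUID.randomUUID();
       return localDirs[Math.floorMod(name.hashCode(), localDirs.length)].resolve(name);
     }

     // Special version: a temp name placed next to the final data file.
     Path createTempShuffleFile(int shuffleId, long mapId) {
       return finalDataFile(shuffleId, mapId)
           .resolveSibling("temp_shuffle_" + UUID.randomUUID());
     }

     // Only the final file is co-located; other call sites stay unchanged.
     Path chooseSpillFile(boolean isFinalFile, int shuffleId, long mapId) {
       return isFinalFile ? createTempShuffleFile(shuffleId, mapId) : createTempShuffleFile();
     }
   }
   ```

   With this shape, only the `isFinalFile` branch needs to know the shuffle and map IDs.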





Re: [PR] [SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "wankunde (via GitHub)" <gi...@apache.org>.
wankunde commented on code in PR #45661:
URL: https://github.com/apache/spark/pull/45661#discussion_r1540582339


##########
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java:
##########
@@ -198,7 +201,8 @@ private void writeSortedFile(boolean isFinalFile) {
     // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
     // createTempShuffleBlock here; see SPARK-3426 for more details.
     final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
-      blockManager.diskBlockManager().createTempShuffleBlock();
+      finalDataFileDir.map(blockManager.diskBlockManager()::createTempShuffleBlockInDir)
+        .orElseGet(blockManager.diskBlockManager()::createTempShuffleBlock);

Review Comment:
   I have committed this change, thanks.





Re: [PR] [WIP][SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "wankunde (via GitHub)" <gi...@apache.org>.
wankunde commented on PR #45661:
URL: https://github.com/apache/spark/pull/45661#issuecomment-2016767157

   Retest this please




Re: [PR] [SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #45661:
URL: https://github.com/apache/spark/pull/45661#issuecomment-2017376161

   Before this PR, does Spark write any temp files to the final shuffle file directory?




Re: [PR] [SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "wankunde (via GitHub)" <gi...@apache.org>.
wankunde commented on PR #45661:
URL: https://github.com/apache/spark/pull/45661#issuecomment-2017108745

   > I'm a bit worried about this change, as it changes the assumption of always having two phases during shuffle: first phase only write to temp files, the second phase "commit" it to the final destination in a short time. The shuffle and task scheduling process is quite convoluted in Spark and I can't be 100% sure that this is a safe change.
   
   Hi @cloud-fan, with this PR the shuffle write still has two phases:
   * Phase 1 writes to a TempShuffleBlockId file in the final shuffle data file's directory, and makes sure there is no naming conflict.
   * Phase 2 is the "commit" phase: if there is only one shuffle spill file, it just renames the TempShuffleBlockId file to the final shuffle data file.
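
   The two phases can be sketched with plain `java.nio` calls. This is an illustration under assumptions, not the PR's implementation: `TwoPhaseCommitSketch`, `writeTemp` and `commit` are hypothetical names, and the temp file name is generated by `Files.createTempFile` rather than from a `TempShuffleBlockId`.

   ```java
   import java.io.IOException;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.StandardCopyOption;

   // Hypothetical helper; not part of Spark.
   final class TwoPhaseCommitSketch {
     // Phase 1: write to a uniquely named temp file in the final file's directory.
     // createTempFile picks a name that does not exist yet, so there is no conflict.
     static Path writeTemp(Path finalFile, byte[] data) throws IOException {
       Path temp = Files.createTempFile(finalFile.getParent(), "temp_shuffle_", ".tmp");
       Files.write(temp, data);
       return temp;
     }

     // Phase 2: commit with a rename. Source and target share a directory,
     // so no file contents have to be copied.
     static void commit(Path temp, Path finalFile) throws IOException {
       Files.move(temp, finalFile, StandardCopyOption.ATOMIC_MOVE);
     }
   }
   ```

   Until `commit` runs, readers looking for the final file name see nothing, which keeps the "temp first, then commit" assumption intact.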




Re: [PR] [WIP][SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #45661:
URL: https://github.com/apache/spark/pull/45661#discussion_r1535851898


##########
core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java:
##########
@@ -198,7 +201,8 @@ private void writeSortedFile(boolean isFinalFile) {
     // spark.shuffle.compress instead of spark.shuffle.spill.compress, so we need to use
     // createTempShuffleBlock here; see SPARK-3426 for more details.
     final Tuple2<TempShuffleBlockId, File> spilledFileInfo =
-      blockManager.diskBlockManager().createTempShuffleBlock();
+      finalDataFileDir.map(blockManager.diskBlockManager()::createTempShuffleBlockInDir)
+        .orElseGet(blockManager.diskBlockManager()::createTempShuffleBlock);

Review Comment:
   This is the main change, right?





Re: [PR] [SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "ulysses-you (via GitHub)" <gi...@apache.org>.
ulysses-you commented on PR #45661:
URL: https://github.com/apache/spark/pull/45661#issuecomment-2017116015

   > Test mv a 2.6 G file on a node which has a high iowait :
   
   @wankunde What kind of disk did you test on? Have you tried an SSD?




Re: [PR] [SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "wankunde (via GitHub)" <gi...@apache.org>.
wankunde commented on PR #45661:
URL: https://github.com/apache/spark/pull/45661#issuecomment-2017129724

   > > Test mv a 2.6 G file on a node which has a high iowait :
   > 
   > @wankunde what kind of disk did you test ? Have you tried ssd ?
   
   This test was on HDDs. Copying the 2.6 G file on NVMe disks is 10x faster than on HDD.




Re: [PR] [SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "wankunde (via GitHub)" <gi...@apache.org>.
wankunde commented on PR #45661:
URL: https://github.com/apache/spark/pull/45661#issuecomment-2019992484

   > I think a more general approach is, `DiskBlockManager#createTempShuffleBlock` should co-locate temp shuffle files and final shuffle files (with the same shuffle id and map id). This benefits more than one spill files as well.
   
   Spark renames the TempShuffleBlock file to the final data file only when there is exactly one TempShuffleBlock file:
   
   https://github.com/apache/spark/blob/0bbef2090680a7bc2d5a1d8a959ea94a6445291f/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java#L277-L287
   
   If there are multiple TempShuffleBlock files, Spark always has to read all of the spilled shuffle data and write it into the final shuffle data file.
   In that case the read and write workload is unavoidable, wherever the temp files are located.
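
   The difference between the two cases can be sketched as follows. This is a deliberate simplification: `SpillMergeSketch` and `mergeSpills` are hypothetical names, and the multi-spill branch just concatenates files, whereas the real merge in `UnsafeShuffleWriter` combines the spills partition by partition.

   ```java
   import java.io.IOException;
   import java.io.OutputStream;
   import java.nio.file.Files;
   import java.nio.file.Path;
   import java.nio.file.StandardCopyOption;
   import java.util.List;

   // Hypothetical helper; not part of Spark.
   final class SpillMergeSketch {
     // Returns the number of bytes this method streamed itself.
     static long mergeSpills(List<Path> spills, Path output) throws IOException {
       if (spills.size() == 1) {
         // Single spill: already in the final format, so only move it.
         // This is cheap only if source and target are on the same file system.
         Files.move(spills.get(0), output, StandardCopyOption.REPLACE_EXISTING);
         return 0L;
       }
       long streamed = 0L;
       try (OutputStream out = Files.newOutputStream(output)) {
         for (Path spill : spills) {
           // Multiple spills: every byte is read and rewritten.
           streamed += Files.copy(spill, out);
           Files.delete(spill);
         }
       }
       return streamed;
     }
   }
   ```

   Co-locating the temp files would not reduce the bytes streamed in the multi-spill branch, which is why the PR targets only the single-spill case.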




Re: [PR] [SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "wankunde (via GitHub)" <gi...@apache.org>.
wankunde commented on PR #45661:
URL: https://github.com/apache/spark/pull/45661#issuecomment-2021723677

   > `ExecutorDiskUtils.getFilePath` and `DiskBlockManager.getFile` determines the layout of the files ... and given this is specific to an optimization in `LocalDiskShuffleExecutorComponents` (which determines the output file), why not simply modify `LocalDiskSingleSpillMapOutputWriter.transferMapSpillFile` to leverage this layout information and 'host' the final output in the same disk ?
   > 
   > `KubernetesLocalDiskShuffleExecutorComponents.recoverDiskStore` for example, does something similar as well (using the layout info to recover - not this specific layout).
   > 
   > ```
   > 
   >   public void transferMapSpillFile(
   >       File mapSpillFile,
   >       long[] partitionLengths,
   >       long[] checksums) throws IOException {
   >     // The map spill file already has the proper format, and it contains all of the partition data.
   >     // So just transfer it directly to the destination without any merging.
   >     File parent = ExecutorDiskUtils.getLocalDir(mapSpillFile);
   >     File outputFile = blockResolver.getDataFile(shuffleId, mapId, Some(Array(parent.getAbsolutePath)));
   >     File tempFile = Utils.tempFileWith(outputFile);
   >     Files.move(mapSpillFile.toPath(), tempFile.toPath());
   >     blockResolver
   >       .writeMetadataFileAndCommit(shuffleId, mapId, partitionLengths, checksums, tempFile);
   >   }
   > ```
   > 
   > This is only going to marginally alleviate the issues though, the slow disks are the RC here - and their impact will manifest in a lot of other ways as well. (We built push based shuffle when we started seeing very high disk issues of this sort - though unfortunately it is only supported in yarn right now)
   
   Thanks @mridulm.
   Do you mean changing the location of the final shuffle data file? I would prefer to keep the current shuffle file layout: the executors write the shuffle files, and the external shuffle service can then read them without any additional work.
   `KubernetesLocalDiskShuffleExecutorComponents.recoverDiskStore(sparkConf, blockManager)` performs a heavy disk scan, but only in the `KubernetesLocalDiskShuffleExecutorComponents.initializeExecutor()` method; the internal layout of the shuffle data files is the same as on YARN.
   




Re: [PR] [SPARK-47518][CORE] Skip transfer the last spilled shuffle data [spark]

Posted by "wankunde (via GitHub)" <gi...@apache.org>.
wankunde commented on PR #45661:
URL: https://github.com/apache/spark/pull/45661#issuecomment-2020081597

   The MiMa binary compatibility check failed:
   ```
   [error] spark-core: Failed binary compatibility check against org.apache.spark:spark-core_2.13:3.5.0! Found 1 potential problems (filtered 3976)
   [error]  * abstract method getFinalDataFile(Int,Long)java.util.Optional in interface org.apache.spark.shuffle.api.ShuffleExecutorComponents is present only in current version
   [error]    filter with: ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.shuffle.api.ShuffleExecutorComponents.getFinalDataFile")
   ```
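
   The report is about a new abstract method on a public interface, which existing implementations would not provide. Apart from adding the suggested filter, one general Java technique for this kind of addition is a default method. The sketch below uses a hypothetical interface (`ExecutorComponentsSketch`), not the real `ShuffleExecutorComponents`, and whether the PR took this route is not stated in this thread.

   ```java
   import java.io.File;
   import java.util.Optional;

   // Hypothetical plugin interface standing in for a public API; not Spark's interface.
   interface ExecutorComponentsSketch {
     String name();

     // Added later as a default method, so implementations written against
     // the older interface still satisfy it without changes.
     default Optional<File> getFinalDataFile(int shuffleId, long mapId) {
       return Optional.empty();
     }
   }

   // An "old" implementation that knows nothing about the new method.
   final class LegacyComponents implements ExecutorComponentsSketch {
     @Override
     public String name() {
       return "legacy";
     }
   }
   ```

   Callers can then treat `Optional.empty()` as "no co-location hint" and fall back to the existing temp file path.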

