You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2022/03/31 04:19:00 UTC

[spark] branch branch-3.3 updated: [SPARK-38706][CORE] Use URI in `FallbackStorage.copy`

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 48839f6  [SPARK-38706][CORE] Use URI in `FallbackStorage.copy`
48839f6 is described below

commit 48839f6cad14a3462c278d1a3c10b35dde1adcc3
Author: William Hyun <wi...@apache.org>
AuthorDate: Wed Mar 30 21:15:50 2022 -0700

    [SPARK-38706][CORE] Use URI in `FallbackStorage.copy`
    
    ### What changes were proposed in this pull request?
    
    This PR aims to use a URI in the `FallbackStorage.copy` method.
    
    ### Why are the changes needed?
    
    As with SPARK-38652, the current fallback storage feature is broken with `S3A` due to Hadoop 3.3.2's `org.apache.hadoop.fs.PathIOException`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Manually start one master and one executor, then decommission the executor.
    
    ```
    spark.decommission.enabled                          true
    spark.storage.decommission.enabled                  true
    spark.storage.decommission.shuffleBlocks.enabled    true
    spark.storage.decommission.fallbackStorage.path     s3a://spark/storage/
    ```
    
    ```
    $ curl -v -X POST -d "host=hostname" http://hostname:8080/workers/kill/
    ```
    
    Closes #36017 from williamhyun/fallbackstorage.
    
    Authored-by: William Hyun <wi...@apache.org>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit 60d09213105f235793f3418d79e6755561a19b15)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
index 0c1206c..e644ffe 100644
--- a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
+++ b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
@@ -63,14 +63,14 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
         if (indexFile.exists()) {
           val hash = JavaUtils.nonNegativeHash(indexFile.getName)
           fallbackFileSystem.copyFromLocalFile(
-            new Path(indexFile.getAbsolutePath),
+            new Path(Utils.resolveURI(indexFile.getAbsolutePath)),
             new Path(fallbackPath, s"$appId/$shuffleId/$hash/${indexFile.getName}"))
 
           val dataFile = r.getDataFile(shuffleId, mapId)
           if (dataFile.exists()) {
             val hash = JavaUtils.nonNegativeHash(dataFile.getName)
             fallbackFileSystem.copyFromLocalFile(
-              new Path(dataFile.getAbsolutePath),
+              new Path(Utils.resolveURI(dataFile.getAbsolutePath)),
               new Path(fallbackPath, s"$appId/$shuffleId/$hash/${dataFile.getName}"))
           }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org