You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2022/03/31 04:17:41 UTC
[spark] branch master updated: [SPARK-38706][CORE] Use URI in `FallbackStorage.copy`
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 60d0921 [SPARK-38706][CORE] Use URI in `FallbackStorage.copy`
60d0921 is described below
commit 60d09213105f235793f3418d79e6755561a19b15
Author: William Hyun <wi...@apache.org>
AuthorDate: Wed Mar 30 21:15:50 2022 -0700
[SPARK-38706][CORE] Use URI in `FallbackStorage.copy`
### What changes were proposed in this pull request?
This PR aims to use URI in `FallbackStorage.copy` method.
### Why are the changes needed?
Like the case of SPARK-38652, the current fallback feature is broken with `S3A` due to Hadoop 3.3.2's `org.apache.hadoop.fs.PathIOException`.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Manually tested by starting one master and one executor, then decommissioning the executor.
```
spark.decommission.enabled true
spark.storage.decommission.enabled true
spark.storage.decommission.shuffleBlocks.enabled true
spark.storage.decommission.fallbackStorage.path s3a://spark/storage/
```
```
$ curl -v -X POST -d "host=hostname" http://hostname:8080/workers/kill/
```
Closes #36017 from williamhyun/fallbackstorage.
Authored-by: William Hyun <wi...@apache.org>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
index 0c1206c..e644ffe 100644
--- a/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
+++ b/core/src/main/scala/org/apache/spark/storage/FallbackStorage.scala
@@ -63,14 +63,14 @@ private[storage] class FallbackStorage(conf: SparkConf) extends Logging {
if (indexFile.exists()) {
val hash = JavaUtils.nonNegativeHash(indexFile.getName)
fallbackFileSystem.copyFromLocalFile(
- new Path(indexFile.getAbsolutePath),
+ new Path(Utils.resolveURI(indexFile.getAbsolutePath)),
new Path(fallbackPath, s"$appId/$shuffleId/$hash/${indexFile.getName}"))
val dataFile = r.getDataFile(shuffleId, mapId)
if (dataFile.exists()) {
val hash = JavaUtils.nonNegativeHash(dataFile.getName)
fallbackFileSystem.copyFromLocalFile(
- new Path(dataFile.getAbsolutePath),
+ new Path(Utils.resolveURI(dataFile.getAbsolutePath)),
new Path(fallbackPath, s"$appId/$shuffleId/$hash/${dataFile.getName}"))
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org