Posted to reviews@spark.apache.org by "ukby1234 (via GitHub)" <gi...@apache.org> on 2023/10/17 21:34:12 UTC

[PR] [SPARK-45579][CORE] Catch errors for FallbackStorage.copy [spark]

ukby1234 opened a new pull request, #43409:
URL: https://github.com/apache/spark/pull/43409

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   As documented in the JIRA ticket, FallbackStorage.copy can throw FileNotFoundException even though we check beforehand that the file exists. This causes the BlockManagerDecommissioner to get stuck in an endless loop and prevents executors from exiting.
   We should ignore FileNotFoundException in this case, and set keepRunning to false for all other exceptions so that the failed blocks are retried.
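   The proposed handling can be illustrated with a minimal standalone sketch (not the actual Spark code; `copyToFallback`, `migrate`, and `keepRunning` here are stand-ins for the real decommissioner internals):

   ```scala
   import java.io.FileNotFoundException
   import scala.util.control.NonFatal

   // Hypothetical reduction of the proposed error handling.
   object FallbackCopySketch {
     @volatile var keepRunning = true

     def migrate(block: String, copyToFallback: String => Unit): Unit = {
       try {
         copyToFallback(block)
       } catch {
         case e: FileNotFoundException =>
           // The shuffle file was deleted concurrently: skip it and move on.
           println(s"Skipping block $block, block deleted: ${e.getMessage}")
         case NonFatal(e) =>
           // Any other failure: stop this thread so the block is retried.
           println(s"Fallback storage for $block failed: ${e.getMessage}")
           keepRunning = false
       }
     }
   }
   ```

   A deleted file is logged and skipped, while any other non-fatal error flags the thread to stop so the block can be retried later.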
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   Fixes the bug documented in the JIRA ticket.
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   No
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   If benchmark tests were added, please run the benchmarks in GitHub Actions for the consistent environment, and the instructions could accord to: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
   -->
   Tests weren't added due to the difficulty of replicating the race condition.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   <!--
   If generative AI tooling has been used in the process of authoring this patch, please include the
   phrase: 'Generated-by: ' followed by the name of the tool and its version.
   If no, write 'No'.
   Please refer to the [ASF Generative Tooling Guidance](https://www.apache.org/legal/generative-tooling.html) for details.
   -->
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45579][CORE] Catch errors for FallbackStorage.copy [spark]

Posted by "ukby1234 (via GitHub)" <gi...@apache.org>.
ukby1234 commented on code in PR #43409:
URL: https://github.com/apache/spark/pull/43409#discussion_r1363210467


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala:
##########
@@ -145,7 +145,15 @@ private[storage] class BlockManagerDecommissioner(
                     // Confirm peer is not the fallback BM ID because fallbackStorage would already
                     // have been used in the try-block above so there's no point trying again
                     && peer != FallbackStorage.FALLBACK_BLOCK_MANAGER_ID) {
-                  fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
+                  try {
+                    fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
+                  } catch {
+                    case e: FileNotFoundException =>
+                      logWarning("Skipping block $shuffleBlockInfo, block deleted.", e)
+                    case NonFatal(e) =>
+                      logError(s"Fallback storage for $shuffleBlockInfo failed", e)
+                      keepRunning = false
+                  }

Review Comment:
   This is different from the existing `NonFatal` block because it will retry the failed blocks, whereas the existing one is really a catch-all and leaves some blocks unretried.





Re: [PR] [SPARK-45579][CORE] Catch errors for FallbackStorage.copy [spark]

Posted by "ukby1234 (via GitHub)" <gi...@apache.org>.
ukby1234 commented on code in PR #43409:
URL: https://github.com/apache/spark/pull/43409#discussion_r1363322495


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala:
##########
@@ -145,7 +145,15 @@ private[storage] class BlockManagerDecommissioner(
                     // Confirm peer is not the fallback BM ID because fallbackStorage would already
                     // have been used in the try-block above so there's no point trying again
                     && peer != FallbackStorage.FALLBACK_BLOCK_MANAGER_ID) {
-                  fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
+                  try {
+                    fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
+                  } catch {
+                    case e: FileNotFoundException =>
+                      logWarning("Skipping block $shuffleBlockInfo, block deleted.", e)
+                    case NonFatal(e) =>
+                      logError(s"Fallback storage for $shuffleBlockInfo failed", e)
+                      keepRunning = false
+                  }

Review Comment:
   There isn't a behavior change. If we remove the added `NonFatal` block, this [section](https://github.com/apache/spark/blob/2ab7aa87c25a0fd9eaa6047f02465130e5b22a18/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala#L171-L185) won't get executed. This means some shuffle blocks never trigger `numMigratedShuffles.incrementAndGet()`, and the decommissioner loops forever because `numMigratedShuffles` stays less than `migratingShuffles`.
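   The stuck condition described above can be reduced to a small progress-counter sketch (hypothetical names mirroring the real fields; not the actual Spark code):

   ```scala
   import java.util.concurrent.atomic.AtomicInteger

   // Hypothetical reduction of the decommissioner's progress check.
   object DecommissionProgressSketch {
     val migratingShuffles = 3                       // blocks queued for migration
     val numMigratedShuffles = new AtomicInteger(0)  // blocks fully accounted for

     // The decommissioner only finishes once every queued block is counted,
     // whether it migrated successfully or was deliberately skipped.
     def shuffleMigrationDone: Boolean =
       numMigratedShuffles.get() >= migratingShuffles
   }
   ```

   If an exception escapes before `incrementAndGet()` runs for some block, `shuffleMigrationDone` can never become true, which is the endless loop being fixed.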





Re: [PR] [SPARK-45579][CORE] Catch errors for FallbackStorage.copy [spark]

Posted by "ukby1234 (via GitHub)" <gi...@apache.org>.
ukby1234 commented on PR #43409:
URL: https://github.com/apache/spark/pull/43409#issuecomment-1769550598

   hmm looks like the SQL test just timed out and I retried a couple times already. cc @dongjoon-hyun 




Re: [PR] [SPARK-45579][CORE] Catch errors for FallbackStorage.copy [spark]

Posted by "ukby1234 (via GitHub)" <gi...@apache.org>.
ukby1234 commented on PR #43409:
URL: https://github.com/apache/spark/pull/43409#issuecomment-1904534183

   @dongjoon-hyun friendly bump




Re: [PR] [SPARK-45579][CORE] Catch errors for FallbackStorage.copy [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43409:
URL: https://github.com/apache/spark/pull/43409#discussion_r1363264970


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala:
##########
@@ -203,7 +211,7 @@ private[storage] class BlockManagerDecommissioner(
   @volatile private var stopped = false
   @volatile private[storage] var stoppedRDD =
     !conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)
-  @volatile private var stoppedShuffle =
+  @volatile private[storage] var stoppedShuffle =

Review Comment:
   Let us avoid this practice and keep state private, particularly for volatiles.





Re: [PR] [SPARK-45579][CORE] Catch errors for FallbackStorage.copy [spark]

Posted by "steveloughran (via GitHub)" <gi...@apache.org>.
steveloughran commented on PR #43409:
URL: https://github.com/apache/spark/pull/43409#issuecomment-1779174663

   @ukby1234 thanks




Re: [PR] [SPARK-45579][CORE] Catch errors for FallbackStorage.copy [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on PR #43409:
URL: https://github.com/apache/spark/pull/43409#issuecomment-1767231771

   Thank you for making a PR, @ukby1234 .




Re: [PR] [SPARK-45579][CORE] Catch errors for FallbackStorage.copy [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43409:
URL: https://github.com/apache/spark/pull/43409#discussion_r1363264970


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala:
##########
@@ -203,7 +211,7 @@ private[storage] class BlockManagerDecommissioner(
   @volatile private var stopped = false
   @volatile private[storage] var stoppedRDD =
     !conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)
-  @volatile private var stoppedShuffle =
+  @volatile private[storage] var stoppedShuffle =

Review Comment:
   Let us avoid this practice and keep state private, particularly for volatiles (unlike bookkeeping info like time, etc which are added for tests).





Re: [PR] [SPARK-45579][CORE] Catch errors for FallbackStorage.copy [spark]

Posted by "ukby1234 (via GitHub)" <gi...@apache.org>.
ukby1234 commented on PR #43409:
URL: https://github.com/apache/spark/pull/43409#issuecomment-1767512160

   > Do you think we can have a test coverage here?
   > 
   > https://github.com/apache/spark/blob/f1ae56b152bdf19246d698b65e553790ad54306b/core/src/test/scala/org/apache/spark/storage/FallbackStorageSuite.scala#L43
   
   Added unit test coverage.




Re: [PR] [SPARK-45579][CORE] Catch errors for FallbackStorage.copy [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43409:
URL: https://github.com/apache/spark/pull/43409#discussion_r1363255523


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala:
##########
@@ -145,7 +145,15 @@ private[storage] class BlockManagerDecommissioner(
                     // Confirm peer is not the fallback BM ID because fallbackStorage would already
                     // have been used in the try-block above so there's no point trying again
                     && peer != FallbackStorage.FALLBACK_BLOCK_MANAGER_ID) {
-                  fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
+                  try {
+                    fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
+                  } catch {
+                    case e: FileNotFoundException =>
+                      logWarning("Skipping block $shuffleBlockInfo, block deleted.", e)
+                    case NonFatal(e) =>
+                      logError(s"Fallback storage for $shuffleBlockInfo failed", e)
+                      keepRunning = false
+                  }

Review Comment:
   It was not clear from the PR description that this behavior was being attempted.
   +CC @dongjoon-hyun as you know this part more.





Re: [PR] [SPARK-45579][CORE] Catch errors for FallbackStorage.copy [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43409:
URL: https://github.com/apache/spark/pull/43409#discussion_r1363255523


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala:
##########
@@ -145,7 +145,15 @@ private[storage] class BlockManagerDecommissioner(
                     // Confirm peer is not the fallback BM ID because fallbackStorage would already
                     // have been used in the try-block above so there's no point trying again
                     && peer != FallbackStorage.FALLBACK_BLOCK_MANAGER_ID) {
-                  fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
+                  try {
+                    fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
+                  } catch {
+                    case e: FileNotFoundException =>
+                      logWarning("Skipping block $shuffleBlockInfo, block deleted.", e)
+                    case NonFatal(e) =>
+                      logError(s"Fallback storage for $shuffleBlockInfo failed", e)
+                      keepRunning = false
+                  }

Review Comment:
   It was not clear from the PR description that this behavior change was being made.
   +CC @dongjoon-hyun as you know this part more.





Re: [PR] [SPARK-45579][CORE] Catch errors for FallbackStorage.copy [spark]

Posted by "dongjoon-hyun (via GitHub)" <gi...@apache.org>.
dongjoon-hyun commented on code in PR #43409:
URL: https://github.com/apache/spark/pull/43409#discussion_r1363336381


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala:
##########
@@ -145,7 +145,15 @@ private[storage] class BlockManagerDecommissioner(
                     // Confirm peer is not the fallback BM ID because fallbackStorage would already
                     // have been used in the try-block above so there's no point trying again
                     && peer != FallbackStorage.FALLBACK_BLOCK_MANAGER_ID) {
-                  fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
+                  try {
+                    fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
+                  } catch {
+                    case e: FileNotFoundException =>
+                      logWarning("Skipping block $shuffleBlockInfo, block deleted.", e)
+                    case NonFatal(e) =>
+                      logError(s"Fallback storage for $shuffleBlockInfo failed", e)
+                      keepRunning = false
+                  }

Review Comment:
   Is this true?
   > If we remove the added NonFatal block, this [section](https://github.com/apache/spark/blob/2ab7aa87c25a0fd9eaa6047f02465130e5b22a18/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala#L171-L185) won't get executed.
   
   We have line 166, don't we?
   https://github.com/apache/spark/blob/2ab7aa87c25a0fd9eaa6047f02465130e5b22a18/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala#L166-L168
   
   Do you think you can provide a test case as the evidence for your claim, @ukby1234 ?





Re: [PR] [SPARK-45579][CORE] Catch errors for FallbackStorage.copy [spark]

Posted by "ukby1234 (via GitHub)" <gi...@apache.org>.
ukby1234 commented on code in PR #43409:
URL: https://github.com/apache/spark/pull/43409#discussion_r1363368707


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala:
##########
@@ -145,7 +145,15 @@ private[storage] class BlockManagerDecommissioner(
                     // Confirm peer is not the fallback BM ID because fallbackStorage would already
                     // have been used in the try-block above so there's no point trying again
                     && peer != FallbackStorage.FALLBACK_BLOCK_MANAGER_ID) {
-                  fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
+                  try {
+                    fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
+                  } catch {
+                    case e: FileNotFoundException =>
+                      logWarning("Skipping block $shuffleBlockInfo, block deleted.", e)
+                    case NonFatal(e) =>
+                      logError(s"Fallback storage for $shuffleBlockInfo failed", e)
+                      keepRunning = false
+                  }

Review Comment:
   Well, this exception is thrown in this [catch block](https://github.com/apache/spark/blob/2ab7aa87c25a0fd9eaa6047f02465130e5b22a18/core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala#L136), so line 166 won't get executed.
   I also updated the test `"SPARK-45579: abort for other errors"` to show this situation.





Re: [PR] [SPARK-45579][CORE] Catch errors for FallbackStorage.copy [spark]

Posted by "ukby1234 (via GitHub)" <gi...@apache.org>.
ukby1234 commented on code in PR #43409:
URL: https://github.com/apache/spark/pull/43409#discussion_r1363369438


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala:
##########
@@ -203,7 +211,7 @@ private[storage] class BlockManagerDecommissioner(
   @volatile private var stopped = false
   @volatile private[storage] var stoppedRDD =
     !conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)
-  @volatile private var stoppedShuffle =
+  @volatile private[storage] var stoppedShuffle =

Review Comment:
   Removed this access; it turned out to be unnecessary for my tests.





Re: [PR] [SPARK-45579][CORE] Catch errors for FallbackStorage.copy [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43409:
URL: https://github.com/apache/spark/pull/43409#discussion_r1365028437


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala:
##########
@@ -203,7 +211,7 @@ private[storage] class BlockManagerDecommissioner(
   @volatile private var stopped = false
   @volatile private[storage] var stoppedRDD =
     !conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)
-  @volatile private var stoppedShuffle =
+  @volatile private[storage] var stoppedShuffle =

Review Comment:
   Even better !





Re: [PR] [SPARK-45579][CORE] Catch errors for FallbackStorage.copy [spark]

Posted by "steveloughran (via GitHub)" <gi...@apache.org>.
steveloughran commented on PR #43409:
URL: https://github.com/apache/spark/pull/43409#issuecomment-1775197510

   1. Does this happen with any fs client other than the s3a one?
   2. Does anyone know why it happens?
   3. There's a PR up to turn off use of the AWS SDK for its uploads, which will switch back to the classic sequential block read/upload algorithm used everywhere else. Reviews encouraged: https://github.com/apache/hadoop/pull/6163




Re: [PR] [SPARK-45579][CORE] Catch errors for FallbackStorage.copy [spark]

Posted by "ukby1234 (via GitHub)" <gi...@apache.org>.
ukby1234 commented on PR #43409:
URL: https://github.com/apache/spark/pull/43409#issuecomment-1775537797

   I think I can answer 2). It seems shuffle blocks can be deleted in between the `fs.exists` and `fs.copyFromLocal` calls. From the stack trace linked in the JIRA ticket, it fails inside `org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation.checkSource`.
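   The check-then-copy race can be sketched generically (hypothetical helpers; `exists` and `copy` stand in for Hadoop's `FileSystem.exists` and `FileSystem.copyFromLocalFile`):

   ```scala
   // Generic sketch of a time-of-check-to-time-of-use (TOCTOU) race.
   object CheckThenCopySketch {
     def copyIfPresent(exists: () => Boolean, copy: () => Unit): Boolean = {
       if (exists()) {
         // Window: another thread can delete the file right here, so copy()
         // may still throw FileNotFoundException despite the check above.
         copy()
         true
       } else {
         false
       }
     }
   }
   ```

   No ordering of `exists` and `copy` closes the window; the caller has to tolerate FileNotFoundException from the copy itself, which is what this PR does.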




Re: [PR] [SPARK-45579][CORE] Catch errors for FallbackStorage.copy [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43409:
URL: https://github.com/apache/spark/pull/43409#discussion_r1363172153


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala:
##########
@@ -145,7 +145,15 @@ private[storage] class BlockManagerDecommissioner(
                     // Confirm peer is not the fallback BM ID because fallbackStorage would already
                     // have been used in the try-block above so there's no point trying again
                     && peer != FallbackStorage.FALLBACK_BLOCK_MANAGER_ID) {
-                  fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
+                  try {
+                    fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
+                  } catch {
+                    case e: FileNotFoundException =>
+                      logWarning("Skipping block $shuffleBlockInfo, block deleted.", e)

Review Comment:
   ```suggestion
                         logWarning(s"Skipping block $shuffleBlockInfo, block deleted.", e)
   ```



##########
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala:
##########
@@ -203,7 +211,7 @@ private[storage] class BlockManagerDecommissioner(
   @volatile private var stopped = false
   @volatile private[storage] var stoppedRDD =
     !conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)
-  @volatile private var stoppedShuffle =
+  @volatile private[storage] var stoppedShuffle =

Review Comment:
   Instead of exposing the volatile boolean, add a method to expose the read only state.
   
   ```suggestion
     @volatile private var stoppedShuffle =
         !conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)
         
     private[storage] def isShuffleStopped: Boolean = stoppedShuffle
     
   ```



##########
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala:
##########
@@ -145,7 +145,15 @@ private[storage] class BlockManagerDecommissioner(
                     // Confirm peer is not the fallback BM ID because fallbackStorage would already
                     // have been used in the try-block above so there's no point trying again
                     && peer != FallbackStorage.FALLBACK_BLOCK_MANAGER_ID) {
-                  fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
+                  try {
+                    fallbackStorage.foreach(_.copy(shuffleBlockInfo, bm))
+                  } catch {
+                    case e: FileNotFoundException =>
+                      logWarning("Skipping block $shuffleBlockInfo, block deleted.", e)
+                    case NonFatal(e) =>
+                      logError(s"Fallback storage for $shuffleBlockInfo failed", e)
+                      keepRunning = false
+                  }

Review Comment:
   Drop this ? The existing `NonFatal` block at the end does this currently.





Re: [PR] [SPARK-45579][CORE] Catch errors for FallbackStorage.copy [spark]

Posted by "mridulm (via GitHub)" <gi...@apache.org>.
mridulm commented on code in PR #43409:
URL: https://github.com/apache/spark/pull/43409#discussion_r1363176084


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala:
##########
@@ -203,7 +211,7 @@ private[storage] class BlockManagerDecommissioner(
   @volatile private var stopped = false
   @volatile private[storage] var stoppedRDD =
     !conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)
-  @volatile private var stoppedShuffle =
+  @volatile private[storage] var stoppedShuffle =

Review Comment:
   Instead of exposing the volatile boolean, add a method to expose the current value read only.
   
   ```suggestion
     @volatile private var stoppedShuffle =
         !conf.get(config.STORAGE_DECOMMISSION_SHUFFLE_BLOCKS_ENABLED)
         
     private[storage] def isShuffleStopped: Boolean = stoppedShuffle
     
   ```





Re: [PR] [SPARK-45579][CORE] Catch errors for FallbackStorage.copy [spark]

Posted by "ukby1234 (via GitHub)" <gi...@apache.org>.
ukby1234 commented on code in PR #43409:
URL: https://github.com/apache/spark/pull/43409#discussion_r1363207673


##########
core/src/main/scala/org/apache/spark/storage/BlockManagerDecommissioner.scala:
##########
@@ -203,7 +211,7 @@ private[storage] class BlockManagerDecommissioner(
   @volatile private var stopped = false
   @volatile private[storage] var stoppedRDD =
     !conf.get(config.STORAGE_DECOMMISSION_RDD_BLOCKS_ENABLED)
-  @volatile private var stoppedShuffle =
+  @volatile private[storage] var stoppedShuffle =

Review Comment:
   Looks like other places already expose volatile variables for unit testing only.





Re: [PR] [SPARK-45579][CORE] Catch errors for FallbackStorage.copy [spark]

Posted by "ukby1234 (via GitHub)" <gi...@apache.org>.
ukby1234 commented on PR #43409:
URL: https://github.com/apache/spark/pull/43409#issuecomment-1767685315

   /test

