Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2021/04/28 15:49:12 UTC

[GitHub] [spark] Ngone51 opened a new pull request #32385: [WIP][SPARK-18188][CORE] Add checksum for shuffle blocks

Ngone51 opened a new pull request #32385:
URL: https://github.com/apache/spark/pull/32385


   
   ### What changes were proposed in this pull request?
   
   This PR proposes to add checksum support for shuffle blocks. The basic idea is: 
   
   On the mapper side, we'll wrap the `FileOutputStream` in a `CheckedOutputStream` to calculate a checksum for each shuffle block (a.k.a. partition) while writing the map output file. The checksum algorithm is `Adler32`, the same one that broadcast uses. Similar to the index file, we'll have a checksum file to save these checksums.
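
A minimal sketch of the mapper-side idea above, assuming each partition is already available as a byte array. `PartitionChecksumSketch` and `writePartitions` are hypothetical names used only for illustration, not code from this PR; `CheckedOutputStream` and `Adler32` are the standard `java.util.zip` classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.Adler32;
import java.util.zip.CheckedOutputStream;

/** Hypothetical sketch: one Adler32 checksum per partition, computed while writing. */
class PartitionChecksumSketch {

    /**
     * Writes every partition to dataOut in order and returns the per-partition
     * checksums. Persisting them to a checksum file is left out of this sketch.
     */
    static long[] writePartitions(byte[][] partitions, OutputStream dataOut) throws IOException {
        long[] checksums = new long[partitions.length];
        Adler32 adler = new Adler32();
        CheckedOutputStream checked = new CheckedOutputStream(dataOut, adler);
        for (int i = 0; i < partitions.length; i++) {
            adler.reset();                   // start a fresh checksum for this partition
            checked.write(partitions[i]);    // bytes are checksummed as they are written
            checksums[i] = adler.getValue();
        }
        checked.flush();
        return checksums;
    }

    public static void main(String[] args) throws IOException {
        byte[][] partitions = {
            "part-0".getBytes(StandardCharsets.UTF_8),
            "part-1".getBytes(StandardCharsets.UTF_8)
        };
        ByteArrayOutputStream data = new ByteArrayOutputStream();
        long[] checksums = writePartitions(partitions, data);
        for (int i = 0; i < checksums.length; i++) {
            System.out.println("partition " + i + " checksum=" + checksums[i]);
        }
    }
}
```

Because the checksum is updated inside `write`, no second pass over the data is needed in the normal case.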
   
   On the reducer side, we'll also wrap the `FileInputStream` in a `CheckedInputStream` to read the block. When block corruption is detected, we'll try to diagnose its cause:
   
   First, we'll use the `CheckedInputStream` to consume the remaining data of the corrupted block and calculate its checksum (`c1`).
   
   Second, the reducer sends an RPC request called `DiagnoseCorruption` (which contains `c1`) to the server that served the block.
   
   Third, the server reads the corresponding block back from disk (using only a small amount of memory) and recalculates its checksum (`c2`). It also reads the block's checksum (`c3`) from the checksum file. Then, if `c2 != c3`, we suspect the corruption is caused by a disk issue. Otherwise, if `c1 != c3`, we suspect it is caused by a network issue. Otherwise, the cause remains unknown. The server then replies to the reducer with a `CorruptionCause` containing the cause.
   
   Fourth, the reducer takes action once it receives the cause. If it's a disk issue or unknown, it throws a fetch failure directly. If it's a network issue, it re-fetches the block later. Note that if the corruption happens inside `BufferReleasingInputStream`, the reducer throws the fetch failure immediately regardless of the cause, since the data has already been partially consumed by downstream RDDs. If corruption happens again after the retry, the reducer throws the fetch failure directly, without running the diagnosis again.
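
The four steps above can be sketched roughly as follows. The class, enum, and method names here (`CorruptionDiagnosisSketch`, `Cause`, `Action`, `checksumOf`, `diagnose`, `decide`) are assumptions made for illustration and are not the PR's actual API. The sketch also computes `c1` over a whole in-memory copy of the block, whereas the description computes it by draining the rest of an already-open stream.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.Adler32;
import java.util.zip.CheckedInputStream;

/** Hypothetical sketch of the diagnosis flow; not the PR's real classes. */
class CorruptionDiagnosisSketch {
    enum Cause { DISK_ISSUE, NETWORK_ISSUE, UNKNOWN_ISSUE }
    enum Action { THROW_FETCH_FAILURE, REFETCH_BLOCK }

    /** Streams the block through a small fixed buffer and returns its Adler32 checksum. */
    static long checksumOf(InputStream block) throws IOException {
        try (CheckedInputStream in = new CheckedInputStream(block, new Adler32())) {
            byte[] buf = new byte[8192];
            while (in.read(buf) != -1) {
                // nothing to do: the checksum is updated as a side effect of read()
            }
            return in.getChecksum().getValue();
        }
    }

    /** c1: reducer's checksum, c2: recomputed from disk, c3: stored in the checksum file. */
    static Cause diagnose(long c1, long c2, long c3) {
        if (c2 != c3) return Cause.DISK_ISSUE;
        if (c1 != c3) return Cause.NETWORK_ISSUE;
        return Cause.UNKNOWN_ISSUE;
    }

    /** Reducer-side reaction; cause may be null when no diagnosis was run. */
    static Action decide(Cause cause, boolean partiallyConsumed, boolean alreadyRetried) {
        if (partiallyConsumed || alreadyRetried) return Action.THROW_FETCH_FAILURE;
        return cause == Cause.NETWORK_ISSUE ? Action.REFETCH_BLOCK : Action.THROW_FETCH_FAILURE;
    }

    public static void main(String[] args) throws IOException {
        byte[] onDisk = {1, 2, 3, 4};
        byte[] received = {1, 2, 3, 5};  // one byte changed in transit
        long c1 = checksumOf(new ByteArrayInputStream(received));
        long c2 = checksumOf(new ByteArrayInputStream(onDisk));
        long c3 = c2;                    // checksum file agrees with the data on disk
        Cause cause = diagnose(c1, c2, c3);
        System.out.println(cause + " -> " + decide(cause, false, false));
    }
}
```

The order of the two comparisons matters: a disk mismatch is checked first, so a block that is bad on disk is never blamed on the network.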
   
   Overall, I don't think this proposal introduces severe overhead. In the normal case, the checksum is calculated in a streaming fashion, just like the other stream wrappers (e.g., encryption, compression). The major overhead is the extra pass over the data file that is needed in the error case to calculate the checksum (`c2`).
   
   The proposal in this PR is also much simpler than the previous one, https://github.com/apache/spark/pull/15894/ (abandoned due to complexity), which introduces more overhead because it needs to traverse the data file twice for every block. In that proposal, the checksum is appended to each block's data, so it is also invasive to the existing code.
   
   ### Why are the changes needed?
   
   Shuffle data corruption is a long-standing issue in Spark. For example, in SPARK-18105, people continually report corruption issues. However, data corruption is difficult to reproduce in most cases, and its root cause is even harder to identify. We don't know whether it's a Spark issue or not. With checksum support for shuffle, Spark itself can at least distinguish between disk and network causes, which is very important for users.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes.  
   
   1) Added a conf `spark.shuffle.checksum` that lets users enable or disable the checksum (enabled by default).
   2) With checksum enabled, users can learn the possible cause of a corruption instead of seeing only "Stream is corrupted".
   
   ### How was this patch tested?
   
   Added an end-to-end unit test in `ShuffleSuite`.
   
   I'll add more tests if the community accepts the proposal.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] Ngone51 commented on pull request #32385: [WIP][SPARK-18188][CORE] Add checksum for shuffle blocks

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32385:
URL: https://github.com/apache/spark/pull/32385#issuecomment-828569773


   cc @mridulm @otterc  @attilapiros @tgravescs @cloud-fan Please take a look, thanks!




[GitHub] [spark] Ngone51 edited a comment on pull request #32385: [WIP][SPARK-35275][CORE] Add checksum for shuffle blocks and diagnose corruption

Posted by GitBox <gi...@apache.org>.
Ngone51 edited a comment on pull request #32385:
URL: https://github.com/apache/spark/pull/32385#issuecomment-856938701


   Hi @tgravescs @mridulm @otterc, I have resolved the regression issue (verified by running the TPCDS benchmark with 3 TB of data internally) and made the checksum a built-in feature of Spark.
   
   And I have updated PR #32401 (which adds checksum support at shuffle writer side only) and I think it's ready for review. 




[GitHub] [spark] Ngone51 commented on pull request #32385: [WIP][SPARK-35275][CORE] Add checksum for shuffle blocks and diagnose corruption

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32385:
URL: https://github.com/apache/spark/pull/32385#issuecomment-861274556


   Oh, @mridulm, sorry if I confused you here. I plan to split this PR into two separate PRs to ease the review:
   
   * write checksum file (Ready to review https://github.com/apache/spark/pull/32401)
   * diagnose corruption (Not done yet)
   
   So please help review the smaller PR there.
   
   And I'll try to resolve your comments in the separate PRs. Thanks!
   
   
   




[GitHub] [spark] mridulm commented on pull request #32385: [WIP][SPARK-18188][CORE] Add checksum for shuffle blocks

Posted by GitBox <gi...@apache.org>.
mridulm commented on pull request #32385:
URL: https://github.com/apache/spark/pull/32385#issuecomment-828764868


   +CC @otterc This should be of interest given recent discussions.




[GitHub] [spark] pan3793 edited a comment on pull request #32385: [WIP][SPARK-35275][CORE] Add checksum for shuffle blocks and diagnose corruption

Posted by GitBox <gi...@apache.org>.
pan3793 edited a comment on pull request #32385:
URL: https://github.com/apache/spark/pull/32385#issuecomment-1053571929


   Hi @Ngone51, thanks for providing the checksum feature for shuffle. I have a thought about the following case.
   
   > if the corruption happens inside BufferReleasingInputStream, the reducer will throw the fetch failure immediately no matter what the cause is since the data has been partially consumed by downstream RDDs.
   
   I think the checksum could handle this case. Borrowing the idea from #28525, we can consume the input stream inside `ShuffleBlockFetcherIterator` and verify the checksum immediately, then
   
   > If it's a disk issue or unknown, it will throw fetch failure directly. If it's a network issue, it will re-fetch the block later. 
   
   To achieve this, we may need to update the shuffle network protocol to support passing checksums when fetching shuffle blocks.
   
   Compared to the existing `Utils.copyStreamUpTo(input, maxBytesInFlight / 3)`, this approach uses less memory and can verify blocks of any size, but it introduces another overhead because it needs to read the data twice.
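
A rough sketch of this verify-first idea, assuming the fetched block can be opened twice and that the expected checksum is already known to the reducer (which would need the protocol change mentioned above). `VerifyBeforeConsumeSketch` and `openVerified` are hypothetical names, not Spark APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.function.Supplier;
import java.util.zip.Adler32;
import java.util.zip.CheckedInputStream;

/** Hypothetical sketch: verify the whole block first, then hand out a fresh stream. */
class VerifyBeforeConsumeSketch {

    /**
     * First pass: drain the block through a small buffer and compare checksums.
     * Second pass: return a newly opened stream for the downstream consumer.
     */
    static InputStream openVerified(Supplier<InputStream> source, long expected) throws IOException {
        try (CheckedInputStream in = new CheckedInputStream(source.get(), new Adler32())) {
            byte[] buf = new byte[8192];
            while (in.read(buf) != -1) {
                // nothing to do: the checksum is updated as a side effect of read()
            }
            long actual = in.getChecksum().getValue();
            if (actual != expected) {
                throw new IOException("checksum mismatch: expected " + expected + ", got " + actual);
            }
        }
        return source.get();
    }

    public static void main(String[] args) throws IOException {
        byte[] block = {10, 20, 30};
        Adler32 adler = new Adler32();
        adler.update(block, 0, block.length);
        try (InputStream verified =
                 openVerified(() -> new ByteArrayInputStream(block), adler.getValue())) {
            System.out.println("first byte after verification: " + verified.read());
        }
    }
}
```

Since nothing is handed downstream until the first pass succeeds, a mismatch can still be diagnosed and retried, at the cost of reading the block twice.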
   
   We encounter this issue in production every day. For some jobs, the performance overhead is acceptable in exchange for stability.
   
   @Ngone51 WDYT?
   




[GitHub] [spark] SparkQA removed a comment on pull request #32385: [WIP][SPARK-18188][CORE] Add checksum for shuffle blocks

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #32385:
URL: https://github.com/apache/spark/pull/32385#issuecomment-828567443


   **[Test build #138049 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138049/testReport)** for PR 32385 at commit [`c09ccd0`](https://github.com/apache/spark/commit/c09ccd0e63f4c569c55de93c6f57a65a91a16c41).




[GitHub] [spark] tgravescs commented on pull request #32385: [WIP][SPARK-35275][CORE] Add checksum for shuffle blocks and diagnose corruption

Posted by GitBox <gi...@apache.org>.
tgravescs commented on pull request #32385:
URL: https://github.com/apache/spark/pull/32385#issuecomment-857698644


   thanks @Ngone51, I'm very busy this week, so will take a look early next week.




[GitHub] [spark] Ngone51 commented on pull request #32385: [WIP][SPARK-18188][CORE] Add checksum for shuffle blocks

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32385:
URL: https://github.com/apache/spark/pull/32385#issuecomment-828568544


   I marked the PR as `WIP` because I want to hear the community's feedback before working further, e.g., adding more unit tests.
   There will be two follow-up PRs (if the community accepts this proposal):
   
   1) Corruption diagnosis support for the batch fetched block
   
   2) Corruption diagnosis implementation for the external shuffle service




[GitHub] [spark] AmplabJenkins commented on pull request #32385: [WIP][SPARK-18188][CORE] Add checksum for shuffle blocks

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32385:
URL: https://github.com/apache/spark/pull/32385#issuecomment-828606569


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42568/
   




[GitHub] [spark] github-actions[bot] closed pull request #32385: [WIP][SPARK-35275][CORE] Add checksum for shuffle blocks and diagnose corruption

Posted by GitBox <gi...@apache.org>.
github-actions[bot] closed pull request #32385:
URL: https://github.com/apache/spark/pull/32385


   




[GitHub] [spark] Ngone51 commented on pull request #32385: [WIP][SPARK-35275][CORE] Add checksum for shuffle blocks and diagnose corruption

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32385:
URL: https://github.com/apache/spark/pull/32385#issuecomment-857796097


   Sure, take your time :)




[GitHub] [spark] mridulm edited a comment on pull request #32385: [WIP][SPARK-35275][CORE] Add checksum for shuffle blocks and diagnose corruption

Posted by GitBox <gi...@apache.org>.
mridulm edited a comment on pull request #32385:
URL: https://github.com/apache/spark/pull/32385#issuecomment-862024269


   lol, thanks for the links @Ngone51  :-)
   Glad I went through this once more anyway - will help me with better understanding of the sub-pr's !
   Will wait for the update before taking a look at #32401.




[GitHub] [spark] AmplabJenkins commented on pull request #32385: [WIP][SPARK-18188][CORE] Add checksum for shuffle blocks

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #32385:
URL: https://github.com/apache/spark/pull/32385#issuecomment-828598847


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/138049/
   




[GitHub] [spark] SparkQA commented on pull request #32385: [WIP][SPARK-18188][CORE] Add checksum for shuffle blocks

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32385:
URL: https://github.com/apache/spark/pull/32385#issuecomment-828577751


   **[Test build #138049 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138049/testReport)** for PR 32385 at commit [`c09ccd0`](https://github.com/apache/spark/commit/c09ccd0e63f4c569c55de93c6f57a65a91a16c41).
    * This patch **fails MiMa tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] Ngone51 commented on pull request #32385: [WIP][SPARK-18188][CORE] Add checksum for shuffle blocks and diagnose corruption

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32385:
URL: https://github.com/apache/spark/pull/32385#issuecomment-829384725


   Hi all, to ease the review for everyone, I plan to split this PR into two smaller PRs first:
   
   1) calculate checksums and save them into a checksum file
   
   (I have created the PR (https://github.com/apache/spark/pull/32401) for this part so you can start reviewing from there.)
   
   2) use the checksum file to diagnose the corruption.
   
   (I'll create the PR later)




[GitHub] [spark] otterc commented on pull request #32385: [WIP][SPARK-18188][CORE] Add checksum for shuffle blocks

Posted by GitBox <gi...@apache.org>.
otterc commented on pull request #32385:
URL: https://github.com/apache/spark/pull/32385#issuecomment-828874582


   Thanks for copying me. I will take a look at it in a few days.




[GitHub] [spark] SparkQA commented on pull request #32385: [WIP][SPARK-18188][CORE] Add checksum for shuffle blocks

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32385:
URL: https://github.com/apache/spark/pull/32385#issuecomment-828606525








[GitHub] [spark] tgravescs commented on pull request #32385: [WIP][SPARK-35275][CORE] Add checksum for shuffle blocks and diagnose corruption

Posted by GitBox <gi...@apache.org>.
tgravescs commented on pull request #32385:
URL: https://github.com/apache/spark/pull/32385#issuecomment-829455864


   I haven't had time to look in detail. If this is a working prototype, have you done any performance measurements to see what impact it has? I know you said it should be minimal, but some numbers would be nice.
   
   Also at the high level how does this affect other shuffle work going on - like merging and pluggable?  Is it independent of that or would need to be reimplemented?




[GitHub] [spark] github-actions[bot] commented on pull request #32385: [WIP][SPARK-35275][CORE] Add checksum for shuffle blocks and diagnose corruption

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on pull request #32385:
URL: https://github.com/apache/spark/pull/32385#issuecomment-926977283


   We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!




[GitHub] [spark] mridulm commented on a change in pull request #32385: [WIP][SPARK-35275][CORE] Add checksum for shuffle blocks and diagnose corruption

Posted by GitBox <gi...@apache.org>.
mridulm commented on a change in pull request #32385:
URL: https://github.com/apache/spark/pull/32385#discussion_r638118918



##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/DiagnoseCorruption.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.spark.network.protocol.Encoders;
+
+/** Request to get the cause of a corrupted block. Returns {@link CorruptionCause} */
+public class DiagnoseCorruption extends BlockTransferMessage {
+    private final String appId;
+    private final String execId;
+    public final String blockId;
+    public final long checksum;
+
+    public DiagnoseCorruption(String appId, String execId, String blockId, long checksum) {
+      this.appId = appId;
+      this.execId = execId;
+      this.blockId = blockId;
+      this.checksum = checksum;
+    }
+
+    @Override
+    protected Type type() {
+      return Type.DIAGNOSE_CORRUPTION;
+    }
+
+    @Override
+    public String toString() {
+      return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+        .append("appId", appId)
+        .append("execId", execId)
+        .append("blockId", blockId)
+        .append("checksum", checksum)
+        .toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      DiagnoseCorruption that = (DiagnoseCorruption) o;
+
+      if (!appId.equals(that.appId)) return false;
+      if (!execId.equals(that.execId)) return false;
+      if (!blockId.equals(that.blockId)) return false;
+      return checksum == that.checksum;

Review comment:
       super nit: check `checksum` first ? cheapest check ..
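
A minimal sketch of the reordering suggested here, using a hypothetical stand-in class (`DiagnoseRequestSketch` is not part of the PR). The primitive comparison comes first, so unequal checksums short-circuit before any string comparison runs:

```java
import java.util.Objects;

// Hypothetical stand-in for the request message; not the PR's class.
final class DiagnoseRequestSketch {
  private final String appId;
  private final String execId;
  private final String blockId;
  private final long checksum;

  DiagnoseRequestSketch(String appId, String execId, String blockId, long checksum) {
    this.appId = appId;
    this.execId = execId;
    this.blockId = blockId;
    this.checksum = checksum;
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) return true;
    if (o == null || getClass() != o.getClass()) return false;
    DiagnoseRequestSketch that = (DiagnoseRequestSketch) o;
    // Cheapest check first: a primitive compare, then the strings.
    return checksum == that.checksum
      && blockId.equals(that.blockId)
      && execId.equals(that.execId)
      && appId.equals(that.appId);
  }

  @Override
  public int hashCode() {
    return Objects.hash(appId, execId, blockId, checksum);
  }
}
```

The result of `equals` is unchanged; only the order of evaluation differs.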

##########
File path: core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
##########
@@ -57,11 +58,15 @@
   private long currChannelPosition;
   private long bytesWrittenToMergedFile = 0L;
 
+  private Checksum checksumCal = null;
+  private long[] partitionChecksums = new long[0];
+
   private final File outputFile;
   private File outputTempFile;
   private FileOutputStream outputFileStream;
-  private FileChannel outputFileChannel;
+  private CountingWritableChannel outputChannel;
   private BufferedOutputStream outputBufferedFileStream;
+  private CheckedOutputStream checkedOutputStream;

Review comment:
       You don't need a reference to this or to `outputFileStream`; both fields can be removed.

##########
File path: core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
##########
@@ -57,11 +58,15 @@
   private long currChannelPosition;
   private long bytesWrittenToMergedFile = 0L;
 
+  private Checksum checksumCal = null;

Review comment:
       nit: `checksumCal` -> `checksumAlgo` or `checksumImpl` ?

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskStore.scala
##########
@@ -328,23 +329,3 @@ private class ReadableChannelFileRegion(source: ReadableByteChannel, blockSize:
 
   override def deallocate(): Unit = source.close()
 }
-
-private class CountingWritableChannel(sink: WritableByteChannel) extends WritableByteChannel {
-
-  private var count = 0L
-
-  def getCount: Long = count
-
-  override def write(src: ByteBuffer): Int = {
-    val written = sink.write(src)
-    if (written > 0) {
-      count += written
-    }
-    written
-  }
-
-  override def isOpen(): Boolean = sink.isOpen()
-
-  override def close(): Unit = sink.close()
-
-}

Review comment:
       Nice unification !

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/DiagnoseCorruption.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.spark.network.protocol.Encoders;
+
+/** Request to get the cause of a corrupted block. Returns {@link CorruptionCause} */
+public class DiagnoseCorruption extends BlockTransferMessage {
+    private final String appId;
+    private final String execId;
+    public final String blockId;
+    public final long checksum;
+
+    public DiagnoseCorruption(String appId, String execId, String blockId, long checksum) {
+      this.appId = appId;
+      this.execId = execId;
+      this.blockId = blockId;
+      this.checksum = checksum;
+    }
+
+    @Override
+    protected Type type() {
+      return Type.DIAGNOSE_CORRUPTION;
+    }
+
+    @Override
+    public String toString() {
+      return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+        .append("appId", appId)
+        .append("execId", execId)
+        .append("blockId", blockId)
+        .append("checksum", checksum)
+        .toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      DiagnoseCorruption that = (DiagnoseCorruption) o;
+
+      if (!appId.equals(that.appId)) return false;
+      if (!execId.equals(that.execId)) return false;
+      if (!blockId.equals(that.blockId)) return false;
+      return checksum == that.checksum;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = appId.hashCode();
+      result = 31 * result + execId.hashCode();
+      result = 31 * result + blockId.hashCode();
+      result = 31 * result + (int) checksum;

Review comment:
       nit: checksum -> `Long.hashCode(checksum)` ?
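
To illustrate the difference, here is a small sketch (the class and method names are made up for this example). A plain `(int)` cast keeps only the low 32 bits of the `long`, while `Long.hashCode` XORs the high and low halves together:

```java
// Illustration only: two ways of folding a long into an int hash.
final class ChecksumHashSketch {
  // Truncating cast: the high 32 bits are dropped.
  static int castHash(long checksum) {
    return (int) checksum;
  }

  // Long.hashCode computes (int) (value ^ (value >>> 32)).
  static int foldedHash(long checksum) {
    return Long.hashCode(checksum);
  }
}
```

For a 32-bit checksum such as Adler32 the high half is zero, so both forms give the same value; the difference only shows up if a wider checksum is used later.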

##########
File path: core/src/main/java/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriter.java
##########
@@ -21,6 +21,7 @@
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;

Review comment:
       Revert ?

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/protocol/CorruptionCause.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.shuffle.protocol;
+
+import io.netty.buffer.ByteBuf;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.commons.lang3.builder.ToStringStyle;
+import org.apache.spark.network.corruption.Cause;
+
+/** Response to the {@link DiagnoseCorruption} */
+public class CorruptionCause extends BlockTransferMessage {
+    public Cause cause;
+
+    public CorruptionCause(Cause cause) {
+      this.cause = cause;
+    }
+
+    @Override
+    protected Type type() {
+      return Type.CORRUPTION_CAUSE;
+    }
+
+    @Override
+    public String toString() {
+      return new ToStringBuilder(this, ToStringStyle.SHORT_PREFIX_STYLE)
+        .append("cause", cause)
+        .toString();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      CorruptionCause that = (CorruptionCause) o;
+      return cause == that.cause;
+    }
+
+    @Override
+    public int hashCode() {
+      return cause.hashCode();
+    }
+
+    @Override
+    public int encodedLength() {
+      return 4; /* encoded length of cause */
+    }
+
+    @Override
+    public void encode(ByteBuf buf) {
+      buf.writeInt(cause.ordinal());

Review comment:
       `int` -> `byte` ?
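
A rough sketch of a one-byte encoding, under a few assumptions: it uses `java.nio.ByteBuffer` in place of Netty's `ByteBuf` so that it compiles on its own, the enum is a local copy of the one in the diff, and `CauseCodecSketch` is a hypothetical name:

```java
import java.nio.ByteBuffer;

final class CauseCodecSketch {
  // Local copy of the enum from the diff, so the sketch is self-contained.
  enum Cause { DISK, NETWORK, UNKNOWN }

  // One byte is enough as long as the enum has fewer than 256 values.
  static final int ENCODED_LENGTH = 1;

  static void encode(ByteBuffer buf, Cause cause) {
    buf.put((byte) cause.ordinal());
  }

  static Cause decode(ByteBuffer buf) {
    int ordinal = buf.get() & 0xFF; // read the byte as unsigned
    Cause[] values = Cause.values();
    if (ordinal >= values.length) {
      throw new IllegalArgumentException("Unknown cause ordinal: " + ordinal);
    }
    return values[ordinal];
  }
}
```

Encoding by ordinal ties the wire format to the declaration order of the enum, which holds for the `int` version in the diff as well.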

##########
File path: core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
##########
@@ -100,12 +110,12 @@ public ShufflePartitionWriter getPartitionWriter(int reducePartitionId) throws I
   @Override
   public MapOutputCommitMessage commitAllPartitions() throws IOException {
     // Check the position after transferTo loop to see if it is in the right position and raise a
-    // exception if it is incorrect. The position will not be increased to the expected length
+    // exception if it is incorrect. The po sition will not be increased to the expected length

Review comment:
       Revert this ? Looks like an accidental change.

##########
File path: core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
##########
@@ -243,6 +265,10 @@ public void close() {
       isClosed = true;
       partitionLengths[partitionId] = count;
       bytesWrittenToMergedFile += count;
+      if (checksumCal != null) {
+        partitionChecksums[partitionId] = checksumCal.getValue();
+        checksumCal.reset();
+      }

Review comment:
       nit: pull it into `saveChecksum(partitionId)` ?
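
A sketch of what such a helper might look like. `PartitionChecksumSketch` is a hypothetical, trimmed-down stand-in for the writer; it keeps only the checksum bookkeeping and assumes Adler32, as in the diff:

```java
import java.util.zip.Adler32;
import java.util.zip.Checksum;

// Hypothetical stand-in for the writer; only checksum bookkeeping is shown.
final class PartitionChecksumSketch {
  private final Checksum checksumCal; // null when checksums are disabled
  private final long[] partitionChecksums;

  PartitionChecksumSketch(int numPartitions, boolean checksumEnabled) {
    this.checksumCal = checksumEnabled ? new Adler32() : null;
    this.partitionChecksums = checksumEnabled ? new long[numPartitions] : new long[0];
  }

  // Stands in for bytes flowing through the partition stream.
  void write(byte[] bytes) {
    if (checksumCal != null) {
      checksumCal.update(bytes, 0, bytes.length);
    }
  }

  // Record the running checksum for one partition, then reset for the next.
  void saveChecksum(int partitionId) {
    if (checksumCal != null) {
      partitionChecksums[partitionId] = checksumCal.getValue();
      checksumCal.reset();
    }
  }

  long[] getPartitionChecksums() {
    return partitionChecksums;
  }
}
```

With the helper in place, each `close()` path that finishes a partition can call `saveChecksum(partitionId)` instead of repeating the null check.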

##########
File path: core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
##########
@@ -131,28 +141,40 @@ private void cleanUp() throws IOException {
     if (outputBufferedFileStream != null) {
       outputBufferedFileStream.close();
     }
-    if (outputFileChannel != null) {
-      outputFileChannel.close();
+    if (outputChannel != null) {
+      outputChannel.close();
+    }
+    if (checkedOutputStream != null) {
+      checkedOutputStream.close();
     }
     if (outputFileStream != null) {
       outputFileStream.close();
     }
+    if (checksumCal != null) {
+      checksumCal.reset();
+    }
   }
 
   private void initStream() throws IOException {
     if (outputFileStream == null) {
       outputFileStream = new FileOutputStream(outputTempFile, true);
     }
+    if (checksumCal != null && checkedOutputStream == null) {
+      checkedOutputStream = new CheckedOutputStream(outputFileStream, checksumCal);
+    }
     if (outputBufferedFileStream == null) {
-      outputBufferedFileStream = new BufferedOutputStream(outputFileStream, bufferSize);
+      outputBufferedFileStream = new BufferedOutputStream(
+        checksumCal != null ? checkedOutputStream : outputFileStream, bufferSize);
     }

Review comment:
       If `outputFileStream` is `null`, we should reset `checkedOutputStream` and `outputBufferedFileStream` as well - not sure why we had individual `null` checks earlier.
   
   Also, these need to be reset to `null` in `cleanUp` ...
   Btw, if we remove the references to these streams as I suggested above, it will make `initChannel`/`initStream`/`cleanUp` simpler as well (and also address this comment).
   
   Note: these issues are not specific to this PR, but the PR adds to them.
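
One possible shape for this simplification, sketched with a hypothetical helper (`StreamChainSketch.open` is not in the PR): build the whole chain in one place and keep only the outermost stream, since closing it flushes the buffer and closes the wrapped streams.

```java
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.CheckedOutputStream;
import java.util.zip.Checksum;

final class StreamChainSketch {
  // Builds file -> (optional) checksum -> buffer and returns only the
  // outermost stream. Closing it flushes and closes the inner streams.
  static OutputStream open(File file, Checksum checksumOrNull, int bufferSize)
      throws IOException {
    OutputStream out = new FileOutputStream(file, true);
    if (checksumOrNull != null) {
      out = new CheckedOutputStream(out, checksumOrNull);
    }
    return new BufferedOutputStream(out, bufferSize);
  }
}
```

Because the checksum stream sits below the buffer, the `Checksum` only reflects bytes that have been flushed, so it should be read after a flush or close.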
   

##########
File path: common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/BlockStoreClient.java
##########
@@ -47,6 +48,15 @@
   protected volatile TransportClientFactory clientFactory;
   protected String appId;
 
+  public Cause diagnoseCorruption(

Review comment:
       Include javadoc ?

##########
File path: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
##########
@@ -1762,7 +1762,7 @@ private[spark] class DAGScheduler(
           }
 
           if (shouldAbortStage) {
-            val abortMessage = if (disallowStageRetryForTest) {
+            val abortMessage = if (false) {

Review comment:
       revert ?

##########
File path: core/src/main/java/org/apache/spark/shuffle/sort/SpillInfo.java
##########
@@ -26,12 +26,12 @@
  */
 final class SpillInfo {
   final long[] partitionLengths;
+  final long[] partitionChecksums;
   final File file;
-  final TempShuffleBlockId blockId;
 
-  SpillInfo(int numPartitions, File file, TempShuffleBlockId blockId) {
+  SpillInfo(int numPartitions, File file, boolean checksumEnabled) {
     this.partitionLengths = new long[numPartitions];
+    this.partitionChecksums = checksumEnabled ? new long[numPartitions] : new long[0];

Review comment:
       We are using `null` in `MapOutputCommitMessage` but an empty array here when checksum is disabled.
   Unify to a single idiom ? Given `writeMetadataFileAndCommit` depends on an empty array (based on how it is written right now), thoughts on using `long[0]` ?
   
   (Btw, use a constant `EMPTY_LONG_ARRAY` if you decide to use `new long[0]`.)
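
A minimal sketch of the shared-constant idiom. `SpillChecksumsSketch` and `hasChecksums` are hypothetical names; only the `EMPTY_LONG_ARRAY` name comes from the comment above:

```java
final class SpillChecksumsSketch {
  // Shared zero-length sentinel for "checksums disabled". Sharing is safe
  // because an empty array has no elements that could be mutated.
  static final long[] EMPTY_LONG_ARRAY = new long[0];

  final long[] partitionLengths;
  final long[] partitionChecksums;

  SpillChecksumsSketch(int numPartitions, boolean checksumEnabled) {
    this.partitionLengths = new long[numPartitions];
    this.partitionChecksums =
      checksumEnabled ? new long[numPartitions] : EMPTY_LONG_ARRAY;
  }

  // Callers test the length instead of comparing against null.
  boolean hasChecksums() {
    return partitionChecksums.length > 0;
  }
}
```

One caveat of the length test: a map output with zero partitions looks the same as "checksums disabled", which may or may not matter for the callers.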

##########
File path: core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
##########
@@ -104,6 +108,40 @@ private[spark] class NettyBlockTransferService(
     }
   }
 
+  override def diagnoseCorruption(
+      host: String,
+      port: Int,
+      execId: String,
+      blockId: String,
+      checksum: Long): Cause = {
+    // A monitor for the thread to wait on.
+    val result = Promise[Cause]()
+    val client = clientFactory.createClient(host, port)
+    client.sendRpc(new DiagnoseCorruption(appId, execId, blockId, checksum).toByteBuffer,
+      new RpcResponseCallback {
+        override def onSuccess(response: ByteBuffer): Unit = {
+          val cause = BlockTransferMessage.Decoder
+            .fromByteBuffer(response).asInstanceOf[CorruptionCause]
+          result.success(cause.cause)
+        }
+
+        override def onFailure(e: Throwable): Unit = {
+          logger.warn("Failed to get the corruption cause.", e)
+          result.success(Cause.UNKNOWN)
+        }
+    })
+    val timeout = new RpcTimeout(
+      conf.get(Network.NETWORK_TIMEOUT).seconds,
+      Network.NETWORK_TIMEOUT.key)
+    try {
+      timeout.awaitResult(result.future)

Review comment:
       Any implications of making this a sync call where we are blocking the thread ?
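
To make the blocking behaviour concrete, here is a sketch in plain Java. It uses `CompletableFuture` rather than the Scala `Promise`/`RpcTimeout` from the diff, and `BlockingDiagnosisSketch` is a hypothetical name. The calling thread is parked for up to the full timeout, and every failure mode degrades to `UNKNOWN`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BlockingDiagnosisSketch {
  // Local copy of the enum from the diff, so the sketch is self-contained.
  enum Cause { DISK, NETWORK, UNKNOWN }

  // Blocks the caller for at most timeoutMs; failures map to UNKNOWN.
  static Cause await(CompletableFuture<Cause> reply, long timeoutMs) {
    try {
      return reply.get(timeoutMs, TimeUnit.MILLISECONDS);
    } catch (TimeoutException | ExecutionException e) {
      return Cause.UNKNOWN;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
      return Cause.UNKNOWN;
    }
  }
}
```

This does not answer the question raised above; it only shows that whichever thread calls the method cannot do other work until the reply arrives or the timeout expires.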

##########
File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -174,6 +192,26 @@ private[spark] class IndexShuffleBlockResolver(
     }
   }
 
+  private def getChecksums(checksumFile: File, blockNum: Int): Array[Long] = {
+    if (!checksumFile.exists()) return null
+    val checksums = new ArrayBuffer[Long]
+    // Read the checksums of blocks
+    var in: DataInputStream = null
+    try {
+      in = new DataInputStream(new NioBufferedFileInputStream(checksumFile))
+      while (checksums.size < blockNum) {
+        checksums += in.readLong()
+      }
+    } catch {
+      case _: IOException | _: EOFException =>
+        return null
+    } finally {
+      in.close()
+    }
+

Review comment:
       Something like this might be better ?
   
   ```suggestion
       Try(Utils.tryWithResource(new DataInputStream(new NioBufferedFileInputStream(checksumFile))) {
         in => Array.tabulate(blockNum)(_ => in.readLong())
       }).getOrElse(null)
   ```
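
For comparison, a plain-Java sketch of the same idea using try-with-resources. It assumes a `BufferedInputStream` in place of Spark's `NioBufferedFileInputStream`, and `ChecksumFileReaderSketch` is a hypothetical name. Opening the stream inside the try header also avoids calling `close()` on a reference that is still `null` when the open itself fails:

```java
import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

final class ChecksumFileReaderSketch {
  // Reads blockNum longs. Returns null if the file is missing, too short,
  // or unreadable, mirroring the null contract of the hunk above.
  static long[] readChecksums(File checksumFile, int blockNum) {
    try (DataInputStream in = new DataInputStream(
        new BufferedInputStream(new FileInputStream(checksumFile)))) {
      long[] checksums = new long[blockNum];
      for (int i = 0; i < blockNum; i++) {
        checksums[i] = in.readLong();
      }
      return checksums;
    } catch (IOException e) {
      // Covers FileNotFoundException and EOFException, both IOException subclasses.
      return null;
    }
  }
}
```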

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -275,6 +279,45 @@ private[spark] class BlockManager(
 
   override def getLocalDiskDirs: Array[String] = diskBlockManager.localDirsString
 
+  override def diagnoseShuffleBlockCorruption(blockId: BlockId, clientChecksum: Long): Cause = {
+    assert(blockId.isInstanceOf[ShuffleBlockId],
+      s"Corruption diagnosis only supports shuffle block yet, but got $blockId")
+    val shuffleBlock = blockId.asInstanceOf[ShuffleBlockId]
+    val resolver = shuffleManager.shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver]
+    val checksumFile = resolver.getChecksumFile(shuffleBlock.shuffleId, shuffleBlock.mapId)
+    val reduceId = shuffleBlock.reduceId
+    if (checksumFile.exists()) {
+      var in: DataInputStream = null
+      try {
+        val channel = Files.newByteChannel(checksumFile.toPath)
+        channel.position(reduceId * 8L)
+        in = new DataInputStream(Channels.newInputStream(channel))
+        val goldenChecksum = in.readLong()

Review comment:
       Extract out `readChecksum` and `computeChecksum` methods ?
   Btw, something like `tryWithResource { DataInputStream(FileInputStream()).skip(reduceId * 8L).readLong() }` would do the trick for `readChecksum`.
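
A sketch of an extracted `readChecksum`, assuming the layout implied by the diff (one 8-byte `long` per reduce partition, written with `writeLong`). The class name is hypothetical. It seeks with `RandomAccessFile` rather than chaining `skip`, because `InputStream.skip` returns the number of bytes skipped and may skip fewer than requested:

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

final class SingleChecksumReaderSketch {
  static final int CHECKSUM_BYTES = 8; // one long per reduce partition

  static long readChecksum(File checksumFile, int reduceId) throws IOException {
    try (RandomAccessFile raf = new RandomAccessFile(checksumFile, "r")) {
      // Widen before multiplying so large reduce ids cannot overflow an int.
      raf.seek((long) reduceId * CHECKSUM_BYTES);
      return raf.readLong(); // throws EOFException if the file is too short
    }
  }
}
```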

##########
File path: core/src/main/scala/org/apache/spark/storage/BlockManager.scala
##########
@@ -275,6 +279,45 @@ private[spark] class BlockManager(
 
   override def getLocalDiskDirs: Array[String] = diskBlockManager.localDirsString
 
+  override def diagnoseShuffleBlockCorruption(blockId: BlockId, clientChecksum: Long): Cause = {
+    assert(blockId.isInstanceOf[ShuffleBlockId],
+      s"Corruption diagnosis only supports shuffle block yet, but got $blockId")
+    val shuffleBlock = blockId.asInstanceOf[ShuffleBlockId]
+    val resolver = shuffleManager.shuffleBlockResolver.asInstanceOf[IndexShuffleBlockResolver]
+    val checksumFile = resolver.getChecksumFile(shuffleBlock.shuffleId, shuffleBlock.mapId)
+    val reduceId = shuffleBlock.reduceId
+    if (checksumFile.exists()) {
+      var in: DataInputStream = null
+      try {
+        val channel = Files.newByteChannel(checksumFile.toPath)
+        channel.position(reduceId * 8L)
+        in = new DataInputStream(Channels.newInputStream(channel))
+        val goldenChecksum = in.readLong()
+        val blockData = resolver.getBlockData(blockId)
+        val checksumIn = new CheckedInputStream(blockData.createInputStream(), new Adler32)
+        val buffer = new Array[Byte](8192)
+        while (checksumIn.read(buffer, 0, 8192) != -1) {}
+        val recalculatedChecksum = checksumIn.getChecksum.getValue

Review comment:
       We are not closing `checksumIn` btw.
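
A sketch of a `computeChecksum` helper that closes the stream, assuming Adler32 and the 8192-byte buffer from the hunk above (`ComputeChecksumSketch` is a hypothetical name). Closing the `CheckedInputStream` also closes the wrapped block-data stream:

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.Adler32;
import java.util.zip.CheckedInputStream;

final class ComputeChecksumSketch {
  static long computeChecksum(InputStream blockData) throws IOException {
    // try-with-resources closes checksumIn (and blockData) on every path.
    try (CheckedInputStream checksumIn =
        new CheckedInputStream(blockData, new Adler32())) {
      byte[] buffer = new byte[8192];
      while (checksumIn.read(buffer, 0, buffer.length) != -1) {
        // Reading drives the checksum; the bytes themselves are discarded.
      }
      return checksumIn.getChecksum().getValue();
    }
  }
}
```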

##########
File path: core/src/main/java/org/apache/spark/shuffle/sort/io/LocalDiskShuffleMapOutputWriter.java
##########
@@ -76,6 +81,11 @@ public LocalDiskShuffleMapOutputWriter(
       (int) (long) sparkConf.get(
         package$.MODULE$.SHUFFLE_UNSAFE_FILE_OUTPUT_BUFFER_SIZE()) * 1024;
     this.partitionLengths = new long[numPartitions];
+    boolean checksumEnabled = (boolean) sparkConf.get(package$.MODULE$.SHUFFLE_CHECKSUM());
+    if (checksumEnabled) {
+      this.checksumCal = new Adler32();

Review comment:
       Pull the `Checksum` management out and get everyone to depend on that ? Will also allow us to change it in future if required.
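
One way this could be centralised, sketched as a small factory. The class name and the algorithm strings are assumptions made for this example; the PR itself only uses `Adler32`:

```java
import java.util.zip.Adler32;
import java.util.zip.CRC32;
import java.util.zip.Checksum;

// Hypothetical single place where the checksum algorithm is chosen.
final class ShuffleChecksumFactorySketch {
  static Checksum create(String algorithm) {
    switch (algorithm) {
      case "ADLER32":
        return new Adler32();
      case "CRC32":
        return new CRC32();
      default:
        throw new UnsupportedOperationException(
          "Unsupported checksum algorithm: " + algorithm);
    }
  }
}
```

If the writer and the diagnosis path both obtain their `Checksum` from one such place, changing the algorithm later touches a single method instead of every `new Adler32()` call site.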

##########
File path: core/src/main/scala/org/apache/spark/storage/DiskBlockObjectWriter.scala
##########
@@ -76,6 +77,9 @@ private[spark] class DiskBlockObjectWriter(
   private var initialized = false
   private var streamOpen = false
   private var hasBeenClosed = false
+  private var checksumEnabled = false
+  private var checksumCal: Checksum = null
+  private var checksumOutputStream: CheckedOutputStream = null

Review comment:
       Same comment as above - reduce the number of streams kept as fields ?

##########
File path: core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala
##########
@@ -104,6 +108,40 @@ private[spark] class NettyBlockTransferService(
     }
   }
 
+  override def diagnoseCorruption(
+      host: String,
+      port: Int,
+      execId: String,
+      blockId: String,
+      checksum: Long): Cause = {
+    // A monitor for the thread to wait on.
+    val result = Promise[Cause]()
+    val client = clientFactory.createClient(host, port)
+    client.sendRpc(new DiagnoseCorruption(appId, execId, blockId, checksum).toByteBuffer,
+      new RpcResponseCallback {
+        override def onSuccess(response: ByteBuffer): Unit = {
+          val cause = BlockTransferMessage.Decoder
+            .fromByteBuffer(response).asInstanceOf[CorruptionCause]
+          result.success(cause.cause)
+        }
+
+        override def onFailure(e: Throwable): Unit = {
+          logger.warn("Failed to get the corruption cause.", e)
+          result.success(Cause.UNKNOWN)
+        }
+    })
+    val timeout = new RpcTimeout(
+      conf.get(Network.NETWORK_TIMEOUT).seconds,

Review comment:
       `seconds` -> `millis` ?

##########
File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -333,13 +394,40 @@ private[spark] class IndexShuffleBlockResolver(
           if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
             throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
           }
+
+          checksumTmpOpt.zip(checksumFileOpt).foreach { case (checksumTmp, checksumFile) =>
+            val out = new DataOutputStream(
+              new BufferedOutputStream(
+                new FileOutputStream(checksumTmp)
+              )
+            )
+            Utils.tryWithSafeFinally {
+              checksums.foreach(out.writeLong)
+            } {
+              out.close()
+            }
+
+            if (checksumFile.exists()) {
+              checksumFile.delete()
+            }
+            if (!checksumTmp.renameTo(checksumFile)) {
+              // It's not worthwhile to fail here after index file and data file are already
+              // successfully stored due to checksum is only used for the corner error case.
+              logWarning("fail to rename file " + checksumTmp + " to " + checksumFile)
+            }
+          }
         }
       }
     } finally {
       logDebug(s"Shuffle index for mapId $mapId: ${lengths.mkString("[", ",", "]")}")
       if (indexTmp.exists() && !indexTmp.delete()) {
         logError(s"Failed to delete temporary index file at ${indexTmp.getAbsolutePath}")
       }
+      checksumTmpOpt.foreach { checksumTmp =>
+        if (checksumTmp.exists() && !checksumTmp.delete()) {
+          logError(s"Failed to delete temporary checksum file at ${checksumTmp.getAbsolutePath}")

Review comment:
       `logInfo` ?

##########
File path: common/network-common/src/main/java/org/apache/spark/network/corruption/Cause.java
##########
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.network.corruption;
+
+public enum Cause {
+    DISK, NETWORK, UNKNOWN;

Review comment:
       `UNKNOWN` is handling three cases right now:
   * No checksum available for validation.
   * Diagnosis failed for some reason (timeout/failure/etc).
   * Checksum matches, no corruption detected.
   
   Anything else ?
   For the last, move it to a separate `Cause` ?
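
A sketch of how the split might look. The extra enum value `CHECKSUM_MATCHES` and the decision order are assumptions made for illustration; the hunks in this review only show the golden checksum being read and the block checksum being recomputed:

```java
final class CorruptionDiagnosisSketch {
  // CHECKSUM_MATCHES is a hypothetical name for the "no corruption found" case.
  enum Cause { DISK, NETWORK, CHECKSUM_MATCHES, UNKNOWN }

  // goldenChecksum: value stored at write time, or null if none is available.
  // recomputedChecksum: computed on the serving side from the data on disk.
  // clientChecksum: computed by the reader over the bytes it received.
  static Cause diagnose(Long goldenChecksum, long recomputedChecksum, long clientChecksum) {
    if (goldenChecksum == null) {
      return Cause.UNKNOWN; // nothing to validate against
    }
    if (recomputedChecksum != goldenChecksum) {
      return Cause.DISK; // data on disk no longer matches what was written
    }
    if (clientChecksum != recomputedChecksum) {
      return Cause.NETWORK; // disk data is intact, the reader saw something else
    }
    return Cause.CHECKSUM_MATCHES;
  }
}
```

With a dedicated value for the matching case, `UNKNOWN` is left to mean only "could not diagnose".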

##########
File path: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala
##########
@@ -333,13 +394,40 @@ private[spark] class IndexShuffleBlockResolver(
           if (dataTmp != null && dataTmp.exists() && !dataTmp.renameTo(dataFile)) {
             throw new IOException("fail to rename file " + dataTmp + " to " + dataFile)
           }
+
+          checksumTmpOpt.zip(checksumFileOpt).foreach { case (checksumTmp, checksumFile) =>
+            val out = new DataOutputStream(
+              new BufferedOutputStream(
+                new FileOutputStream(checksumTmp)
+              )
+            )
+            Utils.tryWithSafeFinally {
+              checksums.foreach(out.writeLong)
+            } {
+              out.close()
+            }
+
+            if (checksumFile.exists()) {
+              checksumFile.delete()
+            }
+            if (!checksumTmp.renameTo(checksumFile)) {
+              // It's not worthwhile to fail here after index file and data file are already
+              // successfully stored due to checksum is only used for the corner error case.
+              logWarning("fail to rename file " + checksumTmp + " to " + checksumFile)
+            }
+          }

Review comment:
       ```suggestion
             checksumFileOpt.foreach { checksumFile =>
               val checksumTmp = checksumTmpOpt.get
               Utils.tryWithResource(new DataOutputStream(new BufferedOutputStream(
                 new FileOutputStream(checksumTmp)))) { out =>
                   checksums.foreach(out.writeLong)
               }
   
               if (checksumFile.exists()) {
                 checksumFile.delete()
               }
               if (!checksumTmp.renameTo(checksumFile)) {
                 // It's not worthwhile to fail here after index file and data file are already
                 // successfully stored due to checksum is only used for the corner error case.
                 logWarning("fail to rename file " + checksumTmp + " to " + checksumFile)
               }
             }
   ```






[GitHub] [spark] AmplabJenkins removed a comment on pull request #32385: [WIP][SPARK-18188][CORE] Add checksum for shuffle blocks

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #32385:
URL: https://github.com/apache/spark/pull/32385#issuecomment-828606569


   
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder-K8s/42568/
   




[GitHub] [spark] Ngone51 commented on pull request #32385: [WIP][SPARK-35275][CORE] Add checksum for shuffle blocks and diagnose corruption

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on pull request #32385:
URL: https://github.com/apache/spark/pull/32385#issuecomment-843722683


   @tgravescs Thanks for the good points!
   
   I did find some perf regression by benchmarking with the change. I'll double-check it for sure and try to get rid of it if possible.
   
   > Also at the high level how does this affect other shuffle work going on - like merging and pluggable? Is it independent of that or would need to be implemented?
   
   For merging, it needs an extension to send checksum values along with the block data while merging. The extension is also needed for the decommission feature.
   
   For pluggable, my current implementation is added at `LocalDiskShuffleMapOutputWriter`, which is supposed to be the default shuffle writer plugin for Spark. This means other custom plugins need their own implementation for checksum support. I adopted that approach because it seemed easier and clearer to implement at the time.
   
   An alternative that would support checksums for all plugins, i.e. make it a built-in feature, may be to implement it in `DiskBlockObjectWriter`/`ShufflePartitionPairsWriter`, which sit upstream of the shuffle I/O plugin. I need to investigate this further.




[GitHub] [spark] SparkQA commented on pull request #32385: [WIP][SPARK-18188][CORE] Add checksum for shuffle blocks

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #32385:
URL: https://github.com/apache/spark/pull/32385#issuecomment-828567443


   **[Test build #138049 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/138049/testReport)** for PR 32385 at commit [`c09ccd0`](https://github.com/apache/spark/commit/c09ccd0e63f4c569c55de93c6f57a65a91a16c41).

