You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mr...@apache.org on 2023/02/09 16:36:29 UTC

[spark] branch master updated: [SPARK-42366][SHUFFLE] Log shuffle data corruption diagnose cause

This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 201a91bab0e [SPARK-42366][SHUFFLE] Log shuffle data corruption diagnose cause
201a91bab0e is described below

commit 201a91bab0e3d540ee262bd73b597b137f3f987b
Author: sychen <sy...@ctrip.com>
AuthorDate: Thu Feb 9 10:36:11 2023 -0600

    [SPARK-42366][SHUFFLE] Log shuffle data corruption diagnose cause
    
    ### What changes were proposed in this pull request?
    Log the diagnosed corruption cause in the `diagnoseCorruption` method.
    
    ### Why are the changes needed?
    It makes it easy to collect the cause of shuffle corruption from the logs of the shuffle service deployed in the YARN NodeManager.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing unit tests.
    
    Closes #39918 from cxzl25/SPARK-42366.
    
    Authored-by: sychen <sy...@ctrip.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../shuffle/checksum/ShuffleChecksumHelper.java      | 20 +++++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)

diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java
index 4071088fe4b..6993be4c430 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java
@@ -125,17 +125,18 @@ public class ShuffleChecksumHelper {
       ManagedBuffer partitionData,
       long checksumByReader) {
     Cause cause;
+    long duration = -1L;
+    long checksumByWriter = -1L;
+    long checksumByReCalculation = -1L;
     try {
       long diagnoseStartNs = System.nanoTime();
       // Try to get the checksum instance before reading the checksum file so that
       // `UnsupportedOperationException` can be thrown first before `FileNotFoundException`
       // when the checksum algorithm isn't supported.
       Checksum checksumAlgo = getChecksumByAlgorithm(algorithm);
-      long checksumByWriter = readChecksumByReduceId(checksumFile, reduceId);
-      long checksumByReCalculation = calculateChecksumForPartition(partitionData, checksumAlgo);
-      long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - diagnoseStartNs);
-      logger.info("Shuffle corruption diagnosis took {} ms, checksum file {}",
-        duration, checksumFile.getAbsolutePath());
+      checksumByWriter = readChecksumByReduceId(checksumFile, reduceId);
+      checksumByReCalculation = calculateChecksumForPartition(partitionData, checksumAlgo);
+      duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - diagnoseStartNs);
       if (checksumByWriter != checksumByReCalculation) {
         cause = Cause.DISK_ISSUE;
       } else if (checksumByWriter != checksumByReader) {
@@ -153,6 +154,15 @@ public class ShuffleChecksumHelper {
       logger.warn("Unable to diagnose shuffle block corruption", e);
       cause = Cause.UNKNOWN_ISSUE;
     }
+    if (logger.isDebugEnabled()) {
+      logger.debug("Shuffle corruption diagnosis took {} ms, checksum file {}, cause {}, " +
+        "checksumByReader {}, checksumByWriter {}, checksumByReCalculation {}",
+        duration, checksumFile.getAbsolutePath(), cause,
+        checksumByReader, checksumByWriter, checksumByReCalculation);
+    } else {
+      logger.info("Shuffle corruption diagnosis took {} ms, checksum file {}, cause {}",
+        duration, checksumFile.getAbsolutePath(), cause);
+    }
     return cause;
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org