You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by mr...@apache.org on 2023/02/09 16:36:29 UTC
[spark] branch master updated: [SPARK-42366][SHUFFLE] Log shuffle data corruption diagnose cause
This is an automated email from the ASF dual-hosted git repository.
mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 201a91bab0e [SPARK-42366][SHUFFLE] Log shuffle data corruption diagnose cause
201a91bab0e is described below
commit 201a91bab0e3d540ee262bd73b597b137f3f987b
Author: sychen <sy...@ctrip.com>
AuthorDate: Thu Feb 9 10:36:11 2023 -0600
[SPARK-42366][SHUFFLE] Log shuffle data corruption diagnose cause
### What changes were proposed in this pull request?
Log the diagnosed corruption cause in the `diagnoseCorruption` method.
### Why are the changes needed?
This makes it easy to collect the cause of shuffle corruption from the shuffle service log when the service is deployed in the YARN NodeManager.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing unit tests.
Closes #39918 from cxzl25/SPARK-42366.
Authored-by: sychen <sy...@ctrip.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
.../shuffle/checksum/ShuffleChecksumHelper.java | 20 +++++++++++++++-----
1 file changed, 15 insertions(+), 5 deletions(-)
diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java
index 4071088fe4b..6993be4c430 100644
--- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java
+++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/checksum/ShuffleChecksumHelper.java
@@ -125,17 +125,18 @@ public class ShuffleChecksumHelper {
ManagedBuffer partitionData,
long checksumByReader) {
Cause cause;
+ long duration = -1L;
+ long checksumByWriter = -1L;
+ long checksumByReCalculation = -1L;
try {
long diagnoseStartNs = System.nanoTime();
// Try to get the checksum instance before reading the checksum file so that
// `UnsupportedOperationException` can be thrown first before `FileNotFoundException`
// when the checksum algorithm isn't supported.
Checksum checksumAlgo = getChecksumByAlgorithm(algorithm);
- long checksumByWriter = readChecksumByReduceId(checksumFile, reduceId);
- long checksumByReCalculation = calculateChecksumForPartition(partitionData, checksumAlgo);
- long duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - diagnoseStartNs);
- logger.info("Shuffle corruption diagnosis took {} ms, checksum file {}",
- duration, checksumFile.getAbsolutePath());
+ checksumByWriter = readChecksumByReduceId(checksumFile, reduceId);
+ checksumByReCalculation = calculateChecksumForPartition(partitionData, checksumAlgo);
+ duration = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - diagnoseStartNs);
if (checksumByWriter != checksumByReCalculation) {
cause = Cause.DISK_ISSUE;
} else if (checksumByWriter != checksumByReader) {
@@ -153,6 +154,15 @@ public class ShuffleChecksumHelper {
logger.warn("Unable to diagnose shuffle block corruption", e);
cause = Cause.UNKNOWN_ISSUE;
}
+ if (logger.isDebugEnabled()) {
+ logger.debug("Shuffle corruption diagnosis took {} ms, checksum file {}, cause {}, " +
+ "checksumByReader {}, checksumByWriter {}, checksumByReCalculation {}",
+ duration, checksumFile.getAbsolutePath(), cause,
+ checksumByReader, checksumByWriter, checksumByReCalculation);
+ } else {
+ logger.info("Shuffle corruption diagnosis took {} ms, checksum file {}, cause {}",
+ duration, checksumFile.getAbsolutePath(), cause);
+ }
return cause;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org