You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ab...@apache.org on 2023/03/06 15:30:21 UTC
[tez] branch master updated: TEZ-4334: Fix deadlock in ShuffleScheduler between ShuffleScheduler.close() and the ShufflePenaltyReferee thread (#273) (Laszlo Bodor, Sungwoo Park, reviewed by Rajesh Balamohan)
This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/master by this push:
new 25a953677 TEZ-4334: Fix deadlock in ShuffleScheduler between ShuffleScheduler.close() and the ShufflePenaltyReferee thread (#273) (Laszlo Bodor, Sungwoo Park, reviewed by Rajesh Balamohan)
25a953677 is described below
commit 25a9536777af26fdb5aeae866c25524f03ffa997
Author: Bodor Laszlo <bo...@gmail.com>
AuthorDate: Mon Mar 6 16:30:14 2023 +0100
TEZ-4334: Fix deadlock in ShuffleScheduler between ShuffleScheduler.close() and the ShufflePenaltyReferee thread (#273) (Laszlo Bodor, Sungwoo Park, reviewed by Rajesh Balamohan)
---
.../shuffle/orderedgrouped/ShuffleScheduler.java | 59 +++++++++++-----------
1 file changed, 30 insertions(+), 29 deletions(-)
diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 470b04cc5..967f58250 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -759,20 +759,22 @@ class ShuffleScheduler {
}
}
- public synchronized void copyFailed(InputAttemptFetchFailure fetchFailure, MapHost host,
+ public void copyFailed(InputAttemptFetchFailure fetchFailure, MapHost host,
boolean readError, boolean connectError) {
failedShuffleCounter.increment(1);
inputContext.notifyProgress();
- int failures = incrementAndGetFailureAttempt(fetchFailure.getInputAttemptIdentifier());
+ int failures;
- if (!fetchFailure.isLocalFetch()) {
- /**
- * Track the number of failures that has happened since last completion.
- * This gets reset on a successful copy.
- */
- failedShufflesSinceLastCompletion++;
+ synchronized (this) {
+ failures = incrementAndGetFailureAttempt(fetchFailure.getInputAttemptIdentifier());
+ if (!fetchFailure.isLocalFetch()) {
+ /**
+ * Track the number of failures that has happened since last completion.
+ * This gets reset on a successful copy.
+ */
+ failedShufflesSinceLastCompletion++;
+ }
}
-
/**
* Inform AM:
* - In case of read/connect error
@@ -794,14 +796,18 @@ class ShuffleScheduler {
}
//Restart consumer in case shuffle is not healthy
- if (!isShuffleHealthy(fetchFailure)) {
+ try {
+ checkShuffleHealthy(fetchFailure);
+ } catch (IOException e) {
+ // reportException should be called outside synchronized(this) due to TEZ-4334
+ exceptionReporter.reportException(e);
return;
}
penalizeHost(host, failures);
}
- private boolean isAbortLimitExceeedFor(InputAttemptIdentifier srcAttempt) {
+ private void isAbortLimitExceeedFor(InputAttemptIdentifier srcAttempt) throws IOException {
int attemptFailures = getFailureCount(srcAttempt);
if (attemptFailures >= abortFailureLimit) {
// This task has seen too many fetch failures - report it as failed. The
@@ -816,15 +822,11 @@ class ShuffleScheduler {
inputContext.getSourceVertexName(),
srcAttempt.getInputIdentifier(),
srcAttempt.getAttemptNumber()) + ". threshold=" + abortFailureLimit;
- IOException ioe = new IOException(errorMsg);
- // Shuffle knows how to deal with failures post shutdown via the onFailure hook
- exceptionReporter.reportException(ioe);
- return true;
+ throw new IOException(errorMsg);
}
- return false;
}
- private void penalizeHost(MapHost host, int failures) {
+ private synchronized void penalizeHost(MapHost host, int failures) {
host.penalize();
HostPort hostPort = new HostPort(host.getHost(), host.getPort());
@@ -1008,14 +1010,15 @@ class ShuffleScheduler {
return fetcherHealthy;
}
- boolean isShuffleHealthy(InputAttemptFetchFailure fetchFailure) {
+ /**
+ * This method checks whether the current shuffle is healthy and throws an IOException if it is not;
+ * the caller is expected to handle the IOException.
+ */
+ private synchronized void checkShuffleHealthy(InputAttemptFetchFailure fetchFailure)
+ throws IOException {
InputAttemptIdentifier srcAttempt = fetchFailure.getInputAttemptIdentifier();
- if (isAbortLimitExceeedFor(srcAttempt)) {
- return false;
- }
-
- final float MIN_REQUIRED_PROGRESS_PERCENT = minReqProgressFraction;
- final float MAX_ALLOWED_STALL_TIME_PERCENT = maxStallTimeFraction;
+ // supposed to throw IOException if exceeded
+ isAbortLimitExceeedFor(srcAttempt);
int doneMaps = numInputs - remainingMaps.get();
@@ -1025,7 +1028,7 @@ class ShuffleScheduler {
// check if the reducer has progressed enough
boolean reducerProgressedEnough =
(((float)doneMaps / numInputs)
- >= MIN_REQUIRED_PROGRESS_PERCENT);
+ >= minReqProgressFraction);
// check if the reducer is stalled for a long time
// duration for which the reducer is stalled
@@ -1038,7 +1041,7 @@ class ShuffleScheduler {
boolean reducerStalled = (shuffleProgressDuration > 0) &&
(((float)stallDuration / shuffleProgressDuration)
- >= MAX_ALLOWED_STALL_TIME_PERCENT);
+ >= maxStallTimeFraction);
// kill if not healthy and has insufficient progress
if ((failureCounts.size() >= maxFailedUniqueFetches ||
@@ -1059,10 +1062,8 @@ class ShuffleScheduler {
LOG.debug("Host failures=" + hostFailures.keySet());
}
// Shuffle knows how to deal with failures post shutdown via the onFailure hook
- exceptionReporter.reportException(new IOException(errorMsg, fetchFailure.getCause()));
- return false;
+ throw new IOException(errorMsg, fetchFailure.getCause());
}
- return true;
}
public synchronized void addKnownMapOutput(String inputHostName,