You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ab...@apache.org on 2023/03/06 15:30:21 UTC

[tez] branch master updated: TEZ-4334: Fix deadlock in ShuffleScheduler between ShuffleScheduler.close() and the ShufflePenaltyReferee thread (#273) (Laszlo Bodor, Sungwoo Park, reviewed by Rajesh Balamohan)

This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new 25a953677 TEZ-4334: Fix deadlock in ShuffleScheduler between ShuffleScheduler.close() and the ShufflePenaltyReferee thread (#273) (Laszlo Bodor,  Sungwoo Park, reviewed by Rajesh Balamohan)
25a953677 is described below

commit 25a9536777af26fdb5aeae866c25524f03ffa997
Author: Bodor Laszlo <bo...@gmail.com>
AuthorDate: Mon Mar 6 16:30:14 2023 +0100

    TEZ-4334: Fix deadlock in ShuffleScheduler between ShuffleScheduler.close() and the ShufflePenaltyReferee thread (#273) (Laszlo Bodor,  Sungwoo Park, reviewed by Rajesh Balamohan)
---
 .../shuffle/orderedgrouped/ShuffleScheduler.java   | 59 +++++++++++-----------
 1 file changed, 30 insertions(+), 29 deletions(-)

diff --git a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
index 470b04cc5..967f58250 100644
--- a/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
+++ b/tez-runtime-library/src/main/java/org/apache/tez/runtime/library/common/shuffle/orderedgrouped/ShuffleScheduler.java
@@ -759,20 +759,22 @@ class ShuffleScheduler {
     }
   }
 
-  public synchronized void copyFailed(InputAttemptFetchFailure fetchFailure, MapHost host,
+  public void copyFailed(InputAttemptFetchFailure fetchFailure, MapHost host,
       boolean readError, boolean connectError) {
     failedShuffleCounter.increment(1);
     inputContext.notifyProgress();
-    int failures = incrementAndGetFailureAttempt(fetchFailure.getInputAttemptIdentifier());
+    int failures;
 
-    if (!fetchFailure.isLocalFetch()) {
-      /**
-       * Track the number of failures that has happened since last completion.
-       * This gets reset on a successful copy.
-       */
-      failedShufflesSinceLastCompletion++;
+    synchronized (this) {
+      failures = incrementAndGetFailureAttempt(fetchFailure.getInputAttemptIdentifier());
+      if (!fetchFailure.isLocalFetch()) {
+        /**
+         * Track the number of failures that have happened since the last completion.
+         * This gets reset on a successful copy.
+         */
+        failedShufflesSinceLastCompletion++;
+      }
     }
-
     /**
      * Inform AM:
      *    - In case of read/connect error
@@ -794,14 +796,18 @@ class ShuffleScheduler {
     }
 
     //Restart consumer in case shuffle is not healthy
-    if (!isShuffleHealthy(fetchFailure)) {
+    try {
+      checkShuffleHealthy(fetchFailure);
+    } catch (IOException e) {
+      // reportException should be called outside synchronized(this) due to TEZ-4334
+      exceptionReporter.reportException(e);
       return;
     }
 
     penalizeHost(host, failures);
   }
 
-  private boolean isAbortLimitExceeedFor(InputAttemptIdentifier srcAttempt) {
+  private void isAbortLimitExceeedFor(InputAttemptIdentifier srcAttempt) throws IOException {
     int attemptFailures = getFailureCount(srcAttempt);
     if (attemptFailures >= abortFailureLimit) {
       // This task has seen too many fetch failures - report it as failed. The
@@ -816,15 +822,11 @@ class ShuffleScheduler {
           inputContext.getSourceVertexName(),
           srcAttempt.getInputIdentifier(),
           srcAttempt.getAttemptNumber()) + ". threshold=" + abortFailureLimit;
-      IOException ioe = new IOException(errorMsg);
-      // Shuffle knows how to deal with failures post shutdown via the onFailure hook
-      exceptionReporter.reportException(ioe);
-      return true;
+      throw new IOException(errorMsg);
     }
-    return false;
   }
 
-  private void penalizeHost(MapHost host, int failures) {
+  private synchronized void penalizeHost(MapHost host, int failures) {
     host.penalize();
 
     HostPort hostPort = new HostPort(host.getHost(), host.getPort());
@@ -1008,14 +1010,15 @@ class ShuffleScheduler {
     return fetcherHealthy;
   }
 
-  boolean isShuffleHealthy(InputAttemptFetchFailure fetchFailure) {
+  /**
+   * Checks whether the current shuffle is healthy and throws an IOException if it is not;
+   * the caller is expected to handle the IOException.
+   */
+  private synchronized void checkShuffleHealthy(InputAttemptFetchFailure fetchFailure)
+      throws IOException {
     InputAttemptIdentifier srcAttempt = fetchFailure.getInputAttemptIdentifier();
-    if (isAbortLimitExceeedFor(srcAttempt)) {
-      return false;
-    }
-
-    final float MIN_REQUIRED_PROGRESS_PERCENT = minReqProgressFraction;
-    final float MAX_ALLOWED_STALL_TIME_PERCENT = maxStallTimeFraction;
+    // throws IOException if the abort failure limit has been exceeded for this attempt
+    isAbortLimitExceeedFor(srcAttempt);
 
     int doneMaps = numInputs - remainingMaps.get();
 
@@ -1025,7 +1028,7 @@ class ShuffleScheduler {
     // check if the reducer has progressed enough
     boolean reducerProgressedEnough =
       (((float)doneMaps / numInputs)
-          >= MIN_REQUIRED_PROGRESS_PERCENT);
+          >= minReqProgressFraction);
 
     // check if the reducer is stalled for a long time
     // duration for which the reducer is stalled
@@ -1038,7 +1041,7 @@ class ShuffleScheduler {
 
     boolean reducerStalled = (shuffleProgressDuration > 0) &&
       (((float)stallDuration / shuffleProgressDuration)
-          >= MAX_ALLOWED_STALL_TIME_PERCENT);
+          >= maxStallTimeFraction);
 
     // kill if not healthy and has insufficient progress
     if ((failureCounts.size() >= maxFailedUniqueFetches ||
@@ -1059,10 +1062,8 @@ class ShuffleScheduler {
         LOG.debug("Host failures=" + hostFailures.keySet());
       }
       // Shuffle knows how to deal with failures post shutdown via the onFailure hook
-      exceptionReporter.reportException(new IOException(errorMsg, fetchFailure.getCause()));
-      return false;
+      throw new IOException(errorMsg, fetchFailure.getCause());
     }
-    return true;
   }
 
   public synchronized void addKnownMapOutput(String inputHostName,