You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/09/18 08:09:16 UTC

tez git commit: TEZ-814. Improve heuristic for determining a task has failed outputs (bikas)

Repository: tez
Updated Branches:
  refs/heads/master 3be9c53b2 -> 94488e79d


TEZ-814. Improve heuristic for determining a task has failed outputs (bikas)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/94488e79
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/94488e79
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/94488e79

Branch: refs/heads/master
Commit: 94488e79d8d3d73e93337a57a48d82f82182591b
Parents: 3be9c53
Author: Bikas Saha <bi...@apache.org>
Authored: Thu Sep 17 23:09:11 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu Sep 17 23:09:11 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  4 ++
 .../apache/tez/dag/api/TezConfiguration.java    | 16 +++++++
 tez-dag/findbugs-exclude.xml                    |  1 +
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   | 33 +++++++++----
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 49 ++++++++++++++++++--
 5 files changed, 90 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/94488e79/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 180a26d..9c34cca 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ Release 0.8.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-814.  Improve heuristic for determining a task has failed outputs
   TEZ-2832. Support tests for both SimpleHistory logging and ATS logging
   TEZ-2840. MRInputLegacy.init should set splitInfoViaEvents.
   TEZ-2827. Increase timeout for TestFetcher testInputAttemptIdentifierMap
@@ -183,6 +184,7 @@ Release 0.7.1: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-814.  Improve heuristic for determining a task has failed outputs
   TEZ-2840. MRInputLegacy.init should set splitInfoViaEvents.
   TEZ-2829. Tez UI: minor fixes to in-progress update of UI from AM
   TEZ-2663. SessionNotRunning exceptions are wrapped in a ServiceException from a dying AM.
@@ -439,6 +441,7 @@ Release 0.6.3: Unreleased
 INCOMPATIBLE CHANGES
 
 ALL CHANGES:
+  TEZ-814.  Improve heuristic for determining a task has failed outputs
   TEZ-2809. Minimal distribution compiled on 2.6 fails to run on 2.7
   TEZ-2768. Log a useful error message when the summary stream cannot be closed when shutting
   down an AM.
@@ -659,6 +662,7 @@ INCOMPATIBLE CHANGES
   TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
 
 ALL CHANGES:
+  TEZ-814.  Improve heuristic for determining a task has failed outputs
   TEZ-2768. Log a useful error message when the summary stream cannot be closed when shutting
   down an AM.
   TEZ-2745. ClassNotFoundException of user code should fail dag

http://git-wip-us.apache.org/repos/asf/tez/blob/94488e79/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 4d12740..f7fd8da 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -243,6 +243,22 @@ public class TezConfiguration extends Configuration {
   public static final int TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_DEFAULT = 10;
 
   /**
+   * int value. Represents the maximum time in seconds for which a consumer attempt can report
+   * a read error against its producer attempt, after which the producer attempt will be re-run
+   * to re-generate the output. There are other heuristics which determine the retry and mainly
+   * try to guard against a flurry of re-runs due to intermittent read errors
+   * (e.g. due to network issues). This configuration puts a time limit on those heuristics to
+   * ensure jobs don't hang indefinitely due to lack of closure in those heuristics.
+   *
+   * Expert level setting.
+   */
+  @ConfigurationScope(Scope.AM)
+  @ConfigurationProperty(type="integer")
+  public static final String TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC =
+      TEZ_AM_PREFIX + "max.allowed.time-sec.for-read-error";
+  public static final int TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC_DEFAULT = 300;
+
+  /**
    * Boolean value. Determines when the final outputs to data sinks are committed. Commit is an
    * output specific operation and typically involves making the output visible for consumption. 
    * If the config is true, then the outputs are committed at the end of DAG completion after all 

http://git-wip-us.apache.org/repos/asf/tez/blob/94488e79/tez-dag/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml
index 6db3b7c..2122c5e 100644
--- a/tez-dag/findbugs-exclude.xml
+++ b/tez-dag/findbugs-exclude.xml
@@ -240,6 +240,7 @@
     <Or>
       <Field name="MAX_ALLOWED_OUTPUT_FAILURES_FRACTION"/>
       <Field name="MAX_ALLOWED_OUTPUT_FAILURES"/>
+      <Field name="MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC"/>
     </Or>
     <Bug pattern="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD"/>
   </Match>

http://git-wip-us.apache.org/repos/asf/tez/blob/94488e79/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 2f228bd..35a23f9 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -110,6 +110,7 @@ import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 public class TaskAttemptImpl implements TaskAttempt,
     EventHandler<TaskAttemptEvent> {
@@ -195,10 +196,10 @@ public class TaskAttemptImpl implements TaskAttempt,
   Set<String> taskHosts = new HashSet<String>();
   Set<String> taskRacks = new HashSet<String>();
 
-  private Set<TezTaskAttemptID> uniquefailedOutputReports = 
-      new HashSet<TezTaskAttemptID>();
+  private Map<TezTaskAttemptID, Long> uniquefailedOutputReports = Maps.newHashMap();
   private static double MAX_ALLOWED_OUTPUT_FAILURES_FRACTION;
   private static int MAX_ALLOWED_OUTPUT_FAILURES;
+  private static int MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC;
 
   protected final boolean isRescheduled;
   private final Resource taskResource;
@@ -458,6 +459,10 @@ public class TaskAttemptImpl implements TaskAttempt,
     MAX_ALLOWED_OUTPUT_FAILURES_FRACTION = conf.getDouble(TezConfiguration
         .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION, TezConfiguration
         .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION_DEFAULT);
+    
+    MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC = conf.getInt(
+        TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC,
+        TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC_DEFAULT);
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     this.readLock = rwLock.readLock();
     this.writeLock = rwLock.writeLock();
@@ -1565,7 +1570,17 @@ public class TaskAttemptImpl implements TaskAttempt,
       LOG.info(attempt.getID()
             + " blamed for read error from " + failedDestTaId
             + " at inputIndex " + failedInputIndexOnDestTa);
-      attempt.uniquefailedOutputReports.add(failedDestTaId);
+      long time = attempt.clock.getTime();
+      Long firstErrReportTime = attempt.uniquefailedOutputReports.get(failedDestTaId);
+      if (firstErrReportTime == null) {
+        attempt.uniquefailedOutputReports.put(failedDestTaId, time);
+        firstErrReportTime = time;
+      }
+      
+      int readErrorTimespanSec = (int)((time - firstErrReportTime)/1000);
+      boolean crossTimeDeadline = readErrorTimespanSec >=
+      MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC ? true : false;
+
       float failureFraction = ((float) attempt.uniquefailedOutputReports.size())
           / outputFailedEvent.getConsumerTaskNumber();
 
@@ -1577,14 +1592,16 @@ public class TaskAttemptImpl implements TaskAttempt,
       // If needed we can launch a background task without failing this task
       // to generate a copy of the output just in case.
       // If needed we can consider only running consumer tasks
-      if (withinFailureFractionLimits && withinOutputFailureLimits) {
+      if (!crossTimeDeadline && withinFailureFractionLimits && withinOutputFailureLimits) {
         return attempt.getInternalState();
       }
       String message = attempt.getID() + " being failed for too many output errors. "
-          + "failureFraction=" + failureFraction + ", "
-          + "MAX_ALLOWED_OUTPUT_FAILURES_FRACTION=" + MAX_ALLOWED_OUTPUT_FAILURES_FRACTION + ", "
-          + "uniquefailedOutputReports=" + attempt.uniquefailedOutputReports.size() + ", "
-          + "MAX_ALLOWED_OUTPUT_FAILURES=" + MAX_ALLOWED_OUTPUT_FAILURES;
+          + "failureFraction=" + failureFraction
+          + ", MAX_ALLOWED_OUTPUT_FAILURES_FRACTION=" + MAX_ALLOWED_OUTPUT_FAILURES_FRACTION
+          + ", uniquefailedOutputReports=" + attempt.uniquefailedOutputReports.size()
+          + ", MAX_ALLOWED_OUTPUT_FAILURES=" + MAX_ALLOWED_OUTPUT_FAILURES
+          + ", MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC=" + MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC
+          + ", readErrorTimespan=" + readErrorTimespanSec;
       LOG.info(message);
       attempt.addDiagnosticInfo(message);
       // send input failed event

http://git-wip-us.apache.org/repos/asf/tez/blob/94488e79/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 2d30a6f..a55d4a6 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -1178,7 +1178,7 @@ public class TestTaskAttempt {
     assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, taImpl.getTerminationCause());
   }
 
-  @Test(timeout = 5000)
+  @Test//(timeout = 5000)
   // Verifies that multiple TooManyFetchFailures are handled correctly by the
   // TaskAttempt.
   public void testMultipleOutputFailed() throws Exception {
@@ -1324,15 +1324,54 @@ public class TestTaskAttempt {
     //This should fail even when MAX_ALLOWED_OUTPUT_FAILURES_FRACTION is within limits, as
     // MAX_ALLOWED_OUTPUT_FAILURES has crossed the limit.
     taImpl2.handle(new TaskAttemptEventOutputFailed(taskAttemptID2, tzEvent, 8));
-    assertEquals("Task attempt is not in succeeded state", taImpl2.getState(),
-        TaskAttemptState.FAILED);
-
-    assertEquals("Task attempt is not in FAILED state", taImpl2.getState(),
+    assertEquals("Task attempt is not in failed state", taImpl2.getState(),
         TaskAttemptState.FAILED);
     assertEquals(TaskAttemptTerminationCause.OUTPUT_LOST, taImpl2.getTerminationCause());
     // verify unregister is not invoked again
     verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID2);
 
+    Clock mockClock = mock(Clock.class); 
+    int readErrorTimespanSec = 1;
+    taskConf.setInt(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, 10);
+    taskConf.setInt(TezConfiguration.TEZ_AM_MAX_ALLOWED_TIME_FOR_TASK_READ_ERROR_SEC, readErrorTimespanSec);
+    TezTaskID taskID3 = TezTaskID.getInstance(vertexID, 3);
+    MockTaskAttemptImpl taImpl3 = new MockTaskAttemptImpl(taskID3, 1, eventHandler,
+        taListener, taskConf, mockClock,
+        mockHeartbeatHandler, appCtx, false,
+        resource, createFakeContainerContext(), false);
+    TezTaskAttemptID taskAttemptID3 = taImpl3.getID();
+
+    taImpl3.handle(new TaskAttemptEventSchedule(taskAttemptID3, 0, 0));
+    // At state STARTING.
+    taImpl3.handle(new TaskAttemptEventStartedRemotely(taskAttemptID3, contId, null));
+    verify(mockHeartbeatHandler).register(taskAttemptID3);
+    taImpl3.handle(new TaskAttemptEvent(taskAttemptID3, TaskAttemptEventType.TA_DONE));
+    assertEquals("Task attempt is not in succeeded state", taImpl3.getState(),
+        TaskAttemptState.SUCCEEDED);
+    verify(mockHeartbeatHandler).unregister(taskAttemptID3);
+
+    mockReEvent = InputReadErrorEvent.create("", 1, 1);
+    mockMeta = mock(EventMetaData.class);
+    mockDestId1 = mock(TezTaskAttemptID.class);
+    when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1);
+    tzEvent = new TezEvent(mockReEvent, mockMeta);
+    when(mockClock.getTime()).thenReturn(1000L);
+    // time deadline not exceeded for a couple of read error events
+    taImpl3.handle(new TaskAttemptEventOutputFailed(taskAttemptID3, tzEvent, 1000));
+    assertEquals("Task attempt is not in succeeded state", taImpl3.getState(),
+        TaskAttemptState.SUCCEEDED);
+    when(mockClock.getTime()).thenReturn(1500L);
+    taImpl3.handle(new TaskAttemptEventOutputFailed(taskAttemptID3, tzEvent, 1000));
+    assertEquals("Task attempt is not in succeeded state", taImpl3.getState(),
+        TaskAttemptState.SUCCEEDED);
+    // exceed the time threshold
+    when(mockClock.getTime()).thenReturn(2001L);
+    taImpl3.handle(new TaskAttemptEventOutputFailed(taskAttemptID3, tzEvent, 1000));
+    assertEquals("Task attempt is not in FAILED state", taImpl3.getState(),
+        TaskAttemptState.FAILED);
+    assertEquals(TaskAttemptTerminationCause.OUTPUT_LOST, taImpl3.getTerminationCause());
+    // verify unregister is not invoked again
+    verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID3);
   }
 
   @SuppressWarnings("deprecation")