You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by rb...@apache.org on 2015/06/18 17:04:04 UTC

tez git commit: TEZ-2552. CRC errors can cause job to run for very long time in large jobs (rbalamohan)

Repository: tez
Updated Branches:
  refs/heads/master 24fb21e65 -> 41f8a9737


TEZ-2552. CRC errors can cause job to run for very long time in large jobs (rbalamohan)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/41f8a973
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/41f8a973
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/41f8a973

Branch: refs/heads/master
Commit: 41f8a9737a6d2424f0453db3e93404323b248d85
Parents: 24fb21e
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Thu Jun 18 20:35:40 2015 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Thu Jun 18 20:35:40 2015 +0530

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/tez/dag/api/TezConfiguration.java    | 24 ++++++++++++
 tez-dag/findbugs-exclude.xml                    | 10 +++++
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   | 29 +++++++++++---
 .../tez/dag/app/dag/impl/TestTaskAttempt.java   | 41 ++++++++++++++++++--
 5 files changed, 96 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/41f8a973/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 7629232..a331b0f 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -5,6 +5,7 @@ Apache Tez Change Log
 Release 0.8.0: Unreleased
 
 INCOMPATIBLE CHANGES
+  TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
   TEZ-2468. Change the minimum Java version to Java 7.
 
 ALL CHANGES:

http://git-wip-us.apache.org/repos/asf/tez/blob/41f8a973/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 15b1333..88b1dee 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -208,6 +208,30 @@ public class TezConfiguration extends Configuration {
   public static final String TEZ_TASK_LOG_LEVEL_DEFAULT = "INFO";
 
   /**
+   * Double value. Represents the ratio of unique failed outputs to the number of consumer
+   * tasks. When this ratio is exceeded, or the limit mentioned in {@link
+   * #TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES} is reached, the task is declared as failed by the AM.
+   *
+   * Expert level setting.
+   */
+  @ConfigurationScope(Scope.AM)
+  public static final String TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION =
+      TEZ_TASK_PREFIX + "max.allowed.output.failures.fraction";
+  public static final double TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION_DEFAULT = 0.1;
+
+  /**
+   * Int value. Maximum number of unique output failures allowed for a task. When this limit is
+   * reached, or the ratio in {@link #TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION} is exceeded,
+   * the task is declared as failed by the AM and relaunched.
+   *
+   * Expert level setting.
+   */
+  @ConfigurationScope(Scope.AM)
+  public static final String TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES =
+      TEZ_TASK_PREFIX + "max.allowed.output.failures";
+  public static final int TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_DEFAULT = 10;
+
+  /**
    * Boolean value. Determines when the final outputs to data sinks are committed. Commit is an
    * output specific operation and typically involves making the output visible for consumption. 
    * If the config is true, then the outputs are committed at the end of DAG completion after all 

http://git-wip-us.apache.org/repos/asf/tez/blob/41f8a973/tez-dag/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/tez-dag/findbugs-exclude.xml b/tez-dag/findbugs-exclude.xml
index c89a9d2..ab7ae5c 100644
--- a/tez-dag/findbugs-exclude.xml
+++ b/tez-dag/findbugs-exclude.xml
@@ -229,4 +229,14 @@
     <Bug pattern="URF_UNREAD_PUBLIC_OR_PROTECTED_FIELD"/>
   </Match>
 
+  <!-- TEZ-2552 -->
+  <Match>
+    <Class name="org.apache.tez.dag.app.dag.impl.TaskAttemptImpl"/>
+    <Or>
+      <Field name="MAX_ALLOWED_OUTPUT_FAILURES_FRACTION"/>
+      <Field name="MAX_ALLOWED_OUTPUT_FAILURES"/>
+    </Or>
+    <Bug pattern="ST_WRITE_TO_STATIC_FROM_INSTANCE_METHOD"/>
+  </Match>
+
 </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/tez/blob/41f8a973/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 9e47287..f015155 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -155,10 +155,11 @@ public class TaskAttemptImpl implements TaskAttempt,
   // Used to store locality information when
   Set<String> taskHosts = new HashSet<String>();
   Set<String> taskRacks = new HashSet<String>();
-  
+
   private Set<TezTaskAttemptID> uniquefailedOutputReports = 
       new HashSet<TezTaskAttemptID>();
-  private static final double MAX_ALLOWED_OUTPUT_FAILURES_FRACTION = 0.25;
+  private static double MAX_ALLOWED_OUTPUT_FAILURES_FRACTION;
+  private static int MAX_ALLOWED_OUTPUT_FAILURES;
 
   protected final boolean isRescheduled;
   private final Resource taskResource;
@@ -373,6 +374,14 @@ public class TaskAttemptImpl implements TaskAttempt,
       boolean isRescheduled,
       Resource resource, ContainerContext containerContext, boolean leafVertex,
       Task task) {
+
+    this.MAX_ALLOWED_OUTPUT_FAILURES = conf.getInt(TezConfiguration
+        .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, TezConfiguration
+        .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_DEFAULT);
+
+    this.MAX_ALLOWED_OUTPUT_FAILURES_FRACTION = conf.getDouble(TezConfiguration
+        .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION, TezConfiguration
+        .TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES_FRACTION_DEFAULT);
     ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
     this.readLock = rwLock.readLock();
     this.writeLock = rwLock.writeLock();
@@ -1408,15 +1417,23 @@ public class TaskAttemptImpl implements TaskAttempt,
       attempt.uniquefailedOutputReports.add(failedDestTaId);
       float failureFraction = ((float) attempt.uniquefailedOutputReports.size())
           / outputFailedEvent.getConsumerTaskNumber();
-      
-      // If needed we can also use the absolute number of reported output errors
+
+      boolean withinFailureFractionLimits =
+          (failureFraction <= MAX_ALLOWED_OUTPUT_FAILURES_FRACTION);
+      boolean withinOutputFailureLimits =
+          (attempt.uniquefailedOutputReports.size() < MAX_ALLOWED_OUTPUT_FAILURES);
+
       // If needed we can launch a background task without failing this task
       // to generate a copy of the output just in case.
       // If needed we can consider only running consumer tasks
-      if (failureFraction <= MAX_ALLOWED_OUTPUT_FAILURES_FRACTION) {
+      if (withinFailureFractionLimits && withinOutputFailureLimits) {
         return attempt.getInternalState();
       }
-      String message = attempt.getID() + " being failed for too many output errors";
+      String message = attempt.getID() + " being failed for too many output errors. "
+          + "failureFraction=" + failureFraction + ", "
+          + "MAX_ALLOWED_OUTPUT_FAILURES_FRACTION=" + MAX_ALLOWED_OUTPUT_FAILURES_FRACTION + ", "
+          + "uniquefailedOutputReports=" + attempt.uniquefailedOutputReports.size() + ", "
+          + "MAX_ALLOWED_OUTPUT_FAILURES=" + MAX_ALLOWED_OUTPUT_FAILURES;
       LOG.info(message);
       attempt.addDiagnosticInfo(message);
       // send input failed event

http://git-wip-us.apache.org/repos/asf/tez/blob/41f8a973/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 15d5609..d21f715 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -1169,14 +1169,14 @@ public class TestTaskAttempt {
     TezTaskAttemptID mockDestId1 = mock(TezTaskAttemptID.class);
     when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1);
     TezEvent tzEvent = new TezEvent(mockReEvent, mockMeta);
-    taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 4));
+    taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 11));
     
     // failure threshold not met. state is SUCCEEDED
     assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
         TaskAttemptState.SUCCEEDED);
 
     // sending same error again doesnt change anything
-    taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 4));
+    taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 11));
     assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
         TaskAttemptState.SUCCEEDED);
     // default value of error cause
@@ -1185,7 +1185,7 @@ public class TestTaskAttempt {
     // different destination attempt reports error. now threshold crossed
     TezTaskAttemptID mockDestId2 = mock(TezTaskAttemptID.class);
     when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId2);    
-    taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 4));
+    taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 11));
     
     assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
         TaskAttemptState.FAILED);
@@ -1210,6 +1210,41 @@ public class TestTaskAttempt {
     // No new events.
     verify(eventHandler, times(expectedEventsAfterFetchFailure)).handle(
         arg.capture());
+
+    taskConf.setInt(TezConfiguration.TEZ_TASK_MAX_ALLOWED_OUTPUT_FAILURES, 1);
+    TezTaskID taskID2 = TezTaskID.getInstance(vertexID, 2);
+    MockTaskAttemptImpl taImpl2 = new MockTaskAttemptImpl(taskID2, 1, eventHandler,
+        taListener, taskConf, new SystemClock(),
+        mockHeartbeatHandler, appCtx, false,
+        resource, createFakeContainerContext(), false);
+    TezTaskAttemptID taskAttemptID2 = taImpl2.getID();
+
+    taImpl2.handle(new TaskAttemptEventSchedule(taskAttemptID2, 0, 0));
+    // At state STARTING.
+    taImpl2.handle(new TaskAttemptEventStartedRemotely(taskAttemptID2, contId, null));
+    verify(mockHeartbeatHandler).register(taskAttemptID2);
+    taImpl2.handle(new TaskAttemptEvent(taskAttemptID2, TaskAttemptEventType.TA_DONE));
+    assertEquals("Task attempt is not in succeeded state", taImpl2.getState(),
+        TaskAttemptState.SUCCEEDED);
+    verify(mockHeartbeatHandler).unregister(taskAttemptID2);
+
+    mockReEvent = InputReadErrorEvent.create("", 1, 1);
+    mockMeta = mock(EventMetaData.class);
+    mockDestId1 = mock(TezTaskAttemptID.class);
+    when(mockMeta.getTaskAttemptID()).thenReturn(mockDestId1);
+    tzEvent = new TezEvent(mockReEvent, mockMeta);
+    // Expected to fail because MAX_ALLOWED_OUTPUT_FAILURES (set to 1 above) is reached.
+    // NOTE(review): 1/8 = 0.125 also exceeds the default fraction (0.1) unless taskConf overrides it.
+    taImpl2.handle(new TaskAttemptEventOutputFailed(taskAttemptID2, tzEvent, 8));
+    assertEquals("Task attempt is not in succeeded state", taImpl2.getState(),
+        TaskAttemptState.FAILED);
+
+    assertEquals("Task attempt is not in FAILED state", taImpl2.getState(),
+        TaskAttemptState.FAILED);
+    assertEquals(TaskAttemptTerminationCause.OUTPUT_LOST, taImpl2.getTerminationCause());
+    // verify unregister is not invoked again
+    verify(mockHeartbeatHandler, times(1)).unregister(taskAttemptID2);
+
   }
 
   @SuppressWarnings("deprecation")