You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ab...@apache.org on 2020/08/04 06:22:11 UTC

[tez] branch master updated: TEZ-4172: Let tasks be killed after too many overall attempts (László Bodor reviewed by Jonathan Turner Eagles)

This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/tez.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c405c9  TEZ-4172: Let tasks be killed after too many overall attempts (László Bodor reviewed by Jonathan Turner Eagles)
3c405c9 is described below

commit 3c405c96f4da9713968ff463af051d0ca8cd25ee
Author: László Bodor <bo...@gmail.com>
AuthorDate: Tue Aug 4 08:17:54 2020 +0200

    TEZ-4172: Let tasks be killed after too many overall attempts (László Bodor reviewed by Jonathan Turner Eagles)
    
    Signed-off-by: Laszlo Bodor <bo...@gmail.com>
---
 .../org/apache/tez/dag/api/TezConfiguration.java   | 10 +++++
 .../java/org/apache/tez/dag/app/dag/Vertex.java    |  1 +
 .../org/apache/tez/dag/app/dag/impl/TaskImpl.java  | 49 +++++++++++++++------
 .../apache/tez/dag/app/dag/impl/VertexImpl.java    |  8 ++++
 .../apache/tez/dag/app/dag/impl/TestTaskImpl.java  | 51 ++++++++++++++++++----
 5 files changed, 97 insertions(+), 22 deletions(-)

diff --git a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
index 58aecda..7dc412b 100644
--- a/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
+++ b/tez-api/src/main/java/org/apache/tez/dag/api/TezConfiguration.java
@@ -709,6 +709,16 @@ public class TezConfiguration extends Configuration {
   public static final int TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT = 4;
 
   /**
+   * Int value. The maximum number of attempts that can run for a particular task before the task is
+   * failed. This counts every attempt, including failed and killed attempts. Task failure results in
+   * DAG failure. Default is 0, which disables this feature.
+   */
+  @ConfigurationScope(Scope.VERTEX)
+  @ConfigurationProperty(type = "integer")
+  public static final String TEZ_AM_TASK_MAX_ATTEMPTS = TEZ_AM_PREFIX + "task.max.attempts";
+  public static final int TEZ_AM_TASK_MAX_ATTEMPTS_DEFAULT = 0;
+
+  /**
    * Boolean value. Specifies whether a re-scheduled attempt of a task, caused by previous
    * failures gets higher priority
    */
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
index f3ef72b..ba3079d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/Vertex.java
@@ -214,6 +214,7 @@ public interface Vertex extends Comparable<Vertex> {
 
   interface VertexConfig {
     int getMaxFailedTaskAttempts();
+    int getMaxTaskAttempts();
     boolean getTaskRescheduleHigherPriority();
     boolean getTaskRescheduleRelaxedLocality();
 
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 3470216..0b4b116 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -126,6 +126,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   private final TezTaskID taskId;
   private Map<TezTaskAttemptID, TaskAttempt> attempts;
   protected final int maxFailedAttempts;
+  protected final int maxAttempts; // overall max number of attempts (killed/preempted attempts count too)
   protected final Clock clock;
   private final Vertex vertex;
   private final Lock readLock;
@@ -159,7 +160,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   private final Set<NodeId> nodesWithRunningAttempts = Collections
       .newSetFromMap(new ConcurrentHashMap<NodeId, Boolean>());
 
-  private static final SingleArcTransition<TaskImpl , TaskEvent>
+  private static final MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal>
      ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
   private static final SingleArcTransition<TaskImpl, TaskEvent>
      KILL_TRANSITION = new KillTransition();
@@ -193,7 +194,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
      .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.KILL_WAIT,
          TaskEventType.T_TERMINATE,
          KILL_TRANSITION)
-     .addTransition(TaskStateInternal.SCHEDULED, TaskStateInternal.SCHEDULED,
+     .addTransition(TaskStateInternal.SCHEDULED, EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.FAILED),
          TaskEventType.T_ATTEMPT_KILLED, ATTEMPT_KILLED_TRANSITION)
      .addTransition(TaskStateInternal.SCHEDULED,
         EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.FAILED),
@@ -216,7 +217,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         EnumSet.of(TaskStateInternal.SUCCEEDED),
         TaskEventType.T_ATTEMPT_SUCCEEDED,
         new AttemptSucceededTransition())
-    .addTransition(TaskStateInternal.RUNNING, TaskStateInternal.RUNNING,
+    .addTransition(TaskStateInternal.RUNNING, EnumSet.of(TaskStateInternal.RUNNING, TaskStateInternal.FAILED),
         TaskEventType.T_ATTEMPT_KILLED,
         ATTEMPT_KILLED_TRANSITION)
     .addTransition(TaskStateInternal.RUNNING,
@@ -380,6 +381,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     writeLock = readWriteLock.writeLock();
     this.attempts = Collections.emptyMap();
     maxFailedAttempts = vertex.getVertexConfig().getMaxFailedTaskAttempts();
+    maxAttempts = vertex.getVertexConfig().getMaxTaskAttempts();
     taskId = TezTaskID.getInstance(vertexId, taskIndex);
     this.taskCommunicatorManagerInterface = taskCommunicatorManagerInterface;
     this.taskHeartbeatHandler = thh;
@@ -776,7 +778,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   }
 
   // This is always called in the Write Lock
-  private void addAndScheduleAttempt(TezTaskAttemptID schedulingCausalTA) {
+  private boolean addAndScheduleAttempt(TezTaskAttemptID schedulingCausalTA) {
     TaskAttempt attempt = createAttempt(attempts.size(), schedulingCausalTA);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Created attempt " + attempt.getID());
@@ -794,13 +796,23 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         Preconditions.checkArgument(attempts.put(attempt.getID(), attempt) == null,
             attempt.getID() + " already existed");
         break;
-
       default:
         Preconditions.checkArgument(attempts.put(attempt.getID(), attempt) == null,
             attempt.getID() + " already existed");
         break;
     }
 
+    if (maxAttempts > 0 && attempts.size() == maxAttempts) {
+      TaskImpl task = (TaskImpl) attempt.getTask();
+      LOG.error("Cannot schedule new attempt for task as max number of attempts ({}) reached: {}",
+          maxAttempts, task);
+
+      task.logJobHistoryTaskFailedEvent(TaskState.FAILED);
+      task.eventHandler.handle(new VertexEventTaskCompleted(task.taskId, TaskState.FAILED));
+
+      return false;
+    }
+
     // TODO: Recovery
     /*
     // Update nextATtemptNumber
@@ -819,7 +831,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     // picture in mind
     eventHandler.handle(new DAGEventSchedulerUpdate(
         DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE, attempt));
-
+    return true;
   }
 
   @Override
@@ -1007,7 +1019,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       task.locationHint = scheduleEvent.getTaskLocationHint();
       task.baseTaskSpec = scheduleEvent.getBaseTaskSpec();
       // For now, initial scheduling dependency is due to vertex manager scheduling
-      task.addAndScheduleAttempt(null);
+      if (!task.addAndScheduleAttempt(null)) {
+        return TaskStateInternal.FAILED;
+      }
       return TaskStateInternal.SCHEDULED;
     }
   }
@@ -1108,7 +1122,9 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
           LOG.info("Can not recover the successful task attempt, schedule new task attempt,"
               + "taskId=" + task.getTaskId());
           task.successfulAttempt = null;
-          task.addAndScheduleAttempt(successTaId);
+          if (!task.addAndScheduleAttempt(successTaId)) {
+            task.finished(TaskStateInternal.FAILED);
+          }
           task.eventHandler.handle(new TaskAttemptEventAttemptKilled(successTaId,
               errorMsg, TaskAttemptTerminationCause.TERMINATED_AT_RECOVERY, true));
           return TaskStateInternal.RUNNING;
@@ -1167,9 +1183,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
   }
 
   private static class AttemptKilledTransition implements
-      SingleArcTransition<TaskImpl, TaskEvent> {
+      MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
     @Override
-    public void transition(TaskImpl task, TaskEvent event) {
+    public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
+      TaskStateInternal originalState = task.getInternalState();
+
       TaskEventTAUpdate castEvent = (TaskEventTAUpdate) event;
       task.addDiagnosticInfo("TaskAttempt " + castEvent.getTaskAttemptID().getId() + " killed");
       if (task.commitAttempt !=null &&
@@ -1197,8 +1215,11 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         task.getVertex().incrementKilledTaskAttemptCount();
       }
       if (task.shouldScheduleNewAttempt()) {
-        task.addAndScheduleAttempt(castEvent.getTaskAttemptID());
+        if (!task.addAndScheduleAttempt(castEvent.getTaskAttemptID())) {
+          return task.finished(TaskStateInternal.FAILED);
+        }
       }
+      return originalState;
     }
   }
 
@@ -1258,8 +1279,10 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         if (task.shouldScheduleNewAttempt()) {
           LOG.info("Scheduling new attempt for task: " + task.getTaskId()
               + ", currentFailedAttempts: " + task.failedAttempts + ", maxFailedAttempts: "
-              + task.maxFailedAttempts);
-          task.addAndScheduleAttempt(getSchedulingCausalTA());
+              + task.maxFailedAttempts + ", maxAttempts: " + task.maxAttempts);
+          if (!task.addAndScheduleAttempt(getSchedulingCausalTA())){
+            return task.finished(TaskStateInternal.FAILED);
+          }
         }
       } else {
         if (castEvent.getTaskFailureType() == TaskFailureType.NON_FATAL) {
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index db0cd46..711d028 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -4828,6 +4828,7 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
   static class VertexConfigImpl implements VertexConfig {
 
     private final int maxFailedTaskAttempts;
+    private final int maxTaskAttempts;
     private final boolean taskRescheduleHigherPriority;
     private final boolean taskRescheduleRelaxedLocality;
 
@@ -4847,6 +4848,8 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     public VertexConfigImpl(Configuration conf) {
       this.maxFailedTaskAttempts = conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS,
           TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT);
+      this.maxTaskAttempts = conf.getInt(TezConfiguration.TEZ_AM_TASK_MAX_ATTEMPTS,
+          TezConfiguration.TEZ_AM_TASK_MAX_ATTEMPTS_DEFAULT);
       this.taskRescheduleHigherPriority =
           conf.getBoolean(TezConfiguration.TEZ_AM_TASK_RESCHEDULE_HIGHER_PRIORITY,
               TezConfiguration.TEZ_AM_TASK_RESCHEDULE_HIGHER_PRIORITY_DEFAULT);
@@ -4873,6 +4876,11 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex, EventHandl
     }
 
     @Override
+    public int getMaxTaskAttempts() {
+      return maxTaskAttempts;
+    }
+
+    @Override
     public boolean getTaskRescheduleHigherPriority() {
       return taskRescheduleHigherPriority;
     }
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 1af6092..a28e786 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -286,10 +286,15 @@ public class TestTaskImpl {
     attempt.setState(s);
   }
 
-  private void killRunningTaskAttempt(TezTaskAttemptID attemptId) {
+  private void killRunningTaskAttempt(TezTaskAttemptID attemptId, TaskState stateToVerify) {
+    killRunningTaskAttempt(attemptId, stateToVerify, 1);
+  }
+
+  private void killRunningTaskAttempt(TezTaskAttemptID attemptId, TaskState stateToVerify,
+      int killedCountToVerify) {
     mockTask.handle(createTaskTAKilledEvent(attemptId));
-    assertTaskRunningState();
-    verify(mockTask.getVertex(), times(1)).incrementKilledTaskAttemptCount();
+    assertTaskState(stateToVerify);
+    verify(mockTask.getVertex(), times(killedCountToVerify)).incrementKilledTaskAttemptCount();
   }
 
   private void failRunningTaskAttempt(TezTaskAttemptID attemptId) {
@@ -310,21 +315,25 @@ public class TestTaskImpl {
    * {@link TaskState#NEW}
    */
   private void assertTaskNewState() {
-    assertEquals(TaskState.NEW, mockTask.getState());
+    assertTaskState(TaskState.NEW);
   }
 
   /**
    * {@link TaskState#SCHEDULED}
    */
   private void assertTaskScheduledState() {
-    assertEquals(TaskState.SCHEDULED, mockTask.getState());
+    assertTaskState(TaskState.SCHEDULED);
   }
 
   /**
    * {@link TaskState#RUNNING}
    */
   private void assertTaskRunningState() {
-    assertEquals(TaskState.RUNNING, mockTask.getState());
+    assertTaskState(TaskState.RUNNING);
+  }
+
+  private void assertTaskState(TaskState state) {
+    assertEquals(state, mockTask.getState());
   }
 
   /**
@@ -385,7 +394,7 @@ public class TestTaskImpl {
   }
 
   @Test(timeout = 5000)
-  public void testTooManyFailedAtetmpts() {
+  public void testTooManyFailedAttempts() {
     LOG.info("--- START: testTooManyFailedAttempts ---");
     TezTaskID taskId = getNewTaskID();
     scheduleTaskAttempt(taskId, TaskState.SCHEDULED);
@@ -409,6 +418,30 @@ public class TestTaskImpl {
   }
 
   @Test(timeout = 5000)
+  public void testTooManyAttempts() {
+    LOG.info("--- START: testTooManyAttempts ---");
+
+    conf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_ATTEMPTS, 3);
+    Vertex vertex = mock(Vertex.class);
+    doReturn(new VertexImpl.VertexConfigImpl(conf)).when(vertex).getVertexConfig();
+    mockTask = new MockTaskImpl(vertexId, partition,
+        eventHandler, conf, taskCommunicatorManagerInterface, clock,
+        taskHeartbeatHandler, appContext, leafVertex,
+        taskResource, containerContext, vertex);
+
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId, TaskState.SCHEDULED); // attempt_0
+    launchTaskAttempt(mockTask.getLastAttempt().getID());
+    killRunningTaskAttempt(mockTask.getLastAttempt().getID(), TaskState.RUNNING, 1); // attempt_1
+
+    launchTaskAttempt(mockTask.getLastAttempt().getID());
+    killRunningTaskAttempt(mockTask.getLastAttempt().getID(), TaskState.FAILED, 2); // attempt_2 -> reached 3
+
+    assertEquals(TaskStateInternal.FAILED, mockTask.getInternalState());
+    verifyOutgoingEvents(eventHandler.events, VertexEventType.V_TASK_COMPLETED);
+  }
+
+  @Test(timeout = 5000)
   public void testFailedAttemptWithFatalError() {
     LOG.info("--- START: testFailedAttemptWithFatalError ---");
     TezTaskID taskId = getNewTaskID();
@@ -428,7 +461,7 @@ public class TestTaskImpl {
     TezTaskID taskId = getNewTaskID();
     scheduleTaskAttempt(taskId);
     launchTaskAttempt(mockTask.getLastAttempt().getID());
-    killRunningTaskAttempt(mockTask.getLastAttempt().getID());
+    killRunningTaskAttempt(mockTask.getLastAttempt().getID(), TaskState.RUNNING);
     assertEquals(TaskStateInternal.RUNNING, mockTask.getInternalState());
     killTask(taskId);
     mockTask.handle(createTaskTAKilledEvent(mockTask.getLastAttempt().getID()));
@@ -503,7 +536,7 @@ public class TestTaskImpl {
     scheduleTaskAttempt(taskId);
     TezTaskAttemptID lastTAId = mockTask.getLastAttempt().getID();
     launchTaskAttempt(mockTask.getLastAttempt().getID());
-    killRunningTaskAttempt(mockTask.getLastAttempt().getID());
+    killRunningTaskAttempt(mockTask.getLastAttempt().getID(), TaskState.RUNNING);
     // last killed attempt should be causal TA of next attempt
     Assert.assertEquals(lastTAId, mockTask.getLastAttempt().getSchedulingCausalTA());
   }