You are viewing a plain-text version of this content. The canonical link for it ("here") was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@tez.apache.org by je...@apache.org on 2017/06/22 15:44:14 UTC

tez git commit: TEZ-3758. Vertex can hang in RUNNING state when two task attempts finish very closely and have retroactive failures (Kuhu Shukla via jeagles)

Repository: tez
Updated Branches:
  refs/heads/master 4fa2e8641 -> 4a7719b0c


TEZ-3758. Vertex can hang in RUNNING state when two task attempts finish very closely and have retroactive failures (Kuhu Shukla via jeagles)


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4a7719b0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4a7719b0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4a7719b0

Branch: refs/heads/master
Commit: 4a7719b0c164eae41845815d103ceb7e6e2548c3
Parents: 4fa2e86
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Thu Jun 22 15:44:08 2017 +0000
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Thu Jun 22 15:44:08 2017 +0000

----------------------------------------------------------------------
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   |  32 +++-
 .../tez/dag/app/dag/impl/TestTaskImpl.java      | 145 ++++++++++++++++++-
 2 files changed, 166 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/4a7719b0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index f25e583..c8e911e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -151,6 +151,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
      ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
   private static final SingleArcTransition<TaskImpl, TaskEvent>
      KILL_TRANSITION = new KillTransition();
+  private static final SingleArcTransition<TaskImpl, TaskEvent>
+      REDUNDANT_COMPLETED_TRANSITION = new AttemptRedundantCompletedTransition();
 
 
   private static final TaskStateChangedCallback STATE_CHANGED_CALLBACK = new TaskStateChangedCallback();
@@ -244,12 +246,14 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     .addTransition(TaskStateInternal.SUCCEEDED, //only possible for map tasks
         EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED),
         TaskEventType.T_ATTEMPT_KILLED, new TaskRetroactiveKilledTransition())
-    // Ignore-able transitions.
     .addTransition(
         TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
-        EnumSet.of(TaskEventType.T_ADD_SPEC_ATTEMPT,
+        TaskEventType.T_ATTEMPT_SUCCEEDED, REDUNDANT_COMPLETED_TRANSITION)
+    // Ignore-able transitions.
+    .addTransition(TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
+        EnumSet.of(
+            TaskEventType.T_ADD_SPEC_ATTEMPT,
             TaskEventType.T_TERMINATE,
-            TaskEventType.T_ATTEMPT_SUCCEEDED, // Maybe track and reuse later
             TaskEventType.T_ATTEMPT_LAUNCHED))
     .addTransition(TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
         TaskEventType.T_SCHEDULE)
@@ -257,12 +261,14 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     // Transitions from FAILED state
     .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
         EnumSet.of(
-            TaskEventType.T_TERMINATE,
-            TaskEventType.T_SCHEDULE,
-            TaskEventType.T_ADD_SPEC_ATTEMPT,
             TaskEventType.T_ATTEMPT_FAILED,
             TaskEventType.T_ATTEMPT_KILLED,
-            TaskEventType.T_ATTEMPT_SUCCEEDED))
+            TaskEventType.T_ATTEMPT_SUCCEEDED), REDUNDANT_COMPLETED_TRANSITION)
+    .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
+        EnumSet.of(
+            TaskEventType.T_TERMINATE,
+            TaskEventType.T_SCHEDULE,
+            TaskEventType.T_ADD_SPEC_ATTEMPT))
 
     // Transitions from KILLED state
     // Ignorable event: T_ATTEMPT_KILLED
@@ -1263,7 +1269,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
       if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
           !failedAttemptId.equals(task.successfulAttempt)) {
         // don't allow a different task attempt to override a previous
-        // succeeded state
+        // succeeded state and mark the attempt status as done
+        task.taskAttemptStatus.put(failedAttemptId.getId(), true);
         return TaskStateInternal.SUCCEEDED;
       }
       
@@ -1346,6 +1353,15 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
     }
   }
 
+  private static class AttemptRedundantCompletedTransition
+      implements SingleArcTransition<TaskImpl, TaskEvent> {
+    @Override
+    public void transition(TaskImpl task, TaskEvent event) {
+      TezTaskAttemptID successTaId = ((TaskEventTAUpdate)event).getTaskAttemptID();
+      task.taskAttemptStatus.put(successTaId.getId(), true);
+    }
+  }
+
   private static class TaskStateChangedCallback
       implements OnStateChangedCallback<TaskStateInternal, TaskImpl> {
 

http://git-wip-us.apache.org/repos/asf/tez/blob/4a7719b0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index da25927..e5d564e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -32,14 +32,21 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.tez.common.TezAbstractEvent;
-import org.apache.tez.dag.app.dag.DAGScheduler;
-import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventSubmitted;
 import org.apache.tez.dag.app.dag.event.DAGEventType;
 import org.apache.tez.dag.app.dag.event.TaskEvent;
 import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
 import org.apache.tez.dag.app.dag.event.TaskEventTAKilled;
 import org.apache.tez.dag.app.dag.event.TaskEventTALaunched;
 import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded;
+import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
 import org.apache.tez.runtime.api.TaskFailureType;
 import org.apache.tez.dag.history.DAGHistoryEvent;
 import org.apache.tez.dag.history.HistoryEvent;
@@ -127,6 +134,7 @@ public class TestTaskImpl {
 
   private MockTaskImpl mockTask;
   private TaskSpec mockTaskSpec;
+  private Vertex mockVertex;
   
   @SuppressWarnings("rawtypes")
   class TestEventHandler implements EventHandler<Event> {
@@ -152,9 +160,13 @@ public class TestTaskImpl {
     dagId = TezDAGID.getInstance(appId, 1);
     vertexId = TezVertexID.getInstance(dagId, 1);
     appContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+    when(appContext.getDAGRecoveryData()).thenReturn(null);
+    appContext.setDAGRecoveryData(null);
     mockContainerId = mock(ContainerId.class);
     mockContainer = mock(Container.class);
     mockAMContainer = mock(AMContainer.class);
+    when(mockAMContainer.getContainer()).thenReturn(mockContainer);
+    when(mockContainer.getNodeHttpAddress()).thenReturn("localhost:1234");
     mockNodeId = mock(NodeId.class);
     mockHistoryHandler = mock(HistoryEventHandler.class);
     when(mockContainer.getId()).thenReturn(mockContainerId);
@@ -178,6 +190,11 @@ public class TestTaskImpl {
         taskHeartbeatHandler, appContext, leafVertex,
         taskResource, containerContext, vertex);
     mockTaskSpec = mock(TaskSpec.class);
+    mockVertex = mock(Vertex.class);
+    ServicePluginInfo servicePluginInfo = new ServicePluginInfo()
+      .setContainerLauncherName(TezConstants.getTezYarnServicePluginName());
+    when(mockVertex.getServicePluginInfo()).thenReturn(servicePluginInfo);
+    when(mockVertex.getVertexConfig()).thenReturn(new VertexImpl.VertexConfigImpl(conf));
   }
 
   private TezTaskID getNewTaskID() {
@@ -947,6 +964,123 @@ public class TestTaskImpl {
     Assert.assertEquals(mockDestId, newAttempt.getSchedulingCausalTA());
   }
 
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testSucceededAttemptStatusWithRetroActiveFailures() throws InterruptedException {
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    MockTaskAttemptImpl firstMockTaskAttempt = mockTask.getAttemptList().get(0);
+    launchTaskAttempt(firstMockTaskAttempt.getID());
+    mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
+    MockTaskAttemptImpl secondMockTaskAttempt = mockTask.getAttemptList().get(1);
+    launchTaskAttempt(secondMockTaskAttempt.getID());
+
+    firstMockTaskAttempt.handle(new TaskAttemptEventSchedule(
+        TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), 10, 10));
+    secondMockTaskAttempt.handle(new TaskAttemptEventSchedule(
+        TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), 10, 10));
+    firstMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
+        TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), mockContainer.getId()));
+    secondMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
+        TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), mockContainer.getId()));
+
+    secondMockTaskAttempt.handle(
+        new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString())));
+    firstMockTaskAttempt.handle(
+        new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString())));
+    secondMockTaskAttempt.handle(
+        new TaskAttemptEvent(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()),
+        TaskAttemptEventType.TA_DONE));
+    firstMockTaskAttempt.handle(
+        new TaskAttemptEvent(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()),
+        TaskAttemptEventType.TA_DONE));
+
+    mockTask.handle(new TaskEventTASucceeded(secondMockTaskAttempt.getID()));
+    mockTask.handle(new TaskEventTASucceeded(firstMockTaskAttempt.getID()));
+    assertTrue("Attempts should have succeeded!",
+        firstMockTaskAttempt.getInternalState() == TaskAttemptStateInternal.SUCCEEDED
+        && secondMockTaskAttempt.getInternalState() == TaskAttemptStateInternal.SUCCEEDED);
+    assertEquals("Task should have no uncompleted attempts!", 0, mockTask.getUncompletedAttemptsCount());
+    assertTrue("Task should have Succeeded!", mockTask.getState() == TaskState.SUCCEEDED);
+    //Failing the attempt that finished after the task was marked succeeded, should not schedule another attempt
+    failAttempt(firstMockTaskAttempt, 0, 0);
+    assertTaskSucceededState();
+    //Failing the attempt that allowed the task to succeed, should schedule another attempt
+    failAttempt(secondMockTaskAttempt, 1, 1);
+    assertTaskScheduledState();
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Test
+  public void testFailedAttemptStatus() throws InterruptedException {
+    Configuration newConf = new Configuration(conf);
+    newConf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1);
+    Vertex vertex = mock(Vertex.class);
+    doReturn(new VertexImpl.VertexConfigImpl(newConf)).when(vertex).getVertexConfig();
+    mockTask = new MockTaskImpl(vertexId, partition,
+      eventHandler, conf, taskCommunicatorManagerInterface, clock,
+      taskHeartbeatHandler, appContext, leafVertex,
+      taskResource, containerContext, vertex);
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    MockTaskAttemptImpl firstMockTaskAttempt = mockTask.getAttemptList().get(0);
+    launchTaskAttempt(firstMockTaskAttempt.getID());
+    mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID()));
+    MockTaskAttemptImpl secondMockTaskAttempt = mockTask.getAttemptList().get(1);
+    launchTaskAttempt(secondMockTaskAttempt.getID());
+
+    firstMockTaskAttempt.handle(new TaskAttemptEventSchedule(
+        TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), 10, 10));
+    secondMockTaskAttempt.handle(new TaskAttemptEventSchedule(
+        TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), 10, 10));
+    firstMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
+        TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), mockContainer.getId()));
+    secondMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
+        TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), mockContainer.getId()));
+
+    secondMockTaskAttempt.handle(
+        new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString())));
+    firstMockTaskAttempt.handle(
+        new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString())));
+    secondMockTaskAttempt.handle(
+        new TaskAttemptEventAttemptFailed(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()),
+        TaskAttemptEventType.TA_FAILED,TaskFailureType.NON_FATAL, "test",
+        TaskAttemptTerminationCause.NO_PROGRESS));
+    firstMockTaskAttempt.handle(
+        new TaskAttemptEventAttemptFailed(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()),
+        TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, "test",
+        TaskAttemptTerminationCause.NO_PROGRESS));
+
+    firstMockTaskAttempt.handle(new TaskAttemptEventContainerTerminated(mockContainerId,
+        firstMockTaskAttempt.getID(), "test", TaskAttemptTerminationCause.NO_PROGRESS));
+    secondMockTaskAttempt.handle(new TaskAttemptEventContainerTerminated(mockContainerId,
+        secondMockTaskAttempt.getID(), "test", TaskAttemptTerminationCause.NO_PROGRESS));
+    mockTask.handle(new TaskEventTAFailed(secondMockTaskAttempt.getID(), TaskFailureType.NON_FATAL,
+        mock(TaskAttemptEvent.class)));
+    mockTask.handle(new TaskEventTAFailed(firstMockTaskAttempt.getID(), TaskFailureType.NON_FATAL,
+        mock(TaskAttemptEvent.class)));
+    assertTrue("Attempts should have failed!",
+        firstMockTaskAttempt.getInternalState() == TaskAttemptStateInternal.FAILED
+        && secondMockTaskAttempt.getInternalState() == TaskAttemptStateInternal.FAILED);
+    assertEquals("Task should have no uncompleted attempts!", 0, mockTask.getUncompletedAttemptsCount());
+    assertTrue("Task should have failed!", mockTask.getState() == TaskState.FAILED);
+  }
+
+  private void failAttempt(MockTaskAttemptImpl taskAttempt, int index, int expectedIncompleteAttempts) {
+    InputReadErrorEvent mockReEvent = InputReadErrorEvent.create("", 0, index);
+    TezTaskAttemptID mockDestId = mock(TezTaskAttemptID.class);
+    EventMetaData meta = new EventMetaData(EventProducerConsumerType.INPUT, "Vertex", "Edge", mockDestId);
+    TezEvent tzEvent = new TezEvent(mockReEvent, meta);
+    TaskAttemptEventOutputFailed outputFailedEvent =
+        new TaskAttemptEventOutputFailed(mockDestId, tzEvent, 1);
+    taskAttempt.handle(
+        outputFailedEvent);
+    TaskEvent tEventFail1 = new TaskEventTAFailed(taskAttempt.getID(), TaskFailureType.NON_FATAL, outputFailedEvent);
+    mockTask.handle(tEventFail1);
+    assertEquals("Unexpected number of incomplete attempts!",
+        expectedIncompleteAttempts, mockTask.getUncompletedAttemptsCount());
+  }
+
   // TODO Add test to validate the correct commit attempt.
 
 
@@ -1053,7 +1187,12 @@ public class TestTaskImpl {
           appContext, isRescheduled, resource, containerContext, false, null,
           locationHint, mockTaskSpec, schedCausalTA);
     }
-    
+
+    @Override
+    protected Vertex getVertex() {
+      return mockVertex;
+    }
+
     @Override
     public float getProgress() {
       return progress;