You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by jl...@apache.org on 2016/02/25 21:24:04 UTC

tez git commit: TEZ-3102. Fetch failure of a speculated task causes job hang (jlowe) (cherry picked from commit cc9dd2799ff67243017edb9ae5df42dc887032c9)

Repository: tez
Updated Branches:
  refs/heads/branch-0.7 e2d584e40 -> dedec2a78


TEZ-3102. Fetch failure of a speculated task causes job hang (jlowe)
(cherry picked from commit cc9dd2799ff67243017edb9ae5df42dc887032c9)

Conflicts:

	CHANGES.txt
	tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/dedec2a7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/dedec2a7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/dedec2a7

Branch: refs/heads/branch-0.7
Commit: dedec2a7804e4846198c4ed34d76f43b756e7e12
Parents: e2d584e
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Feb 25 20:23:45 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Feb 25 20:23:45 2016 +0000

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../apache/tez/dag/app/dag/impl/TaskImpl.java   | 22 ++---
 .../tez/dag/app/dag/impl/TestTaskImpl.java      | 89 ++++++++++++++++++++
 3 files changed, 97 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/dedec2a7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b97c129..ec25e5e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
   TEZ-2972. Avoid task rescheduling when a node turns unhealthy
 
 ALL CHANGES
+  TEZ-3102. Fetch failure of a speculated task causes job hang
   TEZ-3137. Tez task failed with illegal state exception in recovery
   TEZ-3126. Log reason for not reducing parallelism
   TEZ-3123. Containers can get re-used even with conflicting local resources.

http://git-wip-us.apache.org/repos/asf/tez/blob/dedec2a7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index f78932b..e9ef69f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -1445,14 +1445,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
 
       TaskEventTAUpdate attemptEvent = (TaskEventTAUpdate) event;
       TezTaskAttemptID attemptId = attemptEvent.getTaskAttemptID();
-      if(task.successfulAttempt == attemptId) {
-        // successful attempt is now killed. reschedule
-        // tell the job about the rescheduling
-        unSucceed(task);
-        task.handleTaskAttemptCompletion(
-            attemptId,
-            TaskAttemptStateInternal.KILLED);
-        task.eventHandler.handle(new VertexEventTaskReschedule(task.taskId));
+      TaskStateInternal resultState = TaskStateInternal.SUCCEEDED;
+      if(task.successfulAttempt.equals(attemptId)) {
         // typically we are here because this map task was run on a bad node and
         // we want to reschedule it on a different node.
         // Depending on whether there are previous failed attempts or not this
@@ -1461,14 +1455,12 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
         // from the map splitInfo. So the bad node might be sent as a location
         // to the RM. But the RM would ignore that just like it would ignore
         // currently pending container requests affinitized to bad nodes.
-        task.addAndScheduleAttempt(attemptId);
-        return TaskStateInternal.SCHEDULED;
-      } else {
-        // nothing to do
-        LOG.info("Ignoring kill of attempt: " + attemptId + " because attempt: " +
-            task.successfulAttempt + " is already successful");
-        return TaskStateInternal.SUCCEEDED;
+        unSucceed(task);
+        task.eventHandler.handle(new VertexEventTaskReschedule(task.taskId));
+        resultState = TaskStateInternal.SCHEDULED;
       }
+      ATTEMPT_KILLED_TRANSITION.transition(task, event);
+      return resultState;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/tez/blob/dedec2a7/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 8852d93..3b154a5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -57,7 +57,9 @@ import org.apache.tez.dag.app.dag.StateChangeNotifier;
 import org.apache.tez.dag.app.dag.TaskStateInternal;
 import org.apache.tez.dag.app.dag.Vertex;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
 import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
 import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
 import org.apache.tez.dag.app.dag.event.TaskEventTermination;
@@ -644,6 +646,33 @@ public class TestTaskImpl {
     Assert.assertEquals(mockDestId, newAttempt.getSchedulingCausalTA());
   }
 
+  @SuppressWarnings("rawtypes")
+  @Test(timeout = 5000)
+  public void testTaskSucceedAndRetroActiveKilled() {
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    launchTaskAttempt(mockTask.getLastAttempt().getID());
+    updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
+    // Report the running attempt as successful
+    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+
+    // The task should now have succeeded
+    assertTaskSucceededState();
+    verify(mockTask.stateChangeNotifier).taskSucceeded(any(String.class), eq(taskId),
+        eq(mockTask.getLastAttempt().getID().getId()));
+
+    eventHandler.events.clear();
+    // Now kill the attempt after it has succeeded
+    mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt()
+        .getID(), TaskEventType.T_ATTEMPT_KILLED));
+
+    // Killing the successful attempt should move the task back to SCHEDULED
+    assertTaskScheduledState();
+    Event event = eventHandler.events.get(0);
+    Assert.assertEquals(VertexEventType.V_TASK_RESCHEDULED, event.getType());
+  }
+
   @Test(timeout = 5000)
   public void testDiagnostics_TAUpdate(){
     TezTaskID taskId = getNewTaskID();
@@ -750,6 +779,66 @@ public class TestTaskImpl {
     assertEquals(2, mockTask.getAttemptList().size());
   }
 
+  @Test(timeout = 20000)
+  public void testSpeculatedThenRetroactiveFailure() {
+    TezTaskID taskId = getNewTaskID();
+    scheduleTaskAttempt(taskId);
+    MockTaskAttemptImpl firstAttempt = mockTask.getLastAttempt();
+    launchTaskAttempt(firstAttempt.getID());
+    updateAttemptState(firstAttempt, TaskAttemptState.RUNNING);
+
+    // Add a speculative task attempt
+    mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(),
+        TaskEventType.T_ADD_SPEC_ATTEMPT));
+    MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt();
+    launchTaskAttempt(specAttempt.getID());
+    updateAttemptState(specAttempt, TaskAttemptState.RUNNING);
+    assertEquals(2, mockTask.getAttemptList().size());
+
+    // Have the first (non-speculative) attempt succeed
+    eventHandler.events.clear();
+    mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(),
+        TaskEventType.T_ATTEMPT_SUCCEEDED));
+
+    // The task should now have succeeded and sent a kill request to the speculative attempt
+    assertTaskSucceededState();
+    verify(mockTask.stateChangeNotifier).taskSucceeded(any(String.class), eq(taskId),
+        eq(firstAttempt.getID().getId()));
+    @SuppressWarnings("rawtypes")
+    Event event = eventHandler.events.get(eventHandler.events.size()-1);
+    assertEquals(TaskAttemptEventType.TA_KILL_REQUEST, event.getType());
+    assertEquals(specAttempt.getID(),
+        ((TaskAttemptEventKillRequest) event).getTaskAttemptID());
+
+    // Emulate the speculative attempt reporting that it has been killed
+    mockTask.handle(new TaskEventTAUpdate(specAttempt.getID(),
+        TaskEventType.T_ATTEMPT_KILLED));
+    assertTaskSucceededState();
+
+    // Now retroactively fail the successful attempt via an output-failed event
+    TezTaskAttemptID mockDestId = mock(TezTaskAttemptID.class);
+    TezEvent mockTezEvent = mock(TezEvent.class);
+    EventMetaData meta = new EventMetaData(EventProducerConsumerType.INPUT, "Vertex", "Edge", mockDestId);
+    when(mockTezEvent.getSourceInfo()).thenReturn(meta);
+    TaskAttemptEventOutputFailed outputFailedEvent =
+        new TaskAttemptEventOutputFailed(mockDestId, mockTezEvent, 1);
+    eventHandler.events.clear();
+    mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(),
+        TaskEventType.T_ATTEMPT_FAILED, outputFailedEvent));
+
+    // The task should be rescheduled (back in SCHEDULED) even though the spec attempt is gone
+    assertTaskScheduledState();
+    event = eventHandler.events.get(eventHandler.events.size()-1);
+    Assert.assertEquals(VertexEventType.V_TASK_RESCHEDULED, event.getType());
+
+    // There should be a third attempt whose scheduling causal TA is the
+    // attempt that reported the output read error (mockDestId)
+    List<MockTaskAttemptImpl> attempts = mockTask.getAttemptList();
+    Assert.assertEquals(3, attempts.size());
+    MockTaskAttemptImpl newAttempt = attempts.get(2);
+    Assert.assertEquals(mockDestId, newAttempt.getSchedulingCausalTA());
+  }
+
   // TODO Add test to validate the correct commit attempt.