You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by jl...@apache.org on 2016/02/25 21:24:04 UTC
tez git commit: TEZ-3102. Fetch failure of a speculated task causes
job hang (jlowe) (cherry picked from commit
cc9dd2799ff67243017edb9ae5df42dc887032c9)
Repository: tez
Updated Branches:
refs/heads/branch-0.7 e2d584e40 -> dedec2a78
TEZ-3102. Fetch failure of a speculated task causes job hang (jlowe)
(cherry picked from commit cc9dd2799ff67243017edb9ae5df42dc887032c9)
Conflicts:
CHANGES.txt
tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/dedec2a7
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/dedec2a7
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/dedec2a7
Branch: refs/heads/branch-0.7
Commit: dedec2a7804e4846198c4ed34d76f43b756e7e12
Parents: e2d584e
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Feb 25 20:23:45 2016 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Feb 25 20:23:45 2016 +0000
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 22 ++---
.../tez/dag/app/dag/impl/TestTaskImpl.java | 89 ++++++++++++++++++++
3 files changed, 97 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/dedec2a7/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index b97c129..ec25e5e 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -9,6 +9,7 @@ INCOMPATIBLE CHANGES
TEZ-2972. Avoid task rescheduling when a node turns unhealthy
ALL CHANGES
+ TEZ-3102. Fetch failure of a speculated task causes job hang
TEZ-3137. Tez task failed with illegal state exception in recovery
TEZ-3126. Log reason for not reducing parallelism
TEZ-3123. Containers can get re-used even with conflicting local resources.
http://git-wip-us.apache.org/repos/asf/tez/blob/dedec2a7/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index f78932b..e9ef69f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -1445,14 +1445,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
TaskEventTAUpdate attemptEvent = (TaskEventTAUpdate) event;
TezTaskAttemptID attemptId = attemptEvent.getTaskAttemptID();
- if(task.successfulAttempt == attemptId) {
- // successful attempt is now killed. reschedule
- // tell the job about the rescheduling
- unSucceed(task);
- task.handleTaskAttemptCompletion(
- attemptId,
- TaskAttemptStateInternal.KILLED);
- task.eventHandler.handle(new VertexEventTaskReschedule(task.taskId));
+ TaskStateInternal resultState = TaskStateInternal.SUCCEEDED;
+ if(task.successfulAttempt.equals(attemptId)) {
// typically we are here because this map task was run on a bad node and
// we want to reschedule it on a different node.
// Depending on whether there are previous failed attempts or not this
@@ -1461,14 +1455,12 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// from the map splitInfo. So the bad node might be sent as a location
// to the RM. But the RM would ignore that just like it would ignore
// currently pending container requests affinitized to bad nodes.
- task.addAndScheduleAttempt(attemptId);
- return TaskStateInternal.SCHEDULED;
- } else {
- // nothing to do
- LOG.info("Ignoring kill of attempt: " + attemptId + " because attempt: " +
- task.successfulAttempt + " is already successful");
- return TaskStateInternal.SUCCEEDED;
+ unSucceed(task);
+ task.eventHandler.handle(new VertexEventTaskReschedule(task.taskId));
+ resultState = TaskStateInternal.SCHEDULED;
}
+ ATTEMPT_KILLED_TRANSITION.transition(task, event);
+ return resultState;
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/dedec2a7/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 8852d93..3b154a5 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -57,7 +57,9 @@ import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.TaskStateInternal;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEventScheduleTask;
import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
import org.apache.tez.dag.app.dag.event.TaskEventTermination;
@@ -644,6 +646,33 @@ public class TestTaskImpl {
Assert.assertEquals(mockDestId, newAttempt.getSchedulingCausalTA());
}
+ @SuppressWarnings("rawtypes")
+ @Test(timeout = 5000)
+ public void testTaskSucceedAndRetroActiveKilled() {
+ TezTaskID taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ launchTaskAttempt(mockTask.getLastAttempt().getID());
+ updateAttemptState(mockTask.getLastAttempt(), TaskAttemptState.RUNNING);
+
+ mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt().getID(),
+ TaskEventType.T_ATTEMPT_SUCCEEDED));
+
+ // The task should now have succeeded
+ assertTaskSucceededState();
+ verify(mockTask.stateChangeNotifier).taskSucceeded(any(String.class), eq(taskId),
+ eq(mockTask.getLastAttempt().getID().getId()));
+
+ eventHandler.events.clear();
+ // Now kill the attempt after it has succeeded
+ mockTask.handle(new TaskEventTAUpdate(mockTask.getLastAttempt()
+ .getID(), TaskEventType.T_ATTEMPT_KILLED));
+
+ // The retroactive kill should move the task from succeeded back to the scheduled state
+ assertTaskScheduledState();
+ Event event = eventHandler.events.get(0);
+ Assert.assertEquals(VertexEventType.V_TASK_RESCHEDULED, event.getType());
+ }
+
@Test(timeout = 5000)
public void testDiagnostics_TAUpdate(){
TezTaskID taskId = getNewTaskID();
@@ -750,6 +779,66 @@ public class TestTaskImpl {
assertEquals(2, mockTask.getAttemptList().size());
}
+ @Test(timeout = 20000)
+ public void testSpeculatedThenRetroactiveFailure() {
+ TezTaskID taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ MockTaskAttemptImpl firstAttempt = mockTask.getLastAttempt();
+ launchTaskAttempt(firstAttempt.getID());
+ updateAttemptState(firstAttempt, TaskAttemptState.RUNNING);
+
+ // Add a speculative task attempt
+ mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(),
+ TaskEventType.T_ADD_SPEC_ATTEMPT));
+ MockTaskAttemptImpl specAttempt = mockTask.getLastAttempt();
+ launchTaskAttempt(specAttempt.getID());
+ updateAttemptState(specAttempt, TaskAttemptState.RUNNING);
+ assertEquals(2, mockTask.getAttemptList().size());
+
+ // Have the first task succeed
+ eventHandler.events.clear();
+ mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(),
+ TaskEventType.T_ATTEMPT_SUCCEEDED));
+
+ // The task should now have succeeded and sent kill to other attempt
+ assertTaskSucceededState();
+ verify(mockTask.stateChangeNotifier).taskSucceeded(any(String.class), eq(taskId),
+ eq(firstAttempt.getID().getId()));
+ @SuppressWarnings("rawtypes")
+ Event event = eventHandler.events.get(eventHandler.events.size()-1);
+ assertEquals(TaskAttemptEventType.TA_KILL_REQUEST, event.getType());
+ assertEquals(specAttempt.getID(),
+ ((TaskAttemptEventKillRequest) event).getTaskAttemptID());
+
+ // Emulate the spec attempt being killed
+ mockTask.handle(new TaskEventTAUpdate(specAttempt.getID(),
+ TaskEventType.T_ATTEMPT_KILLED));
+ assertTaskSucceededState();
+
+ // Now fail the attempt after it has succeeded
+ TezTaskAttemptID mockDestId = mock(TezTaskAttemptID.class);
+ TezEvent mockTezEvent = mock(TezEvent.class);
+ EventMetaData meta = new EventMetaData(EventProducerConsumerType.INPUT, "Vertex", "Edge", mockDestId);
+ when(mockTezEvent.getSourceInfo()).thenReturn(meta);
+ TaskAttemptEventOutputFailed outputFailedEvent =
+ new TaskAttemptEventOutputFailed(mockDestId, mockTezEvent, 1);
+ eventHandler.events.clear();
+ mockTask.handle(new TaskEventTAUpdate(firstAttempt.getID(),
+ TaskEventType.T_ATTEMPT_FAILED, outputFailedEvent));
+
+ // The retroactive failure should move the task from succeeded back to the scheduled state
+ assertTaskScheduledState();
+ event = eventHandler.events.get(eventHandler.events.size()-1);
+ Assert.assertEquals(VertexEventType.V_TASK_RESCHEDULED, event.getType());
+
+ // There should be a new attempt, and report of output read error
+ // should be the causal TA
+ List<MockTaskAttemptImpl> attempts = mockTask.getAttemptList();
+ Assert.assertEquals(3, attempts.size());
+ MockTaskAttemptImpl newAttempt = attempts.get(2);
+ Assert.assertEquals(mockDestId, newAttempt.getSchedulingCausalTA());
+ }
+
// TODO Add test to validate the correct commit attempt.