You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by je...@apache.org on 2017/06/22 15:44:14 UTC
tez git commit: TEZ-3758. Vertex can hang in RUNNING state when two
task attempts finish very closely and have retroactive failures (Kuhu Shukla
via jeagles)
Repository: tez
Updated Branches:
refs/heads/master 4fa2e8641 -> 4a7719b0c
TEZ-3758. Vertex can hang in RUNNING state when two task attempts finish very closely and have retroactive failures (Kuhu Shukla via jeagles)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/4a7719b0
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/4a7719b0
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/4a7719b0
Branch: refs/heads/master
Commit: 4a7719b0c164eae41845815d103ceb7e6e2548c3
Parents: 4fa2e86
Author: Jonathan Eagles <je...@yahoo-inc.com>
Authored: Thu Jun 22 15:44:08 2017 +0000
Committer: Jonathan Eagles <je...@yahoo-inc.com>
Committed: Thu Jun 22 15:44:08 2017 +0000
----------------------------------------------------------------------
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 32 +++-
.../tez/dag/app/dag/impl/TestTaskImpl.java | 145 ++++++++++++++++++-
2 files changed, 166 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/4a7719b0/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index f25e583..c8e911e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -151,6 +151,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
ATTEMPT_KILLED_TRANSITION = new AttemptKilledTransition();
private static final SingleArcTransition<TaskImpl, TaskEvent>
KILL_TRANSITION = new KillTransition();
+ private static final SingleArcTransition<TaskImpl, TaskEvent>
+ REDUNDANT_COMPLETED_TRANSITION = new AttemptRedundantCompletedTransition();
private static final TaskStateChangedCallback STATE_CHANGED_CALLBACK = new TaskStateChangedCallback();
@@ -244,12 +246,14 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
.addTransition(TaskStateInternal.SUCCEEDED, //only possible for map tasks
EnumSet.of(TaskStateInternal.SCHEDULED, TaskStateInternal.SUCCEEDED),
TaskEventType.T_ATTEMPT_KILLED, new TaskRetroactiveKilledTransition())
- // Ignore-able transitions.
.addTransition(
TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
- EnumSet.of(TaskEventType.T_ADD_SPEC_ATTEMPT,
+ TaskEventType.T_ATTEMPT_SUCCEEDED, REDUNDANT_COMPLETED_TRANSITION)
+ // Ignore-able transitions.
+ .addTransition(TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
+ EnumSet.of(
+ TaskEventType.T_ADD_SPEC_ATTEMPT,
TaskEventType.T_TERMINATE,
- TaskEventType.T_ATTEMPT_SUCCEEDED, // Maybe track and reuse later
TaskEventType.T_ATTEMPT_LAUNCHED))
.addTransition(TaskStateInternal.SUCCEEDED, TaskStateInternal.SUCCEEDED,
TaskEventType.T_SCHEDULE)
@@ -257,12 +261,14 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// Transitions from FAILED state
.addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
EnumSet.of(
- TaskEventType.T_TERMINATE,
- TaskEventType.T_SCHEDULE,
- TaskEventType.T_ADD_SPEC_ATTEMPT,
TaskEventType.T_ATTEMPT_FAILED,
TaskEventType.T_ATTEMPT_KILLED,
- TaskEventType.T_ATTEMPT_SUCCEEDED))
+ TaskEventType.T_ATTEMPT_SUCCEEDED), REDUNDANT_COMPLETED_TRANSITION)
+ .addTransition(TaskStateInternal.FAILED, TaskStateInternal.FAILED,
+ EnumSet.of(
+ TaskEventType.T_TERMINATE,
+ TaskEventType.T_SCHEDULE,
+ TaskEventType.T_ADD_SPEC_ATTEMPT))
// Transitions from KILLED state
// Ignorable event: T_ATTEMPT_KILLED
@@ -1263,7 +1269,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
!failedAttemptId.equals(task.successfulAttempt)) {
// don't allow a different task attempt to override a previous
- // succeeded state
+ // succeeded state and mark the attempt status as done
+ task.taskAttemptStatus.put(failedAttemptId.getId(), true);
return TaskStateInternal.SUCCEEDED;
}
@@ -1346,6 +1353,15 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
}
+ private static class AttemptRedundantCompletedTransition
+ implements SingleArcTransition<TaskImpl, TaskEvent> { // used on SUCCEEDED->SUCCEEDED and FAILED->FAILED arcs: the task state stays put, only attempt bookkeeping is updated
+ @Override
+ public void transition(TaskImpl task, TaskEvent event) {
+ TezTaskAttemptID successTaId = ((TaskEventTAUpdate)event).getTaskAttemptID(); // name notwithstanding, the FAILED arc also routes T_ATTEMPT_FAILED / T_ATTEMPT_KILLED through here
+ task.taskAttemptStatus.put(successTaId.getId(), true); // mark this attempt's status as done
+ }
+ }
+
private static class TaskStateChangedCallback
implements OnStateChangedCallback<TaskStateInternal, TaskImpl> {
http://git-wip-us.apache.org/repos/asf/tez/blob/4a7719b0/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index da25927..e5d564e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -32,14 +32,21 @@ import java.util.List;
import java.util.Map;
import org.apache.tez.common.TezAbstractEvent;
-import org.apache.tez.dag.app.dag.DAGScheduler;
-import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
+import org.apache.tez.dag.api.TezConstants;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventSubmitted;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventTAFailed;
import org.apache.tez.dag.app.dag.event.TaskEventTAKilled;
import org.apache.tez.dag.app.dag.event.TaskEventTALaunched;
import org.apache.tez.dag.app.dag.event.TaskEventTASucceeded;
+import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
+import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
@@ -127,6 +134,7 @@ public class TestTaskImpl {
private MockTaskImpl mockTask;
private TaskSpec mockTaskSpec;
+ private Vertex mockVertex;
@SuppressWarnings("rawtypes")
class TestEventHandler implements EventHandler<Event> {
@@ -152,9 +160,13 @@ public class TestTaskImpl {
dagId = TezDAGID.getInstance(appId, 1);
vertexId = TezVertexID.getInstance(dagId, 1);
appContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
+ when(appContext.getDAGRecoveryData()).thenReturn(null);
+ appContext.setDAGRecoveryData(null);
mockContainerId = mock(ContainerId.class);
mockContainer = mock(Container.class);
mockAMContainer = mock(AMContainer.class);
+ when(mockAMContainer.getContainer()).thenReturn(mockContainer);
+ when(mockContainer.getNodeHttpAddress()).thenReturn("localhost:1234");
mockNodeId = mock(NodeId.class);
mockHistoryHandler = mock(HistoryEventHandler.class);
when(mockContainer.getId()).thenReturn(mockContainerId);
@@ -178,6 +190,11 @@ public class TestTaskImpl {
taskHeartbeatHandler, appContext, leafVertex,
taskResource, containerContext, vertex);
mockTaskSpec = mock(TaskSpec.class);
+ mockVertex = mock(Vertex.class);
+ ServicePluginInfo servicePluginInfo = new ServicePluginInfo()
+ .setContainerLauncherName(TezConstants.getTezYarnServicePluginName());
+ when(mockVertex.getServicePluginInfo()).thenReturn(servicePluginInfo);
+ when(mockVertex.getVertexConfig()).thenReturn(new VertexImpl.VertexConfigImpl(conf));
}
private TezTaskID getNewTaskID() {
@@ -947,6 +964,123 @@ public class TestTaskImpl {
Assert.assertEquals(mockDestId, newAttempt.getSchedulingCausalTA());
}
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testSucceededAttemptStatusWithRetroActiveFailures() throws InterruptedException { // two attempts succeed back to back, then each is failed retroactively
+ TezTaskID taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ MockTaskAttemptImpl firstMockTaskAttempt = mockTask.getAttemptList().get(0);
+ launchTaskAttempt(firstMockTaskAttempt.getID());
+ mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID())); // yields the second attempt fetched on the next line
+ MockTaskAttemptImpl secondMockTaskAttempt = mockTask.getAttemptList().get(1);
+ launchTaskAttempt(secondMockTaskAttempt.getID());
+
+ firstMockTaskAttempt.handle(new TaskAttemptEventSchedule(
+ TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), 10, 10));
+ secondMockTaskAttempt.handle(new TaskAttemptEventSchedule(
+ TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), 10, 10));
+ firstMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
+ TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), mockContainer.getId()));
+ secondMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
+ TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), mockContainer.getId()));
+
+ secondMockTaskAttempt.handle(
+ new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString())));
+ firstMockTaskAttempt.handle(
+ new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString())));
+ secondMockTaskAttempt.handle(
+ new TaskAttemptEvent(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()),
+ TaskAttemptEventType.TA_DONE));
+ firstMockTaskAttempt.handle(
+ new TaskAttemptEvent(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()),
+ TaskAttemptEventType.TA_DONE));
+
+ mockTask.handle(new TaskEventTASucceeded(secondMockTaskAttempt.getID())); // the second attempt's success is delivered to the task first
+ mockTask.handle(new TaskEventTASucceeded(firstMockTaskAttempt.getID())); // the first attempt's success follows right behind it
+ assertTrue("Attempts should have succeeded!",
+ firstMockTaskAttempt.getInternalState() == TaskAttemptStateInternal.SUCCEEDED
+ && secondMockTaskAttempt.getInternalState() == TaskAttemptStateInternal.SUCCEEDED);
+ assertEquals("Task should have no uncompleted attempts!", 0, mockTask.getUncompletedAttemptsCount());
+ assertTrue("Task should have Succeeded!", mockTask.getState() == TaskState.SUCCEEDED);
+ // Failing the attempt that finished after the task was already marked succeeded must not schedule another attempt.
+ failAttempt(firstMockTaskAttempt, 0, 0);
+ assertTaskSucceededState();
+ // Failing the attempt whose success made the task succeed must schedule another attempt.
+ failAttempt(secondMockTaskAttempt, 1, 1);
+ assertTaskScheduledState();
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
+ public void testFailedAttemptStatus() throws InterruptedException { // two attempts fail back to back; the task must end FAILED with no uncompleted attempts
+ Configuration newConf = new Configuration(conf);
+ newConf.setInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS, 1); // max failed attempts set to 1 for this test
+ Vertex vertex = mock(Vertex.class);
+ doReturn(new VertexImpl.VertexConfigImpl(newConf)).when(vertex).getVertexConfig();
+ mockTask = new MockTaskImpl(vertexId, partition, // NOTE(review): still passes conf below; newConf is reachable only through vertex.getVertexConfig() - confirm that is where the limit is read
+ eventHandler, conf, taskCommunicatorManagerInterface, clock,
+ taskHeartbeatHandler, appContext, leafVertex,
+ taskResource, containerContext, vertex);
+ TezTaskID taskId = getNewTaskID();
+ scheduleTaskAttempt(taskId);
+ MockTaskAttemptImpl firstMockTaskAttempt = mockTask.getAttemptList().get(0);
+ launchTaskAttempt(firstMockTaskAttempt.getID());
+ mockTask.handle(createTaskTAAddSpecAttempt(mockTask.getLastAttempt().getID())); // yields the second attempt fetched on the next line
+ MockTaskAttemptImpl secondMockTaskAttempt = mockTask.getAttemptList().get(1);
+ launchTaskAttempt(secondMockTaskAttempt.getID());
+
+ firstMockTaskAttempt.handle(new TaskAttemptEventSchedule(
+ TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), 10, 10));
+ secondMockTaskAttempt.handle(new TaskAttemptEventSchedule(
+ TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), 10, 10));
+ firstMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
+ TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()), mockContainer.getId()));
+ secondMockTaskAttempt.handle(new TaskAttemptEventSubmitted(
+ TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()), mockContainer.getId()));
+
+ secondMockTaskAttempt.handle(
+ new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString())));
+ firstMockTaskAttempt.handle(
+ new TaskAttemptEventStartedRemotely(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString())));
+ secondMockTaskAttempt.handle(
+ new TaskAttemptEventAttemptFailed(TezTaskAttemptID.fromString(secondMockTaskAttempt.toString()),
+ TaskAttemptEventType.TA_FAILED,TaskFailureType.NON_FATAL, "test",
+ TaskAttemptTerminationCause.NO_PROGRESS));
+ firstMockTaskAttempt.handle(
+ new TaskAttemptEventAttemptFailed(TezTaskAttemptID.fromString(firstMockTaskAttempt.toString()),
+ TaskAttemptEventType.TA_FAILED, TaskFailureType.NON_FATAL, "test",
+ TaskAttemptTerminationCause.NO_PROGRESS));
+
+ firstMockTaskAttempt.handle(new TaskAttemptEventContainerTerminated(mockContainerId,
+ firstMockTaskAttempt.getID(), "test", TaskAttemptTerminationCause.NO_PROGRESS));
+ secondMockTaskAttempt.handle(new TaskAttemptEventContainerTerminated(mockContainerId,
+ secondMockTaskAttempt.getID(), "test", TaskAttemptTerminationCause.NO_PROGRESS));
+ mockTask.handle(new TaskEventTAFailed(secondMockTaskAttempt.getID(), TaskFailureType.NON_FATAL, // the second attempt's failure is delivered to the task first
+ mock(TaskAttemptEvent.class)));
+ mockTask.handle(new TaskEventTAFailed(firstMockTaskAttempt.getID(), TaskFailureType.NON_FATAL, // then the first attempt's failure
+ mock(TaskAttemptEvent.class)));
+ assertTrue("Attempts should have failed!",
+ firstMockTaskAttempt.getInternalState() == TaskAttemptStateInternal.FAILED
+ && secondMockTaskAttempt.getInternalState() == TaskAttemptStateInternal.FAILED);
+ assertEquals("Task should have no uncompleted attempts!", 0, mockTask.getUncompletedAttemptsCount());
+ assertTrue("Task should have failed!", mockTask.getState() == TaskState.FAILED);
+ }
+
+ private void failAttempt(MockTaskAttemptImpl taskAttempt, int index, int expectedIncompleteAttempts) { // helper: output-fail the attempt, report the failure to mockTask, then check the uncompleted-attempt count
+ InputReadErrorEvent mockReEvent = InputReadErrorEvent.create("", 0, index); // a real event built via create(), not a Mockito mock, despite the name
+ TezTaskAttemptID mockDestId = mock(TezTaskAttemptID.class);
+ EventMetaData meta = new EventMetaData(EventProducerConsumerType.INPUT, "Vertex", "Edge", mockDestId);
+ TezEvent tzEvent = new TezEvent(mockReEvent, meta);
+ TaskAttemptEventOutputFailed outputFailedEvent =
+ new TaskAttemptEventOutputFailed(mockDestId, tzEvent, 1);
+ taskAttempt.handle( // step 1: the attempt itself receives the output-failed event
+ outputFailedEvent);
+ TaskEvent tEventFail1 = new TaskEventTAFailed(taskAttempt.getID(), TaskFailureType.NON_FATAL, outputFailedEvent);
+ mockTask.handle(tEventFail1); // step 2: the task is told that the attempt failed
+ assertEquals("Unexpected number of incomplete attempts!", // step 3: compare against the caller-supplied expectation
+ expectedIncompleteAttempts, mockTask.getUncompletedAttemptsCount());
+ }
+
// TODO Add test to validate the correct commit attempt.
@@ -1053,7 +1187,12 @@ public class TestTaskImpl {
appContext, isRescheduled, resource, containerContext, false, null,
locationHint, mockTaskSpec, schedCausalTA);
}
-
+
+ @Override
+ protected Vertex getVertex() { // override so this test double uses the test's mockVertex field
+ return mockVertex;
+ }
+
@Override
public float getProgress() {
return progress;