You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2013/12/05 02:14:02 UTC
git commit: TEZ-660. Successful TaskAttempts belonging to
LeafVertices should not be marked as KILLED in case of NodeFailure. (sseth)
case
Updated Branches:
refs/heads/master af170946e -> 47e897922
TEZ-660. Successful TaskAttempts belonging to LeafVertices should not be
marked as KILLED in case of NodeFailure. (sseth)
case
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/47e89792
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/47e89792
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/47e89792
Branch: refs/heads/master
Commit: 47e897922c63046fb56088420755bd1b857e5451
Parents: af17094
Author: Siddharth Seth <ss...@apache.org>
Authored: Wed Dec 4 17:12:35 2013 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Wed Dec 4 17:12:35 2013 -0800
----------------------------------------------------------------------
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 30 ++-
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 6 +-
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 198 ++++++++++++++++++-
.../tez/dag/app/dag/impl/TestTaskImpl.java | 2 +-
4 files changed, 217 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/47e89792/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index d6e9d0b..28b8809 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -144,6 +144,7 @@ public class TaskAttemptImpl implements TaskAttempt,
protected final boolean isRescheduled;
private final Resource taskResource;
private final ContainerContext containerContext;
+ private final boolean leafVertex;
protected static final FailedTransitionHelper FAILED_HELPER =
new FailedTransitionHelper();
@@ -154,6 +155,9 @@ public class TaskAttemptImpl implements TaskAttempt,
private static SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION =
new DiagnosticInformationUpdater();
+
+ private static SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
+ TERMINATED_AFTER_SUCCESS_HELPER = new TerminatedAfterSuccessHelper(KILLED_HELPER);
private static SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent>
STATUS_UPDATER = new StatusUpdaterTransition();
@@ -224,8 +228,8 @@ public class TaskAttemptImpl implements TaskAttempt,
// How will duplicate history events be handled ?
// TODO Maybe consider not failing REDUCE tasks in this case. Also, MAP_TASKS in case there's only one phase in the job.
.addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILL_REQUEST, new TerminatedAfterSuccessTransition(KILLED_HELPER))
- .addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_NODE_FAILED, new TerminatedAfterSuccessTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.SUCCEEDED), TaskAttemptEventType.TA_KILL_REQUEST, new TerminatedAfterSuccessTransition())
+ .addTransition(TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.SUCCEEDED), TaskAttemptEventType.TA_NODE_FAILED, new TerminatedAfterSuccessTransition())
.addTransition(TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.SUCCEEDED), TaskAttemptEventType.TA_OUTPUT_FAILED, new OutputReportedFailedTransition())
.addTransition(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED, EnumSet.of(TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED))
@@ -239,7 +243,7 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptListener tal, Configuration conf, Clock clock,
TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
TaskLocationHint locationHint, boolean isRescheduled,
- Resource resource, ContainerContext containerContext) {
+ Resource resource, ContainerContext containerContext, boolean leafVertex) {
ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
this.readLock = rwLock.readLock();
this.writeLock = rwLock.writeLock();
@@ -258,6 +262,7 @@ public class TaskAttemptImpl implements TaskAttempt,
this.isRescheduled = isRescheduled;
this.taskResource = resource;
this.containerContext = containerContext;
+ this.leafVertex = leafVertex;
}
@@ -1128,7 +1133,7 @@ public class TaskAttemptImpl implements TaskAttempt,
}
- protected static class TerminatedAfterSuccessTransition extends
+ protected static class TerminatedAfterSuccessHelper extends
TerminatedBeforeRunningTransition {
@Override
@@ -1138,7 +1143,7 @@ public class TaskAttemptImpl implements TaskAttempt,
return false;
}
- public TerminatedAfterSuccessTransition(TerminatedTransitionHelper helper) {
+ public TerminatedAfterSuccessHelper(TerminatedTransitionHelper helper) {
super(helper);
}
@@ -1150,6 +1155,18 @@ public class TaskAttemptImpl implements TaskAttempt,
}
+ protected static class TerminatedAfterSuccessTransition implements
+ MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
+ @Override
+ public TaskAttemptStateInternal transition(TaskAttemptImpl attempt, TaskAttemptEvent event) {
+ if (attempt.leafVertex) {
+ return TaskAttemptStateInternal.SUCCEEDED;
+ }
+ TaskAttemptImpl.TERMINATED_AFTER_SUCCESS_HELPER.transition(attempt, event);
+ return TaskAttemptStateInternal.KILLED;
+ }
+ }
+
protected static class OutputReportedFailedTransition implements
MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
@@ -1185,8 +1202,9 @@ public class TaskAttemptImpl implements TaskAttempt,
String message = attempt.getID() + " being failed for too many output errors";
LOG.info(message);
attempt.addDiagnosticInfo(message);
+ // Not checking for leafVertex since a READ_ERROR should only be reported for intermediate tasks.
if (attempt.getInternalState() == TaskAttemptStateInternal.SUCCEEDED) {
- (new TerminatedAfterSuccessTransition(FAILED_HELPER)).transition(
+ (new TerminatedAfterSuccessHelper(FAILED_HELPER)).transition(
attempt, event);
return TaskAttemptStateInternal.FAILED;
} else {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/47e89792/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index 30e6e2a..416f9f8 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -114,7 +114,6 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
private static final List<TezEvent> EMPTY_TASK_ATTEMPT_TEZ_EVENTS =
new ArrayList(0);
-
// counts the number of attempts that are either running or in a state where
// they will come to be running when they get a Container
private int numberUncompletedAttempts = 0;
@@ -677,7 +676,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
TaskAttemptImpl createAttempt(int attemptNumber) {
return new TaskAttemptImpl(getTaskId(), attemptNumber, eventHandler,
taskAttemptListener, conf, clock, taskHeartbeatHandler, appContext,
- locationHint, (failedAttempts > 0), taskResource, containerContext);
+ locationHint, (failedAttempts > 0), taskResource, containerContext, leafVertex);
}
protected TaskAttempt getSuccessfulAttempt() {
@@ -1083,7 +1082,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
@Override
public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
if (task.leafVertex) {
- LOG.error("Unexpected event for task of leaf vertex " + event.getType());
+ LOG.error("Unexpected event for task of leaf vertex " + event.getType() + ", taskId: "
+ + task.getTaskId());
task.internalError(event.getType());
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/47e89792/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index dcf9aef..97aee2b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -71,6 +71,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventKillRequest;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
@@ -89,6 +90,7 @@ import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+import org.mortbay.log.Log;
@SuppressWarnings({ "unchecked", "rawtypes" })
public class TestTaskAttempt {
@@ -134,7 +136,7 @@ public class TestTaskAttempt {
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
mock(TaskAttemptListener.class), new Configuration(), new SystemClock(),
mock(TaskHeartbeatHandler.class), mock(AppContext.class),
- locationHint, false, Resource.newInstance(1024, 1), createFakeContainerContext());
+ locationHint, false, Resource.newInstance(1024, 1), createFakeContainerContext(), false);
TaskAttemptEventSchedule sEvent = mock(TaskAttemptEventSchedule.class);
@@ -178,7 +180,7 @@ public class TestTaskAttempt {
mock(TaskAttemptListener.class), new Configuration(),
new SystemClock(), mock(TaskHeartbeatHandler.class),
mock(AppContext.class), locationHint, false, Resource.newInstance(1024,
- 1), createFakeContainerContext());
+ 1), createFakeContainerContext(), false);
TaskAttemptImpl spyTa = spy(taImpl);
when(spyTa.resolveHosts(hosts)).thenReturn(
@@ -324,7 +326,7 @@ public class TestTaskAttempt {
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
taListener, taskConf, new SystemClock(),
mock(TaskHeartbeatHandler.class), mockAppContext, locationHint, false,
- resource, createFakeContainerContext());
+ resource, createFakeContainerContext(), false);
NodeId nid = NodeId.newInstance("127.0.0.1", 0);
ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
@@ -387,7 +389,7 @@ public class TestTaskAttempt {
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
taListener, taskConf, new SystemClock(),
mock(TaskHeartbeatHandler.class), appCtx, locationHint, false,
- resource, createFakeContainerContext());
+ resource, createFakeContainerContext(), false);
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
@@ -478,7 +480,7 @@ public class TestTaskAttempt {
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
taListener, taskConf, new SystemClock(),
mock(TaskHeartbeatHandler.class), appCtx, locationHint, false,
- resource, createFakeContainerContext());
+ resource, createFakeContainerContext(), false);
taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null));
// At state STARTING.
@@ -540,7 +542,7 @@ public class TestTaskAttempt {
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
taListener, taskConf, new SystemClock(),
mock(TaskHeartbeatHandler.class), appCtx, locationHint, false,
- resource, createFakeContainerContext());
+ resource, createFakeContainerContext(), false);
ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
@@ -583,6 +585,184 @@ public class TestTaskAttempt {
assertEquals(0, taImpl.getDiagnostics().size());
}
+ @Test
+ // Ensure node failure on Successful Non-Leaf tasks cause them to be marked as KILLED
+ public void testNodeFailedNonLeafVertex() throws Exception {
+ ApplicationId appId = ApplicationId.newInstance(1, 2);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+ appId, 0);
+ TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+ TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+ TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+ TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0);
+
+ MockEventHandler eventHandler = spy(new MockEventHandler());
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ when(taListener.getAddress()).thenReturn(
+ new InetSocketAddress("localhost", 0));
+
+ Configuration taskConf = new Configuration();
+ taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+ taskConf.setBoolean("fs.file.impl.disable.cache", true);
+
+ TaskLocationHint locationHint = new TaskLocationHint(
+ new HashSet<String>(Arrays.asList(new String[] {"127.0.0.1"})), null);
+ Resource resource = Resource.newInstance(1024, 1);
+
+ NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+ ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+ AppContext appCtx = mock(AppContext.class);
+ AMContainerMap containers = new AMContainerMap(
+ mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+ appCtx);
+ containers.addContainerIfNew(container);
+
+ doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+ doReturn(containers).when(appCtx).getAllContainers();
+
+ TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+ taListener, taskConf, new SystemClock(),
+ mock(TaskHeartbeatHandler.class), appCtx, locationHint, false,
+ resource, createFakeContainerContext(), false);
+
+ ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+
+ taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null));
+ // At state STARTING.
+ taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
+ null));
+ assertEquals("Task attempt is not in the RUNNING state", TaskAttemptState.RUNNING,
+ taImpl.getState());
+
+ int expectedEventsAtRunning = 3;
+ verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
+
+ taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE));
+
+ assertEquals("Task attempt is not in the SUCCEEDED state", TaskAttemptState.SUCCEEDED,
+ taImpl.getState());
+
+ assertEquals(0, taImpl.getDiagnostics().size());
+
+ int expectedEvenstAfterTerminating = expectedEventsAtRunning + 3;
+ arg = ArgumentCaptor.forClass(Event.class);
+ verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture());
+
+ verifyEventType(
+ arg.getAllValues().subList(expectedEventsAtRunning,
+ expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1);
+ verifyEventType(
+ arg.getAllValues().subList(expectedEventsAtRunning,
+ expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1);
+
+ // Send out a Node Failure.
+ taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID, "NodeDecomissioned"));
+
+ // Verify one event to the Task informing it about FAILURE. No events to scheduler. Counter event.
+ int expectedEventsNodeFailure = expectedEvenstAfterTerminating + 2;
+ arg = ArgumentCaptor.forClass(Event.class);
+ verify(eventHandler, times(expectedEventsNodeFailure)).handle(arg.capture());
+ verifyEventType(
+ arg.getAllValues().subList(expectedEvenstAfterTerminating,
+ expectedEventsNodeFailure), TaskEventTAUpdate.class, 1);
+
+ // Verify still in KILLED state
+ assertEquals("Task attempt is not in the KILLED state", TaskAttemptState.KILLED,
+ taImpl.getState());
+ }
+
+ @Test
+ // Ensure node failure on Successful Leaf tasks do not cause them to be marked as KILLED
+ public void testNodeFailedLeafVertex() throws Exception {
+ ApplicationId appId = ApplicationId.newInstance(1, 2);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+ appId, 0);
+ TezDAGID dagID = TezDAGID.getInstance(appId, 1);
+ TezVertexID vertexID = TezVertexID.getInstance(dagID, 1);
+ TezTaskID taskID = TezTaskID.getInstance(vertexID, 1);
+ TezTaskAttemptID taskAttemptID = TezTaskAttemptID.getInstance(taskID, 0);
+
+ MockEventHandler eventHandler = spy(new MockEventHandler());
+ TaskAttemptListener taListener = mock(TaskAttemptListener.class);
+ when(taListener.getAddress()).thenReturn(
+ new InetSocketAddress("localhost", 0));
+
+ Configuration taskConf = new Configuration();
+ taskConf.setClass("fs.file.impl", StubbedFS.class, FileSystem.class);
+ taskConf.setBoolean("fs.file.impl.disable.cache", true);
+
+ TaskLocationHint locationHint = new TaskLocationHint(
+ new HashSet<String>(Arrays.asList(new String[] {"127.0.0.1"})), null);
+ Resource resource = Resource.newInstance(1024, 1);
+
+ NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+ ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+
+ AppContext appCtx = mock(AppContext.class);
+ AMContainerMap containers = new AMContainerMap(
+ mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+ appCtx);
+ containers.addContainerIfNew(container);
+
+ doReturn(new ClusterInfo()).when(appCtx).getClusterInfo();
+ doReturn(containers).when(appCtx).getAllContainers();
+
+ TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
+ taListener, taskConf, new SystemClock(),
+ mock(TaskHeartbeatHandler.class), appCtx, locationHint, false,
+ resource, createFakeContainerContext(), true);
+
+ ArgumentCaptor<Event> arg = ArgumentCaptor.forClass(Event.class);
+
+ taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null));
+ // At state STARTING.
+ taImpl.handle(new TaskAttemptEventStartedRemotely(taskAttemptID, contId,
+ null));
+ assertEquals("Task attempt is not in the RUNNING state", TaskAttemptState.RUNNING,
+ taImpl.getState());
+
+ int expectedEventsAtRunning = 3;
+ verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
+
+ taImpl.handle(new TaskAttemptEvent(taskAttemptID, TaskAttemptEventType.TA_DONE));
+
+ assertEquals("Task attempt is not in the SUCCEEDED state", TaskAttemptState.SUCCEEDED,
+ taImpl.getState());
+
+ assertEquals(0, taImpl.getDiagnostics().size());
+
+ int expectedEvenstAfterTerminating = expectedEventsAtRunning + 3;
+ arg = ArgumentCaptor.forClass(Event.class);
+ verify(eventHandler, times(expectedEvenstAfterTerminating)).handle(arg.capture());
+
+ verifyEventType(
+ arg.getAllValues().subList(expectedEventsAtRunning,
+ expectedEvenstAfterTerminating), TaskEventTAUpdate.class, 1);
+ verifyEventType(
+ arg.getAllValues().subList(expectedEventsAtRunning,
+ expectedEvenstAfterTerminating), AMSchedulerEventTAEnded.class, 1);
+
+ // Send out a Node Failure.
+ taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID, "NodeDecomissioned"));
+
+ // Verify no additional events
+ int expectedEventsNodeFailure = expectedEvenstAfterTerminating + 0;
+ arg = ArgumentCaptor.forClass(Event.class);
+ verify(eventHandler, times(expectedEventsNodeFailure)).handle(arg.capture());
+
+ // Verify still in SUCCEEDED state
+ assertEquals("Task attempt is not in the SUCCEEDED state", TaskAttemptState.SUCCEEDED,
+ taImpl.getState());
+ }
@Test
// Verifies that multiple TooManyFetchFailures are handled correctly by the
@@ -629,7 +809,7 @@ public class TestTaskAttempt {
TaskAttemptImpl taImpl = new MockTaskAttemptImpl(taskID, 1, eventHandler,
taListener, taskConf, new SystemClock(),
mock(TaskHeartbeatHandler.class), appCtx, locationHint, false,
- resource, createFakeContainerContext());
+ resource, createFakeContainerContext(), false);
taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, null));
// At state STARTING.
@@ -721,10 +901,10 @@ public class TestTaskAttempt {
Configuration conf, Clock clock,
TaskHeartbeatHandler taskHeartbeatHandler, AppContext appContext,
TaskLocationHint locationHint, boolean isRescheduled,
- Resource resource, ContainerContext containerContext) {
+ Resource resource, ContainerContext containerContext, boolean leafVertex) {
super(taskId, attemptNumber, eventHandler, tal, conf,
clock, taskHeartbeatHandler, appContext,
- locationHint, isRescheduled, resource, containerContext);
+ locationHint, isRescheduled, resource, containerContext, leafVertex);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/47e89792/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 4cdc2fd..7dd6df0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -539,7 +539,7 @@ public class TestTaskImpl {
TaskLocationHint locationHing, boolean isRescheduled,
Resource resource, ContainerContext containerContext) {
super(taskId, attemptNumber, eventHandler, tal, conf, clock, thh,
- appContext, locationHing, isRescheduled, resource, containerContext);
+ appContext, locationHing, isRescheduled, resource, containerContext, false);
}
@Override