You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/09/23 01:33:35 UTC
[1/4] tez git commit: TEZ-1773. Add attempt failure cause enum to the
attempt failed/killed history record (bikaS) (cherry picked from commit
81eef37d9e1e9222ef09eed319c45cdcd9034cd8)
Repository: tez
Updated Branches:
refs/heads/branch-0.5 3e5991a5c -> 3b8a480c1
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
index 0b59e79..eba8119 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
@@ -63,6 +63,7 @@ import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.events.TaskFinishedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -282,7 +283,8 @@ public class TestTaskRecovery {
restoreFromTaskStartEvent();
TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- 0L, 0L, TaskAttemptState.KILLED, "", new TezCounters()));
+ 0L, 0L, TaskAttemptState.KILLED,
+ TaskAttemptTerminationCause.TERMINATED_BY_CLIENT,"", new TezCounters()));
task.handle(new TaskEventRecoverTask(task.getTaskId()));
// wait for the second task attempt is scheduled
dispatcher.await();
@@ -303,7 +305,8 @@ public class TestTaskRecovery {
restoreFromTaskStartEvent();
TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- 0L, 0L, TaskAttemptState.FAILED, "", new TezCounters()));
+ 0L, 0L, TaskAttemptState.FAILED,
+ TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,"", new TezCounters()));
task.handle(new TaskEventRecoverTask(task.getTaskId()));
// wait for the second task attempt is scheduled
dispatcher.await();
@@ -325,7 +328,7 @@ public class TestTaskRecovery {
TezTaskAttemptID taId = getNewTaskAttemptID(task.getTaskId());
try {
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- 0L, 0L, TaskAttemptState.SUCCEEDED, "", new TezCounters()));
+ 0L, 0L, TaskAttemptState.SUCCEEDED, null ,"", new TezCounters()));
fail("Should fail due to no TaskAttemptStartedEvent but with TaskAttemptFinishedEvent(Succeeded)");
} catch (TezUncheckedException e) {
assertTrue(e.getMessage().contains("Could not find task attempt when trying to recover"));
@@ -367,8 +370,8 @@ public class TestTaskRecovery {
long taFinishTime = taStartTime + 100L;
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "",
- new TezCounters()));
+ taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
+ "", new TezCounters()));
assertEquals(TaskState.SUCCEEDED, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -399,8 +402,8 @@ public class TestTaskRecovery {
long taFinishTime = taStartTime + 100L;
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.FAILED, "",
- new TezCounters()));
+ taStartTime, taFinishTime, TaskAttemptState.FAILED, null,
+ "", new TezCounters()));
assertEquals(TaskState.RUNNING, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -432,8 +435,8 @@ public class TestTaskRecovery {
long taFinishTime = taStartTime + 100L;
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.KILLED, "",
- new TezCounters()));
+ taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
+ "", new TezCounters()));
assertEquals(TaskState.RUNNING, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -467,8 +470,8 @@ public class TestTaskRecovery {
long taFinishTime = taStartTime + 100L;
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "",
- new TezCounters()));
+ taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
+ "", new TezCounters()));
assertEquals(TaskState.SUCCEEDED, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -509,8 +512,8 @@ public class TestTaskRecovery {
long taFinishTime = taStartTime + 100L;
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "",
- new TezCounters()));
+ taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
+ "", new TezCounters()));
assertEquals(TaskState.SUCCEEDED, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -521,8 +524,8 @@ public class TestTaskRecovery {
// it is possible for TaskAttempt transit from SUCCEEDED to FAILURE due to output failure.
recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.FAILED, "",
- new TezCounters()));
+ taStartTime, taFinishTime, TaskAttemptState.FAILED, null,
+ "", new TezCounters()));
assertEquals(TaskState.RUNNING, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -556,8 +559,8 @@ public class TestTaskRecovery {
long taFinishTime = taStartTime + 100L;
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "",
- new TezCounters()));
+ taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
+ "", new TezCounters()));
assertEquals(TaskState.SUCCEEDED, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -568,8 +571,8 @@ public class TestTaskRecovery {
// it is possible for TaskAttempt transit from SUCCEEDED to KILLED due to node failure.
recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.KILLED, "",
- new TezCounters()));
+ taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
+ "", new TezCounters()));
assertEquals(TaskState.RUNNING, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -607,8 +610,8 @@ public class TestTaskRecovery {
long taFinishTime = taStartTime + 100L;
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "",
- new TezCounters()));
+ taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
+ "", new TezCounters()));
assertEquals(TaskState.SUCCEEDED, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -647,8 +650,8 @@ public class TestTaskRecovery {
long taFinishTime = taStartTime + 100L;
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "",
- new TezCounters()));
+ taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
+ "", new TezCounters()));
assertEquals(TaskState.SUCCEEDED, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -728,8 +731,8 @@ public class TestTaskRecovery {
long taFinishTime = taStartTime + 100L;
recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.KILLED, "",
- new TezCounters()));
+ taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
+ "", new TezCounters()));
assertEquals(TaskState.RUNNING, recoveredState);
assertEquals(TaskAttemptStateInternal.NEW,
((TaskAttemptImpl) task.getAttempt(taId)).getInternalState());
@@ -770,7 +773,7 @@ public class TestTaskRecovery {
task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
mock(ContainerId.class), mock(NodeId.class), "", "", ""));
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
- 0, TaskAttemptState.KILLED, "", null));
+ 0, TaskAttemptState.KILLED, null, "", null));
}
assertEquals(maxFailedAttempts, task.getAttempts().size());
assertEquals(0, task.failedAttempts);
@@ -800,7 +803,7 @@ public class TestTaskRecovery {
task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
mock(ContainerId.class), mock(NodeId.class), "", "", ""));
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
- 0, TaskAttemptState.FAILED, "", null));
+ 0, TaskAttemptState.FAILED, null, "", null));
}
assertEquals(maxFailedAttempts, task.getAttempts().size());
assertEquals(maxFailedAttempts, task.failedAttempts);
@@ -830,7 +833,7 @@ public class TestTaskRecovery {
task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
mock(ContainerId.class), mock(NodeId.class), "", "", ""));
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
- 0, TaskAttemptState.FAILED, "", null));
+ 0, TaskAttemptState.FAILED, null, "", null));
}
assertEquals(maxFailedAttempts - 1, task.getAttempts().size());
assertEquals(maxFailedAttempts - 1, task.failedAttempts);
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index e868100..f391508 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -57,6 +57,9 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -102,12 +105,15 @@ import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ClusterInfo;
+import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.RootInputInitializerManager;
import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexState;
@@ -116,6 +122,8 @@ import org.apache.tez.dag.app.dag.VertexTerminationCause;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
@@ -134,10 +142,13 @@ import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
import org.apache.tez.dag.app.dag.impl.TestVertexImpl.VertexManagerWithException.VMExceptionLocation;
import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
import org.apache.tez.dag.history.DAGHistoryEvent;
+import org.apache.tez.dag.app.rm.container.AMContainerMap;
+import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -155,6 +166,7 @@ import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent;
+import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.test.EdgeManagerForTest;
import org.apache.tez.test.VertexManagerPluginForTest;
@@ -2945,7 +2957,113 @@ public class TestVertexImpl {
Assert.assertEquals(0, committer.commitCounter);
Assert.assertEquals(1, committer.abortCounter);
}
+
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
+ public void testVertexTaskAttemptProcessorFailure() {
+ initAllVertices(VertexState.INITED);
+
+ VertexImpl v = vertices.get("vertex1");
+
+ startVertex(v);
+ TaskAttemptImpl ta = (TaskAttemptImpl) v.getTask(0).getAttempts().values().iterator().next();
+ ta.handle(new TaskAttemptEventSchedule(ta.getID(), 2, 2));
+
+ NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+ ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+ AMContainerMap containers = new AMContainerMap(
+ mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+ new ContainerContextMatcher(), appContext);
+ containers.addContainerIfNew(container);
+ doReturn(containers).when(appContext).getAllContainers();
+
+ ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));
+ Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState());
+
+ dispatcher.getEventHandler().handle(
+ new VertexEventRouteEvent(v.getVertexId(), Collections.singletonList(new TezEvent(
+ new TaskAttemptFailedEvent("Failed"), new EventMetaData(
+ EventProducerConsumerType.PROCESSOR, v.getName(), null, ta.getID())))));
+ dispatcher.await();
+ Assert.assertEquals(VertexState.RUNNING, v.getState());
+ Assert.assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, ta.getTerminationCause());
+ }
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
+ public void testVertexTaskAttemptInputFailure() {
+ initAllVertices(VertexState.INITED);
+
+ VertexImpl v = vertices.get("vertex1");
+
+ startVertex(v);
+ TaskAttemptImpl ta = (TaskAttemptImpl) v.getTask(0).getAttempts().values().iterator().next();
+ ta.handle(new TaskAttemptEventSchedule(ta.getID(), 2, 2));
+
+ NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+ ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+ AMContainerMap containers = new AMContainerMap(
+ mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+ new ContainerContextMatcher(), appContext);
+ containers.addContainerIfNew(container);
+ doReturn(containers).when(appContext).getAllContainers();
+
+ ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));
+ Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState());
+
+ dispatcher.getEventHandler().handle(
+ new VertexEventRouteEvent(v.getVertexId(), Collections.singletonList(new TezEvent(
+ new TaskAttemptFailedEvent("Failed"), new EventMetaData(
+ EventProducerConsumerType.INPUT, v.getName(), null, ta.getID())))));
+ dispatcher.await();
+ Assert.assertEquals(VertexState.RUNNING, v.getState());
+ Assert.assertEquals(TaskAttemptTerminationCause.INPUT_READ_ERROR, ta.getTerminationCause());
+ }
+
+
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
+ public void testVertexTaskAttemptOutputFailure() {
+ initAllVertices(VertexState.INITED);
+
+ VertexImpl v = vertices.get("vertex1");
+
+ startVertex(v);
+ TaskAttemptImpl ta = (TaskAttemptImpl) v.getTask(0).getAttempts().values().iterator().next();
+ ta.handle(new TaskAttemptEventSchedule(ta.getID(), 2, 2));
+
+ NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+ ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+ AMContainerMap containers = new AMContainerMap(
+ mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+ new ContainerContextMatcher(), appContext);
+ containers.addContainerIfNew(container);
+ doReturn(containers).when(appContext).getAllContainers();
+
+ ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));
+ Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState());
+
+ dispatcher.getEventHandler().handle(
+ new VertexEventRouteEvent(v.getVertexId(), Collections.singletonList(new TezEvent(
+ new TaskAttemptFailedEvent("Failed"), new EventMetaData(
+ EventProducerConsumerType.OUTPUT, v.getName(), null, ta.getID())))));
+ dispatcher.await();
+ Assert.assertEquals(VertexState.RUNNING, v.getState());
+ Assert.assertEquals(TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR, ta.getTerminationCause());
+ }
+
@Test(timeout = 5000)
public void testSourceVertexStartHandling() {
LOG.info("Testing testSourceVertexStartHandling");
@@ -2962,21 +3080,6 @@ public class TestVertexImpl {
}
@Test(timeout = 5000)
- public void testCounters() {
- // FIXME need to test counters at vertex level
- }
-
- @Test(timeout = 5000)
- public void testDiagnostics() {
- // FIXME need to test diagnostics in various cases
- }
-
- @Test(timeout = 5000)
- public void testTaskAttemptCompletionEvents() {
- // FIXME need to test handling of task attempt events
- }
-
- @Test(timeout = 5000)
public void testSourceTaskAttemptCompletionEvents() {
LOG.info("Testing testSourceTaskAttemptCompletionEvents");
initAllVertices(VertexState.INITED);
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index 4ec1916..d2dece3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -49,6 +49,7 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
import org.apache.tez.dag.app.rm.container.AMContainerEventType;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.junit.Assert;
import org.junit.Before;
@@ -179,6 +180,8 @@ public class TestTaskSchedulerEventHandler {
Assert.assertEquals("Container preempted externally. Container preempted by RM.",
completedEvent.getDiagnostics());
Assert.assertTrue(completedEvent.isPreempted());
+ Assert.assertEquals(TaskAttemptTerminationCause.EXTERNAL_PREEMPTION,
+ completedEvent.getTerminationCause());
Assert.assertFalse(completedEvent.isDiskFailed());
schedulerHandler.stop();
@@ -186,6 +189,31 @@ public class TestTaskSchedulerEventHandler {
}
@Test (timeout = 5000)
+ public void testContainerInternalPreempted() throws IOException {
+ Configuration conf = new Configuration(false);
+ schedulerHandler.init(conf);
+ schedulerHandler.start();
+
+ ContainerId mockCId = mock(ContainerId.class);
+ verify(mockTaskScheduler, times(0)).deallocateContainer((ContainerId)any());
+ schedulerHandler.preemptContainer(mockCId);
+ verify(mockTaskScheduler, times(1)).deallocateContainer(mockCId);
+ Assert.assertEquals(1, mockEventHandler.events.size());
+ Event event = mockEventHandler.events.get(0);
+ Assert.assertEquals(AMContainerEventType.C_COMPLETED, event.getType());
+ AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event;
+ Assert.assertEquals(mockCId, completedEvent.getContainerId());
+ Assert.assertEquals("Container preempted internally", completedEvent.getDiagnostics());
+ Assert.assertFalse(completedEvent.isPreempted());
+ Assert.assertFalse(completedEvent.isDiskFailed());
+ Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION,
+ completedEvent.getTerminationCause());
+
+ schedulerHandler.stop();
+ schedulerHandler.close();
+ }
+
+ @Test (timeout = 5000)
public void testContainerDiskFailed() throws IOException {
Configuration conf = new Configuration(false);
schedulerHandler.init(conf);
@@ -211,6 +239,8 @@ public class TestTaskSchedulerEventHandler {
completedEvent.getDiagnostics());
Assert.assertFalse(completedEvent.isPreempted());
Assert.assertTrue(completedEvent.isDiskFailed());
+ Assert.assertEquals(TaskAttemptTerminationCause.NODE_DISK_ERROR,
+ completedEvent.getTerminationCause());
schedulerHandler.stop();
schedulerHandler.close();
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index c0be044..f273896 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -67,17 +67,21 @@ import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminatedBySystem;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.rm.AMSchedulerEventType;
import org.apache.tez.dag.app.rm.NMCommunicatorEventType;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventHandler;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
@@ -87,9 +91,7 @@ import com.google.common.collect.Maps;
public class TestAMContainer {
-
-
- @Test
+ @Test (timeout=5000)
// Assign before launch.
public void tetSingleSuccessfulTaskFlow() {
WrappedContainer wc = new WrappedContainer();
@@ -135,7 +137,7 @@ public class TestAMContainer {
assertNull(wc.amContainer.getRunningTaskAttempt());
verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID);
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
@@ -146,7 +148,7 @@ public class TestAMContainer {
assertFalse(wc.amContainer.isInErrorState());
}
- @Test
+ @Test (timeout=5000)
// Assign after launch.
public void testSingleSuccessfulTaskFlow2() {
WrappedContainer wc = new WrappedContainer();
@@ -191,7 +193,7 @@ public class TestAMContainer {
assertNull(wc.amContainer.getRunningTaskAttempt());
verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID);
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
@@ -202,7 +204,7 @@ public class TestAMContainer {
assertFalse(wc.amContainer.isInErrorState());
}
- @Test
+ @Test (timeout=5000)
public void testSingleSuccessfulTaskFlowStopRequest() {
WrappedContainer wc = new WrappedContainer();
@@ -225,7 +227,7 @@ public class TestAMContainer {
wc.verifyState(AMContainerState.STOPPING);
wc.verifyNoOutgoingEvents();
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
@@ -238,7 +240,7 @@ public class TestAMContainer {
assertFalse(wc.amContainer.isInErrorState());
}
- @Test
+ @Test (timeout=5000)
public void testSingleSuccessfulTaskFlowFailedNMStopRequest() {
WrappedContainer wc = new WrappedContainer();
@@ -264,7 +266,7 @@ public class TestAMContainer {
assertTrue(wc.verifyCountAndGetOutgoingEvents(1).get(0).getType() ==
AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
@@ -278,7 +280,7 @@ public class TestAMContainer {
}
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
public void testMultipleAllocationsAtIdle() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -303,7 +305,7 @@ public class TestAMContainer {
assertTrue(wc.amContainer.isInErrorState());
wc.nmStopSent();
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyHistoryStopEvent();
// 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
@@ -317,7 +319,7 @@ public class TestAMContainer {
}
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
public void testAllocationAtRunning() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -343,7 +345,7 @@ public class TestAMContainer {
assertTrue(wc.amContainer.isInErrorState());
wc.nmStopSent();
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyHistoryStopEvent();
// 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
@@ -357,7 +359,7 @@ public class TestAMContainer {
}
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
public void testMultipleAllocationsAtLaunching() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -382,7 +384,7 @@ public class TestAMContainer {
assertTrue(wc.amContainer.isInErrorState());
wc.nmStopSent();
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyHistoryStopEvent();
// 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
@@ -396,7 +398,7 @@ public class TestAMContainer {
}
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
public void testContainerTimedOutAtRunning() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -418,7 +420,7 @@ public class TestAMContainer {
NMCommunicatorEventType.CONTAINER_STOP_REQUEST);
// TODO Should this be an RM DE-ALLOCATE instead ?
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyHistoryStopEvent();
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -432,7 +434,7 @@ public class TestAMContainer {
}
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
public void testLaunchFailure() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -449,22 +451,28 @@ public class TestAMContainer {
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
TaskAttemptEventType.TA_CONTAINER_TERMINATING,
AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
+ for (Event e : outgoingEvents) {
+ if (e.getType() == TaskAttemptEventType.TA_CONTAINER_TERMINATING) {
+ Assert.assertEquals(TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,
+ ((TaskAttemptEventContainerTerminating)e).getTerminationCause());
+ }
+ }
- wc.containerCompleted(false);
+ wc.containerCompleted();
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
TaskAttemptEventType.TA_CONTAINER_TERMINATED);
-
+
// Valid transition. Container complete, but not with an error.
assertFalse(wc.amContainer.isInErrorState());
}
- @Test
+ @Test (timeout=5000)
public void testContainerCompletedAtAllocated() {
WrappedContainer wc = new WrappedContainer();
wc.verifyState(AMContainerState.ALLOCATED);
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
@@ -472,7 +480,7 @@ public class TestAMContainer {
}
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
// Verify that incoming NM launched events to COMPLETED containers are
// handled.
public void testContainerCompletedAtLaunching() {
@@ -484,7 +492,7 @@ public class TestAMContainer {
wc.assignTaskAttempt(wc.taskAttemptID);
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID);
verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -492,6 +500,8 @@ public class TestAMContainer {
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+ Assert.assertEquals(TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,
+ ((TaskAttemptEventContainerTerminated)outgoingEvents.get(0)).getTerminationCause());
assertFalse(wc.amContainer.isInErrorState());
@@ -501,9 +511,71 @@ public class TestAMContainer {
assertFalse(wc.amContainer.isInErrorState());
}
+
+ @SuppressWarnings("rawtypes")
+ @Test (timeout=5000)
+ public void testContainerCompletedAtLaunchingSpecificClusterError() {
+ WrappedContainer wc = new WrappedContainer();
+ List<Event> outgoingEvents;
+
+ wc.launchContainer();
+
+
+ wc.assignTaskAttempt(wc.taskAttemptID);
+
+ wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR);
+ wc.verifyState(AMContainerState.COMPLETED);
+ verify(wc.tal).registerRunningContainer(wc.containerID);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID);
+
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
+ Assert.assertEquals(TaskAttemptTerminationCause.NODE_DISK_ERROR,
+ ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause());
+ assertFalse(wc.amContainer.isInErrorState());
+
+ // Container-launched event generated by the NM call.
+ wc.containerLaunched();
+ wc.verifyNoOutgoingEvents();
+
+ assertFalse(wc.amContainer.isInErrorState());
+ }
+
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
+ public void testContainerCompletedAtLaunchingSpecificError() {
+ WrappedContainer wc = new WrappedContainer();
+ List<Event> outgoingEvents;
+
+ wc.launchContainer();
+
+
+ wc.assignTaskAttempt(wc.taskAttemptID);
+
+ wc.containerCompleted(ContainerExitStatus.ABORTED, TaskAttemptTerminationCause.NODE_FAILED);
+ wc.verifyState(AMContainerState.COMPLETED);
+ verify(wc.tal).registerRunningContainer(wc.containerID);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID);
+
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+ Assert.assertEquals(TaskAttemptTerminationCause.NODE_FAILED,
+ ((TaskAttemptEventContainerTerminated)outgoingEvents.get(0)).getTerminationCause());
+
+ assertFalse(wc.amContainer.isInErrorState());
+
+ // Container launched generated by NM call.
+ wc.containerLaunched();
+ wc.verifyNoOutgoingEvents();
+
+ assertFalse(wc.amContainer.isInErrorState());
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test (timeout=5000)
public void testContainerCompletedAtIdle() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -514,7 +586,7 @@ public class TestAMContainer {
wc.containerLaunched();
wc.verifyState(AMContainerState.IDLE);
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID);
verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -538,7 +610,7 @@ public class TestAMContainer {
}
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
public void testContainerCompletedAtRunning() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -550,7 +622,7 @@ public class TestAMContainer {
wc.pullTaskToRun();
wc.verifyState(AMContainerState.RUNNING);
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID);
verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -574,7 +646,7 @@ public class TestAMContainer {
}
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
public void testContainerPreemptedAtRunning() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -586,7 +658,7 @@ public class TestAMContainer {
wc.pullTaskToRun();
wc.verifyState(AMContainerState.RUNNING);
- wc.containerCompleted(ContainerExitStatus.PREEMPTED);
+ wc.containerCompleted(ContainerExitStatus.PREEMPTED, TaskAttemptTerminationCause.EXTERNAL_PREEMPTION);
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID);
verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -594,6 +666,8 @@ public class TestAMContainer {
verify(wc.chh).unregister(wc.containerID);
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+ Assert.assertEquals(TaskAttemptTerminationCause.EXTERNAL_PREEMPTION,
+ ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause());
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
@@ -608,9 +682,47 @@ public class TestAMContainer {
assertFalse(wc.amContainer.isInErrorState());
}
+
+ @SuppressWarnings("rawtypes")
+ @Test (timeout=5000)
+ public void testContainerInternallyPreemptedAtRunning() {
+ WrappedContainer wc = new WrappedContainer();
+ List<Event> outgoingEvents;
+
+ wc.launchContainer();
+
+ wc.assignTaskAttempt(wc.taskAttemptID);
+ wc.containerLaunched();
+ wc.pullTaskToRun();
+ wc.verifyState(AMContainerState.RUNNING);
+
+ wc.containerCompleted(ContainerExitStatus.INVALID, TaskAttemptTerminationCause.INTERNAL_PREEMPTION);
+ wc.verifyState(AMContainerState.COMPLETED);
+ verify(wc.tal).registerRunningContainer(wc.containerID);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.chh).register(wc.containerID);
+ verify(wc.chh).unregister(wc.containerID);
+
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+ Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION,
+ ((TaskAttemptEventContainerTerminated)outgoingEvents.get(0)).getTerminationCause());
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+
+ assertFalse(wc.amContainer.isInErrorState());
+
+ // Pending task complete. (Ideally, container should be dead at this point
+ // and this event should not be generated. Network timeout on NM-RM heartbeat
+ // can cause it to be generated)
+ wc.taskAttemptSucceeded(wc.taskAttemptID);
+ wc.verifyNoOutgoingEvents();
+ wc.verifyHistoryStopEvent();
+
+ assertFalse(wc.amContainer.isInErrorState());
+ }
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
public void testContainerDiskFailedAtRunning() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -622,7 +734,7 @@ public class TestAMContainer {
wc.pullTaskToRun();
wc.verifyState(AMContainerState.RUNNING);
- wc.containerCompleted(ContainerExitStatus.DISKS_FAILED);
+ wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR);
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID);
verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -630,6 +742,8 @@ public class TestAMContainer {
verify(wc.chh).unregister(wc.containerID);
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+ Assert.assertEquals(TaskAttemptTerminationCause.NODE_DISK_ERROR,
+ ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause());
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
@@ -646,7 +760,7 @@ public class TestAMContainer {
}
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
public void testTaskAssignedToCompletedContainer() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -657,7 +771,7 @@ public class TestAMContainer {
wc.pullTaskToRun();
wc.taskAttemptSucceeded(wc.taskAttemptID);
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyState(AMContainerState.COMPLETED);
TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
@@ -677,7 +791,7 @@ public class TestAMContainer {
assertTrue(wc.amContainer.isInErrorState());
}
- @Test
+ @Test (timeout=5000)
public void testTaskPullAtLaunching() {
WrappedContainer wc = new WrappedContainer();
@@ -690,7 +804,7 @@ public class TestAMContainer {
}
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
public void testNodeFailedAtIdle() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -712,11 +826,11 @@ public class TestAMContainer {
for (Event event : outgoingEvents) {
if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) {
TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event;
- assertEquals("nodeFailed", nfEvent.getDiagnosticInfo());
+ assertTrue(nfEvent.getDiagnosticInfo().contains("nodeFailed"));
}
}
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyHistoryStopEvent();
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -726,7 +840,7 @@ public class TestAMContainer {
}
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
public void testNodeFailedAtIdleMultipleAttempts() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -756,13 +870,13 @@ public class TestAMContainer {
for (Event event : outgoingEvents) {
if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) {
TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event;
- assertEquals("nodeFailed", nfEvent.getDiagnosticInfo());
+ assertTrue(nfEvent.getDiagnosticInfo().contains("nodeFailed"));
}
}
assertFalse(wc.amContainer.isInErrorState());
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyNoOutgoingEvents();
wc.verifyHistoryStopEvent();
@@ -772,7 +886,7 @@ public class TestAMContainer {
}
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
public void testNodeFailedAtRunningMultipleAttempts() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -801,11 +915,11 @@ public class TestAMContainer {
for (Event event : outgoingEvents) {
if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) {
TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event;
- assertEquals("nodeFailed", nfEvent.getDiagnosticInfo());
+ assertTrue(nfEvent.getDiagnosticInfo().contains("nodeFailed"));
}
}
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyHistoryStopEvent();
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -818,7 +932,7 @@ public class TestAMContainer {
}
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
public void testNodeFailedAtCompletedMultipleSuccessfulTAs() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -835,7 +949,7 @@ public class TestAMContainer {
wc.taskAttemptSucceeded(taID2);
wc.stopRequest();
wc.nmStopSent();
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyState(AMContainerState.COMPLETED);
wc.nodeFailed();
@@ -849,7 +963,7 @@ public class TestAMContainer {
assertEquals(2, wc.amContainer.getAllTaskAttempts().size());
}
- @Test
+ @Test (timeout=5000)
public void testDuplicateCompletedEvents() {
WrappedContainer wc = new WrappedContainer();
@@ -865,17 +979,17 @@ public class TestAMContainer {
wc.taskAttemptSucceeded(taID2);
wc.stopRequest();
wc.nmStopSent();
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyNoOutgoingEvents();
wc.verifyHistoryStopEvent();
}
- @Test
+ @Test (timeout=5000)
public void testLocalResourceAddition() {
WrappedContainer wc = new WrappedContainer();
@@ -926,13 +1040,13 @@ public class TestAMContainer {
wc.taskAttemptSucceeded(taID3);
// Verify references are cleared after a container completes.
- wc.containerCompleted(false);
+ wc.containerCompleted();
assertNull(wc.amContainer.containerLocalResources);
assertNull(wc.amContainer.additionalLocalResources);
}
@SuppressWarnings("unchecked")
- @Test
+ @Test (timeout=5000)
public void testCredentialsTransfer() {
WrappedContainerMultipleDAGs wc = new WrappedContainerMultipleDAGs();
@@ -1183,15 +1297,15 @@ public class TestAMContainer {
AMContainerEventType.C_NM_STOP_FAILED));
}
- public void containerCompleted(boolean preempted) {
+ public void containerCompleted() {
reset(eventHandler);
- amContainer.handle(new AMContainerEventCompleted(containerID,
- (preempted ? ContainerExitStatus.PREEMPTED : ContainerExitStatus.SUCCESS), null));
+ amContainer.handle(new AMContainerEventCompleted(containerID, ContainerExitStatus.SUCCESS, null,
+ TaskAttemptTerminationCause.CONTAINER_EXITED));
}
- public void containerCompleted(int exitStatus) {
+ public void containerCompleted(int exitStatus, TaskAttemptTerminationCause errCause) {
reset(eventHandler);
- amContainer.handle(new AMContainerEventCompleted(containerID, exitStatus, null));
+ amContainer.handle(new AMContainerEventCompleted(containerID, exitStatus, null, errCause));
}
public void containerTimedOut() {
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index bc0a642..0c450ab 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.history.events;
import static org.junit.Assert.fail;
import java.nio.ByteBuffer;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -45,6 +46,7 @@ import org.apache.tez.dag.app.dag.impl.VertexStats;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.SummaryEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -472,7 +474,7 @@ public class TestHistoryEventsProtoConversion {
TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
"vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED,
- null, null);
+ null, null, null);
TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getTaskAttemptID(),
@@ -492,7 +494,7 @@ public class TestHistoryEventsProtoConversion {
TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
"vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED,
- "diagnose", new TezCounters());
+ TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters());
TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getTaskAttemptID(),
@@ -505,6 +507,8 @@ public class TestHistoryEventsProtoConversion {
deserializedEvent.getState());
Assert.assertEquals(event.getCounters(),
deserializedEvent.getCounters());
+ Assert.assertEquals(event.getTaskAttemptError(),
+ deserializedEvent.getTaskAttemptError());
logEvents(event, deserializedEvent);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index d9e1a38..4349740 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -59,6 +59,7 @@ import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.history.utils.DAGUtils;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -155,7 +156,7 @@ public class TestHistoryEventJsonConversion {
break;
case TASK_ATTEMPT_FINISHED:
event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
- random.nextInt(), TaskAttemptState.FAILED, null, null);
+ random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null, null);
break;
case CONTAINER_LAUNCHED:
event = new ContainerLaunchedEvent(containerId, random.nextInt(),
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index 3e355e6..f353890 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -393,6 +393,9 @@ public class HistoryEventTimelineConversion {
atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
+ if (event.getTaskAttemptError() != null) {
+ atsEntity.addOtherInfo(ATSConstants.TASK_ATTEMPT_ERROR_ENUM, event.getTaskAttemptError().name());
+ }
atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
atsEntity.addOtherInfo(ATSConstants.COUNTERS,
DAGUtils.convertCountersToATSMap(event.getCounters()));
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index f32704e..4023c1c 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -66,6 +66,7 @@ import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.history.logging.EntityTypes;
import org.apache.tez.dag.history.utils.DAGUtils;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -160,7 +161,7 @@ public class TestHistoryEventTimelineConversion {
break;
case TASK_ATTEMPT_FINISHED:
event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
- random.nextInt(), TaskAttemptState.FAILED, null, null);
+ random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, null, null);
break;
case CONTAINER_LAUNCHED:
event = new ContainerLaunchedEvent(containerId, random.nextInt(),
@@ -436,11 +437,13 @@ public class TestHistoryEventTimelineConversion {
long finishTime = startTime + 1234;
TaskAttemptState state = TaskAttemptState
.values()[random.nextInt(TaskAttemptState.values().length)];
+ TaskAttemptTerminationCause error = TaskAttemptTerminationCause
+ .values()[random.nextInt(TaskAttemptTerminationCause.values().length)];
String diagnostics = "random diagnostics message";
TezCounters counters = new TezCounters();
TaskAttemptFinishedEvent event = new TaskAttemptFinishedEvent(tezTaskAttemptID, vertexName,
- startTime, finishTime, state, diagnostics, counters);
+ startTime, finishTime, state, error, diagnostics, counters);
TimelineEntity timelineEntity = HistoryEventTimelineConversion.convertToTimelineEntity(event);
Assert.assertEquals(tezTaskAttemptID.toString(), timelineEntity.getEntityId());
Assert.assertEquals(EntityTypes.TEZ_TASK_ATTEMPT_ID.name(), timelineEntity.getEntityType());
@@ -467,6 +470,7 @@ public class TestHistoryEventTimelineConversion {
Assert.assertEquals(finishTime, otherInfo.get(ATSConstants.FINISH_TIME));
Assert.assertEquals(finishTime - startTime, otherInfo.get(ATSConstants.TIME_TAKEN));
Assert.assertEquals(state.name(), otherInfo.get(ATSConstants.STATUS));
+ Assert.assertEquals(error.name(), otherInfo.get(ATSConstants.TASK_ATTEMPT_ERROR_ENUM));
Assert.assertEquals(diagnostics, otherInfo.get(ATSConstants.DIAGNOSTICS));
Assert.assertTrue(otherInfo.containsKey(ATSConstants.COUNTERS));
}
[3/4] tez git commit: TEZ-1929. pre-empted tasks should be marked as
killed instead of failed (bikas) (cherry picked from commit
6ba1339d57a5d05fa14f35f352076131bffea483) (cherry picked from commit
8e5d18e63e6c98aa4e89f568d159b3c1903b1dae)
Posted by bi...@apache.org.
TEZ-1929. pre-empted tasks should be marked as killed instead of failed (bikas)
(cherry picked from commit 6ba1339d57a5d05fa14f35f352076131bffea483)
(cherry picked from commit 8e5d18e63e6c98aa4e89f568d159b3c1903b1dae)
Conflicts:
CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9c30d261
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9c30d261
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9c30d261
Branch: refs/heads/branch-0.5
Commit: 9c30d2612aba8e280bc2b84d0a3e925a8f9baf5f
Parents: 84a7ef0
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Jan 30 16:40:11 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Sep 22 14:46:20 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../rm/container/AMContainerEventCompleted.java | 5 +--
.../dag/app/rm/container/AMContainerImpl.java | 4 +--
.../tez/dag/app/TestMockDAGAppMaster.java | 37 ++++++++++++++++++++
.../app/rm/TestTaskSchedulerEventHandler.java | 2 +-
.../dag/app/rm/container/TestAMContainer.java | 6 ++--
6 files changed, 47 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/9c30d261/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cbc6751..986ab9b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
ALL CHANGES:
+ TEZ-1929. pre-empted tasks should be marked as killed instead of failed
TEZ-1773. Add attempt failure cause enum to the attempt failed/killed
history record
TEZ-2203. Intern strings in tez counters
http://git-wip-us.apache.org/repos/asf/tez/blob/9c30d261/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
index a455f1e..9bb6d7f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
@@ -37,14 +37,15 @@ public class AMContainerEventCompleted extends AMContainerEvent {
}
public boolean isPreempted() {
- return (exitStatus == ContainerExitStatus.PREEMPTED);
+ return (exitStatus == ContainerExitStatus.PREEMPTED ||
+ errCause == TaskAttemptTerminationCause.INTERNAL_PREEMPTION);
}
public boolean isDiskFailed() {
return (exitStatus == ContainerExitStatus.DISKS_FAILED);
}
- public boolean isClusterAction() {
+ public boolean isSystemAction() {
return isPreempted() || isDiskFailed();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/9c30d261/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 9d4f46b..3dc4bec 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -662,7 +662,7 @@ public class AMContainerImpl implements AMContainer {
AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
if (container.pendingAttempt != null) {
String errorMessage = getMessage(container, event);
- if (event.isClusterAction()) {
+ if (event.isSystemAction()) {
container.sendContainerTerminatedBySystemToTaskAttempt(container.pendingAttempt,
errorMessage, event.getTerminationCause());
} else {
@@ -921,7 +921,7 @@ public class AMContainerImpl implements AMContainer {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
- if (event.isClusterAction()) {
+ if (event.isSystemAction()) {
container.sendContainerTerminatedBySystemToTaskAttempt(container.runningAttempt,
getMessage(container, event), event.getTerminationCause());
} else {
http://git-wip-us.apache.org/repos/asf/tez/blob/9c30d261/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index 8650aea..16f0f7e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -35,10 +35,16 @@ import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher.ContainerData;
import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
+import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
import org.junit.Assert;
import org.junit.Test;
@@ -102,6 +108,37 @@ public class TestMockDAGAppMaster {
tezClient.stop();
}
+ @Test (timeout = 5000)
+ public void testInternalPreemption() throws Exception {
+ TezConfiguration tezconf = new TezConfiguration(defaultConf);
+
+ MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null);
+ tezClient.start();
+
+ MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
+ MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
+ mockLauncher.startScheduling(false);
+ // there is only 1 task whose first attempt will be preempted
+ DAG dag = DAG.create("test");
+ Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 1);
+ dag.addVertex(vA);
+
+ DAGClient dagClient = tezClient.submitDAG(dag);
+ mockLauncher.waitTillContainersLaunched();
+ ContainerData cData = mockLauncher.getContainers().values().iterator().next();
+ DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
+ mockApp.getTaskSchedulerEventHandler().preemptContainer(cData.cId);
+
+ mockLauncher.startScheduling(true);
+ dagClient.waitForCompletion();
+ Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+ TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), 0);
+ TezTaskAttemptID killedTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0);
+ TaskAttempt killedTa = dagImpl.getVertex(vA.getName()).getTask(0).getAttempt(killedTaId);
+ Assert.assertEquals(TaskAttemptState.KILLED, killedTa.getState());
+ tezClient.stop();
+ }
+
@Test (timeout = 10000)
public void testMultipleSubmissions() throws Exception {
Map<String, LocalResource> lrDAG = Maps.newHashMap();
http://git-wip-us.apache.org/repos/asf/tez/blob/9c30d261/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index d2dece3..62618cc 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -204,7 +204,7 @@ public class TestTaskSchedulerEventHandler {
AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event;
Assert.assertEquals(mockCId, completedEvent.getContainerId());
Assert.assertEquals("Container preempted internally", completedEvent.getDiagnostics());
- Assert.assertFalse(completedEvent.isPreempted());
+ Assert.assertTrue(completedEvent.isPreempted());
Assert.assertFalse(completedEvent.isDiskFailed());
Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION,
completedEvent.getTerminationCause());
http://git-wip-us.apache.org/repos/asf/tez/blob/9c30d261/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index f273896..438c50d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -704,10 +704,10 @@ public class TestAMContainer {
verify(wc.chh).unregister(wc.containerID);
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
- Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION,
- ((TaskAttemptEventContainerTerminated)outgoingEvents.get(0)).getTerminationCause());
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
- TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
+ Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION,
+ ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause());
assertFalse(wc.amContainer.isInErrorState());
[4/4] tez git commit: TEZ-1773. Addendum for missed test update
Posted by bi...@apache.org.
TEZ-1773. Addendum for missed test update
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/3b8a480c
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/3b8a480c
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/3b8a480c
Branch: refs/heads/branch-0.5
Commit: 3b8a480c169973359ce6b2163f842a9cc644dcf3
Parents: 9c30d26
Author: Bikas Saha <bi...@apache.org>
Authored: Tue Sep 22 15:29:44 2015 -0700
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Sep 22 15:29:44 2015 -0700
----------------------------------------------------------------------
.../history/logging/ats/TestHistoryEventTimelineConversion.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/3b8a480c/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index 4023c1c..28e310a 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -466,7 +466,7 @@ public class TestHistoryEventTimelineConversion {
Assert.assertEquals(finishTime, evt.getTimestamp());
final Map<String, Object> otherInfo = timelineEntity.getOtherInfo();
- Assert.assertEquals(5, otherInfo.size());
+ Assert.assertEquals(6, otherInfo.size());
Assert.assertEquals(finishTime, otherInfo.get(ATSConstants.FINISH_TIME));
Assert.assertEquals(finishTime - startTime, otherInfo.get(ATSConstants.TIME_TAKEN));
Assert.assertEquals(state.name(), otherInfo.get(ATSConstants.STATUS));
[2/4] tez git commit: TEZ-1773. Add attempt failure cause enum to the
attempt failed/killed history record (bikaS) (cherry picked from commit
81eef37d9e1e9222ef09eed319c45cdcd9034cd8)
Posted by bi...@apache.org.
TEZ-1773. Add attempt failure cause enum to the attempt failed/killed history record (bikaS)
(cherry picked from commit 81eef37d9e1e9222ef09eed319c45cdcd9034cd8)
Conflicts:
CHANGES.txt
tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/84a7ef05
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/84a7ef05
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/84a7ef05
Branch: refs/heads/branch-0.5
Commit: 84a7ef0516cdcc0cd5a4d480ea6f235de4ad9db9
Parents: 3e5991a
Author: Bikas Saha <bi...@apache.org>
Authored: Mon Nov 24 11:15:44 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Sep 22 14:37:17 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../org/apache/tez/common/ATSConstants.java | 1 +
.../records/TaskAttemptTerminationCause.java | 45 ++++
.../tez/dag/app/TaskHeartbeatHandler.java | 3 +-
.../org/apache/tez/dag/app/dag/TaskAttempt.java | 2 +
.../event/TaskAttemptEventAttemptFailed.java | 13 +-
.../TaskAttemptEventContainerTerminated.java | 13 +-
...AttemptEventContainerTerminatedBySystem.java | 13 +-
.../TaskAttemptEventContainerTerminating.java | 12 +-
.../dag/event/TaskAttemptEventKillRequest.java | 13 +-
.../dag/event/TaskAttemptEventNodeFailed.java | 12 +-
.../dag/event/TaskAttemptEventOutputFailed.java | 9 +-
.../TaskAttemptEventTerminationCauseEvent.java | 26 +++
.../dag/app/dag/event/TaskAttemptEventType.java | 3 +-
.../dag/app/dag/event/TaskEventTermination.java | 28 ++-
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 39 +++-
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 33 ++-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 36 ++-
.../app/launcher/LocalContainerLauncher.java | 9 +-
.../dag/app/rm/TaskSchedulerEventHandler.java | 14 +-
.../rm/container/AMContainerEventCompleted.java | 13 +-
.../dag/app/rm/container/AMContainerImpl.java | 96 ++++----
.../events/TaskAttemptFinishedEvent.java | 20 +-
.../impl/HistoryEventJsonConversion.java | 5 +-
tez-dag/src/main/proto/HistoryEvents.proto | 1 +
.../org/apache/tez/dag/app/TestPreemption.java | 2 +
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 43 +++-
.../app/dag/impl/TestTaskAttemptRecovery.java | 15 +-
.../tez/dag/app/dag/impl/TestTaskImpl.java | 15 +-
.../tez/dag/app/dag/impl/TestTaskRecovery.java | 59 ++---
.../tez/dag/app/dag/impl/TestVertexImpl.java | 133 +++++++++--
.../app/rm/TestTaskSchedulerEventHandler.java | 30 +++
.../dag/app/rm/container/TestAMContainer.java | 230 ++++++++++++++-----
.../TestHistoryEventsProtoConversion.java | 8 +-
.../impl/TestHistoryEventJsonConversion.java | 3 +-
.../ats/HistoryEventTimelineConversion.java | 3 +
.../ats/TestHistoryEventTimelineConversion.java | 8 +-
37 files changed, 777 insertions(+), 233 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0646dfc..cbc6751 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,8 @@ INCOMPATIBLE CHANGES
TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
ALL CHANGES:
+ TEZ-1773. Add attempt failure cause enum to the attempt failed/killed
+ history record
TEZ-2203. Intern strings in tez counters
TEZ-2834. Make Tez preemption resilient to incorrect free resource reported
by YARN
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index aec6ca5..944ae87 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -63,6 +63,7 @@ public class ATSConstants {
public static final String FINISH_TIME = "endTime";
public static final String TIME_TAKEN = "timeTaken";
public static final String STATUS = "status";
+ public static final String TASK_ATTEMPT_ERROR_ENUM = "taskAttemptErrorEnum";
public static final String DIAGNOSTICS = "diagnostics";
public static final String SUCCESSFUL_ATTEMPT_ID = "successfulAttemptId";
public static final String COUNTERS = "counters";
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
new file mode 100644
index 0000000..ef0bb33
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.records;
+
+public enum TaskAttemptTerminationCause {
+ UNKNOWN_ERROR, // The error cause is unknown. Usually means a gap in error propagation
+
+ TERMINATED_BY_CLIENT, // Killed by client command
+ TERMINATED_AT_SHUTDOWN, // Killed due to execution shutdown
+ INTERNAL_PREEMPTION, // Killed by Tez to make space for higher priority work
+ EXTERNAL_PREEMPTION, // Killed by the cluster to make space for other work
+ TERMINATED_INEFFECTIVE_SPECULATION, // Killed speculative attempt because original succeeded
+ TERMINATED_EFFECTIVE_SPECULATION, // Killed original attempt because speculation succeeded
+ TERMINATED_ORPHANED, // Attempt is no longer needed by the task
+
+ APPLICATION_ERROR, // Failed due to application code error
+ FRAMEWORK_ERROR, // Failed due to code error in Tez code
+ INPUT_READ_ERROR, // Failed due to error in reading inputs
+ OUTPUT_WRITE_ERROR, // Failed due to error in writing outputs
+ OUTPUT_LOST, // Failed because the attempt's outputs were reported lost
+ TASK_HEARTBEAT_ERROR, // Failed because AM lost connection to the task
+
+ CONTAINER_LAUNCH_FAILED, // Failed to launch container
+ CONTAINER_EXITED, // Container exited. Indicates gap in specific error propagation from the cluster
+ CONTAINER_STOPPED, // Container stopped or released by Tez
+ NODE_FAILED, // Node for the container failed
+ NODE_DISK_ERROR, // Disk failed on the node running the task
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
index 6b698aa..d115b14 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -60,6 +61,6 @@ public class TaskHeartbeatHandler extends HeartbeatHandlerBase<TezTaskAttemptID>
protected void handleTimeOut(TezTaskAttemptID attemptId) {
eventHandler.handle(new TaskAttemptEventAttemptFailed(attemptId,
TaskAttemptEventType.TA_TIMED_OUT, "AttemptID:" + attemptId.toString()
- + " Timed out after " + timeOut / 1000 + " secs"));
+ + " Timed out after " + timeOut / 1000 + " secs", TaskAttemptTerminationCause.TASK_HEARTBEAT_ERROR));
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index f30fc5c..246324d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -29,6 +29,7 @@ import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.oldrecords.TaskAttemptReport;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -69,6 +70,7 @@ public interface TaskAttempt {
TaskAttemptReport getReport();
List<String> getDiagnostics();
+ TaskAttemptTerminationCause getTerminationCause();
TezCounters getCounters();
float getProgress();
TaskAttemptState getState();
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
index 5c7b956..b9c1d09 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
@@ -18,16 +18,19 @@
package org.apache.tez.dag.app.dag.event;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
public class TaskAttemptEventAttemptFailed extends TaskAttemptEvent
- implements DiagnosableEvent {
+ implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
private final String diagnostics;
+ private final TaskAttemptTerminationCause errorCause;
public TaskAttemptEventAttemptFailed(TezTaskAttemptID id,
- TaskAttemptEventType type, String diagnostics) {
+ TaskAttemptEventType type, String diagnostics, TaskAttemptTerminationCause errorCause) {
super(id, type);
this.diagnostics = diagnostics;
+ this.errorCause = errorCause;
}
@Override
@@ -35,5 +38,9 @@ public class TaskAttemptEventAttemptFailed extends TaskAttemptEvent
return diagnostics;
}
-
+ @Override
+ public TaskAttemptTerminationCause getTerminationCause() {
+ return errorCause;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
index 87aa313..5dd0141 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
@@ -17,20 +17,29 @@
package org.apache.tez.dag.app.dag.event;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
public class TaskAttemptEventContainerTerminated extends TaskAttemptEvent
- implements DiagnosableEvent {
+ implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
private final String message;
+ private final TaskAttemptTerminationCause errorCause;
- public TaskAttemptEventContainerTerminated(TezTaskAttemptID id, String message) {
+ public TaskAttemptEventContainerTerminated(TezTaskAttemptID id, String message,
+ TaskAttemptTerminationCause errCause) {
super(id, TaskAttemptEventType.TA_CONTAINER_TERMINATED);
this.message = message;
+ this.errorCause = errCause;
}
@Override
public String getDiagnosticInfo() {
return message;
}
+
+ @Override
+ public TaskAttemptTerminationCause getTerminationCause() {
+ return errorCause;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
index a92aafd..a3c57e4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
@@ -18,19 +18,28 @@
package org.apache.tez.dag.app.dag.event;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
public class TaskAttemptEventContainerTerminatedBySystem extends TaskAttemptEvent
- implements DiagnosableEvent {
+ implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
private final String diagnostics;
- public TaskAttemptEventContainerTerminatedBySystem(TezTaskAttemptID id, String diagnostics) {
+ private final TaskAttemptTerminationCause errorCause;
+ public TaskAttemptEventContainerTerminatedBySystem(TezTaskAttemptID id, String diagnostics,
+ TaskAttemptTerminationCause errorCause) {
super(id, TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
this.diagnostics = diagnostics;
+ this.errorCause = errorCause;
}
@Override
public String getDiagnosticInfo() {
return diagnostics;
}
+
+ @Override
+ public TaskAttemptTerminationCause getTerminationCause() {
+ return errorCause;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java
index 7da6e14..02d04a5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java
@@ -17,17 +17,20 @@
package org.apache.tez.dag.app.dag.event;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
public class TaskAttemptEventContainerTerminating extends TaskAttemptEvent
- implements DiagnosableEvent {
+ implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
private final String message;
+ private final TaskAttemptTerminationCause errorCause;
public TaskAttemptEventContainerTerminating(TezTaskAttemptID id,
- String diagMessage) {
+ String diagMessage, TaskAttemptTerminationCause errCause) {
super(id, TaskAttemptEventType.TA_CONTAINER_TERMINATING);
this.message = diagMessage;
+ this.errorCause = errCause;
}
@Override
@@ -35,4 +38,9 @@ public class TaskAttemptEventContainerTerminating extends TaskAttemptEvent
return this.message;
}
+ @Override
+ public TaskAttemptTerminationCause getTerminationCause() {
+ return errorCause;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
index 9bceb1d..985becd 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
@@ -17,19 +17,28 @@
*/
package org.apache.tez.dag.app.dag.event;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
-public class TaskAttemptEventKillRequest extends TaskAttemptEvent {
+public class TaskAttemptEventKillRequest extends TaskAttemptEvent
+ implements TaskAttemptEventTerminationCauseEvent {
private final String message;
+ private final TaskAttemptTerminationCause errorCause;
- public TaskAttemptEventKillRequest(TezTaskAttemptID id, String message) {
+ public TaskAttemptEventKillRequest(TezTaskAttemptID id, String message, TaskAttemptTerminationCause err) {
super(id, TaskAttemptEventType.TA_KILL_REQUEST);
this.message = message;
+ this.errorCause = err;
}
public String getMessage() {
return this.message;
}
+ @Override
+ public TaskAttemptTerminationCause getTerminationCause() {
+ return errorCause;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java
index 6d97466..541ef00 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java
@@ -17,17 +17,20 @@
package org.apache.tez.dag.app.dag.event;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
public class TaskAttemptEventNodeFailed extends TaskAttemptEvent
- implements DiagnosableEvent{
+ implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
private final String message;
+ private final TaskAttemptTerminationCause errorCause;
public TaskAttemptEventNodeFailed(TezTaskAttemptID id,
- String diagMessage) {
+ String diagMessage, TaskAttemptTerminationCause errorCause) {
super(id, TaskAttemptEventType.TA_NODE_FAILED);
this.message = diagMessage;
+ this.errorCause = errorCause;
}
@Override
@@ -35,4 +38,9 @@ public class TaskAttemptEventNodeFailed extends TaskAttemptEvent
return this.message;
}
+ @Override
+ public TaskAttemptTerminationCause getTerminationCause() {
+ return errorCause;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java
index 678e1e7..6bc110a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java
@@ -18,10 +18,12 @@
package org.apache.tez.dag.app.dag.event;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.impl.TezEvent;
-public class TaskAttemptEventOutputFailed extends TaskAttemptEvent {
+public class TaskAttemptEventOutputFailed extends TaskAttemptEvent
+ implements TaskAttemptEventTerminationCauseEvent {
private TezEvent inputFailedEvent;
private int consumerTaskNumber;
@@ -40,5 +42,10 @@ public class TaskAttemptEventOutputFailed extends TaskAttemptEvent {
public int getConsumerTaskNumber() {
return consumerTaskNumber;
}
+
+ @Override
+ public TaskAttemptTerminationCause getTerminationCause() {
+ return TaskAttemptTerminationCause.OUTPUT_LOST;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventTerminationCauseEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventTerminationCauseEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventTerminationCauseEvent.java
new file mode 100644
index 0000000..70c20e3
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventTerminationCauseEvent.java
@@ -0,0 +1,26 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
+
+public interface TaskAttemptEventTerminationCauseEvent {
+
+ public TaskAttemptTerminationCause getTerminationCause(); // termination cause carried by the implementing event
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
index e7db8d1..cae0b7d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
@@ -29,9 +29,8 @@ public enum TaskAttemptEventType {
//Producer: TaskAttemptListener
TA_STARTED_REMOTELY,
TA_STATUS_UPDATE,
- TA_DIAGNOSTICS_UPDATE,
TA_OUTPUT_CONSUMABLE, // TODO History event to indicate this ?
- TA_COMMIT_PENDING,
+ TA_DIAGNOSTICS_UPDATE, // REMOVE THIS - UNUSED
TA_DONE,
TA_FAILED,
TA_TIMED_OUT,
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
index 73d5744..d48a0bf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
@@ -18,22 +18,23 @@
package org.apache.tez.dag.app.dag.event;
-import org.apache.tez.dag.app.dag.TaskTerminationCause;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskID;
-public class TaskEventTermination extends TaskEvent implements DiagnosableEvent{
+public class TaskEventTermination extends TaskEvent implements DiagnosableEvent,
+ TaskAttemptEventTerminationCauseEvent {
- private TaskTerminationCause terminationCause;
- private String diagnostics;
+ private final String diagnostics;
+ private final TaskAttemptTerminationCause errorCause;
- public TaskEventTermination(TezTaskID taskID, TaskTerminationCause terminationCause) {
+ public TaskEventTermination(TezTaskID taskID, TaskAttemptTerminationCause errorCause, String diagnostics) {
super(taskID, TaskEventType.T_TERMINATE);
- this.terminationCause = terminationCause;
- this.diagnostics = "Task is terminated due to:" + terminationCause.name();
- }
-
- public TaskTerminationCause getTerminationCause() {
- return terminationCause;
+ this.errorCause = errorCause;
+ if (diagnostics != null) {
+ this.diagnostics = diagnostics;
+ } else {
+ this.diagnostics = "Task is terminated due to: " + errorCause.name();
+ }
}
@Override
@@ -41,4 +42,9 @@ public class TaskEventTermination extends TaskEvent implements DiagnosableEvent{
return diagnostics;
}
+ @Override
+ public TaskAttemptTerminationCause getTerminationCause() {
+ return errorCause;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 1df07f8..f1bfefe 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -76,6 +76,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventTerminationCauseEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
@@ -90,6 +91,7 @@ import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -124,6 +126,7 @@ public class TaskAttemptImpl implements TaskAttempt,
protected EventHandler eventHandler;
private final TezTaskAttemptID attemptId;
private final Clock clock;
+ private TaskAttemptTerminationCause terminationCause = TaskAttemptTerminationCause.UNKNOWN_ERROR;
private final List<String> diagnostics = new ArrayList<String>();
private final Lock readLock;
private final Lock writeLock;
@@ -348,7 +351,6 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
TaskAttemptEventType.TA_STATUS_UPDATE,
TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
- TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_KILL_REQUEST,
@@ -371,7 +373,6 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
TaskAttemptEventType.TA_STATUS_UPDATE,
TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
- TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_KILL_REQUEST,
@@ -391,7 +392,6 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
TaskAttemptEventType.TA_STATUS_UPDATE,
TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
- TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_KILL_REQUEST,
@@ -412,7 +412,6 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
TaskAttemptEventType.TA_STATUS_UPDATE,
TaskAttemptEventType.TA_OUTPUT_CONSUMABLE,
- TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_KILL_REQUEST,
@@ -569,6 +568,11 @@ public class TaskAttemptImpl implements TaskAttempt,
readLock.unlock();
}
}
+
+ @Override
+ public TaskAttemptTerminationCause getTerminationCause() {
+ return terminationCause;
+ }
@Override
public TezCounters getCounters() {
@@ -815,6 +819,8 @@ public class TaskAttemptImpl implements TaskAttempt,
this.reportedStatus.counters = tEvent.getCounters();
this.reportedStatus.progress = 1f;
this.reportedStatus.state = tEvent.getState();
+ this.terminationCause = tEvent.getTaskAttemptError() != null ? tEvent.getTaskAttemptError()
+ : TaskAttemptTerminationCause.UNKNOWN_ERROR;
this.diagnostics.add(tEvent.getDiagnostics());
this.recoveredState = tEvent.getState();
sendEvent(createDAGCounterUpdateEventTAFinished(this, tEvent.getState()));
@@ -1032,8 +1038,8 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
attemptId, getTask().getVertex().getName(), getLaunchTime(),
- getFinishTime(), TaskAttemptState.SUCCEEDED, "",
- getCounters());
+ getFinishTime(), TaskAttemptState.SUCCEEDED, null,
+ "", getCounters());
// FIXME how do we store information regd completion events
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(getDAGID(), finishEvt));
@@ -1048,9 +1054,9 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
attemptId, getTask().getVertex().getName(), getLaunchTime(),
finishTime, state,
+ terminationCause,
StringUtils.join(
- getDiagnostics(), LINE_SEPARATOR),
- getCounters());
+ getDiagnostics(), LINE_SEPARATOR), getCounters());
// FIXME how do we store information regd completion events
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(getDAGID(), finishEvt));
@@ -1080,7 +1086,8 @@ public class TaskAttemptImpl implements TaskAttempt,
LOG.error(msg, e);
String diag = msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause());
new TerminatedBeforeRunningTransition(FAILED_HELPER).transition(ta,
- new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, diag));
+ new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, diag,
+ TaskAttemptTerminationCause.APPLICATION_ERROR));
return TaskAttemptStateInternal.FAILED;
}
// Create startTaskRequest
@@ -1162,6 +1169,13 @@ public class TaskAttemptImpl implements TaskAttempt,
if (event instanceof DiagnosableEvent) {
ta.addDiagnosticInfo(((DiagnosableEvent) event).getDiagnosticInfo());
}
+
+ // this should catch at test time if any new events are missing the error cause
+ assert event instanceof TaskAttemptEventTerminationCauseEvent;
+
+ if (event instanceof TaskAttemptEventTerminationCauseEvent) {
+ ta.trySetTerminationCause(((TaskAttemptEventTerminationCauseEvent) event).getTerminationCause());
+ }
ta.sendEvent(createDAGCounterUpdateEventTAFinished(ta,
helper.getTaskAttemptState()));
@@ -1558,6 +1572,13 @@ public class TaskAttemptImpl implements TaskAttempt,
sendEvent(new VertexEventRouteEvent(vertex.getVertexId(), tezIfEvents));
}
}
+
+ private void trySetTerminationCause(TaskAttemptTerminationCause err) {
+ // keep only the first error cause
+ if (terminationCause == TaskAttemptTerminationCause.UNKNOWN_ERROR) {
+ terminationCause = err;
+ }
+ }
private void initTaskAttemptStatus(TaskAttemptStatus result) {
result.progress = 0.0f;
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index b6c7eb9..87ee55a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -33,6 +33,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Maps;
+
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -85,6 +86,7 @@ import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.events.TaskFinishedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
@@ -92,6 +94,7 @@ import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.impl.TezEvent;
import com.google.common.annotations.VisibleForTesting;
+
import org.apache.tez.state.OnStateChangedCallback;
import org.apache.tez.state.StateMachineTez;
@@ -742,7 +745,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
if (state != TaskState.RUNNING) {
LOG.info("Task not running. Issuing kill to bad commit attempt " + taskAttemptID);
eventHandler.handle(new TaskAttemptEventKillRequest(taskAttemptID
- , "Task not running. Bad attempt."));
+ , "Task not running. Bad attempt.", TaskAttemptTerminationCause.TERMINATED_ORPHANED));
return false;
}
if (commitAttempt == null) {
@@ -1098,7 +1101,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
+ task.outputConsumableAttempt + " already has output ready");
}
task.eventHandler.handle(new TaskAttemptEventKillRequest(attemptId,
- "Alternate attemptId already serving output"));
+ "Alternate attemptId already serving output", TaskAttemptTerminationCause.UNKNOWN_ERROR));
}
}
@@ -1126,6 +1129,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
task.taskId, TaskState.SUCCEEDED));
LOG.info("Task succeeded with attempt " + task.successfulAttempt);
task.logJobHistoryTaskFinishedEvent();
+ TaskAttempt successfulAttempt = task.attempts.get(successTaId);
// issue kill to all other attempts
for (TaskAttempt attempt : task.attempts.values()) {
@@ -1134,9 +1138,21 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
// TA_KILL message to an attempt that doesn't need one for
// other reasons.
!attempt.isFinished()) {
- LOG.info("Issuing kill to other attempt " + attempt.getID());
+ LOG.info("Issuing kill to other attempt " + attempt.getID() + " as attempt: " +
+ task.successfulAttempt + " has succeeded");
+ String diagnostics = null;
+ TaskAttemptTerminationCause errCause = null;
+ if (attempt.getLaunchTime() < successfulAttempt.getLaunchTime()) {
+ diagnostics = "Killed this attempt as other speculative attempt : " + successTaId
+ + " succeeded";
+ errCause = TaskAttemptTerminationCause.TERMINATED_EFFECTIVE_SPECULATION;
+ } else {
+ diagnostics = "Killed this speculative attempt as original attempt: " + successTaId
+ + " succeeded";
+ errCause = TaskAttemptTerminationCause.TERMINATED_INEFFECTIVE_SPECULATION;
+ }
task.eventHandler.handle(new TaskAttemptEventKillRequest(attempt
- .getID(), "Alternate attempt succeeded"));
+ .getID(), diagnostics, errCause));
}
}
// send notification to DAG scheduler
@@ -1509,14 +1525,13 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
}
- private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg) {
+ private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg, TaskAttemptTerminationCause errorCause) {
if (commitAttempt != null && commitAttempt.equals(attempt)) {
LOG.info("Removing commit attempt: " + commitAttempt);
commitAttempt = null;
}
if (attempt != null && !attempt.isFinished()) {
- eventHandler.handle(new TaskAttemptEventKillRequest(attempt.getID(),
- logMsg));
+ eventHandler.handle(new TaskAttemptEventKillRequest(attempt.getID(), logMsg, errorCause));
}
}
@@ -1538,8 +1553,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
task.addDiagnosticInfo(terminateEvent.getDiagnosticInfo());
// issue kill to all non finished attempts
for (TaskAttempt attempt : task.attempts.values()) {
- task.killUnfinishedAttempt
- (attempt, "Task KILL is received. Killing attempt!");
+ task.killUnfinishedAttempt(attempt, "Task KILL is received. Killing attempt. Diagnostics: "
+ + terminateEvent.getDiagnosticInfo(), terminateEvent.getTerminationCause());
}
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index c27d869..e1f7a94 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -139,6 +139,7 @@ import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -1802,12 +1803,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
*/
void tryEnactKill(VertexTerminationCause trigger,
TaskTerminationCause taskterminationCause) {
+ // In most cases the dag is shutting down due to some error
+ TaskAttemptTerminationCause errCause = TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN;
+ if (taskterminationCause == TaskTerminationCause.DAG_KILL) {
+ errCause = TaskAttemptTerminationCause.TERMINATED_BY_CLIENT;
+ }
if(trySetTerminationCause(trigger)){
- LOG.info("Killing tasks in vertex: " + logIdentifier + " due to trigger: "
- + trigger);
+ String msg = "Killing tasks in vertex: " + logIdentifier + " due to trigger: " + trigger;
+ LOG.info(msg);
for (Task task : tasks.values()) {
- eventHandler.handle(
- new TaskEventTermination(task.getTaskId(), taskterminationCause));
+ eventHandler.handle( // attempt was terminated because the vertex is shutting down
+ new TaskEventTermination(task.getTaskId(), errCause, msg));
}
}
}
@@ -3840,12 +3846,32 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
case TASK_ATTEMPT_FAILED_EVENT:
{
checkEventSourceMetadata(vertex, sourceMeta);
+ TaskAttemptTerminationCause errCause = null;
+ switch (sourceMeta.getEventGenerator()) {
+ case INPUT:
+ errCause = TaskAttemptTerminationCause.INPUT_READ_ERROR;
+ break;
+ case PROCESSOR:
+ errCause = TaskAttemptTerminationCause.APPLICATION_ERROR;
+ break;
+ case OUTPUT:
+ errCause = TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR;
+ break;
+ case SYSTEM:
+ errCause = TaskAttemptTerminationCause.FRAMEWORK_ERROR;
+ break;
+ default:
+ throw new TezUncheckedException("Unknown EventProducerConsumerType: " +
+ sourceMeta.getEventGenerator());
+ }
TaskAttemptFailedEvent taskFailedEvent =
(TaskAttemptFailedEvent) tezEvent.getEvent();
vertex.getEventHandler().handle(
new TaskAttemptEventAttemptFailed(sourceMeta.getTaskAttemptID(),
TaskAttemptEventType.TA_FAILED,
- "Error: " + taskFailedEvent.getDiagnostics()));
+ "Error: " + taskFailedEvent.getDiagnostics(),
+ errCause)
+ );
}
break;
default:
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index fae5d48..c56c93b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -67,6 +67,7 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
import org.apache.tez.dag.app.rm.container.AMContainerEventType;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.runtime.task.TezChild;
@@ -273,14 +274,14 @@ public class LocalContainerLauncher extends AbstractService implements
LOG.info("Container: " + containerId + " completed successfully");
appContext.getEventHandler().handle(
new AMContainerEventCompleted(containerId, result.getExitStatus().getExitCode(),
- null));
+ null, TaskAttemptTerminationCause.CONTAINER_EXITED));
} else {
LOG.info("Container: " + containerId + " completed but with errors");
appContext.getEventHandler().handle(
new AMContainerEventCompleted(containerId, result.getExitStatus().getExitCode(),
result.getErrorMessage() == null ?
(result.getThrowable() == null ? null : result.getThrowable().getMessage()) :
- result.getErrorMessage()));
+ result.getErrorMessage(), TaskAttemptTerminationCause.APPLICATION_ERROR));
}
}
@@ -295,13 +296,13 @@ public class LocalContainerLauncher extends AbstractService implements
appContext.getEventHandler()
.handle(new AMContainerEventCompleted(containerId,
TezChild.ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE.getExitCode(),
- t.getMessage()));
+ t.getMessage(), TaskAttemptTerminationCause.APPLICATION_ERROR));
} else {
LOG.info("Ignoring CancellationException - triggered by LocalContainerLauncher");
appContext.getEventHandler()
.handle(new AMContainerEventCompleted(containerId,
TezChild.ContainerExecutionResult.ExitStatus.SUCCESS.getExitCode(),
- "CancellationException"));
+ "CancellationException", TaskAttemptTerminationCause.CONTAINER_EXITED));
}
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 0b202ab..9b02578 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -69,6 +69,7 @@ import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
import org.apache.tez.dag.app.rm.node.AMNodeEventStateChanged;
import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptSucceeded;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import com.google.common.base.Preconditions;
@@ -426,19 +427,22 @@ public class TaskSchedulerEventHandler extends AbstractService
// Inform the Containers about completion.
AMContainer amContainer = appContext.getAllContainers().get(containerStatus.getContainerId());
if (amContainer != null) {
- String message = null;
+ String message = "Container completed. ";
+ TaskAttemptTerminationCause errCause = TaskAttemptTerminationCause.CONTAINER_EXITED;
int exitStatus = containerStatus.getExitStatus();
if (exitStatus == ContainerExitStatus.PREEMPTED) {
message = "Container preempted externally. ";
+ errCause = TaskAttemptTerminationCause.EXTERNAL_PREEMPTION;
} else if (exitStatus == ContainerExitStatus.DISKS_FAILED) {
message = "Container disk failed. ";
- } else {
+ errCause = TaskAttemptTerminationCause.NODE_DISK_ERROR;
+ } else if (exitStatus != ContainerExitStatus.SUCCESS){
message = "Container failed. ";
}
if (containerStatus.getDiagnostics() != null) {
message += containerStatus.getDiagnostics();
}
- sendEvent(new AMContainerEventCompleted(amContainer.getContainerId(), exitStatus, message));
+ sendEvent(new AMContainerEventCompleted(amContainer.getContainerId(), exitStatus, message, errCause));
}
}
@@ -554,8 +558,8 @@ public class TaskSchedulerEventHandler extends AbstractService
public void preemptContainer(ContainerId containerId) {
taskScheduler.deallocateContainer(containerId);
// Inform the Containers about completion.
- sendEvent(new AMContainerEventCompleted(containerId,
- ContainerExitStatus.PREEMPTED, "Container preempted internally"));
+ sendEvent(new AMContainerEventCompleted(containerId, ContainerExitStatus.INVALID,
+ "Container preempted internally", TaskAttemptTerminationCause.INTERNAL_PREEMPTION));
}
public void setShouldUnregisterFlag() {
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
index e9649f3..a455f1e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
@@ -20,17 +20,20 @@ package org.apache.tez.dag.app.rm.container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
public class AMContainerEventCompleted extends AMContainerEvent {
private final int exitStatus;
private final String diagnostics;
+ private final TaskAttemptTerminationCause errCause;
public AMContainerEventCompleted(ContainerId containerId,
- int exitStatus, String diagnostics) {
+ int exitStatus, String diagnostics, TaskAttemptTerminationCause errCause) {
super(containerId, AMContainerEventType.C_COMPLETED);
this.exitStatus = exitStatus;
this.diagnostics = diagnostics;
+ this.errCause = errCause;
}
public boolean isPreempted() {
@@ -41,6 +44,10 @@ public class AMContainerEventCompleted extends AMContainerEvent {
return (exitStatus == ContainerExitStatus.DISKS_FAILED);
}
+ public boolean isClusterAction() {
+ return isPreempted() || isDiskFailed();
+ }
+
public String getDiagnostics() {
return diagnostics;
}
@@ -48,5 +55,9 @@ public class AMContainerEventCompleted extends AMContainerEvent {
public int getContainerExitStatus() {
return exitStatus;
}
+
+ public TaskAttemptTerminationCause getTerminationCause() {
+ return errCause;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index a0f9cb7..9d4f46b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -59,6 +59,7 @@ import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.history.events.ContainerStoppedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
//import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
@@ -533,7 +534,7 @@ public class AMContainerImpl implements AMContainer {
.getTaskAttemptId());
container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
"AMScheduler Error: TaskAttempt allocated to unlaunched container: " +
- container.getContainerId());
+ container.getContainerId(), TaskAttemptTerminationCause.FRAMEWORK_ERROR);
container.deAllocate();
LOG.warn("Unexpected TA Assignment: TAId: " + event.getTaskAttemptId() +
" for ContainerId: " + container.getContainerId() +
@@ -644,8 +645,10 @@ public class AMContainerImpl implements AMContainer {
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
if (container.pendingAttempt != null) {
AMContainerEventLaunchFailed event = (AMContainerEventLaunchFailed) cEvent;
+ // for a properly set up cluster this should almost always be an app error
+ // we still need to differentiate launch failures caused by the framework/cluster from those caused by the app
container.sendTerminatingToTaskAttempt(container.pendingAttempt,
- event.getMessage());
+ event.getMessage(), TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED);
}
container.unregisterFromTAListener();
container.deAllocate();
@@ -659,12 +662,17 @@ public class AMContainerImpl implements AMContainer {
AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
if (container.pendingAttempt != null) {
String errorMessage = getMessage(container, event);
- if (event.isPreempted() || event.isDiskFailed()) {
+ if (event.isClusterAction()) {
container.sendContainerTerminatedBySystemToTaskAttempt(container.pendingAttempt,
- errorMessage);
+ errorMessage, event.getTerminationCause());
} else {
- container.sendTerminatedToTaskAttempt(container.pendingAttempt,
- errorMessage);
+ container
+ .sendTerminatedToTaskAttempt(
+ container.pendingAttempt,
+ errorMessage,
+ // if the termination cause is the generic CONTAINER_EXITED, replace it with the more specific CONTAINER_LAUNCH_FAILED
+ (event.getTerminationCause() == TaskAttemptTerminationCause.CONTAINER_EXITED ?
+ TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED : event.getTerminationCause()));
}
container.registerFailedAttempt(container.pendingAttempt);
container.pendingAttempt = null;
@@ -696,7 +704,7 @@ public class AMContainerImpl implements AMContainer {
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
if (container.pendingAttempt != null) {
container.sendTerminatingToTaskAttempt(container.pendingAttempt,
- getMessage(container, cEvent));
+ getMessage(container, cEvent), TaskAttemptTerminationCause.CONTAINER_STOPPED);
}
container.unregisterFromTAListener();
container.logStopped(container.pendingAttempt == null ?
@@ -722,27 +730,31 @@ public class AMContainerImpl implements AMContainer {
return;
}
container.nodeFailed = true;
- String errorMessage = null;
+ String errorMessage = "Node " + container.getContainer().getNodeId() + " failed. ";
if (cEvent instanceof DiagnosableEvent) {
- errorMessage = ((DiagnosableEvent) cEvent).getDiagnosticInfo();
+ errorMessage += ((DiagnosableEvent) cEvent).getDiagnosticInfo();
}
for (TezTaskAttemptID taId : container.failedAssignments) {
- container.sendNodeFailureToTA(taId, errorMessage);
+ container.sendNodeFailureToTA(taId, errorMessage, TaskAttemptTerminationCause.NODE_FAILED);
}
for (TezTaskAttemptID taId : container.completedAttempts) {
- container.sendNodeFailureToTA(taId, errorMessage);
+ container.sendNodeFailureToTA(taId, errorMessage, TaskAttemptTerminationCause.NODE_FAILED);
}
if (container.pendingAttempt != null) {
// Will be null in COMPLETED state.
- container.sendNodeFailureToTA(container.pendingAttempt, errorMessage);
- container.sendTerminatingToTaskAttempt(container.pendingAttempt, "Node failure");
+ container.sendNodeFailureToTA(container.pendingAttempt, errorMessage,
+ TaskAttemptTerminationCause.NODE_FAILED);
+ container.sendTerminatingToTaskAttempt(container.pendingAttempt, errorMessage,
+ TaskAttemptTerminationCause.NODE_FAILED);
}
if (container.runningAttempt != null) {
// Will be null in COMPLETED state.
- container.sendNodeFailureToTA(container.runningAttempt, errorMessage);
- container.sendTerminatingToTaskAttempt(container.runningAttempt, "Node failure");
+ container.sendNodeFailureToTA(container.runningAttempt, errorMessage,
+ TaskAttemptTerminationCause.NODE_FAILED);
+ container.sendTerminatingToTaskAttempt(container.runningAttempt, errorMessage,
+ TaskAttemptTerminationCause.NODE_FAILED);
}
container.logStopped(ContainerExitStatus.ABORTED);
}
@@ -767,7 +779,7 @@ public class AMContainerImpl implements AMContainer {
container.sendTerminatingToTaskAttempt(container.pendingAttempt,
"Container " + container.getContainerId() +
" hit an invalid transition - " + cEvent.getType() + " at " +
- container.getState());
+ container.getState(), TaskAttemptTerminationCause.FRAMEWORK_ERROR);
}
container.logStopped(ContainerExitStatus.ABORTED);
container.sendStopRequestToNM();
@@ -909,12 +921,12 @@ public class AMContainerImpl implements AMContainer {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
- if (event.isPreempted() || event.isDiskFailed()) {
+ if (event.isClusterAction()) {
container.sendContainerTerminatedBySystemToTaskAttempt(container.runningAttempt,
- getMessage(container, event));
+ getMessage(container, event), event.getTerminationCause());
} else {
container.sendTerminatedToTaskAttempt(container.runningAttempt,
- getMessage(container, event));
+ getMessage(container, event), event.getTerminationCause());
}
container.unregisterAttemptFromListener(container.runningAttempt);
container.registerFailedAttempt(container.runningAttempt);
@@ -929,8 +941,8 @@ public class AMContainerImpl implements AMContainer {
container.unregisterAttemptFromListener(container.runningAttempt);
container.sendTerminatingToTaskAttempt(container.runningAttempt,
- " Container" + container.getContainerId() +
- " received a STOP_REQUEST");
+ " Container" + container.getContainerId() + " received a STOP_REQUEST",
+ TaskAttemptTerminationCause.CONTAINER_STOPPED);
super.transition(container, cEvent);
}
}
@@ -964,7 +976,7 @@ public class AMContainerImpl implements AMContainer {
container.sendTerminatingToTaskAttempt(container.runningAttempt,
"Container " + container.getContainerId() +
" hit an invalid transition - " + cEvent.getType() + " at " +
- container.getState());
+ container.getState(), TaskAttemptTerminationCause.FRAMEWORK_ERROR);
}
}
@@ -978,7 +990,8 @@ public class AMContainerImpl implements AMContainer {
" cannot be allocated to container: " + container.getContainerId() +
" in " + container.getState() + " state";
container.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId());
- container.sendTerminatingToTaskAttempt(event.getTaskAttemptId(), errorMessage);
+ container.sendTerminatingToTaskAttempt(event.getTaskAttemptId(), errorMessage,
+ TaskAttemptTerminationCause.CONTAINER_EXITED);
container.registerFailedAttempt(event.getTaskAttemptId());
}
}
@@ -1001,15 +1014,18 @@ public class AMContainerImpl implements AMContainer {
AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
String diag = event.getDiagnostics();
for (TezTaskAttemptID taId : container.failedAssignments) {
- container.sendTerminatedToTaskAttempt(taId, diag);
+ container.sendTerminatedToTaskAttempt(taId, diag,
+ TaskAttemptTerminationCause.CONTAINER_EXITED);
}
if (container.pendingAttempt != null) {
- container.sendTerminatedToTaskAttempt(container.pendingAttempt, diag);
+ container.sendTerminatedToTaskAttempt(container.pendingAttempt, diag,
+ TaskAttemptTerminationCause.CONTAINER_EXITED);
container.registerFailedAttempt(container.pendingAttempt);
container.pendingAttempt = null;
}
if (container.runningAttempt != null) {
- container.sendTerminatedToTaskAttempt(container.runningAttempt, diag);
+ container.sendTerminatedToTaskAttempt(container.runningAttempt, diag,
+ TaskAttemptTerminationCause.CONTAINER_EXITED);
container.registerFailedAttempt(container.runningAttempt);
container.runningAttempt = null;
}
@@ -1078,12 +1094,11 @@ public class AMContainerImpl implements AMContainer {
+ " in COMPLETED state";
container.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId());
container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
- errorMessage);
+ errorMessage, TaskAttemptTerminationCause.FRAMEWORK_ERROR);
container.registerFailedAttempt(event.getTaskAttemptId());
}
}
-
private void handleExtraTAAssign(
AMContainerEventAssignTA event, TezTaskAttemptID currentTaId) {
this.inError = true;
@@ -1092,8 +1107,10 @@ public class AMContainerImpl implements AMContainer {
". Attempts: " + currentTaId + ", " + event.getTaskAttemptId() +
". Current state: " + this.getState();
this.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId());
- this.sendTerminatingToTaskAttempt(event.getTaskAttemptId(), errorMessage);
- this.sendTerminatingToTaskAttempt(currentTaId, errorMessage);
+ this.sendTerminatingToTaskAttempt(event.getTaskAttemptId(), errorMessage,
+ TaskAttemptTerminationCause.FRAMEWORK_ERROR);
+ this.sendTerminatingToTaskAttempt(currentTaId, errorMessage,
+ TaskAttemptTerminationCause.FRAMEWORK_ERROR);
this.registerFailedAttempt(event.getTaskAttemptId());
LOG.warn(errorMessage);
this.logStopped(ContainerExitStatus.INVALID);
@@ -1122,28 +1139,29 @@ public class AMContainerImpl implements AMContainer {
}
protected void sendTerminatedToTaskAttempt(
- TezTaskAttemptID taId, String message) {
- sendEvent(new TaskAttemptEventContainerTerminated(taId, message));
+ TezTaskAttemptID taId, String message, TaskAttemptTerminationCause errCause) {
+ sendEvent(new TaskAttemptEventContainerTerminated(taId, message, errCause));
}
protected void sendContainerTerminatedBySystemToTaskAttempt(
- TezTaskAttemptID taId, String message) {
- sendEvent(new TaskAttemptEventContainerTerminatedBySystem(taId, message));
+ TezTaskAttemptID taId, String message, TaskAttemptTerminationCause errorCause) {
+ sendEvent(new TaskAttemptEventContainerTerminatedBySystem(taId, message, errorCause));
}
protected void sendTerminatingToTaskAttempt(TezTaskAttemptID taId,
- String message) {
- sendEvent(new TaskAttemptEventContainerTerminating(taId, message));
+ String message, TaskAttemptTerminationCause errorCause) {
+ sendEvent(new TaskAttemptEventContainerTerminating(taId, message, errorCause));
}
protected void maybeSendNodeFailureForFailedAssignment(TezTaskAttemptID taId) {
if (this.nodeFailed) {
- this.sendNodeFailureToTA(taId, "Node Failed");
+ this.sendNodeFailureToTA(taId, "Node Failed", TaskAttemptTerminationCause.NODE_FAILED);
}
}
- protected void sendNodeFailureToTA(TezTaskAttemptID taId, String message) {
- sendEvent(new TaskAttemptEventNodeFailed(taId, message));
+ protected void sendNodeFailureToTA(TezTaskAttemptID taId, String message,
+ TaskAttemptTerminationCause errorCause) {
+ sendEvent(new TaskAttemptEventNodeFailed(taId, message, errorCause));
}
protected void sendStartRequestToNM(ContainerLaunchContext clc) {
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index 0ae8061..2b21c89 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -29,6 +29,7 @@ import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskAttemptFinishedProto;
@@ -43,21 +44,23 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
private TaskAttemptState state;
private String diagnostics;
private TezCounters tezCounters;
+ private TaskAttemptTerminationCause error;
public TaskAttemptFinishedEvent(TezTaskAttemptID taId,
String vertexName,
long startTime,
long finishTime,
TaskAttemptState state,
- String diagnostics,
- TezCounters counters) {
+ TaskAttemptTerminationCause error,
+ String diagnostics, TezCounters counters) {
this.taskAttemptId = taId;
this.vertexName = vertexName;
this.startTime = startTime;
this.finishTime = finishTime;
this.state = state;
this.diagnostics = diagnostics;
- tezCounters = counters;
+ this.tezCounters = counters;
+ this.error = error;
}
public TaskAttemptFinishedEvent() {
@@ -87,6 +90,9 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
if (diagnostics != null) {
builder.setDiagnostics(diagnostics);
}
+ if (error != null) {
+ builder.setErrorEnum(error.name());
+ }
if (tezCounters != null) {
builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
}
@@ -100,6 +106,9 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
if (proto.hasDiagnostics()) {
this.diagnostics = proto.getDiagnostics();
}
+ if (proto.hasErrorEnum()) {
+ this.error = TaskAttemptTerminationCause.valueOf(proto.getErrorEnum());
+ }
if (proto.hasCounters()) {
this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
proto.getCounters());
@@ -129,6 +138,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
+ ", finishTime=" + finishTime
+ ", timeTaken=" + (finishTime - startTime)
+ ", status=" + state.name()
+ + ", errorEnum=" + (error != null ? error.name() : "")
+ ", diagnostics=" + diagnostics
+ ", counters=" + (tezCounters == null ? "null" :
tezCounters.toString()
@@ -146,6 +156,10 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
public String getDiagnostics() {
return diagnostics;
}
+
+ public TaskAttemptTerminationCause getTaskAttemptError() {
+ return error;
+ }
public long getFinishTime() {
return finishTime;
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index 8560359..79a0c34 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -18,11 +18,9 @@
package org.apache.tez.dag.history.logging.impl;
-import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.tez.common.ATSConstants;
@@ -485,6 +483,9 @@ public class HistoryEventJsonConversion {
otherInfo.put(ATSConstants.FINISH_TIME, event.getFinishTime());
otherInfo.put(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
otherInfo.put(ATSConstants.STATUS, event.getState().name());
+ if (event.getTaskAttemptError() != null) {
+ otherInfo.put(ATSConstants.TASK_ATTEMPT_ERROR_ENUM, event.getTaskAttemptError().name());
+ }
otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
otherInfo.put(ATSConstants.COUNTERS,
DAGUtils.convertCountersToJSON(event.getCounters()));
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index 93f217f..a02268c 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -165,6 +165,7 @@ message TaskAttemptFinishedProto {
optional int32 state = 3;
optional string diagnostics = 4;
optional TezCountersProto counters = 5;
+ optional string error_enum = 6;
}
message EventMetaDataProto {
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
index 0958c48..d1bef18 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
@@ -41,6 +41,7 @@ import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
@@ -200,6 +201,7 @@ public class TestPreemption {
TezTaskAttemptID testTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), i);
TaskAttemptImpl taImpl = dagImpl.getTaskAttempt(testTaId);
Assert.assertEquals(TaskAttemptStateInternal.KILLED, taImpl.getInternalState());
+ Assert.assertEquals(TaskAttemptTerminationCause.EXTERNAL_PREEMPTION, taImpl.getTerminationCause());
}
System.out.println("TestPreemption - Done running - " + info);
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index bba4edb..788b59b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -87,6 +87,7 @@ import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -300,9 +301,11 @@ public class TestTaskAttempt {
taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
// At state STARTING.
- taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null));
+ taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null,
+ TaskAttemptTerminationCause.TERMINATED_BY_CLIENT));
// At some KILLING state.
- taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null));
+ taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null,
+ TaskAttemptTerminationCause.TERMINATED_BY_CLIENT));
// taImpl.handle(new TaskAttemptEventContainerTerminating(taskAttemptID,
// null));
assertFalse(eventHandler.internalError);
@@ -366,7 +369,7 @@ public class TestTaskAttempt {
verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
taImpl.handle(new TaskAttemptEventContainerTerminating(taskAttemptID,
- "Terminating"));
+ "Terminating", TaskAttemptTerminationCause.APPLICATION_ERROR));
assertFalse(
"InternalError occurred trying to handle TA_CONTAINER_TERMINATING",
eventHandler.internalError);
@@ -376,6 +379,7 @@ public class TestTaskAttempt {
assertEquals(1, taImpl.getDiagnostics().size());
assertEquals("Terminating", taImpl.getDiagnostics().get(0));
+ assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, taImpl.getTerminationCause());
int expectedEvenstAfterTerminating = expectedEventsAtRunning + 3;
arg = ArgumentCaptor.forClass(Event.class);
@@ -392,13 +396,16 @@ public class TestTaskAttempt {
expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
- "Terminated"));
+ "Terminated", TaskAttemptTerminationCause.CONTAINER_EXITED));
int expectedEventAfterTerminated = expectedEvenstAfterTerminating + 0;
arg = ArgumentCaptor.forClass(Event.class);
verify(eventHandler, times(expectedEventAfterTerminated)).handle(arg.capture());
assertEquals(2, taImpl.getDiagnostics().size());
assertEquals("Terminated", taImpl.getDiagnostics().get(1));
+
+ // check that original error cause is retained
+ assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, taImpl.getTerminationCause());
}
@@ -452,13 +459,14 @@ public class TestTaskAttempt {
null));
assertEquals("Task attempt is not in running state", taImpl.getState(),
TaskAttemptState.RUNNING);
- taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, "Terminated"));
+ taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, "Terminated",
+ TaskAttemptTerminationCause.CONTAINER_EXITED));
assertFalse(
"InternalError occurred trying to handle TA_CONTAINER_TERMINATED",
eventHandler.internalError);
assertEquals("Terminated", taImpl.getDiagnostics().get(0));
-
+ assertEquals(TaskAttemptTerminationCause.CONTAINER_EXITED, taImpl.getTerminationCause());
// TODO Ensure TA_TERMINATING after this is ingored.
}
@@ -541,14 +549,15 @@ public class TestTaskAttempt {
expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
- "Terminated"));
+ "Terminated", TaskAttemptTerminationCause.CONTAINER_EXITED));
int expectedEventAfterTerminated = expectedEvenstAfterTerminating + 0;
arg = ArgumentCaptor.forClass(Event.class);
verify(eventHandler, times(expectedEventAfterTerminated)).handle(arg.capture());
// Verify that the diagnostic message included in the Terminated event is not
- // captured - TA already succeeded.
+ // captured - TA already succeeded. Error cause is the default value.
assertEquals(0, taImpl.getDiagnostics().size());
+ assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, taImpl.getTerminationCause());
}
@Test(timeout = 5000)
@@ -636,8 +645,9 @@ public class TestTaskAttempt {
verify(eventHandler, times(expectedEventAfterTerminated)).handle(arg.capture());
// Verify that the diagnostic message included in the Terminated event is not
- // captured - TA already succeeded.
+ // captured - TA already succeeded. Error cause should be the default value
assertEquals(0, taImpl.getDiagnostics().size());
+ assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, taImpl.getTerminationCause());
}
@Test(timeout = 5000)
@@ -718,7 +728,8 @@ public class TestTaskAttempt {
expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
// Send out a Node Failure.
- taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID, "NodeDecomissioned"));
+ taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID, "NodeDecomissioned",
+ TaskAttemptTerminationCause.NODE_FAILED));
// Verify in KILLED state
assertEquals("Task attempt is not in the KILLED state", TaskAttemptState.KILLED,
taImpl.getState());
@@ -734,6 +745,7 @@ public class TestTaskAttempt {
// Verify still in KILLED state
assertEquals("Task attempt is not in the KILLED state", TaskAttemptState.KILLED,
taImpl.getState());
+ assertEquals(TaskAttemptTerminationCause.NODE_FAILED, taImpl.getTerminationCause());
}
@Test(timeout = 5000)
@@ -814,7 +826,8 @@ public class TestTaskAttempt {
expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
// Send out a Node Failure.
- taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID, "NodeDecomissioned"));
+ taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID, "NodeDecomissioned",
+ TaskAttemptTerminationCause.NODE_FAILED));
// Verify no additional events
int expectedEventsNodeFailure = expectedEvenstAfterTerminating + 0;
@@ -824,6 +837,8 @@ public class TestTaskAttempt {
// Verify still in SUCCEEDED state
assertEquals("Task attempt is not in the SUCCEEDED state", TaskAttemptState.SUCCEEDED,
taImpl.getState());
+ // error cause remains as default value
+ assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, taImpl.getTerminationCause());
}
@Test//(timeout = 5000)
@@ -909,6 +924,8 @@ public class TestTaskAttempt {
taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 11));
assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
TaskAttemptState.SUCCEEDED);
+ // default value of error cause
+ assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, taImpl.getTerminationCause());
// different destination attempt reports error. now threshold crossed
TezTaskAttemptID mockDestId2 = mock(TezTaskAttemptID.class);
@@ -923,6 +940,7 @@ public class TestTaskAttempt {
finishEvent = (TaskAttemptFinishedEvent)histEvent.getHistoryEvent();
long newFinishTime = finishEvent.getFinishTime();
Assert.assertEquals(finishTime, newFinishTime);
+ assertEquals(TaskAttemptTerminationCause.OUTPUT_LOST, taImpl.getTerminationCause());
assertEquals(true, taImpl.inputFailedReported);
int expectedEventsAfterFetchFailure = expectedEventsTillSucceeded + 2;
@@ -1059,7 +1077,8 @@ public class TestTaskAttempt {
mockHeartbeatHandler, appCtx, locationHint, false,
resource, createFakeContainerContext(), true);
Assert.assertEquals(TaskAttemptStateInternal.NEW, taImpl.getInternalState());
- taImpl.handle(new TaskAttemptEventKillRequest(taImpl.getID(), "kill it"));
+ taImpl.handle(new TaskAttemptEventKillRequest(taImpl.getID(), "kill it",
+ TaskAttemptTerminationCause.TERMINATED_BY_CLIENT));
Assert.assertEquals(TaskAttemptStateInternal.KILLED, taImpl.getInternalState());
Assert.assertEquals(0, taImpl.taskAttemptStartedEventLogged);
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index 1391361..7c75a1d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -56,6 +56,7 @@ import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
@@ -147,10 +148,15 @@ public class TestTaskAttemptRecovery {
private void restoreFromTAFinishedEvent(TaskAttemptState state) {
String diag = "test_diag";
TezCounters counters = mock(TezCounters.class);
+
+ TaskAttemptTerminationCause errorEnum = null;
+ if (state != TaskAttemptState.SUCCEEDED) {
+ errorEnum = TaskAttemptTerminationCause.APPLICATION_ERROR;
+ }
TaskAttemptState recoveredState =
ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- startTime, finishTime, state, diag, counters));
+ startTime, finishTime, state, errorEnum, diag, counters));
assertEquals(startTime, ta.getLaunchTime());
assertEquals(finishTime, ta.getFinishTime());
assertEquals(counters, ta.reportedStatus.counters);
@@ -159,6 +165,11 @@ public class TestTaskAttemptRecovery {
assertEquals(1, ta.getDiagnostics().size());
assertEquals(diag, ta.getDiagnostics().get(0));
assertEquals(state, recoveredState);
+ if (state != TaskAttemptState.SUCCEEDED) {
+ assertEquals(errorEnum, ta.getTerminationCause());
+ } else {
+ assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, ta.getTerminationCause());
+ }
}
private void verifyEvents(List<Event> events, Class<? extends Event> eventClass,
@@ -278,7 +289,7 @@ public class TestTaskAttemptRecovery {
TaskAttemptState recoveredState =
ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
startTime, finishTime, TaskAttemptState.KILLED,
- "", new TezCounters()));
+ TaskAttemptTerminationCause.APPLICATION_ERROR, "", new TezCounters()));
assertEquals(TaskAttemptState.KILLED, recoveredState);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/84a7ef05/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index ab7c87b..0b93093 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -54,7 +54,6 @@ import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.TaskStateInternal;
-import org.apache.tez.dag.app.dag.TaskTerminationCause;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
import org.apache.tez.dag.app.dag.event.TaskEvent;
@@ -64,6 +63,7 @@ import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.rm.container.AMContainer;
import org.apache.tez.dag.app.rm.node.AMNodeEventType;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -173,13 +173,12 @@ public class TestTaskImpl {
}
private void killTask(TezTaskID taskId) {
- mockTask.handle(new TaskEventTermination(taskId, TaskTerminationCause.DAG_KILL));
+ mockTask.handle(new TaskEventTermination(taskId, TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN, null));
assertTaskKillWaitState();
}
private void failTask(TezTaskID taskId) {
- mockTask.handle(new TaskEventTermination(taskId,
- TaskTerminationCause.OWN_TASK_FAILURE));
+ mockTask.handle(new TaskEventTermination(taskId, TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN, null));
assertTaskKillWaitState();
}
@@ -626,9 +625,9 @@ public class TestTaskImpl {
@Test
public void testDiagnostics_KillNew(){
TezTaskID taskId = getNewTaskID();
- mockTask.handle(new TaskEventTermination(taskId, TaskTerminationCause.DAG_KILL));
+ mockTask.handle(new TaskEventTermination(taskId, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null));
assertEquals(1, mockTask.getDiagnostics().size());
- assertTrue(mockTask.getDiagnostics().get(0).contains(TaskTerminationCause.DAG_KILL.name()));
+ assertTrue(mockTask.getDiagnostics().get(0).contains(TaskAttemptTerminationCause.TERMINATED_BY_CLIENT.name()));
assertEquals(0, mockTask.taskStartedEventLogged);
assertEquals(1, mockTask.taskFinishedEventLogged);
}
@@ -637,9 +636,9 @@ public class TestTaskImpl {
public void testDiagnostics_Kill(){
TezTaskID taskId = getNewTaskID();
scheduleTaskAttempt(taskId);
- mockTask.handle(new TaskEventTermination(taskId, TaskTerminationCause.OTHER_TASK_FAILURE));
+ mockTask.handle(new TaskEventTermination(taskId, TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN, null));
assertEquals(1, mockTask.getDiagnostics().size());
- assertTrue(mockTask.getDiagnostics().get(0).contains(TaskTerminationCause.OTHER_TASK_FAILURE.name()));
+ assertTrue(mockTask.getDiagnostics().get(0).contains(TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN.name()));
}
// TODO Add test to validate the correct commit attempt.