You are viewing a plain-text version of this content; the link to its canonical version is not preserved in this plain-text rendering.
Posted to commits@tez.apache.org by bi...@apache.org on 2014/11/24 20:15:59 UTC
[1/2] tez git commit: TEZ-1773. Add attempt failure cause enum to the
attempt failed/killed history record (bikaS)
Repository: tez
Updated Branches:
refs/heads/master e8294b886 -> 81eef37d9
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
index afc3433..d953fef 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskRecovery.java
@@ -297,8 +297,8 @@ public class TestTaskRecovery {
long taFinishTime = taStartTime + 100L;
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "",
- new TezCounters()));
+ taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
+ "", new TezCounters()));
assertEquals(TaskState.SUCCEEDED, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -329,8 +329,8 @@ public class TestTaskRecovery {
long taFinishTime = taStartTime + 100L;
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.FAILED, "",
- new TezCounters()));
+ taStartTime, taFinishTime, TaskAttemptState.FAILED, null,
+ "", new TezCounters()));
assertEquals(TaskState.RUNNING, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -362,8 +362,8 @@ public class TestTaskRecovery {
long taFinishTime = taStartTime + 100L;
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.KILLED, "",
- new TezCounters()));
+ taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
+ "", new TezCounters()));
assertEquals(TaskState.RUNNING, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -397,8 +397,8 @@ public class TestTaskRecovery {
long taFinishTime = taStartTime + 100L;
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "",
- new TezCounters()));
+ taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
+ "", new TezCounters()));
assertEquals(TaskState.SUCCEEDED, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -439,8 +439,8 @@ public class TestTaskRecovery {
long taFinishTime = taStartTime + 100L;
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "",
- new TezCounters()));
+ taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
+ "", new TezCounters()));
assertEquals(TaskState.SUCCEEDED, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -451,8 +451,8 @@ public class TestTaskRecovery {
// it is possible for TaskAttempt transit from SUCCEEDED to FAILURE due to output failure.
recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.FAILED, "",
- new TezCounters()));
+ taStartTime, taFinishTime, TaskAttemptState.FAILED, null,
+ "", new TezCounters()));
assertEquals(TaskState.RUNNING, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -486,8 +486,8 @@ public class TestTaskRecovery {
long taFinishTime = taStartTime + 100L;
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "",
- new TezCounters()));
+ taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
+ "", new TezCounters()));
assertEquals(TaskState.SUCCEEDED, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -498,8 +498,8 @@ public class TestTaskRecovery {
// it is possible for TaskAttempt transit from SUCCEEDED to KILLED due to node failure.
recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.KILLED, "",
- new TezCounters()));
+ taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
+ "", new TezCounters()));
assertEquals(TaskState.RUNNING, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -537,8 +537,8 @@ public class TestTaskRecovery {
long taFinishTime = taStartTime + 100L;
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "",
- new TezCounters()));
+ taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
+ "", new TezCounters()));
assertEquals(TaskState.SUCCEEDED, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -577,8 +577,8 @@ public class TestTaskRecovery {
long taFinishTime = taStartTime + 100L;
TaskState recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, "",
- new TezCounters()));
+ taStartTime, taFinishTime, TaskAttemptState.SUCCEEDED, null,
+ "", new TezCounters()));
assertEquals(TaskState.SUCCEEDED, recoveredState);
assertEquals(1, task.getAttempts().size());
assertEquals(1, task.getFinishedAttemptsCount());
@@ -658,8 +658,8 @@ public class TestTaskRecovery {
long taFinishTime = taStartTime + 100L;
recoveredState =
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- taStartTime, taFinishTime, TaskAttemptState.KILLED, "",
- new TezCounters()));
+ taStartTime, taFinishTime, TaskAttemptState.KILLED, null,
+ "", new TezCounters()));
assertEquals(TaskState.RUNNING, recoveredState);
assertEquals(TaskAttemptStateInternal.NEW,
((TaskAttemptImpl) task.getAttempt(taId)).getInternalState());
@@ -700,7 +700,7 @@ public class TestTaskRecovery {
task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
mock(ContainerId.class), mock(NodeId.class), "", "", ""));
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
- 0, TaskAttemptState.KILLED, "", null));
+ 0, TaskAttemptState.KILLED, null, "", null));
}
assertEquals(maxFailedAttempts, task.getAttempts().size());
assertEquals(0, task.failedAttempts);
@@ -730,7 +730,7 @@ public class TestTaskRecovery {
task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
mock(ContainerId.class), mock(NodeId.class), "", "", ""));
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
- 0, TaskAttemptState.FAILED, "", null));
+ 0, TaskAttemptState.FAILED, null, "", null));
}
assertEquals(maxFailedAttempts, task.getAttempts().size());
assertEquals(maxFailedAttempts, task.failedAttempts);
@@ -760,7 +760,7 @@ public class TestTaskRecovery {
task.restoreFromEvent(new TaskAttemptStartedEvent(taId, vertexName, 0L,
mock(ContainerId.class), mock(NodeId.class), "", "", ""));
task.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName, 0,
- 0, TaskAttemptState.FAILED, "", null));
+ 0, TaskAttemptState.FAILED, null, "", null));
}
assertEquals(maxFailedAttempts - 1, task.getAttempts().size());
assertEquals(maxFailedAttempts - 1, task.failedAttempts);
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
index 9500c97..687908d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestVertexImpl.java
@@ -56,6 +56,9 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -100,12 +103,15 @@ import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
+import org.apache.tez.dag.app.ClusterInfo;
+import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.RootInputInitializerManager;
import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.Task;
+import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexState;
@@ -114,6 +120,8 @@ import org.apache.tez.dag.app.dag.VertexTerminationCause;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
@@ -131,9 +139,12 @@ import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
import org.apache.tez.dag.app.dag.impl.DAGImpl.VertexGroupInfo;
import org.apache.tez.dag.app.dag.impl.TestVertexImpl.VertexManagerWithException.VMExceptionLocation;
import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
+import org.apache.tez.dag.app.rm.container.AMContainerMap;
+import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -151,6 +162,7 @@ import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.runtime.api.events.InputUpdatePayloadEvent;
+import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.test.EdgeManagerForTest;
import org.apache.tez.test.VertexManagerPluginForTest;
@@ -2802,7 +2814,113 @@ public class TestVertexImpl {
Assert.assertEquals(0, committer.commitCounter);
Assert.assertEquals(1, committer.abortCounter);
}
+
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
+ public void testVertexTaskAttemptProcessorFailure() {
+ initAllVertices(VertexState.INITED);
+
+ VertexImpl v = vertices.get("vertex1");
+
+ startVertex(v);
+ TaskAttemptImpl ta = (TaskAttemptImpl) v.getTask(0).getAttempts().values().iterator().next();
+ ta.handle(new TaskAttemptEventSchedule(ta.getID(), 2, 2));
+
+ NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+ ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+ AMContainerMap containers = new AMContainerMap(
+ mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+ new ContainerContextMatcher(), appContext);
+ containers.addContainerIfNew(container);
+ doReturn(containers).when(appContext).getAllContainers();
+
+ ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));
+ Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState());
+
+ dispatcher.getEventHandler().handle(
+ new VertexEventRouteEvent(v.getVertexId(), Collections.singletonList(new TezEvent(
+ new TaskAttemptFailedEvent("Failed"), new EventMetaData(
+ EventProducerConsumerType.PROCESSOR, v.getName(), null, ta.getID())))));
+ dispatcher.await();
+ Assert.assertEquals(VertexState.RUNNING, v.getState());
+ Assert.assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, ta.getTerminationCause());
+ }
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
+ public void testVertexTaskAttemptInputFailure() {
+ initAllVertices(VertexState.INITED);
+
+ VertexImpl v = vertices.get("vertex1");
+
+ startVertex(v);
+ TaskAttemptImpl ta = (TaskAttemptImpl) v.getTask(0).getAttempts().values().iterator().next();
+ ta.handle(new TaskAttemptEventSchedule(ta.getID(), 2, 2));
+
+ NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+ ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+ AMContainerMap containers = new AMContainerMap(
+ mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+ new ContainerContextMatcher(), appContext);
+ containers.addContainerIfNew(container);
+ doReturn(containers).when(appContext).getAllContainers();
+
+ ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));
+ Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState());
+
+ dispatcher.getEventHandler().handle(
+ new VertexEventRouteEvent(v.getVertexId(), Collections.singletonList(new TezEvent(
+ new TaskAttemptFailedEvent("Failed"), new EventMetaData(
+ EventProducerConsumerType.INPUT, v.getName(), null, ta.getID())))));
+ dispatcher.await();
+ Assert.assertEquals(VertexState.RUNNING, v.getState());
+ Assert.assertEquals(TaskAttemptTerminationCause.INPUT_READ_ERROR, ta.getTerminationCause());
+ }
+
+
+ @SuppressWarnings("unchecked")
+ @Test(timeout = 5000)
+ public void testVertexTaskAttemptOutputFailure() {
+ initAllVertices(VertexState.INITED);
+
+ VertexImpl v = vertices.get("vertex1");
+
+ startVertex(v);
+ TaskAttemptImpl ta = (TaskAttemptImpl) v.getTask(0).getAttempts().values().iterator().next();
+ ta.handle(new TaskAttemptEventSchedule(ta.getID(), 2, 2));
+
+ NodeId nid = NodeId.newInstance("127.0.0.1", 0);
+ ContainerId contId = ContainerId.newInstance(appAttemptId, 3);
+ Container container = mock(Container.class);
+ when(container.getId()).thenReturn(contId);
+ when(container.getNodeId()).thenReturn(nid);
+ when(container.getNodeHttpAddress()).thenReturn("localhost:0");
+ AMContainerMap containers = new AMContainerMap(
+ mock(ContainerHeartbeatHandler.class), mock(TaskAttemptListener.class),
+ new ContainerContextMatcher(), appContext);
+ containers.addContainerIfNew(container);
+ doReturn(containers).when(appContext).getAllContainers();
+
+ ta.handle(new TaskAttemptEventStartedRemotely(ta.getID(), contId, null));
+ Assert.assertEquals(TaskAttemptStateInternal.RUNNING, ta.getInternalState());
+
+ dispatcher.getEventHandler().handle(
+ new VertexEventRouteEvent(v.getVertexId(), Collections.singletonList(new TezEvent(
+ new TaskAttemptFailedEvent("Failed"), new EventMetaData(
+ EventProducerConsumerType.OUTPUT, v.getName(), null, ta.getID())))));
+ dispatcher.await();
+ Assert.assertEquals(VertexState.RUNNING, v.getState());
+ Assert.assertEquals(TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR, ta.getTerminationCause());
+ }
+
@Test(timeout = 5000)
public void testSourceVertexStartHandling() {
LOG.info("Testing testSourceVertexStartHandling");
@@ -2819,21 +2937,6 @@ public class TestVertexImpl {
}
@Test(timeout = 5000)
- public void testCounters() {
- // FIXME need to test counters at vertex level
- }
-
- @Test(timeout = 5000)
- public void testDiagnostics() {
- // FIXME need to test diagnostics in various cases
- }
-
- @Test(timeout = 5000)
- public void testTaskAttemptCompletionEvents() {
- // FIXME need to test handling of task attempt events
- }
-
- @Test(timeout = 5000)
public void testSourceTaskAttemptCompletionEvents() {
LOG.info("Testing testSourceTaskAttemptCompletionEvents");
initAllVertices(VertexState.INITED);
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index 4ec1916..d2dece3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -49,6 +49,7 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
import org.apache.tez.dag.app.rm.container.AMContainerEventType;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.junit.Assert;
import org.junit.Before;
@@ -179,6 +180,8 @@ public class TestTaskSchedulerEventHandler {
Assert.assertEquals("Container preempted externally. Container preempted by RM.",
completedEvent.getDiagnostics());
Assert.assertTrue(completedEvent.isPreempted());
+ Assert.assertEquals(TaskAttemptTerminationCause.EXTERNAL_PREEMPTION,
+ completedEvent.getTerminationCause());
Assert.assertFalse(completedEvent.isDiskFailed());
schedulerHandler.stop();
@@ -186,6 +189,31 @@ public class TestTaskSchedulerEventHandler {
}
@Test (timeout = 5000)
+ public void testContainerInternalPreempted() throws IOException {
+ Configuration conf = new Configuration(false);
+ schedulerHandler.init(conf);
+ schedulerHandler.start();
+
+ ContainerId mockCId = mock(ContainerId.class);
+ verify(mockTaskScheduler, times(0)).deallocateContainer((ContainerId)any());
+ schedulerHandler.preemptContainer(mockCId);
+ verify(mockTaskScheduler, times(1)).deallocateContainer(mockCId);
+ Assert.assertEquals(1, mockEventHandler.events.size());
+ Event event = mockEventHandler.events.get(0);
+ Assert.assertEquals(AMContainerEventType.C_COMPLETED, event.getType());
+ AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event;
+ Assert.assertEquals(mockCId, completedEvent.getContainerId());
+ Assert.assertEquals("Container preempted internally", completedEvent.getDiagnostics());
+ Assert.assertFalse(completedEvent.isPreempted());
+ Assert.assertFalse(completedEvent.isDiskFailed());
+ Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION,
+ completedEvent.getTerminationCause());
+
+ schedulerHandler.stop();
+ schedulerHandler.close();
+ }
+
+ @Test (timeout = 5000)
public void testContainerDiskFailed() throws IOException {
Configuration conf = new Configuration(false);
schedulerHandler.init(conf);
@@ -211,6 +239,8 @@ public class TestTaskSchedulerEventHandler {
completedEvent.getDiagnostics());
Assert.assertFalse(completedEvent.isPreempted());
Assert.assertTrue(completedEvent.isDiskFailed());
+ Assert.assertEquals(TaskAttemptTerminationCause.NODE_DISK_ERROR,
+ completedEvent.getTerminationCause());
schedulerHandler.stop();
schedulerHandler.close();
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index c0be044..f273896 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -67,17 +67,21 @@ import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminatedBySystem;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.rm.AMSchedulerEventType;
import org.apache.tez.dag.app.rm.NMCommunicatorEventType;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventHandler;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.impl.TaskSpec;
+import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.invocation.InvocationOnMock;
@@ -87,9 +91,7 @@ import com.google.common.collect.Maps;
public class TestAMContainer {
-
-
- @Test
+ @Test (timeout=5000)
// Assign before launch.
public void tetSingleSuccessfulTaskFlow() {
WrappedContainer wc = new WrappedContainer();
@@ -135,7 +137,7 @@ public class TestAMContainer {
assertNull(wc.amContainer.getRunningTaskAttempt());
verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID);
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
@@ -146,7 +148,7 @@ public class TestAMContainer {
assertFalse(wc.amContainer.isInErrorState());
}
- @Test
+ @Test (timeout=5000)
// Assign after launch.
public void testSingleSuccessfulTaskFlow2() {
WrappedContainer wc = new WrappedContainer();
@@ -191,7 +193,7 @@ public class TestAMContainer {
assertNull(wc.amContainer.getRunningTaskAttempt());
verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID);
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
@@ -202,7 +204,7 @@ public class TestAMContainer {
assertFalse(wc.amContainer.isInErrorState());
}
- @Test
+ @Test (timeout=5000)
public void testSingleSuccessfulTaskFlowStopRequest() {
WrappedContainer wc = new WrappedContainer();
@@ -225,7 +227,7 @@ public class TestAMContainer {
wc.verifyState(AMContainerState.STOPPING);
wc.verifyNoOutgoingEvents();
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
@@ -238,7 +240,7 @@ public class TestAMContainer {
assertFalse(wc.amContainer.isInErrorState());
}
- @Test
+ @Test (timeout=5000)
public void testSingleSuccessfulTaskFlowFailedNMStopRequest() {
WrappedContainer wc = new WrappedContainer();
@@ -264,7 +266,7 @@ public class TestAMContainer {
assertTrue(wc.verifyCountAndGetOutgoingEvents(1).get(0).getType() ==
AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
@@ -278,7 +280,7 @@ public class TestAMContainer {
}
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
public void testMultipleAllocationsAtIdle() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -303,7 +305,7 @@ public class TestAMContainer {
assertTrue(wc.amContainer.isInErrorState());
wc.nmStopSent();
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyHistoryStopEvent();
// 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
@@ -317,7 +319,7 @@ public class TestAMContainer {
}
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
public void testAllocationAtRunning() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -343,7 +345,7 @@ public class TestAMContainer {
assertTrue(wc.amContainer.isInErrorState());
wc.nmStopSent();
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyHistoryStopEvent();
// 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
@@ -357,7 +359,7 @@ public class TestAMContainer {
}
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
public void testMultipleAllocationsAtLaunching() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -382,7 +384,7 @@ public class TestAMContainer {
assertTrue(wc.amContainer.isInErrorState());
wc.nmStopSent();
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyHistoryStopEvent();
// 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
@@ -396,7 +398,7 @@ public class TestAMContainer {
}
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
public void testContainerTimedOutAtRunning() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -418,7 +420,7 @@ public class TestAMContainer {
NMCommunicatorEventType.CONTAINER_STOP_REQUEST);
// TODO Should this be an RM DE-ALLOCATE instead ?
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyHistoryStopEvent();
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -432,7 +434,7 @@ public class TestAMContainer {
}
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
public void testLaunchFailure() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -449,22 +451,28 @@ public class TestAMContainer {
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
TaskAttemptEventType.TA_CONTAINER_TERMINATING,
AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
+ for (Event e : outgoingEvents) {
+ if (e.getType() == TaskAttemptEventType.TA_CONTAINER_TERMINATING) {
+ Assert.assertEquals(TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,
+ ((TaskAttemptEventContainerTerminating)e).getTerminationCause());
+ }
+ }
- wc.containerCompleted(false);
+ wc.containerCompleted();
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
TaskAttemptEventType.TA_CONTAINER_TERMINATED);
-
+
// Valid transition. Container complete, but not with an error.
assertFalse(wc.amContainer.isInErrorState());
}
- @Test
+ @Test (timeout=5000)
public void testContainerCompletedAtAllocated() {
WrappedContainer wc = new WrappedContainer();
wc.verifyState(AMContainerState.ALLOCATED);
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
@@ -472,7 +480,7 @@ public class TestAMContainer {
}
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
// Verify that incoming NM launched events to COMPLETED containers are
// handled.
public void testContainerCompletedAtLaunching() {
@@ -484,7 +492,7 @@ public class TestAMContainer {
wc.assignTaskAttempt(wc.taskAttemptID);
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID);
verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -492,6 +500,8 @@ public class TestAMContainer {
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+ Assert.assertEquals(TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED,
+ ((TaskAttemptEventContainerTerminated)outgoingEvents.get(0)).getTerminationCause());
assertFalse(wc.amContainer.isInErrorState());
@@ -501,9 +511,71 @@ public class TestAMContainer {
assertFalse(wc.amContainer.isInErrorState());
}
+
+ @SuppressWarnings("rawtypes")
+ @Test (timeout=5000)
+ public void testContainerCompletedAtLaunchingSpecificClusterError() {
+ WrappedContainer wc = new WrappedContainer();
+ List<Event> outgoingEvents;
+
+ wc.launchContainer();
+
+
+ wc.assignTaskAttempt(wc.taskAttemptID);
+
+ wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR);
+ wc.verifyState(AMContainerState.COMPLETED);
+ verify(wc.tal).registerRunningContainer(wc.containerID);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID);
+
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
+ Assert.assertEquals(TaskAttemptTerminationCause.NODE_DISK_ERROR,
+ ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause());
+ assertFalse(wc.amContainer.isInErrorState());
+
+ // Container launched generated by NM call.
+ wc.containerLaunched();
+ wc.verifyNoOutgoingEvents();
+
+ assertFalse(wc.amContainer.isInErrorState());
+ }
+
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
+ public void testContainerCompletedAtLaunchingSpecificError() {
+ WrappedContainer wc = new WrappedContainer();
+ List<Event> outgoingEvents;
+
+ wc.launchContainer();
+
+
+ wc.assignTaskAttempt(wc.taskAttemptID);
+
+ wc.containerCompleted(ContainerExitStatus.ABORTED, TaskAttemptTerminationCause.NODE_FAILED);
+ wc.verifyState(AMContainerState.COMPLETED);
+ verify(wc.tal).registerRunningContainer(wc.containerID);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID);
+
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+ Assert.assertEquals(TaskAttemptTerminationCause.NODE_FAILED,
+ ((TaskAttemptEventContainerTerminated)outgoingEvents.get(0)).getTerminationCause());
+
+ assertFalse(wc.amContainer.isInErrorState());
+
+ // Container launched generated by NM call.
+ wc.containerLaunched();
+ wc.verifyNoOutgoingEvents();
+
+ assertFalse(wc.amContainer.isInErrorState());
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test (timeout=5000)
public void testContainerCompletedAtIdle() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -514,7 +586,7 @@ public class TestAMContainer {
wc.containerLaunched();
wc.verifyState(AMContainerState.IDLE);
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID);
verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -538,7 +610,7 @@ public class TestAMContainer {
}
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
public void testContainerCompletedAtRunning() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -550,7 +622,7 @@ public class TestAMContainer {
wc.pullTaskToRun();
wc.verifyState(AMContainerState.RUNNING);
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID);
verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -574,7 +646,7 @@ public class TestAMContainer {
}
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
public void testContainerPreemptedAtRunning() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -586,7 +658,7 @@ public class TestAMContainer {
wc.pullTaskToRun();
wc.verifyState(AMContainerState.RUNNING);
- wc.containerCompleted(ContainerExitStatus.PREEMPTED);
+ wc.containerCompleted(ContainerExitStatus.PREEMPTED, TaskAttemptTerminationCause.EXTERNAL_PREEMPTION);
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID);
verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -594,6 +666,8 @@ public class TestAMContainer {
verify(wc.chh).unregister(wc.containerID);
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+ Assert.assertEquals(TaskAttemptTerminationCause.EXTERNAL_PREEMPTION,
+ ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause());
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
@@ -608,9 +682,47 @@ public class TestAMContainer {
assertFalse(wc.amContainer.isInErrorState());
}
+
+ @SuppressWarnings("rawtypes")
+ @Test (timeout=5000)
+ public void testContainerInternallyPreemptedAtRunning() {
+ WrappedContainer wc = new WrappedContainer();
+ List<Event> outgoingEvents;
+
+ wc.launchContainer();
+
+ wc.assignTaskAttempt(wc.taskAttemptID);
+ wc.containerLaunched();
+ wc.pullTaskToRun();
+ wc.verifyState(AMContainerState.RUNNING);
+
+ wc.containerCompleted(ContainerExitStatus.INVALID, TaskAttemptTerminationCause.INTERNAL_PREEMPTION);
+ wc.verifyState(AMContainerState.COMPLETED);
+ verify(wc.tal).registerRunningContainer(wc.containerID);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.chh).register(wc.containerID);
+ verify(wc.chh).unregister(wc.containerID);
+
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+ Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION,
+ ((TaskAttemptEventContainerTerminated)outgoingEvents.get(0)).getTerminationCause());
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+
+ assertFalse(wc.amContainer.isInErrorState());
+
+ // Pending task complete. (Ideally, container should be dead at this point
+ // and this event should not be generated. Network timeout on NM-RM heartbeat
+ // can cause it to be generated)
+ wc.taskAttemptSucceeded(wc.taskAttemptID);
+ wc.verifyNoOutgoingEvents();
+ wc.verifyHistoryStopEvent();
+
+ assertFalse(wc.amContainer.isInErrorState());
+ }
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
public void testContainerDiskFailedAtRunning() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -622,7 +734,7 @@ public class TestAMContainer {
wc.pullTaskToRun();
wc.verifyState(AMContainerState.RUNNING);
- wc.containerCompleted(ContainerExitStatus.DISKS_FAILED);
+ wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR);
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID);
verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -630,6 +742,8 @@ public class TestAMContainer {
verify(wc.chh).unregister(wc.containerID);
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
+ Assert.assertEquals(TaskAttemptTerminationCause.NODE_DISK_ERROR,
+ ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause());
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
@@ -646,7 +760,7 @@ public class TestAMContainer {
}
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
public void testTaskAssignedToCompletedContainer() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -657,7 +771,7 @@ public class TestAMContainer {
wc.pullTaskToRun();
wc.taskAttemptSucceeded(wc.taskAttemptID);
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyState(AMContainerState.COMPLETED);
TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
@@ -677,7 +791,7 @@ public class TestAMContainer {
assertTrue(wc.amContainer.isInErrorState());
}
- @Test
+ @Test (timeout=5000)
public void testTaskPullAtLaunching() {
WrappedContainer wc = new WrappedContainer();
@@ -690,7 +804,7 @@ public class TestAMContainer {
}
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
public void testNodeFailedAtIdle() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -712,11 +826,11 @@ public class TestAMContainer {
for (Event event : outgoingEvents) {
if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) {
TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event;
- assertEquals("nodeFailed", nfEvent.getDiagnosticInfo());
+ assertTrue(nfEvent.getDiagnosticInfo().contains("nodeFailed"));
}
}
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyHistoryStopEvent();
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -726,7 +840,7 @@ public class TestAMContainer {
}
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
public void testNodeFailedAtIdleMultipleAttempts() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -756,13 +870,13 @@ public class TestAMContainer {
for (Event event : outgoingEvents) {
if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) {
TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event;
- assertEquals("nodeFailed", nfEvent.getDiagnosticInfo());
+ assertTrue(nfEvent.getDiagnosticInfo().contains("nodeFailed"));
}
}
assertFalse(wc.amContainer.isInErrorState());
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyNoOutgoingEvents();
wc.verifyHistoryStopEvent();
@@ -772,7 +886,7 @@ public class TestAMContainer {
}
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
public void testNodeFailedAtRunningMultipleAttempts() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -801,11 +915,11 @@ public class TestAMContainer {
for (Event event : outgoingEvents) {
if (event.getType() == TaskAttemptEventType.TA_NODE_FAILED) {
TaskAttemptEventNodeFailed nfEvent = (TaskAttemptEventNodeFailed) event;
- assertEquals("nodeFailed", nfEvent.getDiagnosticInfo());
+ assertTrue(nfEvent.getDiagnosticInfo().contains("nodeFailed"));
}
}
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyHistoryStopEvent();
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -818,7 +932,7 @@ public class TestAMContainer {
}
@SuppressWarnings("rawtypes")
- @Test
+ @Test (timeout=5000)
public void testNodeFailedAtCompletedMultipleSuccessfulTAs() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -835,7 +949,7 @@ public class TestAMContainer {
wc.taskAttemptSucceeded(taID2);
wc.stopRequest();
wc.nmStopSent();
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyState(AMContainerState.COMPLETED);
wc.nodeFailed();
@@ -849,7 +963,7 @@ public class TestAMContainer {
assertEquals(2, wc.amContainer.getAllTaskAttempts().size());
}
- @Test
+ @Test (timeout=5000)
public void testDuplicateCompletedEvents() {
WrappedContainer wc = new WrappedContainer();
@@ -865,17 +979,17 @@ public class TestAMContainer {
wc.taskAttemptSucceeded(taID2);
wc.stopRequest();
wc.nmStopSent();
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
- wc.containerCompleted(false);
+ wc.containerCompleted();
wc.verifyNoOutgoingEvents();
wc.verifyHistoryStopEvent();
}
- @Test
+ @Test (timeout=5000)
public void testLocalResourceAddition() {
WrappedContainer wc = new WrappedContainer();
@@ -926,13 +1040,13 @@ public class TestAMContainer {
wc.taskAttemptSucceeded(taID3);
// Verify references are cleared after a container completes.
- wc.containerCompleted(false);
+ wc.containerCompleted();
assertNull(wc.amContainer.containerLocalResources);
assertNull(wc.amContainer.additionalLocalResources);
}
@SuppressWarnings("unchecked")
- @Test
+ @Test (timeout=5000)
public void testCredentialsTransfer() {
WrappedContainerMultipleDAGs wc = new WrappedContainerMultipleDAGs();
@@ -1183,15 +1297,15 @@ public class TestAMContainer {
AMContainerEventType.C_NM_STOP_FAILED));
}
- public void containerCompleted(boolean preempted) {
+ public void containerCompleted() {
reset(eventHandler);
- amContainer.handle(new AMContainerEventCompleted(containerID,
- (preempted ? ContainerExitStatus.PREEMPTED : ContainerExitStatus.SUCCESS), null));
+ amContainer.handle(new AMContainerEventCompleted(containerID, ContainerExitStatus.SUCCESS, null,
+ TaskAttemptTerminationCause.CONTAINER_EXITED));
}
- public void containerCompleted(int exitStatus) {
+ public void containerCompleted(int exitStatus, TaskAttemptTerminationCause errCause) {
reset(eventHandler);
- amContainer.handle(new AMContainerEventCompleted(containerID, exitStatus, null));
+ amContainer.handle(new AMContainerEventCompleted(containerID, exitStatus, null, errCause));
}
public void containerTimedOut() {
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
index 8913287..cd770a3 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/events/TestHistoryEventsProtoConversion.java
@@ -21,6 +21,7 @@ package org.apache.tez.dag.history.events;
import static org.junit.Assert.fail;
import java.nio.ByteBuffer;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -44,6 +45,7 @@ import org.apache.tez.dag.app.dag.impl.VertexStats;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.SummaryEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -471,7 +473,7 @@ public class TestHistoryEventsProtoConversion {
TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
"vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED,
- null, null);
+ null, null, null);
TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getTaskAttemptID(),
@@ -491,7 +493,7 @@ public class TestHistoryEventsProtoConversion {
TezTaskAttemptID.getInstance(TezTaskID.getInstance(TezVertexID.getInstance(
TezDAGID.getInstance(ApplicationId.newInstance(0, 1), 1), 111), 1), 1),
"vertex1", 10001l, 1000434444l, TaskAttemptState.FAILED,
- "diagnose", new TezCounters());
+ TaskAttemptTerminationCause.APPLICATION_ERROR, "diagnose", new TezCounters());
TaskAttemptFinishedEvent deserializedEvent = (TaskAttemptFinishedEvent)
testProtoConversion(event);
Assert.assertEquals(event.getTaskAttemptID(),
@@ -504,6 +506,8 @@ public class TestHistoryEventsProtoConversion {
deserializedEvent.getState());
Assert.assertEquals(event.getCounters(),
deserializedEvent.getCounters());
+ Assert.assertEquals(event.getTaskAttemptError(),
+ deserializedEvent.getTaskAttemptError());
logEvents(event, deserializedEvent);
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
index a20c9fe..e0f8c21 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/history/logging/impl/TestHistoryEventJsonConversion.java
@@ -59,6 +59,7 @@ import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.history.utils.DAGUtils;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -155,7 +156,7 @@ public class TestHistoryEventJsonConversion {
break;
case TASK_ATTEMPT_FINISHED:
event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
- random.nextInt(), TaskAttemptState.FAILED, null, null);
+ random.nextInt(), TaskAttemptState.KILLED, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null, null);
break;
case CONTAINER_LAUNCHED:
event = new ContainerLaunchedEvent(containerId, random.nextInt(),
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
index 4b6d648..91346ae 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/main/java/org/apache/tez/dag/history/logging/ats/HistoryEventTimelineConversion.java
@@ -376,6 +376,9 @@ public class HistoryEventTimelineConversion {
atsEntity.addOtherInfo(ATSConstants.FINISH_TIME, event.getFinishTime());
atsEntity.addOtherInfo(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
atsEntity.addOtherInfo(ATSConstants.STATUS, event.getState().name());
+ if (event.getTaskAttemptError() != null) {
+ atsEntity.addOtherInfo(ATSConstants.TASK_ATTEMPT_ERROR_ENUM, event.getTaskAttemptError().name());
+ }
atsEntity.addOtherInfo(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
atsEntity.addOtherInfo(ATSConstants.COUNTERS,
DAGUtils.convertCountersToATSMap(event.getCounters()));
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
----------------------------------------------------------------------
diff --git a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
index ce47820..0f2942c 100644
--- a/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
+++ b/tez-plugins/tez-yarn-timeline-history/src/test/java/org/apache/tez/dag/history/logging/ats/TestHistoryEventTimelineConversion.java
@@ -63,6 +63,7 @@ import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.history.logging.EntityTypes;
import org.apache.tez.dag.history.utils.DAGUtils;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -157,7 +158,7 @@ public class TestHistoryEventTimelineConversion {
break;
case TASK_ATTEMPT_FINISHED:
event = new TaskAttemptFinishedEvent(tezTaskAttemptID, "v1", random.nextInt(),
- random.nextInt(), TaskAttemptState.FAILED, null, null);
+ random.nextInt(), TaskAttemptState.FAILED, TaskAttemptTerminationCause.OUTPUT_LOST, null, null);
break;
case CONTAINER_LAUNCHED:
event = new ContainerLaunchedEvent(containerId, random.nextInt(),
[2/2] tez git commit: TEZ-1773. Add attempt failure cause enum to the
attempt failed/killed history record (bikaS)
Posted by bi...@apache.org.
TEZ-1773. Add attempt failure cause enum to the attempt failed/killed history record (bikaS)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/81eef37d
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/81eef37d
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/81eef37d
Branch: refs/heads/master
Commit: 81eef37d9e1e9222ef09eed319c45cdcd9034cd8
Parents: e8294b8
Author: Bikas Saha <bi...@apache.org>
Authored: Mon Nov 24 11:15:44 2014 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Mon Nov 24 11:15:44 2014 -0800
----------------------------------------------------------------------
CHANGES.txt | 2 +
.../org/apache/tez/common/ATSConstants.java | 1 +
.../records/TaskAttemptTerminationCause.java | 45 ++++
.../tez/dag/app/TaskHeartbeatHandler.java | 3 +-
.../org/apache/tez/dag/app/dag/TaskAttempt.java | 2 +
.../event/TaskAttemptEventAttemptFailed.java | 13 +-
.../TaskAttemptEventContainerTerminated.java | 13 +-
...AttemptEventContainerTerminatedBySystem.java | 13 +-
.../TaskAttemptEventContainerTerminating.java | 12 +-
.../dag/event/TaskAttemptEventKillRequest.java | 13 +-
.../dag/event/TaskAttemptEventNodeFailed.java | 12 +-
.../dag/event/TaskAttemptEventOutputFailed.java | 9 +-
.../TaskAttemptEventTerminationCauseEvent.java | 26 +++
.../dag/app/dag/event/TaskAttemptEventType.java | 3 +-
.../dag/app/dag/event/TaskEventTermination.java | 28 ++-
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 39 +++-
.../apache/tez/dag/app/dag/impl/TaskImpl.java | 19 +-
.../apache/tez/dag/app/dag/impl/VertexImpl.java | 36 ++-
.../app/launcher/LocalContainerLauncher.java | 9 +-
.../dag/app/rm/TaskSchedulerEventHandler.java | 14 +-
.../rm/container/AMContainerEventCompleted.java | 13 +-
.../dag/app/rm/container/AMContainerImpl.java | 96 ++++----
.../events/TaskAttemptFinishedEvent.java | 20 +-
.../impl/HistoryEventJsonConversion.java | 5 +-
tez-dag/src/main/proto/HistoryEvents.proto | 1 +
.../org/apache/tez/dag/app/TestPreemption.java | 2 +
.../org/apache/tez/dag/app/TestSpeculation.java | 3 +
.../tez/dag/app/dag/impl/TestTaskAttempt.java | 51 ++--
.../app/dag/impl/TestTaskAttemptRecovery.java | 13 +-
.../tez/dag/app/dag/impl/TestTaskImpl.java | 12 +-
.../tez/dag/app/dag/impl/TestTaskRecovery.java | 50 ++--
.../tez/dag/app/dag/impl/TestVertexImpl.java | 133 +++++++++--
.../app/rm/TestTaskSchedulerEventHandler.java | 30 +++
.../dag/app/rm/container/TestAMContainer.java | 230 ++++++++++++++-----
.../TestHistoryEventsProtoConversion.java | 8 +-
.../impl/TestHistoryEventJsonConversion.java | 3 +-
.../ats/HistoryEventTimelineConversion.java | 3 +
.../ats/TestHistoryEventTimelineConversion.java | 3 +-
38 files changed, 762 insertions(+), 226 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index 0a393a2..011bbaf 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -6,6 +6,8 @@ Release 0.6.0: Unreleased
INCOMPATIBLE CHANGES
ALL CHANGES:
+ TEZ-1773. Add attempt failure cause enum to the attempt failed/killed
+ history record
TEZ-14. Support MR like speculation capabilities based on latency deviation
from the mean
TEZ-1733. TezMerger should sort FileChunks on size when merging
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
----------------------------------------------------------------------
diff --git a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
index 7b47b9c..e502374 100644
--- a/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
+++ b/tez-api/src/main/java/org/apache/tez/common/ATSConstants.java
@@ -62,6 +62,7 @@ public class ATSConstants {
public static final String FINISH_TIME = "endTime";
public static final String TIME_TAKEN = "timeTaken";
public static final String STATUS = "status";
+ public static final String TASK_ATTEMPT_ERROR_ENUM = "taskAttemptErrorEnum";
public static final String DIAGNOSTICS = "diagnostics";
public static final String SUCCESSFUL_ATTEMPT_ID = "successfulAttemptId";
public static final String COUNTERS = "counters";
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
new file mode 100644
index 0000000..ef0bb33
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/dag/records/TaskAttemptTerminationCause.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.records;
+
+public enum TaskAttemptTerminationCause {
+ UNKNOWN_ERROR, // The error cause is unknown. Usually means a gap in error propagation
+
+ TERMINATED_BY_CLIENT, // Killed by client command
+ TERMINATED_AT_SHUTDOWN, // Killed due to execution shutdown
+ INTERNAL_PREEMPTION, // Killed by Tez to make space for higher priority work
+ EXTERNAL_PREEMPTION, // Killed by the cluster to make space for other work
+ TERMINATED_INEFFECTIVE_SPECULATION, // Killed speculative attempt because original succeeded
+ TERMINATED_EFFECTIVE_SPECULATION, // Killed original attempt because speculation succeeded
+ TERMINATED_ORPHANED, // Attempt is no longer needed by the task
+
+ APPLICATION_ERROR, // Failed due to application code error
+ FRAMEWORK_ERROR, // Failed due to code error in Tez code
+ INPUT_READ_ERROR, // Failed due to error in reading inputs
+ OUTPUT_WRITE_ERROR, // Failed due to error in writing outputs
+ OUTPUT_LOST, // Failed because the attempt's outputs were reported lost
+ TASK_HEARTBEAT_ERROR, // Failed because AM lost connection to the task
+
+ CONTAINER_LAUNCH_FAILED, // Failed to launch container
+ CONTAINER_EXITED, // Container exited. Indicates gap in specific error propagation from the cluster
+ CONTAINER_STOPPED, // Container stopped or released by Tez
+ NODE_FAILED, // Node for the container failed
+ NODE_DISK_ERROR, // Disk failed on the node running the task
+
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
index 6b698aa..d115b14 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskHeartbeatHandler.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
@@ -60,6 +61,6 @@ public class TaskHeartbeatHandler extends HeartbeatHandlerBase<TezTaskAttemptID>
protected void handleTimeOut(TezTaskAttemptID attemptId) {
eventHandler.handle(new TaskAttemptEventAttemptFailed(attemptId,
TaskAttemptEventType.TA_TIMED_OUT, "AttemptID:" + attemptId.toString()
- + " Timed out after " + timeOut / 1000 + " secs"));
+ + " Timed out after " + timeOut / 1000 + " secs", TaskAttemptTerminationCause.TASK_HEARTBEAT_ERROR));
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
index 4aa220d..3f60a4e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/TaskAttempt.java
@@ -29,6 +29,7 @@ import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.oldrecords.TaskAttemptReport;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.history.HistoryEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -73,6 +74,7 @@ public interface TaskAttempt {
TaskAttemptReport getReport();
List<String> getDiagnostics();
+ TaskAttemptTerminationCause getTerminationCause();
TezCounters getCounters();
float getProgress();
TaskAttemptState getState();
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
index 5c7b956..b9c1d09 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptFailed.java
@@ -18,16 +18,19 @@
package org.apache.tez.dag.app.dag.event;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
public class TaskAttemptEventAttemptFailed extends TaskAttemptEvent
- implements DiagnosableEvent {
+ implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
private final String diagnostics;
+ private final TaskAttemptTerminationCause errorCause;
public TaskAttemptEventAttemptFailed(TezTaskAttemptID id,
- TaskAttemptEventType type, String diagnostics) {
+ TaskAttemptEventType type, String diagnostics, TaskAttemptTerminationCause errorCause) {
super(id, type);
this.diagnostics = diagnostics;
+ this.errorCause = errorCause;
}
@Override
@@ -35,5 +38,9 @@ public class TaskAttemptEventAttemptFailed extends TaskAttemptEvent
return diagnostics;
}
-
+ @Override
+ public TaskAttemptTerminationCause getTerminationCause() {
+ return errorCause;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
index 87aa313..5dd0141 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminated.java
@@ -17,20 +17,29 @@
package org.apache.tez.dag.app.dag.event;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
public class TaskAttemptEventContainerTerminated extends TaskAttemptEvent
- implements DiagnosableEvent {
+ implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
private final String message;
+ private final TaskAttemptTerminationCause errorCause;
- public TaskAttemptEventContainerTerminated(TezTaskAttemptID id, String message) {
+ public TaskAttemptEventContainerTerminated(TezTaskAttemptID id, String message,
+ TaskAttemptTerminationCause errCause) {
super(id, TaskAttemptEventType.TA_CONTAINER_TERMINATED);
this.message = message;
+ this.errorCause = errCause;
}
@Override
public String getDiagnosticInfo() {
return message;
}
+
+ @Override
+ public TaskAttemptTerminationCause getTerminationCause() {
+ return errorCause;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
index a92aafd..a3c57e4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminatedBySystem.java
@@ -18,19 +18,28 @@
package org.apache.tez.dag.app.dag.event;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
public class TaskAttemptEventContainerTerminatedBySystem extends TaskAttemptEvent
- implements DiagnosableEvent {
+ implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
private final String diagnostics;
- public TaskAttemptEventContainerTerminatedBySystem(TezTaskAttemptID id, String diagnostics) {
+ private final TaskAttemptTerminationCause errorCause;
+ public TaskAttemptEventContainerTerminatedBySystem(TezTaskAttemptID id, String diagnostics,
+ TaskAttemptTerminationCause errorCause) {
super(id, TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
this.diagnostics = diagnostics;
+ this.errorCause = errorCause;
}
@Override
public String getDiagnosticInfo() {
return diagnostics;
}
+
+ @Override
+ public TaskAttemptTerminationCause getTerminationCause() {
+ return errorCause;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java
index 7da6e14..02d04a5 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerTerminating.java
@@ -17,17 +17,20 @@
package org.apache.tez.dag.app.dag.event;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
public class TaskAttemptEventContainerTerminating extends TaskAttemptEvent
- implements DiagnosableEvent {
+ implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
private final String message;
+ private final TaskAttemptTerminationCause errorCause; // stored as passed in; not null-checked
public TaskAttemptEventContainerTerminating(TezTaskAttemptID id,
- String diagMessage) {
+ String diagMessage, TaskAttemptTerminationCause errCause) {
super(id, TaskAttemptEventType.TA_CONTAINER_TERMINATING);
this.message = diagMessage;
+ this.errorCause = errCause;
}
@Override
@@ -35,4 +38,9 @@ public class TaskAttemptEventContainerTerminating extends TaskAttemptEvent
return this.message;
}
+ @Override
+ public TaskAttemptTerminationCause getTerminationCause() { // the cause given to the constructor
+ return errorCause;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
index 0205fcf..a0dfe5d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventKillRequest.java
@@ -17,15 +17,19 @@
*/
package org.apache.tez.dag.app.dag.event;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
-public class TaskAttemptEventKillRequest extends TaskAttemptEvent implements DiagnosableEvent {
+public class TaskAttemptEventKillRequest extends TaskAttemptEvent
+ implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
private final String message;
+ private final TaskAttemptTerminationCause errorCause; // stored as passed in; not null-checked
- public TaskAttemptEventKillRequest(TezTaskAttemptID id, String message) {
+ public TaskAttemptEventKillRequest(TezTaskAttemptID id, String message, TaskAttemptTerminationCause err) {
super(id, TaskAttemptEventType.TA_KILL_REQUEST);
this.message = message;
+ this.errorCause = err;
}
@Override
@@ -33,4 +37,9 @@ public class TaskAttemptEventKillRequest extends TaskAttemptEvent implements Dia
return message;
}
+ @Override
+ public TaskAttemptTerminationCause getTerminationCause() { // the cause given to the constructor
+ return errorCause;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java
index 6d97466..541ef00 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventNodeFailed.java
@@ -17,17 +17,20 @@
package org.apache.tez.dag.app.dag.event;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
public class TaskAttemptEventNodeFailed extends TaskAttemptEvent
- implements DiagnosableEvent{
+ implements DiagnosableEvent, TaskAttemptEventTerminationCauseEvent {
private final String message;
+ private final TaskAttemptTerminationCause errorCause; // stored as passed in; not null-checked
public TaskAttemptEventNodeFailed(TezTaskAttemptID id,
- String diagMessage) {
+ String diagMessage, TaskAttemptTerminationCause errorCause) {
super(id, TaskAttemptEventType.TA_NODE_FAILED);
this.message = diagMessage;
+ this.errorCause = errorCause;
}
@Override
@@ -35,4 +38,9 @@ public class TaskAttemptEventNodeFailed extends TaskAttemptEvent
return this.message;
}
+ @Override
+ public TaskAttemptTerminationCause getTerminationCause() { // the cause given to the constructor
+ return errorCause;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java
index 678e1e7..6bc110a 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventOutputFailed.java
@@ -18,10 +18,12 @@
package org.apache.tez.dag.app.dag.event;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.api.impl.TezEvent;
-public class TaskAttemptEventOutputFailed extends TaskAttemptEvent {
+public class TaskAttemptEventOutputFailed extends TaskAttemptEvent
+ implements TaskAttemptEventTerminationCauseEvent {
private TezEvent inputFailedEvent;
private int consumerTaskNumber;
@@ -40,5 +42,10 @@ public class TaskAttemptEventOutputFailed extends TaskAttemptEvent {
public int getConsumerTaskNumber() {
return consumerTaskNumber;
}
+
+ @Override
+ public TaskAttemptTerminationCause getTerminationCause() {
+ return TaskAttemptTerminationCause.OUTPUT_LOST; // fixed cause for every output failure event
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventTerminationCauseEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventTerminationCauseEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventTerminationCauseEvent.java
new file mode 100644
index 0000000..70c20e3
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventTerminationCauseEvent.java
@@ -0,0 +1,34 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
+
+/**
+ * Implemented by events that carry the reason a task attempt is being terminated, so
+ * that the attempt can record the cause (e.g. in its attempt-finished history event).
+ */
+public interface TaskAttemptEventTerminationCauseEvent {
+
+ /**
+ * @return the cause to record for the task attempt's termination; may be null if the
+ * event was created without one
+ */
+ TaskAttemptTerminationCause getTerminationCause();
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
index c8eec1b..b7aca36 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
@@ -29,8 +29,7 @@ public enum TaskAttemptEventType {
//Producer: TaskAttemptListener
TA_STARTED_REMOTELY,
TA_STATUS_UPDATE,
- TA_DIAGNOSTICS_UPDATE,
- TA_COMMIT_PENDING,
+ TA_DIAGNOSTICS_UPDATE, // TODO(review): believed unused and slated for removal; verify no producer/handler remains first
TA_DONE,
TA_FAILED,
TA_TIMED_OUT,
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
index 73d5744..d48a0bf 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskEventTermination.java
@@ -18,22 +18,30 @@
package org.apache.tez.dag.app.dag.event;
-import org.apache.tez.dag.app.dag.TaskTerminationCause;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskID;
-public class TaskEventTermination extends TaskEvent implements DiagnosableEvent{
+public class TaskEventTermination extends TaskEvent implements DiagnosableEvent,
+ TaskAttemptEventTerminationCauseEvent {
- private TaskTerminationCause terminationCause;
- private String diagnostics;
+ private final String diagnostics;
+ private final TaskAttemptTerminationCause errorCause;
- public TaskEventTermination(TezTaskID taskID, TaskTerminationCause terminationCause) {
+ /**
+ * @param taskID the task whose unfinished attempts are to be terminated
+ * @param errorCause cause to report for the terminated attempts; null is recorded as
+ * UNKNOWN_ERROR, matching TaskAttemptImpl's handling of a missing cause on recovery
+ * @param diagnostics message to report; when null, one is derived from the cause
+ */
+ public TaskEventTermination(TezTaskID taskID, TaskAttemptTerminationCause errorCause, String diagnostics) {
super(taskID, TaskEventType.T_TERMINATE);
- this.terminationCause = terminationCause;
- this.diagnostics = "Task is terminated due to:" + terminationCause.name();
- }
-
- public TaskTerminationCause getTerminationCause() {
- return terminationCause;
+ // Default a missing cause so the fallback message below cannot throw an NPE.
+ this.errorCause = errorCause != null ? errorCause : TaskAttemptTerminationCause.UNKNOWN_ERROR;
+ if (diagnostics != null) {
+ this.diagnostics = diagnostics;
+ } else {
+ this.diagnostics = "Task is terminated due to: " + this.errorCause.name();
+ }
}
@Override
@@ -41,4 +49,9 @@ public class TaskEventTermination extends TaskEvent implements DiagnosableEvent{
return diagnostics;
}
+ @Override
+ public TaskAttemptTerminationCause getTerminationCause() {
+ return errorCause;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 3056c1e..5103095 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -76,6 +76,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventTerminationCauseEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventOutputFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
@@ -91,6 +92,7 @@ import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -124,6 +126,7 @@ public class TaskAttemptImpl implements TaskAttempt,
protected EventHandler eventHandler;
private final TezTaskAttemptID attemptId;
private final Clock clock;
+ private TaskAttemptTerminationCause terminationCause = TaskAttemptTerminationCause.UNKNOWN_ERROR;
private final List<String> diagnostics = new ArrayList<String>();
private final Lock readLock;
private final Lock writeLock;
@@ -291,7 +294,6 @@ public class TaskAttemptImpl implements TaskAttempt,
EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY,
TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
TaskAttemptEventType.TA_STATUS_UPDATE,
- TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_KILL_REQUEST,
@@ -313,7 +315,6 @@ public class TaskAttemptImpl implements TaskAttempt,
EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY,
TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
TaskAttemptEventType.TA_STATUS_UPDATE,
- TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_KILL_REQUEST,
@@ -332,7 +333,6 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptEventType.TA_SCHEDULE,
TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
TaskAttemptEventType.TA_STATUS_UPDATE,
- TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_KILL_REQUEST,
@@ -352,7 +352,6 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptEventType.TA_SCHEDULE,
TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM,
TaskAttemptEventType.TA_STATUS_UPDATE,
- TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED,
TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_KILL_REQUEST,
@@ -498,6 +497,21 @@ public class TaskAttemptImpl implements TaskAttempt,
readLock.unlock();
}
}
+
+ /**
+ * Returns the recorded termination cause (UNKNOWN_ERROR until one is set). Read under
+ * readLock like the accessor above, since state-machine transitions update the field.
+ */
+ @Override
+ public TaskAttemptTerminationCause getTerminationCause() {
+ readLock.lock();
+ try {
+ return terminationCause;
+ } finally {
+ readLock.unlock();
+ }
+ }
@Override
public TezCounters getCounters() {
@@ -745,6 +759,8 @@ public class TaskAttemptImpl implements TaskAttempt,
this.reportedStatus.counters = tEvent.getCounters();
this.reportedStatus.progress = 1f;
this.reportedStatus.state = tEvent.getState();
+ this.terminationCause = tEvent.getTaskAttemptError() != null ? tEvent.getTaskAttemptError()
+ : TaskAttemptTerminationCause.UNKNOWN_ERROR;
this.diagnostics.add(tEvent.getDiagnostics());
this.recoveredState = tEvent.getState();
sendEvent(createDAGCounterUpdateEventTAFinished(this, tEvent.getState()));
@@ -959,8 +975,8 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
attemptId, getTask().getVertex().getName(), getLaunchTime(),
- getFinishTime(), TaskAttemptState.SUCCEEDED, "",
- getCounters());
+ getFinishTime(), TaskAttemptState.SUCCEEDED, null,
+ "", getCounters());
// FIXME how do we store information regd completion events
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(getDAGID(), finishEvt));
@@ -971,9 +987,9 @@ public class TaskAttemptImpl implements TaskAttempt,
TaskAttemptFinishedEvent finishEvt = new TaskAttemptFinishedEvent(
attemptId, getTask().getVertex().getName(), getLaunchTime(),
clock.getTime(), state,
+ terminationCause,
StringUtils.join(
- getDiagnostics(), LINE_SEPARATOR),
- getCounters());
+ getDiagnostics(), LINE_SEPARATOR), getCounters());
// FIXME how do we store information regd completion events
this.appContext.getHistoryHandler().handle(
new DAGHistoryEvent(getDAGID(), finishEvt));
@@ -1003,7 +1019,8 @@ public class TaskAttemptImpl implements TaskAttempt,
LOG.error(msg, e);
String diag = msg + ", " + e.getMessage() + ", " + ExceptionUtils.getStackTrace(e.getCause());
new TerminatedBeforeRunningTransition(FAILED_HELPER).transition(ta,
- new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, diag));
+ new TaskAttemptEventAttemptFailed(ta.getID(), TaskAttemptEventType.TA_FAILED, diag,
+ TaskAttemptTerminationCause.APPLICATION_ERROR));
return TaskAttemptStateInternal.FAILED;
}
// Create startTaskRequest
@@ -1085,6 +1102,13 @@ public class TaskAttemptImpl implements TaskAttempt,
if (event instanceof DiagnosableEvent) {
ta.addDiagnosticInfo(((DiagnosableEvent) event).getDiagnosticInfo());
}
+
+ // Fails when assertions are enabled (-ea, e.g. in tests) if an event reaching here carries no termination cause
+ assert event instanceof TaskAttemptEventTerminationCauseEvent;
+
+ if (event instanceof TaskAttemptEventTerminationCauseEvent) {
+ ta.trySetTerminationCause(((TaskAttemptEventTerminationCauseEvent) event).getTerminationCause());
+ }
ta.sendEvent(createDAGCounterUpdateEventTAFinished(ta,
helper.getTaskAttemptState()));
@@ -1484,6 +1508,18 @@ public class TaskAttemptImpl implements TaskAttempt,
sendEvent(new VertexEventRouteEvent(vertex.getVertexId(), tezIfEvents));
}
}
+
+ /**
+ * Records {@code err} as this attempt's termination cause unless a cause other than
+ * UNKNOWN_ERROR is already recorded. A null cause is ignored so that the UNKNOWN_ERROR
+ * default is never replaced by null (which would then reach the history event).
+ */
+ private void trySetTerminationCause(TaskAttemptTerminationCause err) {
+ // keep only the first error cause
+ if (err != null && terminationCause == TaskAttemptTerminationCause.UNKNOWN_ERROR) {
+ terminationCause = err;
+ }
+ }
private void initTaskAttemptStatus(TaskAttemptStatus result) {
result.progress = 0.0f;
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
index c3ba11d..b20fa13 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskImpl.java
@@ -84,6 +84,7 @@ import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
import org.apache.tez.dag.history.events.TaskFinishedEvent;
import org.apache.tez.dag.history.events.TaskStartedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
@@ -698,7 +699,7 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
if (getState() != TaskState.RUNNING) {
LOG.info("Task not running. Issuing kill to bad commit attempt " + taskAttemptID);
eventHandler.handle(new TaskAttemptEventKillRequest(taskAttemptID
- , "Task not running. Bad attempt."));
+ , "Task not running. Bad attempt.", TaskAttemptTerminationCause.TERMINATED_ORPHANED));
return false;
}
if (commitAttempt == null) {
@@ -1027,15 +1028,18 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
LOG.info("Issuing kill to other attempt " + attempt.getID() + " as attempt: " +
task.successfulAttempt + " has succeeded");
String diagnostics = null;
+ final TaskAttemptTerminationCause errCause; // assigned in both branches below
if (attempt.getLaunchTime() < successfulAttempt.getLaunchTime()) {
diagnostics = "Killed this attempt as other speculative attempt : " + successTaId
+ " succeeded";
+ errCause = TaskAttemptTerminationCause.TERMINATED_EFFECTIVE_SPECULATION;
} else {
diagnostics = "Killed this speculative attempt as original attempt: " + successTaId
+ " succeeded";
+ errCause = TaskAttemptTerminationCause.TERMINATED_INEFFECTIVE_SPECULATION;
}
task.eventHandler.handle(new TaskAttemptEventKillRequest(attempt
- .getID(), diagnostics));
+ .getID(), diagnostics, errCause));
}
}
// send notification to DAG scheduler
@@ -1416,14 +1420,17 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
}
}
- private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg) {
+ /**
+ * Clears {@code attempt} as the commit attempt if it holds that role and, if it has not
+ * finished, sends it a kill request carrying {@code logMsg} and {@code errorCause}.
+ */
+ private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg, TaskAttemptTerminationCause errorCause) {
if (commitAttempt != null && commitAttempt.equals(attempt)) {
LOG.info("Removing commit attempt: " + commitAttempt);
commitAttempt = null;
}
if (attempt != null && !attempt.isFinished()) {
- eventHandler.handle(new TaskAttemptEventKillRequest(attempt.getID(),
- logMsg));
+ eventHandler.handle(new TaskAttemptEventKillRequest(attempt.getID(), logMsg, errorCause));
}
}
@@ -1445,8 +1452,8 @@ public class TaskImpl implements Task, EventHandler<TaskEvent> {
task.addDiagnosticInfo(terminateEvent.getDiagnosticInfo());
// issue kill to all non finished attempts
for (TaskAttempt attempt : task.attempts.values()) {
- task.killUnfinishedAttempt
- (attempt, "Task KILL is received. Killing attempt!");
+ task.killUnfinishedAttempt(attempt, "Task KILL is received. Killing attempt. Diagnostics: "
+ + terminateEvent.getDiagnosticInfo(), terminateEvent.getTerminationCause());
}
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
index 54cd6c4..3246f38 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/VertexImpl.java
@@ -139,6 +139,7 @@ import org.apache.tez.dag.history.events.VertexParallelismUpdatedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -1821,12 +1822,17 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
*/
void tryEnactKill(VertexTerminationCause trigger,
TaskTerminationCause taskterminationCause) {
+ // Attempts killed here are attributed to shutdown by default (usually the DAG is failing due to some error)
+ TaskAttemptTerminationCause errCause = TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN;
+ if (taskterminationCause == TaskTerminationCause.DAG_KILL) { // only a DAG kill is attributed to the client
+ errCause = TaskAttemptTerminationCause.TERMINATED_BY_CLIENT;
+ }
if(trySetTerminationCause(trigger)){
- LOG.info("Killing tasks in vertex: " + logIdentifier + " due to trigger: "
- + trigger);
+ String msg = "Killing tasks in vertex: " + logIdentifier + " due to trigger: " + trigger;
+ LOG.info(msg);
for (Task task : tasks.values()) {
- eventHandler.handle(
- new TaskEventTermination(task.getTaskId(), taskterminationCause));
+ eventHandler.handle( // msg becomes the termination diagnostics; errCause is passed down to the task's attempts
+ new TaskEventTermination(task.getTaskId(), errCause, msg));
}
}
}
@@ -3912,12 +3918,32 @@ public class VertexImpl implements org.apache.tez.dag.app.dag.Vertex,
case TASK_ATTEMPT_FAILED_EVENT:
{
checkEventSourceMetadata(vertex, sourceMeta);
+ TaskAttemptTerminationCause errCause = null; // attributed to the component type that generated the failure event
+ switch (sourceMeta.getEventGenerator()) {
+ case INPUT:
+ errCause = TaskAttemptTerminationCause.INPUT_READ_ERROR;
+ break;
+ case PROCESSOR:
+ errCause = TaskAttemptTerminationCause.APPLICATION_ERROR;
+ break;
+ case OUTPUT:
+ errCause = TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR;
+ break;
+ case SYSTEM:
+ errCause = TaskAttemptTerminationCause.FRAMEWORK_ERROR;
+ break;
+ default: // fail fast on a generator type this mapping does not know about
+ throw new TezUncheckedException("Unknown EventProducerConsumerType: " +
+ sourceMeta.getEventGenerator());
+ }
TaskAttemptFailedEvent taskFailedEvent =
(TaskAttemptFailedEvent) tezEvent.getEvent();
vertex.getEventHandler().handle(
new TaskAttemptEventAttemptFailed(sourceMeta.getTaskAttemptID(),
TaskAttemptEventType.TA_FAILED,
- "Error: " + taskFailedEvent.getDiagnostics()));
+ "Error: " + taskFailedEvent.getDiagnostics(),
+ errCause)
+ );
}
break;
default:
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
index baeb9a3..f14fd5d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/launcher/LocalContainerLauncher.java
@@ -67,6 +67,7 @@ import org.apache.tez.dag.app.rm.container.AMContainerEventLaunched;
import org.apache.tez.dag.app.rm.container.AMContainerEventType;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.events.ContainerLaunchedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.runtime.task.TezChild;
@@ -242,14 +243,14 @@ public class LocalContainerLauncher extends AbstractService implements
LOG.info("Container: " + containerId + " completed successfully");
appContext.getEventHandler().handle(
new AMContainerEventCompleted(containerId, result.getExitStatus().getExitCode(),
- null));
+ null, TaskAttemptTerminationCause.CONTAINER_EXITED)); // successful exit
} else {
LOG.info("Container: " + containerId + " completed but with errors");
appContext.getEventHandler().handle(
new AMContainerEventCompleted(containerId, result.getExitStatus().getExitCode(),
result.getErrorMessage() == null ?
(result.getThrowable() == null ? null : result.getThrowable().getMessage()) :
- result.getErrorMessage()));
+ result.getErrorMessage(), TaskAttemptTerminationCause.APPLICATION_ERROR)); // errors in the local run are attributed to the application
}
}
@@ -263,13 +264,13 @@ public class LocalContainerLauncher extends AbstractService implements
appContext.getEventHandler()
.handle(new AMContainerEventCompleted(containerId,
TezChild.ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE.getExitCode(),
- t.getMessage()));
+ t.getMessage(), TaskAttemptTerminationCause.APPLICATION_ERROR)); // execution failure attributed to the application
} else {
LOG.info("Ignoring CancellationException - triggered by LocalContainerLauncher");
appContext.getEventHandler()
.handle(new AMContainerEventCompleted(containerId,
TezChild.ContainerExecutionResult.ExitStatus.SUCCESS.getExitCode(),
- "CancellationException"));
+ "CancellationException", TaskAttemptTerminationCause.CONTAINER_EXITED)); // cancellation came from this launcher, not a task error
}
}
}
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index ec8e73f..625b09e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -69,6 +69,7 @@ import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
import org.apache.tez.dag.app.rm.node.AMNodeEventStateChanged;
import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptSucceeded;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import com.google.common.base.Preconditions;
@@ -422,19 +423,22 @@ public class TaskSchedulerEventHandler extends AbstractService
// Inform the Containers about completion.
AMContainer amContainer = appContext.getAllContainers().get(containerStatus.getContainerId());
if (amContainer != null) {
- String message = null;
+ String message = "Container completed. "; // used as-is for a SUCCESS exit status
+ TaskAttemptTerminationCause errCause = TaskAttemptTerminationCause.CONTAINER_EXITED; // kept for every exit except preemption/disk failure, including "Container failed."
int exitStatus = containerStatus.getExitStatus();
if (exitStatus == ContainerExitStatus.PREEMPTED) {
message = "Container preempted externally. ";
+ errCause = TaskAttemptTerminationCause.EXTERNAL_PREEMPTION;
} else if (exitStatus == ContainerExitStatus.DISKS_FAILED) {
message = "Container disk failed. ";
- } else {
+ errCause = TaskAttemptTerminationCause.NODE_DISK_ERROR;
+ } else if (exitStatus != ContainerExitStatus.SUCCESS){
message = "Container failed. ";
}
if (containerStatus.getDiagnostics() != null) {
message += containerStatus.getDiagnostics();
}
- sendEvent(new AMContainerEventCompleted(amContainer.getContainerId(), exitStatus, message));
+ sendEvent(new AMContainerEventCompleted(amContainer.getContainerId(), exitStatus, message, errCause));
}
}
@@ -550,8 +554,8 @@ public class TaskSchedulerEventHandler extends AbstractService
public void preemptContainer(ContainerId containerId) {
taskScheduler.deallocateContainer(containerId);
// Inform the Containers about completion.
- sendEvent(new AMContainerEventCompleted(containerId,
- ContainerExitStatus.PREEMPTED, "Container preempted internally"));
+ sendEvent(new AMContainerEventCompleted(containerId, ContainerExitStatus.INVALID, // NOTE(review): no longer PREEMPTED, so an exit-status-based isPreempted() won't see this; confirm INTERNAL_PREEMPTION is still treated as a cluster action
+ "Container preempted internally", TaskAttemptTerminationCause.INTERNAL_PREEMPTION));
}
public void setShouldUnregisterFlag() {
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
index e9649f3..a455f1e 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
@@ -20,17 +20,20 @@ package org.apache.tez.dag.app.rm.container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
public class AMContainerEventCompleted extends AMContainerEvent {
private final int exitStatus;
private final String diagnostics;
+ private final TaskAttemptTerminationCause errCause; // stored as passed in; not null-checked
public AMContainerEventCompleted(ContainerId containerId,
- int exitStatus, String diagnostics) {
+ int exitStatus, String diagnostics, TaskAttemptTerminationCause errCause) {
super(containerId, AMContainerEventType.C_COMPLETED);
this.exitStatus = exitStatus;
this.diagnostics = diagnostics;
+ this.errCause = errCause;
}
public boolean isPreempted() {
@@ -41,6 +44,15 @@ public class AMContainerEventCompleted extends AMContainerEvent {
return (exitStatus == ContainerExitStatus.DISKS_FAILED);
}
+ /**
+ * Returns true if isPreempted() or isDiskFailed() holds, or if the AM itself preempted
+ * the container: AM-initiated preemption is sent with ContainerExitStatus.INVALID and
+ * tagged INTERNAL_PREEMPTION, so it is recognized by that cause, not the exit status.
+ */
+ public boolean isClusterAction() {
+ return isPreempted() || isDiskFailed() || errCause == TaskAttemptTerminationCause.INTERNAL_PREEMPTION;
+ }
+
public String getDiagnostics() {
return diagnostics;
}
@@ -48,5 +60,10 @@ public class AMContainerEventCompleted extends AMContainerEvent {
public int getContainerExitStatus() {
return exitStatus;
}
+
+ /** Returns the termination cause supplied when this event was created; may be null. */
+ public TaskAttemptTerminationCause getTerminationCause() {
+ return errCause;
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index a0f9cb7..9d4f46b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -59,6 +59,7 @@ import org.apache.tez.dag.app.rm.NMCommunicatorStopRequestEvent;
import org.apache.tez.dag.history.DAGHistoryEvent;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.history.events.ContainerStoppedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
//import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
@@ -533,7 +534,7 @@ public class AMContainerImpl implements AMContainer {
.getTaskAttemptId());
container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
"AMScheduler Error: TaskAttempt allocated to unlaunched container: " +
- container.getContainerId());
+ container.getContainerId(), TaskAttemptTerminationCause.FRAMEWORK_ERROR);
container.deAllocate();
LOG.warn("Unexpected TA Assignment: TAId: " + event.getTaskAttemptId() +
" for ContainerId: " + container.getContainerId() +
@@ -644,8 +645,10 @@ public class AMContainerImpl implements AMContainer {
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
if (container.pendingAttempt != null) {
AMContainerEventLaunchFailed event = (AMContainerEventLaunchFailed) cEvent;
+ // for a properly set up cluster this should almost always be an app error
+ // TODO: differentiate between launch failures caused by the framework/cluster and those caused by the app
container.sendTerminatingToTaskAttempt(container.pendingAttempt,
- event.getMessage());
+ event.getMessage(), TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED);
}
container.unregisterFromTAListener();
container.deAllocate();
@@ -659,12 +662,17 @@ public class AMContainerImpl implements AMContainer {
AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
if (container.pendingAttempt != null) {
String errorMessage = getMessage(container, event);
- if (event.isPreempted() || event.isDiskFailed()) {
+ if (event.isClusterAction()) {
container.sendContainerTerminatedBySystemToTaskAttempt(container.pendingAttempt,
- errorMessage);
+ errorMessage, event.getTerminationCause());
} else {
- container.sendTerminatedToTaskAttempt(container.pendingAttempt,
- errorMessage);
+ container
+ .sendTerminatedToTaskAttempt(
+ container.pendingAttempt,
+ errorMessage,
+ // if the termination cause is the generic CONTAINER_EXITED, replace it with the more specific CONTAINER_LAUNCH_FAILED
+ (event.getTerminationCause() == TaskAttemptTerminationCause.CONTAINER_EXITED ?
+ TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED : event.getTerminationCause()));
}
container.registerFailedAttempt(container.pendingAttempt);
container.pendingAttempt = null;
@@ -696,7 +704,7 @@ public class AMContainerImpl implements AMContainer {
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
if (container.pendingAttempt != null) {
container.sendTerminatingToTaskAttempt(container.pendingAttempt,
- getMessage(container, cEvent));
+ getMessage(container, cEvent), TaskAttemptTerminationCause.CONTAINER_STOPPED);
}
container.unregisterFromTAListener();
container.logStopped(container.pendingAttempt == null ?
@@ -722,27 +730,31 @@ public class AMContainerImpl implements AMContainer {
return;
}
container.nodeFailed = true;
- String errorMessage = null;
+ String errorMessage = "Node " + container.getContainer().getNodeId() + " failed. ";
if (cEvent instanceof DiagnosableEvent) {
- errorMessage = ((DiagnosableEvent) cEvent).getDiagnosticInfo();
+ errorMessage += ((DiagnosableEvent) cEvent).getDiagnosticInfo();
}
for (TezTaskAttemptID taId : container.failedAssignments) {
- container.sendNodeFailureToTA(taId, errorMessage);
+ container.sendNodeFailureToTA(taId, errorMessage, TaskAttemptTerminationCause.NODE_FAILED);
}
for (TezTaskAttemptID taId : container.completedAttempts) {
- container.sendNodeFailureToTA(taId, errorMessage);
+ container.sendNodeFailureToTA(taId, errorMessage, TaskAttemptTerminationCause.NODE_FAILED);
}
if (container.pendingAttempt != null) {
// Will be null in COMPLETED state.
- container.sendNodeFailureToTA(container.pendingAttempt, errorMessage);
- container.sendTerminatingToTaskAttempt(container.pendingAttempt, "Node failure");
+ container.sendNodeFailureToTA(container.pendingAttempt, errorMessage,
+ TaskAttemptTerminationCause.NODE_FAILED);
+ container.sendTerminatingToTaskAttempt(container.pendingAttempt, errorMessage,
+ TaskAttemptTerminationCause.NODE_FAILED);
}
if (container.runningAttempt != null) {
// Will be null in COMPLETED state.
- container.sendNodeFailureToTA(container.runningAttempt, errorMessage);
- container.sendTerminatingToTaskAttempt(container.runningAttempt, "Node failure");
+ container.sendNodeFailureToTA(container.runningAttempt, errorMessage,
+ TaskAttemptTerminationCause.NODE_FAILED);
+ container.sendTerminatingToTaskAttempt(container.runningAttempt, errorMessage,
+ TaskAttemptTerminationCause.NODE_FAILED);
}
container.logStopped(ContainerExitStatus.ABORTED);
}
@@ -767,7 +779,7 @@ public class AMContainerImpl implements AMContainer {
container.sendTerminatingToTaskAttempt(container.pendingAttempt,
"Container " + container.getContainerId() +
" hit an invalid transition - " + cEvent.getType() + " at " +
- container.getState());
+ container.getState(), TaskAttemptTerminationCause.FRAMEWORK_ERROR);
}
container.logStopped(ContainerExitStatus.ABORTED);
container.sendStopRequestToNM();
@@ -909,12 +921,12 @@ public class AMContainerImpl implements AMContainer {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
- if (event.isPreempted() || event.isDiskFailed()) {
+ if (event.isClusterAction()) {
container.sendContainerTerminatedBySystemToTaskAttempt(container.runningAttempt,
- getMessage(container, event));
+ getMessage(container, event), event.getTerminationCause());
} else {
container.sendTerminatedToTaskAttempt(container.runningAttempt,
- getMessage(container, event));
+ getMessage(container, event), event.getTerminationCause());
}
container.unregisterAttemptFromListener(container.runningAttempt);
container.registerFailedAttempt(container.runningAttempt);
@@ -929,8 +941,8 @@ public class AMContainerImpl implements AMContainer {
container.unregisterAttemptFromListener(container.runningAttempt);
container.sendTerminatingToTaskAttempt(container.runningAttempt,
- " Container" + container.getContainerId() +
- " received a STOP_REQUEST");
+ " Container" + container.getContainerId() + " received a STOP_REQUEST",
+ TaskAttemptTerminationCause.CONTAINER_STOPPED);
super.transition(container, cEvent);
}
}
@@ -964,7 +976,7 @@ public class AMContainerImpl implements AMContainer {
container.sendTerminatingToTaskAttempt(container.runningAttempt,
"Container " + container.getContainerId() +
" hit an invalid transition - " + cEvent.getType() + " at " +
- container.getState());
+ container.getState(), TaskAttemptTerminationCause.FRAMEWORK_ERROR);
}
}
@@ -978,7 +990,8 @@ public class AMContainerImpl implements AMContainer {
" cannot be allocated to container: " + container.getContainerId() +
" in " + container.getState() + " state";
container.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId());
- container.sendTerminatingToTaskAttempt(event.getTaskAttemptId(), errorMessage);
+ container.sendTerminatingToTaskAttempt(event.getTaskAttemptId(), errorMessage,
+ TaskAttemptTerminationCause.CONTAINER_EXITED);
container.registerFailedAttempt(event.getTaskAttemptId());
}
}
@@ -1001,15 +1014,18 @@ public class AMContainerImpl implements AMContainer {
AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
String diag = event.getDiagnostics();
for (TezTaskAttemptID taId : container.failedAssignments) {
- container.sendTerminatedToTaskAttempt(taId, diag);
+ container.sendTerminatedToTaskAttempt(taId, diag,
+ TaskAttemptTerminationCause.CONTAINER_EXITED);
}
if (container.pendingAttempt != null) {
- container.sendTerminatedToTaskAttempt(container.pendingAttempt, diag);
+ container.sendTerminatedToTaskAttempt(container.pendingAttempt, diag,
+ TaskAttemptTerminationCause.CONTAINER_EXITED);
container.registerFailedAttempt(container.pendingAttempt);
container.pendingAttempt = null;
}
if (container.runningAttempt != null) {
- container.sendTerminatedToTaskAttempt(container.runningAttempt, diag);
+ container.sendTerminatedToTaskAttempt(container.runningAttempt, diag,
+ TaskAttemptTerminationCause.CONTAINER_EXITED);
container.registerFailedAttempt(container.runningAttempt);
container.runningAttempt = null;
}
@@ -1078,12 +1094,11 @@ public class AMContainerImpl implements AMContainer {
+ " in COMPLETED state";
container.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId());
container.sendTerminatedToTaskAttempt(event.getTaskAttemptId(),
- errorMessage);
+ errorMessage, TaskAttemptTerminationCause.FRAMEWORK_ERROR);
container.registerFailedAttempt(event.getTaskAttemptId());
}
}
-
private void handleExtraTAAssign(
AMContainerEventAssignTA event, TezTaskAttemptID currentTaId) {
this.inError = true;
@@ -1092,8 +1107,10 @@ public class AMContainerImpl implements AMContainer {
". Attempts: " + currentTaId + ", " + event.getTaskAttemptId() +
". Current state: " + this.getState();
this.maybeSendNodeFailureForFailedAssignment(event.getTaskAttemptId());
- this.sendTerminatingToTaskAttempt(event.getTaskAttemptId(), errorMessage);
- this.sendTerminatingToTaskAttempt(currentTaId, errorMessage);
+ this.sendTerminatingToTaskAttempt(event.getTaskAttemptId(), errorMessage,
+ TaskAttemptTerminationCause.FRAMEWORK_ERROR);
+ this.sendTerminatingToTaskAttempt(currentTaId, errorMessage,
+ TaskAttemptTerminationCause.FRAMEWORK_ERROR);
this.registerFailedAttempt(event.getTaskAttemptId());
LOG.warn(errorMessage);
this.logStopped(ContainerExitStatus.INVALID);
@@ -1122,28 +1139,29 @@ public class AMContainerImpl implements AMContainer {
}
protected void sendTerminatedToTaskAttempt(
- TezTaskAttemptID taId, String message) {
- sendEvent(new TaskAttemptEventContainerTerminated(taId, message));
+ TezTaskAttemptID taId, String message, TaskAttemptTerminationCause errCause) {
+ sendEvent(new TaskAttemptEventContainerTerminated(taId, message, errCause));
}
protected void sendContainerTerminatedBySystemToTaskAttempt(
- TezTaskAttemptID taId, String message) {
- sendEvent(new TaskAttemptEventContainerTerminatedBySystem(taId, message));
+ TezTaskAttemptID taId, String message, TaskAttemptTerminationCause errorCause) {
+ sendEvent(new TaskAttemptEventContainerTerminatedBySystem(taId, message, errorCause));
}
protected void sendTerminatingToTaskAttempt(TezTaskAttemptID taId,
- String message) {
- sendEvent(new TaskAttemptEventContainerTerminating(taId, message));
+ String message, TaskAttemptTerminationCause errorCause) {
+ sendEvent(new TaskAttemptEventContainerTerminating(taId, message, errorCause));
}
protected void maybeSendNodeFailureForFailedAssignment(TezTaskAttemptID taId) {
if (this.nodeFailed) {
- this.sendNodeFailureToTA(taId, "Node Failed");
+ this.sendNodeFailureToTA(taId, "Node Failed", TaskAttemptTerminationCause.NODE_FAILED);
}
}
- protected void sendNodeFailureToTA(TezTaskAttemptID taId, String message) {
- sendEvent(new TaskAttemptEventNodeFailed(taId, message));
+ protected void sendNodeFailureToTA(TezTaskAttemptID taId, String message,
+ TaskAttemptTerminationCause errorCause) {
+ sendEvent(new TaskAttemptEventNodeFailed(taId, message, errorCause));
}
protected void sendStartRequestToNM(ContainerLaunchContext clc) {
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
index 0ae8061..2b21c89 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/events/TaskAttemptFinishedEvent.java
@@ -29,6 +29,7 @@ import org.apache.tez.dag.api.DagTypeConverters;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.recovery.records.RecoveryProtos.TaskAttemptFinishedProto;
@@ -43,21 +44,23 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
private TaskAttemptState state;
private String diagnostics;
private TezCounters tezCounters;
+ private TaskAttemptTerminationCause error;
public TaskAttemptFinishedEvent(TezTaskAttemptID taId,
String vertexName,
long startTime,
long finishTime,
TaskAttemptState state,
- String diagnostics,
- TezCounters counters) {
+ TaskAttemptTerminationCause error,
+ String diagnostics, TezCounters counters) {
this.taskAttemptId = taId;
this.vertexName = vertexName;
this.startTime = startTime;
this.finishTime = finishTime;
this.state = state;
this.diagnostics = diagnostics;
- tezCounters = counters;
+ this.tezCounters = counters;
+ this.error = error;
}
public TaskAttemptFinishedEvent() {
@@ -87,6 +90,9 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
if (diagnostics != null) {
builder.setDiagnostics(diagnostics);
}
+ if (error != null) {
+ builder.setErrorEnum(error.name());
+ }
if (tezCounters != null) {
builder.setCounters(DagTypeConverters.convertTezCountersToProto(tezCounters));
}
@@ -100,6 +106,9 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
if (proto.hasDiagnostics()) {
this.diagnostics = proto.getDiagnostics();
}
+ if (proto.hasErrorEnum()) {
+ this.error = TaskAttemptTerminationCause.valueOf(proto.getErrorEnum());
+ }
if (proto.hasCounters()) {
this.tezCounters = DagTypeConverters.convertTezCountersFromProto(
proto.getCounters());
@@ -129,6 +138,7 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
+ ", finishTime=" + finishTime
+ ", timeTaken=" + (finishTime - startTime)
+ ", status=" + state.name()
+ + ", errorEnum=" + (error != null ? error.name() : "")
+ ", diagnostics=" + diagnostics
+ ", counters=" + (tezCounters == null ? "null" :
tezCounters.toString()
@@ -146,6 +156,10 @@ public class TaskAttemptFinishedEvent implements HistoryEvent {
public String getDiagnostics() {
return diagnostics;
}
+
+ public TaskAttemptTerminationCause getTaskAttemptError() {
+ return error;
+ }
public long getFinishTime() {
return finishTime;
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
index 8560359..79a0c34 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/history/logging/impl/HistoryEventJsonConversion.java
@@ -18,11 +18,9 @@
package org.apache.tez.dag.history.logging.impl;
-import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.tez.common.ATSConstants;
@@ -485,6 +483,9 @@ public class HistoryEventJsonConversion {
otherInfo.put(ATSConstants.FINISH_TIME, event.getFinishTime());
otherInfo.put(ATSConstants.TIME_TAKEN, (event.getFinishTime() - event.getStartTime()));
otherInfo.put(ATSConstants.STATUS, event.getState().name());
+ if (event.getTaskAttemptError() != null) {
+ otherInfo.put(ATSConstants.TASK_ATTEMPT_ERROR_ENUM, event.getTaskAttemptError().name());
+ }
otherInfo.put(ATSConstants.DIAGNOSTICS, event.getDiagnostics());
otherInfo.put(ATSConstants.COUNTERS,
DAGUtils.convertCountersToJSON(event.getCounters()));
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/main/proto/HistoryEvents.proto
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/proto/HistoryEvents.proto b/tez-dag/src/main/proto/HistoryEvents.proto
index e8f323d..45e9582 100644
--- a/tez-dag/src/main/proto/HistoryEvents.proto
+++ b/tez-dag/src/main/proto/HistoryEvents.proto
@@ -166,6 +166,7 @@ message TaskAttemptFinishedProto {
optional int32 state = 3;
optional string diagnostics = 4;
optional TezCountersProto counters = 5;
+ optional string error_enum = 6;
}
message EventMetaDataProto {
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
index bc15954..8cc2e8b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestPreemption.java
@@ -41,6 +41,7 @@ import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
import org.apache.tez.dag.app.dag.TaskAttemptStateInternal;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.dag.impl.TaskAttemptImpl;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
@@ -200,6 +201,7 @@ public class TestPreemption {
TezTaskAttemptID testTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), i);
TaskAttemptImpl taImpl = dagImpl.getTaskAttempt(testTaId);
Assert.assertEquals(TaskAttemptStateInternal.KILLED, taImpl.getInternalState());
+ Assert.assertEquals(TaskAttemptTerminationCause.EXTERNAL_PREEMPTION, taImpl.getTerminationCause());
}
System.out.println("TestPreemption - Done running - " + info);
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
index 114c44b..38eb934 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestSpeculation.java
@@ -34,6 +34,7 @@ import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
@@ -115,6 +116,7 @@ public class TestSpeculation {
Assert.assertEquals(successTaId, task.getSuccessfulAttempt().getID());
TaskAttempt killedAttempt = task.getAttempt(killedTaId);
Joiner.on(",").join(killedAttempt.getDiagnostics()).contains("Killed as speculative attempt");
+ Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_EFFECTIVE_SPECULATION, killedAttempt.getTerminationCause());
tezClient.stop();
}
@@ -154,6 +156,7 @@ public class TestSpeculation {
Assert.assertEquals(successTaId, task.getSuccessfulAttempt().getID());
TaskAttempt killedAttempt = task.getAttempt(killedTaId);
Joiner.on(",").join(killedAttempt.getDiagnostics()).contains("Killed speculative attempt as");
+ Assert.assertEquals(TaskAttemptTerminationCause.TERMINATED_INEFFECTIVE_SPECULATION, killedAttempt.getTerminationCause());
tezClient.stop();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
index 6796d02..29469b1 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttempt.java
@@ -87,6 +87,7 @@ import org.apache.tez.dag.app.rm.AMSchedulerEventTAEnded;
import org.apache.tez.dag.app.rm.AMSchedulerEventTALaunchRequest;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
import org.apache.tez.dag.app.rm.container.ContainerContextMatcher;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -291,9 +292,11 @@ public class TestTaskAttempt {
taImpl.handle(new TaskAttemptEventSchedule(taskAttemptID, 0, 0));
// At state STARTING.
- taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null));
+ taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null,
+ TaskAttemptTerminationCause.TERMINATED_BY_CLIENT));
// At some KILLING state.
- taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null));
+ taImpl.handle(new TaskAttemptEventKillRequest(taskAttemptID, null,
+ TaskAttemptTerminationCause.TERMINATED_BY_CLIENT));
// taImpl.handle(new TaskAttemptEventContainerTerminating(taskAttemptID,
// null));
assertFalse(eventHandler.internalError);
@@ -358,7 +361,7 @@ public class TestTaskAttempt {
verify(eventHandler, times(expectedEventsAtRunning)).handle(arg.capture());
taImpl.handle(new TaskAttemptEventContainerTerminating(taskAttemptID,
- "Terminating"));
+ "Terminating", TaskAttemptTerminationCause.APPLICATION_ERROR));
assertFalse(
"InternalError occurred trying to handle TA_CONTAINER_TERMINATING",
eventHandler.internalError);
@@ -368,6 +371,7 @@ public class TestTaskAttempt {
assertEquals(1, taImpl.getDiagnostics().size());
assertEquals("Terminating", taImpl.getDiagnostics().get(0));
+ assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, taImpl.getTerminationCause());
int expectedEvenstAfterTerminating = expectedEventsAtRunning + 3;
arg = ArgumentCaptor.forClass(Event.class);
@@ -384,13 +388,16 @@ public class TestTaskAttempt {
expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
- "Terminated"));
+ "Terminated", TaskAttemptTerminationCause.CONTAINER_EXITED));
int expectedEventAfterTerminated = expectedEvenstAfterTerminating + 0;
arg = ArgumentCaptor.forClass(Event.class);
verify(eventHandler, times(expectedEventAfterTerminated)).handle(arg.capture());
assertEquals(2, taImpl.getDiagnostics().size());
assertEquals("Terminated", taImpl.getDiagnostics().get(1));
+
+ // check that original error cause is retained
+ assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, taImpl.getTerminationCause());
}
@@ -445,13 +452,14 @@ public class TestTaskAttempt {
null));
assertEquals("Task attempt is not in running state", taImpl.getState(),
TaskAttemptState.RUNNING);
- taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, "Terminated"));
+ taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, "Terminated",
+ TaskAttemptTerminationCause.CONTAINER_EXITED));
assertFalse(
"InternalError occurred trying to handle TA_CONTAINER_TERMINATED",
eventHandler.internalError);
assertEquals("Terminated", taImpl.getDiagnostics().get(0));
-
+ assertEquals(TaskAttemptTerminationCause.CONTAINER_EXITED, taImpl.getTerminationCause());
// TODO Ensure TA_TERMINATING after this is ingored.
}
@@ -535,17 +543,18 @@ public class TestTaskAttempt {
expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID,
- "Terminated"));
+ "Terminated", TaskAttemptTerminationCause.CONTAINER_EXITED));
int expectedEventAfterTerminated = expectedEvenstAfterTerminating + 0;
arg = ArgumentCaptor.forClass(Event.class);
verify(eventHandler, times(expectedEventAfterTerminated)).handle(arg.capture());
// Verify that the diagnostic message included in the Terminated event is not
- // captured - TA already succeeded.
+ // captured - TA already succeeded. Termination cause remains the default value.
assertEquals(0, taImpl.getDiagnostics().size());
+ assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, taImpl.getTerminationCause());
}
- @Test//(timeout = 5000)
+ @Test(timeout = 5000)
public void testFailure() throws Exception {
ApplicationId appId = ApplicationId.newInstance(1, 2);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
@@ -607,18 +616,23 @@ public class TestTaskAttempt {
taImpl.handle(new TaskAttemptEventStatusUpdate(taskAttemptID, new TaskStatusUpdateEvent(null, 0.1f)));
- taImpl.handle(new TaskAttemptEventAttemptFailed(taskAttemptID, TaskAttemptEventType.TA_FAILED, "0"));
+ taImpl.handle(new TaskAttemptEventAttemptFailed(taskAttemptID, TaskAttemptEventType.TA_FAILED, "0",
+ TaskAttemptTerminationCause.APPLICATION_ERROR));
assertEquals("Task attempt is not in the FAIL_IN_PROGRESS state", taImpl.getInternalState(),
TaskAttemptStateInternal.FAIL_IN_PROGRESS);
assertEquals(1, taImpl.getDiagnostics().size());
assertEquals("0", taImpl.getDiagnostics().get(0));
+ assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, taImpl.getTerminationCause());
- taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, "1"));
+ taImpl.handle(new TaskAttemptEventContainerTerminated(taskAttemptID, "1",
+ TaskAttemptTerminationCause.CONTAINER_EXITED));
assertEquals(2, taImpl.getDiagnostics().size());
assertEquals("1", taImpl.getDiagnostics().get(1));
+ // termination cause does not change on the subsequent container-terminated event
+ assertEquals(TaskAttemptTerminationCause.APPLICATION_ERROR, taImpl.getTerminationCause());
int expectedEvenstAfterTerminating = expectedEventsAtRunning + 5;
arg = ArgumentCaptor.forClass(Event.class);
@@ -815,8 +829,9 @@ public class TestTaskAttempt {
verify(eventHandler, times(expectedEventAfterTerminated)).handle(arg.capture());
// Verify that the diagnostic message included in the Terminated event is not
- // captured - TA already succeeded.
+ // captured - TA already succeeded. Termination cause should remain the default value.
assertEquals(0, taImpl.getDiagnostics().size());
+ assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, taImpl.getTerminationCause());
}
@Test(timeout = 5000)
@@ -898,7 +913,8 @@ public class TestTaskAttempt {
expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
// Send out a Node Failure.
- taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID, "NodeDecomissioned"));
+ taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID, "NodeDecomissioned",
+ TaskAttemptTerminationCause.NODE_FAILED));
// Verify in KILLED state
assertEquals("Task attempt is not in the KILLED state", TaskAttemptState.KILLED,
taImpl.getState());
@@ -914,6 +930,7 @@ public class TestTaskAttempt {
// Verify still in KILLED state
assertEquals("Task attempt is not in the KILLED state", TaskAttemptState.KILLED,
taImpl.getState());
+ assertEquals(TaskAttemptTerminationCause.NODE_FAILED, taImpl.getTerminationCause());
}
@Test(timeout = 5000)
@@ -995,7 +1012,8 @@ public class TestTaskAttempt {
expectedEvenstAfterTerminating), DAGEventCounterUpdate.class, 1);
// Send out a Node Failure.
- taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID, "NodeDecomissioned"));
+ taImpl.handle(new TaskAttemptEventNodeFailed(taskAttemptID, "NodeDecomissioned",
+ TaskAttemptTerminationCause.NODE_FAILED));
// Verify no additional events
int expectedEventsNodeFailure = expectedEvenstAfterTerminating + 0;
@@ -1005,6 +1023,8 @@ public class TestTaskAttempt {
// Verify still in SUCCEEDED state
assertEquals("Task attempt is not in the SUCCEEDED state", TaskAttemptState.SUCCEEDED,
taImpl.getState());
+ // termination cause remains the default value
+ assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, taImpl.getTerminationCause());
}
@Test(timeout = 5000)
@@ -1083,6 +1103,8 @@ public class TestTaskAttempt {
taImpl.handle(new TaskAttemptEventOutputFailed(taskAttemptID, tzEvent, 4));
assertEquals("Task attempt is not in succeeded state", taImpl.getState(),
TaskAttemptState.SUCCEEDED);
+ // termination cause is still the default value
+ assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, taImpl.getTerminationCause());
// different destination attempt reports error. now threshold crossed
TezTaskAttemptID mockDestId2 = mock(TezTaskAttemptID.class);
@@ -1091,6 +1113,7 @@ public class TestTaskAttempt {
assertEquals("Task attempt is not in FAILED state", taImpl.getState(),
TaskAttemptState.FAILED);
+ assertEquals(TaskAttemptTerminationCause.OUTPUT_LOST, taImpl.getTerminationCause());
assertEquals(true, taImpl.inputFailedReported);
int expectedEventsAfterFetchFailure = expectedEventsTillSucceeded + 2;
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
index 143268b..100e8d9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskAttemptRecovery.java
@@ -46,6 +46,7 @@ import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEventTAUpdate;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.events.TaskAttemptStartedEvent;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.junit.Before;
@@ -87,10 +88,15 @@ public class TestTaskAttemptRecovery {
private void restoreFromTAFinishedEvent(TaskAttemptState state) {
String diag = "test_diag";
TezCounters counters = mock(TezCounters.class);
+
+ TaskAttemptTerminationCause errorEnum = null;
+ if (state != TaskAttemptState.SUCCEEDED) {
+ errorEnum = TaskAttemptTerminationCause.APPLICATION_ERROR;
+ }
TaskAttemptState recoveredState =
ta.restoreFromEvent(new TaskAttemptFinishedEvent(taId, vertexName,
- startTime, finishTime, state, diag, counters));
+ startTime, finishTime, state, errorEnum, diag, counters));
assertEquals(startTime, ta.getLaunchTime());
assertEquals(finishTime, ta.getFinishTime());
assertEquals(counters, ta.reportedStatus.counters);
@@ -99,6 +105,11 @@ public class TestTaskAttemptRecovery {
assertEquals(1, ta.getDiagnostics().size());
assertEquals(diag, ta.getDiagnostics().get(0));
assertEquals(state, recoveredState);
+ if (state != TaskAttemptState.SUCCEEDED) {
+ assertEquals(errorEnum, ta.getTerminationCause());
+ } else {
+ assertEquals(TaskAttemptTerminationCause.UNKNOWN_ERROR, ta.getTerminationCause());
+ }
}
private void verifyEvents(List<Event> events, Class<? extends Event> eventClass,
http://git-wip-us.apache.org/repos/asf/tez/blob/81eef37d/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
index 88fa83d..e363dbd 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/dag/impl/TestTaskImpl.java
@@ -54,7 +54,6 @@ import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.StateChangeNotifier;
import org.apache.tez.dag.app.dag.TaskStateInternal;
-import org.apache.tez.dag.app.dag.TaskTerminationCause;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventDiagnosticsUpdate;
import org.apache.tez.dag.app.dag.event.TaskEvent;
@@ -64,6 +63,7 @@ import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.rm.container.AMContainer;
import org.apache.tez.dag.app.rm.node.AMNodeEventType;
+import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
@@ -173,7 +173,7 @@ public class TestTaskImpl {
}
private void killTask(TezTaskID taskId) {
- mockTask.handle(new TaskEventTermination(taskId, TaskTerminationCause.DAG_KILL));
+ mockTask.handle(new TaskEventTermination(taskId, TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN, null));
assertTaskKillWaitState();
}
@@ -553,18 +553,18 @@ public class TestTaskImpl {
@Test
public void testDiagnostics_KillNew(){
TezTaskID taskId = getNewTaskID();
- mockTask.handle(new TaskEventTermination(taskId, TaskTerminationCause.DAG_KILL));
+ mockTask.handle(new TaskEventTermination(taskId, TaskAttemptTerminationCause.TERMINATED_BY_CLIENT, null));
assertEquals(1, mockTask.getDiagnostics().size());
- assertTrue(mockTask.getDiagnostics().get(0).contains(TaskTerminationCause.DAG_KILL.name()));
+ assertTrue(mockTask.getDiagnostics().get(0).contains(TaskAttemptTerminationCause.TERMINATED_BY_CLIENT.name()));
}
@Test
public void testDiagnostics_Kill(){
TezTaskID taskId = getNewTaskID();
scheduleTaskAttempt(taskId);
- mockTask.handle(new TaskEventTermination(taskId, TaskTerminationCause.OTHER_TASK_FAILURE));
+ mockTask.handle(new TaskEventTermination(taskId, TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN, null));
assertEquals(1, mockTask.getDiagnostics().size());
- assertTrue(mockTask.getDiagnostics().get(0).contains(TaskTerminationCause.OTHER_TASK_FAILURE.name()));
+ assertTrue(mockTask.getDiagnostics().get(0).contains(TaskAttemptTerminationCause.TERMINATED_AT_SHUTDOWN.name()));
}
// TODO Add test to validate the correct commit attempt.