You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@tez.apache.org by bi...@apache.org on 2015/09/23 01:33:37 UTC
[3/4] tez git commit: TEZ-1929. pre-empted tasks should be marked as
killed instead of failed (bikas) (cherry picked from commit
6ba1339d57a5d05fa14f35f352076131bffea483) (cherry picked from commit
8e5d18e63e6c98aa4e89f568d159b3c1903b1dae)
TEZ-1929. pre-empted tasks should be marked as killed instead of failed (bikas)
(cherry picked from commit 6ba1339d57a5d05fa14f35f352076131bffea483)
(cherry picked from commit 8e5d18e63e6c98aa4e89f568d159b3c1903b1dae)
Conflicts:
    CHANGES.txt
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9c30d261
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9c30d261
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9c30d261
Branch: refs/heads/branch-0.5
Commit: 9c30d2612aba8e280bc2b84d0a3e925a8f9baf5f
Parents: 84a7ef0
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Jan 30 16:40:11 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Sep 22 14:46:20 2015 -0700
----------------------------------------------------------------------
CHANGES.txt | 1 +
.../rm/container/AMContainerEventCompleted.java | 5 +--
.../dag/app/rm/container/AMContainerImpl.java | 4 +--
.../tez/dag/app/TestMockDAGAppMaster.java | 37 ++++++++++++++++++++
.../app/rm/TestTaskSchedulerEventHandler.java | 2 +-
.../dag/app/rm/container/TestAMContainer.java | 6 ++--
6 files changed, 47 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/9c30d261/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cbc6751..986ab9b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
ALL CHANGES:
+ TEZ-1929. pre-empted tasks should be marked as killed instead of failed
TEZ-1773. Add attempt failure cause enum to the attempt failed/killed
history record
TEZ-2203. Intern strings in tez counters
http://git-wip-us.apache.org/repos/asf/tez/blob/9c30d261/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
index a455f1e..9bb6d7f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
@@ -37,14 +37,15 @@ public class AMContainerEventCompleted extends AMContainerEvent {
}
public boolean isPreempted() {
- return (exitStatus == ContainerExitStatus.PREEMPTED);
+ return (exitStatus == ContainerExitStatus.PREEMPTED ||
+ errCause == TaskAttemptTerminationCause.INTERNAL_PREEMPTION);
}
public boolean isDiskFailed() {
return (exitStatus == ContainerExitStatus.DISKS_FAILED);
}
- public boolean isClusterAction() {
+ public boolean isSystemAction() {
return isPreempted() || isDiskFailed();
}
http://git-wip-us.apache.org/repos/asf/tez/blob/9c30d261/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 9d4f46b..3dc4bec 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -662,7 +662,7 @@ public class AMContainerImpl implements AMContainer {
AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
if (container.pendingAttempt != null) {
String errorMessage = getMessage(container, event);
- if (event.isClusterAction()) {
+ if (event.isSystemAction()) {
container.sendContainerTerminatedBySystemToTaskAttempt(container.pendingAttempt,
errorMessage, event.getTerminationCause());
} else {
@@ -921,7 +921,7 @@ public class AMContainerImpl implements AMContainer {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
- if (event.isClusterAction()) {
+ if (event.isSystemAction()) {
container.sendContainerTerminatedBySystemToTaskAttempt(container.runningAttempt,
getMessage(container, event), event.getTerminationCause());
} else {
http://git-wip-us.apache.org/repos/asf/tez/blob/9c30d261/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index 8650aea..16f0f7e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -35,10 +35,16 @@ import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher.ContainerData;
import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
+import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
import org.junit.Assert;
import org.junit.Test;
@@ -102,6 +108,37 @@ public class TestMockDAGAppMaster {
tezClient.stop();
}
+ @Test (timeout = 5000)
+ public void testInternalPreemption() throws Exception {
+ TezConfiguration tezconf = new TezConfiguration(defaultConf);
+
+ MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null);
+ tezClient.start();
+
+ MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
+ MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
+ mockLauncher.startScheduling(false);
+ // there is only 1 task whose first attempt will be preempted
+ DAG dag = DAG.create("test");
+ Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 1);
+ dag.addVertex(vA);
+
+ DAGClient dagClient = tezClient.submitDAG(dag);
+ mockLauncher.waitTillContainersLaunched();
+ ContainerData cData = mockLauncher.getContainers().values().iterator().next();
+ DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
+ mockApp.getTaskSchedulerEventHandler().preemptContainer(cData.cId);
+
+ mockLauncher.startScheduling(true);
+ dagClient.waitForCompletion();
+ Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+ TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), 0);
+ TezTaskAttemptID killedTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0);
+ TaskAttempt killedTa = dagImpl.getVertex(vA.getName()).getTask(0).getAttempt(killedTaId);
+ Assert.assertEquals(TaskAttemptState.KILLED, killedTa.getState());
+ tezClient.stop();
+ }
+
@Test (timeout = 10000)
public void testMultipleSubmissions() throws Exception {
Map<String, LocalResource> lrDAG = Maps.newHashMap();
http://git-wip-us.apache.org/repos/asf/tez/blob/9c30d261/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index d2dece3..62618cc 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -204,7 +204,7 @@ public class TestTaskSchedulerEventHandler {
AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event;
Assert.assertEquals(mockCId, completedEvent.getContainerId());
Assert.assertEquals("Container preempted internally", completedEvent.getDiagnostics());
- Assert.assertFalse(completedEvent.isPreempted());
+ Assert.assertTrue(completedEvent.isPreempted());
Assert.assertFalse(completedEvent.isDiskFailed());
Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION,
completedEvent.getTerminationCause());
http://git-wip-us.apache.org/repos/asf/tez/blob/9c30d261/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index f273896..438c50d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -704,10 +704,10 @@ public class TestAMContainer {
verify(wc.chh).unregister(wc.containerID);
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
- Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION,
- ((TaskAttemptEventContainerTerminated)outgoingEvents.get(0)).getTerminationCause());
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
- TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+ TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
+ Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION,
+ ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause());
assertFalse(wc.amContainer.isInErrorState());