You are viewing a plain-text version of this content. The canonical link to it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@tez.apache.org by bi...@apache.org on 2015/09/23 01:33:37 UTC

[3/4] tez git commit: TEZ-1929. pre-empted tasks should be marked as killed instead of failed (bikas) (cherry picked from commit 6ba1339d57a5d05fa14f35f352076131bffea483) (cherry picked from commit 8e5d18e63e6c98aa4e89f568d159b3c1903b1dae)

TEZ-1929. Pre-empted tasks should be marked as killed instead of failed (bikas)
(cherry picked from commit 6ba1339d57a5d05fa14f35f352076131bffea483)
(cherry picked from commit 8e5d18e63e6c98aa4e89f568d159b3c1903b1dae)

Conflicts:
	CHANGES.txt


Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/9c30d261
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/9c30d261
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/9c30d261

Branch: refs/heads/branch-0.5
Commit: 9c30d2612aba8e280bc2b84d0a3e925a8f9baf5f
Parents: 84a7ef0
Author: Bikas Saha <bi...@apache.org>
Authored: Fri Jan 30 16:40:11 2015 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Tue Sep 22 14:46:20 2015 -0700

----------------------------------------------------------------------
 CHANGES.txt                                     |  1 +
 .../rm/container/AMContainerEventCompleted.java |  5 +--
 .../dag/app/rm/container/AMContainerImpl.java   |  4 +--
 .../tez/dag/app/TestMockDAGAppMaster.java       | 37 ++++++++++++++++++++
 .../app/rm/TestTaskSchedulerEventHandler.java   |  2 +-
 .../dag/app/rm/container/TestAMContainer.java   |  6 ++--
 6 files changed, 47 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tez/blob/9c30d261/CHANGES.txt
----------------------------------------------------------------------
diff --git a/CHANGES.txt b/CHANGES.txt
index cbc6751..986ab9b 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -7,6 +7,7 @@ INCOMPATIBLE CHANGES
   TEZ-2552. CRC errors can cause job to run for very long time in large jobs.
 
 ALL CHANGES:
+  TEZ-1929. pre-empted tasks should be marked as killed instead of failed
   TEZ-1773. Add attempt failure cause enum to the attempt failed/killed
   history record
   TEZ-2203. Intern strings in tez counters

http://git-wip-us.apache.org/repos/asf/tez/blob/9c30d261/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
index a455f1e..9bb6d7f 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
@@ -37,14 +37,15 @@ public class AMContainerEventCompleted extends AMContainerEvent {
   }
 
   public boolean isPreempted() {
-    return (exitStatus == ContainerExitStatus.PREEMPTED);
+    return (exitStatus == ContainerExitStatus.PREEMPTED || 
+        errCause == TaskAttemptTerminationCause.INTERNAL_PREEMPTION);
   }
   
   public boolean isDiskFailed() {
     return (exitStatus == ContainerExitStatus.DISKS_FAILED);
   }
   
-  public boolean isClusterAction() {
+  public boolean isSystemAction() {
     return isPreempted() || isDiskFailed();
   }
   

http://git-wip-us.apache.org/repos/asf/tez/blob/9c30d261/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 9d4f46b..3dc4bec 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -662,7 +662,7 @@ public class AMContainerImpl implements AMContainer {
       AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
       if (container.pendingAttempt != null) {
         String errorMessage = getMessage(container, event);
-        if (event.isClusterAction()) {
+        if (event.isSystemAction()) {
           container.sendContainerTerminatedBySystemToTaskAttempt(container.pendingAttempt,
               errorMessage, event.getTerminationCause());
         } else {
@@ -921,7 +921,7 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
-      if (event.isClusterAction()) {
+      if (event.isSystemAction()) {
         container.sendContainerTerminatedBySystemToTaskAttempt(container.runningAttempt,
             getMessage(container, event), event.getTerminationCause());
       } else {

http://git-wip-us.apache.org/repos/asf/tez/blob/9c30d261/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
index 8650aea..16f0f7e 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestMockDAGAppMaster.java
@@ -35,10 +35,16 @@ import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.client.DAGClient;
 import org.apache.tez.dag.api.client.DAGStatus;
+import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
 import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
 import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher.ContainerData;
 import org.apache.tez.dag.app.dag.DAGState;
+import org.apache.tez.dag.app.dag.TaskAttempt;
 import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
+import org.apache.tez.dag.app.dag.impl.DAGImpl;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+import org.apache.tez.dag.records.TezTaskID;
+import org.apache.tez.dag.records.TezVertexID;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -102,6 +108,37 @@ public class TestMockDAGAppMaster {
     tezClient.stop();
   }
   
+  @Test (timeout = 5000)
+  public void testInternalPreemption() throws Exception {
+    TezConfiguration tezconf = new TezConfiguration(defaultConf);
+    
+    MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null);
+    tezClient.start();
+    
+    MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
+    MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
+    mockLauncher.startScheduling(false);
+    // there is only 1 task whose first attempt will be preempted
+    DAG dag = DAG.create("test");
+    Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 1);
+    dag.addVertex(vA);
+
+    DAGClient dagClient = tezClient.submitDAG(dag);
+    mockLauncher.waitTillContainersLaunched();
+    ContainerData cData = mockLauncher.getContainers().values().iterator().next();
+    DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
+    mockApp.getTaskSchedulerEventHandler().preemptContainer(cData.cId);
+    
+    mockLauncher.startScheduling(true);
+    dagClient.waitForCompletion();
+    Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
+    TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), 0);
+    TezTaskAttemptID killedTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0);
+    TaskAttempt killedTa = dagImpl.getVertex(vA.getName()).getTask(0).getAttempt(killedTaId);
+    Assert.assertEquals(TaskAttemptState.KILLED, killedTa.getState());
+    tezClient.stop();
+  }
+  
   @Test (timeout = 10000)
   public void testMultipleSubmissions() throws Exception {
     Map<String, LocalResource> lrDAG = Maps.newHashMap();

http://git-wip-us.apache.org/repos/asf/tez/blob/9c30d261/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
index d2dece3..62618cc 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerEventHandler.java
@@ -204,7 +204,7 @@ public class TestTaskSchedulerEventHandler {
     AMContainerEventCompleted completedEvent = (AMContainerEventCompleted) event;
     Assert.assertEquals(mockCId, completedEvent.getContainerId());
     Assert.assertEquals("Container preempted internally", completedEvent.getDiagnostics());
-    Assert.assertFalse(completedEvent.isPreempted());
+    Assert.assertTrue(completedEvent.isPreempted());
     Assert.assertFalse(completedEvent.isDiskFailed());
     Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION,
         completedEvent.getTerminationCause());

http://git-wip-us.apache.org/repos/asf/tez/blob/9c30d261/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index f273896..438c50d 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -704,10 +704,10 @@ public class TestAMContainer {
     verify(wc.chh).unregister(wc.containerID);
 
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
-    Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION,
-        ((TaskAttemptEventContainerTerminated)outgoingEvents.get(0)).getTerminationCause());
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
-        TaskAttemptEventType.TA_CONTAINER_TERMINATED);
+        TaskAttemptEventType.TA_CONTAINER_TERMINATED_BY_SYSTEM);
+    Assert.assertEquals(TaskAttemptTerminationCause.INTERNAL_PREEMPTION,
+        ((TaskAttemptEventContainerTerminatedBySystem)outgoingEvents.get(0)).getTerminationCause());
 
     assertFalse(wc.amContainer.isInErrorState());