You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/12/12 18:45:19 UTC

git commit: TEZ-675. Pre-empted taskAttempt gets marked as FAILED instead of KILLED

Updated Branches:
  refs/heads/master 18da4e29c -> 008912262


TEZ-675. Pre-empted taskAttempt gets marked as FAILED instead of KILLED


Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/00891226
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/00891226
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/00891226

Branch: refs/heads/master
Commit: 00891226217ac95e3f7b2fe9462a77145369d311
Parents: 18da4e2
Author: Bikas Saha <bi...@apache.org>
Authored: Thu Dec 12 09:45:08 2013 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu Dec 12 09:45:08 2013 -0800

----------------------------------------------------------------------
 .../event/TaskAttemptEventAttemptKilled.java    |  36 ++++++
 .../dag/app/dag/event/TaskAttemptEventType.java |   1 +
 .../tez/dag/app/dag/impl/TaskAttemptImpl.java   |  19 ++-
 .../apache/tez/dag/app/rm/TaskScheduler.java    | 121 ++++++++++---------
 .../app/rm/TaskSchedulerAppCallbackWrapper.java |  21 ++++
 .../dag/app/rm/TaskSchedulerEventHandler.java   |   9 ++
 .../rm/container/AMContainerEventCompleted.java |  11 ++
 .../dag/app/rm/container/AMContainerImpl.java   |  30 ++++-
 .../tez/dag/app/rm/TestTaskScheduler.java       |  14 ++-
 .../dag/app/rm/TestTaskSchedulerHelpers.java    |   6 +
 .../dag/app/rm/container/TestAMContainer.java   |  80 ++++++++----
 11 files changed, 258 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00891226/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java
new file mode 100644
index 0000000..d1fd9f5
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+public class TaskAttemptEventAttemptKilled extends TaskAttemptEvent 
+  implements DiagnosableEvent {
+
+  private final String diagnostics;
+  public TaskAttemptEventAttemptKilled(TezTaskAttemptID id, String diagnostics) {
+    super(id, TaskAttemptEventType.TA_KILLED);
+    this.diagnostics = diagnostics;
+  }
+
+  @Override
+  public String getDiagnosticInfo() {
+    return diagnostics;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00891226/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
index 5210e33..d66b8ec 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
@@ -34,6 +34,7 @@ public enum TaskAttemptEventType {
   TA_COMMIT_PENDING,
   TA_DONE,
   TA_FAILED,
+  TA_KILLED,
   TA_TIMED_OUT,
   
 //Producer: Client

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00891226/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 28b8809..2a5e577 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -183,6 +183,7 @@ public class TaskAttemptImpl implements TaskAttempt,
         .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new NodeFailedBeforeRunningTransition())
         .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new ContainerTerminatingBeforeRunningTransition())
         .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedBeforeRunningTransition())
+        .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILLED, new ContainerCompletedBeforeRunningTransition(KILLED_HELPER))
 
         .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
         .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
@@ -195,6 +196,7 @@ public class TaskAttemptImpl implements TaskAttempt,
         .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new TerminatedWhileRunningTransition(KILLED_HELPER))
         .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new TerminatedWhileRunningTransition(FAILED_HELPER))
         .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileRunningTransition())
+        .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILLED, new ContainerCompletedWhileRunningTransition(KILLED_HELPER))
         .addTransition(TaskAttemptStateInternal.RUNNING, EnumSet.of(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.RUNNING), TaskAttemptEventType.TA_OUTPUT_FAILED, new OutputReportedFailedTransition())
 
         .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
@@ -209,21 +211,22 @@ public class TaskAttemptImpl implements TaskAttempt,
         .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new TerminatedWhileRunningTransition(KILLED_HELPER))
         .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new TerminatedWhileRunningTransition(FAILED_HELPER))
         .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedBeforeRunningTransition())
+        .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILLED, new ContainerCompletedBeforeRunningTransition(KILLED_HELPER))
         .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, EnumSet.of(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.OUTPUT_CONSUMABLE), TaskAttemptEventType.TA_OUTPUT_FAILED, new OutputReportedFailedTransition())
 
         .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileTerminating())
         .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_OUTPUT_FAILED))
+        .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_KILLED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_OUTPUT_FAILED))
 
         .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileTerminating())
         .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAIL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_OUTPUT_FAILED))
+        .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAIL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_KILLED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_OUTPUT_FAILED))
 
         .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_OUTPUT_FAILED))
+        .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_KILLED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_OUTPUT_FAILED))
 
         .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
-        .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_OUTPUT_FAILED))
+        .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_KILLED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_OUTPUT_FAILED))
 
         // How will duplicate history events be handled ?
         // TODO Maybe consider not failing REDUCE tasks in this case. Also, MAP_TASKS in case there's only one phase in the job.
@@ -1025,6 +1028,10 @@ public class TaskAttemptImpl implements TaskAttempt,
     public ContainerCompletedBeforeRunningTransition() {
       super(FAILED_HELPER);
     }
+    
+    public ContainerCompletedBeforeRunningTransition(TerminatedTransitionHelper helper) {
+      super(helper);
+    }
 
     @Override
     public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
@@ -1113,6 +1120,10 @@ public class TaskAttemptImpl implements TaskAttempt,
     public ContainerCompletedWhileRunningTransition() {
       super(FAILED_HELPER);
     }
+    
+    public ContainerCompletedWhileRunningTransition(TerminatedTransitionHelper helper) {
+      super(helper);
+    }
 
     @Override
     public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00891226/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index fd62a75..f96c541 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -110,6 +110,7 @@ public class TaskScheduler extends AbstractService
                                 );
     public void onError(Throwable t);
     public float getProgress();
+    public void preemptContainer(ContainerId containerId);
     public AppFinalStatus getFinalAppStatus();
   }
 
@@ -888,67 +889,75 @@ public class TaskScheduler extends AbstractService
     return null;
   }
 
-  synchronized void preemptIfNeeded() {
-    Resource freeResources = Resources.subtract(totalResources,
-      allocatedResources);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Allocated resource memory: " + allocatedResources.getMemory() +
-        " cpu:" + allocatedResources.getVirtualCores() + 
-        " delayedContainers: " + delayedContainerManager.delayedContainers.size());
-    }
-    assert freeResources.getMemory() >= 0;
-    
-    if (delayedContainerManager.delayedContainers.size() > 0) {
-      // if we are holding onto containers then nothing to preempt from outside
-      return;
-    }
-
-    CookieContainerRequest highestPriRequest = null;
-    for(CookieContainerRequest request : taskRequests.values()) {
-      if(highestPriRequest == null) {
-        highestPriRequest = request;
-      } else if(isHigherPriority(request.getPriority(),
-                                   highestPriRequest.getPriority())){
-        highestPriRequest = request;
+  void preemptIfNeeded() {
+    ContainerId preemptedContainer = null;
+    synchronized (this) {
+      Resource freeResources = Resources.subtract(totalResources,
+        allocatedResources);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Allocated resource memory: " + allocatedResources.getMemory() +
+          " cpu:" + allocatedResources.getVirtualCores() + 
+          " delayedContainers: " + delayedContainerManager.delayedContainers.size());
       }
-    }
-    if(highestPriRequest != null &&
-       !fitsIn(highestPriRequest.getCapability(), freeResources)) {
-      // highest priority request will not fit in existing free resources
-      // free up some more
-      // TODO this is subject to error wrt RM resource normalization
-      Map.Entry<Object, Container> preemptedEntry = null;
-      for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
-        HeldContainer heldContainer = heldContainers.get(entry.getValue().getId());
-        CookieContainerRequest lastTaskInfo = heldContainer.getLastTaskInfo();
-        Priority taskPriority = lastTaskInfo.getPriority();
-        Object signature = lastTaskInfo.getCookie().getContainerSignature();
-        if(!isHigherPriority(highestPriRequest.getPriority(), taskPriority)) {
-          // higher or same priority
-          continue;
+      assert freeResources.getMemory() >= 0;
+      
+      if (delayedContainerManager.delayedContainers.size() > 0) {
+        // if we are holding onto containers then nothing to preempt from outside
+        return;
+      }
+  
+      CookieContainerRequest highestPriRequest = null;
+      for(CookieContainerRequest request : taskRequests.values()) {
+        if(highestPriRequest == null) {
+          highestPriRequest = request;
+        } else if(isHigherPriority(request.getPriority(),
+                                     highestPriRequest.getPriority())){
+          highestPriRequest = request;
         }
-        if (containerSignatureMatcher.isExactMatch(
-            highestPriRequest.getCookie().getContainerSignature(),
-            signature)) {
-          // exact match with different priorities
-          continue;
+      }
+      if(highestPriRequest != null &&
+         !fitsIn(highestPriRequest.getCapability(), freeResources)) {
+        // highest priority request will not fit in existing free resources
+        // free up some more
+        // TODO this is subject to error wrt RM resource normalization
+        Map.Entry<Object, Container> preemptedEntry = null;
+        for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
+          HeldContainer heldContainer = heldContainers.get(entry.getValue().getId());
+          CookieContainerRequest lastTaskInfo = heldContainer.getLastTaskInfo();
+          Priority taskPriority = lastTaskInfo.getPriority();
+          Object signature = lastTaskInfo.getCookie().getContainerSignature();
+          if(!isHigherPriority(highestPriRequest.getPriority(), taskPriority)) {
+            // higher or same priority
+            continue;
+          }
+          if (containerSignatureMatcher.isExactMatch(
+              highestPriRequest.getCookie().getContainerSignature(),
+              signature)) {
+            // exact match with different priorities
+            continue;
+          }
+          if(preemptedEntry == null ||
+             !isHigherPriority(taskPriority, 
+                 preemptedEntry.getValue().getPriority())) {
+            // keep the lower priority or the one added later
+            preemptedEntry = entry;
+          }
         }
-        if(preemptedEntry == null ||
-           !isHigherPriority(taskPriority, 
-               preemptedEntry.getValue().getPriority())) {
-          // keep the lower priority or the one added later
-          preemptedEntry = entry;
+        if(preemptedEntry != null) {
+          // found something to preempt
+          LOG.info("Preempting task: " + preemptedEntry.getKey() +
+              " to free resource for request: " + highestPriRequest +
+              " . Current free resources: " + freeResources);
+          preemptedContainer = preemptedEntry.getValue().getId();
+          // app client will be notified when after container is killed
+          // and we get its completed container status
         }
       }
-      if(preemptedEntry != null) {
-        // found something to preempt
-        LOG.info("Preempting task: " + preemptedEntry.getKey() +
-            " to free resource for request: " + highestPriRequest +
-            " . Current free resources: " + freeResources);
-        deallocateContainer(preemptedEntry.getValue().getId());
-        // app client will be notified when after container is killed
-        // and we get its completed container status
-      }
+    }
+    
+    // upcall outside locks
+    if (preemptedContainer != null) {
+      appClientDelegate.preemptContainer(preemptedContainer);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00891226/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
index 1f11a03..2debf06 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
@@ -119,6 +119,11 @@ class TaskSchedulerAppCallbackWrapper implements TaskSchedulerAppCallback {
       throw new TezUncheckedException(e);
     }
   }
+  
+  @Override
+  public void preemptContainer(ContainerId containerId) {
+    completionService.submit(new PreemptContainerCallable(real, containerId));
+  }
 
   @Override
   public AppFinalStatus getFinalAppStatus() {
@@ -271,6 +276,22 @@ class TaskSchedulerAppCallbackWrapper implements TaskSchedulerAppCallback {
     }
   }
 
+  static class PreemptContainerCallable extends TaskSchedulerAppCallbackBase 
+      implements Callable<Void> {
+    private final ContainerId containerId;
+    
+    public PreemptContainerCallable(TaskSchedulerAppCallback app, ContainerId id) {
+      super(app);
+      this.containerId = id;
+    }
+    
+    @Override
+    public Void call() throws Exception {
+      app.preemptContainer(containerId);
+      return null;
+    }
+  }
+  
   static class GetProgressCallable extends TaskSchedulerAppCallbackBase
       implements Callable<Float> {
 

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00891226/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 3d80d0c..031d952 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.NodeReport;
@@ -516,4 +517,12 @@ public class TaskSchedulerEventHandler extends AbstractService
   public void dagCompleted() {
     taskScheduler.resetMatchLocalityForAllHeldContainers();
   }
+
+  @Override
+  public void preemptContainer(ContainerId containerId) {
+    taskScheduler.deallocateContainer(containerId);
+    // Inform the Containers about completion.
+    sendEvent(new AMContainerEventCompleted(ContainerStatus.newInstance(
+        containerId, ContainerState.COMPLETE, "Container Preempted Internally", -1), true));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00891226/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
index fb19559..a6bcc14 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
@@ -23,14 +23,25 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
 public class AMContainerEventCompleted extends AMContainerEvent {
 
   private final ContainerStatus containerStatus;
+  private final boolean isPreempted;
 
   public AMContainerEventCompleted(ContainerStatus containerStatus) {
+    this(containerStatus, false);
+  }
+  
+  public AMContainerEventCompleted(ContainerStatus containerStatus, 
+      boolean isPreempted) {
     super(containerStatus.getContainerId(), AMContainerEventType.C_COMPLETED);
     this.containerStatus = containerStatus;
+    this.isPreempted = isPreempted;
   }
 
   public ContainerStatus getContainerStatus() {
     return this.containerStatus;
   }
+  
+  public boolean isPreempted() {
+    return isPreempted;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00891226/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 36c34ec..486584c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -45,6 +45,7 @@ import org.apache.tez.dag.app.ContainerHeartbeatHandler;
 import org.apache.tez.dag.app.ContainerContext;
 import org.apache.tez.dag.app.TaskAttemptListener;
 import org.apache.tez.dag.app.dag.event.DiagnosableEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating;
 import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed;
@@ -482,8 +483,13 @@ public class AMContainerImpl implements AMContainer {
       AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
       if (container.pendingAttempt != null) {
         String errorMessage = getMessage(container, event);
-        container.sendTerminatedToTaskAttempt(container.pendingAttempt,
-            errorMessage);
+        if (event.isPreempted()) {
+          container.sendKilledToTaskAttempt(container.pendingAttempt,
+              errorMessage);
+        } else {
+          container.sendTerminatedToTaskAttempt(container.pendingAttempt,
+              errorMessage);
+        }
         container.registerFailedAttempt(container.pendingAttempt);
         container.pendingAttempt = null;
         LOG.warn(errorMessage);
@@ -500,7 +506,8 @@ public class AMContainerImpl implements AMContainer {
     public String getMessage(AMContainerImpl container,
         AMContainerEventCompleted event) {
       return "Container" + container.getContainerId()
-          + " COMPLETED while trying to launch. Diagnostics: ["
+          + (event.isPreempted() ? " PREEMPTED" : " COMPLETED")
+          + " while trying to launch. Diagnostics: ["
           + event.getContainerStatus().getDiagnostics() +"]";
     }
   }
@@ -645,7 +652,8 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public String getMessage(
         AMContainerImpl container, AMContainerEventCompleted event) {
-      return "Container " + container.getContainerId() + " COMPLETED"
+      return "Container " + container.getContainerId()
+          + (event.isPreempted() ? " PREEMPTED" : " COMPLETED")
           + " with diagnostics set to ["
           + event.getContainerStatus().getDiagnostics() + "]";
     }
@@ -717,8 +725,13 @@ public class AMContainerImpl implements AMContainer {
     @Override
     public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
       AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
-      container.sendTerminatedToTaskAttempt(container.runningAttempt,
-          getMessage(container, event));
+      if (event.isPreempted()) {
+        container.sendKilledToTaskAttempt(container.runningAttempt,
+            getMessage(container, event));
+      } else {
+        container.sendTerminatedToTaskAttempt(container.runningAttempt,
+            getMessage(container, event));
+      }
       container.unregisterAttemptFromListener(container.runningAttempt);
       container.registerFailedAttempt(container.runningAttempt);
       container.runningAttempt = null;
@@ -920,6 +933,11 @@ public class AMContainerImpl implements AMContainer {
       TezTaskAttemptID taId, String message) {
     sendEvent(new TaskAttemptEventContainerTerminated(taId, message));
   }
+  
+  protected void sendKilledToTaskAttempt(
+    TezTaskAttemptID taId, String message) {
+      sendEvent(new TaskAttemptEventAttemptKilled(taId, message));
+  }
 
   protected void sendTerminatingToTaskAttempt(TezTaskAttemptID taId,
       String message) {

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00891226/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index cc7bbae..c8e8d1b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -675,7 +675,7 @@ public class TestTaskScheduler {
     scheduler.close();
   }
 
-  @SuppressWarnings("unchecked")
+  @SuppressWarnings({ "unchecked", "rawtypes" })
   @Test
   public void testTaskSchedulerPreemption() throws Exception {
     RackResolver.init(new YarnConfiguration());
@@ -689,7 +689,7 @@ public class TestTaskScheduler {
     String appHost = "host";
     int appPort = 0;
     String appUrl = "url";
-    TaskSchedulerWithDrainableAppCallback scheduler =
+    final TaskSchedulerWithDrainableAppCallback scheduler =
       new TaskSchedulerWithDrainableAppCallback(
         mockApp, new PreemptionMatcher(), appHost, appPort,
         appUrl, mockRMClient, mockAppContext);
@@ -830,6 +830,16 @@ public class TestTaskScheduler {
           }
 
         });
+    
+    Mockito.doAnswer(new Answer() {
+      public Object answer(InvocationOnMock invocation) {
+          Object[] args = invocation.getArguments();
+          ContainerId cId = (ContainerId) args[0];
+          scheduler.deallocateContainer(cId);
+          return null;
+      }})
+    .when(mockApp).preemptContainer((ContainerId)any());
+    
     scheduler.onContainersAllocated(containers);
     drainableAppCallback.drain();
     Assert.assertEquals(3072, scheduler.allocatedResources.getMemory());

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00891226/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index 9557456..99158d0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -286,6 +286,12 @@ class TestTaskSchedulerHelpers {
       return real.getFinalAppStatus();
     }
 
+    @Override
+    public void preemptContainer(ContainerId cId) {
+      invocations++;
+      real.preemptContainer(cId);
+    }
+
     public void drain() throws InterruptedException, ExecutionException {
       while (completedEvents < invocations) {
         Future f = completionService.poll(5000l, TimeUnit.MILLISECONDS);

http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00891226/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index fd1900f..18cdcfd 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -120,7 +120,7 @@ public class TestAMContainer {
     assertNull(wc.amContainer.getRunningTaskAttempt());
     verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID);
 
-    wc.containerCompleted();
+    wc.containerCompleted(false);
     wc.verifyState(AMContainerState.COMPLETED);
     // 1 Scheduler completed event.
     wc.verifyCountAndGetOutgoingEvents(1);
@@ -176,7 +176,7 @@ public class TestAMContainer {
     assertNull(wc.amContainer.getRunningTaskAttempt());
     verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID);
 
-    wc.containerCompleted();
+    wc.containerCompleted(false);
     wc.verifyState(AMContainerState.COMPLETED);
     // 1 Scheduler completed event.
     wc.verifyCountAndGetOutgoingEvents(1);
@@ -210,7 +210,7 @@ public class TestAMContainer {
     wc.verifyState(AMContainerState.STOPPING);
     wc.verifyNoOutgoingEvents();
 
-    wc.containerCompleted();
+    wc.containerCompleted(false);
     wc.verifyState(AMContainerState.COMPLETED);
     // 1 Scheduler completed event.
     wc.verifyCountAndGetOutgoingEvents(1);
@@ -249,7 +249,7 @@ public class TestAMContainer {
     assertTrue(wc.verifyCountAndGetOutgoingEvents(1).get(0).getType() ==
         AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
 
-    wc.containerCompleted();
+    wc.containerCompleted(false);
     wc.verifyState(AMContainerState.COMPLETED);
     // 1 Scheduler completed event.
     wc.verifyCountAndGetOutgoingEvents(1);
@@ -288,7 +288,7 @@ public class TestAMContainer {
     assertTrue(wc.amContainer.isInErrorState());
 
     wc.nmStopSent();
-    wc.containerCompleted();
+    wc.containerCompleted(false);
     // 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -328,7 +328,7 @@ public class TestAMContainer {
     assertTrue(wc.amContainer.isInErrorState());
 
     wc.nmStopSent();
-    wc.containerCompleted();
+    wc.containerCompleted(false);
     // 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -367,7 +367,7 @@ public class TestAMContainer {
     assertTrue(wc.amContainer.isInErrorState());
 
     wc.nmStopSent();
-    wc.containerCompleted();
+    wc.containerCompleted(false);
     // 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -403,7 +403,7 @@ public class TestAMContainer {
         NMCommunicatorEventType.CONTAINER_STOP_REQUEST);
     // TODO Should this be an RM DE-ALLOCATE instead ?
 
-    wc.containerCompleted();
+    wc.containerCompleted(false);
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         TaskAttemptEventType.TA_CONTAINER_TERMINATED,
@@ -435,7 +435,7 @@ public class TestAMContainer {
         TaskAttemptEventType.TA_CONTAINER_TERMINATING,
         AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
 
-    wc.containerCompleted();
+    wc.containerCompleted(false);
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         TaskAttemptEventType.TA_CONTAINER_TERMINATED,
@@ -452,7 +452,7 @@ public class TestAMContainer {
     List<Event> outgoingEvents;
     wc.verifyState(AMContainerState.ALLOCATED);
 
-    wc.containerCompleted();
+    wc.containerCompleted(false);
     wc.verifyState(AMContainerState.COMPLETED);
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -474,7 +474,7 @@ public class TestAMContainer {
 
     wc.assignTaskAttempt(wc.taskAttemptID);
 
-    wc.containerCompleted();
+    wc.containerCompleted(false);
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID);
     verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -505,7 +505,7 @@ public class TestAMContainer {
     wc.containerLaunched();
     wc.verifyState(AMContainerState.IDLE);
 
-    wc.containerCompleted();
+    wc.containerCompleted(false);
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID);
     verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -541,7 +541,7 @@ public class TestAMContainer {
     wc.pullTaskToRun();
     wc.verifyState(AMContainerState.RUNNING);
 
-    wc.containerCompleted();
+    wc.containerCompleted(false);
     wc.verifyState(AMContainerState.COMPLETED);
     verify(wc.tal).registerRunningContainer(wc.containerID);
     verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -566,6 +566,42 @@ public class TestAMContainer {
 
   @SuppressWarnings("rawtypes")
   @Test
+  public void testContainerPreemptedAtRunning() {
+    WrappedContainer wc = new WrappedContainer();
+    List<Event> outgoingEvents;
+
+    wc.launchContainer();
+
+    wc.assignTaskAttempt(wc.taskAttemptID);
+    wc.containerLaunched();
+    wc.pullTaskToRun();
+    wc.verifyState(AMContainerState.RUNNING);
+
+    wc.containerCompleted(true);
+    wc.verifyState(AMContainerState.COMPLETED);
+    verify(wc.tal).registerRunningContainer(wc.containerID);
+    verify(wc.tal).unregisterRunningContainer(wc.containerID);
+    verify(wc.chh).register(wc.containerID);
+    verify(wc.chh).unregister(wc.containerID);
+
+    outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+    verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+        AMSchedulerEventType.S_CONTAINER_COMPLETED,
+        TaskAttemptEventType.TA_KILLED);
+
+    assertFalse(wc.amContainer.isInErrorState());
+
+    // Pending task complete. (Ideally, container should be dead at this point
+    // and this event should not be generated. Network timeout on NM-RM heartbeat
+    // can cause it to be genreated)
+    wc.taskAttemptSucceeded(wc.taskAttemptID);
+    wc.verifyNoOutgoingEvents();
+
+    assertFalse(wc.amContainer.isInErrorState());
+  }
+  
+  @SuppressWarnings("rawtypes")
+  @Test
   public void testTaskAssignedToCompletedContainer() {
     WrappedContainer wc = new WrappedContainer();
     List<Event> outgoingEvents;
@@ -576,7 +612,7 @@ public class TestAMContainer {
     wc.pullTaskToRun();
     wc.taskAttemptSucceeded(wc.taskAttemptID);
 
-    wc.containerCompleted();
+    wc.containerCompleted(false);
     wc.verifyState(AMContainerState.COMPLETED);
 
     TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
@@ -634,7 +670,7 @@ public class TestAMContainer {
       }
     }
 
-    wc.containerCompleted();
+    wc.containerCompleted(false);
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         TaskAttemptEventType.TA_CONTAINER_TERMINATED,
@@ -680,7 +716,7 @@ public class TestAMContainer {
 
     assertFalse(wc.amContainer.isInErrorState());
 
-    wc.containerCompleted();
+    wc.containerCompleted(false);
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         AMSchedulerEventType.S_CONTAINER_COMPLETED);
@@ -724,7 +760,7 @@ public class TestAMContainer {
       }
     }
 
-    wc.containerCompleted();
+    wc.containerCompleted(false);
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         TaskAttemptEventType.TA_CONTAINER_TERMINATED,
@@ -754,7 +790,7 @@ public class TestAMContainer {
     wc.taskAttemptSucceeded(taID2);
     wc.stopRequest();
     wc.nmStopSent();
-    wc.containerCompleted();
+    wc.containerCompleted(false);
     wc.verifyState(AMContainerState.COMPLETED);
 
     wc.nodeFailed();
@@ -786,14 +822,14 @@ public class TestAMContainer {
     wc.taskAttemptSucceeded(taID2);
     wc.stopRequest();
     wc.nmStopSent();
-    wc.containerCompleted();
+    wc.containerCompleted(false);
     wc.verifyState(AMContainerState.COMPLETED);
 
     outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
     verifyUnOrderedOutgoingEventTypes(outgoingEvents,
         AMSchedulerEventType.S_CONTAINER_COMPLETED);
 
-    wc.containerCompleted();
+    wc.containerCompleted(false);
     wc.verifyNoOutgoingEvents();
   }
 
@@ -939,11 +975,11 @@ public class TestAMContainer {
           AMContainerEventType.C_NM_STOP_FAILED));
     }
 
-    public void containerCompleted() {
+    public void containerCompleted(boolean preempted) {
       reset(eventHandler);
       ContainerStatus cStatus = ContainerStatus.newInstance(containerID,
           ContainerState.COMPLETE, "", 100);
-      amContainer.handle(new AMContainerEventCompleted(cStatus));
+      amContainer.handle(new AMContainerEventCompleted(cStatus, preempted));
     }
 
     public void containerTimedOut() {