You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/12/12 18:45:19 UTC
git commit: TEZ-675. Pre-empted taskAttempt gets marked as FAILED instead of KILLED
Updated Branches:
refs/heads/master 18da4e29c -> 008912262
TEZ-675. Pre-empted taskAttempt gets marked as FAILED instead of KILLED
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/00891226
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/00891226
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/00891226
Branch: refs/heads/master
Commit: 00891226217ac95e3f7b2fe9462a77145369d311
Parents: 18da4e2
Author: Bikas Saha <bi...@apache.org>
Authored: Thu Dec 12 09:45:08 2013 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Thu Dec 12 09:45:08 2013 -0800
----------------------------------------------------------------------
.../event/TaskAttemptEventAttemptKilled.java | 36 ++++++
.../dag/app/dag/event/TaskAttemptEventType.java | 1 +
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 19 ++-
.../apache/tez/dag/app/rm/TaskScheduler.java | 121 ++++++++++---------
.../app/rm/TaskSchedulerAppCallbackWrapper.java | 21 ++++
.../dag/app/rm/TaskSchedulerEventHandler.java | 9 ++
.../rm/container/AMContainerEventCompleted.java | 11 ++
.../dag/app/rm/container/AMContainerImpl.java | 30 ++++-
.../tez/dag/app/rm/TestTaskScheduler.java | 14 ++-
.../dag/app/rm/TestTaskSchedulerHelpers.java | 6 +
.../dag/app/rm/container/TestAMContainer.java | 80 ++++++++----
11 files changed, 258 insertions(+), 90 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00891226/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java
new file mode 100644
index 0000000..d1fd9f5
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+public class TaskAttemptEventAttemptKilled extends TaskAttemptEvent
+ implements DiagnosableEvent {
+
+ private final String diagnostics;
+ public TaskAttemptEventAttemptKilled(TezTaskAttemptID id, String diagnostics) {
+ super(id, TaskAttemptEventType.TA_KILLED);
+ this.diagnostics = diagnostics;
+ }
+
+ @Override
+ public String getDiagnosticInfo() {
+ return diagnostics;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00891226/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
index 5210e33..d66b8ec 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
@@ -34,6 +34,7 @@ public enum TaskAttemptEventType {
TA_COMMIT_PENDING,
TA_DONE,
TA_FAILED,
+ TA_KILLED,
TA_TIMED_OUT,
//Producer: Client
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00891226/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 28b8809..2a5e577 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -183,6 +183,7 @@ public class TaskAttemptImpl implements TaskAttempt,
.addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new NodeFailedBeforeRunningTransition())
.addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new ContainerTerminatingBeforeRunningTransition())
.addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedBeforeRunningTransition())
+ .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILLED, new ContainerCompletedBeforeRunningTransition(KILLED_HELPER))
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
@@ -195,6 +196,7 @@ public class TaskAttemptImpl implements TaskAttempt,
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new TerminatedWhileRunningTransition(KILLED_HELPER))
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new TerminatedWhileRunningTransition(FAILED_HELPER))
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileRunningTransition())
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILLED, new ContainerCompletedWhileRunningTransition(KILLED_HELPER))
.addTransition(TaskAttemptStateInternal.RUNNING, EnumSet.of(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.RUNNING), TaskAttemptEventType.TA_OUTPUT_FAILED, new OutputReportedFailedTransition())
.addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
@@ -209,21 +211,22 @@ public class TaskAttemptImpl implements TaskAttempt,
.addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new TerminatedWhileRunningTransition(KILLED_HELPER))
.addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new TerminatedWhileRunningTransition(FAILED_HELPER))
.addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedBeforeRunningTransition())
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILLED, new ContainerCompletedBeforeRunningTransition(KILLED_HELPER))
.addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, EnumSet.of(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.OUTPUT_CONSUMABLE), TaskAttemptEventType.TA_OUTPUT_FAILED, new OutputReportedFailedTransition())
.addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileTerminating())
.addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_OUTPUT_FAILED))
+ .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_KILLED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_OUTPUT_FAILED))
.addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileTerminating())
.addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAIL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_OUTPUT_FAILED))
+ .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAIL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_KILLED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_OUTPUT_FAILED))
.addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_OUTPUT_FAILED))
+ .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_KILLED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_OUTPUT_FAILED))
.addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_OUTPUT_FAILED))
+ .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_KILLED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_OUTPUT_FAILED))
// How will duplicate history events be handled ?
// TODO Maybe consider not failing REDUCE tasks in this case. Also, MAP_TASKS in case there's only one phase in the job.
@@ -1025,6 +1028,10 @@ public class TaskAttemptImpl implements TaskAttempt,
public ContainerCompletedBeforeRunningTransition() {
super(FAILED_HELPER);
}
+
+ public ContainerCompletedBeforeRunningTransition(TerminatedTransitionHelper helper) {
+ super(helper);
+ }
@Override
public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
@@ -1113,6 +1120,10 @@ public class TaskAttemptImpl implements TaskAttempt,
public ContainerCompletedWhileRunningTransition() {
super(FAILED_HELPER);
}
+
+ public ContainerCompletedWhileRunningTransition(TerminatedTransitionHelper helper) {
+ super(helper);
+ }
@Override
public void transition(TaskAttemptImpl ta, TaskAttemptEvent event) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00891226/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
index fd62a75..f96c541 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskScheduler.java
@@ -110,6 +110,7 @@ public class TaskScheduler extends AbstractService
);
public void onError(Throwable t);
public float getProgress();
+ public void preemptContainer(ContainerId containerId);
public AppFinalStatus getFinalAppStatus();
}
@@ -888,67 +889,75 @@ public class TaskScheduler extends AbstractService
return null;
}
- synchronized void preemptIfNeeded() {
- Resource freeResources = Resources.subtract(totalResources,
- allocatedResources);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Allocated resource memory: " + allocatedResources.getMemory() +
- " cpu:" + allocatedResources.getVirtualCores() +
- " delayedContainers: " + delayedContainerManager.delayedContainers.size());
- }
- assert freeResources.getMemory() >= 0;
-
- if (delayedContainerManager.delayedContainers.size() > 0) {
- // if we are holding onto containers then nothing to preempt from outside
- return;
- }
-
- CookieContainerRequest highestPriRequest = null;
- for(CookieContainerRequest request : taskRequests.values()) {
- if(highestPriRequest == null) {
- highestPriRequest = request;
- } else if(isHigherPriority(request.getPriority(),
- highestPriRequest.getPriority())){
- highestPriRequest = request;
+ void preemptIfNeeded() {
+ ContainerId preemptedContainer = null;
+ synchronized (this) {
+ Resource freeResources = Resources.subtract(totalResources,
+ allocatedResources);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Allocated resource memory: " + allocatedResources.getMemory() +
+ " cpu:" + allocatedResources.getVirtualCores() +
+ " delayedContainers: " + delayedContainerManager.delayedContainers.size());
}
- }
- if(highestPriRequest != null &&
- !fitsIn(highestPriRequest.getCapability(), freeResources)) {
- // highest priority request will not fit in existing free resources
- // free up some more
- // TODO this is subject to error wrt RM resource normalization
- Map.Entry<Object, Container> preemptedEntry = null;
- for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
- HeldContainer heldContainer = heldContainers.get(entry.getValue().getId());
- CookieContainerRequest lastTaskInfo = heldContainer.getLastTaskInfo();
- Priority taskPriority = lastTaskInfo.getPriority();
- Object signature = lastTaskInfo.getCookie().getContainerSignature();
- if(!isHigherPriority(highestPriRequest.getPriority(), taskPriority)) {
- // higher or same priority
- continue;
+ assert freeResources.getMemory() >= 0;
+
+ if (delayedContainerManager.delayedContainers.size() > 0) {
+ // if we are holding onto containers then nothing to preempt from outside
+ return;
+ }
+
+ CookieContainerRequest highestPriRequest = null;
+ for(CookieContainerRequest request : taskRequests.values()) {
+ if(highestPriRequest == null) {
+ highestPriRequest = request;
+ } else if(isHigherPriority(request.getPriority(),
+ highestPriRequest.getPriority())){
+ highestPriRequest = request;
}
- if (containerSignatureMatcher.isExactMatch(
- highestPriRequest.getCookie().getContainerSignature(),
- signature)) {
- // exact match with different priorities
- continue;
+ }
+ if(highestPriRequest != null &&
+ !fitsIn(highestPriRequest.getCapability(), freeResources)) {
+ // highest priority request will not fit in existing free resources
+ // free up some more
+ // TODO this is subject to error wrt RM resource normalization
+ Map.Entry<Object, Container> preemptedEntry = null;
+ for(Map.Entry<Object, Container> entry : taskAllocations.entrySet()) {
+ HeldContainer heldContainer = heldContainers.get(entry.getValue().getId());
+ CookieContainerRequest lastTaskInfo = heldContainer.getLastTaskInfo();
+ Priority taskPriority = lastTaskInfo.getPriority();
+ Object signature = lastTaskInfo.getCookie().getContainerSignature();
+ if(!isHigherPriority(highestPriRequest.getPriority(), taskPriority)) {
+ // higher or same priority
+ continue;
+ }
+ if (containerSignatureMatcher.isExactMatch(
+ highestPriRequest.getCookie().getContainerSignature(),
+ signature)) {
+ // exact match with different priorities
+ continue;
+ }
+ if(preemptedEntry == null ||
+ !isHigherPriority(taskPriority,
+ preemptedEntry.getValue().getPriority())) {
+ // keep the lower priority or the one added later
+ preemptedEntry = entry;
+ }
}
- if(preemptedEntry == null ||
- !isHigherPriority(taskPriority,
- preemptedEntry.getValue().getPriority())) {
- // keep the lower priority or the one added later
- preemptedEntry = entry;
+ if(preemptedEntry != null) {
+ // found something to preempt
+ LOG.info("Preempting task: " + preemptedEntry.getKey() +
+ " to free resource for request: " + highestPriRequest +
+ " . Current free resources: " + freeResources);
+ preemptedContainer = preemptedEntry.getValue().getId();
+ // app client will be notified when after container is killed
+ // and we get its completed container status
}
}
- if(preemptedEntry != null) {
- // found something to preempt
- LOG.info("Preempting task: " + preemptedEntry.getKey() +
- " to free resource for request: " + highestPriRequest +
- " . Current free resources: " + freeResources);
- deallocateContainer(preemptedEntry.getValue().getId());
- // app client will be notified when after container is killed
- // and we get its completed container status
- }
+ }
+
+ // upcall outside locks
+ if (preemptedContainer != null) {
+ appClientDelegate.preemptContainer(preemptedContainer);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00891226/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
index 1f11a03..2debf06 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerAppCallbackWrapper.java
@@ -119,6 +119,11 @@ class TaskSchedulerAppCallbackWrapper implements TaskSchedulerAppCallback {
throw new TezUncheckedException(e);
}
}
+
+ @Override
+ public void preemptContainer(ContainerId containerId) {
+ completionService.submit(new PreemptContainerCallable(real, containerId));
+ }
@Override
public AppFinalStatus getFinalAppStatus() {
@@ -271,6 +276,22 @@ class TaskSchedulerAppCallbackWrapper implements TaskSchedulerAppCallback {
}
}
+ static class PreemptContainerCallable extends TaskSchedulerAppCallbackBase
+ implements Callable<Void> {
+ private final ContainerId containerId;
+
+ public PreemptContainerCallable(TaskSchedulerAppCallback app, ContainerId id) {
+ super(app);
+ this.containerId = id;
+ }
+
+ @Override
+ public Void call() throws Exception {
+ app.preemptContainer(containerId);
+ return null;
+ }
+ }
+
static class GetProgressCallable extends TaskSchedulerAppCallbackBase
implements Callable<Float> {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00891226/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
index 3d80d0c..031d952 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/TaskSchedulerEventHandler.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
@@ -516,4 +517,12 @@ public class TaskSchedulerEventHandler extends AbstractService
public void dagCompleted() {
taskScheduler.resetMatchLocalityForAllHeldContainers();
}
+
+ @Override
+ public void preemptContainer(ContainerId containerId) {
+ taskScheduler.deallocateContainer(containerId);
+ // Inform the Containers about completion.
+ sendEvent(new AMContainerEventCompleted(ContainerStatus.newInstance(
+ containerId, ContainerState.COMPLETE, "Container Preempted Internally", -1), true));
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00891226/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
index fb19559..a6bcc14 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
@@ -23,14 +23,25 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
public class AMContainerEventCompleted extends AMContainerEvent {
private final ContainerStatus containerStatus;
+ private final boolean isPreempted;
public AMContainerEventCompleted(ContainerStatus containerStatus) {
+ this(containerStatus, false);
+ }
+
+ public AMContainerEventCompleted(ContainerStatus containerStatus,
+ boolean isPreempted) {
super(containerStatus.getContainerId(), AMContainerEventType.C_COMPLETED);
this.containerStatus = containerStatus;
+ this.isPreempted = isPreempted;
}
public ContainerStatus getContainerStatus() {
return this.containerStatus;
}
+
+ public boolean isPreempted() {
+ return isPreempted;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00891226/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 36c34ec..486584c 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -45,6 +45,7 @@ import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.dag.event.DiagnosableEvent;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed;
@@ -482,8 +483,13 @@ public class AMContainerImpl implements AMContainer {
AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
if (container.pendingAttempt != null) {
String errorMessage = getMessage(container, event);
- container.sendTerminatedToTaskAttempt(container.pendingAttempt,
- errorMessage);
+ if (event.isPreempted()) {
+ container.sendKilledToTaskAttempt(container.pendingAttempt,
+ errorMessage);
+ } else {
+ container.sendTerminatedToTaskAttempt(container.pendingAttempt,
+ errorMessage);
+ }
container.registerFailedAttempt(container.pendingAttempt);
container.pendingAttempt = null;
LOG.warn(errorMessage);
@@ -500,7 +506,8 @@ public class AMContainerImpl implements AMContainer {
public String getMessage(AMContainerImpl container,
AMContainerEventCompleted event) {
return "Container" + container.getContainerId()
- + " COMPLETED while trying to launch. Diagnostics: ["
+ + (event.isPreempted() ? " PREEMPTED" : " COMPLETED")
+ + " while trying to launch. Diagnostics: ["
+ event.getContainerStatus().getDiagnostics() +"]";
}
}
@@ -645,7 +652,8 @@ public class AMContainerImpl implements AMContainer {
@Override
public String getMessage(
AMContainerImpl container, AMContainerEventCompleted event) {
- return "Container " + container.getContainerId() + " COMPLETED"
+ return "Container " + container.getContainerId()
+ + (event.isPreempted() ? " PREEMPTED" : " COMPLETED")
+ " with diagnostics set to ["
+ event.getContainerStatus().getDiagnostics() + "]";
}
@@ -717,8 +725,13 @@ public class AMContainerImpl implements AMContainer {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
- container.sendTerminatedToTaskAttempt(container.runningAttempt,
- getMessage(container, event));
+ if (event.isPreempted()) {
+ container.sendKilledToTaskAttempt(container.runningAttempt,
+ getMessage(container, event));
+ } else {
+ container.sendTerminatedToTaskAttempt(container.runningAttempt,
+ getMessage(container, event));
+ }
container.unregisterAttemptFromListener(container.runningAttempt);
container.registerFailedAttempt(container.runningAttempt);
container.runningAttempt = null;
@@ -920,6 +933,11 @@ public class AMContainerImpl implements AMContainer {
TezTaskAttemptID taId, String message) {
sendEvent(new TaskAttemptEventContainerTerminated(taId, message));
}
+
+ protected void sendKilledToTaskAttempt(
+ TezTaskAttemptID taId, String message) {
+ sendEvent(new TaskAttemptEventAttemptKilled(taId, message));
+ }
protected void sendTerminatingToTaskAttempt(TezTaskAttemptID taId,
String message) {
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00891226/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
index cc7bbae..c8e8d1b 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskScheduler.java
@@ -675,7 +675,7 @@ public class TestTaskScheduler {
scheduler.close();
}
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({ "unchecked", "rawtypes" })
@Test
public void testTaskSchedulerPreemption() throws Exception {
RackResolver.init(new YarnConfiguration());
@@ -689,7 +689,7 @@ public class TestTaskScheduler {
String appHost = "host";
int appPort = 0;
String appUrl = "url";
- TaskSchedulerWithDrainableAppCallback scheduler =
+ final TaskSchedulerWithDrainableAppCallback scheduler =
new TaskSchedulerWithDrainableAppCallback(
mockApp, new PreemptionMatcher(), appHost, appPort,
appUrl, mockRMClient, mockAppContext);
@@ -830,6 +830,16 @@ public class TestTaskScheduler {
}
});
+
+ Mockito.doAnswer(new Answer() {
+ public Object answer(InvocationOnMock invocation) {
+ Object[] args = invocation.getArguments();
+ ContainerId cId = (ContainerId) args[0];
+ scheduler.deallocateContainer(cId);
+ return null;
+ }})
+ .when(mockApp).preemptContainer((ContainerId)any());
+
scheduler.onContainersAllocated(containers);
drainableAppCallback.drain();
Assert.assertEquals(3072, scheduler.allocatedResources.getMemory());
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00891226/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
index 9557456..99158d0 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/TestTaskSchedulerHelpers.java
@@ -286,6 +286,12 @@ class TestTaskSchedulerHelpers {
return real.getFinalAppStatus();
}
+ @Override
+ public void preemptContainer(ContainerId cId) {
+ invocations++;
+ real.preemptContainer(cId);
+ }
+
public void drain() throws InterruptedException, ExecutionException {
while (completedEvents < invocations) {
Future f = completionService.poll(5000l, TimeUnit.MILLISECONDS);
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/00891226/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index fd1900f..18cdcfd 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -120,7 +120,7 @@ public class TestAMContainer {
assertNull(wc.amContainer.getRunningTaskAttempt());
verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID);
- wc.containerCompleted();
+ wc.containerCompleted(false);
wc.verifyState(AMContainerState.COMPLETED);
// 1 Scheduler completed event.
wc.verifyCountAndGetOutgoingEvents(1);
@@ -176,7 +176,7 @@ public class TestAMContainer {
assertNull(wc.amContainer.getRunningTaskAttempt());
verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID);
- wc.containerCompleted();
+ wc.containerCompleted(false);
wc.verifyState(AMContainerState.COMPLETED);
// 1 Scheduler completed event.
wc.verifyCountAndGetOutgoingEvents(1);
@@ -210,7 +210,7 @@ public class TestAMContainer {
wc.verifyState(AMContainerState.STOPPING);
wc.verifyNoOutgoingEvents();
- wc.containerCompleted();
+ wc.containerCompleted(false);
wc.verifyState(AMContainerState.COMPLETED);
// 1 Scheduler completed event.
wc.verifyCountAndGetOutgoingEvents(1);
@@ -249,7 +249,7 @@ public class TestAMContainer {
assertTrue(wc.verifyCountAndGetOutgoingEvents(1).get(0).getType() ==
AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
- wc.containerCompleted();
+ wc.containerCompleted(false);
wc.verifyState(AMContainerState.COMPLETED);
// 1 Scheduler completed event.
wc.verifyCountAndGetOutgoingEvents(1);
@@ -288,7 +288,7 @@ public class TestAMContainer {
assertTrue(wc.amContainer.isInErrorState());
wc.nmStopSent();
- wc.containerCompleted();
+ wc.containerCompleted(false);
// 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -328,7 +328,7 @@ public class TestAMContainer {
assertTrue(wc.amContainer.isInErrorState());
wc.nmStopSent();
- wc.containerCompleted();
+ wc.containerCompleted(false);
// 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -367,7 +367,7 @@ public class TestAMContainer {
assertTrue(wc.amContainer.isInErrorState());
wc.nmStopSent();
- wc.containerCompleted();
+ wc.containerCompleted(false);
// 1 Inform scheduler. 2 TERMINATED to TaskAttempt.
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -403,7 +403,7 @@ public class TestAMContainer {
NMCommunicatorEventType.CONTAINER_STOP_REQUEST);
// TODO Should this be an RM DE-ALLOCATE instead ?
- wc.containerCompleted();
+ wc.containerCompleted(false);
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
TaskAttemptEventType.TA_CONTAINER_TERMINATED,
@@ -435,7 +435,7 @@ public class TestAMContainer {
TaskAttemptEventType.TA_CONTAINER_TERMINATING,
AMSchedulerEventType.S_CONTAINER_DEALLOCATE);
- wc.containerCompleted();
+ wc.containerCompleted(false);
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
TaskAttemptEventType.TA_CONTAINER_TERMINATED,
@@ -452,7 +452,7 @@ public class TestAMContainer {
List<Event> outgoingEvents;
wc.verifyState(AMContainerState.ALLOCATED);
- wc.containerCompleted();
+ wc.containerCompleted(false);
wc.verifyState(AMContainerState.COMPLETED);
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -474,7 +474,7 @@ public class TestAMContainer {
wc.assignTaskAttempt(wc.taskAttemptID);
- wc.containerCompleted();
+ wc.containerCompleted(false);
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID);
verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -505,7 +505,7 @@ public class TestAMContainer {
wc.containerLaunched();
wc.verifyState(AMContainerState.IDLE);
- wc.containerCompleted();
+ wc.containerCompleted(false);
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID);
verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -541,7 +541,7 @@ public class TestAMContainer {
wc.pullTaskToRun();
wc.verifyState(AMContainerState.RUNNING);
- wc.containerCompleted();
+ wc.containerCompleted(false);
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID);
verify(wc.tal).unregisterRunningContainer(wc.containerID);
@@ -566,6 +566,42 @@ public class TestAMContainer {
@SuppressWarnings("rawtypes")
@Test
+ public void testContainerPreemptedAtRunning() {
+ WrappedContainer wc = new WrappedContainer();
+ List<Event> outgoingEvents;
+
+ wc.launchContainer();
+
+ wc.assignTaskAttempt(wc.taskAttemptID);
+ wc.containerLaunched();
+ wc.pullTaskToRun();
+ wc.verifyState(AMContainerState.RUNNING);
+
+ wc.containerCompleted(true);
+ wc.verifyState(AMContainerState.COMPLETED);
+ verify(wc.tal).registerRunningContainer(wc.containerID);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID);
+ verify(wc.chh).register(wc.containerID);
+ verify(wc.chh).unregister(wc.containerID);
+
+ outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
+ verifyUnOrderedOutgoingEventTypes(outgoingEvents,
+ AMSchedulerEventType.S_CONTAINER_COMPLETED,
+ TaskAttemptEventType.TA_KILLED);
+
+ assertFalse(wc.amContainer.isInErrorState());
+
+ // Pending task complete. (Ideally, container should be dead at this point
+ // and this event should not be generated. Network timeout on NM-RM heartbeat
+ // can cause it to be generated)
+ wc.taskAttemptSucceeded(wc.taskAttemptID);
+ wc.verifyNoOutgoingEvents();
+
+ assertFalse(wc.amContainer.isInErrorState());
+ }
+
+ @SuppressWarnings("rawtypes")
+ @Test
public void testTaskAssignedToCompletedContainer() {
WrappedContainer wc = new WrappedContainer();
List<Event> outgoingEvents;
@@ -576,7 +612,7 @@ public class TestAMContainer {
wc.pullTaskToRun();
wc.taskAttemptSucceeded(wc.taskAttemptID);
- wc.containerCompleted();
+ wc.containerCompleted(false);
wc.verifyState(AMContainerState.COMPLETED);
TezTaskAttemptID taID2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
@@ -634,7 +670,7 @@ public class TestAMContainer {
}
}
- wc.containerCompleted();
+ wc.containerCompleted(false);
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
TaskAttemptEventType.TA_CONTAINER_TERMINATED,
@@ -680,7 +716,7 @@ public class TestAMContainer {
assertFalse(wc.amContainer.isInErrorState());
- wc.containerCompleted();
+ wc.containerCompleted(false);
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
AMSchedulerEventType.S_CONTAINER_COMPLETED);
@@ -724,7 +760,7 @@ public class TestAMContainer {
}
}
- wc.containerCompleted();
+ wc.containerCompleted(false);
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
TaskAttemptEventType.TA_CONTAINER_TERMINATED,
@@ -754,7 +790,7 @@ public class TestAMContainer {
wc.taskAttemptSucceeded(taID2);
wc.stopRequest();
wc.nmStopSent();
- wc.containerCompleted();
+ wc.containerCompleted(false);
wc.verifyState(AMContainerState.COMPLETED);
wc.nodeFailed();
@@ -786,14 +822,14 @@ public class TestAMContainer {
wc.taskAttemptSucceeded(taID2);
wc.stopRequest();
wc.nmStopSent();
- wc.containerCompleted();
+ wc.containerCompleted(false);
wc.verifyState(AMContainerState.COMPLETED);
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
AMSchedulerEventType.S_CONTAINER_COMPLETED);
- wc.containerCompleted();
+ wc.containerCompleted(false);
wc.verifyNoOutgoingEvents();
}
@@ -939,11 +975,11 @@ public class TestAMContainer {
AMContainerEventType.C_NM_STOP_FAILED));
}
- public void containerCompleted() {
+ public void containerCompleted(boolean preempted) {
reset(eventHandler);
ContainerStatus cStatus = ContainerStatus.newInstance(containerID,
ContainerState.COMPLETE, "", 100);
- amContainer.handle(new AMContainerEventCompleted(cStatus));
+ amContainer.handle(new AMContainerEventCompleted(cStatus, preempted));
}
public void containerTimedOut() {