You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by bi...@apache.org on 2013/12/16 08:06:21 UTC
git commit: Addendum to TEZ-675. Pre-empted taskAttempt gets marked
as FAILED instead of KILLED (bikas)
Updated Branches:
refs/heads/master 44a7b72bb -> 3d9601b83
Addendum to TEZ-675. Pre-empted taskAttempt gets marked as FAILED instead of KILLED (bikas)
Project: http://git-wip-us.apache.org/repos/asf/incubator-tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tez/commit/3d9601b8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tez/tree/3d9601b8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tez/diff/3d9601b8
Branch: refs/heads/master
Commit: 3d9601b834ddd00c0e28e08b683402e7c14b8885
Parents: 44a7b72
Author: Bikas Saha <bi...@apache.org>
Authored: Sun Dec 15 23:06:03 2013 -0800
Committer: Bikas Saha <bi...@apache.org>
Committed: Sun Dec 15 23:06:03 2013 -0800
----------------------------------------------------------------------
.../event/TaskAttemptEventAttemptKilled.java | 36 --------------------
.../TaskAttemptEventContainerPreempted.java | 36 ++++++++++++++++++++
.../dag/app/dag/event/TaskAttemptEventType.java | 4 ++-
.../tez/dag/app/dag/impl/TaskAttemptImpl.java | 14 ++++----
.../dag/app/rm/container/AMContainerImpl.java | 10 +++---
.../dag/app/rm/container/TestAMContainer.java | 2 +-
6 files changed, 52 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d9601b8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java
deleted file mode 100644
index d1fd9f5..0000000
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventAttemptKilled.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
-
-package org.apache.tez.dag.app.dag.event;
-
-import org.apache.tez.dag.records.TezTaskAttemptID;
-
-public class TaskAttemptEventAttemptKilled extends TaskAttemptEvent
- implements DiagnosableEvent {
-
- private final String diagnostics;
- public TaskAttemptEventAttemptKilled(TezTaskAttemptID id, String diagnostics) {
- super(id, TaskAttemptEventType.TA_KILLED);
- this.diagnostics = diagnostics;
- }
-
- @Override
- public String getDiagnosticInfo() {
- return diagnostics;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d9601b8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerPreempted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerPreempted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerPreempted.java
new file mode 100644
index 0000000..1a29010
--- /dev/null
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventContainerPreempted.java
@@ -0,0 +1,36 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.tez.dag.app.dag.event;
+
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+public class TaskAttemptEventContainerPreempted extends TaskAttemptEvent
+ implements DiagnosableEvent {
+
+ private final String diagnostics;
+ public TaskAttemptEventContainerPreempted(TezTaskAttemptID id, String diagnostics) {
+ super(id, TaskAttemptEventType.TA_CONTAINER_PREEMPTED);
+ this.diagnostics = diagnostics;
+ }
+
+ @Override
+ public String getDiagnosticInfo() {
+ return diagnostics;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d9601b8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
index d66b8ec..018bd3d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/event/TaskAttemptEventType.java
@@ -34,7 +34,6 @@ public enum TaskAttemptEventType {
TA_COMMIT_PENDING,
TA_DONE,
TA_FAILED,
- TA_KILLED,
TA_TIMED_OUT,
//Producer: Client
@@ -53,6 +52,9 @@ public enum TaskAttemptEventType {
// container. TODO: Document the case.
TA_CONTAINER_TERMINATED,
+ // Container has either been preempted or will be preempted
+ TA_CONTAINER_PREEMPTED,
+
// The node running the task attempt failed.
TA_NODE_FAILED,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d9601b8/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
index 2a5e577..b9c4a03 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/dag/impl/TaskAttemptImpl.java
@@ -183,7 +183,7 @@ public class TaskAttemptImpl implements TaskAttempt,
.addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new NodeFailedBeforeRunningTransition())
.addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new ContainerTerminatingBeforeRunningTransition())
.addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedBeforeRunningTransition())
- .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILLED, new ContainerCompletedBeforeRunningTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.START_WAIT, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CONTAINER_PREEMPTED, new ContainerCompletedBeforeRunningTransition(KILLED_HELPER))
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
@@ -196,7 +196,7 @@ public class TaskAttemptImpl implements TaskAttempt,
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new TerminatedWhileRunningTransition(KILLED_HELPER))
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new TerminatedWhileRunningTransition(FAILED_HELPER))
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileRunningTransition())
- .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILLED, new ContainerCompletedWhileRunningTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CONTAINER_PREEMPTED, new ContainerCompletedWhileRunningTransition(KILLED_HELPER))
.addTransition(TaskAttemptStateInternal.RUNNING, EnumSet.of(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.RUNNING), TaskAttemptEventType.TA_OUTPUT_FAILED, new OutputReportedFailedTransition())
.addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_STATUS_UPDATE, STATUS_UPDATER)
@@ -211,22 +211,22 @@ public class TaskAttemptImpl implements TaskAttempt,
.addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_NODE_FAILED, new TerminatedWhileRunningTransition(KILLED_HELPER))
.addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_CONTAINER_TERMINATING, new TerminatedWhileRunningTransition(FAILED_HELPER))
.addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedBeforeRunningTransition())
- .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_KILLED, new ContainerCompletedBeforeRunningTransition(KILLED_HELPER))
+ .addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CONTAINER_PREEMPTED, new ContainerCompletedBeforeRunningTransition(KILLED_HELPER))
.addTransition(TaskAttemptStateInternal.OUTPUT_CONSUMABLE, EnumSet.of(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.OUTPUT_CONSUMABLE), TaskAttemptEventType.TA_OUTPUT_FAILED, new OutputReportedFailedTransition())
.addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileTerminating())
.addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_KILLED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_OUTPUT_FAILED))
+ .addTransition(TaskAttemptStateInternal.KILL_IN_PROGRESS, TaskAttemptStateInternal.KILL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_CONTAINER_PREEMPTED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_OUTPUT_FAILED))
.addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATED, new ContainerCompletedWhileTerminating())
.addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAIL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_KILLED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_OUTPUT_FAILED))
+ .addTransition(TaskAttemptStateInternal.FAIL_IN_PROGRESS, TaskAttemptStateInternal.FAIL_IN_PROGRESS, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_CONTAINER_PREEMPTED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_OUTPUT_FAILED))
.addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_KILLED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_OUTPUT_FAILED))
+ .addTransition(TaskAttemptStateInternal.KILLED, TaskAttemptStateInternal.KILLED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_CONTAINER_PREEMPTED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_OUTPUT_FAILED))
.addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE, DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_KILLED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_OUTPUT_FAILED))
+ .addTransition(TaskAttemptStateInternal.FAILED, TaskAttemptStateInternal.FAILED, EnumSet.of(TaskAttemptEventType.TA_STARTED_REMOTELY, TaskAttemptEventType.TA_CONTAINER_PREEMPTED, TaskAttemptEventType.TA_STATUS_UPDATE, TaskAttemptEventType.TA_OUTPUT_CONSUMABLE, TaskAttemptEventType.TA_COMMIT_PENDING, TaskAttemptEventType.TA_DONE, TaskAttemptEventType.TA_FAILED, TaskAttemptEventType.TA_TIMED_OUT, TaskAttemptEventType.TA_FAIL_REQUEST, TaskAttemptEventType.TA_KILL_REQUEST, TaskAttemptEventType.TA_NODE_FAILED, TaskAttemptEventType.TA_CONTAINER_TERMINATING, TaskAttemptEventType.TA_CONTAINER_TERMINATED, TaskAttemptEventType.TA_OUTPUT_FAILED))
// How will duplicate history events be handled ?
// TODO Maybe consider not failing REDUCE tasks in this case. Also, MAP_TASKS in case there's only one phase in the job.
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d9601b8/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 486584c..738b6d3 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -45,7 +45,7 @@ import org.apache.tez.dag.app.ContainerHeartbeatHandler;
import org.apache.tez.dag.app.ContainerContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.dag.event.DiagnosableEvent;
-import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
+import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerPreempted;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminated;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventContainerTerminating;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventNodeFailed;
@@ -484,7 +484,7 @@ public class AMContainerImpl implements AMContainer {
if (container.pendingAttempt != null) {
String errorMessage = getMessage(container, event);
if (event.isPreempted()) {
- container.sendKilledToTaskAttempt(container.pendingAttempt,
+ container.sendPreemptedToTaskAttempt(container.pendingAttempt,
errorMessage);
} else {
container.sendTerminatedToTaskAttempt(container.pendingAttempt,
@@ -726,7 +726,7 @@ public class AMContainerImpl implements AMContainer {
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
AMContainerEventCompleted event = (AMContainerEventCompleted) cEvent;
if (event.isPreempted()) {
- container.sendKilledToTaskAttempt(container.runningAttempt,
+ container.sendPreemptedToTaskAttempt(container.runningAttempt,
getMessage(container, event));
} else {
container.sendTerminatedToTaskAttempt(container.runningAttempt,
@@ -934,9 +934,9 @@ public class AMContainerImpl implements AMContainer {
sendEvent(new TaskAttemptEventContainerTerminated(taId, message));
}
- protected void sendKilledToTaskAttempt(
+ protected void sendPreemptedToTaskAttempt(
TezTaskAttemptID taId, String message) {
- sendEvent(new TaskAttemptEventAttemptKilled(taId, message));
+ sendEvent(new TaskAttemptEventContainerPreempted(taId, message));
}
protected void sendTerminatingToTaskAttempt(TezTaskAttemptID taId,
http://git-wip-us.apache.org/repos/asf/incubator-tez/blob/3d9601b8/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index 18cdcfd..84f00c9 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -587,7 +587,7 @@ public class TestAMContainer {
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
AMSchedulerEventType.S_CONTAINER_COMPLETED,
- TaskAttemptEventType.TA_KILLED);
+ TaskAttemptEventType.TA_CONTAINER_PREEMPTED);
assertFalse(wc.amContainer.isInErrorState());