You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ss...@apache.org on 2015/08/06 11:26:33 UTC
[41/51] [abbrv] tez git commit: TEZ-2495. Inform TaskCommunicator
about Task and Container termination reasons. (sseth)
TEZ-2495. Inform TaskCommunicator about Task and Container termination reasons. (sseth)
Project: http://git-wip-us.apache.org/repos/asf/tez/repo
Commit: http://git-wip-us.apache.org/repos/asf/tez/commit/b41dbb05
Tree: http://git-wip-us.apache.org/repos/asf/tez/tree/b41dbb05
Diff: http://git-wip-us.apache.org/repos/asf/tez/diff/b41dbb05
Branch: refs/heads/TEZ-2003
Commit: b41dbb054370d561157aa2ccf0b932948ddb4a35
Parents: a519c29
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu May 28 02:01:04 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu Aug 6 01:26:10 2015 -0700
----------------------------------------------------------------------
TEZ-2003-CHANGES.txt | 1 +
.../org/apache/tez/common/TezUtilsInternal.java | 31 ++++++++-----
.../apache/tez/dag/api/ContainerEndReason.java | 27 +++++++++++
.../tez/dag/api/TaskAttemptEndReason.java | 13 +++---
.../apache/tez/dag/api/TaskCommunicator.java | 11 +++--
.../apache/tez/dag/app/TaskAttemptListener.java | 6 ++-
.../dag/app/TaskAttemptListenerImpTezDag.java | 9 ++--
.../tez/dag/app/TezTaskCommunicatorImpl.java | 6 ++-
.../rm/container/AMContainerEventCompleted.java | 41 +++++++++++++++++
.../dag/app/rm/container/AMContainerImpl.java | 35 ++++++++-------
.../app/TestTaskAttemptListenerImplTezDag.java | 8 ++--
.../dag/app/rm/container/TestAMContainer.java | 47 +++++++++++---------
.../TezTestServiceTaskCommunicatorImpl.java | 9 ++--
.../apache/tez/runtime/task/TezTaskRunner2.java | 20 +++++++--
14 files changed, 186 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tez/blob/b41dbb05/TEZ-2003-CHANGES.txt
----------------------------------------------------------------------
diff --git a/TEZ-2003-CHANGES.txt b/TEZ-2003-CHANGES.txt
index d651960..e333832 100644
--- a/TEZ-2003-CHANGES.txt
+++ b/TEZ-2003-CHANGES.txt
@@ -28,5 +28,6 @@ ALL CHANGES:
TEZ-2443. TaskRunner2 should call abort, NPEs while cleaning up tasks.
TEZ-2465. Retrun the status of a kill request in TaskRunner2.
TEZ-2471. NPE in LogicalIOProcessorRuntimeTask while printing thread info.
+ TEZ-2495. Inform TaskCommunicator about Task and Container termination reasons.
INCOMPATIBLE CHANGES:
http://git-wip-us.apache.org/repos/asf/tez/blob/b41dbb05/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
index 347a4f6..0bdeb79 100644
--- a/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
+++ b/tez-common/src/main/java/org/apache/tez/common/TezUtilsInternal.java
@@ -246,10 +246,16 @@ public class TezUtilsInternal {
return TaskAttemptTerminationCause.COMMUNICATION_ERROR;
case SERVICE_BUSY:
return TaskAttemptTerminationCause.SERVICE_BUSY;
- case INTERRUPTED_BY_SYSTEM:
- return TaskAttemptTerminationCause.INTERRUPTED_BY_SYSTEM;
- case INTERRUPTED_BY_USER:
- return TaskAttemptTerminationCause.INTERRUPTED_BY_USER;
+ case INTERNAL_PREEMPTION:
+ return TaskAttemptTerminationCause.INTERNAL_PREEMPTION;
+ case EXTERNAL_PREEMPTION:
+ return TaskAttemptTerminationCause.EXTERNAL_PREEMPTION;
+ case APPLICATION_ERROR:
+ return TaskAttemptTerminationCause.APPLICATION_ERROR;
+ case FRAMEWORK_ERROR:
+ return TaskAttemptTerminationCause.FRAMEWORK_ERROR;
+ case NODE_FAILED:
+ return TaskAttemptTerminationCause.NODE_FAILED;
case OTHER:
return TaskAttemptTerminationCause.UNKNOWN_ERROR;
default:
@@ -267,20 +273,24 @@ public class TezUtilsInternal {
return TaskAttemptEndReason.COMMUNICATION_ERROR;
case SERVICE_BUSY:
return TaskAttemptEndReason.SERVICE_BUSY;
+ case INTERNAL_PREEMPTION:
+ return TaskAttemptEndReason.INTERNAL_PREEMPTION;
+ case EXTERNAL_PREEMPTION:
+ return TaskAttemptEndReason.EXTERNAL_PREEMPTION;
+ case APPLICATION_ERROR:
+ return TaskAttemptEndReason.APPLICATION_ERROR;
+ case FRAMEWORK_ERROR:
+ return TaskAttemptEndReason.FRAMEWORK_ERROR;
+ case NODE_FAILED:
+ return TaskAttemptEndReason.NODE_FAILED;
case INTERRUPTED_BY_SYSTEM:
- return TaskAttemptEndReason.INTERRUPTED_BY_SYSTEM;
case INTERRUPTED_BY_USER:
- return TaskAttemptEndReason.INTERRUPTED_BY_USER;
case UNKNOWN_ERROR:
case TERMINATED_BY_CLIENT:
case TERMINATED_AT_SHUTDOWN:
- case INTERNAL_PREEMPTION:
- case EXTERNAL_PREEMPTION:
case TERMINATED_INEFFECTIVE_SPECULATION:
case TERMINATED_EFFECTIVE_SPECULATION:
case TERMINATED_ORPHANED:
- case APPLICATION_ERROR:
- case FRAMEWORK_ERROR:
case INPUT_READ_ERROR:
case OUTPUT_WRITE_ERROR:
case OUTPUT_LOST:
@@ -288,7 +298,6 @@ public class TezUtilsInternal {
case CONTAINER_LAUNCH_FAILED:
case CONTAINER_EXITED:
case CONTAINER_STOPPED:
- case NODE_FAILED:
case NODE_DISK_ERROR:
default:
return TaskAttemptEndReason.OTHER;
http://git-wip-us.apache.org/repos/asf/tez/blob/b41dbb05/tez-common/src/main/java/org/apache/tez/dag/api/ContainerEndReason.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/api/ContainerEndReason.java b/tez-common/src/main/java/org/apache/tez/dag/api/ContainerEndReason.java
new file mode 100644
index 0000000..e13e886
--- /dev/null
+++ b/tez-common/src/main/java/org/apache/tez/dag/api/ContainerEndReason.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tez.dag.api;
+
+// TODO TEZ-2003 Expose as a public API
+public enum ContainerEndReason {
+ NODE_FAILED, // Completed because the node running the container was marked as dead
+ INTERNAL_PREEMPTION, // Preempted by the AM, due to an internal decision
+ EXTERNAL_PREEMPTION, // Preempted due to cluster contention
+ APPLICATION_ERROR, // An error in the AM caused by user code
+ FRAMEWORK_ERROR, // An error in the AM - likely a bug.
+ LAUNCH_FAILED, // Failure to launch the container
+ COMPLETED, // Completed via normal flow
+ OTHER
+}
http://git-wip-us.apache.org/repos/asf/tez/blob/b41dbb05/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java
----------------------------------------------------------------------
diff --git a/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java b/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java
index 96a4768..de78d21 100644
--- a/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java
+++ b/tez-common/src/main/java/org/apache/tez/dag/api/TaskAttemptEndReason.java
@@ -16,9 +16,12 @@ package org.apache.tez.dag.api;
// TODO TEZ-2003 Expose as a public API
public enum TaskAttemptEndReason {
- COMMUNICATION_ERROR,
- SERVICE_BUSY,
- INTERRUPTED_BY_SYSTEM,
- INTERRUPTED_BY_USER,
- OTHER
+ NODE_FAILED, // Completed because the node running the container was marked as dead
+ COMMUNICATION_ERROR, // Communication error with the task
+ SERVICE_BUSY, // External service busy
+ INTERNAL_PREEMPTION, // Preempted by the AM, due to an internal decision
+ EXTERNAL_PREEMPTION, // Preempted due to cluster contention
+ APPLICATION_ERROR, // An error in the AM caused by user code
+ FRAMEWORK_ERROR, // An error in the AM - likely a bug.
+ OTHER // Unknown reason
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b41dbb05/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
index 2651013..d0a006b 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/api/TaskCommunicator.java
@@ -36,7 +36,10 @@ public abstract class TaskCommunicator extends AbstractService {
public abstract void registerRunningContainer(ContainerId containerId, String hostname, int port);
// TODO TEZ-2003 Ideally, don't expose YARN containerId; instead expose a Tez specific construct.
- public abstract void registerContainerEnd(ContainerId containerId);
+ public abstract void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason);
+
+ // TODO TEZ-2003 Provide additional information like reason for container end / Task end.
+ // Was it caused by preemption - or as a result of a general task completion / container completion
// TODO TEZ-2003 TaskSpec breakup into a clean interface
// TODO TEZ-2003 Add support for priority
@@ -48,11 +51,7 @@ public abstract class TaskCommunicator extends AbstractService {
// TODO TEZ-2003. Are additional APIs required to mark a container as completed ? - for completeness.
// TODO TEZ-2003 Remove reference to TaskAttemptID
- // TODO TEZ-2003 This needs some information about why the attempt is being unregistered.
- // e.g. preempted in which case the task may need to be informed. Alternately as a result of
- // a failed task.
- // In case of preemption - a killTask API is likely a better bet than trying to overload this method.
- public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID);
+ public abstract void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason);
// TODO TEZ-2003 This doesn't necessarily belong here. A server may not start within the AM.
public abstract InetSocketAddress getAddress();
http://git-wip-us.apache.org/repos/asf/tez/blob/b41dbb05/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
index e4dad27..92e38ae 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListener.java
@@ -21,6 +21,8 @@ package org.apache.tez.dag.app;
import java.net.InetSocketAddress;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.api.ContainerEndReason;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.api.TaskCommunicator;
import org.apache.tez.dag.app.rm.container.AMContainerTask;
@@ -34,9 +36,9 @@ public interface TaskAttemptListener {
void registerTaskAttempt(AMContainerTask amContainerTask, ContainerId containerId, int taskCommId);
- void unregisterRunningContainer(ContainerId containerId, int taskCommId);
+ void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason);
- void unregisterTaskAttempt(TezTaskAttemptID attemptID, int taskCommId);
+ void unregisterTaskAttempt(TezTaskAttemptID attemptID, int taskCommId, TaskAttemptEndReason endReason);
void dagComplete(DAG dag);
http://git-wip-us.apache.org/repos/asf/tez/blob/b41dbb05/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
index db78fa9..1c61a0d 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TaskAttemptListenerImpTezDag.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentMap;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.collections4.ListUtils;
+import org.apache.tez.dag.api.ContainerEndReason;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
@@ -355,7 +356,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
@Override
- public void unregisterRunningContainer(ContainerId containerId, int taskCommId) {
+ public void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason) {
if (LOG.isDebugEnabled()) {
LOG.debug("Unregistering Container from TaskAttemptListener: " + containerId);
}
@@ -363,7 +364,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
if (containerInfo.taskAttemptId != null) {
registeredAttempts.remove(containerInfo.taskAttemptId);
}
- taskCommunicators[taskCommId].registerContainerEnd(containerId);
+ taskCommunicators[taskCommId].registerContainerEnd(containerId, endReason);
}
@Override
@@ -404,7 +405,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
@Override
- public void unregisterTaskAttempt(TezTaskAttemptID attemptId, int taskCommId) {
+ public void unregisterTaskAttempt(TezTaskAttemptID attemptId, int taskCommId, TaskAttemptEndReason endReason) {
ContainerId containerId = registeredAttempts.remove(attemptId);
if (containerId == null) {
LOG.warn("Unregister task attempt: " + attemptId + " from unknown container");
@@ -418,7 +419,7 @@ public class TaskAttemptListenerImpTezDag extends AbstractService implements
}
// Explicitly putting in a new entry so that synchronization is not required on the existing element in the map.
registeredContainers.put(containerId, NULL_CONTAINER_INFO);
- taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId);
+ taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId, endReason);
}
@Override
http://git-wip-us.apache.org/repos/asf/tez/blob/b41dbb05/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
index accde2c..3774eb4 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/TezTaskCommunicatorImpl.java
@@ -41,6 +41,8 @@ import org.apache.tez.common.ContainerContext;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.ContainerEndReason;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.TaskCommunicator;
import org.apache.tez.dag.api.TaskCommunicatorContext;
import org.apache.tez.dag.api.TaskHeartbeatRequest;
@@ -185,7 +187,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
}
@Override
- public void registerContainerEnd(ContainerId containerId) {
+ public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason) {
ContainerInfo containerInfo = registeredContainers.remove(containerId);
if (containerInfo != null) {
synchronized(containerInfo) {
@@ -231,7 +233,7 @@ public class TezTaskCommunicatorImpl extends TaskCommunicator {
@Override
- public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) {
+ public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason) {
TaskAttempt taskAttempt = new TaskAttempt(taskAttemptID);
ContainerId containerId = attemptToContainerMap.remove(taskAttempt);
if(containerId == null) {
http://git-wip-us.apache.org/repos/asf/tez/blob/b41dbb05/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
index 9bb6d7f..8ef2a83 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerEventCompleted.java
@@ -20,6 +20,7 @@ package org.apache.tez.dag.app.rm.container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.tez.dag.api.ContainerEndReason;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
public class AMContainerEventCompleted extends AMContainerEvent {
@@ -61,4 +62,44 @@ public class AMContainerEventCompleted extends AMContainerEvent {
return errCause;
}
+ public ContainerEndReason getContainerEndReason() {
+ if (errCause != null) {
+ switch (errCause) {
+ case INTERNAL_PREEMPTION:
+ return ContainerEndReason.INTERNAL_PREEMPTION;
+ case EXTERNAL_PREEMPTION:
+ return ContainerEndReason.EXTERNAL_PREEMPTION;
+ case FRAMEWORK_ERROR:
+ return ContainerEndReason.FRAMEWORK_ERROR;
+ case APPLICATION_ERROR:
+ return ContainerEndReason.APPLICATION_ERROR;
+ case CONTAINER_LAUNCH_FAILED:
+ return ContainerEndReason.LAUNCH_FAILED;
+ case NODE_FAILED:
+ return ContainerEndReason.NODE_FAILED;
+ case CONTAINER_EXITED:
+ return ContainerEndReason.COMPLETED;
+ case UNKNOWN_ERROR:
+ case TERMINATED_BY_CLIENT:
+ case TERMINATED_AT_SHUTDOWN:
+ case TERMINATED_INEFFECTIVE_SPECULATION:
+ case TERMINATED_EFFECTIVE_SPECULATION:
+ case TERMINATED_ORPHANED:
+ case INPUT_READ_ERROR:
+ case OUTPUT_WRITE_ERROR:
+ case OUTPUT_LOST:
+ case TASK_HEARTBEAT_ERROR:
+ case CONTAINER_STOPPED:
+ case NODE_DISK_ERROR:
+ case COMMUNICATION_ERROR:
+ case SERVICE_BUSY:
+ case INTERRUPTED_BY_SYSTEM:
+ case INTERRUPTED_BY_USER:
+ default:
+ return ContainerEndReason.OTHER;
+ }
+ } else {
+ return ContainerEndReason.OTHER;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b41dbb05/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
index 39df2e8..e9e0f04 100644
--- a/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
+++ b/tez-dag/src/main/java/org/apache/tez/dag/app/rm/container/AMContainerImpl.java
@@ -27,6 +27,9 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import org.apache.tez.common.TezUtilsInternal;
+import org.apache.tez.dag.api.ContainerEndReason;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.security.Credentials;
@@ -624,7 +627,7 @@ public class AMContainerImpl implements AMContainer {
container.sendTerminatingToTaskAttempt(container.currentAttempt,
event.getMessage(), TaskAttemptTerminationCause.CONTAINER_LAUNCH_FAILED);
}
- container.unregisterFromTAListener();
+ container.unregisterFromTAListener(ContainerEndReason.LAUNCH_FAILED);
container.deAllocate();
}
}
@@ -654,7 +657,7 @@ public class AMContainerImpl implements AMContainer {
}
container.containerLocalResources = null;
container.additionalLocalResources = null;
- container.unregisterFromTAListener();
+ container.unregisterFromTAListener(event.getContainerEndReason());
String diag = event.getDiagnostics();
if (!(diag == null || diag.equals(""))) {
LOG.info("Container " + container.getContainerId()
@@ -680,7 +683,7 @@ public class AMContainerImpl implements AMContainer {
container.sendTerminatingToTaskAttempt(container.currentAttempt,
getMessage(container, cEvent), TaskAttemptTerminationCause.CONTAINER_STOPPED);
}
- container.unregisterFromTAListener();
+ container.unregisterFromTAListener(ContainerEndReason.OTHER);
container.logStopped(container.currentAttempt == null ?
ContainerExitStatus.SUCCESS
: ContainerExitStatus.INVALID);
@@ -732,7 +735,7 @@ public class AMContainerImpl implements AMContainer {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
super.transition(container, cEvent);
- container.unregisterFromTAListener();
+ container.unregisterFromTAListener(ContainerEndReason.NODE_FAILED);
container.deAllocate();
}
}
@@ -749,7 +752,7 @@ public class AMContainerImpl implements AMContainer {
container.getState(), TaskAttemptTerminationCause.FRAMEWORK_ERROR);
}
container.logStopped(ContainerExitStatus.ABORTED);
- container.unregisterFromTAListener();
+ container.unregisterFromTAListener(ContainerEndReason.FRAMEWORK_ERROR);
container.sendStopRequestToNM();
}
}
@@ -821,7 +824,7 @@ public class AMContainerImpl implements AMContainer {
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
AMContainerEventAssignTA event = (AMContainerEventAssignTA) cEvent;
- container.unregisterAttemptFromListener(container.currentAttempt);
+ container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.FRAMEWORK_ERROR);
container.handleExtraTAAssign(event, container.currentAttempt);
}
}
@@ -832,7 +835,7 @@ public class AMContainerImpl implements AMContainer {
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
container.lastTaskFinishTime = System.currentTimeMillis();
container.completedAttempts.add(container.currentAttempt);
- container.unregisterAttemptFromListener(container.currentAttempt);
+ container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.OTHER);
container.currentAttempt = null;
}
}
@@ -849,7 +852,7 @@ public class AMContainerImpl implements AMContainer {
container.sendTerminatedToTaskAttempt(container.currentAttempt,
getMessage(container, event), event.getTerminationCause());
}
- container.unregisterAttemptFromListener(container.currentAttempt);
+ container.unregisterAttemptFromListener(container.currentAttempt, TezUtilsInternal.toTaskAttemptEndReason(event.getTerminationCause()));
container.registerFailedAttempt(container.currentAttempt);
container.currentAttempt= null;
super.transition(container, cEvent);
@@ -859,7 +862,7 @@ public class AMContainerImpl implements AMContainer {
protected static class StopRequestAtRunningTransition
extends StopRequestAtIdleTransition {
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
- container.unregisterAttemptFromListener(container.currentAttempt);
+ container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.OTHER);
super.transition(container, cEvent);
}
}
@@ -880,7 +883,7 @@ public class AMContainerImpl implements AMContainer {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
super.transition(container, cEvent);
- container.unregisterAttemptFromListener(container.currentAttempt);
+ container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.NODE_FAILED);
}
}
@@ -889,7 +892,7 @@ public class AMContainerImpl implements AMContainer {
@Override
public void transition(AMContainerImpl container, AMContainerEvent cEvent) {
super.transition(container, cEvent);
- container.unregisterAttemptFromListener(container.currentAttempt);
+ container.unregisterAttemptFromListener(container.currentAttempt, TaskAttemptEndReason.FRAMEWORK_ERROR);
container.sendTerminatingToTaskAttempt(container.currentAttempt,
"Container " + container.getContainerId() +
" hit an invalid transition - " + cEvent.getType() + " at " +
@@ -1015,7 +1018,7 @@ public class AMContainerImpl implements AMContainer {
LOG.warn(errorMessage);
this.logStopped(ContainerExitStatus.INVALID);
this.sendStopRequestToNM();
- this.unregisterFromTAListener();
+ this.unregisterFromTAListener(ContainerEndReason.FRAMEWORK_ERROR);
this.unregisterFromContainerListener();
}
@@ -1073,8 +1076,8 @@ public class AMContainerImpl implements AMContainer {
container.getNodeId(), container.getContainerToken(), launcherId));
}
- protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId) {
- taskAttemptListener.unregisterTaskAttempt(attemptId, taskCommId);
+ protected void unregisterAttemptFromListener(TezTaskAttemptID attemptId, TaskAttemptEndReason endReason) {
+ taskAttemptListener.unregisterTaskAttempt(attemptId, taskCommId, endReason);
}
protected void registerAttemptWithListener(AMContainerTask amContainerTask) {
@@ -1085,8 +1088,8 @@ public class AMContainerImpl implements AMContainer {
taskAttemptListener.registerRunningContainer(containerId, taskCommId);
}
- protected void unregisterFromTAListener() {
- this.taskAttemptListener.unregisterRunningContainer(containerId, taskCommId);
+ protected void unregisterFromTAListener(ContainerEndReason endReason) {
+ this.taskAttemptListener.unregisterRunningContainer(containerId, taskCommId, endReason);
}
protected void registerWithContainerListener() {
http://git-wip-us.apache.org/repos/asf/tez/blob/b41dbb05/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
index 34b9792..68d3baf 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/TestTaskAttemptListenerImplTezDag.java
@@ -47,6 +47,8 @@ import org.apache.tez.common.ContainerContext;
import org.apache.tez.common.ContainerTask;
import org.apache.tez.common.security.JobTokenSecretManager;
import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.ContainerEndReason;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.TaskHeartbeatRequest;
import org.apache.tez.dag.api.TaskHeartbeatResponse;
import org.apache.tez.dag.api.TezException;
@@ -163,12 +165,12 @@ public class TestTaskAttemptListenerImplTezDag {
assertEquals(taskSpec, containerTask.getTaskSpec());
// Task unregistered. Should respond to heartbeats
- taskAttemptListener.unregisterTaskAttempt(taskAttemptID, 0);
+ taskAttemptListener.unregisterTaskAttempt(taskAttemptID, 0, TaskAttemptEndReason.OTHER);
containerTask = tezUmbilical.getTask(containerContext2);
assertNull(containerTask);
// Container unregistered. Should send a shouldDie = true
- taskAttemptListener.unregisterRunningContainer(containerId2, 0);
+ taskAttemptListener.unregisterRunningContainer(containerId2, 0, ContainerEndReason.OTHER);
containerTask = tezUmbilical.getTask(containerContext2);
assertTrue(containerTask.shouldDie());
@@ -182,7 +184,7 @@ public class TestTaskAttemptListenerImplTezDag {
doReturn(taskAttemptId2).when(taskSpec2).getTaskAttemptID();
AMContainerTask amContainerTask2 = new AMContainerTask(taskSpec, null, null, false, 0);
taskAttemptListener.registerTaskAttempt(amContainerTask2, containerId3, 0);
- taskAttemptListener.unregisterRunningContainer(containerId3, 0);
+ taskAttemptListener.unregisterRunningContainer(containerId3, 0, ContainerEndReason.OTHER);
containerTask = tezUmbilical.getTask(containerContext3);
assertTrue(containerTask.shouldDie());
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b41dbb05/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
----------------------------------------------------------------------
diff --git a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
index bdd0f61..b8b4774 100644
--- a/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
+++ b/tez-dag/src/test/java/org/apache/tez/dag/app/rm/container/TestAMContainer.java
@@ -63,6 +63,8 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.TokenCache;
+import org.apache.tez.dag.api.ContainerEndReason;
+import org.apache.tez.dag.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.TaskCommunicator;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ContainerHeartbeatHandler;
@@ -132,14 +134,14 @@ public class TestAMContainer {
wc.verifyState(AMContainerState.IDLE);
wc.verifyNoOutgoingEvents();
assertNull(wc.amContainer.getCurrentTaskAttempt());
- verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0);
+ verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER);
// Container completed
wc.containerCompleted();
wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
verify(wc.chh).unregister(wc.containerID);
assertEquals(1, wc.amContainer.getAllTaskAttempts().size());
@@ -181,13 +183,13 @@ public class TestAMContainer {
wc.verifyState(AMContainerState.IDLE);
wc.verifyNoOutgoingEvents();
assertNull(wc.amContainer.getCurrentTaskAttempt());
- verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0);
+ verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER);
wc.containerCompleted();
wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
verify(wc.chh).unregister(wc.containerID);
assertEquals(1, wc.amContainer.getAllTaskAttempts().size());
@@ -232,7 +234,7 @@ public class TestAMContainer {
wc.verifyState(AMContainerState.IDLE);
wc.verifyNoOutgoingEvents();
assertNull(wc.amContainer.getCurrentTaskAttempt());
- verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0);
+ verify(wc.tal).unregisterTaskAttempt(wc.taskAttemptID, 0, TaskAttemptEndReason.OTHER);
TezTaskAttemptID taId2 = TezTaskAttemptID.getInstance(wc.taskID, 2);
wc.assignTaskAttempt(taId2);
@@ -247,14 +249,14 @@ public class TestAMContainer {
wc.verifyState(AMContainerState.IDLE);
wc.verifyNoOutgoingEvents();
assertNull(wc.amContainer.getCurrentTaskAttempt());
- verify(wc.tal).unregisterTaskAttempt(taId2, 0);
+ verify(wc.tal).unregisterTaskAttempt(taId2, 0, TaskAttemptEndReason.OTHER);
// Container completed
wc.containerCompleted();
wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
verify(wc.chh).unregister(wc.containerID);
assertEquals(2, wc.amContainer.getAllTaskAttempts().size());
@@ -287,7 +289,7 @@ public class TestAMContainer {
wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
verify(wc.chh).unregister(wc.containerID);
assertNull(wc.amContainer.getCurrentTaskAttempt());
@@ -324,7 +326,7 @@ public class TestAMContainer {
wc.verifyHistoryStopEvent();
wc.verifyState(AMContainerState.COMPLETED);
wc.verifyNoOutgoingEvents();
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
verify(wc.chh).unregister(wc.containerID);
assertNull(wc.amContainer.getCurrentTaskAttempt());
@@ -347,7 +349,7 @@ public class TestAMContainer {
wc.assignTaskAttempt(taID2);
wc.verifyState(AMContainerState.STOP_REQUESTED);
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.FRAMEWORK_ERROR);
verify(wc.chh).unregister(wc.containerID);
// 1 for NM stop request. 2 TERMINATING to TaskAttempt.
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
@@ -385,7 +387,7 @@ public class TestAMContainer {
wc.assignTaskAttempt(taID2);
wc.verifyState(AMContainerState.STOP_REQUESTED);
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.FRAMEWORK_ERROR);
verify(wc.chh).unregister(wc.containerID);
// 1 for NM stop request. 2 TERMINATING to TaskAttempt.
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(3);
@@ -421,7 +423,7 @@ public class TestAMContainer {
wc.containerTimedOut();
wc.verifyState(AMContainerState.STOP_REQUESTED);
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
verify(wc.chh).unregister(wc.containerID);
// 1 to TA, 1 for RM de-allocate.
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
@@ -455,7 +457,7 @@ public class TestAMContainer {
wc.stopRequest();
wc.verifyState(AMContainerState.STOP_REQUESTED);
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
verify(wc.chh).unregister(wc.containerID);
// 1 to TA, 1 for RM de-allocate.
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
@@ -489,7 +491,7 @@ public class TestAMContainer {
wc.launchFailed();
wc.verifyState(AMContainerState.STOPPING);
verify(wc.tal).registerRunningContainer(wc.containerID, 0);
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.LAUNCH_FAILED);
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(2);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -539,7 +541,7 @@ public class TestAMContainer {
wc.containerCompleted();
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID, 0);
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -569,7 +571,7 @@ public class TestAMContainer {
wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR);
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID, 0);
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -600,7 +602,7 @@ public class TestAMContainer {
wc.containerCompleted(ContainerExitStatus.ABORTED, TaskAttemptTerminationCause.NODE_FAILED);
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID, 0);
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.NODE_FAILED);
outgoingEvents = wc.verifyCountAndGetOutgoingEvents(1);
verifyUnOrderedOutgoingEventTypes(outgoingEvents,
@@ -631,7 +633,7 @@ public class TestAMContainer {
wc.containerCompleted();
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID, 0);
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
verify(wc.chh).register(wc.containerID);
verify(wc.chh).unregister(wc.containerID);
@@ -660,7 +662,7 @@ public class TestAMContainer {
wc.containerCompleted();
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID, 0);
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.COMPLETED);
verify(wc.chh).register(wc.containerID);
verify(wc.chh).unregister(wc.containerID);
@@ -695,7 +697,7 @@ public class TestAMContainer {
wc.containerCompleted(ContainerExitStatus.PREEMPTED, TaskAttemptTerminationCause.EXTERNAL_PREEMPTION);
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID, 0);
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.EXTERNAL_PREEMPTION);
verify(wc.chh).register(wc.containerID);
verify(wc.chh).unregister(wc.containerID);
@@ -732,7 +734,8 @@ public class TestAMContainer {
wc.containerCompleted(ContainerExitStatus.INVALID, TaskAttemptTerminationCause.INTERNAL_PREEMPTION);
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID, 0);
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0,
+ ContainerEndReason.INTERNAL_PREEMPTION);
verify(wc.chh).register(wc.containerID);
verify(wc.chh).unregister(wc.containerID);
@@ -769,7 +772,7 @@ public class TestAMContainer {
wc.containerCompleted(ContainerExitStatus.DISKS_FAILED, TaskAttemptTerminationCause.NODE_DISK_ERROR);
wc.verifyState(AMContainerState.COMPLETED);
verify(wc.tal).registerRunningContainer(wc.containerID, 0);
- verify(wc.tal).unregisterRunningContainer(wc.containerID, 0);
+ verify(wc.tal).unregisterRunningContainer(wc.containerID, 0, ContainerEndReason.OTHER);
verify(wc.chh).register(wc.containerID);
verify(wc.chh).unregister(wc.containerID);
http://git-wip-us.apache.org/repos/asf/tez/blob/b41dbb05/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
----------------------------------------------------------------------
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
index cf28b11..98673a6 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/dag/app/taskcomm/TezTestServiceTaskCommunicatorImpl.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.tez.dag.api.ContainerEndReason;
import org.apache.tez.dag.api.TaskAttemptEndReason;
import org.apache.tez.dag.api.TaskCommunicatorContext;
import org.apache.tez.dag.app.TezTaskCommunicatorImpl;
@@ -98,8 +99,8 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
}
@Override
- public void registerContainerEnd(ContainerId containerId) {
- super.registerContainerEnd(containerId);
+ public void registerContainerEnd(ContainerId containerId, ContainerEndReason endReason) {
+ super.registerContainerEnd(containerId, endReason);
}
@Override
@@ -175,8 +176,8 @@ public class TezTestServiceTaskCommunicatorImpl extends TezTaskCommunicatorImpl
}
@Override
- public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) {
- super.unregisterRunningTaskAttempt(taskAttemptID);
+ public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID, TaskAttemptEndReason endReason) {
+ super.unregisterRunningTaskAttempt(taskAttemptID, endReason);
// Nothing else to do for now. The push API in the test does not support termination of a running task
}
http://git-wip-us.apache.org/repos/asf/tez/blob/b41dbb05/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
----------------------------------------------------------------------
diff --git a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
index 3bf9f84..15629fd 100644
--- a/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
+++ b/tez-runtime-internals/src/main/java/org/apache/tez/runtime/task/TezTaskRunner2.java
@@ -261,7 +261,13 @@ public class TezTaskRunner2 {
taskRunnerCallable.interruptTask();
}
return true;
+ } else {
+ LOG.info("Ignoring killTask request for {} since end reason is already set to {}",
+ task.getTaskAttemptID(), firstEndReason);
}
+ } else {
+ LOG.info("Ignoring killTask request for {} since it is not in a running state",
+ task.getTaskAttemptID());
}
}
return false;
@@ -389,10 +395,18 @@ public class TezTaskRunner2 {
isFirstTerminate = trySettingEndReason(EndReason.CONTAINER_STOP_REQUESTED);
// Respect stopContainerRequested since it can come in at any point, despite a previous failure.
stopContainerRequested.set(true);
- }
- if (isFirstTerminate) {
- killTask();
+ if (isFirstTerminate) {
+ LOG.info("Attempting to abort {} since a shutdown request was received",
+ task.getTaskAttemptID());
+ if (taskRunnerCallable != null) {
+ taskKillStartTime = System.currentTimeMillis();
+ taskRunnerCallable.interruptTask();
+ }
+ } else {
+ LOG.info("Not acting on shutdown request for {} since the task is not in running state",
+ task.getTaskAttemptID());
+ }
}
}
}