You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/05/12 22:49:42 UTC
[25/36] hadoop git commit: MAPREDUCE-5465. Tasks are often killed
before they exit on their own. Contributed by Ming Ma
MAPREDUCE-5465. Tasks are often killed before they exit on their own. Contributed by Ming Ma
Conflicts:
hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5625ac46
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5625ac46
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5625ac46
Branch: refs/heads/YARN-2928
Commit: 5625ac469a4d7ba4d936a0002c0fe61828c8eedb
Parents: aa3e32d
Author: Jason Lowe <jl...@apache.org>
Authored: Mon May 11 22:37:35 2015 +0000
Committer: Zhijie Shen <zj...@apache.org>
Committed: Tue May 12 13:44:13 2015 -0700
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 +
.../hadoop/mapred/LocalContainerLauncher.java | 12 +-
.../hadoop/mapreduce/v2/app/AppContext.java | 2 +
.../hadoop/mapreduce/v2/app/MRAppMaster.java | 39 +-
.../v2/app/TaskAttemptFinishingMonitor.java | 63 +++
.../v2/app/client/MRClientService.java | 2 +-
.../v2/app/job/TaskAttemptStateInternal.java | 39 +-
.../v2/app/job/event/TaskAttemptEventType.java | 3 +
.../v2/app/job/impl/TaskAttemptImpl.java | 445 +++++++++++++++----
.../v2/app/launcher/ContainerLauncher.java | 8 +-
.../v2/app/launcher/ContainerLauncherImpl.java | 11 +-
.../mapred/TestTaskAttemptFinishingMonitor.java | 108 +++++
.../apache/hadoop/mapreduce/v2/app/MRApp.java | 16 +
.../hadoop/mapreduce/v2/app/MockAppContext.java | 6 +
.../hadoop/mapreduce/v2/app/TestFail.java | 2 +
.../hadoop/mapreduce/v2/app/TestKill.java | 142 ++++--
.../mapreduce/v2/app/TestRuntimeEstimators.java | 5 +
.../v2/app/job/impl/TestTaskAttempt.java | 263 ++++++++++-
.../apache/hadoop/mapreduce/MRJobConfig.java | 10 +-
.../src/main/resources/mapred-default.xml | 20 +
.../hadoop/mapreduce/v2/hs/JobHistory.java | 6 +
.../v2/TestSpeculativeExecutionWithMRApp.java | 6 +-
22 files changed, 1060 insertions(+), 151 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 2152be0..e28d575 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -426,6 +426,9 @@ Release 2.8.0 - UNRELEASED
MAPREDUCE-6353. Divide by zero error in MR AM when calculating available
containers. (Anubhav Dhoot via kasha)
+ MAPREDUCE-5465. Tasks are often killed before they exit on their own.
+ (Ming Ma via jlowe)
+
Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
index ffc5326..52b3497 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
@@ -264,7 +264,8 @@ public class LocalContainerLauncher extends AbstractService implements
context.getEventHandler().handle(
new TaskAttemptEvent(taId,
TaskAttemptEventType.TA_CONTAINER_CLEANED));
-
+ } else if (event.getType() == EventType.CONTAINER_COMPLETED) {
+ LOG.debug("Container completed " + event.toString());
} else {
LOG.warn("Ignoring unexpected event " + event.toString());
}
@@ -314,7 +315,14 @@ public class LocalContainerLauncher extends AbstractService implements
}
runSubtask(remoteTask, ytask.getType(), attemptID, numMapTasks,
(numReduceTasks > 0), localMapFiles);
-
+
+ // In non-uber mode, TA gets TA_CONTAINER_COMPLETED from MRAppMaster
+ // as part of NM -> RM -> AM notification route.
+ // In uber mode, given the task runs inside the MRAppMaster container,
+ // we have to simulate the notification.
+ context.getEventHandler().handle(new TaskAttemptEvent(attemptID,
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED));
+
} catch (RuntimeException re) {
JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptID.getTaskId().getJobId());
jce.addCounterUpdate(JobCounter.NUM_FAILED_UBERTASKS, 1);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
index 31e282a..4af11c3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/AppContext.java
@@ -67,4 +67,6 @@ public interface AppContext {
boolean hasSuccessfullyUnregistered();
String getNMHostname();
+
+ TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index 9272c7a..fa0a432 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -208,6 +208,14 @@ public class MRAppMaster extends CompositeService {
private SpeculatorEventDispatcher speculatorEventDispatcher;
private AMPreemptionPolicy preemptionPolicy;
+ // After a task attempt completes from TaskUmbilicalProtocol's point of view,
+ // it will be transitioned to finishing state.
+ // taskAttemptFinishingMonitor is just a timer for attempts in finishing
+ // state. If the attempt stays in finishing state for too long,
+ // taskAttemptFinishingMonitor will notify the attempt via TA_TIMED_OUT
+ // event.
+ private TaskAttemptFinishingMonitor taskAttemptFinishingMonitor;
+
private Job job;
private Credentials jobCredentials = new Credentials(); // Filled during init
protected UserGroupInformation currentUser; // Will be setup during init
@@ -250,6 +258,12 @@ public class MRAppMaster extends CompositeService {
logSyncer = TaskLog.createLogSyncer();
LOG.info("Created MRAppMaster for application " + applicationAttemptId);
}
+ protected TaskAttemptFinishingMonitor createTaskAttemptFinishingMonitor(
+ EventHandler eventHandler) {
+ TaskAttemptFinishingMonitor monitor =
+ new TaskAttemptFinishingMonitor(eventHandler);
+ return monitor;
+ }
@Override
protected void serviceInit(final Configuration conf) throws Exception {
@@ -260,7 +274,11 @@ public class MRAppMaster extends CompositeService {
initJobCredentialsAndUGI(conf);
- context = new RunningAppContext(conf);
+ dispatcher = createDispatcher();
+ addIfService(dispatcher);
+ taskAttemptFinishingMonitor = createTaskAttemptFinishingMonitor(dispatcher.getEventHandler());
+ addIfService(taskAttemptFinishingMonitor);
+ context = new RunningAppContext(conf, taskAttemptFinishingMonitor);
// Job name is the same as the app name util we support DAG of jobs
// for an app later
@@ -327,9 +345,6 @@ public class MRAppMaster extends CompositeService {
}
if (errorHappenedShutDown) {
- dispatcher = createDispatcher();
- addIfService(dispatcher);
-
NoopEventHandler eater = new NoopEventHandler();
//We do not have a JobEventDispatcher in this path
dispatcher.register(JobEventType.class, eater);
@@ -376,9 +391,6 @@ public class MRAppMaster extends CompositeService {
} else {
committer = createOutputCommitter(conf);
- dispatcher = createDispatcher();
- addIfService(dispatcher);
-
//service to handle requests from JobClient
clientService = createClientService(context);
// Init ClientService separately so that we stop it separately, since this
@@ -967,10 +979,14 @@ public class MRAppMaster extends CompositeService {
private final ClientToAMTokenSecretManager clientToAMTokenSecretManager;
private TimelineClient timelineClient = null;
- public RunningAppContext(Configuration config) {
+ private final TaskAttemptFinishingMonitor taskAttemptFinishingMonitor;
+
+ public RunningAppContext(Configuration config,
+ TaskAttemptFinishingMonitor taskAttemptFinishingMonitor) {
this.conf = config;
this.clientToAMTokenSecretManager =
new ClientToAMTokenSecretManager(appAttemptID, null);
+ this.taskAttemptFinishingMonitor = taskAttemptFinishingMonitor;
if (conf.getBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA,
MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)
&& conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
@@ -1072,7 +1088,12 @@ public class MRAppMaster extends CompositeService {
public String getNMHostname() {
return nmHost;
}
-
+
+ @Override
+ public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() {
+ return taskAttemptFinishingMonitor;
+ }
+
// Get Timeline Collector's address (get sync from RM)
public TimelineClient getTimelineClient() {
return timelineClient;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptFinishingMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptFinishingMonitor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptFinishingMonitor.java
new file mode 100644
index 0000000..f603398
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/TaskAttemptFinishingMonitor.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce.v2.app;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.AbstractLivelinessMonitor;
+import org.apache.hadoop.yarn.util.SystemClock;
+
+/**
+ * This class generates TA_TIMED_OUT if the task attempt stays in FINISHING
+ * state for too long.
+ */
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class TaskAttemptFinishingMonitor extends
+ AbstractLivelinessMonitor<TaskAttemptId> {
+
+ private EventHandler eventHandler;
+
+ public TaskAttemptFinishingMonitor(EventHandler eventHandler) {
+ super("TaskAttemptFinishingMonitor", new SystemClock());
+ this.eventHandler = eventHandler;
+ }
+
+ public void init(Configuration conf) {
+ super.init(conf);
+ int expireIntvl = conf.getInt(MRJobConfig.TASK_EXIT_TIMEOUT,
+ MRJobConfig.TASK_EXIT_TIMEOUT_DEFAULT);
+ int checkIntvl = conf.getInt(
+ MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS,
+ MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS_DEFAULT);
+
+ setExpireInterval(expireIntvl);
+ setMonitorInterval(checkIntvl);
+ }
+
+ @Override
+ protected void expire(TaskAttemptId id) {
+ eventHandler.handle(
+ new TaskAttemptEvent(id,
+ TaskAttemptEventType.TA_TIMED_OUT));
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
index ceb1dbf..d378b0a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/client/MRClientService.java
@@ -370,7 +370,7 @@ public class MRClientService extends AbstractService implements ClientService {
new TaskAttemptDiagnosticsUpdateEvent(taskAttemptId, message));
appContext.getEventHandler().handle(
new TaskAttemptEvent(taskAttemptId,
- TaskAttemptEventType.TA_FAILMSG));
+ TaskAttemptEventType.TA_FAILMSG_BY_CLIENT));
FailTaskAttemptResponse response = recordFactory.
newRecordInstance(FailTaskAttemptResponse.class);
return response;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java
index f6c3e57..5f17651 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/TaskAttemptStateInternal.java
@@ -30,9 +30,42 @@ public enum TaskAttemptStateInternal {
UNASSIGNED,
ASSIGNED,
RUNNING,
- COMMIT_PENDING,
- SUCCESS_CONTAINER_CLEANUP,
- SUCCEEDED,
+ COMMIT_PENDING,
+
+ // Transition into SUCCESS_FINISHING_CONTAINER
+ // After the attempt finishes successfully from
+ // TaskUmbilicalProtocol's point of view, it will transition to
+ // SUCCESS_FINISHING_CONTAINER state. That will give a chance for the
+ // container to exit by itself. In the transition,
+ // the attempt will notify the task via T_ATTEMPT_SUCCEEDED so that
+ // from the job's point of view, the task is considered succeeded.
+
+ // Transition out of SUCCESS_FINISHING_CONTAINER
+ // The attempt will transition from SUCCESS_FINISHING_CONTAINER to
+ // SUCCESS_CONTAINER_CLEANUP if it doesn't receive container exit
+ // notification within TASK_EXIT_TIMEOUT;
+ // Or it will transition to SUCCEEDED if it receives container exit
+ // notification from YARN.
+ SUCCESS_FINISHING_CONTAINER,
+
+ // Transition into FAIL_FINISHING_CONTAINER
+ // After the attempt fails from
+ // TaskUmbilicalProtocol's point of view, it will transition to
+ // FAIL_FINISHING_CONTAINER state. That will give a chance for the container
+ // to exit by itself. In the transition,
+ // the attempt will notify the task via T_ATTEMPT_FAILED so that
+ // from the job's point of view, the task is considered failed.
+
+ // Transition out of FAIL_FINISHING_CONTAINER
+ // The attempt will transition from FAIL_FINISHING_CONTAINER to
+ // FAIL_CONTAINER_CLEANUP if it doesn't receive container exit
+ // notification within TASK_EXIT_TIMEOUT;
+ // Or it will transition to FAILED if it receives container exit
+ // notification from YARN.
+ FAIL_FINISHING_CONTAINER,
+
+ SUCCESS_CONTAINER_CLEANUP,
+ SUCCEEDED,
FAIL_CONTAINER_CLEANUP,
FAIL_TASK_CLEANUP,
FAILED,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java
index 1f05ac3..61de032 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptEventType.java
@@ -49,6 +49,9 @@ public enum TaskAttemptEventType {
TA_TIMED_OUT,
TA_PREEMPTED,
+ //Producer:Client
+ TA_FAILMSG_BY_CLIENT,
+
//Producer:TaskCleaner
TA_CLEANUP_DONE,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index f4b434b..7e82df2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -184,8 +184,20 @@ public abstract class TaskAttemptImpl implements
private Locality locality;
private Avataar avataar;
- private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION =
- new CleanupContainerTransition();
+ private static final CleanupContainerTransition
+ CLEANUP_CONTAINER_TRANSITION = new CleanupContainerTransition();
+ private static final MoveContainerToSucceededFinishingTransition
+ SUCCEEDED_FINISHING_TRANSITION =
+ new MoveContainerToSucceededFinishingTransition();
+ private static final MoveContainerToFailedFinishingTransition
+ FAILED_FINISHING_TRANSITION =
+ new MoveContainerToFailedFinishingTransition();
+ private static final ExitFinishingOnTimeoutTransition
+ FINISHING_ON_TIMEOUT_TRANSITION =
+ new ExitFinishingOnTimeoutTransition();
+
+ private static final FinalizeFailedTransition FINALIZE_FAILED_TRANSITION =
+ new FinalizeFailedTransition();
private static final DiagnosticInformationUpdater
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION
@@ -204,6 +216,8 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG,
+ TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
+ TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_TOO_MANY_FETCH_FAILURE);
private static final StateMachineFactory
@@ -221,16 +235,16 @@ public abstract class TaskAttemptImpl implements
.addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_KILL, new KilledTransition())
.addTransition(TaskAttemptStateInternal.NEW, TaskAttemptStateInternal.FAILED,
- TaskAttemptEventType.TA_FAILMSG, new FailedTransition())
+ TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, new FailedTransition())
.addTransition(TaskAttemptStateInternal.NEW,
EnumSet.of(TaskAttemptStateInternal.FAILED,
TaskAttemptStateInternal.KILLED,
TaskAttemptStateInternal.SUCCEEDED),
TaskAttemptEventType.TA_RECOVER, new RecoverTransition())
.addTransition(TaskAttemptStateInternal.NEW,
- TaskAttemptStateInternal.NEW,
- TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
- DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ TaskAttemptStateInternal.NEW,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+ DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Transitions from the UNASSIGNED state.
.addTransition(TaskAttemptStateInternal.UNASSIGNED,
@@ -238,14 +252,14 @@ public abstract class TaskAttemptImpl implements
new ContainerAssignedTransition())
.addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_KILL, new DeallocateContainerTransition(
- TaskAttemptStateInternal.KILLED, true))
+ TaskAttemptStateInternal.KILLED, true))
.addTransition(TaskAttemptStateInternal.UNASSIGNED, TaskAttemptStateInternal.FAILED,
- TaskAttemptEventType.TA_FAILMSG, new DeallocateContainerTransition(
+ TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, new DeallocateContainerTransition(
TaskAttemptStateInternal.FAILED, true))
.addTransition(TaskAttemptStateInternal.UNASSIGNED,
- TaskAttemptStateInternal.UNASSIGNED,
- TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
- DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ TaskAttemptStateInternal.UNASSIGNED,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+ DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
// Transitions from the ASSIGNED state.
.addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.RUNNING,
@@ -258,15 +272,19 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
new DeallocateContainerTransition(TaskAttemptStateInternal.FAILED, false))
.addTransition(TaskAttemptStateInternal.ASSIGNED,
- TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
+ TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
- CLEANUP_CONTAINER_TRANSITION)
+ FINALIZE_FAILED_TRANSITION)
.addTransition(TaskAttemptStateInternal.ASSIGNED,
TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION)
- .addTransition(TaskAttemptStateInternal.ASSIGNED,
+ .addTransition(TaskAttemptStateInternal.ASSIGNED,
+ TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+ TaskAttemptEventType.TA_FAILMSG, FAILED_FINISHING_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.ASSIGNED,
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
- TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
+ TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
+ CLEANUP_CONTAINER_TRANSITION)
// Transitions from RUNNING state.
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING,
@@ -274,23 +292,27 @@ public abstract class TaskAttemptImpl implements
.addTransition(TaskAttemptStateInternal.RUNNING, TaskAttemptStateInternal.RUNNING,
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
- // If no commit is required, task directly goes to success
+ // If no commit is required, task goes to finishing state
+ // This will give a chance for the container to exit by itself
.addTransition(TaskAttemptStateInternal.RUNNING,
- TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
- TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION)
+ TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+ TaskAttemptEventType.TA_DONE, SUCCEEDED_FINISHING_TRANSITION)
// If commit is required, task goes through commit pending state.
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptEventType.TA_COMMIT_PENDING, new CommitPendingTransition())
// Failure handling while RUNNING
.addTransition(TaskAttemptStateInternal.RUNNING,
+ TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+ TaskAttemptEventType.TA_FAILMSG, FAILED_FINISHING_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
- TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
+ TaskAttemptEventType.TA_FAILMSG_BY_CLIENT, CLEANUP_CONTAINER_TRANSITION)
//for handling container exit without sending the done or fail msg
.addTransition(TaskAttemptStateInternal.RUNNING,
- TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
+ TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
- CLEANUP_CONTAINER_TRANSITION)
+ FINALIZE_FAILED_TRANSITION)
// Timeout handling while RUNNING
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
@@ -301,12 +323,97 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
// Kill handling
.addTransition(TaskAttemptStateInternal.RUNNING,
- TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
+ TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+ TaskAttemptEventType.TA_KILL,
CLEANUP_CONTAINER_TRANSITION)
.addTransition(TaskAttemptStateInternal.RUNNING,
TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_PREEMPTED, new PreemptedTransition())
+ // Transitions from SUCCESS_FINISHING_CONTAINER state
+ // When the container exits by itself, the notification of container
+ // completed event will be routed via NM -> RM -> AM.
+ // After MRAppMaster gets notification from RM, it will generate
+ // TA_CONTAINER_COMPLETED event.
+ .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+ TaskAttemptStateInternal.SUCCEEDED,
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+ new ExitFinishingOnContainerCompletedTransition())
+ // Given TA notifies task T_ATTEMPT_SUCCEEDED when it transitions to
+ // SUCCESS_FINISHING_CONTAINER, it is possible to receive the event
+ // TA_CONTAINER_CLEANED in the following scenario.
+ // 1. It is the last task for the job.
+ // 2. After the task receives T_ATTEMPT_SUCCEEDED, it will notify job.
+ // 3. Job will be marked completed.
+ // 4. As part of MRAppMaster's shutdown, all containers will be killed.
+ .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+ TaskAttemptStateInternal.SUCCEEDED,
+ TaskAttemptEventType.TA_CONTAINER_CLEANED,
+ new ExitFinishingOnContainerCleanedupTransition())
+ // The client wants to kill the task. Given the task is in finishing
+ // state, it could go to succeeded state or killed state. If it is a
+ // reducer, it will go to succeeded state;
+ // otherwise, it goes to killed state.
+ .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+ EnumSet.of(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
+ TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP),
+ TaskAttemptEventType.TA_KILL,
+ new KilledAfterSucceededFinishingTransition())
+ // The attempt stays in finishing state for too long
+ // Let us clean up the container
+ .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+ TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
+ TaskAttemptEventType.TA_TIMED_OUT, FINISHING_ON_TIMEOUT_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+ TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+ DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ // ignore-able events
+ .addTransition(TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+ TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+ EnumSet.of(TaskAttemptEventType.TA_UPDATE,
+ TaskAttemptEventType.TA_DONE,
+ TaskAttemptEventType.TA_COMMIT_PENDING,
+ TaskAttemptEventType.TA_FAILMSG,
+ TaskAttemptEventType.TA_FAILMSG_BY_CLIENT))
+
+ // Transitions from FAIL_FINISHING_CONTAINER state
+ // When the container exits by itself, the notification of container
+ // completed event will be routed via NM -> RM -> AM.
+ // After MRAppMaster gets notification from RM, it will generate
+ // TA_CONTAINER_COMPLETED event.
+ .addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+ TaskAttemptStateInternal.FAILED,
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED,
+ new ExitFinishingOnContainerCompletedTransition())
+ // Given TA notifies task T_ATTEMPT_FAILED when it transitions to
+ // FAIL_FINISHING_CONTAINER, it is possible to receive the event
+ // TA_CONTAINER_CLEANED in the following scenario.
+ // 1. It is the last task attempt for the task.
+ // 2. After the task receives T_ATTEMPT_FAILED, it will notify job.
+ // 3. Job will be marked failed.
+ // 4. As part of MRAppMaster's shutdown, all containers will be killed.
+ .addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+ TaskAttemptStateInternal.FAILED,
+ TaskAttemptEventType.TA_CONTAINER_CLEANED,
+ new ExitFinishingOnContainerCleanedupTransition())
+ .addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+ TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
+ TaskAttemptEventType.TA_TIMED_OUT, FINISHING_ON_TIMEOUT_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+ TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+ TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
+ DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
+ // ignore-able events
+ .addTransition(TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+ TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+ EnumSet.of(TaskAttemptEventType.TA_KILL,
+ TaskAttemptEventType.TA_UPDATE,
+ TaskAttemptEventType.TA_DONE,
+ TaskAttemptEventType.TA_COMMIT_PENDING,
+ TaskAttemptEventType.TA_FAILMSG,
+ TaskAttemptEventType.TA_FAILMSG_BY_CLIENT))
+
// Transitions from COMMIT_PENDING state
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptStateInternal.COMMIT_PENDING, TaskAttemptEventType.TA_UPDATE,
@@ -316,22 +423,27 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_DIAGNOSTICS_UPDATE,
DIAGNOSTIC_INFORMATION_UPDATE_TRANSITION)
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
- TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
- TaskAttemptEventType.TA_DONE, CLEANUP_CONTAINER_TRANSITION)
+ TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER,
+ TaskAttemptEventType.TA_DONE, SUCCEEDED_FINISHING_TRANSITION)
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
- TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL,
+ TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP,
+ TaskAttemptEventType.TA_KILL,
CLEANUP_CONTAINER_TRANSITION)
// if container killed by AM shutting down
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptStateInternal.KILLED,
TaskAttemptEventType.TA_CONTAINER_CLEANED, new KilledTransition())
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
- TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
- TaskAttemptEventType.TA_FAILMSG, CLEANUP_CONTAINER_TRANSITION)
+ TaskAttemptStateInternal.FAIL_FINISHING_CONTAINER,
+ TaskAttemptEventType.TA_FAILMSG, FAILED_FINISHING_TRANSITION)
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
+ TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
+ CLEANUP_CONTAINER_TRANSITION)
+ .addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
+ TaskAttemptStateInternal.FAILED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
- CLEANUP_CONTAINER_TRANSITION)
+ FINALIZE_FAILED_TRANSITION)
.addTransition(TaskAttemptStateInternal.COMMIT_PENDING,
TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_TIMED_OUT, CLEANUP_CONTAINER_TRANSITION)
@@ -348,8 +460,8 @@ public abstract class TaskAttemptImpl implements
// Transitions from SUCCESS_CONTAINER_CLEANUP state
// kill and cleanup the container
.addTransition(TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
- TaskAttemptStateInternal.SUCCEEDED, TaskAttemptEventType.TA_CONTAINER_CLEANED,
- new SucceededTransition())
+ TaskAttemptStateInternal.SUCCEEDED,
+ TaskAttemptEventType.TA_CONTAINER_CLEANED)
.addTransition(
TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
@@ -360,6 +472,7 @@ public abstract class TaskAttemptImpl implements
TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP,
EnumSet.of(TaskAttemptEventType.TA_KILL,
TaskAttemptEventType.TA_FAILMSG,
+ TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_CONTAINER_COMPLETED))
@@ -383,6 +496,7 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG,
+ TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
TaskAttemptEventType.TA_TIMED_OUT))
// Transitions from KILL_CONTAINER_CLEANUP
@@ -405,6 +519,7 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG,
+ TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
TaskAttemptEventType.TA_TIMED_OUT))
// Transitions from FAIL_TASK_CLEANUP
@@ -425,6 +540,7 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG,
+ TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
TaskAttemptEventType.TA_CONTAINER_CLEANED,
// Container launch events can arrive late
TaskAttemptEventType.TA_CONTAINER_LAUNCHED,
@@ -447,6 +563,7 @@ public abstract class TaskAttemptImpl implements
TaskAttemptEventType.TA_COMMIT_PENDING,
TaskAttemptEventType.TA_DONE,
TaskAttemptEventType.TA_FAILMSG,
+ TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
TaskAttemptEventType.TA_CONTAINER_CLEANED,
TaskAttemptEventType.TA_PREEMPTED,
// Container launch events can arrive late
@@ -460,7 +577,7 @@ public abstract class TaskAttemptImpl implements
new TooManyFetchFailureTransition())
.addTransition(TaskAttemptStateInternal.SUCCEEDED,
EnumSet.of(TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.KILLED),
- TaskAttemptEventType.TA_KILL,
+ TaskAttemptEventType.TA_KILL,
new KilledAfterSuccessTransition())
.addTransition(
TaskAttemptStateInternal.SUCCEEDED, TaskAttemptStateInternal.SUCCEEDED,
@@ -470,6 +587,10 @@ public abstract class TaskAttemptImpl implements
.addTransition(TaskAttemptStateInternal.SUCCEEDED,
TaskAttemptStateInternal.SUCCEEDED,
EnumSet.of(TaskAttemptEventType.TA_FAILMSG,
+ TaskAttemptEventType.TA_FAILMSG_BY_CLIENT,
+ // TaskAttemptFinishingMonitor might time out the attempt right
+ // after the attempt receives TA_CONTAINER_COMPLETED.
+ TaskAttemptEventType.TA_TIMED_OUT,
TaskAttemptEventType.TA_CONTAINER_CLEANED,
TaskAttemptEventType.TA_CONTAINER_COMPLETED))
@@ -1213,21 +1334,21 @@ public abstract class TaskAttemptImpl implements
return TaskAttemptState.STARTING;
case COMMIT_PENDING:
return TaskAttemptState.COMMIT_PENDING;
- case FAILED:
- return TaskAttemptState.FAILED;
- case KILLED:
- return TaskAttemptState.KILLED;
- // All CLEANUP states considered as RUNNING since events have not gone out
- // to the Task yet. May be possible to consider them as a Finished state.
case FAIL_CONTAINER_CLEANUP:
case FAIL_TASK_CLEANUP:
+ case FAIL_FINISHING_CONTAINER:
+ case FAILED:
+ return TaskAttemptState.FAILED;
case KILL_CONTAINER_CLEANUP:
case KILL_TASK_CLEANUP:
- case SUCCESS_CONTAINER_CLEANUP:
+ case KILLED:
+ return TaskAttemptState.KILLED;
case RUNNING:
return TaskAttemptState.RUNNING;
case NEW:
return TaskAttemptState.NEW;
+ case SUCCESS_CONTAINER_CLEANUP:
+ case SUCCESS_FINISHING_CONTAINER:
case SUCCEEDED:
return TaskAttemptState.SUCCEEDED;
default:
@@ -1429,6 +1550,15 @@ public abstract class TaskAttemptImpl implements
}
}
+ private static void finalizeProgress(TaskAttemptImpl taskAttempt) {
+ // unregister it from TaskAttemptListener so that it stops listening
+ taskAttempt.taskAttemptListener.unregister(
+ taskAttempt.attemptId, taskAttempt.jvmID);
+ taskAttempt.reportedStatus.progress = 1.0f;
+ taskAttempt.updateProgressSplits();
+ }
+
+
static class RequestContainerTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
private final boolean rescheduled;
@@ -1661,53 +1791,66 @@ public abstract class TaskAttemptImpl implements
}
}
- private static class SucceededTransition implements
+ /**
+ * Transition from SUCCESS_FINISHING_CONTAINER or FAIL_FINISHING_CONTAINER
+ * state upon receiving TA_CONTAINER_COMPLETED event
+ */
+ private static class ExitFinishingOnContainerCompletedTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override
- public void transition(TaskAttemptImpl taskAttempt,
+ public void transition(TaskAttemptImpl taskAttempt,
+ TaskAttemptEvent event) {
+ taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister(
+ taskAttempt.attemptId);
+ sendContainerCompleted(taskAttempt);
+ }
+ }
+
+ private static class ExitFinishingOnContainerCleanedupTransition implements
+ SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
- //set the finish time
- taskAttempt.setFinishTime();
- taskAttempt.eventHandler.handle(
- createJobCounterUpdateEventTASucceeded(taskAttempt));
- taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
- taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
- taskAttempt.attemptId,
- TaskEventType.T_ATTEMPT_SUCCEEDED));
- taskAttempt.eventHandler.handle
- (new SpeculatorEvent
- (taskAttempt.reportedStatus, taskAttempt.clock.getTime()));
- }
+ taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister(
+ taskAttempt.attemptId);
+ }
}
private static class FailedTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@Override
- public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) {
+ public void transition(TaskAttemptImpl taskAttempt,
+ TaskAttemptEvent event) {
// set the finish time
taskAttempt.setFinishTime();
-
- if (taskAttempt.getLaunchTime() != 0) {
- taskAttempt.eventHandler
- .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
- TaskAttemptUnsuccessfulCompletionEvent tauce =
- createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
- TaskAttemptStateInternal.FAILED);
- taskAttempt.eventHandler.handle(new JobHistoryEvent(
- taskAttempt.attemptId.getTaskId().getJobId(), tauce));
- // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.FAILED); Not
- // handling failed map/reduce events.
- }else {
- LOG.debug("Not generating HistoryFinish event since start event not " +
- "generated for taskAttempt: " + taskAttempt.getID());
- }
- taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
- taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
+ notifyTaskAttemptFailed(taskAttempt);
}
}
+ private static class FinalizeFailedTransition extends FailedTransition {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void transition(TaskAttemptImpl taskAttempt,
+ TaskAttemptEvent event) {
+ finalizeProgress(taskAttempt);
+ sendContainerCompleted(taskAttempt);
+ super.transition(taskAttempt, event);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static void sendContainerCompleted(TaskAttemptImpl taskAttempt) {
+ taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
+ taskAttempt.attemptId,
+ taskAttempt.container.getId(), StringInterner
+ .weakIntern(taskAttempt.container.getNodeId().toString()),
+ taskAttempt.container.getContainerToken(),
+ ContainerLauncher.EventType.CONTAINER_COMPLETED));
+ }
+
private static class RecoverTransition implements
MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent, TaskAttemptStateInternal> {
@@ -1832,6 +1975,35 @@ public abstract class TaskAttemptImpl implements
}
}
+ private static class KilledAfterSucceededFinishingTransition
+ implements MultipleArcTransition<TaskAttemptImpl, TaskAttemptEvent,
+ TaskAttemptStateInternal> {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt,
+ TaskAttemptEvent event) {
+ taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister(
+ taskAttempt.attemptId);
+ sendContainerCleanup(taskAttempt, event);
+ if(taskAttempt.getID().getTaskId().getTaskType() == TaskType.REDUCE) {
+ // after a reduce task has succeeded, its outputs are safe in HDFS.
+ // logically such a task should not be killed. we only come here when
+ // there is a race condition in the event queue. E.g. some logic sends
+ // a kill request to this attempt when the successful completion event
+ // for this task is already in the event queue. so the kill event will
+ // get executed immediately after the attempt is marked successful and
+ // result in this transition being exercised.
+ // ignore this for reduce tasks
+ LOG.info("Ignoring killed event for successful reduce task attempt" +
+ taskAttempt.getID().toString());
+ return TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP;
+ } else {
+ return TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP;
+ }
+ }
+ }
+
private static class KilledTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@@ -1887,6 +2059,31 @@ public abstract class TaskAttemptImpl implements
}
}
+ /**
+ * Transition from SUCCESS_FINISHING_CONTAINER or FAIL_FINISHING_CONTAINER
+ * state upon receiving TA_TIMED_OUT event
+ */
+ private static class ExitFinishingOnTimeoutTransition implements
+ SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void transition(TaskAttemptImpl taskAttempt,
+ TaskAttemptEvent event) {
+ taskAttempt.appContext.getTaskAttemptFinishingMonitor().unregister(
+ taskAttempt.attemptId);
+ // The attempt stays in finishing state for too long
+ String msg = "Task attempt " + taskAttempt.getID() + " is done from " +
+ "TaskUmbilicalProtocol's point of view. However, it stays in " +
+ "finishing state for too long";
+ LOG.warn(msg);
+ taskAttempt.addDiagnosticInfo(msg);
+ sendContainerCleanup(taskAttempt, event);
+ }
+ }
+
+ /**
+ * Finish and clean up the container
+ */
private static class CleanupContainerTransition implements
SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
@SuppressWarnings("unchecked")
@@ -1894,27 +2091,103 @@ public abstract class TaskAttemptImpl implements
public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) {
// unregister it to TaskAttemptListener so that it stops listening
- // for it
- taskAttempt.taskAttemptListener.unregister(
- taskAttempt.attemptId, taskAttempt.jvmID);
+ // for it.
+ finalizeProgress(taskAttempt);
+ sendContainerCleanup(taskAttempt, event);
+ }
+ }
- if (event instanceof TaskAttemptKillEvent) {
- taskAttempt.addDiagnosticInfo(
- ((TaskAttemptKillEvent) event).getMessage());
- }
+ @SuppressWarnings("unchecked")
+ private static void sendContainerCleanup(TaskAttemptImpl taskAttempt,
+ TaskAttemptEvent event) {
+ if (event instanceof TaskAttemptKillEvent) {
+ taskAttempt.addDiagnosticInfo(
+ ((TaskAttemptKillEvent) event).getMessage());
+ }
+ //send the cleanup event to containerLauncher
+ taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
+ taskAttempt.attemptId,
+ taskAttempt.container.getId(), StringInterner
+ .weakIntern(taskAttempt.container.getNodeId().toString()),
+ taskAttempt.container.getContainerToken(),
+ ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP));
+ }
+
+ /**
+ * Transition to SUCCESS_FINISHING_CONTAINER upon receiving TA_DONE event
+ */
+ private static class MoveContainerToSucceededFinishingTransition implements
+ SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void transition(TaskAttemptImpl taskAttempt,
+ TaskAttemptEvent event) {
+ finalizeProgress(taskAttempt);
+
+ // register the attempt with the finishing monitor
+ taskAttempt.appContext.getTaskAttemptFinishingMonitor().register(
+ taskAttempt.attemptId);
+
+ // set the finish time
+ taskAttempt.setFinishTime();
+
+ // notify job history
+ taskAttempt.eventHandler.handle(
+ createJobCounterUpdateEventTASucceeded(taskAttempt));
+ taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.SUCCEEDED);
+
+ //notify the task even though the container might not have exited yet.
+ taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+ taskAttempt.attemptId,
+ TaskEventType.T_ATTEMPT_SUCCEEDED));
+ taskAttempt.eventHandler.handle
+ (new SpeculatorEvent
+ (taskAttempt.reportedStatus, taskAttempt.clock.getTime()));
- taskAttempt.reportedStatus.progress = 1.0f;
- taskAttempt.updateProgressSplits();
- //send the cleanup event to containerLauncher
- taskAttempt.eventHandler.handle(new ContainerLauncherEvent(
- taskAttempt.attemptId,
- taskAttempt.container.getId(), StringInterner
- .weakIntern(taskAttempt.container.getNodeId().toString()),
- taskAttempt.container.getContainerToken(),
- ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP));
}
}
+ /**
+ * Transition to FAIL_FINISHING_CONTAINER upon receiving TA_FAILMSG event
+ */
+ private static class MoveContainerToFailedFinishingTransition implements
+ SingleArcTransition<TaskAttemptImpl, TaskAttemptEvent> {
+ @SuppressWarnings("unchecked")
+ @Override
+ public void transition(TaskAttemptImpl taskAttempt,
+ TaskAttemptEvent event) {
+ finalizeProgress(taskAttempt);
+ // register the attempt with the finishing monitor
+ taskAttempt.appContext.getTaskAttemptFinishingMonitor().register(
+ taskAttempt.attemptId);
+ notifyTaskAttemptFailed(taskAttempt);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private static void notifyTaskAttemptFailed(TaskAttemptImpl taskAttempt) {
+ // set the finish time
+ taskAttempt.setFinishTime();
+
+ if (taskAttempt.getLaunchTime() != 0) {
+ taskAttempt.eventHandler
+ .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
+ TaskAttemptUnsuccessfulCompletionEvent tauce =
+ createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+ TaskAttemptStateInternal.FAILED);
+ taskAttempt.eventHandler.handle(new JobHistoryEvent(
+ taskAttempt.attemptId.getTaskId().getJobId(), tauce));
+ // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.FAILED); Not
+ // handling failed map/reduce events.
+ }else {
+ LOG.debug("Not generating HistoryFinish event since start event not " +
+ "generated for taskAttempt: " + taskAttempt.getID());
+ }
+ taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
+ taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
+
+ }
+
private void addDiagnosticInfo(String diag) {
if (diag != null && !diag.equals("")) {
diagnostics.add(diag);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java
index 40ecdb2..82360f0 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncher.java
@@ -27,7 +27,13 @@ public interface ContainerLauncher
enum EventType {
CONTAINER_REMOTE_LAUNCH,
- CONTAINER_REMOTE_CLEANUP
+ CONTAINER_REMOTE_CLEANUP,
+ // When TaskAttempt receives TA_CONTAINER_COMPLETED,
+ // it will notify ContainerLauncher so that the container can be removed
+ // from ContainerLauncher's launched containers list.
+ // Otherwise, ContainerLauncher will try to stop the containers as part of
+ // serviceStop.
+ CONTAINER_COMPLETED
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
index 9c1125d..a7e966c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
@@ -121,7 +121,11 @@ public class ContainerLauncherImpl extends AbstractService implements
public synchronized boolean isCompletelyDone() {
return state == ContainerState.DONE || state == ContainerState.FAILED;
}
-
+
+ public synchronized void done() {
+ state = ContainerState.DONE;
+ }
+
@SuppressWarnings("unchecked")
public synchronized void launch(ContainerRemoteLaunchEvent event) {
LOG.info("Launching " + taskAttemptID);
@@ -378,6 +382,11 @@ public class ContainerLauncherImpl extends AbstractService implements
case CONTAINER_REMOTE_CLEANUP:
c.kill();
break;
+
+ case CONTAINER_COMPLETED:
+ c.done();
+ break;
+
}
removeContainerIfDone(containerID);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java
new file mode 100644
index 0000000..800f0e2
--- /dev/null
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapred/TestTaskAttemptFinishingMonitor.java
@@ -0,0 +1,108 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.mapred;
+
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.TaskAttemptFinishingMonitor;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.mapreduce.v2.app.rm.preemption.CheckpointAMPreemptionPolicy;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
+import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
+import org.apache.hadoop.yarn.event.Event;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.util.SystemClock;
+
+import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestTaskAttemptFinishingMonitor {
+
+ @Test
+ public void testFinshingAttemptTimeout()
+ throws IOException, InterruptedException {
+ SystemClock clock = new SystemClock();
+ Configuration conf = new Configuration();
+ conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT, 100);
+ conf.setInt(MRJobConfig.TASK_EXIT_TIMEOUT_CHECK_INTERVAL_MS, 10);
+
+ AppContext appCtx = mock(AppContext.class);
+ JobTokenSecretManager secret = mock(JobTokenSecretManager.class);
+ RMHeartbeatHandler rmHeartbeatHandler =
+ mock(RMHeartbeatHandler.class);
+ MockEventHandler eventHandler = new MockEventHandler();
+ TaskAttemptFinishingMonitor taskAttemptFinishingMonitor =
+ new TaskAttemptFinishingMonitor(eventHandler);
+ taskAttemptFinishingMonitor.init(conf);
+ taskAttemptFinishingMonitor.start();
+
+ when(appCtx.getEventHandler()).thenReturn(eventHandler);
+ when(appCtx.getNMHostname()).thenReturn("0.0.0.0");
+ when(appCtx.getTaskAttemptFinishingMonitor()).thenReturn(
+ taskAttemptFinishingMonitor);
+ when(appCtx.getClock()).thenReturn(clock);
+
+ CheckpointAMPreemptionPolicy policy = new CheckpointAMPreemptionPolicy();
+ policy.init(appCtx);
+ TaskAttemptListenerImpl listener =
+ new TaskAttemptListenerImpl(appCtx, secret, rmHeartbeatHandler, policy);
+
+ listener.init(conf);
+ listener.start();
+
+ JobId jid = MRBuilderUtils.newJobId(12345, 1, 1);
+ TaskId tid = MRBuilderUtils.newTaskId(jid, 0,
+ org.apache.hadoop.mapreduce.v2.api.records.TaskType.MAP);
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(tid, 0);
+ appCtx.getTaskAttemptFinishingMonitor().register(attemptId);
+ int check = 0;
+ while ( !eventHandler.timedOut && check++ < 10 ) {
+ Thread.sleep(100);
+ }
+ taskAttemptFinishingMonitor.stop();
+
+ assertTrue("Finishing attempt didn't time out.", eventHandler.timedOut);
+
+ }
+
+ public static class MockEventHandler implements EventHandler {
+ public boolean timedOut = false;
+
+ @Override
+ public void handle(Event event) {
+ if (event instanceof TaskAttemptEvent) {
+ TaskAttemptEvent attemptEvent = ((TaskAttemptEvent) event);
+ if (TaskAttemptEventType.TA_TIMED_OUT == attemptEvent.getType()) {
+ timedOut = true;
+ }
+ }
+ }
+ };
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
index 58db925..4fe4c44 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
@@ -482,6 +482,20 @@ public class MRApp extends MRAppMaster {
}
@Override
+ protected TaskAttemptFinishingMonitor
+ createTaskAttemptFinishingMonitor(
+ EventHandler eventHandler) {
+ return new TaskAttemptFinishingMonitor(eventHandler) {
+ @Override
+ public synchronized void register(TaskAttemptId attemptID) {
+ getContext().getEventHandler().handle(
+ new TaskAttemptEvent(attemptID,
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED));
+ }
+ };
+ }
+
+ @Override
protected TaskAttemptListener createTaskAttemptListener(
AppContext context, AMPreemptionPolicy policy) {
return new TaskAttemptListener(){
@@ -541,6 +555,8 @@ public class MRApp extends MRAppMaster {
new TaskAttemptEvent(event.getTaskAttemptID(),
TaskAttemptEventType.TA_CONTAINER_CLEANED));
break;
+ case CONTAINER_COMPLETED:
+ break;
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
index a900241..e690f3f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MockAppContext.java
@@ -148,4 +148,10 @@ public class MockAppContext implements AppContext {
// bogus - Not Required
return null;
}
+
+ @Override
+ public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() {
+ return null;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
index 4a36938..4d3f6f4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFail.java
@@ -223,6 +223,8 @@ public class TestFail {
new TaskAttemptEvent(event.getTaskAttemptID(),
TaskAttemptEventType.TA_CONTAINER_CLEANED));
break;
+ case CONTAINER_COMPLETED:
+ super.handle(event);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
index c33bd4d..aae591e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
@@ -159,7 +159,7 @@ public class TestKill {
super.dispatch(new TaskAttemptEvent(taID,
TaskAttemptEventType.TA_DONE));
super.dispatch(new TaskAttemptEvent(taID,
- TaskAttemptEventType.TA_CONTAINER_CLEANED));
+ TaskAttemptEventType.TA_CONTAINER_COMPLETED));
super.dispatch(new TaskTAttemptEvent(taID,
TaskEventType.T_ATTEMPT_SUCCEEDED));
this.cachedKillEvent = killEvent;
@@ -211,40 +211,9 @@ public class TestKill {
app.getContext().getEventHandler()
.handle(new JobEvent(jobId, JobEventType.JOB_KILL));
- app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
- }
-
- static class MyAsyncDispatch extends AsyncDispatcher {
- private CountDownLatch latch;
- private TaskAttemptEventType attemptEventTypeToWait;
- MyAsyncDispatch(CountDownLatch latch, TaskAttemptEventType attemptEventTypeToWait) {
- super();
- this.latch = latch;
- this.attemptEventTypeToWait = attemptEventTypeToWait;
- }
-
- @Override
- protected void dispatch(Event event) {
- if (event instanceof TaskAttemptEvent) {
- TaskAttemptEvent attemptEvent = (TaskAttemptEvent) event;
- TaskAttemptId attemptID = ((TaskAttemptEvent) event).getTaskAttemptID();
- if (attemptEvent.getType() == this.attemptEventTypeToWait
- && attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0 ) {
- try {
- latch.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- super.dispatch(event);
- }
+ app.waitForInternalState((JobImpl) job, JobStateInternal.KILLED);
}
- // This is to test a race condition where JobEventType.JOB_KILL is generated
- // right after TaskAttemptEventType.TA_DONE is generated.
- // TaskImpl's state machine might receive both T_ATTEMPT_SUCCEEDED
- // and T_ATTEMPT_KILLED from the same attempt.
@Test
public void testKillTaskWaitKillJobAfterTA_DONE() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
@@ -269,15 +238,12 @@ public class TestKill {
TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next();
app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
- // The order in the dispatch event queue, from the oldest to the newest
+ // The order in the dispatch event queue, from first to last
// TA_DONE
- // JOB_KILL
- // CONTAINER_REMOTE_CLEANUP ( from TA_DONE's handling )
- // T_KILL ( from JOB_KILL's handling )
- // TA_CONTAINER_CLEANED ( from CONTAINER_REMOTE_CLEANUP's handling )
- // TA_KILL ( from T_KILL's handling )
- // T_ATTEMPT_SUCCEEDED ( from TA_CONTAINER_CLEANED's handling )
- // T_ATTEMPT_KILLED ( from TA_KILL's handling )
+ // JobEventType.JOB_KILL
+ // TaskAttemptEventType.TA_CONTAINER_COMPLETED ( from TA_DONE handling )
+ // TaskEventType.T_KILL ( from JobEventType.JOB_KILL handling )
+ // TaskEventType.T_ATTEMPT_SUCCEEDED ( from TA_CONTAINER_COMPLETED handling )
// Finish map
app.getContext().getEventHandler().handle(
@@ -295,6 +261,100 @@ public class TestKill {
app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
}
+
+ // Exercises the ordering where JobEventType.JOB_KILL is posted before
+ // TaskAttemptEventType.TA_DONE for the map attempt. MyAsyncDispatch.dispatch
+ // blocks on the JOB_KILL event until the latch is counted down, so TA_DONE
+ // has already been handed to the event handler by the time JOB_KILL is
+ // actually dispatched. The job must still reach JobStateInternal.KILLED.
+ @Test
+ public void testKillTaskWaitKillJobBeforeTA_DONE() throws Exception {
+ CountDownLatch latch = new CountDownLatch(1);
+ // Dispatcher that waits on the latch before dispatching JOB_KILL.
+ final Dispatcher dispatcher = new MyAsyncDispatch(latch, JobEventType.JOB_KILL);
+ MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) {
+ @Override
+ public Dispatcher createDispatcher() {
+ return dispatcher;
+ }
+ };
+ Job job = app.submit(new Configuration());
+ JobId jobId = app.getJobId();
+ app.waitForState(job, JobState.RUNNING);
+ // Two tasks are expected; the first is treated as the map task and the
+ // second as the reduce task.
+ Assert.assertEquals("Num tasks not correct", 2, job.getTasks().size());
+ Iterator<Task> it = job.getTasks().values().iterator();
+ Task mapTask = it.next();
+ Task reduceTask = it.next();
+ app.waitForState(mapTask, TaskState.RUNNING);
+ app.waitForState(reduceTask, TaskState.RUNNING);
+ // Wait until the first attempt of each task is running before posting events.
+ TaskAttempt mapAttempt = mapTask.getAttempts().values().iterator().next();
+ app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
+ TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next();
+ app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
+
+ // The order in the dispatch event queue, from first to last
+ // JobEventType.JOB_KILL
+ // TaskAttemptEventType.TA_DONE
+ // TaskEventType.T_KILL ( from JobEventType.JOB_KILL handling )
+ // TaskAttemptEventType.TA_CONTAINER_COMPLETED ( from TA_DONE handling )
+ // TaskAttemptEventType.TA_KILL ( from TaskEventType.T_KILL handling )
+ // TaskEventType.T_ATTEMPT_SUCCEEDED ( from TA_CONTAINER_COMPLETED handling )
+ // TaskEventType.T_ATTEMPT_KILLED ( from TA_KILL handling )
+
+ // Now kill the job
+ app.getContext().getEventHandler()
+ .handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+
+ // Finish map
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(
+ mapAttempt.getID(),
+ TaskAttemptEventType.TA_DONE));
+
+ // Unblock the dispatcher so JOB_KILL and the events behind it are dispatched
+ latch.countDown();
+
+ app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
+ }
+
+ /**
+  * Test dispatcher that holds back one selected event until a latch is
+  * released, letting a test control what is queued behind that event.
+  *
+  * Depending on the constructor used, dispatching waits on either:
+  * a TaskAttemptEvent of the configured type for task 0, attempt 0, or
+  * a JobEvent of the configured type.
+  * Every event, including the one waited on, is then passed to
+  * AsyncDispatcher.dispatch.
+  */
+ static class MyAsyncDispatch extends AsyncDispatcher {
+   private final CountDownLatch latch;
+   // Exactly one of the two trigger types is non-null, depending on the
+   // constructor used; a null trigger never matches an event type.
+   private final TaskAttemptEventType attemptEventTypeToWait;
+   private final JobEventType jobEventTypeToWait;
+
+   /** Waits on the given task attempt event type (task 0, attempt 0 only). */
+   MyAsyncDispatch(CountDownLatch latch, TaskAttemptEventType attemptEventTypeToWait) {
+     super();
+     this.latch = latch;
+     this.attemptEventTypeToWait = attemptEventTypeToWait;
+     this.jobEventTypeToWait = null;
+   }
+
+   /** Waits on the given job event type. */
+   MyAsyncDispatch(CountDownLatch latch, JobEventType jobEventTypeToWait) {
+     super();
+     this.latch = latch;
+     this.attemptEventTypeToWait = null;
+     this.jobEventTypeToWait = jobEventTypeToWait;
+   }
+
+   @Override
+   protected void dispatch(Event event) {
+     if (shouldWait(event)) {
+       awaitLatch();
+     }
+
+     super.dispatch(event);
+   }
+
+   /** Returns true if the event is the one this dispatcher must hold back. */
+   private boolean shouldWait(Event event) {
+     if (event instanceof TaskAttemptEvent) {
+       TaskAttemptEvent attemptEvent = (TaskAttemptEvent) event;
+       TaskAttemptId attemptID = attemptEvent.getTaskAttemptID();
+       return attemptEvent.getType() == this.attemptEventTypeToWait
+           && attemptID.getTaskId().getId() == 0 && attemptID.getId() == 0;
+     }
+     if (event instanceof JobEvent) {
+       return ((JobEvent) event).getType() == this.jobEventTypeToWait;
+     }
+     return false;
+   }
+
+   /** Blocks until the latch is released or the thread is interrupted. */
+   private void awaitLatch() {
+     try {
+       latch.await();
+     } catch (InterruptedException e) {
+       e.printStackTrace();
+       // Restore the interrupt status so the dispatching thread's own
+       // loop can observe the interruption instead of silently losing it.
+       Thread.currentThread().interrupt();
+     }
+   }
+ }
+
@Test
public void testKillTaskAttempt() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5625ac46/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
index 69f2709..475cd1f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRuntimeEstimators.java
@@ -884,5 +884,10 @@ public class TestRuntimeEstimators {
// bogus - Not Required
return null;
}
+
+ @Override
+ public TaskAttemptFinishingMonitor getTaskAttemptFinishingMonitor() {
+ // Always null: no TaskAttemptFinishingMonitor is provided here.
+ return null;
+ }
}
}