You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by bo...@apache.org on 2012/11/12 16:19:42 UTC
svn commit: r1408316 - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/s...
Author: bobby
Date: Mon Nov 12 15:19:41 2012
New Revision: 1408316
URL: http://svn.apache.org/viewvc?rev=1408316&view=rev
Log:
svn merge -c 1408314 FIXES: MAPREDUCE-4751. AM stuck in KILL_WAIT for days (vinodkv via bobby)
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1408316&r1=1408315&r2=1408316&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Mon Nov 12 15:19:41 2012
@@ -503,6 +503,8 @@ Release 0.23.5 - UNRELEASED
MAPREDUCE-4774. JobImpl does not handle asynchronous task events in FAILED
state (jlowe via bobby)
+
+ MAPREDUCE-4751. AM stuck in KILL_WAIT for days (vinodkv via bobby)
Release 0.23.4 - UNRELEASED
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1408316&r1=1408315&r2=1408316&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Mon Nov 12 15:19:41 2012
@@ -712,7 +712,10 @@ public class JobImpl implements org.apac
* The only entry point to change the Job.
*/
public void handle(JobEvent event) {
- LOG.debug("Processing " + event.getJobId() + " of type " + event.getType());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing " + event.getJobId() + " of type "
+ + event.getType());
+ }
try {
writeLock.lock();
JobStateInternal oldState = getInternalState();
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java?rev=1408316&r1=1408315&r2=1408316&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java Mon Nov 12 15:19:41 2012
@@ -22,9 +22,11 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -118,9 +120,18 @@ public abstract class TaskImpl implement
protected Credentials credentials;
protected Token<JobTokenIdentifier> jobToken;
+ //should be set to one which comes first
+ //saying COMMIT_PENDING
+ private TaskAttemptId commitAttempt;
+
+ private TaskAttemptId successfulAttempt;
+
+ private final Set<TaskAttemptId> failedAttempts;
+ // Track the finished attempts - successful, failed and killed
+ private final Set<TaskAttemptId> finishedAttempts;
// counts the number of attempts that are either running or in a state where
// they will come to be running when they get a Container
- private int numberUncompletedAttempts = 0;
+ private final Set<TaskAttemptId> inProgressAttempts;
private boolean historyTaskStartGenerated = false;
@@ -182,6 +193,14 @@ public abstract class TaskImpl implement
EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
TaskEventType.T_ATTEMPT_KILLED,
new KillWaitAttemptKilledTransition())
+ .addTransition(TaskStateInternal.KILL_WAIT,
+ EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
+ TaskEventType.T_ATTEMPT_SUCCEEDED,
+ new KillWaitAttemptSucceededTransition())
+ .addTransition(TaskStateInternal.KILL_WAIT,
+ EnumSet.of(TaskStateInternal.KILL_WAIT, TaskStateInternal.KILLED),
+ TaskEventType.T_ATTEMPT_FAILED,
+ new KillWaitAttemptFailedTransition())
// Ignore-able transitions.
.addTransition(
TaskStateInternal.KILL_WAIT,
@@ -189,8 +208,6 @@ public abstract class TaskImpl implement
EnumSet.of(TaskEventType.T_KILL,
TaskEventType.T_ATTEMPT_LAUNCHED,
TaskEventType.T_ATTEMPT_COMMIT_PENDING,
- TaskEventType.T_ATTEMPT_FAILED,
- TaskEventType.T_ATTEMPT_SUCCEEDED,
TaskEventType.T_ADD_SPEC_ATTEMPT))
// Transitions from SUCCEEDED state
@@ -242,15 +259,6 @@ public abstract class TaskImpl implement
private static final RecoverdAttemptsComparator RECOVERED_ATTEMPTS_COMPARATOR =
new RecoverdAttemptsComparator();
- //should be set to one which comes first
- //saying COMMIT_PENDING
- private TaskAttemptId commitAttempt;
-
- private TaskAttemptId successfulAttempt;
-
- private int failedAttempts;
- private int finishedAttempts;//finish are total of success, failed and killed
-
@Override
public TaskState getState() {
readLock.lock();
@@ -275,6 +283,9 @@ public abstract class TaskImpl implement
readLock = readWriteLock.readLock();
writeLock = readWriteLock.writeLock();
this.attempts = Collections.emptyMap();
+ this.finishedAttempts = new HashSet<TaskAttemptId>(2);
+ this.failedAttempts = new HashSet<TaskAttemptId>(2);
+ this.inProgressAttempts = new HashSet<TaskAttemptId>(2);
// This overridable method call is okay in a constructor because we
// have a convention that none of the overrides depends on any
// fields that need initialization.
@@ -611,9 +622,9 @@ public abstract class TaskImpl implement
taskAttemptsFromPreviousGeneration.remove(0).getAttemptId().getId();
}
- ++numberUncompletedAttempts;
+ inProgressAttempts.add(attempt.getID());
//schedule the nextAttemptNumber
- if (failedAttempts > 0) {
+ if (failedAttempts.size() > 0) {
eventHandler.handle(new TaskAttemptEvent(attempt.getID(),
TaskAttemptEventType.TA_RESCHEDULE));
} else {
@@ -788,12 +799,14 @@ public abstract class TaskImpl implement
implements SingleArcTransition<TaskImpl, TaskEvent> {
@Override
public void transition(TaskImpl task, TaskEvent event) {
+ TaskTAttemptEvent taskTAttemptEvent = (TaskTAttemptEvent) event;
+ TaskAttemptId taskAttemptId = taskTAttemptEvent.getTaskAttemptID();
task.handleTaskAttemptCompletion(
- ((TaskTAttemptEvent) event).getTaskAttemptID(),
+ taskAttemptId,
TaskAttemptCompletionEventStatus.SUCCEEDED);
- task.finishedAttempts++;
- --task.numberUncompletedAttempts;
- task.successfulAttempt = ((TaskTAttemptEvent) event).getTaskAttemptID();
+ task.finishedAttempts.add(taskAttemptId);
+ task.inProgressAttempts.remove(taskAttemptId);
+ task.successfulAttempt = taskAttemptId;
task.eventHandler.handle(new JobTaskEvent(
task.taskId, TaskState.SUCCEEDED));
LOG.info("Task succeeded with attempt " + task.successfulAttempt);
@@ -824,11 +837,13 @@ public abstract class TaskImpl implement
SingleArcTransition<TaskImpl, TaskEvent> {
@Override
public void transition(TaskImpl task, TaskEvent event) {
+ TaskAttemptId taskAttemptId =
+ ((TaskTAttemptEvent) event).getTaskAttemptID();
task.handleTaskAttemptCompletion(
- ((TaskTAttemptEvent) event).getTaskAttemptID(),
+ taskAttemptId,
TaskAttemptCompletionEventStatus.KILLED);
- task.finishedAttempts++;
- --task.numberUncompletedAttempts;
+ task.finishedAttempts.add(taskAttemptId);
+ task.inProgressAttempts.remove(taskAttemptId);
if (task.successfulAttempt == null) {
task.addAndScheduleAttempt();
}
@@ -840,15 +855,25 @@ public abstract class TaskImpl implement
MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
protected TaskStateInternal finalState = TaskStateInternal.KILLED;
+ protected final TaskAttemptCompletionEventStatus taCompletionEventStatus;
+
+ public KillWaitAttemptKilledTransition() {
+ this(TaskAttemptCompletionEventStatus.KILLED);
+ }
+
+ public KillWaitAttemptKilledTransition(
+ TaskAttemptCompletionEventStatus taCompletionEventStatus) {
+ this.taCompletionEventStatus = taCompletionEventStatus;
+ }
@Override
public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
- task.handleTaskAttemptCompletion(
- ((TaskTAttemptEvent) event).getTaskAttemptID(),
- TaskAttemptCompletionEventStatus.KILLED);
- task.finishedAttempts++;
+ TaskAttemptId taskAttemptId =
+ ((TaskTAttemptEvent) event).getTaskAttemptID();
+ task.handleTaskAttemptCompletion(taskAttemptId, taCompletionEventStatus);
+ task.finishedAttempts.add(taskAttemptId);
// check whether all attempts are finished
- if (task.finishedAttempts == task.attempts.size()) {
+ if (task.finishedAttempts.size() == task.attempts.size()) {
if (task.historyTaskStartGenerated) {
TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, null,
finalState, null); // TODO JH verify failedAttempt null
@@ -867,43 +892,57 @@ public abstract class TaskImpl implement
}
}
+ /**
+ * Transition registered for T_ATTEMPT_SUCCEEDED while the task is in
+ * KILL_WAIT. It reuses KillWaitAttemptKilledTransition and only differs
+ * in the attempt completion status it passes along: SUCCEEDED.
+ */
+ private static class KillWaitAttemptSucceededTransition extends
+ KillWaitAttemptKilledTransition {
+ public KillWaitAttemptSucceededTransition() {
+ super(TaskAttemptCompletionEventStatus.SUCCEEDED);
+ }
+ }
+
+ /**
+ * Transition registered for T_ATTEMPT_FAILED while the task is in
+ * KILL_WAIT. It reuses KillWaitAttemptKilledTransition and only differs
+ * in the attempt completion status it passes along: FAILED.
+ */
+ private static class KillWaitAttemptFailedTransition extends
+ KillWaitAttemptKilledTransition {
+ public KillWaitAttemptFailedTransition() {
+ super(TaskAttemptCompletionEventStatus.FAILED);
+ }
+ }
+
private static class AttemptFailedTransition implements
MultipleArcTransition<TaskImpl, TaskEvent, TaskStateInternal> {
@Override
public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
- task.failedAttempts++;
TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
- if (castEvent.getTaskAttemptID().equals(task.commitAttempt)) {
+ TaskAttemptId taskAttemptId = castEvent.getTaskAttemptID();
+ task.failedAttempts.add(taskAttemptId);
+ if (taskAttemptId.equals(task.commitAttempt)) {
task.commitAttempt = null;
}
- TaskAttempt attempt = task.attempts.get(castEvent.getTaskAttemptID());
+ TaskAttempt attempt = task.attempts.get(taskAttemptId);
if (attempt.getAssignedContainerMgrAddress() != null) {
//container was assigned
task.eventHandler.handle(new ContainerFailedEvent(attempt.getID(),
attempt.getAssignedContainerMgrAddress()));
}
- task.finishedAttempts++;
- if (task.failedAttempts < task.maxAttempts) {
+ task.finishedAttempts.add(taskAttemptId);
+ if (task.failedAttempts.size() < task.maxAttempts) {
task.handleTaskAttemptCompletion(
- ((TaskTAttemptEvent) event).getTaskAttemptID(),
+ taskAttemptId,
TaskAttemptCompletionEventStatus.FAILED);
// we don't need a new event if we already have a spare
- if (--task.numberUncompletedAttempts == 0
+ task.inProgressAttempts.remove(taskAttemptId);
+ if (task.inProgressAttempts.size() == 0
&& task.successfulAttempt == null) {
task.addAndScheduleAttempt();
}
} else {
task.handleTaskAttemptCompletion(
- ((TaskTAttemptEvent) event).getTaskAttemptID(),
+ taskAttemptId,
TaskAttemptCompletionEventStatus.TIPFAILED);
- TaskTAttemptEvent ev = (TaskTAttemptEvent) event;
- TaskAttemptId taId = ev.getTaskAttemptID();
if (task.historyTaskStartGenerated) {
TaskFailedEvent taskFailedEvent = createTaskFailedEvent(task, attempt.getDiagnostics(),
- TaskStateInternal.FAILED, taId);
+ TaskStateInternal.FAILED, taskAttemptId);
task.eventHandler.handle(new JobHistoryEvent(task.taskId.getJobId(),
taskFailedEvent));
} else {
@@ -927,14 +966,12 @@ public abstract class TaskImpl implement
@Override
public TaskStateInternal transition(TaskImpl task, TaskEvent event) {
- if (event instanceof TaskTAttemptEvent) {
- TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
- if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
- !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
- // don't allow a different task attempt to override a previous
- // succeeded state
- return TaskStateInternal.SUCCEEDED;
- }
+ TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
+ if (task.getInternalState() == TaskStateInternal.SUCCEEDED &&
+ !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) {
+ // don't allow a different task attempt to override a previous
+ // succeeded state
+ return TaskStateInternal.SUCCEEDED;
}
// a successful REDUCE task should not be overridden
@@ -953,7 +990,7 @@ public abstract class TaskImpl implement
// believe that there's no redundancy.
unSucceed(task);
// fake increase in Uncomplete attempts for super.transition
- ++task.numberUncompletedAttempts;
+ task.inProgressAttempts.add(castEvent.getTaskAttemptID());
return super.transition(task, event);
}
@@ -1045,7 +1082,7 @@ public abstract class TaskImpl implement
(attempt, "Task KILL is received. Killing attempt!");
}
- task.numberUncompletedAttempts = 0;
+ task.inProgressAttempts.clear();
}
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java?rev=1408316&r1=1408315&r2=1408316&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java Mon Nov 12 15:19:41 2012
@@ -54,6 +54,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttemptStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
@@ -63,6 +64,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskImpl;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
@@ -243,6 +245,39 @@ public class MRApp extends MRAppMaster {
return job;
}
+ /**
+ * Polls the internal state of the given job until it equals
+ * {@code finalState} or the poll budget is used up, then asserts that
+ * the last observed state is the expected one.
+ *
+ * The state is re-read every 500 ms for at most 20 polls; note that
+ * {@code timeoutSecs} therefore counts polls, not seconds.
+ *
+ * @param job the job whose internal state is polled
+ * @param finalState the internal state to wait for
+ * @throws Exception e.g. if the thread is interrupted while sleeping
+ */
+ public void waitForInternalState(JobImpl job,
+ JobStateInternal finalState) throws Exception {
+ int timeoutSecs = 0;
+ JobStateInternal iState = job.getInternalState();
+ while (!finalState.equals(iState) && timeoutSecs++ < 20) {
+ System.out.println("Job Internal State is : " + iState
+ + " Waiting for Internal state : " + finalState);
+ Thread.sleep(500);
+ iState = job.getInternalState();
+ }
+ // Messages below refer to the job: this overload waits on a JobImpl,
+ // so a failure must not be reported as a task problem.
+ System.out.println("Job Internal State is : " + iState);
+ Assert.assertEquals("Job Internal state is not correct (timedout)",
+ finalState, iState);
+ }
+
+ /**
+ * Polls the internal state of the given task until it equals
+ * {@code finalState} or the poll budget is used up, then asserts that
+ * the last observed state is the expected one.
+ *
+ * The state is re-read every 500 ms for at most 20 polls; note that
+ * {@code timeoutSecs} therefore counts polls, not seconds. The task
+ * report is fetched only so its progress can be printed while waiting.
+ *
+ * @param task the task whose internal state is polled
+ * @param finalState the internal state to wait for
+ * @throws Exception e.g. if the thread is interrupted while sleeping
+ */
+ public void waitForInternalState(TaskImpl task,
+ TaskStateInternal finalState) throws Exception {
+ int timeoutSecs = 0;
+ TaskReport report = task.getReport();
+ TaskStateInternal iState = task.getInternalState();
+ while (!finalState.equals(iState) && timeoutSecs++ < 20) {
+ System.out.println("Task Internal State is : " + iState
+ + " Waiting for Internal state : " + finalState + " progress : "
+ + report.getProgress());
+ Thread.sleep(500);
+ report = task.getReport();
+ iState = task.getInternalState();
+ }
+ System.out.println("Task Internal State is : " + iState);
+ Assert.assertEquals("Task Internal state is not correct (timedout)",
+ finalState, iState);
+ }
+
public void waitForInternalState(TaskAttemptImpl attempt,
TaskAttemptStateInternal finalState) throws Exception {
int timeoutSecs = 0;
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java?rev=1408316&r1=1408315&r2=1408316&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestKill.java Mon Nov 12 15:19:41 2012
@@ -25,12 +25,15 @@ import java.util.concurrent.CountDownLat
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
+import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
@@ -39,12 +42,18 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
import org.junit.Test;
/**
* Tests the state machine with respect to Job/Task/TaskAttempt kill scenarios.
*
*/
+@SuppressWarnings({"unchecked", "rawtypes"})
public class TestKill {
@Test
@@ -132,6 +141,80 @@ public class TestKill {
}
+ /**
+ * Kills a job while the reduce attempt's success is reported to its task
+ * before the attempt itself sees the kill, and checks that the job still
+ * reaches the internal KILLED state.
+ *
+ * The custom dispatcher intercepts the TA_KILL aimed at attempt 0 of
+ * reduce task 0. Instead of delivering it, it dispatches TA_DONE,
+ * TA_CONTAINER_CLEANED and T_ATTEMPT_SUCCEEDED and caches the kill
+ * event. A T_ATTEMPT_SUCCEEDED task event that later passes through the
+ * dispatcher while a kill is cached is replaced by that cached kill.
+ */
@Test
+ public void testKillTaskWait() throws Exception {
+ final Dispatcher dispatcher = new AsyncDispatcher() {
+ private TaskAttemptEvent cachedKillEvent;
+ @Override
+ protected void dispatch(Event event) {
+ if (event instanceof TaskAttemptEvent) {
+ TaskAttemptEvent killEvent = (TaskAttemptEvent) event;
+ if (killEvent.getType() == TaskAttemptEventType.TA_KILL) {
+ TaskAttemptId taID = killEvent.getTaskAttemptID();
+ if (taID.getTaskId().getTaskType() == TaskType.REDUCE
+ && taID.getTaskId().getId() == 0 && taID.getId() == 0) {
+ // Task is asking the reduce TA to kill itself. 'Create' a race
+ // condition. Make the task succeed and then inform the task that
+ // TA has succeeded. Once Task gets the TA succeeded event at
+ // KILL_WAIT, then relay the actual kill signal to TA
+ super.dispatch(new TaskAttemptEvent(taID,
+ TaskAttemptEventType.TA_DONE));
+ super.dispatch(new TaskAttemptEvent(taID,
+ TaskAttemptEventType.TA_CONTAINER_CLEANED));
+ super.dispatch(new TaskTAttemptEvent(taID,
+ TaskEventType.T_ATTEMPT_SUCCEEDED));
+ this.cachedKillEvent = killEvent;
+ return;
+ }
+ }
+ } else if (event instanceof TaskEvent) {
+ TaskEvent taskEvent = (TaskEvent) event;
+ if (taskEvent.getType() == TaskEventType.T_ATTEMPT_SUCCEEDED
+ && this.cachedKillEvent != null) {
+ // When the TA comes and reports that it is done, send the
+ // cachedKillEvent
+ super.dispatch(this.cachedKillEvent);
+ return;
+ }
+
+ }
+ super.dispatch(event);
+ }
+ };
+ MRApp app = new MRApp(1, 1, false, this.getClass().getName(), true) {
+ @Override
+ public Dispatcher createDispatcher() {
+ return dispatcher;
+ }
+ };
+ Job job = app.submit(new Configuration());
+ JobId jobId = app.getJobId();
+ app.waitForState(job, JobState.RUNNING);
+ Assert.assertEquals("Num tasks not correct", 2, job.getTasks().size());
+ Iterator<Task> it = job.getTasks().values().iterator();
+ Task mapTask = it.next();
+ Task reduceTask = it.next();
+ app.waitForState(mapTask, TaskState.RUNNING);
+ app.waitForState(reduceTask, TaskState.RUNNING);
+ TaskAttempt mapAttempt = mapTask.getAttempts().values().iterator().next();
+ app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
+ TaskAttempt reduceAttempt = reduceTask.getAttempts().values().iterator().next();
+ app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
+
+ // Finish map
+ app.getContext().getEventHandler().handle(
+ new TaskAttemptEvent(
+ mapAttempt.getID(),
+ TaskAttemptEventType.TA_DONE));
+ app.waitForState(mapTask, TaskState.SUCCEEDED);
+
+ // Now kill the job
+ app.getContext().getEventHandler()
+ .handle(new JobEvent(jobId, JobEventType.JOB_KILL));
+
+ app.waitForInternalState((JobImpl)job, JobStateInternal.KILLED);
+ }
+
+ @Test
public void testKillTaskAttempt() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
MRApp app = new BlockingMRApp(2, 0, latch);