You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2013/08/02 02:22:59 UTC
svn commit: r1509494 - in
/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/
hadoop-mapreduce-client/hadoop-mapreduce-client-app...
Author: jlowe
Date: Fri Aug 2 00:22:58 2013
New Revision: 1509494
URL: http://svn.apache.org/r1509494
Log:
svn merge -c 1506154 FIXES: MAPREDUCE-5317. Stale files left behind for failed jobs. Contributed by Ravi Prakash
Modified:
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt?rev=1509494&r1=1509493&r2=1509494&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt Fri Aug 2 00:22:58 2013
@@ -24,6 +24,9 @@ Release 2.1.1-beta - UNRELEASED
MAPREDUCE-5251. Reducer should not implicate map attempt if it has
insufficient space to fetch map output (Ashwin Shankar via jlowe)
+ MAPREDUCE-5317. Stale files left behind for failed jobs (Ravi Prakash via
+ jlowe)
+
Release 2.1.0-beta - 2013-08-06
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java?rev=1509494&r1=1509493&r2=1509494&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java Fri Aug 2 00:22:58 2013
@@ -25,6 +25,7 @@ public enum JobStateInternal {
RUNNING,
COMMITTING,
SUCCEEDED,
+ FAIL_WAIT,
FAIL_ABORT,
FAILED,
KILL_WAIT,
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java?rev=1509494&r1=1509493&r2=1509494&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java Fri Aug 2 00:22:58 2013
@@ -44,6 +44,7 @@ public enum JobEventType {
//Producer:Job
JOB_COMPLETED,
+ JOB_FAIL_WAIT_TIMEDOUT,
//Producer:Any component
JOB_DIAGNOSTIC_UPDATE,
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1509494&r1=1509493&r2=1509494&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Fri Aug 2 00:22:58 2013
@@ -30,6 +30,9 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -313,7 +316,8 @@ public class JobImpl implements org.apac
.addTransition
(JobStateInternal.RUNNING,
EnumSet.of(JobStateInternal.RUNNING,
- JobStateInternal.COMMITTING, JobStateInternal.FAIL_ABORT),
+ JobStateInternal.COMMITTING, JobStateInternal.FAIL_WAIT,
+ JobStateInternal.FAIL_ABORT),
JobEventType.JOB_TASK_COMPLETED,
new TaskCompletedTransition())
.addTransition
@@ -422,7 +426,37 @@ public class JobImpl implements org.apac
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
JobEventType.JOB_AM_REBOOT))
- // Transitions from FAIL_ABORT state
+ // Transitions from FAIL_WAIT state
+ .addTransition(JobStateInternal.FAIL_WAIT,
+ JobStateInternal.FAIL_WAIT,
+ JobEventType.JOB_DIAGNOSTIC_UPDATE,
+ DIAGNOSTIC_UPDATE_TRANSITION)
+ .addTransition(JobStateInternal.FAIL_WAIT,
+ JobStateInternal.FAIL_WAIT,
+ JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
+ .addTransition(JobStateInternal.FAIL_WAIT,
+ EnumSet.of(JobStateInternal.FAIL_WAIT, JobStateInternal.FAIL_ABORT),
+ JobEventType.JOB_TASK_COMPLETED,
+ new JobFailWaitTransition())
+ .addTransition(JobStateInternal.FAIL_WAIT,
+ JobStateInternal.FAIL_ABORT, JobEventType.JOB_FAIL_WAIT_TIMEDOUT,
+ new JobFailWaitTimedOutTransition())
+ .addTransition(JobStateInternal.FAIL_WAIT, JobStateInternal.KILLED,
+ JobEventType.JOB_KILL,
+ new KilledDuringAbortTransition())
+ .addTransition(JobStateInternal.FAIL_WAIT,
+ JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
+ INTERNAL_ERROR_TRANSITION)
+ // Ignore-able events
+ .addTransition(JobStateInternal.FAIL_WAIT,
+ JobStateInternal.FAIL_WAIT,
+ EnumSet.of(JobEventType.JOB_UPDATED_NODES,
+ JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
+ JobEventType.JOB_MAP_TASK_RESCHEDULED,
+ JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
+ JobEventType.JOB_AM_REBOOT))
+
+ //Transitions from FAIL_ABORT state
.addTransition(JobStateInternal.FAIL_ABORT,
JobStateInternal.FAIL_ABORT,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
@@ -449,7 +483,8 @@ public class JobImpl implements org.apac
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE,
JobEventType.JOB_COMMIT_COMPLETED,
JobEventType.JOB_COMMIT_FAILED,
- JobEventType.JOB_AM_REBOOT))
+ JobEventType.JOB_AM_REBOOT,
+ JobEventType.JOB_FAIL_WAIT_TIMEDOUT))
// Transitions from KILL_ABORT state
.addTransition(JobStateInternal.KILL_ABORT,
@@ -600,6 +635,10 @@ public class JobImpl implements org.apac
private JobStateInternal forcedState = null;
+ //Executor used for running future tasks. Setting thread pool size to 1
+ private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+ private ScheduledFuture failWaitTriggerScheduledFuture;
+
public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
Configuration conf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener,
@@ -960,6 +999,7 @@ public class JobImpl implements org.apac
case SETUP:
case COMMITTING:
return JobState.RUNNING;
+ case FAIL_WAIT:
case FAIL_ABORT:
return JobState.FAILED;
case REBOOT:
@@ -1563,7 +1603,43 @@ public class JobImpl implements org.apac
job.unsuccessfulFinish(finalState);
}
}
-
+
+ //This transition happens when a job is to be failed. It waits for all the
+ //tasks to finish / be killed.
+ private static class JobFailWaitTransition
+ implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
+ @Override
+ public JobStateInternal transition(JobImpl job, JobEvent event) {
+ if(!job.failWaitTriggerScheduledFuture.isCancelled()) {
+ for(Task task: job.tasks.values()) {
+ if(!task.isFinished()) {
+ return JobStateInternal.FAIL_WAIT;
+ }
+ }
+ }
+ //Finished waiting. All tasks finished / were killed
+ job.failWaitTriggerScheduledFuture.cancel(false);
+ job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
+ job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
+ return JobStateInternal.FAIL_ABORT;
+ }
+ }
+
+ //This transition happens when a job to be failed times out while waiting on
+ //tasks that had been sent the KILL signal. It is triggered by a
+ //ScheduledFuture task queued in the executor.
+ private static class JobFailWaitTimedOutTransition
+ implements SingleArcTransition<JobImpl, JobEvent> {
+ @Override
+ public void transition(JobImpl job, JobEvent event) {
+ LOG.info("Timeout expired in FAIL_WAIT waiting for tasks to get killed."
+ + " Going to fail job anyway");
+ job.failWaitTriggerScheduledFuture.cancel(false);
+ job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
+ job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
+ }
+ }
+
// JobFinishedEvent triggers the move of the history file out of the staging
// area. May need to create a new event type for this if JobFinished should
// not be generated for KilledJobs, etc.
@@ -1796,6 +1872,23 @@ public class JobImpl implements org.apac
return checkJobAfterTaskCompletion(job);
}
+ //This class is used to queue a ScheduledFuture to send an event to a job
+ //after some delay. This can be used to wait for maximum amount of time
+ //before proceeding anyway. e.g. When a job is waiting in FAIL_WAIT for
+ //all tasks to be killed.
+ static class TriggerScheduledFuture implements Runnable {
+ JobEvent toSend;
+ JobImpl job;
+ TriggerScheduledFuture(JobImpl job, JobEvent toSend) {
+ this.toSend = toSend;
+ this.job = job;
+ }
+ public void run() {
+ LOG.info("Sending event " + toSend + " to " + job.getID());
+ job.getEventHandler().handle(toSend);
+ }
+ }
+
protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) {
//check for Job failure
if (job.failedMapTaskCount*100 >
@@ -1809,10 +1902,33 @@ public class JobImpl implements org.apac
" failedReduces:" + job.failedReduceTaskCount;
LOG.info(diagnosticMsg);
job.addDiagnostic(diagnosticMsg);
- job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
- job.jobContext,
- org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
- return JobStateInternal.FAIL_ABORT;
+
+ //Send kill signal to all unfinished tasks here.
+ boolean allDone = true;
+ for (Task task : job.tasks.values()) {
+ if(!task.isFinished()) {
+ allDone = false;
+ job.eventHandler.handle(
+ new TaskEvent(task.getID(), TaskEventType.T_KILL));
+ }
+ }
+
+ //If all tasks are already done, we should go directly to FAIL_ABORT
+ if(allDone) {
+ job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
+ job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED)
+ );
+ return JobStateInternal.FAIL_ABORT;
+ }
+
+ //Set max timeout to wait for the tasks to get killed
+ job.failWaitTriggerScheduledFuture = job.executor.schedule(
+ new TriggerScheduledFuture(job, new JobEvent(job.getID(),
+ JobEventType.JOB_FAIL_WAIT_TIMEDOUT)), job.conf.getInt(
+ MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS,
+ MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS),
+ TimeUnit.MILLISECONDS);
+ return JobStateInternal.FAIL_WAIT;
}
return job.checkReadyForCommit();
Modified: hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1509494&r1=1509493&r2=1509494&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Fri Aug 2 00:22:58 2013
@@ -57,13 +57,17 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventHandler;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEventType;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.job.Task;
+import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobDiagnosticsUpdateEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobFinishEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobStartEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.rm.RMHeartbeatHandler;
@@ -74,7 +78,9 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -84,6 +90,7 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.mockito.Mockito;
/**
@@ -323,6 +330,78 @@ public class TestJobImpl {
commitHandler.stop();
}
+ @Test
+ public void testAbortJobCalledAfterKillingTasks() throws IOException {
+ Configuration conf = new Configuration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+ conf.set(MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, "1000");
+ InlineDispatcher dispatcher = new InlineDispatcher();
+ dispatcher.init(conf);
+ dispatcher.start();
+ OutputCommitter committer = Mockito.mock(OutputCommitter.class);
+ CommitterEventHandler commitHandler =
+ createCommitterEventHandler(dispatcher, committer);
+ commitHandler.init(conf);
+ commitHandler.start();
+ JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+
+ //Fail one task. This should land the JobImpl in the FAIL_WAIT state
+ job.handle(new JobTaskEvent(
+ MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP),
+ TaskState.FAILED));
+ //Verify abort job hasn't been called
+ Mockito.verify(committer, Mockito.never())
+ .abortJob((JobContext) Mockito.any(), (State) Mockito.any());
+ assertJobState(job, JobStateInternal.FAIL_WAIT);
+
+ //Verify abortJob is called once and the job failed
+ Mockito.verify(committer, Mockito.timeout(2000).times(1))
+ .abortJob((JobContext) Mockito.any(), (State) Mockito.any());
+ assertJobState(job, JobStateInternal.FAILED);
+
+ dispatcher.stop();
+ }
+
+ @Test (timeout=10000)
+ public void testFailAbortDoesntHang() throws IOException {
+ Configuration conf = new Configuration();
+ conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+ conf.set(MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, "1000");
+
+ DrainDispatcher dispatcher = new DrainDispatcher();
+ dispatcher.init(conf);
+ dispatcher.start();
+ OutputCommitter committer = Mockito.mock(OutputCommitter.class);
+ CommitterEventHandler commitHandler =
+ createCommitterEventHandler(dispatcher, committer);
+ commitHandler.init(conf);
+ commitHandler.start();
+ //Job has only 1 mapper task. No reducers
+ conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+ conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
+ JobImpl job = createRunningStubbedJob(conf, dispatcher, 1);
+
+ //Fail / finish all the tasks. This should land the JobImpl directly in the
+ //FAIL_ABORT state
+ for(Task t: job.tasks.values()) {
+ TaskImpl task = (TaskImpl) t;
+ task.handle(new TaskEvent(task.getID(), TaskEventType.T_SCHEDULE));
+ for(TaskAttempt ta: task.getAttempts().values()) {
+ task.handle(new TaskTAttemptEvent(ta.getID(),
+ TaskEventType.T_ATTEMPT_FAILED));
+ }
+ }
+ assertJobState(job, JobStateInternal.FAIL_ABORT);
+
+ dispatcher.await();
+ //Verify abortJob is called once and the job failed
+ Mockito.verify(committer, Mockito.timeout(2000).times(1))
+ .abortJob((JobContext) Mockito.any(), (State) Mockito.any());
+ assertJobState(job, JobStateInternal.FAILED);
+
+ dispatcher.stop();
+ }
+
@Test(timeout=20000)
public void testKilledDuringFailAbort() throws Exception {
Configuration conf = new Configuration();