You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2013/07/16 17:13:51 UTC
svn commit: r1503751 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src...
Author: jlowe
Date: Tue Jul 16 15:13:51 2013
New Revision: 1503751
URL: http://svn.apache.org/r1503751
Log:
Revert change 1503506 for MAPREDUCE-5317 ("Stale files left behind for failed jobs").
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1503751&r1=1503750&r2=1503751&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Tue Jul 16 15:13:51 2013
@@ -15,9 +15,6 @@ Release 0.23.10 - UNRELEASED
MAPREDUCE-3193. FileInputFormat doesn't read files recursively in the
input path dir (Devaraj K via jlowe)
- MAPREDUCE-5317. Stale files left behind for failed jobs (Ravi Prakash via
- jlowe)
-
Release 0.23.9 - 2013-07-08
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java?rev=1503751&r1=1503750&r2=1503751&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/JobStateInternal.java Tue Jul 16 15:13:51 2013
@@ -25,7 +25,6 @@ public enum JobStateInternal {
RUNNING,
COMMITTING,
SUCCEEDED,
- FAIL_WAIT,
FAIL_ABORT,
FAILED,
KILL_WAIT,
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java?rev=1503751&r1=1503750&r2=1503751&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/JobEventType.java Tue Jul 16 15:13:51 2013
@@ -44,7 +44,6 @@ public enum JobEventType {
//Producer:Job
JOB_COMPLETED,
- JOB_FAIL_WAIT_TIMEDOUT,
//Producer:Any component
JOB_DIAGNOSTIC_UPDATE,
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1503751&r1=1503750&r2=1503751&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Tue Jul 16 15:13:51 2013
@@ -29,9 +29,6 @@ import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -283,8 +280,7 @@ public class JobImpl implements org.apac
.addTransition
(JobStateInternal.RUNNING,
EnumSet.of(JobStateInternal.RUNNING,
- JobStateInternal.COMMITTING, JobStateInternal.FAIL_WAIT,
- JobStateInternal.FAIL_ABORT),
+ JobStateInternal.COMMITTING, JobStateInternal.FAIL_ABORT),
JobEventType.JOB_TASK_COMPLETED,
new TaskCompletedTransition())
.addTransition
@@ -379,35 +375,7 @@ public class JobImpl implements org.apac
EnumSet.of(JobEventType.JOB_KILL,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
- // Transitions from FAIL_WAIT state
- .addTransition(JobStateInternal.FAIL_WAIT,
- JobStateInternal.FAIL_WAIT,
- JobEventType.JOB_DIAGNOSTIC_UPDATE,
- DIAGNOSTIC_UPDATE_TRANSITION)
- .addTransition(JobStateInternal.FAIL_WAIT,
- JobStateInternal.FAIL_WAIT,
- JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
- .addTransition(JobStateInternal.FAIL_WAIT,
- EnumSet.of(JobStateInternal.FAIL_WAIT, JobStateInternal.FAIL_ABORT),
- JobEventType.JOB_TASK_COMPLETED,
- new JobFailWaitTransition())
- .addTransition(JobStateInternal.FAIL_WAIT,
- JobStateInternal.FAIL_ABORT, JobEventType.JOB_FAIL_WAIT_TIMEDOUT,
- new JobFailWaitTimedOutTransition())
- .addTransition(JobStateInternal.FAIL_WAIT, JobStateInternal.KILLED,
- JobEventType.JOB_KILL,
- new KilledDuringAbortTransition())
- .addTransition(JobStateInternal.FAIL_WAIT,
- JobStateInternal.ERROR, JobEventType.INTERNAL_ERROR,
- INTERNAL_ERROR_TRANSITION)
- // Ignore-able events
- .addTransition(JobStateInternal.FAIL_WAIT,
- JobStateInternal.FAIL_WAIT,
- EnumSet.of(JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
- JobEventType.JOB_MAP_TASK_RESCHEDULED,
- JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
-
- //Transitions from FAIL_ABORT state
+ // Transitions from FAIL_ABORT state
.addTransition(JobStateInternal.FAIL_ABORT,
JobStateInternal.FAIL_ABORT,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
@@ -553,10 +521,6 @@ public class JobImpl implements org.apac
private JobStateInternal forcedState = null;
- //Executor used for running future tasks. Setting thread pool size to 1
- private ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
- private ScheduledFuture failWaitTriggerScheduledFuture;
-
public JobImpl(JobId jobId, ApplicationAttemptId applicationAttemptId,
Configuration conf, EventHandler eventHandler,
TaskAttemptListener taskAttemptListener,
@@ -917,7 +881,6 @@ public class JobImpl implements org.apac
case SETUP:
case COMMITTING:
return JobState.RUNNING;
- case FAIL_WAIT:
case FAIL_ABORT:
return JobState.FAILED;
default:
@@ -1442,43 +1405,7 @@ public class JobImpl implements org.apac
job.unsuccessfulFinish(finalState);
}
}
-
- //This transition happens when a job is to be failed. It waits for all the
- //tasks to finish / be killed.
- private static class JobFailWaitTransition
- implements MultipleArcTransition<JobImpl, JobEvent, JobStateInternal> {
- @Override
- public JobStateInternal transition(JobImpl job, JobEvent event) {
- if(!job.failWaitTriggerScheduledFuture.isCancelled()) {
- for(Task task: job.tasks.values()) {
- if(!task.isFinished()) {
- return JobStateInternal.FAIL_WAIT;
- }
- }
- }
- //Finished waiting. All tasks finished / were killed
- job.failWaitTriggerScheduledFuture.cancel(false);
- job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
- job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
- return JobStateInternal.FAIL_ABORT;
- }
- }
-
- //This transition happens when a job to be failed times out while waiting on
- //tasks that had been sent the KILL signal. It is triggered by a
- //ScheduledFuture task queued in the executor.
- private static class JobFailWaitTimedOutTransition
- implements SingleArcTransition<JobImpl, JobEvent> {
- @Override
- public void transition(JobImpl job, JobEvent event) {
- LOG.info("Timeout expired in FAIL_WAIT waiting for tasks to get killed."
- + " Going to fail job anyway");
- job.failWaitTriggerScheduledFuture.cancel(false);
- job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
- job.jobContext, org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
- }
- }
-
+
// JobFinishedEvent triggers the move of the history file out of the staging
// area. May need to create a new event type for this if JobFinished should
// not be generated for KilledJobs, etc.
@@ -1689,23 +1616,6 @@ public class JobImpl implements org.apac
return checkJobAfterTaskCompletion(job);
}
- //This class is used to queue a ScheduledFuture to send an event to a job
- //after some delay. This can be used to wait for maximum amount of time
- //before proceeding anyway. e.g. When a job is waiting in FAIL_WAIT for
- //all tasks to be killed.
- static class TriggerScheduledFuture implements Runnable {
- JobEvent toSend;
- JobImpl job;
- TriggerScheduledFuture(JobImpl job, JobEvent toSend) {
- this.toSend = toSend;
- this.job = job;
- }
- public void run() {
- LOG.info("Sending event " + toSend + " to " + job.getID());
- job.getEventHandler().handle(toSend);
- }
- }
-
protected JobStateInternal checkJobAfterTaskCompletion(JobImpl job) {
//check for Job failure
if (job.failedMapTaskCount*100 >
@@ -1719,30 +1629,10 @@ public class JobImpl implements org.apac
" failedReduces:" + job.failedReduceTaskCount;
LOG.info(diagnosticMsg);
job.addDiagnostic(diagnosticMsg);
-
- //Send kill signal to all unfinished tasks here.
- boolean allDone = true;
- for (Task task : job.tasks.values()) {
- if(!task.isFinished()) {
- allDone = false;
- job.eventHandler.handle(
- new TaskEvent(task.getID(), TaskEventType.T_KILL));
- }
- }
-
- //If all tasks are already done, we should go directly to FAIL_ABORT
- if(allDone) {
- return JobStateInternal.FAIL_ABORT;
- }
-
- //Set max timeout to wait for the tasks to get killed
- job.failWaitTriggerScheduledFuture = job.executor.schedule(
- new TriggerScheduledFuture(job, new JobEvent(job.getID(),
- JobEventType.JOB_FAIL_WAIT_TIMEDOUT)), job.conf.getInt(
- MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS,
- MRJobConfig.DEFAULT_MR_AM_COMMITTER_CANCEL_TIMEOUT_MS),
- TimeUnit.MILLISECONDS);
- return JobStateInternal.FAIL_WAIT;
+ job.eventHandler.handle(new CommitterJobAbortEvent(job.jobId,
+ job.jobContext,
+ org.apache.hadoop.mapreduce.JobStatus.State.FAILED));
+ return JobStateInternal.FAIL_ABORT;
}
return job.checkReadyForCommit();
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1503751&r1=1503750&r2=1503751&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Tue Jul 16 15:13:51 2013
@@ -71,7 +71,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
import org.apache.hadoop.yarn.state.StateMachine;
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -80,7 +79,6 @@ import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.mockito.Mockito;
/**
@@ -241,39 +239,6 @@ public class TestJobImpl {
commitHandler.stop();
}
- @Test
- public void testAbortJobCalledAfterKillingTasks() throws IOException,
- InterruptedException {
- Configuration conf = new Configuration();
- conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
- conf.set(MRJobConfig.MR_AM_COMMITTER_CANCEL_TIMEOUT_MS, "1000");
- InlineDispatcher dispatcher = new InlineDispatcher();
- dispatcher.init(conf);
- dispatcher.start();
- OutputCommitter committer = Mockito.mock(OutputCommitter.class);
- CommitterEventHandler commitHandler =
- createCommitterEventHandler(dispatcher, committer);
- commitHandler.init(conf);
- commitHandler.start();
- JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
-
- //Fail one task. This should land the JobImpl in the FAIL_WAIT state
- job.handle(new JobTaskEvent(
- MRBuilderUtils.newTaskId(job.getID(), 1, TaskType.MAP),
- TaskState.FAILED));
- //Verify abort job hasn't been called
- Mockito.verify(committer, Mockito.never())
- .abortJob((JobContext) Mockito.any(), (State) Mockito.any());
- assertJobState(job, JobStateInternal.FAIL_WAIT);
-
- //Verify abortJob is called once and the job failed
- Mockito.verify(committer, Mockito.timeout(2000).times(1))
- .abortJob((JobContext) Mockito.any(), (State) Mockito.any());
- assertJobState(job, JobStateInternal.FAILED);
-
- dispatcher.stop();
- }
-
@Test(timeout=20000)
public void testKilledDuringFailAbort() throws Exception {
Configuration conf = new Configuration();