You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by bo...@apache.org on 2012/11/10 00:19:07 UTC
svn commit: r1407689 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/
hadoop-mapreduce-client/hadoop-mapreduce-client-ap...
Author: bobby
Date: Fri Nov 9 23:19:07 2012
New Revision: 1407689
URL: http://svn.apache.org/viewvc?rev=1407689&view=rev
Log:
svn merge -c 1407679 FIXES: MAPREDUCE-4774. JobImpl does not handle asynchronous task events in FAILED state (jlowe via bobby)
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1407689&r1=1407688&r2=1407689&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Fri Nov 9 23:19:07 2012
@@ -83,6 +83,9 @@ Release 0.23.5 - UNRELEASED
MAPREDUCE-4782. NLineInputFormat skips first line of last InputSplit
(Mark Fuhs via bobby)
+
+ MAPREDUCE-4774. JobImpl does not handle asynchronous task events in FAILED
+ state (jlowe via bobby)
Release 0.23.4 - UNRELEASED
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1407689&r1=1407688&r2=1407689&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Fri Nov 9 23:19:07 2012
@@ -323,6 +323,9 @@ public class JobImpl implements org.apac
// Ignore-able events
.addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
EnumSet.of(JobEventType.JOB_KILL,
+ JobEventType.JOB_TASK_COMPLETED,
+ JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
+ JobEventType.JOB_MAP_TASK_RESCHEDULED,
JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
// Transitions from KILLED state
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1407689&r1=1407688&r2=1407689&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Fri Nov 9 23:19:07 2012
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.verify
import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
@@ -42,6 +43,7 @@ import org.apache.hadoop.mapreduce.jobhi
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
@@ -51,10 +53,14 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
@@ -340,7 +346,7 @@ public class TestJobImpl {
return isUber;
}
- private InitTransition getInitTransition() {
+ private static InitTransition getInitTransition() {
InitTransition initTransition = new InitTransition() {
@Override
protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
@@ -350,4 +356,63 @@ public class TestJobImpl {
};
return initTransition;
}
+
+ @Test
+ public void testTransitionsAtFailed() throws IOException {
+ Configuration conf = new Configuration();
+ JobID jobID = JobID.forName("job_1234567890000_0001");
+ JobId jobId = TypeConverter.toYarn(jobID);
+ OutputCommitter committer = mock(OutputCommitter.class);
+ doThrow(new IOException("forcefail"))
+ .when(committer).setupJob(any(JobContext.class));
+ InlineDispatcher dispatcher = new InlineDispatcher();
+ JobImpl job = new StubbedJob(jobId, Records
+ .newRecord(ApplicationAttemptId.class), conf,
+ dispatcher.getEventHandler(), committer, true, null);
+
+ dispatcher.register(JobEventType.class, job);
+ job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
+ Assert.assertEquals(JobState.FAILED, job.getState());
+
+ job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED));
+ Assert.assertEquals(JobState.FAILED, job.getState());
+ job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_COMPLETED));
+ Assert.assertEquals(JobState.FAILED, job.getState());
+ job.handle(new JobEvent(jobId, JobEventType.JOB_MAP_TASK_RESCHEDULED));
+ Assert.assertEquals(JobState.FAILED, job.getState());
+ job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE));
+ Assert.assertEquals(JobState.FAILED, job.getState());
+ }
+
+ private static class StubbedJob extends JobImpl {
+ //override the init transition
+ private final InitTransition initTransition = getInitTransition();
+ StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> localFactory
+ = stateMachineFactory.addTransition(JobStateInternal.NEW,
+ EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
+ JobEventType.JOB_INIT,
+ // This is abusive.
+ initTransition);
+
+ private final StateMachine<JobStateInternal, JobEventType, JobEvent>
+ localStateMachine;
+
+ @Override
+ protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
+ return localStateMachine;
+ }
+
+ public StubbedJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
+ Configuration conf, EventHandler eventHandler,
+ OutputCommitter committer, boolean newApiCommitter, String user) {
+ super(jobId, applicationAttemptId, conf, eventHandler,
+ null, new JobTokenSecretManager(), new Credentials(),
+ new SystemClock(), null, MRAppMetrics.create(), committer,
+ newApiCommitter, user, System.currentTimeMillis(), null, null);
+
+ // This "this leak" is okay because the retained pointer is in an
+ // instance variable.
+ localStateMachine = localFactory.make(this);
+ }
+ }
}