You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by bo...@apache.org on 2012/11/09 23:55:58 UTC

svn commit: r1407679 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java...

Author: bobby
Date: Fri Nov  9 22:55:57 2012
New Revision: 1407679

URL: http://svn.apache.org/viewvc?rev=1407679&view=rev
Log:
MAPREDUCE-4774. JobImpl does not handle asynchronous task events in FAILED state (jlowe via bobby)

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1407679&r1=1407678&r2=1407679&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Fri Nov  9 22:55:57 2012
@@ -647,6 +647,9 @@ Release 0.23.5 - UNRELEASED
 
     MAPREDUCE-4782. NLineInputFormat skips first line of last InputSplit 
     (Mark Fuhs via bobby)
+
+    MAPREDUCE-4774. JobImpl does not handle asynchronous task events in FAILED
+    state (jlowe via bobby)
  
 Release 0.23.4 - UNRELEASED
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1407679&r1=1407678&r2=1407679&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Fri Nov  9 22:55:57 2012
@@ -348,6 +348,9 @@ public class JobImpl implements org.apac
           .addTransition(JobStateInternal.FAILED, JobStateInternal.FAILED,
               EnumSet.of(JobEventType.JOB_KILL, 
                   JobEventType.JOB_UPDATED_NODES,
+                  JobEventType.JOB_TASK_COMPLETED,
+                  JobEventType.JOB_TASK_ATTEMPT_COMPLETED,
+                  JobEventType.JOB_MAP_TASK_RESCHEDULED,
                   JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE))
 
           // Transitions from KILLED state

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1407679&r1=1407678&r2=1407679&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Fri Nov  9 22:55:57 2012
@@ -27,6 +27,7 @@ import static org.mockito.Mockito.verify
 import static org.mockito.Mockito.when;
 
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -42,6 +43,7 @@ import org.apache.hadoop.mapreduce.jobhi
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
@@ -51,10 +53,14 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.InitTransition;
 import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
 import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.SystemClock;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.InlineDispatcher;
+import org.apache.hadoop.yarn.state.StateMachine;
+import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.junit.Test;
@@ -340,7 +346,7 @@ public class TestJobImpl {
     return isUber;
   }
 
-  private InitTransition getInitTransition() {
+  private static InitTransition getInitTransition() {
     InitTransition initTransition = new InitTransition() {
       @Override
       protected TaskSplitMetaInfo[] createSplits(JobImpl job, JobId jobId) {
@@ -350,4 +356,63 @@ public class TestJobImpl {
     };
     return initTransition;
   }
+
+  @Test
+  public void testTransitionsAtFailed() throws IOException {
+    Configuration conf = new Configuration();
+    JobID jobID = JobID.forName("job_1234567890000_0001");
+    JobId jobId = TypeConverter.toYarn(jobID);
+    OutputCommitter committer = mock(OutputCommitter.class);
+    doThrow(new IOException("forcefail"))
+      .when(committer).setupJob(any(JobContext.class));
+    InlineDispatcher dispatcher = new InlineDispatcher();
+    JobImpl job = new StubbedJob(jobId, Records
+        .newRecord(ApplicationAttemptId.class), conf,
+        dispatcher.getEventHandler(), committer, true, null);
+
+    dispatcher.register(JobEventType.class, job);
+    job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
+    Assert.assertEquals(JobState.FAILED, job.getState());
+
+    job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_COMPLETED));
+    Assert.assertEquals(JobState.FAILED, job.getState());
+    job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_COMPLETED));
+    Assert.assertEquals(JobState.FAILED, job.getState());
+    job.handle(new JobEvent(jobId, JobEventType.JOB_MAP_TASK_RESCHEDULED));
+    Assert.assertEquals(JobState.FAILED, job.getState());
+    job.handle(new JobEvent(jobId, JobEventType.JOB_TASK_ATTEMPT_FETCH_FAILURE));
+    Assert.assertEquals(JobState.FAILED, job.getState());
+  }
+
+  private static class StubbedJob extends JobImpl {
+    //override the init transition
+    private final InitTransition initTransition = getInitTransition();
+    StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent> localFactory
+        = stateMachineFactory.addTransition(JobStateInternal.NEW,
+            EnumSet.of(JobStateInternal.INITED, JobStateInternal.FAILED),
+            JobEventType.JOB_INIT,
+            // This is abusive.
+            initTransition);
+
+    private final StateMachine<JobStateInternal, JobEventType, JobEvent>
+        localStateMachine;
+
+    @Override
+    protected StateMachine<JobStateInternal, JobEventType, JobEvent> getStateMachine() {
+      return localStateMachine;
+    }
+
+    public StubbedJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
+        Configuration conf, EventHandler eventHandler,
+        OutputCommitter committer, boolean newApiCommitter, String user) {
+      super(jobId, applicationAttemptId, conf, eventHandler,
+          null, new JobTokenSecretManager(), new Credentials(),
+          new SystemClock(), null, MRAppMetrics.create(), committer,
+          newApiCommitter, user, System.currentTimeMillis(), null, null);
+
+      // This "this leak" is okay because the retained pointer is in an
+      //  instance variable.
+      localStateMachine = localFactory.make(this);
+    }
+  }
 }