You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2013/08/29 23:31:45 UTC

svn commit: r1518833 - in /hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/ hadoop-mapreduce-client/hadoop-mapreduce-clien...

Author: vinodkv
Date: Thu Aug 29 21:31:45 2013
New Revision: 1518833

URL: http://svn.apache.org/r1518833
Log:
MAPREDUCE-5441. Changed MR AM to return RUNNING state if exiting when RM commands to reboot, so that client can continue to track the overall job. Contributed by Jian He.
svn merge --ignore-ancestry -c 1518821 ../../trunk/

Modified:
    hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
    hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt?rev=1518833&r1=1518832&r2=1518833&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/CHANGES.txt Thu Aug 29 21:31:45 2013
@@ -72,6 +72,10 @@ Release 2.1.1-beta - UNRELEASED
 
     MAPREDUCE-5483. revert MAPREDUCE-5357. (rkanter via tucu)
 
+    MAPREDUCE-5441. Changed MR AM to return RUNNING state if exiting when RM
+    commands to reboot, so that client can continue to track the overall job.
+    (Jian He via vinodkv)
+
 Release 2.1.0-beta - 2013-08-22
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1518833&r1=1518832&r2=1518833&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Thu Aug 29 21:31:45 2013
@@ -993,7 +993,7 @@ public class JobImpl implements org.apac
     }
   }
   
-  private static JobState getExternalState(JobStateInternal smState) {
+  private JobState getExternalState(JobStateInternal smState) {
     switch (smState) {
     case KILL_WAIT:
     case KILL_ABORT:
@@ -1005,7 +1005,13 @@ public class JobImpl implements org.apac
     case FAIL_ABORT:
       return JobState.FAILED;
     case REBOOT:
-      return JobState.ERROR;
+      if (appContext.isLastAMRetry()) {
+        return JobState.ERROR;
+      } else {
+        // If this is not the last AM retry, report the external state as RUNNING;
+        // otherwise JobClient would exit when it polls the AM for the job state.
+        return JobState.RUNNING;
+      }
     default:
       return JobState.valueOf(smState.name());
     }

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java?rev=1518833&r1=1518832&r2=1518833&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java Thu Aug 29 21:31:45 2013
@@ -29,6 +29,7 @@ import java.util.Iterator;
 import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.TypeConverter;
 import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
@@ -41,6 +42,8 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.app.job.Task;
 import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobUpdatedNodesEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
@@ -51,12 +54,15 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
 import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.Clock;
 import org.junit.Test;
 
 /**
@@ -368,6 +374,47 @@ public class TestMRApp {
     app.waitForState(job, JobState.ERROR);
   }
 
+  @Test
+  public void testJobRebootNotLastRetry() throws Exception {
+    MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true);
+    Job job = app.submit(new Configuration());
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task task = it.next();
+    app.waitForState(task, TaskState.RUNNING);
+
+    // Send a reboot event.
+    app.getContext().getEventHandler().handle(new JobEvent(job.getID(),
+      JobEventType.JOB_AM_REBOOT));
+
+    // The external state should remain RUNNING; otherwise the JobClient would
+    // exit prematurely.
+    app.waitForState(job, JobState.RUNNING);
+  }
+
+  @Test
+  public void testJobRebootOnLastRetry() throws Exception {
+    // Set startCount to 2 so that this attempt is the last retry: startCount
+    // then equals DEFAULT_MAX_AM_RETRY.
+    MRApp app = new MRApp(1, 0, false, this.getClass().getName(), true, 2);
+
+    Configuration conf = new Configuration();
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Assert.assertEquals("Num tasks not correct", 1, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task task = it.next();
+    app.waitForState(task, TaskState.RUNNING);
+
+    // Send a reboot event.
+    app.getContext().getEventHandler().handle(new JobEvent(job.getID(),
+      JobEventType.JOB_AM_REBOOT));
+
+    // The external state should be ERROR if this is the last retry.
+    app.waitForState(job, JobState.ERROR);
+  }
+
   private final class MRAppWithSpiedJob extends MRApp {
     private JobImpl spiedJob;
 

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java?rev=1518833&r1=1518832&r2=1518833&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestJobImpl.java Thu Aug 29 21:31:45 2013
@@ -142,7 +142,7 @@ public class TestJobImpl {
         "testName", "testNodeName", "\"key2\"=\"value2\" \"key1\"=\"value1\" ",
         "tag1,tag2");
     dispatcher.register(EventType.class, jseHandler);
-    JobImpl job = createStubbedJob(conf, dispatcher, 0);
+    JobImpl job = createStubbedJob(conf, dispatcher, 0, null);
     job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
     job.handle(new JobStartEvent(job.getID()));
@@ -170,7 +170,7 @@ public class TestJobImpl {
     commitHandler.init(conf);
     commitHandler.start();
 
-    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
     completeJobTasks(job);
     assertJobState(job, JobStateInternal.COMMITTING);
 
@@ -195,7 +195,7 @@ public class TestJobImpl {
     commitHandler.init(conf);
     commitHandler.start();
 
-    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
     completeJobTasks(job);
     assertJobState(job, JobStateInternal.COMMITTING);
 
@@ -239,7 +239,9 @@ public class TestJobImpl {
     commitHandler.init(conf);
     commitHandler.start();
 
-    JobImpl job = createStubbedJob(conf, dispatcher, 2);
+    AppContext mockContext = mock(AppContext.class);
+    when(mockContext.isLastAMRetry()).thenReturn(false);
+    JobImpl job = createStubbedJob(conf, dispatcher, 2, mockContext);
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
@@ -248,6 +250,10 @@ public class TestJobImpl {
 
     job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
     assertJobState(job, JobStateInternal.REBOOT);
+    // return the external state as RUNNING since otherwise JobClient will
+    // exit when it polls the AM for job state
+    Assert.assertEquals(JobState.RUNNING, job.getState());
+
     dispatcher.stop();
     commitHandler.stop();
   }
@@ -256,6 +262,7 @@ public class TestJobImpl {
   public void testRebootedDuringCommit() throws Exception {
     Configuration conf = new Configuration();
     conf.set(MRJobConfig.MR_AM_STAGING_DIR, stagingDir);
+    conf.setInt(MRJobConfig.MR_AM_MAX_ATTEMPTS, 2);
     AsyncDispatcher dispatcher = new AsyncDispatcher();
     dispatcher.init(conf);
     dispatcher.start();
@@ -266,13 +273,18 @@ public class TestJobImpl {
     commitHandler.init(conf);
     commitHandler.start();
 
-    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+    AppContext mockContext = mock(AppContext.class);
+    when(mockContext.isLastAMRetry()).thenReturn(true);
+    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, mockContext);
     completeJobTasks(job);
     assertJobState(job, JobStateInternal.COMMITTING);
 
     syncBarrier.await();
     job.handle(new JobEvent(job.getID(), JobEventType.JOB_AM_REBOOT));
     assertJobState(job, JobStateInternal.REBOOT);
+    // The external state should be ERROR since this is the last retry.
+    Assert.assertEquals(JobState.ERROR, job.getState());
+
     dispatcher.stop();
     commitHandler.stop();
   }
@@ -301,7 +313,7 @@ public class TestJobImpl {
     commitHandler.init(conf);
     commitHandler.start();
 
-    JobImpl job = createStubbedJob(conf, dispatcher, 2);
+    JobImpl job = createStubbedJob(conf, dispatcher, 2, null);
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
@@ -328,7 +340,7 @@ public class TestJobImpl {
     commitHandler.init(conf);
     commitHandler.start();
 
-    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
     completeJobTasks(job);
     assertJobState(job, JobStateInternal.COMMITTING);
 
@@ -352,7 +364,7 @@ public class TestJobImpl {
         createCommitterEventHandler(dispatcher, committer);
     commitHandler.init(conf);
     commitHandler.start();
-    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2);
+    JobImpl job = createRunningStubbedJob(conf, dispatcher, 2, null);
 
     //Fail one task. This should land the JobImpl in the FAIL_WAIT state
     job.handle(new JobTaskEvent(
@@ -388,7 +400,7 @@ public class TestJobImpl {
     //Job has only 1 mapper task. No reducers
     conf.setInt(MRJobConfig.NUM_REDUCES, 0);
     conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, 1);
-    JobImpl job = createRunningStubbedJob(conf, dispatcher, 1);
+    JobImpl job = createRunningStubbedJob(conf, dispatcher, 1, null);
 
     //Fail / finish all the tasks. This should land the JobImpl directly in the
     //FAIL_ABORT state
@@ -440,7 +452,7 @@ public class TestJobImpl {
     commitHandler.init(conf);
     commitHandler.start();
 
-    JobImpl job = createStubbedJob(conf, dispatcher, 2);
+    JobImpl job = createStubbedJob(conf, dispatcher, 2, null);
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
@@ -477,7 +489,7 @@ public class TestJobImpl {
     commitHandler.init(conf);
     commitHandler.start();
 
-    JobImpl job = createStubbedJob(conf, dispatcher, 2);
+    JobImpl job = createStubbedJob(conf, dispatcher, 2, null);
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
@@ -687,7 +699,7 @@ public class TestJobImpl {
     commitHandler.init(conf);
     commitHandler.start();
 
-    JobImpl job = createStubbedJob(conf, dispatcher, 2);
+    JobImpl job = createStubbedJob(conf, dispatcher, 2, null);
     JobId jobId = job.getID();
     job.handle(new JobEvent(jobId, JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
@@ -735,12 +747,12 @@ public class TestJobImpl {
   }
 
   private static StubbedJob createStubbedJob(Configuration conf,
-      Dispatcher dispatcher, int numSplits) {
+      Dispatcher dispatcher, int numSplits, AppContext appContext) {
     JobID jobID = JobID.forName("job_1234567890000_0001");
     JobId jobId = TypeConverter.toYarn(jobID);
     StubbedJob job = new StubbedJob(jobId,
         ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0),
-        conf,dispatcher.getEventHandler(), true, "somebody", numSplits);
+        conf,dispatcher.getEventHandler(), true, "somebody", numSplits, appContext);
     dispatcher.register(JobEventType.class, job);
     EventHandler mockHandler = mock(EventHandler.class);
     dispatcher.register(TaskEventType.class, mockHandler);
@@ -751,8 +763,8 @@ public class TestJobImpl {
   }
 
   private static StubbedJob createRunningStubbedJob(Configuration conf,
-      Dispatcher dispatcher, int numSplits) {
-    StubbedJob job = createStubbedJob(conf, dispatcher, numSplits);
+      Dispatcher dispatcher, int numSplits, AppContext appContext) {
+    StubbedJob job = createStubbedJob(conf, dispatcher, numSplits, appContext);
     job.handle(new JobEvent(job.getID(), JobEventType.JOB_INIT));
     assertJobState(job, JobStateInternal.INITED);
     job.handle(new JobStartEvent(job.getID()));
@@ -880,13 +892,13 @@ public class TestJobImpl {
     }
 
     public StubbedJob(JobId jobId, ApplicationAttemptId applicationAttemptId,
-        Configuration conf, EventHandler eventHandler,
-        boolean newApiCommitter, String user, int numSplits) {
+        Configuration conf, EventHandler eventHandler, boolean newApiCommitter,
+        String user, int numSplits, AppContext appContext) {
       super(jobId, applicationAttemptId, conf, eventHandler,
           null, new JobTokenSecretManager(), new Credentials(),
           new SystemClock(), Collections.<TaskId, TaskInfo> emptyMap(),
           MRAppMetrics.create(), null, newApiCommitter, user,
-          System.currentTimeMillis(), null, null, null, null);
+          System.currentTimeMillis(), null, appContext, null, null);
 
       initTransition = getInitTransition(numSplits);
       localFactory = stateMachineFactory.addTransition(JobStateInternal.NEW,