You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2014/03/25 03:01:13 UTC

svn commit: r1581181 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/ma...

Author: vinodkv
Date: Tue Mar 25 02:01:13 2014
New Revision: 1581181

URL: http://svn.apache.org/r1581181
Log:
MAPREDUCE-5795. Fixed MRAppMaster to record the correct job-state after it recovers from a commit during a previous attempt. Contributed by Xuan Gong.
svn merge --ignore-ancestry -c 1581180 ../../trunk/

Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1581181&r1=1581180&r2=1581181&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Tue Mar 25 02:01:13 2014
@@ -121,6 +121,9 @@ Release 2.4.0 - UNRELEASED
     FadviseFileRegion::transferTo does not read disks efficiently.
     (Nikola Vujic via cnauroth)
 
+    MAPREDUCE-5795. Fixed MRAppMaster to record the correct job-state after it
+    recovers from a commit during a previous attempt. (Xuan Gong via vinodkv)
+
 Release 2.3.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1581181&r1=1581180&r2=1581181&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Tue Mar 25 02:01:13 2014
@@ -31,6 +31,7 @@ import java.util.concurrent.LinkedBlocki
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -49,6 +50,7 @@ import org.apache.hadoop.mapreduce.v2.ap
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.app.AppContext;
 import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
 import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
@@ -348,7 +350,9 @@ public class JobHistoryEventHandler exte
           JobUnsuccessfulCompletionEvent jucEvent =
             new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(toClose),
                 System.currentTimeMillis(), job.getCompletedMaps(),
-                job.getCompletedReduces(), JobState.KILLED.toString(),
+                job.getCompletedReduces(),
+                createJobStateForJobUnsuccessfulCompletionEvent(
+                    mi.getForcedJobStateOnShutDown()),
                 job.getDiagnostics());
           JobHistoryEvent jfEvent = new JobHistoryEvent(toClose, jucEvent);
           //Bypass the queue mechanism which might wait. Call the method directly
@@ -381,9 +385,10 @@ public class JobHistoryEventHandler exte
    * This should be the first call to history for a job
    * 
    * @param jobId the jobId.
+   * @param forcedJobStateOnShutDown state to force the job into, or null
    * @throws IOException
    */
-  protected void setupEventWriter(JobId jobId)
+  protected void setupEventWriter(JobId jobId, String forcedJobStateOnShutDown)
       throws IOException {
     if (stagingDirPath == null) {
       LOG.error("Log Directory is null, returning");
@@ -438,7 +443,7 @@ public class JobHistoryEventHandler exte
     }
 
     MetaInfo fi = new MetaInfo(historyFile, logDirConfPath, writer,
-        user, jobName, jobId);
+        user, jobName, jobId, forcedJobStateOnShutDown);
     fi.getJobSummary().setJobId(jobId);
     fileMap.put(jobId, fi);
   }
@@ -481,13 +486,17 @@ public class JobHistoryEventHandler exte
     return false;
   }
 
-  protected void handleEvent(JobHistoryEvent event) {
+  @Private
+  public void handleEvent(JobHistoryEvent event) {
     synchronized (lock) {
 
       // If this is an AM_STARTED event, set up the writer
       if (event.getHistoryEvent().getEventType() == EventType.AM_STARTED) {
         try {
-          setupEventWriter(event.getJobID());
+          AMStartedEvent amStartedEvent =
+              (AMStartedEvent) event.getHistoryEvent();
+          setupEventWriter(event.getJobID(),
+              amStartedEvent.getForcedJobStateOnShutDown());
         } catch (IOException ioe) {
           LOG.error("Error JobHistoryEventHandler in handleEvent: " + event,
               ioe);
@@ -804,9 +813,10 @@ public class JobHistoryEventHandler exte
     Timer flushTimer; 
     FlushTimerTask flushTimerTask;
     private boolean isTimerShutDown = false;
+    private String forcedJobStateOnShutDown;
 
     MetaInfo(Path historyFile, Path conf, EventWriter writer, String user,
-        String jobName, JobId jobId) {
+        String jobName, JobId jobId, String forcedJobStateOnShutDown) {
       this.historyFile = historyFile;
       this.confFile = conf;
       this.writer = writer;
@@ -814,6 +824,7 @@ public class JobHistoryEventHandler exte
           new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null);
       this.jobSummary = new JobSummary();
       this.flushTimer = new Timer("FlushTimer", true);
+      this.forcedJobStateOnShutDown = forcedJobStateOnShutDown;
     }
 
     Path getHistoryFile() {
@@ -840,6 +851,10 @@ public class JobHistoryEventHandler exte
       return isTimerShutDown;
     }
 
+    String getForcedJobStateOnShutDown() {
+      return forcedJobStateOnShutDown;
+    }
+
     @Override
     public String toString() {
       return "Job MetaInfo for "+ jobSummary.getJobId()
@@ -983,4 +998,20 @@ public class JobHistoryEventHandler exte
     LOG.info("JobHistoryEventHandler notified that forceJobCompletion is "
       + forceJobCompletion);
   }
+
+  private String createJobStateForJobUnsuccessfulCompletionEvent(
+      String forcedJobStateOnShutDown) {
+    if (forcedJobStateOnShutDown == null || forcedJobStateOnShutDown
+        .isEmpty()) {
+      return JobState.KILLED.toString();
+    } else if (forcedJobStateOnShutDown.equals(
+        JobStateInternal.ERROR.toString()) ||
+        forcedJobStateOnShutDown.equals(JobStateInternal.FAILED.toString())) {
+      return JobState.FAILED.toString();
+    } else if (forcedJobStateOnShutDown.equals(JobStateInternal.SUCCEEDED
+        .toString())) {
+      return JobState.SUCCEEDED.toString();
+    }
+    return JobState.KILLED.toString();
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1581181&r1=1581180&r2=1581181&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Tue Mar 25 02:01:13 2014
@@ -1012,14 +1012,13 @@ public class MRAppMaster extends Composi
     AMInfo amInfo =
         MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost,
             nmPort, nmHttpPort);
-    amInfos.add(amInfo);
 
     // /////////////////// Create the job itself.
     job = createJob(getConfig(), forcedState, shutDownMessage);
 
     // End of creating the job.
 
-    // Send out an MR AM inited event for this AM and all previous AMs.
+    // Send out an MR AM inited event for all previous AMs.
     for (AMInfo info : amInfos) {
       dispatcher.getEventHandler().handle(
           new JobHistoryEvent(job.getID(), new AMStartedEvent(info
@@ -1028,6 +1027,15 @@ public class MRAppMaster extends Composi
                   .getNodeManagerHttpPort())));
     }
 
+    // Send out an MR AM inited event for this AM.
+    dispatcher.getEventHandler().handle(
+        new JobHistoryEvent(job.getID(), new AMStartedEvent(amInfo
+            .getAppAttemptId(), amInfo.getStartTime(), amInfo.getContainerId(),
+            amInfo.getNodeManagerHost(), amInfo.getNodeManagerPort(), amInfo
+                .getNodeManagerHttpPort(), this.forcedState == null ? null
+                    : this.forcedState.toString())));
+    amInfos.add(amInfo);
+
     // metrics system init is really init & start.
     // It's more test friendly to put it here.
     DefaultMetricsSystem.initialize("MRAppMaster");

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java?rev=1581181&r1=1581180&r2=1581181&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java Tue Mar 25 02:01:13 2014
@@ -497,7 +497,7 @@ class JHEventHandlerForSigtermTest exten
   JobHistoryEvent lastEventHandled;
   int eventsHandled = 0;
   @Override
-  protected void handleEvent(JobHistoryEvent event) {
+  public void handleEvent(JobHistoryEvent event) {
     this.lastEventHandled = event;
     this.eventsHandled++;
   }

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java?rev=1581181&r1=1581180&r2=1581181&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java Tue Mar 25 02:01:13 2014
@@ -21,7 +21,8 @@ import static org.junit.Assert.assertEqu
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
-
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -44,6 +45,10 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
+import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
 import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
 import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent;
@@ -70,6 +75,8 @@ import org.apache.log4j.Logger;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
 
 public class TestMRAppMaster {
   private static final Log LOG = LogFactory.getLog(TestMRAppMaster.class);
@@ -120,7 +127,7 @@ public class TestMRAppMaster {
     assertEquals(userStagingPath.toString(),
       appMaster.stagingDirPath.toString());
   }
-  
+
   @Test
   public void testMRAppMasterMidLock() throws IOException,
       InterruptedException {
@@ -154,6 +161,9 @@ public class TestMRAppMaster {
     assertTrue(appMaster.errorHappenedShutDown);
     assertEquals(JobStateInternal.ERROR, appMaster.forcedState);
     appMaster.stop();
+
+    // verify the final status is FAILED
+    verifyFailedStatus((MRAppMasterTest)appMaster, "FAILED");
   }
   
   @Test
@@ -190,6 +200,9 @@ public class TestMRAppMaster {
     assertTrue(appMaster.errorHappenedShutDown);
     assertEquals(JobStateInternal.SUCCEEDED, appMaster.forcedState);
     appMaster.stop();
+
+    // verify the final status is SUCCEEDED
+    verifyFailedStatus((MRAppMasterTest)appMaster, "SUCCEEDED");
   }
   
   @Test
@@ -226,6 +239,9 @@ public class TestMRAppMaster {
     assertTrue(appMaster.errorHappenedShutDown);
     assertEquals(JobStateInternal.FAILED, appMaster.forcedState);
     appMaster.stop();
+
+    // verify the final status is FAILED
+    verifyFailedStatus((MRAppMasterTest)appMaster, "FAILED");
   }
   
   @Test
@@ -423,8 +439,20 @@ public class TestMRAppMaster {
 
 
   }
-}
 
+  private void verifyFailedStatus(MRAppMasterTest appMaster,
+      String expectedJobState) {
+    ArgumentCaptor<JobHistoryEvent> captor = ArgumentCaptor
+        .forClass(JobHistoryEvent.class);
+    // handle two events: AMStartedEvent and JobUnsuccessfulCompletionEvent
+    verify(appMaster.spyHistoryService, times(2))
+        .handleEvent(captor.capture());
+    HistoryEvent event = captor.getValue().getHistoryEvent();
+    assertTrue(event instanceof JobUnsuccessfulCompletionEvent);
+    assertEquals(((JobUnsuccessfulCompletionEvent) event).getStatus()
+        , expectedJobState);
+  }
+}
 class MRAppMasterTest extends MRAppMaster {
 
   Path stagingDirPath;
@@ -434,6 +462,7 @@ class MRAppMasterTest extends MRAppMaste
   ContainerAllocator mockContainerAllocator;
   CommitterEventHandler mockCommitterEventHandler;
   RMHeartbeatHandler mockRMHeartbeatHandler;
+  JobHistoryEventHandler spyHistoryService;
 
   public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
       ContainerId containerId, String host, int port, int httpPort,
@@ -502,4 +531,14 @@ class MRAppMasterTest extends MRAppMaste
   public UserGroupInformation getUgi() {
     return currentUser;
   }
+
+  @Override
+  protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+      AppContext context) {
+    spyHistoryService =
+        Mockito.spy((JobHistoryEventHandler) super
+            .createJobHistoryHandler(context));
+    spyHistoryService.setForcejobCompletion(this.isLastAMRetry);
+    return spyHistoryService;
+  }
 }

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java?rev=1581181&r1=1581180&r2=1581181&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java Tue Mar 25 02:01:13 2014
@@ -34,6 +34,7 @@ import org.apache.avro.util.Utf8;
 @InterfaceStability.Unstable
 public class AMStartedEvent implements HistoryEvent {
   private AMStarted datum = new AMStarted();
+  private String forcedJobStateOnShutDown;
 
   /**
    * Create an event to record the start of an MR AppMaster
@@ -54,12 +55,38 @@ public class AMStartedEvent implements H
   public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime,
       ContainerId containerId, String nodeManagerHost, int nodeManagerPort,
       int nodeManagerHttpPort) {
+    this(appAttemptId, startTime, containerId, nodeManagerHost,
+        nodeManagerPort, nodeManagerHttpPort, null);
+  }
+
+  /**
+   * Create an event to record the start of an MR AppMaster
+   *
+   * @param appAttemptId
+   *          the application attempt id.
+   * @param startTime
+   *          the start time of the AM.
+   * @param containerId
+   *          the containerId of the AM.
+   * @param nodeManagerHost
+   *          the node on which the AM is running.
+   * @param nodeManagerPort
+   *          the port on which the AM is running.
+   * @param nodeManagerHttpPort
+   *          the httpPort for the node running the AM.
+   * @param forcedJobStateOnShutDown
+   *          the state to force the job into
+   */
+  public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime,
+      ContainerId containerId, String nodeManagerHost, int nodeManagerPort,
+      int nodeManagerHttpPort, String forcedJobStateOnShutDown) {
     datum.applicationAttemptId = new Utf8(appAttemptId.toString());
     datum.startTime = startTime;
     datum.containerId = new Utf8(containerId.toString());
     datum.nodeManagerHost = new Utf8(nodeManagerHost);
     datum.nodeManagerPort = nodeManagerPort;
     datum.nodeManagerHttpPort = nodeManagerHttpPort;
+    this.forcedJobStateOnShutDown = forcedJobStateOnShutDown;
   }
 
   AMStartedEvent() {
@@ -116,6 +143,13 @@ public class AMStartedEvent implements H
     return datum.nodeManagerHttpPort;
   }
 
+  /**
+   * @return the state to force the job into
+   */
+  public String getForcedJobStateOnShutDown() {
+    return this.forcedJobStateOnShutDown;
+  }
+
   /** Get the attempt id */
 
   @Override