You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2014/03/25 03:01:13 UTC
svn commit: r1581181 - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/ma...
Author: vinodkv
Date: Tue Mar 25 02:01:13 2014
New Revision: 1581181
URL: http://svn.apache.org/r1581181
Log:
MAPREDUCE-5795. Fixed MRAppMaster to record the correct job-state after it recovers from a commit during a previous attempt. Contributed by Xuan Gong.
svn merge --ignore-ancestry -c 1581180 ../../trunk/
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1581181&r1=1581180&r2=1581181&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Tue Mar 25 02:01:13 2014
@@ -121,6 +121,9 @@ Release 2.4.0 - UNRELEASED
FadviseFileRegion::transferTo does not read disks efficiently.
(Nikola Vujic via cnauroth)
+ MAPREDUCE-5795. Fixed MRAppMaster to record the correct job-state after it
+ recovers from a commit during a previous attempt. (Xuan Gong via vinodkv)
+
Release 2.3.1 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1581181&r1=1581180&r2=1581181&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Tue Mar 25 02:01:13 2014
@@ -31,6 +31,7 @@ import java.util.concurrent.LinkedBlocki
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
@@ -49,6 +50,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
import org.apache.hadoop.mapreduce.v2.jobhistory.FileNameIndexUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils;
import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
@@ -348,7 +350,9 @@ public class JobHistoryEventHandler exte
JobUnsuccessfulCompletionEvent jucEvent =
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(toClose),
System.currentTimeMillis(), job.getCompletedMaps(),
- job.getCompletedReduces(), JobState.KILLED.toString(),
+ job.getCompletedReduces(),
+ createJobStateForJobUnsuccessfulCompletionEvent(
+ mi.getForcedJobStateOnShutDown()),
job.getDiagnostics());
JobHistoryEvent jfEvent = new JobHistoryEvent(toClose, jucEvent);
//Bypass the queue mechanism which might wait. Call the method directly
@@ -381,9 +385,10 @@ public class JobHistoryEventHandler exte
* This should be the first call to history for a job
*
* @param jobId the jobId.
+ * @param forcedJobStateOnShutDown
* @throws IOException
*/
- protected void setupEventWriter(JobId jobId)
+ protected void setupEventWriter(JobId jobId, String forcedJobStateOnShutDown)
throws IOException {
if (stagingDirPath == null) {
LOG.error("Log Directory is null, returning");
@@ -438,7 +443,7 @@ public class JobHistoryEventHandler exte
}
MetaInfo fi = new MetaInfo(historyFile, logDirConfPath, writer,
- user, jobName, jobId);
+ user, jobName, jobId, forcedJobStateOnShutDown);
fi.getJobSummary().setJobId(jobId);
fileMap.put(jobId, fi);
}
@@ -481,13 +486,17 @@ public class JobHistoryEventHandler exte
return false;
}
- protected void handleEvent(JobHistoryEvent event) {
+ @Private
+ public void handleEvent(JobHistoryEvent event) {
synchronized (lock) {
// If this is JobSubmitted Event, setup the writer
if (event.getHistoryEvent().getEventType() == EventType.AM_STARTED) {
try {
- setupEventWriter(event.getJobID());
+ AMStartedEvent amStartedEvent =
+ (AMStartedEvent) event.getHistoryEvent();
+ setupEventWriter(event.getJobID(),
+ amStartedEvent.getForcedJobStateOnShutDown());
} catch (IOException ioe) {
LOG.error("Error JobHistoryEventHandler in handleEvent: " + event,
ioe);
@@ -804,9 +813,10 @@ public class JobHistoryEventHandler exte
Timer flushTimer;
FlushTimerTask flushTimerTask;
private boolean isTimerShutDown = false;
+ private String forcedJobStateOnShutDown;
MetaInfo(Path historyFile, Path conf, EventWriter writer, String user,
- String jobName, JobId jobId) {
+ String jobName, JobId jobId, String forcedJobStateOnShutDown) {
this.historyFile = historyFile;
this.confFile = conf;
this.writer = writer;
@@ -814,6 +824,7 @@ public class JobHistoryEventHandler exte
new JobIndexInfo(-1, -1, user, jobName, jobId, -1, -1, null);
this.jobSummary = new JobSummary();
this.flushTimer = new Timer("FlushTimer", true);
+ this.forcedJobStateOnShutDown = forcedJobStateOnShutDown;
}
Path getHistoryFile() {
@@ -840,6 +851,10 @@ public class JobHistoryEventHandler exte
return isTimerShutDown;
}
+ String getForcedJobStateOnShutDown() {
+ return forcedJobStateOnShutDown;
+ }
+
@Override
public String toString() {
return "Job MetaInfo for "+ jobSummary.getJobId()
@@ -983,4 +998,20 @@ public class JobHistoryEventHandler exte
LOG.info("JobHistoryEventHandler notified that forceJobCompletion is "
+ forceJobCompletion);
}
+
+ private String createJobStateForJobUnsuccessfulCompletionEvent(
+ String forcedJobStateOnShutDown) {
+ if (forcedJobStateOnShutDown == null || forcedJobStateOnShutDown
+ .isEmpty()) {
+ return JobState.KILLED.toString();
+ } else if (forcedJobStateOnShutDown.equals(
+ JobStateInternal.ERROR.toString()) ||
+ forcedJobStateOnShutDown.equals(JobStateInternal.FAILED.toString())) {
+ return JobState.FAILED.toString();
+ } else if (forcedJobStateOnShutDown.equals(JobStateInternal.SUCCEEDED
+ .toString())) {
+ return JobState.SUCCEEDED.toString();
+ }
+ return JobState.KILLED.toString();
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1581181&r1=1581180&r2=1581181&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Tue Mar 25 02:01:13 2014
@@ -1012,14 +1012,13 @@ public class MRAppMaster extends Composi
AMInfo amInfo =
MRBuilderUtils.newAMInfo(appAttemptID, startTime, containerID, nmHost,
nmPort, nmHttpPort);
- amInfos.add(amInfo);
// /////////////////// Create the job itself.
job = createJob(getConfig(), forcedState, shutDownMessage);
// End of creating the job.
- // Send out an MR AM inited event for this AM and all previous AMs.
+ // Send out an MR AM inited event for all previous AMs.
for (AMInfo info : amInfos) {
dispatcher.getEventHandler().handle(
new JobHistoryEvent(job.getID(), new AMStartedEvent(info
@@ -1028,6 +1027,15 @@ public class MRAppMaster extends Composi
.getNodeManagerHttpPort())));
}
+ // Send out an MR AM inited event for this AM.
+ dispatcher.getEventHandler().handle(
+ new JobHistoryEvent(job.getID(), new AMStartedEvent(amInfo
+ .getAppAttemptId(), amInfo.getStartTime(), amInfo.getContainerId(),
+ amInfo.getNodeManagerHost(), amInfo.getNodeManagerPort(), amInfo
+ .getNodeManagerHttpPort(), this.forcedState == null ? null
+ : this.forcedState.toString())));
+ amInfos.add(amInfo);
+
// metrics system init is really init & start.
// It's more test friendly to put it here.
DefaultMetricsSystem.initialize("MRAppMaster");
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java?rev=1581181&r1=1581180&r2=1581181&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java Tue Mar 25 02:01:13 2014
@@ -497,7 +497,7 @@ class JHEventHandlerForSigtermTest exten
JobHistoryEvent lastEventHandled;
int eventsHandled = 0;
@Override
- protected void handleEvent(JobHistoryEvent event) {
+ public void handleEvent(JobHistoryEvent event) {
this.lastEventHandled = event;
this.eventsHandled++;
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java?rev=1581181&r1=1581180&r2=1581181&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRAppMaster.java Tue Mar 25 02:01:13 2014
@@ -21,7 +21,8 @@ import static org.junit.Assert.assertEqu
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
-
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.times;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
@@ -44,6 +45,10 @@ import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TypeConverter;
+import org.apache.hadoop.mapreduce.jobhistory.HistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
+import org.apache.hadoop.mapreduce.jobhistory.JobUnsuccessfulCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.commit.CommitterEvent;
@@ -70,6 +75,8 @@ import org.apache.log4j.Logger;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
public class TestMRAppMaster {
private static final Log LOG = LogFactory.getLog(TestMRAppMaster.class);
@@ -120,7 +127,7 @@ public class TestMRAppMaster {
assertEquals(userStagingPath.toString(),
appMaster.stagingDirPath.toString());
}
-
+
@Test
public void testMRAppMasterMidLock() throws IOException,
InterruptedException {
@@ -154,6 +161,9 @@ public class TestMRAppMaster {
assertTrue(appMaster.errorHappenedShutDown);
assertEquals(JobStateInternal.ERROR, appMaster.forcedState);
appMaster.stop();
+
+ // verify the final status is FAILED
+ verifyFailedStatus((MRAppMasterTest)appMaster, "FAILED");
}
@Test
@@ -190,6 +200,9 @@ public class TestMRAppMaster {
assertTrue(appMaster.errorHappenedShutDown);
assertEquals(JobStateInternal.SUCCEEDED, appMaster.forcedState);
appMaster.stop();
+
+ // verify the final status is SUCCEEDED
+ verifyFailedStatus((MRAppMasterTest)appMaster, "SUCCEEDED");
}
@Test
@@ -226,6 +239,9 @@ public class TestMRAppMaster {
assertTrue(appMaster.errorHappenedShutDown);
assertEquals(JobStateInternal.FAILED, appMaster.forcedState);
appMaster.stop();
+
+ // verify the final status is FAILED
+ verifyFailedStatus((MRAppMasterTest)appMaster, "FAILED");
}
@Test
@@ -423,8 +439,20 @@ public class TestMRAppMaster {
}
-}
+ private void verifyFailedStatus(MRAppMasterTest appMaster,
+ String expectedJobState) {
+ ArgumentCaptor<JobHistoryEvent> captor = ArgumentCaptor
+ .forClass(JobHistoryEvent.class);
+ // handle two events: AMStartedEvent and JobUnsuccessfulCompletionEvent
+ verify(appMaster.spyHistoryService, times(2))
+ .handleEvent(captor.capture());
+ HistoryEvent event = captor.getValue().getHistoryEvent();
+ assertTrue(event instanceof JobUnsuccessfulCompletionEvent);
+ assertEquals(((JobUnsuccessfulCompletionEvent) event).getStatus()
+ , expectedJobState);
+ }
+}
class MRAppMasterTest extends MRAppMaster {
Path stagingDirPath;
@@ -434,6 +462,7 @@ class MRAppMasterTest extends MRAppMaste
ContainerAllocator mockContainerAllocator;
CommitterEventHandler mockCommitterEventHandler;
RMHeartbeatHandler mockRMHeartbeatHandler;
+ JobHistoryEventHandler spyHistoryService;
public MRAppMasterTest(ApplicationAttemptId applicationAttemptId,
ContainerId containerId, String host, int port, int httpPort,
@@ -502,4 +531,14 @@ class MRAppMasterTest extends MRAppMaste
public UserGroupInformation getUgi() {
return currentUser;
}
+
+ @Override
+ protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+ AppContext context) {
+ spyHistoryService =
+ Mockito.spy((JobHistoryEventHandler) super
+ .createJobHistoryHandler(context));
+ spyHistoryService.setForcejobCompletion(this.isLastAMRetry);
+ return spyHistoryService;
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java?rev=1581181&r1=1581180&r2=1581181&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/AMStartedEvent.java Tue Mar 25 02:01:13 2014
@@ -34,6 +34,7 @@ import org.apache.avro.util.Utf8;
@InterfaceStability.Unstable
public class AMStartedEvent implements HistoryEvent {
private AMStarted datum = new AMStarted();
+ private String forcedJobStateOnShutDown;
/**
* Create an event to record the start of an MR AppMaster
@@ -54,12 +55,38 @@ public class AMStartedEvent implements H
public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime,
ContainerId containerId, String nodeManagerHost, int nodeManagerPort,
int nodeManagerHttpPort) {
+ this(appAttemptId, startTime, containerId, nodeManagerHost,
+ nodeManagerPort, nodeManagerHttpPort, null);
+ }
+
+ /**
+ * Create an event to record the start of an MR AppMaster
+ *
+ * @param appAttemptId
+ * the application attempt id.
+ * @param startTime
+ * the start time of the AM.
+ * @param containerId
+ * the containerId of the AM.
+ * @param nodeManagerHost
+ * the node on which the AM is running.
+ * @param nodeManagerPort
+ * the port on which the AM is running.
+ * @param nodeManagerHttpPort
+ * the httpPort for the node running the AM.
+ * @param forcedJobStateOnShutDown
+ * the state to force the job into
+ */
+ public AMStartedEvent(ApplicationAttemptId appAttemptId, long startTime,
+ ContainerId containerId, String nodeManagerHost, int nodeManagerPort,
+ int nodeManagerHttpPort, String forcedJobStateOnShutDown) {
datum.applicationAttemptId = new Utf8(appAttemptId.toString());
datum.startTime = startTime;
datum.containerId = new Utf8(containerId.toString());
datum.nodeManagerHost = new Utf8(nodeManagerHost);
datum.nodeManagerPort = nodeManagerPort;
datum.nodeManagerHttpPort = nodeManagerHttpPort;
+ this.forcedJobStateOnShutDown = forcedJobStateOnShutDown;
}
AMStartedEvent() {
@@ -116,6 +143,13 @@ public class AMStartedEvent implements H
return datum.nodeManagerHttpPort;
}
+ /**
+ * @return the state to force the job into
+ */
+ public String getForcedJobStateOnShutDown() {
+ return this.forcedJobStateOnShutDown;
+ }
+
/** Get the attempt id */
@Override