You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in the plain-text conversion.)
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2012/03/04 06:25:59 UTC
svn commit: r1296749 - in
/hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/
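hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/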
Author: vinodkv
Date: Sun Mar 4 05:25:59 2012
New Revision: 1296749
URL: http://svn.apache.org/viewvc?rev=1296749&view=rev
Log:
MAPREDUCE-3614. Fixed MR AM to close history file quickly and send a correct final state to the RM when it is killed. Contributed by Ravi Prakash.
svn merge --ignore-ancestry -c 1296747 ../../trunk/
Modified:
hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
Modified: hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/CHANGES.txt?rev=1296749&r1=1296748&r2=1296749&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/CHANGES.txt Sun Mar 4 05:25:59 2012
@@ -26,6 +26,9 @@ Release 0.23.2 - UNRELEASED
MAPREDUCE-2793. Corrected AppIDs, JobIDs, TaskAttemptIDs to be of correct
format on the web pages. (Bikas Saha via vinodkv)
+ MAPREDUCE-3614. Fixed MR AM to close history file quickly and send a correct
+ final state to the RM when it is killed. (Ravi Prakash via vinodkv)
+
OPTIMIZATIONS
MAPREDUCE-3901. Modified JobHistory records in YARN to lazily load job and
Modified: hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1296749&r1=1296748&r2=1296749&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Sun Mar 4 05:25:59 2012
@@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.Count
import org.apache.hadoop.mapreduce.JobCounter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
@@ -95,9 +96,12 @@ public class JobHistoryEventHandler exte
private static final Log LOG = LogFactory.getLog(
JobHistoryEventHandler.class);
- private static final Map<JobId, MetaInfo> fileMap =
+ protected static final Map<JobId, MetaInfo> fileMap =
Collections.<JobId,MetaInfo>synchronizedMap(new HashMap<JobId,MetaInfo>());
+ // Has a signal (SIGTERM etc) been issued?
+ protected volatile boolean isSignalled = false;
+
public JobHistoryEventHandler(AppContext context, int startCount) {
super("JobHistoryEventHandler");
this.context = context;
@@ -314,7 +318,30 @@ public class JobHistoryEventHandler exte
LOG.info("In stop, writing event " + ev.getType());
handleEvent(ev);
}
-
+
+ // Process JobUnsuccessfulCompletionEvent for jobIds which still haven't
+ // closed their event writers
+ Iterator<JobId> jobIt = fileMap.keySet().iterator();
+ if(isSignalled) {
+ while (jobIt.hasNext()) {
+ JobId toClose = jobIt.next();
+ MetaInfo mi = fileMap.get(toClose);
+ if(mi != null && mi.isWriterActive()) {
+ LOG.warn("Found jobId " + toClose
+ + " to have not been closed. Will close");
+ //Create a JobUnsuccessfulCompletionEvent (state KILLED) so that the job's end is written to the job history
+ JobUnsuccessfulCompletionEvent jucEvent =
+ new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(toClose),
+ System.currentTimeMillis(), context.getJob(toClose)
+ .getCompletedMaps(), context.getJob(toClose).getCompletedReduces(),
+ JobState.KILLED.toString());
+ JobHistoryEvent jfEvent = new JobHistoryEvent(toClose, jucEvent);
+ //Bypass the queue mechanism which might wait. Call the method directly
+ handleEvent(jfEvent);
+ }
+ }
+ }
+
//close all file handles
for (MetaInfo mi : fileMap.values()) {
try {
@@ -710,7 +737,7 @@ public class JobHistoryEventHandler exte
}
}
- private class MetaInfo {
+ protected class MetaInfo {
private Path historyFile;
private Path confFile;
private EventWriter writer;
@@ -880,4 +907,10 @@ public class JobHistoryEventHandler exte
//TODO. Some error checking here.
return tmpFileName.substring(0, tmpFileName.length()-4);
}
+
+ public void setSignalled(boolean isSignalled) {
+ this.isSignalled = isSignalled;
+ LOG.info("JobHistoryEventHandler notified that isSignalled was "
+ + isSignalled);
+ }
}
Modified: hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1296749&r1=1296748&r2=1296749&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Sun Mar 4 05:25:59 2012
@@ -76,6 +76,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent;
+import org.apache.hadoop.mapreduce.v2.app.rm.RMCommunicator;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
@@ -155,6 +156,7 @@ public class MRAppMaster extends Composi
private boolean newApiCommitter;
private OutputCommitter committer;
private JobEventDispatcher jobEventDispatcher;
+ private JobHistoryEventHandler jobHistoryEventHandler;
private boolean inRecovery = false;
private SpeculatorEventDispatcher speculatorEventDispatcher;
@@ -502,9 +504,9 @@ public class MRAppMaster extends Composi
protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
AppContext context) {
- JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context,
- getStartCount());
- return eventHandler;
+ this.jobHistoryEventHandler = new JobHistoryEventHandler(context,
+ getStartCount());
+ return this.jobHistoryEventHandler;
}
protected Speculator createSpeculator(Configuration conf, AppContext context) {
@@ -659,6 +661,10 @@ public class MRAppMaster extends Composi
public void handle(ContainerAllocatorEvent event) {
this.containerAllocator.handle(event);
}
+
+ public void setSignalled(boolean isSignalled) {
+ ((RMCommunicator) containerAllocator).setSignalled(true);
+ }
}
/**
@@ -957,12 +963,16 @@ public class MRAppMaster extends Composi
Integer.parseInt(nodePortString),
Integer.parseInt(nodeHttpPortString), appSubmitTime);
Runtime.getRuntime().addShutdownHook(
- new CompositeServiceShutdownHook(appMaster));
+ new MRAppMasterShutdownHook(appMaster));
YarnConfiguration conf = new YarnConfiguration(new JobConf());
conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
String jobUserName = System
.getenv(ApplicationConstants.Environment.USER.name());
conf.set(MRJobConfig.USER_NAME, jobUserName);
+ // Do not automatically close FileSystem objects, so that in case of
+ // SIGTERM the AM still has a chance to write out the job history. The
+ // MRAppMasterShutdownHook closes them explicitly via FileSystem.closeAll().
+ conf.setBoolean("fs.automatic.close", false);
initAndStartAppMaster(appMaster, conf, jobUserName);
} catch (Throwable t) {
LOG.fatal("Error starting MRAppMaster", t);
@@ -970,6 +980,35 @@ public class MRAppMaster extends Composi
}
}
+ // The shutdown hook that runs when a signal is received AND during normal
+ // close of the JVM.
+ static class MRAppMasterShutdownHook extends Thread {
+ MRAppMaster appMaster;
+ MRAppMasterShutdownHook(MRAppMaster appMaster) {
+ this.appMaster = appMaster;
+ }
+ public void run() {
+ LOG.info("MRAppMaster received a signal. Signaling RMCommunicator and "
+ + "JobHistoryEventHandler.");
+ // Notify the JHEH and RMCommunicator that a SIGTERM has been received so
+ // that they don't take too long in shutting down
+ if(appMaster.containerAllocator instanceof ContainerAllocatorRouter) {
+ ((ContainerAllocatorRouter) appMaster.containerAllocator)
+ .setSignalled(true);
+ }
+ if(appMaster.jobHistoryEventHandler != null) {
+ appMaster.jobHistoryEventHandler.setSignalled(true);
+ }
+ appMaster.stop();
+ try {
+ //Close all the FileSystem objects
+ FileSystem.closeAll();
+ } catch (IOException ioe) {
+ LOG.warn("Failed to close all FileSystem objects", ioe);
+ }
+ }
+ }
+
protected static void initAndStartAppMaster(final MRAppMaster appMaster,
final YarnConfiguration conf, String jobUserName) throws IOException,
InterruptedException {
Modified: hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java?rev=1296749&r1=1296748&r2=1296749&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java (original)
+++ hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMCommunicator.java Sun Mar 4 05:25:59 2012
@@ -81,6 +81,8 @@ public abstract class RMCommunicator ext
private final AppContext context;
private Job job;
+ // Has a signal (SIGTERM etc) been issued?
+ protected volatile boolean isSignalled = false;
public RMCommunicator(ClientService clientService, AppContext context) {
super("RMCommunicator");
@@ -158,7 +160,8 @@ public abstract class RMCommunicator ext
FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
if (job.getState() == JobState.SUCCEEDED) {
finishState = FinalApplicationStatus.SUCCEEDED;
- } else if (job.getState() == JobState.KILLED) {
+ } else if (job.getState() == JobState.KILLED
+ || (job.getState() == JobState.RUNNING && isSignalled)) {
finishState = FinalApplicationStatus.KILLED;
} else if (job.getState() == JobState.FAILED
|| job.getState() == JobState.ERROR) {
@@ -278,4 +281,9 @@ public abstract class RMCommunicator ext
}
protected abstract void heartbeat() throws Exception;
+
+ public void setSignalled(boolean isSignalled) {
+ this.isSignalled = isSignalled;
+ LOG.info("RMCommunicator notified that iSignalled was : " + isSignalled);
+ }
}
Modified: hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java?rev=1296749&r1=1296748&r2=1296749&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java (original)
+++ hadoop/common/branches/branch-0.23.2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java Sun Mar 4 05:25:59 2012
@@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.jobhistory;
+import static junit.framework.Assert.assertFalse;
+import static junit.framework.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
@@ -48,6 +50,8 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.verification.VerificationMode;
public class TestJobHistoryEventHandler {
@@ -277,6 +281,68 @@ public class TestJobHistoryEventHandler
JobId jobId = MRBuilderUtils.newJobId(appId, 1);
AppContext mockAppContext = mockAppContext(jobId);
}
+
+ private JobHistoryEvent getEventToEnqueue(JobId jobId) {
+ JobHistoryEvent toReturn = Mockito.mock(JobHistoryEvent.class);
+ HistoryEvent he = Mockito.mock(HistoryEvent.class);
+ Mockito.when(he.getEventType()).thenReturn(EventType.JOB_STATUS_CHANGED);
+ Mockito.when(toReturn.getHistoryEvent()).thenReturn(he);
+ Mockito.when(toReturn.getJobID()).thenReturn(jobId);
+ return toReturn;
+ }
+
+ @Test
+ /**
+ * Tests that in case of SIGTERM, the JHEH still handles the events already in
+ * its queue when stopped, and additionally processes a
+ * JobUnsuccessfulCompletionEvent for jobs whose event writers were still
+ * active (so that they may show up in the JobHistoryServer)
+ */
+ public void testSigTermedFunctionality() throws IOException {
+ AppContext mockedContext = Mockito.mock(AppContext.class);
+ JHEventHandlerForSigtermTest jheh =
+ new JHEventHandlerForSigtermTest(mockedContext, 0);
+
+ JobId jobId = Mockito.mock(JobId.class);
+ jheh.addToFileMap(jobId);
+
+ //Submit 4 events and check that they're handled in the absence of a signal
+ final int numEvents = 4;
+ JobHistoryEvent events[] = new JobHistoryEvent[numEvents];
+ for(int i=0; i < numEvents; ++i) {
+ events[i] = getEventToEnqueue(jobId);
+ jheh.handle(events[i]);
+ }
+ jheh.stop();
+ //Make sure events were handled
+ assertTrue("handleEvent should've been called only 4 times but was "
+ + jheh.eventsHandled, jheh.eventsHandled == 4);
+
+ //Create a new jheh because the last stop closed the eventWriter etc.
+ jheh = new JHEventHandlerForSigtermTest(mockedContext, 0);
+
+ // Make constructor of JobUnsuccessfulCompletionEvent pass
+ Job job = Mockito.mock(Job.class);
+ Mockito.when(mockedContext.getJob(jobId)).thenReturn(job);
+ // Make TypeConverter(JobID) pass
+ ApplicationId mockAppId = Mockito.mock(ApplicationId.class);
+ Mockito.when(mockAppId.getClusterTimestamp()).thenReturn(1000l);
+ Mockito.when(jobId.getAppId()).thenReturn(mockAppId);
+
+ jheh.addToFileMap(jobId);
+ jheh.setSignalled(true);
+ for(int i=0; i < numEvents; ++i) {
+ events[i] = getEventToEnqueue(jobId);
+ jheh.handle(events[i]);
+ }
+ jheh.stop();
+ //Make sure events were handled, 4 + 1 finish event
+ assertTrue("handleEvent should've been called only 5 times but was "
+ + jheh.eventsHandled, jheh.eventsHandled == 5);
+ assertTrue("Last event handled wasn't JobUnsuccessfulCompletionEvent",
+ jheh.lastEventHandled.getHistoryEvent()
+ instanceof JobUnsuccessfulCompletionEvent);
+ }
}
class JHEvenHandlerForTest extends JobHistoryEventHandler {
@@ -307,4 +373,28 @@ class JHEvenHandlerForTest extends JobHi
public EventWriter getEventWriter() {
return this.eventWriter;
}
+}
+
+/**
+ * Class to help with testSigTermedFunctionality
+ */
+class JHEventHandlerForSigtermTest extends JobHistoryEventHandler {
+ private MetaInfo metaInfo;
+ public JHEventHandlerForSigtermTest(AppContext context, int startCount) {
+ super(context, startCount);
+ }
+
+ public void addToFileMap(JobId jobId) {
+ metaInfo = Mockito.mock(MetaInfo.class);
+ Mockito.when(metaInfo.isWriterActive()).thenReturn(true);
+ fileMap.put(jobId, metaInfo);
+ }
+
+ JobHistoryEvent lastEventHandled;
+ int eventsHandled = 0;
+ @Override
+ protected void handleEvent(JobHistoryEvent event) {
+ this.lastEventHandled = event;
+ this.eventsHandled++;
+ }
}
\ No newline at end of file