You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2012/10/24 17:25:22 UTC
svn commit: r1401729 - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src:
main/java/org/apache/hadoop/mapreduce/v2/app/
test/java/org/apache/hadoop/mapreduce/v2/app/
Author: jlowe
Date: Wed Oct 24 15:25:22 2012
New Revision: 1401729
URL: http://svn.apache.org/viewvc?rev=1401729&view=rev
Log:
svn merge -c 1401726 FIXES: YARN-139. Interrupted Exception within AsyncDispatcher leads to user confusion. Contributed by Vinod Kumar Vavilapalli
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1401729&r1=1401728&r2=1401729&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Wed Oct 24 15:25:22 2012
@@ -107,6 +107,8 @@ import org.apache.hadoop.yarn.service.Co
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.util.ConverterUtils;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* The Map-Reduce Application Master.
* The state machine is encapsulated in the implementation of Job interface.
@@ -398,52 +400,65 @@ public class MRAppMaster extends Composi
protected void sysexit() {
System.exit(0);
}
-
- private class JobFinishEventHandler implements EventHandler<JobFinishEvent> {
- @Override
- public void handle(JobFinishEvent event) {
- // job has finished
- // this is the only job, so shut down the Appmaster
- // note in a workflow scenario, this may lead to creation of a new
- // job (FIXME?)
- // Send job-end notification
- if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
- try {
- LOG.info("Job end notification started for jobID : "
- + job.getReport().getJobId());
- JobEndNotifier notifier = new JobEndNotifier();
- notifier.setConf(getConfig());
- notifier.notify(job.getReport());
- } catch (InterruptedException ie) {
- LOG.warn("Job end notification interrupted for jobID : "
- + job.getReport().getJobId(), ie);
- }
- }
- // TODO:currently just wait for some time so clients can know the
- // final states. Will be removed once RM come on.
+ @VisibleForTesting
+ public void shutDownJob() {
+ // job has finished
+ // this is the only job, so shut down the Appmaster
+ // note in a workflow scenario, this may lead to creation of a new
+ // job (FIXME?)
+ // Send job-end notification
+ if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
try {
- Thread.sleep(5000);
- } catch (InterruptedException e) {
- e.printStackTrace();
+ LOG.info("Job end notification started for jobID : "
+ + job.getReport().getJobId());
+ JobEndNotifier notifier = new JobEndNotifier();
+ notifier.setConf(getConfig());
+ notifier.notify(job.getReport());
+ } catch (InterruptedException ie) {
+ LOG.warn("Job end notification interrupted for jobID : "
+ + job.getReport().getJobId(), ie);
}
+ }
- try {
- //We are finishing cleanly so this is the last retry
- isLastAMRetry = true;
- // Stop all services
- // This will also send the final report to the ResourceManager
- LOG.info("Calling stop for all the services");
- stop();
-
- } catch (Throwable t) {
- LOG.warn("Graceful stop failed ", t);
- }
+ // TODO:currently just wait for some time so clients can know the
+ // final states. Will be removed once RM come on.
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+
+ try {
+ //We are finishing cleanly so this is the last retry
+ isLastAMRetry = true;
+ // Stop all services
+ // This will also send the final report to the ResourceManager
+ LOG.info("Calling stop for all the services");
+ MRAppMaster.this.stop();
+
+ } catch (Throwable t) {
+ LOG.warn("Graceful stop failed ", t);
+ }
- //Bring the process down by force.
- //Not needed after HADOOP-7140
- LOG.info("Exiting MR AppMaster..GoodBye!");
- sysexit();
+ //Bring the process down by force.
+ //Not needed after HADOOP-7140
+ LOG.info("Exiting MR AppMaster..GoodBye!");
+ sysexit();
+ }
+
+ private class JobFinishEventHandler implements EventHandler<JobFinishEvent> {
+ @Override
+ public void handle(JobFinishEvent event) {
+ // Create a new thread to shutdown the AM. We should not do it in-line
+ // to avoid blocking the dispatcher itself.
+ new Thread() {
+
+ @Override
+ public void run() {
+ shutDownJob();
+ }
+ }.start();
}
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1401729&r1=1401728&r2=1401729&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Wed Oct 24 15:25:22 2012
@@ -21,17 +21,15 @@ package org.apache.hadoop.mapreduce.v2.a
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.times;
import java.io.IOException;
import junit.framework.Assert;
import junit.framework.TestCase;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -49,7 +47,6 @@ import org.apache.hadoop.yarn.YarnExcept
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.AbstractService;
@@ -68,7 +65,6 @@ import org.junit.Test;
private Path stagingJobPath = new Path(stagingJobDir);
private final static RecordFactory recordFactory = RecordFactoryProvider.
getRecordFactory(null);
- private static final Log LOG = LogFactory.getLog(TestStagingCleanup.class);
@Test
public void testDeletionofStaging() throws IOException {
@@ -86,9 +82,7 @@ import org.junit.Test;
jobid.setAppId(appId);
MRAppMaster appMaster = new TestMRApp(attemptId);
appMaster.init(conf);
- EventHandler<JobFinishEvent> handler =
- appMaster.createJobFinishEventHandler();
- handler.handle(new JobFinishEvent(jobid));
+ appMaster.shutDownJob();
verify(fs).delete(stagingJobPath, true);
}