You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by jl...@apache.org on 2012/10/24 17:25:22 UTC

svn commit: r1401729 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src: main/java/org/apache/hadoop/mapreduce/v2/app/ test/java/org/apache/hadoop/mapreduce/v2/app/

Author: jlowe
Date: Wed Oct 24 15:25:22 2012
New Revision: 1401729

URL: http://svn.apache.org/viewvc?rev=1401729&view=rev
Log:
svn merge -c 1401726 FIXES: YARN-139. Interrupted Exception within AsyncDispatcher leads to user confusion. Contributed by Vinod Kumar Vavilapalli

Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1401729&r1=1401728&r2=1401729&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Wed Oct 24 15:25:22 2012
@@ -107,6 +107,8 @@ import org.apache.hadoop.yarn.service.Co
 import org.apache.hadoop.yarn.service.Service;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * The Map-Reduce Application Master.
  * The state machine is encapsulated in the implementation of Job interface.
@@ -398,52 +400,65 @@ public class MRAppMaster extends Composi
   protected void sysexit() {
     System.exit(0);
   }
-  
-  private class JobFinishEventHandler implements EventHandler<JobFinishEvent> {
-    @Override
-    public void handle(JobFinishEvent event) {
-      // job has finished
-      // this is the only job, so shut down the Appmaster
-      // note in a workflow scenario, this may lead to creation of a new
-      // job (FIXME?)
-      // Send job-end notification
-      if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
-        try {
-          LOG.info("Job end notification started for jobID : "
-              + job.getReport().getJobId());
-          JobEndNotifier notifier = new JobEndNotifier();
-          notifier.setConf(getConfig());
-          notifier.notify(job.getReport());
-        } catch (InterruptedException ie) {
-          LOG.warn("Job end notification interrupted for jobID : "
-              + job.getReport().getJobId(), ie);
-        }
-      }
 
-      // TODO:currently just wait for some time so clients can know the
-      // final states. Will be removed once RM come on.
+  @VisibleForTesting
+  public void shutDownJob() {
+    // job has finished
+    // this is the only job, so shut down the Appmaster
+    // note in a workflow scenario, this may lead to creation of a new
+    // job (FIXME?)
+    // Send job-end notification
+    if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
       try {
-        Thread.sleep(5000);
-      } catch (InterruptedException e) {
-        e.printStackTrace();
+        LOG.info("Job end notification started for jobID : "
+            + job.getReport().getJobId());
+        JobEndNotifier notifier = new JobEndNotifier();
+        notifier.setConf(getConfig());
+        notifier.notify(job.getReport());
+      } catch (InterruptedException ie) {
+        LOG.warn("Job end notification interrupted for jobID : "
+            + job.getReport().getJobId(), ie);
       }
+    }
 
-      try {
-        //We are finishing cleanly so this is the last retry
-        isLastAMRetry = true;
-        // Stop all services
-        // This will also send the final report to the ResourceManager
-        LOG.info("Calling stop for all the services");
-        stop();
-
-      } catch (Throwable t) {
-        LOG.warn("Graceful stop failed ", t);
-      }
+    // TODO:currently just wait for some time so clients can know the
+    // final states. Will be removed once RM come on.
+    try {
+      Thread.sleep(5000);
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+
+    try {
+      //We are finishing cleanly so this is the last retry
+      isLastAMRetry = true;
+      // Stop all services
+      // This will also send the final report to the ResourceManager
+      LOG.info("Calling stop for all the services");
+      MRAppMaster.this.stop();
+
+    } catch (Throwable t) {
+      LOG.warn("Graceful stop failed ", t);
+    }
 
-      //Bring the process down by force.
-      //Not needed after HADOOP-7140
-      LOG.info("Exiting MR AppMaster..GoodBye!");
-      sysexit();
+    //Bring the process down by force.
+    //Not needed after HADOOP-7140
+    LOG.info("Exiting MR AppMaster..GoodBye!");
+    sysexit();   
+  }
+ 
+  private class JobFinishEventHandler implements EventHandler<JobFinishEvent> {
+    @Override
+    public void handle(JobFinishEvent event) {
+      // Create a new thread to shutdown the AM. We should not do it in-line
+      // to avoid blocking the dispatcher itself.
+      new Thread() {
+        
+        @Override
+        public void run() {
+          shutDownJob();
+        }
+      }.start();
     }
   }
   

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java?rev=1401729&r1=1401728&r2=1401729&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestStagingCleanup.java Wed Oct 24 15:25:22 2012
@@ -21,17 +21,15 @@ package org.apache.hadoop.mapreduce.v2.a
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.times;
 
 import java.io.IOException;
 
 import junit.framework.Assert;
 import junit.framework.TestCase;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -49,7 +47,6 @@ import org.apache.hadoop.yarn.YarnExcept
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.service.AbstractService;
@@ -68,7 +65,6 @@ import org.junit.Test;
    private Path stagingJobPath = new Path(stagingJobDir);
    private final static RecordFactory recordFactory = RecordFactoryProvider.
        getRecordFactory(null);
-   private static final Log LOG = LogFactory.getLog(TestStagingCleanup.class);
    
    @Test
    public void testDeletionofStaging() throws IOException {
@@ -86,9 +82,7 @@ import org.junit.Test;
      jobid.setAppId(appId);
      MRAppMaster appMaster = new TestMRApp(attemptId);
      appMaster.init(conf);
-     EventHandler<JobFinishEvent> handler = 
-         appMaster.createJobFinishEventHandler();
-     handler.handle(new JobFinishEvent(jobid));
+     appMaster.shutDownJob();
      verify(fs).delete(stagingJobPath, true);
    }