You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2013/09/28 21:14:17 UTC

svn commit: r1527219 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apac...

Author: vinodkv
Date: Sat Sep 28 19:14:16 2013
New Revision: 1527219

URL: http://svn.apache.org/r1527219
Log:
MAPREDUCE-5538. Fixed MR AppMaster to send job-notification URL only after the job is really done - a bug caused by MAPREDUCE-5505. Contributed by Zhijie Shen.

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1527219&r1=1527218&r2=1527219&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Sat Sep 28 19:14:16 2013
@@ -245,6 +245,10 @@ Release 2.1.2 - UNRELEASED
     by re-introducing (get,set)PartitionFile which takes in JobConf. (Robert 
     Kanter via acmurthy)
 
+    MAPREDUCE-5538. Fixed MR AppMaster to send job-notification URL only after
+    the job is really done - a bug caused by MAPREDUCE-5505. (Zhijie Shen via
+    vinodkv)
+
 Release 2.1.1-beta - 2013-09-23
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java?rev=1527219&r1=1527218&r2=1527219&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java Sat Sep 28 19:14:16 2013
@@ -531,19 +531,6 @@ public class MRAppMaster extends Composi
     // this is the only job, so shut down the Appmaster
     // note in a workflow scenario, this may lead to creation of a new
     // job (FIXME?)
-    // Send job-end notification
-    if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
-      try {
-        LOG.info("Job end notification started for jobID : "
-            + job.getReport().getJobId());
-        JobEndNotifier notifier = new JobEndNotifier();
-        notifier.setConf(getConfig());
-        notifier.notify(job.getReport());
-      } catch (InterruptedException ie) {
-        LOG.warn("Job end notification interrupted for jobID : "
-            + job.getReport().getJobId(), ie);
-      }
-    }
 
     try {
       //if isLastAMRetry comes as true, should never set it to false
@@ -559,10 +546,28 @@ public class MRAppMaster extends Composi
       LOG.info("Calling stop for all the services");
       MRAppMaster.this.stop();
 
-      // Except ClientService, other services are already stopped, it is safe to
-      // let clients know the final states. ClientService should wait for some
-      // time so clients have enough time to know the final states.
-      safeToReportTerminationToUser.set(true);
+      if (isLastAMRetry) {
+        // Except ClientService, other services are already stopped, it is safe to
+        // let clients know the final states. ClientService should wait for some
+        // time so clients have enough time to know the final states.
+        safeToReportTerminationToUser.set(true);
+
+        // Send job-end notification when it is safe to report termination to
+        // users and it is the last AM retry
+        if (getConfig().get(MRJobConfig.MR_JOB_END_NOTIFICATION_URL) != null) {
+          try {
+            LOG.info("Job end notification started for jobID : "
+                + job.getReport().getJobId());
+            JobEndNotifier notifier = new JobEndNotifier();
+            notifier.setConf(getConfig());
+            notifier.notify(job.getReport());
+          } catch (InterruptedException ie) {
+            LOG.warn("Job end notification interrupted for jobID : "
+                + job.getReport().getJobId(), ie);
+          }
+        }
+      }
+
       try {
         Thread.sleep(5000);
       } catch (InterruptedException e) {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java?rev=1527219&r1=1527218&r2=1527219&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java Sat Sep 28 19:14:16 2013
@@ -128,6 +128,8 @@ import org.apache.hadoop.yarn.state.Stat
 import org.apache.hadoop.yarn.state.StateMachineFactory;
 import org.apache.hadoop.yarn.util.Clock;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /** Implementation of Job interface. Maintains the state machines of Job.
  * The read and write calls use ReadWriteLock for concurrency.
  */

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java?rev=1527219&r1=1527218&r2=1527219&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestJobEndNotifier.java Sat Sep 28 19:14:16 2013
@@ -18,19 +18,41 @@
 
 package org.apache.hadoop.mapreduce.v2.app;
 
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
 import java.net.Proxy;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.http.HttpServer;
+import org.apache.hadoop.mapred.JobContext;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
+import org.apache.hadoop.mapreduce.v2.api.records.JobState;
+import org.apache.hadoop.mapreduce.v2.app.job.JobStateInternal;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
+import org.apache.hadoop.mapreduce.v2.app.job.event.JobEventType;
+import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
 import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Mockito;
 
 /**
  * Tests job end notification
  *
  */
+@SuppressWarnings("unchecked")
 public class TestJobEndNotifier extends JobEndNotifier {
 
   //Test maximum retries is capped by MR_JOB_END_NOTIFICATION_MAX_ATTEMPTS
@@ -133,7 +155,7 @@ public class TestJobEndNotifier extends 
   public void testNotifyRetries() throws InterruptedException {
     Configuration conf = new Configuration();
     conf.set(MRJobConfig.MR_JOB_END_NOTIFICATION_URL, "http://nonexistent");
-    JobReport jobReport = Mockito.mock(JobReport.class);
+    JobReport jobReport = mock(JobReport.class);
  
     long startTime = System.currentTimeMillis();
     this.notificationCount = 0;
@@ -162,4 +184,100 @@ public class TestJobEndNotifier extends 
 
   }
 
+  @Test
+  public void testNotificationOnNormalShutdown() throws Exception {
+    HttpServer server = startHttpServer();
+    // Act like it is the second attempt. Default max attempts is 2.
+    MRApp app = spy(new MRApp(2, 2, true, this.getClass().getName(), true, 2));
+    // Clear the safeToReportTerminationToUser flag so that we can look at the
+    // final job state as seen by real users.
+    app.safeToReportTerminationToUser.set(false);
+    doNothing().when(app).sysexit();
+    Configuration conf = new Configuration();
+    conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL,
+        JobEndServlet.baseUrl + "jobend?jobid=$jobId&status=$jobStatus");
+    JobImpl job = (JobImpl)app.submit(conf);
+    // Even though auto-complete is true, the app is not shut down yet, so the
+    // user will only see the RUNNING state.
+    app.waitForInternalState(job, JobStateInternal.SUCCEEDED);
+    app.waitForState(job, JobState.RUNNING);
+    // Now shut down. The user should see the SUCCEEDED state.
+    app.shutDownJob();
+    app.waitForState(job, JobState.SUCCEEDED);
+    Assert.assertEquals(true, app.isLastAMRetry());
+    Assert.assertEquals(1, JobEndServlet.calledTimes);
+    Assert.assertEquals("jobid=" + job.getID() + "&status=SUCCEEDED",
+        JobEndServlet.requestUri.getQuery());
+    Assert.assertEquals(JobState.SUCCEEDED.toString(),
+      JobEndServlet.foundJobState);
+    server.stop();
+  }
+
+  @Test
+  public void testNotificationOnNonLastRetryShutdown() throws Exception {
+    HttpServer server = startHttpServer();
+    MRApp app = spy(new MRApp(2, 2, false, this.getClass().getName(), true));
+    doNothing().when(app).sysexit();
+    // Clear the flag so that we observe the job state as real users see it.
+    app.safeToReportTerminationToUser.set(false);
+    // Configure the URL so a wrongly sent notification would be detected.
+    Configuration conf = new Configuration();
+    conf.set(JobContext.MR_JOB_END_NOTIFICATION_URL,
+        JobEndServlet.baseUrl + "jobend?jobid=$jobId&status=$jobStatus");
+    JobImpl job = (JobImpl)app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    app.getContext().getEventHandler()
+      .handle(new JobEvent(app.getJobId(), JobEventType.JOB_AM_REBOOT));
+    app.waitForInternalState(job, JobStateInternal.REBOOT);
+    // Not the last AM attempt, so the user should see the job still running.
+    app.waitForState(job, JobState.RUNNING);
+    app.shutDownJob();
+    Assert.assertEquals(false, app.isLastAMRetry());
+    Assert.assertEquals(0, JobEndServlet.calledTimes);
+    Assert.assertEquals(null, JobEndServlet.requestUri);
+    Assert.assertEquals(null, JobEndServlet.foundJobState);
+    server.stop();
+  }
+
+  private static HttpServer startHttpServer() throws Exception {
+    new File(System.getProperty( // NOTE(review): presumably the webapp dir
+        "build.webapps", "build/webapps") + "/test").mkdirs(); // HttpServer needs
+    HttpServer server = new HttpServer.Builder().setName("test")
+        .setBindAddress("0.0.0.0").setPort(0).setFindPort(true).build();
+    server.addServlet("jobend", "/jobend", JobEndServlet.class);
+    server.start();
+    // Reset the servlet's static state and publish the URL of this server.
+    JobEndServlet.calledTimes = 0;
+    JobEndServlet.requestUri = null;
+    JobEndServlet.baseUrl = "http://localhost:" + server.getPort() + "/";
+    JobEndServlet.foundJobState = null;
+    return server;
+  }
+
+  @SuppressWarnings("serial")
+  public static class JobEndServlet extends HttpServlet {
+    public static volatile int calledTimes = 0; // count of doGet() invocations
+    public static URI requestUri; // path + query of the most recent request
+    public static String baseUrl; // assigned by startHttpServer() after start
+    public static String foundJobState; // "status" parameter of that request
+
+    @Override
+    public void doGet(HttpServletRequest request, HttpServletResponse response)
+        throws ServletException, IOException {
+      InputStreamReader in = new InputStreamReader(request.getInputStream());
+      PrintStream out = new PrintStream(response.getOutputStream());
+
+      calledTimes++; // NOTE(review): ++ on a volatile field is not atomic
+      try {
+        requestUri = new URI(null, null,
+            request.getRequestURI(), request.getQueryString(), null);
+        foundJobState = request.getParameter("status");
+      } catch (URISyntaxException e) { // NOTE(review): silently ignored
+      }
+      // Nothing is read from or written to the streams; they are only closed.
+      in.close();
+      out.close();
+    }
+  }
+
 }