You are viewing a plain-text version of this content. (The link to the canonical version is not preserved in this plain-text rendering.)
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2014/10/17 21:52:27 UTC

git commit: MAPREDUCE-5542. Killing a job just as it finishes can generate an NPE in client. Contributed by Rohith

Repository: hadoop
Updated Branches:
  refs/heads/trunk a6aa6e42c -> 209b1699f


MAPREDUCE-5542. Killing a job just as it finishes can generate an NPE in client. Contributed by Rohith


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/209b1699
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/209b1699
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/209b1699

Branch: refs/heads/trunk
Commit: 209b1699fcd150676d4cc47e8e817796086c1986
Parents: a6aa6e4
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Oct 17 19:51:10 2014 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Oct 17 19:51:10 2014 +0000

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  3 +
 .../org/apache/hadoop/mapred/YARNRunner.java    | 76 ++++++++++++++------
 .../apache/hadoop/mapred/TestYARNRunner.java    | 11 +++
 3 files changed, 69 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/209b1699/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 80f6940..e152b48 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -435,6 +435,9 @@ Release 2.6.0 - UNRELEASED
     MAPREDUCE-5873. Shuffle bandwidth computation includes time spent waiting
     for maps (Siqi Li via jlowe)
 
+    MAPREDUCE-5542. Killing a job just as it finishes can generate an NPE in
+    client (Rohith via jlowe)
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/209b1699/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index d3b80f3..40ef982 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -594,16 +594,50 @@ public class YARNRunner implements ClientProtocol {
         .getTaskReports(jobID, taskType);
   }
 
+  private void killUnFinishedApplication(ApplicationId appId)
+      throws IOException {
+    ApplicationReport application = null;
+    try {
+      application = resMgrDelegate.getApplicationReport(appId);
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
+    if (application.getYarnApplicationState() == YarnApplicationState.FINISHED
+        || application.getYarnApplicationState() == YarnApplicationState.FAILED
+        || application.getYarnApplicationState() == YarnApplicationState.KILLED) {
+      return;
+    }
+    killApplication(appId);
+  }
+
+  private void killApplication(ApplicationId appId) throws IOException {
+    try {
+      resMgrDelegate.killApplication(appId);
+    } catch (YarnException e) {
+      throw new IOException(e);
+    }
+  }
+
+  private boolean isJobInTerminalState(JobStatus status) {
+    return status.getState() == JobStatus.State.KILLED
+        || status.getState() == JobStatus.State.FAILED
+        || status.getState() == JobStatus.State.SUCCEEDED;
+  }
+
   @Override
   public void killJob(JobID arg0) throws IOException, InterruptedException {
     /* check if the status is not running, if not send kill to RM */
     JobStatus status = clientCache.getClient(arg0).getJobStatus(arg0);
+    ApplicationId appId = TypeConverter.toYarn(arg0).getAppId();
+
+    // get status from RM and return
+    if (status == null) {
+      killUnFinishedApplication(appId);
+      return;
+    }
+
     if (status.getState() != JobStatus.State.RUNNING) {
-      try {
-        resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
-      } catch (YarnException e) {
-        throw new IOException(e);
-      }
+      killApplication(appId);
       return;
     }
 
@@ -612,26 +646,26 @@ public class YARNRunner implements ClientProtocol {
       clientCache.getClient(arg0).killJob(arg0);
       long currentTimeMillis = System.currentTimeMillis();
       long timeKillIssued = currentTimeMillis;
-      while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState()
-          != JobStatus.State.KILLED)) {
-          try {
-            Thread.sleep(1000L);
-          } catch(InterruptedException ie) {
-            /** interrupted, just break */
-            break;
-          }
-          currentTimeMillis = System.currentTimeMillis();
-          status = clientCache.getClient(arg0).getJobStatus(arg0);
+      while ((currentTimeMillis < timeKillIssued + 10000L)
+          && !isJobInTerminalState(status)) {
+        try {
+          Thread.sleep(1000L);
+        } catch (InterruptedException ie) {
+          /** interrupted, just break */
+          break;
+        }
+        currentTimeMillis = System.currentTimeMillis();
+        status = clientCache.getClient(arg0).getJobStatus(arg0);
+        if (status == null) {
+          killUnFinishedApplication(appId);
+          return;
+        }
       }
     } catch(IOException io) {
       LOG.debug("Error when checking for application status", io);
     }
-    if (status.getState() != JobStatus.State.KILLED) {
-      try {
-        resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
-      } catch (YarnException e) {
-        throw new IOException(e);
-      }
+    if (status != null && !isJobInTerminalState(status)) {
+      killApplication(appId);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/209b1699/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
index 2567785..420a95f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
@@ -86,6 +86,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@@ -188,6 +189,16 @@ public class TestYARNRunner extends TestCase {
             State.RUNNING, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp"));
     yarnRunner.killJob(jobId);
     verify(clientDelegate).killJob(jobId);
+
+    when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(null);
+    when(resourceMgrDelegate.getApplicationReport(any(ApplicationId.class)))
+        .thenReturn(
+            ApplicationReport.newInstance(appId, null, "tmp", "tmp", "tmp",
+                "tmp", 0, null, YarnApplicationState.FINISHED, "tmp", "tmp",
+                0l, 0l, FinalApplicationStatus.SUCCEEDED, null, null, 0f,
+                "tmp", null));
+    yarnRunner.killJob(jobId);
+    verify(clientDelegate).killJob(jobId);
   }
 
   @Test(timeout=20000)