You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2014/10/17 21:52:27 UTC
git commit: MAPREDUCE-5542. Killing a job just as it finishes can generate an NPE in client. Contributed by Rohith
Repository: hadoop
Updated Branches:
refs/heads/trunk a6aa6e42c -> 209b1699f
MAPREDUCE-5542. Killing a job just as it finishes can generate an NPE in client. Contributed by Rohith
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/209b1699
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/209b1699
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/209b1699
Branch: refs/heads/trunk
Commit: 209b1699fcd150676d4cc47e8e817796086c1986
Parents: a6aa6e4
Author: Jason Lowe <jl...@apache.org>
Authored: Fri Oct 17 19:51:10 2014 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Fri Oct 17 19:51:10 2014 +0000
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 +
.../org/apache/hadoop/mapred/YARNRunner.java | 76 ++++++++++++++------
.../apache/hadoop/mapred/TestYARNRunner.java | 11 +++
3 files changed, 69 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/209b1699/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 80f6940..e152b48 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -435,6 +435,9 @@ Release 2.6.0 - UNRELEASED
MAPREDUCE-5873. Shuffle bandwidth computation includes time spent waiting
for maps (Siqi Li via jlowe)
+ MAPREDUCE-5542. Killing a job just as it finishes can generate an NPE in
+ client (Rohith via jlowe)
+
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/209b1699/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
index d3b80f3..40ef982 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java
@@ -594,16 +594,50 @@ public class YARNRunner implements ClientProtocol {
.getTaskReports(jobID, taskType);
}
+ private void killUnFinishedApplication(ApplicationId appId)
+ throws IOException {
+ ApplicationReport application = null;
+ try {
+ application = resMgrDelegate.getApplicationReport(appId);
+ } catch (YarnException e) {
+ throw new IOException(e);
+ }
+ if (application.getYarnApplicationState() == YarnApplicationState.FINISHED
+ || application.getYarnApplicationState() == YarnApplicationState.FAILED
+ || application.getYarnApplicationState() == YarnApplicationState.KILLED) {
+ return;
+ }
+ killApplication(appId);
+ }
+
+ private void killApplication(ApplicationId appId) throws IOException {
+ try {
+ resMgrDelegate.killApplication(appId);
+ } catch (YarnException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private boolean isJobInTerminalState(JobStatus status) {
+ return status.getState() == JobStatus.State.KILLED
+ || status.getState() == JobStatus.State.FAILED
+ || status.getState() == JobStatus.State.SUCCEEDED;
+ }
+
@Override
public void killJob(JobID arg0) throws IOException, InterruptedException {
/* check if the status is not running, if not send kill to RM */
JobStatus status = clientCache.getClient(arg0).getJobStatus(arg0);
+ ApplicationId appId = TypeConverter.toYarn(arg0).getAppId();
+
+ // get status from RM and return
+ if (status == null) {
+ killUnFinishedApplication(appId);
+ return;
+ }
+
if (status.getState() != JobStatus.State.RUNNING) {
- try {
- resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
- } catch (YarnException e) {
- throw new IOException(e);
- }
+ killApplication(appId);
return;
}
@@ -612,26 +646,26 @@ public class YARNRunner implements ClientProtocol {
clientCache.getClient(arg0).killJob(arg0);
long currentTimeMillis = System.currentTimeMillis();
long timeKillIssued = currentTimeMillis;
- while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState()
- != JobStatus.State.KILLED)) {
- try {
- Thread.sleep(1000L);
- } catch(InterruptedException ie) {
- /** interrupted, just break */
- break;
- }
- currentTimeMillis = System.currentTimeMillis();
- status = clientCache.getClient(arg0).getJobStatus(arg0);
+ while ((currentTimeMillis < timeKillIssued + 10000L)
+ && !isJobInTerminalState(status)) {
+ try {
+ Thread.sleep(1000L);
+ } catch (InterruptedException ie) {
+ /** interrupted, just break */
+ break;
+ }
+ currentTimeMillis = System.currentTimeMillis();
+ status = clientCache.getClient(arg0).getJobStatus(arg0);
+ if (status == null) {
+ killUnFinishedApplication(appId);
+ return;
+ }
}
} catch(IOException io) {
LOG.debug("Error when checking for application status", io);
}
- if (status.getState() != JobStatus.State.KILLED) {
- try {
- resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId());
- } catch (YarnException e) {
- throw new IOException(e);
- }
+ if (status != null && !isJobInTerminalState(status)) {
+ killApplication(appId);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/209b1699/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
index 2567785..420a95f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java
@@ -86,6 +86,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.YarnClusterMetrics;
@@ -188,6 +189,16 @@ public class TestYARNRunner extends TestCase {
State.RUNNING, JobPriority.HIGH, "tmp", "tmp", "tmp", "tmp"));
yarnRunner.killJob(jobId);
verify(clientDelegate).killJob(jobId);
+
+ when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(null);
+ when(resourceMgrDelegate.getApplicationReport(any(ApplicationId.class)))
+ .thenReturn(
+ ApplicationReport.newInstance(appId, null, "tmp", "tmp", "tmp",
+ "tmp", 0, null, YarnApplicationState.FINISHED, "tmp", "tmp",
+ 0l, 0l, FinalApplicationStatus.SUCCEEDED, null, null, 0f,
+ "tmp", null));
+ yarnRunner.killJob(jobId);
+ verify(clientDelegate).killJob(jobId);
}
@Test(timeout=20000)