You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by an...@apache.org on 2018/09/04 12:35:20 UTC
oozie git commit: OOZIE-3061 Kill only those child jobs which are not
already killed (matijhs via gezapeti, andras.piros)
Repository: oozie
Updated Branches:
refs/heads/master 9735dd38c -> 6323a8e43
OOZIE-3061 Kill only those child jobs which are not already killed (matijhs via gezapeti, andras.piros)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/6323a8e4
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/6323a8e4
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/6323a8e4
Branch: refs/heads/master
Commit: 6323a8e43e4d70bcad69059a5e162f48cd0ed43c
Parents: 9735dd3
Author: Andras Piros <an...@cloudera.com>
Authored: Tue Sep 4 14:35:09 2018 +0200
Committer: Andras Piros <an...@cloudera.com>
Committed: Tue Sep 4 14:35:09 2018 +0200
----------------------------------------------------------------------
.../oozie/action/hadoop/JavaActionExecutor.java | 93 +++++++++++++-------
release-log.txt | 1 +
.../oozie/action/hadoop/LauncherMain.java | 48 +++++++---
.../oozie/action/hadoop/TestLauncherMain.java | 40 +++++++--
4 files changed, 135 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/6323a8e4/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 8f0f244..05fac39 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -77,6 +77,7 @@ import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
@@ -1780,36 +1781,11 @@ public class JavaActionExecutor extends ActionExecutor {
String launcherTag = getActionYarnTag(context, action);
jobConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, LauncherHelper.getTag(launcherTag));
yarnClient = createYarnClient(context, jobConf);
- if(action.getExternalId() != null) {
- try {
- LOG.info("Killing action {0}'s external application {1}", action.getId(), action.getExternalId());
- yarnClient.killApplication(ConverterUtils.toApplicationId(action.getExternalId()));
- } catch (Exception e) {
- LOG.warn("Could not kill {0}", action.getExternalId(), e);
- }
- }
- String externalChildIDs = action.getExternalChildIDs();
- if(externalChildIDs != null) {
- for(String childId : externalChildIDs.split(",")) {
- try {
- LOG.info("Killing action {0}'s external child application {1}", action.getId(), childId);
- yarnClient.killApplication(ConverterUtils.toApplicationId(childId.trim()));
- } catch (Exception e) {
- LOG.warn("Could not kill external child of {0}, {1}", action.getExternalId(),
- childId, e);
- }
- }
- }
- for(ApplicationId id : LauncherMain.getChildYarnJobs(jobConf, ApplicationsRequestScope.ALL,
- action.getStartTime().getTime())){
- try {
- LOG.info("Killing action {0}'s external child application {1} based on tags",
- action.getId(), id.toString());
- yarnClient.killApplication(id);
- } catch (Exception e) {
- LOG.warn("Could not kill child of {0}, {1}", action.getExternalId(), id, e);
- }
- }
+
+ String appExternalId = action.getExternalId();
+ killExternalApp(action, yarnClient, appExternalId);
+ killExternalChildApp(action, yarnClient, appExternalId);
+ killExternalChildAppByTags(action, yarnClient, jobConf, appExternalId);
context.setExternalStatus(KILLED);
context.setExecutionData(KILLED, null);
@@ -1828,6 +1804,63 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
+    /**
+     * Tells whether YARN has not yet recorded a final outcome for the given application.
+     *
+     * @param appReport report of the application to inspect
+     * @return {@code false} when the final status is SUCCEEDED, FAILED or KILLED; {@code true} otherwise
+     */
+    private boolean finalAppStatusUndefined(ApplicationReport appReport) {
+        final FinalApplicationStatus finalStatus = appReport.getFinalApplicationStatus();
+        final boolean finished = FinalApplicationStatus.SUCCEEDED.equals(finalStatus)
+                || FinalApplicationStatus.FAILED.equals(finalStatus)
+                || FinalApplicationStatus.KILLED.equals(finalStatus);
+        return !finished;
+    }
+
+    /**
+     * Kills the external application identified by {@code appExternalId}, unless its final status is already
+     * SUCCEEDED, FAILED or KILLED.
+     * <p>
+     * The operation is best effort: a malformed ID, a failing report lookup or a failing kill request is logged
+     * as a warning and does not propagate, so the caller can go on killing the remaining applications.
+     *
+     * @param action the workflow action being killed; only its ID is used, for logging
+     * @param yarnClient client used to query and kill the application
+     * @param appExternalId YARN application ID to kill; nothing happens when {@code null}
+     * @throws YarnException kept in the signature for backward compatibility; failures are logged instead
+     * @throws IOException kept in the signature for backward compatibility; failures are logged instead
+     */
+    void killExternalApp(WorkflowAction action, YarnClient yarnClient, String appExternalId)
+            throws YarnException, IOException {
+        if (appExternalId == null) {
+            return;
+        }
+        try {
+            // Parsing the ID and fetching the report happen inside the try block on purpose: neither failure
+            // may abort the rest of the kill sequence.
+            ApplicationId appId = ConverterUtils.toApplicationId(appExternalId);
+            if (finalAppStatusUndefined(yarnClient.getApplicationReport(appId))) {
+                LOG.info("Killing action {0}''s external application {1}", action.getId(), appExternalId);
+                yarnClient.killApplication(appId);
+            }
+        } catch (Exception e) {
+            LOG.warn("Could not kill {0}", appExternalId, e);
+        }
+    }
+
+    /**
+     * Kills every application listed in the action's comma-separated external child IDs, skipping those whose
+     * final status is already SUCCEEDED, FAILED or KILLED.
+     * <p>
+     * Each child is handled independently and on a best-effort basis: a malformed or empty ID, a failing report
+     * lookup or a failing kill request is logged as a warning and the loop continues with the next child.
+     *
+     * @param action the workflow action being killed; supplies the child IDs and the ID used for logging
+     * @param yarnClient client used to query and kill the applications
+     * @param appExternalId external ID of the parent application; used only in warning messages
+     * @throws YarnException kept in the signature for backward compatibility; failures are logged instead
+     * @throws IOException kept in the signature for backward compatibility; failures are logged instead
+     */
+    void killExternalChildApp(WorkflowAction action, YarnClient yarnClient, String appExternalId)
+            throws YarnException, IOException {
+        String externalChildIDs = action.getExternalChildIDs();
+        if (externalChildIDs == null) {
+            return;
+        }
+        for (String childId : externalChildIDs.split(",")) {
+            try {
+                // Parse and look up inside the try block so that one bad child cannot prevent the kill of
+                // the children that follow it.
+                ApplicationId appChildId = ConverterUtils.toApplicationId(childId.trim());
+                if (finalAppStatusUndefined(yarnClient.getApplicationReport(appChildId))) {
+                    LOG.info("Killing action {0}''s external child application {1}", action.getId(), childId);
+                    yarnClient.killApplication(appChildId);
+                }
+            } catch (Exception e) {
+                LOG.warn("Could not kill external child of {0}, {1}",
+                        appExternalId, childId, e);
+            }
+        }
+    }
+
+    /**
+     * Kills the child applications returned by {@code LauncherMain.getChildYarnJobs} for {@code jobConf} and the
+     * action's start time, skipping those whose final status is already SUCCEEDED, FAILED or KILLED.
+     * <p>
+     * Each child is handled independently: a failing report lookup or kill request is logged as a warning and
+     * the loop continues with the next child. A failure of the child lookup itself still propagates.
+     *
+     * @param action the workflow action being killed; supplies the start time and the ID used for logging
+     * @param yarnClient client used to query and kill the applications
+     * @param jobConf configuration passed to the child job lookup
+     * @param appExternalId external ID of the parent application; used only in warning messages
+     * @throws YarnException if the child job lookup fails
+     * @throws IOException if the child job lookup fails
+     */
+    void killExternalChildAppByTags(WorkflowAction action, YarnClient yarnClient, Configuration jobConf,
+                                    String appExternalId)
+            throws YarnException, IOException {
+        for (ApplicationId id : LauncherMain.getChildYarnJobs(jobConf, ApplicationsRequestScope.ALL,
+                action.getStartTime().getTime())) {
+            try {
+                // The report lookup is inside the try block so that one unreachable child does not stop the
+                // remaining children from being killed.
+                if (finalAppStatusUndefined(yarnClient.getApplicationReport(id))) {
+                    LOG.info("Killing action {0}''s external child application {1} based on tags",
+                            action.getId(), id.toString());
+                    yarnClient.killApplication(id);
+                }
+            } catch (Exception e) {
+                LOG.warn("Could not kill child of {0}, {1}", appExternalId, id, e);
+            }
+        }
+    }
+
private String getActionYarnTag(Context context, WorkflowAction action) {
return LauncherHelper.getActionYarnTag(context.getProtoActionConf(), context.getWorkflow().getParentId(), action);
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/6323a8e4/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index ce945b4..caacb1f 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 5.1.0 release (trunk - unreleased)
+OOZIE-3061 Kill only those child jobs which are not already killed (matijhs via gezapeti, andras.piros)
OOZIE-3155 [ui] Job DAG is not refreshed when a job is finished (asalamon74 via andras.piros)
OOZIE-3334 Don't use org.apache.hadoop.hbase.security.User in HDFSCredentials (gezapeti)
OOZIE-3210 [build] Revision information is empty (asalamon74 via andras.piros)
http://git-wip-us.apache.org/repos/asf/oozie/blob/6323a8e4/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
index e6e182c..c9e2a91 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
@@ -31,7 +31,9 @@ import java.io.OutputStreamWriter;
import java.io.StringWriter;
import java.io.Writer;
import java.net.URL;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashSet;
@@ -56,6 +58,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.ClientRMProxy;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -202,6 +205,7 @@ public abstract class LauncherMain {
System.out.print("Could not find YARN tags property " + CHILD_MAPREDUCE_JOB_TAGS);
return childYarnJobs;
}
+
System.out.println("tag id : " + tag);
GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
gar.setScope(scope);
@@ -254,24 +258,44 @@ public abstract class LauncherMain {
try {
Set<ApplicationId> childYarnJobs = getChildYarnJobs(actionConf);
if (!childYarnJobs.isEmpty()) {
- System.out.println();
- System.out.println("Found [" + childYarnJobs.size() + "] YARN application(s) from this launcher");
- System.out.println("Killing existing applications and starting over:");
- YarnClient yarnClient = YarnClient.createYarnClient();
- yarnClient.init(actionConf);
- yarnClient.start();
- for (ApplicationId app : childYarnJobs) {
- System.out.print("Killing [" + app + "] ... ");
- yarnClient.killApplication(app);
- System.out.println("Done");
- }
- System.out.println();
+ checkAndKillChildYarnJobs(YarnClient.createYarnClient(), actionConf, childYarnJobs);
}
} catch (IOException | YarnException ye) {
throw new RuntimeException("Exception occurred while killing child job(s)", ye);
}
}
+    /**
+     * Kills those of the given child applications whose final status is not yet SUCCEEDED, FAILED or KILLED,
+     * reporting progress on standard output.
+     * <p>
+     * The supplied client is initialized and started here, and is always stopped before returning, so the
+     * caller must hand over a client that it does not intend to reuse.
+     *
+     * @param yarnClient a not yet initialized client used to query and kill the applications
+     * @param actionConf configuration used to initialize {@code yarnClient}
+     * @param childYarnJobs the candidate applications
+     * @return the applications for which a kill request was issued
+     * @throws YarnException if querying or killing an application fails
+     * @throws IOException if querying or killing an application fails
+     */
+    @VisibleForTesting
+    protected static Collection<ApplicationId> checkAndKillChildYarnJobs(YarnClient yarnClient,
+                                                                         Configuration actionConf,
+                                                                         Collection<ApplicationId> childYarnJobs)
+            throws YarnException, IOException {
+        System.out.println();
+        System.out.println("Found [" + childYarnJobs.size() + "] YARN application(s) from this launcher");
+        System.out.println("Killing existing applications and starting over:");
+        Collection<ApplicationId> killedApps = new ArrayList<>();
+        try {
+            yarnClient.init(actionConf);
+            yarnClient.start();
+            for (ApplicationId app : childYarnJobs) {
+                if (finalAppStatusUndefined(yarnClient.getApplicationReport(app))) {
+                    System.out.print("Killing [" + app + "] ... ");
+                    yarnClient.killApplication(app);
+                    System.out.println("Done");
+                    killedApps.add(app);
+                }
+            }
+        } finally {
+            // The client was started in this method, so stop it here as well, even when a kill fails.
+            yarnClient.stop();
+        }
+        System.out.println();
+        return killedApps;
+    }
+
+    /**
+     * Checks whether the application described by the report still lacks a final outcome.
+     *
+     * @param appReport report of the application to inspect
+     * @return {@code true} unless the final status is SUCCEEDED, FAILED or KILLED
+     */
+    private static boolean finalAppStatusUndefined(ApplicationReport appReport) {
+        FinalApplicationStatus finalStatus = appReport.getFinalApplicationStatus();
+        if (FinalApplicationStatus.SUCCEEDED.equals(finalStatus)) {
+            return false;
+        }
+        if (FinalApplicationStatus.FAILED.equals(finalStatus)) {
+            return false;
+        }
+        return !FinalApplicationStatus.KILLED.equals(finalStatus);
+    }
+
protected abstract void run(String[] args) throws Exception;
/**
http://git-wip-us.apache.org/repos/asf/oozie/blob/6323a8e4/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherMain.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherMain.java b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherMain.java
index 5fb7cf5..b613978 100644
--- a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherMain.java
+++ b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherMain.java
@@ -18,10 +18,11 @@
package org.apache.oozie.action.hadoop;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.conf.Configuration;
import java.io.ByteArrayOutputStream;
import java.io.File;
@@ -29,10 +30,16 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Properties;
-import org.apache.hadoop.conf.Configuration;
import org.junit.rules.TemporaryFolder;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -110,4 +117,27 @@ public class TestLauncherMain {
String contents = new String(Files.readAllBytes(f.toPath()));
assertTrue(contents.contains("foo=bar"));
}
+
+    /**
+     * Checks that only the application without a final status is killed: the mocked reports answer UNDEFINED,
+     * FAILED and KILLED for the first, second and third application respectively.
+     */
+    @Test
+    public void testKillChildYarnJobs() throws Exception {
+        YarnClient yc = Mockito.mock(YarnClient.class);
+        ApplicationReport ar = Mockito.mock(ApplicationReport.class);
+        Mockito.when(yc.getApplicationReport(Mockito.any(ApplicationId.class))).thenReturn(ar);
+
+        Mockito.when(ar.getFinalApplicationStatus())
+                .thenReturn(FinalApplicationStatus.UNDEFINED)
+                .thenReturn(FinalApplicationStatus.FAILED)
+                .thenReturn(FinalApplicationStatus.KILLED);
+
+        // One shared timestamp keeps the three IDs in the same (fake) cluster; they differ only by sequence id.
+        final long clusterTimestamp = System.currentTimeMillis();
+        ApplicationId[] appz = {
+                ApplicationId.newInstance(clusterTimestamp, 1),
+                ApplicationId.newInstance(clusterTimestamp, 2),
+                ApplicationId.newInstance(clusterTimestamp, 3)
+        };
+
+        Collection<ApplicationId> result = LauncherMain.checkAndKillChildYarnJobs(yc, null, Arrays.asList(appz));
+
+        assertEquals(1, result.size());
+        assertEquals(appz[0], result.iterator().next());
+
+        // The returned collection alone does not prove that finished applications were left untouched.
+        Mockito.verify(yc).killApplication(appz[0]);
+        Mockito.verify(yc, Mockito.never()).killApplication(appz[1]);
+        Mockito.verify(yc, Mockito.never()).killApplication(appz[2]);
+    }
}