You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ge...@apache.org on 2017/05/10 21:04:08 UTC
[2/4] oozie git commit: consolidate killing children in ActionExecutors
consolidate killing children in ActionExecutors
Change-Id: Ifa2b4c0cc6fab7c4d24815c6da5ec9a604d4d28d
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/e8e5bd0b
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/e8e5bd0b
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/e8e5bd0b
Branch: refs/heads/oya
Commit: e8e5bd0b47d6ce384374dae997b35d92d12d005a
Parents: 4d8549d
Author: Gezapeti Cseh <ge...@gmail.com>
Authored: Tue May 9 18:34:13 2017 -0700
Committer: Gezapeti Cseh <ge...@gmail.com>
Committed: Tue May 9 18:44:39 2017 -0700
----------------------------------------------------------------------
.../oozie/action/hadoop/JavaActionExecutor.java | 13 ++++-
.../action/hadoop/MapReduceActionExecutor.java | 52 --------------------
2 files changed, 11 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/e8e5bd0b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 980bd95..45a43bf 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -1506,9 +1507,17 @@ public class JavaActionExecutor extends ActionExecutor {
try {
Element actionXml = XmlUtils.parseXml(action.getConf());
- Configuration jobConf = createBaseHadoopConf(context, actionXml);
+ final Configuration jobConf = createBaseHadoopConf(context, actionXml);
+ String launcherTag = LauncherHelper.getActionYarnTag(jobConf, context.getWorkflow().getParentId(), action);
+ jobConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, LauncherHelper.getTag(launcherTag));
+ jobConf.set(LauncherMain.OOZIE_JOB_LAUNCH_TIME, Long.toString(action.getStartTime().getTime()));
yarnClient = createYarnClient(context, jobConf);
- yarnClient.killApplication(ConverterUtils.toApplicationId(action.getExternalId()));
+ for(ApplicationId id : LauncherMain.getChildYarnJobs(jobConf, ApplicationsRequestScope.ALL)){
+ yarnClient.killApplication(id);
+ }
+ if(action.getExternalId() != null) {
+ yarnClient.killApplication(ConverterUtils.toApplicationId(action.getExternalId()));
+ }
context.setExternalStatus(KILLED);
context.setExecutionData(KILLED, null);
http://git-wip-us.apache.org/repos/asf/oozie/blob/e8e5bd0b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
index c15e362..dfe19d1 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
@@ -398,58 +398,6 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
}
@Override
- public void kill(final Context context, final WorkflowAction action) throws ActionExecutorException {
- // Kill the LauncherAM which submits the MR job
- super.kill(context, action);
- // TODO if action.getExternalChildIDs() is not empty, then kill based on that
- // We have to check whether the MapReduce execution has started or not. If it has started, then we have to get
- // the YARN ApplicationID based on the tag and kill it as well
- YarnClient yarnClient = null;
- try {
- String tag = LauncherHelper.getTag(ActionExecutor.getActionYarnTag(new Configuration(),
- context.getWorkflow(), action));
- GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
- gar.setScope(ApplicationsRequestScope.ALL);
- gar.setApplicationTags(Collections.singleton(tag));
- Element actionXml = XmlUtils.parseXml(action.getConf());
- Configuration actionConf = loadHadoopDefaultResources(context, actionXml);
- ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(actionConf, ApplicationClientProtocol.class);
- GetApplicationsResponse apps = proxy.getApplications(gar);
- List<ApplicationReport> appsList = apps.getApplicationList();
-
- if (appsList.size() > 1) {
- String applications = Joiner.on(",").join(Iterables.transform(appsList, new Function<ApplicationReport, String>() {
- @Override
- public String apply(@Nonnull ApplicationReport input) {
- return input.toString();
- }
- }));
-
- LOG.error("Too many applications were returned: {0}", applications);
- throw new IllegalArgumentException("Too many applications were returned");
- } else if (appsList.size() == 1) {
-
- yarnClient = YarnClient.createYarnClient();
- yarnClient.init(actionConf);
- yarnClient.start();
-
- ApplicationReport app = appsList.get(0);
- LOG.info("Killing MapReduce job {0}, YARN Id: {1}", action.getExternalChildIDs(),
- app.getApplicationId().toString());
- yarnClient.killApplication(app.getApplicationId());
- } else {
- LOG.info("No MapReduce job to kill");
- }
- } catch (Exception e) {
- throw convertException(e);
- } finally {
- if (yarnClient != null) {
- Closeables.closeQuietly(yarnClient);
- }
- }
- }
-
- @Override
void injectActionCallback(Context context, Configuration actionConf) {
injectCallback(context, actionConf);
}