You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ge...@apache.org on 2017/05/10 21:04:08 UTC

[2/4] oozie git commit: consolidate killing children in ActionExecutors

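Consolidate the killing of child YARN applications in ActionExecutors.

JavaActionExecutor now looks up the action's child YARN applications by
their launcher tag and kills each of them, then kills the launcher
application itself when an external ID is set. The separate kill() override
in MapReduceActionExecutor, which did its own tag-based lookup, is therefore
removed.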

Change-Id: Ifa2b4c0cc6fab7c4d24815c6da5ec9a604d4d28d


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/e8e5bd0b
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/e8e5bd0b
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/e8e5bd0b

Branch: refs/heads/oya
Commit: e8e5bd0b47d6ce384374dae997b35d92d12d005a
Parents: 4d8549d
Author: Gezapeti Cseh <ge...@gmail.com>
Authored: Tue May 9 18:34:13 2017 -0700
Committer: Gezapeti Cseh <ge...@gmail.com>
Committed: Tue May 9 18:44:39 2017 -0700

----------------------------------------------------------------------
 .../oozie/action/hadoop/JavaActionExecutor.java | 13 ++++-
 .../action/hadoop/MapReduceActionExecutor.java  | 52 --------------------
 2 files changed, 11 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/e8e5bd0b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 980bd95..45a43bf 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -1506,9 +1507,17 @@ public class JavaActionExecutor extends ActionExecutor {
         try {
             Element actionXml = XmlUtils.parseXml(action.getConf());
 
-            Configuration jobConf = createBaseHadoopConf(context, actionXml);
+            final Configuration jobConf = createBaseHadoopConf(context, actionXml);
+            String launcherTag = LauncherHelper.getActionYarnTag(jobConf, context.getWorkflow().getParentId(), action);
+            jobConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, LauncherHelper.getTag(launcherTag));
+            jobConf.set(LauncherMain.OOZIE_JOB_LAUNCH_TIME, Long.toString(action.getStartTime().getTime()));
             yarnClient = createYarnClient(context, jobConf);
-            yarnClient.killApplication(ConverterUtils.toApplicationId(action.getExternalId()));
+            for(ApplicationId id : LauncherMain.getChildYarnJobs(jobConf, ApplicationsRequestScope.ALL)){
+                yarnClient.killApplication(id);
+            }
+            if(action.getExternalId() != null) {
+                yarnClient.killApplication(ConverterUtils.toApplicationId(action.getExternalId()));
+            }
 
             context.setExternalStatus(KILLED);
             context.setExecutionData(KILLED, null);

http://git-wip-us.apache.org/repos/asf/oozie/blob/e8e5bd0b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
index c15e362..dfe19d1 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
@@ -398,58 +398,6 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
     }
 
     @Override
-    public void kill(final Context context, final WorkflowAction action) throws ActionExecutorException {
-        // Kill the LauncherAM which submits the MR job
-        super.kill(context, action);
-        // TODO if action.getExternalChildIDs() is not empty, then kill based on that
-        // We have to check whether the MapReduce execution has started or not. If it has started, then we have to get
-        // the YARN ApplicationID based on the tag and kill it as well
-        YarnClient yarnClient = null;
-        try {
-            String tag = LauncherHelper.getTag(ActionExecutor.getActionYarnTag(new Configuration(),
-                    context.getWorkflow(), action));
-            GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
-            gar.setScope(ApplicationsRequestScope.ALL);
-            gar.setApplicationTags(Collections.singleton(tag));
-            Element actionXml = XmlUtils.parseXml(action.getConf());
-            Configuration actionConf = loadHadoopDefaultResources(context, actionXml);
-            ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(actionConf, ApplicationClientProtocol.class);
-            GetApplicationsResponse apps = proxy.getApplications(gar);
-            List<ApplicationReport> appsList = apps.getApplicationList();
-
-            if (appsList.size() > 1) {
-                String applications = Joiner.on(",").join(Iterables.transform(appsList, new Function<ApplicationReport, String>() {
-                    @Override
-                    public String apply(@Nonnull ApplicationReport input) {
-                        return input.toString();
-                    }
-                }));
-
-                LOG.error("Too many applications were returned: {0}", applications);
-                throw new IllegalArgumentException("Too many applications were returned");
-            } else if (appsList.size() == 1) {
-
-                yarnClient = YarnClient.createYarnClient();
-                yarnClient.init(actionConf);
-                yarnClient.start();
-
-                ApplicationReport app = appsList.get(0);
-                LOG.info("Killing MapReduce job {0}, YARN Id: {1}", action.getExternalChildIDs(),
-                        app.getApplicationId().toString());
-                yarnClient.killApplication(app.getApplicationId());
-            } else {
-                LOG.info("No MapReduce job to kill");
-            }
-        } catch (Exception e) {
-            throw convertException(e);
-        } finally {
-            if (yarnClient != null) {
-                Closeables.closeQuietly(yarnClient);
-            }
-        }
-    }
-
-    @Override
     void injectActionCallback(Context context, Configuration actionConf) {
         injectCallback(context, actionConf);
     }