You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pb...@apache.org on 2016/12/13 13:19:38 UTC

[07/48] oozie git commit: OOZIE-2594 correctly implement MapReduceActionExecutor.kill()

OOZIE-2594 correctly implement MapReduceActionExecutor.kill()

Change-Id: Ia091bd3943f4abf1b4e9c505a01fbb926fceac91


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/782837fc
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/782837fc
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/782837fc

Branch: refs/heads/oya
Commit: 782837fcef594ae73a46a620923fb69a8248d1de
Parents: 61f3a9f
Author: Peter Bacsko <pb...@cloudera.com>
Authored: Fri Nov 11 14:42:00 2016 +0100
Committer: Peter Bacsko <pb...@cloudera.com>
Committed: Fri Nov 11 17:36:56 2016 +0100

----------------------------------------------------------------------
 .../org/apache/oozie/action/ActionExecutor.java |  2 +-
 .../action/hadoop/MapReduceActionExecutor.java  | 41 ++++++++++++++++++++
 .../wf/TestWorkflowActionKillXCommand.java      | 14 -------
 .../org/apache/oozie/test/XDataTestCase.java    |  2 +-
 4 files changed, 43 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/782837fc/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/ActionExecutor.java b/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
index 1d6456b..919509d 100644
--- a/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
@@ -596,7 +596,7 @@ public abstract class ActionExecutor {
      * @param action the action
      * @return the action yarn tag
      */
-    public String getActionYarnTag(Configuration conf, WorkflowJob wfJob, WorkflowAction action) {
+    public static String getActionYarnTag(Configuration conf, WorkflowJob wfJob, WorkflowAction action) {
         if (conf.get(OOZIE_ACTION_YARN_TAG) != null) {
             return conf.get(OOZIE_ACTION_YARN_TAG) + "@" + action.getName();
         }

http://git-wip-us.apache.org/repos/asf/oozie/blob/782837fc/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
index 1b975ab..e97de7e 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
@@ -33,6 +33,14 @@ import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.oozie.action.ActionExecutor;
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.service.ConfigurationService;
@@ -393,6 +401,39 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
     }
 
     @Override
+    public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
+        // Kill the LauncherAM which submits the MR job
+        super.kill(context, action);
+
+        // The MapReduce job may already be running as its own YARN application, so look up every application
+        // carrying this action's YARN tag and kill those as well.
+        // TODO: this must be tested in TestMapReduceActionExecutor
+        YarnClient yarnClient = null;
+        try {
+            String tag = ActionExecutor.getActionYarnTag(new Configuration(), context.getWorkflow(), action);
+            GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
+            gar.setScope(ApplicationsRequestScope.ALL);
+            gar.setApplicationTags(Collections.singleton(tag));
+            Configuration actionConf = loadHadoopDefaultResources(context, XmlUtils.parseXml(action.getConf()));
+            ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(actionConf, ApplicationClientProtocol.class);
+            List<ApplicationReport> appsList = proxy.getApplications(gar).getApplicationList();
+            yarnClient = YarnClient.createYarnClient();
+            yarnClient.init(actionConf);
+            yarnClient.start();
+            for (ApplicationReport app : appsList) {
+                LOG.info("Killing MapReduce job {0}", app.getApplicationId().toString());
+                yarnClient.killApplication(app.getApplicationId());
+            }
+        } catch (Exception e) {
+            throw convertException(e);
+        } finally {
+            if (yarnClient != null) {
+                yarnClient.stop(); // release the RM connection even when the lookup or a kill fails
+            }
+        }
+    }
+
+    @Override
     void injectActionCallback(Context context, Configuration actionConf) {
         injectCallback(context, actionConf);
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/782837fc/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
index 71b46d1..ef75f14 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
@@ -115,20 +115,6 @@ public class TestWorkflowActionKillXCommand extends XDataTestCase {
         assertEquals(action.getExternalStatus(), "RUNNING");
     }
 
-    // FIXME - fix JAE.kill()
-    public void testWfActionKillChildJob() throws Exception {
-        String externalJobID = launchSleepJob(1000);
-        String childId = launchSleepJob(1000000);
-
-        WorkflowJobBean job = this.addRecordToWfJobTable(WorkflowJob.Status.KILLED, WorkflowInstance.Status.KILLED);
-        WorkflowActionBean action = this.addRecordToWfActionTable(job.getId(), externalJobID, "1",
-                WorkflowAction.Status.KILLED, childId);
-
-        new ActionKillXCommand(action.getId()).call();
-
-        waitUntilYarnAppKilledAndAssertSuccess(childId);
-    }
-
     protected WorkflowActionBean addRecordToWfActionTable(String wfId, String externalJobID, String actionName,
             WorkflowAction.Status status, String childID) throws Exception {
         WorkflowActionBean action = new WorkflowActionBean();

http://git-wip-us.apache.org/repos/asf/oozie/blob/782837fc/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
index ea778bd..2105e2f 100644
--- a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
@@ -1452,7 +1452,7 @@ public abstract class XDataTestCase extends XHCatTestCase {
         action.setUserRetryMax(2);
         action.setUserRetryInterval(1);
         action.setErrorInfo("dummyErrorCode", "dummyErrorMessage");
-        action.setExternalId("dummy external id");
+        action.setExternalId("application_1234567890123_0001");
         action.setExternalStatus("RUNNING");
 
         return action;