You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pb...@apache.org on 2016/12/13 13:19:38 UTC
[07/48] oozie git commit: OOZIE-2594 correctly implement MapReduceActionExecutor.kill()
OOZIE-2594 correctly implement MapReduceActionExecutor.kill()
Change-Id: Ia091bd3943f4abf1b4e9c505a01fbb926fceac91
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/782837fc
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/782837fc
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/782837fc
Branch: refs/heads/oya
Commit: 782837fcef594ae73a46a620923fb69a8248d1de
Parents: 61f3a9f
Author: Peter Bacsko <pb...@cloudera.com>
Authored: Fri Nov 11 14:42:00 2016 +0100
Committer: Peter Bacsko <pb...@cloudera.com>
Committed: Fri Nov 11 17:36:56 2016 +0100
----------------------------------------------------------------------
.../org/apache/oozie/action/ActionExecutor.java | 2 +-
.../action/hadoop/MapReduceActionExecutor.java | 41 ++++++++++++++++++++
.../wf/TestWorkflowActionKillXCommand.java | 14 -------
.../org/apache/oozie/test/XDataTestCase.java | 2 +-
4 files changed, 43 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/782837fc/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/ActionExecutor.java b/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
index 1d6456b..919509d 100644
--- a/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
@@ -596,7 +596,7 @@ public abstract class ActionExecutor {
* @param action the action
* @return the action yarn tag
*/
- public String getActionYarnTag(Configuration conf, WorkflowJob wfJob, WorkflowAction action) {
+ public static String getActionYarnTag(Configuration conf, WorkflowJob wfJob, WorkflowAction action) {
if (conf.get(OOZIE_ACTION_YARN_TAG) != null) {
return conf.get(OOZIE_ACTION_YARN_TAG) + "@" + action.getName();
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/782837fc/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
index 1b975ab..e97de7e 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/MapReduceActionExecutor.java
@@ -33,6 +33,14 @@ import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.service.ConfigurationService;
@@ -393,6 +401,39 @@ public class MapReduceActionExecutor extends JavaActionExecutor {
}
@Override
+    public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
+        // Kill the LauncherAM which submits the MR job
+        super.kill(context, action);
+
+        // We have to check whether the MapReduce execution has started or not. If it has started, then we have to get
+        // the YARN ApplicationID based on the tag and kill it as well
+        // TODO: this must be tested in TestMapReduceActionExecutor
+        // Created outside the try so that finally can always stop it; Service.stop() is safe even if never started
+        YarnClient yarnClient = YarnClient.createYarnClient();
+        try {
+            String tag = ActionExecutor.getActionYarnTag(new Configuration(), context.getWorkflow(), action);
+            GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
+            gar.setScope(ApplicationsRequestScope.ALL);
+            gar.setApplicationTags(Collections.singleton(tag));
+            Element actionXml = XmlUtils.parseXml(action.getConf());
+            Configuration actionConf = loadHadoopDefaultResources(context, actionXml);
+            ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(actionConf, ApplicationClientProtocol.class);
+            GetApplicationsResponse apps = proxy.getApplications(gar);
+            List<ApplicationReport> appsList = apps.getApplicationList();
+            yarnClient.init(actionConf);
+            yarnClient.start();
+            for (ApplicationReport app : appsList) {
+                LOG.info("Killing MapReduce job {0}", app.getApplicationId().toString());
+                yarnClient.killApplication(app.getApplicationId());
+            }
+        } catch (Exception e) {
+            throw convertException(e);
+        } finally {
+            yarnClient.stop();
+        }
+    }
+
+ @Override
void injectActionCallback(Context context, Configuration actionConf) {
injectCallback(context, actionConf);
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/782837fc/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
index 71b46d1..ef75f14 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
@@ -115,20 +115,6 @@ public class TestWorkflowActionKillXCommand extends XDataTestCase {
assertEquals(action.getExternalStatus(), "RUNNING");
}
- // FIXME - fix JAE.kill()
- public void testWfActionKillChildJob() throws Exception {
- String externalJobID = launchSleepJob(1000);
- String childId = launchSleepJob(1000000);
-
- WorkflowJobBean job = this.addRecordToWfJobTable(WorkflowJob.Status.KILLED, WorkflowInstance.Status.KILLED);
- WorkflowActionBean action = this.addRecordToWfActionTable(job.getId(), externalJobID, "1",
- WorkflowAction.Status.KILLED, childId);
-
- new ActionKillXCommand(action.getId()).call();
-
- waitUntilYarnAppKilledAndAssertSuccess(childId);
- }
-
protected WorkflowActionBean addRecordToWfActionTable(String wfId, String externalJobID, String actionName,
WorkflowAction.Status status, String childID) throws Exception {
WorkflowActionBean action = new WorkflowActionBean();
http://git-wip-us.apache.org/repos/asf/oozie/blob/782837fc/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
index ea778bd..2105e2f 100644
--- a/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XDataTestCase.java
@@ -1452,7 +1452,7 @@ public abstract class XDataTestCase extends XHCatTestCase {
action.setUserRetryMax(2);
action.setUserRetryInterval(1);
action.setErrorInfo("dummyErrorCode", "dummyErrorMessage");
- action.setExternalId("dummy external id");
+ action.setExternalId("application_1234567890123_0001");
action.setExternalStatus("RUNNING");
return action;