You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ge...@apache.org on 2018/08/01 21:10:01 UTC
oozie git commit: OOZIE-3193 Applications are not killed when
submitted via subworkflow (kmarton via gezapeti, andras.piros)
Repository: oozie
Updated Branches:
refs/heads/master 6c01dc7ff -> 05636799e
OOZIE-3193 Applications are not killed when submitted via subworkflow (kmarton via gezapeti, andras.piros)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/05636799
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/05636799
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/05636799
Branch: refs/heads/master
Commit: 05636799ea12c348474205fb7379d70ab5ef9395
Parents: 6c01dc7
Author: Gezapeti Cseh <ge...@apache.org>
Authored: Wed Aug 1 23:09:52 2018 +0200
Committer: Gezapeti Cseh <ge...@apache.org>
Committed: Wed Aug 1 23:09:52 2018 +0200
----------------------------------------------------------------------
.../org/apache/oozie/action/ActionExecutor.java | 2 +-
.../oozie/action/hadoop/JavaActionExecutor.java | 6 +-
.../oozie/action/hadoop/LauncherMainTester.java | 64 ++++-
.../hadoop/SleepMapperReducerForTest.java | 68 +++++
.../oozie/TestSubWorkflowActionExecutor.java | 249 +++++++++++++------
release-log.txt | 1 +
6 files changed, 307 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/05636799/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/ActionExecutor.java b/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
index 71bd36a..1770b97 100644
--- a/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
@@ -603,7 +603,7 @@ public abstract class ActionExecutor {
* @return the action yarn tag
*/
public static String getActionYarnTag(Configuration conf, WorkflowJob wfJob, WorkflowAction action) {
- if (conf.get(OOZIE_ACTION_YARN_TAG) != null) {
+ if (conf != null && conf.get(OOZIE_ACTION_YARN_TAG) != null) {
return conf.get(OOZIE_ACTION_YARN_TAG) + "@" + action.getName();
}
else if (wfJob.getParentId() != null) {
http://git-wip-us.apache.org/repos/asf/oozie/blob/05636799/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index a1a9671..8f0f244 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -1777,7 +1777,7 @@ public class JavaActionExecutor extends ActionExecutor {
try {
Element actionXml = XmlUtils.parseXml(action.getConf());
final Configuration jobConf = createBaseHadoopConf(context, actionXml);
- String launcherTag = LauncherHelper.getActionYarnTag(jobConf, context.getWorkflow().getParentId(), action);
+ String launcherTag = getActionYarnTag(context, action);
jobConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, LauncherHelper.getTag(launcherTag));
yarnClient = createYarnClient(context, jobConf);
if(action.getExternalId() != null) {
@@ -1828,6 +1828,10 @@ public class JavaActionExecutor extends ActionExecutor {
}
}
+ private String getActionYarnTag(Context context, WorkflowAction action) {
+ return LauncherHelper.getActionYarnTag(context.getProtoActionConf(), context.getWorkflow().getParentId(), action);
+ }
+
private static Set<String> FINAL_STATUS = new HashSet<String>();
static {
http://git-wip-us.apache.org/repos/asf/oozie/blob/05636799/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java b/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java
index fdca570..ada7005 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java
@@ -18,14 +18,31 @@
package org.apache.oozie.action.hadoop;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
import java.util.Properties;
public class LauncherMainTester {
+ public static final String JOB_ID_FILE_NAME = "jobID.txt";
+
public static void main(String[] args) throws Throwable {
if (args.length == 0) {
System.out.println("Hello World!");
@@ -84,10 +101,55 @@ public class LauncherMainTester {
sm.checkPermission(null, sm.getSecurityContext());
}
}
-
+ if(args.length == 3) {
+ if(args[0].equals("javamapreduce")) {
+ executeJavaMapReduce(args);
+ }
+ }
checkAndSleep(args);
}
+ private static void executeJavaMapReduce(String[] args) throws IOException, InterruptedException {
+ JobConf jConf = createSleepMapperReducerJobConf();
+ final Path input = new Path(args[1]);
+ FileInputFormat.setInputPaths(jConf, input);
+ FileOutputFormat.setOutputPath(jConf, new Path(args[2]));
+ writeToFile(input, jConf, "dummy\n", "data.txt");
+ JobClient jc = new JobClient(jConf);
+ System.out.println("Submitting MR job");
+ RunningJob job = jc.submitJob(jConf);
+ System.out.println("Submitted job " + job.getID().toString());
+ writeToFile(input, jConf, job.getID().toString(), JOB_ID_FILE_NAME);
+ job.waitForCompletion();
+ jc.monitorAndPrintJob(jConf, job);
+ if (job.getJobState() != JobStatus.SUCCEEDED) {
+ System.err.println(job.getJobState() + " job state instead of" + JobStatus.SUCCEEDED);
+ System.exit(-1);
+ }
+ }
+
+ private static JobConf createSleepMapperReducerJobConf() {
+ JobConf jConf = new JobConf(true);
+ jConf.addResource(new Path("file:///", System.getProperty("oozie.action.conf.xml")));
+ jConf.setMapperClass(SleepMapperReducerForTest.class);
+ jConf.setReducerClass(SleepMapperReducerForTest.class);
+ jConf.setOutputKeyClass(Text.class);
+ jConf.setOutputValueClass(IntWritable.class);
+ jConf.setInputFormat(TextInputFormat.class);
+ jConf.setOutputFormat(TextOutputFormat.class);
+ jConf.setNumReduceTasks(1);
+ jConf.set(SleepMapperReducerForTest.SLEEP_TIME_MILLIS_KEY, "60000");
+ return jConf;
+ }
+
+ private static void writeToFile(Path input, JobConf jConf, String content, String fileName) throws IOException {
+ try (FileSystem fs = FileSystem.get(jConf);
+ Writer w = new OutputStreamWriter(fs.create(new Path(input, fileName)))) {
+ w.write(content);
+ }
+ System.out.println("Job Id written to file");
+ }
+
private static void checkAndSleep(String args[]) throws InterruptedException {
if (args.length == 2 && args[0].equals("sleep")) {
long sleepTime = Long.parseLong(args[1]);
http://git-wip-us.apache.org/repos/asf/oozie/blob/05636799/core/src/test/java/org/apache/oozie/action/hadoop/SleepMapperReducerForTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/SleepMapperReducerForTest.java b/core/src/test/java/org/apache/oozie/action/hadoop/SleepMapperReducerForTest.java
new file mode 100644
index 0000000..846441f
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/SleepMapperReducerForTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+public class SleepMapperReducerForTest implements Mapper<Object, Object, Object, Object>, Reducer<Object, Object, Object, Object> {
+ private long sleepTimeMillis = 20_000l;
+ public static final String SLEEP_TIME_MILLIS_KEY = "oozie.test.sleep.time.millis";
+
+ public static void main(String[] args) {
+ System.out.println("hello!");
+ }
+
+ public void close() throws IOException {
+ }
+
+ @Override
+ public void map(Object key, Object value, OutputCollector<Object, Object> collector, Reporter reporter) throws IOException {
+ sleepUninterrupted(sleepTimeMillis, "Mapper sleeping for " + sleepTimeMillis + " millis.", "Mapper woke up");
+ }
+
+ @Override
+ public void reduce(Object key, Iterator<Object> values, OutputCollector<Object, Object> collector, Reporter reporter)
+ throws IOException {
+ sleepUninterrupted(sleepTimeMillis, "Reducer sleeping for " + sleepTimeMillis + " millis.", "Reducer woke up");
+ }
+
+ @Override
+ public void configure(JobConf jobConf) {
+ sleepTimeMillis = jobConf.getLong(SLEEP_TIME_MILLIS_KEY, sleepTimeMillis);
+ System.out.println("Configuring MR to sleep for" + sleepTimeMillis + " millis.");
+
+ }
+
+ private void sleepUninterrupted(long millis, String preSleepMessage, String postSleepMessage) {
+ try {
+ System.out.println(preSleepMessage);
+ Thread.sleep(millis);
+ System.out.println(postSleepMessage);
+ } catch (InterruptedException e) {
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/oozie/blob/05636799/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java b/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java
index e074d48..893405e 100644
--- a/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java
@@ -21,15 +21,23 @@ package org.apache.oozie.action.oozie;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.hadoop.ActionExecutorTestCase;
import org.apache.oozie.action.hadoop.LauncherMainTester;
+import org.apache.oozie.action.hadoop.OozieJobInfo;
import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.wf.KillXCommand;
import org.apache.oozie.command.wf.SuspendXCommand;
import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.service.XLogService;
@@ -37,8 +45,18 @@ import org.apache.oozie.util.XConfiguration;
import org.apache.oozie.util.XmlUtils;
import org.jdom.Element;
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.StringReader;
+import java.io.Writer;
import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Properties;
public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase {
@@ -483,45 +501,12 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase {
public void testSubWorkflowSuspend() throws Exception {
try {
- Path subWorkflowAppPath = getFsTestCaseDir();
- FileSystem fs = getFileSystem();
- Path subWorkflowPath = new Path(subWorkflowAppPath, "workflow.xml");
- Writer writer = new OutputStreamWriter(fs.create(subWorkflowPath));
- writer.write(getLazyWorkflow());
- writer.close();
-
- String workflowUri = getTestCaseFileUri("workflow.xml");
- String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.4\" name=\"workflow\">" +
- "<start to=\"subwf\"/>" +
- "<action name=\"subwf\">" +
- " <sub-workflow xmlns='uri:oozie:workflow:0.4'>" +
- " <app-path>" + subWorkflowAppPath.toString() + "</app-path>" +
- " </sub-workflow>" +
- " <ok to=\"end\"/>" +
- " <error to=\"fail\"/>" +
- "</action>" +
- "<kill name=\"fail\">" +
- " <message>Sub workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>" +
- "</kill>" +
- "<end name=\"end\"/>" +
- "</workflow-app>";
-
- writeToFile(appXml, workflowUri);
+ String workflowUri = createSubWorkflowWithLazyAction(true);
LocalOozie.start();
final OozieClient wfClient = LocalOozie.getClient();
- Properties conf = wfClient.createConfiguration();
- conf.setProperty(OozieClient.APP_PATH, workflowUri);
- conf.setProperty(OozieClient.USER_NAME, getTestUser());
- conf.setProperty("appName", "var-app-name");
- final String jobId = wfClient.submit(conf);
- wfClient.start(jobId);
+ final String jobId = submitWorkflow(workflowUri, wfClient);
- waitFor(JOB_TIMEOUT, new Predicate() {
- public boolean evaluate() throws Exception {
- return (wfClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) &&
- (wfClient.getJobInfo(jobId).getActions().get(1).getStatus() == WorkflowAction.Status.RUNNING);
- }
- });
+ waitForSubWFtoStart(wfClient, jobId);
WorkflowJob wf = wfClient.getJobInfo(jobId);
// Suspending subworkflow
new SuspendXCommand(wf.getActions().get(1).getExternalId()).call();
@@ -536,6 +521,66 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase {
}
+ public void testSubWorkflowKillExternalChild() throws Exception {
+ try {
+ LocalOozie.start();
+ final String workflowUri = createSubWorkflowWithLazyAction(true);
+ final OozieClient wfClient = LocalOozie.getClient();
+ final String jobId = submitWorkflow(workflowUri, wfClient);
+ final Configuration conf = Services.get().get(HadoopAccessorService.class).createConfiguration(getJobTrackerUri());
+
+ waitForSubWFtoStart(wfClient, jobId);
+
+ final ApplicationId externalChildJobId = getChildMRJobApplicationId(conf);
+ killWorkflow(jobId);
+ waitUntilYarnAppKilledAndAssertSuccess(externalChildJobId.toString());
+ } finally {
+ LocalOozie.stop();
+ }
+
+ }
+
+ private void killWorkflow(String jobId) throws CommandException {
+ new KillXCommand(jobId).call();
+ }
+
+ private ApplicationId getChildMRJobApplicationId(Configuration conf) throws IOException {
+ final List<ApplicationId> applicationIdList = new ArrayList<>();
+ final Path inputDir = new Path(getFsTestCaseDir(), "input");
+ final Path wfIDFile = new Path(inputDir, LauncherMainTester.JOB_ID_FILE_NAME);
+ final FileSystem fs = FileSystem.get(conf);
+
+ // wait until we have the running child MR job's ID from HDFS
+ waitFor(JOB_TIMEOUT, new ApplicationIdExistsPredicate(fs, wfIDFile));
+
+ try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(wfIDFile)))) {
+ String line = reader.readLine();
+ JobID.forName(line);
+ String jobID = line;
+ String appID = jobID.replace("job", "application");
+ ApplicationId id = ConverterUtils.toApplicationId(appID);
+ applicationIdList.add(id);
+ }
+
+ assertTrue("Application ID should've been found. No external Child ID was found in " + wfIDFile.toString(),
+ applicationIdList.size() == 1);
+ return applicationIdList.get(0);
+ }
+
+ private void waitForSubWFtoStart(OozieClient wfClient, String jobId) {
+ waitFor(JOB_TIMEOUT, new SubWorkflowActionRunningPredicate(wfClient,jobId));
+ }
+
+ private String submitWorkflow(String workflowUri, OozieClient wfClient) throws OozieClientException {
+ Properties conf = wfClient.createConfiguration();
+ conf.setProperty(OozieClient.APP_PATH, workflowUri);
+ conf.setProperty(OozieClient.USER_NAME, getTestUser());
+ conf.setProperty("appName", "var-app-name");
+ final String jobId = wfClient.submit(conf);
+ wfClient.start(jobId);
+ return jobId;
+ }
+
private void writeToFile(String appXml, String appPath) throws IOException {
// TODO Auto-generated method stub
File wf = new File(URI.create(appPath));
@@ -554,16 +599,11 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase {
}
}
- public String getLazyWorkflow() {
+ public String getLazyWorkflow(boolean launchMRAction) {
return "<workflow-app xmlns='uri:oozie:workflow:0.4' name='app'>" +
"<start to='java' />" +
" <action name='java'>" +
- "<java>" +
- "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
- "<name-node>" + getNameNodeUri() + "</name-node>" +
- "<main-class>" + JavaSleepAction.class.getName() + "</main-class>" +
- "<arg>exit0</arg>" +
- "</java>"
+ getAction(launchMRAction)
+ "<ok to='end' />"
+ "<error to='fail' />"
+ "</action>"
@@ -574,32 +614,29 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase {
+ "</workflow-app>";
}
+ private String getAction(boolean launchMRAction) {
+ Path inputDir = new Path(getFsTestCaseDir(), "input");
+ Path outputDir = new Path(getFsTestCaseDir(), "output");
+ String javaActionXml = "<java>" +
+ "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+ "<name-node>" + getNameNodeUri() + "</name-node>" +
+ "<main-class>" + JavaSleepAction.class.getName()+ "</main-class>" +
+ "</java>";
+ String javaWithMRActionXml = "<java>" +
+ "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+ "<name-node>" + getNameNodeUri() + "</name-node>" +
+ "<main-class>" + LauncherMainTester.class.getName()+ "</main-class>" +
+ "<arg>javamapreduce</arg>" +
+ "<arg>"+inputDir.toString()+"</arg>" +
+ "<arg>"+outputDir.toString()+"</arg>" +
+ "</java>";
+ String actionXml = launchMRAction ? javaWithMRActionXml : javaActionXml;
+ return actionXml;
+ }
+
public void testSubWorkflowRerun() throws Exception {
try {
- Path subWorkflowAppPath = getFsTestCaseDir();
- FileSystem fs = getFileSystem();
- Path subWorkflowPath = new Path(subWorkflowAppPath, "workflow.xml");
- Writer writer = new OutputStreamWriter(fs.create(subWorkflowPath));
- writer.write(getLazyWorkflow());
- writer.close();
-
- String workflowUri = getTestCaseFileUri("workflow.xml");
- String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.4\" name=\"workflow\">" +
- "<start to=\"subwf\"/>" +
- "<action name=\"subwf\">" +
- " <sub-workflow xmlns='uri:oozie:workflow:0.4'>" +
- " <app-path>" + subWorkflowAppPath.toString() + "</app-path>" +
- " </sub-workflow>" +
- " <ok to=\"end\"/>" +
- " <error to=\"fail\"/>" +
- "</action>" +
- "<kill name=\"fail\">" +
- " <message>Sub workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>" +
- "</kill>" +
- "<end name=\"end\"/>" +
- "</workflow-app>";
-
- writeToFile(appXml, workflowUri);
+ String workflowUri = createSubWorkflowWithLazyAction(false);
LocalOozie.start();
final OozieClient wfClient = LocalOozie.getClient();
Properties conf = wfClient.createConfiguration();
@@ -609,12 +646,7 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase {
final String jobId = wfClient.submit(conf);
wfClient.start(jobId);
- waitFor(JOB_TIMEOUT, new Predicate() {
- public boolean evaluate() throws Exception {
- return (wfClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) &&
- (wfClient.getJobInfo(jobId).getActions().get(1).getStatus() == WorkflowAction.Status.RUNNING);
- }
- });
+ waitForSubWFtoStart(wfClient, jobId);
String subWorkflowExternalId = wfClient.getJobInfo(jobId).getActions().get(1).getExternalId();
wfClient.kill(wfClient.getJobInfo(jobId).getActions().get(1).getExternalId());
@@ -647,6 +679,34 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase {
}
+ private String createSubWorkflowWithLazyAction(boolean launchMRAction) throws IOException {
+ Path subWorkflowAppPath = getFsTestCaseDir();
+ FileSystem fs = getFileSystem();
+ Path subWorkflowPath = new Path(subWorkflowAppPath, "workflow.xml");
+ try (Writer writer = new OutputStreamWriter(fs.create(subWorkflowPath))) {
+ writer.write(getLazyWorkflow(launchMRAction));
+ }
+
+ String workflowUri = getTestCaseFileUri("workflow.xml");
+ String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:1.0\" name=\"workflow\">" +
+ "<start to=\"subwf\"/>" +
+ "<action name=\"subwf\">" +
+ " <sub-workflow xmlns='uri:oozie:workflow:1.0'>" +
+ " <app-path>" + subWorkflowAppPath.toString() + "</app-path>" +
+ " </sub-workflow>" +
+ " <ok to=\"end\"/>" +
+ " <error to=\"fail\"/>" +
+ "</action>" +
+ "<kill name=\"fail\">" +
+ " <message>Sub workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>" +
+ "</kill>" +
+ "<end name=\"end\"/>" +
+ "</workflow-app>";
+
+ writeToFile(appXml, workflowUri);
+ return workflowUri;
+ }
+
public void testParentGlobalConf() throws Exception {
try {
Path subWorkflowAppPath = createSubWorkflowXml();
@@ -654,12 +714,7 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase {
String workflowUri = createTestWorkflowXml(subWorkflowAppPath);
LocalOozie.start();
final OozieClient wfClient = LocalOozie.getClient();
- Properties conf = wfClient.createConfiguration();
- conf.setProperty(OozieClient.APP_PATH, workflowUri);
- conf.setProperty(OozieClient.USER_NAME, getTestUser());
- conf.setProperty("appName", "var-app-name");
- final String jobId = wfClient.submit(conf);
- wfClient.start(jobId);
+ final String jobId = submitWorkflow(workflowUri, wfClient);
waitFor(JOB_TIMEOUT, new Predicate() {
public boolean evaluate() throws Exception {
@@ -870,4 +925,38 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase {
+ "<end name='end' />"
+ "</workflow-app>";
}
+
+ private static class ApplicationIdExistsPredicate implements Predicate {
+
+ private final FileSystem fs;
+ private final Path wfIDFile;
+
+ public ApplicationIdExistsPredicate(FileSystem fs, Path wfIDFile) {
+ this.fs = fs;
+ this.wfIDFile = wfIDFile;
+ }
+
+ @Override
+ public boolean evaluate() throws Exception {
+ return fs.exists(wfIDFile) && fs.getFileStatus(wfIDFile).getLen() > 0;
+ }
+ }
+
+ private static class SubWorkflowActionRunningPredicate implements Predicate {
+ private final OozieClient wfClient;
+ private final String jobId;
+
+ public SubWorkflowActionRunningPredicate(OozieClient wfClient, String jobId) {
+ this.wfClient = wfClient;
+ this.jobId = jobId;
+ }
+
+ @Override
+ public boolean evaluate() throws Exception {
+ boolean isSubWfRunning = wfClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING;
+ boolean isSubWfActionRunning = wfClient.getJobInfo(jobId)
+ .getActions().get(1).getStatus() == WorkflowAction.Status.RUNNING;
+ return isSubWfRunning && isSubWfActionRunning;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/05636799/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index f04eeb2..48dbceb 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 5.1.0 release (trunk - unreleased)
+OOZIE-3193 Applications are not killed when submitted via subworkflow (kmarton via gezapeti, andras.piros)
OOZIE-3310 SQL error during /v2/sla filtering (asalamon74 via andras.piros)
OOZIE-2942 [examples] Fix Findbugs warnings (Jan Hentschel, kmarton via andras.piros)
OOZIE-2718 Improve -dryrun for bundles (zhengxb2005, asalamon74 via andras.piros)