You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ge...@apache.org on 2018/08/01 21:10:01 UTC

oozie git commit: OOZIE-3193 Applications are not killed when submitted via subworkflow (kmarton via gezapeti, andras.piros)

Repository: oozie
Updated Branches:
  refs/heads/master 6c01dc7ff -> 05636799e


OOZIE-3193 Applications are not killed when submitted via subworkflow (kmarton via gezapeti, andras.piros)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/05636799
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/05636799
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/05636799

Branch: refs/heads/master
Commit: 05636799ea12c348474205fb7379d70ab5ef9395
Parents: 6c01dc7
Author: Gezapeti Cseh <ge...@apache.org>
Authored: Wed Aug 1 23:09:52 2018 +0200
Committer: Gezapeti Cseh <ge...@apache.org>
Committed: Wed Aug 1 23:09:52 2018 +0200

----------------------------------------------------------------------
 .../org/apache/oozie/action/ActionExecutor.java |   2 +-
 .../oozie/action/hadoop/JavaActionExecutor.java |   6 +-
 .../oozie/action/hadoop/LauncherMainTester.java |  64 ++++-
 .../hadoop/SleepMapperReducerForTest.java       |  68 +++++
 .../oozie/TestSubWorkflowActionExecutor.java    | 249 +++++++++++++------
 release-log.txt                                 |   1 +
 6 files changed, 307 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/05636799/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/ActionExecutor.java b/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
index 71bd36a..1770b97 100644
--- a/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/ActionExecutor.java
@@ -603,7 +603,7 @@ public abstract class ActionExecutor {
      * @return the action yarn tag
      */
     public static String getActionYarnTag(Configuration conf, WorkflowJob wfJob, WorkflowAction action) {
-        if (conf.get(OOZIE_ACTION_YARN_TAG) != null) {
+        if (conf != null && conf.get(OOZIE_ACTION_YARN_TAG) != null) {
             return conf.get(OOZIE_ACTION_YARN_TAG) + "@" + action.getName();
         }
         else if (wfJob.getParentId() != null) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/05636799/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index a1a9671..8f0f244 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -1777,7 +1777,7 @@ public class JavaActionExecutor extends ActionExecutor {
         try {
             Element actionXml = XmlUtils.parseXml(action.getConf());
             final Configuration jobConf = createBaseHadoopConf(context, actionXml);
-            String launcherTag = LauncherHelper.getActionYarnTag(jobConf, context.getWorkflow().getParentId(), action);
+            String launcherTag = getActionYarnTag(context, action);
             jobConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, LauncherHelper.getTag(launcherTag));
             yarnClient = createYarnClient(context, jobConf);
             if(action.getExternalId() != null) {
@@ -1828,6 +1828,10 @@ public class JavaActionExecutor extends ActionExecutor {
         }
     }
 
+    private String getActionYarnTag(Context context, WorkflowAction action) {
+        return LauncherHelper.getActionYarnTag(context.getProtoActionConf(), context.getWorkflow().getParentId(), action);
+    }
+
     private static Set<String> FINAL_STATUS = new HashSet<String>();
 
     static {

http://git-wip-us.apache.org/repos/asf/oozie/blob/05636799/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java b/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java
index fdca570..ada7005 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java
@@ -18,14 +18,31 @@
 
 package org.apache.oozie.action.hadoop;
 
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TextInputFormat;
+import org.apache.hadoop.mapred.TextOutputFormat;
+
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.io.Writer;
 import java.util.Properties;
 
 public class LauncherMainTester {
 
+    public static final String JOB_ID_FILE_NAME = "jobID.txt";
+
     public static void main(String[] args) throws Throwable {
         if (args.length == 0) {
             System.out.println("Hello World!");
@@ -84,10 +101,55 @@ public class LauncherMainTester {
                 sm.checkPermission(null, sm.getSecurityContext());
             }
         }
-
+        if(args.length == 3) {
+            if(args[0].equals("javamapreduce")) {
+                executeJavaMapReduce(args);
+            }
+        }
         checkAndSleep(args);
     }
 
+    private static void executeJavaMapReduce(String[] args) throws IOException, InterruptedException {
+        JobConf jConf = createSleepMapperReducerJobConf();
+        final Path input = new Path(args[1]);
+        FileInputFormat.setInputPaths(jConf, input);
+        FileOutputFormat.setOutputPath(jConf, new Path(args[2]));
+        writeToFile(input, jConf, "dummy\n", "data.txt");
+        JobClient jc = new JobClient(jConf);
+        System.out.println("Submitting MR job");
+        RunningJob job = jc.submitJob(jConf);
+        System.out.println("Submitted job " + job.getID().toString());
+        writeToFile(input, jConf, job.getID().toString(), JOB_ID_FILE_NAME);
+        job.waitForCompletion();
+        jc.monitorAndPrintJob(jConf, job);
+        if (job.getJobState() != JobStatus.SUCCEEDED) {
+            System.err.println(job.getJobState() + " job state instead of" + JobStatus.SUCCEEDED);
+            System.exit(-1);
+        }
+    }
+
+    private static JobConf createSleepMapperReducerJobConf() {
+        JobConf jConf = new JobConf(true);
+        jConf.addResource(new Path("file:///", System.getProperty("oozie.action.conf.xml")));
+        jConf.setMapperClass(SleepMapperReducerForTest.class);
+        jConf.setReducerClass(SleepMapperReducerForTest.class);
+        jConf.setOutputKeyClass(Text.class);
+        jConf.setOutputValueClass(IntWritable.class);
+        jConf.setInputFormat(TextInputFormat.class);
+        jConf.setOutputFormat(TextOutputFormat.class);
+        jConf.setNumReduceTasks(1);
+        jConf.set(SleepMapperReducerForTest.SLEEP_TIME_MILLIS_KEY, "60000");
+        return jConf;
+    }
+
+    private static void writeToFile(Path input, JobConf jConf, String content, String fileName) throws IOException {
+        try (FileSystem fs = FileSystem.get(jConf);
+              Writer w = new OutputStreamWriter(fs.create(new Path(input, fileName)))) {
+            w.write(content);
+        }
+        System.out.println("Job Id written to file");
+    }
+
     private static void checkAndSleep(String args[]) throws InterruptedException {
         if (args.length == 2 && args[0].equals("sleep")) {
             long sleepTime = Long.parseLong(args[1]);

http://git-wip-us.apache.org/repos/asf/oozie/blob/05636799/core/src/test/java/org/apache/oozie/action/hadoop/SleepMapperReducerForTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/SleepMapperReducerForTest.java b/core/src/test/java/org/apache/oozie/action/hadoop/SleepMapperReducerForTest.java
new file mode 100644
index 0000000..846441f
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/SleepMapperReducerForTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Mapper;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+public class SleepMapperReducerForTest implements Mapper<Object, Object, Object, Object>, Reducer<Object, Object, Object, Object> {
+    private long sleepTimeMillis = 20_000l;
+    public static final String SLEEP_TIME_MILLIS_KEY = "oozie.test.sleep.time.millis";
+
+    public static void main(String[] args) {
+        System.out.println("hello!");
+    }
+
+    public void close() throws IOException {
+    }
+
+    @Override
+    public void map(Object key, Object value, OutputCollector<Object, Object> collector, Reporter reporter) throws IOException {
+        sleepUninterrupted(sleepTimeMillis, "Mapper sleeping for " + sleepTimeMillis + " millis.", "Mapper woke up");
+    }
+
+    @Override
+    public void reduce(Object key, Iterator<Object> values, OutputCollector<Object, Object> collector, Reporter reporter)
+            throws IOException {
+        sleepUninterrupted(sleepTimeMillis, "Reducer sleeping for " + sleepTimeMillis + " millis.", "Reducer woke up");
+    }
+
+    @Override
+    public void configure(JobConf jobConf) {
+        sleepTimeMillis = jobConf.getLong(SLEEP_TIME_MILLIS_KEY, sleepTimeMillis);
+        System.out.println("Configuring MR to sleep for" + sleepTimeMillis + " millis.");
+
+    }
+
+    private void sleepUninterrupted(long millis, String preSleepMessage, String postSleepMessage) {
+        try {
+            System.out.println(preSleepMessage);
+            Thread.sleep(millis);
+            System.out.println(postSleepMessage);
+        } catch (InterruptedException e) {
+        }
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/oozie/blob/05636799/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java b/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java
index e074d48..893405e 100644
--- a/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/oozie/TestSubWorkflowActionExecutor.java
@@ -21,15 +21,23 @@ package org.apache.oozie.action.oozie;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.action.hadoop.ActionExecutorTestCase;
 import org.apache.oozie.action.hadoop.LauncherMainTester;
+import org.apache.oozie.action.hadoop.OozieJobInfo;
 import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.OozieClientException;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
+import org.apache.oozie.command.CommandException;
+import org.apache.oozie.command.wf.KillXCommand;
 import org.apache.oozie.command.wf.SuspendXCommand;
 import org.apache.oozie.local.LocalOozie;
+import org.apache.oozie.service.HadoopAccessorService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.WorkflowAppService;
 import org.apache.oozie.service.XLogService;
@@ -37,8 +45,18 @@ import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.util.XmlUtils;
 import org.jdom.Element;
 
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.io.PrintWriter;
+import java.io.StringReader;
+import java.io.Writer;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Properties;
 
 public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase {
@@ -483,45 +501,12 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase {
 
     public void testSubWorkflowSuspend() throws Exception {
         try {
-            Path subWorkflowAppPath = getFsTestCaseDir();
-            FileSystem fs = getFileSystem();
-            Path subWorkflowPath = new Path(subWorkflowAppPath, "workflow.xml");
-            Writer writer = new OutputStreamWriter(fs.create(subWorkflowPath));
-            writer.write(getLazyWorkflow());
-            writer.close();
-
-            String workflowUri = getTestCaseFileUri("workflow.xml");
-            String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.4\" name=\"workflow\">" +
-                    "<start to=\"subwf\"/>" +
-                    "<action name=\"subwf\">" +
-                    "     <sub-workflow xmlns='uri:oozie:workflow:0.4'>" +
-                    "          <app-path>" + subWorkflowAppPath.toString() + "</app-path>" +
-                    "     </sub-workflow>" +
-                    "     <ok to=\"end\"/>" +
-                    "     <error to=\"fail\"/>" +
-                    "</action>" +
-                    "<kill name=\"fail\">" +
-                    "     <message>Sub workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>" +
-                    "</kill>" +
-                    "<end name=\"end\"/>" +
-                    "</workflow-app>";
-
-            writeToFile(appXml, workflowUri);
+            String workflowUri = createSubWorkflowWithLazyAction(true);
             LocalOozie.start();
             final OozieClient wfClient = LocalOozie.getClient();
-            Properties conf = wfClient.createConfiguration();
-            conf.setProperty(OozieClient.APP_PATH, workflowUri);
-            conf.setProperty(OozieClient.USER_NAME, getTestUser());
-            conf.setProperty("appName", "var-app-name");
-            final String jobId = wfClient.submit(conf);
-            wfClient.start(jobId);
+            final String jobId = submitWorkflow(workflowUri, wfClient);
 
-            waitFor(JOB_TIMEOUT, new Predicate() {
-                public boolean evaluate() throws Exception {
-                    return (wfClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) &&
-                            (wfClient.getJobInfo(jobId).getActions().get(1).getStatus() == WorkflowAction.Status.RUNNING);
-                }
-            });
+            waitForSubWFtoStart(wfClient, jobId);
             WorkflowJob wf = wfClient.getJobInfo(jobId);
             // Suspending subworkflow
             new SuspendXCommand(wf.getActions().get(1).getExternalId()).call();
@@ -536,6 +521,66 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase {
 
     }
 
+    public void testSubWorkflowKillExternalChild() throws Exception {
+        try {
+            LocalOozie.start();
+            final String workflowUri = createSubWorkflowWithLazyAction(true);
+            final OozieClient wfClient = LocalOozie.getClient();
+            final String jobId = submitWorkflow(workflowUri, wfClient);
+            final Configuration conf = Services.get().get(HadoopAccessorService.class).createConfiguration(getJobTrackerUri());
+
+            waitForSubWFtoStart(wfClient, jobId);
+
+            final ApplicationId externalChildJobId = getChildMRJobApplicationId(conf);
+            killWorkflow(jobId);
+            waitUntilYarnAppKilledAndAssertSuccess(externalChildJobId.toString());
+        } finally {
+            LocalOozie.stop();
+        }
+
+    }
+
+    private void killWorkflow(String jobId) throws CommandException {
+        new KillXCommand(jobId).call();
+    }
+
+    private ApplicationId getChildMRJobApplicationId(Configuration conf) throws IOException {
+        final List<ApplicationId> applicationIdList  = new ArrayList<>();
+        final Path inputDir = new Path(getFsTestCaseDir(), "input");
+        final Path wfIDFile = new Path(inputDir, LauncherMainTester.JOB_ID_FILE_NAME);
+        final FileSystem fs = FileSystem.get(conf);
+
+        // wait until we have the running child MR job's ID from HDFS
+        waitFor(JOB_TIMEOUT, new ApplicationIdExistsPredicate(fs, wfIDFile));
+
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(wfIDFile)))) {
+            String line = reader.readLine();
+            JobID.forName(line);
+            String jobID = line;
+            String appID = jobID.replace("job", "application");
+            ApplicationId id = ConverterUtils.toApplicationId(appID);
+            applicationIdList.add(id);
+        }
+
+        assertTrue("Application ID should've been found. No external Child ID was found in " + wfIDFile.toString(),
+                applicationIdList.size() == 1);
+        return applicationIdList.get(0);
+    }
+
+    private void waitForSubWFtoStart(OozieClient wfClient, String jobId) {
+        waitFor(JOB_TIMEOUT, new SubWorkflowActionRunningPredicate(wfClient,jobId));
+    }
+
+    private String submitWorkflow(String workflowUri, OozieClient wfClient) throws OozieClientException {
+        Properties conf = wfClient.createConfiguration();
+        conf.setProperty(OozieClient.APP_PATH, workflowUri);
+        conf.setProperty(OozieClient.USER_NAME, getTestUser());
+        conf.setProperty("appName", "var-app-name");
+        final String jobId = wfClient.submit(conf);
+        wfClient.start(jobId);
+        return jobId;
+    }
+
     private void writeToFile(String appXml, String appPath) throws IOException {
         // TODO Auto-generated method stub
         File wf = new File(URI.create(appPath));
@@ -554,16 +599,11 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase {
         }
     }
 
-    public String getLazyWorkflow() {
+    public String getLazyWorkflow(boolean launchMRAction) {
         return  "<workflow-app xmlns='uri:oozie:workflow:0.4' name='app'>" +
                 "<start to='java' />" +
                 "       <action name='java'>" +
-                "<java>" +
-                "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
-                "<name-node>" + getNameNodeUri() + "</name-node>" +
-                "<main-class>" + JavaSleepAction.class.getName() + "</main-class>" +
-                "<arg>exit0</arg>" +
-                "</java>"
+                getAction(launchMRAction)
                 + "<ok to='end' />"
                 + "<error to='fail' />"
                 + "</action>"
@@ -574,32 +614,29 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase {
                 + "</workflow-app>";
     }
 
+    private String getAction(boolean launchMRAction) {
+        Path inputDir = new Path(getFsTestCaseDir(), "input");
+        Path outputDir = new Path(getFsTestCaseDir(), "output");
+        String javaActionXml = "<java>" +
+                "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+                "<name-node>" + getNameNodeUri() + "</name-node>" +
+                "<main-class>" + JavaSleepAction.class.getName()+ "</main-class>" +
+                "</java>";
+        String javaWithMRActionXml = "<java>" +
+                "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+                "<name-node>" + getNameNodeUri() + "</name-node>" +
+                "<main-class>" + LauncherMainTester.class.getName()+ "</main-class>" +
+                "<arg>javamapreduce</arg>" +
+                "<arg>"+inputDir.toString()+"</arg>" +
+                "<arg>"+outputDir.toString()+"</arg>" +
+                "</java>";
+        String actionXml = launchMRAction ? javaWithMRActionXml : javaActionXml;
+        return actionXml;
+    }
+
     public void testSubWorkflowRerun() throws Exception {
         try {
-            Path subWorkflowAppPath = getFsTestCaseDir();
-            FileSystem fs = getFileSystem();
-            Path subWorkflowPath = new Path(subWorkflowAppPath, "workflow.xml");
-            Writer writer = new OutputStreamWriter(fs.create(subWorkflowPath));
-            writer.write(getLazyWorkflow());
-            writer.close();
-
-            String workflowUri = getTestCaseFileUri("workflow.xml");
-            String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:0.4\" name=\"workflow\">" +
-                    "<start to=\"subwf\"/>" +
-                    "<action name=\"subwf\">" +
-                    "     <sub-workflow xmlns='uri:oozie:workflow:0.4'>" +
-                    "          <app-path>" + subWorkflowAppPath.toString() + "</app-path>" +
-                    "     </sub-workflow>" +
-                    "     <ok to=\"end\"/>" +
-                    "     <error to=\"fail\"/>" +
-                    "</action>" +
-                    "<kill name=\"fail\">" +
-                    "     <message>Sub workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>" +
-                    "</kill>" +
-                    "<end name=\"end\"/>" +
-                    "</workflow-app>";
-
-            writeToFile(appXml, workflowUri);
+            String workflowUri = createSubWorkflowWithLazyAction(false);
             LocalOozie.start();
             final OozieClient wfClient = LocalOozie.getClient();
             Properties conf = wfClient.createConfiguration();
@@ -609,12 +646,7 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase {
             final String jobId = wfClient.submit(conf);
             wfClient.start(jobId);
 
-            waitFor(JOB_TIMEOUT, new Predicate() {
-                public boolean evaluate() throws Exception {
-                    return (wfClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING) &&
-                            (wfClient.getJobInfo(jobId).getActions().get(1).getStatus() == WorkflowAction.Status.RUNNING);
-                }
-            });
+            waitForSubWFtoStart(wfClient, jobId);
 
             String subWorkflowExternalId = wfClient.getJobInfo(jobId).getActions().get(1).getExternalId();
             wfClient.kill(wfClient.getJobInfo(jobId).getActions().get(1).getExternalId());
@@ -647,6 +679,34 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase {
 
     }
 
+    private String createSubWorkflowWithLazyAction(boolean launchMRAction) throws IOException {
+        Path subWorkflowAppPath = getFsTestCaseDir();
+        FileSystem fs = getFileSystem();
+        Path subWorkflowPath = new Path(subWorkflowAppPath, "workflow.xml");
+        try (Writer writer = new OutputStreamWriter(fs.create(subWorkflowPath))) {
+            writer.write(getLazyWorkflow(launchMRAction));
+        }
+
+        String workflowUri = getTestCaseFileUri("workflow.xml");
+        String appXml = "<workflow-app xmlns=\"uri:oozie:workflow:1.0\" name=\"workflow\">" +
+                "<start to=\"subwf\"/>" +
+                "<action name=\"subwf\">" +
+                "     <sub-workflow xmlns='uri:oozie:workflow:1.0'>" +
+                "          <app-path>" + subWorkflowAppPath.toString() + "</app-path>" +
+                "     </sub-workflow>" +
+                "     <ok to=\"end\"/>" +
+                "     <error to=\"fail\"/>" +
+                "</action>" +
+                "<kill name=\"fail\">" +
+                "     <message>Sub workflow failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>" +
+                "</kill>" +
+                "<end name=\"end\"/>" +
+                "</workflow-app>";
+
+        writeToFile(appXml, workflowUri);
+        return workflowUri;
+    }
+
     public void testParentGlobalConf() throws Exception {
         try {
             Path subWorkflowAppPath = createSubWorkflowXml();
@@ -654,12 +714,7 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase {
             String workflowUri = createTestWorkflowXml(subWorkflowAppPath);
             LocalOozie.start();
             final OozieClient wfClient = LocalOozie.getClient();
-            Properties conf = wfClient.createConfiguration();
-            conf.setProperty(OozieClient.APP_PATH, workflowUri);
-            conf.setProperty(OozieClient.USER_NAME, getTestUser());
-            conf.setProperty("appName", "var-app-name");
-            final String jobId = wfClient.submit(conf);
-            wfClient.start(jobId);
+            final String jobId = submitWorkflow(workflowUri, wfClient);
 
             waitFor(JOB_TIMEOUT, new Predicate() {
                 public boolean evaluate() throws Exception {
@@ -870,4 +925,38 @@ public class TestSubWorkflowActionExecutor extends ActionExecutorTestCase {
                 + "<end name='end' />"
                 + "</workflow-app>";
     }
+
+    private static class ApplicationIdExistsPredicate implements Predicate {
+
+        private final FileSystem fs;
+        private final Path wfIDFile;
+
+        public ApplicationIdExistsPredicate(FileSystem fs, Path wfIDFile) {
+            this.fs = fs;
+            this.wfIDFile = wfIDFile;
+        }
+
+        @Override
+        public boolean evaluate() throws Exception {
+            return fs.exists(wfIDFile) && fs.getFileStatus(wfIDFile).getLen() > 0;
+        }
+    }
+
+    private static class SubWorkflowActionRunningPredicate implements Predicate {
+        private final OozieClient wfClient;
+        private final String jobId;
+
+        public SubWorkflowActionRunningPredicate(OozieClient wfClient, String jobId) {
+            this.wfClient = wfClient;
+            this.jobId = jobId;
+        }
+
+        @Override
+        public boolean evaluate() throws Exception {
+            boolean isSubWfRunning = wfClient.getJobInfo(jobId).getStatus() == WorkflowJob.Status.RUNNING;
+            boolean isSubWfActionRunning = wfClient.getJobInfo(jobId)
+                    .getActions().get(1).getStatus() == WorkflowAction.Status.RUNNING;
+            return isSubWfRunning && isSubWfActionRunning;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/05636799/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index f04eeb2..48dbceb 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.1.0 release (trunk - unreleased)
 
+OOZIE-3193 Applications are not killed when submitted via subworkflow (kmarton via gezapeti, andras.piros)
 OOZIE-3310 SQL error during /v2/sla filtering (asalamon74 via andras.piros)
 OOZIE-2942 [examples] Fix Findbugs warnings (Jan Hentschel, kmarton via andras.piros)
 OOZIE-2718 Improve -dryrun for bundles (zhengxb2005, asalamon74 via andras.piros)