You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ge...@apache.org on 2017/05/26 09:27:55 UTC

[03/10] oozie git commit: OOZIE-1770 Create Oozie Application Master for YARN (asasvari, pbacsko, rkanter, gezapeti)

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/hcatalog/pom.xml b/sharelib/hcatalog/pom.xml
index 6eb88ef..fea277f 100644
--- a/sharelib/hcatalog/pom.xml
+++ b/sharelib/hcatalog/pom.xml
@@ -297,18 +297,6 @@
                             <outputFile>${project.build.directory}/classpath</outputFile>
                         </configuration>
                     </execution>
-                    <execution>
-                        <id>create-mrapp-generated-classpath</id>
-                        <phase>generate-test-resources</phase>
-                        <goals>
-                            <goal>build-classpath</goal>
-                        </goals>
-                        <configuration>
-                            <!-- needed to run the unit test for DS to generate the required classpath
-                            that is required in the env of the launch container in the mini mr/yarn cluster -->
-                            <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
-                        </configuration>
-                    </execution>
                 </executions>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/hive/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/hive/pom.xml b/sharelib/hive/pom.xml
index 1331219..7268fa9 100644
--- a/sharelib/hive/pom.xml
+++ b/sharelib/hive/pom.xml
@@ -122,18 +122,10 @@
             <classifier>tests</classifier>
             <scope>test</scope>
         </dependency>
-
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-client</artifactId>
-            <scope>provided</scope>
-        </dependency>
-
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-minicluster</artifactId>
         </dependency>
-
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
@@ -150,8 +142,8 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
-            <groupId>org.apache.oozie</groupId>
-            <artifactId>oozie-hadoop-utils</artifactId>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
             <scope>provided</scope>
         </dependency>
         <dependency>
@@ -184,18 +176,6 @@
                             <outputFile>${project.build.directory}/classpath</outputFile>
                         </configuration>
                     </execution>
-                    <execution>
-                        <id>create-mrapp-generated-classpath</id>
-                        <phase>generate-test-resources</phase>
-                        <goals>
-                            <goal>build-classpath</goal>
-                        </goals>
-                        <configuration>
-                            <!-- needed to run the unit test for DS to generate the required classpath
-                            that is required in the env of the launch container in the mini mr/yarn cluster -->
-                            <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
-                        </configuration>
-                    </execution>
                 </executions>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java
----------------------------------------------------------------------
diff --git a/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java b/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java
index 6a600fa..1f88c85 100644
--- a/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java
+++ b/sharelib/hive/src/main/java/org/apache/oozie/action/hadoop/HiveMain.java
@@ -233,8 +233,11 @@ public class HiveMain extends LauncherMain {
             File localDir = new File("dummy").getAbsoluteFile().getParentFile();
             System.out.println("Current (local) dir = " + localDir.getAbsolutePath());
             System.out.println("------------------------");
-            for (String file : localDir.list()) {
-                System.out.println("  " + file);
+            String[] files = localDir.list();
+            if (files != null) {
+                for (String file : files) {
+                    System.out.println("  " + file);
+                }
             }
             System.out.println("------------------------");
             System.out.println();
@@ -264,7 +267,7 @@ public class HiveMain extends LauncherMain {
         }
 
         // Pass any parameters to Hive via arguments
-        String[] params = MapReduceMain.getStrings(hiveConf, HiveActionExecutor.HIVE_PARAMS);
+        String[] params = ActionUtils.getStrings(hiveConf, HiveActionExecutor.HIVE_PARAMS);
         if (params.length > 0) {
             System.out.println("Parameters:");
             System.out.println("------------------------");
@@ -284,7 +287,7 @@ public class HiveMain extends LauncherMain {
             System.out.println();
         }
 
-        String[] hiveArgs = MapReduceMain.getStrings(hiveConf, HiveActionExecutor.HIVE_ARGS);
+        String[] hiveArgs = ActionUtils.getStrings(hiveConf, HiveActionExecutor.HIVE_ARGS);
         for (String hiveArg : hiveArgs) {
             if (DISALLOWED_HIVE_OPTIONS.contains(hiveArg)) {
                 throw new RuntimeException("Error: Hive argument " + hiveArg + " is not supported");
@@ -298,7 +301,7 @@ public class HiveMain extends LauncherMain {
         }
         System.out.println();
 
-        LauncherMainHadoopUtils.killChildYarnJobs(hiveConf);
+        LauncherMain.killChildYarnJobs(hiveConf);
 
         System.out.println("=================================================================");
         System.out.println();
@@ -309,13 +312,6 @@ public class HiveMain extends LauncherMain {
         try {
             runHive(arguments.toArray(new String[arguments.size()]));
         }
-        catch (SecurityException ex) {
-            if (LauncherSecurityManager.getExitInvoked()) {
-                if (LauncherSecurityManager.getExitCode() != 0) {
-                    throw ex;
-                }
-            }
-        }
         finally {
             System.out.println("\n<<< Invocation of Hive command completed <<<\n");
             writeExternalChildIDs(logFile, HIVE_JOB_IDS_PATTERNS, "Hive");

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java
----------------------------------------------------------------------
diff --git a/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java b/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java
index 12e1e91..71ee641 100644
--- a/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java
+++ b/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveActionExecutor.java
@@ -22,7 +22,6 @@ import java.io.FileInputStream;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
-import java.io.StringReader;
 import java.io.Writer;
 import java.text.MessageFormat;
 import java.util.Arrays;
@@ -32,22 +31,14 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.service.ConfigurationService;
-import org.apache.oozie.service.HadoopAccessorService;
-import org.apache.oozie.service.Services;
 import org.apache.oozie.service.WorkflowAppService;
 import org.apache.oozie.util.ClassUtils;
 import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.util.XmlUtils;
-import org.jdom.Element;
 import org.jdom.Namespace;
 
 public class TestHiveActionExecutor extends ActionExecutorTestCase {
@@ -163,19 +154,13 @@ public class TestHiveActionExecutor extends ActionExecutorTestCase {
             dataWriter.close();
             Context context = createContext(getActionScriptXml());
             Namespace ns = Namespace.getNamespace("uri:oozie:hive-action:0.2");
-            final RunningJob launcherJob = submitAction(context, ns);
-            String launcherId = context.getAction().getExternalId();
-            waitFor(200 * 1000, new Predicate() {
-                public boolean evaluate() throws Exception {
-                    return launcherJob.isComplete();
-                }
-            });
-            assertTrue(launcherJob.isSuccessful());
+            final String launcherId = submitAction(context, ns);
+            waitUntilYarnAppDoneAndAssertSuccess(launcherId);
             Configuration conf = new XConfiguration();
             conf.set("user.name", getTestUser());
-            Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+            Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
                 conf);
-            assertFalse(LauncherMapperHelper.hasIdSwap(actionData));
+            assertFalse(LauncherHelper.hasIdSwap(actionData));
             HiveActionExecutor ae = new HiveActionExecutor();
             ae.check(context, context.getAction());
             assertTrue(launcherId.equals(context.getAction().getExternalId()));
@@ -192,19 +177,13 @@ public class TestHiveActionExecutor extends ActionExecutorTestCase {
         {
             Context context = createContext(getActionQueryXml(hiveScript));
             Namespace ns = Namespace.getNamespace("uri:oozie:hive-action:0.6");
-            final RunningJob launcherJob = submitAction(context, ns);
-            String launcherId = context.getAction().getExternalId();
-            waitFor(200 * 1000, new Predicate() {
-                public boolean evaluate() throws Exception {
-                    return launcherJob.isComplete();
-                }
-            });
-            assertTrue(launcherJob.isSuccessful());
+            final String launcherId = submitAction(context, ns);
+            waitUntilYarnAppDoneAndAssertSuccess(launcherId);
             Configuration conf = new XConfiguration();
             conf.set("user.name", getTestUser());
-            Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+            Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
                 conf);
-            assertFalse(LauncherMapperHelper.hasIdSwap(actionData));
+            assertFalse(LauncherHelper.hasIdSwap(actionData));
             HiveActionExecutor ae = new HiveActionExecutor();
             ae.check(context, context.getAction());
             assertTrue(launcherId.equals(context.getAction().getExternalId()));
@@ -220,7 +199,7 @@ public class TestHiveActionExecutor extends ActionExecutorTestCase {
         }
     }
 
-    private RunningJob submitAction(Context context, Namespace ns) throws Exception {
+    private String submitAction(Context context, Namespace ns) throws Exception {
         HiveActionExecutor ae = new HiveActionExecutor();
 
         WorkflowAction action = context.getAction();
@@ -234,22 +213,9 @@ public class TestHiveActionExecutor extends ActionExecutorTestCase {
         assertNotNull(jobId);
         assertNotNull(jobTracker);
         assertNotNull(consoleUrl);
-        Element e = XmlUtils.parseXml(action.getConf());
-        XConfiguration conf =
-                new XConfiguration(new StringReader(XmlUtils.prettyPrint(e.getChild("configuration", ns)).toString()));
-        conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker", ns));
-        conf.set("fs.default.name", e.getChildTextTrim("name-node", ns));
-        conf.set("user.name", context.getProtoActionConf().get("user.name"));
-        conf.set("group.name", getTestGroup());
 
-        JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
-        XConfiguration.copy(conf, jobConf);
-        String user = jobConf.get("user.name");
-        String group = jobConf.get("group.name");
-        JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
-        final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
-        assertNotNull(runningJob);
-        return runningJob;
+
+        return jobId;
     }
 
     private String copyJar(String targetFile, Class<?> anyContainedClass)

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveMain.java
----------------------------------------------------------------------
diff --git a/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveMain.java b/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveMain.java
index 2ba0da7..bbd6246 100644
--- a/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveMain.java
+++ b/sharelib/hive/src/test/java/org/apache/oozie/action/hadoop/TestHiveMain.java
@@ -110,10 +110,10 @@ public class TestHiveMain extends MainTestCase {
             SharelibUtils.addToDistributedCache("hive", fs, getFsTestCaseDir(), jobConf);
 
             jobConf.set(HiveActionExecutor.HIVE_SCRIPT, script.toString());
-            MapReduceMain.setStrings(jobConf, HiveActionExecutor.HIVE_PARAMS, new String[]{
+            ActionUtils.setStrings(jobConf, HiveActionExecutor.HIVE_PARAMS, new String[]{
                 "IN=" + inputDir.toUri().getPath(),
                 "OUT=" + outputDir.toUri().getPath()});
-            MapReduceMain.setStrings(jobConf, HiveActionExecutor.HIVE_ARGS,
+            ActionUtils.setStrings(jobConf, HiveActionExecutor.HIVE_ARGS,
                 new String[]{ "-v" });
 
             File actionXml = new File(getTestCaseDir(), "action.xml");

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/hive2/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/hive2/pom.xml b/sharelib/hive2/pom.xml
index e81bfbe..be51cd1 100644
--- a/sharelib/hive2/pom.xml
+++ b/sharelib/hive2/pom.xml
@@ -171,11 +171,7 @@
             <classifier>tests</classifier>
             <scope>test</scope>
         </dependency>
-        <dependency>
-            <groupId>org.apache.oozie</groupId>
-            <artifactId>oozie-hadoop-utils</artifactId>
-            <scope>provided</scope>
-        </dependency>
+
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
@@ -221,18 +217,6 @@
                             <outputFile>${project.build.directory}/classpath</outputFile>
                         </configuration>
                     </execution>
-                    <execution>
-                        <id>create-mrapp-generated-classpath</id>
-                        <phase>generate-test-resources</phase>
-                        <goals>
-                            <goal>build-classpath</goal>
-                        </goals>
-                        <configuration>
-                            <!-- needed to run the unit test for DS to generate the required classpath
-                            that is required in the env of the launch container in the mini mr/yarn cluster -->
-                            <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
-                        </configuration>
-                    </execution>
                 </executions>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/hive2/src/main/java/org/apache/oozie/action/hadoop/Hive2Main.java
----------------------------------------------------------------------
diff --git a/sharelib/hive2/src/main/java/org/apache/oozie/action/hadoop/Hive2Main.java b/sharelib/hive2/src/main/java/org/apache/oozie/action/hadoop/Hive2Main.java
index a3a07bd..e626dbb 100644
--- a/sharelib/hive2/src/main/java/org/apache/oozie/action/hadoop/Hive2Main.java
+++ b/sharelib/hive2/src/main/java/org/apache/oozie/action/hadoop/Hive2Main.java
@@ -152,8 +152,11 @@ public class Hive2Main extends LauncherMain {
             File localDir = new File("dummy").getAbsoluteFile().getParentFile();
             System.out.println("Current (local) dir = " + localDir.getAbsolutePath());
             System.out.println("------------------------");
-            for (String file : localDir.list()) {
-                System.out.println("  " + file);
+            String[] files = localDir.list();
+            if (files != null) {
+                for (String file : files) {
+                    System.out.println("  " + file);
+                }
             }
             System.out.println("------------------------");
             System.out.println();
@@ -183,7 +186,7 @@ public class Hive2Main extends LauncherMain {
         }
 
         // Pass any parameters to Beeline via arguments
-        String[] params = MapReduceMain.getStrings(actionConf, Hive2ActionExecutor.HIVE2_PARAMS);
+        String[] params = ActionUtils.getStrings(actionConf, Hive2ActionExecutor.HIVE2_PARAMS);
         if (params.length > 0) {
             System.out.println("Parameters:");
             System.out.println("------------------------");
@@ -208,7 +211,7 @@ public class Hive2Main extends LauncherMain {
         arguments.add("-a");
         arguments.add("delegationToken");
 
-        String[] beelineArgs = MapReduceMain.getStrings(actionConf, Hive2ActionExecutor.HIVE2_ARGS);
+        String[] beelineArgs = ActionUtils.getStrings(actionConf, Hive2ActionExecutor.HIVE2_ARGS);
         for (String beelineArg : beelineArgs) {
             if (DISALLOWED_BEELINE_OPTIONS.contains(beelineArg)) {
                 throw new RuntimeException("Error: Beeline argument " + beelineArg + " is not supported");
@@ -233,7 +236,7 @@ public class Hive2Main extends LauncherMain {
         }
         System.out.println();
 
-        LauncherMainHadoopUtils.killChildYarnJobs(actionConf);
+        LauncherMain.killChildYarnJobs(actionConf);
 
         System.out.println("=================================================================");
         System.out.println();
@@ -244,13 +247,6 @@ public class Hive2Main extends LauncherMain {
         try {
             runBeeline(arguments.toArray(new String[arguments.size()]), logFile);
         }
-        catch (SecurityException ex) {
-            if (LauncherSecurityManager.getExitInvoked()) {
-                if (LauncherSecurityManager.getExitCode() != 0) {
-                    throw ex;
-                }
-            }
-        }
         finally {
             System.out.println("\n<<< Invocation of Beeline command completed <<<\n");
             writeExternalChildIDs(logFile, HIVE2_JOB_IDS_PATTERNS, "Beeline");
@@ -269,6 +265,7 @@ public class Hive2Main extends LauncherMain {
         BeeLine beeLine = new BeeLine();
         beeLine.setErrorStream(new PrintStream(new TeeOutputStream(System.err, new FileOutputStream(logFile))));
         int status = beeLine.begin(args, null);
+        beeLine.close();
         if (status != 0) {
             System.exit(status);
         }

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/hive2/src/test/java/org/apache/oozie/action/hadoop/TestHive2ActionExecutor.java
----------------------------------------------------------------------
diff --git a/sharelib/hive2/src/test/java/org/apache/oozie/action/hadoop/TestHive2ActionExecutor.java b/sharelib/hive2/src/test/java/org/apache/oozie/action/hadoop/TestHive2ActionExecutor.java
index 4818bb6..2127eb0 100644
--- a/sharelib/hive2/src/test/java/org/apache/oozie/action/hadoop/TestHive2ActionExecutor.java
+++ b/sharelib/hive2/src/test/java/org/apache/oozie/action/hadoop/TestHive2ActionExecutor.java
@@ -19,7 +19,6 @@
 package org.apache.oozie.action.hadoop;
 
 import java.io.OutputStreamWriter;
-import java.io.StringReader;
 import java.io.Writer;
 import java.text.MessageFormat;
 import java.util.ArrayList;
@@ -29,15 +28,9 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.client.WorkflowAction;
-import org.apache.oozie.service.HadoopAccessorService;
-import org.apache.oozie.service.Services;
 import org.apache.oozie.service.WorkflowAppService;
 import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.util.XmlUtils;
@@ -69,10 +62,9 @@ public class TestHive2ActionExecutor extends ActionExecutorTestCase {
         setSystemProperty("oozie.service.ActionService.executor.classes", Hive2ActionExecutor.class.getName());
     }
 
-    @SuppressWarnings("unchecked")
     public void testSetupMethodsForScript() throws Exception {
         Hive2ActionExecutor ae = new Hive2ActionExecutor();
-        List<Class> classes = new ArrayList<Class>();
+        List<Class<?>> classes = new ArrayList<>();
         classes.add(Hive2Main.class);
         assertEquals(classes, ae.getLauncherClasses());
 
@@ -110,10 +102,9 @@ public class TestHive2ActionExecutor extends ActionExecutorTestCase {
         assertEquals("--dee", conf.get("oozie.hive2.args.1"));
     }
 
-    @SuppressWarnings("unchecked")
     public void testSetupMethodsForQuery() throws Exception {
         Hive2ActionExecutor ae = new Hive2ActionExecutor();
-        List<Class> classes = new ArrayList<Class>();
+        List<Class<?>> classes = new ArrayList<>();
         classes.add(Hive2Main.class);
         assertEquals(classes, ae.getLauncherClasses());
 
@@ -192,7 +183,6 @@ public class TestHive2ActionExecutor extends ActionExecutorTestCase {
             "<query>" + query + "</query>" + "</hive2>";
     }
 
-    @SuppressWarnings("deprecation")
     public void testHive2Action() throws Exception {
         setupHiveServer2();
         Path inputDir = new Path(getFsTestCaseDir(), INPUT_DIRNAME);
@@ -205,21 +195,14 @@ public class TestHive2ActionExecutor extends ActionExecutorTestCase {
             dataWriter.write(SAMPLE_DATA_TEXT);
             dataWriter.close();
             Context context = createContext(getQueryActionXml(query));
-            final RunningJob launcherJob = submitAction(context,
+            final String launcherId = submitAction(context,
                 Namespace.getNamespace("uri:oozie:hive2-action:0.2"));
-            String launcherId = context.getAction().getExternalId();
-            waitFor(200 * 1000, new Predicate() {
-                @Override
-                public boolean evaluate() throws Exception {
-                    return launcherJob.isComplete();
-                }
-            });
-            assertTrue(launcherJob.isSuccessful());
+            waitUntilYarnAppDoneAndAssertSuccess(launcherId);
             Configuration conf = new XConfiguration();
             conf.set("user.name", getTestUser());
-            Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+            Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
                 conf);
-            assertFalse(LauncherMapperHelper.hasIdSwap(actionData));
+            assertFalse(LauncherHelper.hasIdSwap(actionData));
             Hive2ActionExecutor ae = new Hive2ActionExecutor();
             ae.check(context, context.getAction());
             assertTrue(launcherId.equals(context.getAction().getExternalId()));
@@ -241,21 +224,14 @@ public class TestHive2ActionExecutor extends ActionExecutorTestCase {
             dataWriter.write(SAMPLE_DATA_TEXT);
             dataWriter.close();
             Context context = createContext(getScriptActionXml());
-            final RunningJob launcherJob = submitAction(context,
+            final String launcherId = submitAction(context,
                 Namespace.getNamespace("uri:oozie:hive2-action:0.1"));
-            String launcherId = context.getAction().getExternalId();
-            waitFor(200 * 1000, new Predicate() {
-                @Override
-                public boolean evaluate() throws Exception {
-                    return launcherJob.isComplete();
-                }
-            });
-            assertTrue(launcherJob.isSuccessful());
+            waitUntilYarnAppDoneAndAssertSuccess(launcherId);
             Configuration conf = new XConfiguration();
             conf.set("user.name", getTestUser());
-            Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+            Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
                 conf);
-            assertFalse(LauncherMapperHelper.hasIdSwap(actionData));
+            assertFalse(LauncherHelper.hasIdSwap(actionData));
             Hive2ActionExecutor ae = new Hive2ActionExecutor();
             ae.check(context, context.getAction());
             assertTrue(launcherId.equals(context.getAction().getExternalId()));
@@ -267,35 +243,33 @@ public class TestHive2ActionExecutor extends ActionExecutorTestCase {
             assertTrue(fs.exists(outputDir));
             assertTrue(fs.isDirectory(outputDir));
         }
-        // Negative testcase with incorrect hive-query.
-        {
-            String query = getHive2BadScript(inputDir.toString(), outputDir.toString());
-            Writer dataWriter = new OutputStreamWriter(fs.create(new Path(inputDir, DATA_FILENAME)));
-            dataWriter.write(SAMPLE_DATA_TEXT);
-            dataWriter.close();
-            Context context = createContext(getQueryActionXml(query));
-            final RunningJob launcherJob = submitAction(context, Namespace.getNamespace("uri:oozie:hive2-action:0.2"));
-            String launcherId = context.getAction().getExternalId();
-            waitFor(200 * 1000, new Predicate() {
-                @Override
-                public boolean evaluate() throws Exception {
-                    return launcherJob.isComplete();
-                }
-            });
-            assertTrue(launcherJob.isSuccessful());
-            Configuration conf = new XConfiguration();
-            conf.set("user.name", getTestUser());
-            Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
-                    conf);
-            assertFalse(LauncherMapperHelper.hasIdSwap(actionData));
-            Hive2ActionExecutor ae = new Hive2ActionExecutor();
-            ae.check(context, context.getAction());
-            assertTrue(launcherId.equals(context.getAction().getExternalId()));
-            assertEquals("FAILED/KILLED", context.getAction().getExternalStatus());
-            ae.end(context, context.getAction());
-            assertEquals(WorkflowAction.Status.ERROR, context.getAction().getStatus());
-            assertNull(context.getExternalChildIDs());
-        }
+    }
+
+    public void testHive2ActionFails() throws Exception {
+        setupHiveServer2();
+        Path inputDir = new Path(getFsTestCaseDir(), INPUT_DIRNAME);
+        Path outputDir = new Path(getFsTestCaseDir(), OUTPUT_DIRNAME);
+        FileSystem fs = getFileSystem();
+
+        String query = getHive2BadScript(inputDir.toString(), outputDir.toString());
+        Writer dataWriter = new OutputStreamWriter(fs.create(new Path(inputDir, DATA_FILENAME)));
+        dataWriter.write(SAMPLE_DATA_TEXT);
+        dataWriter.close();
+        Context context = createContext(getQueryActionXml(query));
+        final String launcherId = submitAction(context, Namespace.getNamespace("uri:oozie:hive2-action:0.2"));
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
+        Configuration conf = new XConfiguration();
+        conf.set("user.name", getTestUser());
+        Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
+                conf);
+        assertFalse(LauncherHelper.hasIdSwap(actionData));
+        Hive2ActionExecutor ae = new Hive2ActionExecutor();
+        ae.check(context, context.getAction());
+        assertTrue(launcherId.equals(context.getAction().getExternalId()));
+        assertEquals("FAILED/KILLED", context.getAction().getExternalStatus());
+        ae.end(context, context.getAction());
+        assertEquals(WorkflowAction.Status.ERROR, context.getAction().getStatus());
+        assertNull(context.getExternalChildIDs());
     }
 
     private String getHive2BadScript(String inputPath, String outputPath) {
@@ -311,7 +285,7 @@ public class TestHive2ActionExecutor extends ActionExecutorTestCase {
         return buffer.toString();
     }
 
-    private RunningJob submitAction(Context context, Namespace ns) throws Exception {
+    private String submitAction(Context context, Namespace ns) throws Exception {
         Hive2ActionExecutor ae = new Hive2ActionExecutor();
 
         WorkflowAction action = context.getAction();
@@ -325,21 +299,7 @@ public class TestHive2ActionExecutor extends ActionExecutorTestCase {
         assertNotNull(jobId);
         assertNotNull(jobTracker);
         assertNotNull(consoleUrl);
-        Element e = XmlUtils.parseXml(action.getConf());
-        XConfiguration conf =
-                new XConfiguration(new StringReader(XmlUtils.prettyPrint(e.getChild("configuration", ns)).toString()));
-        conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker", ns));
-        conf.set("fs.default.name", e.getChildTextTrim("name-node", ns));
-        conf.set("user.name", context.getProtoActionConf().get("user.name"));
-        conf.set("group.name", getTestGroup());
-
-        JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
-        XConfiguration.copy(conf, jobConf);
-        String user = jobConf.get("user.name");
-        JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
-        final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
-        assertNotNull(runningJob);
-        return runningJob;
+        return jobId;
     }
 
     private Context createContext(String actionXml) throws Exception {

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/oozie/pom.xml b/sharelib/oozie/pom.xml
index f3ea071..12f5cdd 100644
--- a/sharelib/oozie/pom.xml
+++ b/sharelib/oozie/pom.xml
@@ -61,17 +61,17 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-all</artifactId>
             <scope>test</scope>
         </dependency>
-
-        <dependency>
-            <groupId>org.apache.oozie</groupId>
-            <artifactId>oozie-hadoop-utils</artifactId>
-            <scope>compile</scope>
-        </dependency>
     </dependencies>
 
     <build>
@@ -97,18 +97,6 @@
                             <outputFile>${project.build.directory}/classpath</outputFile>
                         </configuration>
                     </execution>
-                    <execution>
-                        <id>create-mrapp-generated-classpath</id>
-                        <phase>generate-test-resources</phase>
-                        <goals>
-                            <goal>build-classpath</goal>
-                        </goals>
-                        <configuration>
-                            <!-- needed to run the unit test for DS to generate the required classpath
-                            that is required in the env of the launch container in the mini mr/yarn cluster -->
-                            <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
-                        </configuration>
-                    </execution>
                 </executions>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMCallBackHandler.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMCallBackHandler.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMCallBackHandler.java
new file mode 100644
index 0000000..e6c9d04
--- /dev/null
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMCallBackHandler.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+
+/**
+ * Callback handler for the asynchronous AM-RM client that remembers the most recent shutdown request or
+ * asynchronous error in an {@link ErrorHolder}.
+ * <p>
+ * Every method that reads or replaces the stored holder is synchronized, so that LauncherAM can call
+ * {@link #getError()} without racing against the thread that delivers the callbacks.
+ */
+public class AMRMCallBackHandler implements AMRMClientAsync.CallbackHandler {
+    private ErrorHolder errorHolder;
+
+    /** Container completions are ignored. */
+    @Override
+    public void onContainersCompleted(List<ContainerStatus> containerStatuses) {
+        // intentionally empty
+    }
+
+    /** Container allocations are ignored. */
+    @Override
+    public void onContainersAllocated(List<Container> containers) {
+        // intentionally empty
+    }
+
+    /** Records the shutdown request as an error with code 0 and no cause. */
+    @Override
+    public synchronized void onShutdownRequest() {
+        System.out.println("Resource manager requested AM Shutdown");
+        errorHolder = newHolderWithZeroCode();
+        errorHolder.setErrorMessage("ResourceManager requested AM Shutdown");
+    }
+
+    /** Node updates are ignored. */
+    @Override
+    public void onNodesUpdated(List<NodeReport> nodeReports) {
+        // intentionally empty
+    }
+
+    /** Always reports a fixed progress value of one half. */
+    @Override
+    public float getProgress() {
+        return 0.5f;
+    }
+
+    /**
+     * Prints the failure and records it, with code 0, the throwable's message and the throwable itself.
+     *
+     * @param ex the asynchronous failure that was reported to this handler
+     */
+    @Override
+    public synchronized void onError(final Throwable ex) {
+        System.out.println("Received asynchronous error");
+        ex.printStackTrace();
+        errorHolder = newHolderWithZeroCode();
+        errorHolder.setErrorMessage(ex.getMessage());
+        errorHolder.setErrorCause(ex);
+    }
+
+    /**
+     * @return the holder written by the latest {@link #onShutdownRequest()} or {@link #onError(Throwable)}
+     *         call, or {@code null} when neither has been called
+     */
+    public synchronized ErrorHolder getError() {
+        return errorHolder;
+    }
+
+    // Both recording callbacks start from a new holder whose error code is explicitly set to 0.
+    private static ErrorHolder newHolderWithZeroCode() {
+        final ErrorHolder holder = new ErrorHolder();
+        holder.setErrorCode(0);
+        return holder;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java
new file mode 100644
index 0000000..b4cbb4b
--- /dev/null
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+
+/**
+ * Creates the asynchronous AM-RM clients used to talk to the ResourceManager.
+ */
+public class AMRMClientAsyncFactory {
+
+    /**
+     * Creates a client whose callbacks are delivered to a handler instantiated inside this method.
+     * <p>
+     * That handler is not reachable by the caller, so a shutdown request or asynchronous error recorded by it
+     * cannot be read back through {@link AMRMCallBackHandler#getError()}. Callers that need to observe those
+     * events should use {@link #createAMRMClientAsync(int, AMRMCallBackHandler)} instead.
+     *
+     * @param intervalMs the interval, in milliseconds, handed to the asynchronous client
+     * @return the newly created client
+     */
+    public AMRMClientAsync<?> createAMRMClientAsync(int intervalMs) {
+        return createAMRMClientAsync(intervalMs, new AMRMCallBackHandler());
+    }
+
+    /**
+     * Creates a client whose callbacks are delivered to the supplied handler, so the caller can later query
+     * the very same handler instance for errors.
+     *
+     * @param intervalMs the interval, in milliseconds, handed to the asynchronous client
+     * @param callBackHandler the handler to register with the client
+     * @return the newly created client
+     * @throws NullPointerException if {@code callBackHandler} is {@code null}
+     */
+    public AMRMClientAsync<?> createAMRMClientAsync(int intervalMs, AMRMCallBackHandler callBackHandler) {
+        if (callBackHandler == null) {
+            throw new NullPointerException("callBackHandler should not be null");
+        }
+        AMRMClient<?> amRmClient = AMRMClient.createAMRMClient();
+        return AMRMClientAsync.createAMRMClientAsync(amRmClient, intervalMs, callBackHandler);
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ActionUtils.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ActionUtils.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ActionUtils.java
new file mode 100644
index 0000000..3002ad5
--- /dev/null
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ActionUtils.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Static helpers that store a string array in a {@link Configuration} as one entry per element
+ * ({@code key.0}, {@code key.1}, ...) plus a {@code key.size} entry, and read it back.
+ */
+public final class ActionUtils {
+
+    private ActionUtils() {
+        // static helpers only
+    }
+
+    /**
+     * Writes {@code values} under {@code key}; does nothing at all when {@code values} is {@code null}.
+     *
+     * @param conf the configuration to write to
+     * @param key the base key; {@code ".size"} and {@code "." + index} are appended to it
+     * @param values the elements to store, or {@code null} to leave the configuration untouched
+     */
+    public static void setStrings(Configuration conf, String key, String[] values) {
+        if (values == null) {
+            return;
+        }
+        conf.setInt(key + ".size", values.length);
+        int index = 0;
+        for (String value : values) {
+            conf.set(key + "." + index, value);
+            index++;
+        }
+    }
+
+    /**
+     * Reads back an array written by {@link #setStrings(Configuration, String, String[])}.
+     *
+     * @param conf the configuration to read from
+     * @param key the base key used when the array was stored
+     * @return an array with {@code key.size} elements (empty when that entry is absent); an element whose
+     *         entry is missing is returned as the empty string
+     */
+    public static String[] getStrings(Configuration conf, String key) {
+        final int count = conf.getInt(key + ".size", 0);
+        final String[] result = new String[count];
+        for (int index = 0; index < count; index++) {
+            final String stored = conf.get(key + "." + index);
+            result[index] = (stored == null) ? "" : stored;
+        }
+        return result;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ErrorHolder.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ErrorHolder.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ErrorHolder.java
new file mode 100644
index 0000000..6a755db
--- /dev/null
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ErrorHolder.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+/**
+ * Mutable container for an error code, an error message and the causing {@link Throwable}.
+ * <p>
+ * The holder counts as populated as soon as any one of its setters has been called, whatever value was
+ * passed. This class performs no synchronization of its own.
+ */
+public class ErrorHolder {
+    private int errorCode;
+    private String errorMessage;
+    private Throwable errorCause;
+    private boolean populated;
+
+    /** @return the stored error code; 0 until one is set */
+    public int getErrorCode() {
+        return errorCode;
+    }
+
+    /** @return the stored message, or {@code null} if none was set */
+    public String getErrorMessage() {
+        return errorMessage;
+    }
+
+    /** @return the stored cause, or {@code null} if none was set */
+    public Throwable getErrorCause() {
+        return errorCause;
+    }
+
+    /** @return {@code true} once any setter of this holder has been invoked */
+    public boolean isPopulated() {
+        return populated;
+    }
+
+    /** Stores the error code and marks the holder as populated. */
+    public void setErrorCode(int errorCode) {
+        this.errorCode = errorCode;
+        markPopulated();
+    }
+
+    /** Stores the error message and marks the holder as populated. */
+    public void setErrorMessage(String errorMessage) {
+        this.errorMessage = errorMessage;
+        markPopulated();
+    }
+
+    /** Stores the cause and marks the holder as populated. */
+    public void setErrorCause(Throwable errorCause) {
+        this.errorCause = errorCause;
+        markPopulated();
+    }
+
+    private void markPopulated() {
+        populated = true;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java
new file mode 100644
index 0000000..874d371
--- /dev/null
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.security.PrivilegedExceptionAction;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * File system operations that are all executed as the user represented by the {@link UserGroupInformation}
+ * given at construction time.
+ * <p>
+ * The {@link FileSystem} objects obtained through {@code FileSystem.get()} are never closed by this class:
+ * Hadoop may return a cached instance that is shared with other code in the same JVM, and closing it would
+ * make those other users fail. Only the streams opened here are closed.
+ */
+public class HdfsOperations {
+    private static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8;
+    private final SequenceFileWriterFactory seqFileWriterFactory;
+    private final UserGroupInformation ugi;
+
+    /**
+     * @param seqFileWriterFactory factory used to open the sequence file writer; must not be {@code null}
+     * @param ugi the user all operations are performed as; must not be {@code null}
+     */
+    public HdfsOperations(SequenceFileWriterFactory seqFileWriterFactory, UserGroupInformation ugi) {
+        this.seqFileWriterFactory = Preconditions.checkNotNull(seqFileWriterFactory, "seqFileWriterFactory should not be null");
+        this.ugi = Preconditions.checkNotNull(ugi, "ugi should not be null");
+    }
+
+    /**
+     * Creates a Sequence file which contains the output from an action and uploads it to HDFS.
+     * <p>
+     * The file is named {@link LauncherAM#ACTION_DATA_SEQUENCE_FILE} inside {@code actionDir}; every map entry
+     * is appended as a {@link Text} key / {@link Text} value pair.
+     *
+     * @param launcherJobConf configuration handed to the sequence file writer factory
+     * @param actionDir directory the sequence file is created in
+     * @param actionData the entries to write
+     * @throws IOException if the factory returns no writer, or if writing fails
+     * @throws InterruptedException if the privileged action is interrupted
+     */
+    public void uploadActionDataToHDFS(final Configuration launcherJobConf, final Path actionDir,
+            final Map<String, String> actionData) throws IOException, InterruptedException {
+        ugi.doAs(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+                Path finalPath = new Path(actionDir, LauncherAM.ACTION_DATA_SEQUENCE_FILE);
+                // upload into sequence file
+                System.out.println("Oozie Launcher, uploading action data to HDFS sequence file: " + finalPath.toUri());
+
+                try (SequenceFile.Writer wr =
+                        seqFileWriterFactory.createSequenceFileWriter(launcherJobConf, finalPath, Text.class, Text.class)) {
+
+                    if (wr == null) {
+                        throw new IOException("SequenceFile.Writer is null for " + finalPath);
+                    }
+                    for (Entry<String, String> entry : actionData.entrySet()) {
+                        wr.append(new Text(entry.getKey()), new Text(entry.getValue()));
+                    }
+                }
+
+                return null;
+            }
+        });
+    }
+
+    /**
+     * @param path the path to test
+     * @param launcherJobConf configuration used to look up the file system of {@code path}
+     * @return whether {@code path} exists on its file system
+     * @throws IOException if the file system cannot be reached
+     * @throws InterruptedException if the privileged action is interrupted
+     */
+    public boolean fileExists(final Path path, final Configuration launcherJobConf) throws IOException, InterruptedException {
+        return ugi.doAs(new PrivilegedExceptionAction<Boolean>() {
+            @Override
+            public Boolean run() throws Exception {
+                FileSystem fs = FileSystem.get(path.toUri(), launcherJobConf);
+                return fs.exists(path);
+            }
+        });
+    }
+
+    /**
+     * Creates {@code path} and writes {@code contents} to it, encoded as UTF-8.
+     *
+     * @param path the file to create
+     * @param conf configuration used to look up the file system of {@code path}
+     * @param contents the text to write
+     * @throws IOException if the file cannot be created or written
+     * @throws InterruptedException if the privileged action is interrupted
+     */
+    public void writeStringToFile(final Path path, final Configuration conf, final String contents)
+            throws IOException, InterruptedException {
+        ugi.doAs(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+                // Deliberately not part of the try-with-resources: the instance may be a shared, cached one.
+                final FileSystem fs = FileSystem.get(path.toUri(), conf);
+                try (java.io.Writer writer = new OutputStreamWriter(fs.create(path), DEFAULT_CHARSET)) {
+                    writer.write(contents);
+                }
+
+                return null;
+            }
+        });
+    }
+
+    /**
+     * Reads {@code path} as UTF-8 text.
+     * <p>
+     * The file is read line by line and the lines are concatenated without their line terminators, so for a
+     * multi-line file the result is not a byte-for-byte copy of the contents.
+     *
+     * @param path the file to read
+     * @param conf configuration used to look up the file system of {@code path}
+     * @return the concatenated lines of the file; the empty string for an empty file
+     * @throws IOException if the file cannot be opened or read
+     * @throws InterruptedException if the privileged action is interrupted
+     */
+    public String readFileContents(final Path path, final Configuration conf) throws IOException, InterruptedException {
+        return ugi.doAs(new PrivilegedExceptionAction<String>() {
+            @Override
+            public String run() throws Exception {
+                StringBuilder sb = new StringBuilder();
+
+                // Deliberately not part of the try-with-resources: the instance may be a shared, cached one.
+                final FileSystem fs = FileSystem.get(path.toUri(), conf);
+                try (InputStream is = fs.open(path);
+                        BufferedReader reader = new BufferedReader(new InputStreamReader(is, DEFAULT_CHARSET))) {
+
+                    String contents;
+                    while ((contents = reader.readLine()) != null) {
+                        sb.append(contents);
+                    }
+                }
+
+                return sb.toString();
+            }
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java
index 30d68e2..c3e3d3f 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/JavaMain.java
@@ -44,11 +44,11 @@ public class JavaMain extends LauncherMain {
         setApplicationTags(actionConf, TEZ_APPLICATION_TAGS);
         setApplicationTags(actionConf, SPARK_YARN_TAGS);
 
-        LauncherMainHadoopUtils.killChildYarnJobs(actionConf);
+        LauncherMain.killChildYarnJobs(actionConf);
 
         Class<?> klass = actionConf.getClass(JAVA_MAIN_CLASS, Object.class);
-        System.out.println("Main class        : " + klass.getName());
-        LauncherMapper.printArgs("Arguments         :", args);
+        System.out.println("Java action main class        : " + klass.getName());
+        printArgs("Java action arguments         :", args);
         System.out.println();
         Method mainMethod = klass.getMethod("main", String[].class);
         try {
@@ -60,4 +60,13 @@ public class JavaMain extends LauncherMain {
     }
 
 
+    /**
+     * Checked exception that JavaMain uses as a wrapper around a {@link Throwable} when an exception occurs;
+     * the wrapped throwable is available through {@link #getCause()}.
+     */
+    @SuppressWarnings("serial")
+    static class JavaMainException extends Exception {
+        /**
+         * @param cause the throwable being wrapped
+         */
+        public JavaMainException(Throwable cause) {
+            super(cause);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
new file mode 100644
index 0000000..4f252d1
--- /dev/null
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
@@ -0,0 +1,614 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.security.Permission;
+import java.security.PrivilegedExceptionAction;
+import java.text.MessageFormat;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.StringTokenizer;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+
+public class LauncherAM {
+    private static final String OOZIE_ACTION_CONF_XML = "oozie.action.conf.xml";
+    private static final String OOZIE_LAUNCHER_JOB_ID = "oozie.launcher.job.id";
+
+    // Name of the JVM system property that holds the class path.
+    public static final String JAVA_CLASS_PATH = "java.class.path";
+    // Launcher configuration keys of the workflow action id and the workflow job id.
+    public static final String OOZIE_ACTION_ID = "oozie.action.id";
+    public static final String OOZIE_JOB_ID = "oozie.job.id";
+    // Common prefix of the action related keys that are built from it below.
+    public static final String ACTION_PREFIX = "oozie.action.";
+    static final String OOZIE_ACTION_RECOVERY_ID = ACTION_PREFIX + "recovery.id";
+    public static final String CONF_OOZIE_ACTION_MAX_OUTPUT_DATA = ACTION_PREFIX + "max.output.data";
+    public static final String CONF_OOZIE_ACTION_MAIN_ARG_PREFIX = ACTION_PREFIX + "main.arg.";
+    public static final String CONF_OOZIE_ACTION_MAIN_ARG_COUNT = CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + "count";
+    public static final String CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE = "oozie.external.stats.max.size";
+    // Launcher configuration key whose value is the path of the action directory.
+    public static final String OOZIE_ACTION_DIR_PATH = ACTION_PREFIX + "dir.path";
+    public static final String ACTION_PREPARE_XML = ACTION_PREFIX + "prepare.xml";
+    // Name, inside the action directory, of the sequence file the action data is uploaded to.
+    public static final String ACTION_DATA_SEQUENCE_FILE = "action-data.seq"; // COMBO FILE
+    // Keys of entries in the action data map. NOTE(review): run() uses OUTPUT_PROPS, NEW_ID and FINAL_STATUS
+    // this way; the remaining ACTION_DATA_* names are presumably used the same way elsewhere - confirm.
+    public static final String ACTION_DATA_EXTERNAL_CHILD_IDS = "externalChildIDs";
+    public static final String ACTION_DATA_OUTPUT_PROPS = "output.properties";
+    public static final String ACTION_DATA_STATS = "stats.properties";
+    public static final String ACTION_DATA_NEW_ID = "newId";
+    public static final String ACTION_DATA_ERROR_PROPS = "error.properties";
+    // Launcher configuration key naming the main class of the action.
+    public static final String CONF_OOZIE_ACTION_MAIN_CLASS = "oozie.launcher.action.main.class";
+
+    // TODO: OYA: more unique file names?  action.xml may be stuck for backwards compat though
+    public static final String LAUNCHER_JOB_CONF_XML = "launcher.xml";
+    public static final String ACTION_CONF_XML = "action.xml";
+    public static final String ACTION_DATA_FINAL_STATUS = "final.status";
+
+    // Collaborators and identity, all supplied through the constructor.
+    private final UserGroupInformation ugi;
+    private final AMRMCallBackHandler callbackHandler;
+    private final AMRMClientAsyncFactory amRmClientAsyncFactory;
+    private final HdfsOperations hdfsOperations;
+    private final LocalFsOperations localFsOperations;
+    private final PrepareActionsHandler prepareHandler;
+    private final LauncherAMCallbackNotifierFactory callbackNotifierFactory;
+    private final LauncherSecurityManager launcherSecurityManager;
+    private final ContainerId containerId;
+
+    // State that is filled in while the launcher runs; launcherJobConf and actionDir are assigned in run().
+    private Configuration launcherJobConf;
+    private AMRMClientAsync<?> amRmClientAsync;
+    private Path actionDir;
+    private Map<String, String> actionData = new HashMap<String,String>();
+
+    public LauncherAM(UserGroupInformation ugi,
+            AMRMClientAsyncFactory amRmClientAsyncFactory,
+            AMRMCallBackHandler callbackHandler,
+            HdfsOperations hdfsOperations,
+            LocalFsOperations localFsOperations,
+            PrepareActionsHandler prepareHandler,
+            LauncherAMCallbackNotifierFactory callbackNotifierFactory,
+            LauncherSecurityManager launcherSecurityManager,
+            String containerId) {
+        this.ugi = Preconditions.checkNotNull(ugi, "ugi should not be null");
+        this.amRmClientAsyncFactory = Preconditions.checkNotNull(amRmClientAsyncFactory,
+                "amRmClientAsyncFactory should not be null");
+        this.callbackHandler = Preconditions.checkNotNull(callbackHandler, "callbackHandler should not be null");
+        this.hdfsOperations = Preconditions.checkNotNull(hdfsOperations, "hdfsOperations should not be null");
+        this.localFsOperations = Preconditions.checkNotNull(localFsOperations, "localFsOperations should not be null");
+        this.prepareHandler = Preconditions.checkNotNull(prepareHandler, "prepareHandler should not be null");
+        this.callbackNotifierFactory = Preconditions.checkNotNull(callbackNotifierFactory,
+                "callbackNotifierFactory should not be null");
+        this.launcherSecurityManager = Preconditions.checkNotNull(launcherSecurityManager,
+                "launcherSecurityManager should not be null");
+        this.containerId = ContainerId.fromString(Preconditions.checkNotNull(containerId, "containerId should not be null"));
+    }
+
+    /**
+     * Entry point of the launcher: works out which user to act as, wires up a {@link LauncherAM} with its
+     * default collaborators and runs it.
+     * <p>
+     * The submitter is taken from the {@code submitter.user} system property and the container id from the
+     * {@code CONTAINER_ID} environment variable.
+     *
+     * @param args not used
+     * @throws IllegalArgumentException if the {@code submitter.user} system property is missing or blank
+     * @throws Exception anything thrown while setting up or running the launcher
+     */
+    public static void main(String[] args) throws Exception {
+        final String submitterUser = System.getProperty("submitter.user", "").trim();
+        Preconditions.checkArgument(!submitterUser.isEmpty(), "Submitter user is undefined");
+        System.out.println("Submitter user is: " + submitterUser);
+
+        // A remote user is only needed when the login user is somebody other than the workflow submitter;
+        // in that case the login user's credentials are copied over to the remote user.
+        final UserGroupInformation ugi;
+        if (!UserGroupInformation.getLoginUser().getShortUserName().equals(submitterUser)) {
+            ugi = UserGroupInformation.createRemoteUser(submitterUser);
+            ugi.addCredentials(UserGroupInformation.getLoginUser().getCredentials());
+        } else {
+            System.out.println("Using login user for UGI");
+            ugi = UserGroupInformation.getLoginUser();
+        }
+
+        final LauncherAM launcher = new LauncherAM(ugi,
+                new AMRMClientAsyncFactory(),
+                new AMRMCallBackHandler(),
+                new HdfsOperations(new SequenceFileWriterFactory(), ugi),
+                new LocalFsOperations(),
+                new PrepareActionsHandler(),
+                new LauncherAMCallbackNotifierFactory(),
+                new LauncherSecurityManager(),
+                System.getenv("CONTAINER_ID"));
+
+        launcher.run();
+    }
+
+    /**
+     * Runs the launcher from start to finish.
+     * <p>
+     * In order: loads the launcher configuration, derives the action directory from it, registers with the
+     * ResourceManager, executes the prepare step, runs the action's main class and, if that reported success,
+     * processes the action data. Whatever happens, the finally block then stores the final status in the
+     * action data, uploads the action data, unregisters from the ResourceManager and sends the callback
+     * notification, in that order, each later step being attempted even if an earlier one throws.
+     *
+     * @throws Exception any exception raised by the steps above; it is printed to both standard streams and
+     *         rethrown after the cleanup steps ran
+     */
+    public void run() throws Exception {
+        final ErrorHolder errorHolder = new ErrorHolder();
+        // Pessimistic default: only upgraded in the finally block once the main class ran properly.
+        OozieActionResult actionResult = OozieActionResult.FAILED;
+        boolean launcherExecutedProperly = false;
+        boolean backgroundAction = false;
+
+        try {
+            try {
+                launcherJobConf = localFsOperations.readLauncherConf();
+                System.out.println("Launcher AM configuration loaded");
+            } catch (Exception ex) {
+                // Record a specific message here; the outer catch leaves an already populated holder alone.
+                errorHolder.setErrorMessage("Could not load the Launcher AM configuration file");
+                errorHolder.setErrorCause(ex);
+                throw ex;
+            }
+            actionDir = new Path(launcherJobConf.get(OOZIE_ACTION_DIR_PATH));
+
+            registerWithRM();
+            executePrepare(ugi, errorHolder);
+            final String[] mainArgs = getMainArguments(launcherJobConf);
+            printDebugInfo();
+            setupMainConfiguration();
+            launcherExecutedProperly = runActionMain(mainArgs, errorHolder, ugi);
+
+            if (launcherExecutedProperly) {
+                handleActionData();
+                if (actionData.get(ACTION_DATA_OUTPUT_PROPS) != null) {
+                    System.out.println();
+                    System.out.println("Oozie Launcher, capturing output data:");
+                    System.out.println("=======================");
+                    System.out.println(actionData.get(ACTION_DATA_OUTPUT_PROPS));
+                    System.out.println();
+                    System.out.println("=======================");
+                    System.out.println();
+                }
+                if (actionData.get(ACTION_DATA_NEW_ID) != null) {
+                    System.out.println();
+                    System.out.println("Oozie Launcher, propagating new Hadoop job id to Oozie");
+                    System.out.println("=======================");
+                    System.out.println(actionData.get(ACTION_DATA_NEW_ID));
+                    System.out.println("=======================");
+                    System.out.println();
+                    // A new id in the action data makes the final status RUNNING instead of SUCCEEDED.
+                    backgroundAction = true;
+                }
+            }
+        } catch (Exception e) {
+            System.out.println("Launcher AM execution failed");
+            System.err.println("Launcher AM execution failed");
+            e.printStackTrace(System.out);
+            e.printStackTrace(System.err);
+            launcherExecutedProperly = false;
+            if (!errorHolder.isPopulated()) {
+                errorHolder.setErrorCause(e);
+                errorHolder.setErrorMessage(e.getMessage());
+            }
+            throw e;
+        } finally {
+            // NOTE(review): an exception thrown anywhere in this finally block replaces the one rethrown by
+            // the catch above. As far as this method shows, launcherJobConf and actionDir are still null here
+            // when loading the configuration failed, and they are passed on unchecked below - confirm that
+            // the callees tolerate that.
+            try {
+                ErrorHolder callbackErrorHolder = callbackHandler.getError();
+
+                if (launcherExecutedProperly) {
+                    actionResult = backgroundAction ? OozieActionResult.RUNNING : OozieActionResult.SUCCEEDED;
+                }
+
+                if (!launcherExecutedProperly) {
+                    updateActionDataWithFailure(errorHolder, actionData);
+                } else if (callbackErrorHolder != null) {  // async error from the callback
+                    actionResult = OozieActionResult.FAILED;
+                    updateActionDataWithFailure(callbackErrorHolder, actionData);
+                }
+
+                actionData.put(ACTION_DATA_FINAL_STATUS, actionResult.toString());
+                hdfsOperations.uploadActionDataToHDFS(launcherJobConf, actionDir, actionData);
+            } finally {
+                try {
+                    // NOTE(review): the message passed here always comes from the local errorHolder, also when
+                    // the failure was the asynchronous one held by callbackErrorHolder - confirm this is intended.
+                    unregisterWithRM(actionResult, errorHolder.getErrorMessage());
+                } finally {
+                    LauncherAMCallbackNotifier cn = callbackNotifierFactory.createCallbackNotifier(launcherJobConf);
+                    cn.notifyURL(actionResult);
+                }
+            }
+        }
+    }
+
+    /**
+     * Exposes the action data map to tests.
+     *
+     * @return the live map held by this launcher, not a copy
+     */
+    @VisibleForTesting
+    Map<String, String> getActionData() {
+        return this.actionData;
+    }
+
+    /**
+     * Prints diagnostic information to standard output: the contents of the current directory, the workflow
+     * job and action ids, the class path (one entry per line), the configured main class, the maximum output
+     * size, all Java system properties and all environment variables.
+     * <p>
+     * NOTE(review): system properties and environment variables are printed unfiltered; if any of them can
+     * carry secrets they end up in the launcher's output - confirm that this is acceptable.
+     *
+     * @throws IOException if listing the directory or writing the system properties fails
+     */
+    private void printDebugInfo() throws IOException {
+        localFsOperations.printContentsOfDir(new File("."));
+
+        System.out.println();
+        System.out.println("Oozie Launcher Application Master configuration");
+        System.out.println("===============================================");
+        System.out.println("Workflow job id   : " + launcherJobConf.get(OOZIE_JOB_ID));
+        System.out.println("Workflow action id: " + launcherJobConf.get(OOZIE_ACTION_ID));
+        System.out.println();
+        System.out.println("Classpath         :");
+        System.out.println("------------------------");
+        // Split on the platform's path separator rather than a hard-coded ":", which would also cut
+        // entries apart wherever a path itself contains a colon on platforms that use another separator.
+        StringTokenizer st = new StringTokenizer(System.getProperty(JAVA_CLASS_PATH), File.pathSeparator);
+        while (st.hasMoreTokens()) {
+            System.out.println("  " + st.nextToken());
+        }
+        System.out.println("------------------------");
+        System.out.println();
+        String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS);
+        System.out.println("Main class        : " + mainClass);
+        System.out.println();
+        // 2 * 1024 is only the fallback shown when the launcher configuration does not set the key.
+        System.out.println("Maximum output    : "
+                + launcherJobConf.getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024));
+        System.out.println();
+
+        System.out.println();
+        System.out.println("Java System Properties:");
+        System.out.println("------------------------");
+        System.getProperties().store(System.out, "");
+        System.out.println("------------------------");
+        System.out.println();
+
+        System.out.println("Environment variables");
+        Map<String, String> env = System.getenv();
+        System.out.println("------------------------");
+        for (Map.Entry<String, String> entry : env.entrySet()) {
+            System.out.println(entry.getKey() + "=" + entry.getValue());
+        }
+        System.out.println("------------------------");
+        System.out.println("=================================================================");
+        System.out.println();
+        System.out.println(">>> Invoking Main class now >>>");
+        System.out.println();
+    }
+
+    private void registerWithRM() throws IOException, YarnException {
+        // TODO: OYA: make heartbeat interval configurable & make interval higher to put less load on RM, but lower than timeout
+        amRmClientAsync = amRmClientAsyncFactory.createAMRMClientAsync(60000);
+        amRmClientAsync.init(new Configuration(launcherJobConf));
+        amRmClientAsync.start();
+
+        // hostname and tracking url are determined automatically
+        amRmClientAsync.registerApplicationMaster("", 0, "");
+    }
+
+    private void unregisterWithRM(OozieActionResult actionResult, String message) throws YarnException, IOException {
+        if (amRmClientAsync != null) {
+            System.out.println("Stopping AM");
+            try {
+                message = (message == null) ? "" : message;
+                // tracking url is determined automatically
+                amRmClientAsync.unregisterApplicationMaster(actionResult.getYarnStatus(), message, "");
+            } catch (Exception ex) {
+                System.out.println("Error un-registering AM client");
+                throw ex;
+            } finally {
+                amRmClientAsync.stop();
+            }
+        }
+    }
+
+    // Method to execute the prepare actions
+    private void executePrepare(UserGroupInformation ugi, ErrorHolder errorHolder) throws Exception {
+        try {
+            System.out.println("\nStarting the execution of prepare actions");
+            ugi.doAs(new PrivilegedExceptionAction<Void>() {
+                @Override
+                public Void run() throws Exception {
+                    String prepareXML = launcherJobConf.get(ACTION_PREPARE_XML);
+                    if (prepareXML != null) {
+                        if (prepareXML.length() != 0) {
+                            Configuration actionConf = new Configuration(launcherJobConf);
+                            actionConf.addResource(ACTION_CONF_XML);
+                            prepareHandler.prepareAction(prepareXML, actionConf);
+                        } else {
+                            System.out.println("There are no prepare actions to execute.");
+                        }
+                    }
+                    return null;
+                }
+            });
+            System.out.println("Completed the execution of prepare actions successfully");
+        } catch (Exception ex) {
+            errorHolder.setErrorMessage("Prepare execution in the Launcher AM has failed");
+            errorHolder.setErrorCause(ex);
+            throw ex;
+        }
+    }
+
+    private void setupMainConfiguration() throws IOException {
+        System.setProperty(OOZIE_LAUNCHER_JOB_ID, launcherJobConf.get(OOZIE_JOB_ID));
+        System.setProperty(OOZIE_JOB_ID, launcherJobConf.get(OOZIE_JOB_ID));
+        System.setProperty(OOZIE_ACTION_ID, launcherJobConf.get(OOZIE_ACTION_ID));
+        System.setProperty(OOZIE_ACTION_CONF_XML, new File(ACTION_CONF_XML).getAbsolutePath());
+        System.setProperty(ACTION_PREFIX + ACTION_DATA_EXTERNAL_CHILD_IDS,
+                new File(ACTION_DATA_EXTERNAL_CHILD_IDS).getAbsolutePath());
+        System.setProperty(ACTION_PREFIX + ACTION_DATA_STATS, new File(ACTION_DATA_STATS).getAbsolutePath());
+        System.setProperty(ACTION_PREFIX + ACTION_DATA_NEW_ID, new File(ACTION_DATA_NEW_ID).getAbsolutePath());
+        System.setProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS, new File(ACTION_DATA_OUTPUT_PROPS).getAbsolutePath());
+        System.setProperty(ACTION_PREFIX + ACTION_DATA_ERROR_PROPS, new File(ACTION_DATA_ERROR_PROPS).getAbsolutePath());
+
+        System.setProperty("oozie.job.launch.time", String.valueOf(System.currentTimeMillis()));
+    }
+
+    /**
+     * Invokes the {@code main(String[])} method of the configured action main class as the given user, with the
+     * {@link LauncherSecurityManager} enabled so that {@code System.exit} calls are intercepted.
+     * <p>
+     * Failures of the main class are not rethrown; they are recorded in {@code eHolder} and reported through the
+     * return value. An intercepted {@code System.exit(0)} counts as a successful run.
+     *
+     * @param mainArgs the arguments passed to the main method
+     * @param eHolder receives the error code, message and cause when the main class fails
+     * @param ugi the user to run the main class as
+     * @return {@code true} if the main method returned normally or called {@code System.exit(0)}
+     * @throws Exception propagated from {@code ugi.doAs}
+     */
+    private boolean runActionMain(final String[] mainArgs, final ErrorHolder eHolder, UserGroupInformation ugi) throws Exception {
+        // using AtomicBoolean because we want to modify it inside run()
+        final AtomicBoolean actionMainExecutedProperly = new AtomicBoolean(false);
+
+        ugi.doAs(new PrivilegedExceptionAction<Void>() {
+            @Override
+            public Void run() throws Exception {
+                try {
+                    setRecoveryId();
+                    Class<?> klass = launcherJobConf.getClass(CONF_OOZIE_ACTION_MAIN_CLASS, null);
+                    Preconditions.checkNotNull(klass, "Launcher class should not be null");
+                    System.out.println("Launcher class: " + klass.toString());
+                    Method mainMethod = klass.getMethod("main", String[].class);
+                    // Enable LauncherSecurityManager to catch System.exit calls
+                    launcherSecurityManager.enable();
+                    mainMethod.invoke(null, (Object) mainArgs);
+
+                    System.out.println();
+                    System.out.println("<<< Invocation of Main class completed <<<");
+                    System.out.println();
+                    actionMainExecutedProperly.set(true);
+                } catch (InvocationTargetException ex) {
+                    ex.printStackTrace(System.out);
+                    // Get what actually caused the exception
+                    Throwable cause = ex.getCause();
+                    // If we got a JavaMainException from JavaMain, then we need to unwrap it
+                    // (kept as is when it carries no cause, so the code below never sees a null cause)
+                    if (JavaMain.JavaMainException.class.isInstance(cause) && cause.getCause() != null) {
+                        cause = cause.getCause();
+                    }
+                    if (LauncherMainException.class.isInstance(cause)) {
+                        // cast the unwrapped cause: ex.getCause() may still be the wrapping JavaMainException
+                        int errorCode = ((LauncherMainException) cause).getErrorCode();
+                        String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS);
+                        eHolder.setErrorMessage("Main Class [" + mainClass + "], exit code [" +
+                                errorCode + "]");
+                        eHolder.setErrorCode(errorCode);
+                    } else if (SecurityException.class.isInstance(cause)) {
+                        if (launcherSecurityManager.getExitInvoked()) {
+                            final int exitCode = launcherSecurityManager.getExitCode();
+                            System.out.println("Intercepting System.exit(" + exitCode + ")");
+                            // if 0 main() method finished successfully
+                            // ignoring
+                            eHolder.setErrorCode(exitCode);
+                            if (exitCode != 0) {
+                                String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS);
+                                eHolder.setErrorMessage("Main Class [" + mainClass + "],"
+                                        + " exit code [" + eHolder.getErrorCode() + "]");
+                            } else {
+                                actionMainExecutedProperly.set(true);
+                            }
+                        } else {
+                            // just SecurityException, no exit was invoked
+                            eHolder.setErrorCode(0);
+                            eHolder.setErrorCause(cause);
+                            eHolder.setErrorMessage(cause.getMessage());
+                        }
+                    } else {
+                        eHolder.setErrorMessage(cause.getMessage());
+                        eHolder.setErrorCause(cause);
+                    }
+                } catch (Throwable t) {
+                    t.printStackTrace();
+                    eHolder.setErrorMessage(t.getMessage());
+                    eHolder.setErrorCause(t);
+                } finally {
+                    // Disable LauncherSecurityManager
+                    launcherSecurityManager.disable();
+                }
+
+                return null;
+            }
+        });
+
+        return actionMainExecutedProperly.get();
+    }
+
+    private void setRecoveryId() throws LauncherException {
+        try {
+            ApplicationId applicationId = containerId.getApplicationAttemptId().getApplicationId();
+            String applicationIdStr = applicationId.toString();
+
+            String recoveryId = Preconditions.checkNotNull(launcherJobConf.get(OOZIE_ACTION_RECOVERY_ID),
+                            "RecoveryID should not be null");
+
+            Path path = new Path(actionDir, recoveryId);
+            if (!hdfsOperations.fileExists(path, launcherJobConf)) {
+                hdfsOperations.writeStringToFile(path, launcherJobConf, applicationIdStr);
+            } else {
+                String id = hdfsOperations.readFileContents(path, launcherJobConf);
+
+                if (!applicationIdStr.equals(id)) {
+                    throw new LauncherException(MessageFormat.format(
+                            "YARN Id mismatch, action file [{0}] declares Id [{1}] current Id [{2}]", path, id,
+                            applicationIdStr));
+                }
+            }
+        } catch (RuntimeException | InterruptedException | IOException ex) {
+            throw new LauncherException("IO error", ex);
+        }
+    }
+
+    private void handleActionData() throws IOException {
+        // external child IDs
+        processActionData(ACTION_PREFIX + ACTION_DATA_EXTERNAL_CHILD_IDS, null,
+                ACTION_DATA_EXTERNAL_CHILD_IDS, -1);
+
+        // external stats
+        processActionData(ACTION_PREFIX + ACTION_DATA_STATS, CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE,
+                ACTION_DATA_STATS, Integer.MAX_VALUE);
+
+        // output data
+        processActionData(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS, CONF_OOZIE_ACTION_MAX_OUTPUT_DATA,
+                ACTION_DATA_OUTPUT_PROPS, 2048);
+
+        // id swap
+        processActionData(ACTION_PREFIX + ACTION_DATA_NEW_ID, null,
+                ACTION_DATA_NEW_ID, -1);
+    }
+
+    private void processActionData(String propertyName, String maxSizePropertyName, String actionDataPropertyName,
+            int maxSizeDefault) throws IOException {
+        String propValue = System.getProperty(propertyName);
+        int maxSize = maxSizeDefault;
+
+        if (maxSizePropertyName != null) {
+            maxSize = launcherJobConf.getInt(maxSizePropertyName, maxSizeDefault);
+        }
+
+        if (propValue != null) {
+            File actionDataFile = new File(propValue);
+            if (localFsOperations.fileExists(actionDataFile)) {
+                actionData.put(actionDataPropertyName, localFsOperations.getLocalFileContentAsString(actionDataFile,
+                        actionDataPropertyName, maxSize));
+            }
+        }
+    }
+
+    private void updateActionDataWithFailure(ErrorHolder eHolder, Map<String, String> actionData) {
+        if (eHolder.getErrorCause() != null && eHolder.getErrorCause().getMessage() != null) {
+            if (Objects.equal(eHolder.getErrorMessage(), eHolder.getErrorCause().getMessage())) {
+                eHolder.setErrorMessage(eHolder.getErrorMessage());
+            } else {
+                eHolder.setErrorMessage(eHolder.getErrorMessage() + ", " + eHolder.getErrorCause().getMessage());
+            }
+        }
+
+        Properties errorProps = new Properties();
+        errorProps.setProperty("error.code", Integer.toString(eHolder.getErrorCode()));
+        String errorMessage = eHolder.getErrorMessage() == null ? "<empty>" : eHolder.getErrorMessage();
+        errorProps.setProperty("error.reason", errorMessage);
+        if (eHolder.getErrorCause() != null) {
+            if (eHolder.getErrorCause().getMessage() != null) {
+                errorProps.setProperty("exception.message", eHolder.getErrorCause().getMessage());
+            }
+            StringWriter sw = new StringWriter();
+            PrintWriter pw = new PrintWriter(sw);
+            eHolder.getErrorCause().printStackTrace(pw);
+            pw.close();
+            errorProps.setProperty("exception.stacktrace", sw.toString());
+        }
+
+        StringWriter sw = new StringWriter();
+        try {
+            errorProps.store(sw, "");
+            sw.close();
+            actionData.put(LauncherAM.ACTION_DATA_ERROR_PROPS, sw.toString());
+
+            // external child IDs
+            String externalChildIdsProp = System.getProperty(LauncherAM.ACTION_PREFIX + LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS);
+            if (externalChildIdsProp != null) {
+                File externalChildIDs = new File(externalChildIdsProp);
+                if (localFsOperations.fileExists(externalChildIDs)) {
+                    actionData.put(LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS,
+                            localFsOperations.getLocalFileContentAsString(externalChildIDs, ACTION_DATA_EXTERNAL_CHILD_IDS, -1));
+                }
+            }
+        } catch (IOException ioe) {
+            System.out.println("A problem occured trying to fail the launcher");
+            ioe.printStackTrace();
+        } finally {
+            System.out.print("Failing Oozie Launcher, " + eHolder.getErrorMessage() + "\n");
+            if (eHolder.getErrorCause() != null) {
+                eHolder.getErrorCause().printStackTrace(System.out);
+            }
+        }
+    }
+
+    private String[] getMainArguments(Configuration conf) {
+        return LauncherMapper.getMainArguments(conf);
+    }
+
+    /**
+     * A {@link SecurityManager} that intercepts {@code System.exit} calls: every exit attempt is recorded and
+     * rejected with a {@link SecurityException}. Permission checks are delegated to the security manager that
+     * was installed when this object was created, if there was one.
+     */
+    public static class LauncherSecurityManager extends SecurityManager {
+        private boolean exitInvoked;
+        private int exitCode;
+        private SecurityManager originalSecurityManager;
+
+        /**
+         * Remembers the currently installed security manager; no exit has been intercepted yet.
+         */
+        public LauncherSecurityManager() {
+            this.exitInvoked = false;
+            this.exitCode = 0;
+            this.originalSecurityManager = System.getSecurityManager();
+        }
+
+        @Override
+        public void checkPermission(Permission perm, Object context) {
+            if (originalSecurityManager == null) {
+                return;
+            }
+            // check everything with the original SecurityManager
+            originalSecurityManager.checkPermission(perm, context);
+        }
+
+        @Override
+        public void checkPermission(Permission perm) {
+            if (originalSecurityManager == null) {
+                return;
+            }
+            // check everything with the original SecurityManager
+            originalSecurityManager.checkPermission(perm);
+        }
+
+        /**
+         * Records the exit attempt and its status, then always refuses it.
+         *
+         * @param status the exit status passed to {@code System.exit}
+         * @throws SecurityException always, to prevent the JVM from exiting
+         */
+        @Override
+        public void checkExit(int status) throws SecurityException {
+            this.exitInvoked = true;
+            this.exitCode = status;
+            throw new SecurityException("Intercepted System.exit(" + status + ")");
+        }
+
+        /**
+         * @return {@code true} if an exit attempt has been intercepted
+         */
+        public boolean getExitInvoked() {
+            return this.exitInvoked;
+        }
+
+        /**
+         * @return the status of the most recently intercepted exit attempt, or 0 if there was none
+         */
+        public int getExitCode() {
+            return this.exitCode;
+        }
+
+        /**
+         * Installs this object as the system security manager, unless it already is.
+         */
+        public void enable() {
+            final boolean alreadyInstalled = (System.getSecurityManager() == this);
+            if (!alreadyInstalled) {
+                System.setSecurityManager(this);
+            }
+        }
+
+        /**
+         * Restores the original security manager, but only if this object is the one currently installed.
+         */
+        public void disable() {
+            final boolean currentlyInstalled = (System.getSecurityManager() == this);
+            if (currentlyInstalled) {
+                System.setSecurityManager(originalSecurityManager);
+            }
+        }
+    }
+
+    /**
+     * Outcome of the action as seen by the launcher, together with the YARN final application status that is
+     * reported for it. Note that {@code RUNNING} is reported to YARN as {@code SUCCEEDED}.
+     */
+    public enum OozieActionResult {
+        SUCCEEDED(FinalApplicationStatus.SUCCEEDED),
+        FAILED(FinalApplicationStatus.FAILED),
+        RUNNING(FinalApplicationStatus.SUCCEEDED);
+
+        // YARN-equivalent status
+        private final FinalApplicationStatus yarnStatus;
+
+        OozieActionResult(final FinalApplicationStatus yarnStatus) {
+            this.yarnStatus = yarnStatus;
+        }
+
+        /**
+         * @return the YARN final application status associated with this result
+         */
+        public FinalApplicationStatus getYarnStatus() {
+            return this.yarnStatus;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java
new file mode 100644
index 0000000..2972658
--- /dev/null
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.action.hadoop.LauncherAM.OozieActionResult;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.Proxy;
+import java.net.URL;
+
+// Adapted from org.apache.hadoop.mapreduce.v2.app.JobEndNotifier
+/**
+ * This call sends back an HTTP GET callback to the configured URL.  It is meant for the {@link LauncherAM} to notify the
+ * Oozie Server that it has finished.
+ */
+public class LauncherAMCallbackNotifier {
+    private static final String OOZIE_LAUNCHER_CALLBACK = "oozie.launcher.callback.";
+    private static final int OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX = 5000;
+
+    public static final String OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS = OOZIE_LAUNCHER_CALLBACK + "retry.attempts";
+    public static final String OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL = OOZIE_LAUNCHER_CALLBACK + "retry.interval";
+    public static final String OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS = OOZIE_LAUNCHER_CALLBACK + "max.attempts";
+    public static final String OOZIE_LAUNCHER_CALLBACK_TIMEOUT = OOZIE_LAUNCHER_CALLBACK + "timeout";
+    public static final String OOZIE_LAUNCHER_CALLBACK_URL = OOZIE_LAUNCHER_CALLBACK + "url";
+    public static final String OOZIE_LAUNCHER_CALLBACK_PROXY = OOZIE_LAUNCHER_CALLBACK + "proxy";
+    public static final String OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN = "$jobStatus";
+
+    protected String userUrl;
+    protected String proxyConf;
+    protected int numTries; //Number of tries to attempt notification
+    protected int waitInterval; //Time (ms) to wait between retrying notification
+    protected int timeout; // Timeout (ms) on the connection and notification
+    protected URL urlToNotify; //URL to notify read from the config
+    protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification
+
+
+    /**
+     * Parse the URL that needs to be notified of the end of the job, along
+     * with the number of retries in case of failure, the amount of time to
+     * wait between retries and proxy settings
+     * @param conf the configuration
+     */
+    public LauncherAMCallbackNotifier(Configuration conf) {
+        numTries = Math.min(conf.getInt(OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, 0) + 1,
+                conf.getInt(OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, 1));
+
+        waitInterval = Math.min(conf.getInt(OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX),
+                OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX);
+        waitInterval = (waitInterval < 0) ? OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX : waitInterval;
+
+        timeout = conf.getInt(OOZIE_LAUNCHER_CALLBACK_TIMEOUT, OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX);
+
+        userUrl = conf.get(OOZIE_LAUNCHER_CALLBACK_URL);
+
+        proxyConf = conf.get(OOZIE_LAUNCHER_CALLBACK_PROXY);
+
+        //Configure the proxy to use if its set. It should be set like
+        //proxyType@proxyHostname:port
+        if(proxyConf != null && !proxyConf.equals("") &&
+                proxyConf.lastIndexOf(":") != -1) {
+            int typeIndex = proxyConf.indexOf("@");
+            Proxy.Type proxyType = Proxy.Type.HTTP;
+            if(typeIndex != -1 &&
+                    proxyConf.substring(0, typeIndex).compareToIgnoreCase("socks") == 0) {
+                proxyType = Proxy.Type.SOCKS;
+            }
+            String hostname = proxyConf.substring(typeIndex + 1,
+                    proxyConf.lastIndexOf(":"));
+            String portConf = proxyConf.substring(proxyConf.lastIndexOf(":") + 1);
+            try {
+                int port = Integer.parseInt(portConf);
+                proxyToUse = new Proxy(proxyType,
+                        new InetSocketAddress(hostname, port));
+                System.out.println("Callback notification using proxy type \"" + proxyType +
+                        "\" hostname \"" + hostname + "\" and port \"" + port + "\"");
+            } catch(NumberFormatException nfe) {
+                System.err.println("Callback notification couldn't parse configured proxy's port "
+                        + portConf + ". Not going to use a proxy");
+            }
+        }
+
+    }
+
+    /**
+     * Notify the URL just once. Use best effort.
+     */
+    protected boolean notifyURLOnce() {
+        boolean success = false;
+        HttpURLConnection conn = null;
+        try {
+            System.out.println("Callback notification trying " + urlToNotify);
+            conn = (HttpURLConnection) urlToNotify.openConnection(proxyToUse);
+            conn.setConnectTimeout(timeout);
+            conn.setReadTimeout(timeout);
+            conn.setAllowUserInteraction(false);
+            if(conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
+                System.err.println("Callback notification to " + urlToNotify +" failed with code: "
+                        + conn.getResponseCode() + " and message \"" + conn.getResponseMessage()
+                        +"\"");
+            }
+            else {
+                success = true;
+                System.out.println("Callback notification to " + urlToNotify + " succeeded");
+            }
+        } catch(IOException ioe) {
+            System.err.println("Callback notification to " + urlToNotify + " failed");
+            ioe.printStackTrace();
+        } finally {
+            if (conn != null) {
+                conn.disconnect();
+            }
+        }
+        return success;
+    }
+
+    /**
+     * Notify a server of the completion of a submitted job.
+     * @param actionResult The Action Result (failed/succeeded/running)
+     *
+     * @throws InterruptedException
+     */
+    public void notifyURL(OozieActionResult actionResult) throws InterruptedException {
+        // Do we need job-end notification?
+        if (userUrl == null) {
+            System.out.println("Callback notification URL not set, skipping.");
+            return;
+        }
+
+        //Do string replacements for final status
+        if (userUrl.contains(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN)) {
+            userUrl = userUrl.replace(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN, actionResult.toString());
+        }
+
+        // Create the URL, ensure sanity
+        try {
+            urlToNotify = new URL(userUrl);
+        } catch (MalformedURLException mue) {
+            System.err.println("Callback notification couldn't parse " + userUrl);
+            mue.printStackTrace();
+            return;
+        }
+
+        // Send notification
+        boolean success = false;
+        while (numTries-- > 0 && !success) {
+            System.out.println("Callback notification attempts left " + numTries);
+            success = notifyURLOnce();
+            if (!success) {
+                Thread.sleep(waitInterval);
+            }
+        }
+        if (!success) {
+            System.err.println("Callback notification failed to notify : " + urlToNotify);
+        } else {
+            System.out.println("Callback notification succeeded");
+        }
+    }
+}