You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ge...@apache.org on 2017/05/26 09:27:54 UTC

[02/10] oozie git commit: OOZIE-1770 Create Oozie Application Master for YARN (asasvari, pbacsko, rkanter, gezapeti)

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifierFactory.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifierFactory.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifierFactory.java
new file mode 100644
index 0000000..688424b
--- /dev/null
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifierFactory.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Factory that builds {@link LauncherAMCallbackNotifier} instances.
+ */
+public class LauncherAMCallbackNotifierFactory {
+
+    /**
+     * Creates a new callback notifier from the given configuration.
+     *
+     * @param conf configuration handed to the {@link LauncherAMCallbackNotifier} constructor
+     * @return a newly constructed notifier
+     */
+    public LauncherAMCallbackNotifier createCallbackNotifier(Configuration conf) {
+        final LauncherAMCallbackNotifier notifier = new LauncherAMCallbackNotifier(conf);
+        return notifier;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
index 9a411ac..0236e1b 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMain.java
@@ -30,7 +30,10 @@ import java.io.StringWriter;
 import java.net.URL;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -45,7 +48,15 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 
 public abstract class LauncherMain {
 
@@ -56,6 +67,10 @@ public abstract class LauncherMain {
     public static final String OUTPUT_PROPERTIES = ACTION_PREFIX + "output.properties";
     public static final String HADOOP_JOBS = "hadoopJobs";
     public static final String MAPREDUCE_JOB_TAGS = "mapreduce.job.tags";
+
+    public static final String CHILD_MAPREDUCE_JOB_TAGS = "oozie.child.mapreduce.job.tags";
+    public static final String OOZIE_JOB_LAUNCH_TIME = "oozie.job.launch.time";
+
     public static final String TEZ_APPLICATION_TAGS = "tez.application.tags";
     public static final String SPARK_YARN_TAGS = "spark.yarn.tags";
     protected static String[] HADOOP_SITE_FILES = new String[]
@@ -170,6 +185,81 @@ public abstract class LauncherMain {
         }
     }
 
+    /**
+     * Looks up the child YARN applications of this launcher, restricting the query to
+     * {@link ApplicationsRequestScope#OWN}.
+     *
+     * @param actionConf action configuration forwarded to the scoped lookup
+     * @return the application ids returned by the scoped lookup
+     */
+    public static Set<ApplicationId> getChildYarnJobs(Configuration actionConf) {
+        final ApplicationsRequestScope ownScope = ApplicationsRequestScope.OWN;
+        return getChildYarnJobs(actionConf, ownScope);
+    }
+
+    /**
+     * Asks the ResourceManager for the YARN applications that carry this action's child tag and whose start time
+     * lies between {@code startTime} and the current time.
+     *
+     * @param actionConf action configuration; supplies the tag ({@link #CHILD_MAPREDUCE_JOB_TAGS}) and is used to
+     *                   create the ResourceManager proxy
+     * @param scope scope of the applications request
+     * @param startTime lower bound of the start-time range, compared against {@link System#currentTimeMillis()}
+     * @return ids of the matching applications; an empty set if the tag property is not set
+     * @throws RuntimeException if querying the ResourceManager fails
+     */
+    public static Set<ApplicationId> getChildYarnJobs(Configuration actionConf, ApplicationsRequestScope scope,
+                                                      long startTime) {
+        Set<ApplicationId> childYarnJobs = new HashSet<>();
+        String tag = actionConf.get(CHILD_MAPREDUCE_JOB_TAGS);
+        if (tag == null) {
+            // println, not print: otherwise the caller's next log line is glued onto this message
+            System.out.println("Could not find Yarn tags property " + CHILD_MAPREDUCE_JOB_TAGS);
+            return childYarnJobs;
+        }
+        System.out.println("tag id : " + tag);
+        GetApplicationsRequest gar = GetApplicationsRequest.newInstance();
+        gar.setScope(scope);
+        gar.setApplicationTags(Collections.singleton(tag));
+        long endTime = System.currentTimeMillis();
+        if (startTime > endTime) {
+            System.out.println("WARNING: Clock skew between the Oozie server host and this host detected.  Please fix this.  " +
+                    "Attempting to work around...");
+            // We don't know which one is wrong (relative to the RM), so to be safe, let's assume they're both wrong and add an
+            // offset in both directions
+            long diff = 2 * (startTime - endTime);
+            startTime = startTime - diff;
+            endTime = endTime + diff;
+        }
+        gar.setStartRange(startTime, endTime);
+        try {
+            ApplicationClientProtocol proxy = ClientRMProxy.createRMProxy(actionConf, ApplicationClientProtocol.class);
+            GetApplicationsResponse apps = proxy.getApplications(gar);
+            List<ApplicationReport> appsList = apps.getApplicationList();
+            for (ApplicationReport appReport : appsList) {
+                childYarnJobs.add(appReport.getApplicationId());
+            }
+        } catch (YarnException | IOException e) {
+            throw new RuntimeException("Exception occurred while finding child jobs", e);
+        }
+
+        System.out.println("Child yarn jobs are found - " + StringUtils.join(childYarnJobs, ","));
+        return childYarnJobs;
+    }
+    /**
+     * Looks up the child YARN applications using the launch time stored in the
+     * {@link #OOZIE_JOB_LAUNCH_TIME} system property as the start of the search range.
+     *
+     * @param actionConf action configuration forwarded to the time-bounded lookup
+     * @param scope scope of the applications request
+     * @return the application ids returned by the time-bounded lookup
+     * @throws RuntimeException if the system property is missing or is not a valid long
+     */
+    public static Set<ApplicationId> getChildYarnJobs(Configuration actionConf, ApplicationsRequestScope scope) {
+        System.out.println("Fetching child yarn jobs");
+
+        final String launchTimeProperty = System.getProperty(OOZIE_JOB_LAUNCH_TIME);
+        final long launchTime;
+        try {
+            launchTime = Long.parseLong(launchTimeProperty);
+        } catch (NumberFormatException e) {
+            throw new RuntimeException("Could not find Oozie job launch time", e);
+        }
+        return getChildYarnJobs(actionConf, scope, launchTime);
+    }
+
+    /**
+     * Kills every child YARN application found by {@link #getChildYarnJobs(Configuration)}. Nothing is printed and
+     * no YARN client is created when there are no child applications.
+     *
+     * @param actionConf action configuration used both for the lookup and to initialise the YARN client
+     * @throws RuntimeException if killing an application fails with an {@link IOException} or {@link YarnException}
+     */
+    public static void killChildYarnJobs(Configuration actionConf) {
+        try {
+            Set<ApplicationId> childYarnJobs = getChildYarnJobs(actionConf);
+            if (!childYarnJobs.isEmpty()) {
+                System.out.println();
+                System.out.println("Found [" + childYarnJobs.size() + "] Map-Reduce jobs from this launcher");
+                System.out.println("Killing existing jobs and starting over:");
+                YarnClient yarnClient = YarnClient.createYarnClient();
+                try {
+                    yarnClient.init(actionConf);
+                    yarnClient.start();
+                    for (ApplicationId app : childYarnJobs) {
+                        System.out.print("Killing job [" + app + "] ... ");
+                        yarnClient.killApplication(app);
+                        System.out.println("Done");
+                    }
+                } finally {
+                    // Always release the client (and its RM proxy), even when a kill request fails
+                    yarnClient.stop();
+                }
+                System.out.println();
+            }
+        } catch (IOException | YarnException ye) {
+            throw new RuntimeException("Exception occurred while killing child job(s)", ye);
+        }
+    }
+
     protected abstract void run(String[] args) throws Exception;
 
     /**
@@ -181,12 +271,13 @@ public abstract class LauncherMain {
      * @param conf Configuration/Properties object to dump to STDOUT
      * @throws IOException thrown if an IO error ocurred.
      */
-    @SuppressWarnings("unchecked")
-    protected static void logMasking(String header, Collection<String> maskSet, Iterable conf) throws IOException {
+
+    protected static void logMasking(String header, Collection<String> maskSet, Iterable<Map.Entry<String,String>> conf)
+            throws IOException {
         StringWriter writer = new StringWriter();
         writer.write(header + "\n");
         writer.write("--------------------\n");
-        for (Map.Entry entry : (Iterable<Map.Entry>) conf) {
+        for (Map.Entry<String, String> entry : conf) {
             String name = (String) entry.getKey();
             String value = (String) entry.getValue();
             for (String mask : maskSet) {
@@ -221,30 +312,6 @@ public abstract class LauncherMain {
     }
 
     /**
-     * Will run the user specified OozieActionConfigurator subclass (if one is provided) to update the action configuration.
-     *
-     * @param actionConf The action configuration to update
-     * @throws OozieActionConfiguratorException
-     */
-    protected static void runConfigClass(JobConf actionConf) throws OozieActionConfiguratorException {
-        String configClass = System.getProperty(LauncherMapper.OOZIE_ACTION_CONFIG_CLASS);
-        if (configClass != null) {
-            try {
-                Class<?> klass = Class.forName(configClass);
-                Class<? extends OozieActionConfigurator> actionConfiguratorKlass = klass.asSubclass(OozieActionConfigurator.class);
-                OozieActionConfigurator actionConfigurator = actionConfiguratorKlass.newInstance();
-                actionConfigurator.configure(actionConf);
-            } catch (ClassNotFoundException e) {
-                throw new OozieActionConfiguratorException("An Exception occurred while instantiating the action config class", e);
-            } catch (InstantiationException e) {
-                throw new OozieActionConfiguratorException("An Exception occurred while instantiating the action config class", e);
-            } catch (IllegalAccessException e) {
-                throw new OozieActionConfiguratorException("An Exception occurred while instantiating the action config class", e);
-            }
-        }
-    }
-
-    /**
      * Read action configuration passes through action xml file.
      *
      * @return action  Configuration
@@ -268,13 +335,13 @@ public abstract class LauncherMain {
     }
 
     protected static void setYarnTag(Configuration actionConf) {
-        if(actionConf.get(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS) != null) {
+        if(actionConf.get(CHILD_MAPREDUCE_JOB_TAGS) != null) {
             // in case the user set their own tags, appending the launcher tag.
             if(actionConf.get(MAPREDUCE_JOB_TAGS) != null) {
                 actionConf.set(MAPREDUCE_JOB_TAGS, actionConf.get(MAPREDUCE_JOB_TAGS) + ","
-                        + actionConf.get(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS));
+                        + actionConf.get(CHILD_MAPREDUCE_JOB_TAGS));
             } else {
-                actionConf.set(MAPREDUCE_JOB_TAGS, actionConf.get(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS));
+                actionConf.set(MAPREDUCE_JOB_TAGS, actionConf.get(CHILD_MAPREDUCE_JOB_TAGS));
             }
         }
     }
@@ -331,6 +398,27 @@ public abstract class LauncherMain {
         }
         copyFileMultiplex(actionXmlFile, dstFiles);
     }
+    /**
+     * Prints the banner followed by one argument per line on the standard output stream. The argument that directly
+     * follows an argument containing 'password' (compared in lower case) is replaced with asterisks.
+     *
+     * @param banner source banner
+     * @param args arguments to be printed
+     */
+    void printArgs(String banner, String[] args) {
+        final String indent = "             ";
+        System.out.println(banner);
+        boolean hideValue = false;
+        for (int i = 0; i < args.length; i++) {
+            final String current = args[i];
+            if (hideValue) {
+                // The previous argument named a password option, so this one is its value
+                System.out.println(indent + "********");
+                hideValue = false;
+                continue;
+            }
+            System.out.println(indent + current);
+            hideValue = current.toLowerCase().contains("password");
+        }
+    }
 }
 
 class LauncherMainException extends Exception {

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
index 8657c67..912eba2 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
@@ -34,8 +34,8 @@ import java.security.Permission;
 import java.text.MessageFormat;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
-import java.util.Set;
 import java.util.StringTokenizer;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -49,9 +49,12 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.xml.sax.SAXException;
 
 import com.google.common.base.Strings;
+import javax.xml.parsers.ParserConfigurationException;
 
+// TODO: OYA: Delete :)
 public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, Runnable {
 
     static final String CONF_OOZIE_ACTION_MAIN_CLASS = "oozie.launcher.action.main.class";
@@ -238,7 +241,7 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R
                         // Get what actually caused the exception
                         Throwable cause = ex.getCause();
                         // If we got a JavaMainException from JavaMain, then we need to unwrap it
-                        if (JavaMainException.class.isInstance(cause)) {
+                        if (JavaMain.JavaMainException.class.isInstance(cause)) {
                             cause = cause.getCause();
                         }
                         if (LauncherMainException.class.isInstance(cause)) {
@@ -348,9 +351,9 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R
         // loading action conf prepared by Oozie
         Configuration actionConf = LauncherMain.loadActionConf();
 
-        if(actionConf.get(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS) != null) {
+        if(actionConf.get(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS) != null) {
             propagationConf.set(LauncherMain.MAPREDUCE_JOB_TAGS,
-                    actionConf.get(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS));
+                    actionConf.get(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS));
         }
 
         propagationConf.writeXml(new FileWriter(PROPAGATION_CONF_XML));
@@ -432,9 +435,8 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R
             try {
                 wr = SequenceFile.createWriter(fs, getJobConf(), finalPath, Text.class, Text.class);
                 if (wr != null) {
-                    Set<String> keys = actionData.keySet();
-                    for (String propsKey : keys) {
-                        wr.append(new Text(propsKey), new Text(actionData.get(propsKey)));
+                    for (Entry<String, String> entry : actionData.entrySet()) {
+                        wr.append(new Text(entry.getKey()), new Text(entry.getValue()));
                     }
                 }
                 else {
@@ -469,9 +471,9 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R
         System.setProperty(ACTION_PREFIX + ACTION_DATA_NEW_ID, new File(ACTION_DATA_NEW_ID).getAbsolutePath());
         System.setProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS, new File(ACTION_DATA_OUTPUT_PROPS).getAbsolutePath());
         System.setProperty(ACTION_PREFIX + ACTION_DATA_ERROR_PROPS, new File(ACTION_DATA_ERROR_PROPS).getAbsolutePath());
-        if (getJobConf().get(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME) != null) {
-            System.setProperty(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME,
-                    getJobConf().get(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME));
+        if (getJobConf().get(LauncherMain.OOZIE_JOB_LAUNCH_TIME) != null) {
+            System.setProperty(LauncherMain.OOZIE_JOB_LAUNCH_TIME,
+                    getJobConf().get(LauncherMain.OOZIE_JOB_LAUNCH_TIME));
         }
 
         String actionConfigClass = getJobConf().get(OOZIE_ACTION_CONFIG_CLASS);
@@ -481,7 +483,7 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R
     }
 
     // Method to execute the prepare actions
-    private void executePrepare() throws IOException, LauncherException {
+    private void executePrepare() throws IOException, LauncherException, ParserConfigurationException, SAXException {
         String prepareXML = getJobConf().get(ACTION_PREPARE_XML);
         if (prepareXML != null) {
              if (!prepareXML.equals("")) {
@@ -601,20 +603,26 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R
         System.out.println("======================");
 
         File[] listOfFiles = folder.listFiles();
-        for (File fileName : listOfFiles) {
-            if (fileName.isFile()) {
-                System.out.println("File: " + fileName.getName());
-            }
-            else if (fileName.isDirectory()) {
-                System.out.println("Dir: " + fileName.getName());
-                File subDir = new File(fileName.getName());
-                File[] moreFiles = subDir.listFiles();
-                for (File subFileName : moreFiles) {
-                    if (subFileName.isFile()) {
-                        System.out.println("  File: " + subFileName.getName());
-                    }
-                    else if (subFileName.isDirectory()) {
-                        System.out.println("  Dir: " + subFileName.getName());
+
+        if (listOfFiles != null) {
+            for (File fileName : listOfFiles) {
+                if (fileName.isFile()) {
+                    System.out.println("File: " + fileName.getName());
+                }
+                else if (fileName.isDirectory()) {
+                    System.out.println("Dir: " + fileName.getName());
+                    File subDir = new File(fileName.getName());
+                    File[] moreFiles = subDir.listFiles();
+
+                    if (moreFiles != null) {
+                        for (File subFileName : moreFiles) {
+                            if (subFileName.isFile()) {
+                                System.out.println("  File: " + subFileName.getName());
+                            }
+                            else if (subFileName.isDirectory()) {
+                                System.out.println("  Dir: " + subFileName.getName());
+                            }
+                        }
                     }
                 }
             }
@@ -709,12 +717,3 @@ class LauncherSecurityManager extends SecurityManager {
     }
 }
 
-/**
- * Used by JavaMain to wrap a Throwable when an Exception occurs
- */
-@SuppressWarnings("serial")
-class JavaMainException extends Exception {
-    public JavaMainException(Throwable t) {
-        super(t);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LocalFsOperations.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LocalFsOperations.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LocalFsOperations.java
new file mode 100644
index 0000000..011ce93
--- /dev/null
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LocalFsOperations.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.FileVisitOption;
+import java.nio.file.FileVisitResult;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.SimpleFileVisitor;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.EnumSet;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Local file system helpers: reading the launcher configuration, listing the contents of a directory and reading
+ * a file into a string.
+ */
+public class LocalFsOperations {
+    /** Maximum depth, counted from the listed folder, that {@link #printContentsOfDir(File)} descends to. */
+    private static final int WALK_DEPTH = 2;
+
+    /**
+     * Reads the launcher configuration from the file named by {@link LauncherAM#LAUNCHER_JOB_CONF_XML}.
+     * The configuration is created without the Hadoop default resources, so it only holds what that file defines.
+     *
+     * @return Configuration object
+     */
+    public Configuration readLauncherConf() {
+        File confFile = new File(LauncherAM.LAUNCHER_JOB_CONF_XML);
+        Configuration conf = new Configuration(false);
+        conf.addResource(new org.apache.hadoop.fs.Path(confFile.getAbsolutePath()));
+        return conf;
+    }
+
+    /**
+     * Prints the files and directories below the given folder, at most {@value #WALK_DEPTH} levels deep.
+     * Paths are printed relative to the folder and symbolic links are followed.
+     *
+     * @param folder the directory whose contents are listed
+     * @throws IOException if walking the directory tree fails
+     */
+    public void printContentsOfDir(File folder) throws IOException {
+        System.out.println();
+        System.out.println("Files in current dir:" + folder.getAbsolutePath());
+        System.out.println("======================");
+
+        final Path root = folder.toPath();
+        Files.walkFileTree(root, EnumSet.of(FileVisitOption.FOLLOW_LINKS), WALK_DEPTH, new SimpleFileVisitor<Path>() {
+            @Override
+            public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs) throws IOException {
+                // Directories above the depth limit are only reported here, never through visitFile. Without this
+                // callback the first-level directories (and empty ones in particular) would not be listed at all.
+                if (!dir.equals(root)) {
+                    System.out.println("  Dir: " + root.relativize(dir) + "/");
+                }
+
+                return FileVisitResult.CONTINUE;
+            }
+
+            @Override
+            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) throws IOException {
+                if (attrs.isRegularFile()) {
+                    System.out.println("  File: " + root.relativize(file));
+                } else if (attrs.isDirectory()) {
+                    // Directories sitting exactly at the depth limit are handed to visitFile instead of being opened
+                    System.out.println("  Dir: " +  root.relativize(file) + "/");
+                }
+
+                return FileVisitResult.CONTINUE;
+            }
+        });
+    }
+
+    /**
+     * Returns the contents of a file as string, decoded as UTF-8.
+     *
+     * @param file the File object which represents the file to be read
+     * @param type Type of the file; only used in the error message when the size limit is exceeded
+     * @param maxLen Maximum allowed length in bytes; a negative value disables the check
+     * @return The file contents as string
+     * @throws IOException if the file is bigger than maxLen or there is any I/O error
+     * @throws FileNotFoundException if the file does not exist
+     */
+    public String getLocalFileContentAsString(File file, String type, int maxLen) throws IOException {
+        if (!file.exists()) {
+            throw new FileNotFoundException("File not found: " + file.toPath().toAbsolutePath());
+        }
+
+        if (maxLen > -1 && file.length() > maxLen) {
+            throw new IOException(type + " data exceeds its limit [" + maxLen + "]");
+        }
+
+        return com.google.common.io.Files.toString(file, StandardCharsets.UTF_8);
+    }
+
+    /**
+     * Checks if a given File exists or not. This method helps writing unit tests.
+     *
+     * @param file the file to check
+     * @return {@code true} if the file exists
+     */
+    public boolean fileExists(File file) {
+        return file.exists();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
index d376057..e0974e8 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/MapReduceMain.java
@@ -52,7 +52,7 @@ public class MapReduceMain extends LauncherMain {
 
         JobConf jobConf = new JobConf();
         addActionConf(jobConf, actionConf);
-        LauncherMainHadoopUtils.killChildYarnJobs(jobConf);
+        LauncherMain.killChildYarnJobs(jobConf);
 
         // Run a config class if given to update the job conf
         runConfigClass(jobConf);
@@ -132,31 +132,27 @@ public class MapReduceMain extends LauncherMain {
         return runJob;
     }
 
-    @SuppressWarnings("unchecked")
     protected JobClient createJobClient(JobConf jobConf) throws IOException {
         return new JobClient(jobConf);
     }
 
-    // allows any character in the value, the conf.setStrings() does not allow
-    // commas
-    public static void setStrings(Configuration conf, String key, String[] values) {
-        if (values != null) {
-            conf.setInt(key + ".size", values.length);
-            for (int i = 0; i < values.length; i++) {
-                conf.set(key + "." + i, values[i]);
-            }
-        }
-    }
-
-    public static String[] getStrings(Configuration conf, String key) {
-        String[] values = new String[conf.getInt(key + ".size", 0)];
-        for (int i = 0; i < values.length; i++) {
-            values[i] = conf.get(key + "." + i);
-            if (values[i] == null) {
-                values[i] = "";
+    /**
+     * Will run the user specified OozieActionConfigurator subclass (if one is provided) to update the action configuration.
+     *
+     * @param actionConf The action configuration to update
+     * @throws OozieActionConfiguratorException
+     */
+    private static void runConfigClass(JobConf actionConf) throws OozieActionConfiguratorException {
+        String configClass = actionConf.get(LauncherMapper.OOZIE_ACTION_CONFIG_CLASS);
+        if (configClass != null) {
+            try {
+                Class<?> klass = Class.forName(configClass);
+                Class<? extends OozieActionConfigurator> actionConfiguratorKlass = klass.asSubclass(OozieActionConfigurator.class);
+                OozieActionConfigurator actionConfigurator = actionConfiguratorKlass.newInstance();
+                actionConfigurator.configure(actionConf);
+            } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
+                throw new OozieActionConfiguratorException("An Exception occurred while instantiating the action config class", e);
             }
         }
-        return values;
     }
-
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
index 21ae456..cb5b1ac 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
@@ -38,7 +38,9 @@ import javax.xml.parsers.ParserConfigurationException;
  * Utility class to perform operations on the prepare block of Workflow
  *
  */
+@Deprecated
 public class PrepareActionsDriver {
+    private static final PrepareActionsHandler prepareHandler = new PrepareActionsHandler();
 
     /**
      * Method to parse the prepare XML and execute the corresponding prepare actions
@@ -46,52 +48,9 @@ public class PrepareActionsDriver {
      * @param prepareXML Prepare XML block in string format
      * @throws LauncherException
      */
-    static void doOperations(String prepareXML, Configuration conf) throws LauncherException {
-        try {
-            Document doc = getDocumentFromXML(prepareXML);
-            doc.getDocumentElement().normalize();
-
-            // Get the list of child nodes, basically, each one corresponding to a separate action
-            NodeList nl = doc.getDocumentElement().getChildNodes();
-            LauncherURIHandlerFactory factory = new LauncherURIHandlerFactory(conf);
-
-            for (int i = 0; i < nl.getLength(); ++i) {
-                Node n = nl.item(i);
-                String operation = n.getNodeName();
-                if (n.getAttributes() == null || n.getAttributes().getNamedItem("path") == null) {
-                    continue;
-                }
-                String pathStr = n.getAttributes().getNamedItem("path").getNodeValue().trim();
-                // use Path to avoid URIsyntax error caused by square bracket in glob
-                URI uri = new Path(pathStr).toUri();
-                LauncherURIHandler handler = factory.getURIHandler(uri);
-                execute(operation, uri, handler, conf);
-            }
-        } catch (IOException ioe) {
-            throw new LauncherException(ioe.getMessage(), ioe);
-        } catch (SAXException saxe) {
-            throw new LauncherException(saxe.getMessage(), saxe);
-        } catch (ParserConfigurationException pce) {
-            throw new LauncherException(pce.getMessage(), pce);
-        } catch (IllegalArgumentException use) {
-            throw new LauncherException(use.getMessage(), use);
-        }
-    }
-
-    /**
-     * Method to execute the prepare actions based on the command
-     *
-     * @param n Child node of the prepare XML
-     * @throws LauncherException
-     */
-    private static void execute(String operation, URI uri, LauncherURIHandler handler, Configuration conf)
-            throws LauncherException {
-        if (operation.equals("delete")) {
-            handler.delete(uri, conf);
-        }
-        else if (operation.equals("mkdir")) {
-            handler.create(uri, conf);
-        }
+    static void doOperations(String prepareXML, Configuration conf)
+            throws IOException, SAXException, ParserConfigurationException, LauncherException {
+        prepareHandler.prepareAction(prepareXML, conf);
     }
 
     // Method to return the document from the prepare XML block

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsHandler.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsHandler.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsHandler.java
new file mode 100644
index 0000000..b5377b1
--- /dev/null
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsHandler.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.w3c.dom.Document;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.xml.sax.SAXException;
+
+public class PrepareActionsHandler {
+
+    /**
+     * Parses the prepare XML and executes the operation of each child node that has a "path" attribute.
+     * @param prepareXML Prepare XML block in string format
+     * @param conf configuration handed to the URI handler factory and to every executed operation
+     * @throws LauncherException if thrown while executing one of the prepare operations
+     */
+    public void prepareAction(String prepareXML, Configuration conf)
+            throws IOException, SAXException, ParserConfigurationException, LauncherException {
+        Document doc = getDocumentFromXML(prepareXML);
+        doc.getDocumentElement().normalize();
+
+        // Each child node of the root element corresponds to one separate prepare action
+        NodeList nl = doc.getDocumentElement().getChildNodes();
+        LauncherURIHandlerFactory factory = new LauncherURIHandlerFactory(conf);
+
+        for (int i = 0; i < nl.getLength(); ++i) {
+            Node n = nl.item(i);
+            String operation = n.getNodeName();
+            if (n.getAttributes() == null || n.getAttributes().getNamedItem("path") == null) {
+                continue;
+            }
+            String pathStr = n.getAttributes().getNamedItem("path").getNodeValue().trim();
+            // Build the URI via Path to avoid the URI syntax error caused by square brackets in a glob
+            URI uri = new Path(pathStr).toUri();
+            LauncherURIHandler handler = factory.getURIHandler(uri);
+            execute(operation, uri, handler, conf);
+        }
+    }
+
+    private void execute(String operation, URI uri, LauncherURIHandler handler, Configuration conf)
+            throws LauncherException {
+        // Dispatch on the node name; any operation other than delete/mkdir is only reported and skipped
+        switch (operation) {
+            case "delete":
+                handler.delete(uri, conf);
+                break;
+
+            case "mkdir":
+                handler.create(uri, conf);
+                break;
+
+            default:
+                System.out.println("Warning: unknown prepare operation " + operation + " -- skipping");
+            }
+    }
+
+    // Parses the prepare XML string (encoded as UTF-8) into a DOM Document; DOCTYPE declarations are disallowed
+    static Document getDocumentFromXML(String prepareXML) throws ParserConfigurationException, SAXException,
+            IOException {
+        DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory.newInstance();
+        docBuilderFactory.setNamespaceAware(true);
+        // support for includes (XInclude) in the xml file
+        docBuilderFactory.setXIncludeAware(true);
+        // ignore all comments inside the xml file
+        docBuilderFactory.setIgnoringComments(true);
+        docBuilderFactory.setExpandEntityReferences(false);
+        docBuilderFactory.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
+        DocumentBuilder docBuilder = docBuilderFactory.newDocumentBuilder();
+        InputStream is = new ByteArrayInputStream(prepareXML.getBytes("UTF-8"));
+        return docBuilder.parse(is);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/SequenceFileWriterFactory.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/SequenceFileWriterFactory.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/SequenceFileWriterFactory.java
new file mode 100644
index 0000000..8d986af
--- /dev/null
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/SequenceFileWriterFactory.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+
+public class SequenceFileWriterFactory {
+    // Delegates to SequenceFile.createWriter with the file, key class and value class writer options
+    public SequenceFile.Writer createSequenceFileWriter(Configuration launcherJobConf, Path finalPath,
+            Class<?> keyClass, Class<?> valueClass) throws IOException {
+        return SequenceFile.createWriter(launcherJobConf,
+                SequenceFile.Writer.file(finalPath),
+                SequenceFile.Writer.keyClass(keyClass),
+                SequenceFile.Writer.valueClass(valueClass));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ShellMain.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ShellMain.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ShellMain.java
index f109318..0ee35e8 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ShellMain.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/ShellMain.java
@@ -24,10 +24,10 @@ import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.FileWriter;
-import java.io.PrintWriter;
-import java.io.StringReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import java.io.StringReader;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -186,7 +186,7 @@ public class ShellMain extends LauncherMain {
      */
     private Map<String, String> getEnvMap(Map<String, String> envp, Configuration actionConf) {
         // Adding user-specified environments
-        String[] envs = MapReduceMain.getStrings(actionConf, CONF_OOZIE_SHELL_ENVS);
+        String[] envs = ActionUtils.getStrings(actionConf, CONF_OOZIE_SHELL_ENVS);
         for (String env : envs) {
             String[] varValue = env.split("=",2); // Error case is handled in
                                                 // ShellActionExecutor
@@ -339,7 +339,7 @@ public class ShellMain extends LauncherMain {
      */
     protected List<String> getShellArguments(Configuration actionConf) {
         List<String> arguments = new ArrayList<String>();
-        String[] scrArgs = MapReduceMain.getStrings(actionConf, CONF_OOZIE_SHELL_ARGS);
+        String[] scrArgs = ActionUtils.getStrings(actionConf, CONF_OOZIE_SHELL_ARGS);
         for (String scrArg : scrArgs) {
             arguments.add(scrArg);
         }

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/LauncherAMTestMainClass.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/LauncherAMTestMainClass.java b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/LauncherAMTestMainClass.java
new file mode 100644
index 0000000..718bf64
--- /dev/null
+++ b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/LauncherAMTestMainClass.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+public class LauncherAMTestMainClass {
+    public static final String SECURITY_EXCEPTION = "security";
+    public static final String LAUNCHER_EXCEPTION = "launcher";
+    public static final String JAVA_EXCEPTION = "java";
+    public static final String THROWABLE = "throwable";
+    // Messages and error code carried by the exceptions/throwable thrown from main()
+    public static final String JAVA_EXCEPTION_MESSAGE = "Java Exception";
+    public static final String SECURITY_EXCEPTION_MESSAGE = "Security Exception";
+    public static final String THROWABLE_MESSAGE = "Throwable";
+    public static final int LAUNCHER_ERROR_CODE = 1234;
+
+    public static void main(String args[]) throws Throwable {
+        System.out.println("Invocation of TestMain");
+        // A single argument equal to one of the selector constants picks what to throw; otherwise main returns
+        if (args != null && args.length == 1) {
+            switch (args[0]){
+                case JAVA_EXCEPTION:
+                    throw new JavaMain.JavaMainException(new RuntimeException(JAVA_EXCEPTION_MESSAGE));
+                case LAUNCHER_EXCEPTION:
+                    throw new LauncherMainException(LAUNCHER_ERROR_CODE);
+                case SECURITY_EXCEPTION:
+                    throw new SecurityException(SECURITY_EXCEPTION_MESSAGE);
+                case THROWABLE:
+                    throw new Throwable(THROWABLE_MESSAGE);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestHdfsOperations.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestHdfsOperations.java b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestHdfsOperations.java
new file mode 100644
index 0000000..68c0f4b
--- /dev/null
+++ b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestHdfsOperations.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import static org.junit.Assert.assertEquals;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.BDDMockito.given;
+import static org.mockito.BDDMockito.willThrow;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestHdfsOperations {
+    @Mock
+    private SequenceFileWriterFactory seqFileWriterFactoryMock;
+
+    @Mock
+    private SequenceFile.Writer writerMock;
+
+    @Mock
+    private UserGroupInformation ugiMock;
+
+    @Mock
+    private Configuration configurationMock;
+
+    private Path path = new Path(".");
+
+    private Map<String, String> actionData = new HashMap<>();
+
+    @InjectMocks
+    private HdfsOperations hdfsOperations;
+
+    @Before
+    public void setup() throws Exception {
+        configureMocksForHappyPath();
+        actionData.put("testKey", "testValue");
+    }
+
+    @Test
+    public void testActionDataUploadToHdfsSucceeds() throws Exception {
+        hdfsOperations.uploadActionDataToHDFS(configurationMock, path, actionData);
+        // The writer must be requested for Text keys/values and receive the single testKey/testValue entry
+        verify(seqFileWriterFactoryMock).createSequenceFileWriter(eq(configurationMock),
+                any(Path.class), eq(Text.class), eq(Text.class));
+        ArgumentCaptor<Text> keyCaptor = ArgumentCaptor.forClass(Text.class);
+        ArgumentCaptor<Text> valueCaptor = ArgumentCaptor.forClass(Text.class);
+        verify(writerMock).append(keyCaptor.capture(), valueCaptor.capture());
+        assertEquals("testKey", keyCaptor.getValue().toString());
+        assertEquals("testValue", valueCaptor.getValue().toString());
+    }
+
+    @Test(expected = IOException.class)
+    public void testActionDataUploadToHdfsFailsWhenAppendingToWriter() throws Exception {
+        willThrow(new IOException()).given(writerMock).append(any(Text.class), any(Text.class));
+        // The IOException stubbed on append() is expected to reach the caller
+        hdfsOperations.uploadActionDataToHDFS(configurationMock, path, actionData);
+    }
+
+    @Test(expected = IOException.class)
+    public void testActionDataUploadToHdfsFailsWhenWriterIsNull() throws Exception {
+        given(seqFileWriterFactoryMock.createSequenceFileWriter(eq(configurationMock),
+                any(Path.class), eq(Text.class), eq(Text.class))).willReturn(null);
+        // A null writer returned by the factory is expected to end in an IOException
+        hdfsOperations.uploadActionDataToHDFS(configurationMock, path, actionData);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void configureMocksForHappyPath() throws Exception {
+        given(ugiMock.doAs(any(PrivilegedExceptionAction.class))).willAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                PrivilegedExceptionAction<?> action = (PrivilegedExceptionAction<?>) invocation.getArguments()[0];
+                return action.run(); // just run the action that was passed to doAs
+            }
+        });
+        // By default the factory hands out the writer mock for Text/Text sequence files
+        given(seqFileWriterFactoryMock.createSequenceFileWriter(eq(configurationMock),
+                any(Path.class), eq(Text.class), eq(Text.class))).willReturn(writerMock);
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java
new file mode 100644
index 0000000..9cdedb7
--- /dev/null
+++ b/sharelib/oozie/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java
@@ -0,0 +1,641 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import static org.apache.oozie.action.hadoop.LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS;
+import static org.apache.oozie.action.hadoop.LauncherAM.ACTION_DATA_NEW_ID;
+import static org.apache.oozie.action.hadoop.LauncherAM.ACTION_DATA_OUTPUT_PROPS;
+import static org.apache.oozie.action.hadoop.LauncherAM.ACTION_DATA_STATS;
+import static org.apache.oozie.action.hadoop.LauncherAMTestMainClass.JAVA_EXCEPTION;
+import static org.apache.oozie.action.hadoop.LauncherAMTestMainClass.JAVA_EXCEPTION_MESSAGE;
+import static org.apache.oozie.action.hadoop.LauncherAMTestMainClass.LAUNCHER_ERROR_CODE;
+import static org.apache.oozie.action.hadoop.LauncherAMTestMainClass.LAUNCHER_EXCEPTION;
+import static org.apache.oozie.action.hadoop.LauncherAMTestMainClass.SECURITY_EXCEPTION;
+import static org.apache.oozie.action.hadoop.LauncherAMTestMainClass.SECURITY_EXCEPTION_MESSAGE;
+import static org.apache.oozie.action.hadoop.LauncherAMTestMainClass.THROWABLE;
+import static org.apache.oozie.action.hadoop.LauncherAMTestMainClass.THROWABLE_MESSAGE;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.hasKey;
+import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.hamcrest.Matchers.nullValue;
+
+import static org.mockito.BDDMockito.given;
+import static org.mockito.BDDMockito.willReturn;
+import static org.mockito.BDDMockito.willThrow;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.StringReader;
+import java.security.PrivilegedExceptionAction;
+import java.text.MessageFormat;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.oozie.action.hadoop.LauncherAM.LauncherSecurityManager;
+import org.apache.oozie.action.hadoop.LauncherAM.OozieActionResult;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+@RunWith(MockitoJUnitRunner.class)
+public class TestLauncherAM {
+    private static final String DEFAULT_CONTAINER_ID = "container_1479473450392_0001_01_000001";
+    private static final String ACTIONDATA_ERROR_PROPERTIES = "error.properties";
+    private static final String ACTIONDATA_FINAL_STATUS_PROPERTY = "final.status";
+    private static final String ERROR_CODE_PROPERTY = "error.code";
+    private static final String EXCEPTION_STACKTRACE_PROPERTY = "exception.stacktrace";
+    private static final String EXCEPTION_MESSAGE_PROPERTY = "exception.message";
+    private static final String ERROR_REASON_PROPERTY = "error.reason";
+
+    private static final String EMPTY_STRING = "";
+    private static final String EXIT_CODE_1 = "1";
+    private static final String EXIT_CODE_0 = "0";
+    private static final String DUMMY_XML = "<dummy>dummyXml</dummy>";
+
+    @Rule
+    public ExpectedException thrown = ExpectedException.none();
+
+    @Mock
+    private UserGroupInformation ugiMock;
+
+    @Mock
+    private AMRMClientAsyncFactory amRMClientAsyncFactoryMock;
+
+    @Mock
+    private AMRMClientAsync<?> amRmAsyncClientMock;
+
+    @Mock
+    private AMRMCallBackHandler callbackHandlerMock;
+
+    @Mock
+    private HdfsOperations hdfsOperationsMock;
+
+    @Mock
+    private LocalFsOperations localFsOperationsMock;
+
+    @Mock
+    private PrepareActionsHandler prepareHandlerMock;
+
+    @Mock
+    private LauncherAMCallbackNotifierFactory launcherCallbackNotifierFactoryMock;
+
+    @Mock
+    private LauncherAMCallbackNotifier launcherCallbackNotifierMock;
+
+    @Mock
+    private LauncherSecurityManager launcherSecurityManagerMock;
+
+    private Configuration launcherJobConfig = new Configuration();
+
+    private String containerId = DEFAULT_CONTAINER_ID;
+
+    private String applicationId = ConverterUtils.toContainerId(containerId)
+            .getApplicationAttemptId().getApplicationId().toString();
+
+    private LauncherAM launcherAM;
+
+    private ExpectedFailureDetails failureDetails = new ExpectedFailureDetails();
+
+    @Before
+    public void setup() throws Exception {
+        configureMocksForHappyPath();
+        launcherJobConfig.set(LauncherMapper.OOZIE_ACTION_RECOVERY_ID, "1");
+        instantiateLauncher();
+    }
+
+    @Test
+    public void testMainIsSuccessfullyInvokedWithActionData() throws Exception {
+        setupActionOutputContents();
+
+        executeLauncher();
+
+        verifyZeroInteractions(prepareHandlerMock);
+        assertSuccessfulExecution(OozieActionResult.RUNNING);
+        assertActionOutputDataPresentAndCorrect();
+    }
+
+    @Test
+    public void testMainIsSuccessfullyInvokedWithoutActionData() throws Exception {
+        executeLauncher();
+
+        verifyZeroInteractions(prepareHandlerMock);
+        assertSuccessfulExecution(OozieActionResult.SUCCEEDED);
+        assertNoActionOutputData();
+    }
+
+    @Test
+    public void testActionHasPrepareXML() throws Exception {
+        launcherJobConfig.set(LauncherAM.ACTION_PREPARE_XML, DUMMY_XML);
+
+        executeLauncher();
+
+        verify(prepareHandlerMock).prepareAction(eq(DUMMY_XML), any(Configuration.class));
+        assertSuccessfulExecution(OozieActionResult.SUCCEEDED);
+    }
+
+    @Test
+    public void testActionHasEmptyPrepareXML() throws Exception {
+        launcherJobConfig.set(LauncherAM.ACTION_PREPARE_XML, EMPTY_STRING);
+
+        executeLauncher();
+
+        verifyZeroInteractions(prepareHandlerMock);
+        assertSuccessfulExecution(OozieActionResult.SUCCEEDED);
+        assertNoActionOutputData();
+    }
+
+    @Test
+    public void testLauncherClassNotDefined() throws Exception {
+        launcherJobConfig.unset(LauncherAM.CONF_OOZIE_ACTION_MAIN_CLASS);
+
+        executeLauncher();
+
+        failureDetails.expectedExceptionMessage("Launcher class should not be null")
+            .expectedErrorCode(EXIT_CODE_0)
+            .expectedErrorReason("Launcher class should not be null")
+            .withStackTrace();
+
+        assertFailedExecution();
+    }
+
+    @Test
+    public void testMainIsSuccessfullyInvokedAndAsyncErrorReceived() throws Exception {
+        ErrorHolder errorHolder = new ErrorHolder();
+        errorHolder.setErrorCode(6);
+        errorHolder.setErrorMessage("dummy error");
+        errorHolder.setErrorCause(new Exception());
+        given(callbackHandlerMock.getError()).willReturn(errorHolder);
+
+        executeLauncher();
+
+        failureDetails.expectedExceptionMessage(null)
+                    .expectedErrorCode("6")
+                    .expectedErrorReason("dummy error")
+                    .withStackTrace();
+
+        assertFailedExecution();
+    }
+
+    @Test
+    public void testMainClassNotFound() throws Exception {
+        launcherJobConfig.set(LauncherAM.CONF_OOZIE_ACTION_MAIN_CLASS, "org.apache.non.existing.Klass");
+
+        executeLauncher();
+
+        failureDetails.expectedExceptionMessage(ClassNotFoundException.class.getCanonicalName())
+                .expectedErrorCode(EXIT_CODE_0)
+                .expectedErrorReason(ClassNotFoundException.class.getCanonicalName())
+                .withStackTrace();
+
+        assertFailedExecution();
+    }
+
+    @Test
+    public void testLauncherJobConfCannotBeLoaded() throws Exception {
+        given(localFsOperationsMock.readLauncherConf()).willThrow(new RuntimeException());
+        thrown.expect(RuntimeException.class);
+
+        try {
+            executeLauncher();
+        } finally {
+            failureDetails.expectedExceptionMessage(null)
+                .expectedErrorCode(EXIT_CODE_0)
+                .expectedErrorReason("Could not load the Launcher AM configuration file")
+                .withStackTrace();
+
+            assertFailedExecution();
+        }
+    }
+
+    @Test
+    public void testActionPrepareFails() throws Exception {
+        launcherJobConfig.set(LauncherAM.ACTION_PREPARE_XML, DUMMY_XML);
+        willThrow(new IOException()).given(prepareHandlerMock).prepareAction(anyString(), any(Configuration.class));
+        thrown.expect(IOException.class);
+
+        try {
+            executeLauncher();
+        } finally {
+            failureDetails.expectedExceptionMessage(null)
+                .expectedErrorCode(EXIT_CODE_0)
+                .expectedErrorReason("Prepare execution in the Launcher AM has failed")
+                .withStackTrace();
+
+            assertFailedExecution();
+        }
+    }
+
+    @Test
+    public void testActionThrowsJavaMainException() throws Exception {
+        setupArgsForMainClass(JAVA_EXCEPTION);
+
+        executeLauncher();
+
+        failureDetails.expectedExceptionMessage(JAVA_EXCEPTION_MESSAGE)
+            .expectedErrorCode(EXIT_CODE_0)
+            .expectedErrorReason(JAVA_EXCEPTION_MESSAGE)
+            .withStackTrace();
+
+        assertFailedExecution();
+    }
+
+    @Test
+    public void testActionThrowsLauncherException() throws Exception {
+        setupArgsForMainClass(LAUNCHER_EXCEPTION);
+
+        executeLauncher();
+
+        failureDetails.expectedExceptionMessage(null)
+            .expectedErrorCode(String.valueOf(LAUNCHER_ERROR_CODE))
+            .expectedErrorReason("exit code [" + LAUNCHER_ERROR_CODE + "]")
+            .withoutStackTrace();
+
+        assertFailedExecution();
+    }
+
+    @Test
+    public void testActionThrowsSecurityExceptionWithExitCode0() throws Exception {
+        setupArgsForMainClass(SECURITY_EXCEPTION);
+        given(launcherSecurityManagerMock.getExitInvoked()).willReturn(true);
+        given(launcherSecurityManagerMock.getExitCode()).willReturn(0);
+
+        executeLauncher();
+
+        assertSuccessfulExecution(OozieActionResult.SUCCEEDED);
+    }
+
+    @Test
+    public void testActionThrowsSecurityExceptionWithExitCode1() throws Exception {
+        setupArgsForMainClass(SECURITY_EXCEPTION);
+        given(launcherSecurityManagerMock.getExitInvoked()).willReturn(true);
+        given(launcherSecurityManagerMock.getExitCode()).willReturn(1);
+
+        executeLauncher();
+
+        failureDetails.expectedExceptionMessage(null)
+            .expectedErrorCode(EXIT_CODE_1)
+            .expectedErrorReason("exit code ["+ EXIT_CODE_1 + "]")
+            .withoutStackTrace();
+
+        assertFailedExecution();
+    }
+
+    @Test
+    public void testActionThrowsSecurityExceptionWithoutSystemExit() throws Exception {
+        setupArgsForMainClass(SECURITY_EXCEPTION);
+        given(launcherSecurityManagerMock.getExitInvoked()).willReturn(false);
+
+        executeLauncher();
+
+        failureDetails.expectedExceptionMessage(SECURITY_EXCEPTION_MESSAGE)
+            .expectedErrorCode(EXIT_CODE_0)
+            .expectedErrorReason(SECURITY_EXCEPTION_MESSAGE)
+            .withStackTrace();
+
+        assertFailedExecution();
+    }
+
+    @Test
+    public void testActionThrowsThrowable() throws Exception {
+        setupArgsForMainClass(THROWABLE);
+
+        executeLauncher();
+
+        failureDetails.expectedExceptionMessage(THROWABLE_MESSAGE)
+            .expectedErrorCode(EXIT_CODE_0)
+            .expectedErrorReason(THROWABLE_MESSAGE)
+            .withStackTrace();
+
+        assertFailedExecution();
+    }
+
+    @Test
+    public void testActionThrowsThrowableAndAsyncErrorReceived() throws Exception {
+        setupArgsForMainClass(THROWABLE);
+        ErrorHolder errorHolder = new ErrorHolder();
+        errorHolder.setErrorCode(6);
+        errorHolder.setErrorMessage("dummy error");
+        errorHolder.setErrorCause(new Exception());
+        given(callbackHandlerMock.getError()).willReturn(errorHolder);
+
+        executeLauncher();
+
+        // sync problem overrides async problem
+        failureDetails.expectedExceptionMessage(THROWABLE_MESSAGE)
+            .expectedErrorCode(EXIT_CODE_0)
+            .expectedErrorReason(THROWABLE_MESSAGE)
+            .withStackTrace();
+
+        assertFailedExecution();
+    }
+
+    @Test
+    public void testYarnUnregisterFails() throws Exception {
+        willThrow(new IOException()).given(amRmAsyncClientMock).unregisterApplicationMaster(any(FinalApplicationStatus.class),
+                anyString(), anyString());
+        thrown.expect(IOException.class);
+
+        try {
+            executeLauncher();
+        } finally {
+            // TODO: check if this behaviour is correct (url callback: successful, but unregister fails)
+            assertSuccessfulExecution(OozieActionResult.SUCCEEDED);
+        }
+    }
+
+    @Test
+    public void testUpdateActionDataFailsWithActionError() throws Exception {
+        setupActionOutputContents();
+        given(localFsOperationsMock.getLocalFileContentAsString(any(File.class), eq(ACTION_DATA_EXTERNAL_CHILD_IDS), anyInt()))
+            .willThrow(new IOException());
+        thrown.expect(IOException.class);
+
+        try {
+            executeLauncher();
+        } finally {
+            Map<String, String> actionData = launcherAM.getActionData();
+            assertThat(actionData, not(hasKey(ACTION_DATA_EXTERNAL_CHILD_IDS)));
+            verify(launcherCallbackNotifierMock).notifyURL(OozieActionResult.FAILED);
+        }
+    }
+
+    @Test
+    public void testRecoveryIdNotSet() throws Exception {
+        // Drop the recovery id from the launcher config, then rebuild the launcher under test.
+        launcherJobConfig.unset(LauncherMapper.OOZIE_ACTION_RECOVERY_ID);
+        instantiateLauncher();
+        executeLauncher();
+
+        final String ioError = "IO error";
+        failureDetails.withStackTrace()
+            .expectedErrorCode(EXIT_CODE_0)
+            .expectedErrorReason(ioError)
+            .expectedExceptionMessage(ioError);
+        assertFailedExecution();
+    }
+
+    @Test
+    public void testRecoveryIdExistsAndRecoveryIsdMatch() throws Exception { // NOTE(review): name typo, "Isd" should read "Ids"
+        given(hdfsOperationsMock.fileExists(any(Path.class), eq(launcherJobConfig))).willReturn(true);
+        given(hdfsOperationsMock.readFileContents(any(Path.class), eq(launcherJobConfig))).willReturn(applicationId);
+        // The file is reported as existing and its stubbed content equals applicationId.
+        executeLauncher();
+        // Only the read of the file is verified here; no assertion is made on the final action status.
+        verify(hdfsOperationsMock).readFileContents(any(Path.class), eq(launcherJobConfig));
+    }
+
+    @Test
+    public void testRecoveryIdExistsAndRecoveryIdsDoNotMatch() throws Exception {
+        // The file is reported as existing, but its stubbed content is a different id than applicationId.
+        final String storedAppId = "not_matching_appid";
+        given(hdfsOperationsMock.fileExists(any(Path.class), eq(launcherJobConfig))).willReturn(true);
+        given(hdfsOperationsMock.readFileContents(any(Path.class), eq(launcherJobConfig))).willReturn(storedAppId);
+
+        executeLauncher();
+
+        final String expectedMessage = MessageFormat.format(
+                "YARN Id mismatch, action file [{0}] declares Id [{1}] current Id [{2}]",
+                "dummy/1", storedAppId, applicationId);
+
+        failureDetails.withStackTrace()
+            .expectedErrorCode(EXIT_CODE_0)
+            .expectedErrorReason(expectedMessage)
+            .expectedExceptionMessage(expectedMessage);
+
+        verify(hdfsOperationsMock).readFileContents(any(Path.class), eq(launcherJobConfig));
+        assertFailedExecution();
+    }
+
+    @Test
+    public void testReadingRecoveryIdFails() throws Exception {
+        willThrow(new IOException()).given(hdfsOperationsMock)
+            .writeStringToFile(any(Path.class), eq(launcherJobConfig), eq(applicationId));
+        // NOTE(review): despite the test name, the failure stubbed above is on writeStringToFile(), not on a read.
+        executeLauncher();
+
+        failureDetails.expectedExceptionMessage("IO error")
+            .expectedErrorCode(EXIT_CODE_0)
+            .expectedErrorReason("IO error")
+            .withStackTrace();
+
+        assertFailedExecution();
+    }
+
+    private void instantiateLauncher() { // (re)creates launcherAM from the collaborators below and containerId
+        launcherAM = new LauncherAM(ugiMock,
+                amRMClientAsyncFactoryMock,
+                callbackHandlerMock,
+                hdfsOperationsMock,
+                localFsOperationsMock,
+                prepareHandlerMock,
+                launcherCallbackNotifierFactoryMock,
+                launcherSecurityManagerMock,
+                containerId);
+    }
+
+     @SuppressWarnings("unchecked")
+    private void configureMocksForHappyPath() throws Exception {
+        launcherJobConfig.set(LauncherAM.OOZIE_ACTION_DIR_PATH, "dummy");
+        launcherJobConfig.set(LauncherAM.OOZIE_JOB_ID, "dummy");
+        launcherJobConfig.set(LauncherAM.OOZIE_ACTION_ID, "dummy");
+        launcherJobConfig.set(LauncherAM.CONF_OOZIE_ACTION_MAIN_CLASS, LauncherAMTestMainClass.class.getCanonicalName());
+        // Default stubbing: readLauncherConf() returns launcherJobConfig and every local file is reported as existing.
+        given(localFsOperationsMock.readLauncherConf()).willReturn(launcherJobConfig);
+        given(localFsOperationsMock.fileExists(any(File.class))).willReturn(true);
+        willReturn(amRmAsyncClientMock).given(amRMClientAsyncFactoryMock).createAMRMClientAsync(anyInt());
+        given(ugiMock.doAs(any(PrivilegedExceptionAction.class))).willAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                PrivilegedExceptionAction<?> action = (PrivilegedExceptionAction<?>) invocation.getArguments()[0];
+                return action.run(); // doAs() on the mock simply runs the supplied action and returns its result
+            }
+        });
+        given(launcherCallbackNotifierFactoryMock.createCallbackNotifier(any(Configuration.class)))
+            .willReturn(launcherCallbackNotifierMock);
+    }
+
+    private void setupActionOutputContents() throws IOException {
+        // Output files generated by an action: every stubbed read returns the very constant
+        // that was passed as its second argument.
+        final String[] outputNames = {
+            ACTION_DATA_EXTERNAL_CHILD_IDS,
+            ACTION_DATA_NEW_ID,
+            ACTION_DATA_OUTPUT_PROPS,
+            ACTION_DATA_STATS,
+        };
+        for (String outputName : outputNames) {
+            given(localFsOperationsMock.getLocalFileContentAsString(any(File.class), eq(outputName), anyInt()))
+                .willReturn(outputName);
+        }
+    }
+
+    private void setupArgsForMainClass(final String... args) {
+        // Stores the argument count first, then each argument under the prefix key followed by its index.
+        launcherJobConfig.set(String.valueOf(LauncherAM.CONF_OOZIE_ACTION_MAIN_ARG_COUNT), Integer.toString(args.length));
+        for (int position = 0; position < args.length; position++) {
+            launcherJobConfig.set(String.valueOf(LauncherAM.CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + position), args[position]);
+        }
+    }
+
+    private void executeLauncher() throws Exception {
+        launcherAM.run(); // runs the launcher instance under test; any exception propagates to the calling test
+    }
+
+    @SuppressWarnings("unchecked")
+    private void assertSuccessfulExecution(OozieActionResult actionResult) throws Exception {
+        verify(amRmAsyncClientMock).registerApplicationMaster(anyString(), anyInt(), anyString());
+        verify(amRmAsyncClientMock).unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, EMPTY_STRING, EMPTY_STRING);
+        verify(amRmAsyncClientMock).stop();
+        verify(ugiMock, times(2)).doAs(any(PrivilegedExceptionAction.class)); // prepare & action main
+        verify(hdfsOperationsMock).uploadActionDataToHDFS(any(Configuration.class), any(Path.class), any(Map.class));
+        verify(launcherCallbackNotifierFactoryMock).createCallbackNotifier(any(Configuration.class));
+        verify(launcherCallbackNotifierMock).notifyURL(actionResult);
+        verify(hdfsOperationsMock).writeStringToFile(any(Path.class), any(Configuration.class), any(String.class));
+        // Beyond the interactions above, the action data must carry the given final status and no error properties.
+        Map<String, String> actionData = launcherAM.getActionData();
+        verifyFinalStatus(actionData, actionResult);
+        verifyNoError(actionData);
+    }
+
+    private void assertActionOutputDataPresentAndCorrect() {
+        Map<String, String> actionData = launcherAM.getActionData();
+        String extChildId = actionData.get(ACTION_DATA_EXTERNAL_CHILD_IDS);
+        String stats = actionData.get(ACTION_DATA_STATS);
+        String output = actionData.get(ACTION_DATA_OUTPUT_PROPS);
+        String idSwap = actionData.get(ACTION_DATA_NEW_ID);
+        // Each value must equal its own key; the value read goes in the "actual" slot so failure messages read correctly.
+        assertThat("extChildID output", extChildId, equalTo(ACTION_DATA_EXTERNAL_CHILD_IDS));
+        assertThat("stats output", stats, equalTo(ACTION_DATA_STATS));
+        assertThat("action output", output, equalTo(ACTION_DATA_OUTPUT_PROPS));
+        assertThat("idSwap output", idSwap, equalTo(ACTION_DATA_NEW_ID));
+    }
+
+    private void assertNoActionOutputData() {
+        final String[][] reasonsAndKeys = { // {assertion reason, action data key}; none of the keys may hold a value
+            {"extChildId", ACTION_DATA_EXTERNAL_CHILD_IDS},
+            {"stats", ACTION_DATA_STATS},
+            {"Output", ACTION_DATA_OUTPUT_PROPS},
+            {"idSwap", ACTION_DATA_NEW_ID},
+        };
+        final Map<String, String> collected = launcherAM.getActionData();
+        for (String[] reasonAndKey : reasonsAndKeys) {
+            assertThat(reasonAndKey[0], collected.get(reasonAndKey[1]), nullValue());
+        }
+    }
+
+    private void assertFailedExecution() throws Exception {
+        Map<String, String> actionData = launcherAM.getActionData();
+        verify(launcherCallbackNotifierFactoryMock).createCallbackNotifier(any(Configuration.class));
+        verify(launcherCallbackNotifierMock).notifyURL(OozieActionResult.FAILED);
+        verifyFinalStatus(actionData, OozieActionResult.FAILED);
+        // Note: actionData contains properties inside a property, so we have to extract them into a new Property object
+        String fullError = actionData.get(ACTIONDATA_ERROR_PROPERTIES);
+        assertThat("error properties", fullError, notNullValue()); // fail clearly instead of an NPE in StringReader
+        Properties props = new Properties();
+        props.load(new StringReader(fullError));
+        // Each extracted error property is compared against the expectations collected in failureDetails.
+        String errorReason = props.getProperty(ERROR_REASON_PROPERTY);
+        if (failureDetails.expectedErrorReason != null) {
+            assertThat("errorReason", errorReason, containsString(failureDetails.expectedErrorReason));
+        } else {
+            assertThat("errorReason", errorReason, nullValue());
+        }
+
+        String exceptionMessage = props.getProperty(EXCEPTION_MESSAGE_PROPERTY);
+        if (failureDetails.expectedExceptionMessage != null) {
+            assertThat("exceptionMessage", exceptionMessage, containsString(failureDetails.expectedExceptionMessage));
+        } else {
+            assertThat("exceptionMessage", exceptionMessage, nullValue());
+        }
+
+        String stackTrace = props.getProperty(EXCEPTION_STACKTRACE_PROPERTY);
+        if (failureDetails.hasStackTrace) {
+            assertThat("stackTrace", stackTrace, notNullValue());
+        } else {
+            assertThat("stackTrace", stackTrace, nullValue());
+        }
+
+        String errorCode = props.getProperty(ERROR_CODE_PROPERTY);
+        assertThat("errorCode", errorCode, equalTo(failureDetails.expectedErrorCode));
+    }
+
+    private void verifyFinalStatus(Map<String, String> actionData, OozieActionResult actionResult) {
+        String finalStatus = actionData.get(ACTIONDATA_FINAL_STATUS_PROPERTY);
+        assertThat("actionResult", finalStatus, equalTo(actionResult.toString())); // (reason, actual, expected)
+    }
+
+    private void verifyNoError(Map<String, String> actionData) {
+        // The given action data must not contain an error properties entry.
+        assertThat("error properties", actionData.get(ACTIONDATA_ERROR_PROPERTIES), nullValue());
+    }
+
+    private static class ExpectedFailureDetails { // static: the holder never touches the enclosing test instance
+        String expectedExceptionMessage;
+        String expectedErrorCode;
+        String expectedErrorReason;
+        boolean hasStackTrace; // stays false unless withStackTrace() is called
+        // Fluent setters: each one stores its value and returns this so that expectations can be chained.
+        public ExpectedFailureDetails expectedExceptionMessage(String expectedExceptionMessage) {
+            this.expectedExceptionMessage = expectedExceptionMessage;
+            return this;
+        }
+
+        public ExpectedFailureDetails expectedErrorCode(String expectedErrorCode) {
+            this.expectedErrorCode = expectedErrorCode;
+            return this;
+        }
+
+        public ExpectedFailureDetails expectedErrorReason(String expectedErrorReason) {
+            this.expectedErrorReason = expectedErrorReason;
+            return this;
+        }
+
+        public ExpectedFailureDetails withStackTrace() {
+            this.hasStackTrace = true;
+            return this;
+        }
+
+        public ExpectedFailureDetails withoutStackTrace() {
+            this.hasStackTrace = false;
+            return this;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/pig/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/pig/pom.xml b/sharelib/pig/pom.xml
index 99148d7..8f74ded 100644
--- a/sharelib/pig/pom.xml
+++ b/sharelib/pig/pom.xml
@@ -82,9 +82,9 @@
         </dependency>
 
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-client</artifactId>
-            <scope>provided</scope>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <scope>test</scope>
         </dependency>
 
         <dependency>
@@ -103,8 +103,8 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
-            <groupId>org.apache.oozie</groupId>
-            <artifactId>oozie-hadoop-utils</artifactId>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
             <scope>provided</scope>
         </dependency>
         <dependency>
@@ -170,18 +170,6 @@
                             <outputFile>${project.build.directory}/classpath</outputFile>
                         </configuration>
                     </execution>
-                    <execution>
-                        <id>create-mrapp-generated-classpath</id>
-                        <phase>generate-test-resources</phase>
-                        <goals>
-                            <goal>build-classpath</goal>
-                        </goals>
-                        <configuration>
-                            <!-- needed to run the unit test for DS to generate the required classpath
-                            that is required in the env of the launch container in the mini mr/yarn cluster -->
-                            <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
-                        </configuration>
-                    </execution>
                 </executions>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java
----------------------------------------------------------------------
diff --git a/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java b/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java
index 11cc7ee..5a9123a 100644
--- a/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java
+++ b/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMain.java
@@ -131,7 +131,8 @@ public class PigMain extends LauncherMain {
         pigProperties.store(os, "");
         os.close();
 
-        logMasking("pig.properties:", Arrays.asList("password"), pigProperties.entrySet());
+        logMasking("pig.properties:", Arrays.asList("password"),
+                (Iterable<Map.Entry<String, String>>)(Iterable<?>) pigProperties.entrySet());
 
         List<String> arguments = new ArrayList<String>();
         String script = actionConf.get(PigActionExecutor.PIG_SCRIPT);
@@ -148,7 +149,7 @@ public class PigMain extends LauncherMain {
 
         arguments.add("-file");
         arguments.add(script);
-        String[] params = MapReduceMain.getStrings(actionConf, PigActionExecutor.PIG_PARAMS);
+        String[] params = ActionUtils.getStrings(actionConf, PigActionExecutor.PIG_PARAMS);
         for (String param : params) {
             arguments.add("-param");
             arguments.add(param);
@@ -193,7 +194,7 @@ public class PigMain extends LauncherMain {
         arguments.add("-logfile");
         arguments.add(pigLog);
 
-        String[] pigArgs = MapReduceMain.getStrings(actionConf, PigActionExecutor.PIG_ARGS);
+        String[] pigArgs = ActionUtils.getStrings(actionConf, PigActionExecutor.PIG_ARGS);
         for (String pigArg : pigArgs) {
             if (DISALLOWED_PIG_OPTIONS.contains(pigArg)) {
                 throw new RuntimeException("Error: Pig argument " + pigArg + " is not supported");
@@ -212,7 +213,7 @@ public class PigMain extends LauncherMain {
             System.out.println("             " + arg);
         }
 
-        LauncherMainHadoopUtils.killChildYarnJobs(actionConf);
+        LauncherMain.killChildYarnJobs(actionConf);
 
         System.out.println("=================================================================");
         System.out.println();

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMainWithOldAPI.java
----------------------------------------------------------------------
diff --git a/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMainWithOldAPI.java b/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMainWithOldAPI.java
index 503d0eb..0ee4b0b 100644
--- a/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMainWithOldAPI.java
+++ b/sharelib/pig/src/main/java/org/apache/oozie/action/hadoop/PigMainWithOldAPI.java
@@ -135,7 +135,7 @@ public class PigMainWithOldAPI extends LauncherMain {
 
         arguments.add("-file");
         arguments.add(script);
-        String[] params = MapReduceMain.getStrings(actionConf, "oozie.pig.params");
+        String[] params = ActionUtils.getStrings(actionConf, "oozie.pig.params");
         for (String param : params) {
             arguments.add("-param");
             arguments.add(param);
@@ -178,7 +178,7 @@ public class PigMainWithOldAPI extends LauncherMain {
         arguments.add("-logfile");
         arguments.add(pigLog);
 
-        String[] pigArgs = MapReduceMain.getStrings(actionConf, "oozie.pig.args");
+        String[] pigArgs = ActionUtils.getStrings(actionConf, "oozie.pig.args");
         for (String pigArg : pigArgs) {
             if (DISALLOWED_PIG_OPTIONS.contains(pigArg)) {
                 throw new RuntimeException("Error: Pig argument " + pigArg + " is not supported");