You are viewing a plain-text version of this content; the canonical link to the original message was not preserved in this copy.
Posted to commits@oozie.apache.org by ge...@apache.org on 2017/05/26 09:28:01 UTC

[09/10] oozie git commit: OOZIE-1770 Create Oozie Application Master for YARN (asasvari, pbacsko, rkanter, gezapeti)

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 06ae5fd..f4c1127 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -25,21 +25,22 @@ import java.net.ConnectException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
-import java.security.PrivilegedExceptionAction;
+import java.nio.ByteBuffer;
 import java.text.MessageFormat;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
 
+import org.apache.commons.io.IOUtils;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.primitives.Ints;
 import org.apache.hadoop.conf.Configuration;
@@ -48,23 +49,41 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.AccessControlException;
-import org.apache.oozie.hadoop.utils.HadoopShims;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.TaskLog;
+import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.Apps;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.action.ActionExecutor;
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
-import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.command.coord.CoordActionStartXCommand;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.HadoopAccessorException;
@@ -72,8 +91,8 @@ import org.apache.oozie.service.HadoopAccessorService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.ShareLibService;
 import org.apache.oozie.service.URIHandlerService;
-import org.apache.oozie.service.UserGroupInformationService;
 import org.apache.oozie.service.WorkflowAppService;
+import org.apache.oozie.util.ClasspathUtils;
 import org.apache.oozie.util.ELEvaluationException;
 import org.apache.oozie.util.ELEvaluator;
 import org.apache.oozie.util.JobUtils;
@@ -86,18 +105,22 @@ import org.jdom.Element;
 import org.jdom.JDOMException;
 import org.jdom.Namespace;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.io.Closeables;
+
 
 public class JavaActionExecutor extends ActionExecutor {
 
-    protected static final String HADOOP_USER = "user.name";
-    public static final String HADOOP_JOB_TRACKER = "mapred.job.tracker";
-    public static final String HADOOP_JOB_TRACKER_2 = "mapreduce.jobtracker.address";
+    public static final String RUNNING = "RUNNING";
+    public static final String SUCCEEDED = "SUCCEEDED";
+    public static final String KILLED = "KILLED";
+    public static final String FAILED = "FAILED";
+    public static final String FAILED_KILLED = "FAILED/KILLED";
     public static final String HADOOP_YARN_RM = "yarn.resourcemanager.address";
     public static final String HADOOP_NAME_NODE = "fs.default.name";
-    private static final String HADOOP_JOB_NAME = "mapred.job.name";
     public static final String OOZIE_COMMON_LIBDIR = "oozie";
-    private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>();
-    public final static String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size";
+
+    public static final String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size";
     public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
     public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
     public static final String HADOOP_YARN_TIMELINE_SERVICE_ENABLED = "yarn.timeline-service.enabled";
@@ -111,34 +134,32 @@ public class JavaActionExecutor extends ActionExecutor {
     public static final String HADOOP_REDUCE_JAVA_OPTS = "mapreduce.reduce.java.opts";
     public static final String HADOOP_CHILD_JAVA_ENV = "mapred.child.env";
     public static final String HADOOP_MAP_JAVA_ENV = "mapreduce.map.env";
+    public static final String HADOOP_JOB_CLASSLOADER = "mapreduce.job.classloader";
+    public static final String HADOOP_USER_CLASSPATH_FIRST = "mapreduce.user.classpath.first";
+    public static final String OOZIE_CREDENTIALS_SKIP = "oozie.credentials.skip";
     public static final String YARN_AM_RESOURCE_MB = "yarn.app.mapreduce.am.resource.mb";
     public static final String YARN_AM_COMMAND_OPTS = "yarn.app.mapreduce.am.command-opts";
     public static final String YARN_AM_ENV = "yarn.app.mapreduce.am.env";
-    private static final String JAVA_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.JavaMain";
     public static final int YARN_MEMORY_MB_MIN = 512;
+
+    private static final String JAVA_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.JavaMain";
+    private static final String HADOOP_JOB_NAME = "mapred.job.name";
+    private static final Set<String> DISALLOWED_PROPERTIES = new HashSet<String>();
+
     private static int maxActionOutputLen;
     private static int maxExternalStatsSize;
     private static int maxFSGlobMax;
-    private static final String SUCCEEDED = "SUCCEEDED";
-    private static final String KILLED = "KILLED";
-    private static final String FAILED = "FAILED";
-    private static final String FAILED_KILLED = "FAILED/KILLED";
+
+    protected static final String HADOOP_USER = "user.name";
+
     protected XLog LOG = XLog.getLog(getClass());
-    private static final Pattern heapPattern = Pattern.compile("-Xmx(([0-9]+)[mMgG])");
     private static final String JAVA_TMP_DIR_SETTINGS = "-Djava.io.tmpdir=";
-    public static final String CONF_HADOOP_YARN_UBER_MODE = OOZIE_ACTION_LAUNCHER_PREFIX + HADOOP_YARN_UBER_MODE;
-    public static final String HADOOP_JOB_CLASSLOADER = "mapreduce.job.classloader";
-    public static final String HADOOP_USER_CLASSPATH_FIRST = "mapreduce.user.classpath.first";
-    public static final String OOZIE_CREDENTIALS_SKIP = "oozie.credentials.skip";
-    private static final LauncherInputFormatClassLocator launcherInputFormatClassLocator = new LauncherInputFormatClassLocator();
 
     public XConfiguration workflowConf = null;
 
     static {
         DISALLOWED_PROPERTIES.add(HADOOP_USER);
-        DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER);
         DISALLOWED_PROPERTIES.add(HADOOP_NAME_NODE);
-        DISALLOWED_PROPERTIES.add(HADOOP_JOB_TRACKER_2);
         DISALLOWED_PROPERTIES.add(HADOOP_YARN_RM);
     }
 
@@ -150,20 +171,17 @@ public class JavaActionExecutor extends ActionExecutor {
         super(type);
     }
 
-    public static List<Class> getCommonLauncherClasses() {
-        List<Class> classes = new ArrayList<Class>();
-        classes.add(LauncherMapper.class);
-        classes.add(launcherInputFormatClassLocator.locateOrGet());
-        classes.add(OozieLauncherOutputFormat.class);
-        classes.add(OozieLauncherOutputCommitter.class);
-        classes.add(LauncherMainHadoopUtils.class);
-        classes.add(HadoopShims.class);
+    public static List<Class<?>> getCommonLauncherClasses() {
+        List<Class<?>> classes = new ArrayList<Class<?>>();
+        classes.add(LauncherMain.class);
         classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher());
+        classes.add(LauncherAM.class);
+        classes.add(LauncherAMCallbackNotifier.class);
         return classes;
     }
 
-    public List<Class> getLauncherClasses() {
-       List<Class> classes = new ArrayList<Class>();
+    public List<Class<?>> getLauncherClasses() {
+       List<Class<?>> classes = new ArrayList<Class<?>>();
         try {
             classes.add(Class.forName(JAVA_MAIN_CLASS_NAME));
         }
@@ -176,7 +194,7 @@ public class JavaActionExecutor extends ActionExecutor {
     @Override
     public void initActionType() {
         super.initActionType();
-        maxActionOutputLen = ConfigurationService.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA);
+        maxActionOutputLen = ConfigurationService.getInt(LauncherAM.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA);
         //Get the limit for the maximum allowed size of action stats
         maxExternalStatsSize = ConfigurationService.getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE);
         maxExternalStatsSize = (maxExternalStatsSize == -1) ? Integer.MAX_VALUE : maxExternalStatsSize;
@@ -217,31 +235,32 @@ public class JavaActionExecutor extends ActionExecutor {
         }
     }
 
-    public JobConf createBaseHadoopConf(Context context, Element actionXml) {
+    public Configuration createBaseHadoopConf(Context context, Element actionXml) {
         return createBaseHadoopConf(context, actionXml, true);
     }
 
-    protected JobConf createBaseHadoopConf(Context context, Element actionXml, boolean loadResources) {
+    protected Configuration createBaseHadoopConf(Context context, Element actionXml, boolean loadResources) {
+
         Namespace ns = actionXml.getNamespace();
         String jobTracker = actionXml.getChild("job-tracker", ns).getTextTrim();
         String nameNode = actionXml.getChild("name-node", ns).getTextTrim();
-        JobConf conf = null;
+        Configuration conf = null;
         if (loadResources) {
             conf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
         }
         else {
-            conf = new JobConf(false);
+            conf = new Configuration(false);
         }
+
         conf.set(HADOOP_USER, context.getProtoActionConf().get(WorkflowAppService.HADOOP_USER));
-        conf.set(HADOOP_JOB_TRACKER, jobTracker);
-        conf.set(HADOOP_JOB_TRACKER_2, jobTracker);
         conf.set(HADOOP_YARN_RM, jobTracker);
         conf.set(HADOOP_NAME_NODE, nameNode);
         conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true");
+
         return conf;
     }
 
-    protected JobConf loadHadoopDefaultResources(Context context, Element actionXml) {
+    protected Configuration loadHadoopDefaultResources(Context context, Element actionXml) {
         return createBaseHadoopConf(context, actionXml);
     }
 
@@ -266,7 +285,7 @@ public class JavaActionExecutor extends ActionExecutor {
             XConfiguration launcherConf = new XConfiguration();
             // Inject action defaults for launcher
             HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
-            XConfiguration actionDefaultConf = has.createActionDefaultConf(conf.get(HADOOP_JOB_TRACKER), getType());
+            XConfiguration actionDefaultConf = has.createActionDefaultConf(conf.get(HADOOP_YARN_RM), getType());
             injectLauncherProperties(actionDefaultConf, launcherConf);
             // Inject <job-xml> and <configuration> for launcher
             try {
@@ -276,15 +295,8 @@ public class JavaActionExecutor extends ActionExecutor {
             } catch (URISyntaxException ex) {
                 throw convertException(ex);
             }
-            // Inject use uber mode for launcher
-            injectLauncherUseUberMode(launcherConf);
             XConfiguration.copy(launcherConf, conf);
             checkForDisallowedProps(launcherConf, "launcher configuration");
-            // Inject config-class for launcher to use for action
-            Element e = actionXml.getChild("config-class", ns);
-            if (e != null) {
-                conf.set(LauncherMapper.OOZIE_ACTION_CONFIG_CLASS, e.getTextTrim());
-            }
             return conf;
         }
         catch (IOException ex) {
@@ -292,25 +304,6 @@ public class JavaActionExecutor extends ActionExecutor {
         }
     }
 
-    void injectLauncherUseUberMode(Configuration launcherConf) {
-        // Set Uber Mode for the launcher (YARN only, ignored by MR1)
-        // Priority:
-        // 1. action's <configuration>
-        // 2. oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable
-        // 3. oozie.action.launcher.mapreduce.job.ubertask.enable
-        if (launcherConf.get(HADOOP_YARN_UBER_MODE) == null) {
-            if (ConfigurationService.get(getActionTypeLauncherPrefix() + HADOOP_YARN_UBER_MODE).length() > 0) {
-                if (ConfigurationService.getBoolean(getActionTypeLauncherPrefix() + HADOOP_YARN_UBER_MODE)) {
-                    launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
-                }
-            } else {
-                if (ConfigurationService.getBoolean(OOZIE_ACTION_LAUNCHER_PREFIX + HADOOP_YARN_UBER_MODE)) {
-                    launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
-                }
-            }
-        }
-    }
-
     void injectLauncherTimelineServiceEnabled(Configuration launcherConf, Configuration actionConf) {
         // Getting delegation token for ATS. If tez-site.xml is present in distributed cache, turn on timeline service.
         if (actionConf.get("oozie.launcher." + HADOOP_YARN_TIMELINE_SERVICE_ENABLED) == null
@@ -322,151 +315,6 @@ public class JavaActionExecutor extends ActionExecutor {
         }
     }
 
-    void updateConfForUberMode(Configuration launcherConf) {
-
-        // child.env
-        boolean hasConflictEnv = false;
-        String launcherMapEnv = launcherConf.get(HADOOP_MAP_JAVA_ENV);
-        if (launcherMapEnv == null) {
-            launcherMapEnv = launcherConf.get(HADOOP_CHILD_JAVA_ENV);
-        }
-        String amEnv = launcherConf.get(YARN_AM_ENV);
-        StringBuffer envStr = new StringBuffer();
-        HashMap<String, List<String>> amEnvMap = null;
-        HashMap<String, List<String>> launcherMapEnvMap = null;
-        if (amEnv != null) {
-            envStr.append(amEnv);
-            amEnvMap = populateEnvMap(amEnv);
-        }
-        if (launcherMapEnv != null) {
-            launcherMapEnvMap = populateEnvMap(launcherMapEnv);
-            if (amEnvMap != null) {
-                Iterator<String> envKeyItr = launcherMapEnvMap.keySet().iterator();
-                while (envKeyItr.hasNext()) {
-                    String envKey = envKeyItr.next();
-                    if (amEnvMap.containsKey(envKey)) {
-                        List<String> amValList = amEnvMap.get(envKey);
-                        List<String> launcherValList = launcherMapEnvMap.get(envKey);
-                        Iterator<String> valItr = launcherValList.iterator();
-                        while (valItr.hasNext()) {
-                            String val = valItr.next();
-                            if (!amValList.contains(val)) {
-                                hasConflictEnv = true;
-                                break;
-                            }
-                            else {
-                                valItr.remove();
-                            }
-                        }
-                        if (launcherValList.isEmpty()) {
-                            envKeyItr.remove();
-                        }
-                    }
-                }
-            }
-        }
-        if (hasConflictEnv) {
-            launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, false);
-        }
-        else {
-            if (launcherMapEnvMap != null) {
-                for (String key : launcherMapEnvMap.keySet()) {
-                    List<String> launcherValList = launcherMapEnvMap.get(key);
-                    for (String val : launcherValList) {
-                        if (envStr.length() > 0) {
-                            envStr.append(",");
-                        }
-                        envStr.append(key).append("=").append(val);
-                    }
-                }
-            }
-
-            launcherConf.set(YARN_AM_ENV, envStr.toString());
-
-            // memory.mb
-            int launcherMapMemoryMB = launcherConf.getInt(HADOOP_MAP_MEMORY_MB, 1536);
-            int amMemoryMB = launcherConf.getInt(YARN_AM_RESOURCE_MB, 1536);
-            // YARN_MEMORY_MB_MIN to provide buffer.
-            // suppose launcher map aggressively use high memory, need some
-            // headroom for AM
-            int memoryMB = Math.max(launcherMapMemoryMB, amMemoryMB) + YARN_MEMORY_MB_MIN;
-            // limit to 4096 in case of 32 bit
-            if (launcherMapMemoryMB < 4096 && amMemoryMB < 4096 && memoryMB > 4096) {
-                memoryMB = 4096;
-            }
-            launcherConf.setInt(YARN_AM_RESOURCE_MB, memoryMB);
-
-            // We already made mapred.child.java.opts and
-            // mapreduce.map.java.opts equal, so just start with one of them
-            String launcherMapOpts = launcherConf.get(HADOOP_MAP_JAVA_OPTS, "");
-            String amChildOpts = launcherConf.get(YARN_AM_COMMAND_OPTS);
-            StringBuilder optsStr = new StringBuilder();
-            int heapSizeForMap = extractHeapSizeMB(launcherMapOpts);
-            int heapSizeForAm = extractHeapSizeMB(amChildOpts);
-            int heapSize = Math.max(heapSizeForMap, heapSizeForAm) + YARN_MEMORY_MB_MIN;
-            // limit to 3584 in case of 32 bit
-            if (heapSizeForMap < 4096 && heapSizeForAm < 4096 && heapSize > 3584) {
-                heapSize = 3584;
-            }
-            if (amChildOpts != null) {
-                optsStr.append(amChildOpts);
-            }
-            optsStr.append(" ").append(launcherMapOpts.trim());
-            if (heapSize > 0) {
-                // append calculated total heap size to the end
-                optsStr.append(" ").append("-Xmx").append(heapSize).append("m");
-            }
-            launcherConf.set(YARN_AM_COMMAND_OPTS, optsStr.toString().trim());
-        }
-    }
-
-    void updateConfForJavaTmpDir(Configuration conf) {
-        String amChildOpts = conf.get(YARN_AM_COMMAND_OPTS);
-        String oozieJavaTmpDirSetting = "-Djava.io.tmpdir=./tmp";
-        if (amChildOpts != null && !amChildOpts.contains(JAVA_TMP_DIR_SETTINGS)) {
-            conf.set(YARN_AM_COMMAND_OPTS, amChildOpts + " " + oozieJavaTmpDirSetting);
-        }
-    }
-
-    private HashMap<String, List<String>> populateEnvMap(String input) {
-        HashMap<String, List<String>> envMaps = new HashMap<String, List<String>>();
-        String[] envEntries = input.split(",");
-        for (String envEntry : envEntries) {
-            String[] envKeyVal = envEntry.split("=");
-            String envKey = envKeyVal[0].trim();
-            List<String> valList = envMaps.get(envKey);
-            if (valList == null) {
-                valList = new ArrayList<String>();
-            }
-            valList.add(envKeyVal[1].trim());
-            envMaps.put(envKey, valList);
-        }
-        return envMaps;
-    }
-
-    public int extractHeapSizeMB(String input) {
-        int ret = 0;
-        if(input == null || input.equals(""))
-            return ret;
-        Matcher m = heapPattern.matcher(input);
-        String heapStr = null;
-        String heapNum = null;
-        // Grabs the last match which takes effect (in case that multiple Xmx options specified)
-        while (m.find()) {
-            heapStr = m.group(1);
-            heapNum = m.group(2);
-        }
-        if (heapStr != null) {
-            // when Xmx specified in Gigabyte
-            if(heapStr.endsWith("g") || heapStr.endsWith("G")) {
-                ret = Integer.parseInt(heapNum) * 1024;
-            } else {
-                ret = Integer.parseInt(heapNum);
-            }
-        }
-        return ret;
-    }
-
     public static void parseJobXmlAndConfiguration(Context context, Element element, Path appPath, Configuration conf)
             throws IOException, ActionExecutorException, HadoopAccessorException, URISyntaxException {
         parseJobXmlAndConfiguration(context, element, appPath, conf, false);
@@ -475,6 +323,7 @@ public class JavaActionExecutor extends ActionExecutor {
     public static void parseJobXmlAndConfiguration(Context context, Element element, Path appPath, Configuration conf,
             boolean isLauncher) throws IOException, ActionExecutorException, HadoopAccessorException, URISyntaxException {
         Namespace ns = element.getNamespace();
+        @SuppressWarnings("unchecked")
         Iterator<Element> it = element.getChildren("job-xml", ns).iterator();
         HashMap<String, FileSystem> filesystemsMap = new HashMap<String, FileSystem>();
         HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
@@ -535,8 +384,8 @@ public class JavaActionExecutor extends ActionExecutor {
             throws ActionExecutorException {
         try {
             HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
-            XConfiguration actionDefaults = has.createActionDefaultConf(actionConf.get(HADOOP_JOB_TRACKER), getType());
-            XConfiguration.copy(actionDefaults, actionConf);
+            XConfiguration actionDefaults = has.createActionDefaultConf(actionConf.get(HADOOP_YARN_RM), getType());
+            XConfiguration.injectDefaults(actionDefaults, actionConf);
             has.checkSupportedFilesystem(appPath.toUri());
 
             // Set the Java Main Class for the Java action to give to the Java launcher
@@ -546,7 +395,6 @@ public class JavaActionExecutor extends ActionExecutor {
 
             // set cancel.delegation.token in actionConf that child job doesn't cancel delegation token
             actionConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", false);
-            updateConfForJavaTmpDir(actionConf);
             setRootLoggerLevel(actionConf);
             return actionConf;
         }
@@ -634,8 +482,8 @@ public class JavaActionExecutor extends ActionExecutor {
         }
         catch (Exception ex) {
             LOG.debug(
-                    "Errors when add to DistributedCache. Path=" + uri.toString() + ", archive=" + archive + ", conf="
-                            + XmlUtils.prettyPrint(conf).toString());
+                    "Errors when add to DistributedCache. Path=" + Objects.toString(uri, "<null>") + ", archive="
+                            + archive + ", conf=" + XmlUtils.prettyPrint(conf).toString());
             throw convertException(ex);
         }
     }
@@ -758,7 +606,7 @@ public class JavaActionExecutor extends ActionExecutor {
         if (shareLibService != null) {
             try {
                 List<Path> listOfPaths = shareLibService.getSystemLibJars(JavaActionExecutor.OOZIE_COMMON_LIBDIR);
-                if (listOfPaths == null || listOfPaths.isEmpty()) {
+                if (listOfPaths.isEmpty()) {
                     throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "EJ001",
                             "Could not locate Oozie sharelib");
                 }
@@ -768,7 +616,7 @@ public class JavaActionExecutor extends ActionExecutor {
                     DistributedCache.createSymlink(conf);
                 }
                 listOfPaths = shareLibService.getSystemLibJars(getType());
-                if (listOfPaths != null) {
+                if (!listOfPaths.isEmpty()) {
                     for (Path actionLibPath : listOfPaths) {
                         JobUtils.addFileToClassPath(actionLibPath, conf, fs);
                         DistributedCache.createSymlink(conf);
@@ -885,7 +733,7 @@ public class JavaActionExecutor extends ActionExecutor {
 
 
     protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
-        return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, JavaMain.class.getName());
+        return launcherConf.get(LauncherAM.CONF_OOZIE_ACTION_MAIN_CLASS, JavaMain.class.getName());
     }
 
     private void setJavaMain(Configuration actionConf, Element actionXml) {
@@ -907,8 +755,8 @@ public class JavaActionExecutor extends ActionExecutor {
     }
 
     @SuppressWarnings("unchecked")
-    JobConf createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml, Configuration actionConf)
-            throws ActionExecutorException {
+    Configuration createLauncherConf(FileSystem actionFs, Context context, WorkflowAction action, Element actionXml,
+            Configuration actionConf) throws ActionExecutorException {
         try {
 
             // app path could be a file
@@ -918,7 +766,7 @@ public class JavaActionExecutor extends ActionExecutor {
             }
 
             // launcher job configuration
-            JobConf launcherJobConf = createBaseHadoopConf(context, actionXml);
+            Configuration launcherJobConf = createBaseHadoopConf(context, actionXml);
             // cancel delegation token on a launcher job which stays alive till child job(s) finishes
             // otherwise (in mapred action), doesn't cancel not to disturb running child job
             launcherJobConf.setBoolean("mapreduce.job.complete.cancel.delegation.tokens", true);
@@ -940,7 +788,7 @@ public class JavaActionExecutor extends ActionExecutor {
                     launcherTime = context.getWorkflow().getCreatedTime().getTime();
                 }
                 String actionYarnTag = getActionYarnTag(getWorkflowConf(context), context.getWorkflow(), action);
-                LauncherMapperHelper.setupYarnRestartHandling(launcherJobConf, actionConf, actionYarnTag, launcherTime);
+                LauncherHelper.setupYarnRestartHandling(launcherJobConf, actionConf, actionYarnTag, launcherTime);
             }
             else {
                 LOG.info(MessageFormat.format("{0} is set to false, not setting YARN restart properties",
@@ -953,15 +801,6 @@ public class JavaActionExecutor extends ActionExecutor {
             }
             setLibFilesArchives(context, actionXml, appPathRoot, launcherJobConf);
 
-            String jobName = launcherJobConf.get(HADOOP_JOB_NAME);
-            if (jobName == null || jobName.isEmpty()) {
-                jobName = XLog.format(
-                        "oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(),
-                        context.getWorkflow().getAppName(), action.getName(),
-                        context.getWorkflow().getId());
-            launcherJobConf.setJobName(jobName);
-            }
-
             // Inject Oozie job information if enabled.
             injectJobInfo(launcherJobConf, actionConf, context, action);
 
@@ -981,23 +820,22 @@ public class JavaActionExecutor extends ActionExecutor {
                     prepareXML = XmlUtils.prettyPrint(prepareElement).toString().trim();
                 }
             }
-            LauncherMapperHelper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf,
+            LauncherHelper.setupLauncherInfo(launcherJobConf, jobId, actionId, actionDir, recoveryId, actionConf,
                     prepareXML);
 
             // Set the launcher Main Class
-            LauncherMapperHelper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
-            LauncherMapperHelper.setupLauncherURIHandlerConf(launcherJobConf);
-
-            LauncherMapperHelper.setupMaxOutputData(launcherJobConf, getMaxOutputData(actionConf));
-            LauncherMapperHelper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize);
-            LauncherMapperHelper.setupMaxFSGlob(launcherJobConf, maxFSGlobMax);
+            LauncherHelper.setupMainClass(launcherJobConf, getLauncherMain(launcherJobConf, actionXml));
+            LauncherHelper.setupLauncherURIHandlerConf(launcherJobConf);
+            LauncherHelper.setupMaxOutputData(launcherJobConf, getMaxOutputData(actionConf));
+            LauncherHelper.setupMaxExternalStatsSize(launcherJobConf, maxExternalStatsSize);
+            LauncherHelper.setupMaxFSGlob(launcherJobConf, maxFSGlobMax);
 
             List<Element> list = actionXml.getChildren("arg", ns);
             String[] args = new String[list.size()];
             for (int i = 0; i < list.size(); i++) {
                 args[i] = list.get(i).getTextTrim();
             }
-            LauncherMapperHelper.setupMainArguments(launcherJobConf, args);
+            LauncherHelper.setupMainArguments(launcherJobConf, args);
             // backward compatibility flag - see OOZIE-2872
             if (ConfigurationService.getBoolean(LauncherMapper.CONF_OOZIE_NULL_ARGS_ALLOWED)) {
                 launcherJobConf.setBoolean(LauncherMapper.CONF_OOZIE_NULL_ARGS_ALLOWED, true);
@@ -1022,16 +860,6 @@ public class JavaActionExecutor extends ActionExecutor {
             launcherJobConf.set(HADOOP_CHILD_JAVA_OPTS, opts.toString().trim());
             launcherJobConf.set(HADOOP_MAP_JAVA_OPTS, opts.toString().trim());
 
-            // setting for uber mode
-            if (launcherJobConf.getBoolean(HADOOP_YARN_UBER_MODE, false)) {
-                if (checkPropertiesToDisableUber(launcherJobConf)) {
-                    launcherJobConf.setBoolean(HADOOP_YARN_UBER_MODE, false);
-                }
-                else {
-                    updateConfForUberMode(launcherJobConf);
-                }
-            }
-            updateConfForJavaTmpDir(launcherJobConf);
             injectLauncherTimelineServiceEnabled(launcherJobConf, actionConf);
 
             // properties from action that are needed by the launcher (e.g. QUEUE NAME, ACLs)
@@ -1055,23 +883,9 @@ public class JavaActionExecutor extends ActionExecutor {
         return maxActionOutputLen;
     }
 
-    private boolean checkPropertiesToDisableUber(Configuration launcherConf) {
-        boolean disable = false;
-        if (launcherConf.getBoolean(HADOOP_JOB_CLASSLOADER, false)) {
-            disable = true;
-        }
-        else if (launcherConf.getBoolean(HADOOP_USER_CLASSPATH_FIRST, false)) {
-            disable = true;
-        }
-        return disable;
-    }
-
     protected void injectCallback(Context context, Configuration conf) {
-        String callback = context.getCallbackUrl("$jobStatus");
-        if (conf.get("job.end.notification.url") != null) {
-            LOG.warn("Overriding the action job end notification URI");
-        }
-        conf.set("job.end.notification.url", callback);
+        String callback = context.getCallbackUrl(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN);
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, callback); // replaces any existing value
     }
 
     void injectActionCallback(Context context, Configuration actionConf) {
@@ -1082,7 +896,7 @@ public class JavaActionExecutor extends ActionExecutor {
         injectCallback(context, launcherConf);
     }
 
-    private void actionConfToLauncherConf(Configuration actionConf, JobConf launcherConf) {
+    private void actionConfToLauncherConf(Configuration actionConf, Configuration launcherConf) {
         for (String name : SPECIAL_PROPERTIES) {
             if (actionConf.get(name) != null && launcherConf.get("oozie.launcher." + name) == null) {
                 launcherConf.set(name, actionConf.get(name));
@@ -1090,9 +904,8 @@ public class JavaActionExecutor extends ActionExecutor {
         }
     }
 
-    public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException {
-        JobClient jobClient = null;
-        boolean exception = false;
+    public void submitLauncher(FileSystem actionFs, final Context context, WorkflowAction action) throws ActionExecutorException {
+        YarnClient yarnClient = null;
         try {
             Path appPathRoot = new Path(context.getWorkflow().getAppPath());
 
@@ -1109,14 +922,6 @@ public class JavaActionExecutor extends ActionExecutor {
             LOG.debug("Setting LibFilesArchives ");
             setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
 
-            String jobName = actionConf.get(HADOOP_JOB_NAME);
-            if (jobName == null || jobName.isEmpty()) {
-                jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}",
-                        getType(), context.getWorkflow().getAppName(),
-                        action.getName(), context.getWorkflow().getId());
-                actionConf.set(HADOOP_JOB_NAME, jobName);
-            }
-
             injectActionCallback(context, actionConf);
 
             if(actionConf.get(ACL_MODIFY_JOB) == null || actionConf.get(ACL_MODIFY_JOB).trim().equals("")) {
@@ -1130,15 +935,17 @@ public class JavaActionExecutor extends ActionExecutor {
             }
 
             // Setting the credential properties in launcher conf
-            JobConf credentialsConf = null;
+            Configuration credentialsConf = null;
+
             HashMap<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context,
                     action, actionConf);
+            Credentials credentials = null;
             if (credentialsProperties != null) {
-
+                credentials = new Credentials();
                 // Adding if action need to set more credential tokens
-                credentialsConf = new JobConf(false);
+                credentialsConf = new Configuration(false);
                 XConfiguration.copy(actionConf, credentialsConf);
-                setCredentialTokens(credentialsConf, context, action, credentialsProperties);
+                setCredentialTokens(credentials, credentialsConf, context, action, credentialsProperties);
 
                 // insert conf to action conf from credentialsConf
                 for (Entry<String, String> entry : credentialsConf) {
@@ -1147,49 +954,56 @@ public class JavaActionExecutor extends ActionExecutor {
                     }
                 }
             }
+            Configuration launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
 
-            JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
-
-            LOG.debug("Creating Job Client for action " + action.getId());
-            jobClient = createJobClient(context, launcherJobConf);
-            String launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context
+            String consoleUrl;
+            String launcherId = LauncherHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context
                     .getRecoveryId());
             boolean alreadyRunning = launcherId != null;
-            RunningJob runningJob;
 
             // if user-retry is on, always submit new launcher
             boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry();
+            LOG.debug("Creating yarnClient for action {0}", action.getId());
+            yarnClient = createYarnClient(context, launcherJobConf);
 
             if (alreadyRunning && !isUserRetry) {
-                runningJob = jobClient.getJob(JobID.forName(launcherId));
-                if (runningJob == null) {
-                    String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER);
+                try {
+                    ApplicationId appId = ConverterUtils.toApplicationId(launcherId);
+                    ApplicationReport report = yarnClient.getApplicationReport(appId);
+                    consoleUrl = report.getTrackingUrl();
+                } catch (RemoteException e) {
+                    // caught when the application id does not exist
+                    LOG.error("Got RemoteException from YARN", e);
+                    String jobTracker = launcherJobConf.get(HADOOP_YARN_RM);
                     throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
                             "unknown job [{0}@{1}], cannot recover", launcherId, jobTracker);
                 }
             }
             else {
-                LOG.debug("Submitting the job through Job Client for action " + action.getId());
-
-                // setting up propagation of the delegation token.
-                Services.get().get(HadoopAccessorService.class).addRMDelegationToken(jobClient, launcherJobConf);
+                // TODO: OYA: do we actually need an MR token?  IIRC, it's issued by the JHS
+//                // setting up propagation of the delegation token.
+//                Token<DelegationTokenIdentifier> mrdt = null;
+//                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
+//                mrdt = jobClient.getDelegationToken(has
+//                        .getMRDelegationTokenRenewer(launcherJobConf));
+//                launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt);
 
                 // insert credentials tokens to launcher job conf if needed
-                if (needInjectCredentials() && credentialsConf != null) {
-                    for (Token<? extends TokenIdentifier> tk : credentialsConf.getCredentials().getAllTokens()) {
+                if (credentialsConf != null) {
+                    for (Token<? extends TokenIdentifier> tk :credentials.getAllTokens()) {
                         Text fauxAlias = new Text(tk.getKind() + "_" + tk.getService());
                         LOG.debug("ADDING TOKEN: " + fauxAlias);
-                        launcherJobConf.getCredentials().addToken(fauxAlias, tk);
+                        credentials.addToken(fauxAlias, tk);
                     }
-                    if (credentialsConf.getCredentials().numberOfSecretKeys() > 0) {
+                    if (credentials.numberOfSecretKeys() > 0) {
                         for (Entry<String, CredentialsProperties> entry : credentialsProperties.entrySet()) {
                             CredentialsProperties credProps = entry.getValue();
                             if (credProps != null) {
                                 Text credName = new Text(credProps.getName());
-                                byte[] secKey = credentialsConf.getCredentials().getSecretKey(credName);
+                                byte[] secKey = credentials.getSecretKey(credName);
                                 if (secKey != null) {
                                     LOG.debug("ADDING CREDENTIAL: " + credProps.getName());
-                                    launcherJobConf.getCredentials().addSecretKey(credName, secKey);
+                                    credentials.addSecretKey(credName, secKey);
                                 }
                             }
                         }
@@ -1198,55 +1012,129 @@ public class JavaActionExecutor extends ActionExecutor {
                 else {
                     LOG.info("No need to inject credentials.");
                 }
-                runningJob = jobClient.submitJob(launcherJobConf);
-                if (runningJob == null) {
-                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
-                            "Error submitting launcher for action [{0}]", action.getId());
-                }
-                launcherId = runningJob.getID().toString();
-                LOG.debug("After submission get the launcherId " + launcherId);
+
+                String user = context.getWorkflow().getUser();
+
+                YarnClientApplication newApp = yarnClient.createApplication();
+                ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId();
+                ApplicationSubmissionContext appContext =
+                        createAppSubmissionContext(appId, launcherJobConf, user, context, actionConf, action.getName(),
+                                credentials);
+                yarnClient.submitApplication(appContext);
+
+                launcherId = appId.toString();
+                LOG.debug("After submission get the launcherId [{0}]", launcherId);
+                ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+                consoleUrl = appReport.getTrackingUrl();
             }
 
-            String jobTracker = launcherJobConf.get(HADOOP_JOB_TRACKER);
-            String consoleUrl = runningJob.getTrackingURL();
+            String jobTracker = launcherJobConf.get(HADOOP_YARN_RM);
             context.setStartData(launcherId, jobTracker, consoleUrl);
         }
         catch (Exception ex) {
-            exception = true;
             throw convertException(ex);
         }
         finally {
-            if (jobClient != null) {
-                try {
-                    jobClient.close();
-                }
-                catch (Exception e) {
-                    if (exception) {
-                        LOG.error("JobClient error: ", e);
-                    }
-                    else {
-                        throw convertException(e);
-                    }
-                }
+            if (yarnClient != null) {
+                Closeables.closeQuietly(yarnClient);
             }
         }
     }
-    private boolean needInjectCredentials() {
-        boolean methodExists = true;
 
-        Class klass;
-        try {
-            klass = Class.forName("org.apache.hadoop.mapred.JobConf");
-            klass.getMethod("getCredentials");
-        }
-        catch (ClassNotFoundException ex) {
-            methodExists = false;
-        }
-        catch (NoSuchMethodException ex) {
-            methodExists = false;
+    private ApplicationSubmissionContext createAppSubmissionContext(ApplicationId appId, Configuration launcherJobConf,
+                                        String user, Context context, Configuration actionConf, String actionName,
+                                        Credentials credentials)
+            throws IOException, HadoopAccessorException, URISyntaxException {
+        // Builds the YARN submission context whose AM container runs LauncherAM for the given action.
+        ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
+
+        String jobName = XLog.format(
+                "oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(),
+                context.getWorkflow().getAppName(), actionName,
+                context.getWorkflow().getId());
+
+        appContext.setApplicationId(appId);
+        appContext.setApplicationName(jobName);
+        appContext.setApplicationType("Oozie Launcher");
+        Priority pri = Records.newRecord(Priority.class);
+        int priority = 0; // TODO: OYA: Add a constant or a config
+        pri.setPriority(priority);
+        appContext.setPriority(pri);
+        appContext.setQueue("default");  // TODO: will be possible to set in <launcher>
+        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+
+        // Set the resources to localize: distributed cache entries of launcherJobConf plus the two config files
+        Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+        ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(launcherJobConf);
+        MRApps.setupDistributedCache(launcherJobConf, localResources);
+        // Add the Launcher and Action configs as Resources, keyed by their LauncherAM file names
+        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
+        LocalResource launcherJobConfLR = has.createLocalResourceForConfigurationFile(LauncherAM.LAUNCHER_JOB_CONF_XML, user,
+                launcherJobConf, context.getAppFileSystem().getUri(), context.getActionDir());
+        localResources.put(LauncherAM.LAUNCHER_JOB_CONF_XML, launcherJobConfLR);
+        LocalResource actionConfLR = has.createLocalResourceForConfigurationFile(LauncherAM.ACTION_CONF_XML, user, actionConf,
+                context.getAppFileSystem().getUri(), context.getActionDir());
+        localResources.put(LauncherAM.ACTION_CONF_XML, actionConfLR);
+        amContainer.setLocalResources(localResources);
+
+        // Set the environment variables (the map is handed over as an unmodifiable view once complete)
+        Map<String, String> env = new HashMap<String, String>();
+        // This adds the Hadoop jars to the classpath in the Launcher JVM
+        ClasspathUtils.setupClasspath(env, launcherJobConf);
+
+        if (needToAddMapReduceToClassPath()) {
+            ClasspathUtils.addMapReduceToClasspath(env, launcherJobConf);
+        }
+
+        addActionSpecificEnvVars(env);
+        amContainer.setEnvironment(Collections.unmodifiableMap(env));
+
+        // Set the command: the pieces collected here are merged below into one space-separated string
+        List<String> vargs = new ArrayList<String>(6);
+        vargs.add(Apps.crossPlatformify(ApplicationConstants.Environment.JAVA_HOME.toString())
+                + "/bin/java");
+
+        vargs.add("-Dlog4j.configuration=container-log4j.properties");
+        vargs.add("-Dlog4j.debug=true");
+        vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR);
+        vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_SIZE + "=" + 0);
+        vargs.add("-Dhadoop.root.logger=INFO,CLA");
+        vargs.add("-Dhadoop.root.logfile=" + TaskLog.LogName.SYSLOG);
+        vargs.add("-Dsubmitter.user=" + context.getWorkflow().getUser());
+
+        Path amTmpDir = new Path(Apps.crossPlatformify(ApplicationConstants.Environment.PWD.toString()),
+                YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
+        vargs.add("-Djava.io.tmpdir=" + amTmpDir);
+
+        vargs.add(LauncherAM.class.getCanonicalName());
+        vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+                Path.SEPARATOR + ApplicationConstants.STDOUT);
+        vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+                Path.SEPARATOR + ApplicationConstants.STDERR);
+        StringBuilder mergedCommand = new StringBuilder();
+        for (CharSequence str : vargs) {
+            mergedCommand.append(str).append(" ");
+        }
+
+        List<String> vargsFinal = ImmutableList.of(mergedCommand.toString());
+        LOG.debug("Command to launch container for ApplicationMaster is: {0}", mergedCommand);
+        amContainer.setCommands(vargsFinal);
+        appContext.setAMContainerSpec(amContainer);
+
+        // Set tokens: when credentials were supplied, serialize them into the AM container launch context
+        if (credentials != null) {
+            DataOutputBuffer dob = new DataOutputBuffer();
+            credentials.writeTokenStorageToStream(dob);
+            amContainer.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength())); // only the first getLength() bytes
         }
 
-        return methodExists;
+        // Set Resources (currently fixed values, see the TODO below)
+        // TODO: OYA: make resources allocated for the AM configurable and choose good defaults (memory MB, vcores)
+        Resource resource = Resource.newInstance(2048, 1);
+        appContext.setResource(resource);
+        appContext.setCancelTokensWhenComplete(true);
+
+        return appContext;
     }
 
     protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context,
@@ -1258,15 +1146,16 @@ public class JavaActionExecutor extends ActionExecutor {
                 if ("false".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) ||
                     !wfJobConf.getBoolean(OOZIE_CREDENTIALS_SKIP, ConfigurationService.getBoolean(OOZIE_CREDENTIALS_SKIP))) {
                     credPropertiesMap = getActionCredentialsProperties(context, action);
-                    if (credPropertiesMap != null) {
-                        for (String key : credPropertiesMap.keySet()) {
-                            CredentialsProperties prop = credPropertiesMap.get(key);
-                            if (prop != null) {
+                    if (!credPropertiesMap.isEmpty()) {
+                        for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) {
+                            if (entry.getValue() != null) {
+                                CredentialsProperties prop = entry.getValue();
                                 LOG.debug("Credential Properties set for action : " + action.getId());
-                                for (String property : prop.getProperties().keySet()) {
-                                    actionConf.set(property, prop.getProperties().get(property));
-                                    LOG.debug("property : '" + property + "', value : '" + prop.getProperties().get(property)
-                                            + "'");
+                                for (Entry<String, String> propEntry : prop.getProperties().entrySet()) {
+                                    String key = propEntry.getKey();
+                                    String value = propEntry.getValue();
+                                    actionConf.set(key, value);
+                                    LOG.debug("property : '" + key + "', value : '" + value + "'");
                                 }
                             }
                         }
@@ -1285,20 +1174,20 @@ public class JavaActionExecutor extends ActionExecutor {
         return credPropertiesMap;
     }
 
-    protected void setCredentialTokens(JobConf jobconf, Context context, WorkflowAction action,
+    protected void setCredentialTokens(Credentials credentials, Configuration jobconf, Context context, WorkflowAction action,
             HashMap<String, CredentialsProperties> credPropertiesMap) throws Exception {
 
         if (context != null && action != null && credPropertiesMap != null) {
             // Make sure we're logged into Kerberos; if not, or near expiration, it will relogin
-            CredentialsProvider.ensureKerberosLogin();
+            CredentialsProviderFactory.ensureKerberosLogin();
             for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) {
                 String credName = entry.getKey();
                 CredentialsProperties credProps = entry.getValue();
                 if (credProps != null) {
-                    CredentialsProvider credProvider = new CredentialsProvider(credProps.getType());
-                    Credentials credentialObject = credProvider.createCredentialObject();
-                    if (credentialObject != null) {
-                        credentialObject.addtoJobConf(jobconf, credProps, context);
+                    CredentialsProvider tokenProvider = CredentialsProviderFactory.getInstance()
+                            .createCredentialsProvider(credProps.getType());
+                    if (tokenProvider != null) {
+                        tokenProvider.updateCredentials(credentials, jobconf, credProps, context);
                         LOG.debug("Retrieved Credential '" + credName + "' for action " + action.getId());
                     }
                     else {
@@ -1310,7 +1199,6 @@ public class JavaActionExecutor extends ActionExecutor {
                 }
             }
         }
-
     }
 
     protected HashMap<String, CredentialsProperties> getActionCredentialsProperties(Context context,
@@ -1424,19 +1312,22 @@ public class JavaActionExecutor extends ActionExecutor {
      * @return JobClient
      * @throws HadoopAccessorException
      */
-    protected JobClient createJobClient(Context context, JobConf jobConf) throws HadoopAccessorException {
+    protected JobClient createJobClient(Context context, Configuration jobConf) throws HadoopAccessorException {
         String user = context.getWorkflow().getUser();
-        String group = context.getWorkflow().getGroup();
         return Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
     }
 
-    protected RunningJob getRunningJob(Context context, WorkflowAction action, JobClient jobClient) throws Exception{
-        String externalId = action.getExternalId();
-        RunningJob runningJob = null;
-        if (externalId != null) {
-            runningJob = jobClient.getJob(JobID.forName(externalId));
-        }
-        return runningJob;
+    /**
+     * Creates a YARN client for the user of the workflow that the given context belongs to.
+     *
+     * @param context action execution context; the workflow user is read from it
+     * @param jobConf configuration passed on to {@code HadoopAccessorService.createYarnClient}
+     * @return the YarnClient returned by the HadoopAccessorService
+     * @throws HadoopAccessorException declared for failures while creating the client
+     */
+    protected YarnClient createYarnClient(Context context, Configuration jobConf) throws HadoopAccessorException {
+        String user = context.getWorkflow().getUser();
+        return Services.get().get(HadoopAccessorService.class).createYarnClient(user, jobConf);
     }
 
     /**
@@ -1448,142 +1339,141 @@ public class JavaActionExecutor extends ActionExecutor {
         return action.getExternalId();
     }
 
+    /**
+     * Tells whether the Hadoop MapReduce jars have to be added to the classpath.
+     * Subclasses should override this method if necessary. By default the MapReduce
+     * jars are not added to the classpath.
+     * @return {@code true} if the MapReduce jars must be added; {@code false} by default
+     */
+    protected boolean needToAddMapReduceToClassPath() {
+        return false;
+    }
+
+    /**
+     * Adds action-specific environment variables. Default implementation is no-op.
+     * Subclasses should override this method if necessary.
+     * @param env environment variable map (name to value) that implementations add their entries to
+     */
+    protected void addActionSpecificEnvVars(Map<String, String> env) {
+        // nop
+    }
+
     @Override
     public void check(Context context, WorkflowAction action) throws ActionExecutorException {
-        JobClient jobClient = null;
-        boolean exception = false;
+        boolean fallback = false;
+        LOG = XLog.resetPrefix(LOG);
         LogUtils.setLogInfo(action);
+        YarnClient yarnClient = null;
         try {
             Element actionXml = XmlUtils.parseXml(action.getConf());
+            Configuration jobConf = createBaseHadoopConf(context, actionXml);
             FileSystem actionFs = context.getAppFileSystem();
-            JobConf jobConf = createBaseHadoopConf(context, actionXml);
-            jobClient = createJobClient(context, jobConf);
-            RunningJob runningJob = getRunningJob(context, action, jobClient);
-            if (runningJob == null) {
-                context.setExecutionData(FAILED, null);
-                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
-                        "Could not lookup launched hadoop Job ID [{0}] which was associated with " +
-                        " action [{1}].  Failing this action!", getActualExternalId(action), action.getId());
-            }
-            if (runningJob.isComplete()) {
+            yarnClient = createYarnClient(context, jobConf);
+            FinalApplicationStatus appStatus = null;
+            try {
+                ApplicationReport appReport =
+                        yarnClient.getApplicationReport(ConverterUtils.toApplicationId(action.getExternalId()));
+                YarnApplicationState appState = appReport.getYarnApplicationState();
+                if (appState == YarnApplicationState.FAILED || appState == YarnApplicationState.FINISHED
+                        || appState == YarnApplicationState.KILLED) {
+                    appStatus = appReport.getFinalApplicationStatus();
+                }
+
+            } catch (Exception ye) {
+                LOG.warn("Exception occurred while checking Launcher AM status; will try checking action data file instead ", ye);
+                // Fallback to action data file if we can't find the Launcher AM (maybe it got purged)
+                fallback = true;
+            }
+            if (appStatus != null || fallback) {
                 Path actionDir = context.getActionDir();
-                String newId = null;
                 // load sequence file into object
-                Map<String, String> actionData = LauncherMapperHelper.getActionData(actionFs, actionDir, jobConf);
-                if (actionData.containsKey(LauncherMapper.ACTION_DATA_NEW_ID)) {
-                    newId = actionData.get(LauncherMapper.ACTION_DATA_NEW_ID);
-                    String launcherId = action.getExternalId();
-                    runningJob = jobClient.getJob(JobID.forName(newId));
-                    if (runningJob == null) {
-                        context.setExternalStatus(FAILED);
+                Map<String, String> actionData = LauncherHelper.getActionData(actionFs, actionDir, jobConf);
+                if (fallback) {
+                    String finalStatus = actionData.get(LauncherAM.ACTION_DATA_FINAL_STATUS);
+                    if (finalStatus != null) {
+                        appStatus = FinalApplicationStatus.valueOf(finalStatus);
+                    } else {
+                        context.setExecutionData(FAILED, null);
                         throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
-                                "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!", newId,
-                                action.getId());
+                                "Unknown hadoop job [{0}] associated with action [{1}] and couldn't determine status from" +
+                                        " action data.  Failing this action!", action.getExternalId(), action.getId());
                     }
-                    context.setExternalChildIDs(newId);
-                    LOG.info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId,
-                            newId);
                 }
-                else {
-                    String externalIDs = actionData.get(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS);
-                    if (externalIDs != null) {
-                        context.setExternalChildIDs(externalIDs);
-                        LOG.info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs);
+
+                String externalID = actionData.get(LauncherAM.ACTION_DATA_NEW_ID);  // MapReduce was launched
+                if (externalID != null) {
+                    context.setExternalChildIDs(externalID);
+                    LOG.info(XLog.STD, "Hadoop Job was launched : [{0}]", externalID);
+                }
+
+               // Multiple child IDs - Pig or Hive action
+                String externalIDs = actionData.get(LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS);
+                if (externalIDs != null) {
+                    context.setExternalChildIDs(externalIDs);
+                    LOG.info(XLog.STD, "External Child IDs  : [{0}]", externalIDs);
+
+                }
+
+                LOG.info(XLog.STD, "action completed, external ID [{0}]", action.getExternalId());
+                context.setExecutionData(appStatus.toString(), null);
+                if (appStatus == FinalApplicationStatus.SUCCEEDED) {
+                    if (getCaptureOutput(action) && LauncherHelper.hasOutputData(actionData)) {
+                        context.setExecutionData(SUCCEEDED, PropertiesUtils.stringToProperties(actionData
+                                .get(LauncherAM.ACTION_DATA_OUTPUT_PROPS)));
+                        LOG.info(XLog.STD, "action produced output");
                     }
-                    else if (LauncherMapperHelper.hasOutputData(actionData)) {
-                        // Load stored Hadoop jobs ids and promote them as external child ids
-                        // This is for jobs launched with older release during upgrade to Oozie 4.3
-                        Properties props = PropertiesUtils.stringToProperties(actionData
-                                .get(LauncherMapper.ACTION_DATA_OUTPUT_PROPS));
-                        if (props.get(LauncherMain.HADOOP_JOBS) != null) {
-                            externalIDs = (String) props.get(LauncherMain.HADOOP_JOBS);
-                            context.setExternalChildIDs(externalIDs);
-                            LOG.info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs);
-                        }
+                    else {
+                        context.setExecutionData(SUCCEEDED, null);
                     }
-                }
-                if (runningJob.isComplete()) {
-                    // fetching action output and stats for the Map-Reduce action.
-                    if (newId != null) {
-                        actionData = LauncherMapperHelper.getActionData(actionFs, context.getActionDir(), jobConf);
+                    if (LauncherHelper.hasStatsData(actionData)) {
+                        context.setExecutionStats(actionData.get(LauncherAM.ACTION_DATA_STATS));
+                        LOG.info(XLog.STD, "action produced stats");
                     }
-                    LOG.info(XLog.STD, "action completed, external ID [{0}]",
-                            action.getExternalId());
-                    if (LauncherMapperHelper.isMainSuccessful(runningJob)) {
-                        if (getCaptureOutput(action) && LauncherMapperHelper.hasOutputData(actionData)) {
-                            context.setExecutionData(SUCCEEDED, PropertiesUtils.stringToProperties(actionData
-                                    .get(LauncherMapper.ACTION_DATA_OUTPUT_PROPS)));
-                            LOG.info(XLog.STD, "action produced output");
+                    getActionData(actionFs, action, context);
+                }
+                else {
+                    String errorReason;
+                    if (actionData.containsKey(LauncherAM.ACTION_DATA_ERROR_PROPS)) {
+                        Properties props = PropertiesUtils.stringToProperties(actionData
+                                .get(LauncherAM.ACTION_DATA_ERROR_PROPS));
+                        String errorCode = props.getProperty("error.code");
+                        if ("0".equals(errorCode)) {
+                            errorCode = "JA018";
                         }
-                        else {
-                            context.setExecutionData(SUCCEEDED, null);
+                        if ("-1".equals(errorCode)) {
+                            errorCode = "JA019";
                         }
-                        if (LauncherMapperHelper.hasStatsData(actionData)) {
-                            context.setExecutionStats(actionData.get(LauncherMapper.ACTION_DATA_STATS));
-                            LOG.info(XLog.STD, "action produced stats");
+                        errorReason = props.getProperty("error.reason");
+                        LOG.warn("Launcher ERROR, reason: {0}", errorReason);
+                        String exMsg = props.getProperty("exception.message");
+                        String errorInfo = (exMsg != null) ? exMsg : errorReason;
+                        context.setErrorInfo(errorCode, errorInfo);
+                        String exStackTrace = props.getProperty("exception.stacktrace");
+                        if (exMsg != null) {
+                            LOG.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace);
                         }
-                        getActionData(actionFs, runningJob, action, context);
                     }
                     else {
-                        String errorReason;
-                        if (actionData.containsKey(LauncherMapper.ACTION_DATA_ERROR_PROPS)) {
-                            Properties props = PropertiesUtils.stringToProperties(actionData
-                                    .get(LauncherMapper.ACTION_DATA_ERROR_PROPS));
-                            String errorCode = props.getProperty("error.code");
-                            if ("0".equals(errorCode)) {
-                                errorCode = "JA018";
-                            }
-                            if ("-1".equals(errorCode)) {
-                                errorCode = "JA019";
-                            }
-                            errorReason = props.getProperty("error.reason");
-                            LOG.warn("Launcher ERROR, reason: {0}", errorReason);
-                            String exMsg = props.getProperty("exception.message");
-                            String errorInfo = (exMsg != null) ? exMsg : errorReason;
-                            context.setErrorInfo(errorCode, errorInfo);
-                            String exStackTrace = props.getProperty("exception.stacktrace");
-                            if (exMsg != null) {
-                                LOG.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace);
-                            }
-                        }
-                        else {
-                            errorReason = XLog.format("LauncherMapper died, check Hadoop LOG for job [{0}:{1}]", action
-                                    .getTrackerUri(), action.getExternalId());
-                            LOG.warn(errorReason);
-                        }
-                        context.setExecutionData(FAILED_KILLED, null);
+                        errorReason = XLog.format("Launcher AM died, check Hadoop LOG for job [{0}:{1}]", action
+                                .getTrackerUri(), action.getExternalId());
+                        LOG.warn(errorReason);
                     }
-                }
-                else {
-                    context.setExternalStatus("RUNNING");
-                    LOG.info(XLog.STD, "checking action, hadoop job ID [{0}] status [RUNNING]",
-                            runningJob.getID());
+                    context.setExecutionData(FAILED_KILLED, null);
                 }
             }
             else {
-                context.setExternalStatus("RUNNING");
+                context.setExternalStatus(YarnApplicationState.RUNNING.toString());
                 LOG.info(XLog.STD, "checking action, hadoop job ID [{0}] status [RUNNING]",
-                        runningJob.getID());
+                        action.getExternalId());
             }
         }
         catch (Exception ex) {
             LOG.warn("Exception in check(). Message[{0}]", ex.getMessage(), ex);
-            exception = true;
             throw convertException(ex);
         }
         finally {
-            if (jobClient != null) {
-                try {
-                    jobClient.close();
-                }
-                catch (Exception e) {
-                    if (exception) {
-                        LOG.error("JobClient error: ", e);
-                    }
-                    else {
-                        throw convertException(e);
-                    }
-                }
+            if (yarnClient != null) {
+                IOUtils.closeQuietly(yarnClient);
             }
         }
     }
@@ -1591,14 +1481,12 @@ public class JavaActionExecutor extends ActionExecutor {
     /**
      * Get the output data of an action. Subclasses should override this method
      * to get action specific output data.
-     *
      * @param actionFs the FileSystem object
-     * @param runningJob the runningJob
      * @param action the Workflow action
      * @param context executor context
      *
      */
-    protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
+    protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context)
             throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
     }
 
@@ -1611,55 +1499,39 @@ public class JavaActionExecutor extends ActionExecutor {
 
     @Override
     public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
-        JobClient jobClient = null;
-        boolean exception = false;
+        YarnClient yarnClient = null;
         try {
             Element actionXml = XmlUtils.parseXml(action.getConf());
-            final JobConf jobConf = createBaseHadoopConf(context, actionXml);
-            WorkflowJob wfJob = context.getWorkflow();
-            Configuration conf = null;
-            if ( wfJob.getConf() != null ) {
-                conf = new XConfiguration(new StringReader(wfJob.getConf()));
-            }
-            String launcherTag = LauncherMapperHelper.getActionYarnTag(conf, wfJob.getParentId(), action);
-            jobConf.set(LauncherMainHadoopUtils.CHILD_MAPREDUCE_JOB_TAGS, LauncherMapperHelper.getTag(launcherTag));
-            jobConf.set(LauncherMainHadoopUtils.OOZIE_JOB_LAUNCH_TIME, Long.toString(action.getStartTime().getTime()));
-            UserGroupInformation ugi = Services.get().get(UserGroupInformationService.class)
-                    .getProxyUser(context.getWorkflow().getUser());
-            ugi.doAs(new PrivilegedExceptionAction<Void>() {
-                @Override
-                public Void run() throws Exception {
-                    LauncherMainHadoopUtils.killChildYarnJobs(jobConf);
-                    return null;
+
+            final Configuration jobConf = createBaseHadoopConf(context, actionXml);
+            String launcherTag = LauncherHelper.getActionYarnTag(jobConf, context.getWorkflow().getParentId(), action);
+            jobConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, LauncherHelper.getTag(launcherTag));
+            yarnClient = createYarnClient(context, jobConf);
+            if(action.getExternalId() != null) {
+                yarnClient.killApplication(ConverterUtils.toApplicationId(action.getExternalId()));
+            }
+            for(ApplicationId id : LauncherMain.getChildYarnJobs(jobConf, ApplicationsRequestScope.ALL,
+                    action.getStartTime().getTime())){
+                try {
+                    yarnClient.killApplication(id);
+                } catch (Exception e) {
+                    LOG.warn("Could not kill child of {0}, {1}", action.getExternalId(), id);
                 }
-            });
-            jobClient = createJobClient(context, jobConf);
-            RunningJob runningJob = getRunningJob(context, action, jobClient);
-            if (runningJob != null) {
-                runningJob.killJob();
             }
+
             context.setExternalStatus(KILLED);
             context.setExecutionData(KILLED, null);
-        }
-        catch (Exception ex) {
-            exception = true;
+        } catch (Exception ex) {
+            LOG.error("Error when killing YARN application", ex);
             throw convertException(ex);
-        }
-        finally {
+        } finally {
             try {
                 FileSystem actionFs = context.getAppFileSystem();
                 cleanUpActionDir(actionFs, context);
-                if (jobClient != null) {
-                    jobClient.close();
-                }
-            }
-            catch (Exception ex) {
-                if (exception) {
-                    LOG.error("Error: ", ex);
-                }
-                else {
-                    throw convertException(ex);
-                }
+                Closeables.closeQuietly(yarnClient);
+            } catch (Exception ex) {
+                LOG.error("Error when cleaning up action dir", ex);
+                throw convertException(ex);
             }
         }
     }
@@ -1754,7 +1626,7 @@ public class JavaActionExecutor extends ActionExecutor {
             HadoopAccessorException, URISyntaxException {
     }
 
-    private void injectJobInfo(JobConf launcherJobConf, Configuration actionConf, Context context, WorkflowAction action) {
+    private void injectJobInfo(Configuration launcherJobConf, Configuration actionConf, Context context, WorkflowAction action) {
         if (OozieJobInfo.isJobInfoEnabled()) {
             try {
                 OozieJobInfo jobInfo = new OozieJobInfo(actionConf, context, action);

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherHelper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherHelper.java b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherHelper.java
new file mode 100644
index 0000000..f80141c
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherHelper.java
@@ -0,0 +1,322 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.math.BigInteger;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.oozie.client.OozieClient;
+import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.service.HadoopAccessorException;
+import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.URIHandlerService;
+import org.apache.oozie.service.UserGroupInformationService;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.PropertiesUtils;
+
+public class LauncherHelper {
+
+    public static final String OOZIE_ACTION_YARN_TAG = "oozie.action.yarn.tag";
+
+    private static final LauncherInputFormatClassLocator launcherInputFormatClassLocator = new LauncherInputFormatClassLocator();
+
+    /** Returns the first line of the recovery file under actionDir (null if absent); the reader is always closed. */
+    public static String getRecoveryId(Configuration launcherConf, Path actionDir, String recoveryId)
+            throws HadoopAccessorException, IOException {
+        Path recoveryFile = new Path(actionDir, recoveryId);
+        FileSystem fs = Services.get().get(HadoopAccessorService.class)
+                .createFileSystem(launcherConf.get("user.name"),recoveryFile.toUri(), launcherConf);
+        if (!fs.exists(recoveryFile)) {
+            return null;
+        }
+        BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(recoveryFile)));
+        try {
+            return reader.readLine();
+        } finally {
+            reader.close();
+        }
+    }
+
+    public static void setupMainClass(Configuration launcherConf, String javaMainClass) {
+        // Skip a null/empty value so the user can still supply the action's main class via a <configuration> property.
+        if (javaMainClass == null || javaMainClass.isEmpty()) {
+            return;
+        }
+        launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, javaMainClass);
+    }
+
+    public static void setupLauncherURIHandlerConf(Configuration launcherConf) {
+        for(Map.Entry<String, String> entry : Services.get().get(URIHandlerService.class).getLauncherConfig()) {
+            launcherConf.set(entry.getKey(), entry.getValue());
+        }
+    }
+
+    public static void setupMainArguments(Configuration launcherConf, String[] args) {
+        launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_COUNT, args.length);
+        for (int i = 0; i < args.length; i++) {
+            launcherConf.set(LauncherMapper.CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i, args[i]);
+        }
+    }
+
+    public static void setupMaxOutputData(Configuration launcherConf, int maxOutputData) {
+        launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, maxOutputData);
+    }
+
+    /**
+     * Stores the maximum allowed size of external stats data in the launcher configuration.
+     *
+     * @param launcherConf the oozie launcher configuration that receives the limit
+     * @param maxStatsData the maximum allowed size of stats data
+     */
+    public static void setupMaxExternalStatsSize(Configuration launcherConf, int maxStatsData){
+        launcherConf.setInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE, maxStatsData);
+    }
+
+    /**
+     * Stores the maximum number of globbed files/dirs in the launcher configuration.
+     *
+     * @param launcherConf the oozie launcher configuration that receives the limit
+     * @param fsGlobMax the maximum number of files/dirs for FS operation
+     */
+    public static void setupMaxFSGlob(Configuration launcherConf, int fsGlobMax){
+        launcherConf.setInt(LauncherMapper.CONF_OOZIE_ACTION_FS_GLOB_MAX, fsGlobMax);
+    }
+
+    /** Copies the launcher bookkeeping values into launcherConf and the job/action ids into actionConf. */
+    public static void setupLauncherInfo(Configuration launcherConf, String jobId, String actionId, Path actionDir,
+            String recoveryId, Configuration actionConf, String prepareXML) throws IOException, HadoopAccessorException {
+        launcherConf.set(LauncherMapper.OOZIE_JOB_ID, jobId);
+        launcherConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId);
+        launcherConf.set(LauncherMapper.OOZIE_ACTION_DIR_PATH, actionDir.toString());
+        launcherConf.set(LauncherMapper.OOZIE_ACTION_RECOVERY_ID, recoveryId);
+        launcherConf.set(LauncherMapper.ACTION_PREPARE_XML, prepareXML);
+
+        actionConf.set(LauncherMapper.OOZIE_JOB_ID, jobId);
+        actionConf.set(LauncherMapper.OOZIE_ACTION_ID, actionId);
+        final String workaroundKey = "oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache";
+        if (Services.get().getConf().getBoolean(workaroundKey, false)) {
+            // Distributed-cache workaround: keep only the cache-file entries that contain a '#'.
+            List<String> keptEntries = new ArrayList<String>();
+            for (String cacheFile : actionConf.getStringCollection("mapreduce.job.cache.files")) {
+                if (cacheFile.contains("#")) {
+                    keptEntries.add(cacheFile);
+                }
+            }
+            actionConf.setStrings("mapreduce.job.cache.files", keptEntries.toArray(new String[keptEntries.size()]));
+            launcherConf.setBoolean(workaroundKey, true);
+        }
+    }
+
+    /** Records the launch time on the launcher conf and the hashed launcher tag on the action conf. */
+    public static void setupYarnRestartHandling(Configuration launcherJobConf, Configuration actionConf,
+            String launcherTag, long launcherTime) throws NoSuchAlgorithmException {
+        launcherJobConf.setLong(LauncherMain.OOZIE_JOB_LAUNCH_TIME, launcherTime);
+        // The tag is hashed because tags are capped at 100 chars while the actionId has no maximum length.
+        // It is kept under oozie.child.mapreduce.job.tags rather than mapreduce.job.tags so that the launcher
+        // itself is not killed; mapreduce.job.tags should only reach the child jobs started by the launcher.
+        final String hashedTag = getTag(launcherTag);
+        actionConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, hashedTag);
+    }
+
+    /** Returns "oozie-" followed by the hexadecimal MD5 digest of the launcher tag's UTF-8 bytes. */
+    public static String getTag(String launcherTag) throws NoSuchAlgorithmException {
+        // Hash every encoded byte: String.length() counts chars, so it truncated tags containing non-ASCII text.
+        byte[] tagBytes = launcherTag.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        return "oozie-" + new BigInteger(1, MessageDigest.getInstance("MD5").digest(tagBytes)).toString(16);
+    }
+
+    public static boolean isMainDone(RunningJob runningJob) throws IOException {
+        return runningJob.isComplete();
+    }
+
+    /** Returns true when the job reports success and its launcher-error counter, if one is available, is zero. */
+    public static boolean isMainSuccessful(RunningJob runningJob) throws IOException {
+        if (!runningJob.isSuccessful()) {
+            return false;
+        }
+        Counters jobCounters = runningJob.getCounters();
+        if (jobCounters == null) {
+            return true;
+        }
+        Counters.Group launcherGroup = jobCounters.getGroup(LauncherMapper.COUNTER_GROUP);
+        // No counter group to inspect means no recorded launcher error.
+        return launcherGroup == null || launcherGroup.getCounter(LauncherMapper.COUNTER_LAUNCHER_ERROR) == 0;
+    }
+
+    /**
+     * Determine whether action has external child jobs or not
+     * @param actionData map of action data to inspect
+     * @return true if the map contains the LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS key, false otherwise
+     * @throws IOException declared in the signature; not thrown by this implementation
+     */
+    public static boolean hasExternalChildJobs(Map<String, String> actionData) throws IOException {
+        return actionData.containsKey(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS);
+    }
+
+    /**
+     * Determine whether action has output data or not
+     * @param actionData map of action data to inspect
+     * @return true if the map contains the LauncherMapper.ACTION_DATA_OUTPUT_PROPS key, false otherwise
+     * @throws IOException declared in the signature; not thrown by this implementation
+     */
+    public static boolean hasOutputData(Map<String, String> actionData) throws IOException {
+        return actionData.containsKey(LauncherMapper.ACTION_DATA_OUTPUT_PROPS);
+    }
+
+    /**
+     * Determine whether action has external stats or not
+     * @param actionData map of action data to inspect
+     * @return true if the map contains the LauncherMapper.ACTION_DATA_STATS key, false otherwise
+     * @throws IOException declared in the signature; not thrown by this implementation
+     */
+    public static boolean hasStatsData(Map<String, String> actionData) throws IOException{
+        return actionData.containsKey(LauncherMapper.ACTION_DATA_STATS);
+    }
+
+    /**
+     * Determine whether action has new id (id swap) or not
+     * @param actionData map of action data to inspect
+     * @return true if the map contains the LauncherMapper.ACTION_DATA_NEW_ID key, false otherwise
+     * @throws IOException declared in the signature; not thrown by this implementation
+     */
+    public static boolean hasIdSwap(Map<String, String> actionData) throws IOException {
+        return actionData.containsKey(LauncherMapper.ACTION_DATA_NEW_ID);
+    }
+
+    /**
+     * Get the sequence file path storing all action data
+     * @param actionDir the action directory the sequence file is resolved against
+     * @return the path named LauncherMapper.ACTION_DATA_SEQUENCE_FILE under actionDir
+     */
+    public static Path getActionDataSequenceFilePath(Path actionDir) {
+        return new Path(actionDir, LauncherMapper.ACTION_DATA_SEQUENCE_FILE);
+    }
+
+    /**
+     * Loads the action data into memory as the proxy user named by conf's OozieClient.USER_NAME. The action data
+     * sequence file is read when present; otherwise the legacy per-entry files in the action dir are read.
+     *
+     * @param fs Action Filesystem
+     * @param actionDir directory holding the action data
+     * @param conf Configuration; supplies the user name and the legacy size limits
+     * @return Map of action data entry name to value (possibly empty)
+     * @throws IOException if reading from the filesystem fails
+     * @throws InterruptedException propagated from UserGroupInformation.doAs
+     */
+    public static Map<String, String> getActionData(final FileSystem fs, final Path actionDir, final Configuration conf)
+            throws IOException, InterruptedException {
+        UserGroupInformationService ugiService = Services.get().get(UserGroupInformationService.class);
+        UserGroupInformation ugi = ugiService.getProxyUser(conf.get(OozieClient.USER_NAME));
+        return ugi.doAs(new PrivilegedExceptionAction<Map<String, String>>() {
+            @Override
+            public Map<String, String> run() throws IOException {
+                Map<String, String> ret = new HashMap<String, String>();
+                Path seqFilePath = getActionDataSequenceFilePath(actionDir);
+                if (fs.exists(seqFilePath)) {
+                    SequenceFile.Reader seqFile = new SequenceFile.Reader(fs, seqFilePath, conf);
+                    try {
+                        Text key = new Text(), value = new Text();
+                        while (seqFile.next(key, value)) {
+                            ret.put(key.toString(), value.toString());
+                        }
+                    }
+                    finally {
+                        // Release the reader even when next() fails part-way through the file.
+                        seqFile.close();
+                    }
+                }
+                else { // maintain backward-compatibility. to be deprecated
+                    // NOTE(review): the readers opened below are never closed in this method; confirm that the
+                    // IOUtils/PropertiesUtils helpers close them, otherwise each legacy file leaks a stream.
+                    org.apache.hadoop.fs.FileStatus[] files = fs.listStatus(actionDir);
+                    BufferedReader reader = null;
+                    Properties props;
+                    if (files != null && files.length > 0) {
+                        for (int x = 0; x < files.length; x++) {
+                            Path file = files[x].getPath();
+                            if (file.equals(new Path(actionDir, "externalChildIds.properties"))) {
+                                reader = new BufferedReader(new InputStreamReader(fs.open(file)));
+                                ret.put(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS,
+                                        IOUtils.getReaderAsString(reader, -1));
+                            }
+                            else if (file.equals(new Path(actionDir, "newId.properties"))) {
+                                reader = new BufferedReader(new InputStreamReader(fs.open(file)));
+                                props = PropertiesUtils.readProperties(reader, -1);
+                                ret.put(LauncherMapper.ACTION_DATA_NEW_ID, props.getProperty("id"));
+                            }
+                            else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_OUTPUT_PROPS))) {
+                                int maxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA,
+                                        2 * 1024);
+                                reader = new BufferedReader(new InputStreamReader(fs.open(file)));
+                                ret.put(LauncherMapper.ACTION_DATA_OUTPUT_PROPS, PropertiesUtils
+                                        .propertiesToString(PropertiesUtils.readProperties(reader, maxOutputData)));
+                            }
+                            else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_STATS))) {
+                                int statsMaxOutputData = conf.getInt(LauncherMapper.CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE,
+                                        Integer.MAX_VALUE);
+                                reader = new BufferedReader(new InputStreamReader(fs.open(file)));
+                                ret.put(LauncherMapper.ACTION_DATA_STATS, PropertiesUtils
+                                        .propertiesToString(PropertiesUtils.readProperties(reader, statsMaxOutputData)));
+                            }
+                            else if (file.equals(new Path(actionDir, LauncherMapper.ACTION_DATA_ERROR_PROPS))) {
+                                reader = new BufferedReader(new InputStreamReader(fs.open(file)));
+                                ret.put(LauncherMapper.ACTION_DATA_ERROR_PROPS, IOUtils.getReaderAsString(reader, -1));
+                            }
+                        }
+                    }
+                }
+                return ret;
+            }
+        });
+    }
+
+    /** Returns "<configured tag>@<name>", else "<parentId>@<name>", else the action id when both are missing. */
+    public static String getActionYarnTag(Configuration conf, String parentId, WorkflowAction wfAction) {
+        String configuredTag = (conf == null) ? null : conf.get(OOZIE_ACTION_YARN_TAG);
+        if (configuredTag != null) {
+            return configuredTag + "@" + wfAction.getName();
+        }
+        if (parentId != null) {
+            return parentId + "@" + wfAction.getName();
+        }
+        return wfAction.getId();
+    }
+
+}