Posted to commits@oozie.apache.org by rk...@apache.org on 2016/07/26 01:25:09 UTC

[3/3] oozie git commit: OOZIE-2590 OYA: Create basic Oozie Launcher Application Master (rkanter)

OOZIE-2590 OYA: Create basic Oozie Launcher Application Master (rkanter)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/fea512cf
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/fea512cf
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/fea512cf

Branch: refs/heads/oya
Commit: fea512cf66aec92d867e13c200978fd103868ab1
Parents: a37835f
Author: Robert Kanter <rk...@cloudera.com>
Authored: Mon Jul 25 18:24:35 2016 -0700
Committer: Robert Kanter <rk...@cloudera.com>
Committed: Mon Jul 25 18:24:35 2016 -0700

----------------------------------------------------------------------
 core/pom.xml                                    |  17 -
 .../action/hadoop/DistcpActionExecutor.java     |   7 +-
 .../action/hadoop/Hive2ActionExecutor.java      |   5 +-
 .../oozie/action/hadoop/HiveActionExecutor.java |   5 +-
 .../oozie/action/hadoop/JavaActionExecutor.java | 593 ++++++++---------
 .../action/hadoop/LauncherMapperHelper.java     |  12 -
 .../action/hadoop/SparkActionExecutor.java      |   5 +-
 .../action/hadoop/SqoopActionExecutor.java      |   9 +-
 .../oozie/service/HadoopAccessorService.java    |  97 ++-
 .../org/apache/oozie/util/ClasspathUtils.java   | 145 +++++
 core/src/main/resources/oozie-default.xml       |  25 -
 .../java/org/apache/oozie/QueryServlet.java     |  40 ++
 .../action/hadoop/TestJavaActionExecutor.java   | 531 ++--------------
 .../oozie/action/hadoop/TestLauncherAM.java     |  46 ++
 .../hadoop/TestLauncherAMCallbackNotifier.java  | 170 +++++
 .../action/hadoop/TestPrepareActionsDriver.java |  15 +-
 .../action/hadoop/TestShellActionExecutor.java  |  21 -
 .../apache/oozie/command/wf/HangServlet.java    |  19 +-
 .../oozie/service/TestConfigurationService.java |   3 -
 .../service/TestHadoopAccessorService.java      | 121 +++-
 .../java/org/apache/oozie/test/XTestCase.java   |   4 +
 .../apache/oozie/util/TestClasspathUtils.java   | 110 ++++
 release-log.txt                                 |   1 +
 sharelib/distcp/pom.xml                         |  12 -
 sharelib/hcatalog/pom.xml                       |  12 -
 sharelib/hive/pom.xml                           |  12 -
 sharelib/hive2/pom.xml                          |  12 -
 sharelib/oozie/pom.xml                          |  12 -
 .../apache/oozie/action/hadoop/LauncherAM.java  | 636 +++++++++++++++++++
 .../hadoop/LauncherAMCallbackNotifier.java      | 175 +++++
 .../oozie/action/hadoop/LauncherMapper.java     |   6 +-
 .../action/hadoop/PrepareActionsDriver.java     |  43 +-
 sharelib/pig/pom.xml                            |  12 -
 sharelib/spark/pom.xml                          |  12 -
 sharelib/sqoop/pom.xml                          |  12 -
 sharelib/streaming/pom.xml                      |  33 -
 36 files changed, 1899 insertions(+), 1091 deletions(-)
----------------------------------------------------------------------
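
For readers skimming the diff: the core of this change is that JavaActionExecutor stops
submitting the launcher as a one-map MapReduce job and instead submits it directly as a
YARN ApplicationMaster (the new LauncherAM in sharelib/oozie) through a YarnClient. A
minimal sketch of that submission flow, assuming a preconfigured JobConf and omitting the
localization, environment, and token setup done in createAppSubmissionContext() below:

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
    import org.apache.hadoop.yarn.client.api.YarnClient;
    import org.apache.hadoop.yarn.client.api.YarnClientApplication;
    import org.apache.hadoop.yarn.util.Records;

    public class LauncherSubmissionSketch {
        // Sketch only: the patch builds the full context in createAppSubmissionContext()
        // and obtains the client via HadoopAccessorService.createYarnClient(user, conf).
        public static String submit(YarnClient yarnClient, JobConf launcherJobConf) throws Exception {
            YarnClientApplication newApp = yarnClient.createApplication();
            ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId();

            ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
            appContext.setApplicationId(appId);
            appContext.setApplicationName(launcherJobConf.getJobName());
            appContext.setApplicationType("Oozie Launcher");
            // ... AM container spec (command, local resources, env, tokens) and the
            // resource request are filled in here in the real code ...

            yarnClient.submitApplication(appContext);
            return appId.toString();  // stored as the action's external ID
        }
    }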


http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 6584af8..86feea0 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -453,23 +453,6 @@
                 </configuration>
             </plugin>
             <plugin>
-              <artifactId>maven-dependency-plugin</artifactId>
-              <executions>
-                <execution>
-                  <id>create-mrapp-generated-classpath</id>
-                  <phase>generate-test-resources</phase>
-                  <goals>
-                    <goal>build-classpath</goal>
-                  </goals>
-                  <configuration>
-                    <!-- needed to run the unit test for DS to generate the required classpath
-                         that is required in the env of the launch container in the mini mr/yarn cluster -->
-                    <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
-                  </configuration>
-                </execution>
-              </executions>
-            </plugin>
-            <plugin>
                 <groupId>org.apache.openjpa</groupId>
                 <artifactId>openjpa-maven-plugin</artifactId>
                 <executions>

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java
index 96726da..99652e8 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java
@@ -26,13 +26,10 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.RunningJob;
 import org.apache.oozie.action.ActionExecutorException;
-import org.apache.oozie.action.ActionExecutor.Context;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.HadoopAccessorException;
-import org.apache.oozie.service.Services;
 import org.apache.oozie.util.XLog;
 import org.jdom.Element;
 import org.jdom.JDOMException;
@@ -126,9 +123,9 @@ public class DistcpActionExecutor extends JavaActionExecutor{
     }
 
     @Override
-    protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
+    protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context)
             throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
-        super.getActionData(actionFs, runningJob, action, context);
+        super.getActionData(actionFs, action, context);
         readExternalChildIDs(action, context);
     }
 

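The same signature change repeats in the Hive, Hive2, Spark, and Sqoop executors below:
getActionData() no longer receives a RunningJob, since there is no MapReduce launcher job
left to interrogate; child-job information now comes from the action data files instead. A
sketch of an executor override under the new signature (the subclass name and action type
are illustrative, not part of this patch):

    package org.apache.oozie.action.hadoop;

    import java.io.IOException;
    import java.net.URISyntaxException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.oozie.client.WorkflowAction;
    import org.apache.oozie.service.HadoopAccessorException;
    import org.jdom.JDOMException;

    public class MyActionExecutor extends JavaActionExecutor {
        public MyActionExecutor() {
            super("my-action");  // hypothetical action type
        }

        @Override
        protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context)
                throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
            super.getActionData(actionFs, action, context);
            // the pattern every subclass in this commit follows:
            readExternalChildIDs(action, context);
        }
    }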
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java
index b5b1bf9..9ba6318 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java
@@ -28,7 +28,6 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.RunningJob;
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.service.HadoopAccessorException;
@@ -134,9 +133,9 @@ public class Hive2ActionExecutor extends ScriptLanguageActionExecutor {
     }
 
     @Override
-    protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
+    protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context)
             throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
-        super.getActionData(actionFs, runningJob, action, context);
+        super.getActionData(actionFs, action, context);
         readExternalChildIDs(action, context);
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java
index c74e9e6..a850957 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.XOozieClient;
@@ -125,9 +124,9 @@ public class HiveActionExecutor extends ScriptLanguageActionExecutor {
     }
 
     @Override
-    protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
+    protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context)
             throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
-        super.getActionData(actionFs, runningJob, action, context);
+        super.getActionData(actionFs, action, context);
         readExternalChildIDs(action, context);
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 99e3344..d573fc3 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -18,42 +18,38 @@
 
 package org.apache.oozie.action.hadoop;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.StringReader;
-import java.net.ConnectException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.AccessControlException;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.action.ActionExecutor;
@@ -69,18 +65,41 @@ import org.apache.oozie.service.Services;
 import org.apache.oozie.service.ShareLibService;
 import org.apache.oozie.service.URIHandlerService;
 import org.apache.oozie.service.WorkflowAppService;
+import org.apache.oozie.util.ClasspathUtils;
 import org.apache.oozie.util.ELEvaluationException;
 import org.apache.oozie.util.ELEvaluator;
-import org.apache.oozie.util.XLog;
-import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.util.XmlUtils;
 import org.apache.oozie.util.JobUtils;
 import org.apache.oozie.util.LogUtils;
 import org.apache.oozie.util.PropertiesUtils;
+import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XLog;
+import org.apache.oozie.util.XmlUtils;
 import org.jdom.Element;
 import org.jdom.JDOMException;
 import org.jdom.Namespace;
 
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.StringReader;
+import java.net.ConnectException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 
 public class JavaActionExecutor extends ActionExecutor {
 
@@ -94,7 +113,6 @@ public class JavaActionExecutor extends ActionExecutor {
     public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
     public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
     public static final String HADOOP_YARN_TIMELINE_SERVICE_ENABLED = "yarn.timeline-service.enabled";
-    public static final String HADOOP_YARN_UBER_MODE = "mapreduce.job.ubertask.enable";
     public static final String HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART = "oozie.action.launcher.am.restart.kill.childjobs";
     public static final String HADOOP_MAP_MEMORY_MB = "mapreduce.map.memory.mb";
     public static final String HADOOP_CHILD_JAVA_OPTS = "mapred.child.java.opts";
@@ -117,7 +135,6 @@ public class JavaActionExecutor extends ActionExecutor {
     protected XLog LOG = XLog.getLog(getClass());
     private static final Pattern heapPattern = Pattern.compile("-Xmx(([0-9]+)[mMgG])");
     private static final String JAVA_TMP_DIR_SETTINGS = "-Djava.io.tmpdir=";
-    public static final String CONF_HADOOP_YARN_UBER_MODE = "oozie.action.launcher." + HADOOP_YARN_UBER_MODE;
     public static final String HADOOP_JOB_CLASSLOADER = "mapreduce.job.classloader";
     public static final String HADOOP_USER_CLASSPATH_FIRST = "mapreduce.user.classpath.first";
     public static final String OOZIE_CREDENTIALS_SKIP = "oozie.credentials.skip";
@@ -138,10 +155,11 @@ public class JavaActionExecutor extends ActionExecutor {
 
     public static List<Class> getCommonLauncherClasses() {
         List<Class> classes = new ArrayList<Class>();
-        classes.add(LauncherMapper.class);
         classes.add(OozieLauncherInputFormat.class);
         classes.add(LauncherMain.class);
         classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher());
+        classes.add(LauncherAM.class);
+        classes.add(LauncherAMCallbackNotifier.class);
         return classes;
     }
 
@@ -159,7 +177,7 @@ public class JavaActionExecutor extends ActionExecutor {
     @Override
     public void initActionType() {
         super.initActionType();
-        maxActionOutputLen = ConfigurationService.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA);
+        maxActionOutputLen = ConfigurationService.getInt(LauncherAM.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA);
         //Get the limit for the maximum allowed size of action stats
         maxExternalStatsSize = ConfigurationService.getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE);
         maxExternalStatsSize = (maxExternalStatsSize == -1) ? Integer.MAX_VALUE : maxExternalStatsSize;
@@ -257,8 +275,6 @@ public class JavaActionExecutor extends ActionExecutor {
             } catch (URISyntaxException ex) {
                 throw convertException(ex);
             }
-            // Inject use uber mode for launcher
-            injectLauncherUseUberMode(launcherConf);
             XConfiguration.copy(launcherConf, conf);
             checkForDisallowedProps(launcherConf, "launcher configuration");
             // Inject config-class for launcher to use for action
@@ -273,25 +289,6 @@ public class JavaActionExecutor extends ActionExecutor {
         }
     }
 
-    void injectLauncherUseUberMode(Configuration launcherConf) {
-        // Set Uber Mode for the launcher (YARN only, ignored by MR1)
-        // Priority:
-        // 1. action's <configuration>
-        // 2. oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable
-        // 3. oozie.action.launcher.mapreduce.job.ubertask.enable
-        if (launcherConf.get(HADOOP_YARN_UBER_MODE) == null) {
-            if (ConfigurationService.get("oozie.action." + getType() + ".launcher." + HADOOP_YARN_UBER_MODE).length() > 0) {
-                if (ConfigurationService.getBoolean("oozie.action." + getType() + ".launcher." + HADOOP_YARN_UBER_MODE)) {
-                    launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
-                }
-            } else {
-                if (ConfigurationService.getBoolean("oozie.action.launcher." + HADOOP_YARN_UBER_MODE)) {
-                    launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
-                }
-            }
-        }
-    }
-
     void injectLauncherTimelineServiceEnabled(Configuration launcherConf, Configuration actionConf) {
         // Getting delegation token for ATS. If tez-site.xml is present in distributed cache, turn on timeline service.
         if (actionConf.get("oozie.launcher." + HADOOP_YARN_TIMELINE_SERVICE_ENABLED) == null
@@ -303,104 +300,6 @@ public class JavaActionExecutor extends ActionExecutor {
         }
     }
 
-    void updateConfForUberMode(Configuration launcherConf) {
-
-        // child.env
-        boolean hasConflictEnv = false;
-        String launcherMapEnv = launcherConf.get(HADOOP_MAP_JAVA_ENV);
-        if (launcherMapEnv == null) {
-            launcherMapEnv = launcherConf.get(HADOOP_CHILD_JAVA_ENV);
-        }
-        String amEnv = launcherConf.get(YARN_AM_ENV);
-        StringBuffer envStr = new StringBuffer();
-        HashMap<String, List<String>> amEnvMap = null;
-        HashMap<String, List<String>> launcherMapEnvMap = null;
-        if (amEnv != null) {
-            envStr.append(amEnv);
-            amEnvMap = populateEnvMap(amEnv);
-        }
-        if (launcherMapEnv != null) {
-            launcherMapEnvMap = populateEnvMap(launcherMapEnv);
-            if (amEnvMap != null) {
-                Iterator<String> envKeyItr = launcherMapEnvMap.keySet().iterator();
-                while (envKeyItr.hasNext()) {
-                    String envKey = envKeyItr.next();
-                    if (amEnvMap.containsKey(envKey)) {
-                        List<String> amValList = amEnvMap.get(envKey);
-                        List<String> launcherValList = launcherMapEnvMap.get(envKey);
-                        Iterator<String> valItr = launcherValList.iterator();
-                        while (valItr.hasNext()) {
-                            String val = valItr.next();
-                            if (!amValList.contains(val)) {
-                                hasConflictEnv = true;
-                                break;
-                            }
-                            else {
-                                valItr.remove();
-                            }
-                        }
-                        if (launcherValList.isEmpty()) {
-                            envKeyItr.remove();
-                        }
-                    }
-                }
-            }
-        }
-        if (hasConflictEnv) {
-            launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, false);
-        }
-        else {
-            if (launcherMapEnvMap != null) {
-                for (String key : launcherMapEnvMap.keySet()) {
-                    List<String> launcherValList = launcherMapEnvMap.get(key);
-                    for (String val : launcherValList) {
-                        if (envStr.length() > 0) {
-                            envStr.append(",");
-                        }
-                        envStr.append(key).append("=").append(val);
-                    }
-                }
-            }
-
-            launcherConf.set(YARN_AM_ENV, envStr.toString());
-
-            // memory.mb
-            int launcherMapMemoryMB = launcherConf.getInt(HADOOP_MAP_MEMORY_MB, 1536);
-            int amMemoryMB = launcherConf.getInt(YARN_AM_RESOURCE_MB, 1536);
-            // YARN_MEMORY_MB_MIN to provide buffer.
-            // suppose launcher map aggressively use high memory, need some
-            // headroom for AM
-            int memoryMB = Math.max(launcherMapMemoryMB, amMemoryMB) + YARN_MEMORY_MB_MIN;
-            // limit to 4096 in case of 32 bit
-            if (launcherMapMemoryMB < 4096 && amMemoryMB < 4096 && memoryMB > 4096) {
-                memoryMB = 4096;
-            }
-            launcherConf.setInt(YARN_AM_RESOURCE_MB, memoryMB);
-
-            // We already made mapred.child.java.opts and
-            // mapreduce.map.java.opts equal, so just start with one of them
-            String launcherMapOpts = launcherConf.get(HADOOP_MAP_JAVA_OPTS, "");
-            String amChildOpts = launcherConf.get(YARN_AM_COMMAND_OPTS);
-            StringBuilder optsStr = new StringBuilder();
-            int heapSizeForMap = extractHeapSizeMB(launcherMapOpts);
-            int heapSizeForAm = extractHeapSizeMB(amChildOpts);
-            int heapSize = Math.max(heapSizeForMap, heapSizeForAm) + YARN_MEMORY_MB_MIN;
-            // limit to 3584 in case of 32 bit
-            if (heapSizeForMap < 4096 && heapSizeForAm < 4096 && heapSize > 3584) {
-                heapSize = 3584;
-            }
-            if (amChildOpts != null) {
-                optsStr.append(amChildOpts);
-            }
-            optsStr.append(" ").append(launcherMapOpts.trim());
-            if (heapSize > 0) {
-                // append calculated total heap size to the end
-                optsStr.append(" ").append("-Xmx").append(heapSize).append("m");
-            }
-            launcherConf.set(YARN_AM_COMMAND_OPTS, optsStr.toString().trim());
-        }
-    }
-
     void updateConfForJavaTmpDir(Configuration conf) {
         String amChildOpts = conf.get(YARN_AM_COMMAND_OPTS);
         String oozieJavaTmpDirSetting = "-Djava.io.tmpdir=./tmp";
@@ -868,7 +767,7 @@ public class JavaActionExecutor extends ActionExecutor {
 
 
     protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
-        return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, JavaMain.class.getName());
+        return launcherConf.get(LauncherAM.CONF_OOZIE_ACTION_MAIN_CLASS, JavaMain.class.getName());
     }
 
     private void setJavaMain(Configuration actionConf, Element actionXml) {
@@ -1004,15 +903,6 @@ public class JavaActionExecutor extends ActionExecutor {
             launcherJobConf.set(HADOOP_CHILD_JAVA_OPTS, opts.toString().trim());
             launcherJobConf.set(HADOOP_MAP_JAVA_OPTS, opts.toString().trim());
 
-            // setting for uber mode
-            if (launcherJobConf.getBoolean(HADOOP_YARN_UBER_MODE, false)) {
-                if (checkPropertiesToDisableUber(launcherJobConf)) {
-                    launcherJobConf.setBoolean(HADOOP_YARN_UBER_MODE, false);
-                }
-                else {
-                    updateConfForUberMode(launcherJobConf);
-                }
-            }
             updateConfForJavaTmpDir(launcherJobConf);
             injectLauncherTimelineServiceEnabled(launcherJobConf, actionConf);
 
@@ -1027,23 +917,9 @@ public class JavaActionExecutor extends ActionExecutor {
         }
     }
 
-    private boolean checkPropertiesToDisableUber(Configuration launcherConf) {
-        boolean disable = false;
-        if (launcherConf.getBoolean(HADOOP_JOB_CLASSLOADER, false)) {
-            disable = true;
-        }
-        else if (launcherConf.getBoolean(HADOOP_USER_CLASSPATH_FIRST, false)) {
-            disable = true;
-        }
-        return disable;
-    }
-
     private void injectCallback(Context context, Configuration conf) {
-        String callback = context.getCallbackUrl("$jobStatus");
-        if (conf.get("job.end.notification.url") != null) {
-            LOG.warn("Overriding the action job end notification URI");
-        }
-        conf.set("job.end.notification.url", callback);
+        String callback = context.getCallbackUrl(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN);
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, callback);
     }
 
     void injectActionCallback(Context context, Configuration actionConf) {
@@ -1062,7 +938,7 @@ public class JavaActionExecutor extends ActionExecutor {
         }
     }
 
-    public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException {
+    public void submitLauncher(FileSystem actionFs, final Context context, WorkflowAction action) throws ActionExecutorException {
         JobClient jobClient = null;
         boolean exception = false;
         try {
@@ -1119,14 +995,17 @@ public class JavaActionExecutor extends ActionExecutor {
                     }
                 }
             }
-
             JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
 
-            LOG.debug("Creating Job Client for action " + action.getId());
-            jobClient = createJobClient(context, launcherJobConf);
-            String launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context
-                    .getRecoveryId());
-            boolean alreadyRunning = launcherId != null;
+            boolean alreadyRunning = false;
+            String launcherId = null;
+            String consoleUrl = null;
+            // TODO: OYA: equivalent of this? (recovery, alreadyRunning)  When does this happen?
+//            LOG.debug("Creating Job Client for action " + action.getId());
+//            jobClient = createJobClient(context, launcherJobConf);
+//            launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context
+//                    .getRecoveryId());
+//            alreadyRunning = launcherId != null;
             RunningJob runningJob;
 
             // if user-retry is on, always submit new launcher
@@ -1141,13 +1020,13 @@ public class JavaActionExecutor extends ActionExecutor {
                 }
             }
             else {
-                LOG.debug("Submitting the job through Job Client for action " + action.getId());
-
-                // setting up propagation of the delegation token.
-                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
-                Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(has
-                        .getMRDelegationTokenRenewer(launcherJobConf));
-                launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt);
+                // TODO: OYA: do we actually need an MR token?  IIRC, it's issued by the JHS
+//                // setting up propagation of the delegation token.
+//                Token<DelegationTokenIdentifier> mrdt = null;
+//                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
+//                mrdt = jobClient.getDelegationToken(has
+//                        .getMRDelegationTokenRenewer(launcherJobConf));
+//                launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt);
 
                 // insert credentials tokens to launcher job conf if needed
                 if (needInjectCredentials() && credentialsConf != null) {
@@ -1173,17 +1052,36 @@ public class JavaActionExecutor extends ActionExecutor {
                 else {
                     LOG.info("No need to inject credentials.");
                 }
-                runningJob = jobClient.submitJob(launcherJobConf);
-                if (runningJob == null) {
-                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
-                            "Error submitting launcher for action [{0}]", action.getId());
+
+                YarnClient yarnClient = null;
+                try {
+                    String user = context.getWorkflow().getUser();
+
+                    // Create application
+                    yarnClient = createYarnClient(context, launcherJobConf);
+                    YarnClientApplication newApp = yarnClient.createApplication();
+                    ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId();
+
+                    // Create launch context for app master
+                    ApplicationSubmissionContext appContext =
+                            createAppSubmissionContext(appId, launcherJobConf, user, context, actionConf);
+
+                    // Submit the launcher AM
+                    yarnClient.submitApplication(appContext);
+
+                    launcherId = appId.toString();
+                    LOG.debug("After submission get the launcherId [{0}]", launcherId);
+                    ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+                    consoleUrl = appReport.getTrackingUrl();
+                } finally {
+                    if (yarnClient != null) {
+                        yarnClient.close();
+                        yarnClient = null;
+                    }
                 }
-                launcherId = runningJob.getID().toString();
-                LOG.debug("After submission get the launcherId " + launcherId);
             }
 
             String jobTracker = launcherJobConf.get(HADOOP_YARN_RM);
-            String consoleUrl = runningJob.getTrackingURL();
             context.setStartData(launcherId, jobTracker, consoleUrl);
         }
         catch (Exception ex) {
@@ -1206,6 +1104,91 @@ public class JavaActionExecutor extends ActionExecutor {
             }
         }
     }
+
+    private ApplicationSubmissionContext createAppSubmissionContext(ApplicationId appId, JobConf launcherJobConf, String user,
+                                                                    Context context, Configuration actionConf)
+            throws IOException, HadoopAccessorException, URISyntaxException {
+        // Create launch context for app master
+        ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
+
+        // set the application id
+        appContext.setApplicationId(appId);
+
+        // set the application name
+        appContext.setApplicationName(launcherJobConf.getJobName());
+        appContext.setApplicationType("Oozie Launcher");
+
+        // Set the priority for the application master
+        Priority pri = Records.newRecord(Priority.class);
+        int priority = 0; // TODO: OYA: Add a constant or a config
+        pri.setPriority(priority);
+        appContext.setPriority(pri);
+
+        // Set the queue to which this application is to be submitted in the RM
+        appContext.setQueue(launcherJobConf.getQueueName());
+
+        // Set up the container launch context for the application master
+        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+
+        // Set the resources to localize
+        Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+        ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(launcherJobConf);
+        MRApps.setupDistributedCache(launcherJobConf, localResources);
+        // Add the Launcher and Action configs as Resources
+        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
+        LocalResource launcherJobConfLR = has.createLocalResourceForConfigurationFile(LauncherAM.LAUNCHER_JOB_CONF_XML, user,
+                launcherJobConf, context.getAppFileSystem().getUri(), context.getActionDir());
+        localResources.put(LauncherAM.LAUNCHER_JOB_CONF_XML, launcherJobConfLR);
+        LocalResource actionConfLR = has.createLocalResourceForConfigurationFile(LauncherAM.ACTION_CONF_XML, user, actionConf,
+                context.getAppFileSystem().getUri(), context.getActionDir());
+        localResources.put(LauncherAM.ACTION_CONF_XML, actionConfLR);
+        amContainer.setLocalResources(localResources);
+
+        // Set the environment variables
+        Map<String, String> env = new HashMap<String, String>();
+        // This adds the Hadoop jars to the classpath in the Launcher JVM
+        ClasspathUtils.setupClasspath(env, launcherJobConf);
+        if (false) {        // TODO: OYA: config to add MR jars?  Probably also needed for MR Action
+            ClasspathUtils.addMapReduceToClasspath(env, launcherJobConf);
+        }
+        amContainer.setEnvironment(env);
+
+        // Set the command
+        List<String> vargs = new ArrayList<String>(6);
+        vargs.add(MRApps.crossPlatformifyMREnv(launcherJobConf, ApplicationConstants.Environment.JAVA_HOME)
+                + "/bin/java");
+        // TODO: OYA: remove attach debugger to AM; useful for debugging
+//                    vargs.add("-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005");
+        MRApps.addLog4jSystemProperties("INFO", 1024 * 1024, 0, vargs);
+        vargs.add(LauncherAM.class.getName());
+        vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+                Path.SEPARATOR + ApplicationConstants.STDOUT);
+        vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+                Path.SEPARATOR + ApplicationConstants.STDERR);
+        List<String> vargsFinal = new ArrayList<String>(6);
+        StringBuilder mergedCommand = new StringBuilder();
+        for (CharSequence str : vargs) {
+            mergedCommand.append(str).append(" ");
+        }
+        vargsFinal.add(mergedCommand.toString());
+        LOG.debug("Command to launch container for ApplicationMaster is : "
+                + mergedCommand);
+        amContainer.setCommands(vargsFinal);
+        appContext.setAMContainerSpec(amContainer);
+
+        // Set tokens
+        DataOutputBuffer dob = new DataOutputBuffer();
+        launcherJobConf.getCredentials().writeTokenStorageToStream(dob);
+        amContainer.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
+
+        // Set Resources
+        // TODO: OYA: make resources allocated for the AM configurable and choose good defaults (memory MB, vcores)
+        Resource resource = Resource.newInstance(2048, 1);
+        appContext.setResource(resource);
+
+        return appContext;
+    }
+
     private boolean needInjectCredentials() {
         boolean methodExists = true;
 
@@ -1409,6 +1392,19 @@ public class JavaActionExecutor extends ActionExecutor {
         return Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
     }
 
+    /**
+     * Create yarn client object
+     *
+     * @param context
+     * @param jobConf
+     * @return YarnClient
+     * @throws HadoopAccessorException
+     */
+    protected YarnClient createYarnClient(Context context, JobConf jobConf) throws HadoopAccessorException {
+        String user = context.getWorkflow().getUser();
+        return Services.get().get(HadoopAccessorService.class).createYarnClient(user, jobConf);
+    }
+
     protected RunningJob getRunningJob(Context context, WorkflowAction action, JobClient jobClient) throws Exception{
         RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
         return runningJob;
@@ -1425,129 +1421,112 @@ public class JavaActionExecutor extends ActionExecutor {
 
     @Override
     public void check(Context context, WorkflowAction action) throws ActionExecutorException {
-        JobClient jobClient = null;
-        boolean exception = false;
+        boolean fallback = false;
+        LOG = XLog.resetPrefix(LOG);
         LogUtils.setLogInfo(action);
+        YarnClient yarnClient = null;
         try {
             Element actionXml = XmlUtils.parseXml(action.getConf());
-            FileSystem actionFs = context.getAppFileSystem();
             JobConf jobConf = createBaseHadoopConf(context, actionXml);
-            jobClient = createJobClient(context, jobConf);
-            RunningJob runningJob = getRunningJob(context, action, jobClient);
-            if (runningJob == null) {
-                context.setExecutionData(FAILED, null);
-                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
-                        "Could not lookup launched hadoop Job ID [{0}] which was associated with " +
-                        " action [{1}].  Failing this action!", getActualExternalId(action), action.getId());
-            }
-            if (runningJob.isComplete()) {
+            FileSystem actionFs = context.getAppFileSystem();
+            yarnClient = createYarnClient(context, jobConf);
+            FinalApplicationStatus appStatus = null;
+            try {
+                ApplicationReport appReport =
+                        yarnClient.getApplicationReport(ConverterUtils.toApplicationId(action.getExternalId()));
+                YarnApplicationState appState = appReport.getYarnApplicationState();
+                if (appState == YarnApplicationState.FAILED || appState == YarnApplicationState.FINISHED
+                        || appState == YarnApplicationState.KILLED) {
+                    appStatus = appReport.getFinalApplicationStatus();
+                }
+
+            } catch (Exception ye) {
+                LOG.debug("Exception occurred while checking Launcher AM status; will try checking action data file instead ", ye);
+                // Fallback to action data file if we can't find the Launcher AM (maybe it got purged)
+                fallback = true;
+            }
+            if (appStatus != null || fallback) {
                 Path actionDir = context.getActionDir();
                 String newId = null;
                 // load sequence file into object
                 Map<String, String> actionData = LauncherMapperHelper.getActionData(actionFs, actionDir, jobConf);
-                if (actionData.containsKey(LauncherMapper.ACTION_DATA_NEW_ID)) {
-                    newId = actionData.get(LauncherMapper.ACTION_DATA_NEW_ID);
-                    String launcherId = action.getExternalId();
-                    runningJob = jobClient.getJob(JobID.forName(newId));
-                    if (runningJob == null) {
-                        context.setExternalStatus(FAILED);
+                if (fallback) {
+                    String finalStatus = actionData.get(LauncherAM.ACTION_DATA_FINAL_STATUS);
+                    if (finalStatus != null) {
+                        appStatus = FinalApplicationStatus.valueOf(finalStatus);
+                    } else {
+                        context.setExecutionData(FAILED, null);
                         throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
-                                "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!", newId,
-                                action.getId());
+                                "Unknown hadoop job [{0}] associated with action [{1}] and couldn't determine status from" +
+                                        " action data.  Failing this action!", action.getExternalId(), action.getId());
                     }
-                    context.setExternalChildIDs(newId);
-                    LOG.info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId,
-                            newId);
                 }
-                else {
-                    String externalIDs = actionData.get(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS);
-                    if (externalIDs != null) {
-                        context.setExternalChildIDs(externalIDs);
-                        LOG.info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs);
-                    }
+                String externalIDs = actionData.get(LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS);
+                if (externalIDs != null) {
+                    context.setExternalChildIDs(externalIDs);
+                    LOG.info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs);
                 }
-                if (runningJob.isComplete()) {
-                    // fetching action output and stats for the Map-Reduce action.
-                    if (newId != null) {
-                        actionData = LauncherMapperHelper.getActionData(actionFs, context.getActionDir(), jobConf);
+                LOG.info(XLog.STD, "action completed, external ID [{0}]",
+                        action.getExternalId());
+                context.setExecutionData(appStatus.toString(), null);
+                if (appStatus == FinalApplicationStatus.SUCCEEDED) {
+                    if (getCaptureOutput(action) && LauncherMapperHelper.hasOutputData(actionData)) {
+                        context.setExecutionData(SUCCEEDED, PropertiesUtils.stringToProperties(actionData
+                                .get(LauncherAM.ACTION_DATA_OUTPUT_PROPS)));
+                        LOG.info(XLog.STD, "action produced output");
                     }
-                    LOG.info(XLog.STD, "action completed, external ID [{0}]",
-                            action.getExternalId());
-                    if (LauncherMapperHelper.isMainSuccessful(runningJob)) {
-                        if (getCaptureOutput(action) && LauncherMapperHelper.hasOutputData(actionData)) {
-                            context.setExecutionData(SUCCEEDED, PropertiesUtils.stringToProperties(actionData
-                                    .get(LauncherMapper.ACTION_DATA_OUTPUT_PROPS)));
-                            LOG.info(XLog.STD, "action produced output");
+                    else {
+                        context.setExecutionData(SUCCEEDED, null);
+                    }
+                    if (LauncherMapperHelper.hasStatsData(actionData)) {
+                        context.setExecutionStats(actionData.get(LauncherAM.ACTION_DATA_STATS));
+                        LOG.info(XLog.STD, "action produced stats");
+                    }
+                    getActionData(actionFs, action, context);
+                }
+                else {
+                    String errorReason;
+                    if (actionData.containsKey(LauncherAM.ACTION_DATA_ERROR_PROPS)) {
+                        Properties props = PropertiesUtils.stringToProperties(actionData
+                                .get(LauncherAM.ACTION_DATA_ERROR_PROPS));
+                        String errorCode = props.getProperty("error.code");
+                        if ("0".equals(errorCode)) {
+                            errorCode = "JA018";
                         }
-                        else {
-                            context.setExecutionData(SUCCEEDED, null);
+                        if ("-1".equals(errorCode)) {
+                            errorCode = "JA019";
                         }
-                        if (LauncherMapperHelper.hasStatsData(actionData)) {
-                            context.setExecutionStats(actionData.get(LauncherMapper.ACTION_DATA_STATS));
-                            LOG.info(XLog.STD, "action produced stats");
+                        errorReason = props.getProperty("error.reason");
+                        LOG.warn("Launcher ERROR, reason: {0}", errorReason);
+                        String exMsg = props.getProperty("exception.message");
+                        String errorInfo = (exMsg != null) ? exMsg : errorReason;
+                        context.setErrorInfo(errorCode, errorInfo);
+                        String exStackTrace = props.getProperty("exception.stacktrace");
+                        if (exMsg != null) {
+                            LOG.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace);
                         }
-                        getActionData(actionFs, runningJob, action, context);
                     }
                     else {
-                        String errorReason;
-                        if (actionData.containsKey(LauncherMapper.ACTION_DATA_ERROR_PROPS)) {
-                            Properties props = PropertiesUtils.stringToProperties(actionData
-                                    .get(LauncherMapper.ACTION_DATA_ERROR_PROPS));
-                            String errorCode = props.getProperty("error.code");
-                            if ("0".equals(errorCode)) {
-                                errorCode = "JA018";
-                            }
-                            if ("-1".equals(errorCode)) {
-                                errorCode = "JA019";
-                            }
-                            errorReason = props.getProperty("error.reason");
-                            LOG.warn("Launcher ERROR, reason: {0}", errorReason);
-                            String exMsg = props.getProperty("exception.message");
-                            String errorInfo = (exMsg != null) ? exMsg : errorReason;
-                            context.setErrorInfo(errorCode, errorInfo);
-                            String exStackTrace = props.getProperty("exception.stacktrace");
-                            if (exMsg != null) {
-                                LOG.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace);
-                            }
-                        }
-                        else {
-                            errorReason = XLog.format("LauncherMapper died, check Hadoop LOG for job [{0}:{1}]", action
-                                    .getTrackerUri(), action.getExternalId());
-                            LOG.warn(errorReason);
-                        }
-                        context.setExecutionData(FAILED_KILLED, null);
+                        errorReason = XLog.format("Launcher AM died, check Hadoop LOG for job [{0}:{1}]", action
+                                .getTrackerUri(), action.getExternalId());
+                        LOG.warn(errorReason);
                     }
-                }
-                else {
-                    context.setExternalStatus("RUNNING");
-                    LOG.info(XLog.STD, "checking action, hadoop job ID [{0}] status [RUNNING]",
-                            runningJob.getID());
+                    context.setExecutionData(FAILED_KILLED, null);
                 }
             }
             else {
-                context.setExternalStatus("RUNNING");
+                context.setExternalStatus(YarnApplicationState.RUNNING.toString());
                 LOG.info(XLog.STD, "checking action, hadoop job ID [{0}] status [RUNNING]",
-                        runningJob.getID());
+                        action.getExternalId());
             }
         }
         catch (Exception ex) {
             LOG.warn("Exception in check(). Message[{0}]", ex.getMessage(), ex);
-            exception = true;
             throw convertException(ex);
         }
         finally {
-            if (jobClient != null) {
-                try {
-                    jobClient.close();
-                }
-                catch (Exception e) {
-                    if (exception) {
-                        LOG.error("JobClient error: ", e);
-                    }
-                    else {
-                        throw convertException(e);
-                    }
-                }
+            if (yarnClient != null) {
+                IOUtils.closeQuietly(yarnClient);
             }
         }
     }
@@ -1555,14 +1534,12 @@ public class JavaActionExecutor extends ActionExecutor {
     /**
      * Get the output data of an action. Subclasses should override this method
      * to get action specific output data.
-     *
      * @param actionFs the FileSystem object
-     * @param runningJob the runningJob
      * @param action the Workflow action
      * @param context executor context
      *
      */
-    protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
+    protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context)
             throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
     }
 
@@ -1585,38 +1562,28 @@ public class JavaActionExecutor extends ActionExecutor {
 
     @Override
     public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
-        JobClient jobClient = null;
-        boolean exception = false;
+        YarnClient yarnClient = null;
         try {
             Element actionXml = XmlUtils.parseXml(action.getConf());
+            String user = context.getWorkflow().getUser();
             JobConf jobConf = createBaseHadoopConf(context, actionXml);
-            jobClient = createJobClient(context, jobConf);
-            RunningJob runningJob = getRunningJob(context, action, jobClient);
-            if (runningJob != null) {
-                runningJob.killJob();
-            }
+            yarnClient = createYarnClient(context, jobConf);
+            yarnClient.killApplication(ConverterUtils.toApplicationId(action.getExternalId()));
             context.setExternalStatus(KILLED);
             context.setExecutionData(KILLED, null);
-        }
-        catch (Exception ex) {
-            exception = true;
+        } catch (Exception ex) {
+            LOG.error("Error: ", ex);
             throw convertException(ex);
-        }
-        finally {
+        } finally {
             try {
                 FileSystem actionFs = context.getAppFileSystem();
                 cleanUpActionDir(actionFs, context);
-                if (jobClient != null) {
-                    jobClient.close();
-                }
-            }
-            catch (Exception ex) {
-                if (exception) {
-                    LOG.error("Error: ", ex);
-                }
-                else {
-                    throw convertException(ex);
+                if (yarnClient != null) {
+                    yarnClient.close();
                 }
+            } catch (Exception ex) {
+                LOG.error("Error: ", ex);
+                throw convertException(ex);
             }
         }
     }
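
check() and kill() above now also go through the YarnClient: check() maps the application
report's final status onto the action state, falling back to the action data file when the
report is unavailable (e.g. the application was purged), and kill() simply issues
killApplication() for the stored external ID. A minimal sketch of the status probe, assuming
the action's external ID is a YARN application ID string:

    import org.apache.hadoop.yarn.api.records.ApplicationReport;
    import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
    import org.apache.hadoop.yarn.api.records.YarnApplicationState;
    import org.apache.hadoop.yarn.client.api.YarnClient;
    import org.apache.hadoop.yarn.util.ConverterUtils;

    public class LauncherStatusSketch {
        /** Returns the final status once the launcher AM is done, or null while it still runs. */
        public static FinalApplicationStatus probe(YarnClient yarnClient, String externalId)
                throws Exception {
            ApplicationReport report =
                    yarnClient.getApplicationReport(ConverterUtils.toApplicationId(externalId));
            YarnApplicationState state = report.getYarnApplicationState();
            if (state == YarnApplicationState.FAILED || state == YarnApplicationState.FINISHED
                    || state == YarnApplicationState.KILLED) {
                return report.getFinalApplicationStatus();
            }
            return null;  // still running; the caller keeps the action in RUNNING state
        }
    }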

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
index 69e1044..07d1262 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
@@ -145,18 +145,6 @@ public class LauncherMapperHelper {
           launcherConf.setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true);
         }
 
-        FileSystem fs =
-          Services.get().get(HadoopAccessorService.class).createFileSystem(launcherConf.get("user.name"),
-                                                                           actionDir.toUri(), launcherConf);
-        fs.mkdirs(actionDir);
-
-        OutputStream os = fs.create(new Path(actionDir, LauncherMapper.ACTION_CONF_XML));
-        try {
-            actionConf.writeXml(os);
-        } finally {
-            IOUtils.closeSafely(os);
-        }
-
         launcherConf.setInputFormat(OozieLauncherInputFormat.class);
         launcherConf.set("mapred.output.dir", new Path(actionDir, "output").toString());
     }
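
The block removed here used to write the action configuration into the action directory at
submission time; with the AM-based launcher, both the launcher and action configs instead
reach the container as YARN LocalResources (see createAppSubmissionContext() above and the
new HadoopAccessorService.createLocalResourceForConfigurationFile() below). A rough sketch
of how a file already on HDFS becomes a LocalResource (the helper added in this patch also
takes care of writing the configuration out first):

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.yarn.api.records.LocalResource;
    import org.apache.hadoop.yarn.api.records.LocalResourceType;
    import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
    import org.apache.hadoop.yarn.util.ConverterUtils;
    import org.apache.hadoop.yarn.util.Records;

    public class LocalResourceSketch {
        public static LocalResource forFile(FileSystem fs, Path file) throws Exception {
            FileStatus status = fs.getFileStatus(file);
            LocalResource res = Records.newRecord(LocalResource.class);
            res.setResource(ConverterUtils.getYarnUrlFromPath(status.getPath()));
            res.setSize(status.getLen());
            res.setTimestamp(status.getModificationTime());  // must match HDFS, or localization fails
            res.setType(LocalResourceType.FILE);
            res.setVisibility(LocalResourceVisibility.APPLICATION);
            return res;
        }
    }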

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
index 252f461..6a41235 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.service.HadoopAccessorException;
@@ -157,9 +156,9 @@ public class SparkActionExecutor extends JavaActionExecutor {
     }
 
     @Override
-    protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
+    protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context)
             throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
-        super.getActionData(actionFs, runningJob, action, context);
+        super.getActionData(actionFs, action, context);
         readExternalChildIDs(action, context);
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
index 6813a37..82e5f0c 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
@@ -23,7 +23,6 @@ import java.io.StringReader;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Properties;
 import java.util.StringTokenizer;
 
 import org.apache.hadoop.conf.Configuration;
@@ -232,17 +231,15 @@ public class SqoopActionExecutor extends JavaActionExecutor {
 
     /**
      * Get the stats and external child IDs
-     *
-     * @param actionFs the FileSystem object
-     * @param runningJob the runningJob
+     * @param actionFs the FileSystem object
      * @param action the Workflow action
      * @param context executor context
      *
      */
     @Override
-    protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
+    protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context)
             throws HadoopAccessorException, JDOMException, IOException, URISyntaxException{
-        super.getActionData(actionFs, runningJob, action, context);
+        super.getActionData(actionFs, action, context);
         readExternalChildIDs(action, context);
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
index 794e825..0177241 100644
--- a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
+++ b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
@@ -18,6 +18,7 @@
 
 package org.apache.oozie.service;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
@@ -29,7 +30,14 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
 import org.apache.oozie.ErrorCode;
+import org.apache.oozie.action.ActionExecutor;
 import org.apache.oozie.action.hadoop.JavaActionExecutor;
 import org.apache.oozie.util.ParamChecker;
 import org.apache.oozie.util.XConfiguration;
@@ -39,6 +47,7 @@ import org.apache.oozie.workflow.lite.LiteWorkflowAppParser;
 
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
@@ -511,9 +520,43 @@ public class HadoopAccessorService implements Service {
     }
 
     /**
-     * Return a FileSystem created with the provided user for the specified URI.
+     * Return a YarnClient created with the provided user and configuration.
      *
+     * @param user The username to impersonate
+     * @param conf The Configuration to initialize the YarnClient with
+     * @return a YarnClient with the provided user and configuration
+     * @throws HadoopAccessorException if the client could not be created.
+     */
+    public YarnClient createYarnClient(String user, final Configuration conf) throws HadoopAccessorException {
+        ParamChecker.notEmpty(user, "user");
+        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
+            throw new HadoopAccessorException(ErrorCode.E0903);
+        }
+        String rm = conf.get(JavaActionExecutor.HADOOP_YARN_RM);
+        validateJobTracker(rm);
+        try {
+            UserGroupInformation ugi = getUGI(user);
+            YarnClient yarnClient = ugi.doAs(new PrivilegedExceptionAction<YarnClient>() {
+                @Override
+                public YarnClient run() throws Exception {
+                    YarnClient yarnClient = YarnClient.createYarnClient();
+                    yarnClient.init(conf);
+                    yarnClient.start();
+                    return yarnClient;
+                }
+            });
+            return yarnClient;
+        } catch (InterruptedException ex) {
+            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
+        } catch (IOException ex) {
+            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
+        }
+    }
+
+    /**
+     * Return a FileSystem created with the provided user for the specified URI.
      *
+     * @param user The username to impersonate
      * @param uri file system URI.
      * @param conf Configuration with all necessary information to create the FileSystem.
      * @return FileSystem created with the provided user/group.
@@ -667,4 +710,57 @@
         return supportedSchemes;
     }
 
+    /**
+     * Creates a {@link LocalResource} for the Configuration so that it can be localized for a YARN container.  This also
+     * involves writing it to HDFS.
+     * Example usage:
+     * <pre>
+     * {@code
+     * LocalResource res1 = createLocalResourceForConfigurationFile(filename1, user, conf, uri, dir);
+     * LocalResource res2 = createLocalResourceForConfigurationFile(filename2, user, conf, uri, dir);
+     * ...
+     * Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+     * localResources.put(filename1, res1);
+     * localResources.put(filename2, res2);
+     * ...
+     * containerLaunchContext.setLocalResources(localResources);
+     * }
+     * </pre>
+     *
+     * @param filename The filename to use on the remote filesystem and for the localized file.
+     * @param user The username to impersonate
+     * @param conf The configuration to process
+     * @param uri The URI of the remote filesystem (e.g. HDFS)
+     * @param dir The directory on the remote filesystem to write the file to
+     * @return a LocalResource for the localized configuration file
+     * @throws IOException A problem occurred writing the file
+     * @throws HadoopAccessorException A problem occurred with Hadoop
+     * @throws URISyntaxException A problem occurred parsing the URI
+     */
+    public LocalResource createLocalResourceForConfigurationFile(String filename, String user, Configuration conf, URI uri,
+                                                                 Path dir)
+            throws IOException, HadoopAccessorException, URISyntaxException {
+        File f = File.createTempFile(filename, ".tmp");
+        FileOutputStream fos = null;
+        try {
+            fos = new FileOutputStream(f);
+            conf.writeXml(fos);
+        } finally {
+            if (fos != null) {
+                fos.close();
+            }
+        }
+        FileSystem fs = createFileSystem(user, uri, conf);
+        Path dst = new Path(dir, filename);
+        fs.copyFromLocalFile(new Path(f.getAbsolutePath()), dst);
+        LocalResource localResource = Records.newRecord(LocalResource.class);
+        localResource.setType(LocalResourceType.FILE);
+        localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+        localResource.setResource(ConverterUtils.getYarnUrlFromPath(dst));
+        FileStatus destStatus = fs.getFileStatus(dst);
+        localResource.setTimestamp(destStatus.getModificationTime());
+        localResource.setSize(destStatus.getLen());
+        return localResource;
+    }
+
 }

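
A usage sketch for the new createYarnClient() API, e.g. polling the state of a launcher AM (the createJobConf() lookup
and the method inputs are illustrative assumptions, not part of this patch):

    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.yarn.api.records.ApplicationReport;
    import org.apache.hadoop.yarn.client.api.YarnClient;
    import org.apache.hadoop.yarn.util.ConverterUtils;
    import org.apache.oozie.service.HadoopAccessorService;
    import org.apache.oozie.service.Services;

    public class YarnClientSketch {

        // "rmAddress" is the resource manager host:port; "externalId" would be the
        // YARN application id stored on the workflow action.
        static void printAmState(String user, String rmAddress, String externalId) throws Exception {
            HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
            // createJobConf() marks the conf as created by the service, which createYarnClient() checks
            JobConf conf = has.createJobConf(rmAddress);
            YarnClient yarnClient = has.createYarnClient(user, conf);
            try {
                ApplicationReport report =
                        yarnClient.getApplicationReport(ConverterUtils.toApplicationId(externalId));
                System.out.println("AM state: " + report.getYarnApplicationState());
            } finally {
                yarnClient.close(); // stops the underlying YARN service
            }
        }
    }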
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/util/ClasspathUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/ClasspathUtils.java b/core/src/main/java/org/apache/oozie/util/ClasspathUtils.java
new file mode 100644
index 0000000..8533371
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/util/ClasspathUtils.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+public class ClasspathUtils {
+    private static boolean usingMiniYarnCluster = false;
+    private static final List<String> CLASSPATH_ENTRIES = Arrays.asList(
+            ApplicationConstants.Environment.PWD.$(),
+            MRJobConfig.JOB_JAR + Path.SEPARATOR + MRJobConfig.JOB_JAR,
+            MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR,
+            MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*",
+            ApplicationConstants.Environment.PWD.$() + Path.SEPARATOR + "*"
+    );
+
+    @VisibleForTesting
+    public static void setUsingMiniYarnCluster(boolean useMiniYarnCluster) {
+        usingMiniYarnCluster = useMiniYarnCluster;
+    }
+
+    // Adapted from MRApps#setClasspath.  Adds Yarn, HDFS, Common, and distributed cache jars.
+    public static void setupClasspath(Map<String, String> env, Configuration conf) throws IOException {
+        // Propagate the system classpath when using the mini cluster
+        if (usingMiniYarnCluster) {
+            MRApps.addToEnvironment(
+                    env,
+                    ApplicationConstants.Environment.CLASSPATH.name(),
+                    System.getProperty("java.class.path"), conf);
+        }
+
+        for (String entry : CLASSPATH_ENTRIES) {
+            MRApps.addToEnvironment(env, ApplicationConstants.Environment.CLASSPATH.name(), entry, conf);
+        }
+
+        // a * in the classpath will only find a .jar, so we need to filter out
+        // all .jars and add everything else
+        addToClasspathIfNotJar(org.apache.hadoop.mapreduce.filecache.DistributedCache.getFileClassPaths(conf),
+                org.apache.hadoop.mapreduce.filecache.DistributedCache.getCacheFiles(conf),
+                conf,
+                env, ApplicationConstants.Environment.PWD.$());
+        addToClasspathIfNotJar(org.apache.hadoop.mapreduce.filecache.DistributedCache.getArchiveClassPaths(conf),
+                org.apache.hadoop.mapreduce.filecache.DistributedCache.getCacheArchives(conf),
+                conf,
+                env, ApplicationConstants.Environment.PWD.$());
+
+
+        boolean crossPlatform = conf.getBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
+                MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM);
+
+        for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+                crossPlatform
+                        ? YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH
+                        : YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+            MRApps.addToEnvironment(env, ApplicationConstants.Environment.CLASSPATH.name(),
+                    c.trim(), conf);
+        }
+    }
+
+    // Adapted from MRApps#setClasspath
+    public static void addMapReduceToClasspath(Map<String, String> env, Configuration conf) {
+        boolean crossPlatform = conf.getBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
+                MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM);
+
+        for (String c : conf.getStrings(MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH,
+                crossPlatform ?
+                        StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_CROSS_PLATFORM_APPLICATION_CLASSPATH)
+                        : StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH))) {
+            MRApps.addToEnvironment(env, ApplicationConstants.Environment.CLASSPATH.name(),
+                    c.trim(), conf);
+        }
+    }
+
+    // Borrowed from MRApps#addToClasspathIfNotJar
+    private static void addToClasspathIfNotJar(Path[] paths,
+                                               URI[] withLinks, Configuration conf,
+                                               Map<String, String> environment,
+                                               String classpathEnvVar) throws IOException {
+        if (paths != null) {
+            HashMap<Path, String> linkLookup = new HashMap<Path, String>();
+            if (withLinks != null) {
+                for (URI u: withLinks) {
+                    Path p = new Path(u);
+                    FileSystem remoteFS = p.getFileSystem(conf);
+                    p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
+                            remoteFS.getWorkingDirectory()));
+                    String name = (null == u.getFragment())
+                            ? p.getName() : u.getFragment();
+                    if (!name.toLowerCase(Locale.ENGLISH).endsWith(".jar")) {
+                        linkLookup.put(p, name);
+                    }
+                }
+            }
+
+            for (Path p : paths) {
+                FileSystem remoteFS = p.getFileSystem(conf);
+                p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
+                        remoteFS.getWorkingDirectory()));
+                String name = linkLookup.get(p);
+                if (name == null) {
+                    name = p.getName();
+                }
+                if (!name.toLowerCase(Locale.ENGLISH).endsWith(".jar")) {
+                    MRApps.addToEnvironment(
+                            environment,
+                            classpathEnvVar,
+                            ApplicationConstants.Environment.PWD.$() + Path.SEPARATOR + name, conf);
+                }
+            }
+        }
+    }
+}

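
A sketch of how the ClasspathUtils helpers would feed a container launch context for the launcher AM (the
ContainerLaunchContext wiring here is an illustrative assumption):

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
    import org.apache.hadoop.yarn.util.Records;
    import org.apache.oozie.util.ClasspathUtils;

    public class ClasspathSketch {

        static ContainerLaunchContext buildLaunchContext(Configuration conf) throws Exception {
            Map<String, String> env = new HashMap<String, String>();
            // YARN, HDFS and Common jars plus any non-jar distributed cache entries
            ClasspathUtils.setupClasspath(env, conf);
            // MapReduce framework jars as well, if the launched action needs them
            ClasspathUtils.addMapReduceToClasspath(env, conf);
            ContainerLaunchContext context = Records.newRecord(ContainerLaunchContext.class);
            context.setEnvironment(env);
            return context;
        }
    }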
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 6c2f7d8..5f4645c 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -1782,31 +1782,6 @@ will be the requeue interval for the actions which are waiting for a long time w
     </property>
 
     <property>
-        <name>oozie.action.launcher.mapreduce.job.ubertask.enable</name>
-        <value>true</value>
-        <description>
-            Enables Uber Mode for the launcher job in YARN/Hadoop 2 (no effect in Hadoop 1) for all action types by default.
-            This can be overridden on a per-action-type basis by setting
-            oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable in oozie-site.xml (where #action-type# is the action
-            type; for example, "pig").  And that can be overridden on a per-action basis by setting
-            oozie.launcher.mapreduce.job.ubertask.enable in an action's configuration section in a workflow.  In summary, the
-            priority is this:
-            1. action's configuration section in a workflow
-            2. oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable in oozie-site
-            3. oozie.action.launcher.mapreduce.job.ubertask.enable in oozie-site
-        </description>
-    </property>
-
-    <property>
-        <name>oozie.action.shell.launcher.mapreduce.job.ubertask.enable</name>
-        <value>false</value>
-        <description>
-            The Shell action may have issues with the $PATH environment when using Uber Mode, and so Uber Mode is disabled by
-            default for it.  See oozie.action.launcher.mapreduce.job.ubertask.enable
-        </description>
-    </property>
-
-    <property>
         <name>oozie.action.shell.setup.hadoop.conf.dir</name>
         <value>false</value>
         <description>

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/test/java/org/apache/oozie/QueryServlet.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/QueryServlet.java b/core/src/test/java/org/apache/oozie/QueryServlet.java
new file mode 100644
index 0000000..8789438
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/QueryServlet.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.net.URLDecoder;
+
+/**
+ * Servlet that keeps track of the last query string it received
+ */
+public class QueryServlet extends HttpServlet {
+
+    public static String lastQueryString = null;
+
+    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
+        lastQueryString = URLDecoder.decode(request.getQueryString(), "UTF-8");
+        response.setStatus(HttpServletResponse.SC_OK);
+    }
+
+}
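
A sketch of how a test could drive QueryServlet (assumes Jetty 9 on the test classpath; Oozie's own tests would use their
embedded container helper instead):

    import java.net.HttpURLConnection;
    import java.net.URL;

    import org.apache.oozie.QueryServlet;
    import org.eclipse.jetty.server.Server;
    import org.eclipse.jetty.servlet.ServletContextHandler;

    public class QueryServletSketch {

        public static void main(String[] args) throws Exception {
            Server server = new Server(0); // bind to a random free port
            ServletContextHandler handler = new ServletContextHandler();
            handler.addServlet(QueryServlet.class, "/callback");
            server.setHandler(handler);
            server.start();

            URL url = server.getURI().resolve("/callback?id=job_1&status=SUCCEEDED").toURL();
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.getResponseCode(); // triggers doGet(), recording the query string

            System.out.println(QueryServlet.lastQueryString); // id=job_1&status=SUCCEEDED
            server.stop();
        }
    }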