You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by rk...@apache.org on 2016/07/26 01:25:07 UTC

[1/3] oozie git commit: OOZIE-2590 OYA: Create basic Oozie Launcher Application Master (rkanter)

Repository: oozie
Updated Branches:
  refs/heads/oya a37835fec -> fea512cf6


http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
new file mode 100644
index 0000000..e056acc
--- /dev/null
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
@@ -0,0 +1,636 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.xml.sax.SAXException;
+
+import javax.xml.parsers.ParserConfigurationException;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.io.Reader;
+import java.io.StringWriter;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.security.Permission;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.StringTokenizer;
+
+public class LauncherAM {
+
+    static final String CONF_OOZIE_ACTION_MAIN_CLASS = "oozie.launcher.action.main.class";
+
+    static final String ACTION_PREFIX = "oozie.action.";
+    public static final String CONF_OOZIE_ACTION_MAX_OUTPUT_DATA = ACTION_PREFIX + "max.output.data";
+    static final String CONF_OOZIE_ACTION_MAIN_ARG_PREFIX = ACTION_PREFIX + "main.arg.";
+    static final String CONF_OOZIE_ACTION_MAIN_ARG_COUNT = ACTION_PREFIX + CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + "count";
+    static final String CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE = "oozie.external.stats.max.size";
+
+    static final String OOZIE_ACTION_DIR_PATH = ACTION_PREFIX + "dir.path";
+    static final String ACTION_PREPARE_XML = ACTION_PREFIX + "prepare.xml";
+    static final String ACTION_DATA_SEQUENCE_FILE = "action-data.seq"; // COMBO FILE
+    static final String ACTION_DATA_EXTERNAL_CHILD_IDS = "externalChildIDs";
+    static final String ACTION_DATA_OUTPUT_PROPS = "output.properties";
+    static final String ACTION_DATA_STATS = "stats.properties";
+    static final String ACTION_DATA_NEW_ID = "newId";
+    static final String ACTION_DATA_ERROR_PROPS = "error.properties";
+
+    // TODO: OYA: more unique file names?  action.xml may be stuck for backwards compat though
+    public static final String LAUNCHER_JOB_CONF_XML = "launcher.xml";
+    public static final String ACTION_CONF_XML = "action.xml";
+    public static final String ACTION_DATA_FINAL_STATUS = "final.status";
+
+    private static AMRMClientAsync<AMRMClient.ContainerRequest> amRmClientAsync = null;
+    private static Configuration launcherJobConf = null;
+    private static Path actionDir;
+    private static Map<String, String> actionData = new HashMap<String,String>();
+
+    private static void printDebugInfo(String[] mainArgs) throws IOException {
+        printContentsOfCurrentDir();
+
+        System.out.println();
+        System.out.println("Oozie Launcher Application Master configuration");
+        System.out.println("===============================================");
+        System.out.println("Workflow job id   : " + launcherJobConf.get("oozie.job.id"));
+        System.out.println("Workflow action id: " + launcherJobConf.get("oozie.action.id"));
+        System.out.println();
+        System.out.println("Classpath         :");
+        System.out.println("------------------------");
+        StringTokenizer st = new StringTokenizer(System.getProperty("java.class.path"), ":");
+        while (st.hasMoreTokens()) {
+            System.out.println("  " + st.nextToken());
+        }
+        System.out.println("------------------------");
+        System.out.println();
+        String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS);
+        System.out.println("Main class        : " + mainClass);
+        System.out.println();
+        System.out.println("Maximum output    : "
+                + launcherJobConf.getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024));
+        System.out.println();
+        System.out.println("Arguments         :");
+        for (String arg : mainArgs) {
+            System.out.println("                    " + arg);
+        }
+
+        System.out.println();
+        System.out.println("Java System Properties:");
+        System.out.println("------------------------");
+        System.getProperties().store(System.out, "");
+        System.out.flush();
+        System.out.println("------------------------");
+        System.out.println();
+
+        System.out.println("=================================================================");
+        System.out.println();
+        System.out.println(">>> Invoking Main class now >>>");
+        System.out.println();
+        System.out.flush();
+    }
+
+    // TODO: OYA: delete me when making real Action Mains
+    public static class DummyMain {
+        public static void main(String[] args) throws Exception {
+            System.out.println("Hello World!");
+            if (launcherJobConf.get("foo", "0").equals("1")) {
+                throw new IOException("foo 1");
+            } else if (launcherJobConf.get("foo", "0").equals("2")) {
+                throw new JavaMainException(new IOException("foo 2"));
+            } else if (launcherJobConf.get("foo", "0").equals("3")) {
+                throw new LauncherMainException(3);
+            } else if (launcherJobConf.get("foo", "0").equals("4")) {
+                System.exit(0);
+            } else if (launcherJobConf.get("foo", "0").equals("5")) {
+                System.exit(1);
+            }
+        }
+    }
+
+    // TODO: OYA: rethink all print messages and formatting
+    public static void main(String[] AMargs) throws Exception {
+        ErrorHolder eHolder = new ErrorHolder();
+        FinalApplicationStatus finalStatus = FinalApplicationStatus.FAILED;
+        try {
+            try {
+                launcherJobConf = readLauncherConf();
+                System.out.println("Launcher AM configuration loaded");
+            } catch (Exception ex) {
+                eHolder.setErrorMessage("Could not load the Launcher AM configuration file");
+                eHolder.setErrorCause(ex);
+                throw ex;
+            }
+
+            registerWithRM();
+
+            actionDir = new Path(launcherJobConf.get(OOZIE_ACTION_DIR_PATH));
+
+            try {
+                System.out.println("\nStarting the execution of prepare actions");
+                executePrepare();
+                System.out.println("Completed the execution of prepare actions successfully");
+            } catch (Exception ex) {
+                eHolder.setErrorMessage("Prepare execution in the Launcher AM has failed");
+                eHolder.setErrorCause(ex);
+                throw ex;
+            }
+
+            String[] mainArgs = getMainArguments(launcherJobConf);
+
+            // TODO: OYA: should we allow turning this off?
+            // TODO: OYA: what should default be?
+            if (launcherJobConf.getBoolean("oozie.launcher.print.debug.info", true)) {
+                printDebugInfo(mainArgs);
+            }
+            finalStatus = runActionMain(mainArgs, eHolder);
+            if (finalStatus != FinalApplicationStatus.SUCCEEDED) {
+                handleActionData();
+                if (actionData.get(ACTION_DATA_OUTPUT_PROPS) != null) {
+                    System.out.println();
+                    System.out.println("Oozie Launcher, capturing output data:");
+                    System.out.println("=======================");
+                    System.out.println(actionData.get(ACTION_DATA_OUTPUT_PROPS));
+                    System.out.println();
+                    System.out.println("=======================");
+                    System.out.println();
+                }
+                if (actionData.get(ACTION_DATA_NEW_ID) != null) {
+                    System.out.println();
+                    System.out.println("Oozie Launcher, propagating new Hadoop job id to Oozie");
+                    System.out.println("=======================");
+                    System.out.println(actionData.get(ACTION_DATA_NEW_ID));
+                    System.out.println("=======================");
+                    System.out.println();
+                }
+            }
+        } finally {
+            try {
+                // Store final status in case Launcher AM falls off the RM
+                actionData.put(ACTION_DATA_FINAL_STATUS, finalStatus.toString());
+                if (finalStatus != FinalApplicationStatus.SUCCEEDED) {
+                    failLauncher(eHolder);
+                }
+                uploadActionDataToHDFS();
+            } finally {
+                try {
+                    unregisterWithRM(finalStatus, eHolder.getErrorMessage());
+                } finally {
+                    LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(launcherJobConf);
+                    cn.notifyURL(finalStatus);
+                }
+            }
+        }
+    }
+
+    private static void registerWithRM() throws IOException, YarnException {
+        AMRMClient<AMRMClient.ContainerRequest> amRmClient = AMRMClient.createAMRMClient();
+
+        AMRMCallBackHandler callBackHandler = new AMRMCallBackHandler();
+        // TODO: OYA: make heartbeat interval configurable
+        // TODO: OYA: make heartbeat interval higher to put less load on RM, but lower than timeout
+        amRmClientAsync = AMRMClientAsync.createAMRMClientAsync(amRmClient, 60000, callBackHandler);
+        amRmClientAsync.init(launcherJobConf);
+        amRmClientAsync.start();
+
+        // hostname and tracking url are determined automatically
+        amRmClientAsync.registerApplicationMaster("", 0, "");
+    }
+
+    private static void unregisterWithRM(FinalApplicationStatus status, String message) throws YarnException, IOException {
+        if (amRmClientAsync != null) {
+            System.out.println("Stopping AM");
+            try {
+                message = (message == null) ? "" : message;
+                // tracking url is determined automatically
+                amRmClientAsync.unregisterApplicationMaster(status, message, "");
+            } catch (YarnException ex) {
+                System.err.println("Error un-registering AM client");
+                throw ex;
+            } catch (IOException ex) {
+                System.err.println("Error un-registering AM client");
+                throw ex;
+            } finally {
+                amRmClientAsync.stop();
+                amRmClientAsync = null;
+            }
+        }
+    }
+
+    // Method to execute the prepare actions
+    private static void executePrepare() throws IOException, LauncherException, ParserConfigurationException, SAXException {
+        String prepareXML = launcherJobConf.get(ACTION_PREPARE_XML);
+        if (prepareXML != null) {
+            if (prepareXML.length() != 0) {
+                Configuration actionConf = new Configuration(launcherJobConf);
+                actionConf.addResource(ACTION_CONF_XML);
+                PrepareActionsDriver.doOperations(prepareXML, actionConf);
+            } else {
+                System.out.println("There are no prepare actions to execute.");
+            }
+        }
+    }
+
+    private static FinalApplicationStatus runActionMain(String[] mainArgs, ErrorHolder eHolder) {
+        FinalApplicationStatus finalStatus = FinalApplicationStatus.FAILED;
+        LauncherSecurityManager secMan = new LauncherSecurityManager();
+        try {
+            Class<?> klass = launcherJobConf.getClass(CONF_OOZIE_ACTION_MAIN_CLASS, Object.class);
+            Method mainMethod = klass.getMethod("main", String[].class);
+            // Enable LauncherSecurityManager to catch System.exit calls
+            secMan.set();
+            // TODO: OYA: remove this line to actually run the Main class instead of this dummy
+            mainMethod = DummyMain.class.getMethod("main", String[].class);
+            mainMethod.invoke(null, (Object) mainArgs);
+
+            System.out.println();
+            System.out.println("<<< Invocation of Main class completed <<<");
+            System.out.println();
+            finalStatus = FinalApplicationStatus.SUCCEEDED;
+        } catch (InvocationTargetException ex) {
+            // Get what actually caused the exception
+            Throwable cause = ex.getCause();
+            // If we got a JavaMainException from JavaMain, then we need to unwrap it
+            if (JavaMainException.class.isInstance(cause)) {
+                cause = cause.getCause();
+            }
+            if (LauncherMainException.class.isInstance(cause)) {
+                String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS);
+                eHolder.setErrorMessage("Main Class [" + mainClass + "], exit code [" +
+                        ((LauncherMainException) ex.getCause()).getErrorCode() + "]");
+            } else if (SecurityException.class.isInstance(cause)) {
+                if (secMan.getExitInvoked()) {
+                    System.out.println("Intercepting System.exit(" + secMan.getExitCode()
+                            + ")");
+                    System.err.println("Intercepting System.exit(" + secMan.getExitCode()
+                            + ")");
+                    // if 0 main() method finished successfully
+                    // ignoring
+                    eHolder.setErrorCode(secMan.getExitCode());
+                    if (eHolder.getErrorCode() != 0) {
+                        String mainClass = launcherJobConf.get(CONF_OOZIE_ACTION_MAIN_CLASS);
+                        eHolder.setErrorMessage("Main Class [" + mainClass + "], exit code [" + eHolder.getErrorCode() + "]");
+                    } else {
+                        finalStatus = FinalApplicationStatus.SUCCEEDED;
+                    }
+                }
+            } else {
+                eHolder.setErrorMessage(cause.getMessage());
+                eHolder.setErrorCause(cause);
+            }
+        } catch (Throwable t) {
+            eHolder.setErrorMessage(t.getMessage());
+            eHolder.setErrorCause(t);
+        } finally {
+            // Disable LauncherSecurityManager
+            secMan.unset();
+        }
+        return finalStatus;
+    }
+
+    private static void handleActionData() throws IOException {
+        // external child IDs
+        String externalChildIdsProp = System.getProperty(ACTION_PREFIX
+                + ACTION_DATA_EXTERNAL_CHILD_IDS);
+        if (externalChildIdsProp != null) {
+            File externalChildIDs = new File(externalChildIdsProp);
+            if (externalChildIDs.exists()) {
+                actionData.put(ACTION_DATA_EXTERNAL_CHILD_IDS, getLocalFileContentStr(externalChildIDs, "", -1));
+            }
+        }
+
+        // external stats
+        String statsProp = System.getProperty(ACTION_PREFIX + ACTION_DATA_STATS);
+        if (statsProp != null) {
+            File actionStatsData = new File(statsProp);
+            if (actionStatsData.exists()) {
+                int statsMaxOutputData = launcherJobConf.getInt(CONF_OOZIE_EXTERNAL_STATS_MAX_SIZE,
+                        Integer.MAX_VALUE);
+                actionData.put(ACTION_DATA_STATS,
+                        getLocalFileContentStr(actionStatsData, "Stats", statsMaxOutputData));
+            }
+        }
+
+        // output data
+        String outputProp = System.getProperty(ACTION_PREFIX + ACTION_DATA_OUTPUT_PROPS);
+        if (outputProp != null) {
+            File actionOutputData = new File(outputProp);
+            if (actionOutputData.exists()) {
+                int maxOutputData = launcherJobConf.getInt(CONF_OOZIE_ACTION_MAX_OUTPUT_DATA, 2 * 1024);
+                actionData.put(ACTION_DATA_OUTPUT_PROPS,
+                        getLocalFileContentStr(actionOutputData, "Output", maxOutputData));
+            }
+        }
+
+        // id swap
+        String newIdProp = System.getProperty(ACTION_PREFIX + ACTION_DATA_NEW_ID);
+        if (newIdProp != null) {
+            File newId = new File(newIdProp);
+            if (newId.exists()) {
+                actionData.put(ACTION_DATA_NEW_ID, getLocalFileContentStr(newId, "", -1));
+            }
+        }
+    }
+
+    public static String getLocalFileContentStr(File file, String type, int maxLen) throws IOException {
+        StringBuilder sb = new StringBuilder();
+        Reader reader = null;
+        try {
+            reader = new BufferedReader(new FileReader(file));
+            char[] buffer = new char[2048];
+            int read;
+            int count = 0;
+            while ((read = reader.read(buffer)) > -1) {
+                count += read;
+                if (maxLen > -1 && count > maxLen) {
+                    throw new IOException(type + " data exceeds its limit [" + maxLen + "]");
+                }
+                sb.append(buffer, 0, read);
+            }
+        } finally {
+            if (reader != null) {
+                reader.close();
+            }
+        }
+        return sb.toString();
+    }
+
+    private static void uploadActionDataToHDFS() throws IOException {
+        Path finalPath = new Path(actionDir, ACTION_DATA_SEQUENCE_FILE);
+        FileSystem fs = FileSystem.get(finalPath.toUri(), launcherJobConf);
+        // upload into sequence file
+        System.out.println("Oozie Launcher, uploading action data to HDFS sequence file: "
+                + new Path(actionDir, ACTION_DATA_SEQUENCE_FILE).toUri());
+
+        SequenceFile.Writer wr = null;
+        try {
+            wr = SequenceFile.createWriter(launcherJobConf,
+                    SequenceFile.Writer.file(finalPath),
+                    SequenceFile.Writer.keyClass(Text.class),
+                    SequenceFile.Writer.valueClass(Text.class));
+            if (wr != null) {
+                Set<String> keys = actionData.keySet();
+                for (String propsKey : keys) {
+                    wr.append(new Text(propsKey), new Text(actionData.get(propsKey)));
+                }
+            }
+            else {
+                throw new IOException("SequenceFile.Writer is null for " + finalPath);
+            }
+        }
+        catch(IOException e) {
+            e.printStackTrace();
+            throw e;
+        }
+        finally {
+            if (wr != null) {
+                wr.close();
+            }
+        }
+    }
+    private static void failLauncher(int errorCode, String message, Throwable ex) {
+        ErrorHolder eHolder = new ErrorHolder();
+        eHolder.setErrorCode(errorCode);
+        eHolder.setErrorMessage(message);
+        eHolder.setErrorCause(ex);
+        failLauncher(eHolder);
+    }
+
+    private static void failLauncher(ErrorHolder eHolder) {
+        if (eHolder.getErrorCause() != null) {
+            eHolder.setErrorMessage(eHolder.getErrorMessage() + ", " + eHolder.getErrorCause().getMessage());
+        }
+        Properties errorProps = new Properties();
+        errorProps.setProperty("error.code", Integer.toString(eHolder.getErrorCode()));
+        errorProps.setProperty("error.reason", eHolder.getErrorMessage());
+        if (eHolder.getErrorCause() != null) {
+            if (eHolder.getErrorCause().getMessage() != null) {
+                errorProps.setProperty("exception.message", eHolder.getErrorCause().getMessage());
+            }
+            StringWriter sw = new StringWriter();
+            PrintWriter pw = new PrintWriter(sw);
+            eHolder.getErrorCause().printStackTrace(pw);
+            pw.close();
+            errorProps.setProperty("exception.stacktrace", sw.toString());
+        }
+        StringWriter sw = new StringWriter();
+        try {
+            errorProps.store(sw, "");
+            sw.close();
+            actionData.put(ACTION_DATA_ERROR_PROPS, sw.toString());
+
+            // external child IDs
+            String externalChildIdsProp = System.getProperty(ACTION_PREFIX + ACTION_DATA_EXTERNAL_CHILD_IDS);
+            if (externalChildIdsProp != null) {
+                File externalChildIDs = new File(externalChildIdsProp);
+                if (externalChildIDs.exists()) {
+                    actionData.put(ACTION_DATA_EXTERNAL_CHILD_IDS, getLocalFileContentStr(externalChildIDs, "", -1));
+                }
+            }
+        } catch (IOException ioe) {
+            System.err.println("A problem occured trying to fail the launcher");
+            ioe.printStackTrace();
+        } finally {
+            System.out.print("Failing Oozie Launcher, " + eHolder.getErrorMessage() + "\n");
+            System.err.print("Failing Oozie Launcher, " + eHolder.getErrorMessage() + "\n");
+            if (eHolder.getErrorCause() != null) {
+                eHolder.getErrorCause().printStackTrace(System.out);
+                eHolder.getErrorCause().printStackTrace(System.err);
+            }
+        }
+    }
+
+    private static class AMRMCallBackHandler implements AMRMClientAsync.CallbackHandler {
+        @Override
+        public void onContainersCompleted(List<ContainerStatus> containerStatuses) {
+            //noop
+        }
+
+        @Override
+        public void onContainersAllocated(List<Container> containers) {
+            //noop
+        }
+
+        @Override
+        public void onShutdownRequest() {
+            failLauncher(0, "ResourceManager requested AM Shutdown", null);
+            // TODO: OYA: interrupt?
+        }
+
+        @Override
+        public void onNodesUpdated(List<NodeReport> nodeReports) {
+            //noop
+        }
+
+        @Override
+        public float getProgress() {
+            return 0.5f;    //TODO: OYA: maybe some action types can report better progress?
+        }
+
+        @Override
+        public void onError(final Throwable ex) {
+            failLauncher(0, ex.getMessage(), ex);
+            // TODO: OYA: interrupt?
+        }
+    }
+
+    public static String[] getMainArguments(Configuration conf) {
+        String[] args = new String[conf.getInt(CONF_OOZIE_ACTION_MAIN_ARG_COUNT, 0)];
+        for (int i = 0; i < args.length; i++) {
+            args[i] = conf.get(CONF_OOZIE_ACTION_MAIN_ARG_PREFIX + i);
+        }
+        return args;
+    }
+
+    private static class LauncherSecurityManager extends SecurityManager {
+        private boolean exitInvoked;
+        private int exitCode;
+        private SecurityManager securityManager;
+
+        public LauncherSecurityManager() {
+            exitInvoked = false;
+            exitCode = 0;
+            securityManager = System.getSecurityManager();
+        }
+
+        @Override
+        public void checkPermission(Permission perm, Object context) {
+            if (securityManager != null) {
+                // check everything with the original SecurityManager
+                securityManager.checkPermission(perm, context);
+            }
+        }
+
+        @Override
+        public void checkPermission(Permission perm) {
+            if (securityManager != null) {
+                // check everything with the original SecurityManager
+                securityManager.checkPermission(perm);
+            }
+        }
+
+        @Override
+        public void checkExit(int status) throws SecurityException {
+            exitInvoked = true;
+            exitCode = status;
+            throw new SecurityException("Intercepted System.exit(" + status + ")");
+        }
+
+        public boolean getExitInvoked() {
+            return exitInvoked;
+        }
+
+        public int getExitCode() {
+            return exitCode;
+        }
+
+        public void set() {
+            if (System.getSecurityManager() != this) {
+                System.setSecurityManager(this);
+            }
+        }
+
+        public void unset() {
+            if (System.getSecurityManager() == this) {
+                System.setSecurityManager(securityManager);
+            }
+        }
+    }
+
+
+    /**
+     * Print files and directories in current directory. Will list files in the sub-directory (only 1 level deep)
+     */
+    protected static void printContentsOfCurrentDir() {
+        File folder = new File(".");
+        System.out.println();
+        System.out.println("Files in current dir:" + folder.getAbsolutePath());
+        System.out.println("======================");
+
+        File[] listOfFiles = folder.listFiles();
+        for (File fileName : listOfFiles) {
+            if (fileName.isFile()) {
+                System.out.println("File: " + fileName.getName());
+            } else if (fileName.isDirectory()) {
+                System.out.println("Dir: " + fileName.getName());
+                File subDir = new File(fileName.getName());
+                File[] moreFiles = subDir.listFiles();
+                for (File subFileName : moreFiles) {
+                    if (subFileName.isFile()) {
+                        System.out.println("  File: " + subFileName.getName());
+                    } else if (subFileName.isDirectory()) {
+                        System.out.println("  Dir: " + subFileName.getName());
+                    }
+                }
+            }
+        }
+    }
+
+    protected static Configuration readLauncherConf() {
+        File confFile = new File(LAUNCHER_JOB_CONF_XML);
+        Configuration conf = new Configuration(false);
+        conf.addResource(new Path(confFile.getAbsolutePath()));
+        return conf;
+    }
+
+    protected static class ErrorHolder {
+        private int errorCode = 0;
+        private Throwable errorCause = null;
+        private String errorMessage = null;
+
+        public int getErrorCode() {
+            return errorCode;
+        }
+
+        public void setErrorCode(int errorCode) {
+            this.errorCode = errorCode;
+        }
+
+        public Throwable getErrorCause() {
+            return errorCause;
+        }
+
+        public void setErrorCause(Throwable errorCause) {
+            this.errorCause = errorCause;
+        }
+
+        public String getErrorMessage() {
+            return errorMessage;
+        }
+
+        public void setErrorMessage(String errorMessage) {
+            this.errorMessage = errorMessage;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java
new file mode 100644
index 0000000..dbef441
--- /dev/null
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAMCallbackNotifier.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.Proxy;
+import java.net.URL;
+
+// Adapted from org.apache.hadoop.mapreduce.v2.app.JobEndNotifier
+/**
+ * This call sends back an HTTP GET callback to the configured URL.  It is meant for the {@link LauncherAM} to notify the
+ * Oozie Server that it has finished.
+ */
+public class LauncherAMCallbackNotifier {
+    private static final String OOZIE_LAUNCHER_CALLBACK = "oozie.launcher.callback.";
+    public static final String OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS = OOZIE_LAUNCHER_CALLBACK + "retry.attempts";
+    public static final String OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL = OOZIE_LAUNCHER_CALLBACK + "retry.interval";
+    static final int OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX = 5000;
+    public static final String OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS = OOZIE_LAUNCHER_CALLBACK + "max.attempts";
+    public static final String OOZIE_LAUNCHER_CALLBACK_TIMEOUT = OOZIE_LAUNCHER_CALLBACK + "timeout";
+    public static final String OOZIE_LAUNCHER_CALLBACK_URL = OOZIE_LAUNCHER_CALLBACK + "url";
+    public static final String OOZIE_LAUNCHER_CALLBACK_PROXY = OOZIE_LAUNCHER_CALLBACK + "proxy";
+    public static final String OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN = "$jobStatus";
+
+    protected String userUrl;
+    protected String proxyConf;
+    protected int numTries; //Number of tries to attempt notification
+    protected int waitInterval; //Time (ms) to wait between retrying notification
+    protected int timeout; // Timeout (ms) on the connection and notification
+    protected URL urlToNotify; //URL to notify read from the config
+    protected Proxy proxyToUse = Proxy.NO_PROXY; //Proxy to use for notification
+
+    /**
+     * Parse the URL that needs to be notified of the end of the job, along
+     * with the number of retries in case of failure, the amount of time to
+     * wait between retries and proxy settings
+     * @param conf the configuration
+     */
+    public LauncherAMCallbackNotifier(Configuration conf) {
+        numTries = Math.min(conf.getInt(OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, 0) + 1,
+                conf.getInt(OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, 1));
+
+        waitInterval = Math.min(conf.getInt(OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX),
+                OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX);
+        waitInterval = (waitInterval < 0) ? OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX : waitInterval;
+
+        timeout = conf.getInt(OOZIE_LAUNCHER_CALLBACK_TIMEOUT, OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL_MAX);
+
+        userUrl = conf.get(OOZIE_LAUNCHER_CALLBACK_URL);
+
+        proxyConf = conf.get(OOZIE_LAUNCHER_CALLBACK_PROXY);
+
+        //Configure the proxy to use if its set. It should be set like
+        //proxyType@proxyHostname:port
+        if(proxyConf != null && !proxyConf.equals("") &&
+                proxyConf.lastIndexOf(":") != -1) {
+            int typeIndex = proxyConf.indexOf("@");
+            Proxy.Type proxyType = Proxy.Type.HTTP;
+            if(typeIndex != -1 &&
+                    proxyConf.substring(0, typeIndex).compareToIgnoreCase("socks") == 0) {
+                proxyType = Proxy.Type.SOCKS;
+            }
+            String hostname = proxyConf.substring(typeIndex + 1,
+                    proxyConf.lastIndexOf(":"));
+            String portConf = proxyConf.substring(proxyConf.lastIndexOf(":") + 1);
+            try {
+                int port = Integer.parseInt(portConf);
+                proxyToUse = new Proxy(proxyType,
+                        new InetSocketAddress(hostname, port));
+                System.out.println("Callback notification using proxy type \"" + proxyType +
+                        "\" hostname \"" + hostname + "\" and port \"" + port + "\"");
+            } catch(NumberFormatException nfe) {
+                System.err.println("Callback notification couldn't parse configured proxy's port "
+                        + portConf + ". Not going to use a proxy");
+            }
+        }
+
+    }
+
+    /**
+     * Notify the URL just once. Use best effort.
+     */
+    protected boolean notifyURLOnce() {
+        boolean success = false;
+        HttpURLConnection conn = null;
+        try {
+            System.out.println("Callback notification trying " + urlToNotify);
+            conn = (HttpURLConnection) urlToNotify.openConnection(proxyToUse);
+            conn.setConnectTimeout(timeout);
+            conn.setReadTimeout(timeout);
+            conn.setAllowUserInteraction(false);
+            if(conn.getResponseCode() != HttpURLConnection.HTTP_OK) {
+                System.err.println("Callback notification to " + urlToNotify +" failed with code: "
+                        + conn.getResponseCode() + " and message \"" + conn.getResponseMessage()
+                        +"\"");
+            }
+            else {
+                success = true;
+                System.out.println("Callback notification to " + urlToNotify + " succeeded");
+            }
+        } catch(IOException ioe) {
+            System.err.println("Callback notification to " + urlToNotify + " failed");
+            ioe.printStackTrace();
+        } finally {
+            if (conn != null) {
+                conn.disconnect();
+            }
+        }
+        return success;
+    }
+
+    /**
+     * Notify a server of the completion of a submitted job.
+     * @param finalStatus The Application Status
+     *
+     * @throws InterruptedException
+     */
+    public void notifyURL(FinalApplicationStatus finalStatus) throws InterruptedException {
+        // Do we need job-end notification?
+        if (userUrl == null) {
+            System.out.println("Callback notification URL not set, skipping.");
+            return;
+        }
+
+        //Do string replacements for final status
+        if (userUrl.contains(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN)) {
+            userUrl = userUrl.replace(OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN, finalStatus.toString());
+        }
+
+        // Create the URL, ensure sanity
+        try {
+            urlToNotify = new URL(userUrl);
+        } catch (MalformedURLException mue) {
+            System.err.println("Callback notification couldn't parse " + userUrl);
+            mue.printStackTrace();
+            return;
+        }
+
+        // Send notification
+        boolean success = false;
+        while (numTries-- > 0 && !success) {
+            System.out.println("Callback notification attempts left " + numTries);
+            success = notifyURLOnce();
+            if (!success) {
+                Thread.sleep(waitInterval);
+            }
+        }
+        if (!success) {
+            System.err.println("Callback notification failed to notify : " + urlToNotify);
+        } else {
+            System.out.println("Callback notification succeeded");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
index f2cba13..eeffe81 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherMapper.java
@@ -49,7 +49,11 @@ import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
+import org.xml.sax.SAXException;
 
+import javax.xml.parsers.ParserConfigurationException;
+
+// TODO: OYA: Delete :)
 public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, Runnable {
 
     static final String CONF_OOZIE_ACTION_MAIN_CLASS = "oozie.launcher.action.main.class";
@@ -480,7 +484,7 @@ public class LauncherMapper<K1, V1, K2, V2> implements Mapper<K1, V1, K2, V2>, R
     }
 
     // Method to execute the prepare actions
-    private void executePrepare() throws IOException, LauncherException {
+    private void executePrepare() throws IOException, LauncherException, ParserConfigurationException, SAXException {
         String prepareXML = getJobConf().get(ACTION_PREPARE_XML);
         if (prepareXML != null) {
              if (!prepareXML.equals("")) {

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
index 21ae456..4a51d48 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/PrepareActionsDriver.java
@@ -46,35 +46,26 @@ public class PrepareActionsDriver {
      * @param prepareXML Prepare XML block in string format
      * @throws LauncherException
      */
-    static void doOperations(String prepareXML, Configuration conf) throws LauncherException {
-        try {
-            Document doc = getDocumentFromXML(prepareXML);
-            doc.getDocumentElement().normalize();
+    static void doOperations(String prepareXML, Configuration conf)
+            throws IOException, SAXException, ParserConfigurationException, LauncherException {
+        Document doc = getDocumentFromXML(prepareXML);
+        doc.getDocumentElement().normalize();
 
-            // Get the list of child nodes, basically, each one corresponding to a separate action
-            NodeList nl = doc.getDocumentElement().getChildNodes();
-            LauncherURIHandlerFactory factory = new LauncherURIHandlerFactory(conf);
+        // Get the list of child nodes, basically, each one corresponding to a separate action
+        NodeList nl = doc.getDocumentElement().getChildNodes();
+        LauncherURIHandlerFactory factory = new LauncherURIHandlerFactory(conf);
 
-            for (int i = 0; i < nl.getLength(); ++i) {
-                Node n = nl.item(i);
-                String operation = n.getNodeName();
-                if (n.getAttributes() == null || n.getAttributes().getNamedItem("path") == null) {
-                    continue;
-                }
-                String pathStr = n.getAttributes().getNamedItem("path").getNodeValue().trim();
-                // use Path to avoid URIsyntax error caused by square bracket in glob
-                URI uri = new Path(pathStr).toUri();
-                LauncherURIHandler handler = factory.getURIHandler(uri);
-                execute(operation, uri, handler, conf);
+        for (int i = 0; i < nl.getLength(); ++i) {
+            Node n = nl.item(i);
+            String operation = n.getNodeName();
+            if (n.getAttributes() == null || n.getAttributes().getNamedItem("path") == null) {
+                continue;
             }
-        } catch (IOException ioe) {
-            throw new LauncherException(ioe.getMessage(), ioe);
-        } catch (SAXException saxe) {
-            throw new LauncherException(saxe.getMessage(), saxe);
-        } catch (ParserConfigurationException pce) {
-            throw new LauncherException(pce.getMessage(), pce);
-        } catch (IllegalArgumentException use) {
-            throw new LauncherException(use.getMessage(), use);
+            String pathStr = n.getAttributes().getNamedItem("path").getNodeValue().trim();
+            // use Path to avoid URIsyntax error caused by square bracket in glob
+            URI uri = new Path(pathStr).toUri();
+            LauncherURIHandler handler = factory.getURIHandler(uri);
+            execute(operation, uri, handler, conf);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/pig/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/pig/pom.xml b/sharelib/pig/pom.xml
index 562c530..ea674a1 100644
--- a/sharelib/pig/pom.xml
+++ b/sharelib/pig/pom.xml
@@ -136,18 +136,6 @@
                             <outputFile>${project.build.directory}/classpath</outputFile>
                         </configuration>
                     </execution>
-                    <execution>
-                        <id>create-mrapp-generated-classpath</id>
-                        <phase>generate-test-resources</phase>
-                        <goals>
-                            <goal>build-classpath</goal>
-                        </goals>
-                        <configuration>
-                            <!-- needed to run the unit test for DS to generate the required classpath
-                            that is required in the env of the launch container in the mini mr/yarn cluster -->
-                            <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
-                        </configuration>
-                    </execution>
                 </executions>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/spark/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/spark/pom.xml b/sharelib/spark/pom.xml
index 46c6375..6d52ab4 100644
--- a/sharelib/spark/pom.xml
+++ b/sharelib/spark/pom.xml
@@ -370,18 +370,6 @@
                             <outputFile>${project.build.directory}/classpath</outputFile>
                         </configuration>
                     </execution>
-                    <execution>
-                        <id>create-mrapp-generated-classpath</id>
-                        <phase>generate-test-resources</phase>
-                        <goals>
-                            <goal>build-classpath</goal>
-                        </goals>
-                        <configuration>
-                            <!-- needed to run the unit test for DS to generate the required classpath
-                            that is required in the env of the launch container in the mini mr/yarn cluster -->
-                            <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
-                        </configuration>
-                    </execution>
                 </executions>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/sqoop/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/sqoop/pom.xml b/sharelib/sqoop/pom.xml
index d875c93..aad13f9 100644
--- a/sharelib/sqoop/pom.xml
+++ b/sharelib/sqoop/pom.xml
@@ -239,18 +239,6 @@
                             <outputFile>${project.build.directory}/classpath</outputFile>
                         </configuration>
                     </execution>
-                    <execution>
-                        <id>create-mrapp-generated-classpath</id>
-                        <phase>generate-test-resources</phase>
-                        <goals>
-                            <goal>build-classpath</goal>
-                        </goals>
-                        <configuration>
-                            <!-- needed to run the unit test for DS to generate the required classpath
-                            that is required in the env of the launch container in the mini mr/yarn cluster -->
-                            <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
-                        </configuration>
-                    </execution>
                 </executions>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/streaming/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/streaming/pom.xml b/sharelib/streaming/pom.xml
index fd79518..783c669 100644
--- a/sharelib/streaming/pom.xml
+++ b/sharelib/streaming/pom.xml
@@ -107,39 +107,6 @@
                             <outputFile>${project.build.directory}/classpath</outputFile>
                         </configuration>
                     </execution>
-                    <execution>
-                        <id>create-mrapp-generated-classpath</id>
-                        <phase>generate-test-resources</phase>
-                        <goals>
-                            <goal>build-classpath</goal>
-                        </goals>
-                        <configuration>
-                            <!-- needed to run the unit test for DS to generate the required classpath
-                            that is required in the env of the launch container in the mini mr/yarn cluster -->
-                            <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-antrun-plugin</artifactId>
-                <version>1.6</version>
-                <executions>
-                    <execution>
-                        <configuration>
-                            <target>
-                                <!-- needed to include Main class in classpath for mini yarn cluster for unit tests -->
-                                <echo file="${project.build.directory}/test-classes/mrapp-generated-classpath"
-                                      append="true"
-                                      message=":${project.build.directory}/classes"/>
-                            </target>
-                        </configuration>
-                        <goals>
-                            <goal>run</goal>
-                        </goals>
-                        <phase>generate-test-resources</phase>
-                    </execution>
                 </executions>
             </plugin>
             <plugin>


[2/3] oozie git commit: OOZIE-2590 OYA: Create basic Oozie Launcher Application Master (rkanter)

Posted by rk...@apache.org.
http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
index 879bfeb..794ad81 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
@@ -21,6 +21,7 @@ package org.apache.oozie.action.hadoop;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
@@ -35,18 +36,22 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.action.ActionExecutor;
@@ -55,6 +60,7 @@ import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
 import org.apache.oozie.service.ConfigurationService;
+import org.apache.oozie.service.HadoopAccessorException;
 import org.apache.oozie.service.HadoopAccessorService;
 import org.apache.oozie.service.LiteWorkflowStoreService;
 import org.apache.oozie.service.Services;
@@ -62,7 +68,6 @@ import org.apache.oozie.service.ShareLibService;
 import org.apache.oozie.service.UUIDService;
 import org.apache.oozie.service.WorkflowAppService;
 import org.apache.oozie.service.WorkflowStoreService;
-import org.apache.oozie.util.FSUtils;
 import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.XConfiguration;
 import org.apache.oozie.util.XmlUtils;
@@ -336,7 +341,8 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
         return new Context(wf, action);
     }
 
-    protected RunningJob submitAction(Context context, JavaActionExecutor javaActionExecutor) throws Exception {
+    // TODO: OYA: void
+    protected RunningJob submitAction(Context context, JavaActionExecutor javaActionExecutor) throws ActionExecutorException {
 
         WorkflowAction action = context.getAction();
         javaActionExecutor.prepareActionDir(getFileSystem(), context);
@@ -348,21 +354,37 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
         assertNotNull(jobId);
         assertNotNull(jobTracker);
         assertNotNull(consoleUrl);
-
-        JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
-        jobConf.set("yarn.resourcemanager.address", jobTracker);
-
-        JobClient jobClient =
-            Services.get().get(HadoopAccessorService.class).createJobClient(getTestUser(), jobConf);
-        final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
-        assertNotNull(runningJob);
-        return runningJob;
+        return null;
     }
 
-    protected RunningJob submitAction(Context context) throws Exception {
+    // TODO: OYA: void
+    protected RunningJob submitAction(Context context) throws ActionExecutorException {
         return submitAction(context, new JavaActionExecutor());
     }
 
+    private void waitUntilYarnAppState(String externalId, final YarnApplicationState state)
+            throws HadoopAccessorException, IOException, YarnException {
+        final ApplicationId appId = ConverterUtils.toApplicationId(externalId);
+
+        JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(getJobTrackerUri());
+        // This is needed here because we need a mutable final YarnClient
+        final MutableObject<YarnClient> yarnClientMO = new MutableObject<YarnClient>(null);
+        try {
+            yarnClientMO.setValue(Services.get().get(HadoopAccessorService.class).createYarnClient(getTestUser(), jobConf));
+            waitFor(60 * 1000, new Predicate() {
+                @Override
+                public boolean evaluate() throws Exception {
+                    return yarnClientMO.getValue().getApplicationReport(appId).getYarnApplicationState().equals(state);
+                }
+            });
+        } finally {
+            if (yarnClientMO.getValue() != null) {
+                yarnClientMO.getValue().close();
+            }
+        }
+        assertTrue(yarnClientMO.getValue().getApplicationReport(appId).getYarnApplicationState().equals(state));
+    }
+
     public void testSimpestSleSubmitOK() throws Exception {
         String actionXml = "<java>" +
                 "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
@@ -370,14 +392,8 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
                 "<main-class>" + LauncherMainTester.class.getName() + "</main-class>" +
                 "</java>";
         Context context = createContext(actionXml, null);
-        final RunningJob runningJob = submitAction(context);
-        waitFor(60 * 1000, new Predicate() {
-            @Override
-            public boolean evaluate() throws Exception {
-                return runningJob.isComplete();
-            }
-        });
-        assertTrue(runningJob.isSuccessful());
+        submitAction(context);
+        waitUntilYarnAppState(context.getAction().getExternalId(), YarnApplicationState.FINISHED);
         ActionExecutor ae = new JavaActionExecutor();
         ae.check(context, context.getAction());
         assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
@@ -1817,118 +1833,12 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
         JavaActionExecutor jae = new JavaActionExecutor("java");
         jae.setupLauncherConf(conf, xml, appPath, createContext("<java/>", null));
         assertEquals(5, conf.size());
-        assertEquals("true", conf.get("mapreduce.job.ubertask.enable"));
         assertEquals("v1", conf.get("oozie.launcher.p1"));
         assertEquals("v1", conf.get("p1"));
         assertEquals("v2b", conf.get("oozie.launcher.p2"));
         assertEquals("v2b", conf.get("p2"));
     }
 
-    public void testInjectLauncherUseUberMode() throws Exception {
-        // default -- should set to true
-        JavaActionExecutor jae = new JavaActionExecutor();
-        Configuration conf = new Configuration(false);
-        assertNull(conf.get("mapreduce.job.ubertask.enable"));
-        jae.injectLauncherUseUberMode(conf);
-        assertEquals("true", conf.get("mapreduce.job.ubertask.enable"));
-
-        // action conf set to true -- should keep at true
-        conf = new Configuration(false);
-        assertNull(conf.get("mapreduce.job.ubertask.enable"));
-        conf.setBoolean("mapreduce.job.ubertask.enable", true);
-        jae.injectLauncherUseUberMode(conf);
-        assertEquals("true", conf.get("mapreduce.job.ubertask.enable"));
-
-        // action conf set to false -- should keep at false
-        conf = new Configuration(false);
-        assertNull(conf.get("mapreduce.job.ubertask.enable"));
-        conf.setBoolean("mapreduce.job.ubertask.enable", false);
-        jae.injectLauncherUseUberMode(conf);
-        assertEquals("false", conf.get("mapreduce.job.ubertask.enable"));
-
-        // disable at oozie-site level for just the "test" action
-        ConfigurationService.setBoolean("oozie.action.test.launcher.mapreduce.job.ubertask.enable", false);
-        JavaActionExecutor tjae = new JavaActionExecutor("test");
-
-        // default -- should not set
-        conf = new Configuration(false);
-        assertNull(conf.get("mapreduce.job.ubertask.enable"));
-        tjae.injectLauncherUseUberMode(conf);
-        assertNull(conf.get("mapreduce.job.ubertask.enable"));
-        // default -- should be true
-        conf = new Configuration(false);
-        assertNull(conf.get("mapreduce.job.ubertask.enable"));
-        jae.injectLauncherUseUberMode(conf);
-        assertEquals("true", conf.get("mapreduce.job.ubertask.enable"));
-
-        // action conf set to true -- should keep at true
-        conf = new Configuration(false);
-        assertNull(conf.get("mapreduce.job.ubertask.enable"));
-        conf.setBoolean("mapreduce.job.ubertask.enable", true);
-        tjae.injectLauncherUseUberMode(conf);
-        assertEquals("true", conf.get("mapreduce.job.ubertask.enable"));
-        // action conf set to true -- should keep at true
-        conf = new Configuration(false);
-        assertNull(conf.get("mapreduce.job.ubertask.enable"));
-        conf.setBoolean("mapreduce.job.ubertask.enable", true);
-        jae.injectLauncherUseUberMode(conf);
-        assertEquals("true", conf.get("mapreduce.job.ubertask.enable"));
-
-        // action conf set to false -- should keep at false
-        conf = new Configuration(false);
-        assertNull(conf.get("mapreduce.job.ubertask.enable"));
-        conf.setBoolean("mapreduce.job.ubertask.enable", false);
-        tjae.injectLauncherUseUberMode(conf);
-        assertEquals("false", conf.get("mapreduce.job.ubertask.enable"));
-        // action conf set to false -- should keep at false
-        conf = new Configuration(false);
-        assertNull(conf.get("mapreduce.job.ubertask.enable"));
-        conf.setBoolean("mapreduce.job.ubertask.enable", false);
-        jae.injectLauncherUseUberMode(conf);
-        assertEquals("false", conf.get("mapreduce.job.ubertask.enable"));
-
-        // disable at oozie-site level for all actions except for the "test" action
-        ConfigurationService.setBoolean("oozie.action.test.launcher.mapreduce.job.ubertask.enable", true);
-        ConfigurationService.setBoolean("oozie.action.launcher.mapreduce.job.ubertask.enable", false);
-
-        // default -- should be true
-        conf = new Configuration(false);
-        assertNull(conf.get("mapreduce.job.ubertask.enable"));
-        tjae.injectLauncherUseUberMode(conf);
-        assertEquals("true", conf.get("mapreduce.job.ubertask.enable"));
-        // default -- should not set
-        conf = new Configuration(false);
-        assertNull(conf.get("mapreduce.job.ubertask.enable"));
-        jae.injectLauncherUseUberMode(conf);
-        assertNull(conf.get("mapreduce.job.ubertask.enable"));
-
-        // action conf set to true -- should keep at true
-        conf = new Configuration(false);
-        assertNull(conf.get("mapreduce.job.ubertask.enable"));
-        conf.setBoolean("mapreduce.job.ubertask.enable", true);
-        tjae.injectLauncherUseUberMode(conf);
-        assertEquals("true", conf.get("mapreduce.job.ubertask.enable"));
-        // action conf set to true -- should keep at true
-        conf = new Configuration(false);
-        assertNull(conf.get("mapreduce.job.ubertask.enable"));
-        conf.setBoolean("mapreduce.job.ubertask.enable", true);
-        jae.injectLauncherUseUberMode(conf);
-        assertEquals("true", conf.get("mapreduce.job.ubertask.enable"));
-
-        // action conf set to false -- should keep at false
-        conf = new Configuration(false);
-        assertNull(conf.get("mapreduce.job.ubertask.enable"));
-        conf.setBoolean("mapreduce.job.ubertask.enable", false);
-        tjae.injectLauncherUseUberMode(conf);
-        assertEquals("false", conf.get("mapreduce.job.ubertask.enable"));
-        // action conf set to false -- should keep at false
-        conf = new Configuration(false);
-        assertNull(conf.get("mapreduce.job.ubertask.enable"));
-        conf.setBoolean("mapreduce.job.ubertask.enable", false);
-        jae.injectLauncherUseUberMode(conf);
-        assertEquals("false", conf.get("mapreduce.job.ubertask.enable"));
-    }
-
     public void testUpdateConfForJavaTmpDir() throws Exception {
 
         //Test UpdateCOnfForJavaTmpDir for launcherConf
@@ -1993,355 +1903,6 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
         assertEquals("-Xmx2560m -XX:NewRatio=8", jobConf.get(JavaActionExecutor.HADOOP_REDUCE_JAVA_OPTS));
         assertEquals("-Xmx1024m -Djava.io.tmpdir=./tmp", jobConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS));
     }
-    public void testUpdateConfForUberMode() throws Exception {
-        Element actionXml1 = XmlUtils
-                .parseXml("<java>"
-                        + "<job-tracker>"
-                        + getJobTrackerUri()
-                        + "</job-tracker>"
-                        + "<name-node>"
-                        + getNameNodeUri()
-                        + "</name-node>"
-                        + "<configuration>"
-                        + "<property><name>oozie.launcher.mapreduce.map.memory.mb</name><value>2048</value></property>"
-                        + "<property><name>oozie.launcher.mapred.child.java.opts</name>"
-                        + "<value>-Xmx2048m -Djava.net.preferIPv4Stack=true</value></property>"
-                        + "<property><name>oozie.launcher.mapred.child.env</name><value>A=foo</value></property>"
-                        + "</configuration>" + "<main-class>MAIN-CLASS</main-class>" + "</java>");
-        JavaActionExecutor ae = new JavaActionExecutor();
-        XConfiguration protoConf = new XConfiguration();
-        protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
-
-        WorkflowJobBean wf = createBaseWorkflow(protoConf, "action");
-        WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0);
-        action.setType(ae.getType());
-
-        Context context = new Context(wf, action);
-        JobConf launcherConf = new JobConf();
-        launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml1, launcherConf);
-        // memoryMB (2048 + 512)
-        assertEquals("2560", launcherConf.get(JavaActionExecutor.YARN_AM_RESOURCE_MB));
-        // heap size in child.opts (2048 + 512)
-        int heapSize = ae.extractHeapSizeMB(launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS));
-        assertEquals("-Xmx2048m -Djava.net.preferIPv4Stack=true",
-                launcherConf.get("mapred.child.java.opts"));
-        assertEquals("-Xmx2048m -Djava.net.preferIPv4Stack=true",
-                launcherConf.get("mapreduce.map.java.opts"));
-
-        assertEquals("-Xmx1024m -Xmx2048m -Djava.net.preferIPv4Stack=true -Xmx2560m -Djava.io.tmpdir=./tmp",
-                launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS).trim());
-
-        assertEquals(2560, heapSize);
-
-        // env
-        assertEquals("A=foo", launcherConf.get(JavaActionExecutor.YARN_AM_ENV));
-
-        Element actionXml2 = XmlUtils
-                .parseXml("<java>"
-                        + "<job-tracker>"
-                        + getJobTrackerUri()
-                        + "</job-tracker>"
-                        + "<name-node>"
-                        + getNameNodeUri()
-                        + "</name-node>"
-                        + "<configuration>"
-                        + "<property><name>oozie.launcher.yarn.app.mapreduce.am.resource.mb</name><value>3072</value></property>"
-                        + "<property><name>oozie.launcher.mapreduce.map.memory.mb</name><value>2048</value></property>"
-                        + "<property><name>oozie.launcher.yarn.app.mapreduce.am.command-opts</name>"
-                        + "<value>-Xmx1024m -Djava.net.preferIPv4Stack=true </value></property>"
-                        + "<property><name>oozie.launcher.mapred.child.java.opts</name><value>-Xmx1536m</value></property>"
-                        + "<property><name>oozie.launcher.mapreduce.map.java.opts</name>"
-                        + "<value>-Xmx2560m -XX:NewRatio=8</value></property>"
-                        + "<property><name>oozie.launcher.yarn.app.mapreduce.am.env</name><value>A=foo</value></property>"
-                        + "<property><name>oozie.launcher.mapred.child.env</name><value>B=bar</value></property>"
-                        + "</configuration>" + "<main-class>MAIN-CLASS</main-class>" + "</java>");
-
-        launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml2, launcherConf);
-
-        // memoryMB (3072 + 512)
-        assertEquals("3584", launcherConf.get(JavaActionExecutor.YARN_AM_RESOURCE_MB));
-
-        // heap size (2560 + 512)
-        heapSize = ae.extractHeapSizeMB(launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS));
-        assertEquals("-Xmx1536m -Xmx2560m -XX:NewRatio=8", launcherConf.get("mapred.child.java.opts"));
-        assertEquals("-Xmx1536m -Xmx2560m -XX:NewRatio=8", launcherConf.get("mapreduce.map.java.opts"));
-        assertEquals("-Xmx1024m -Djava.net.preferIPv4Stack=true -Xmx1536m -Xmx2560m -XX:NewRatio=8 " +
-                        "-Xmx3072m -Djava.io.tmpdir=./tmp", launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS).trim());
-        assertEquals(3072, heapSize);
-
-        // env (equqls to mapreduce.map.env + am.env)
-        assertTrue(launcherConf.get(JavaActionExecutor.YARN_AM_ENV).trim().equals("A=foo,B=bar"));
-
-        // Test limit is applied in case of 32 bit
-        Element actionXml3 = XmlUtils
-                .parseXml("<java>"
-                        + "<job-tracker>"
-                        + getJobTrackerUri()
-                        + "</job-tracker>"
-                        + "<name-node>"
-                        + getNameNodeUri()
-                        + "</name-node>"
-                        + "<configuration>"
-                        + "<property><name>oozie.launcher.yarn.app.mapreduce.am.resource.mb</name><value>3072</value></property>"
-                        + "<property><name>oozie.launcher.mapreduce.map.memory.mb</name><value>4000</value></property>"
-                        + "<property><name>oozie.launcher.yarn.app.mapreduce.am.command-opts</name>"
-                        + "<value>-Xmx1024m -Djava.net.preferIPv4Stack=true</value></property>"
-                        + "<property><name>oozie.launcher.mapred.child.java.opts</name><value>-Xmx1536m</value></property>"
-                        + "<property><name>oozie.launcher.mapreduce.map.java.opts</name>"
-                        + "<value>-Xmx4000m -XX:NewRatio=8</value></property>"
-                        + "<property><name>oozie.launcher.yarn.app.mapreduce.am.env</name><value>A=foo</value></property>"
-                        + "<property><name>oozie.launcher.mapred.child.env</name><value>B=bar</value></property>"
-                        + "</configuration>" + "<main-class>MAIN-CLASS</main-class>" + "</java>");
-
-        launcherConf = ae.createBaseHadoopConf(context, actionXml3);
-        launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml3, launcherConf);
-
-        // memoryMB (limit to 4096)
-        assertEquals("4096", launcherConf.get(JavaActionExecutor.YARN_AM_RESOURCE_MB));
-
-        // heap size (limit to 3584)
-        heapSize = ae.extractHeapSizeMB(launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS));
-        assertEquals("-Xmx1536m -Xmx4000m -XX:NewRatio=8", launcherConf.get("mapred.child.java.opts"));
-        assertEquals("-Xmx1536m -Xmx4000m -XX:NewRatio=8", launcherConf.get("mapreduce.map.java.opts"));
-        assertEquals("-Xmx1024m -Djava.net.preferIPv4Stack=true -Xmx1536m -Xmx4000m -XX:NewRatio=8 " +
-                        "-Xmx3584m -Djava.io.tmpdir=./tmp", launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS).trim());
-        assertEquals(3584, heapSize);
-
-        // env (equqls to mapreduce.map.env + am.env)
-        assertEquals("A=foo,B=bar", launcherConf.get(JavaActionExecutor.YARN_AM_ENV));
-    }
-
-    public void testUpdateConfForUberModeWithEnvDup() throws Exception {
-        Element actionXml1 = XmlUtils.parseXml("<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>"
-                + "<name-node>" + getNameNodeUri() + "</name-node>" + "<configuration>"
-                + "<property><name>oozie.launcher.yarn.app.mapreduce.am.env</name>"
-                + "<value>JAVA_HOME=/home/blah/java/jdk64/current,A=foo,B=bar</value></property>"
-                + "<property><name>oozie.launcher.mapreduce.map.env</name>"
-                + "<value>JAVA_HOME=/home/blah/java/jdk64/latest,C=blah</value></property>" + "</configuration>"
-                + "<main-class>MAIN-CLASS</main-class>" + "</java>");
-        JavaActionExecutor ae = new JavaActionExecutor();
-        XConfiguration protoConf = new XConfiguration();
-        protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
-
-        WorkflowJobBean wf = createBaseWorkflow(protoConf, "action");
-        WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0);
-        action.setType(ae.getType());
-
-        Context context = new Context(wf, action);
-        JobConf launcherConf = new JobConf();
-        launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml1, launcherConf);
-
-        // uber mode should be disabled since JAVA_HOME points to different paths in am.evn and map.env
-        assertEquals("false", launcherConf.get(JavaActionExecutor.HADOOP_YARN_UBER_MODE));
-
-        // testing complicated env setting case
-        Element actionXml2 = XmlUtils.parseXml("<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>"
-                + "<name-node>" + getNameNodeUri() + "</name-node>" + "<configuration>" + "<property>"
-                + "<name>oozie.launcher.yarn.app.mapreduce.am.env</name>"
-                + "<value>LD_LIBRARY_PATH=$HADOOP_HOME_1/lib/native/`$JAVA_HOME/bin/java -d32 -version;"
-                + "if [ $? -eq 0 ]; then echo Linux-i386-32; else echo Linux-amd64-64;fi`</value></property>"
-                + "<property>" + "<name>oozie.launcher.mapreduce.map.env</name>"
-                + "<value>LD_LIBRARY_PATH=$HADOOP_HOME_2/lib/native/`$JAVA_HOME/bin/java -d32 -version;"
-                + "if [ $? -eq 0 ]; then echo Linux-i386-32; else echo Linux-amd64-64;fi`</value></property>"
-                + "</configuration>" + "<main-class>MAIN-CLASS</main-class>" + "</java>");
-
-        launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml2, launcherConf);
-
-        // uber mode should be disabled since LD_LIBRARY_PATH is different in am.evn and map.env
-        assertEquals("false", launcherConf.get(JavaActionExecutor.HADOOP_YARN_UBER_MODE));
-
-        Element actionXml3 = XmlUtils
-                .parseXml("<java>"
-                        + "<job-tracker>"
-                        + getJobTrackerUri()
-                        + "</job-tracker>"
-                        + "<name-node>"
-                        + getNameNodeUri()
-                        + "</name-node>"
-                        + "<configuration>"
-                        + "<property><name>oozie.launcher.yarn.app.mapreduce.am.env</name>"
-                        + "<value>JAVA_HOME=/home/blah/java/jdk64/current,PATH=A,PATH=B</value></property>"
-                        + "<property><name>oozie.launcher.mapreduce.map.env</name>"
-                        + "<value>JAVA_HOME=/home/blah/java/jdk64/current,PATH=A</value></property>"
-                        + "</configuration>" + "<main-class>MAIN-CLASS</main-class>" + "</java>");
-
-        launcherConf = ae.createBaseHadoopConf(context, actionXml3);
-        launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml3, launcherConf);
-
-        // uber mode should be enabled since JAVA_HOME is the same, and PATH doesn't conflict
-        assertEquals("true", launcherConf.get(JavaActionExecutor.HADOOP_YARN_UBER_MODE));
-
-        // JAVA_HOME, PATH=A duplication is removed
-        String a = launcherConf.get(JavaActionExecutor.YARN_AM_ENV);
-        assertEquals("JAVA_HOME=/home/blah/java/jdk64/current,PATH=A,PATH=B",
-                launcherConf.get(JavaActionExecutor.YARN_AM_ENV));
-    }
-
-    public void testUpdateConfForUberModeForJavaOpts() throws Exception {
-        Element actionXml1 = XmlUtils
-                .parseXml("<java>"
-                        + "<job-tracker>"
-                        + getJobTrackerUri()
-                        + "</job-tracker>"
-                        + "<name-node>"
-                        + getNameNodeUri()
-                        + "</name-node>"
-                        + "<configuration>"
-                        + "<property><name>oozie.launcher.yarn.app.mapreduce.am.command-opts</name>"
-                        + "<value>-Xmx1024m -Djava.net.preferIPv4Stack=true </value></property>"
-                        + "<property><name>oozie.launcher.mapreduce.map.java.opts</name><value>-Xmx1536m</value></property>"
-                        + "</configuration>" + "<main-class>MAIN-CLASS</main-class>"
-                        + "<java-opt>-Xmx2048m</java-opt>"
-                        + "<java-opt>-Dkey1=val1</java-opt>"
-                        + "<java-opt>-Dkey2=val2</java-opt>"
-                        + "</java>");
-        JavaActionExecutor ae = new JavaActionExecutor();
-        XConfiguration protoConf = new XConfiguration();
-        protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
-
-        WorkflowJobBean wf = createBaseWorkflow(protoConf, "action");
-        WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0);
-        action.setType(ae.getType());
-
-        Context context = new Context(wf, action);
-        JobConf launcherConf = new JobConf();
-        launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml1, launcherConf);
-
-        // heap size (2048 + 512)
-        int heapSize = ae.extractHeapSizeMB(launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS));
-        assertEquals("-Xmx200m -Xmx1536m -Xmx2048m -Dkey1=val1 -Dkey2=val2",
-                launcherConf.get("mapred.child.java.opts"));
-        assertEquals("-Xmx200m -Xmx1536m -Xmx2048m -Dkey1=val1 -Dkey2=val2",
-                launcherConf.get("mapreduce.map.java.opts"));
-        assertEquals("-Xmx1024m -Djava.net.preferIPv4Stack=true -Xmx200m -Xmx1536m -Xmx2048m -Dkey1=val1 -Dkey2=val2 -Xmx2560m " +
-                        "-Djava.io.tmpdir=./tmp", launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS).trim());
-        assertEquals(2560, heapSize);
-
-        Element actionXml2 = XmlUtils
-                .parseXml("<java>"
-                        + "<job-tracker>"
-                        + getJobTrackerUri()
-                        + "</job-tracker>"
-                        + "<name-node>"
-                        + getNameNodeUri()
-                        + "</name-node>"
-                        + "<configuration>"
-                        + "<property><name>oozie.launcher.yarn.app.mapreduce.am.command-opts</name>"
-                        + "<value>-Xmx1024m -Djava.net.preferIPv4Stack=true </value></property>"
-                        + "<property><name>oozie.launcher.mapreduce.map.java.opts</name><value>-Xmx1536m</value></property>"
-                        + "</configuration>" + "<main-class>MAIN-CLASS</main-class>"
-                        + "<java-opts>-Xmx2048m -Dkey1=val1</java-opts>"
-                        + "</java>");
-
-        launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml2, launcherConf);
-
-        // heap size (2048 + 512)
-        heapSize = ae.extractHeapSizeMB(launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS));
-        assertEquals("-Xmx200m -Xmx1536m -Xmx2048m -Dkey1=val1",
-                launcherConf.get("mapred.child.java.opts"));
-        assertEquals("-Xmx200m -Xmx1536m -Xmx2048m -Dkey1=val1",
-                launcherConf.get("mapreduce.map.java.opts"));
-        assertEquals("-Xmx1024m -Djava.net.preferIPv4Stack=true -Xmx200m -Xmx1536m -Xmx2048m -Dkey1=val1 -Xmx2560m " +
-                        "-Djava.io.tmpdir=./tmp", launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS).trim());
-        assertEquals(2560, heapSize);
-
-        Element actionXml3 = XmlUtils
-                .parseXml("<java>"
-                        + "<job-tracker>"
-                        + getJobTrackerUri()
-                        + "</job-tracker>"
-                        + "<name-node>"
-                        + getNameNodeUri()
-                        + "</name-node>"
-                        + "<configuration>"
-                        + "<property><name>oozie.launcher.yarn.app.mapreduce.am.command-opts</name>"
-                        + "<value>-Xmx2048m -Djava.net.preferIPv4Stack=true </value></property>"
-                        + "<property><name>oozie.launcher.mapreduce.map.java.opts</name><value>-Xmx3072m</value></property>"
-                        + "</configuration>" + "<main-class>MAIN-CLASS</main-class>"
-                        + "<java-opts>-Xmx1024m -Dkey1=val1</java-opts>"
-                        + "</java>");
-
-        launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml3, launcherConf);
-
-        // heap size (2048 + 512)
-        heapSize = ae.extractHeapSizeMB(launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS));
-        assertEquals("-Xmx200m -Xmx3072m -Xmx1024m -Dkey1=val1",
-                launcherConf.get("mapred.child.java.opts"));
-        assertEquals("-Xmx200m -Xmx3072m -Xmx1024m -Dkey1=val1",
-                launcherConf.get("mapreduce.map.java.opts"));
-        assertEquals("-Xmx2048m -Djava.net.preferIPv4Stack=true -Xmx200m -Xmx3072m -Xmx1024m -Dkey1=val1 -Xmx2560m " +
-                        "-Djava.io.tmpdir=./tmp", launcherConf.get(JavaActionExecutor.YARN_AM_COMMAND_OPTS).trim());
-        assertEquals(2560, heapSize);
-    }
-
-    public void testDisableUberForProperties() throws Exception {
-        Element actionXml1 = XmlUtils.parseXml("<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>"
-                + "<name-node>" + getNameNodeUri() + "</name-node>"
-                + "<configuration>"
-                + "<property><name>oozie.launcher.mapreduce.job.classloader</name>"
-                + "<value>true</value></property>"
-                + "</configuration>"
-                + "<main-class>MAIN-CLASS</main-class>" + "</java>");
-        JavaActionExecutor ae = new JavaActionExecutor();
-        XConfiguration protoConf = new XConfiguration();
-        protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
-
-        WorkflowJobBean wf = createBaseWorkflow(protoConf, "action");
-        WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0);
-        action.setType(ae.getType());
-
-        Context context = new Context(wf, action);
-        JobConf launcherConf = new JobConf();
-        launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml1, launcherConf);
-
-        // uber mode should be disabled since oozie.launcher.mapreduce.job.classloader=true
-        assertEquals("false", launcherConf.get(JavaActionExecutor.HADOOP_YARN_UBER_MODE));
-
-        Element actionXml2 = XmlUtils.parseXml("<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>"
-                + "<name-node>" + getNameNodeUri() + "</name-node>"
-                + "<configuration>"
-                + "<property><name>oozie.launcher.mapreduce.user.classpath.first</name>"
-                + "<value>true</value></property>"
-                + "</configuration>"
-                + "<main-class>MAIN-CLASS</main-class>" + "</java>");
-        ae = new JavaActionExecutor();
-        protoConf = new XConfiguration();
-        protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
-
-        wf = createBaseWorkflow(protoConf, "action");
-        action = (WorkflowActionBean) wf.getActions().get(0);
-        action.setType(ae.getType());
-
-        context = new Context(wf, action);
-        launcherConf = new JobConf();
-        launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml2, launcherConf);
-
-        // uber mode should be disabled since oozie.launcher.mapreduce.user.classpath.first=true
-        assertEquals("false", launcherConf.get(JavaActionExecutor.HADOOP_YARN_UBER_MODE));
-    }
-
-    public void testDisableUberForUserProperties() throws Exception {
-        Element actionXml1 = XmlUtils.parseXml("<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>"
-                + "<name-node>" + getNameNodeUri() + "</name-node>"
-                + "<configuration>"
-                + "<property><name>oozie.launcher.mapreduce.job.ubertask.enable</name>"
-                + "<value>false</value></property>"
-                + "</configuration>"
-                + "<main-class>MAIN-CLASS</main-class>" + "</java>");
-        JavaActionExecutor ae = new JavaActionExecutor();
-        XConfiguration protoConf = new XConfiguration();
-        protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
-
-        WorkflowJobBean wf = createBaseWorkflow(protoConf, "action");
-        WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0);
-        action.setType(ae.getType());
-
-        Context context = new Context(wf, action);
-        JobConf launcherConf = new JobConf();
-        launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml1, launcherConf);
-        // uber mode should be disabled since oozie.launcher.mapreduce.job.classloader=true
-        assertEquals("false", launcherConf.get(JavaActionExecutor.HADOOP_YARN_UBER_MODE));
-    }
 
     public void testUpdateConfForTimeLineServiceEnabled() throws Exception {
         Element actionXml = XmlUtils
@@ -2766,12 +2327,8 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
         assertEquals("AA", conf.get("a"));
         assertEquals("action.barbar", conf.get("oozie.launcher.action.foofoo"));
         assertEquals("action.barbar", conf.get("action.foofoo"));
-        assertEquals("true", conf.get("mapreduce.job.ubertask.enable"));
-        if (conf.size() == 7) {
-            assertEquals(getJobTrackerUri(), conf.get("yarn.resourcemanager.address"));
-        } else {
-            assertEquals(6, conf.size());
-        }
+        assertEquals(getJobTrackerUri(), conf.get("yarn.resourcemanager.address"));
+        assertEquals(6, conf.size());
 
         conf = new Configuration(false);
         Assert.assertEquals(0, conf.size());
@@ -2780,12 +2337,8 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
         assertEquals(getJobTrackerUri(), conf.get("yarn.resourcemanager.address"));
         assertEquals("action.barbar", conf.get("oozie.launcher.action.foofoo"));
         assertEquals("action.barbar", conf.get("action.foofoo"));
-        assertEquals("true", conf.get("mapreduce.job.ubertask.enable"));
-        if (conf.size() == 5) {
-            assertEquals(getJobTrackerUri(), conf.get("mapreduce.jobtracker.address"));
-        } else {
-            assertEquals(4, conf.size());
-        }
+        assertEquals(getJobTrackerUri(), conf.get("mapreduce.jobtracker.address"));
+        assertEquals(4, conf.size());
     }
 
     public void testSetRootLoggerLevel() throws Exception {

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java
new file mode 100644
index 0000000..ed29299
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAM.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
+import org.apache.oozie.service.HadoopAccessorService;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.test.XFsTestCase;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.XConfiguration;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.Writer;
+import java.net.URI;
+import java.util.Map;
+
+public class TestLauncherAM extends XFsTestCase {
+
+
+    // TODO: OYA: write tests later
+
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java
new file mode 100644
index 0000000..d0b4d5b
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.oozie.QueryServlet;
+import org.apache.oozie.command.wf.HangServlet;
+import org.apache.oozie.test.EmbeddedServletContainer;
+import org.apache.oozie.test.XTestCase;
+import org.junit.Assert;
+import org.mockito.Mockito;
+
+import java.net.Proxy;
+import java.util.HashMap;
+import java.util.Map;
+
+// A lot of this adapted from org.apache.hadoop.mapreduce.v2.app.TestJobEndNotifier and org.apache.hadoop.mapred.TestJobEndNotifier
+public class TestLauncherAMCallbackNotifier extends XTestCase {
+
+    public void testConfiguration() throws Exception {
+        Configuration conf = new Configuration(false);
+
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "0");
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, "10");
+        LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf);
+        assertEquals(0, cn.numTries);
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "1");
+        cn = new LauncherAMCallbackNotifier(conf);
+        assertEquals(1, cn.numTries);
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "20");
+        cn = new LauncherAMCallbackNotifier(conf);
+        assertEquals(11, cn.numTries);  //11 because number of _retries_ is 10
+
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "1000");
+        cn = new LauncherAMCallbackNotifier(conf);
+        assertEquals(1000, cn.waitInterval);
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "10000");
+        cn = new LauncherAMCallbackNotifier(conf);
+        assertEquals(5000, cn.waitInterval);
+        //Test negative numbers are set to default
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "-10");
+        cn = new LauncherAMCallbackNotifier(conf);
+        assertEquals(5000, cn.waitInterval);
+
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_TIMEOUT, "1000");
+        cn = new LauncherAMCallbackNotifier(conf);
+        assertEquals(1000, cn.timeout);
+
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "somehost");
+        cn = new LauncherAMCallbackNotifier(conf);
+        assertEquals(Proxy.Type.DIRECT, cn.proxyToUse.type());
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "somehost:someport");
+        cn = new LauncherAMCallbackNotifier(conf);
+        assertEquals(Proxy.Type.DIRECT, cn.proxyToUse.type());
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "somehost:1000");
+        cn = new LauncherAMCallbackNotifier(conf);
+        assertEquals("HTTP @ somehost:1000", cn.proxyToUse.toString());
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "socks@somehost:1000");
+        cn = new LauncherAMCallbackNotifier(conf);
+        assertEquals("SOCKS @ somehost:1000", cn.proxyToUse.toString());
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "SOCKS@somehost:1000");
+        cn = new LauncherAMCallbackNotifier(conf);
+        assertEquals("SOCKS @ somehost:1000", cn.proxyToUse.toString());
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "sfafn@somehost:1000");
+        cn = new LauncherAMCallbackNotifier(conf);
+        assertEquals("HTTP @ somehost:1000", cn.proxyToUse.toString());
+    }
+
+    public void testNotifyRetries() throws InterruptedException {
+        Configuration conf = new Configuration(false);
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, "0");
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "1");
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, "http://nonexistent");
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "5000");
+
+        LauncherAMCallbackNotifier cnSpy = Mockito.spy(new LauncherAMCallbackNotifier(conf));
+        long start = System.currentTimeMillis();
+        cnSpy.notifyURL(FinalApplicationStatus.SUCCEEDED);
+        long end = System.currentTimeMillis();
+        Mockito.verify(cnSpy, Mockito.times(1)).notifyURLOnce();
+        Assert.assertTrue("Should have taken more than 5 seconds but it only took " + (end - start), end - start >= 5000);
+
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "3");
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, "3");
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "3000");
+
+        cnSpy = Mockito.spy(new LauncherAMCallbackNotifier(conf));
+        start = System.currentTimeMillis();
+        cnSpy.notifyURL(FinalApplicationStatus.SUCCEEDED);
+        end = System.currentTimeMillis();
+        Mockito.verify(cnSpy, Mockito.times(3)).notifyURLOnce();
+        Assert.assertTrue("Should have taken more than 9 seconds but it only took " + (end - start), end - start >= 9000);
+    }
+
+    public void testNotifyTimeout() throws Exception {
+        EmbeddedServletContainer container = null;
+        try {
+            container = new EmbeddedServletContainer("blah");
+            Map<String, String> params = new HashMap<String, String>();
+            params.put(HangServlet.SLEEP_TIME_MS, "1000000");
+            container.addServletEndpoint("/hang/*", HangServlet.class, params);
+            container.start();
+
+            Configuration conf = new Configuration(false);
+            conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, "0");
+            conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "1");
+            conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, container.getServletURL("/hang/*"));
+            conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "5000");
+
+            LauncherAMCallbackNotifier cnSpy = Mockito.spy(new LauncherAMCallbackNotifier(conf));
+            long start = System.currentTimeMillis();
+            cnSpy.notifyURL(FinalApplicationStatus.SUCCEEDED);
+            long end = System.currentTimeMillis();
+            Mockito.verify(cnSpy, Mockito.times(1)).notifyURLOnce();
+            Assert.assertTrue("Should have taken more than 5 seconds but it only took " + (end - start), end - start >= 5000);
+        } finally {
+            if (container != null) {
+                container.stop();
+            }
+        }
+    }
+
+    public void testNotify() throws Exception {
+        EmbeddedServletContainer container = null;
+        try {
+            container = new EmbeddedServletContainer("blah");
+            container.addServletEndpoint("/count/*", QueryServlet.class);
+            container.start();
+
+            Configuration conf = new Configuration(false);
+            conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, "0");
+            conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "1");
+            conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, container.getServletURL("/count/?status=$jobStatus"));
+            conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "5000");
+
+            LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf);
+            QueryServlet.lastQueryString = null;
+            assertNull(QueryServlet.lastQueryString);
+            cn.notifyURL(FinalApplicationStatus.SUCCEEDED);
+            waitFor(5000, new Predicate() {
+                @Override
+                public boolean evaluate() throws Exception {
+                    return "status=SUCCEEDED".equals(QueryServlet.lastQueryString);
+                }
+            });
+            assertEquals("status=SUCCEEDED", QueryServlet.lastQueryString);
+        } finally {
+            if (container != null) {
+                container.stop();
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java
index df9e939..e940d39 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java
@@ -24,6 +24,9 @@ import org.apache.oozie.service.Services;
 import org.apache.oozie.test.XFsTestCase;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapred.JobConf;
+import org.xml.sax.SAXException;
+
+import javax.xml.parsers.ParserConfigurationException;
 
 public class TestPrepareActionsDriver extends XFsTestCase {
 
@@ -40,7 +43,7 @@ public class TestPrepareActionsDriver extends XFsTestCase {
     }
 
     // Test to check if prepare action is performed as expected when the prepare XML block is a valid one
-    public void testDoOperationsWithValidXML() throws LauncherException, IOException {
+    public void testDoOperationsWithValidXML() throws LauncherException, IOException, ParserConfigurationException, SAXException {
         Path actionDir = getFsTestCaseDir();
         FileSystem fs = getFileSystem();
         Path newDir = new Path(actionDir, "newDir");
@@ -57,7 +60,7 @@ public class TestPrepareActionsDriver extends XFsTestCase {
         assertTrue(fs.exists(actionDir));
     }
 
-    // Test to check if LauncherException is thrown when the prepare XML block is invalid
+    // Test to check if Exception is thrown when the prepare XML block is invalid
     public void testDoOperationsWithInvalidXML() throws LauncherException, IOException {
         Path actionDir = getFsTestCaseDir();
         FileSystem fs = getFileSystem();
@@ -75,11 +78,9 @@ public class TestPrepareActionsDriver extends XFsTestCase {
             LauncherMapperHelper.setupLauncherURIHandlerConf(conf);
             PrepareActionsDriver.doOperations(prepareXML, conf);
             fail("Expected to catch an exception but did not encounter any");
-        } catch (LauncherException le) {
-            assertEquals(le.getCause().getClass(), org.xml.sax.SAXParseException.class);
-            assertEquals(le.getMessage(), "Content is not allowed in prolog.");
-        } catch(Exception ex){
-            fail("Expected a LauncherException but received an Exception");
+        } catch (Exception ex) {
+            assertEquals(ex.getClass(), org.xml.sax.SAXParseException.class);
+            assertEquals(ex.getMessage(), "Content is not allowed in prolog.");
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java
index 6a962a1..9468fad 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java
@@ -367,25 +367,4 @@ public class TestShellActionExecutor extends ActionExecutorTestCase {
         assertNotNull(runningJob);
         return runningJob;
     }
-
-    public void testShellMainPathInUber() throws Exception {
-        Services.get().getConf().setBoolean("oozie.action.shell.launcher.mapreduce.job.ubertask.enable", true);
-
-        Element actionXml = XmlUtils.parseXml("<shell>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>"
-                + "<name-node>" + getNameNodeUri() + "</name-node>" + "<exec>script.sh</exec>"
-                + "<argument>a=A</argument>" + "<argument>b=B</argument>" + "</shell>");
-        ShellActionExecutor ae = new ShellActionExecutor();
-        XConfiguration protoConf = new XConfiguration();
-        protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
-
-        WorkflowJobBean wf = createBaseWorkflow(protoConf, "action");
-        WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0);
-        action.setType(ae.getType());
-
-        Context context = new Context(wf, action);
-        JobConf launcherConf = new JobConf();
-        launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, launcherConf);
-        // env
-        assertEquals("PATH=.:$PATH", launcherConf.get(JavaActionExecutor.YARN_AM_ENV));
-    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/test/java/org/apache/oozie/command/wf/HangServlet.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/HangServlet.java b/core/src/test/java/org/apache/oozie/command/wf/HangServlet.java
index 3344cf9..d90aeb6 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/HangServlet.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/HangServlet.java
@@ -18,6 +18,8 @@
 
 package org.apache.oozie.command.wf;
 
+import org.apache.oozie.util.XLog;
+
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
@@ -25,14 +27,27 @@ import javax.servlet.http.HttpServletResponse;
 import java.io.IOException;
 
 /**
- * Servlet that 'hangs' for 200 ms. Used by TestNotificationXCommand
+ * Servlet that 'hangs' for some amount of time (200ms) by default.
+ * The time can be configured by setting {@link HangServlet#SLEEP_TIME_MS} as an init parameter for the servlet.
  */
 public class HangServlet extends HttpServlet {
 
+    public static final String SLEEP_TIME_MS = "sleep_time_ms";
+
     protected void doGet(HttpServletRequest request, HttpServletResponse response)
         throws ServletException, IOException {
         try {
-            Thread.sleep(200);
+            long time = 200;
+            String sleeptime = getInitParameter(SLEEP_TIME_MS);
+            if (sleeptime != null) {
+                try {
+                    time = Long.parseLong(sleeptime);
+                } catch (NumberFormatException nfe) {
+                    XLog.getLog(HangServlet.class).error("Invalid sleep time, using default (200)", nfe);
+                }
+            }
+            XLog.getLog(HangServlet.class).info("Sleeping for " + time + " ms");
+            Thread.sleep(time);
         }
         catch (Exception ex) {
             //NOP

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java b/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java
index 2153bf1..9b48df5 100644
--- a/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java
@@ -209,9 +209,6 @@ public class TestConfigurationService extends XTestCase {
 
         assertEquals(2048, ConfigurationService.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA));
         assertEquals("http://localhost:8080/oozie?job=", ConfigurationService.get(JobXCommand.CONF_CONSOLE_URL));
-        assertEquals(true, ConfigurationService.getBoolean(JavaActionExecutor.CONF_HADOOP_YARN_UBER_MODE));
-        assertEquals(false, ConfigurationService.getBoolean(
-                "oozie.action.shell.launcher." + JavaActionExecutor.HADOOP_YARN_UBER_MODE));
         assertEquals(false, ConfigurationService.getBoolean(HadoopAccessorService.KERBEROS_AUTH_ENABLED));
 
         assertEquals(0, ConfigurationService.getStrings("no.defined").length);

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java b/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java
index 96faa48..2798719 100644
--- a/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java
@@ -18,7 +18,17 @@
 
 package org.apache.oozie.service;
 
-import org.apache.oozie.test.XTestCase;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.authorize.*;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.oozie.test.XFsTestCase;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.fs.FileSystem;
@@ -34,7 +44,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.util.XConfiguration;
 
-public class TestHadoopAccessorService extends XTestCase {
+public class TestHadoopAccessorService extends XFsTestCase {
 
     protected void setUp() throws Exception {
         super.setUp();
@@ -136,45 +146,89 @@ public class TestHadoopAccessorService extends XTestCase {
          */
         assertEquals("100", conf.get("action.testprop"));
         assertEquals("1", conf.get("default.testprop"));
+    }
+
+    public void testCreateJobClient() throws Exception {
+        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
+        JobConf conf = has.createJobConf(getJobTrackerUri());
+
+        JobClient jc = has.createJobClient(getTestUser(), conf);
+        assertNotNull(jc);
+        jc.getAllJobs();
+
+        try {
+            has.createJobClient("invalid-user", conf);
+            fail("Should have thrown exception because not allowed to impersonate 'invalid-user'");
+        }
+        catch (HadoopAccessorException ex) {
+            assertEquals(ErrorCode.E0902, ex.getErrorCode());
+        }
 
+        JobConf conf2 = new JobConf(false);
+        conf2.set("mapred.job.tracker", getJobTrackerUri());
+        try {
+            has.createJobClient(getTestUser(), conf2);
+            fail("Should have thrown exception because Configuration not created by HadoopAccessorService");
+        }
+        catch (HadoopAccessorException ex) {
+            assertEquals(ErrorCode.E0903, ex.getErrorCode());
+        }
     }
 
-    public void testAccessor() throws Exception {
-        Services services = Services.get();
-        HadoopAccessorService has = services.get(HadoopAccessorService.class);
+    public void testCreateYarnClient() throws Exception {
+        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
         JobConf conf = has.createJobConf(getJobTrackerUri());
-        conf.set("mapred.job.tracker", getJobTrackerUri());
-        conf.set("fs.default.name", getNameNodeUri());
 
-        URI uri = new URI(getNameNodeUri());
+        YarnClient yc = has.createYarnClient(getTestUser(), conf);
+        assertNotNull(yc);
+        yc.getApplications();
 
-        //valid user
-        String user = getTestUser();
-        String group = getTestGroup();
+        try {
+            yc = has.createYarnClient("invalid-user", conf);
+            assertNotNull(yc);
+            yc.getApplications();
+            fail("Should have thrown exception because not allowed to impersonate 'invalid-user'");
+        }
+        catch (AuthorizationException ex) {
+        }
 
-        JobClient jc = has.createJobClient(user, conf);
-        assertNotNull(jc);
-        FileSystem fs = has.createFileSystem(user, new URI(getNameNodeUri()), conf);
-        assertNotNull(fs);
-        fs = has.createFileSystem(user, uri, conf);
-        assertNotNull(fs);
+        JobConf conf2 = new JobConf(false);
+        conf2.set("yarn.resourcemanager.address", getJobTrackerUri());
+        try {
+            has.createYarnClient(getTestUser(), conf2);
+            fail("Should have thrown exception because Configuration not created by HadoopAccessorService");
+        }
+        catch (HadoopAccessorException ex) {
+            assertEquals(ErrorCode.E0903, ex.getErrorCode());
+        }
+    }
 
-        //invalid user
+    public void testCreateFileSystem() throws Exception {
+        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
+        JobConf conf = has.createJobConf(getJobTrackerUri());
 
-        user = "invalid";
+        FileSystem fs = has.createFileSystem(getTestUser(), new URI(getNameNodeUri()), conf);
+        assertNotNull(fs);
+        fs.exists(new Path(getNameNodeUri(), "/foo"));
 
         try {
-            has.createJobClient(user, conf);
-            fail();
+            fs = has.createFileSystem("invalid-user", new URI(getNameNodeUri()), conf);
+            assertNotNull(fs);
+            fs.exists(new Path(getNameNodeUri(), "/foo"));
+            fail("Should have thrown exception because not allowed to impersonate 'invalid-user'");
         }
-        catch (Throwable ex) {
+        catch (RemoteException ex) {
+            assertEquals(AuthorizationException.class.getName(), ex.getClassName());
         }
 
+        JobConf conf2 = new JobConf(false);
+        conf2.set("fs.default.name", getNameNodeUri());
         try {
-            has.createFileSystem(user, uri, conf);
-            fail();
+            has.createFileSystem(getTestUser(), new URI(getNameNodeUri()), conf2);
+            fail("Should have thrown exception because Configuration not created by HadoopAccessorService");
         }
-        catch (Throwable ex) {
+        catch (HadoopAccessorException ex) {
+            assertEquals(ErrorCode.E0903, ex.getErrorCode());
         }
     }
 
@@ -290,4 +344,21 @@ public class TestHadoopAccessorService extends XTestCase {
         }
         has.destroy();
     }
+
+    public void testCreateLocalResourceForConfigurationFile() throws Exception {
+        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
+        String filename = "foo.xml";
+        Configuration conf = has.createJobConf(getNameNodeUri());
+        conf.set("foo", "bar");
+        LocalResource lRes = has.createLocalResourceForConfigurationFile(filename, getTestUser(), conf, getFileSystem().getUri(),
+                getFsTestCaseDir());
+        assertNotNull(lRes);
+        assertEquals(LocalResourceType.FILE, lRes.getType());
+        assertEquals(LocalResourceVisibility.APPLICATION, lRes.getVisibility());
+        Path resPath = ConverterUtils.getPathFromYarnURL(lRes.getResource());
+        assertEquals(new Path(getFsTestCaseDir(), "foo.xml"), resPath);
+        Configuration conf2 = new Configuration(false);
+        conf2.addResource(getFileSystem().open(resPath));
+        assertEquals("bar", conf2.get("foo"));
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/test/java/org/apache/oozie/test/XTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/test/XTestCase.java b/core/src/test/java/org/apache/oozie/test/XTestCase.java
index e360369..81a33fd 100644
--- a/core/src/test/java/org/apache/oozie/test/XTestCase.java
+++ b/core/src/test/java/org/apache/oozie/test/XTestCase.java
@@ -82,6 +82,7 @@ import org.apache.oozie.sla.SLASummaryBean;
 import org.apache.oozie.store.StoreException;
 import org.apache.oozie.test.MiniHCatServer.RUNMODE;
 import org.apache.oozie.test.hive.MiniHS2;
+import org.apache.oozie.util.ClasspathUtils;
 import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.ParamChecker;
 import org.apache.oozie.util.XConfiguration;
@@ -877,6 +878,7 @@ public abstract class XTestCase extends TestCase {
 
     private static MiniDFSCluster dfsCluster = null;
     private static MiniDFSCluster dfsCluster2 = null;
+    // TODO: OYA: replace with MiniYarnCluster or MiniMRYarnCluster
     private static MiniMRCluster mrCluster = null;
     private static MiniHCatServer hcatServer = null;
     private static MiniHS2 hiveserver2 = null;
@@ -886,6 +888,8 @@ public abstract class XTestCase extends TestCase {
 			if (System.getProperty("hadoop.log.dir") == null) {
 				System.setProperty("hadoop.log.dir", testCaseDir);
 			}
+            // Tell the ClasspathUtils that we're using a mini cluster
+            ClasspathUtils.setUsingMiniYarnCluster(true);
             int taskTrackers = 2;
             int dataNodes = 2;
             String oozieUser = getOozieUser();

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/test/java/org/apache/oozie/util/TestClasspathUtils.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/util/TestClasspathUtils.java b/core/src/test/java/org/apache/oozie/util/TestClasspathUtils.java
new file mode 100644
index 0000000..3a7215b
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/util/TestClasspathUtils.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.test.XFsTestCase;
+import org.apache.oozie.test.XTestCase;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+public class TestClasspathUtils extends XFsTestCase {
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        // This is normally true, and adds the entirety of the current classpath in ClasspathUtils, which we don't want to test or
+        // worry about here.  Temporarily set this back to false so it behaves normally.
+        ClasspathUtils.setUsingMiniYarnCluster(false);
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        // Make sure to turn this back on for subsequent tests
+        ClasspathUtils.setUsingMiniYarnCluster(true);
+        super.tearDown();
+    }
+
+    public void testSetupClasspath() throws Exception {
+        Configuration conf = new Configuration(false);
+        Map<String, String> env = new HashMap<String, String>();
+
+        Path p1 = new Path(getFsTestCaseDir(), "foo.xml");
+        getFileSystem().createNewFile(p1);
+        DistributedCache.addFileToClassPath(p1, conf);
+
+        Path p2 = new Path(getFsTestCaseDir(), "foo.txt");
+        getFileSystem().createNewFile(p2);
+        DistributedCache.addFileToClassPath(p2, conf);
+
+        Path p3 = new Path(getFsTestCaseDir(), "foo.zip");
+        getFileSystem().createNewFile(p3);
+        DistributedCache.addArchiveToClassPath(p3, conf);
+
+        ClasspathUtils.setupClasspath(env, conf);
+
+        assertEquals(2, env.size());
+        assertTrue(env.containsKey("CLASSPATH"));
+        String[] paths = env.get("CLASSPATH").split(":");
+        assertEquals(12, paths.length);
+        Arrays.sort(paths);
+        assertEquals("$HADOOP_COMMON_HOME/share/hadoop/common/*", paths[0]);
+        assertEquals("$HADOOP_COMMON_HOME/share/hadoop/common/lib/*", paths[1]);
+        assertEquals("$HADOOP_CONF_DIR", paths[2]);
+        assertEquals("$HADOOP_HDFS_HOME/share/hadoop/hdfs/*", paths[3]);
+        assertEquals("$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*", paths[4]);
+        assertEquals("$HADOOP_YARN_HOME/share/hadoop/yarn/*", paths[5]);
+        assertEquals("$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*", paths[6]);
+        assertEquals("$PWD", paths[7]);
+        assertEquals("$PWD/*", paths[8]);
+        assertEquals("job.jar/classes/", paths[9]);
+        assertEquals("job.jar/job.jar", paths[10]);
+        assertEquals("job.jar/lib/*", paths[11]);
+
+        assertTrue(env.containsKey("$PWD"));
+        paths = env.get("$PWD").split(":");
+        assertEquals(3, paths.length);
+        Arrays.sort(paths);
+        assertEquals("$PWD/foo.txt", paths[0]);
+        assertEquals("$PWD/foo.xml", paths[1]);
+        assertEquals("$PWD/foo.zip", paths[2]);
+    }
+
+    public void testAddMapReduceToClasspath() throws Exception {
+        Configuration conf = new Configuration(false);
+        Map<String, String> env = new HashMap<String, String>();
+
+        ClasspathUtils.addMapReduceToClasspath(env, conf);
+
+        assertEquals(1, env.size());
+        assertTrue(env.containsKey("CLASSPATH"));
+        String[] paths = env.get("CLASSPATH").split(":");
+        assertEquals(2, paths.length);
+        Arrays.sort(paths);
+        assertEquals("$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*", paths[0]);
+        assertEquals("$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*", paths[1]);
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 3a23cbf..c75911e 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release (trunk - unreleased)
 
+OOZIE-2590 OYA: Create basic Oozie Launcher Application Master (rkanter)
 OOZIE-2316 Drop support for Hadoop 1 and 0.23 (asasvari via rkanter)
 OOZIE-2503 show ChildJobURLs to spark action (satishsaley via puru)
 OOZIE-2551 Feature request: epoch timestamp generation (jtolar via puru)

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/distcp/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/distcp/pom.xml b/sharelib/distcp/pom.xml
index c8cc47c..cb01faa 100644
--- a/sharelib/distcp/pom.xml
+++ b/sharelib/distcp/pom.xml
@@ -91,18 +91,6 @@
                             <outputFile>${project.build.directory}/classpath</outputFile>
                         </configuration>
                     </execution>
-                    <execution>
-                        <id>create-mrapp-generated-classpath</id>
-                        <phase>generate-test-resources</phase>
-                        <goals>
-                            <goal>build-classpath</goal>
-                        </goals>
-                        <configuration>
-                            <!-- needed to run the unit test for DS to generate the required classpath
-                            that is required in the env of the launch container in the mini mr/yarn cluster -->
-                            <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
-                        </configuration>
-                    </execution>
                 </executions>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/hcatalog/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/hcatalog/pom.xml b/sharelib/hcatalog/pom.xml
index 2b0c504..f4273a5 100644
--- a/sharelib/hcatalog/pom.xml
+++ b/sharelib/hcatalog/pom.xml
@@ -297,18 +297,6 @@
                             <outputFile>${project.build.directory}/classpath</outputFile>
                         </configuration>
                     </execution>
-                    <execution>
-                        <id>create-mrapp-generated-classpath</id>
-                        <phase>generate-test-resources</phase>
-                        <goals>
-                            <goal>build-classpath</goal>
-                        </goals>
-                        <configuration>
-                            <!-- needed to run the unit test for DS to generate the required classpath
-                            that is required in the env of the launch container in the mini mr/yarn cluster -->
-                            <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
-                        </configuration>
-                    </execution>
                 </executions>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/hive/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/hive/pom.xml b/sharelib/hive/pom.xml
index d10d7b8..ba49403 100644
--- a/sharelib/hive/pom.xml
+++ b/sharelib/hive/pom.xml
@@ -171,18 +171,6 @@
                             <outputFile>${project.build.directory}/classpath</outputFile>
                         </configuration>
                     </execution>
-                    <execution>
-                        <id>create-mrapp-generated-classpath</id>
-                        <phase>generate-test-resources</phase>
-                        <goals>
-                            <goal>build-classpath</goal>
-                        </goals>
-                        <configuration>
-                            <!-- needed to run the unit test for DS to generate the required classpath
-                            that is required in the env of the launch container in the mini mr/yarn cluster -->
-                            <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
-                        </configuration>
-                    </execution>
                 </executions>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/hive2/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/hive2/pom.xml b/sharelib/hive2/pom.xml
index ce967c5..329832d 100644
--- a/sharelib/hive2/pom.xml
+++ b/sharelib/hive2/pom.xml
@@ -152,18 +152,6 @@
                             <outputFile>${project.build.directory}/classpath</outputFile>
                         </configuration>
                     </execution>
-                    <execution>
-                        <id>create-mrapp-generated-classpath</id>
-                        <phase>generate-test-resources</phase>
-                        <goals>
-                            <goal>build-classpath</goal>
-                        </goals>
-                        <configuration>
-                            <!-- needed to run the unit test for DS to generate the required classpath
-                            that is required in the env of the launch container in the mini mr/yarn cluster -->
-                            <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
-                        </configuration>
-                    </execution>
                 </executions>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/sharelib/oozie/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/oozie/pom.xml b/sharelib/oozie/pom.xml
index dd95b45..b2da4e2 100644
--- a/sharelib/oozie/pom.xml
+++ b/sharelib/oozie/pom.xml
@@ -85,18 +85,6 @@
                             <outputFile>${project.build.directory}/classpath</outputFile>
                         </configuration>
                     </execution>
-                    <execution>
-                        <id>create-mrapp-generated-classpath</id>
-                        <phase>generate-test-resources</phase>
-                        <goals>
-                            <goal>build-classpath</goal>
-                        </goals>
-                        <configuration>
-                            <!-- needed to run the unit test for DS to generate the required classpath
-                            that is required in the env of the launch container in the mini mr/yarn cluster -->
-                            <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
-                        </configuration>
-                    </execution>
                 </executions>
             </plugin>
             <plugin>


[3/3] oozie git commit: OOZIE-2590 OYA: Create basic Oozie Launcher Application Master (rkanter)

Posted by rk...@apache.org.
OOZIE-2590 OYA: Create basic Oozie Launcher Application Master (rkanter)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/fea512cf
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/fea512cf
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/fea512cf

Branch: refs/heads/oya
Commit: fea512cf66aec92d867e13c200978fd103868ab1
Parents: a37835f
Author: Robert Kanter <rk...@cloudera.com>
Authored: Mon Jul 25 18:24:35 2016 -0700
Committer: Robert Kanter <rk...@cloudera.com>
Committed: Mon Jul 25 18:24:35 2016 -0700

----------------------------------------------------------------------
 core/pom.xml                                    |  17 -
 .../action/hadoop/DistcpActionExecutor.java     |   7 +-
 .../action/hadoop/Hive2ActionExecutor.java      |   5 +-
 .../oozie/action/hadoop/HiveActionExecutor.java |   5 +-
 .../oozie/action/hadoop/JavaActionExecutor.java | 593 ++++++++---------
 .../action/hadoop/LauncherMapperHelper.java     |  12 -
 .../action/hadoop/SparkActionExecutor.java      |   5 +-
 .../action/hadoop/SqoopActionExecutor.java      |   9 +-
 .../oozie/service/HadoopAccessorService.java    |  97 ++-
 .../org/apache/oozie/util/ClasspathUtils.java   | 145 +++++
 core/src/main/resources/oozie-default.xml       |  25 -
 .../java/org/apache/oozie/QueryServlet.java     |  40 ++
 .../action/hadoop/TestJavaActionExecutor.java   | 531 ++--------------
 .../oozie/action/hadoop/TestLauncherAM.java     |  46 ++
 .../hadoop/TestLauncherAMCallbackNotifier.java  | 170 +++++
 .../action/hadoop/TestPrepareActionsDriver.java |  15 +-
 .../action/hadoop/TestShellActionExecutor.java  |  21 -
 .../apache/oozie/command/wf/HangServlet.java    |  19 +-
 .../oozie/service/TestConfigurationService.java |   3 -
 .../service/TestHadoopAccessorService.java      | 121 +++-
 .../java/org/apache/oozie/test/XTestCase.java   |   4 +
 .../apache/oozie/util/TestClasspathUtils.java   | 110 ++++
 release-log.txt                                 |   1 +
 sharelib/distcp/pom.xml                         |  12 -
 sharelib/hcatalog/pom.xml                       |  12 -
 sharelib/hive/pom.xml                           |  12 -
 sharelib/hive2/pom.xml                          |  12 -
 sharelib/oozie/pom.xml                          |  12 -
 .../apache/oozie/action/hadoop/LauncherAM.java  | 636 +++++++++++++++++++
 .../hadoop/LauncherAMCallbackNotifier.java      | 175 +++++
 .../oozie/action/hadoop/LauncherMapper.java     |   6 +-
 .../action/hadoop/PrepareActionsDriver.java     |  43 +-
 sharelib/pig/pom.xml                            |  12 -
 sharelib/spark/pom.xml                          |  12 -
 sharelib/sqoop/pom.xml                          |  12 -
 sharelib/streaming/pom.xml                      |  33 -
 36 files changed, 1899 insertions(+), 1091 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/pom.xml
----------------------------------------------------------------------
diff --git a/core/pom.xml b/core/pom.xml
index 6584af8..86feea0 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -453,23 +453,6 @@
                 </configuration>
             </plugin>
             <plugin>
-              <artifactId>maven-dependency-plugin</artifactId>
-              <executions>
-                <execution>
-                  <id>create-mrapp-generated-classpath</id>
-                  <phase>generate-test-resources</phase>
-                  <goals>
-                    <goal>build-classpath</goal>
-                  </goals>
-                  <configuration>
-                    <!-- needed to run the unit test for DS to generate the required classpath
-                         that is required in the env of the launch container in the mini mr/yarn cluster -->
-                    <outputFile>${project.build.directory}/test-classes/mrapp-generated-classpath</outputFile>
-                  </configuration>
-                </execution>
-              </executions>
-            </plugin>
-            <plugin>
                 <groupId>org.apache.openjpa</groupId>
                 <artifactId>openjpa-maven-plugin</artifactId>
                 <executions>

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java
index 96726da..99652e8 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java
@@ -26,13 +26,10 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.RunningJob;
 import org.apache.oozie.action.ActionExecutorException;
-import org.apache.oozie.action.ActionExecutor.Context;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.HadoopAccessorException;
-import org.apache.oozie.service.Services;
 import org.apache.oozie.util.XLog;
 import org.jdom.Element;
 import org.jdom.JDOMException;
@@ -126,9 +123,9 @@ public class DistcpActionExecutor extends JavaActionExecutor{
     }
 
     @Override
-    protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
+    protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context)
             throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
-        super.getActionData(actionFs, runningJob, action, context);
+        super.getActionData(actionFs, action, context);
         readExternalChildIDs(action, context);
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java
index b5b1bf9..9ba6318 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/Hive2ActionExecutor.java
@@ -28,7 +28,6 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.RunningJob;
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.service.HadoopAccessorException;
@@ -134,9 +133,9 @@ public class Hive2ActionExecutor extends ScriptLanguageActionExecutor {
     }
 
     @Override
-    protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
+    protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context)
             throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
-        super.getActionData(actionFs, runningJob, action, context);
+        super.getActionData(actionFs, action, context);
         readExternalChildIDs(action, context);
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java
index c74e9e6..a850957 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/HiveActionExecutor.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.XOozieClient;
@@ -125,9 +124,9 @@ public class HiveActionExecutor extends ScriptLanguageActionExecutor {
     }
 
     @Override
-    protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
+    protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context)
             throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
-        super.getActionData(actionFs, runningJob, action, context);
+        super.getActionData(actionFs, action, context);
         readExternalChildIDs(action, context);
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 99e3344..d573fc3 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -18,42 +18,38 @@
 
 package org.apache.oozie.action.hadoop;
 
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.StringReader;
-import java.net.ConnectException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Properties;
-import java.util.Set;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.AccessControlException;
+import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.client.api.YarnClientApplication;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.action.ActionExecutor;
@@ -69,18 +65,41 @@ import org.apache.oozie.service.Services;
 import org.apache.oozie.service.ShareLibService;
 import org.apache.oozie.service.URIHandlerService;
 import org.apache.oozie.service.WorkflowAppService;
+import org.apache.oozie.util.ClasspathUtils;
 import org.apache.oozie.util.ELEvaluationException;
 import org.apache.oozie.util.ELEvaluator;
-import org.apache.oozie.util.XLog;
-import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.util.XmlUtils;
 import org.apache.oozie.util.JobUtils;
 import org.apache.oozie.util.LogUtils;
 import org.apache.oozie.util.PropertiesUtils;
+import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XLog;
+import org.apache.oozie.util.XmlUtils;
 import org.jdom.Element;
 import org.jdom.JDOMException;
 import org.jdom.Namespace;
 
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.StringReader;
+import java.net.ConnectException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 
 public class JavaActionExecutor extends ActionExecutor {
 
@@ -94,7 +113,6 @@ public class JavaActionExecutor extends ActionExecutor {
     public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
     public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
     public static final String HADOOP_YARN_TIMELINE_SERVICE_ENABLED = "yarn.timeline-service.enabled";
-    public static final String HADOOP_YARN_UBER_MODE = "mapreduce.job.ubertask.enable";
     public static final String HADOOP_YARN_KILL_CHILD_JOBS_ON_AMRESTART = "oozie.action.launcher.am.restart.kill.childjobs";
     public static final String HADOOP_MAP_MEMORY_MB = "mapreduce.map.memory.mb";
     public static final String HADOOP_CHILD_JAVA_OPTS = "mapred.child.java.opts";
@@ -117,7 +135,6 @@ public class JavaActionExecutor extends ActionExecutor {
     protected XLog LOG = XLog.getLog(getClass());
     private static final Pattern heapPattern = Pattern.compile("-Xmx(([0-9]+)[mMgG])");
     private static final String JAVA_TMP_DIR_SETTINGS = "-Djava.io.tmpdir=";
-    public static final String CONF_HADOOP_YARN_UBER_MODE = "oozie.action.launcher." + HADOOP_YARN_UBER_MODE;
     public static final String HADOOP_JOB_CLASSLOADER = "mapreduce.job.classloader";
     public static final String HADOOP_USER_CLASSPATH_FIRST = "mapreduce.user.classpath.first";
     public static final String OOZIE_CREDENTIALS_SKIP = "oozie.credentials.skip";
@@ -138,10 +155,11 @@ public class JavaActionExecutor extends ActionExecutor {
 
     public static List<Class> getCommonLauncherClasses() {
         List<Class> classes = new ArrayList<Class>();
-        classes.add(LauncherMapper.class);
         classes.add(OozieLauncherInputFormat.class);
         classes.add(LauncherMain.class);
         classes.addAll(Services.get().get(URIHandlerService.class).getClassesForLauncher());
+        classes.add(LauncherAM.class);
+        classes.add(LauncherAMCallbackNotifier.class);
         return classes;
     }
 
@@ -159,7 +177,7 @@ public class JavaActionExecutor extends ActionExecutor {
     @Override
     public void initActionType() {
         super.initActionType();
-        maxActionOutputLen = ConfigurationService.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA);
+        maxActionOutputLen = ConfigurationService.getInt(LauncherAM.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA);
         //Get the limit for the maximum allowed size of action stats
         maxExternalStatsSize = ConfigurationService.getInt(JavaActionExecutor.MAX_EXTERNAL_STATS_SIZE);
         maxExternalStatsSize = (maxExternalStatsSize == -1) ? Integer.MAX_VALUE : maxExternalStatsSize;
@@ -257,8 +275,6 @@ public class JavaActionExecutor extends ActionExecutor {
             } catch (URISyntaxException ex) {
                 throw convertException(ex);
             }
-            // Inject use uber mode for launcher
-            injectLauncherUseUberMode(launcherConf);
             XConfiguration.copy(launcherConf, conf);
             checkForDisallowedProps(launcherConf, "launcher configuration");
             // Inject config-class for launcher to use for action
@@ -273,25 +289,6 @@ public class JavaActionExecutor extends ActionExecutor {
         }
     }
 
-    void injectLauncherUseUberMode(Configuration launcherConf) {
-        // Set Uber Mode for the launcher (YARN only, ignored by MR1)
-        // Priority:
-        // 1. action's <configuration>
-        // 2. oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable
-        // 3. oozie.action.launcher.mapreduce.job.ubertask.enable
-        if (launcherConf.get(HADOOP_YARN_UBER_MODE) == null) {
-            if (ConfigurationService.get("oozie.action." + getType() + ".launcher." + HADOOP_YARN_UBER_MODE).length() > 0) {
-                if (ConfigurationService.getBoolean("oozie.action." + getType() + ".launcher." + HADOOP_YARN_UBER_MODE)) {
-                    launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
-                }
-            } else {
-                if (ConfigurationService.getBoolean("oozie.action.launcher." + HADOOP_YARN_UBER_MODE)) {
-                    launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, true);
-                }
-            }
-        }
-    }
-
     void injectLauncherTimelineServiceEnabled(Configuration launcherConf, Configuration actionConf) {
         // Getting delegation token for ATS. If tez-site.xml is present in distributed cache, turn on timeline service.
         if (actionConf.get("oozie.launcher." + HADOOP_YARN_TIMELINE_SERVICE_ENABLED) == null
@@ -303,104 +300,6 @@ public class JavaActionExecutor extends ActionExecutor {
         }
     }
 
-    void updateConfForUberMode(Configuration launcherConf) {
-
-        // child.env
-        boolean hasConflictEnv = false;
-        String launcherMapEnv = launcherConf.get(HADOOP_MAP_JAVA_ENV);
-        if (launcherMapEnv == null) {
-            launcherMapEnv = launcherConf.get(HADOOP_CHILD_JAVA_ENV);
-        }
-        String amEnv = launcherConf.get(YARN_AM_ENV);
-        StringBuffer envStr = new StringBuffer();
-        HashMap<String, List<String>> amEnvMap = null;
-        HashMap<String, List<String>> launcherMapEnvMap = null;
-        if (amEnv != null) {
-            envStr.append(amEnv);
-            amEnvMap = populateEnvMap(amEnv);
-        }
-        if (launcherMapEnv != null) {
-            launcherMapEnvMap = populateEnvMap(launcherMapEnv);
-            if (amEnvMap != null) {
-                Iterator<String> envKeyItr = launcherMapEnvMap.keySet().iterator();
-                while (envKeyItr.hasNext()) {
-                    String envKey = envKeyItr.next();
-                    if (amEnvMap.containsKey(envKey)) {
-                        List<String> amValList = amEnvMap.get(envKey);
-                        List<String> launcherValList = launcherMapEnvMap.get(envKey);
-                        Iterator<String> valItr = launcherValList.iterator();
-                        while (valItr.hasNext()) {
-                            String val = valItr.next();
-                            if (!amValList.contains(val)) {
-                                hasConflictEnv = true;
-                                break;
-                            }
-                            else {
-                                valItr.remove();
-                            }
-                        }
-                        if (launcherValList.isEmpty()) {
-                            envKeyItr.remove();
-                        }
-                    }
-                }
-            }
-        }
-        if (hasConflictEnv) {
-            launcherConf.setBoolean(HADOOP_YARN_UBER_MODE, false);
-        }
-        else {
-            if (launcherMapEnvMap != null) {
-                for (String key : launcherMapEnvMap.keySet()) {
-                    List<String> launcherValList = launcherMapEnvMap.get(key);
-                    for (String val : launcherValList) {
-                        if (envStr.length() > 0) {
-                            envStr.append(",");
-                        }
-                        envStr.append(key).append("=").append(val);
-                    }
-                }
-            }
-
-            launcherConf.set(YARN_AM_ENV, envStr.toString());
-
-            // memory.mb
-            int launcherMapMemoryMB = launcherConf.getInt(HADOOP_MAP_MEMORY_MB, 1536);
-            int amMemoryMB = launcherConf.getInt(YARN_AM_RESOURCE_MB, 1536);
-            // YARN_MEMORY_MB_MIN to provide buffer.
-            // suppose launcher map aggressively use high memory, need some
-            // headroom for AM
-            int memoryMB = Math.max(launcherMapMemoryMB, amMemoryMB) + YARN_MEMORY_MB_MIN;
-            // limit to 4096 in case of 32 bit
-            if (launcherMapMemoryMB < 4096 && amMemoryMB < 4096 && memoryMB > 4096) {
-                memoryMB = 4096;
-            }
-            launcherConf.setInt(YARN_AM_RESOURCE_MB, memoryMB);
-
-            // We already made mapred.child.java.opts and
-            // mapreduce.map.java.opts equal, so just start with one of them
-            String launcherMapOpts = launcherConf.get(HADOOP_MAP_JAVA_OPTS, "");
-            String amChildOpts = launcherConf.get(YARN_AM_COMMAND_OPTS);
-            StringBuilder optsStr = new StringBuilder();
-            int heapSizeForMap = extractHeapSizeMB(launcherMapOpts);
-            int heapSizeForAm = extractHeapSizeMB(amChildOpts);
-            int heapSize = Math.max(heapSizeForMap, heapSizeForAm) + YARN_MEMORY_MB_MIN;
-            // limit to 3584 in case of 32 bit
-            if (heapSizeForMap < 4096 && heapSizeForAm < 4096 && heapSize > 3584) {
-                heapSize = 3584;
-            }
-            if (amChildOpts != null) {
-                optsStr.append(amChildOpts);
-            }
-            optsStr.append(" ").append(launcherMapOpts.trim());
-            if (heapSize > 0) {
-                // append calculated total heap size to the end
-                optsStr.append(" ").append("-Xmx").append(heapSize).append("m");
-            }
-            launcherConf.set(YARN_AM_COMMAND_OPTS, optsStr.toString().trim());
-        }
-    }
-
     void updateConfForJavaTmpDir(Configuration conf) {
         String amChildOpts = conf.get(YARN_AM_COMMAND_OPTS);
         String oozieJavaTmpDirSetting = "-Djava.io.tmpdir=./tmp";
@@ -868,7 +767,7 @@ public class JavaActionExecutor extends ActionExecutor {
 
 
     protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
-        return launcherConf.get(LauncherMapper.CONF_OOZIE_ACTION_MAIN_CLASS, JavaMain.class.getName());
+        return launcherConf.get(LauncherAM.CONF_OOZIE_ACTION_MAIN_CLASS, JavaMain.class.getName());
     }
 
     private void setJavaMain(Configuration actionConf, Element actionXml) {
@@ -1004,15 +903,6 @@ public class JavaActionExecutor extends ActionExecutor {
             launcherJobConf.set(HADOOP_CHILD_JAVA_OPTS, opts.toString().trim());
             launcherJobConf.set(HADOOP_MAP_JAVA_OPTS, opts.toString().trim());
 
-            // setting for uber mode
-            if (launcherJobConf.getBoolean(HADOOP_YARN_UBER_MODE, false)) {
-                if (checkPropertiesToDisableUber(launcherJobConf)) {
-                    launcherJobConf.setBoolean(HADOOP_YARN_UBER_MODE, false);
-                }
-                else {
-                    updateConfForUberMode(launcherJobConf);
-                }
-            }
             updateConfForJavaTmpDir(launcherJobConf);
             injectLauncherTimelineServiceEnabled(launcherJobConf, actionConf);
 
@@ -1027,23 +917,9 @@ public class JavaActionExecutor extends ActionExecutor {
         }
     }
 
-    private boolean checkPropertiesToDisableUber(Configuration launcherConf) {
-        boolean disable = false;
-        if (launcherConf.getBoolean(HADOOP_JOB_CLASSLOADER, false)) {
-            disable = true;
-        }
-        else if (launcherConf.getBoolean(HADOOP_USER_CLASSPATH_FIRST, false)) {
-            disable = true;
-        }
-        return disable;
-    }
-
     private void injectCallback(Context context, Configuration conf) {
-        String callback = context.getCallbackUrl("$jobStatus");
-        if (conf.get("job.end.notification.url") != null) {
-            LOG.warn("Overriding the action job end notification URI");
-        }
-        conf.set("job.end.notification.url", callback);
+        String callback = context.getCallbackUrl(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_JOBSTATUS_TOKEN);
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, callback);
     }
 
     void injectActionCallback(Context context, Configuration actionConf) {
@@ -1062,7 +938,7 @@ public class JavaActionExecutor extends ActionExecutor {
         }
     }
 
-    public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction action) throws ActionExecutorException {
+    public void submitLauncher(FileSystem actionFs, final Context context, WorkflowAction action) throws ActionExecutorException {
         JobClient jobClient = null;
         boolean exception = false;
         try {
@@ -1119,14 +995,17 @@ public class JavaActionExecutor extends ActionExecutor {
                     }
                 }
             }
-
             JobConf launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf);
 
-            LOG.debug("Creating Job Client for action " + action.getId());
-            jobClient = createJobClient(context, launcherJobConf);
-            String launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context
-                    .getRecoveryId());
-            boolean alreadyRunning = launcherId != null;
+            boolean alreadyRunning = false;
+            String launcherId = null;
+            String consoleUrl = null;
+            // TODO: OYA: equivalent of this? (recovery, alreadyRunning)  When does this happen?
+//            LOG.debug("Creating Job Client for action " + action.getId());
+//            jobClient = createJobClient(context, launcherJobConf);
+//            launcherId = LauncherMapperHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context
+//                    .getRecoveryId());
+//            alreadyRunning = launcherId != null;
             RunningJob runningJob;
 
             // if user-retry is on, always submit new launcher
@@ -1141,13 +1020,13 @@ public class JavaActionExecutor extends ActionExecutor {
                 }
             }
             else {
-                LOG.debug("Submitting the job through Job Client for action " + action.getId());
-
-                // setting up propagation of the delegation token.
-                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
-                Token<DelegationTokenIdentifier> mrdt = jobClient.getDelegationToken(has
-                        .getMRDelegationTokenRenewer(launcherJobConf));
-                launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt);
+                // TODO: OYA: do we actually need an MR token?  IIRC, it's issued by the JHS
+//                // setting up propagation of the delegation token.
+//                Token<DelegationTokenIdentifier> mrdt = null;
+//                HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
+//                mrdt = jobClient.getDelegationToken(has
+//                        .getMRDelegationTokenRenewer(launcherJobConf));
+//                launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt);
 
                 // insert credentials tokens to launcher job conf if needed
                 if (needInjectCredentials() && credentialsConf != null) {
@@ -1173,17 +1052,36 @@ public class JavaActionExecutor extends ActionExecutor {
                 else {
                     LOG.info("No need to inject credentials.");
                 }
-                runningJob = jobClient.submitJob(launcherJobConf);
-                if (runningJob == null) {
-                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017",
-                            "Error submitting launcher for action [{0}]", action.getId());
+
+                YarnClient yarnClient = null;
+                try {
+                    String user = context.getWorkflow().getUser();
+
+                    // Create application
+                    yarnClient = createYarnClient(context, launcherJobConf);
+                    YarnClientApplication newApp = yarnClient.createApplication();
+                    ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId();
+
+                    // Create launch context for app master
+                    ApplicationSubmissionContext appContext =
+                            createAppSubmissionContext(appId, launcherJobConf, user, context, actionConf);
+
+                    // Submit the launcher AM
+                    yarnClient.submitApplication(appContext);
+
+                    launcherId = appId.toString();
+                    LOG.debug("After submission get the launcherId [{0}]", launcherId);
+                    ApplicationReport appReport = yarnClient.getApplicationReport(appId);
+                    consoleUrl = appReport.getTrackingUrl();
+                } finally {
+                    if (yarnClient != null) {
+                        yarnClient.close();
+                        yarnClient = null;
+                    }
                 }
-                launcherId = runningJob.getID().toString();
-                LOG.debug("After submission get the launcherId " + launcherId);
             }
 
             String jobTracker = launcherJobConf.get(HADOOP_YARN_RM);
-            String consoleUrl = runningJob.getTrackingURL();
             context.setStartData(launcherId, jobTracker, consoleUrl);
         }
         catch (Exception ex) {
@@ -1206,6 +1104,91 @@ public class JavaActionExecutor extends ActionExecutor {
             }
         }
     }
+
+    private ApplicationSubmissionContext createAppSubmissionContext(ApplicationId appId, JobConf launcherJobConf, String user,
+                                                                    Context context, Configuration actionConf)
+            throws IOException, HadoopAccessorException, URISyntaxException {
+        // Create launch context for app master
+        ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
+
+        // set the application id
+        appContext.setApplicationId(appId);
+
+        // set the application name
+        appContext.setApplicationName(launcherJobConf.getJobName());
+        appContext.setApplicationType("Oozie Launcher");
+
+        // Set the priority for the application master
+        Priority pri = Records.newRecord(Priority.class);
+        int priority = 0; // TODO: OYA: Add a constant or a config
+        pri.setPriority(priority);
+        appContext.setPriority(pri);
+
+        // Set the queue to which this application is to be submitted in the RM
+        appContext.setQueue(launcherJobConf.getQueueName());
+
+        // Set up the container launch context for the application master
+        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
+
+        // Set the resources to localize
+        Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+        ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(launcherJobConf);
+        MRApps.setupDistributedCache(launcherJobConf, localResources);
+        // Add the Launcher and Action configs as Resources
+        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
+        LocalResource launcherJobConfLR = has.createLocalResourceForConfigurationFile(LauncherAM.LAUNCHER_JOB_CONF_XML, user,
+                launcherJobConf, context.getAppFileSystem().getUri(), context.getActionDir());
+        localResources.put(LauncherAM.LAUNCHER_JOB_CONF_XML, launcherJobConfLR);
+        LocalResource actionConfLR = has.createLocalResourceForConfigurationFile(LauncherAM.ACTION_CONF_XML, user, actionConf,
+                context.getAppFileSystem().getUri(), context.getActionDir());
+        localResources.put(LauncherAM.ACTION_CONF_XML, actionConfLR);
+        amContainer.setLocalResources(localResources);
+
+        // Set the environment variables
+        Map<String, String> env = new HashMap<String, String>();
+        // This adds the Hadoop jars to the classpath in the Launcher JVM
+        ClasspathUtils.setupClasspath(env, launcherJobConf);
+        if (false) {        // TODO: OYA: config to add MR jars?  Probably also needed for MR Action
+            ClasspathUtils.addMapReduceToClasspath(env, launcherJobConf);
+        }
+        amContainer.setEnvironment(env);
+
+        // Set the command
+        List<String> vargs = new ArrayList<String>(6);
+        vargs.add(MRApps.crossPlatformifyMREnv(launcherJobConf, ApplicationConstants.Environment.JAVA_HOME)
+                + "/bin/java");
+        // TODO: OYA: remove attach debugger to AM; useful for debugging
+//                    vargs.add("-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005");
+        MRApps.addLog4jSystemProperties("INFO", 1024 * 1024, 0, vargs);
+        vargs.add(LauncherAM.class.getName());
+        vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+                Path.SEPARATOR + ApplicationConstants.STDOUT);
+        vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
+                Path.SEPARATOR + ApplicationConstants.STDERR);
+        List<String> vargsFinal = new ArrayList<String>(6);
+        StringBuilder mergedCommand = new StringBuilder();
+        for (CharSequence str : vargs) {
+            mergedCommand.append(str).append(" ");
+        }
+        vargsFinal.add(mergedCommand.toString());
+        LOG.debug("Command to launch container for ApplicationMaster is : "
+                + mergedCommand);
+        amContainer.setCommands(vargsFinal);
+        appContext.setAMContainerSpec(amContainer);
+
+        // Set tokens
+        DataOutputBuffer dob = new DataOutputBuffer();
+        launcherJobConf.getCredentials().writeTokenStorageToStream(dob);
+        amContainer.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
+
+        // Set Resources
+        // TODO: OYA: make resources allocated for the AM configurable and choose good defaults (memory MB, vcores)
+        Resource resource = Resource.newInstance(2048, 1);
+        appContext.setResource(resource);
+
+        return appContext;
+    }
+
     private boolean needInjectCredentials() {
         boolean methodExists = true;
 
@@ -1409,6 +1392,19 @@ public class JavaActionExecutor extends ActionExecutor {
         return Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
     }
 
+    /**
+     * Create yarn client object
+     *
+     * @param context
+     * @param jobConf
+     * @return YarnClient
+     * @throws HadoopAccessorException
+     */
+    protected YarnClient createYarnClient(Context context, JobConf jobConf) throws HadoopAccessorException {
+        String user = context.getWorkflow().getUser();
+        return Services.get().get(HadoopAccessorService.class).createYarnClient(user, jobConf);
+    }
+
     protected RunningJob getRunningJob(Context context, WorkflowAction action, JobClient jobClient) throws Exception{
         RunningJob runningJob = jobClient.getJob(JobID.forName(action.getExternalId()));
         return runningJob;
@@ -1425,129 +1421,112 @@ public class JavaActionExecutor extends ActionExecutor {
 
     @Override
     public void check(Context context, WorkflowAction action) throws ActionExecutorException {
-        JobClient jobClient = null;
-        boolean exception = false;
+        boolean fallback = false;
+        LOG = XLog.resetPrefix(LOG);
         LogUtils.setLogInfo(action);
+        YarnClient yarnClient = null;
         try {
             Element actionXml = XmlUtils.parseXml(action.getConf());
-            FileSystem actionFs = context.getAppFileSystem();
             JobConf jobConf = createBaseHadoopConf(context, actionXml);
-            jobClient = createJobClient(context, jobConf);
-            RunningJob runningJob = getRunningJob(context, action, jobClient);
-            if (runningJob == null) {
-                context.setExecutionData(FAILED, null);
-                throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
-                        "Could not lookup launched hadoop Job ID [{0}] which was associated with " +
-                        " action [{1}].  Failing this action!", getActualExternalId(action), action.getId());
-            }
-            if (runningJob.isComplete()) {
+            FileSystem actionFs = context.getAppFileSystem();
+            yarnClient = createYarnClient(context, jobConf);
+            FinalApplicationStatus appStatus = null;
+            try {
+                ApplicationReport appReport =
+                        yarnClient.getApplicationReport(ConverterUtils.toApplicationId(action.getExternalId()));
+                YarnApplicationState appState = appReport.getYarnApplicationState();
+                if (appState == YarnApplicationState.FAILED || appState == YarnApplicationState.FINISHED
+                        || appState == YarnApplicationState.KILLED) {
+                    appStatus = appReport.getFinalApplicationStatus();
+                }
+
+            } catch (Exception ye) {
+                LOG.debug("Exception occurred while checking Launcher AM status; will try checking action data file instead ", ye);
+                // Fallback to action data file if we can't find the Launcher AM (maybe it got purged)
+                fallback = true;
+            }
+            if (appStatus != null || fallback) {
                 Path actionDir = context.getActionDir();
                 String newId = null;
                 // load sequence file into object
                 Map<String, String> actionData = LauncherMapperHelper.getActionData(actionFs, actionDir, jobConf);
-                if (actionData.containsKey(LauncherMapper.ACTION_DATA_NEW_ID)) {
-                    newId = actionData.get(LauncherMapper.ACTION_DATA_NEW_ID);
-                    String launcherId = action.getExternalId();
-                    runningJob = jobClient.getJob(JobID.forName(newId));
-                    if (runningJob == null) {
-                        context.setExternalStatus(FAILED);
+                if (fallback) {
+                    String finalStatus = actionData.get(LauncherAM.ACTION_DATA_FINAL_STATUS);
+                    if (finalStatus != null) {
+                        appStatus = FinalApplicationStatus.valueOf(finalStatus);
+                    } else {
+                        context.setExecutionData(FAILED, null);
                         throw new ActionExecutorException(ActionExecutorException.ErrorType.FAILED, "JA017",
-                                "Unknown hadoop job [{0}] associated with action [{1}].  Failing this action!", newId,
-                                action.getId());
+                                "Unknown hadoop job [{0}] associated with action [{1}] and couldn't determine status from" +
+                                        " action data.  Failing this action!", action.getExternalId(), action.getId());
                     }
-                    context.setExternalChildIDs(newId);
-                    LOG.info(XLog.STD, "External ID swap, old ID [{0}] new ID [{1}]", launcherId,
-                            newId);
                 }
-                else {
-                    String externalIDs = actionData.get(LauncherMapper.ACTION_DATA_EXTERNAL_CHILD_IDS);
-                    if (externalIDs != null) {
-                        context.setExternalChildIDs(externalIDs);
-                        LOG.info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs);
-                    }
+                String externalIDs = actionData.get(LauncherAM.ACTION_DATA_EXTERNAL_CHILD_IDS);
+                if (externalIDs != null) {
+                    context.setExternalChildIDs(externalIDs);
+                    LOG.info(XLog.STD, "Hadoop Jobs launched : [{0}]", externalIDs);
                 }
-                if (runningJob.isComplete()) {
-                    // fetching action output and stats for the Map-Reduce action.
-                    if (newId != null) {
-                        actionData = LauncherMapperHelper.getActionData(actionFs, context.getActionDir(), jobConf);
+                LOG.info(XLog.STD, "action completed, external ID [{0}]",
+                        action.getExternalId());
+                context.setExecutionData(appStatus.toString(), null);
+                if (appStatus == FinalApplicationStatus.SUCCEEDED) {
+                    if (getCaptureOutput(action) && LauncherMapperHelper.hasOutputData(actionData)) {
+                        context.setExecutionData(SUCCEEDED, PropertiesUtils.stringToProperties(actionData
+                                .get(LauncherAM.ACTION_DATA_OUTPUT_PROPS)));
+                        LOG.info(XLog.STD, "action produced output");
                     }
-                    LOG.info(XLog.STD, "action completed, external ID [{0}]",
-                            action.getExternalId());
-                    if (LauncherMapperHelper.isMainSuccessful(runningJob)) {
-                        if (getCaptureOutput(action) && LauncherMapperHelper.hasOutputData(actionData)) {
-                            context.setExecutionData(SUCCEEDED, PropertiesUtils.stringToProperties(actionData
-                                    .get(LauncherMapper.ACTION_DATA_OUTPUT_PROPS)));
-                            LOG.info(XLog.STD, "action produced output");
+                    else {
+                        context.setExecutionData(SUCCEEDED, null);
+                    }
+                    if (LauncherMapperHelper.hasStatsData(actionData)) {
+                        context.setExecutionStats(actionData.get(LauncherAM.ACTION_DATA_STATS));
+                        LOG.info(XLog.STD, "action produced stats");
+                    }
+                    getActionData(actionFs, action, context);
+                }
+                else {
+                    String errorReason;
+                    if (actionData.containsKey(LauncherAM.ACTION_DATA_ERROR_PROPS)) {
+                        Properties props = PropertiesUtils.stringToProperties(actionData
+                                .get(LauncherAM.ACTION_DATA_ERROR_PROPS));
+                        String errorCode = props.getProperty("error.code");
+                        if ("0".equals(errorCode)) {
+                            errorCode = "JA018";
                         }
-                        else {
-                            context.setExecutionData(SUCCEEDED, null);
+                        if ("-1".equals(errorCode)) {
+                            errorCode = "JA019";
                         }
-                        if (LauncherMapperHelper.hasStatsData(actionData)) {
-                            context.setExecutionStats(actionData.get(LauncherMapper.ACTION_DATA_STATS));
-                            LOG.info(XLog.STD, "action produced stats");
+                        errorReason = props.getProperty("error.reason");
+                        LOG.warn("Launcher ERROR, reason: {0}", errorReason);
+                        String exMsg = props.getProperty("exception.message");
+                        String errorInfo = (exMsg != null) ? exMsg : errorReason;
+                        context.setErrorInfo(errorCode, errorInfo);
+                        String exStackTrace = props.getProperty("exception.stacktrace");
+                        if (exMsg != null) {
+                            LOG.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace);
                         }
-                        getActionData(actionFs, runningJob, action, context);
                     }
                     else {
-                        String errorReason;
-                        if (actionData.containsKey(LauncherMapper.ACTION_DATA_ERROR_PROPS)) {
-                            Properties props = PropertiesUtils.stringToProperties(actionData
-                                    .get(LauncherMapper.ACTION_DATA_ERROR_PROPS));
-                            String errorCode = props.getProperty("error.code");
-                            if ("0".equals(errorCode)) {
-                                errorCode = "JA018";
-                            }
-                            if ("-1".equals(errorCode)) {
-                                errorCode = "JA019";
-                            }
-                            errorReason = props.getProperty("error.reason");
-                            LOG.warn("Launcher ERROR, reason: {0}", errorReason);
-                            String exMsg = props.getProperty("exception.message");
-                            String errorInfo = (exMsg != null) ? exMsg : errorReason;
-                            context.setErrorInfo(errorCode, errorInfo);
-                            String exStackTrace = props.getProperty("exception.stacktrace");
-                            if (exMsg != null) {
-                                LOG.warn("Launcher exception: {0}{E}{1}", exMsg, exStackTrace);
-                            }
-                        }
-                        else {
-                            errorReason = XLog.format("LauncherMapper died, check Hadoop LOG for job [{0}:{1}]", action
-                                    .getTrackerUri(), action.getExternalId());
-                            LOG.warn(errorReason);
-                        }
-                        context.setExecutionData(FAILED_KILLED, null);
+                        errorReason = XLog.format("Launcher AM died, check Hadoop LOG for job [{0}:{1}]", action
+                                .getTrackerUri(), action.getExternalId());
+                        LOG.warn(errorReason);
                     }
-                }
-                else {
-                    context.setExternalStatus("RUNNING");
-                    LOG.info(XLog.STD, "checking action, hadoop job ID [{0}] status [RUNNING]",
-                            runningJob.getID());
+                    context.setExecutionData(FAILED_KILLED, null);
                 }
             }
             else {
-                context.setExternalStatus("RUNNING");
+                context.setExternalStatus(YarnApplicationState.RUNNING.toString());
                 LOG.info(XLog.STD, "checking action, hadoop job ID [{0}] status [RUNNING]",
-                        runningJob.getID());
+                        action.getExternalId());
             }
         }
         catch (Exception ex) {
             LOG.warn("Exception in check(). Message[{0}]", ex.getMessage(), ex);
-            exception = true;
             throw convertException(ex);
         }
         finally {
-            if (jobClient != null) {
-                try {
-                    jobClient.close();
-                }
-                catch (Exception e) {
-                    if (exception) {
-                        LOG.error("JobClient error: ", e);
-                    }
-                    else {
-                        throw convertException(e);
-                    }
-                }
+            if (yarnClient != null) {
+                IOUtils.closeQuietly(yarnClient);
             }
         }
     }
@@ -1555,14 +1534,12 @@ public class JavaActionExecutor extends ActionExecutor {
     /**
      * Get the output data of an action. Subclasses should override this method
      * to get action specific output data.
-     *
      * @param actionFs the FileSystem object
-     * @param runningJob the runningJob
      * @param action the Workflow action
      * @param context executor context
      *
      */
-    protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
+    protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context)
             throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
     }
 
@@ -1585,38 +1562,28 @@ public class JavaActionExecutor extends ActionExecutor {
 
     @Override
     public void kill(Context context, WorkflowAction action) throws ActionExecutorException {
-        JobClient jobClient = null;
-        boolean exception = false;
+        YarnClient yarnClient = null;
         try {
             Element actionXml = XmlUtils.parseXml(action.getConf());
+            String user = context.getWorkflow().getUser();
             JobConf jobConf = createBaseHadoopConf(context, actionXml);
-            jobClient = createJobClient(context, jobConf);
-            RunningJob runningJob = getRunningJob(context, action, jobClient);
-            if (runningJob != null) {
-                runningJob.killJob();
-            }
+            yarnClient = createYarnClient(context, jobConf);
+            yarnClient.killApplication(ConverterUtils.toApplicationId(action.getExternalId()));
             context.setExternalStatus(KILLED);
             context.setExecutionData(KILLED, null);
-        }
-        catch (Exception ex) {
-            exception = true;
+        } catch (Exception ex) {
+            LOG.error("Error: ", ex);
             throw convertException(ex);
-        }
-        finally {
+        } finally {
             try {
                 FileSystem actionFs = context.getAppFileSystem();
                 cleanUpActionDir(actionFs, context);
-                if (jobClient != null) {
-                    jobClient.close();
-                }
-            }
-            catch (Exception ex) {
-                if (exception) {
-                    LOG.error("Error: ", ex);
-                }
-                else {
-                    throw convertException(ex);
+                if (yarnClient != null) {
+                    yarnClient.close();
                 }
+            } catch (Exception ex) {
+                LOG.error("Error: ", ex);
+                throw convertException(ex);
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
index 69e1044..07d1262 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/LauncherMapperHelper.java
@@ -145,18 +145,6 @@ public class LauncherMapperHelper {
           launcherConf.setBoolean("oozie.hadoop-2.0.2-alpha.workaround.for.distributed.cache", true);
         }
 
-        FileSystem fs =
-          Services.get().get(HadoopAccessorService.class).createFileSystem(launcherConf.get("user.name"),
-                                                                           actionDir.toUri(), launcherConf);
-        fs.mkdirs(actionDir);
-
-        OutputStream os = fs.create(new Path(actionDir, LauncherMapper.ACTION_CONF_XML));
-        try {
-            actionConf.writeXml(os);
-        } finally {
-            IOUtils.closeSafely(os);
-        }
-
         launcherConf.setInputFormat(OozieLauncherInputFormat.class);
         launcherConf.set("mapred.output.dir", new Path(actionDir, "output").toString());
     }

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
index 252f461..6a41235 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.service.HadoopAccessorException;
@@ -157,9 +156,9 @@ public class SparkActionExecutor extends JavaActionExecutor {
     }
 
     @Override
-    protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
+    protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context)
             throws HadoopAccessorException, JDOMException, IOException, URISyntaxException {
-        super.getActionData(actionFs, runningJob, action, context);
+        super.getActionData(actionFs, action, context);
         readExternalChildIDs(action, context);
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
index 6813a37..82e5f0c 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
@@ -23,7 +23,6 @@ import java.io.StringReader;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Properties;
 import java.util.StringTokenizer;
 
 import org.apache.hadoop.conf.Configuration;
@@ -232,17 +231,15 @@ public class SqoopActionExecutor extends JavaActionExecutor {
 
     /**
      * Get the stats and external child IDs
-     *
-     * @param actionFs the FileSystem object
-     * @param runningJob the runningJob
+     *  @param actionFs the FileSystem object
      * @param action the Workflow action
      * @param context executor context
      *
      */
     @Override
-    protected void getActionData(FileSystem actionFs, RunningJob runningJob, WorkflowAction action, Context context)
+    protected void getActionData(FileSystem actionFs, WorkflowAction action, Context context)
             throws HadoopAccessorException, JDOMException, IOException, URISyntaxException{
-        super.getActionData(actionFs, runningJob, action, context);
+        super.getActionData(actionFs, action, context);
         readExternalChildIDs(action, context);
     }
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
index 794e825..0177241 100644
--- a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
+++ b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java
@@ -18,6 +18,7 @@
 
 package org.apache.oozie.service;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
@@ -29,7 +30,14 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.hadoop.yarn.util.Records;
 import org.apache.oozie.ErrorCode;
+import org.apache.oozie.action.ActionExecutor;
 import org.apache.oozie.action.hadoop.JavaActionExecutor;
 import org.apache.oozie.util.ParamChecker;
 import org.apache.oozie.util.XConfiguration;
@@ -39,6 +47,7 @@ import org.apache.oozie.workflow.lite.LiteWorkflowAppParser;
 
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.InputStream;
@@ -511,9 +520,43 @@ public class HadoopAccessorService implements Service {
     }
 
     /**
-     * Return a FileSystem created with the provided user for the specified URI.
+     * Return a YarnClient created with the provided user and configuration.
      *
+     * @param user The username to impersonate
+     * @param conf The conf
+     * @return a YarnClient with the provided user and configuration
+     * @throws HadoopAccessorException if the client could not be created.
+     */
+    public YarnClient createYarnClient(String user, final Configuration conf) throws HadoopAccessorException {
+        ParamChecker.notEmpty(user, "user");
+        if (!conf.getBoolean(OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED, false)) {
+            throw new HadoopAccessorException(ErrorCode.E0903);
+        }
+        String rm = conf.get(JavaActionExecutor.HADOOP_YARN_RM);
+        validateJobTracker(rm);
+        try {
+            UserGroupInformation ugi = getUGI(user);
+            YarnClient yarnClient = ugi.doAs(new PrivilegedExceptionAction<YarnClient>() {
+                @Override
+                public YarnClient run() throws Exception {
+                    YarnClient yarnClient = YarnClient.createYarnClient();
+                    yarnClient.init(conf);
+                    yarnClient.start();
+                    return yarnClient;
+                }
+            });
+            return yarnClient;
+        } catch (InterruptedException ex) {
+            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
+        } catch (IOException ex) {
+            throw new HadoopAccessorException(ErrorCode.E0902, ex.getMessage(), ex);
+        }
+    }
+
+    /**
+     * Return a FileSystem created with the provided user for the specified URI.
      *
+     * @param user The username to impersonate
      * @param uri file system URI.
      * @param conf Configuration with all necessary information to create the FileSystem.
      * @return FileSystem created with the provided user/group.
@@ -667,4 +710,56 @@ public class HadoopAccessorService implements Service {
         return supportedSchemes;
     }
 
+    /**
+     * Creates a {@link LocalResource} for the Configuration to localize it for a Yarn Container.  This involves also writing it
+     * to HDFS.
+     * Example usage:
+     * * <pre>
+     * {@code
+     * LocalResource res1 = createLocalResourceForConfigurationFile(filename1, user, conf, uri, dir);
+     * LocalResource res2 = createLocalResourceForConfigurationFile(filename2, user, conf, uri, dir);
+     * ...
+     * Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
+     * localResources.put(filename1, res1);
+     * localResources.put(filename2, res2);
+     * ...
+     * containerLaunchContext.setLocalResources(localResources);
+     * }
+     * </pre>
+     *
+     * @param filename The filename to use on the remote filesystem and once it has been localized.
+     * @param user The user
+     * @param conf The configuration to process
+     * @param uri The URI of the remote filesystem (e.g. HDFS)
+     * @param dir The directory on the remote filesystem to write the file to
+     * @return
+     * @throws IOException A problem occurred writing the file
+     * @throws HadoopAccessorException A problem occured with Hadoop
+     * @throws URISyntaxException A problem occurred parsing the URI
+     */
+    public LocalResource createLocalResourceForConfigurationFile(String filename, String user, Configuration conf, URI uri,
+                                                                 Path dir)
+            throws IOException, HadoopAccessorException, URISyntaxException {
+        File f = File.createTempFile(filename, ".tmp");
+        FileOutputStream fos = null;
+        try {
+            fos = new FileOutputStream(f);
+            conf.writeXml(fos);
+        } finally {
+            if (fos != null) {
+                fos.close();
+            }
+        }
+        FileSystem fs = createFileSystem(user, uri, conf);
+        Path dst = new Path(dir, filename);
+        fs.copyFromLocalFile(new Path(f.getAbsolutePath()), dst);
+        LocalResource localResource = Records.newRecord(LocalResource.class);
+        localResource.setType(LocalResourceType.FILE); localResource.setVisibility(LocalResourceVisibility.APPLICATION);
+        localResource.setResource(ConverterUtils.getYarnUrlFromPath(dst));
+        FileStatus destStatus = fs.getFileStatus(dst);
+        localResource.setTimestamp(destStatus.getModificationTime());
+        localResource.setSize(destStatus.getLen());
+        return localResource;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/java/org/apache/oozie/util/ClasspathUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/ClasspathUtils.java b/core/src/main/java/org/apache/oozie/util/ClasspathUtils.java
new file mode 100644
index 0000000..8533371
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/util/ClasspathUtils.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.v2.util.MRApps;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+public class ClasspathUtils {
+    private static boolean usingMiniYarnCluster = false;
+    private static final List<String> CLASSPATH_ENTRIES = Arrays.asList(
+            ApplicationConstants.Environment.PWD.$(),
+            MRJobConfig.JOB_JAR + Path.SEPARATOR + MRJobConfig.JOB_JAR,
+            MRJobConfig.JOB_JAR + Path.SEPARATOR + "classes" + Path.SEPARATOR,
+            MRJobConfig.JOB_JAR + Path.SEPARATOR + "lib" + Path.SEPARATOR + "*",
+            ApplicationConstants.Environment.PWD.$() + Path.SEPARATOR + "*"
+    );
+
+    @VisibleForTesting
+    public static void setUsingMiniYarnCluster(boolean useMiniYarnCluster) {
+        usingMiniYarnCluster = useMiniYarnCluster;
+    }
+
+    // Adapted from MRApps#setClasspath.  Adds Yarn, HDFS, Common, and distributed cache jars.
+    public static void setupClasspath(Map<String, String> env, Configuration conf) throws IOException {
+        // Propagate the system classpath when using the mini cluster
+        if (usingMiniYarnCluster) {
+            MRApps.addToEnvironment(
+                    env,
+                    ApplicationConstants.Environment.CLASSPATH.name(),
+                    System.getProperty("java.class.path"), conf);
+        }
+
+        for (String entry : CLASSPATH_ENTRIES) {
+            MRApps.addToEnvironment(env, ApplicationConstants.Environment.CLASSPATH.name(), entry, conf);
+        }
+
+        // a * in the classpath will only find a .jar, so we need to filter out
+        // all .jars and add everything else
+        addToClasspathIfNotJar(org.apache.hadoop.mapreduce.filecache.DistributedCache.getFileClassPaths(conf),
+                org.apache.hadoop.mapreduce.filecache.DistributedCache.getCacheFiles(conf),
+                conf,
+                env, ApplicationConstants.Environment.PWD.$());
+        addToClasspathIfNotJar(org.apache.hadoop.mapreduce.filecache.DistributedCache.getArchiveClassPaths(conf),
+                org.apache.hadoop.mapreduce.filecache.DistributedCache.getCacheArchives(conf),
+                conf,
+                env, ApplicationConstants.Environment.PWD.$());
+
+
+        boolean crossPlatform = conf.getBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
+                MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM);
+
+        for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+                crossPlatform
+                        ? YarnConfiguration.DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH
+                        : YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
+            MRApps.addToEnvironment(env, ApplicationConstants.Environment.CLASSPATH.name(),
+                    c.trim(), conf);
+        }
+    }
+
+    // Adapted from MRApps#setClasspath
+    public static void addMapReduceToClasspath(Map<String, String> env, Configuration conf) {
+        boolean crossPlatform = conf.getBoolean(MRConfig.MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM,
+                MRConfig.DEFAULT_MAPREDUCE_APP_SUBMISSION_CROSS_PLATFORM);
+
+        for (String c : conf.getStrings(MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH,
+                crossPlatform ?
+                        StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_CROSS_PLATFORM_APPLICATION_CLASSPATH)
+                        : StringUtils.getStrings(MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH))) {
+            MRApps.addToEnvironment(env, ApplicationConstants.Environment.CLASSPATH.name(),
+                    c.trim(), conf);
+        }
+    }
+
+    // Borrowed from MRApps#addToClasspathIfNotJar
+    private static void addToClasspathIfNotJar(Path[] paths,
+                                               URI[] withLinks, Configuration conf,
+                                               Map<String, String> environment,
+                                               String classpathEnvVar) throws IOException {
+        if (paths != null) {
+            HashMap<Path, String> linkLookup = new HashMap<Path, String>();
+            if (withLinks != null) {
+                for (URI u: withLinks) {
+                    Path p = new Path(u);
+                    FileSystem remoteFS = p.getFileSystem(conf);
+                    p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
+                            remoteFS.getWorkingDirectory()));
+                    String name = (null == u.getFragment())
+                            ? p.getName() : u.getFragment();
+                    if (!name.toLowerCase(Locale.ENGLISH).endsWith(".jar")) {
+                        linkLookup.put(p, name);
+                    }
+                }
+            }
+
+            for (Path p : paths) {
+                FileSystem remoteFS = p.getFileSystem(conf);
+                p = remoteFS.resolvePath(p.makeQualified(remoteFS.getUri(),
+                        remoteFS.getWorkingDirectory()));
+                String name = linkLookup.get(p);
+                if (name == null) {
+                    name = p.getName();
+                }
+                if(!name.toLowerCase(Locale.ENGLISH).endsWith(".jar")) {
+                    MRApps.addToEnvironment(
+                            environment,
+                            classpathEnvVar,
+                            ApplicationConstants.Environment.PWD.$() + Path.SEPARATOR + name, conf);
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 6c2f7d8..5f4645c 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -1782,31 +1782,6 @@ will be the requeue interval for the actions which are waiting for a long time w
     </property>
 
     <property>
-        <name>oozie.action.launcher.mapreduce.job.ubertask.enable</name>
-        <value>true</value>
-        <description>
-            Enables Uber Mode for the launcher job in YARN/Hadoop 2 (no effect in Hadoop 1) for all action types by default.
-            This can be overridden on a per-action-type basis by setting
-            oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable in oozie-site.xml (where #action-type# is the action
-            type; for example, "pig").  And that can be overridden on a per-action basis by setting
-            oozie.launcher.mapreduce.job.ubertask.enable in an action's configuration section in a workflow.  In summary, the
-            priority is this:
-            1. action's configuration section in a workflow
-            2. oozie.action.#action-type#.launcher.mapreduce.job.ubertask.enable in oozie-site
-            3. oozie.action.launcher.mapreduce.job.ubertask.enable in oozie-site
-        </description>
-    </property>
-
-    <property>
-        <name>oozie.action.shell.launcher.mapreduce.job.ubertask.enable</name>
-        <value>false</value>
-        <description>
-            The Shell action may have issues with the $PATH environment when using Uber Mode, and so Uber Mode is disabled by
-            default for it.  See oozie.action.launcher.mapreduce.job.ubertask.enable
-        </description>
-    </property>
-
-    <property>
         <name>oozie.action.shell.setup.hadoop.conf.dir</name>
         <value>false</value>
         <description>

http://git-wip-us.apache.org/repos/asf/oozie/blob/fea512cf/core/src/test/java/org/apache/oozie/QueryServlet.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/QueryServlet.java b/core/src/test/java/org/apache/oozie/QueryServlet.java
new file mode 100644
index 0000000..8789438
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/QueryServlet.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.net.URLDecoder;
+
+/**
+ * Servlet that keeps track of the last query string it recieved
+ */
+public class QueryServlet extends HttpServlet {
+
+    public static String lastQueryString = null;
+
+    protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
+        lastQueryString = URLDecoder.decode(request.getQueryString(), "UTF-8");
+        response.setStatus(HttpServletResponse.SC_OK);
+    }
+
+}