Posted to commits@oozie.apache.org by ge...@apache.org on 2017/05/26 09:27:57 UTC

[05/10] oozie git commit: OOZIE-1770 Create Oozie Application Master for YARN (asasvari, pbacsko, rkanter, gezapeti)

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java
new file mode 100644
index 0000000..1f7e5b2
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.oozie.QueryServlet;
+import org.apache.oozie.action.hadoop.LauncherAM.OozieActionResult;
+import org.apache.oozie.command.wf.HangServlet;
+import org.apache.oozie.test.EmbeddedServletContainer;
+import org.apache.oozie.test.XTestCase;
+import org.junit.Assert;
+import org.mockito.Mockito;
+
+import java.net.Proxy;
+import java.util.HashMap;
+import java.util.Map;
+
+// A lot of this is adapted from org.apache.hadoop.mapreduce.v2.app.TestJobEndNotifier and org.apache.hadoop.mapred.TestJobEndNotifier
+public class TestLauncherAMCallbackNotifier extends XTestCase {
+    private EmbeddedServletContainer container;
+
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        QueryServlet.lastQueryString = null;
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        if (container != null) {
+            container.stop();
+        }
+
+        super.tearDown();
+    }
+
+    public void testConfiguration() throws Exception {
+        Configuration conf = new Configuration(false);
+
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "0");
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, "10");
+        LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf);
+        assertEquals(0, cn.numTries);
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "1");
+        cn = new LauncherAMCallbackNotifier(conf);
+        assertEquals(1, cn.numTries);
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "20");
+        cn = new LauncherAMCallbackNotifier(conf);
+        assertEquals(11, cn.numTries);  //11 because number of _retries_ is 10
+
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "1000");
+        cn = new LauncherAMCallbackNotifier(conf);
+        assertEquals(1000, cn.waitInterval);
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "10000");
+        cn = new LauncherAMCallbackNotifier(conf);
+        assertEquals(5000, cn.waitInterval);
+        //Test negative numbers are set to default
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "-10");
+        cn = new LauncherAMCallbackNotifier(conf);
+        assertEquals(5000, cn.waitInterval);
+
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_TIMEOUT, "1000");
+        cn = new LauncherAMCallbackNotifier(conf);
+        assertEquals(1000, cn.timeout);
+
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "somehost");
+        cn = new LauncherAMCallbackNotifier(conf);
+        assertEquals(Proxy.Type.DIRECT, cn.proxyToUse.type());
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "somehost:someport");
+        cn = new LauncherAMCallbackNotifier(conf);
+        assertEquals(Proxy.Type.DIRECT, cn.proxyToUse.type());
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "somehost:1000");
+        cn = new LauncherAMCallbackNotifier(conf);
+        assertEquals("HTTP @ somehost:1000", cn.proxyToUse.toString());
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "socks@somehost:1000");
+        cn = new LauncherAMCallbackNotifier(conf);
+        assertEquals("SOCKS @ somehost:1000", cn.proxyToUse.toString());
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "SOCKS@somehost:1000");
+        cn = new LauncherAMCallbackNotifier(conf);
+        assertEquals("SOCKS @ somehost:1000", cn.proxyToUse.toString());
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "sfafn@somehost:1000");
+        cn = new LauncherAMCallbackNotifier(conf);
+        assertEquals("HTTP @ somehost:1000", cn.proxyToUse.toString());
+    }
+
+    public void testNotifyRetries() throws InterruptedException {
+        Configuration conf = new Configuration(false);
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, "0");
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "1");
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, "http://nonexistent");
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "5000");
+
+        LauncherAMCallbackNotifier cnSpy = Mockito.spy(new LauncherAMCallbackNotifier(conf));
+
+        long start = System.currentTimeMillis();
+        cnSpy.notifyURL(OozieActionResult.SUCCEEDED);
+        long end = System.currentTimeMillis();
+        Mockito.verify(cnSpy, Mockito.times(1)).notifyURLOnce();
+        Assert.assertTrue("Should have taken more than 5 seconds but it only took " + (end - start), end - start >= 5000);
+
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "3");
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, "3");
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "3000");
+
+        cnSpy = Mockito.spy(new LauncherAMCallbackNotifier(conf));
+        start = System.currentTimeMillis();
+        cnSpy.notifyURL(OozieActionResult.SUCCEEDED);
+        end = System.currentTimeMillis();
+        Mockito.verify(cnSpy, Mockito.times(3)).notifyURLOnce();
+        Assert.assertTrue("Should have taken more than 9 seconds but it only took " + (end - start), end - start >= 9000);
+    }
+
+    public void testNotifyTimeout() throws Exception {
+        Map<String, String> params = new HashMap<String, String>();
+        params.put(HangServlet.SLEEP_TIME_MS, "1000000");
+        Configuration conf = setupEmbeddedContainer(HangServlet.class, "/hang/*", "/hang/*", params);
+
+        LauncherAMCallbackNotifier cnSpy = Mockito.spy(new LauncherAMCallbackNotifier(conf));
+        long start = System.currentTimeMillis();
+        cnSpy.notifyURL(OozieActionResult.SUCCEEDED);
+        long end = System.currentTimeMillis();
+        Mockito.verify(cnSpy, Mockito.times(1)).notifyURLOnce();
+        Assert.assertTrue("Should have taken more than 5 seconds but it only took " + (end - start), end - start >= 5000);
+    }
+
+    public void testNotify() throws Exception {
+        Configuration conf = setupEmbeddedContainer(QueryServlet.class, "/count/*", "/count/?status=$jobStatus", null);
+
+        LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf);
+
+        assertNull(QueryServlet.lastQueryString);
+        cn.notifyURL(OozieActionResult.SUCCEEDED);
+        waitForCallbackAndCheckResult(FinalApplicationStatus.SUCCEEDED.toString());
+    }
+
+    public void testNotifyBackgroundActionWhenSubmitSucceeds() throws Exception {
+        Configuration conf = setupEmbeddedContainer(QueryServlet.class, "/count/*", "/count/?status=$jobStatus", null);
+
+        LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf);
+
+        assertNull(QueryServlet.lastQueryString);
+        cn.notifyURL(OozieActionResult.RUNNING);
+        waitForCallbackAndCheckResult(OozieActionResult.RUNNING.toString());
+    }
+
+    public void testNotifyBackgroundActionWhenSubmitFailsWithFailed() throws Exception {
+        Configuration conf = setupEmbeddedContainer(QueryServlet.class, "/count/*", "/count/?status=$jobStatus", null);
+
+        LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf);
+
+        assertNull(QueryServlet.lastQueryString);
+        cn.notifyURL(OozieActionResult.FAILED);
+        waitForCallbackAndCheckResult(FinalApplicationStatus.FAILED.toString());
+    }
+
+    private Configuration setupEmbeddedContainer(Class<?> servletClass, String servletEndPoint,
+            String servletUrl, Map<String, String> params) throws Exception {
+        container = new EmbeddedServletContainer("test");
+        if (servletEndPoint != null) {
+            if (params != null) {
+                container.addServletEndpoint(servletEndPoint, servletClass, params);
+            } else {
+                container.addServletEndpoint(servletEndPoint, servletClass);
+            }
+        }
+        container.start();
+
+        Configuration conf = new Configuration(false);
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, "0");
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "1");
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, container.getServletURL(servletUrl));
+        conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "5000");
+
+        return conf;
+    }
+
+    private void waitForCallbackAndCheckResult(final String expectedResult) {
+        waitFor(5000, new Predicate() {
+            @Override
+            public boolean evaluate() throws Exception {
+                return ("status=" + expectedResult).equals(QueryServlet.lastQueryString);
+            }
+        });
+
+        assertEquals("status="  + expectedResult, QueryServlet.lastQueryString);
+    }
+}

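For quick reference, the behaviour pinned down by TestLauncherAMCallbackNotifier above: the effective number of tries is the configured max attempts, capped at retry attempts + 1; the retry interval is capped at 5000 ms and falls back to 5000 ms for negative values; and the proxy setting accepts host:port with an optional "socks@" prefix (any other prefix falls back to HTTP). A minimal usage sketch assembled only from the API the test exercises (imports as in the test; the callback URL below is a placeholder, not taken from the patch):

    Configuration conf = new Configuration(false);
    // $jobStatus in the URL is replaced with the final status (e.g. SUCCEEDED) before the GET is sent
    conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL,
            "http://oozie-host:11000/oozie/callback?status=$jobStatus");
    conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "1000");

    LauncherAMCallbackNotifier notifier = new LauncherAMCallbackNotifier(conf);
    notifier.notifyURL(OozieActionResult.SUCCEEDED);
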
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java
deleted file mode 100644
index 4cda615..0000000
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.action.hadoop;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.oozie.WorkflowActionBean;
-import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.client.WorkflowAction;
-import org.apache.oozie.service.HadoopAccessorService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.service.WorkflowAppService;
-import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.util.XmlUtils;
-import org.apache.oozie.util.IOUtils;
-import org.jdom.Element;
-
-import java.io.File;
-import java.io.OutputStream;
-import java.io.InputStream;
-import java.io.FileInputStream;
-import java.io.StringReader;
-import java.io.Writer;
-import java.io.OutputStreamWriter;
-
-public class TestMapReduceActionError extends ActionExecutorTestCase {
-
-    @Override
-    protected void setSystemProps() throws Exception {
-        super.setSystemProps();
-        setSystemProperty("oozie.service.ActionService.executor.classes", MapReduceActionExecutor.class.getName());
-    }
-
-    private Context createContext(String actionXml) throws Exception {
-        JavaActionExecutor ae = new JavaActionExecutor();
-
-        Path appJarPath = new Path("lib/test.jar");
-        File jarFile = IOUtils.createJar(new File(getTestCaseDir()), "test.jar", MapperReducerForTest.class);
-        InputStream is = new FileInputStream(jarFile);
-        OutputStream os = getFileSystem().create(new Path(getAppPath(), "lib/test.jar"));
-        IOUtils.copyStream(is, os);
-
-        XConfiguration protoConf = new XConfiguration();
-        protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
-
-        protoConf.setStrings(WorkflowAppService.APP_LIB_PATH_LIST, appJarPath.toString());
-
-        WorkflowJobBean wf = createBaseWorkflow(protoConf, "mr-action");
-        WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0);
-        action.setType(ae.getType());
-        action.setConf(actionXml);
-
-        return new Context(wf, action);
-    }
-
-    private RunningJob submitAction(Context context) throws Exception {
-        MapReduceActionExecutor ae = new MapReduceActionExecutor();
-
-        WorkflowAction action = context.getAction();
-
-        ae.prepareActionDir(getFileSystem(), context);
-        ae.submitLauncher(getFileSystem(), context, action);
-
-        String jobId = action.getExternalId();
-        String jobTracker = action.getTrackerUri();
-        String consoleUrl = action.getConsoleUrl();
-        assertNotNull(jobId);
-        assertNotNull(jobTracker);
-        assertNotNull(consoleUrl);
-
-        Element e = XmlUtils.parseXml(action.getConf());
-        XConfiguration conf =
-                new XConfiguration(new StringReader(XmlUtils.prettyPrint(e.getChild("configuration")).toString()));
-        conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker"));
-        conf.set("fs.default.name", e.getChildTextTrim("name-node"));
-        conf.set("user.name", context.getProtoActionConf().get("user.name"));
-        conf.set("group.name", getTestGroup());
-
-        conf.set("mapreduce.framework.name", "yarn");
-        JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
-        XConfiguration.copy(conf, jobConf);
-        String user = jobConf.get("user.name");
-        String group = jobConf.get("group.name");
-        JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
-        final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
-        assertNotNull(runningJob);
-        return runningJob;
-    }
-
-    private void _testSubmit(String actionXml) throws Exception {
-
-        Context context = createContext(actionXml);
-        final RunningJob launcherJob = submitAction(context);
-        String launcherId = context.getAction().getExternalId();
-        waitFor(60 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-
-        MapReduceActionExecutor ae = new MapReduceActionExecutor();
-        ae.check(context, context.getAction());
-
-        JobConf conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml));
-        String user = conf.get("user.name");
-        String group = conf.get("group.name");
-        JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
-        final RunningJob mrJob = jobClient.getJob(JobID.forName(context.getAction().getExternalId()));
-
-        waitFor(60 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return mrJob.isComplete();
-            }
-        });
-        ae.check(context, context.getAction());
-
-        assertEquals("FAILED/KILLED", context.getAction().getExternalStatus());
-
-        ae.end(context, context.getAction());
-        assertEquals(WorkflowAction.Status.ERROR, context.getAction().getStatus());
-        assertTrue(context.getAction().getErrorMessage().contains("already exists"));
-    }
-
-    public void testMapReduce() throws Exception {
-        FileSystem fs = getFileSystem();
-
-        Path inputDir = new Path(getFsTestCaseDir(), "input");
-        Path outputDir = new Path(getFsTestCaseDir(), "output1");
-
-        Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
-        w.write("dummy\n");
-        w.write("dummy\n");
-        Writer ow = new OutputStreamWriter(fs.create(new Path(outputDir, "data.txt")));
-        ow.write("dummy\n");
-        ow.write("dummy\n");
-        ow.close();
-
-        String actionXml = "<map-reduce>" +
-                "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
-                "<name-node>" + getNameNodeUri() + "</name-node>" +
-                "<configuration>" +
-                "<property><name>mapred.mapper.class</name><value>" + MapperReducerForTest.class.getName() +
-                "</value></property>" +
-                "<property><name>mapred.reducer.class</name><value>" + MapperReducerForTest.class.getName() +
-                "</value></property>" +
-                "<property><name>mapred.input.dir</name><value>" + inputDir + "</value></property>" +
-                "<property><name>mapred.output.dir</name><value>" + outputDir + "</value></property>" +
-                "</configuration>" +
-                "</map-reduce>";
-        _testSubmit(actionXml);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java
index 5bc7d00..551adff 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java
@@ -33,19 +33,12 @@ import java.util.regex.Matcher;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
 import org.apache.oozie.BundleActionBean;
 import org.apache.oozie.BundleJobBean;
 import org.apache.oozie.CoordinatorJobBean;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.action.hadoop.MapReduceActionExecutor;
-import org.apache.oozie.action.hadoop.MapperReducerForTest;
-import org.apache.oozie.action.hadoop.OozieJobInfo;
 import org.apache.oozie.client.Job;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.command.CommandException;
@@ -53,19 +46,16 @@ import org.apache.oozie.command.bundle.BundleStartXCommand;
 import org.apache.oozie.command.bundle.BundleSubmitXCommand;
 import org.apache.oozie.command.wf.ActionXCommand;
 import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
-import org.apache.oozie.command.wf.JobXCommand;
 import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
+import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
 import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobsGetFromCoordParentIdJPAExecutor;
-import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
-import org.apache.oozie.service.HadoopAccessorService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.UUIDService;
-import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.UUIDService.ApplicationType;
 import org.apache.oozie.test.XDataTestCase;
 import org.apache.oozie.util.IOUtils;
@@ -163,14 +153,12 @@ public class TestOozieJobInfo extends XDataTestCase {
         ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfbean, actionList.get(1), false,
                 false);
         MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
-        JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(actionList.get(1).getConf()));
+        Configuration conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(actionList.get(1).getConf()));
         String user = conf.get("user.name");
-        JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
-        String launcherId = actionList.get(1).getExternalId();
 
-        final RunningJob launcherJob = jobClient.getJob(JobID.forName(launcherId));
-        FileSystem fs = context.getAppFileSystem();
-        Configuration jobXmlConf = new XConfiguration(fs.open(new Path(launcherJob.getJobFile())));
+        FileSystem fs = getFileSystem();
+        Configuration jobXmlConf = new XConfiguration(fs.open(getPathToWorkflowResource(
+                user, wfbean, services, context, LauncherAM.LAUNCHER_JOB_CONF_XML)));
         String jobInfo = jobXmlConf.get(OozieJobInfo.JOB_INFO_KEY);
 
         // BUNDLE_ID;BUNDLE_NAME;COORDINATOR_NAME;COORDINATOR_NOMINAL_TIME;
@@ -186,7 +174,6 @@ public class TestOozieJobInfo extends XDataTestCase {
         assertTrue(jobInfo.contains(",testing=test,"));
         assertTrue(jobInfo.contains(",coord.nominal.time="));
         assertTrue(jobInfo.contains("launcher=true"));
-
     }
 
     protected void setCoordConf(Configuration jobConf) throws IOException {

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java
index df9e939..89aeab6 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java
@@ -24,6 +24,9 @@ import org.apache.oozie.service.Services;
 import org.apache.oozie.test.XFsTestCase;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.mapred.JobConf;
+import org.xml.sax.SAXException;
+
+import javax.xml.parsers.ParserConfigurationException;
 
 public class TestPrepareActionsDriver extends XFsTestCase {
 
@@ -40,7 +43,7 @@ public class TestPrepareActionsDriver extends XFsTestCase {
     }
 
     // Test to check if prepare action is performed as expected when the prepare XML block is a valid one
-    public void testDoOperationsWithValidXML() throws LauncherException, IOException {
+    public void testDoOperationsWithValidXML() throws LauncherException, IOException, ParserConfigurationException, SAXException {
         Path actionDir = getFsTestCaseDir();
         FileSystem fs = getFileSystem();
         Path newDir = new Path(actionDir, "newDir");
@@ -52,12 +55,12 @@ public class TestPrepareActionsDriver extends XFsTestCase {
         }
 
         JobConf conf = createJobConf();
-        LauncherMapperHelper.setupLauncherURIHandlerConf(conf);
+        LauncherHelper.setupLauncherURIHandlerConf(conf);
         PrepareActionsDriver.doOperations(prepareXML, conf);
         assertTrue(fs.exists(actionDir));
     }
 
-    // Test to check if LauncherException is thrown when the prepare XML block is invalid
+    // Test to check if an Exception is thrown when the prepare XML block is invalid
     public void testDoOperationsWithInvalidXML() throws LauncherException, IOException {
         Path actionDir = getFsTestCaseDir();
         FileSystem fs = getFileSystem();
@@ -72,14 +75,12 @@ public class TestPrepareActionsDriver extends XFsTestCase {
         try {
             prepareXML = "prepare>" + "<mkdir path='" + newDir + "'/>" + "</prepare>";
             JobConf conf = createJobConf();
-            LauncherMapperHelper.setupLauncherURIHandlerConf(conf);
+            LauncherHelper.setupLauncherURIHandlerConf(conf);
             PrepareActionsDriver.doOperations(prepareXML, conf);
             fail("Expected to catch an exception but did not encounter any");
-        } catch (LauncherException le) {
-            assertEquals(le.getCause().getClass(), org.xml.sax.SAXParseException.class);
-            assertEquals(le.getMessage(), "Content is not allowed in prolog.");
-        } catch(Exception ex){
-            fail("Expected a LauncherException but received an Exception");
+        } catch (Exception ex) {
+            assertEquals(ex.getClass(), org.xml.sax.SAXParseException.class);
+            assertEquals(ex.getMessage(), "Content is not allowed in prolog.");
         }
     }
 }

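For contrast with the intentionally malformed block in testDoOperationsWithInvalidXML above (the leading '<' of <prepare> is dropped), a well-formed prepare block driven through the same call path would look roughly like this sketch (newDir is a test-case HDFS path, as in the test):

    String prepareXML = "<prepare>" + "<mkdir path='" + newDir + "'/>" + "</prepare>";
    JobConf conf = createJobConf();
    LauncherHelper.setupLauncherURIHandlerConf(conf);
    // performs the mkdir via the configured launcher URI handler
    PrepareActionsDriver.doOperations(prepareXML, conf);
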
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java
index f12927b..72be0a2 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java
@@ -26,16 +26,10 @@ import java.util.Map;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.util.Shell;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.client.WorkflowAction;
-import org.apache.oozie.service.ActionService;
-import org.apache.oozie.service.HadoopAccessorService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.WorkflowAppService;
 import org.apache.oozie.util.PropertiesUtils;
@@ -294,14 +288,8 @@ public class TestShellActionExecutor extends ActionExecutorTestCase {
 
         Context context = createContext(actionXml);
         // Submit the action
-        final RunningJob launcherJob = submitAction(context);
-        waitFor(180 * 1000, new Predicate() { // Wait for the external job to
-                    // finish
-                    public boolean evaluate() throws Exception {
-                        return launcherJob.isComplete();
-                    }
-                });
-
+        final String launcherId = submitAction(context);
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
         ShellActionExecutor ae = new ShellActionExecutor();
         WorkflowAction action = context.getAction();
         ae.check(context, action);
@@ -323,24 +311,14 @@ public class TestShellActionExecutor extends ActionExecutorTestCase {
     private WorkflowAction _testSubmit(String actionXml, boolean checkForSuccess, String capture_output) throws Exception {
 
         Context context = createContext(actionXml);
-        final RunningJob launcherJob = submitAction(context);// Submit the
-        // action
-        String launcherId = context.getAction().getExternalId(); // Get LM id
-        waitFor(180 * 1000, new Predicate() { // Wait for the external job to
-                    // finish
-                    public boolean evaluate() throws Exception {
-                        return launcherJob.isComplete();
-                    }
-                });
-        // Thread.sleep(2000);
-        assertTrue(launcherJob.isSuccessful());
-
-        sleep(2000);// Wait more to make sure no ID swap happens
+        final String launcherId = submitAction(context);// Submit the action
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
+
         Configuration conf = new XConfiguration();
         conf.set("user.name", getTestUser());
-        Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+        Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
                 conf);
-        assertFalse(LauncherMapperHelper.hasIdSwap(actionData));
+        assertFalse(LauncherHelper.hasIdSwap(actionData));
 
         ShellActionExecutor ae = new ShellActionExecutor();
         ae.check(context, context.getAction());
@@ -399,14 +377,13 @@ public class TestShellActionExecutor extends ActionExecutorTestCase {
      * @return The RunningJob of the Launcher Mapper
      * @throws Exception
      */
-    private RunningJob submitAction(Context context) throws Exception {
+    private String submitAction(Context context) throws Exception {
         ShellActionExecutor ae = new ShellActionExecutor();
 
         WorkflowAction action = context.getAction();
 
         ae.prepareActionDir(getFileSystem(), context);
-        ae.submitLauncher(getFileSystem(), context, action); // Submit the
-        // Launcher Mapper
+        ae.submitLauncher(getFileSystem(), context, action); // Submit the action
 
         String jobId = action.getExternalId();
         String jobTracker = action.getTrackerUri();
@@ -416,41 +393,6 @@ public class TestShellActionExecutor extends ActionExecutorTestCase {
         assertNotNull(jobTracker);
         assertNotNull(consoleUrl);
 
-        Element e = XmlUtils.parseXml(action.getConf());
-        XConfiguration conf = new XConfiguration();
-        conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker"));
-        conf.set("fs.default.name", e.getChildTextTrim("name-node"));
-        conf.set("user.name", context.getProtoActionConf().get("user.name"));
-        conf.set("group.name", getTestGroup());
-
-        JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
-        XConfiguration.copy(conf, jobConf);
-        String user = jobConf.get("user.name");
-        String group = jobConf.get("group.name");
-        JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
-        final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
-        assertNotNull(runningJob);
-        return runningJob;
-    }
-
-    public void testShellMainPathInUber() throws Exception {
-        Services.get().getConf().setBoolean("oozie.action.shell.launcher.mapreduce.job.ubertask.enable", true);
-
-        Element actionXml = XmlUtils.parseXml("<shell>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>"
-                + "<name-node>" + getNameNodeUri() + "</name-node>" + "<exec>script.sh</exec>"
-                + "<argument>a=A</argument>" + "<argument>b=B</argument>" + "</shell>");
-        ShellActionExecutor ae = new ShellActionExecutor();
-        XConfiguration protoConf = new XConfiguration();
-        protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
-
-        WorkflowJobBean wf = createBaseWorkflow(protoConf, "action");
-        WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0);
-        action.setType(ae.getType());
-
-        Context context = new Context(wf, action);
-        JobConf launcherConf = new JobConf();
-        launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, launcherConf);
-        // env
-        assertEquals("PATH=.:$PATH", launcherConf.get(JavaActionExecutor.YARN_AM_ENV));
+        return jobId;
     }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java
index e757e54..a7d6c18 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java
@@ -25,7 +25,6 @@ import java.io.FileWriter;
 import java.io.Writer;
 import java.util.Properties;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.oozie.util.XConfiguration;
 
 //Test cases are mainly implemented in the Base class
@@ -53,8 +52,8 @@ public class TestShellMain extends ShellTestCase {
 
         jobConf.set(ShellMain.CONF_OOZIE_SHELL_EXEC, SHELL_COMMAND_NAME);
         String[] args = new String[] { SHELL_COMMAND_SCRIPTFILE_OPTION, script.toString(), "A", "B" };
-        MapReduceMain.setStrings(jobConf, ShellMain.CONF_OOZIE_SHELL_ARGS, args);
-        MapReduceMain.setStrings(jobConf, ShellMain.CONF_OOZIE_SHELL_ENVS,
+        ActionUtils.setStrings(jobConf, ShellMain.CONF_OOZIE_SHELL_ARGS, args);
+        ActionUtils.setStrings(jobConf, ShellMain.CONF_OOZIE_SHELL_ENVS,
                 new String[] { "var1=value1", "var2=value2" });
 
         File actionXml = new File(getTestCaseDir(), "action.xml");

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java b/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java
index dbc160f..9c7064b 100644
--- a/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java
+++ b/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java
@@ -135,8 +135,8 @@ public class TestOozieCLI extends DagServletTestCase {
         String path = getTestCaseDir() + "/" + getName() + ".properties";
         Properties props = new Properties();
         props.setProperty(OozieClient.USER_NAME, getTestUser());
-        props.setProperty(XOozieClient.NN, "localhost:9000");
-        props.setProperty(XOozieClient.JT, "localhost:9001");
+        props.setProperty(XOozieClient.NN, "localhost:8020");
+        props.setProperty(XOozieClient.RM, "localhost:8032");
         props.setProperty("oozie.libpath", appPath);
         props.setProperty("mapred.output.dir", appPath);
         props.setProperty("a", "A");
@@ -155,7 +155,7 @@ public class TestOozieCLI extends DagServletTestCase {
         props.setProperty(OozieClient.APP_PATH, appPath);
         props.setProperty(OozieClient.RERUN_SKIP_NODES, "node");
         props.setProperty(XOozieClient.NN, "localhost:9000");
-        props.setProperty(XOozieClient.JT, "localhost:9001");
+        props.setProperty(XOozieClient.RM, "localhost:9001");
         if (useNewAPI) {
             props.setProperty("mapreduce.map.class", "mapper.class");
             props.setProperty("mapreduce.reduce.class", "reducer.class");

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/client/TestWorkflowXClient.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/client/TestWorkflowXClient.java b/core/src/test/java/org/apache/oozie/client/TestWorkflowXClient.java
index 51ae9e8..b4bce60 100644
--- a/core/src/test/java/org/apache/oozie/client/TestWorkflowXClient.java
+++ b/core/src/test/java/org/apache/oozie/client/TestWorkflowXClient.java
@@ -29,8 +29,6 @@ import org.apache.oozie.servlet.MockDagEngineService;
 import org.apache.oozie.servlet.V1JobsServlet;
 import org.apache.oozie.servlet.V1AdminServlet;
 
-import java.io.File;
-
 public class TestWorkflowXClient extends DagServletTestCase {
 
     static {
@@ -60,7 +58,7 @@ public class TestWorkflowXClient extends DagServletTestCase {
                 Path libPath = new Path(getFsTestCaseDir(), "lib");
                 getFileSystem().mkdirs(libPath);
                 conf.setProperty(OozieClient.LIBPATH, libPath.toString());
-                conf.setProperty(XOozieClient.JT, "localhost:9001");
+                conf.setProperty(XOozieClient.RM, "localhost:9001");
                 conf.setProperty(XOozieClient.NN, "hdfs://localhost:9000");
                 String[] params = new String[]{"INPUT=input.txt"};
 
@@ -90,7 +88,7 @@ public class TestWorkflowXClient extends DagServletTestCase {
                 getFileSystem().mkdirs(libPath);
                 System.out.println(libPath.toString());
                 conf.setProperty(OozieClient.LIBPATH, libPath.toString());
-                conf.setProperty(XOozieClient.JT, "localhost:9001");
+                conf.setProperty(XOozieClient.RM, "localhost:9001");
                 conf.setProperty(XOozieClient.NN, "hdfs://localhost:9000");
                 String[] params = new String[]{"NAME=test"};
 
@@ -120,7 +118,7 @@ public class TestWorkflowXClient extends DagServletTestCase {
                 getFileSystem().mkdirs(libPath);
                 System.out.println(libPath.toString());
                 conf.setProperty(OozieClient.LIBPATH, libPath.toString());
-                conf.setProperty(XOozieClient.JT, "localhost:9001");
+                conf.setProperty(XOozieClient.RM, "localhost:9001");
                 conf.setProperty(XOozieClient.NN, "hdfs://localhost:9000");
 
                 assertEquals(MockDagEngineService.JOB_ID + wfCount + MockDagEngineService.JOB_ID_END,
@@ -154,9 +152,9 @@ public class TestWorkflowXClient extends DagServletTestCase {
                     fail("submit client without JT should throw exception");
                 }
                 catch (RuntimeException exception) {
-                    assertEquals("java.lang.RuntimeException: jobtracker is not specified in conf", exception.toString());
+                    assertEquals("java.lang.RuntimeException: Resource manager is not specified in conf", exception.toString());
                 }
-                conf.setProperty(XOozieClient.JT, "localhost:9001");
+                conf.setProperty(XOozieClient.RM, "localhost:9001");
                 try {
                     wc.submitMapReduce(conf);
                     fail("submit client without NN should throw exception");

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
index c071000..b8eb15d 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
@@ -19,8 +19,10 @@
 package org.apache.oozie.command.coord;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.oozie.CoordinatorActionBean;
 import org.apache.oozie.CoordinatorJobBean;
@@ -84,6 +86,11 @@ public class TestCoordChangeXCommand extends XDataTestCase {
 
         @Override
         public synchronized boolean queueSerial(List<? extends XCallable<?>> callables, long delay){return false;}
+
+        @Override
+        public Set<String> getInterruptTypes() {
+            return Collections.emptySet();
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/command/wf/HangServlet.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/HangServlet.java b/core/src/test/java/org/apache/oozie/command/wf/HangServlet.java
index 3344cf9..c1bca16 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/HangServlet.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/HangServlet.java
@@ -18,6 +18,8 @@
 
 package org.apache.oozie.command.wf;
 
+import org.apache.oozie.util.XLog;
+
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
@@ -25,14 +27,27 @@ import javax.servlet.http.HttpServletResponse;
 import java.io.IOException;
 
 /**
- * Servlet that 'hangs' for 200 ms. Used by TestNotificationXCommand
+ * Servlet that 'hangs' for a configurable amount of time (200 ms by default).
+ * The time can be configured by setting {@link HangServlet#SLEEP_TIME_MS} as an init parameter for the servlet.
  */
 public class HangServlet extends HttpServlet {
 
+    public static final String SLEEP_TIME_MS = "sleep_time_ms";
+
     protected void doGet(HttpServletRequest request, HttpServletResponse response)
         throws ServletException, IOException {
         try {
-            Thread.sleep(200);
+            long time = 200;
+            String sleeptime = getInitParameter(SLEEP_TIME_MS);
+            if (sleeptime != null) {
+                try {
+                    time = Long.parseLong(sleeptime);
+                } catch (NumberFormatException nfe) {
+                    XLog.getLog(HangServlet.class).error("Invalid sleep time, using default (200)", nfe);
+                }
+            }
+            XLog.getLog(HangServlet.class).info("Sleeping for {0} ms", time);
+            Thread.sleep(time);
         }
         catch (Exception ex) {
             //NOP

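The HangServlet change above makes the sleep time configurable per test. A small sketch of how it is wired up with the EmbeddedServletContainer helper, mirroring the usage in TestLauncherAMCallbackNotifier (the 10000 ms value is arbitrary):

    Map<String, String> params = new HashMap<String, String>();
    params.put(HangServlet.SLEEP_TIME_MS, "10000");   // each GET will block for ~10 seconds
    EmbeddedServletContainer container = new EmbeddedServletContainer("test");
    container.addServletEndpoint("/hang/*", HangServlet.class, params);
    container.start();
    String url = container.getServletURL("/hang/*"); // point the callback (or any client) at this URL
    container.stop();                                 // tear down when done
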
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
index 5898d1a..a5128a8 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
@@ -23,18 +23,17 @@ import java.io.Writer;
 import java.util.Date;
 import java.util.Map;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
-import org.apache.oozie.ForTestingActionExecutor;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.action.ActionExecutor;
 import org.apache.oozie.action.ActionExecutorException;
-import org.apache.oozie.action.hadoop.LauncherMapperHelper;
+import org.apache.oozie.action.hadoop.LauncherHelper;
 import org.apache.oozie.action.hadoop.MapReduceActionExecutor;
 import org.apache.oozie.action.hadoop.MapperReducerForTest;
 import org.apache.oozie.client.WorkflowAction;
@@ -45,7 +44,6 @@ import org.apache.oozie.executor.jpa.JPAExecutorException;
 import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.service.ActionCheckerService;
 import org.apache.oozie.service.ActionService;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.HadoopAccessorService;
@@ -259,30 +257,25 @@ public class TestActionCheckXCommand extends XDataTestCase {
 
         ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job, action, false, false);
         MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
-        JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action.getConf()));
+        Configuration conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action.getConf()));
         String user = conf.get("user.name");
         JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
 
         String launcherId = action.getExternalId();
 
-        final RunningJob launcherJob = jobClient.getJob(JobID.forName(launcherId));
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
 
-        waitFor(120 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
-        Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+        Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
                 conf);
-        assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
+        assertTrue(LauncherHelper.hasIdSwap(actionData));
 
         new ActionCheckXCommand(action.getId()).call();
         action = jpaService.execute(wfActionGetCmd);
-        String mapperId = action.getExternalId();
+        String externalId = action.getExternalId();
         String childId = action.getExternalChildIDs();
 
-        assertTrue(launcherId.equals(mapperId));
+        assertEquals("LauncherId", launcherId, externalId);
+        assertNotNull(childId);
 
         final RunningJob mrJob = jobClient.getJob(JobID.forName(childId));
 
@@ -297,7 +290,6 @@ public class TestActionCheckXCommand extends XDataTestCase {
         action = jpaService.execute(wfActionGetCmd);
 
         assertEquals("SUCCEEDED", action.getExternalStatus());
-
     }
 
     private static class ErrorCheckActionExecutor extends ActionExecutor {
@@ -416,7 +408,7 @@ public class TestActionCheckXCommand extends XDataTestCase {
         ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job2, action1, false, false);
         WorkflowActionBean action2 = jpaService.execute(wfActionGetCmd);
         MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
-        JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action2.getConf()));
+        Configuration conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action2.getConf()));
         String user = conf.get("user.name");
         JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
 
@@ -434,9 +426,9 @@ public class TestActionCheckXCommand extends XDataTestCase {
             }
         });
         assertTrue(launcherJob.isSuccessful());
-        Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+        Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
                 conf);
-        assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
+        assertTrue(LauncherHelper.hasIdSwap(actionData));
 
         new ActionCheckXCommand(actionId).call();
         WorkflowActionBean action4 = jpaService.execute(wfActionGetCmd);
@@ -488,7 +480,7 @@ public class TestActionCheckXCommand extends XDataTestCase {
 
         ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job0, action1, false, false);
         MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
-        JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action1.getConf()));
+        Configuration conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action1.getConf()));
         String user = conf.get("user.name");
         JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
 
@@ -501,9 +493,9 @@ public class TestActionCheckXCommand extends XDataTestCase {
             }
         });
         assertTrue(launcherJob.isSuccessful());
-        Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+        Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
                 conf);
-        assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
+        assertTrue(LauncherHelper.hasIdSwap(actionData));
 
         new ActionCheckXCommand(action1.getId()).call();
         WorkflowActionBean action2 = jpaService.execute(wfActionGetCmd);
@@ -568,9 +560,9 @@ public class TestActionCheckXCommand extends XDataTestCase {
         });
 
         assertTrue(launcherJob2.isSuccessful());
-        actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+        actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
                 conf);
-        assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
+        assertTrue(LauncherHelper.hasIdSwap(actionData));
 
         new ActionCheckXCommand(actionId).call();
         WorkflowActionBean action4 = jpaService.execute(wfActionGetCmd);

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java
index ea90c08..80c5d54 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java
@@ -28,13 +28,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.action.hadoop.LauncherMapperHelper;
+import org.apache.oozie.action.hadoop.LauncherHelper;
 import org.apache.oozie.action.hadoop.MapReduceActionExecutor;
 import org.apache.oozie.action.hadoop.MapperReducerForTest;
 import org.apache.oozie.client.OozieClient;
@@ -50,7 +46,6 @@ import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQ
 import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
 import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
-import org.apache.oozie.service.HadoopAccessorService;
 import org.apache.oozie.service.InstrumentationService;
 import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.LiteWorkflowStoreService;
@@ -162,23 +157,14 @@ public class TestActionStartXCommand extends XDataTestCase {
 
         ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job, action, false, false);
         MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
-        JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action.getConf()));
-        String user = conf.get("user.name");
-        JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
+        Configuration conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action.getConf()));
 
         String launcherId = action.getExternalId();
 
-        final RunningJob launcherJob = jobClient.getJob(JobID.forName(launcherId));
-
-        waitFor(120 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
-        Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
+        Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
                 conf);
-        assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
+        assertTrue(LauncherHelper.hasIdSwap(actionData));
     }
 
     public void testActionStartToCheckRetry() throws Exception {
@@ -238,26 +224,15 @@ public class TestActionStartXCommand extends XDataTestCase {
 
         ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job, action, false, false);
         MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
-        JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action.getConf()));
+        Configuration conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action.getConf()));
         String user = conf.get("user.name");
-        JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
 
         String launcherId = action.getExternalId();
 
-        // retrieve launcher job
-        final RunningJob launcherJob = jobClient.getJob(JobID.forName(launcherId));
-
-        // time out after 120 seconds unless launcher job succeeds
-        waitFor(240 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        // check if launcher job succeeds
-        assertTrue(launcherJob.isSuccessful());
-        Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
+        Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
                 conf);
-        assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
+        assertTrue(LauncherHelper.hasIdSwap(actionData));
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitHiveXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitHiveXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitHiveXCommand.java
index 43edf5e..98c94a7 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitHiveXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitHiveXCommand.java
@@ -21,7 +21,7 @@ package org.apache.oozie.command.wf;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.local.LocalOozie;
-import org.apache.oozie.action.hadoop.MapReduceMain;
+import org.apache.oozie.action.hadoop.ActionUtils;
 import org.apache.oozie.client.XOozieClient;
 import org.apache.oozie.test.XFsTestCase;
 import org.apache.oozie.util.XLog;
@@ -45,7 +45,7 @@ public class TestSubmitHiveXCommand extends XFsTestCase {
     public void testWFXmlGeneration() throws Exception {
         Configuration conf = new Configuration();
 
-        conf.set(XOozieClient.JT, "jobtracker");
+        conf.set(XOozieClient.RM, "jobtracker");
         conf.set(XOozieClient.NN, "namenode");
         conf.set(OozieClient.LIBPATH, "libpath");
 
@@ -54,9 +54,9 @@ public class TestSubmitHiveXCommand extends XFsTestCase {
 
         String hiveArgsStr = "-a aaa -b bbb -c ccc -M -Da=aaa -Db=bbb -param input=abc";
         String[] args = hiveArgsStr.split(" ");
-        MapReduceMain.setStrings(conf, XOozieClient.HIVE_OPTIONS, args);
+        ActionUtils.setStrings(conf, XOozieClient.HIVE_OPTIONS, args);
         String[] params = new String[]{"INPUT=/some/path", "OUTPUT=/some/other/path", "abc=xyz"};
-        MapReduceMain.setStrings(conf, XOozieClient.HIVE_SCRIPT_PARAMS, params);
+        ActionUtils.setStrings(conf, XOozieClient.HIVE_SCRIPT_PARAMS, params);
 
         SubmitHiveXCommand submitHiveCmd = new SubmitHiveXCommand(conf);
         String xml = submitHiveCmd.getWorkflowXml(conf);

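The XOozieClient.JT property gives way to XOozieClient.RM throughout these proxy-submit tests, and the string-array helper moves from MapReduceMain to ActionUtils; the same substitution appears in the Pig and Sqoop command tests below. Purely as an illustration of what a setStrings-style helper can do, a hypothetical version that stores a count plus indexed keys is sketched here; this is an assumption about plausible behaviour, not the actual ActionUtils code.

    import org.apache.hadoop.conf.Configuration;

    // Illustrative only: one plausible way to persist a String[] in a Configuration.
    public final class StringArrayConfSketch {

        public static void setStrings(Configuration conf, String key, String[] values) {
            conf.setInt(key + ".size", values.length);          // number of entries
            for (int i = 0; i < values.length; i++) {
                conf.set(key + "." + i, values[i]);              // one key per entry
            }
        }

        public static String[] getStrings(Configuration conf, String key) {
            int size = conf.getInt(key + ".size", 0);
            String[] values = new String[size];
            for (int i = 0; i < size; i++) {
                values[i] = conf.get(key + "." + i);
            }
            return values;
        }
    }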
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitMRXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitMRXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitMRXCommand.java
index 5bc5747..388ff94 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitMRXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitMRXCommand.java
@@ -51,7 +51,7 @@ public class TestSubmitMRXCommand extends XFsTestCase {
     public void testWFXmlGeneration() throws Exception {
         Configuration conf = new Configuration(false);
 
-        conf.set(XOozieClient.JT, "jobtracker");
+        conf.set(XOozieClient.RM, "jobtracker");
         conf.set(XOozieClient.NN, "namenode");
         conf.set(OozieClient.LIBPATH, "libpath");
 
@@ -97,7 +97,7 @@ public class TestSubmitMRXCommand extends XFsTestCase {
     public void testWFXmlGenerationNegative1() throws Exception {
         Configuration conf = new Configuration();
 
-        conf.set(XOozieClient.JT, "jobtracker");
+        conf.set(XOozieClient.RM, "jobtracker");
         conf.set(XOozieClient.NN, "namenode");
         // conf.set(XOozieClient.LIBPATH, "libpath");
 
@@ -118,8 +118,8 @@ public class TestSubmitMRXCommand extends XFsTestCase {
     public void testWFXmlGenerationNewConfigProps() throws Exception {
         try {
             Configuration conf = new Configuration(false);
-            conf.set(XOozieClient.NN_2, "new_NN");
-            conf.set(XOozieClient.JT_2, "new_JT");
+            conf.set(XOozieClient.NN, "new_NN");
+            conf.set(XOozieClient.RM, "new_JT");
             conf.set("mapred.mapper.class", "TestMapper");
             conf.set("mapred.reducer.class", "TestReducer");
             conf.set("mapred.input.dir", "testInput");

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitPigXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitPigXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitPigXCommand.java
index 5a1de25..c3cd1aa 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitPigXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitPigXCommand.java
@@ -21,7 +21,7 @@ package org.apache.oozie.command.wf;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.local.LocalOozie;
-import org.apache.oozie.action.hadoop.MapReduceMain;
+import org.apache.oozie.action.hadoop.ActionUtils;
 import org.apache.oozie.client.XOozieClient;
 import org.apache.oozie.test.XFsTestCase;
 import org.apache.oozie.util.XLog;
@@ -46,7 +46,7 @@ public class TestSubmitPigXCommand extends XFsTestCase {
     public void testWFXmlGeneration1() throws Exception {
         Configuration conf = new Configuration();
 
-        conf.set(XOozieClient.JT, "jobtracker");
+        conf.set(XOozieClient.RM, "jobtracker");
         conf.set(XOozieClient.NN, "namenode");
         conf.set(OozieClient.LIBPATH, "libpath");
 
@@ -55,9 +55,9 @@ public class TestSubmitPigXCommand extends XFsTestCase {
 
         String pigArgsStr = "-a aaa -b bbb -c ccc -M -Da=aaa -Db=bbb -param input=abc";
         String[] args = pigArgsStr.split(" ");
-        MapReduceMain.setStrings(conf, XOozieClient.PIG_OPTIONS, args);
+        ActionUtils.setStrings(conf, XOozieClient.PIG_OPTIONS, args);
         String[] params = new String[]{"INPUT=/some/path", "OUTPUT=/some/other/path", "abc=xyz"};
-        MapReduceMain.setStrings(conf, XOozieClient.PIG_SCRIPT_PARAMS, params);
+        ActionUtils.setStrings(conf, XOozieClient.PIG_SCRIPT_PARAMS, params);
 
         SubmitPigXCommand submitPigCmd = new SubmitPigXCommand(conf);
         String xml = submitPigCmd.getWorkflowXml(conf);
@@ -118,7 +118,7 @@ public class TestSubmitPigXCommand extends XFsTestCase {
     public void testWFXmlGeneration2() throws Exception {
         Configuration conf = new Configuration();
 
-        conf.set(XOozieClient.JT, "jobtracker");
+        conf.set(XOozieClient.RM, "jobtracker");
         conf.set(XOozieClient.NN, "namenode");
         conf.set(OozieClient.LIBPATH, "libpath");
 
@@ -128,7 +128,7 @@ public class TestSubmitPigXCommand extends XFsTestCase {
         String[] args = new String[2];
         args[0] = "-a";
         args[1] = "aaa bbb";
-        MapReduceMain.setStrings(conf, XOozieClient.PIG_OPTIONS, args);
+        ActionUtils.setStrings(conf, XOozieClient.PIG_OPTIONS, args);
 
         SubmitPigXCommand submitPigCmd = new SubmitPigXCommand(conf);
         String xml = submitPigCmd.getWorkflowXml(conf);
@@ -169,7 +169,7 @@ public class TestSubmitPigXCommand extends XFsTestCase {
     public void testWFXmlGenerationNegative1() throws Exception {
         Configuration conf = new Configuration();
 
-        conf.set(XOozieClient.JT, "jobtracker");
+        conf.set(XOozieClient.RM, "jobtracker");
         conf.set(XOozieClient.NN, "namenode");
         // conf.set(XOozieClient.LIBPATH, "libpath");
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitSqoopXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitSqoopXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitSqoopXCommand.java
index 49b5028..f2f248a 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitSqoopXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitSqoopXCommand.java
@@ -21,7 +21,7 @@ package org.apache.oozie.command.wf;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.client.OozieClient;
 import org.apache.oozie.local.LocalOozie;
-import org.apache.oozie.action.hadoop.MapReduceMain;
+import org.apache.oozie.action.hadoop.ActionUtils;
 import org.apache.oozie.client.XOozieClient;
 import org.apache.oozie.test.XFsTestCase;
 import org.apache.oozie.util.XLog;
@@ -46,7 +46,7 @@ public class TestSubmitSqoopXCommand extends XFsTestCase {
     public void testWFXmlGeneration() throws Exception {
         Configuration conf = new Configuration();
 
-        conf.set(XOozieClient.JT, "jobtracker");
+        conf.set(XOozieClient.RM, "jobtracker");
         conf.set(XOozieClient.NN, "namenode");
         conf.set(OozieClient.LIBPATH, "libpath");
 
@@ -54,7 +54,7 @@ public class TestSubmitSqoopXCommand extends XFsTestCase {
 
         String sqoopArgsStr = "-Da=aaa -Db=bbb";
         String[] args = sqoopArgsStr.split(" ");
-        MapReduceMain.setStrings(conf, XOozieClient.SQOOP_OPTIONS, args);
+        ActionUtils.setStrings(conf, XOozieClient.SQOOP_OPTIONS, args);
 
         SubmitSqoopXCommand submitSqoopCmd = new SubmitSqoopXCommand(conf);
         String xml = submitSqoopCmd.getWorkflowXml(conf);

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
index 72f0114..ef75f14 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
@@ -18,19 +18,18 @@
 
 package org.apache.oozie.command.wf;
 
-import java.io.StringReader;
 import java.net.URI;
 import java.util.Date;
+import java.util.Set;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.examples.SleepJob;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.JobStatus;
-import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.action.hadoop.LauncherMain;
 import org.apache.oozie.action.hadoop.MapperReducerForTest;
 import org.apache.oozie.client.WorkflowAction;
 import org.apache.oozie.client.WorkflowJob;
@@ -42,11 +41,10 @@ import org.apache.oozie.service.JPAService;
 import org.apache.oozie.service.Services;
 import org.apache.oozie.service.UUIDService;
 import org.apache.oozie.test.XDataTestCase;
-import org.apache.oozie.test.XTestCase.Predicate;
-import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.util.XmlUtils;
 import org.apache.oozie.workflow.WorkflowInstance;
 
+import com.google.common.collect.Sets;
+
 public class TestWorkflowActionKillXCommand extends XDataTestCase {
     private Services services;
 
@@ -117,26 +115,6 @@ public class TestWorkflowActionKillXCommand extends XDataTestCase {
         assertEquals(action.getExternalStatus(), "RUNNING");
     }
 
-    public void testWfActionKillChildJob() throws Exception {
-        String externalJobID = launchSleepJob(1000);
-        String childId = launchSleepJob(1000000);
-
-        WorkflowJobBean job = this.addRecordToWfJobTable(WorkflowJob.Status.KILLED, WorkflowInstance.Status.KILLED);
-        WorkflowActionBean action = this.addRecordToWfActionTable(job.getId(), externalJobID, "1",
-                WorkflowAction.Status.KILLED, childId);
-
-        new ActionKillXCommand(action.getId()).call();
-        JobClient jobClient = createJobClient();
-
-        final RunningJob mrJob = jobClient.getJob(JobID.forName(childId));
-        waitFor(60 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return mrJob.isComplete();
-            }
-        });
-        assertEquals(mrJob.getJobState(), JobStatus.KILLED);
-    }
-
     protected WorkflowActionBean addRecordToWfActionTable(String wfId, String externalJobID, String actionName,
             WorkflowAction.Status status, String childID) throws Exception {
         WorkflowActionBean action = new WorkflowActionBean();
@@ -189,9 +167,18 @@ public class TestWorkflowActionKillXCommand extends XDataTestCase {
         SleepJob sleepjob = new SleepJob();
         sleepjob.setConf(jobConf);
         jobConf = sleepjob.setupJobConf(1, 1, sleep, 1, sleep, 1);
+        jobConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, "sleepjob");
+        jobConf.set(LauncherMain.MAPREDUCE_JOB_TAGS, "sleepjob");
+        System.setProperty(LauncherMain.OOZIE_JOB_LAUNCH_TIME, String.valueOf(System.currentTimeMillis()));
+
+        jobClient.submitJob(jobConf);
+        Set<ApplicationId> apps = Sets.newHashSet();
+        apps = LauncherMain.getChildYarnJobs(jobConf, ApplicationsRequestScope.ALL);
+        assertEquals("Number of YARN apps", apps.size(), 1);
+
+        sleepjob.close();
 
-        final RunningJob runningJob = jobClient.submitJob(jobConf);
-        return runningJob.getID().toString();
+        return apps.iterator().next().toString();
     }
 
 }

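Instead of holding on to a RunningJob handle, the rewritten launchSleepJob() tags the submitted job and asks LauncherMain.getChildYarnJobs() for the matching YARN applications. A minimal sketch of tag-based lookup through YarnClient is shown below, assuming a client-side scan of ApplicationReports; the real helper may filter on the ResourceManager side and also honour the recorded launch time.

    import java.util.HashSet;
    import java.util.Set;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.api.records.ApplicationId;
    import org.apache.hadoop.yarn.api.records.ApplicationReport;
    import org.apache.hadoop.yarn.client.api.YarnClient;

    // Illustrative only: collects the ids of YARN applications carrying a given tag.
    public final class TaggedAppLookupSketch {

        public static Set<ApplicationId> findAppsWithTag(Configuration conf, String tag) throws Exception {
            Set<ApplicationId> result = new HashSet<ApplicationId>();
            YarnClient yarnClient = YarnClient.createYarnClient();
            yarnClient.init(conf);
            yarnClient.start();
            try {
                for (ApplicationReport report : yarnClient.getApplications()) {
                    if (report.getApplicationTags().contains(tag)) {
                        result.add(report.getApplicationId());
                    }
                }
            }
            finally {
                yarnClient.stop();
            }
            return result;
        }
    }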
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java b/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java
index 3c6525d..5957ad6 100644
--- a/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java
@@ -19,7 +19,7 @@
 package org.apache.oozie.service;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.oozie.action.hadoop.CredentialsProvider;
+import org.apache.oozie.action.hadoop.CredentialsProviderFactory;
 import org.apache.oozie.action.hadoop.DistcpActionExecutor;
 import org.apache.oozie.action.hadoop.JavaActionExecutor;
 import org.apache.oozie.action.hadoop.LauncherMapper;
@@ -210,13 +210,10 @@ public class TestConfigurationService extends XTestCase {
 
         assertEquals(2048, ConfigurationService.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA));
         assertEquals("http://0.0.0.0:11000/oozie?job=", ConfigurationService.get(JobXCommand.CONF_CONSOLE_URL));
-        assertEquals(true, ConfigurationService.getBoolean(JavaActionExecutor.CONF_HADOOP_YARN_UBER_MODE));
-        assertEquals(false, ConfigurationService.getBoolean(
-                "oozie.action.shell.launcher." + JavaActionExecutor.HADOOP_YARN_UBER_MODE));
         assertEquals(false, ConfigurationService.getBoolean(HadoopAccessorService.KERBEROS_AUTH_ENABLED));
 
         assertEquals(0, ConfigurationService.getStrings("no.defined").length);
-        assertEquals(0, ConfigurationService.getStrings(CredentialsProvider.CRED_KEY).length);
+        assertEquals(0, ConfigurationService.getStrings(CredentialsProviderFactory.CRED_KEY).length);
         assertEquals(1, ConfigurationService.getStrings(DistcpActionExecutor.CLASS_NAMES).length);
         assertEquals("distcp=org.apache.hadoop.tools.DistCp",
                 ConfigurationService.getStrings(DistcpActionExecutor.CLASS_NAMES)[0]);

http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java b/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java
index bbca479..a1ee004 100644
--- a/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java
@@ -18,7 +18,15 @@
 
 package org.apache.oozie.service;
 
-import org.apache.oozie.test.XTestCase;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.oozie.test.XFsTestCase;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.fs.FileSystem;
@@ -34,7 +42,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.util.XConfiguration;
 
-public class TestHadoopAccessorService extends XTestCase {
+public class TestHadoopAccessorService extends XFsTestCase {
 
     protected void setUp() throws Exception {
         super.setUp();
@@ -140,49 +148,83 @@ public class TestHadoopAccessorService extends XTestCase {
          */
         assertEquals("100", conf.get("action.testprop"));
         assertEquals("1", conf.get("default.testprop"));
-
-        // Check that properties load correctly
         assertEquals("org.apache.log4j.ConsoleAppender", conf.get("log4j.appender.oozie"));
         assertEquals("NONE, null", conf.get("log4j.logger.a"));
+    }
+
+    public void testCreateJobClient() throws Exception {
+        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
+        JobConf conf = has.createJobConf(getJobTrackerUri());
+
+        JobClient jc = has.createJobClient(getTestUser(), conf);
+        assertNotNull(jc);
+        jc.getAllJobs();
 
+        JobConf conf2 = new JobConf(false);
+        conf2.set("mapred.job.tracker", getJobTrackerUri());
+        try {
+            has.createJobClient(getTestUser(), conf2);
+            fail("Should have thrown exception because Configuration not created by HadoopAccessorService");
+        }
+        catch (HadoopAccessorException ex) {
+            assertEquals(ErrorCode.E0903, ex.getErrorCode());
+        }
     }
 
-    public void testAccessor() throws Exception {
-        Services services = Services.get();
-        HadoopAccessorService has = services.get(HadoopAccessorService.class);
+    public void testCreateYarnClient() throws Exception {
+        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
         JobConf conf = has.createJobConf(getJobTrackerUri());
-        conf.set("mapred.job.tracker", getJobTrackerUri());
-        conf.set("fs.default.name", getNameNodeUri());
 
-        URI uri = new URI(getNameNodeUri());
+        YarnClient yc = has.createYarnClient(getTestUser(), conf);
+        assertNotNull(yc);
+        yc.getApplications();
 
-        //valid user
-        String user = getTestUser();
-        String group = getTestGroup();
+        try {
+            yc = has.createYarnClient("invalid-user", conf);
+            assertNotNull(yc);
+            yc.getApplications();
+            fail("Should have thrown exception because not allowed to impersonate 'invalid-user'");
+        }
+        catch (AuthorizationException ex) {
+        }
 
-        JobClient jc = has.createJobClient(user, conf);
-        assertNotNull(jc);
-        FileSystem fs = has.createFileSystem(user, new URI(getNameNodeUri()), conf);
-        assertNotNull(fs);
-        fs = has.createFileSystem(user, uri, conf);
-        assertNotNull(fs);
+        JobConf conf2 = new JobConf(false);
+        conf2.set("yarn.resourcemanager.address", getJobTrackerUri());
+        try {
+            has.createYarnClient(getTestUser(), conf2);
+            fail("Should have thrown exception because Configuration not created by HadoopAccessorService");
+        }
+        catch (HadoopAccessorException ex) {
+            assertEquals(ErrorCode.E0903, ex.getErrorCode());
+        }
+    }
 
-        //invalid user
+    public void testCreateFileSystem() throws Exception {
+        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
+        JobConf conf = has.createJobConf(getJobTrackerUri());
 
-        user = "invalid";
+        FileSystem fs = has.createFileSystem(getTestUser(), new URI(getNameNodeUri()), conf);
+        assertNotNull(fs);
+        fs.exists(new Path(getNameNodeUri(), "/foo"));
 
         try {
-            has.createJobClient(user, conf);
-            fail();
+            fs = has.createFileSystem("invalid-user", new URI(getNameNodeUri()), conf);
+            assertNotNull(fs);
+            fs.exists(new Path(getNameNodeUri(), "/foo"));
+            fail("Should have thrown exception because not allowed to impersonate 'invalid-user'");
         }
-        catch (Throwable ex) {
+        catch (RemoteException ex) {
+            assertEquals(AuthorizationException.class.getName(), ex.getClassName());
         }
 
+        JobConf conf2 = new JobConf(false);
+        conf2.set("fs.default.name", getNameNodeUri());
         try {
-            has.createFileSystem(user, uri, conf);
-            fail();
+            has.createFileSystem(getTestUser(), new URI(getNameNodeUri()), conf2);
+            fail("Should have thrown exception because Configuration not created by HadoopAccessorService");
         }
-        catch (Throwable ex) {
+        catch (HadoopAccessorException ex) {
+            assertEquals(ErrorCode.E0903, ex.getErrorCode());
         }
     }
 
@@ -190,7 +232,7 @@ public class TestHadoopAccessorService extends XTestCase {
         HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
         JobConf jobConf = new JobConf(false);
         assertEquals(new Text("oozie mr token"), has.getMRTokenRenewerInternal(jobConf));
-        jobConf.set("mapred.job.tracker", "localhost:50300");
+        jobConf.set("yarn.resourcemanager.address", "localhost:50300");
         jobConf.set("mapreduce.jobtracker.kerberos.principal", "mapred/_HOST@KDC.DOMAIN.COM");
         assertEquals(new Text("mapred/localhost@KDC.DOMAIN.COM"), has.getMRTokenRenewerInternal(jobConf));
         jobConf = new JobConf(false);
@@ -298,4 +340,21 @@ public class TestHadoopAccessorService extends XTestCase {
         }
         has.destroy();
     }
+
+    public void testCreateLocalResourceForConfigurationFile() throws Exception {
+        HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
+        String filename = "foo.xml";
+        Configuration conf = has.createJobConf(getNameNodeUri());
+        conf.set("foo", "bar");
+        LocalResource lRes = has.createLocalResourceForConfigurationFile(filename, getTestUser(), conf, getFileSystem().getUri(),
+                getFsTestCaseDir());
+        assertNotNull(lRes);
+        assertEquals(LocalResourceType.FILE, lRes.getType());
+        assertEquals(LocalResourceVisibility.APPLICATION, lRes.getVisibility());
+        Path resPath = ConverterUtils.getPathFromYarnURL(lRes.getResource());
+        assertEquals(new Path(getFsTestCaseDir(), "foo.xml"), resPath);
+        Configuration conf2 = new Configuration(false);
+        conf2.addResource(getFileSystem().open(resPath));
+        assertEquals("bar", conf2.get("foo"));
+    }
 }

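testCreateLocalResourceForConfigurationFile() checks that the service writes the Configuration to the target path and hands back a FILE-type, APPLICATION-visibility LocalResource pointing at it. A minimal sketch of building such a LocalResource for a file that already exists on HDFS is given below; the class and method names are made up for illustration, and the write step performed by HadoopAccessorService is deliberately left out.

    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.yarn.api.records.LocalResource;
    import org.apache.hadoop.yarn.api.records.LocalResourceType;
    import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
    import org.apache.hadoop.yarn.util.ConverterUtils;

    // Illustrative only: builds a LocalResource for a file already present on HDFS.
    public final class LocalResourceSketch {

        public static LocalResource forExistingFile(FileSystem fs, Path path) throws Exception {
            FileStatus status = fs.getFileStatus(path);
            return LocalResource.newInstance(
                    ConverterUtils.getYarnUrlFromPath(path),   // YARN URL for the file
                    LocalResourceType.FILE,
                    LocalResourceVisibility.APPLICATION,
                    status.getLen(),                           // size and timestamp are used by the
                    status.getModificationTime());             // NodeManager to validate the resource
        }
    }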
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
index 8fd0c2d..ce04c6d 100644
--- a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
@@ -21,10 +21,6 @@ package org.apache.oozie.service;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
 import org.apache.oozie.BundleActionBean;
 import org.apache.oozie.BundleJobBean;
 import org.apache.oozie.CoordinatorActionBean;
@@ -34,7 +30,7 @@ import org.apache.oozie.DagEngine;
 import org.apache.oozie.ForTestingActionExecutor;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.action.hadoop.LauncherMapperHelper;
+import org.apache.oozie.action.hadoop.LauncherHelper;
 import org.apache.oozie.action.hadoop.MapReduceActionExecutor;
 import org.apache.oozie.action.hadoop.MapperReducerForTest;
 import org.apache.oozie.client.CoordinatorAction;
@@ -249,24 +245,14 @@ public class TestRecoveryService extends XDataTestCase {
 
         ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job1, action1, false, false);
         MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
-        JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action1.getConf()));
-        String user = conf.get("user.name");
-        String group = conf.get("group.name");
-        JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
-
+        Configuration conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action1.getConf()));
         String launcherId = action1.getExternalId();
 
-        final RunningJob launcherJob = jobClient.getJob(JobID.forName(launcherId));
+        waitUntilYarnAppDoneAndAssertSuccess(launcherId);
 
-        waitFor(240 * 1000, new Predicate() {
-            public boolean evaluate() throws Exception {
-                return launcherJob.isComplete();
-            }
-        });
-        assertTrue(launcherJob.isSuccessful());
-        Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+        Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
                 conf);
-        assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
+        assertTrue(LauncherHelper.hasIdSwap(actionData));
     }
 
     /**
@@ -274,10 +260,8 @@ public class TestRecoveryService extends XDataTestCase {
      * @throws Exception
      */
     public void testBundleRecoveryCoordCreate() throws Exception {
-        final BundleActionBean bundleAction;
-        final BundleJobBean bundle;
-        bundle = addRecordToBundleJobTable(Job.Status.RUNNING, false);
-        bundleAction = addRecordToBundleActionTable(bundle.getId(), "coord1", 1, Job.Status.PREP);
+        final BundleJobBean bundle = addRecordToBundleJobTable(Job.Status.RUNNING, false);
+        addRecordToBundleActionTable(bundle.getId(), "coord1", 1, Job.Status.PREP);
         final JPAService jpaService = Services.get().get(JPAService.class);
 
         sleep(3000);
@@ -290,7 +274,7 @@ public class TestRecoveryService extends XDataTestCase {
                         jpaService.execute(new BundleActionGetJPAExecutor(bundle.getId(), "coord1"));
                 try {
                     if (mybundleAction.getCoordId() != null) {
-                        CoordinatorJobBean coord = jpaService.execute(new CoordJobGetJPAExecutor(mybundleAction.getCoordId()));
+                        jpaService.execute(new CoordJobGetJPAExecutor(mybundleAction.getCoordId()));
                         return true;
                     }
                 } catch (Exception e) {
@@ -345,12 +329,11 @@ public class TestRecoveryService extends XDataTestCase {
      * @throws Exception
      */
     public void testBundleRecoveryCoordExists() throws Exception {
-        final BundleActionBean bundleAction;
         final BundleJobBean bundle;
         final CoordinatorJob coord;
         bundle = addRecordToBundleJobTable(Job.Status.RUNNING, false);
         coord = addRecordToCoordJobTable(Job.Status.PREP, false, false);
-        bundleAction = addRecordToBundleActionTable(bundle.getId(), coord.getId(), "coord1", 1, Job.Status.PREP);
+        addRecordToBundleActionTable(bundle.getId(), coord.getId(), "coord1", 1, Job.Status.PREP);
         final JPAService jpaService = Services.get().get(JPAService.class);
 
         sleep(3000);