You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ge...@apache.org on 2017/05/26 09:27:57 UTC
[05/10] oozie git commit: OOZIE-1770 Create Oozie Application Master
for YARN (asasvari, pbacsko, rkanter, gezapeti)
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java
new file mode 100644
index 0000000..1f7e5b2
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestLauncherAMCallbackNotifier.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.oozie.QueryServlet;
+import org.apache.oozie.action.hadoop.LauncherAM.OozieActionResult;
+import org.apache.oozie.command.wf.HangServlet;
+import org.apache.oozie.test.EmbeddedServletContainer;
+import org.apache.oozie.test.XTestCase;
+import org.junit.Assert;
+import org.mockito.Mockito;
+
+import java.net.Proxy;
+import java.util.HashMap;
+import java.util.Map;
+
+// A lot of this adapted from org.apache.hadoop.mapreduce.v2.app.TestJobEndNotifier and org.apache.hadoop.mapred.TestJobEndNotifier
+public class TestLauncherAMCallbackNotifier extends XTestCase {
+ private EmbeddedServletContainer container;
+
+ @Override
+ public void setUp() throws Exception {
+ super.setUp();
+ QueryServlet.lastQueryString = null;
+ }
+
+ @Override
+ public void tearDown() throws Exception {
+ if (container != null) {
+ container.stop();
+ }
+
+ super.tearDown();
+ }
+
+ public void testConfiguration() throws Exception {
+ Configuration conf = new Configuration(false);
+
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "0");
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, "10");
+ LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf);
+ assertEquals(0, cn.numTries);
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "1");
+ cn = new LauncherAMCallbackNotifier(conf);
+ assertEquals(1, cn.numTries);
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "20");
+ cn = new LauncherAMCallbackNotifier(conf);
+ assertEquals(11, cn.numTries); //11 because number of _retries_ is 10
+
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "1000");
+ cn = new LauncherAMCallbackNotifier(conf);
+ assertEquals(1000, cn.waitInterval);
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "10000");
+ cn = new LauncherAMCallbackNotifier(conf);
+ assertEquals(5000, cn.waitInterval);
+ //Test negative numbers are set to default
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "-10");
+ cn = new LauncherAMCallbackNotifier(conf);
+ assertEquals(5000, cn.waitInterval);
+
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_TIMEOUT, "1000");
+ cn = new LauncherAMCallbackNotifier(conf);
+ assertEquals(1000, cn.timeout);
+
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "somehost");
+ cn = new LauncherAMCallbackNotifier(conf);
+ assertEquals(Proxy.Type.DIRECT, cn.proxyToUse.type());
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "somehost:someport");
+ cn = new LauncherAMCallbackNotifier(conf);
+ assertEquals(Proxy.Type.DIRECT, cn.proxyToUse.type());
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "somehost:1000");
+ cn = new LauncherAMCallbackNotifier(conf);
+ assertEquals("HTTP @ somehost:1000", cn.proxyToUse.toString());
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "socks@somehost:1000");
+ cn = new LauncherAMCallbackNotifier(conf);
+ assertEquals("SOCKS @ somehost:1000", cn.proxyToUse.toString());
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "SOCKS@somehost:1000");
+ cn = new LauncherAMCallbackNotifier(conf);
+ assertEquals("SOCKS @ somehost:1000", cn.proxyToUse.toString());
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_PROXY, "sfafn@somehost:1000");
+ cn = new LauncherAMCallbackNotifier(conf);
+ assertEquals("HTTP @ somehost:1000", cn.proxyToUse.toString());
+ }
+
+ public void testNotifyRetries() throws InterruptedException {
+ Configuration conf = new Configuration(false);
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, "0");
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "1");
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, "http://nonexistent");
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "5000");
+
+ LauncherAMCallbackNotifier cnSpy = Mockito.spy(new LauncherAMCallbackNotifier(conf));
+
+ long start = System.currentTimeMillis();
+ cnSpy.notifyURL(OozieActionResult.SUCCEEDED);
+ long end = System.currentTimeMillis();
+ Mockito.verify(cnSpy, Mockito.times(1)).notifyURLOnce();
+ Assert.assertTrue("Should have taken more than 5 seconds but it only took " + (end - start), end - start >= 5000);
+
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "3");
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, "3");
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "3000");
+
+ cnSpy = Mockito.spy(new LauncherAMCallbackNotifier(conf));
+ start = System.currentTimeMillis();
+ cnSpy.notifyURL(OozieActionResult.SUCCEEDED);
+ end = System.currentTimeMillis();
+ Mockito.verify(cnSpy, Mockito.times(3)).notifyURLOnce();
+ Assert.assertTrue("Should have taken more than 9 seconds but it only took " + (end - start), end - start >= 9000);
+ }
+
+ public void testNotifyTimeout() throws Exception {
+ Map<String, String> params = new HashMap<String, String>();
+ params.put(HangServlet.SLEEP_TIME_MS, "1000000");
+ Configuration conf = setupEmbeddedContainer(HangServlet.class, "/hang/*", "/hang/*", params);
+
+ LauncherAMCallbackNotifier cnSpy = Mockito.spy(new LauncherAMCallbackNotifier(conf));
+ long start = System.currentTimeMillis();
+ cnSpy.notifyURL(OozieActionResult.SUCCEEDED);
+ long end = System.currentTimeMillis();
+ Mockito.verify(cnSpy, Mockito.times(1)).notifyURLOnce();
+ Assert.assertTrue("Should have taken more than 5 seconds but it only took " + (end - start), end - start >= 5000);
+ }
+
+ public void testNotify() throws Exception {
+ Configuration conf = setupEmbeddedContainer(QueryServlet.class, "/count/*", "/count/?status=$jobStatus", null);
+
+ LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf);
+
+ assertNull(QueryServlet.lastQueryString);
+ cn.notifyURL(OozieActionResult.SUCCEEDED);
+ waitForCallbackAndCheckResult(FinalApplicationStatus.SUCCEEDED.toString());
+ }
+
+ public void testNotifyBackgroundActionWhenSubmitSucceeds() throws Exception {
+ Configuration conf = setupEmbeddedContainer(QueryServlet.class, "/count/*", "/count/?status=$jobStatus", null);
+
+ LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf);
+
+ assertNull(QueryServlet.lastQueryString);
+ cn.notifyURL(OozieActionResult.RUNNING);
+ waitForCallbackAndCheckResult(OozieActionResult.RUNNING.toString());
+ }
+
+ public void testNotifyBackgroundActionWhenSubmitFailsWithFailed() throws Exception {
+ Configuration conf = setupEmbeddedContainer(QueryServlet.class, "/count/*", "/count/?status=$jobStatus", null);
+
+ LauncherAMCallbackNotifier cn = new LauncherAMCallbackNotifier(conf);
+
+ assertNull(QueryServlet.lastQueryString);
+ cn.notifyURL(OozieActionResult.FAILED);
+ waitForCallbackAndCheckResult(FinalApplicationStatus.FAILED.toString());
+ }
+
+ private Configuration setupEmbeddedContainer(Class<?> servletClass, String servletEndPoint,
+ String servletUrl, Map<String, String> params) throws Exception {
+ container = new EmbeddedServletContainer("test");
+ if (servletEndPoint != null) {
+ if (params != null) {
+ container.addServletEndpoint(servletEndPoint, servletClass, params);
+ } else {
+ container.addServletEndpoint(servletEndPoint, servletClass);
+ }
+ }
+ container.start();
+
+ Configuration conf = new Configuration(false);
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_ATTEMPTS, "0");
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_MAX_ATTEMPTS, "1");
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_URL, container.getServletURL(servletUrl));
+ conf.set(LauncherAMCallbackNotifier.OOZIE_LAUNCHER_CALLBACK_RETRY_INTERVAL, "5000");
+
+ return conf;
+ }
+
+ private void waitForCallbackAndCheckResult(final String expectedResult) {
+ waitFor(5000, new Predicate() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return ("status=" + expectedResult).equals(QueryServlet.lastQueryString);
+ }
+ });
+
+ assertEquals("status=" + expectedResult, QueryServlet.lastQueryString);
+ }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java
deleted file mode 100644
index 4cda615..0000000
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestMapReduceActionError.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.oozie.action.hadoop;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.RunningJob;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.oozie.WorkflowActionBean;
-import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.client.WorkflowAction;
-import org.apache.oozie.service.HadoopAccessorService;
-import org.apache.oozie.service.Services;
-import org.apache.oozie.service.WorkflowAppService;
-import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.util.XmlUtils;
-import org.apache.oozie.util.IOUtils;
-import org.jdom.Element;
-
-import java.io.File;
-import java.io.OutputStream;
-import java.io.InputStream;
-import java.io.FileInputStream;
-import java.io.StringReader;
-import java.io.Writer;
-import java.io.OutputStreamWriter;
-
-public class TestMapReduceActionError extends ActionExecutorTestCase {
-
- @Override
- protected void setSystemProps() throws Exception {
- super.setSystemProps();
- setSystemProperty("oozie.service.ActionService.executor.classes", MapReduceActionExecutor.class.getName());
- }
-
- private Context createContext(String actionXml) throws Exception {
- JavaActionExecutor ae = new JavaActionExecutor();
-
- Path appJarPath = new Path("lib/test.jar");
- File jarFile = IOUtils.createJar(new File(getTestCaseDir()), "test.jar", MapperReducerForTest.class);
- InputStream is = new FileInputStream(jarFile);
- OutputStream os = getFileSystem().create(new Path(getAppPath(), "lib/test.jar"));
- IOUtils.copyStream(is, os);
-
- XConfiguration protoConf = new XConfiguration();
- protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
-
- protoConf.setStrings(WorkflowAppService.APP_LIB_PATH_LIST, appJarPath.toString());
-
- WorkflowJobBean wf = createBaseWorkflow(protoConf, "mr-action");
- WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0);
- action.setType(ae.getType());
- action.setConf(actionXml);
-
- return new Context(wf, action);
- }
-
- private RunningJob submitAction(Context context) throws Exception {
- MapReduceActionExecutor ae = new MapReduceActionExecutor();
-
- WorkflowAction action = context.getAction();
-
- ae.prepareActionDir(getFileSystem(), context);
- ae.submitLauncher(getFileSystem(), context, action);
-
- String jobId = action.getExternalId();
- String jobTracker = action.getTrackerUri();
- String consoleUrl = action.getConsoleUrl();
- assertNotNull(jobId);
- assertNotNull(jobTracker);
- assertNotNull(consoleUrl);
-
- Element e = XmlUtils.parseXml(action.getConf());
- XConfiguration conf =
- new XConfiguration(new StringReader(XmlUtils.prettyPrint(e.getChild("configuration")).toString()));
- conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker"));
- conf.set("fs.default.name", e.getChildTextTrim("name-node"));
- conf.set("user.name", context.getProtoActionConf().get("user.name"));
- conf.set("group.name", getTestGroup());
-
- conf.set("mapreduce.framework.name", "yarn");
- JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
- XConfiguration.copy(conf, jobConf);
- String user = jobConf.get("user.name");
- String group = jobConf.get("group.name");
- JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
- final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
- assertNotNull(runningJob);
- return runningJob;
- }
-
- private void _testSubmit(String actionXml) throws Exception {
-
- Context context = createContext(actionXml);
- final RunningJob launcherJob = submitAction(context);
- String launcherId = context.getAction().getExternalId();
- waitFor(60 * 1000, new Predicate() {
- public boolean evaluate() throws Exception {
- return launcherJob.isComplete();
- }
- });
-
- MapReduceActionExecutor ae = new MapReduceActionExecutor();
- ae.check(context, context.getAction());
-
- JobConf conf = ae.createBaseHadoopConf(context, XmlUtils.parseXml(actionXml));
- String user = conf.get("user.name");
- String group = conf.get("group.name");
- JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
- final RunningJob mrJob = jobClient.getJob(JobID.forName(context.getAction().getExternalId()));
-
- waitFor(60 * 1000, new Predicate() {
- public boolean evaluate() throws Exception {
- return mrJob.isComplete();
- }
- });
- ae.check(context, context.getAction());
-
- assertEquals("FAILED/KILLED", context.getAction().getExternalStatus());
-
- ae.end(context, context.getAction());
- assertEquals(WorkflowAction.Status.ERROR, context.getAction().getStatus());
- assertTrue(context.getAction().getErrorMessage().contains("already exists"));
- }
-
- public void testMapReduce() throws Exception {
- FileSystem fs = getFileSystem();
-
- Path inputDir = new Path(getFsTestCaseDir(), "input");
- Path outputDir = new Path(getFsTestCaseDir(), "output1");
-
- Writer w = new OutputStreamWriter(fs.create(new Path(inputDir, "data.txt")));
- w.write("dummy\n");
- w.write("dummy\n");
- Writer ow = new OutputStreamWriter(fs.create(new Path(outputDir, "data.txt")));
- ow.write("dummy\n");
- ow.write("dummy\n");
- ow.close();
-
- String actionXml = "<map-reduce>" +
- "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
- "<name-node>" + getNameNodeUri() + "</name-node>" +
- "<configuration>" +
- "<property><name>mapred.mapper.class</name><value>" + MapperReducerForTest.class.getName() +
- "</value></property>" +
- "<property><name>mapred.reducer.class</name><value>" + MapperReducerForTest.class.getName() +
- "</value></property>" +
- "<property><name>mapred.input.dir</name><value>" + inputDir + "</value></property>" +
- "<property><name>mapred.output.dir</name><value>" + outputDir + "</value></property>" +
- "</configuration>" +
- "</map-reduce>";
- _testSubmit(actionXml);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java
index 5bc7d00..551adff 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestOozieJobInfo.java
@@ -33,19 +33,12 @@ import java.util.regex.Matcher;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.action.hadoop.MapReduceActionExecutor;
-import org.apache.oozie.action.hadoop.MapperReducerForTest;
-import org.apache.oozie.action.hadoop.OozieJobInfo;
import org.apache.oozie.client.Job;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.command.CommandException;
@@ -53,19 +46,16 @@ import org.apache.oozie.command.bundle.BundleStartXCommand;
import org.apache.oozie.command.bundle.BundleSubmitXCommand;
import org.apache.oozie.command.wf.ActionXCommand;
import org.apache.oozie.command.wf.ActionXCommand.ActionExecutorContext;
-import org.apache.oozie.command.wf.JobXCommand;
import org.apache.oozie.executor.jpa.BundleActionQueryExecutor;
+import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
import org.apache.oozie.executor.jpa.BundleJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.CoordJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionsGetForJobJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobsGetFromCoordParentIdJPAExecutor;
-import org.apache.oozie.executor.jpa.BundleActionQueryExecutor.BundleActionQuery;
-import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
-import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.UUIDService.ApplicationType;
import org.apache.oozie.test.XDataTestCase;
import org.apache.oozie.util.IOUtils;
@@ -163,14 +153,12 @@ public class TestOozieJobInfo extends XDataTestCase {
ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(wfbean, actionList.get(1), false,
false);
MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
- JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(actionList.get(1).getConf()));
+ Configuration conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(actionList.get(1).getConf()));
String user = conf.get("user.name");
- JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
- String launcherId = actionList.get(1).getExternalId();
- final RunningJob launcherJob = jobClient.getJob(JobID.forName(launcherId));
- FileSystem fs = context.getAppFileSystem();
- Configuration jobXmlConf = new XConfiguration(fs.open(new Path(launcherJob.getJobFile())));
+ FileSystem fs = getFileSystem();
+ Configuration jobXmlConf = new XConfiguration(fs.open(getPathToWorkflowResource(
+ user, wfbean, services, context, LauncherAM.LAUNCHER_JOB_CONF_XML)));
String jobInfo = jobXmlConf.get(OozieJobInfo.JOB_INFO_KEY);
// BUNDLE_ID;BUNDLE_NAME;COORDINATOR_NAME;COORDINATOR_NOMINAL_TIME;
@@ -186,7 +174,6 @@ public class TestOozieJobInfo extends XDataTestCase {
assertTrue(jobInfo.contains(",testing=test,"));
assertTrue(jobInfo.contains(",coord.nominal.time="));
assertTrue(jobInfo.contains("launcher=true"));
-
}
protected void setCoordConf(Configuration jobConf) throws IOException {
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java
index df9e939..89aeab6 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestPrepareActionsDriver.java
@@ -24,6 +24,9 @@ import org.apache.oozie.service.Services;
import org.apache.oozie.test.XFsTestCase;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
+import org.xml.sax.SAXException;
+
+import javax.xml.parsers.ParserConfigurationException;
public class TestPrepareActionsDriver extends XFsTestCase {
@@ -40,7 +43,7 @@ public class TestPrepareActionsDriver extends XFsTestCase {
}
// Test to check if prepare action is performed as expected when the prepare XML block is a valid one
- public void testDoOperationsWithValidXML() throws LauncherException, IOException {
+ public void testDoOperationsWithValidXML() throws LauncherException, IOException, ParserConfigurationException, SAXException {
Path actionDir = getFsTestCaseDir();
FileSystem fs = getFileSystem();
Path newDir = new Path(actionDir, "newDir");
@@ -52,12 +55,12 @@ public class TestPrepareActionsDriver extends XFsTestCase {
}
JobConf conf = createJobConf();
- LauncherMapperHelper.setupLauncherURIHandlerConf(conf);
+ LauncherHelper.setupLauncherURIHandlerConf(conf);
PrepareActionsDriver.doOperations(prepareXML, conf);
assertTrue(fs.exists(actionDir));
}
- // Test to check if LauncherException is thrown when the prepare XML block is invalid
+ // Test to check if Exception is thrown when the prepare XML block is invalid
public void testDoOperationsWithInvalidXML() throws LauncherException, IOException {
Path actionDir = getFsTestCaseDir();
FileSystem fs = getFileSystem();
@@ -72,14 +75,12 @@ public class TestPrepareActionsDriver extends XFsTestCase {
try {
prepareXML = "prepare>" + "<mkdir path='" + newDir + "'/>" + "</prepare>";
JobConf conf = createJobConf();
- LauncherMapperHelper.setupLauncherURIHandlerConf(conf);
+ LauncherHelper.setupLauncherURIHandlerConf(conf);
PrepareActionsDriver.doOperations(prepareXML, conf);
fail("Expected to catch an exception but did not encounter any");
- } catch (LauncherException le) {
- assertEquals(le.getCause().getClass(), org.xml.sax.SAXParseException.class);
- assertEquals(le.getMessage(), "Content is not allowed in prolog.");
- } catch(Exception ex){
- fail("Expected a LauncherException but received an Exception");
+ } catch (Exception ex) {
+ assertEquals(ex.getClass(), org.xml.sax.SAXParseException.class);
+ assertEquals(ex.getMessage(), "Content is not allowed in prolog.");
}
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java
index f12927b..72be0a2 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellActionExecutor.java
@@ -26,16 +26,10 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.util.Shell;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.WorkflowAction;
-import org.apache.oozie.service.ActionService;
-import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.util.PropertiesUtils;
@@ -294,14 +288,8 @@ public class TestShellActionExecutor extends ActionExecutorTestCase {
Context context = createContext(actionXml);
// Submit the action
- final RunningJob launcherJob = submitAction(context);
- waitFor(180 * 1000, new Predicate() { // Wait for the external job to
- // finish
- public boolean evaluate() throws Exception {
- return launcherJob.isComplete();
- }
- });
-
+ final String launcherId = submitAction(context);
+ waitUntilYarnAppDoneAndAssertSuccess(launcherId);
ShellActionExecutor ae = new ShellActionExecutor();
WorkflowAction action = context.getAction();
ae.check(context, action);
@@ -323,24 +311,14 @@ public class TestShellActionExecutor extends ActionExecutorTestCase {
private WorkflowAction _testSubmit(String actionXml, boolean checkForSuccess, String capture_output) throws Exception {
Context context = createContext(actionXml);
- final RunningJob launcherJob = submitAction(context);// Submit the
- // action
- String launcherId = context.getAction().getExternalId(); // Get LM id
- waitFor(180 * 1000, new Predicate() { // Wait for the external job to
- // finish
- public boolean evaluate() throws Exception {
- return launcherJob.isComplete();
- }
- });
- // Thread.sleep(2000);
- assertTrue(launcherJob.isSuccessful());
-
- sleep(2000);// Wait more to make sure no ID swap happens
+ final String launcherId = submitAction(context);// Submit the action
+ waitUntilYarnAppDoneAndAssertSuccess(launcherId);
+
Configuration conf = new XConfiguration();
conf.set("user.name", getTestUser());
- Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+ Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
conf);
- assertFalse(LauncherMapperHelper.hasIdSwap(actionData));
+ assertFalse(LauncherHelper.hasIdSwap(actionData));
ShellActionExecutor ae = new ShellActionExecutor();
ae.check(context, context.getAction());
@@ -399,14 +377,13 @@ public class TestShellActionExecutor extends ActionExecutorTestCase {
* @return The RunningJob of the Launcher Mapper
* @throws Exception
*/
- private RunningJob submitAction(Context context) throws Exception {
+ private String submitAction(Context context) throws Exception {
ShellActionExecutor ae = new ShellActionExecutor();
WorkflowAction action = context.getAction();
ae.prepareActionDir(getFileSystem(), context);
- ae.submitLauncher(getFileSystem(), context, action); // Submit the
- // Launcher Mapper
+ ae.submitLauncher(getFileSystem(), context, action); // Submit the action
String jobId = action.getExternalId();
String jobTracker = action.getTrackerUri();
@@ -416,41 +393,6 @@ public class TestShellActionExecutor extends ActionExecutorTestCase {
assertNotNull(jobTracker);
assertNotNull(consoleUrl);
- Element e = XmlUtils.parseXml(action.getConf());
- XConfiguration conf = new XConfiguration();
- conf.set("mapred.job.tracker", e.getChildTextTrim("job-tracker"));
- conf.set("fs.default.name", e.getChildTextTrim("name-node"));
- conf.set("user.name", context.getProtoActionConf().get("user.name"));
- conf.set("group.name", getTestGroup());
-
- JobConf jobConf = Services.get().get(HadoopAccessorService.class).createJobConf(jobTracker);
- XConfiguration.copy(conf, jobConf);
- String user = jobConf.get("user.name");
- String group = jobConf.get("group.name");
- JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, jobConf);
- final RunningJob runningJob = jobClient.getJob(JobID.forName(jobId));
- assertNotNull(runningJob);
- return runningJob;
- }
-
- public void testShellMainPathInUber() throws Exception {
- Services.get().getConf().setBoolean("oozie.action.shell.launcher.mapreduce.job.ubertask.enable", true);
-
- Element actionXml = XmlUtils.parseXml("<shell>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>"
- + "<name-node>" + getNameNodeUri() + "</name-node>" + "<exec>script.sh</exec>"
- + "<argument>a=A</argument>" + "<argument>b=B</argument>" + "</shell>");
- ShellActionExecutor ae = new ShellActionExecutor();
- XConfiguration protoConf = new XConfiguration();
- protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
-
- WorkflowJobBean wf = createBaseWorkflow(protoConf, "action");
- WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0);
- action.setType(ae.getType());
-
- Context context = new Context(wf, action);
- JobConf launcherConf = new JobConf();
- launcherConf = ae.createLauncherConf(getFileSystem(), context, action, actionXml, launcherConf);
- // env
- assertEquals("PATH=.:$PATH", launcherConf.get(JavaActionExecutor.YARN_AM_ENV));
+ return jobId;
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java
index e757e54..a7d6c18 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java
@@ -25,7 +25,6 @@ import java.io.FileWriter;
import java.io.Writer;
import java.util.Properties;
-import org.apache.hadoop.fs.Path;
import org.apache.oozie.util.XConfiguration;
//Test cases are mainly implemented in the Base class
@@ -53,8 +52,8 @@ public class TestShellMain extends ShellTestCase {
jobConf.set(ShellMain.CONF_OOZIE_SHELL_EXEC, SHELL_COMMAND_NAME);
String[] args = new String[] { SHELL_COMMAND_SCRIPTFILE_OPTION, script.toString(), "A", "B" };
- MapReduceMain.setStrings(jobConf, ShellMain.CONF_OOZIE_SHELL_ARGS, args);
- MapReduceMain.setStrings(jobConf, ShellMain.CONF_OOZIE_SHELL_ENVS,
+ ActionUtils.setStrings(jobConf, ShellMain.CONF_OOZIE_SHELL_ARGS, args);
+ ActionUtils.setStrings(jobConf, ShellMain.CONF_OOZIE_SHELL_ENVS,
new String[] { "var1=value1", "var2=value2" });
File actionXml = new File(getTestCaseDir(), "action.xml");
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java b/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java
index dbc160f..9c7064b 100644
--- a/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java
+++ b/core/src/test/java/org/apache/oozie/client/TestOozieCLI.java
@@ -135,8 +135,8 @@ public class TestOozieCLI extends DagServletTestCase {
String path = getTestCaseDir() + "/" + getName() + ".properties";
Properties props = new Properties();
props.setProperty(OozieClient.USER_NAME, getTestUser());
- props.setProperty(XOozieClient.NN, "localhost:9000");
- props.setProperty(XOozieClient.JT, "localhost:9001");
+ props.setProperty(XOozieClient.NN, "localhost:8020");
+ props.setProperty(XOozieClient.RM, "localhost:8032");
props.setProperty("oozie.libpath", appPath);
props.setProperty("mapred.output.dir", appPath);
props.setProperty("a", "A");
@@ -155,7 +155,7 @@ public class TestOozieCLI extends DagServletTestCase {
props.setProperty(OozieClient.APP_PATH, appPath);
props.setProperty(OozieClient.RERUN_SKIP_NODES, "node");
props.setProperty(XOozieClient.NN, "localhost:9000");
- props.setProperty(XOozieClient.JT, "localhost:9001");
+ props.setProperty(XOozieClient.RM, "localhost:9001");
if (useNewAPI) {
props.setProperty("mapreduce.map.class", "mapper.class");
props.setProperty("mapreduce.reduce.class", "reducer.class");
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/client/TestWorkflowXClient.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/client/TestWorkflowXClient.java b/core/src/test/java/org/apache/oozie/client/TestWorkflowXClient.java
index 51ae9e8..b4bce60 100644
--- a/core/src/test/java/org/apache/oozie/client/TestWorkflowXClient.java
+++ b/core/src/test/java/org/apache/oozie/client/TestWorkflowXClient.java
@@ -29,8 +29,6 @@ import org.apache.oozie.servlet.MockDagEngineService;
import org.apache.oozie.servlet.V1JobsServlet;
import org.apache.oozie.servlet.V1AdminServlet;
-import java.io.File;
-
public class TestWorkflowXClient extends DagServletTestCase {
static {
@@ -60,7 +58,7 @@ public class TestWorkflowXClient extends DagServletTestCase {
Path libPath = new Path(getFsTestCaseDir(), "lib");
getFileSystem().mkdirs(libPath);
conf.setProperty(OozieClient.LIBPATH, libPath.toString());
- conf.setProperty(XOozieClient.JT, "localhost:9001");
+ conf.setProperty(XOozieClient.RM, "localhost:9001");
conf.setProperty(XOozieClient.NN, "hdfs://localhost:9000");
String[] params = new String[]{"INPUT=input.txt"};
@@ -90,7 +88,7 @@ public class TestWorkflowXClient extends DagServletTestCase {
getFileSystem().mkdirs(libPath);
System.out.println(libPath.toString());
conf.setProperty(OozieClient.LIBPATH, libPath.toString());
- conf.setProperty(XOozieClient.JT, "localhost:9001");
+ conf.setProperty(XOozieClient.RM, "localhost:9001");
conf.setProperty(XOozieClient.NN, "hdfs://localhost:9000");
String[] params = new String[]{"NAME=test"};
@@ -120,7 +118,7 @@ public class TestWorkflowXClient extends DagServletTestCase {
getFileSystem().mkdirs(libPath);
System.out.println(libPath.toString());
conf.setProperty(OozieClient.LIBPATH, libPath.toString());
- conf.setProperty(XOozieClient.JT, "localhost:9001");
+ conf.setProperty(XOozieClient.RM, "localhost:9001");
conf.setProperty(XOozieClient.NN, "hdfs://localhost:9000");
assertEquals(MockDagEngineService.JOB_ID + wfCount + MockDagEngineService.JOB_ID_END,
@@ -154,9 +152,9 @@ public class TestWorkflowXClient extends DagServletTestCase {
fail("submit client without JT should throw exception");
}
catch (RuntimeException exception) {
- assertEquals("java.lang.RuntimeException: jobtracker is not specified in conf", exception.toString());
+ assertEquals("java.lang.RuntimeException: Resource manager is not specified in conf", exception.toString());
}
- conf.setProperty(XOozieClient.JT, "localhost:9001");
+ conf.setProperty(XOozieClient.RM, "localhost:9001");
try {
wc.submitMapReduce(conf);
fail("submit client without NN should throw exception");
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
index c071000..b8eb15d 100644
--- a/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/coord/TestCoordChangeXCommand.java
@@ -19,8 +19,10 @@
package org.apache.oozie.command.coord;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Date;
import java.util.List;
+import java.util.Set;
import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
@@ -84,6 +86,11 @@ public class TestCoordChangeXCommand extends XDataTestCase {
@Override
public synchronized boolean queueSerial(List<? extends XCallable<?>> callables, long delay){return false;}
+
+ @Override
+ public Set<String> getInterruptTypes() {
+ return Collections.emptySet();
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/command/wf/HangServlet.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/HangServlet.java b/core/src/test/java/org/apache/oozie/command/wf/HangServlet.java
index 3344cf9..c1bca16 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/HangServlet.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/HangServlet.java
@@ -18,6 +18,8 @@
package org.apache.oozie.command.wf;
+import org.apache.oozie.util.XLog;
+
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
@@ -25,14 +27,27 @@ import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
/**
- * Servlet that 'hangs' for 200 ms. Used by TestNotificationXCommand
+ * Servlet that 'hangs' for some amount of time (200ms) by default.
+ * The time can be configured by setting {@link HangServlet#SLEEP_TIME_MS} as an init parameter for the servlet.
*/
public class HangServlet extends HttpServlet {
+ public static final String SLEEP_TIME_MS = "sleep_time_ms";
+
protected void doGet(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {
try {
- Thread.sleep(200);
+ long time = 200;
+ String sleeptime = getInitParameter(SLEEP_TIME_MS);
+ if (sleeptime != null) {
+ try {
+ time = Long.parseLong(sleeptime);
+ } catch (NumberFormatException nfe) {
+ XLog.getLog(HangServlet.class).error("Invalid sleep time, using default (200)", nfe);
+ }
+ }
+ XLog.getLog(HangServlet.class).info("Sleeping for {0} ms", time);
+ Thread.sleep(time);
}
catch (Exception ex) {
//NOP
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
index 5898d1a..a5128a8 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
@@ -23,18 +23,17 @@ import java.io.Writer;
import java.util.Date;
import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
-import org.apache.oozie.ForTestingActionExecutor;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
-import org.apache.oozie.action.hadoop.LauncherMapperHelper;
+import org.apache.oozie.action.hadoop.LauncherHelper;
import org.apache.oozie.action.hadoop.MapReduceActionExecutor;
import org.apache.oozie.action.hadoop.MapperReducerForTest;
import org.apache.oozie.client.WorkflowAction;
@@ -45,7 +44,6 @@ import org.apache.oozie.executor.jpa.JPAExecutorException;
import org.apache.oozie.executor.jpa.WorkflowActionGetJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowActionInsertJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobGetJPAExecutor;
-import org.apache.oozie.service.ActionCheckerService;
import org.apache.oozie.service.ActionService;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.HadoopAccessorService;
@@ -259,30 +257,25 @@ public class TestActionCheckXCommand extends XDataTestCase {
ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job, action, false, false);
MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
- JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action.getConf()));
+ Configuration conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action.getConf()));
String user = conf.get("user.name");
JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
String launcherId = action.getExternalId();
- final RunningJob launcherJob = jobClient.getJob(JobID.forName(launcherId));
+ waitUntilYarnAppDoneAndAssertSuccess(launcherId);
- waitFor(120 * 1000, new Predicate() {
- public boolean evaluate() throws Exception {
- return launcherJob.isComplete();
- }
- });
- assertTrue(launcherJob.isSuccessful());
- Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+ Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
conf);
- assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
+ assertTrue(LauncherHelper.hasIdSwap(actionData));
new ActionCheckXCommand(action.getId()).call();
action = jpaService.execute(wfActionGetCmd);
- String mapperId = action.getExternalId();
+ String externalId = action.getExternalId();
String childId = action.getExternalChildIDs();
- assertTrue(launcherId.equals(mapperId));
+ assertEquals("LauncherId", launcherId, externalId);
+ assertNotNull(childId);
final RunningJob mrJob = jobClient.getJob(JobID.forName(childId));
@@ -297,7 +290,6 @@ public class TestActionCheckXCommand extends XDataTestCase {
action = jpaService.execute(wfActionGetCmd);
assertEquals("SUCCEEDED", action.getExternalStatus());
-
}
private static class ErrorCheckActionExecutor extends ActionExecutor {
@@ -416,7 +408,7 @@ public class TestActionCheckXCommand extends XDataTestCase {
ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job2, action1, false, false);
WorkflowActionBean action2 = jpaService.execute(wfActionGetCmd);
MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
- JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action2.getConf()));
+ Configuration conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action2.getConf()));
String user = conf.get("user.name");
JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
@@ -434,9 +426,9 @@ public class TestActionCheckXCommand extends XDataTestCase {
}
});
assertTrue(launcherJob.isSuccessful());
- Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+ Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
conf);
- assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
+ assertTrue(LauncherHelper.hasIdSwap(actionData));
new ActionCheckXCommand(actionId).call();
WorkflowActionBean action4 = jpaService.execute(wfActionGetCmd);
@@ -488,7 +480,7 @@ public class TestActionCheckXCommand extends XDataTestCase {
ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job0, action1, false, false);
MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
- JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action1.getConf()));
+ Configuration conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action1.getConf()));
String user = conf.get("user.name");
JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
@@ -501,9 +493,9 @@ public class TestActionCheckXCommand extends XDataTestCase {
}
});
assertTrue(launcherJob.isSuccessful());
- Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+ Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
conf);
- assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
+ assertTrue(LauncherHelper.hasIdSwap(actionData));
new ActionCheckXCommand(action1.getId()).call();
WorkflowActionBean action2 = jpaService.execute(wfActionGetCmd);
@@ -568,9 +560,9 @@ public class TestActionCheckXCommand extends XDataTestCase {
});
assertTrue(launcherJob2.isSuccessful());
- actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+ actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
conf);
- assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
+ assertTrue(LauncherHelper.hasIdSwap(actionData));
new ActionCheckXCommand(actionId).call();
WorkflowActionBean action4 = jpaService.execute(wfActionGetCmd);
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java
index ea90c08..80c5d54 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java
@@ -28,13 +28,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.action.hadoop.LauncherMapperHelper;
+import org.apache.oozie.action.hadoop.LauncherHelper;
import org.apache.oozie.action.hadoop.MapReduceActionExecutor;
import org.apache.oozie.action.hadoop.MapperReducerForTest;
import org.apache.oozie.client.OozieClient;
@@ -50,7 +46,6 @@ import org.apache.oozie.executor.jpa.WorkflowActionQueryExecutor.WorkflowActionQ
import org.apache.oozie.executor.jpa.WorkflowJobInsertJPAExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor;
import org.apache.oozie.executor.jpa.WorkflowJobQueryExecutor.WorkflowJobQuery;
-import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.InstrumentationService;
import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.LiteWorkflowStoreService;
@@ -162,23 +157,14 @@ public class TestActionStartXCommand extends XDataTestCase {
ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job, action, false, false);
MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
- JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action.getConf()));
- String user = conf.get("user.name");
- JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
+ Configuration conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action.getConf()));
String launcherId = action.getExternalId();
- final RunningJob launcherJob = jobClient.getJob(JobID.forName(launcherId));
-
- waitFor(120 * 1000, new Predicate() {
- public boolean evaluate() throws Exception {
- return launcherJob.isComplete();
- }
- });
- assertTrue(launcherJob.isSuccessful());
- Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+ waitUntilYarnAppDoneAndAssertSuccess(launcherId);
+ Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
conf);
- assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
+ assertTrue(LauncherHelper.hasIdSwap(actionData));
}
public void testActionStartToCheckRetry() throws Exception {
@@ -238,26 +224,15 @@ public class TestActionStartXCommand extends XDataTestCase {
ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job, action, false, false);
MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
- JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action.getConf()));
+ Configuration conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action.getConf()));
String user = conf.get("user.name");
- JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
String launcherId = action.getExternalId();
- // retrieve launcher job
- final RunningJob launcherJob = jobClient.getJob(JobID.forName(launcherId));
-
- // time out after 120 seconds unless launcher job succeeds
- waitFor(240 * 1000, new Predicate() {
- public boolean evaluate() throws Exception {
- return launcherJob.isComplete();
- }
- });
- // check if launcher job succeeds
- assertTrue(launcherJob.isSuccessful());
- Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+ waitUntilYarnAppDoneAndAssertSuccess(launcherId);
+ Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
conf);
- assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
+ assertTrue(LauncherHelper.hasIdSwap(actionData));
}
/**
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitHiveXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitHiveXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitHiveXCommand.java
index 43edf5e..98c94a7 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitHiveXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitHiveXCommand.java
@@ -21,7 +21,7 @@ package org.apache.oozie.command.wf;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.local.LocalOozie;
-import org.apache.oozie.action.hadoop.MapReduceMain;
+import org.apache.oozie.action.hadoop.ActionUtils;
import org.apache.oozie.client.XOozieClient;
import org.apache.oozie.test.XFsTestCase;
import org.apache.oozie.util.XLog;
@@ -45,7 +45,7 @@ public class TestSubmitHiveXCommand extends XFsTestCase {
public void testWFXmlGeneration() throws Exception {
Configuration conf = new Configuration();
- conf.set(XOozieClient.JT, "jobtracker");
+ conf.set(XOozieClient.RM, "jobtracker");
conf.set(XOozieClient.NN, "namenode");
conf.set(OozieClient.LIBPATH, "libpath");
@@ -54,9 +54,9 @@ public class TestSubmitHiveXCommand extends XFsTestCase {
String hiveArgsStr = "-a aaa -b bbb -c ccc -M -Da=aaa -Db=bbb -param input=abc";
String[] args = hiveArgsStr.split(" ");
- MapReduceMain.setStrings(conf, XOozieClient.HIVE_OPTIONS, args);
+ ActionUtils.setStrings(conf, XOozieClient.HIVE_OPTIONS, args);
String[] params = new String[]{"INPUT=/some/path", "OUTPUT=/some/other/path", "abc=xyz"};
- MapReduceMain.setStrings(conf, XOozieClient.HIVE_SCRIPT_PARAMS, params);
+ ActionUtils.setStrings(conf, XOozieClient.HIVE_SCRIPT_PARAMS, params);
SubmitHiveXCommand submitHiveCmd = new SubmitHiveXCommand(conf);
String xml = submitHiveCmd.getWorkflowXml(conf);
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitMRXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitMRXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitMRXCommand.java
index 5bc5747..388ff94 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitMRXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitMRXCommand.java
@@ -51,7 +51,7 @@ public class TestSubmitMRXCommand extends XFsTestCase {
public void testWFXmlGeneration() throws Exception {
Configuration conf = new Configuration(false);
- conf.set(XOozieClient.JT, "jobtracker");
+ conf.set(XOozieClient.RM, "jobtracker");
conf.set(XOozieClient.NN, "namenode");
conf.set(OozieClient.LIBPATH, "libpath");
@@ -97,7 +97,7 @@ public class TestSubmitMRXCommand extends XFsTestCase {
public void testWFXmlGenerationNegative1() throws Exception {
Configuration conf = new Configuration();
- conf.set(XOozieClient.JT, "jobtracker");
+ conf.set(XOozieClient.RM, "jobtracker");
conf.set(XOozieClient.NN, "namenode");
// conf.set(XOozieClient.LIBPATH, "libpath");
@@ -118,8 +118,8 @@ public class TestSubmitMRXCommand extends XFsTestCase {
public void testWFXmlGenerationNewConfigProps() throws Exception {
try {
Configuration conf = new Configuration(false);
- conf.set(XOozieClient.NN_2, "new_NN");
- conf.set(XOozieClient.JT_2, "new_JT");
+ conf.set(XOozieClient.NN, "new_NN");
+ conf.set(XOozieClient.RM, "new_JT");
conf.set("mapred.mapper.class", "TestMapper");
conf.set("mapred.reducer.class", "TestReducer");
conf.set("mapred.input.dir", "testInput");
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitPigXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitPigXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitPigXCommand.java
index 5a1de25..c3cd1aa 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitPigXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitPigXCommand.java
@@ -21,7 +21,7 @@ package org.apache.oozie.command.wf;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.local.LocalOozie;
-import org.apache.oozie.action.hadoop.MapReduceMain;
+import org.apache.oozie.action.hadoop.ActionUtils;
import org.apache.oozie.client.XOozieClient;
import org.apache.oozie.test.XFsTestCase;
import org.apache.oozie.util.XLog;
@@ -46,7 +46,7 @@ public class TestSubmitPigXCommand extends XFsTestCase {
public void testWFXmlGeneration1() throws Exception {
Configuration conf = new Configuration();
- conf.set(XOozieClient.JT, "jobtracker");
+ conf.set(XOozieClient.RM, "jobtracker");
conf.set(XOozieClient.NN, "namenode");
conf.set(OozieClient.LIBPATH, "libpath");
@@ -55,9 +55,9 @@ public class TestSubmitPigXCommand extends XFsTestCase {
String pigArgsStr = "-a aaa -b bbb -c ccc -M -Da=aaa -Db=bbb -param input=abc";
String[] args = pigArgsStr.split(" ");
- MapReduceMain.setStrings(conf, XOozieClient.PIG_OPTIONS, args);
+ ActionUtils.setStrings(conf, XOozieClient.PIG_OPTIONS, args);
String[] params = new String[]{"INPUT=/some/path", "OUTPUT=/some/other/path", "abc=xyz"};
- MapReduceMain.setStrings(conf, XOozieClient.PIG_SCRIPT_PARAMS, params);
+ ActionUtils.setStrings(conf, XOozieClient.PIG_SCRIPT_PARAMS, params);
SubmitPigXCommand submitPigCmd = new SubmitPigXCommand(conf);
String xml = submitPigCmd.getWorkflowXml(conf);
@@ -118,7 +118,7 @@ public class TestSubmitPigXCommand extends XFsTestCase {
public void testWFXmlGeneration2() throws Exception {
Configuration conf = new Configuration();
- conf.set(XOozieClient.JT, "jobtracker");
+ conf.set(XOozieClient.RM, "jobtracker");
conf.set(XOozieClient.NN, "namenode");
conf.set(OozieClient.LIBPATH, "libpath");
@@ -128,7 +128,7 @@ public class TestSubmitPigXCommand extends XFsTestCase {
String[] args = new String[2];
args[0] = "-a";
args[1] = "aaa bbb";
- MapReduceMain.setStrings(conf, XOozieClient.PIG_OPTIONS, args);
+ ActionUtils.setStrings(conf, XOozieClient.PIG_OPTIONS, args);
SubmitPigXCommand submitPigCmd = new SubmitPigXCommand(conf);
String xml = submitPigCmd.getWorkflowXml(conf);
@@ -169,7 +169,7 @@ public class TestSubmitPigXCommand extends XFsTestCase {
public void testWFXmlGenerationNegative1() throws Exception {
Configuration conf = new Configuration();
- conf.set(XOozieClient.JT, "jobtracker");
+ conf.set(XOozieClient.RM, "jobtracker");
conf.set(XOozieClient.NN, "namenode");
// conf.set(XOozieClient.LIBPATH, "libpath");
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitSqoopXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitSqoopXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitSqoopXCommand.java
index 49b5028..f2f248a 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestSubmitSqoopXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestSubmitSqoopXCommand.java
@@ -21,7 +21,7 @@ package org.apache.oozie.command.wf;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.local.LocalOozie;
-import org.apache.oozie.action.hadoop.MapReduceMain;
+import org.apache.oozie.action.hadoop.ActionUtils;
import org.apache.oozie.client.XOozieClient;
import org.apache.oozie.test.XFsTestCase;
import org.apache.oozie.util.XLog;
@@ -46,7 +46,7 @@ public class TestSubmitSqoopXCommand extends XFsTestCase {
public void testWFXmlGeneration() throws Exception {
Configuration conf = new Configuration();
- conf.set(XOozieClient.JT, "jobtracker");
+ conf.set(XOozieClient.RM, "jobtracker");
conf.set(XOozieClient.NN, "namenode");
conf.set(OozieClient.LIBPATH, "libpath");
@@ -54,7 +54,7 @@ public class TestSubmitSqoopXCommand extends XFsTestCase {
String sqoopArgsStr = "-Da=aaa -Db=bbb";
String[] args = sqoopArgsStr.split(" ");
- MapReduceMain.setStrings(conf, XOozieClient.SQOOP_OPTIONS, args);
+ ActionUtils.setStrings(conf, XOozieClient.SQOOP_OPTIONS, args);
SubmitSqoopXCommand submitSqoopCmd = new SubmitSqoopXCommand(conf);
String xml = submitSqoopCmd.getWorkflowXml(conf);
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
index 72f0114..ef75f14 100644
--- a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
+++ b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java
@@ -18,19 +18,18 @@
package org.apache.oozie.command.wf;
-import java.io.StringReader;
import java.net.URI;
import java.util.Date;
+import java.util.Set;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.JobStatus;
-import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
+import org.apache.oozie.action.hadoop.LauncherMain;
import org.apache.oozie.action.hadoop.MapperReducerForTest;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
@@ -42,11 +41,10 @@ import org.apache.oozie.service.JPAService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.test.XDataTestCase;
-import org.apache.oozie.test.XTestCase.Predicate;
-import org.apache.oozie.util.XConfiguration;
-import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.workflow.WorkflowInstance;
+import com.google.common.collect.Sets;
+
public class TestWorkflowActionKillXCommand extends XDataTestCase {
private Services services;
@@ -117,26 +115,6 @@ public class TestWorkflowActionKillXCommand extends XDataTestCase {
assertEquals(action.getExternalStatus(), "RUNNING");
}
- public void testWfActionKillChildJob() throws Exception {
- String externalJobID = launchSleepJob(1000);
- String childId = launchSleepJob(1000000);
-
- WorkflowJobBean job = this.addRecordToWfJobTable(WorkflowJob.Status.KILLED, WorkflowInstance.Status.KILLED);
- WorkflowActionBean action = this.addRecordToWfActionTable(job.getId(), externalJobID, "1",
- WorkflowAction.Status.KILLED, childId);
-
- new ActionKillXCommand(action.getId()).call();
- JobClient jobClient = createJobClient();
-
- final RunningJob mrJob = jobClient.getJob(JobID.forName(childId));
- waitFor(60 * 1000, new Predicate() {
- public boolean evaluate() throws Exception {
- return mrJob.isComplete();
- }
- });
- assertEquals(mrJob.getJobState(), JobStatus.KILLED);
- }
-
protected WorkflowActionBean addRecordToWfActionTable(String wfId, String externalJobID, String actionName,
WorkflowAction.Status status, String childID) throws Exception {
WorkflowActionBean action = new WorkflowActionBean();
@@ -189,9 +167,18 @@ public class TestWorkflowActionKillXCommand extends XDataTestCase {
SleepJob sleepjob = new SleepJob();
sleepjob.setConf(jobConf);
jobConf = sleepjob.setupJobConf(1, 1, sleep, 1, sleep, 1);
+ jobConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, "sleepjob");
+ jobConf.set(LauncherMain.MAPREDUCE_JOB_TAGS, "sleepjob");
+ System.setProperty(LauncherMain.OOZIE_JOB_LAUNCH_TIME, String.valueOf(System.currentTimeMillis()));
+
+ jobClient.submitJob(jobConf);
+ Set<ApplicationId> apps = Sets.newHashSet();
+ apps = LauncherMain.getChildYarnJobs(jobConf, ApplicationsRequestScope.ALL);
+ assertEquals("Number of YARN apps", apps.size(), 1);
+
+ sleepjob.close();
- final RunningJob runningJob = jobClient.submitJob(jobConf);
- return runningJob.getID().toString();
+ return apps.iterator().next().toString();
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java b/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java
index 3c6525d..5957ad6 100644
--- a/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestConfigurationService.java
@@ -19,7 +19,7 @@
package org.apache.oozie.service;
import org.apache.hadoop.conf.Configuration;
-import org.apache.oozie.action.hadoop.CredentialsProvider;
+import org.apache.oozie.action.hadoop.CredentialsProviderFactory;
import org.apache.oozie.action.hadoop.DistcpActionExecutor;
import org.apache.oozie.action.hadoop.JavaActionExecutor;
import org.apache.oozie.action.hadoop.LauncherMapper;
@@ -210,13 +210,10 @@ public class TestConfigurationService extends XTestCase {
assertEquals(2048, ConfigurationService.getInt(LauncherMapper.CONF_OOZIE_ACTION_MAX_OUTPUT_DATA));
assertEquals("http://0.0.0.0:11000/oozie?job=", ConfigurationService.get(JobXCommand.CONF_CONSOLE_URL));
- assertEquals(true, ConfigurationService.getBoolean(JavaActionExecutor.CONF_HADOOP_YARN_UBER_MODE));
- assertEquals(false, ConfigurationService.getBoolean(
- "oozie.action.shell.launcher." + JavaActionExecutor.HADOOP_YARN_UBER_MODE));
assertEquals(false, ConfigurationService.getBoolean(HadoopAccessorService.KERBEROS_AUTH_ENABLED));
assertEquals(0, ConfigurationService.getStrings("no.defined").length);
- assertEquals(0, ConfigurationService.getStrings(CredentialsProvider.CRED_KEY).length);
+ assertEquals(0, ConfigurationService.getStrings(CredentialsProviderFactory.CRED_KEY).length);
assertEquals(1, ConfigurationService.getStrings(DistcpActionExecutor.CLASS_NAMES).length);
assertEquals("distcp=org.apache.hadoop.tools.DistCp",
ConfigurationService.getStrings(DistcpActionExecutor.CLASS_NAMES)[0]);
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java b/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java
index bbca479..a1ee004 100644
--- a/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java
@@ -18,7 +18,15 @@
package org.apache.oozie.service;
-import org.apache.oozie.test.XTestCase;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+import org.apache.oozie.test.XFsTestCase;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.fs.FileSystem;
@@ -34,7 +42,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.util.XConfiguration;
-public class TestHadoopAccessorService extends XTestCase {
+public class TestHadoopAccessorService extends XFsTestCase {
protected void setUp() throws Exception {
super.setUp();
@@ -140,49 +148,83 @@ public class TestHadoopAccessorService extends XTestCase {
*/
assertEquals("100", conf.get("action.testprop"));
assertEquals("1", conf.get("default.testprop"));
-
- // Check that properties load correctly
assertEquals("org.apache.log4j.ConsoleAppender", conf.get("log4j.appender.oozie"));
assertEquals("NONE, null", conf.get("log4j.logger.a"));
+ }
+
+ public void testCreateJobClient() throws Exception {
+ HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
+ JobConf conf = has.createJobConf(getJobTrackerUri());
+
+ JobClient jc = has.createJobClient(getTestUser(), conf);
+ assertNotNull(jc);
+ jc.getAllJobs();
+ JobConf conf2 = new JobConf(false);
+ conf2.set("mapred.job.tracker", getJobTrackerUri());
+ try {
+ has.createJobClient(getTestUser(), conf2);
+ fail("Should have thrown exception because Configuration not created by HadoopAccessorService");
+ }
+ catch (HadoopAccessorException ex) {
+ assertEquals(ErrorCode.E0903, ex.getErrorCode());
+ }
}
- public void testAccessor() throws Exception {
- Services services = Services.get();
- HadoopAccessorService has = services.get(HadoopAccessorService.class);
+ public void testCreateYarnClient() throws Exception {
+ HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
JobConf conf = has.createJobConf(getJobTrackerUri());
- conf.set("mapred.job.tracker", getJobTrackerUri());
- conf.set("fs.default.name", getNameNodeUri());
- URI uri = new URI(getNameNodeUri());
+ YarnClient yc = has.createYarnClient(getTestUser(), conf);
+ assertNotNull(yc);
+ yc.getApplications();
- //valid user
- String user = getTestUser();
- String group = getTestGroup();
+ try {
+ yc = has.createYarnClient("invalid-user", conf);
+ assertNotNull(yc);
+ yc.getApplications();
+ fail("Should have thrown exception because not allowed to impersonate 'invalid-user'");
+ }
+ catch (AuthorizationException ex) {
+ }
- JobClient jc = has.createJobClient(user, conf);
- assertNotNull(jc);
- FileSystem fs = has.createFileSystem(user, new URI(getNameNodeUri()), conf);
- assertNotNull(fs);
- fs = has.createFileSystem(user, uri, conf);
- assertNotNull(fs);
+ JobConf conf2 = new JobConf(false);
+ conf2.set("yarn.resourcemanager.address", getJobTrackerUri());
+ try {
+ has.createYarnClient(getTestUser(), conf2);
+ fail("Should have thrown exception because Configuration not created by HadoopAccessorService");
+ }
+ catch (HadoopAccessorException ex) {
+ assertEquals(ErrorCode.E0903, ex.getErrorCode());
+ }
+ }
- //invalid user
+ public void testCreateFileSystem() throws Exception {
+ HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
+ JobConf conf = has.createJobConf(getJobTrackerUri());
- user = "invalid";
+ FileSystem fs = has.createFileSystem(getTestUser(), new URI(getNameNodeUri()), conf);
+ assertNotNull(fs);
+ fs.exists(new Path(getNameNodeUri(), "/foo"));
try {
- has.createJobClient(user, conf);
- fail();
+ fs = has.createFileSystem("invalid-user", new URI(getNameNodeUri()), conf);
+ assertNotNull(fs);
+ fs.exists(new Path(getNameNodeUri(), "/foo"));
+ fail("Should have thrown exception because not allowed to impersonate 'invalid-user'");
}
- catch (Throwable ex) {
+ catch (RemoteException ex) {
+ assertEquals(AuthorizationException.class.getName(), ex.getClassName());
}
+ JobConf conf2 = new JobConf(false);
+ conf2.set("fs.default.name", getNameNodeUri());
try {
- has.createFileSystem(user, uri, conf);
- fail();
+ has.createFileSystem(getTestUser(), new URI(getNameNodeUri()), conf2);
+ fail("Should have thrown exception because Configuration not created by HadoopAccessorService");
}
- catch (Throwable ex) {
+ catch (HadoopAccessorException ex) {
+ assertEquals(ErrorCode.E0903, ex.getErrorCode());
}
}
@@ -190,7 +232,7 @@ public class TestHadoopAccessorService extends XTestCase {
HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
JobConf jobConf = new JobConf(false);
assertEquals(new Text("oozie mr token"), has.getMRTokenRenewerInternal(jobConf));
- jobConf.set("mapred.job.tracker", "localhost:50300");
+ jobConf.set("yarn.resourcemanager.address", "localhost:50300");
jobConf.set("mapreduce.jobtracker.kerberos.principal", "mapred/_HOST@KDC.DOMAIN.COM");
assertEquals(new Text("mapred/localhost@KDC.DOMAIN.COM"), has.getMRTokenRenewerInternal(jobConf));
jobConf = new JobConf(false);
@@ -298,4 +340,21 @@ public class TestHadoopAccessorService extends XTestCase {
}
has.destroy();
}
+
+ public void testCreateLocalResourceForConfigurationFile() throws Exception {
+ HadoopAccessorService has = Services.get().get(HadoopAccessorService.class);
+ String filename = "foo.xml";
+ Configuration conf = has.createJobConf(getNameNodeUri());
+ conf.set("foo", "bar");
+ LocalResource lRes = has.createLocalResourceForConfigurationFile(filename, getTestUser(), conf, getFileSystem().getUri(),
+ getFsTestCaseDir());
+ assertNotNull(lRes);
+ assertEquals(LocalResourceType.FILE, lRes.getType());
+ assertEquals(LocalResourceVisibility.APPLICATION, lRes.getVisibility());
+ Path resPath = ConverterUtils.getPathFromYarnURL(lRes.getResource());
+ assertEquals(new Path(getFsTestCaseDir(), "foo.xml"), resPath);
+ Configuration conf2 = new Configuration(false);
+ conf2.addResource(getFileSystem().open(resPath));
+ assertEquals("bar", conf2.get("foo"));
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/21761f5b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
index 8fd0c2d..ce04c6d 100644
--- a/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
@@ -21,10 +21,6 @@ package org.apache.oozie.service;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.BundleActionBean;
import org.apache.oozie.BundleJobBean;
import org.apache.oozie.CoordinatorActionBean;
@@ -34,7 +30,7 @@ import org.apache.oozie.DagEngine;
import org.apache.oozie.ForTestingActionExecutor;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
-import org.apache.oozie.action.hadoop.LauncherMapperHelper;
+import org.apache.oozie.action.hadoop.LauncherHelper;
import org.apache.oozie.action.hadoop.MapReduceActionExecutor;
import org.apache.oozie.action.hadoop.MapperReducerForTest;
import org.apache.oozie.client.CoordinatorAction;
@@ -249,24 +245,14 @@ public class TestRecoveryService extends XDataTestCase {
ActionExecutorContext context = new ActionXCommand.ActionExecutorContext(job1, action1, false, false);
MapReduceActionExecutor actionExecutor = new MapReduceActionExecutor();
- JobConf conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action1.getConf()));
- String user = conf.get("user.name");
- String group = conf.get("group.name");
- JobClient jobClient = Services.get().get(HadoopAccessorService.class).createJobClient(user, conf);
-
+ Configuration conf = actionExecutor.createBaseHadoopConf(context, XmlUtils.parseXml(action1.getConf()));
String launcherId = action1.getExternalId();
- final RunningJob launcherJob = jobClient.getJob(JobID.forName(launcherId));
+ waitUntilYarnAppDoneAndAssertSuccess(launcherId);
- waitFor(240 * 1000, new Predicate() {
- public boolean evaluate() throws Exception {
- return launcherJob.isComplete();
- }
- });
- assertTrue(launcherJob.isSuccessful());
- Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
+ Map<String, String> actionData = LauncherHelper.getActionData(getFileSystem(), context.getActionDir(),
conf);
- assertTrue(LauncherMapperHelper.hasIdSwap(actionData));
+ assertTrue(LauncherHelper.hasIdSwap(actionData));
}
/**
@@ -274,10 +260,8 @@ public class TestRecoveryService extends XDataTestCase {
* @throws Exception
*/
public void testBundleRecoveryCoordCreate() throws Exception {
- final BundleActionBean bundleAction;
- final BundleJobBean bundle;
- bundle = addRecordToBundleJobTable(Job.Status.RUNNING, false);
- bundleAction = addRecordToBundleActionTable(bundle.getId(), "coord1", 1, Job.Status.PREP);
+ final BundleJobBean bundle = addRecordToBundleJobTable(Job.Status.RUNNING, false);
+ addRecordToBundleActionTable(bundle.getId(), "coord1", 1, Job.Status.PREP);
final JPAService jpaService = Services.get().get(JPAService.class);
sleep(3000);
@@ -290,7 +274,7 @@ public class TestRecoveryService extends XDataTestCase {
jpaService.execute(new BundleActionGetJPAExecutor(bundle.getId(), "coord1"));
try {
if (mybundleAction.getCoordId() != null) {
- CoordinatorJobBean coord = jpaService.execute(new CoordJobGetJPAExecutor(mybundleAction.getCoordId()));
+ jpaService.execute(new CoordJobGetJPAExecutor(mybundleAction.getCoordId()));
return true;
}
} catch (Exception e) {
@@ -345,12 +329,11 @@ public class TestRecoveryService extends XDataTestCase {
* @throws Exception
*/
public void testBundleRecoveryCoordExists() throws Exception {
- final BundleActionBean bundleAction;
final BundleJobBean bundle;
final CoordinatorJob coord;
bundle = addRecordToBundleJobTable(Job.Status.RUNNING, false);
coord = addRecordToCoordJobTable(Job.Status.PREP, false, false);
- bundleAction = addRecordToBundleActionTable(bundle.getId(), coord.getId(), "coord1", 1, Job.Status.PREP);
+ addRecordToBundleActionTable(bundle.getId(), coord.getId(), "coord1", 1, Job.Status.PREP);
final JPAService jpaService = Services.get().get(JPAService.class);
sleep(3000);