You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by as...@apache.org on 2017/09/18 10:58:30 UTC
[1/2] oozie git commit: OOZIE-2687 Create XML schema for launcher
configurations (asasvari)
Repository: oozie
Updated Branches:
refs/heads/master 69c5091ad -> 8b247f28f
http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/test/java/org/apache/oozie/service/TestSchemaService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestSchemaService.java b/core/src/test/java/org/apache/oozie/service/TestSchemaService.java
index 88a10db..940868a 100644
--- a/core/src/test/java/org/apache/oozie/service/TestSchemaService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestSchemaService.java
@@ -18,11 +18,8 @@
package org.apache.oozie.service;
-import org.apache.oozie.service.Services;
import org.apache.oozie.service.SchemaService.SchemaName;
import org.apache.oozie.test.XTestCase;
-import org.apache.oozie.util.XmlUtils;
-import org.jdom.Element;
import javax.xml.validation.Validator;
import javax.xml.transform.stream.StreamSource;
@@ -32,26 +29,32 @@ import java.io.StringReader;
public class TestSchemaService extends XTestCase {
private static final String APP1 = "<workflow-app xmlns='uri:oozie:workflow:0.1' name='app'>" +
- "<start to='end'/>" +
- "<end name='end'/>" +
+ " <start to='end'/>" +
+ " <end name='end'/>" +
"</workflow-app>";
private static final String APP_V2 = "<workflow-app xmlns='uri:oozie:workflow:0.2' name='app'>" + "<start to='end'/>"
+ "<end name='end'/>" + "</workflow-app>";
- private static final String APP_V25 = "<workflow-app xmlns='uri:oozie:workflow:0.2.5' name='app'>" + "<credentials></credentials>"+"<start to='end'/>"
- + "<end name='end'/>" + "</workflow-app>";
+ private static final String APP_V25 = "<?xml version=\"1.0\"?>\n" +
+ "<workflow-app xmlns=\"uri:oozie:workflow:0.2.5\" name=\"app\">\n" +
+ " <credentials/>\n" +
+ " <start to=\"end\"/>\n" +
+ " <end name=\"end\"/>\n" +
+ "</workflow-app>";
- private static final String WF_SLA_APP = "<workflow-app xmlns='uri:oozie:workflow:0.2' name='app' xmlns:sla='uri:oozie:sla:0.1'>"
- + "<start to='end'/>"
- + "<end name='end'/>"
- + "<sla:info> <sla:app-name>5</sla:app-name> <sla:nominal-time>2009-03-06T010:00Z</sla:nominal-time> "
- + "<sla:should-start>5</sla:should-start> <sla:should-end>50</sla:should-end> "
+ private static final String WF_SLA_APP = "<workflow-app xmlns='uri:oozie:workflow:0.2' name='app'"
+ + " xmlns:sla='uri:oozie:sla:0.1'>"
+ + " <start to='end'/>"
+ + " <end name='end'/>"
+ + " <sla:info><sla:app-name>5</sla:app-name> <sla:nominal-time>2009-03-06T010:00Z</sla:nominal-time> "
+ + " <sla:should-start>5</sla:should-start> <sla:should-end>50</sla:should-end> "
+ "<sla:alert-contact>abc@example.com</sla:alert-contact> <sla:dev-contact>abc@example.com</sla:dev-contact>"
+ " <sla:qa-contact>abc@example.com</sla:qa-contact> <sla:se-contact>abc@example.com</sla:se-contact>"
+ "</sla:info>" + "</workflow-app>";
- private static final String WF_SLA_APP_NW = "<workflow-app xmlns='uri:oozie:workflow:0.1' name='app' xmlns:sla='uri:oozie:sla:0.1'>"
+ private static final String WF_SLA_APP_NW = "<workflow-app xmlns='uri:oozie:workflow:0.1' name='app'" +
+ " xmlns:sla='uri:oozie:sla:0.1'>"
+ "<start to='end'/>"
+ "<end name='end'/>"
+ "<sla:info> <sla:app-name>5</sla:app-name> <sla:nominal-time>2009-03-06T010:00Z</sla:nominal-time> "
@@ -60,14 +63,6 @@ public class TestSchemaService extends XTestCase {
+ " <sla:qa-contact>abc@example.com</sla:qa-contact> <sla:se-contact>abc@example.com</sla:se-contact>"
+ "</sla:info>" + "</workflow-app>";
- private static final String COORD_APP1 = "<coordinator-app name=\"NAME\" frequency=\"${coord:days(1)}\" start=\"${start}\" end=\"${end}\" timezone=\"${timezone}\" xmlns=\"uri:oozie:coordinator:0.2\">"
- + "<controls> <timeout>${timeout}</timeout> <concurrency>${concurrency_level}</concurrency> <execution>${execution_order}</execution> </controls>"
- + "<datasets> <include>${include_ds_files}</include> <!-- Synchronous datasets --> <dataset name=\"local_a\" frequency=\"${coord:days(7)}\" initial-instance=\"${start}\" timezone=\"${timezone}\"> "
- + "<uri-template>${baseFsURI}/${YEAR}/${DAY}</uri-template> </dataset> </datasets> "
- + "<input-events> <data-in name=\"A\" dataset=\"a\"> <instance>${coord:latest(0)}</instance> </data-in> <data-in name=\"B\" dataset=\"b\"> <start-instance>${coord:current(-2)}</start-instance> <end-instance>${coord:current(0)}</end-instance> </data-in> </input-events> <output-events> "
- + "<data-out name=\"LOCAL_A\" dataset=\"local_a\"> <instance>${coord:current(0)}</instance> </data-out> </output-events> <action> <workflow> <app-path>${app_path}</app-path> <configuration> <property> <name>inputA</name> <value>${coord:dataIn('A')}</value> </property> <property> <name>inputB</name> <value>${coord:dataIn('B')}</value> </property> <property> <name>inputB</name> <value>${coord:dataOut('LOCAL_A')}</value> </property> <property> <name>TESTING</name> <value>${start}</value> </property> </configuration> </workflow> </action> </coordinator-app>";
-
-
private static final String APP2 = "<workflow-app xmlns='uri:oozie:workflow:0.1' name='app'>" +
"<start to='a'/>" +
"<action name='a'>" +
@@ -101,10 +96,230 @@ public class TestSchemaService extends XTestCase {
"<end name='end'/>" +
"</workflow-app>";
+ private static final String WF_GLOBAL_LAUNCHER_CONF = "<workflow-app xmlns=\"uri:oozie:workflow:1.0\" name=\"test-wf\">\n" +
+ " <global>\n" +
+ " <launcher>\n" +
+ " <memory.mb>1024</memory.mb>\n" +
+ " <vcores>2</vcores>\n" +
+ " <java-opts>dummyJavaOpts</java-opts>\n" +
+ " <env>dummyEnv</env>\n" +
+ " <queue>dummyQueue</queue>\n" +
+ " <sharelib>a,b,c</sharelib>\n" +
+ " </launcher>\n" +
+ " </global>\n" +
+ " <start to=\"a\"/>\n" +
+ " <action name=\"a\">\n" +
+ " <fs>\n" +
+ " <mkdir path='/tmp'/>\n" +
+ " </fs>\n" +
+ " <ok to=\"e\"/>\n" +
+ " <error to=\"k\"/>\n" +
+ " </action>\n" +
+ " <kill name=\"k\">\n" +
+ " <message>kill</message>\n" +
+ " </kill>\n" +
+ " <end name=\"e\"/>\n" +
+ "</workflow-app>\n";
+
+ private static final String HIVE_ACTION_LAUNCHER_CONF =
+ "<workflow-app xmlns=\"uri:oozie:workflow:1.0\" name=\"hive-wf\">\n" +
+ " <start to=\"hive-node\"/>\n" +
+ " <action name=\"hive-node\">\n" +
+ " <hive xmlns=\"uri:oozie:hive-action:1.0\">\n" +
+ " <job-tracker>${jobTracker}</job-tracker>\n" +
+ " <name-node>${nameNode}</name-node>\n" +
+ " <prepare>\n" +
+ " <delete path=\"${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/hive\"/>\n" +
+ " <mkdir path=\"${nameNode}/user/${wf:user()}/${examplesRoot}/output-data\"/>\n" +
+ " </prepare>\n" +
+ " <launcher>\n" +
+ " <memory.mb>1024</memory.mb>\n" +
+ " <vcores>2</vcores>\n" +
+ " <java-opts>dummyJavaOpts</java-opts>\n" +
+ " <env>dummyEnv</env>\n" +
+ " <queue>dummyQueue</queue>\n" +
+ " <sharelib>a,b,c</sharelib>\n" +
+ " </launcher>\n" +
+ " <configuration>\n" +
+ " <property>\n" +
+ " <name>mapred.job.queue.name</name>\n" +
+ " <value>${queueName}</value>\n" +
+ " </property>\n" +
+ " </configuration>\n" +
+ " <script>script.q</script>\n" +
+ " <param>INPUT=/user/${wf:user()}/${examplesRoot}/input-data/table</param>\n" +
+ " <param>OUTPUT=/user/${wf:user()}/${examplesRoot}/output-data/hive</param>\n" +
+ " </hive>\n" +
+ " <ok to=\"end\"/>\n" +
+ " <error to=\"fail\"/>\n" +
+ " </action>\n" +
+ " <kill name=\"fail\">\n" +
+ " <message>Hive failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>\n" +
+ " </kill>\n" +
+ " <end name=\"end\"/>\n" +
+ "</workflow-app>\n";
+
+ private static final String HIVE2_ACTION_LAUNCHER_CONF =
+ "<workflow-app xmlns=\"uri:oozie:workflow:1.0\" name=\"hive2-wf\">\n" +
+ " <start to=\"hive2-node\"/>\n" +
+ " <action name=\"hive2-node\">\n" +
+ " <hive2 xmlns=\"uri:oozie:hive2-action:1.0\">\n" +
+ " <job-tracker>${jobTracker}</job-tracker>\n" +
+ " <name-node>${nameNode}</name-node>\n" +
+ " <prepare>\n" +
+ " <delete path=\"${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/hive2\"/>\n" +
+ " <mkdir path=\"${nameNode}/user/${wf:user()}/${examplesRoot}/output-data\"/>\n" +
+ " </prepare>\n" +
+ " <launcher>\n" +
+ " <memory.mb>1024</memory.mb>\n" +
+ " <vcores>2</vcores>\n" +
+ " <java-opts>dummyJavaOpts</java-opts>\n" +
+ " <env>dummyEnv</env>\n" +
+ " <queue>dummyQueue</queue>\n" +
+ " <sharelib>a,b,c</sharelib>\n" +
+ " </launcher>\n" +
+ " <configuration>\n" +
+ " <property>\n" +
+ " <name>mapred.job.queue.name</name>\n" +
+ " <value>${queueName}</value>\n" +
+ " </property>\n" +
+ " </configuration>\n" +
+ " <jdbc-url>${jdbcURL}</jdbc-url>\n" +
+ " <script>script.q</script>\n" +
+ " <param>INPUT=/user/${wf:user()}/${examplesRoot}/input-data/table</param>\n" +
+ " <param>OUTPUT=/user/${wf:user()}/${examplesRoot}/output-data/hive2</param>\n" +
+ " </hive2>\n" +
+ " <ok to=\"end\"/>\n" +
+ " <error to=\"fail\"/>\n" +
+ " </action>\n" +
+ " <kill name=\"fail\">\n" +
+ " <message>Hive2 (Beeline) action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>\n" +
+ " </kill>\n" +
+ " <end name=\"end\"/>\n" +
+ "</workflow-app>\n";
+
+ private static final String SHELL_ACTION_LAUNCHER_CONF = "<workflow-app xmlns=\"uri:oozie:workflow:1.0\" " +
+ " name=\"shell-wf\">\n" +
+ " <start to=\"shell-node\"/>\n" +
+ " <action name=\"shell-node\">\n" +
+ " <shell xmlns=\"uri:oozie:shell-action:1.0\">\n" +
+ " <job-tracker>${jobTracker}</job-tracker>\n" +
+ " <name-node>${nameNode}</name-node>\n" +
+ " <launcher>\n" +
+ " <memory.mb>1024</memory.mb>\n" +
+ " </launcher>\n" +
+ " <configuration>\n" +
+ " <property>\n" +
+ " <name>mapred.job.queue.name</name>\n" +
+ " <value>${queueName}</value>\n" +
+ " </property>\n" +
+ " </configuration>\n" +
+ " <exec>echo</exec>\n" +
+ " <argument>my_output=Hello Oozie</argument>\n" +
+ " <capture-output/>\n" +
+ " </shell>\n" +
+ " <ok to=\"check-output\"/>\n" +
+ " <error to=\"fail\"/>\n" +
+ " </action>\n" +
+ " <decision name=\"check-output\">\n" +
+ " <switch>\n" +
+ " <case to=\"end\">\n" +
+ " ${wf:actionData('shell-node')['my_output'] eq 'Hello Oozie'}\n" +
+ " </case>\n" +
+ " <default to=\"fail-output\"/>\n" +
+ " </switch>\n" +
+ " </decision>\n" +
+ " <kill name=\"fail\">\n" +
+ " <message>Shell action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>\n" +
+ " </kill>\n" +
+ " <kill name=\"fail-output\">\n" +
+ " <message>Incorrect output</message>\n" +
+ " </kill>\n" +
+ " <end name=\"end\"/>\n" +
+ "</workflow-app>";
+
+ private static final String SQQP_ACTION_LAUNCHER_CONF = "<workflow-app xmlns=\"uri:oozie:workflow:1.0\" name=\"sqoop-wf\">\n" +
+ " <start to=\"sqoop-node\"/>\n" +
+ " <action name=\"sqoop-node\">\n" +
+ " <sqoop xmlns=\"uri:oozie:sqoop-action:1.0\">\n" +
+ " <job-tracker>${jobTracker}</job-tracker>\n" +
+ " <name-node>${nameNode}</name-node>\n" +
+ " <prepare>\n" +
+ " <delete path=\"${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/sqoop\"/>\n" +
+ " <mkdir path=\"${nameNode}/user/${wf:user()}/${examplesRoot}/output-data\"/>\n" +
+ " </prepare>\n" +
+ " <launcher>\n" +
+ " <memory.mb>1024</memory.mb>\n" +
+ " </launcher>\n" +
+ " <configuration>\n" +
+ " <property>\n" +
+ " <name>mapred.job.queue.name</name>\n" +
+ " <value>${queueName}</value>\n" +
+ " </property>\n" +
+ " </configuration>\n" +
+ " <command>import --connect jdbc:hsqldb:file:db.hsqldb --table TT --target-dir " +
+ "/user/${wf:user()}/${examplesRoot}/output-data/sqoop -m 1</command>\n" +
+ " <file>db.hsqldb.properties#db.hsqldb.properties</file>\n" +
+ " <file>db.hsqldb.script#db.hsqldb.script</file>\n" +
+ " </sqoop>\n" +
+ " <ok to=\"end\"/>\n" +
+ " <error to=\"fail\"/>\n" +
+ " </action>\n" +
+ " <kill name=\"fail\">\n" +
+ " <message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>\n" +
+ " </kill>\n" +
+ " <end name=\"end\"/>\n" +
+ "</workflow-app>";
+
+ private static final String SPARK_ACTION_LAUNCHER_CONF = "<workflow-app xmlns='uri:oozie:workflow:1.0' " +
+ "name='SparkFileCopy'>\n" +
+ " <start to='spark-node' />\n" +
+ " <action name='spark-node'>\n" +
+ " <spark xmlns=\"uri:oozie:spark-action:1.0\">\n" +
+ " <job-tracker>${jobTracker}</job-tracker>\n" +
+ " <name-node>${nameNode}</name-node>\n" +
+ " <prepare>\n" +
+ " <delete path=\"${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/spark\"/>\n" +
+ " </prepare>\n" +
+ " <launcher>\n" +
+ " <memory.mb>1024</memory.mb>\n" +
+ " <vcores>2</vcores>\n" +
+ " <java-opts>dummyJavaOpts</java-opts>\n" +
+ " <env>dummyEnv</env>\n" +
+ " <queue>dummyQueue</queue>\n" +
+ " <sharelib>a,b,c</sharelib>\n" +
+ " </launcher>\n" +
+ " <master>${master}</master>\n" +
+ " <name>Spark-FileCopy</name>\n" +
+ " <class>org.apache.oozie.example.SparkFileCopy</class>\n" +
+ " <jar>${nameNode}/user/${wf:user()}/${examplesRoot}/apps/spark/lib/oozie-examples.jar</jar>\n" +
+ " <arg>${nameNode}/user/${wf:user()}/${examplesRoot}/input-data/text/data.txt</arg>\n" +
+ " <arg>${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/spark</arg>\n" +
+ " </spark>\n" +
+ " <ok to=\"end\" />\n" +
+ " <error to=\"fail\" />\n" +
+ " </action>\n" +
+ " <kill name=\"fail\">\n" +
+ " <message>Workflow failed, error\n" +
+ " message[${wf:errorMessage(wf:lastErrorNode())}]\n" +
+ " </message>\n" +
+ " </kill>\n" +
+ " <end name='end' />\n" +
+ "</workflow-app>\n";
+
+ private SchemaService wss;
+ private Validator workflowValidator;
+ private Validator coordinatorValidator;
+ private Validator bundleValidator;
+
@Override
protected void setUp() throws Exception {
super.setUp();
new Services().init();
+ wss = Services.get().get(SchemaService.class);
+ workflowValidator = wss.getSchema(SchemaName.WORKFLOW).newValidator();
+ coordinatorValidator = wss.getSchema(SchemaName.COORDINATOR).newValidator();
+ bundleValidator = wss.getSchema(SchemaName.BUNDLE).newValidator();
}
@Override
@@ -118,27 +333,19 @@ public class TestSchemaService extends XTestCase {
}
public void testWfSchema() throws Exception {
- SchemaService wss = Services.get().get(SchemaService.class);
- Validator validator = wss.getSchema(SchemaName.WORKFLOW).newValidator();
- validator.validate(new StreamSource(new StringReader(APP1)));
+ workflowValidator.validate(new StreamSource(new StringReader(APP1)));
}
public void testWfMultipleJavaOpts() throws Exception {
- SchemaService wss = Services.get().get(SchemaService.class);
- Validator validator = wss.getSchema(SchemaName.WORKFLOW).newValidator();
- validator.validate(new StreamSource(new StringReader(WF_4_MULTIPLE_JAVA_OPTS)));
+ workflowValidator.validate(new StreamSource(new StringReader(WF_4_MULTIPLE_JAVA_OPTS)));
}
public void testWfSchemaV2() throws Exception {
- SchemaService wss = Services.get().get(SchemaService.class);
- Validator validator = wss.getSchema(SchemaName.WORKFLOW).newValidator();
- validator.validate(new StreamSource(new StringReader(APP_V2)));
+ workflowValidator.validate(new StreamSource(new StringReader(APP_V2)));
}
public void testWfSchemaV25() throws Exception {
- SchemaService wss = Services.get().get(SchemaService.class);
- Validator validator = wss.getSchema(SchemaName.WORKFLOW).newValidator();
- validator.validate(new StreamSource(new StringReader(APP_V25)));
+ workflowValidator.validate(new StreamSource(new StringReader(APP_V25)));
}
public void testExtSchema() throws Exception {
@@ -151,16 +358,12 @@ public class TestSchemaService extends XTestCase {
}
public void testWfSLASchema() throws Exception {
- SchemaService wss = Services.get().get(SchemaService.class);
- Validator validator = wss.getSchema(SchemaName.WORKFLOW).newValidator();
- validator.validate(new StreamSource(new StringReader(WF_SLA_APP)));
+ workflowValidator.validate(new StreamSource(new StringReader(WF_SLA_APP)));
}
public void testWfSLASchemaNW() throws Exception {
- SchemaService wss = Services.get().get(SchemaService.class);
- Validator validator = wss.getSchema(SchemaName.WORKFLOW).newValidator();
try {
- validator.validate(new StreamSource(new StringReader(WF_SLA_APP_NW)));
+ workflowValidator.validate(new StreamSource(new StringReader(WF_SLA_APP_NW)));
fail("Schema service check does not work");
}
catch (Exception ex) {
@@ -169,59 +372,199 @@ public class TestSchemaService extends XTestCase {
}
public void testCoordSchema() throws Exception {
- SchemaService wss = Services.get().get(SchemaService.class);
- Validator validator = wss.getSchema(SchemaName.COORDINATOR).newValidator();
- String COORD_APP1 = "<coordinator-app name='NAME' frequency='${coord:days(1)}' start='2009-02-01T01:00Z' end='2009-02-03T23:59Z' timezone='UTC' xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' xmlns='uri:oozie:coordinator:0.1' xmlns:sla='uri:oozie:sla:0.1'> "
- + "<controls> <timeout>10</timeout> <concurrency>2</concurrency> <execution>LIFO</execution> </controls> <datasets> <dataset name='a' frequency='${coord:days(7)}' initial-instance='2009-02-01T01:00Z' timezone='UTC'> <uri-template>file:///tmp/coord/workflows/${YEAR}/${DAY}</uri-template> </dataset> <dataset name='local_a' frequency='${coord:days(7)}' initial-instance='2009-02-01T01:00Z' timezone='UTC'> <uri-template>file:///tmp/coord/workflows/${YEAR}/${DAY}</uri-template> </dataset> </datasets> <input-events> <data-in name='A' dataset='a'> <instance>${coord:latest(0)}</instance> </data-in> </input-events> <output-events> <data-out name='LOCAL_A' dataset='local_a'> <instance>${coord:current(-1)}</instance> </data-out> </output-events> <action> <workflow> <app-path>hdfs:///tmp/workflows/</app-path> <configuration> <property> <name>inputA</name> <value>${coord:dataIn('A')}</value> </property> <property> <name>inputB</name> <value>${coord:dataOut('LOCAL_A')}</value> </property></configuration> </workflow> "
- + "</action> </coordinator-app>";
-
- Element e = XmlUtils.parseXml(COORD_APP1);
- //System.out.println("XML :"+ XmlUtils.prettyPrint(e));
- validator.validate(new StreamSource(new StringReader(COORD_APP1)));
+ String COORD_APP1 = "<?xml version=\"1.0\"?>\n" +
+ "<coordinator-app xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns=\"uri:oozie:coordinator:0.1\"" +
+ " xmlns:sla=\"uri:oozie:sla:0.1\" name=\"NAME\" frequency=\"${coord:days(1)}\" start=\"2009-02-01T01:00Z\"" +
+ " end=\"2009-02-03T23:59Z\" timezone=\"UTC\">\n" +
+ " <controls>\n" +
+ " <timeout>10</timeout>\n" +
+ " <concurrency>2</concurrency>\n" +
+ " <execution>LIFO</execution>\n" +
+ " </controls>\n" +
+ " <datasets>\n" +
+ " <dataset name=\"a\" frequency=\"${coord:days(7)}\" initial-instance=\"2009-02-01T01:00Z\"" +
+ " timezone=\"UTC\">\n" +
+ " <uri-template>file:///tmp/coord/workflows/${YEAR}/${DAY}</uri-template>\n" +
+ " </dataset>\n" +
+ " <dataset name=\"local_a\" frequency=\"${coord:days(7)}\" initial-instance=\"2009-02-01T01:00Z\"" +
+ " timezone=\"UTC\">\n" +
+ " <uri-template>file:///tmp/coord/workflows/${YEAR}/${DAY}</uri-template>\n" +
+ " </dataset>\n" +
+ " </datasets>\n" +
+ " <input-events>\n" +
+ " <data-in name=\"A\" dataset=\"a\">\n" +
+ " <instance>${coord:latest(0)}</instance>\n" +
+ " </data-in>\n" +
+ " </input-events>\n" +
+ " <output-events>\n" +
+ " <data-out name=\"LOCAL_A\" dataset=\"local_a\">\n" +
+ " <instance>${coord:current(-1)}</instance>\n" +
+ " </data-out>\n" +
+ " </output-events>\n" +
+ " <action>\n" +
+ " <workflow>\n" +
+ " <app-path>hdfs:///tmp/workflows/</app-path>\n" +
+ " <configuration>\n" +
+ " <property>\n" +
+ " <name>inputA</name>\n" +
+ " <value>${coord:dataIn('A')}</value>\n" +
+ " </property>\n" +
+ " <property>\n" +
+ " <name>inputB</name>\n" +
+ " <value>${coord:dataOut('LOCAL_A')}</value>\n" +
+ " </property>\n" +
+ " </configuration>\n" +
+ " </workflow>\n" +
+ " </action>\n" +
+ "</coordinator-app>";
+
+ coordinatorValidator.validate(new StreamSource(new StringReader(COORD_APP1)));
}
public void testCoordSchema2() throws Exception {
- SchemaService wss = Services.get().get(SchemaService.class);
- Validator validator = wss.getSchema(SchemaName.COORDINATOR).newValidator();
- String COORD_APP1 = "<coordinator-app name='NAME' frequency='${coord:days(1)}' start='2009-02-01T01:00Z' end='2009-02-03T23:59Z' timezone='UTC' xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' xmlns='uri:oozie:coordinator:0.2' xmlns:sla='uri:oozie:sla:0.1'> "
- + "<controls> <timeout>10</timeout> <concurrency>2</concurrency> <execution>LIFO</execution> <throttle>3</throttle></controls> <datasets> <dataset name='a' frequency='${coord:days(7)}' initial-instance='2009-02-01T01:00Z' timezone='UTC'> <uri-template>file:///tmp/coord/workflows/${YEAR}/${DAY}</uri-template> </dataset> <dataset name='local_a' frequency='${coord:days(7)}' initial-instance='2009-02-01T01:00Z' timezone='UTC'> <uri-template>file:///tmp/coord/workflows/${YEAR}/${DAY}</uri-template> </dataset> </datasets> <input-events> <data-in name='A' dataset='a'> <instance>${coord:latest(0)}</instance> </data-in> </input-events> <output-events> <data-out name='LOCAL_A' dataset='local_a'> <instance>${coord:current(-1)}</instance> </data-out> </output-events> <action> <workflow> <app-path>hdfs:///tmp/workflows/</app-path> <configuration> <property> <name>inputA</name> <value>${coord:dataIn('A')}</value> </property> <property> <name>inputB</name> <value>${coord:dataOut('LOCAL_A')}</value> </property></configuration> </workflow> "
- + "</action> </coordinator-app>";
-
- Element e = XmlUtils.parseXml(COORD_APP1);
- //System.out.println("XML :"+ XmlUtils.prettyPrint(e));
- validator.validate(new StreamSource(new StringReader(COORD_APP1)));
+ String COORD_APP1 = "<coordinator-app xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"" +
+ " xmlns=\"uri:oozie:coordinator:0.2\" xmlns:sla=\"uri:oozie:sla:0.1\" name=\"NAME\"" +
+ " frequency=\"${coord:days(1)}\" start=\"2009-02-01T01:00Z\" end=\"2009-02-03T23:59Z\" timezone=\"UTC\">\n" +
+ " <controls>\n" +
+ " <timeout>10</timeout>\n" +
+ " <concurrency>2</concurrency>\n" +
+ " <execution>LIFO</execution>\n" +
+ " <throttle>3</throttle>\n" +
+ " </controls>\n" +
+ " <datasets>\n" +
+ " <dataset name=\"a\" frequency=\"${coord:days(7)}\" initial-instance=\"2009-02-01T01:00Z\"" +
+ " timezone=\"UTC\">\n" +
+ " <uri-template>file:///tmp/coord/workflows/${YEAR}/${DAY}</uri-template>\n" +
+ " </dataset>\n" +
+ " <dataset name=\"local_a\" frequency=\"${coord:days(7)}\" initial-instance=\"2009-02-01T01:00Z\"" +
+ " timezone=\"UTC\">\n" +
+ " <uri-template>file:///tmp/coord/workflows/${YEAR}/${DAY}</uri-template>\n" +
+ " </dataset>\n" +
+ " </datasets>\n" +
+ " <input-events>\n" +
+ " <data-in name=\"A\" dataset=\"a\">\n" +
+ " <instance>${coord:latest(0)}</instance>\n" +
+ " </data-in>\n" +
+ " </input-events>\n" +
+ " <output-events>\n" +
+ " <data-out name=\"LOCAL_A\" dataset=\"local_a\">\n" +
+ " <instance>${coord:current(-1)}</instance>\n" +
+ " </data-out>\n" +
+ " </output-events>\n" +
+ " <action>\n" +
+ " <workflow>\n" +
+ " <app-path>hdfs:///tmp/workflows/</app-path>\n" +
+ " <configuration>\n" +
+ " <property>\n" +
+ " <name>inputA</name>\n" +
+ " <value>${coord:dataIn('A')}</value>\n" +
+ " </property>\n" +
+ " <property>\n" +
+ " <name>inputB</name>\n" +
+ " <value>${coord:dataOut('LOCAL_A')}</value>\n" +
+ " </property>\n" +
+ " </configuration>\n" +
+ " </workflow>\n" +
+ " </action>\n" +
+ "</coordinator-app>";
+
+ coordinatorValidator.validate(new StreamSource(new StringReader(COORD_APP1)));
}
public void testCoordSLASchema() throws Exception {
- SchemaService wss = Services.get().get(SchemaService.class);
- Validator validator = wss.getSchema(SchemaName.COORDINATOR)
- .newValidator();
- String COORD_APP1 = "<coordinator-app name='NAME' frequency='${coord:days(1)}' start='2009-02-01T01:00Z' end='2009-02-03T23:59Z' timezone='UTC' xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' xmlns='uri:oozie:coordinator:0.2' xmlns:sla='uri:oozie:sla:0.1'> "
- + "<controls> <timeout>10</timeout> <concurrency>2</concurrency> <execution>LIFO</execution> </controls> <datasets> <dataset name='a' frequency='${coord:days(7)}' initial-instance='2009-02-01T01:00Z' timezone='UTC'> <uri-template>file:///tmp/coord/workflows/${YEAR}/${DAY}</uri-template> </dataset> <dataset name='local_a' frequency='${coord:days(7)}' initial-instance='2009-02-01T01:00Z' timezone='UTC'> <uri-template>file:///tmp/coord/workflows/${YEAR}/${DAY}</uri-template> </dataset> </datasets> <input-events> <data-in name='A' dataset='a'> <instance>${coord:latest(0)}</instance> </data-in> </input-events> <output-events> <data-out name='LOCAL_A' dataset='local_a'> <instance>${coord:current(-1)}</instance> </data-out> </output-events> <action> <workflow> <app-path>hdfs:///tmp/workflows/</app-path> <configuration> <property> <name>inputA</name> <value>${coord:dataIn('A')}</value> </property> <property> <name>inputB</name> <value>${coord:dataOut('LOCAL_A')}</value> </property></configuration> </workflow> "
- + "<sla:info> <sla:app-name>5</sla:app-name> <sla:nominal-time>2009-03-06T010:00Z</sla:nominal-time> "
- + "<sla:should-start>5</sla:should-start> <sla:should-end>50</sla:should-end> "
- + "<sla:alert-contact>abc@example.com</sla:alert-contact> <sla:dev-contact>abc@example.com</sla:dev-contact>"
- + " <sla:qa-contact>abc@example.com</sla:qa-contact> <sla:se-contact>abc@example.com</sla:se-contact>"
- + "</sla:info>" + "</action> </coordinator-app>";
-
- Element e = XmlUtils.parseXml(COORD_APP1);
- // System.out.println("XML :"+ XmlUtils.prettyPrint(e));
- validator.validate(new StreamSource(new StringReader(COORD_APP1)));
+ String COORD_APP1 = "<?xml version=\"1.0\"?>\n" +
+ "<coordinator-app xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\" xmlns=\"uri:oozie:coordinator:0.2\"" +
+ " xmlns:sla=\"uri:oozie:sla:0.1\" name=\"NAME\" frequency=\"${coord:days(1)}\" start=\"2009-02-01T01:00Z\"" +
+ " end=\"2009-02-03T23:59Z\" timezone=\"UTC\">\n" +
+ " <controls>\n" +
+ " <timeout>10</timeout>\n" +
+ " <concurrency>2</concurrency>\n" +
+ " <execution>LIFO</execution>\n" +
+ " </controls>\n" +
+ " <datasets>\n" +
+ " <dataset name=\"a\" frequency=\"${coord:days(7)}\" initial-instance=\"2009-02-01T01:00Z\"" +
+ " timezone=\"UTC\">\n" +
+ " <uri-template>file:///tmp/coord/workflows/${YEAR}/${DAY}</uri-template>\n" +
+ " </dataset>\n" +
+ " <dataset name=\"local_a\" frequency=\"${coord:days(7)}\" initial-instance=\"2009-02-01T01:00Z\"" +
+ " timezone=\"UTC\">\n" +
+ " <uri-template>file:///tmp/coord/workflows/${YEAR}/${DAY}</uri-template>\n" +
+ " </dataset>\n" +
+ " </datasets>\n" +
+ " <input-events>\n" +
+ " <data-in name=\"A\" dataset=\"a\">\n" +
+ " <instance>${coord:latest(0)}</instance>\n" +
+ " </data-in>\n" +
+ " </input-events>\n" +
+ " <output-events>\n" +
+ " <data-out name=\"LOCAL_A\" dataset=\"local_a\">\n" +
+ " <instance>${coord:current(-1)}</instance>\n" +
+ " </data-out>\n" +
+ " </output-events>\n" +
+ " <action>\n" +
+ " <workflow>\n" +
+ " <app-path>hdfs:///tmp/workflows/</app-path>\n" +
+ " <configuration>\n" +
+ " <property>\n" +
+ " <name>inputA</name>\n" +
+ " <value>${coord:dataIn('A')}</value>\n" +
+ " </property>\n" +
+ " <property>\n" +
+ " <name>inputB</name>\n" +
+ " <value>${coord:dataOut('LOCAL_A')}</value>\n" +
+ " </property>\n" +
+ " </configuration>\n" +
+ " </workflow>\n" +
+ " <sla:info>\n" +
+ " <sla:app-name>5</sla:app-name>\n" +
+ " <sla:nominal-time>2009-03-06T010:00Z</sla:nominal-time>\n" +
+ " <sla:should-start>5</sla:should-start>\n" +
+ " <sla:should-end>50</sla:should-end>\n" +
+ " <sla:alert-contact>abc@example.com</sla:alert-contact>\n" +
+ " <sla:dev-contact>abc@example.com</sla:dev-contact>\n" +
+ " <sla:qa-contact>abc@example.com</sla:qa-contact>\n" +
+ " <sla:se-contact>abc@example.com</sla:se-contact>\n" +
+ " </sla:info>\n" +
+ " </action>\n" +
+ "</coordinator-app>";
+
+ coordinatorValidator.validate(new StreamSource(new StringReader(COORD_APP1)));
}
public void testBundleSchema() throws Exception {
- SchemaService wss = Services.get().get(SchemaService.class);
- Validator validator = wss.getSchema(SchemaName.BUNDLE).newValidator();
- String BUNDLE_APP = "<bundle-app name='NAME' xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' xmlns='uri:oozie:bundle:0.1'> "
+ String BUNDLE_APP = "<bundle-app name='NAME' xmlns:xsi='http://www.w3.org/2001/XMLSchema-instance' "
+ + " xmlns='uri:oozie:bundle:0.1'> "
+ "<controls> <kick-off-time>2009-02-02T00:00Z</kick-off-time> </controls> "
+ "<coordinator name='c12'> "
+ "<app-path>hdfs://localhost:9001/tmp/bundle-apps/coordinator1.xml</app-path>"
+ "<configuration> "
+ "<property> <name>START_TIME</name> <value>2009-02-01T00:00Z</value> </property> </configuration> "
+ "</coordinator></bundle-app>";
- Element e = XmlUtils.parseXml(BUNDLE_APP);
- // System.out.println("XML :"+ XmlUtils.prettyPrint(e));
- validator.validate(new StreamSource(new StringReader(BUNDLE_APP)));
+
+ bundleValidator.validate(new StreamSource(new StringReader(BUNDLE_APP)));
}
+ public void testWfLauncherConfig() throws Exception {
+ workflowValidator.validate(new StreamSource(new StringReader(WF_GLOBAL_LAUNCHER_CONF)));
+ }
+
+ public void testHiveActionLauncherConfig() throws Exception {
+ workflowValidator.validate(new StreamSource(new StringReader(HIVE_ACTION_LAUNCHER_CONF)));
+ }
+
+ public void testHive2ActionLauncherConfig() throws Exception {
+ workflowValidator.validate(new StreamSource(new StringReader(HIVE2_ACTION_LAUNCHER_CONF)));
+ }
+
+ public void testShellActionLauncherConfig() throws Exception {
+ workflowValidator.validate(new StreamSource(new StringReader(SHELL_ACTION_LAUNCHER_CONF)));
+ }
+
+ public void testSqoopActionLauncherConfig() throws Exception {
+ workflowValidator.validate(new StreamSource(new StringReader(SQQP_ACTION_LAUNCHER_CONF)));
+ }
+
+ public void testSparkActionLauncherConfig() throws Exception {
+ workflowValidator.validate(new StreamSource(new StringReader(SPARK_ACTION_LAUNCHER_CONF)));
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/test/java/org/apache/oozie/util/TestMetricsInstrumentation.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/util/TestMetricsInstrumentation.java b/core/src/test/java/org/apache/oozie/util/TestMetricsInstrumentation.java
index a882c82..ab97443 100644
--- a/core/src/test/java/org/apache/oozie/util/TestMetricsInstrumentation.java
+++ b/core/src/test/java/org/apache/oozie/util/TestMetricsInstrumentation.java
@@ -228,11 +228,11 @@ public class TestMetricsInstrumentation extends XTestCase {
//Setting the id of the VM unique, so we can find it.
String uniqueId = UUID.randomUUID().toString();
- System.setProperty("process.unique.id", uniqueId);
+ System.setProperty("processSettings.unique.id", uniqueId);
//Finding our own VM by the id.
for(VirtualMachineDescriptor d : VirtualMachine.list()) {
- String remoteUniqueId = VirtualMachine.attach(d).getSystemProperties().getProperty("process.unique.id");
+ String remoteUniqueId = VirtualMachine.attach(d).getSystemProperties().getProperty("processSettings.unique.id");
if(remoteUniqueId != null && remoteUniqueId.equals(uniqueId))
{
descriptor = d;
http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java b/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java
index 2133240..a361078 100644
--- a/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java
+++ b/core/src/test/java/org/apache/oozie/workflow/lite/TestLiteWorkflowAppParser.java
@@ -25,15 +25,20 @@ import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ErrorCode;
+import org.apache.oozie.action.hadoop.LauncherAM;
import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.LiteWorkflowStoreService;
import org.apache.oozie.service.SchemaService;
import org.apache.oozie.service.Services;
import org.apache.oozie.test.XTestCase;
import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XmlUtils;
import org.apache.oozie.workflow.WorkflowException;
import org.apache.oozie.workflow.lite.TestLiteWorkflowLib.TestActionNodeHandler;
import org.apache.oozie.workflow.lite.TestLiteWorkflowLib.TestDecisionNodeHandler;
+import org.jdom.Element;
+import org.jdom.Namespace;
public class TestLiteWorkflowAppParser extends XTestCase {
public static String dummyConf = "<java></java>";
@@ -652,6 +657,42 @@ public class TestLiteWorkflowAppParser extends XTestCase {
}
}
+ public void testParserGlobalLauncherAM() throws Exception {
+ LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+ LiteWorkflowStoreService.LiteControlNodeHandler.class,
+ LiteWorkflowStoreService.LiteDecisionHandler.class,
+ LiteWorkflowStoreService.LiteActionHandler.class);
+
+ LiteWorkflowApp workflowApp = parser.validateAndParse(IOUtils.getResourceAsReader("wf-schema-global-launcherconf.xml", -1), new Configuration());
+
+ XConfiguration xconf = extractConfig(workflowApp, "action1");
+
+ assertEquals("Vcores", 2, xconf.getInt(LauncherAM.OOZIE_LAUNCHER_VCORES_PROPERTY, Integer.MIN_VALUE));
+ assertEquals("Memory", 1024, xconf.getInt(LauncherAM.OOZIE_LAUNCHER_MEMORY_MB_PROPERTY, Integer.MIN_VALUE));
+ assertEquals("Env", "dummyEnv", xconf.get(LauncherAM.OOZIE_LAUNCHER_ENV_PROPERTY));
+ assertEquals("Queue", "dummyQueue", xconf.get(LauncherAM.OOZIE_LAUNCHER_QUEUE_PROPERTY));
+ assertEquals("Java opts", "dummyJavaOpts", xconf.get(LauncherAM.OOZIE_LAUNCHER_JAVAOPTS_PROPERTY));
+ assertEquals("Sharelib", "a,b,c", xconf.get(LauncherAM.OOZIE_LAUNCHER_SHARELIB_PROPERTY));
+ }
+
+ public void testParserGlobalLauncherAMOverridden() throws Exception {
+ LiteWorkflowAppParser parser = new LiteWorkflowAppParser(null,
+ LiteWorkflowStoreService.LiteControlNodeHandler.class,
+ LiteWorkflowStoreService.LiteDecisionHandler.class,
+ LiteWorkflowStoreService.LiteActionHandler.class);
+
+ LiteWorkflowApp workflowApp = parser.validateAndParse(IOUtils.getResourceAsReader("wf-schema-global-launcherconf-override.xml", -1), new Configuration());
+
+ XConfiguration xconf = extractConfig(workflowApp, "a");
+
+ assertEquals("Vcores", 1, xconf.getInt(LauncherAM.OOZIE_LAUNCHER_VCORES_PROPERTY, Integer.MIN_VALUE));
+ assertEquals("Memory", 2048, xconf.getInt(LauncherAM.OOZIE_LAUNCHER_MEMORY_MB_PROPERTY, Integer.MIN_VALUE));
+ assertEquals("Java opts", "dummyJavaOpts", xconf.get(LauncherAM.OOZIE_LAUNCHER_JAVAOPTS_PROPERTY));
+ assertNull("Queue", xconf.get(LauncherAM.OOZIE_LAUNCHER_QUEUE_PROPERTY));
+ assertNull("Env", xconf.get(LauncherAM.OOZIE_LAUNCHER_ENV_PROPERTY));
+ assertNull("Sharelib", xconf.get(LauncherAM.OOZIE_LAUNCHER_SHARELIB_PROPERTY));
+ }
+
/*
* 1->ok->2
* 2->ok->end
@@ -1604,4 +1645,14 @@ public class TestLiteWorkflowAppParser extends XTestCase {
assertEquals(app.getNode("retry").getUserRetryInterval(), "10");
}
+ private XConfiguration extractConfig(LiteWorkflowApp app, String actionNode) throws Exception {
+ String confXML = app.getNode(actionNode).getConf();
+ Element confElement = XmlUtils.parseXml(confXML);
+ Namespace ns = confElement.getNamespace();
+ String configSection = XmlUtils.prettyPrint(confElement.getChild("configuration", ns)).toString();
+ XConfiguration xconf = new XConfiguration(new StringReader(configSection));
+
+ return xconf;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index db18f30..efccc34 100644
--- a/pom.xml
+++ b/pom.xml
@@ -107,7 +107,7 @@
<jline.version>0.9.94</jline.version>
<openjpa.version>2.4.2</openjpa.version>
- <xerces.version>2.10.0</xerces.version>
+ <xerces.version>2.11.0</xerces.version>
<curator.version>2.5.0</curator.version>
<jackson.version>1.9.13</jackson.version>
<log4j.version>1.2.17</log4j.version>
http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 1fb1a1d..0a4b600 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 5.0.0 release (trunk - unreleased)
+OOZIE-2687 Create XML schema for launcher configurations (asasvari)
OOZIE-3041 TestWorkflowActionRetryInfoXCommand fails in oozie core module (andras.piros via gezapeti)
OOZIE-2916 Set a job name for the MR Action's child job (asasvari)
OOZIE-2858 HiveMain, ShellMain and SparkMain should not overwrite properties and config files locally (gezapeti)
http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
----------------------------------------------------------------------
diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
index 6a98d6e..3e6ffc9 100644
--- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
+++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/LauncherAM.java
@@ -49,6 +49,14 @@ public class LauncherAM {
public static final String OOZIE_ACTION_CONF_XML = "oozie.action.conf.xml";
public static final String OOZIE_LAUNCHER_JOB_ID = "oozie.launcher.job.id";
+ public static final String OOZIE_LAUNCHER_VCORES_PROPERTY = "oozie.launcher.vcores";
+ public static final String OOZIE_LAUNCHER_MEMORY_MB_PROPERTY = "oozie.launcher.memory.mb";
+ public static final String OOZIE_LAUNCHER_PRIORITY_PROPERTY = "oozie.launcher.priority";
+ public static final String OOZIE_LAUNCHER_QUEUE_PROPERTY = "oozie.launcher.queue";
+ public static final String OOZIE_LAUNCHER_JAVAOPTS_PROPERTY = "oozie.launcher.javaopts";
+ public static final String OOZIE_LAUNCHER_ENV_PROPERTY = "oozie.launcher.env";
+ public static final String OOZIE_LAUNCHER_SHARELIB_PROPERTY = "oozie.launcher.sharelib";
+
public static final String JAVA_CLASS_PATH = "java.class.path";
public static final String OOZIE_ACTION_ID = "oozie.action.id";
public static final String OOZIE_JOB_ID = "oozie.job.id";
http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/UDFTester.java
----------------------------------------------------------------------
diff --git a/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/UDFTester.java b/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/UDFTester.java
index ff999ff..c0e2ec3 100644
--- a/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/UDFTester.java
+++ b/sharelib/pig/src/test/java/org/apache/oozie/action/hadoop/UDFTester.java
@@ -39,7 +39,7 @@ public class UDFTester extends EvalFunc<String> {
return query.toLowerCase().trim();
}
catch (Exception e) {
- System.err.println("ToLower: failed to process input; error - " + e.getMessage());
+ System.err.println("ToLower: failed to processSettings input; error - " + e.getMessage());
return null;
}
}
[2/2] oozie git commit: OOZIE-2687 Create XML schema for launcher
configurations (asasvari)
Posted by as...@apache.org.
OOZIE-2687 Create XML schema for launcher configurations (asasvari)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/8b247f28
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/8b247f28
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/8b247f28
Branch: refs/heads/master
Commit: 8b247f28f0496af3f217ec4becb3f5f931f08511
Parents: 69c5091
Author: Attila Sasvari <as...@cloudera.com>
Authored: Mon Sep 18 12:58:01 2017 +0200
Committer: Attila Sasvari <as...@cloudera.com>
Committed: Mon Sep 18 12:58:01 2017 +0200
----------------------------------------------------------------------
.../oozie/action/hadoop/JavaActionExecutor.java | 325 ++++++++----
.../action/hadoop/SqoopActionExecutor.java | 2 +-
.../oozie/action/ssh/SshActionExecutor.java | 10 +-
.../org/apache/oozie/jms/MessageReceiver.java | 2 +-
.../oozie/service/LiteWorkflowStoreService.java | 1 -
.../org/apache/oozie/service/SchemaService.java | 9 +-
.../org/apache/oozie/util/WritableUtils.java | 7 +
.../workflow/lite/LiteWorkflowAppParser.java | 30 +-
.../oozie/workflow/lite/LiteWorkflowLib.java | 2 -
core/src/main/resources/oozie-default.xml | 43 +-
.../action/hadoop/ActionExecutorTestCase.java | 39 +-
.../oozie/action/hadoop/LauncherMainTester.java | 9 +
.../action/hadoop/TestJavaActionExecutor.java | 212 +++++++-
.../apache/oozie/service/TestSchemaService.java | 497 ++++++++++++++++---
.../oozie/util/TestMetricsInstrumentation.java | 4 +-
.../lite/TestLiteWorkflowAppParser.java | 51 ++
pom.xml | 2 +-
release-log.txt | 1 +
.../apache/oozie/action/hadoop/LauncherAM.java | 8 +
.../apache/oozie/action/hadoop/UDFTester.java | 2 +-
20 files changed, 1042 insertions(+), 214 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index 49fd4b8..9d1afb5 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -22,6 +22,28 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Closeables;
import com.google.common.primitives.Ints;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.StringReader;
+import java.net.ConnectException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
@@ -41,6 +63,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -98,8 +121,6 @@ import java.nio.ByteBuffer;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -107,12 +128,18 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
+import java.util.Objects;
+import java.util.Properties;
import java.util.Properties;
import java.util.Set;
+import java.util.Set;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Closeables;
public class JavaActionExecutor extends ActionExecutor {
-
public static final String RUNNING = "RUNNING";
public static final String SUCCEEDED = "SUCCEEDED";
public static final String KILLED = "KILLED";
@@ -122,6 +149,11 @@ public class JavaActionExecutor extends ActionExecutor {
public static final String HADOOP_NAME_NODE = "fs.default.name";
public static final String OOZIE_COMMON_LIBDIR = "oozie";
+ public static final String DEFAULT_LAUNCHER_VCORES = "oozie.launcher.default.vcores";
+ public static final String DEFAULT_LAUNCHER_MEMORY_MB = "oozie.launcher.default.memory.mb";
+ public static final String DEFAULT_LAUNCHER_PRIORITY = "oozie.launcher.default.priority";
+ public static final String DEFAULT_LAUNCHER_QUEUE = "oozie.launcher.default.queue";
+
public static final String MAX_EXTERNAL_STATS_SIZE = "oozie.external.stats.max.size";
public static final String ACL_VIEW_JOB = "mapreduce.job.acl-view-job";
public static final String ACL_MODIFY_JOB = "mapreduce.job.acl-modify-job";
@@ -260,6 +292,12 @@ public class JavaActionExecutor extends ActionExecutor {
conf.set(HADOOP_NAME_NODE, nameNode);
conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "true");
+ // FIXME - think about this!
+ Element e = actionXml.getChild("config-class", ns);
+ if (e != null) {
+ conf.set(LauncherAMUtils.OOZIE_ACTION_CONFIG_CLASS, e.getTextTrim());
+ }
+
return conf;
}
@@ -299,6 +337,11 @@ public class JavaActionExecutor extends ActionExecutor {
throw convertException(ex);
}
XConfiguration.copy(launcherConf, conf);
+ // Inject config-class for launcher to use for action
+ Element e = actionXml.getChild("config-class", ns);
+ if (e != null) {
+ conf.set(LauncherAMUtils.OOZIE_ACTION_CONFIG_CLASS, e.getTextTrim());
+ }
checkForDisallowedProps(launcherConf, "launcher configuration");
return conf;
}
@@ -876,13 +919,20 @@ public class JavaActionExecutor extends ActionExecutor {
if (launcherJobConf.get(HADOOP_MAP_JAVA_OPTS) != null) {
opts.append(" ").append(launcherJobConf.get(HADOOP_MAP_JAVA_OPTS));
}
+
List<Element> javaopts = actionXml.getChildren("java-opt", ns);
- for (Element opt: javaopts) {
- opts.append(" ").append(opt.getTextTrim());
+
+ // Either one or more <java-opt> element or one <java-opts> can be present since oozie-workflow-0.4
+ if (!javaopts.isEmpty()) {
+ for (Element opt : javaopts) {
+ opts.append(" ").append(opt.getTextTrim());
+ }
}
- Element opt = actionXml.getChild("java-opts", ns);
- if (opt != null) {
- opts.append(" ").append(opt.getTextTrim());
+ else {
+ Element opt = actionXml.getChild("java-opts", ns);
+ if (opt != null) {
+ opts.append(" ").append(opt.getTextTrim());
+ }
}
launcherJobConf.set(HADOOP_CHILD_JAVA_OPTS, opts.toString().trim());
launcherJobConf.set(HADOOP_MAP_JAVA_OPTS, opts.toString().trim());
@@ -942,6 +992,7 @@ public class JavaActionExecutor extends ActionExecutor {
}
Element actionXml = XmlUtils.parseXml(action.getConf());
+ LOG.debug("ActionXML: {0}", action.getConf());
// action job configuration
Configuration actionConf = loadHadoopDefaultResources(context, actionXml);
@@ -950,6 +1001,14 @@ public class JavaActionExecutor extends ActionExecutor {
LOG.debug("Setting LibFilesArchives ");
setLibFilesArchives(context, actionXml, appPathRoot, actionConf);
+ String jobName = actionConf.get(HADOOP_JOB_NAME);
+ if (jobName == null || jobName.isEmpty()) {
+ jobName = XLog.format("oozie:action:T={0}:W={1}:A={2}:ID={3}",
+ getType(), context.getWorkflow().getAppName(),
+ action.getName(), context.getWorkflow().getId());
+ actionConf.set(HADOOP_JOB_NAME, jobName);
+ }
+
injectActionCallback(context, actionConf);
if(actionConf.get(ACL_MODIFY_JOB) == null || actionConf.get(ACL_MODIFY_JOB).trim().equals("")) {
@@ -1052,7 +1111,7 @@ public class JavaActionExecutor extends ActionExecutor {
ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId();
ApplicationSubmissionContext appContext =
createAppSubmissionContext(appId, launcherJobConf, user, context, actionConf, action.getName(),
- credentials);
+ credentials, actionXml);
yarnClient.submitApplication(appContext);
launcherId = appId.toString();
@@ -1138,22 +1197,19 @@ public class JavaActionExecutor extends ActionExecutor {
}
private ApplicationSubmissionContext createAppSubmissionContext(ApplicationId appId, Configuration launcherJobConf,
- String user, Context context, Configuration actionConf, String actionName,
- Credentials credentials)
+ String user, Context context, Configuration actionConf, String actionName,
+ Credentials credentials, Element actionXml)
throws IOException, HadoopAccessorException, URISyntaxException {
ApplicationSubmissionContext appContext = Records.newRecord(ApplicationSubmissionContext.class);
- String appName = getAppName(context);
-
+ setResources(launcherJobConf, appContext);
+ setPriority(launcherJobConf, appContext);
+ setQueue(launcherJobConf, appContext);
appContext.setApplicationId(appId);
- appContext.setApplicationName(appName);
+ setApplicationName(context, actionName, appContext);
appContext.setApplicationType("Oozie Launcher");
- Priority pri = Records.newRecord(Priority.class);
- int priority = 0; // TODO: OYA: Add a constant or a config
- pri.setPriority(priority);
- appContext.setPriority(pri);
- appContext.setQueue("default"); // TODO: will be possible to set in <launcher>
+
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
// Set the resources to localize
@@ -1170,40 +1226,13 @@ public class JavaActionExecutor extends ActionExecutor {
localResources.put(LauncherAM.ACTION_CONF_XML, actionConfLR);
amContainer.setLocalResources(localResources);
- // Set the environment variables
- Map<String, String> env = new HashMap<String, String>();
- // This adds the Hadoop jars to the classpath in the Launcher JVM
- ClasspathUtils.setupClasspath(env, launcherJobConf);
-
- if (needToAddMapReduceToClassPath()) {
- ClasspathUtils.addMapReduceToClasspath(env, launcherJobConf);
- }
-
- addActionSpecificEnvVars(env);
- amContainer.setEnvironment(Collections.unmodifiableMap(env));
-
- // Set the command
- List<String> vargs = new ArrayList<String>(6);
- vargs.add(Apps.crossPlatformify(ApplicationConstants.Environment.JAVA_HOME.toString())
- + "/bin/java");
-
- vargs.add("-Dlog4j.configuration=container-log4j.properties");
- vargs.add("-Dlog4j.debug=true");
- vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR);
- vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_SIZE + "=" + 0);
- vargs.add("-Dhadoop.root.logger=INFO,CLA");
- vargs.add("-Dhadoop.root.logfile=" + TaskLog.LogName.SYSLOG);
- vargs.add("-Dsubmitter.user=" + context.getWorkflow().getUser());
-
- Path amTmpDir = new Path(Apps.crossPlatformify(ApplicationConstants.Environment.PWD.toString()),
- YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR);
- vargs.add("-Djava.io.tmpdir=" + amTmpDir);
+ setEnvironmentVariables(launcherJobConf, amContainer);
+ List<String> vargs = createCommand(context);
+ setJavaOpts(launcherJobConf, actionXml, vargs);
vargs.add(LauncherAM.class.getCanonicalName());
- vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
- Path.SEPARATOR + ApplicationConstants.STDOUT);
- vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR +
- Path.SEPARATOR + ApplicationConstants.STDERR);
+ vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + ApplicationConstants.STDOUT);
+ vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + ApplicationConstants.STDERR);
StringBuilder mergedCommand = new StringBuilder();
for (CharSequence str : vargs) {
mergedCommand.append(str).append(" ");
@@ -1221,51 +1250,181 @@ public class JavaActionExecutor extends ActionExecutor {
amContainer.setTokens(ByteBuffer.wrap(dob.getData(), 0, dob.getLength()));
}
- // Set Resources
- // TODO: OYA: make resources allocated for the AM configurable and choose good defaults (memory MB, vcores)
- Resource resource = Resource.newInstance(2048, 1);
- appContext.setResource(resource);
appContext.setCancelTokensWhenComplete(true);
return appContext;
}
- Map<String, CredentialsProperties> setCredentialPropertyToActionConf(final Context context,
- final WorkflowAction action,
- final Configuration actionConf) throws Exception {
- if (context == null || action == null) {
- LOG.warn("context or action is null");
- return null;
+ private List<String> createCommand(Context context) {
+ List<String> vargs = new ArrayList<String>(6);
+ vargs.add(Apps.crossPlatformify(ApplicationConstants.Environment.JAVA_HOME.toString())
+ + "/bin/java");
+
+ vargs.add("-Dlog4j.configuration=container-log4j.properties");
+ vargs.add("-Dlog4j.debug=true");
+ vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_DIR + "=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR);
+ vargs.add("-D" + YarnConfiguration.YARN_APP_CONTAINER_LOG_SIZE + "=" + 1024 * 1024);
+ vargs.add("-Dhadoop.root.logger=INFO,CLA");
+ vargs.add("-Dhadoop.root.logfile=" + TaskLog.LogName.SYSLOG);
+ vargs.add("-Dsubmitter.user=" + context.getWorkflow().getUser());
+ return vargs;
+ }
+
+ private void setJavaOpts(Configuration launcherJobConf, Element actionXml, List<String> vargs) {
+ // Note: for backward compatibility reasons, we have to support the <java-opts> tag inside the <java> action
+ // If both java/java-opt(s) and launcher/java-opts are defined, we pick java/java-opts
+ // We also display a warning to let users know that they should migrate their workflow
+ StringBuilder javaOpts = new StringBuilder();
+ boolean oldJavaOpts = handleJavaOpts(actionXml, javaOpts);
+ if (oldJavaOpts) {
+ vargs.add(javaOpts.toString());
+ }
+
+ final String oozieLauncherJavaOpts = launcherJobConf.get(LauncherAM.OOZIE_LAUNCHER_JAVAOPTS_PROPERTY);
+ if (oozieLauncherJavaOpts != null) {
+ if (oldJavaOpts) {
+ LOG.warn("<java-opts> was defined inside the <launcher> tag -- ignored");
+ } else {
+ vargs.add(oozieLauncherJavaOpts);
+ }
+ }
+ }
+
+ private boolean handleJavaOpts(Element actionXml, StringBuilder javaOpts) {
+ Namespace ns = actionXml.getNamespace();
+ boolean oldJavaOpts = false;
+ @SuppressWarnings("unchecked")
+ List<Element> javaopts = actionXml.getChildren("java-opt", ns);
+ for (Element opt: javaopts) {
+ javaOpts.append(" ").append(opt.getTextTrim());
+ oldJavaOpts = true;
+ }
+ Element opt = actionXml.getChild("java-opts", ns);
+ if (opt != null) {
+ javaOpts.append(" ").append(opt.getTextTrim());
+ oldJavaOpts = true;
+ }
+
+ if (oldJavaOpts) {
+ LOG.warn("Note: <java-opts> inside the action is used in the workflow. Please move <java-opts> tag under"
+ + " the <launcher> element. See the documentation for details");
+ }
+ return oldJavaOpts;
+ }
+
+ private void setApplicationName(Context context, String actionName, ApplicationSubmissionContext appContext) {
+ String jobName = XLog.format("oozie:launcher:T={0}:W={1}:A={2}:ID={3}", getType(),
+ context.getWorkflow().getAppName(), actionName,
+ context.getWorkflow().getId());
+ appContext.setApplicationName(jobName);
+ }
+
+ private void setEnvironmentVariables(Configuration launcherJobConf, ContainerLaunchContext amContainer) throws IOException {
+ Map<String, String> env = new HashMap<>();
+
+ final String oozieLauncherEnvProperty = launcherJobConf.get(LauncherAM.OOZIE_LAUNCHER_ENV_PROPERTY);
+ if (oozieLauncherEnvProperty != null) {
+ Map<String, String> environmentVars = extractEnvVarsFromOozieLauncherProps(oozieLauncherEnvProperty);
+ env.putAll(environmentVars);
}
- if (Boolean.TRUE.toString().equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) && !UserGroupInformation.isSecurityEnabled()) {
- LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)");
- return null;
+ // This adds the Hadoop jars to the classpath in the Launcher JVM
+ ClasspathUtils.setupClasspath(env, launcherJobConf);
+
+ if (needToAddMapReduceToClassPath()) {
+ ClasspathUtils.addMapReduceToClasspath(env, launcherJobConf);
+ }
+
+ addActionSpecificEnvVars(env);
+ amContainer.setEnvironment(ImmutableMap.copyOf(env));
+ }
+
+ private void setQueue(Configuration launcherJobConf, ApplicationSubmissionContext appContext) {
+ String queue;
+ if (launcherJobConf.get(LauncherAM.OOZIE_LAUNCHER_QUEUE_PROPERTY) != null) {
+ queue = launcherJobConf.get(LauncherAM.OOZIE_LAUNCHER_QUEUE_PROPERTY);
+ } else {
+ queue = Preconditions.checkNotNull(ConfigurationService.get(DEFAULT_LAUNCHER_QUEUE), "Default queue is undefined");
+ }
+ appContext.setQueue(queue);
+ }
+
+ private void setPriority(Configuration launcherJobConf, ApplicationSubmissionContext appContext) {
+ int priority;
+ if (launcherJobConf.get(LauncherAM.OOZIE_LAUNCHER_PRIORITY_PROPERTY) != null) {
+ priority = launcherJobConf.getInt(LauncherAM.OOZIE_LAUNCHER_PRIORITY_PROPERTY, -1);
+ } else {
+ int defaultPriority = ConfigurationService.getInt(DEFAULT_LAUNCHER_PRIORITY);
+ priority = defaultPriority;
+ }
+ Priority pri = Records.newRecord(Priority.class);
+ pri.setPriority(priority);
+ appContext.setPriority(pri);
+ }
+
+ private void setResources(Configuration launcherJobConf, ApplicationSubmissionContext appContext) {
+ int memory;
+ if (launcherJobConf.get(LauncherAM.OOZIE_LAUNCHER_MEMORY_MB_PROPERTY) != null) {
+ memory = launcherJobConf.getInt(LauncherAM.OOZIE_LAUNCHER_MEMORY_MB_PROPERTY, -1);
+ Preconditions.checkArgument(memory > 0, "Launcher memory is 0 or negative");
+ } else {
+ int defaultMemory = ConfigurationService.getInt(DEFAULT_LAUNCHER_MEMORY_MB, -1);
+ Preconditions.checkArgument(defaultMemory > 0, "Default launcher memory is 0 or negative");
+ memory = defaultMemory;
}
- final XConfiguration wfJobConf = getWorkflowConf(context);
- if (!Boolean.FALSE.toString().equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) &&
- wfJobConf.getBoolean(OOZIE_CREDENTIALS_SKIP, ConfigurationService.getBoolean(OOZIE_CREDENTIALS_SKIP)) &&
- !UserGroupInformation.isSecurityEnabled()) {
- LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)");
- return null;
+ int vcores;
+ if (launcherJobConf.get(LauncherAM.OOZIE_LAUNCHER_VCORES_PROPERTY) != null) {
+ vcores = launcherJobConf.getInt(LauncherAM.OOZIE_LAUNCHER_VCORES_PROPERTY, -1);
+ Preconditions.checkArgument(vcores > 0, "Launcher vcores is 0 or negative");
+ } else {
+ int defaultVcores = ConfigurationService.getInt(DEFAULT_LAUNCHER_VCORES);
+ Preconditions.checkArgument(defaultVcores > 0, "Default launcher vcores is 0 or negative");
+ vcores = defaultVcores;
}
+ Resource resource = Resource.newInstance(memory, vcores);
+ appContext.setResource(resource);
+ }
- final Map<String, CredentialsProperties> credPropertiesMap = getActionCredentialsProperties(context, action);
- if (credPropertiesMap.isEmpty()) {
- LOG.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred());
- return credPropertiesMap;
+ private Map<String, String> extractEnvVarsFromOozieLauncherProps(String oozieLauncherEnvProperty) {
+ Map<String, String> envMap = new LinkedHashMap<>();
+ for (String envVar : StringUtils.split(oozieLauncherEnvProperty, File.pathSeparatorChar)) {
+ String[] env = StringUtils.split(envVar, '=');
+ Preconditions.checkArgument(env.length == 2, "Invalid launcher setting for environment variables: \"%s\". " +
+ "<env> should contain a list of ENV_VAR_NAME=VALUE separated by the '%s' character. " +
+ "Example on Unix: A=foo1:B=foo2", oozieLauncherEnvProperty, File.pathSeparator);
+ envMap.put(env[0], env[1]);
}
+ return envMap;
+ }
- for (final Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) {
- if (entry.getValue() != null) {
- final CredentialsProperties prop = entry.getValue();
- LOG.debug("Credential Properties set for action : " + action.getId());
- for (final Entry<String, String> propEntry : prop.getProperties().entrySet()) {
- final String key = propEntry.getKey();
- final String value = propEntry.getValue();
- actionConf.set(key, value);
- LOG.debug("property : '" + key + "', value : '" + value + "'");
+ protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context,
+ WorkflowAction action, Configuration actionConf) throws Exception {
+ HashMap<String, CredentialsProperties> credPropertiesMap = null;
+ if (context != null && action != null) {
+ if (!"true".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP))) {
+ XConfiguration wfJobConf = getWorkflowConf(context);
+ if ("false".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) ||
+ !wfJobConf.getBoolean(OOZIE_CREDENTIALS_SKIP, ConfigurationService.getBoolean(OOZIE_CREDENTIALS_SKIP))) {
+ credPropertiesMap = getActionCredentialsProperties(context, action);
+ if (!credPropertiesMap.isEmpty()) {
+ for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) {
+ if (entry.getValue() != null) {
+ CredentialsProperties prop = entry.getValue();
+ LOG.debug("Credential Properties set for action : " + action.getId());
+ for (Entry<String, String> propEntry : prop.getProperties().entrySet()) {
+ String key = propEntry.getKey();
+ String value = propEntry.getValue();
+ actionConf.set(key, value);
+ LOG.debug("property : '" + key + "', value : '" + value + "'");
+ }
+ }
+ }
+ } else {
+ LOG.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred());
+ }
+ } else {
+ LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)");
}
}
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
index 8fdc50c..a0dfd31 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/SqoopActionExecutor.java
@@ -77,8 +77,8 @@ public class SqoopActionExecutor extends JavaActionExecutor {
if (e != null) {
String strConf = XmlUtils.prettyPrint(e).toString();
XConfiguration inlineConf = new XConfiguration(new StringReader(strConf));
- checkForDisallowedProps(inlineConf, "inline configuration");
XConfiguration.copy(inlineConf, actionConf);
+ checkForDisallowedProps(inlineConf, "inline configuration");
}
} catch (IOException ex) {
throw convertException(ex);
http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java b/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java
index 7e33485..5890b8c 100644
--- a/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/ssh/SshActionExecutor.java
@@ -329,8 +329,8 @@ public class SshActionExecutor extends ActionExecutor {
*
* @param command Command to execute as String.
* @return exit status of the execution.
- * @throws IOException if process exits with status nonzero.
- * @throws InterruptedException if process does not run properly.
+ * @throws IOException if processSettings exits with status nonzero.
+ * @throws InterruptedException if processSettings does not run properly.
*/
public int executeCommand(String command) throws IOException, InterruptedException {
Runtime runtime = Runtime.getRuntime();
@@ -396,7 +396,7 @@ public class SshActionExecutor extends ActionExecutor {
* @param action action object.
* @param recoveryId action id + run number to enable recovery in rerun
* @param preserveArgs tell the ssh scripts to preserve or flatten the arguments
- * @return process id of the running command.
+ * @return processSettings id of the running command.
* @throws IOException thrown if failed to run the command.
* @throws InterruptedException thrown if any interruption happens.
*/
@@ -468,7 +468,7 @@ public class SshActionExecutor extends ActionExecutor {
}
/**
- * Get the return value of a process.
+ * Get the return value of a processSettings.
*
* @param command command to be executed.
* @return zero if execution is successful and any non zero value for failure.
@@ -679,7 +679,7 @@ public class SshActionExecutor extends ActionExecutor {
* @param errorBuffer The buffer into which STDERR is to be read. Can be null if only draining is required.
* @param maxLength The maximum data length to be stored in these buffers. This is an indicative value, and the
* store content may exceed this length.
- * @return the exit value of the process.
+ * @return the exit value of the process.
* @throws IOException
*/
private int drainBuffers(Process p, StringBuffer inputBuffer, StringBuffer errorBuffer, int maxLength)
http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java b/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java
index 47bfd2b..87d0c5e 100644
--- a/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java
+++ b/core/src/main/java/org/apache/oozie/jms/MessageReceiver.java
@@ -49,7 +49,7 @@ public class MessageReceiver implements MessageListener {
}
/**
- * Get the MessageHandler that will process the message
+ * Get the MessageHandler that will process the message
*
* @return message handler
*/
http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java b/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java
index ffc29af..97a75ff 100644
--- a/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java
+++ b/core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java
@@ -18,7 +18,6 @@
package org.apache.oozie.service;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.oozie.action.control.EndActionExecutor;
import org.apache.oozie.action.control.ForkActionExecutor;
http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/main/java/org/apache/oozie/service/SchemaService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/SchemaService.java b/core/src/main/java/org/apache/oozie/service/SchemaService.java
index 137e2c0..9d2a521 100644
--- a/core/src/main/java/org/apache/oozie/service/SchemaService.java
+++ b/core/src/main/java/org/apache/oozie/service/SchemaService.java
@@ -29,15 +29,17 @@ import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
-import org.apache.hadoop.conf.Configuration;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.schema.ResourceResolver;
import org.xml.sax.SAXException;
+
/**
* Service that loads Oozie workflow definition schema and registered extension
* schemas.
*/
+
public class SchemaService implements Service {
public static final String CONF_PREFIX = Service.CONF_PREFIX + "SchemaService.";
@@ -95,9 +97,12 @@ public class SchemaService implements Service {
}
List<StreamSource> sources = new ArrayList<StreamSource>();
for (String schemaName : schemaNames) {
- sources.add(new StreamSource(IOUtils.getResourceAsStream(schemaName, -1)));
+ StreamSource s = new StreamSource(IOUtils.getResourceAsStream(schemaName, -1));
+ s.setSystemId(schemaName);
+ sources.add(s);
}
SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
+ factory.setResourceResolver(new ResourceResolver());
return factory.newSchema(sources.toArray(new StreamSource[sources.size()]));
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/main/java/org/apache/oozie/util/WritableUtils.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/util/WritableUtils.java b/core/src/main/java/org/apache/oozie/util/WritableUtils.java
index aa027e3..5a4cb24 100644
--- a/core/src/main/java/org/apache/oozie/util/WritableUtils.java
+++ b/core/src/main/java/org/apache/oozie/util/WritableUtils.java
@@ -151,6 +151,13 @@ public class WritableUtils {
}
}
+ /**
+ * Write string list.
+ *
+ * @param dataOutput the data output
+ * @param list the list
+ * @throws IOException Signals that an I/O exception has occurred.
+ */
public static void writeStringList(DataOutput dataOutput, List<String> list) throws IOException {
dataOutput.writeInt(list.size());
for (String str : list) {
http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
index a74e5c7..aa0e06b 100644
--- a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
+++ b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
@@ -68,6 +68,7 @@ import org.xml.sax.SAXException;
*/
public class LiteWorkflowAppParser {
+ private static final String LAUNCHER_E = "launcher";
private static final String DECISION_E = "decision";
private static final String ACTION_E = "action";
private static final String END_E = "end";
@@ -214,6 +215,7 @@ public class LiteWorkflowAppParser {
private LiteWorkflowApp parse(String strDef, Element root, Configuration configDefault, Configuration jobConf)
throws WorkflowException {
Namespace ns = root.getNamespace();
+
LiteWorkflowApp def = null;
GlobalSectionData gData = jobConf.get(OOZIE_GLOBAL) == null ?
null : getGlobalFromString(jobConf.get(OOZIE_GLOBAL));
@@ -264,10 +266,10 @@ public class LiteWorkflowAppParser {
}
eActionConf = elem;
if (SUBWORKFLOW_E.equals(elem.getName())) {
- handleDefaultsAndGlobal(gData, null, elem);
+ handleDefaultsAndGlobal(gData, null, elem, ns);
}
else {
- handleDefaultsAndGlobal(gData, configDefault, elem);
+ handleDefaultsAndGlobal(gData, configDefault, elem, ns);
}
}
}
@@ -300,9 +302,11 @@ public class LiteWorkflowAppParser {
} else if (eNode.getName().equals(GLOBAL)) {
if(jobConf.get(OOZIE_GLOBAL) != null) {
gData = getGlobalFromString(jobConf.get(OOZIE_GLOBAL));
- handleDefaultsAndGlobal(gData, null, eNode);
+ handleDefaultsAndGlobal(gData, null, eNode, ns);
}
+
gData = parseGlobalSection(ns, eNode);
+
} else if (eNode.getName().equals(PARAMETERS)) {
// No operation is required
} else {
@@ -438,7 +442,7 @@ public class LiteWorkflowAppParser {
}
}
- Configuration globalConf = null;
+ Configuration globalConf = new XConfiguration();
Element globalConfigurationElement = global.getChild(CONFIGURATION, ns);
if (globalConfigurationElement != null) {
try {
@@ -447,12 +451,18 @@ public class LiteWorkflowAppParser {
throw new WorkflowException(ErrorCode.E0700, "Error while processing global section conf");
}
}
+
+ Element globalLauncherElement = global.getChild(LAUNCHER_E, ns);
+ if (globalLauncherElement != null) {
+ LauncherConfigHandler launcherConfigHandler = new LauncherConfigHandler(globalConf, globalLauncherElement, ns);
+ launcherConfigHandler.processSettings();
+ }
gData = new GlobalSectionData(globalJobTracker, globalNameNode, globalJobXmls, globalConf);
}
return gData;
}
- private void handleDefaultsAndGlobal(GlobalSectionData gData, Configuration configDefault, Element actionElement)
+ private void handleDefaultsAndGlobal(GlobalSectionData gData, Configuration configDefault, Element actionElement, Namespace ns)
throws WorkflowException {
ActionExecutor ae = Services.get().get(ActionService.class).getExecutor(actionElement.getName());
@@ -497,7 +507,7 @@ public class LiteWorkflowAppParser {
// If this is the global section or ActionExecutor.supportsConfigurationJobXML() returns true, we parse the action's
// <configuration> and <job-xml> fields. We also merge this with those from the <global> section, if given. If none are
// defined, empty values are placed. Exceptions are thrown if there's an error parsing, but not if they're not given.
- if ( GLOBAL.equals(actionElement.getName()) || ae.supportsConfigurationJobXML()) {
+ if (GLOBAL.equals(actionElement.getName()) || ae.supportsConfigurationJobXML()) {
@SuppressWarnings("unchecked")
List<Element> actionJobXmls = actionElement.getChildren(JOB_XML, actionNs);
if (gData != null && gData.jobXmls != null) {
@@ -524,12 +534,20 @@ public class LiteWorkflowAppParser {
if (gData != null && gData.conf != null) {
XConfiguration.copy(gData.conf, actionConf);
}
+
+ Element launcherConfiguration = actionElement.getChild(LAUNCHER_E, actionNs);
+ if (launcherConfiguration != null) {
+ LauncherConfigHandler launcherConfigHandler = new LauncherConfigHandler(actionConf, launcherConfiguration, actionNs);
+ launcherConfigHandler.processSettings();
+ }
+
Element actionConfiguration = actionElement.getChild(CONFIGURATION, actionNs);
if (actionConfiguration != null) {
//copy and override
XConfiguration.copy(new XConfiguration(new StringReader(XmlUtils.prettyPrint(actionConfiguration).toString())),
actionConf);
}
+
int position = actionElement.indexOf(actionConfiguration);
actionElement.removeContent(actionConfiguration); //replace with enhanced one
Element eConfXml = XmlUtils.parseXml(actionConf.toXmlString(false));
http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java
index 23df086..2e09889 100644
--- a/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java
+++ b/core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowLib.java
@@ -31,8 +31,6 @@ import org.apache.hadoop.conf.Configuration;
import javax.xml.validation.Schema;
import java.io.StringReader;
-import java.util.Date;
-import java.util.Map;
//TODO javadoc
public abstract class LiteWorkflowLib implements WorkflowLib {
http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 0d174b5..2389b99 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -1607,17 +1607,18 @@ will be the requeue interval for the actions which are waiting for a long time w
<property>
<name>oozie.service.SchemaService.wf.schemas</name>
<value>
+ oozie-common-1.0.xsd,
oozie-workflow-0.1.xsd,oozie-workflow-0.2.xsd,oozie-workflow-0.2.5.xsd,oozie-workflow-0.3.xsd,oozie-workflow-0.4.xsd,
- oozie-workflow-0.4.5.xsd,oozie-workflow-0.5.xsd,
- shell-action-0.1.xsd,shell-action-0.2.xsd,shell-action-0.3.xsd,
+ oozie-workflow-0.4.5.xsd,oozie-workflow-0.5.xsd,oozie-workflow-1.0.xsd,
+ shell-action-0.1.xsd,shell-action-0.2.xsd,shell-action-0.3.xsd,shell-action-1.0.xsd,
email-action-0.1.xsd,email-action-0.2.xsd,
- hive-action-0.2.xsd,hive-action-0.3.xsd,hive-action-0.4.xsd,hive-action-0.5.xsd,hive-action-0.6.xsd,
- sqoop-action-0.2.xsd,sqoop-action-0.3.xsd,sqoop-action-0.4.xsd,
+ hive-action-0.2.xsd,hive-action-0.3.xsd,hive-action-0.4.xsd,hive-action-0.5.xsd,hive-action-0.6.xsd,hive-action-1.0.xsd,
+ sqoop-action-0.2.xsd,sqoop-action-0.3.xsd,sqoop-action-0.4.xsd,sqoop-action-1.0.xsd,
ssh-action-0.1.xsd,ssh-action-0.2.xsd,
distcp-action-0.1.xsd,distcp-action-0.2.xsd,
oozie-sla-0.1.xsd,oozie-sla-0.2.xsd,
- hive2-action-0.1.xsd, hive2-action-0.2.xsd,
- spark-action-0.1.xsd,spark-action-0.2.xsd
+ hive2-action-0.1.xsd,hive2-action-0.2.xsd,hive2-action-1.0.xsd,
+ spark-action-0.1.xsd,spark-action-0.2.xsd,spark-action-1.0.xsd
</value>
<description>
List of schemas for workflows (separated by commas).
@@ -3074,5 +3075,35 @@ will be the requeue interval for the actions which are waiting for a long time w
Most users should not have to change this.
</description>
</property>
+ <property>
+ <name>oozie.launcher.default.vcores</name>
+ <value>1</value>
+ <description>
+ The default number of vcores that are allocated for the Launcher AMs
+ </description>
+ </property>
+
+ <property>
+ <name>oozie.launcher.default.memory.mb</name>
+ <value>2048</value>
+ <description>
+ The default amount of memory in MBs that is allocated for the Launcher AMs
+ </description>
+ </property>
+ <property>
+ <name>oozie.launcher.default.priority</name>
+ <value>0</value>
+ <description>
+ The default YARN priority of the Launcher AM
+ </description>
+ </property>
+
+ <property>
+ <name>oozie.launcher.default.queue</name>
+ <value>default</value>
+ <description>
+ The default YARN queue where the Launcher AM is placed
+ </description>
+ </property>
</configuration>
http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java b/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java
index d74160a..dd90536 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/ActionExecutorTestCase.java
@@ -35,9 +35,7 @@ import org.apache.oozie.service.Services;
import org.apache.oozie.service.UUIDService;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.service.WorkflowStoreService;
-import org.apache.oozie.test.XFsTestCase;
import org.apache.oozie.test.XHCatTestCase;
-import org.apache.oozie.test.XTestCase;
import org.apache.oozie.util.ELEvaluator;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
@@ -244,7 +242,7 @@ public abstract class ActionExecutorTestCase extends XHCatTestCase {
protected WorkflowJobBean createBaseWorkflow(XConfiguration protoConf, String actionName) throws Exception {
Path appUri = new Path(getAppPath(), "workflow.xml");
- String content = "<workflow-app xmlns='uri:oozie:workflow:0.1' xmlns:sla='uri:oozie:sla:0.1' name='no-op-wf'>";
+ String content = "<workflow-app xmlns='uri:oozie:workflow:1.0' xmlns:sla='uri:oozie:sla:0.1' name='no-op-wf'>";
content += "<start to='end' />";
content += "<end name='end' /></workflow-app>";
writeToFile(content, getAppPath(), "workflow.xml");
@@ -300,6 +298,41 @@ public abstract class ActionExecutorTestCase extends XHCatTestCase {
return workflow;
}
+ protected WorkflowJobBean createBaseWorkflowWithLauncherConfig(XConfiguration protoConf, String actionName) throws Exception {
+ Path appUri = new Path(getAppPath(), "workflow.xml");
+
+ String content = "<workflow-app xmlns='uri:oozie:workflow:1.0' xmlns:sla='uri:oozie:sla:0.1' name='no-op-wf'>";
+ content += "<global>"
+ + "<launcher>"
+ + " <vcores>2</vcores>"
+ + " <memory.mb>1024</memory.mb>"
+ + " <queue>default</queue>"
+ + " <priority>1</priority>"
+ + " <java-opts>-verbose:class</java-opts>"
+ + "</launcher>"
+ + "</global>";
+
+ content += "<start to='end' />";
+ content += "<end name='end' /></workflow-app>";
+ writeToFile(content, getAppPath(), "workflow.xml");
+
+ WorkflowApp app = new LiteWorkflowApp("testApp", "<workflow-app/>",
+ new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class,
+ "end"))
+ .addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
+ XConfiguration wfConf = new XConfiguration();
+ wfConf.set(OozieClient.USER_NAME, getTestUser());
+ wfConf.set(OozieClient.APP_PATH, appUri.toString());
+
+ WorkflowJobBean workflow = createWorkflow(app, wfConf, protoConf);
+
+ WorkflowActionBean action = new WorkflowActionBean();
+ action.setName(actionName);
+ action.setId(Services.get().get(UUIDService.class).generateChildId(workflow.getId(), actionName));
+ workflow.getActions().add(action);
+ return workflow;
+ }
+
private WorkflowJobBean createWorkflow(WorkflowApp app, Configuration conf, XConfiguration protoConf)
throws Exception {
WorkflowLib workflowLib = Services.get().get(WorkflowStoreService.class).getWorkflowLibWithNoDB();
http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java b/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java
index 6cee7a8..43c71b0 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/LauncherMainTester.java
@@ -31,6 +31,15 @@ public class LauncherMainTester {
System.out.println("Hello World!");
}
+ String testJavaOpts = System.getProperty("testJavaOpts");
+ if (testJavaOpts != null && Boolean.parseBoolean(testJavaOpts)) {
+ throw new RuntimeException("Failing on purpose");
+ }
+ String env = System.getenv("LAUNCHER_ENVIRON");
+ if (env != null && env.equals("foo1")) {
+ throw new RuntimeException("Failing on purpose");
+ }
+
if (args.length == 1) {
if (args[0].equals("throwable")) {
throw new Throwable("throwing throwable");
http://git-wip-us.apache.org/repos/asf/oozie/blob/8b247f28/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
index d1d78fd..02e60c0 100644
--- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
+++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java
@@ -29,14 +29,7 @@ import java.io.Writer;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
+import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.SleepJob;
@@ -51,6 +44,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
@@ -86,7 +80,9 @@ import org.junit.Test;
public class TestJavaActionExecutor extends ActionExecutorTestCase {
- public static final String YARN_RESOURCEMANAGER_ADDRESS = "yarn.resourcemanager.address";
+ private static final String YARN_RESOURCEMANAGER_ADDRESS = "yarn.resourcemanager.address";
+ private static final String MAPRED_CHILD_JAVA_OPTS = "mapred.child.java.opts";
+ private static final String MAPREDUCE_MAP_JAVA_OPTS = "mapreduce.map.java.opts";
@Override
protected void beforeSetUp() throws Exception {
@@ -273,8 +269,8 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
ae.setupLauncherConf(conf, actionXml, getFsTestCaseDir(), context);
assertEquals("MAIN-CLASS", actionConf.get("oozie.action.java.main", "null"));
assertEquals("org.apache.oozie.action.hadoop.JavaMain", ae.getLauncherMain(conf, actionXml));
- assertTrue(conf.get("mapred.child.java.opts").contains("JAVA-OPTS"));
- assertTrue(conf.get("mapreduce.map.java.opts").contains("JAVA-OPTS"));
+ assertTrue(conf.get(MAPRED_CHILD_JAVA_OPTS).contains("JAVA-OPTS"));
+ assertTrue(conf.get(MAPREDUCE_MAP_JAVA_OPTS).contains("JAVA-OPTS"));
assertEquals(Arrays.asList("A1", "A2"), Arrays.asList(LauncherAMUtils.getMainArguments(conf)));
actionXml = XmlUtils.parseXml("<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
@@ -1114,7 +1110,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
WorkflowApp app = new LiteWorkflowApp("testApp", wfxml,
new StartNodeDef(LiteWorkflowStoreService.LiteControlNodeHandler.class, "start")).
addNode(new EndNodeDef("end", LiteWorkflowStoreService.LiteControlNodeHandler.class));
- Configuration conf = Services.get().get(HadoopAccessorService.class).
+ Configuration conf = getHadoopAccessorService().
createConfiguration(new URI(getNameNodeUri()).getAuthority());
conf.set(OozieClient.APP_PATH, getNameNodeUri() + "/testPath");
conf.set(OozieClient.LOG_TOKEN, "testToken");
@@ -1226,8 +1222,8 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
Configuration conf = ae.createLauncherConf(getFileSystem(), context, action, actionXmlconf, actionConf);
- assertEquals("-Xmx200m JAVA-OPT1 JAVA-OPT2", conf.get("mapred.child.java.opts"));
- assertEquals("-Xmx200m JAVA-OPT1 JAVA-OPT2", conf.get("mapreduce.map.java.opts"));
+ assertEquals("-Xmx200m JAVA-OPT1 JAVA-OPT2", conf.get(MAPRED_CHILD_JAVA_OPTS));
+ assertEquals("-Xmx200m JAVA-OPT1 JAVA-OPT2", conf.get(MAPREDUCE_MAP_JAVA_OPTS));
actionXml = "<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
@@ -1251,8 +1247,8 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
conf = ae.createLauncherConf(getFileSystem(), context, action, actionXmlconf, actionConf);
- assertEquals("-Xmx200m JAVA-OPT1 JAVA-OPT2", conf.get("mapred.child.java.opts"));
- assertEquals("-Xmx200m JAVA-OPT1 JAVA-OPT2", conf.get("mapreduce.map.java.opts"));
+ assertEquals("-Xmx200m JAVA-OPT1 JAVA-OPT2", conf.get(MAPRED_CHILD_JAVA_OPTS));
+ assertEquals("-Xmx200m JAVA-OPT1 JAVA-OPT2", conf.get(MAPREDUCE_MAP_JAVA_OPTS));
actionXml = "<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
@@ -1278,8 +1274,8 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
conf = ae.createLauncherConf(getFileSystem(), context, action, actionXmlconf, actionConf);
- assertEquals("JAVA-OPT3 JAVA-OPT1 JAVA-OPT2", conf.get("mapred.child.java.opts"));
- assertEquals("JAVA-OPT3 JAVA-OPT1 JAVA-OPT2", conf.get("mapreduce.map.java.opts"));
+ assertEquals("JAVA-OPT3 JAVA-OPT1 JAVA-OPT2", conf.get(MAPRED_CHILD_JAVA_OPTS));
+ assertEquals("JAVA-OPT3 JAVA-OPT1 JAVA-OPT2", conf.get(MAPREDUCE_MAP_JAVA_OPTS));
actionXml = "<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
@@ -1305,8 +1301,8 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
conf = ae.createLauncherConf(getFileSystem(), context, action, actionXmlconf, actionConf);
- assertEquals("-Xmx200m JAVA-OPT3 JAVA-OPT1 JAVA-OPT2", conf.get("mapred.child.java.opts"));
- assertEquals("-Xmx200m JAVA-OPT3 JAVA-OPT1 JAVA-OPT2", conf.get("mapreduce.map.java.opts"));
+ assertEquals("-Xmx200m JAVA-OPT3 JAVA-OPT1 JAVA-OPT2", conf.get(MAPRED_CHILD_JAVA_OPTS));
+ assertEquals("-Xmx200m JAVA-OPT3 JAVA-OPT1 JAVA-OPT2", conf.get(MAPREDUCE_MAP_JAVA_OPTS));
actionXml = "<java>" + "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" + "<name-node>"
+ getNameNodeUri() + "</name-node>"
@@ -1333,8 +1329,8 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
conf = ae.createLauncherConf(getFileSystem(), context, action, actionXmlconf, actionConf);
- assertEquals("JAVA-OPT3 JAVA-OPT4 JAVA-OPT1 JAVA-OPT2", conf.get("mapred.child.java.opts"));
- assertEquals("JAVA-OPT3 JAVA-OPT4 JAVA-OPT1 JAVA-OPT2", conf.get("mapreduce.map.java.opts"));
+ assertEquals("JAVA-OPT3 JAVA-OPT4 JAVA-OPT1 JAVA-OPT2", conf.get(MAPRED_CHILD_JAVA_OPTS));
+ assertEquals("JAVA-OPT3 JAVA-OPT4 JAVA-OPT1 JAVA-OPT2", conf.get(MAPREDUCE_MAP_JAVA_OPTS));
}
public void testActionLibsPath() throws Exception {
@@ -2262,6 +2258,176 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
assertEquals("/user/map%20dev/test-case/shell/script/shell%201.sh", actPath);
}
+ public void testSubmitOKWithVcoresAndMemory() throws Exception {
+ String actionXml = "<java>" +
+ "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+ "<name-node>" + getNameNodeUri() + "</name-node>" +
+ "<configuration>" +
+ " <property><name>oozie.launcher.vcores</name><value>1</value></property>" +
+ " <property><name>oozie.launcher.memory.mb</name><value>1024</value></property>" +
+ "</configuration>" +
+ "<main-class>" + LauncherMainTester.class.getName() + "</main-class>" +
+ "</java>";
+ Context context = createContext(actionXml, null);
+ submitAction(context);
+ waitUntilYarnAppDoneAndAssertSuccess(context.getAction().getExternalId());
+ ActionExecutor ae = new JavaActionExecutor();
+ ae.check(context, context.getAction());
+ assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
+ assertNull(context.getAction().getData());
+
+ ae.end(context, context.getAction());
+ assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus());
+ }
+
+ public void testSubmitOKWithLauncherJavaOpts() throws Exception {
+ String actionXml = "<java>" +
+ "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+ "<name-node>" + getNameNodeUri() + "</name-node>" +
+ "<configuration>" +
+ " <property><name>oozie.launcher.javaopts</name><value>-DtestJavaOpts=true</value></property>" +
+ "</configuration>" +
+ "<main-class>" + LauncherMainTester.class.getName() + "</main-class>" +
+ "</java>";
+ Context context = createContext(actionXml, null);
+ submitAction(context);
+ waitUntilYarnAppDoneAndAssertSuccess(context.getAction().getExternalId());
+ ActionExecutor ae = new JavaActionExecutor();
+ ae.check(context, context.getAction());
+ assertEquals("FAILED/KILLED", context.getAction().getExternalStatus());
+ assertNull(context.getAction().getData());
+
+ ae.end(context, context.getAction());
+ assertEquals(WorkflowAction.Status.ERROR, context.getAction().getStatus());
+ }
+
+ public void testSubmitFailsWithNegativeVcores() throws Exception {
+ String actionXml = "<java>" +
+ "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+ "<name-node>" + getNameNodeUri() + "</name-node>" +
+ "<configuration>" +
+ " <property><name>oozie.launcher.vcores</name><value>-1</value></property>" +
+ "</configuration>" +
+ "<main-class>" + LauncherMainTester.class.getName() + "</main-class>" +
+ "</java>";
+ Context context = createContext(actionXml, null);
+
+ boolean exception = false;
+ try {
+ submitAction(context);
+ } catch (ActionExecutorException e) {
+ exception = true;
+ assertEquals("Illegal exception was thrown", IllegalArgumentException.class, e.getCause().getClass());
+ }
+
+ assertTrue("Exception was not caught", exception);
+ }
+
+ public void testSubmitFailsWithNegativeMemory() throws Exception {
+ String actionXml = "<java>" +
+ "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+ "<name-node>" + getNameNodeUri() + "</name-node>" +
+ "<configuration>" +
+ " <property><name>oozie.launcher.memory.mb</name><value>-1</value></property>" +
+ "</configuration>" +
+ "<main-class>" + LauncherMainTester.class.getName() + "</main-class>" +
+ "</java>";
+ Context context = createContext(actionXml, null);
+
+ boolean exception = false;
+ try {
+ submitAction(context);
+ } catch (ActionExecutorException e) {
+ exception = true;
+ assertEquals("Illegal exception was thrown", IllegalArgumentException.class, e.getCause().getClass());
+ }
+
+ assertTrue("Exception was not caught", exception);
+ }
+
+ public void testSubmitOKWithLauncherEnvVars() throws Exception {
+ String actionXml = "<java>" +
+ "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+ "<name-node>" + getNameNodeUri() + "</name-node>" +
+ "<configuration>" +
+ " <property><name>oozie.launcher.env</name><value>A=foo1" + File.pathSeparator + "B=foo2</value></property>" +
+ "</configuration>" +
+ "<main-class>" + LauncherMainTester.class.getName() + "</main-class>" +
+ "</java>";
+ Context context = createContext(actionXml, null);
+ submitAction(context);
+ waitUntilYarnAppDoneAndAssertSuccess(context.getAction().getExternalId());
+ ActionExecutor ae = new JavaActionExecutor();
+ ae.check(context, context.getAction());
+ assertEquals("SUCCEEDED", context.getAction().getExternalStatus());
+ assertNull(context.getAction().getData());
+
+ ae.end(context, context.getAction());
+ assertEquals(WorkflowAction.Status.OK, context.getAction().getStatus());
+ }
+
+ public void testEnvVarsPropagatedFromLauncherConfig() throws Exception {
+ String actionXml = "<java>" +
+ "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+ "<name-node>" + getNameNodeUri() + "</name-node>" +
+ "<configuration>" +
+ " <property><name>oozie.launcher.env</name><value>LAUNCHER_ENVIRON=foo1" + File.pathSeparator + "B=foo2</value></property>" +
+ "</configuration>" +
+ "<main-class>" + LauncherMainTester.class.getName() + "</main-class>" +
+ "</java>";
+ Context context = createContext(actionXml, null);
+ submitAction(context);
+ waitUntilYarnAppDoneAndAssertSuccess(context.getAction().getExternalId());
+ ActionExecutor ae = new JavaActionExecutor();
+ ae.check(context, context.getAction());
+ assertEquals("FAILED/KILLED", context.getAction().getExternalStatus());
+ assertNull(context.getAction().getData());
+
+ ae.end(context, context.getAction());
+ assertEquals(WorkflowAction.Status.ERROR, context.getAction().getStatus());
+ }
+
+ public void testSubmitFailsWithInvalidLauncherEnvVars() throws Exception {
+ String actionXml = "<java>" +
+ "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+ "<name-node>" + getNameNodeUri() + "</name-node>" +
+ "<configuration>" +
+ " <property><name>oozie.launcher.env</name><value>Afoo1" + File.pathSeparator + "B=foo2</value></property>" +
+ "</configuration>" +
+ "<main-class>" + LauncherMainTester.class.getName() + "</main-class>" +
+ "</java>";
+ Context context = createContext(actionXml, null);
+ try {
+ submitAction(context);
+ fail();
+ }
+ catch (ActionExecutorException e) {
+ assertTrue(e.getMessage().contains("Invalid launcher setting for environment variables"));
+ }
+ }
+
+ public void testSubmitWithLauncherQueue() throws Exception {
+ String actionXml = "<java>" +
+ "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+ "<name-node>" + getNameNodeUri() + "</name-node>" +
+ "<configuration>" +
+ " <property><name>oozie.launcher.queue</name><value>test</value></property>" +
+ "</configuration>" +
+ "<main-class>" + LauncherMainTester.class.getName() + "</main-class>" +
+ "</java>";
+ Context context = createContext(actionXml, null);
+ submitAction(context);
+ final ApplicationId appId = ConverterUtils.toApplicationId(context.getAction().getExternalId());
+ Configuration conf = getHadoopAccessorService().createConfiguration(getJobTrackerUri());
+ String queue = getHadoopAccessorService().createYarnClient(getTestUser(), conf).getApplicationReport(appId).getQueue();
+ assertEquals("test", queue);
+ }
+
+ private HadoopAccessorService getHadoopAccessorService() {
+ return Services.get().get(HadoopAccessorService.class);
+ }
+
+
public void testChildKill() throws Exception {
final JobConf clusterConf = createJobConf();
FileSystem fileSystem = FileSystem.get(clusterConf);
@@ -2319,7 +2485,7 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase {
// kill the action - based on the job tag, the SleepJob is expected to be killed too
ae.kill(context, context.getAction());
- HadoopAccessorService hadoopAccessorService = Services.get().get(HadoopAccessorService.class);
+ HadoopAccessorService hadoopAccessorService = getHadoopAccessorService();
Configuration config = hadoopAccessorService.createConfiguration(getJobTrackerUri());
YarnClient yarnClient = hadoopAccessorService.createYarnClient(getTestUser(), config);