You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by rk...@apache.org on 2015/04/11 03:36:03 UTC
oozie git commit: OOZIE-2170 Oozie should automatically set configs
to make Spark jobs show up in the Spark History Server (rkanter)
Repository: oozie
Updated Branches:
refs/heads/master 4add492c0 -> f0bceb0bc
OOZIE-2170 Oozie should automatically set configs to make Spark jobs show up in the Spark History Server (rkanter)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/f0bceb0b
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/f0bceb0b
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/f0bceb0b
Branch: refs/heads/master
Commit: f0bceb0bc94e38cc4bf3818400855818b74c2655
Parents: 4add492
Author: Robert Kanter <rk...@cloudera.com>
Authored: Fri Apr 10 14:03:20 2015 -0700
Committer: Robert Kanter <rk...@cloudera.com>
Committed: Fri Apr 10 14:03:20 2015 -0700
----------------------------------------------------------------------
.../action/hadoop/SparkActionExecutor.java | 16 ++-
.../service/SparkConfigurationService.java | 129 ++++++++++++++++++
core/src/main/resources/oozie-default.xml | 16 ++-
.../service/TestSparkConfigurationService.java | 131 +++++++++++++++++++
.../site/twiki/DG_SparkActionExtension.twiki | 24 +++-
examples/src/main/apps/spark/workflow.xml | 5 +-
release-log.txt | 1 +
.../action/hadoop/TestSparkActionExecutor.java | 87 +++++++++++-
8 files changed, 403 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/f0bceb0b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
index 732d5f0..219a116 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
@@ -24,11 +24,14 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.oozie.action.ActionExecutorException;
import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.SparkConfigurationService;
import org.jdom.Element;
import org.jdom.Namespace;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
public class SparkActionExecutor extends JavaActionExecutor {
public static final String SPARK_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.SparkMain";
@@ -70,9 +73,20 @@ public class SparkActionExecutor extends JavaActionExecutor {
String jarLocation = actionXml.getChildTextTrim("jar", ns);
actionConf.set(SPARK_JAR, jarLocation);
+ StringBuilder sparkOptsSb = new StringBuilder();
+ if (master.startsWith("yarn")) {
+ String resourceManager = actionConf.get(HADOOP_JOB_TRACKER);
+ Map<String, String> sparkConfig = Services.get().get(SparkConfigurationService.class).getSparkConfig(resourceManager);
+ for (Map.Entry<String, String> entry : sparkConfig.entrySet()) {
+ sparkOptsSb.append("--conf ").append(entry.getKey()).append("=").append(entry.getValue()).append(" ");
+ }
+ }
String sparkOpts = actionXml.getChildTextTrim("spark-opts", ns);
if (sparkOpts != null) {
- actionConf.set(SPARK_OPTS, sparkOpts);
+ sparkOptsSb.append(sparkOpts);
+ }
+ if (sparkOptsSb.length() > 0) {
+ actionConf.set(SPARK_OPTS, sparkOptsSb.toString().trim());
}
return actionConf;
http://git-wip-us.apache.org/repos/asf/oozie/blob/f0bceb0b/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java b/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java
new file mode 100644
index 0000000..1b7cf4a
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.service;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XLog;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Service that loads Spark configurations when it is initialized and serves them by ResourceManager address.
+ * <p>
+ * The {@link #SPARK_CONFIGURATION} property holds a list of {@code AUTHORITY=SPARK_CONF_DIR} entries. For each entry
+ * the {@code spark-defaults.conf} file found in {@code SPARK_CONF_DIR} is read as a properties file. A relative
+ * {@code SPARK_CONF_DIR} is resolved against the Oozie configuration directory. The authority {@code *} is used
+ * as a fallback when no entry matches exactly. Entries that cannot be loaded are logged and skipped.
+ */
+public class SparkConfigurationService implements Service {
+
+    private static final XLog LOG = XLog.getLog(SparkConfigurationService.class);
+
+    public static final String SPARK_CONFIGURATION = "oozie.service.SparkConfigurationService.spark.configurations";
+
+    private static final String SPARK_CONFIG_FILE = "spark-defaults.conf";
+    private static final String WILDCARD = "*";
+
+    // Keyed by lower-cased authority (or "*"); replaced as a whole on every init().
+    private Map<String, Map<String, String>> sparkConfigs;
+
+    /**
+     * Loads all configured Spark configurations.
+     *
+     * @param services the services singleton (not used here)
+     * @throws ServiceException declared by the {@link Service} contract
+     */
+    @Override
+    public void init(Services services) throws ServiceException {
+        loadSparkConfigs();
+    }
+
+    /**
+     * Drops all loaded configurations. Safe to call even if {@link #init(Services)} never ran.
+     */
+    @Override
+    public void destroy() {
+        if (sparkConfigs != null) {
+            sparkConfigs.clear();
+        }
+    }
+
+    @Override
+    public Class<? extends Service> getInterface() {
+        return SparkConfigurationService.class;
+    }
+
+    /**
+     * Reads the {@link #SPARK_CONFIGURATION} entries and (re)builds the authority to configuration map.
+     */
+    private void loadSparkConfigs() throws ServiceException {
+        Map<String, Map<String, String>> loaded = new HashMap<String, Map<String, String>>();
+        File configDir = new File(ConfigurationService.getConfigurationDirectory());
+        String[] confDefs = ConfigurationService.getStrings(SPARK_CONFIGURATION);
+        if (confDefs == null) {
+            LOG.info("Spark Configuration(s) not specified");
+        } else {
+            for (String confDef : confDefs) {
+                String trimmed = confDef.trim();
+                if (trimmed.length() > 0) {
+                    loadSparkConfig(configDir, trimmed, loaded);
+                }
+            }
+        }
+        sparkConfigs = loaded;
+    }
+
+    /**
+     * Loads a single {@code AUTHORITY=SPARK_CONF_DIR} entry into {@code target}; logs a warning and returns if the
+     * entry is malformed or its directory or file is missing or unreadable.
+     */
+    private void loadSparkConfig(File configDir, String confDef, Map<String, Map<String, String>> target) {
+        String[] parts = confDef.split("=");
+        // Normalize both sides: whitespace around an entry must not end up in the key or the path, and the key
+        // has to be lower case because getSparkConfig() lower-cases the authority it looks up.
+        String hostPort = (parts.length == 2) ? parts[0].trim().toLowerCase() : "";
+        String confDir = (parts.length == 2) ? parts[1].trim() : "";
+        if (hostPort.isEmpty() || confDir.isEmpty()) {
+            LOG.warn("Spark Configuration could not be loaded: invalid value found: {0}", confDef);
+            return;
+        }
+        File dir = new File(confDir);
+        if (!dir.isAbsolute()) {
+            dir = new File(configDir, confDir);
+        }
+        if (!dir.exists()) {
+            LOG.warn("Spark Configuration could not be loaded for {0}: {1} does not exist",
+                    hostPort, dir.getAbsolutePath());
+            return;
+        }
+        File file = new File(dir, SPARK_CONFIG_FILE);
+        if (!file.exists()) {
+            LOG.warn("Spark Configuration could not be loaded for {0}: {1} does not exist",
+                    hostPort, file.getAbsolutePath());
+            return;
+        }
+        Properties props = new Properties();
+        FileReader fr = null;
+        try {
+            fr = new FileReader(file);
+            props.load(fr);
+            target.put(hostPort, propsToMap(props));
+            LOG.info("Loaded Spark Configuration: {0}={1}", hostPort, file.getAbsolutePath());
+        } catch (IOException ioe) {
+            LOG.warn("Spark Configuration could not be loaded for {0}: {1}",
+                    hostPort, ioe.getMessage(), ioe);
+        } finally {
+            // Single close point; the reader is not closed inside the try block as well.
+            IOUtils.closeSafely(fr);
+        }
+    }
+
+    /**
+     * Copies the string-valued entries of {@code props} into a new map.
+     */
+    private Map<String, String> propsToMap(Properties props) {
+        Map<String, String> map = new HashMap<String, String>(props.size());
+        for (String key : props.stringPropertyNames()) {
+            map.put(key, props.getProperty(key));
+        }
+        return map;
+    }
+
+    /**
+     * Returns the Spark configuration for the given ResourceManager.
+     *
+     * @param resourceManagerHostPort authority of the ResourceManager, matched case-insensitively; may be null
+     * @return a new map with the properties of the matching entry, or of the {@code *} entry if there is no exact
+     *         match, or an empty map if neither exists; never null
+     */
+    public Map<String, String> getSparkConfig(String resourceManagerHostPort) {
+        Map<String, String> config = null;
+        if (resourceManagerHostPort != null) {
+            config = sparkConfigs.get(resourceManagerHostPort.trim().toLowerCase());
+        }
+        if (config == null) {
+            config = sparkConfigs.get(WILDCARD);
+        }
+        // Hand out a copy so callers cannot modify the configuration held by the service.
+        return (config == null) ? new HashMap<String, String>() : new HashMap<String, String>(config);
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/oozie/blob/f0bceb0b/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 3936fca..ffff7c3 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -130,7 +130,8 @@
org.apache.oozie.service.GroupsService,
org.apache.oozie.service.ProxyUserService,
org.apache.oozie.service.XLogStreamingService,
- org.apache.oozie.service.JvmPauseMonitorService
+ org.apache.oozie.service.JvmPauseMonitorService,
+ org.apache.oozie.service.SparkConfigurationService
</value>
<description>
All services to be created and managed by Oozie Services singleton.
@@ -2505,4 +2506,17 @@
</description>
</property>
+ <property>
+ <name>oozie.service.SparkConfigurationService.spark.configurations</name>
+ <value>*=spark-conf</value>
+ <description>
+ Comma separated AUTHORITY=SPARK_CONF_DIR, where AUTHORITY is the HOST:PORT of
+ the ResourceManager of a YARN cluster. The wildcard '*' configuration is
+ used when there is no exact match for an authority. The SPARK_CONF_DIR contains
+ the relevant spark-defaults.conf properties file. If the path is relative, it is looked up within
+ the Oozie configuration directory; the path can also be absolute. This is only used
+ when the Spark master is set to either "yarn-client" or "yarn-cluster".
+ </description>
+ </property>
+
</configuration>
http://git-wip-us.apache.org/repos/asf/oozie/blob/f0bceb0b/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java b/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java
new file mode 100644
index 0000000..b2c499d
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.service;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.test.XTestCase;
+import org.apache.oozie.util.IOUtils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Tests for {@link SparkConfigurationService}: empty configuration, absolute and relative configuration
+ * directories, invalid entries, missing files and the wildcard entry.
+ */
+public class TestSparkConfigurationService extends XTestCase {
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        Services services = new Services();
+        services.init();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        Services.get().destroy();
+        super.tearDown();
+    }
+
+    public void testSparkConfigsEmpty() throws Exception {
+        SparkConfigurationService scs = reinit("");
+        Map<String, String> sparkConfigs = scs.getSparkConfig("foo");
+        assertEquals(0, sparkConfigs.size());
+    }
+
+    public void testSparkConfigs() throws Exception {
+        File sparkConf1Dir = new File(getTestCaseConfDir(), "spark-conf-1");
+        File sparkConf3Dir = new File(getTestCaseConfDir(), "spark-conf-3");
+        File sparkConf4Dir = new File(getTestCaseConfDir(), "spark-conf-4");
+        sparkConf1Dir.mkdirs();
+        sparkConf3Dir.mkdirs();
+        sparkConf4Dir.mkdirs();
+        // spark-conf-3 deliberately gets no spark-defaults.conf
+        writeSparkDefaults(sparkConf1Dir, "a", "A", "b", "B");
+        writeSparkDefaults(sparkConf4Dir, "y", "Y", "z", "Z");
+
+        SparkConfigurationService scs = reinit(
+                "rm1=" + sparkConf1Dir.getAbsolutePath() +   // absolute path
+                ",rm2" +                                     // invalid entry
+                ",rm3=" + sparkConf3Dir.getAbsolutePath() +  // missing file
+                ",rm4=" + sparkConf4Dir.getName());          // relative path
+        Map<String, String> sparkConfigs = scs.getSparkConfig("foo");
+        assertEquals(0, sparkConfigs.size());
+        sparkConfigs = scs.getSparkConfig("rm1");
+        assertEquals(2, sparkConfigs.size());
+        assertEquals("A", sparkConfigs.get("a"));
+        assertEquals("B", sparkConfigs.get("b"));
+        sparkConfigs = scs.getSparkConfig("rm2");
+        assertEquals(0, sparkConfigs.size());
+        sparkConfigs = scs.getSparkConfig("rm3");
+        assertEquals(0, sparkConfigs.size());
+        sparkConfigs = scs.getSparkConfig("rm4");
+        assertEquals(2, sparkConfigs.size());
+        assertEquals("Y", sparkConfigs.get("y"));
+        assertEquals("Z", sparkConfigs.get("z"));
+
+        scs = reinit(
+                "rm1=" + sparkConf1Dir.getAbsolutePath() +   // defined
+                ",*=" + sparkConf4Dir.getAbsolutePath());    // wildcard
+        sparkConfigs = scs.getSparkConfig("rm1");
+        assertEquals(2, sparkConfigs.size());
+        assertEquals("A", sparkConfigs.get("a"));
+        assertEquals("B", sparkConfigs.get("b"));
+        sparkConfigs = scs.getSparkConfig("rm2");
+        assertEquals(2, sparkConfigs.size());
+        assertEquals("Y", sparkConfigs.get("y"));
+        assertEquals("Z", sparkConfigs.get("z"));
+        sparkConfigs = scs.getSparkConfig("foo");
+        assertEquals(2, sparkConfigs.size());
+        assertEquals("Y", sparkConfigs.get("y"));
+        assertEquals("Z", sparkConfigs.get("z"));
+    }
+
+    /**
+     * Destroys the running SparkConfigurationService, sets the Spark configurations property to the given value
+     * and initializes the service again so that it picks up the new value.
+     */
+    private SparkConfigurationService reinit(String sparkConfigurations) throws Exception {
+        SparkConfigurationService scs = Services.get().get(SparkConfigurationService.class);
+        scs.destroy();
+        ConfigurationService.set(SparkConfigurationService.SPARK_CONFIGURATION, sparkConfigurations);
+        scs.init(Services.get());
+        return scs;
+    }
+
+    /**
+     * Writes a spark-defaults.conf file holding the two given properties into the given directory.
+     */
+    private void writeSparkDefaults(File confDir, String key1, String value1, String key2, String value2)
+            throws Exception {
+        Properties props = new Properties();
+        props.setProperty(key1, value1);
+        props.setProperty(key2, value2);
+        FileOutputStream fos = null;
+        try {
+            fos = new FileOutputStream(new File(confDir, "spark-defaults.conf"));
+            props.store(fos, "");
+        } finally {
+            IOUtils.closeSafely(fos);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/oozie/blob/f0bceb0b/docs/src/site/twiki/DG_SparkActionExtension.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_SparkActionExtension.twiki b/docs/src/site/twiki/DG_SparkActionExtension.twiki
index 8e5cd9b..32ebe12 100644
--- a/docs/src/site/twiki/DG_SparkActionExtension.twiki
+++ b/docs/src/site/twiki/DG_SparkActionExtension.twiki
@@ -79,7 +79,8 @@ specify multiple =job.xml= files.
The =configuration= element, if present, contains configuration
properties that are passed to the Spark job.
-The =master= element indicates the url of the Spark Master. Ex: spark://host:port, mesos://host:port, yarn, or local.
+The =master= element indicates the url of the Spark Master. Ex: spark://host:port, mesos://host:port, yarn-cluster, yarn-client,
+or local.
The =mode= element if present indicates the mode of spark, where to run spark driver program. Ex: client,cluster.
@@ -89,7 +90,9 @@ The =class= element if present, indicates the spark's application main class.
The =jar= element indicates a comma separated list of jars or python files.
-The =spark-opts= element if present, contains a list of spark options that can be passed to spark driver.
+The =spark-opts= element if present, contains a list of spark options that can be passed to spark driver. Spark configuration
+options can be passed by specifying '--conf key=value' here, or from =oozie.service.SparkConfigurationService.spark.configurations=
+in oozie-site.xml. The =spark-opts= configs have priority.
The =arg= element if present, contains arguments that can be passed to spark application.
@@ -137,6 +140,23 @@ Spark action logs are redirected to the Oozie Launcher map-reduce job task STDOU
From Oozie web-console, from the Spark action pop up using the 'Console URL' link, it is possible
to navigate to the Oozie Launcher map-reduce job task logs via the Hadoop job-tracker web-console.
+---+++ Spark on YARN
+
+To make the Spark action run on YARN, you need to follow these steps:
+
+1. Make the spark-assembly jar available to your Spark action
+
+2. Specify "yarn-client" or "yarn-cluster" in the =master= element
+
+To ensure that your Spark job shows up in the Spark History Server, make sure to specify these three Spark configuration properties
+either in =spark-opts= with =--conf= or from =oozie.service.SparkConfigurationService.spark.configurations=
+
+1. spark.yarn.historyServer.address=http://SPH-HOST:18088
+
+2. spark.eventLog.dir=hdfs://NN:8020/user/spark/applicationHistory
+
+3. spark.eventLog.enabled=true
+
---++ Appendix, Spark XML-Schema
---+++ AE.A Appendix A, Spark XML-Schema
http://git-wip-us.apache.org/repos/asf/oozie/blob/f0bceb0b/examples/src/main/apps/spark/workflow.xml
----------------------------------------------------------------------
diff --git a/examples/src/main/apps/spark/workflow.xml b/examples/src/main/apps/spark/workflow.xml
index ecb9eaa..1b1a3e8 100644
--- a/examples/src/main/apps/spark/workflow.xml
+++ b/examples/src/main/apps/spark/workflow.xml
@@ -22,12 +22,15 @@
<spark xmlns="uri:oozie:spark-action:0.1">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
+ <prepare>
+ <delete path="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/spark"/>
+ </prepare>
<master>${master}</master>
<name>Spark-FileCopy</name>
<class>org.apache.oozie.example.SparkFileCopy</class>
<jar>${nameNode}/user/${wf:user()}/${examplesRoot}/apps/spark/lib/oozie-examples.jar</jar>
<arg>${nameNode}/user/${wf:user()}/${examplesRoot}/input-data/text/data.txt</arg>
- <arg>${nameNode}/user/${wf:user()}/${examplesRoot}/output</arg>
+ <arg>${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/spark</arg>
</spark>
<ok to="end" />
<error to="fail" />
http://git-wip-us.apache.org/repos/asf/oozie/blob/f0bceb0b/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 642cf31..bfa9fdd 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.2.0 release (trunk - unreleased)
+OOZIE-2170 Oozie should automatically set configs to make Spark jobs show up in the Spark History Server (rkanter)
OOZIE-2140 Audit Log should be shown in Oozie UI (puru)
OOZIE-2139 Coord update doesn't work for job which is submitted by bundle (puru)
OOZIE-1726 Oozie does not support _HOST when configuring kerberos security (venkatnrangan via bzhang)
http://git-wip-us.apache.org/repos/asf/oozie/blob/f0bceb0b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
index a15e76b..f271abc 100644
--- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
@@ -18,6 +18,7 @@
package org.apache.oozie.action.hadoop;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
@@ -27,12 +28,17 @@ import org.apache.hadoop.mapred.RunningJob;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.service.ConfigurationService;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
+import org.apache.oozie.service.SparkConfigurationService;
import org.apache.oozie.service.WorkflowAppService;
import org.apache.oozie.util.IOUtils;
import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XmlUtils;
+import org.jdom.Element;
+import java.io.FileOutputStream;
import java.io.OutputStreamWriter;
import java.io.File;
import java.io.FileInputStream;
@@ -42,10 +48,17 @@ import java.io.Writer;
import java.text.MessageFormat;
import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
public class TestSparkActionExecutor extends ActionExecutorTestCase {
private static final String SPARK_FILENAME = "file.txt";
private static final String OUTPUT = "output";
+ private static Pattern SPARK_OPTS_PATTERN = Pattern.compile("([^= ]+)=([^= ]+)");
@Override
protected void setSystemProps() throws Exception {
@@ -53,10 +66,82 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase {
setSystemProperty("oozie.service.ActionService.executor.classes", SparkActionExecutor.class.getName());
}
- @SuppressWarnings("unchecked")
public void testSetupMethods() throws Exception {
+ _testSetupMethods("local[*]", new HashMap<String, String>());
+ _testSetupMethods("yarn", new HashMap<String, String>());
+ }
+
+    /**
+     * Registers a spark-defaults.conf (a=A, b=B) for the test job tracker and checks that its properties are added
+     * to the Spark options for a "yarn" master but not for a "local[*]" master.
+     */
+    public void testSetupMethodsWithSparkConfiguration() throws Exception {
+        // Write <test conf dir>/spark-conf/spark-defaults.conf
+        File confDir = new File(getTestCaseConfDir(), "spark-conf");
+        confDir.mkdirs();
+        Properties defaults = new Properties();
+        defaults.setProperty("a", "A");
+        defaults.setProperty("b", "B");
+        FileOutputStream out = null;
+        try {
+            out = new FileOutputStream(new File(confDir, "spark-defaults.conf"));
+            defaults.store(out, "");
+        } finally {
+            IOUtils.closeSafely(out);
+        }
+
+        // Restart the service so that it loads the file for the test job tracker
+        SparkConfigurationService service = Services.get().get(SparkConfigurationService.class);
+        service.destroy();
+        String mapping = getJobTrackerUri() + "=" + confDir.getAbsolutePath();
+        ConfigurationService.set("oozie.service.SparkConfigurationService.spark.configurations", mapping);
+        service.init(Services.get());
+
+        _testSetupMethods("local[*]", new HashMap<String, String>());
+
+        Map<String, String> expectedExtraOpts = new HashMap<String, String>(2);
+        expectedExtraOpts.put("a", "A");
+        expectedExtraOpts.put("b", "B");
+        _testSetupMethods("yarn", expectedExtraOpts);
+    }
+
+ @SuppressWarnings("unchecked")
+ private void _testSetupMethods(String master, Map<String, String> extraSparkOpts) throws Exception {
SparkActionExecutor ae = new SparkActionExecutor();
assertEquals(Arrays.asList(SparkMain.class), ae.getLauncherClasses());
+
+ Element actionXml = XmlUtils.parseXml("<spark>" +
+ "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+ "<name-node>" + getNameNodeUri() + "</name-node>" +
+ "<master>" + master + "</master>" +
+ "<mode>client</mode>" +
+ "<name>Some Name</name>" +
+ "<class>org.apache.oozie.foo</class>" +
+ "<jar>" + getNameNodeUri() + "/foo.jar</jar>" +
+ "<spark-opts>--conf foo=bar</spark-opts>" +
+ "</spark>");
+
+ XConfiguration protoConf = new XConfiguration();
+ protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
+
+ WorkflowJobBean wf = createBaseWorkflow(protoConf, "spark-action");
+ WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0);
+ action.setType(ae.getType());
+
+ Context context = new Context(wf, action);
+
+ Configuration conf = ae.createBaseHadoopConf(context, actionXml);
+ ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
+ assertEquals(master, conf.get("oozie.spark.master"));
+ assertEquals("client", conf.get("oozie.spark.mode"));
+ assertEquals("Some Name", conf.get("oozie.spark.name"));
+ assertEquals("org.apache.oozie.foo", conf.get("oozie.spark.class"));
+ assertEquals(getNameNodeUri() + "/foo.jar", conf.get("oozie.spark.jar"));
+ Map<String, String> sparkOpts = new HashMap<String, String>();
+ sparkOpts.put("foo", "bar");
+ sparkOpts.putAll(extraSparkOpts);
+ Matcher m = SPARK_OPTS_PATTERN.matcher(conf.get("oozie.spark.spark-opts"));
+ int count = 0;
+ while (m.find()) {
+ count++;
+ String key = m.group(1);
+ String val = m.group(2);
+ assertEquals(sparkOpts.get(key), val);
+ }
+ assertEquals(sparkOpts.size(), count);
}
private String getActionXml() {