You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by rk...@apache.org on 2015/04/11 03:36:03 UTC

oozie git commit: OOZIE-2170 Oozie should automatically set configs to make Spark jobs show up in the Spark History Server (rkanter)

Repository: oozie
Updated Branches:
  refs/heads/master 4add492c0 -> f0bceb0bc


OOZIE-2170 Oozie should automatically set configs to make Spark jobs show up in the Spark History Server (rkanter)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/f0bceb0b
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/f0bceb0b
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/f0bceb0b

Branch: refs/heads/master
Commit: f0bceb0bc94e38cc4bf3818400855818b74c2655
Parents: 4add492
Author: Robert Kanter <rk...@cloudera.com>
Authored: Fri Apr 10 14:03:20 2015 -0700
Committer: Robert Kanter <rk...@cloudera.com>
Committed: Fri Apr 10 14:03:20 2015 -0700

----------------------------------------------------------------------
 .../action/hadoop/SparkActionExecutor.java      |  16 ++-
 .../service/SparkConfigurationService.java      | 129 ++++++++++++++++++
 core/src/main/resources/oozie-default.xml       |  16 ++-
 .../service/TestSparkConfigurationService.java  | 131 +++++++++++++++++++
 .../site/twiki/DG_SparkActionExtension.twiki    |  24 +++-
 examples/src/main/apps/spark/workflow.xml       |   5 +-
 release-log.txt                                 |   1 +
 .../action/hadoop/TestSparkActionExecutor.java  |  87 +++++++++++-
 8 files changed, 403 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/f0bceb0b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
index 732d5f0..219a116 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/SparkActionExecutor.java
@@ -24,11 +24,14 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.SparkConfigurationService;
 import org.jdom.Element;
 import org.jdom.Namespace;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 
 public class SparkActionExecutor extends JavaActionExecutor {
     public static final String SPARK_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.SparkMain";
@@ -70,9 +73,20 @@ public class SparkActionExecutor extends JavaActionExecutor {
         String jarLocation = actionXml.getChildTextTrim("jar", ns);
         actionConf.set(SPARK_JAR, jarLocation);
 
+        StringBuilder sparkOptsSb = new StringBuilder();
+        if (master.startsWith("yarn")) {
+            String resourceManager = actionConf.get(HADOOP_JOB_TRACKER);
+            Map<String, String> sparkConfig = Services.get().get(SparkConfigurationService.class).getSparkConfig(resourceManager);
+            for (Map.Entry<String, String> entry : sparkConfig.entrySet()) {
+                sparkOptsSb.append("--conf ").append(entry.getKey()).append("=").append(entry.getValue()).append(" ");
+            }
+        }
         String sparkOpts = actionXml.getChildTextTrim("spark-opts", ns);
         if (sparkOpts != null) {
-            actionConf.set(SPARK_OPTS, sparkOpts);
+            sparkOptsSb.append(sparkOpts);
+        }
+        if (sparkOptsSb.length() > 0) {
+            actionConf.set(SPARK_OPTS, sparkOptsSb.toString().trim());
         }
 
         return actionConf;

http://git-wip-us.apache.org/repos/asf/oozie/blob/f0bceb0b/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java b/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java
new file mode 100644
index 0000000..1b7cf4a
--- /dev/null
+++ b/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.service;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XLog;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+public class SparkConfigurationService implements Service {
+
+    private static XLog LOG = XLog.getLog(SparkConfigurationService.class);
+
+    public static final String SPARK_CONFIGURATION = "oozie.service.SparkConfigurationService.spark.configurations";
+
+    private Map<String, Map<String, String>> sparkConfigs;
+    private static final String SPARK_CONFIG_FILE = "spark-defaults.conf";
+
+    @Override
+    public void init(Services services) throws ServiceException {
+        loadSparkConfigs();
+    }
+
+    @Override
+    public void destroy() {
+        sparkConfigs.clear();
+    }
+
+    @Override
+    public Class<? extends Service> getInterface() {
+        return SparkConfigurationService.class;
+    }
+
+    private void loadSparkConfigs() throws ServiceException {
+        sparkConfigs = new HashMap<String, Map<String, String>>();
+        File configDir = new File(ConfigurationService.getConfigurationDirectory());
+        String[] confDefs = ConfigurationService.getStrings(SPARK_CONFIGURATION);
+        if (confDefs != null) {
+            for (String confDef : confDefs) {
+                if (confDef.trim().length() > 0) {
+                    String[] parts = confDef.split("=");
+                    if (parts.length == 2) {
+                        String hostPort = parts[0];
+                        String confDir = parts[1];
+                        File dir = new File(confDir);
+                        if (!dir.isAbsolute()) {
+                            dir = new File(configDir, confDir);
+                        }
+                        if (dir.exists()) {
+                            File file = new File(dir, SPARK_CONFIG_FILE);
+                            if (file.exists()) {
+                                Properties props = new Properties();
+                                FileReader fr = null;
+                                try {
+                                    fr = new FileReader(file);
+                                    props.load(fr);
+                                    fr.close();
+                                    sparkConfigs.put(hostPort, propsToMap(props));
+                                    LOG.info("Loaded Spark Configuration: {0}={1}", hostPort, file.getAbsolutePath());
+                                } catch (IOException ioe) {
+                                    LOG.warn("Spark Configuration could not be loaded for {0}: {1}",
+                                            hostPort, ioe.getMessage(), ioe);
+                                } finally {
+                                    IOUtils.closeSafely(fr);
+                                }
+                            } else {
+                                LOG.warn("Spark Configuration could not be loaded for {0}: {1} does not exist",
+                                        hostPort, file.getAbsolutePath());
+                            }
+                        } else {
+                            LOG.warn("Spark Configuration could not be loaded for {0}: {1} does not exist",
+                                    hostPort, dir.getAbsolutePath());
+                        }
+                    } else {
+                        LOG.warn("Spark Configuration could not be loaded: invalid value found: {0}", confDef);
+                    }
+                }
+            }
+        } else {
+            LOG.info("Spark Configuration(s) not specified");
+        }
+    }
+
+    private Map<String, String> propsToMap(Properties props) {
+        Map<String, String> map = new HashMap<String, String>(props.size());
+        for (String key : props.stringPropertyNames()) {
+            map.put(key, props.getProperty(key));
+        }
+        return map;
+    }
+
+    public Map<String, String> getSparkConfig(String resourceManagerHostPort) {
+        resourceManagerHostPort = (resourceManagerHostPort != null) ? resourceManagerHostPort.toLowerCase() : null;
+        Map<String, String> config = sparkConfigs.get(resourceManagerHostPort);
+        if (config == null) {
+            config = sparkConfigs.get("*");
+            if (config == null) {
+                config = new HashMap<String, String>();
+            }
+        }
+        return config;
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/oozie/blob/f0bceb0b/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index 3936fca..ffff7c3 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -130,7 +130,8 @@
             org.apache.oozie.service.GroupsService,
             org.apache.oozie.service.ProxyUserService,
             org.apache.oozie.service.XLogStreamingService,
-            org.apache.oozie.service.JvmPauseMonitorService
+            org.apache.oozie.service.JvmPauseMonitorService,
+            org.apache.oozie.service.SparkConfigurationService
         </value>
         <description>
             All services to be created and managed by Oozie Services singleton.
@@ -2505,4 +2506,17 @@
         </description>
     </property>
 
+    <property>
+        <name>oozie.service.SparkConfigurationService.spark.configurations</name>
+        <value>*=spark-conf</value>
+        <description>
+            Comma separated AUTHORITY=SPARK_CONF_DIR, where AUTHORITY is the HOST:PORT of
+            the ResourceManager of a YARN cluster. The wildcard '*' configuration is
+            used when there is no exact match for an authority. The SPARK_CONF_DIR contains
+            the relevant spark-defaults.conf properties file. If the path is relative, it is looked up within
+            the Oozie configuration directory; the path can also be absolute.  This is only used
+            when the Spark master is set to either "yarn-client" or "yarn-cluster".
+        </description>
+    </property>
+
 </configuration>

http://git-wip-us.apache.org/repos/asf/oozie/blob/f0bceb0b/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java b/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java
new file mode 100644
index 0000000..b2c499d
--- /dev/null
+++ b/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.service;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.oozie.ErrorCode;
+import org.apache.oozie.test.XTestCase;
+import org.apache.oozie.util.IOUtils;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.util.Map;
+import java.util.Properties;
+
+public class TestSparkConfigurationService extends XTestCase {
+
+    protected void setUp() throws Exception {
+        super.setUp();
+        Services services = new Services();
+        services.init();
+    }
+
+    protected void tearDown() throws Exception {
+        Services.get().destroy();
+        super.tearDown();
+    }
+
+    public void testSparkConfigsEmpty() throws Exception {
+        SparkConfigurationService scs = Services.get().get(SparkConfigurationService.class);
+        scs.destroy();
+        ConfigurationService.set("oozie.service.SparkConfigurationService.spark.configurations", "");
+        scs.init(Services.get());
+        Map<String, String> sparkConfigs = scs.getSparkConfig("foo");
+        assertEquals(0, sparkConfigs.size());
+    }
+
+    public void testSparkConfigs() throws Exception {
+        File sparkConf1Dir = new File(getTestCaseConfDir(), "spark-conf-1");
+        File sparkConf3Dir = new File(getTestCaseConfDir(), "spark-conf-3");
+        File sparkConf4Dir = new File(getTestCaseConfDir(), "spark-conf-4");
+        sparkConf1Dir.mkdirs();
+        sparkConf3Dir.mkdirs();
+        sparkConf4Dir.mkdirs();
+        File sparkConf1 = new File(sparkConf1Dir, "spark-defaults.conf");
+        Properties sparkConf1Props = new Properties();
+        sparkConf1Props.setProperty("a", "A");
+        sparkConf1Props.setProperty("b", "B");
+        FileOutputStream fos = null;
+        try {
+            fos = new FileOutputStream(sparkConf1);
+            sparkConf1Props.store(fos, "");
+        } finally {
+            IOUtils.closeSafely(fos);
+        }
+        File sparkConf4 = new File(sparkConf4Dir, "spark-defaults.conf");
+        Properties sparkConf4Props = new Properties();
+        sparkConf4Props.setProperty("y", "Y");
+        sparkConf4Props.setProperty("z", "Z");
+        fos = null;
+        try {
+            fos = new FileOutputStream(sparkConf4);
+            sparkConf4Props.store(fos, "");
+        } finally {
+            IOUtils.closeSafely(fos);
+        }
+
+        SparkConfigurationService scs = Services.get().get(SparkConfigurationService.class);
+        scs.destroy();
+        ConfigurationService.set("oozie.service.SparkConfigurationService.spark.configurations",
+                "rm1=" + sparkConf1Dir.getAbsolutePath() +   // absolute path
+                ",rm2" +                                     // invalid entry
+                ",rm3=" + sparkConf3Dir.getAbsolutePath() +  // missing file
+                ",rm4=" + sparkConf4Dir.getName());          // relative path
+        scs.init(Services.get());
+        Map<String, String> sparkConfigs = scs.getSparkConfig("foo");
+        assertEquals(0, sparkConfigs.size());
+        sparkConfigs = scs.getSparkConfig("rm1");
+        assertEquals(2, sparkConfigs.size());
+        assertEquals("A", sparkConfigs.get("a"));
+        assertEquals("B", sparkConfigs.get("b"));
+        sparkConfigs = scs.getSparkConfig("rm2");
+        assertEquals(0, sparkConfigs.size());
+        sparkConfigs = scs.getSparkConfig("rm3");
+        assertEquals(0, sparkConfigs.size());
+        sparkConfigs = scs.getSparkConfig("rm4");
+        assertEquals(2, sparkConfigs.size());
+        assertEquals("Y", sparkConfigs.get("y"));
+        assertEquals("Z", sparkConfigs.get("z"));
+
+        scs.destroy();
+        ConfigurationService.set("oozie.service.SparkConfigurationService.spark.configurations",
+                "rm1=" + sparkConf1Dir.getAbsolutePath() +   // defined
+                ",*=" + sparkConf4Dir.getAbsolutePath());    // wildcard
+        scs.init(Services.get());
+        sparkConfigs = scs.getSparkConfig("rm1");
+        assertEquals(2, sparkConfigs.size());
+        assertEquals("A", sparkConfigs.get("a"));
+        assertEquals("B", sparkConfigs.get("b"));
+        sparkConfigs = scs.getSparkConfig("rm2");
+        assertEquals(2, sparkConfigs.size());
+        assertEquals("Y", sparkConfigs.get("y"));
+        assertEquals("Z", sparkConfigs.get("z"));
+        sparkConfigs = scs.getSparkConfig("foo");
+        assertEquals(2, sparkConfigs.size());
+        assertEquals("Y", sparkConfigs.get("y"));
+        assertEquals("Z", sparkConfigs.get("z"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/f0bceb0b/docs/src/site/twiki/DG_SparkActionExtension.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_SparkActionExtension.twiki b/docs/src/site/twiki/DG_SparkActionExtension.twiki
index 8e5cd9b..32ebe12 100644
--- a/docs/src/site/twiki/DG_SparkActionExtension.twiki
+++ b/docs/src/site/twiki/DG_SparkActionExtension.twiki
@@ -79,7 +79,8 @@ specify multiple =job.xml= files.
 The =configuration= element, if present, contains configuration
 properties that are passed to the Spark job.
 
-The =master= element indicates the url of the Spark Master. Ex: spark://host:port, mesos://host:port, yarn, or local.
+The =master= element indicates the url of the Spark Master. Ex: spark://host:port, mesos://host:port, yarn-client, yarn-cluster,
+or local.
 
 The =mode= element if present indicates the mode of spark, where to run spark driver program. Ex: client,cluster.
 
@@ -89,7 +90,9 @@ The =class= element if present, indicates the spark's application main class.
 
 The =jar= element indicates a comma separated list of jars or python files.
 
-The =spark-opts= element if present, contains a list of spark options that can be passed to spark driver.
+The =spark-opts= element if present, contains a list of spark options that can be passed to spark driver. Spark configuration
+options can be passed by specifying '--conf key=value' here, or from =oozie.service.SparkConfigurationService.spark.configurations=
+in oozie-site.xml.  The =spark-opts= configs have priority.
 
 The =arg= element if present, contains arguments that can be passed to spark application.
 
@@ -137,6 +140,23 @@ Spark action logs are redirected to the Oozie Launcher map-reduce job task STDOU
 From Oozie web-console, from the Spark action pop up using the 'Console URL' link, it is possible
 to navigate to the Oozie Launcher map-reduce job task logs via the Hadoop job-tracker web-console.
 
+---+++ Spark on YARN
+
+To make the Spark action run on YARN, you need to follow these steps:
+
+1. Make the spark-assembly jar available to your Spark action
+
+2. Specify "yarn-client" or "yarn-cluster" in the =master= element
+
+To ensure that your Spark job shows up in the Spark History Server, make sure to specify these three Spark configuration properties
+either in =spark-opts= with =--conf= or via =oozie.service.SparkConfigurationService.spark.configurations= in oozie-site.xml:
+
+1. spark.yarn.historyServer.address=http://SPH-HOST:18088
+
+2. spark.eventLog.dir=hdfs://NN:8020/user/spark/applicationHistory
+
+3. spark.eventLog.enabled=true
+
 ---++ Appendix, Spark XML-Schema
 
 ---+++ AE.A Appendix A, Spark XML-Schema

http://git-wip-us.apache.org/repos/asf/oozie/blob/f0bceb0b/examples/src/main/apps/spark/workflow.xml
----------------------------------------------------------------------
diff --git a/examples/src/main/apps/spark/workflow.xml b/examples/src/main/apps/spark/workflow.xml
index ecb9eaa..1b1a3e8 100644
--- a/examples/src/main/apps/spark/workflow.xml
+++ b/examples/src/main/apps/spark/workflow.xml
@@ -22,12 +22,15 @@
         <spark xmlns="uri:oozie:spark-action:0.1">
             <job-tracker>${jobTracker}</job-tracker>
             <name-node>${nameNode}</name-node>
+            <prepare>
+                <delete path="${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/spark"/>
+            </prepare>
             <master>${master}</master>
             <name>Spark-FileCopy</name>
             <class>org.apache.oozie.example.SparkFileCopy</class>
             <jar>${nameNode}/user/${wf:user()}/${examplesRoot}/apps/spark/lib/oozie-examples.jar</jar>
             <arg>${nameNode}/user/${wf:user()}/${examplesRoot}/input-data/text/data.txt</arg>
-            <arg>${nameNode}/user/${wf:user()}/${examplesRoot}/output</arg>
+            <arg>${nameNode}/user/${wf:user()}/${examplesRoot}/output-data/spark</arg>
         </spark>
         <ok to="end" />
         <error to="fail" />

http://git-wip-us.apache.org/repos/asf/oozie/blob/f0bceb0b/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 642cf31..bfa9fdd 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.2.0 release (trunk - unreleased)
 
+OOZIE-2170 Oozie should automatically set configs to make Spark jobs show up in the Spark History Server (rkanter)
 OOZIE-2140 Audit Log should be shown in Oozie UI (puru)
 OOZIE-2139 Coord update doesn't work for job which is submitted by bundle (puru)
 OOZIE-1726 Oozie does not support _HOST when configuring kerberos security (venkatnrangan via bzhang)

http://git-wip-us.apache.org/repos/asf/oozie/blob/f0bceb0b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
index a15e76b..f271abc 100644
--- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
@@ -18,6 +18,7 @@
 
 package org.apache.oozie.action.hadoop;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobClient;
@@ -27,12 +28,17 @@ import org.apache.hadoop.mapred.RunningJob;
 import org.apache.oozie.WorkflowActionBean;
 import org.apache.oozie.WorkflowJobBean;
 import org.apache.oozie.client.WorkflowAction;
+import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.service.HadoopAccessorService;
 import org.apache.oozie.service.Services;
+import org.apache.oozie.service.SparkConfigurationService;
 import org.apache.oozie.service.WorkflowAppService;
 import org.apache.oozie.util.IOUtils;
 import org.apache.oozie.util.XConfiguration;
+import org.apache.oozie.util.XmlUtils;
+import org.jdom.Element;
 
+import java.io.FileOutputStream;
 import java.io.OutputStreamWriter;
 import java.io.File;
 import java.io.FileInputStream;
@@ -42,10 +48,17 @@ import java.io.Writer;
 
 import java.text.MessageFormat;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 public class TestSparkActionExecutor extends ActionExecutorTestCase {
     private static final String SPARK_FILENAME = "file.txt";
     private static final String OUTPUT = "output";
+    private static Pattern SPARK_OPTS_PATTERN = Pattern.compile("([^= ]+)=([^= ]+)");
 
     @Override
     protected void setSystemProps() throws Exception {
@@ -53,10 +66,82 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase {
         setSystemProperty("oozie.service.ActionService.executor.classes", SparkActionExecutor.class.getName());
     }
 
-    @SuppressWarnings("unchecked")
     public void testSetupMethods() throws Exception {
+        _testSetupMethods("local[*]", new HashMap<String, String>());
+        _testSetupMethods("yarn", new HashMap<String, String>());
+    }
+
+    public void testSetupMethodsWithSparkConfiguration() throws Exception {
+        File sparkConfDir = new File(getTestCaseConfDir(), "spark-conf");
+        sparkConfDir.mkdirs();
+        File sparkConf = new File(sparkConfDir, "spark-defaults.conf");
+        Properties sparkConfProps = new Properties();
+        sparkConfProps.setProperty("a", "A");
+        sparkConfProps.setProperty("b", "B");
+        FileOutputStream fos = null;
+        try {
+            fos = new FileOutputStream(sparkConf);
+            sparkConfProps.store(fos, "");
+        } finally {
+            IOUtils.closeSafely(fos);
+        }
+        SparkConfigurationService scs = Services.get().get(SparkConfigurationService.class);
+        scs.destroy();
+        ConfigurationService.set("oozie.service.SparkConfigurationService.spark.configurations",
+                getJobTrackerUri() + "=" + sparkConfDir.getAbsolutePath());
+        scs.init(Services.get());
+
+        _testSetupMethods("local[*]", new HashMap<String, String>());
+        Map<String, String> extraSparkOpts = new HashMap<String, String>(2);
+        extraSparkOpts.put("a", "A");
+        extraSparkOpts.put("b", "B");
+        _testSetupMethods("yarn", extraSparkOpts);
+    }
+
+    @SuppressWarnings("unchecked")
+    private void _testSetupMethods(String master, Map<String, String> extraSparkOpts) throws Exception {
         SparkActionExecutor ae = new SparkActionExecutor();
         assertEquals(Arrays.asList(SparkMain.class), ae.getLauncherClasses());
+
+        Element actionXml = XmlUtils.parseXml("<spark>" +
+                "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
+                "<name-node>" + getNameNodeUri() + "</name-node>" +
+                "<master>" + master + "</master>" +
+                "<mode>client</mode>" +
+                "<name>Some Name</name>" +
+                "<class>org.apache.oozie.foo</class>" +
+                "<jar>" + getNameNodeUri() + "/foo.jar</jar>" +
+                "<spark-opts>--conf foo=bar</spark-opts>" +
+                "</spark>");
+
+        XConfiguration protoConf = new XConfiguration();
+        protoConf.set(WorkflowAppService.HADOOP_USER, getTestUser());
+
+        WorkflowJobBean wf = createBaseWorkflow(protoConf, "spark-action");
+        WorkflowActionBean action = (WorkflowActionBean) wf.getActions().get(0);
+        action.setType(ae.getType());
+
+        Context context = new Context(wf, action);
+
+        Configuration conf = ae.createBaseHadoopConf(context, actionXml);
+        ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
+        assertEquals(master, conf.get("oozie.spark.master"));
+        assertEquals("client", conf.get("oozie.spark.mode"));
+        assertEquals("Some Name", conf.get("oozie.spark.name"));
+        assertEquals("org.apache.oozie.foo", conf.get("oozie.spark.class"));
+        assertEquals(getNameNodeUri() + "/foo.jar", conf.get("oozie.spark.jar"));
+        Map<String, String> sparkOpts = new HashMap<String, String>();
+        sparkOpts.put("foo", "bar");
+        sparkOpts.putAll(extraSparkOpts);
+        Matcher m = SPARK_OPTS_PATTERN.matcher(conf.get("oozie.spark.spark-opts"));
+        int count = 0;
+        while (m.find()) {
+            count++;
+            String key = m.group(1);
+            String val = m.group(2);
+            assertEquals(sparkOpts.get(key), val);
+        }
+        assertEquals(sparkOpts.size(), count);
     }
 
     private String getActionXml() {