Posted to commits@oozie.apache.org by rk...@apache.org on 2015/09/01 01:33:51 UTC

oozie git commit: OOZIE-2277 Honor oozie.action.sharelib.for.spark in Spark jobs (rkanter)

Repository: oozie
Updated Branches:
  refs/heads/master 6b61d878e -> 5a598039a


OOZIE-2277 Honor oozie.action.sharelib.for.spark in Spark jobs (rkanter)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/5a598039
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/5a598039
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/5a598039

Branch: refs/heads/master
Commit: 5a598039a8aeabcf46d0f1d13542c94ff9b8475f
Parents: 6b61d87
Author: Robert Kanter <rk...@cloudera.com>
Authored: Mon Aug 31 16:32:47 2015 -0700
Committer: Robert Kanter <rk...@cloudera.com>
Committed: Mon Aug 31 16:32:47 2015 -0700

----------------------------------------------------------------------
 .../service/SparkConfigurationService.java      |  13 +-
 core/src/main/resources/oozie-default.xml       |  10 ++
 .../service/TestSparkConfigurationService.java  |  12 ++
 .../site/twiki/DG_SparkActionExtension.twiki    |  17 +--
 release-log.txt                                 |   1 +
 sharelib/spark/pom.xml                          | 142 +++++++++++++++++--
 .../SparkMain.java                              | 132 ++++++++++++++++-
 .../action/hadoop/TestSparkActionExecutor.java  |  18 ++-
 8 files changed, 310 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/5a598039/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java b/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java
index 1b7cf4a..b29ab8d 100644
--- a/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java
+++ b/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java
@@ -36,10 +36,14 @@ public class SparkConfigurationService implements Service {
 
     private static XLog LOG = XLog.getLog(SparkConfigurationService.class);
 
-    public static final String SPARK_CONFIGURATION = "oozie.service.SparkConfigurationService.spark.configurations";
+    public static final String CONF_PREFIX = Service.CONF_PREFIX + "SparkConfigurationService.";
+    public static final String SPARK_CONFIGURATIONS = CONF_PREFIX + "spark.configurations";
+    public static final String SPARK_CONFIGURATIONS_IGNORE_SPARK_YARN_JAR
+            = CONF_PREFIX + "spark.configurations.ignore.spark.yarn.jar";
 
     private Map<String, Map<String, String>> sparkConfigs;
     private static final String SPARK_CONFIG_FILE = "spark-defaults.conf";
+    private static final String SPARK_YARN_JAR_PROP = "spark.yarn.jar";
 
     @Override
     public void init(Services services) throws ServiceException {
@@ -59,8 +63,9 @@ public class SparkConfigurationService implements Service {
     private void loadSparkConfigs() throws ServiceException {
         sparkConfigs = new HashMap<String, Map<String, String>>();
         File configDir = new File(ConfigurationService.getConfigurationDirectory());
-        String[] confDefs = ConfigurationService.getStrings(SPARK_CONFIGURATION);
+        String[] confDefs = ConfigurationService.getStrings(SPARK_CONFIGURATIONS);
         if (confDefs != null) {
+            boolean ignoreSparkYarnJar = ConfigurationService.getBoolean(SPARK_CONFIGURATIONS_IGNORE_SPARK_YARN_JAR);
             for (String confDef : confDefs) {
                 if (confDef.trim().length() > 0) {
                     String[] parts = confDef.split("=");
@@ -80,6 +85,10 @@ public class SparkConfigurationService implements Service {
                                     fr = new FileReader(file);
                                     props.load(fr);
                                     fr.close();
+                                    if (ignoreSparkYarnJar) {
+                                        // Ignore spark.yarn.jar because it may interfere with the Spark Sharelib jars
+                                        props.remove(SPARK_YARN_JAR_PROP);
+                                    }
                                     sparkConfigs.put(hostPort, propsToMap(props));
                                     LOG.info("Loaded Spark Configuration: {0}={1}", hostPort, file.getAbsolutePath());
                                 } catch (IOException ioe) {

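
For readers skimming the diff: a minimal, self-contained Java sketch (plain JDK, not Oozie's actual service APIs; the file path is hypothetical) of what the new ignore flag does to each loaded spark-defaults.conf. The file is read into a Properties object and spark.yarn.jar is dropped so a cluster-managed value cannot shadow the Spark Sharelib jars.

    import java.io.FileReader;
    import java.io.IOException;
    import java.util.Properties;

    public class IgnoreSparkYarnJarSketch {
        public static void main(String[] args) throws IOException {
            Properties props = new Properties();
            // Hypothetical location; Oozie resolves the real file per host:port mapping.
            try (FileReader fr = new FileReader("/etc/spark/conf/spark-defaults.conf")) {
                props.load(fr);
            }
            boolean ignoreSparkYarnJar = true; // shipped default, per oozie-default.xml below
            if (ignoreSparkYarnJar) {
                // Mirrors props.remove(SPARK_YARN_JAR_PROP) above: a spark.yarn.jar
                // from a cluster config would interfere with the Sharelib jars.
                props.remove("spark.yarn.jar");
            }
            System.out.println(props);
        }
    }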
http://git-wip-us.apache.org/repos/asf/oozie/blob/5a598039/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index fa7754a..8a0bc3b 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -2567,6 +2567,16 @@
     </property>
 
     <property>
+        <name>oozie.service.SparkConfigurationService.spark.configurations.ignore.spark.yarn.jar</name>
+        <value>true</value>
+        <description>
+            If true, Oozie will ignore the "spark.yarn.jar" property from any Spark configurations specified in
+            oozie.service.SparkConfigurationService.spark.configurations.  If false, Oozie will not ignore it.  It is recommended
+            to leave this as true because spark.yarn.jar can interfere with the jars in the Spark sharelib.
+        </description>
+    </property>
+
+    <property>
         <name>oozie.email.attachment.enabled</name>
         <value>true</value>
         <description>

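
If an administrator genuinely needs the spark.yarn.jar value from a managed spark-defaults.conf to flow through (not recommended, per the description above), the new default can be overridden in oozie-site.xml; a minimal sketch:

    <property>
        <name>oozie.service.SparkConfigurationService.spark.configurations.ignore.spark.yarn.jar</name>
        <value>false</value>
    </property>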
http://git-wip-us.apache.org/repos/asf/oozie/blob/5a598039/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java b/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java
index b2c499d..9d82fdc 100644
--- a/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java
@@ -68,6 +68,7 @@ public class TestSparkConfigurationService extends XTestCase {
         Properties sparkConf1Props = new Properties();
         sparkConf1Props.setProperty("a", "A");
         sparkConf1Props.setProperty("b", "B");
+        sparkConf1Props.setProperty("spark.yarn.jar", "foo");   // should be ignored by default
         FileOutputStream fos = null;
         try {
             fos = new FileOutputStream(sparkConf1);
@@ -109,6 +110,17 @@ public class TestSparkConfigurationService extends XTestCase {
         assertEquals(2, sparkConfigs.size());
         assertEquals("Y", sparkConfigs.get("y"));
         assertEquals("Z", sparkConfigs.get("z"));
+        scs.destroy();
+        // Setting this to false should make it not ignore spark.yarn.jar
+        ConfigurationService.setBoolean("oozie.service.SparkConfigurationService.spark.configurations.ignore.spark.yarn.jar",
+                false);
+        scs.init(Services.get());
+        sparkConfigs = scs.getSparkConfig("rm1");
+        assertEquals(3, sparkConfigs.size());
+        assertEquals("A", sparkConfigs.get("a"));
+        assertEquals("B", sparkConfigs.get("b"));
+        assertEquals("foo", sparkConfigs.get("spark.yarn.jar"));
+        ConfigurationService.setBoolean("oozie.service.SparkConfigurationService.spark.configurations.ignore.spark.yarn.jar", true);
 
         scs.destroy();
         ConfigurationService.set("oozie.service.SparkConfigurationService.spark.configurations",

http://git-wip-us.apache.org/repos/asf/oozie/blob/5a598039/docs/src/site/twiki/DG_SparkActionExtension.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_SparkActionExtension.twiki b/docs/src/site/twiki/DG_SparkActionExtension.twiki
index 32ebe12..ee11229 100644
--- a/docs/src/site/twiki/DG_SparkActionExtension.twiki
+++ b/docs/src/site/twiki/DG_SparkActionExtension.twiki
@@ -82,7 +82,14 @@ properties that are passed to the Spark job.
 The =master= element indicates the url of the Spark Master. Ex: spark://host:port, mesos://host:port, yarn-cluster, yarn-master,
 or local.
 
-The =mode= element if present indicates the mode of spark, where to run spark driver program. Ex: client,cluster.
+The =mode= element, if present, indicates where the Spark driver program runs, i.e. the Spark deploy mode. Ex: client, cluster.
+This is typically not required because you can specify it as part of =master= (i.e. master=yarn, mode=client is equivalent to
+master=yarn-client). A local =master= always runs in client mode.
+
+Depending on the =master= (and =mode=) entered, the Spark job will run differently as follows:
+   * local mode: everything runs in the Launcher Job.
+   * yarn-client mode: the driver runs in the Launcher Job and the executors run in Yarn.
+   * yarn-cluster mode: both the driver and the executors run in Yarn.
 
 The =name= element indicates the name of the spark application.
 
@@ -142,14 +149,8 @@ to navigate to the Oozie Launcher map-reduce job task logs via the Hadoop job-tr
 
 ---+++ Spark on YARN
 
-To make the Spark action run on YARN, you need to follow these steps:
-
-1. Make the spark-assembly jar available to your Spark action
-
-2. Specify "yarn-client" or "yarn-cluster" in the =master= element
-
 To ensure that your Spark job shows up in the Spark History Server, make sure to specify these three Spark configuration properties
-either in =spark-opts= with =--conf= or from =oozie.service.SparkConfigurationService.spark.configurations=
+either in =spark-opts= with =--conf= or from =oozie.service.SparkConfigurationService.spark.configurations= in oozie-site.xml.
 
 1. spark.yarn.historyServer.address=http://SPH-HOST:18088
 

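
Putting the documented elements together, a minimal Spark action sketch (element values taken from the test XML in this commit; the schema version is an assumption) that runs on YARN in client mode, where master=yarn plus mode=client is equivalent to master=yarn-client:

    <spark xmlns="uri:oozie:spark-action:0.1">
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <master>yarn</master>
        <mode>client</mode>
        <name>Some Name</name>
        <class>org.apache.oozie.foo</class>
        <jar>${nameNode}/foo.jar</jar>
    </spark>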
http://git-wip-us.apache.org/repos/asf/oozie/blob/5a598039/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index ce62dc0..a7c180b 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release (trunk - unreleased)
 
+OOZIE-2277 Honor oozie.action.sharelib.for.spark in Spark jobs (rkanter)
 OOZIE-2322 Oozie Web UI doesn't work with Kerberos in Internet Explorer 10 or 11 and curl (rkanter)
 OOZIE-2343 Shell Action should take Oozie Action config and setup HADOOP_CONF_DIR (rkanter)
 OOZIE-2245 Service to periodically check database schema (rkanter)

http://git-wip-us.apache.org/repos/asf/oozie/blob/5a598039/sharelib/spark/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/spark/pom.xml b/sharelib/spark/pom.xml
index 6f7e74a..0878ae6 100644
--- a/sharelib/spark/pom.xml
+++ b/sharelib/spark/pom.xml
@@ -68,41 +68,76 @@
                     <groupId>org.apache.hadoop</groupId>
                     <artifactId>hadoop-client</artifactId>
                 </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-graphx_2.10</artifactId>
+            <version>${spark.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_2.10</artifactId>
+            <version>${spark.version}</version>
+            <scope>compile</scope>
+            <exclusions>
                 <exclusion>
-                    <groupId>com.google.guava</groupId>
-                    <artifactId>guava</artifactId>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-yarn-server-web-proxy</artifactId>
                 </exclusion>
                 <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>jul-to-slf4j</artifactId>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
                 </exclusion>
                 <exclusion>
-                    <groupId>org.slf4j</groupId>
-                    <artifactId>jcl-over-slf4j</artifactId>
+                    <groupId>org.apache.hadoop</groupId>
+                    <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
                 </exclusion>
             </exclusions>
         </dependency>
-
         <dependency>
-            <groupId>org.apache.oozie</groupId>
-            <artifactId>oozie-core</artifactId>
-            <classifier>tests</classifier>
-            <scope>test</scope>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-mllib_2.10</artifactId>
+            <version>${spark.version}</version>
+            <scope>compile</scope>
         </dependency>
-
         <dependency>
             <groupId>org.apache.spark</groupId>
-            <artifactId>spark-graphx_2.10</artifactId>
+            <artifactId>spark-repl_2.10</artifactId>
+            <version>${spark.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.10</artifactId>
             <version>${spark.version}</version>
             <scope>compile</scope>
         </dependency>
-
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-streaming_2.10</artifactId>
             <version>${spark.version}</version>
             <scope>compile</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming-flume_2.10</artifactId>
+            <version>${spark.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-streaming-kafka_2.10</artifactId>
+            <version>${spark.version}</version>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-bagel_2.10</artifactId>
+            <version>${spark.version}</version>
+            <scope>compile</scope>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.oozie</groupId>
@@ -117,8 +152,16 @@
         </dependency>
 
         <dependency>
+            <groupId>org.apache.oozie</groupId>
+            <artifactId>oozie-core</artifactId>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-minicluster</artifactId>
+            <scope>test</scope>
         </dependency>
 
         <dependency>
@@ -205,5 +248,76 @@
         </plugins>
     </build>
 
+    <profiles>
+        <profile>
+            <id>hadoop-2</id>
+            <dependencies>
+                <dependency>
+                    <groupId>org.apache.spark</groupId>
+                    <artifactId>spark-yarn_2.10</artifactId>
+                    <version>${spark.version}</version>
+                    <scope>compile</scope>
+                    <exclusions>
+                        <exclusion>
+                            <groupId>org.apache.hadoop</groupId>
+                            <artifactId>hadoop-yarn-client</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.apache.hadoop</groupId>
+                            <artifactId>hadoop-yarn-api</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.apache.hadoop</groupId>
+                            <artifactId>hadoop-yarn-common</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.apache.hadoop</groupId>
+                            <artifactId>hadoop-yarn-server-common</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.apache.hadoop</groupId>
+                            <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.apache.hadoop</groupId>
+                            <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.apache.hadoop</groupId>
+                            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.apache.hadoop</groupId>
+                            <artifactId>hadoop-mapreduce-client-core</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.apache.hadoop</groupId>
+                            <artifactId>hadoop-mapreduce-client-common</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.apache.hadoop</groupId>
+                            <artifactId>hadoop-mapreduce-client-app</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.apache.hadoop</groupId>
+                            <artifactId>hadoop-annotations</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.apache.hadoop</groupId>
+                            <artifactId>hadoop-auth</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.apache.hadoop</groupId>
+                            <artifactId>hadoop-aws</artifactId>
+                        </exclusion>
+                        <exclusion>
+                            <groupId>org.apache.hadoop</groupId>
+                            <artifactId>hadoop-client</artifactId>
+                        </exclusion>
+                    </exclusions>
+                </dependency>
+            </dependencies>
+        </profile>
+    </profiles>
 </project>
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/5a598039/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java b/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java
index b18a0b9..5624951 100644
--- a/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java
+++ b/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java
@@ -22,7 +22,9 @@ import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.spark.deploy.SparkSubmit;
 
+import java.io.File;
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 
 public class SparkMain extends LauncherMain {
@@ -31,8 +33,14 @@ public class SparkMain extends LauncherMain {
     private static final String JOB_NAME_OPTION = "--name";
     private static final String CLASS_NAME_OPTION = "--class";
     private static final String VERBOSE_OPTION = "--verbose";
-    private static final String DELIM = " ";
+    private static final String EXECUTOR_CLASSPATH = "spark.executor.extraClassPath=";
+    private static final String DRIVER_CLASSPATH = "spark.driver.extraClassPath=";
+    private static final String DIST_FILES = "spark.yarn.dist.files=";
+    private static final String JARS_OPTION = "--jars";
+    private static final String DELIM = "\\s+";
 
+    private String sparkJars = null;
+    private String sparkClasspath = null;
 
     public static void main(String[] args) throws Exception {
         run(SparkMain.class, args);
@@ -47,13 +55,18 @@ public class SparkMain extends LauncherMain {
         List<String> sparkArgs = new ArrayList<String>();
 
         sparkArgs.add(MASTER_OPTION);
-        sparkArgs.add(actionConf.get(SparkActionExecutor.SPARK_MASTER));
+        String master = actionConf.get(SparkActionExecutor.SPARK_MASTER);
+        sparkArgs.add(master);
 
         String sparkDeployMode = actionConf.get(SparkActionExecutor.SPARK_MODE);
         if (sparkDeployMode != null) {
             sparkArgs.add(MODE_OPTION);
             sparkArgs.add(sparkDeployMode);
         }
+        boolean yarnClusterMode = master.equals("yarn-cluster")
+                || (master.equals("yarn") && sparkDeployMode != null && sparkDeployMode.equals("cluster"));
+        boolean yarnClientMode = master.equals("yarn-client")
+                || (master.equals("yarn") && sparkDeployMode != null && sparkDeployMode.equals("client"));
 
         sparkArgs.add(JOB_NAME_OPTION);
         sparkArgs.add(actionConf.get(SparkActionExecutor.SPARK_JOB_NAME));
@@ -64,19 +77,87 @@ public class SparkMain extends LauncherMain {
             sparkArgs.add(className);
         }
 
+        String jarPath = actionConf.get(SparkActionExecutor.SPARK_JAR);
+
+        // In local mode, everything runs here in the Launcher Job.
+        // In yarn-client mode, the driver runs here in the Launcher Job and the executor in Yarn.
+        // In yarn-cluster mode, the driver and executor run in Yarn.
+        // Due to this, configuring Spark's classpath is not straightforward (see below)
+
+        // Parse the spark opts.  We need to make sure to pass the necessary jars (Hadoop, Spark, user) to Spark.  The way we do
+        // this depends on what mode is being used:
+        // local/yarn-cluster/yarn-client: Passed as comma-separated list via --jars argument
+        // yarn-cluster/yarn-client: Passed as ':'-separated list via spark.executor.extraClassPath and spark.driver.extraClassPath
+        // yarn-client: Passed as comma-separated list via spark.yarn.dist.files
+        //
+        // --jars will cause the jars to be uploaded to HDFS and localized.  To prevent the Sharelib and user jars from being
+        // unnecessarily reuploaded to HDFS, we use the HDFS paths for these.  The hadoop jars are needed as well, but we'll have
+        // to use local paths for these because they're not in the Sharelib.
+        //
+        // spark.executor.extraClassPath and spark.driver.extraClassPath are blindly used as classpaths, so we need to put only
+        // localized jars.  --jars will cause the jars to be localized to the working directory, so we can simply specify the jar
+        // names for the classpaths, as they'll be found in the working directory.
+        //
+        // This part looks more complicated than it is because we need to append the jars if the user already set things for
+        // these options
+        determineSparkJarsAndClasspath(actionConf, jarPath);
+        boolean addedExecutorClasspath = false;
+        boolean addedDriverClasspath = false;
+        boolean addedDistFiles = false;
+        boolean addedJars = false;
         String sparkOpts = actionConf.get(SparkActionExecutor.SPARK_OPTS);
         if (StringUtils.isNotEmpty(sparkOpts)) {
             String[] sparkOptions = sparkOpts.split(DELIM);
-            for (String opt : sparkOptions) {
+            for (int i = 0; i < sparkOptions.length; i++) {
+                String opt = sparkOptions[i];
+                if (sparkJars != null) {
+                    if (opt.equals(JARS_OPTION)) {
+                        sparkArgs.add(opt);
+                        i++;
+                        opt = sparkOptions[i];
+                        opt = opt + "," + sparkJars;
+                        addedJars = true;
+                    } else if (yarnClientMode && opt.startsWith(DIST_FILES)) {
+                        opt = opt + "," + sparkJars;
+                        addedDistFiles = true;
+                    }
+                }
+                if ((yarnClusterMode || yarnClientMode) && sparkClasspath != null) {
+                    if (opt.startsWith(EXECUTOR_CLASSPATH)) {
+                        opt = opt + File.pathSeparator + sparkClasspath;
+                        addedExecutorClasspath = true;
+                    }
+                    if (opt.startsWith(DRIVER_CLASSPATH)) {
+                        opt = opt + File.pathSeparator + sparkClasspath;
+                        addedDriverClasspath = true;
+                    }
+                }
                 sparkArgs.add(opt);
             }
         }
+        if (!addedJars && sparkJars != null) {
+            sparkArgs.add("--jars");
+            sparkArgs.add(sparkJars);
+        }
+        if ((yarnClusterMode || yarnClientMode) && sparkClasspath != null) {
+            if (!addedExecutorClasspath) {
+                sparkArgs.add("--conf");
+                sparkArgs.add(EXECUTOR_CLASSPATH + sparkClasspath);
+            }
+            if (!addedDriverClasspath) {
+                sparkArgs.add("--conf");
+                sparkArgs.add(DRIVER_CLASSPATH + sparkClasspath);
+            }
+        }
+        if (yarnClientMode && !addedDistFiles && sparkJars != null) {
+            sparkArgs.add("--conf");
+            sparkArgs.add(DIST_FILES + sparkJars);
+        }
 
         if (!sparkArgs.contains(VERBOSE_OPTION)) {
             sparkArgs.add(VERBOSE_OPTION);
         }
 
-        String jarPath = actionConf.get(SparkActionExecutor.SPARK_JAR);
         sparkArgs.add(jarPath);
 
         for (String arg : args) {
@@ -103,4 +184,47 @@ public class SparkMain extends LauncherMain {
         System.out.flush();
         SparkSubmit.main(args);
     }
+
+    private void determineSparkJarsAndClasspath(Configuration actionConf, String jarPath) {
+        // distCache gets all of the Sharelib and user jars (from the Distributed Cache)
+        // classpath gets all of the jars from the classpath, which includes the localized jars from the distCache
+        // sparkJars becomes all of the full paths to the Sharelib and user jars from HDFS and the deduped local paths
+        // sparkClasspath becomes all of the jar names in sparkJars (without paths)
+        // We also remove the Spark job jar and job.jar
+        String[] distCache = new String[]{};
+        String dCache = actionConf.get("mapreduce.job.classpath.files");
+        if (dCache != null) {
+            distCache = dCache.split(",");
+        }
+        String[] classpath = System.getProperty("java.class.path").split(File.pathSeparator);
+        StringBuilder cp = new StringBuilder();
+        StringBuilder jars = new StringBuilder();
+        HashSet<String> distCacheJars = new HashSet<String>(distCache.length);
+        for (String path : distCache) {
+            // Skip the job jar because it's already included elsewhere and Spark doesn't like duplicating it here
+            if (!path.equals(jarPath)) {
+                String name = path.substring(path.lastIndexOf("/") + 1);
+                distCacheJars.add(name);
+                cp.append(name).append(File.pathSeparator);
+                jars.append(path).append(",");
+            }
+        }
+        for (String path : classpath) {
+            if (!path.startsWith("job.jar") && path.endsWith(".jar")) {
+                String name = path.substring(path.lastIndexOf("/") + 1);
+                if (!distCacheJars.contains(name)) {
+                    jars.append(path).append(",");
+                }
+                cp.append(name).append(File.pathSeparator);
+            }
+        }
+        if (cp.length() > 0) {
+            cp.setLength(cp.length() - 1);
+            sparkClasspath = cp.toString();
+        }
+        if (jars.length() > 0) {
+            jars.setLength(jars.length() - 1);
+            sparkJars = jars.toString();
+        }
+    }
 }

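
To make the jar/classpath wrangling above concrete: a minimal, self-contained Java sketch (plain JDK; the sample paths are hypothetical) of the dedup idea in determineSparkJarsAndClasspath(). HDFS paths from the distributed cache are preferred for --jars so nothing is re-uploaded, local-only jars (e.g. Hadoop's) are added by local path, and the classpath uses bare jar names because --jars localizes everything into the working directory.

    import java.io.File;
    import java.util.HashSet;

    public class SparkJarsSketch {
        public static void main(String[] args) {
            // Stand-ins for mapreduce.job.classpath.files and java.class.path
            String[] distCache = {"hdfs:///share/lib/spark/spark-core.jar", "hdfs:///user/wf/lib/my-udf.jar"};
            String[] classpath = {"/container/tmp/spark-core.jar", "/opt/hadoop/hadoop-common.jar", "job.jar"};
            StringBuilder jars = new StringBuilder();
            StringBuilder cp = new StringBuilder();
            HashSet<String> cacheNames = new HashSet<String>();
            for (String path : distCache) {  // the real code also skips the action's own <jar> here
                String name = path.substring(path.lastIndexOf('/') + 1);
                cacheNames.add(name);
                jars.append(path).append(",");               // full HDFS path: no re-upload
                cp.append(name).append(File.pathSeparator);  // bare name: localized to the working dir
            }
            for (String path : classpath) {
                if (!path.startsWith("job.jar") && path.endsWith(".jar")) {
                    String name = path.substring(path.lastIndexOf('/') + 1);
                    if (!cacheNames.contains(name)) {        // local-only jar, e.g. Hadoop's
                        jars.append(path).append(",");
                    }
                    cp.append(name).append(File.pathSeparator);
                }
            }
            System.out.println("--jars " + jars);            // trailing separators trimmed in the real code
            System.out.println("extraClassPath=" + cp);
        }
    }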
http://git-wip-us.apache.org/repos/asf/oozie/blob/5a598039/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
index f271abc..dcd2360 100644
--- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
@@ -67,8 +67,11 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase {
     }
 
     public void testSetupMethods() throws Exception {
-        _testSetupMethods("local[*]", new HashMap<String, String>());
-        _testSetupMethods("yarn", new HashMap<String, String>());
+        _testSetupMethods("local[*]", new HashMap<String, String>(), "client");
+        _testSetupMethods("yarn", new HashMap<String, String>(), "cluster");
+        _testSetupMethods("yarn", new HashMap<String, String>(), "client");
+        _testSetupMethods("yarn-cluster", new HashMap<String, String>(), null);
+        _testSetupMethods("yarn-client", new HashMap<String, String>(), null);
     }
 
     public void testSetupMethodsWithSparkConfiguration() throws Exception {
@@ -91,15 +94,16 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase {
                 getJobTrackerUri() + "=" + sparkConfDir.getAbsolutePath());
         scs.init(Services.get());
 
-        _testSetupMethods("local[*]", new HashMap<String, String>());
+        _testSetupMethods("local[*]", new HashMap<String, String>(), "client");
         Map<String, String> extraSparkOpts = new HashMap<String, String>(2);
         extraSparkOpts.put("a", "A");
         extraSparkOpts.put("b", "B");
-        _testSetupMethods("yarn", extraSparkOpts);
+        _testSetupMethods("yarn-cluster", extraSparkOpts, null);
+        _testSetupMethods("yarn-client", extraSparkOpts, null);
     }
 
     @SuppressWarnings("unchecked")
-    private void _testSetupMethods(String master, Map<String, String> extraSparkOpts) throws Exception {
+    private void _testSetupMethods(String master, Map<String, String> extraSparkOpts, String mode) throws Exception {
         SparkActionExecutor ae = new SparkActionExecutor();
         assertEquals(Arrays.asList(SparkMain.class), ae.getLauncherClasses());
 
@@ -107,7 +111,7 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase {
                 "<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
                 "<name-node>" + getNameNodeUri() + "</name-node>" +
                 "<master>" + master + "</master>" +
-                "<mode>client</mode>" +
+                (mode != null ? "<mode>" + mode + "</mode>" : "") +
                 "<name>Some Name</name>" +
                 "<class>org.apache.oozie.foo</class>" +
                 "<jar>" + getNameNodeUri() + "/foo.jar</jar>" +
@@ -126,7 +130,7 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase {
         Configuration conf = ae.createBaseHadoopConf(context, actionXml);
         ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
         assertEquals(master, conf.get("oozie.spark.master"));
-        assertEquals("client", conf.get("oozie.spark.mode"));
+        assertEquals(mode, conf.get("oozie.spark.mode"));
         assertEquals("Some Name", conf.get("oozie.spark.name"));
         assertEquals("org.apache.oozie.foo", conf.get("oozie.spark.class"));
         assertEquals(getNameNodeUri() + "/foo.jar", conf.get("oozie.spark.jar"));