Posted to commits@oozie.apache.org by ge...@apache.org on 2017/06/08 11:59:38 UTC

oozie git commit: OOZIE-2923 Improve Spark options parsing (andras.piros via gezapeti)

Repository: oozie
Updated Branches:
  refs/heads/master 230e42611 -> f45b18791


OOZIE-2923 Improve Spark options parsing (andras.piros via gezapeti)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/f45b1879
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/f45b1879
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/f45b1879

Branch: refs/heads/master
Commit: f45b18791a92201fb4959857cafbda7543942022
Parents: 230e426
Author: Gezapeti Cseh <ge...@gmail.com>
Authored: Thu Jun 8 13:59:06 2017 +0200
Committer: Gezapeti Cseh <ge...@gmail.com>
Committed: Thu Jun 8 13:59:17 2017 +0200

----------------------------------------------------------------------
 release-log.txt                                 |   1 +
 .../oozie/action/hadoop/HadoopUriFinder.java    |  66 +++
 .../apache/oozie/action/hadoop/JarFilter.java   | 122 +++++
 .../oozie/action/hadoop/SparkArgsExtractor.java | 334 ++++++++++++
 .../apache/oozie/action/hadoop/SparkMain.java   | 520 +++----------------
 .../action/hadoop/SparkOptionsSplitter.java     |  64 +++
 .../oozie/action/hadoop/TestJarFilter.java      |   1 -
 .../action/hadoop/TestSparkArgsExtractor.java   | 197 +++++++
 .../oozie/action/hadoop/TestSparkMain.java      |  13 -
 .../action/hadoop/TestSparkOptionsSplitter.java |  16 +-
 10 files changed, 851 insertions(+), 483 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 5743ac9..5c5ae92 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.4.0 release (trunk - unreleased)
 
+OOZIE-2923 Improve Spark options parsing (andras.piros via gezapeti)
 OOZIE-2886 Ensure consistent versioning of hadoop jars in sharelibs (dbist13 via rkanter)
 OOZIE-2875 Typo in ssh action twiki docs (Dongying Jiao via gezapeti)
 OOZIE-2387 Oozie is Unable to handle Spaces in file/archive tag. (asasvari via gezapeti)

http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/HadoopUriFinder.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/HadoopUriFinder.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/HadoopUriFinder.java
new file mode 100644
index 0000000..3f9e982
--- /dev/null
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/HadoopUriFinder.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+
+class HadoopUriFinder {
+
+    static String getJarVersion(final File jarFile) throws IOException {
+        try (final JarFile openedJarFile = new JarFile(jarFile)) {
+            final Manifest manifest = openedJarFile.getManifest();
+            return manifest.getMainAttributes().getValue("Specification-Version");
+        }
+    }
+
+    static URI getFixedUri(final URI fileUri) throws URISyntaxException, IOException {
+        final FileSystem fs = FileSystem.get(new Configuration(true));
+        return getFixedUri(fs, fileUri);
+    }
+
+    /**
+     * Spark compares URIs based on scheme, host and port. Here we convert URIs
+     * into the default format so that Spark won't think they belong to
+     * different file systems. This avoids an extra copy of files that
+     * already exist on the same HDFS.
+     *
+     * @param fs
+     * @param fileUri
+     * @return fixed uri
+     * @throws URISyntaxException
+     */
+    static URI getFixedUri(final FileSystem fs, final URI fileUri) throws URISyntaxException {
+        if (fs.getUri().getScheme().equals(fileUri.getScheme())
+                && (fs.getUri().getHost().equals(fileUri.getHost()) || fileUri.getHost() == null)
+                && (fs.getUri().getPort() == -1 || fileUri.getPort() == -1
+                        || fs.getUri().getPort() == fileUri.getPort())) {
+            return new URI(fs.getUri().getScheme(), fileUri.getUserInfo(), fs.getUri().getHost(), fs.getUri().getPort(),
+                    fileUri.getPath(), fileUri.getQuery(), fileUri.getFragment());
+        }
+        return fileUri;
+    }
+}
\ No newline at end of file
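
For reference, a minimal sketch of the normalization above, in the style of a
same-package test (the hdfs://namenode:8020 default file system and the input
URIs are invented for illustration):

    final FileSystem fs = FileSystem.get(new Configuration(true));
    // Suppose fs.getUri() is hdfs://namenode:8020. A URI without an
    // authority is rewritten onto the default file system:
    final URI fixed = HadoopUriFinder.getFixedUri(fs, new URI("hdfs:///user/test/app.jar"));
    // fixed == hdfs://namenode:8020/user/test/app.jar, so Spark sees the
    // same scheme/host/port and skips the extra copy.
    // A URI with a different scheme or host comes back unchanged:
    final URI untouched = HadoopUriFinder.getFixedUri(fs, new URI("s3a://bucket/app.jar"));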

http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/JarFilter.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/JarFilter.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/JarFilter.java
new file mode 100644
index 0000000..d0b4b5e
--- /dev/null
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/JarFilter.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * This class is used for filtering out unwanted jars.
+ */
+class JarFilter {
+    private String sparkVersion = "1.X.X";
+    private String sparkYarnJar;
+    private String applicationJar;
+    private Collection<URI> listUris = null;
+
+    /**
+     * @param listUris List of URIs to be filtered
+     * @param jarPath Application jar
+     * @throws IOException
+     * @throws URISyntaxException
+     */
+    JarFilter(final Collection<URI> listUris, final String jarPath) throws URISyntaxException, IOException {
+        this.listUris = listUris;
+        applicationJar = jarPath;
+        final Path p = new Path(jarPath);
+        if (p.isAbsolute()) {
+            applicationJar = HadoopUriFinder.getFixedUri(p.toUri()).toString();
+        }
+    }
+
+    /**
+     * Filters out the Spark yarn jar and the application jar. Also records
+     * the Spark yarn jar's version.
+     *
+     * @throws OozieActionConfiguratorException
+     */
+    void filter() throws OozieActionConfiguratorException {
+        final Iterator<URI> iterator = listUris.iterator();
+        File matchedFile = null;
+        final Path applJarPath = new Path(applicationJar);
+        while (iterator.hasNext()) {
+            final URI uri = iterator.next();
+            final Path p = new Path(uri);
+            if (SparkMain.SPARK_YARN_JAR_PATTERN.matcher(p.getName()).find()) {
+                matchedFile = SparkMain.getMatchingFile(SparkMain.SPARK_YARN_JAR_PATTERN);
+            }
+            else if (SparkMain.SPARK_ASSEMBLY_JAR_PATTERN.matcher(p.getName()).find()) {
+                matchedFile = SparkMain.getMatchingFile(SparkMain.SPARK_ASSEMBLY_JAR_PATTERN);
+            }
+            if (matchedFile != null) {
+                sparkYarnJar = uri.toString();
+                try {
+                    sparkVersion = HadoopUriFinder.getJarVersion(matchedFile);
+                    System.out.println("Spark Version " + sparkVersion);
+                }
+                catch (final IOException io) {
+                    System.out.println(
+                            "Unable to open " + matchedFile.getPath() + ". Default Spark Version " + sparkVersion);
+                }
+                iterator.remove();
+                matchedFile = null;
+            }
+            // Here we skip the application jar because, if the URIs are
+            // the same, it would get distributed multiple times: once with
+            // --files and once as the application jar.
+            if (isApplicationJar(p.getName(), uri, applJarPath)) {
+                final String fragment = uri.getFragment();
+                applicationJar = fragment != null && fragment.length() > 0 ? fragment : uri.toString();
+                iterator.remove();
+            }
+        }
+    }
+
+    /**
+     * Checks if a file is the application jar
+     *
+     * @param fileName name of the file
+     * @param fileUri URI of the file
+     * @param applJarPath Path of application jar
+     * @return true if fileName or fileUri is the application jar
+     */
+    private boolean isApplicationJar(final String fileName, final URI fileUri, final Path applJarPath) {
+        return (fileName.equals(applicationJar) || fileUri.toString().equals(applicationJar)
+                || applJarPath.getName().equals(fileName)
+                || applicationJar.equals(fileUri.getFragment()));
+    }
+
+    String getApplicationJar() {
+        return applicationJar;
+    }
+
+    String getSparkYarnJar() {
+        return sparkYarnJar;
+    }
+
+    String getSparkVersion() {
+        return sparkVersion;
+    }
+}
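
A rough usage sketch in the style of a same-package test (the URIs are
invented, and a jar matching SPARK_YARN_JAR_PATTERN is assumed to be
localized in the current working directory so getMatchingFile() finds it):

    final Collection<URI> cached = new ArrayList<>(Arrays.asList(
            new URI("hdfs://nn:8020/share/lib/spark/spark-yarn_2.10-1.6.1.jar"),
            new URI("hdfs://nn:8020/user/test/app.jar")));
    final JarFilter jarFilter = new JarFilter(cached, "hdfs://nn:8020/user/test/app.jar");
    jarFilter.filter();
    // Both URIs are removed from cached: the spark-yarn jar is recorded via
    // getSparkYarnJar() and getSparkVersion(), and the application jar is
    // kept out of --files because spark-submit receives it separately via
    // getApplicationJar().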

http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
new file mode 100644
index 0000000..ffc95f9
--- /dev/null
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.apache.directory.api.util.Strings;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+class SparkArgsExtractor {
+    private static final Pattern SPARK_DEFAULTS_FILE_PATTERN = Pattern.compile("spark-defaults.conf");
+    private static final String FILES_OPTION = "--files";
+    private static final String ARCHIVES_OPTION = "--archives";
+    private static final String LOG4J_CONFIGURATION_JAVA_OPTION = "-Dlog4j.configuration=";
+    private static final String HIVE_SECURITY_TOKEN = "spark.yarn.security.tokens.hive.enabled";
+    private static final String HBASE_SECURITY_TOKEN = "spark.yarn.security.tokens.hbase.enabled";
+    private static final String PWD = "$PWD" + File.separator + "*";
+    private static final String MASTER_OPTION = "--master";
+    private static final String MODE_OPTION = "--deploy-mode";
+    private static final String JOB_NAME_OPTION = "--name";
+    private static final String CLASS_NAME_OPTION = "--class";
+    private static final String VERBOSE_OPTION = "--verbose";
+    private static final String DRIVER_CLASSPATH_OPTION = "--driver-class-path";
+    private static final String EXECUTOR_CLASSPATH = "spark.executor.extraClassPath=";
+    private static final String DRIVER_CLASSPATH = "spark.driver.extraClassPath=";
+    private static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions=";
+    private static final String DRIVER_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions=";
+    private static final Pattern SPARK_VERSION_1 = Pattern.compile("^1.*");
+    private static final String SPARK_YARN_JAR = "spark.yarn.jar";
+    private static final String SPARK_YARN_JARS = "spark.yarn.jars";
+    private static final String OPT_SEPARATOR = "=";
+    private static final String OPT_VALUE_SEPARATOR = ",";
+    private static final String CONF_OPTION = "--conf";
+    private static final String MASTER_OPTION_YARN_CLUSTER = "yarn-cluster";
+    private static final String MASTER_OPTION_YARN_CLIENT = "yarn-client";
+    private static final String MASTER_OPTION_YARN = "yarn";
+    private static final String DEPLOY_MODE_CLUSTER = "cluster";
+    private static final String DEPLOY_MODE_CLIENT = "client";
+    private static final String SPARK_YARN_TAGS = "spark.yarn.tags";
+    private static final String OPT_PROPERTIES_FILE = "--properties-file";
+
+    private boolean pySpark = false;
+    private final Configuration actionConf;
+
+    SparkArgsExtractor(final Configuration actionConf) {
+        this.actionConf = actionConf;
+    }
+
+    boolean isPySpark() {
+        return pySpark;
+    }
+
+    List<String> extract(final String[] mainArgs) throws OozieActionConfiguratorException, IOException, URISyntaxException {
+        final List<String> sparkArgs = new ArrayList<>();
+
+        sparkArgs.add(MASTER_OPTION);
+        final String master = actionConf.get(SparkActionExecutor.SPARK_MASTER);
+        sparkArgs.add(master);
+
+        // In local mode, everything runs here in the Launcher Job.
+        // In yarn-client mode, the driver runs here in the Launcher Job and the
+        // executor in Yarn.
+        // In yarn-cluster mode, the driver and executor run in Yarn.
+        final String sparkDeployMode = actionConf.get(SparkActionExecutor.SPARK_MODE);
+        if (sparkDeployMode != null) {
+            sparkArgs.add(MODE_OPTION);
+            sparkArgs.add(sparkDeployMode);
+        }
+        final boolean yarnClusterMode = master.equals(MASTER_OPTION_YARN_CLUSTER)
+                || (master.equals(MASTER_OPTION_YARN) && sparkDeployMode != null && sparkDeployMode.equals(DEPLOY_MODE_CLUSTER));
+        final boolean yarnClientMode = master.equals(MASTER_OPTION_YARN_CLIENT)
+                || (master.equals(MASTER_OPTION_YARN) && sparkDeployMode != null && sparkDeployMode.equals(DEPLOY_MODE_CLIENT));
+
+        sparkArgs.add(JOB_NAME_OPTION);
+        sparkArgs.add(actionConf.get(SparkActionExecutor.SPARK_JOB_NAME));
+
+        final String className = actionConf.get(SparkActionExecutor.SPARK_CLASS);
+        if (className != null) {
+            sparkArgs.add(CLASS_NAME_OPTION);
+            sparkArgs.add(className);
+        }
+
+        appendOoziePropertiesToSparkConf(sparkArgs);
+
+        String jarPath = actionConf.get(SparkActionExecutor.SPARK_JAR);
+        if (jarPath != null && jarPath.endsWith(".py")) {
+            pySpark = true;
+        }
+        boolean addedHiveSecurityToken = false;
+        boolean addedHBaseSecurityToken = false;
+        boolean addedLog4jDriverSettings = false;
+        boolean addedLog4jExecutorSettings = false;
+        final StringBuilder driverClassPath = new StringBuilder();
+        final StringBuilder executorClassPath = new StringBuilder();
+        final StringBuilder userFiles = new StringBuilder();
+        final StringBuilder userArchives = new StringBuilder();
+        final String sparkOpts = actionConf.get(SparkActionExecutor.SPARK_OPTS);
+        if (StringUtils.isNotEmpty(sparkOpts)) {
+            final List<String> sparkOptions = SparkOptionsSplitter.splitSparkOpts(sparkOpts);
+            for (int i = 0; i < sparkOptions.size(); i++) {
+                String opt = sparkOptions.get(i);
+                boolean addToSparkArgs = true;
+                if (yarnClusterMode || yarnClientMode) {
+                    if (opt.startsWith(EXECUTOR_CLASSPATH)) {
+                        appendWithPathSeparator(opt.substring(EXECUTOR_CLASSPATH.length()), executorClassPath);
+                        addToSparkArgs = false;
+                    }
+                    if (opt.startsWith(DRIVER_CLASSPATH)) {
+                        appendWithPathSeparator(opt.substring(DRIVER_CLASSPATH.length()), driverClassPath);
+                        addToSparkArgs = false;
+                    }
+                    if (opt.equals(DRIVER_CLASSPATH_OPTION)) {
+                        // we need the next element after this option
+                        appendWithPathSeparator(sparkOptions.get(i + 1), driverClassPath);
+                        // increase i to skip the next element.
+                        i++;
+                        addToSparkArgs = false;
+                    }
+                }
+                if (opt.startsWith(HIVE_SECURITY_TOKEN)) {
+                    addedHiveSecurityToken = true;
+                }
+                if (opt.startsWith(HBASE_SECURITY_TOKEN)) {
+                    addedHBaseSecurityToken = true;
+                }
+                if (opt.startsWith(EXECUTOR_EXTRA_JAVA_OPTIONS) || opt.startsWith(DRIVER_EXTRA_JAVA_OPTIONS)) {
+                    if (!opt.contains(LOG4J_CONFIGURATION_JAVA_OPTION)) {
+                        opt += " " + LOG4J_CONFIGURATION_JAVA_OPTION + SparkMain.SPARK_LOG4J_PROPS;
+                    } else {
+                        System.out.println("Warning: Spark Log4J settings are overwritten." +
+                                " Child job IDs may not be available");
+                    }
+                    if (opt.startsWith(EXECUTOR_EXTRA_JAVA_OPTIONS)) {
+                        addedLog4jExecutorSettings = true;
+                    } else {
+                        addedLog4jDriverSettings = true;
+                    }
+                }
+                if (opt.startsWith(FILES_OPTION)) {
+                    final String userFile;
+                    if (opt.contains(OPT_SEPARATOR)) {
+                        userFile = opt.substring(opt.indexOf(OPT_SEPARATOR) + OPT_SEPARATOR.length());
+                    }
+                    else {
+                        userFile = sparkOptions.get(i + 1);
+                        i++;
+                    }
+                    if (userFiles.length() > 0) {
+                        userFiles.append(OPT_VALUE_SEPARATOR);
+                    }
+                    userFiles.append(userFile);
+                    addToSparkArgs = false;
+                }
+                if (opt.startsWith(ARCHIVES_OPTION)) {
+                    final String userArchive;
+                    if (opt.contains(OPT_SEPARATOR)) {
+                        userArchive = opt.substring(opt.indexOf(OPT_SEPARATOR) + OPT_SEPARATOR.length());
+                    }
+                    else {
+                        userArchive = sparkOptions.get(i + 1);
+                        i++;
+                    }
+                    if (userArchives.length() > 0) {
+                        userArchives.append(OPT_VALUE_SEPARATOR);
+                    }
+                    userArchives.append(userArchive);
+                    addToSparkArgs = false;
+                }
+                if (addToSparkArgs) {
+                    sparkArgs.add(opt);
+                }
+                else if (sparkArgs.get(sparkArgs.size() - 1).equals(CONF_OPTION)) {
+                    sparkArgs.remove(sparkArgs.size() - 1);
+                }
+            }
+        }
+
+        if ((yarnClusterMode || yarnClientMode)) {
+            // Include the current working directory (of executor container)
+            // in executor classpath, because it will contain localized
+            // files
+            appendWithPathSeparator(PWD, executorClassPath);
+            appendWithPathSeparator(PWD, driverClassPath);
+
+            sparkArgs.add(CONF_OPTION);
+            sparkArgs.add(EXECUTOR_CLASSPATH + executorClassPath.toString());
+
+            sparkArgs.add(CONF_OPTION);
+            sparkArgs.add(DRIVER_CLASSPATH + driverClassPath.toString());
+        }
+
+        if (actionConf.get(LauncherMain.MAPREDUCE_JOB_TAGS) != null) {
+            sparkArgs.add(CONF_OPTION);
+            sparkArgs.add(SPARK_YARN_TAGS + OPT_SEPARATOR + actionConf.get(LauncherMain.MAPREDUCE_JOB_TAGS));
+        }
+
+        if (!addedHiveSecurityToken) {
+            sparkArgs.add(CONF_OPTION);
+            sparkArgs.add(HIVE_SECURITY_TOKEN + OPT_SEPARATOR + Boolean.toString(false));
+        }
+        if (!addedHBaseSecurityToken) {
+            sparkArgs.add(CONF_OPTION);
+            sparkArgs.add(HBASE_SECURITY_TOKEN + OPT_SEPARATOR + Boolean.toString(false));
+        }
+        if (!addedLog4jExecutorSettings) {
+            sparkArgs.add(CONF_OPTION);
+            sparkArgs.add(EXECUTOR_EXTRA_JAVA_OPTIONS + LOG4J_CONFIGURATION_JAVA_OPTION + SparkMain.SPARK_LOG4J_PROPS);
+        }
+        if (!addedLog4jDriverSettings) {
+            sparkArgs.add(CONF_OPTION);
+            sparkArgs.add(DRIVER_EXTRA_JAVA_OPTIONS + LOG4J_CONFIGURATION_JAVA_OPTION + SparkMain.SPARK_LOG4J_PROPS);
+        }
+        final File defaultConfFile = SparkMain.getMatchingFile(SPARK_DEFAULTS_FILE_PATTERN);
+        if (defaultConfFile != null) {
+            sparkArgs.add(OPT_PROPERTIES_FILE);
+            sparkArgs.add(SPARK_DEFAULTS_FILE_PATTERN.toString());
+        }
+
+        if ((yarnClusterMode || yarnClientMode)) {
+            final Map<String, URI> fixedFileUrisMap =
+                    SparkMain.fixFsDefaultUrisAndFilterDuplicates(DistributedCache.getCacheFiles(actionConf));
+            fixedFileUrisMap.put(SparkMain.SPARK_LOG4J_PROPS, new Path(SparkMain.SPARK_LOG4J_PROPS).toUri());
+            fixedFileUrisMap.put(SparkMain.HIVE_SITE_CONF, new Path(SparkMain.HIVE_SITE_CONF).toUri());
+            addUserDefined(userFiles.toString(), fixedFileUrisMap);
+            final Collection<URI> fixedFileUris = fixedFileUrisMap.values();
+            final JarFilter jarFilter = new JarFilter(fixedFileUris, jarPath);
+            jarFilter.filter();
+            jarPath = jarFilter.getApplicationJar();
+
+            final String cachedFiles = StringUtils.join(fixedFileUris, OPT_VALUE_SEPARATOR);
+            if (cachedFiles != null && !cachedFiles.isEmpty()) {
+                sparkArgs.add(FILES_OPTION);
+                sparkArgs.add(cachedFiles);
+            }
+            final Map<String, URI> fixedArchiveUrisMap = SparkMain.fixFsDefaultUrisAndFilterDuplicates(DistributedCache.
+                    getCacheArchives(actionConf));
+            addUserDefined(userArchives.toString(), fixedArchiveUrisMap);
+            final String cachedArchives = StringUtils.join(fixedArchiveUrisMap.values(), OPT_VALUE_SEPARATOR);
+            if (cachedArchives != null && !cachedArchives.isEmpty()) {
+                sparkArgs.add(ARCHIVES_OPTION);
+                sparkArgs.add(cachedArchives);
+            }
+            setSparkYarnJarsConf(sparkArgs, jarFilter.getSparkYarnJar(), jarFilter.getSparkVersion());
+        }
+
+        if (!sparkArgs.contains(VERBOSE_OPTION)) {
+            sparkArgs.add(VERBOSE_OPTION);
+        }
+
+        sparkArgs.add(jarPath);
+        sparkArgs.addAll(Arrays.asList(mainArgs));
+
+        return sparkArgs;
+    }
+
+    private void appendWithPathSeparator(final String what, final StringBuilder to) {
+        if (to.length() > 0) {
+            to.append(File.pathSeparator);
+        }
+        to.append(what);
+    }
+
+    private void addUserDefined(final String userList, final Map<String, URI> urisMap) {
+        if (userList != null) {
+            for (final String file : userList.split(OPT_VALUE_SEPARATOR)) {
+                if (!Strings.isEmpty(file)) {
+                    final Path p = new Path(file);
+                    urisMap.put(p.getName(), p.toUri());
+                }
+            }
+        }
+    }
+
+    /*
+     * Get properties that need to be passed to Spark as Spark configuration from actionConf.
+     */
+    @VisibleForTesting
+    void appendOoziePropertiesToSparkConf(final List<String> sparkArgs) {
+        for (final Map.Entry<String, String> oozieConfig : actionConf
+                .getValByRegex("^oozie\\.(?!launcher|spark).+").entrySet()) {
+            sparkArgs.add(CONF_OPTION);
+            sparkArgs.add(String.format("spark.%s=%s", oozieConfig.getKey(), oozieConfig.getValue()));
+        }
+    }
+
+    /**
+     * Sets spark.yarn.jars for Spark 2.X. Sets spark.yarn.jar for Spark 1.X.
+     *
+     * @param sparkArgs
+     * @param sparkYarnJar
+     * @param sparkVersion
+     */
+    private void setSparkYarnJarsConf(final List<String> sparkArgs, final String sparkYarnJar, final String sparkVersion) {
+        if (SPARK_VERSION_1.matcher(sparkVersion).find()) {
+            // In Spark 1.X.X, set spark.yarn.jar to avoid
+            // multiple distribution
+            sparkArgs.add(CONF_OPTION);
+            sparkArgs.add(SPARK_YARN_JAR + OPT_SEPARATOR + sparkYarnJar);
+        } else {
+            // In Spark 2.X.X, set spark.yarn.jars
+            sparkArgs.add(CONF_OPTION);
+            sparkArgs.add(SPARK_YARN_JARS + OPT_SEPARATOR + sparkYarnJar);
+        }
+    }
+}
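
A sketch of how the launcher drives the extractor (property values are
invented; the SparkActionExecutor constants name the action-configuration
keys, and a real run relies on the Hadoop configuration and localized files
the launcher provides):

    final Configuration actionConf = new Configuration(false);
    actionConf.set(SparkActionExecutor.SPARK_MASTER, "yarn");
    actionConf.set(SparkActionExecutor.SPARK_MODE, "cluster");
    actionConf.set(SparkActionExecutor.SPARK_JOB_NAME, "example-action");
    actionConf.set(SparkActionExecutor.SPARK_JAR, "hdfs://nn:8020/user/test/app.jar");
    actionConf.set(SparkActionExecutor.SPARK_OPTS,
            "--driver-memory 1g --conf spark.executor.extraClassPath=lib/*");
    final List<String> sparkArgs = new SparkArgsExtractor(actionConf).extract(new String[]{"inputDir"});
    // sparkArgs starts with --master yarn --deploy-mode cluster --name
    // example-action; the extraClassPath value is merged with $PWD/* rather
    // than passed through verbatim, default security-token and Log4J --conf
    // entries are appended, and the list ends with the application jar
    // followed by "inputDir".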

http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
index 68f7a60..674839a 100644
--- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
@@ -24,20 +24,13 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.jar.JarFile;
-import java.util.jar.Manifest;
 import java.util.regex.Pattern;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -47,243 +40,56 @@ import org.apache.spark.deploy.SparkSubmit;
 import com.google.common.annotations.VisibleForTesting;
 
 public class SparkMain extends LauncherMain {
-    private static final String MASTER_OPTION = "--master";
-    private static final String MODE_OPTION = "--deploy-mode";
-    private static final String JOB_NAME_OPTION = "--name";
-    private static final String CLASS_NAME_OPTION = "--class";
-    private static final String VERBOSE_OPTION = "--verbose";
-    private static final String DRIVER_CLASSPATH_OPTION = "--driver-class-path";
-    private static final String EXECUTOR_CLASSPATH = "spark.executor.extraClassPath=";
-    private static final String DRIVER_CLASSPATH = "spark.driver.extraClassPath=";
-    private static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions=";
-    private static final String DRIVER_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions=";
-    private static final String LOG4J_CONFIGURATION_JAVA_OPTION = "-Dlog4j.configuration=";
-    private static final String HIVE_SECURITY_TOKEN = "spark.yarn.security.tokens.hive.enabled";
-    private static final String HBASE_SECURITY_TOKEN = "spark.yarn.security.tokens.hbase.enabled";
-    private static final String CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR = "oozie.action.spark.setup.hadoop.conf.dir";
-    private static final String PWD = "$PWD" + File.separator + "*";
-    private static final Pattern[] PYSPARK_DEP_FILE_PATTERN = { Pattern.compile("py4\\S*src.zip"),
-            Pattern.compile("pyspark.zip") };
-    private static final Pattern SPARK_DEFAULTS_FILE_PATTERN = Pattern.compile("spark-defaults.conf");
-    private static final String SPARK_LOG4J_PROPS = "spark-log4j.properties";
+
     @VisibleForTesting
     static final Pattern[] SPARK_JOB_IDS_PATTERNS = {
             Pattern.compile("Submitted application (application[0-9_]*)") };
-    public static final Pattern SPARK_ASSEMBLY_JAR_PATTERN = Pattern
+    @VisibleForTesting
+    static final Pattern SPARK_ASSEMBLY_JAR_PATTERN = Pattern
             .compile("^spark-assembly((?:(-|_|(\\d+\\.))\\d+(?:\\.\\d+)*))*\\.jar$");
-    public static final Pattern SPARK_YARN_JAR_PATTERN = Pattern
+    @VisibleForTesting
+    static final Pattern SPARK_YARN_JAR_PATTERN = Pattern
             .compile("^spark-yarn((?:(-|_|(\\d+\\.))\\d+(?:\\.\\d+)*))*\\.jar$");
-    private static final Pattern SPARK_VERSION_1 = Pattern.compile("^1.*");
-    private static final String SPARK_YARN_JAR = "spark.yarn.jar";
-    private static final String SPARK_YARN_JARS = "spark.yarn.jars";
-    public static final String HIVE_SITE_CONF = "hive-site.xml";
-    public static final String FILES_OPTION = "--files";
-    public static final String ARCHIVES_OPTION = "--archives";
-
-    public static void main(String[] args) throws Exception {
+    static final String HIVE_SITE_CONF = "hive-site.xml";
+    static final String SPARK_LOG4J_PROPS = "spark-log4j.properties";
+
+    private static final String CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR = "oozie.action.spark.setup.hadoop.conf.dir";
+    private static final Pattern[] PYSPARK_DEP_FILE_PATTERN = { Pattern.compile("py4\\S*src.zip"),
+            Pattern.compile("pyspark.zip") };
+
+    public static void main(final String[] args) throws Exception {
         run(SparkMain.class, args);
     }
 
     @Override
-    protected void run(String[] args) throws Exception {
-        boolean isPyspark = false;
-        Configuration actionConf = loadActionConf();
+    protected void run(final String[] args) throws Exception {
+        final Configuration actionConf = loadActionConf();
         prepareHadoopConfig(actionConf);
 
         setYarnTag(actionConf);
         LauncherMain.killChildYarnJobs(actionConf);
-        String logFile = setUpSparkLog4J(actionConf);
+        final String logFile = setUpSparkLog4J(actionConf);
         setHiveSite(actionConf);
-        List<String> sparkArgs = new ArrayList<String>();
-
-        sparkArgs.add(MASTER_OPTION);
-        String master = actionConf.get(SparkActionExecutor.SPARK_MASTER);
-        sparkArgs.add(master);
-
-        // In local mode, everything runs here in the Launcher Job.
-        // In yarn-client mode, the driver runs here in the Launcher Job and the
-        // executor in Yarn.
-        // In yarn-cluster mode, the driver and executor run in Yarn.
-        String sparkDeployMode = actionConf.get(SparkActionExecutor.SPARK_MODE);
-        if (sparkDeployMode != null) {
-            sparkArgs.add(MODE_OPTION);
-            sparkArgs.add(sparkDeployMode);
-        }
-        boolean yarnClusterMode = master.equals("yarn-cluster")
-                || (master.equals("yarn") && sparkDeployMode != null && sparkDeployMode.equals("cluster"));
-        boolean yarnClientMode = master.equals("yarn-client")
-                || (master.equals("yarn") && sparkDeployMode != null && sparkDeployMode.equals("client"));
-
-        sparkArgs.add(JOB_NAME_OPTION);
-        sparkArgs.add(actionConf.get(SparkActionExecutor.SPARK_JOB_NAME));
-
-        String className = actionConf.get(SparkActionExecutor.SPARK_CLASS);
-        if (className != null) {
-            sparkArgs.add(CLASS_NAME_OPTION);
-            sparkArgs.add(className);
-        }
-
-        appendOoziePropertiesToSparkConf(sparkArgs, actionConf);
-
-        String jarPath = actionConf.get(SparkActionExecutor.SPARK_JAR);
-        if(jarPath!=null && jarPath.endsWith(".py")){
-            isPyspark = true;
-        }
-        boolean addedHiveSecurityToken = false;
-        boolean addedHBaseSecurityToken = false;
-        boolean addedLog4jDriverSettings = false;
-        boolean addedLog4jExecutorSettings = false;
-        StringBuilder driverClassPath = new StringBuilder();
-        StringBuilder executorClassPath = new StringBuilder();
-        String userFiles = null, userArchives = null;
-        String sparkOpts = actionConf.get(SparkActionExecutor.SPARK_OPTS);
-        if (StringUtils.isNotEmpty(sparkOpts)) {
-            List<String> sparkOptions = splitSparkOpts(sparkOpts);
-            for (int i = 0; i < sparkOptions.size(); i++) {
-                String opt = sparkOptions.get(i);
-                boolean addToSparkArgs = true;
-                if (yarnClusterMode || yarnClientMode) {
-                    if (opt.startsWith(EXECUTOR_CLASSPATH)) {
-                        appendWithPathSeparator(opt.substring(EXECUTOR_CLASSPATH.length()), executorClassPath);
-                        addToSparkArgs = false;
-                    }
-                    if (opt.startsWith(DRIVER_CLASSPATH)) {
-                        appendWithPathSeparator(opt.substring(DRIVER_CLASSPATH.length()), driverClassPath);
-                        addToSparkArgs = false;
-                    }
-                    if (opt.equals(DRIVER_CLASSPATH_OPTION)) {
-                        // we need the next element after this option
-                        appendWithPathSeparator(sparkOptions.get(i + 1), driverClassPath);
-                        // increase i to skip the next element.
-                        i++;
-                        addToSparkArgs = false;
-                    }
-                }
-                if (opt.startsWith(HIVE_SECURITY_TOKEN)) {
-                    addedHiveSecurityToken = true;
-                }
-                if (opt.startsWith(HBASE_SECURITY_TOKEN)) {
-                    addedHBaseSecurityToken = true;
-                }
-                if (opt.startsWith(EXECUTOR_EXTRA_JAVA_OPTIONS) || opt.startsWith(DRIVER_EXTRA_JAVA_OPTIONS)) {
-                    if(!opt.contains(LOG4J_CONFIGURATION_JAVA_OPTION)) {
-                        opt += " " + LOG4J_CONFIGURATION_JAVA_OPTION + SPARK_LOG4J_PROPS;
-                    }else{
-                        System.out.println("Warning: Spark Log4J settings are overwritten." +
-                                " Child job IDs may not be available");
-                    }
-                    if(opt.startsWith(EXECUTOR_EXTRA_JAVA_OPTIONS)) {
-                        addedLog4jExecutorSettings = true;
-                    }else{
-                        addedLog4jDriverSettings = true;
-                    }
-                }
-                if(opt.startsWith(FILES_OPTION)) {
-                    userFiles = sparkOptions.get(i + 1);
-                    i++;
-                    addToSparkArgs = false;
-                }
-                if(opt.startsWith(ARCHIVES_OPTION)) {
-                    userArchives = sparkOptions.get(i + 1);
-                    i++;
-                    addToSparkArgs = false;
-                }
-                if(addToSparkArgs) {
-                    sparkArgs.add(opt);
-                }
-            }
-        }
-
-        if ((yarnClusterMode || yarnClientMode)) {
-            // Include the current working directory (of executor container)
-            // in executor classpath, because it will contain localized
-            // files
-            appendWithPathSeparator(PWD, executorClassPath);
-            appendWithPathSeparator(PWD, driverClassPath);
-
-            sparkArgs.add("--conf");
-            sparkArgs.add(EXECUTOR_CLASSPATH + executorClassPath.toString());
-
-            sparkArgs.add("--conf");
-            sparkArgs.add(DRIVER_CLASSPATH + driverClassPath.toString());
-        }
-
-        if (actionConf.get(MAPREDUCE_JOB_TAGS) != null) {
-            sparkArgs.add("--conf");
-            sparkArgs.add("spark.yarn.tags=" + actionConf.get(MAPREDUCE_JOB_TAGS));
-        }
 
-        if (!addedHiveSecurityToken) {
-            sparkArgs.add("--conf");
-            sparkArgs.add(HIVE_SECURITY_TOKEN + "=false");
-        }
-        if (!addedHBaseSecurityToken) {
-            sparkArgs.add("--conf");
-            sparkArgs.add(HBASE_SECURITY_TOKEN + "=false");
-        }
-        if(!addedLog4jExecutorSettings) {
-            sparkArgs.add("--conf");
-            sparkArgs.add(EXECUTOR_EXTRA_JAVA_OPTIONS + LOG4J_CONFIGURATION_JAVA_OPTION + SPARK_LOG4J_PROPS);
-        }
-        if(!addedLog4jDriverSettings) {
-            sparkArgs.add("--conf");
-            sparkArgs.add(DRIVER_EXTRA_JAVA_OPTIONS + LOG4J_CONFIGURATION_JAVA_OPTION + SPARK_LOG4J_PROPS);
-        }
-        File defaultConfFile = getMatchingFile(SPARK_DEFAULTS_FILE_PATTERN);
-        if (defaultConfFile != null) {
-            sparkArgs.add("--properties-file");
-            sparkArgs.add(SPARK_DEFAULTS_FILE_PATTERN.toString());
-        }
-
-        if ((yarnClusterMode || yarnClientMode)) {
-            Map<String, URI> fixedFileUrisMap = fixFsDefaultUrisAndFilterDuplicates(DistributedCache.getCacheFiles(actionConf));
-            fixedFileUrisMap.put(SPARK_LOG4J_PROPS, new Path(SPARK_LOG4J_PROPS).toUri());
-            fixedFileUrisMap.put(HIVE_SITE_CONF, new Path(HIVE_SITE_CONF).toUri());
-            addUserDefined(userFiles, fixedFileUrisMap);
-            Collection<URI> fixedFileUris = fixedFileUrisMap.values();
-            JarFilter jarfilter = new JarFilter(fixedFileUris, jarPath);
-            jarfilter.filter();
-            jarPath = jarfilter.getApplicationJar();
-
-            String cachedFiles = StringUtils.join(fixedFileUris, ",");
-            if (cachedFiles != null && !cachedFiles.isEmpty()) {
-                sparkArgs.add("--files");
-                sparkArgs.add(cachedFiles);
-            }
-            Map<String, URI> fixedArchiveUrisMap = fixFsDefaultUrisAndFilterDuplicates(DistributedCache.
-                    getCacheArchives(actionConf));
-            addUserDefined(userArchives, fixedArchiveUrisMap);
-            String cachedArchives = StringUtils.join(fixedArchiveUrisMap.values(), ",");
-            if (cachedArchives != null && !cachedArchives.isEmpty()) {
-                sparkArgs.add("--archives");
-                sparkArgs.add(cachedArchives);
-            }
-            setSparkYarnJarsConf(sparkArgs, jarfilter.getSparkYarnJar(), jarfilter.getSparkVersion());
-        }
+        final SparkArgsExtractor sparkArgsExtractor = new SparkArgsExtractor(actionConf);
+        final List<String> sparkArgs = sparkArgsExtractor.extract(args);
 
-        if (!sparkArgs.contains(VERBOSE_OPTION)) {
-            sparkArgs.add(VERBOSE_OPTION);
-        }
-
-        sparkArgs.add(jarPath);
-        for (String arg : args) {
-            sparkArgs.add(arg);
-        }
-        if (isPyspark){
+        if (sparkArgsExtractor.isPySpark()){
             createPySparkLibFolder();
         }
 
-
         System.out.println("Spark Action Main class        : " + SparkSubmit.class.getName());
         System.out.println();
         System.out.println("Oozie Spark action configuration");
         System.out.println("=================================================================");
         System.out.println();
-        PasswordMasker passwordMasker = new PasswordMasker();
-        for (String arg : sparkArgs) {
+
+        final PasswordMasker passwordMasker = new PasswordMasker();
+        for (final String arg : sparkArgs) {
             System.out.println("                    " + passwordMasker.maskPasswordsIfNecessary(arg));
         }
         System.out.println();
+
         try {
             runSpark(sparkArgs.toArray(new String[sparkArgs.size()]));
         }
@@ -293,21 +99,12 @@ public class SparkMain extends LauncherMain {
         }
     }
 
-    private void addUserDefined(String userList, Map<String, URI> urisMap) {
-        if(userList != null) {
-            for (String file : userList.split(",")) {
-                Path p = new Path(file);
-                urisMap.put(p.getName(), p.toUri());
-            }
-        }
-    }
-
-    private void prepareHadoopConfig(Configuration actionConf) throws IOException {
+    private void prepareHadoopConfig(final Configuration actionConf) throws IOException {
         // Copying oozie.action.conf.xml into hadoop configuration *-site files.
         if (actionConf.getBoolean(CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR, false)) {
-            String actionXml = System.getProperty("oozie.action.conf.xml");
+            final String actionXml = System.getProperty("oozie.action.conf.xml");
             if (actionXml != null) {
-                File currentDir = new File(actionXml).getParentFile();
+                final File currentDir = new File(actionXml).getParentFile();
                 writeHadoopConfig(actionXml, currentDir);
             }
         }
@@ -321,15 +118,15 @@ public class SparkMain extends LauncherMain {
      * @throws IOException if there is an error during file copy
      */
     private void createPySparkLibFolder() throws OozieActionConfiguratorException, IOException {
-        File pythonLibDir = new File("python/lib");
+        final File pythonLibDir = new File("python/lib");
         if(!pythonLibDir.exists()){
             pythonLibDir.mkdirs();
             System.out.println("PySpark lib folder " + pythonLibDir.getAbsolutePath() + " folder created.");
         }
 
-        for(Pattern fileNamePattern : PYSPARK_DEP_FILE_PATTERN) {
-            File file = getMatchingPyFile(fileNamePattern);
-            File destination = new File(pythonLibDir, file.getName());
+        for(final Pattern fileNamePattern : PYSPARK_DEP_FILE_PATTERN) {
+            final File file = getMatchingPyFile(fileNamePattern);
+            final File destination = new File(pythonLibDir, file.getName());
             FileUtils.copyFile(file, destination);
             System.out.println("Copied " + file + " to " + destination.getAbsolutePath());
         }
@@ -342,8 +139,8 @@ public class SparkMain extends LauncherMain {
      * @return the file if there is one
      * @throws OozieActionConfiguratorException if there are no files matching the pattern
      */
-    private File getMatchingPyFile(Pattern fileNamePattern) throws OozieActionConfiguratorException {
-        File f = getMatchingFile(fileNamePattern);
+    private File getMatchingPyFile(final Pattern fileNamePattern) throws OozieActionConfiguratorException {
+        final File f = getMatchingFile(fileNamePattern);
         if (f != null) {
             return f;
         }
@@ -359,22 +156,24 @@ public class SparkMain extends LauncherMain {
      * @param fileNamePattern the pattern to look for
      * @return the file if there is one, otherwise null
      */
-    private static File getMatchingFile(Pattern fileNamePattern) throws OozieActionConfiguratorException {
-        File localDir = new File(".");
-        String[] files = localDir.list();
-
-        if (files != null) {
-            for(String fileName : files){
-                if(fileNamePattern.matcher(fileName).find()){
-                    return new File(fileName);
-                }
+    static File getMatchingFile(final Pattern fileNamePattern) {
+        final File localDir = new File(".");
+
+        final String[] localFileNames = localDir.list();
+        if (localFileNames == null) {
+            return null;
+        }
+
+        for (final String fileName : localFileNames){
+            if (fileNamePattern.matcher(fileName).find()){
+                return new File(fileName);
             }
         }
 
         return null;
     }
 
-    private void runSpark(String[] args) throws Exception {
+    private void runSpark(final String[] args) throws Exception {
         System.out.println("=================================================================");
         System.out.println();
         System.out.println(">>> Invoking Spark class now >>>");
@@ -383,49 +182,16 @@ public class SparkMain extends LauncherMain {
         SparkSubmit.main(args);
     }
 
-    /**
-     * Converts the options to be Spark-compatible.
-     * <ul>
-     *     <li>Parameters are separated by whitespace and can be groupped using double quotes</li>
-     *     <li>Quotes should be removed</li>
-     *     <li>Adjacent whitespace separators are treated as one</li>
-     * </ul>
-     * @param sparkOpts the options for Spark
-     * @return the options parsed into a list
-     */
-    static List<String> splitSparkOpts(String sparkOpts){
-        List<String> result = new ArrayList<String>();
-        StringBuilder currentWord = new StringBuilder();
-        boolean insideQuote = false;
-        for (int i = 0; i < sparkOpts.length(); i++) {
-            char c = sparkOpts.charAt(i);
-            if (c == '"') {
-                insideQuote = !insideQuote;
-            } else if (Character.isWhitespace(c) && !insideQuote) {
-                if (currentWord.length() > 0) {
-                    result.add(currentWord.toString());
-                    currentWord.setLength(0);
-                }
-            } else {
-                currentWord.append(c);
-            }
-        }
-        if(currentWord.length()>0) {
-            result.add(currentWord.toString());
-        }
-        return result;
-    }
-
-    private String setUpSparkLog4J(Configuration distcpConf) throws IOException {
+    private String setUpSparkLog4J(final Configuration actionConf) throws IOException {
         // Logfile to capture job IDs
-        String hadoopJobId = System.getProperty("oozie.launcher.job.id");
+        final String hadoopJobId = System.getProperty("oozie.launcher.job.id");
         if (hadoopJobId == null) {
             throw new RuntimeException("Launcher Hadoop Job ID system,property not set");
         }
-        String logFile = new File("spark-oozie-" + hadoopJobId + ".log").getAbsolutePath();
+        final String logFile = new File("spark-oozie-" + hadoopJobId + ".log").getAbsolutePath();
 
-        String logLevel = distcpConf.get("oozie.spark.log.level", "INFO");
-        String rootLogLevel = distcpConf.get("oozie.action." + LauncherMapper.ROOT_LOGGER_LEVEL, "INFO");
+        final String logLevel = actionConf.get("oozie.spark.log.level", "INFO");
+        final String rootLogLevel = actionConf.get("oozie.action." + LauncherMapper.ROOT_LOGGER_LEVEL, "INFO");
 
         log4jProperties.setProperty("log4j.rootLogger", rootLogLevel + ", A");
         log4jProperties.setProperty("log4j.logger.org.apache.spark", logLevel + ", A, jobid");
@@ -441,7 +207,7 @@ public class SparkMain extends LauncherMain {
         log4jProperties.setProperty("log4j.logger.org.apache.hadoop.mapreduce.Job", "INFO, jobid");
         log4jProperties.setProperty("log4j.logger.org.apache.hadoop.yarn.client.api.impl.YarnClientImpl", "INFO, jobid");
 
-        String localProps = new File(SPARK_LOG4J_PROPS).getAbsolutePath();
+        final String localProps = new File(SPARK_LOG4J_PROPS).getAbsolutePath();
         try (OutputStream os1 = new FileOutputStream(localProps)) {
             log4jProperties.store(os1, "");
         }
@@ -458,100 +224,27 @@ public class SparkMain extends LauncherMain {
      * @throws IOException
      * @throws URISyntaxException
      */
-    static Map<String, URI> fixFsDefaultUrisAndFilterDuplicates(URI[] files) throws IOException, URISyntaxException {
-        Map<String, URI> map= new HashMap<>();
+    static Map<String, URI> fixFsDefaultUrisAndFilterDuplicates(final URI[] files) throws IOException, URISyntaxException {
+        final Map<String, URI> map= new LinkedHashMap<>();
         if (files == null) {
             return map;
         }
-        FileSystem fs = FileSystem.get(new Configuration(true));
+        final FileSystem fs = FileSystem.get(new Configuration(true));
         for (int i = 0; i < files.length; i++) {
-            URI fileUri = files[i];
-            Path p = new Path(fileUri);
-            map.put(p.getName(), getFixedUri(fs, fileUri));
+            final URI fileUri = files[i];
+            final Path p = new Path(fileUri);
+            map.put(p.getName(), HadoopUriFinder.getFixedUri(fs, fileUri));
         }
         return map;
     }
 
     /**
-     * Sets spark.yarn.jars for Spark 2.X. Sets spark.yarn.jar for Spark 1.X.
-     *
-     * @param sparkArgs
-     * @param sparkYarnJar
-     * @param sparkVersion
-     */
-    private void setSparkYarnJarsConf(List<String> sparkArgs, String sparkYarnJar, String sparkVersion) {
-        if (SPARK_VERSION_1.matcher(sparkVersion).find()) {
-            // In Spark 1.X.X, set spark.yarn.jar to avoid
-            // multiple distribution
-            sparkArgs.add("--conf");
-            sparkArgs.add(SPARK_YARN_JAR + "=" + sparkYarnJar);
-                }
-        else {
-            // In Spark 2.X.X, set spark.yarn.jars
-            sparkArgs.add("--conf");
-            sparkArgs.add(SPARK_YARN_JARS + "=" + sparkYarnJar);
-        }
-    }
-
-    private static String getJarVersion(File jarFile) throws IOException {
-        @SuppressWarnings("resource")
-        Manifest manifest = new JarFile(jarFile).getManifest();
-        return manifest.getMainAttributes().getValue("Specification-Version");
-    }
-
-    /*
-     * Get properties that needs to be passed to Spark as Spark configuration from actionConf.
-     */
-    @VisibleForTesting
-    protected void appendOoziePropertiesToSparkConf(List<String> sparkArgs, Configuration actionConf) {
-        for (Map.Entry<String, String> oozieConfig : actionConf
-                .getValByRegex("^oozie\\.(?!launcher|spark).+").entrySet()) {
-            sparkArgs.add("--conf");
-            sparkArgs.add(String.format("spark.%s=%s", oozieConfig.getKey(), oozieConfig.getValue()));
-        }
-    }
-
-    private void appendWithPathSeparator(String what, StringBuilder to){
-        if(to.length() > 0){
-            to.append(File.pathSeparator);
-        }
-        to.append(what);
-    }
-
-    private static URI getFixedUri(URI fileUri) throws URISyntaxException, IOException {
-        FileSystem fs = FileSystem.get(new Configuration(true));
-        return getFixedUri(fs, fileUri);
-    }
-
-    /**
-     * Spark compares URIs based on scheme, host and port. Here we convert URIs
-     * into the default format so that Spark won't think those belong to
-     * different file system. This will avoid an extra copy of files which
-     * already exists on same hdfs.
-     *
-     * @param fs
-     * @param fileUri
-     * @return fixed uri
-     * @throws URISyntaxException
-     */
-    private static URI getFixedUri(FileSystem fs, URI fileUri) throws URISyntaxException {
-        if (fs.getUri().getScheme().equals(fileUri.getScheme())
-                && (fs.getUri().getHost().equals(fileUri.getHost()) || fileUri.getHost() == null)
-                && (fs.getUri().getPort() == -1 || fileUri.getPort() == -1
-                        || fs.getUri().getPort() == fileUri.getPort())) {
-            return new URI(fs.getUri().getScheme(), fileUri.getUserInfo(), fs.getUri().getHost(), fs.getUri().getPort(),
-                    fileUri.getPath(), fileUri.getQuery(), fileUri.getFragment());
-        }
-        return fileUri;
-    }
-
-    /**
      * Sets up hive-site.xml
      *
      * @param hiveConf
      * @throws IOException
      */
-    private void setHiveSite(Configuration hiveConf) throws IOException {
+    private void setHiveSite(final Configuration hiveConf) throws IOException {
         // See https://issues.apache.org/jira/browse/HIVE-1411
         hiveConf.set("datanucleus.plugin.pluginRegistryBundleCheck", "LOG");
 
@@ -579,99 +272,4 @@ public class SparkMain extends LauncherMain {
         // to null.
         HiveConf.setHiveSiteLocation(HiveConf.class.getClassLoader().getResource("hive-site.xml"));
     }
-
-    /**
-     * This class is used for filtering out unwanted jars.
-     */
-    static class JarFilter {
-        private String sparkVersion = "1.X.X";
-        private String sparkYarnJar;
-        private String applicationJar;
-        private Collection<URI> listUris = null;
-
-        /**
-         * @param listUris List of URIs to be filtered
-         * @param jarPath Application jar
-         * @throws IOException
-         * @throws URISyntaxException
-         */
-        public JarFilter(Collection<URI> listUris, String jarPath) throws URISyntaxException, IOException {
-            this.listUris = listUris;
-            applicationJar = jarPath;
-            Path p = new Path(jarPath);
-            if (p.isAbsolute()) {
-                applicationJar = getFixedUri(p.toUri()).toString();
-            }
-        }
-
-        /**
-         * Filters out the Spark yarn jar and application jar. Also records
-         * spark yarn jar's version.
-         *
-         * @throws OozieActionConfiguratorException
-         */
-        public void filter() throws OozieActionConfiguratorException {
-            Iterator<URI> iterator = listUris.iterator();
-            File matchedFile = null;
-            Path applJarPath = new Path(applicationJar);
-            while (iterator.hasNext()) {
-                URI uri = iterator.next();
-                Path p = new Path(uri);
-                if (SPARK_YARN_JAR_PATTERN.matcher(p.getName()).find()) {
-                    matchedFile = getMatchingFile(SPARK_YARN_JAR_PATTERN);
-                }
-                else if (SPARK_ASSEMBLY_JAR_PATTERN.matcher(p.getName()).find()) {
-                    matchedFile = getMatchingFile(SPARK_ASSEMBLY_JAR_PATTERN);
-                }
-                if (matchedFile != null) {
-                    sparkYarnJar = uri.toString();
-                    try {
-                        sparkVersion = getJarVersion(matchedFile);
-                        System.out.println("Spark Version " + sparkVersion);
-                    }
-                    catch (IOException io) {
-                        System.out.println(
-                                "Unable to open " + matchedFile.getPath() + ". Default Spark Version " + sparkVersion);
-                    }
-                    iterator.remove();
-                    matchedFile = null;
-                }
-                // Here we skip the application jar, because
-                // (if uris are same,) it will get distributed multiple times
-                // - one time with --files and another time as application jar.
-                if (isApplicationJar(p.getName(), uri, applJarPath)) {
-                    String fragment = uri.getFragment();
-                    applicationJar = fragment != null && fragment.length() > 0 ? fragment : uri.toString();
-                    iterator.remove();
-                }
-            }
-        }
-
-        /**
-         * Checks if a file is application jar
-         *
-         * @param fileName fileName name of the file
-         * @param fileUri fileUri URI of the file
-         * @param applJarPath Path of application jar
-         * @return true if fileName or fileUri is the application jar
-         */
-        private boolean isApplicationJar(String fileName, URI fileUri, Path applJarPath) {
-            return (fileName.equals(applicationJar) || fileUri.toString().equals(applicationJar)
-                    || applJarPath.getName().equals(fileName)
-                    || applicationJar.equals(fileUri.getFragment()));
-        }
-
-        public String getApplicationJar() {
-            return applicationJar;
-        }
-
-        public String getSparkYarnJar() {
-            return sparkYarnJar;
-        }
-
-        public String getSparkVersion() {
-            return sparkVersion;
-        }
-
-    }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkOptionsSplitter.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkOptionsSplitter.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkOptionsSplitter.java
new file mode 100644
index 0000000..30def6f
--- /dev/null
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkOptionsSplitter.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import java.util.ArrayList;
+import java.util.List;
+
+class SparkOptionsSplitter {
+
+    /**
+     * Converts an options string into a Spark-compatible argument list.
+     * <ul>
+     * <li>Parameters are separated by whitespace and can be grouped using double quotes</li>
+     * <li>Quotes are removed from the parsed values</li>
+     * <li>Adjacent whitespace separators are treated as one</li>
+     * </ul>
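+     *
+     * <p>For example (mirroring a case from {@code TestSparkOptionsSplitter} in this change),
+     * the input {@code --conf special="value1 value2"} is split into
+     * {@code [--conf, special=value1 value2]}: the quotes are dropped while the
+     * quoted whitespace is preserved.</p>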
+     *
+     * @param sparkOpts the options for Spark
+     * @return the options parsed into a list
+     */
+    static List<String> splitSparkOpts(final String sparkOpts) {
+        final List<String> result = new ArrayList<String>();
+        final StringBuilder currentWord = new StringBuilder();
+
+        boolean insideQuote = false;
+        for (int i = 0; i < sparkOpts.length(); i++) {
+            final char c = sparkOpts.charAt(i);
+            if (c == '"') {
+                insideQuote = !insideQuote;
+            }
+            else if (Character.isWhitespace(c) && !insideQuote) {
+                if (currentWord.length() > 0) {
+                    result.add(currentWord.toString());
+                    currentWord.setLength(0);
+                }
+            }
+            else {
+                currentWord.append(c);
+            }
+        }
+
+        if (currentWord.length() > 0) {
+            result.add(currentWord.toString());
+        }
+
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestJarFilter.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestJarFilter.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestJarFilter.java
index 2d4c83c..ff1b3ce 100644
--- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestJarFilter.java
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestJarFilter.java
@@ -31,7 +31,6 @@ import java.util.jar.Attributes;
 import java.util.jar.JarOutputStream;
 import java.util.jar.Manifest;
 
-import org.apache.oozie.action.hadoop.SparkMain.JarFilter;
 import org.junit.Test;
 
 public class TestJarFilter {

http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java
new file mode 100644
index 0000000..7db26a6
--- /dev/null
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestSparkArgsExtractor {
+
+    @Test
+    public void testAppendOoziePropertiesToSparkConf() throws Exception {
+        final List<String> sparkArgs = new ArrayList<>();
+        final Configuration actionConf = new Configuration();
+        actionConf.set("foo", "foo-not-to-include");
+        actionConf.set("oozie.launcher", "launcher-not-to-include");
+        actionConf.set("oozie.spark", "spark-not-to-include");
+        actionConf.set("oozie.bar", "bar");
+
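+        // Only "oozie."-prefixed properties should be forwarded (excluding the
+        // oozie.launcher and oozie.spark namespaces), re-prefixed as "spark.oozie.".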
+        new SparkArgsExtractor(actionConf).appendOoziePropertiesToSparkConf(sparkArgs);
+
+        assertEquals(Lists.newArrayList("--conf", "spark.oozie.bar=bar"), sparkArgs);
+    }
+
+    @Test
+    public void testLocalClientArgsParsing() throws Exception {
+        final Configuration actionConf = new Configuration();
+        actionConf.set(SparkActionExecutor.SPARK_MASTER, "local[*]");
+        actionConf.set(SparkActionExecutor.SPARK_MODE, "client");
+        actionConf.set(SparkActionExecutor.SPARK_CLASS, "org.apache.oozie.example.SparkFileCopy");
+        actionConf.set(SparkActionExecutor.SPARK_JOB_NAME, "Spark Copy File");
+        actionConf.set(SparkActionExecutor.SPARK_OPTS, "--driver-memory  1042M " +
+                "--conf spark.executor.extraClassPath=aaa " +
+                "--conf user.property.after.spark.executor.extraClassPath=bbb " +
+                "--conf spark.driver.extraClassPath=ccc " +
+                "--conf user.property.after.spark.driver.extraClassPath=ddd " +
+                "--conf spark.executor.extraJavaOptions=\"-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp\"");
+        actionConf.set(SparkActionExecutor.SPARK_JAR, "/lib/test.jar");
+
+        final String[] mainArgs = {"arg0", "arg1"};
+        final List<String> sparkArgs = new SparkArgsExtractor(actionConf).extract(mainArgs);
+
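+        // In local mode the extraClassPath values pass through unchanged:
+        // no $PWD/* suffix is appended, unlike in the yarn-mode tests below.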
+        assertEquals("Spark args mismatch",
+                Lists.newArrayList("--master", "local[*]",
+                        "--deploy-mode", "client",
+                        "--name", "Spark Copy File",
+                        "--class", "org.apache.oozie.example.SparkFileCopy",
+                        "--driver-memory", "1042M",
+                        "--conf", "spark.executor.extraClassPath=aaa",
+                        "--conf", "user.property.after.spark.executor.extraClassPath=bbb",
+                        "--conf", "spark.driver.extraClassPath=ccc",
+                        "--conf", "user.property.after.spark.driver.extraClassPath=ddd",
+                        "--conf", "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError " +
+                                "-XX:HeapDumpPath=/tmp -Dlog4j.configuration=spark-log4j.properties",
+                        "--conf", "spark.yarn.security.tokens.hive.enabled=false",
+                        "--conf", "spark.yarn.security.tokens.hbase.enabled=false",
+                        "--conf", "spark.driver.extraJavaOptions=-Dlog4j.configuration=spark-log4j.properties",
+                        "--verbose",
+                        "/lib/test.jar",
+                        "arg0",
+                        "arg1"),
+                sparkArgs);
+    }
+
+    @Test
+    public void testYarnClientExecutorAndDriverExtraClasspathsArgsParsing() throws Exception {
+        final Configuration actionConf = new Configuration();
+        actionConf.set(SparkActionExecutor.SPARK_MASTER, "yarn");
+        actionConf.set(SparkActionExecutor.SPARK_MODE, "client");
+        actionConf.set(SparkActionExecutor.SPARK_CLASS, "org.apache.oozie.example.SparkFileCopy");
+        actionConf.set(SparkActionExecutor.SPARK_JOB_NAME, "Spark Copy File");
+        actionConf.set(SparkActionExecutor.SPARK_OPTS, "--driver-memory  1042M " +
+                "--conf spark.executor.extraClassPath=aaa " +
+                "--conf user.property.after.spark.executor.extraClassPath=bbb " +
+                "--conf spark.driver.extraClassPath=ccc " +
+                "--conf user.property.after.spark.driver.extraClassPath=ddd " +
+                "--conf spark.executor.extraJavaOptions=\"-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp\"");
+        actionConf.set(SparkActionExecutor.SPARK_JAR, "/lib/test.jar");
+
+        final String[] mainArgs = {"arg0", "arg1"};
+        final List<String> sparkArgs = new SparkArgsExtractor(actionConf).extract(mainArgs);
+
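+        // In yarn mode the extractor appends $PWD/* to both spark.executor.extraClassPath
+        // and spark.driver.extraClassPath, and folds -Dlog4j.configuration=spark-log4j.properties
+        // into the extra Java options, as the expected argument list below shows.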
+        assertEquals("Spark args mismatch",
+                Lists.newArrayList("--master", "yarn",
+                        "--deploy-mode", "client",
+                        "--name", "Spark Copy File",
+                        "--class", "org.apache.oozie.example.SparkFileCopy",
+                        "--driver-memory", "1042M",
+                        "--conf", "user.property.after.spark.executor.extraClassPath=bbb",
+                        "--conf", "user.property.after.spark.driver.extraClassPath=ddd",
+                        "--conf", "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError " +
+                                "-XX:HeapDumpPath=/tmp -Dlog4j.configuration=spark-log4j.properties",
+                        "--conf", "spark.executor.extraClassPath=aaa:$PWD/*",
+                        "--conf", "spark.driver.extraClassPath=ccc:$PWD/*",
+                        "--conf", "spark.yarn.security.tokens.hive.enabled=false",
+                        "--conf", "spark.yarn.security.tokens.hbase.enabled=false",
+                        "--conf", "spark.driver.extraJavaOptions=-Dlog4j.configuration=spark-log4j.properties",
+                        "--files", "spark-log4j.properties,hive-site.xml",
+                        "--conf", "spark.yarn.jar=null",
+                        "--verbose",
+                        "/lib/test.jar",
+                        "arg0",
+                        "arg1"),
+                sparkArgs);
+    }
+
+    @Test
+    public void testYarnClientFilesAndArchivesArgsParsing() throws Exception {
+        final Configuration actionConf = new Configuration();
+        actionConf.set(SparkActionExecutor.SPARK_MASTER, "yarn");
+        actionConf.set(SparkActionExecutor.SPARK_MODE, "client");
+        actionConf.set(SparkActionExecutor.SPARK_CLASS, "org.apache.oozie.example.SparkFileCopy");
+        actionConf.set(SparkActionExecutor.SPARK_JOB_NAME, "Spark Copy File");
+        actionConf.set(SparkActionExecutor.SPARK_OPTS, "--files aaa " +
+                "--archives bbb " +
+                "--files=ccc " +
+                "--archives=ddd");
+        actionConf.set(SparkActionExecutor.SPARK_JAR, "/lib/test.jar");
+
+        final String[] mainArgs = {"arg0", "arg1"};
+        final List<String> sparkArgs = new SparkArgsExtractor(actionConf).extract(mainArgs);
+
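+        // Both the "--files aaa" and "--files=ccc" forms are recognized and merged into a
+        // single --files list together with the implicitly distributed spark-log4j.properties
+        // and hive-site.xml; --archives entries are merged the same way.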
+        assertEquals("Spark args mismatch",
+                Lists.newArrayList("--master", "yarn",
+                        "--deploy-mode", "client",
+                        "--name", "Spark Copy File",
+                        "--class", "org.apache.oozie.example.SparkFileCopy",
+                        "--conf", "spark.executor.extraClassPath=$PWD/*",
+                        "--conf", "spark.driver.extraClassPath=$PWD/*",
+                        "--conf", "spark.yarn.security.tokens.hive.enabled=false",
+                        "--conf", "spark.yarn.security.tokens.hbase.enabled=false",
+                        "--conf", "spark.executor.extraJavaOptions=-Dlog4j.configuration=spark-log4j.properties",
+                        "--conf", "spark.driver.extraJavaOptions=-Dlog4j.configuration=spark-log4j.properties",
+                        "--files", "spark-log4j.properties,hive-site.xml,aaa,ccc",
+                        "--archives", "bbb,ddd",
+                        "--conf", "spark.yarn.jar=null",
+                        "--verbose",
+                        "/lib/test.jar",
+                        "arg0",
+                        "arg1"),
+                sparkArgs);
+    }
+
+    @Test
+    public void testDriverClassPathArgsParsing() throws Exception {
+        final Configuration actionConf = new Configuration();
+        actionConf.set(SparkActionExecutor.SPARK_MASTER, "yarn");
+        actionConf.set(SparkActionExecutor.SPARK_MODE, "client");
+        actionConf.set(SparkActionExecutor.SPARK_CLASS, "org.apache.oozie.example.SparkFileCopy");
+        actionConf.set(SparkActionExecutor.SPARK_JOB_NAME, "Spark Copy File");
+        actionConf.set(SparkActionExecutor.SPARK_OPTS, "--driver-class-path aaa");
+        actionConf.set(SparkActionExecutor.SPARK_JAR, "/lib/test.jar");
+
+        final String[] mainArgs = {"arg0", "arg1"};
+        final List<String> sparkArgs = new SparkArgsExtractor(actionConf).extract(mainArgs);
+
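+        // --driver-class-path is translated into spark.driver.extraClassPath,
+        // again with $PWD/* appended.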
+        assertEquals("Spark args mismatch",
+                Lists.newArrayList("--master", "yarn",
+                        "--deploy-mode", "client",
+                        "--name", "Spark Copy File",
+                        "--class", "org.apache.oozie.example.SparkFileCopy",
+                        "--conf", "spark.executor.extraClassPath=$PWD/*",
+                        "--conf", "spark.driver.extraClassPath=aaa:$PWD/*",
+                        "--conf", "spark.yarn.security.tokens.hive.enabled=false",
+                        "--conf", "spark.yarn.security.tokens.hbase.enabled=false",
+                        "--conf", "spark.executor.extraJavaOptions=-Dlog4j.configuration=spark-log4j.properties",
+                        "--conf", "spark.driver.extraJavaOptions=-Dlog4j.configuration=spark-log4j.properties",
+                        "--files", "spark-log4j.properties,hive-site.xml",
+                        "--conf", "spark.yarn.jar=null",
+                        "--verbose",
+                        "/lib/test.jar",
+                        "arg0",
+                        "arg1"),
+                sparkArgs);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java
index 5f6a0cd..b9f37c8 100644
--- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java
@@ -127,19 +127,6 @@ public class TestSparkMain extends MainTestCase {
         }
     }
 
-    public void testAppendOoziePropertiesToSparkConf() throws Exception {
-        SparkMain instance = SparkMain.class.newInstance();
-        List<String> sparkArgs = new ArrayList<String>();
-        Configuration actionConf = new Configuration();
-        actionConf.set("foo", "foo-not-to-include");
-        actionConf.set("oozie.launcher", "launcher-not-to-include");
-        actionConf.set("oozie.spark", "spark-not-to-include");
-        actionConf.set("oozie.bar", "bar");
-
-        instance.appendOoziePropertiesToSparkConf(sparkArgs, actionConf);
-        assertEquals(Lists.newArrayList("--conf", "spark.oozie.bar=bar"), sparkArgs);
-    }
-
     public void testJobIDPattern() {
         List<String> lines = new ArrayList<String>();
         lines.add("Submitted application application_001");

http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkOptionsSplitter.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkOptionsSplitter.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkOptionsSplitter.java
index 31f53ac..02786a2 100644
--- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkOptionsSplitter.java
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkOptionsSplitter.java
@@ -34,13 +34,13 @@ public class TestSparkOptionsSplitter {
     @Parameterized.Parameters
     public static List<Object[]> params() {
         return Arrays.asList(new Object[][]{
-                {"--option1 value1", Arrays.asList(new String[]{"--option1", "value1"})},
-                {"--option1   value1", Arrays.asList(new String[]{"--option1", "value1"})},
-                {"   --option1 value1   ", Arrays.asList(new String[]{"--option1", "value1"})},
-                {"--conf special=value1", Arrays.asList(new String[]{"--conf", "special=value1"})},
-                {"--conf special=\"value1\"", Arrays.asList(new String[]{"--conf", "special=value1"})},
-                {"--conf special=\"value1 value2\"", Arrays.asList(new String[]{"--conf", "special=value1 value2"})},
-                {" --conf special=\"value1 value2\"  ", Arrays.asList(new String[]{"--conf", "special=value1 value2"})},
+                {"--option1 value1", Arrays.asList("--option1", "value1")},
+                {"--option1   value1", Arrays.asList("--option1", "value1")},
+                {"   --option1 value1   ", Arrays.asList("--option1", "value1")},
+                {"--conf special=value1", Arrays.asList("--conf", "special=value1")},
+                {"--conf special=\"value1\"", Arrays.asList("--conf", "special=value1")},
+                {"--conf special=\"value1 value2\"", Arrays.asList("--conf", "special=value1 value2")},
+                {" --conf special=\"value1 value2\"  ", Arrays.asList("--conf", "special=value1 value2")},
         });
     }
 
@@ -55,6 +55,6 @@ public class TestSparkOptionsSplitter {
 
     @Test
     public void test() {
-        assertThat("Error for input >>" + input + "<<", SparkMain.splitSparkOpts(input), is(output));
+        assertThat("Error for input >>" + input + "<<", SparkOptionsSplitter.splitSparkOpts(input), is(output));
     }
 }