Posted to commits@oozie.apache.org by ge...@apache.org on 2017/06/08 11:59:38 UTC
oozie git commit: OOZIE-2923 Improve Spark options parsing (andras.piros via gezapeti)
Repository: oozie
Updated Branches:
refs/heads/master 230e42611 -> f45b18791
OOZIE-2923 Improve Spark options parsing (andras.piros via gezapeti)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/f45b1879
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/f45b1879
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/f45b1879
Branch: refs/heads/master
Commit: f45b18791a92201fb4959857cafbda7543942022
Parents: 230e426
Author: Gezapeti Cseh <ge...@gmail.com>
Authored: Thu Jun 8 13:59:06 2017 +0200
Committer: Gezapeti Cseh <ge...@gmail.com>
Committed: Thu Jun 8 13:59:17 2017 +0200
----------------------------------------------------------------------
release-log.txt | 1 +
.../oozie/action/hadoop/HadoopUriFinder.java | 66 +++
.../apache/oozie/action/hadoop/JarFilter.java | 122 +++++
.../oozie/action/hadoop/SparkArgsExtractor.java | 334 ++++++++++++
.../apache/oozie/action/hadoop/SparkMain.java | 520 +++----------------
.../action/hadoop/SparkOptionsSplitter.java | 64 +++
.../oozie/action/hadoop/TestJarFilter.java | 1 -
.../action/hadoop/TestSparkArgsExtractor.java | 197 +++++++
.../oozie/action/hadoop/TestSparkMain.java | 13 -
.../action/hadoop/TestSparkOptionsSplitter.java | 16 +-
10 files changed, 851 insertions(+), 483 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 5743ac9..5c5ae92 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.4.0 release (trunk - unreleased)
+OOZIE-2923 Improve Spark options parsing (andras.piros via gezapeti)
OOZIE-2886 Ensure consistent versioning of hadoop jars in sharelibs (dbist13 via rkanter)
OOZIE-2875 Typo in ssh action twiki docs (Dongying Jiao via gezapeti)
OOZIE-2387 Oozie is Unable to handle Spaces in file/archive tag. (asasvari via gezapeti)
http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/HadoopUriFinder.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/HadoopUriFinder.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/HadoopUriFinder.java
new file mode 100644
index 0000000..3f9e982
--- /dev/null
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/HadoopUriFinder.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
+
+class HadoopUriFinder {
+
+ static String getJarVersion(final File jarFile) throws IOException {
+ try (final JarFile openedJarFile = new JarFile(jarFile)) {
+ final Manifest manifest = openedJarFile.getManifest();
+ return manifest.getMainAttributes().getValue("Specification-Version");
+ }
+ }
+
+ static URI getFixedUri(final URI fileUri) throws URISyntaxException, IOException {
+ final FileSystem fs = FileSystem.get(new Configuration(true));
+ return getFixedUri(fs, fileUri);
+ }
+
+ /**
+ * Spark compares URIs based on scheme, host and port. Here we convert URIs
+ * into the default format so that Spark won't think they belong to
+ * different file systems. This avoids an extra copy of files that
+ * already exist on the same HDFS.
+ *
+ * @param fs
+ * @param fileUri
+ * @return fixed uri
+ * @throws URISyntaxException
+ */
+ static URI getFixedUri(final FileSystem fs, final URI fileUri) throws URISyntaxException {
+ if (fs.getUri().getScheme().equals(fileUri.getScheme())
+ && (fs.getUri().getHost().equals(fileUri.getHost()) || fileUri.getHost() == null)
+ && (fs.getUri().getPort() == -1 || fileUri.getPort() == -1
+ || fs.getUri().getPort() == fileUri.getPort())) {
+ return new URI(fs.getUri().getScheme(), fileUri.getUserInfo(), fs.getUri().getHost(), fs.getUri().getPort(),
+ fileUri.getPath(), fileUri.getQuery(), fileUri.getFragment());
+ }
+ return fileUri;
+ }
+}
\ No newline at end of file
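For illustration, here is a standalone sketch (not part of this commit) of the URI normalization rule that HadoopUriFinder.getFixedUri() implements: a URI without an authority is rewritten onto the default file system's scheme, host and port, so Spark treats both as the same file system and skips an extra copy. The namenode address used below is a hypothetical example.

    // Standalone illustration of the normalization rule in getFixedUri();
    // it does not call the Oozie class, it just mirrors its condition.
    import java.net.URI;
    import java.net.URISyntaxException;

    public class FixedUriSketch {
        static URI fix(URI defaultFs, URI file) throws URISyntaxException {
            boolean sameScheme = defaultFs.getScheme().equals(file.getScheme());
            boolean sameHost = file.getHost() == null || defaultFs.getHost().equals(file.getHost());
            boolean samePort = defaultFs.getPort() == -1 || file.getPort() == -1
                    || defaultFs.getPort() == file.getPort();
            if (sameScheme && sameHost && samePort) {
                // Rewrite onto the default file system's authority, keep the rest.
                return new URI(defaultFs.getScheme(), file.getUserInfo(), defaultFs.getHost(),
                        defaultFs.getPort(), file.getPath(), file.getQuery(), file.getFragment());
            }
            return file;
        }

        public static void main(String[] args) throws URISyntaxException {
            URI defaultFs = new URI("hdfs://namenode.example.com:8020"); // hypothetical default FS
            // Host-less URI on the same scheme gets the default authority filled in.
            System.out.println(fix(defaultFs, new URI("hdfs:///user/oozie/lib/app.jar")));
            // A URI pointing at a different cluster is left untouched.
            System.out.println(fix(defaultFs, new URI("hdfs://other-nn:8020/user/oozie/lib/app.jar")));
        }
    }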
http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/JarFilter.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/JarFilter.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/JarFilter.java
new file mode 100644
index 0000000..d0b4b5e
--- /dev/null
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/JarFilter.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Collection;
+import java.util.Iterator;
+
+/**
+ * This class is used for filtering out unwanted jars.
+ */
+class JarFilter {
+ private String sparkVersion = "1.X.X";
+ private String sparkYarnJar;
+ private String applicationJar;
+ private Collection<URI> listUris = null;
+
+ /**
+ * @param listUris List of URIs to be filtered
+ * @param jarPath Application jar
+ * @throws IOException
+ * @throws URISyntaxException
+ */
+ JarFilter(final Collection<URI> listUris, final String jarPath) throws URISyntaxException, IOException {
+ this.listUris = listUris;
+ applicationJar = jarPath;
+ final Path p = new Path(jarPath);
+ if (p.isAbsolute()) {
+ applicationJar = HadoopUriFinder.getFixedUri(p.toUri()).toString();
+ }
+ }
+
+ /**
+ * Filters out the Spark yarn jar and application jar. Also records
+ * spark yarn jar's version.
+ *
+ * @throws OozieActionConfiguratorException
+ */
+ void filter() throws OozieActionConfiguratorException {
+ final Iterator<URI> iterator = listUris.iterator();
+ File matchedFile = null;
+ final Path applJarPath = new Path(applicationJar);
+ while (iterator.hasNext()) {
+ final URI uri = iterator.next();
+ final Path p = new Path(uri);
+ if (SparkMain.SPARK_YARN_JAR_PATTERN.matcher(p.getName()).find()) {
+ matchedFile = SparkMain.getMatchingFile(SparkMain.SPARK_YARN_JAR_PATTERN);
+ }
+ else if (SparkMain.SPARK_ASSEMBLY_JAR_PATTERN.matcher(p.getName()).find()) {
+ matchedFile = SparkMain.getMatchingFile(SparkMain.SPARK_ASSEMBLY_JAR_PATTERN);
+ }
+ if (matchedFile != null) {
+ sparkYarnJar = uri.toString();
+ try {
+ sparkVersion = HadoopUriFinder.getJarVersion(matchedFile);
+ System.out.println("Spark Version " + sparkVersion);
+ }
+ catch (final IOException io) {
+ System.out.println(
+ "Unable to open " + matchedFile.getPath() + ". Default Spark Version " + sparkVersion);
+ }
+ iterator.remove();
+ matchedFile = null;
+ }
+ // Here we skip the application jar because, if the URIs are the same,
+ // it would get distributed multiple times: once with --files and
+ // again as the application jar.
+ if (isApplicationJar(p.getName(), uri, applJarPath)) {
+ final String fragment = uri.getFragment();
+ applicationJar = fragment != null && fragment.length() > 0 ? fragment : uri.toString();
+ iterator.remove();
+ }
+ }
+ }
+
+ /**
+ * Checks if a file is the application jar.
+ *
+ * @param fileName name of the file
+ * @param fileUri URI of the file
+ * @param applJarPath path of the application jar
+ * @return true if fileName or fileUri is the application jar
+ */
+ private boolean isApplicationJar(final String fileName, final URI fileUri, final Path applJarPath) {
+ return (fileName.equals(applicationJar) || fileUri.toString().equals(applicationJar)
+ || applJarPath.getName().equals(fileName)
+ || applicationJar.equals(fileUri.getFragment()));
+ }
+
+ String getApplicationJar() {
+ return applicationJar;
+ }
+
+ String getSparkYarnJar() {
+ return sparkYarnJar;
+ }
+
+ String getSparkVersion() {
+ return sparkVersion;
+ }
+}
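A minimal usage sketch (not part of this commit) of the extracted JarFilter, assuming it is compiled into the same org.apache.oozie.action.hadoop package, since the class is package-private; the cache URIs and jar path below are hypothetical. The application jar is dropped from the --files list; inside the launcher container, where a spark-yarn jar is localized to the working directory, that jar is dropped too and its version is recorded.

    // Hypothetical same-package sketch; URIs stand in for distributed-cache entries.
    package org.apache.oozie.action.hadoop;

    import java.net.URI;
    import java.util.ArrayList;
    import java.util.List;

    public class JarFilterSketch {
        public static void main(String[] args) throws Exception {
            final List<URI> cacheUris = new ArrayList<>();
            cacheUris.add(new URI("hdfs:///user/oozie/share/lib/spark/spark-yarn_2.10-1.6.1.jar"));
            cacheUris.add(new URI("hdfs:///user/oozie/apps/spark/lib/app.jar"));

            final JarFilter jarFilter = new JarFilter(cacheUris, "/user/oozie/apps/spark/lib/app.jar");
            jarFilter.filter();

            // The application jar is removed from the list so it is not shipped a
            // second time via --files. Inside the launcher container, where the
            // spark-yarn jar is localized to the working directory, that jar is
            // removed as well and its Specification-Version is recorded.
            System.out.println("remaining --files entries: " + cacheUris);
            System.out.println("application jar:           " + jarFilter.getApplicationJar());
            System.out.println("spark.yarn.jar candidate:  " + jarFilter.getSparkYarnJar());
            System.out.println("detected Spark version:    " + jarFilter.getSparkVersion());
        }
    }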
http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
new file mode 100644
index 0000000..ffc95f9
--- /dev/null
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.apache.directory.api.util.Strings;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.Path;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+
+class SparkArgsExtractor {
+ private static final Pattern SPARK_DEFAULTS_FILE_PATTERN = Pattern.compile("spark-defaults.conf");
+ private static final String FILES_OPTION = "--files";
+ private static final String ARCHIVES_OPTION = "--archives";
+ private static final String LOG4J_CONFIGURATION_JAVA_OPTION = "-Dlog4j.configuration=";
+ private static final String HIVE_SECURITY_TOKEN = "spark.yarn.security.tokens.hive.enabled";
+ private static final String HBASE_SECURITY_TOKEN = "spark.yarn.security.tokens.hbase.enabled";
+ private static final String PWD = "$PWD" + File.separator + "*";
+ private static final String MASTER_OPTION = "--master";
+ private static final String MODE_OPTION = "--deploy-mode";
+ private static final String JOB_NAME_OPTION = "--name";
+ private static final String CLASS_NAME_OPTION = "--class";
+ private static final String VERBOSE_OPTION = "--verbose";
+ private static final String DRIVER_CLASSPATH_OPTION = "--driver-class-path";
+ private static final String EXECUTOR_CLASSPATH = "spark.executor.extraClassPath=";
+ private static final String DRIVER_CLASSPATH = "spark.driver.extraClassPath=";
+ private static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions=";
+ private static final String DRIVER_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions=";
+ private static final Pattern SPARK_VERSION_1 = Pattern.compile("^1.*");
+ private static final String SPARK_YARN_JAR = "spark.yarn.jar";
+ private static final String SPARK_YARN_JARS = "spark.yarn.jars";
+ private static final String OPT_SEPARATOR = "=";
+ private static final String OPT_VALUE_SEPARATOR = ",";
+ private static final String CONF_OPTION = "--conf";
+ private static final String MASTER_OPTION_YARN_CLUSTER = "yarn-cluster";
+ private static final String MASTER_OPTION_YARN_CLIENT = "yarn-client";
+ private static final String MASTER_OPTION_YARN = "yarn";
+ private static final String DEPLOY_MODE_CLUSTER = "cluster";
+ private static final String DEPLOY_MODE_CLIENT = "client";
+ private static final String SPARK_YARN_TAGS = "spark.yarn.tags";
+ private static final String OPT_PROPERTIES_FILE = "--properties-file";
+
+ private boolean pySpark = false;
+ private final Configuration actionConf;
+
+ SparkArgsExtractor(final Configuration actionConf) {
+ this.actionConf = actionConf;
+ }
+
+ boolean isPySpark() {
+ return pySpark;
+ }
+
+ List<String> extract(final String[] mainArgs) throws OozieActionConfiguratorException, IOException, URISyntaxException {
+ final List<String> sparkArgs = new ArrayList<>();
+
+ sparkArgs.add(MASTER_OPTION);
+ final String master = actionConf.get(SparkActionExecutor.SPARK_MASTER);
+ sparkArgs.add(master);
+
+ // In local mode, everything runs here in the Launcher Job.
+ // In yarn-client mode, the driver runs here in the Launcher Job and the
+ // executor in Yarn.
+ // In yarn-cluster mode, the driver and executor run in Yarn.
+ final String sparkDeployMode = actionConf.get(SparkActionExecutor.SPARK_MODE);
+ if (sparkDeployMode != null) {
+ sparkArgs.add(MODE_OPTION);
+ sparkArgs.add(sparkDeployMode);
+ }
+ final boolean yarnClusterMode = master.equals(MASTER_OPTION_YARN_CLUSTER)
+ || (master.equals(MASTER_OPTION_YARN) && sparkDeployMode != null && sparkDeployMode.equals(DEPLOY_MODE_CLUSTER));
+ final boolean yarnClientMode = master.equals(MASTER_OPTION_YARN_CLIENT)
+ || (master.equals(MASTER_OPTION_YARN) && sparkDeployMode != null && sparkDeployMode.equals(DEPLOY_MODE_CLIENT));
+
+ sparkArgs.add(JOB_NAME_OPTION);
+ sparkArgs.add(actionConf.get(SparkActionExecutor.SPARK_JOB_NAME));
+
+ final String className = actionConf.get(SparkActionExecutor.SPARK_CLASS);
+ if (className != null) {
+ sparkArgs.add(CLASS_NAME_OPTION);
+ sparkArgs.add(className);
+ }
+
+ appendOoziePropertiesToSparkConf(sparkArgs);
+
+ String jarPath = actionConf.get(SparkActionExecutor.SPARK_JAR);
+ if (jarPath != null && jarPath.endsWith(".py")) {
+ pySpark = true;
+ }
+ boolean addedHiveSecurityToken = false;
+ boolean addedHBaseSecurityToken = false;
+ boolean addedLog4jDriverSettings = false;
+ boolean addedLog4jExecutorSettings = false;
+ final StringBuilder driverClassPath = new StringBuilder();
+ final StringBuilder executorClassPath = new StringBuilder();
+ final StringBuilder userFiles = new StringBuilder();
+ final StringBuilder userArchives = new StringBuilder();
+ final String sparkOpts = actionConf.get(SparkActionExecutor.SPARK_OPTS);
+ if (StringUtils.isNotEmpty(sparkOpts)) {
+ final List<String> sparkOptions = SparkOptionsSplitter.splitSparkOpts(sparkOpts);
+ for (int i = 0; i < sparkOptions.size(); i++) {
+ String opt = sparkOptions.get(i);
+ boolean addToSparkArgs = true;
+ if (yarnClusterMode || yarnClientMode) {
+ if (opt.startsWith(EXECUTOR_CLASSPATH)) {
+ appendWithPathSeparator(opt.substring(EXECUTOR_CLASSPATH.length()), executorClassPath);
+ addToSparkArgs = false;
+ }
+ if (opt.startsWith(DRIVER_CLASSPATH)) {
+ appendWithPathSeparator(opt.substring(DRIVER_CLASSPATH.length()), driverClassPath);
+ addToSparkArgs = false;
+ }
+ if (opt.equals(DRIVER_CLASSPATH_OPTION)) {
+ // we need the next element after this option
+ appendWithPathSeparator(sparkOptions.get(i + 1), driverClassPath);
+ // increase i to skip the next element.
+ i++;
+ addToSparkArgs = false;
+ }
+ }
+ if (opt.startsWith(HIVE_SECURITY_TOKEN)) {
+ addedHiveSecurityToken = true;
+ }
+ if (opt.startsWith(HBASE_SECURITY_TOKEN)) {
+ addedHBaseSecurityToken = true;
+ }
+ if (opt.startsWith(EXECUTOR_EXTRA_JAVA_OPTIONS) || opt.startsWith(DRIVER_EXTRA_JAVA_OPTIONS)) {
+ if (!opt.contains(LOG4J_CONFIGURATION_JAVA_OPTION)) {
+ opt += " " + LOG4J_CONFIGURATION_JAVA_OPTION + SparkMain.SPARK_LOG4J_PROPS;
+ } else {
+ System.out.println("Warning: Spark Log4J settings are overwritten." +
+ " Child job IDs may not be available");
+ }
+ if (opt.startsWith(EXECUTOR_EXTRA_JAVA_OPTIONS)) {
+ addedLog4jExecutorSettings = true;
+ } else {
+ addedLog4jDriverSettings = true;
+ }
+ }
+ if (opt.startsWith(FILES_OPTION)) {
+ final String userFile;
+ if (opt.contains(OPT_SEPARATOR)) {
+ userFile = opt.substring(opt.indexOf(OPT_SEPARATOR) + OPT_SEPARATOR.length());
+ }
+ else {
+ userFile = sparkOptions.get(i + 1);
+ i++;
+ }
+ if (userFiles.length() > 0) {
+ userFiles.append(OPT_VALUE_SEPARATOR);
+ }
+ userFiles.append(userFile);
+ addToSparkArgs = false;
+ }
+ if (opt.startsWith(ARCHIVES_OPTION)) {
+ final String userArchive;
+ if (opt.contains(OPT_SEPARATOR)) {
+ userArchive = opt.substring(opt.indexOf(OPT_SEPARATOR) + OPT_SEPARATOR.length());
+ }
+ else {
+ userArchive = sparkOptions.get(i + 1);
+ i++;
+ }
+ if (userArchives.length() > 0) {
+ userArchives.append(OPT_VALUE_SEPARATOR);
+ }
+ userArchives.append(userArchive);
+ addToSparkArgs = false;
+ }
+ if (addToSparkArgs) {
+ sparkArgs.add(opt);
+ }
+ else if (sparkArgs.get(sparkArgs.size() - 1).equals(CONF_OPTION)) {
+ sparkArgs.remove(sparkArgs.size() - 1);
+ }
+ }
+ }
+
+ if ((yarnClusterMode || yarnClientMode)) {
+ // Include the current working directory (of executor container)
+ // in executor classpath, because it will contain localized
+ // files
+ appendWithPathSeparator(PWD, executorClassPath);
+ appendWithPathSeparator(PWD, driverClassPath);
+
+ sparkArgs.add(CONF_OPTION);
+ sparkArgs.add(EXECUTOR_CLASSPATH + executorClassPath.toString());
+
+ sparkArgs.add(CONF_OPTION);
+ sparkArgs.add(DRIVER_CLASSPATH + driverClassPath.toString());
+ }
+
+ if (actionConf.get(LauncherMain.MAPREDUCE_JOB_TAGS) != null) {
+ sparkArgs.add(CONF_OPTION);
+ sparkArgs.add(SPARK_YARN_TAGS + OPT_SEPARATOR + actionConf.get(LauncherMain.MAPREDUCE_JOB_TAGS));
+ }
+
+ if (!addedHiveSecurityToken) {
+ sparkArgs.add(CONF_OPTION);
+ sparkArgs.add(HIVE_SECURITY_TOKEN + OPT_SEPARATOR + Boolean.toString(false));
+ }
+ if (!addedHBaseSecurityToken) {
+ sparkArgs.add(CONF_OPTION);
+ sparkArgs.add(HBASE_SECURITY_TOKEN + OPT_SEPARATOR + Boolean.toString(false));
+ }
+ if (!addedLog4jExecutorSettings) {
+ sparkArgs.add(CONF_OPTION);
+ sparkArgs.add(EXECUTOR_EXTRA_JAVA_OPTIONS + LOG4J_CONFIGURATION_JAVA_OPTION + SparkMain.SPARK_LOG4J_PROPS);
+ }
+ if (!addedLog4jDriverSettings) {
+ sparkArgs.add(CONF_OPTION);
+ sparkArgs.add(DRIVER_EXTRA_JAVA_OPTIONS + LOG4J_CONFIGURATION_JAVA_OPTION + SparkMain.SPARK_LOG4J_PROPS);
+ }
+ final File defaultConfFile = SparkMain.getMatchingFile(SPARK_DEFAULTS_FILE_PATTERN);
+ if (defaultConfFile != null) {
+ sparkArgs.add(OPT_PROPERTIES_FILE);
+ sparkArgs.add(SPARK_DEFAULTS_FILE_PATTERN.toString());
+ }
+
+ if ((yarnClusterMode || yarnClientMode)) {
+ final Map<String, URI> fixedFileUrisMap =
+ SparkMain.fixFsDefaultUrisAndFilterDuplicates(DistributedCache.getCacheFiles(actionConf));
+ fixedFileUrisMap.put(SparkMain.SPARK_LOG4J_PROPS, new Path(SparkMain.SPARK_LOG4J_PROPS).toUri());
+ fixedFileUrisMap.put(SparkMain.HIVE_SITE_CONF, new Path(SparkMain.HIVE_SITE_CONF).toUri());
+ addUserDefined(userFiles.toString(), fixedFileUrisMap);
+ final Collection<URI> fixedFileUris = fixedFileUrisMap.values();
+ final JarFilter jarFilter = new JarFilter(fixedFileUris, jarPath);
+ jarFilter.filter();
+ jarPath = jarFilter.getApplicationJar();
+
+ final String cachedFiles = StringUtils.join(fixedFileUris, OPT_VALUE_SEPARATOR);
+ if (cachedFiles != null && !cachedFiles.isEmpty()) {
+ sparkArgs.add(FILES_OPTION);
+ sparkArgs.add(cachedFiles);
+ }
+ final Map<String, URI> fixedArchiveUrisMap = SparkMain.fixFsDefaultUrisAndFilterDuplicates(DistributedCache.
+ getCacheArchives(actionConf));
+ addUserDefined(userArchives.toString(), fixedArchiveUrisMap);
+ final String cachedArchives = StringUtils.join(fixedArchiveUrisMap.values(), OPT_VALUE_SEPARATOR);
+ if (cachedArchives != null && !cachedArchives.isEmpty()) {
+ sparkArgs.add(ARCHIVES_OPTION);
+ sparkArgs.add(cachedArchives);
+ }
+ setSparkYarnJarsConf(sparkArgs, jarFilter.getSparkYarnJar(), jarFilter.getSparkVersion());
+ }
+
+ if (!sparkArgs.contains(VERBOSE_OPTION)) {
+ sparkArgs.add(VERBOSE_OPTION);
+ }
+
+ sparkArgs.add(jarPath);
+ sparkArgs.addAll(Arrays.asList(mainArgs));
+
+ return sparkArgs;
+ }
+
+ private void appendWithPathSeparator(final String what, final StringBuilder to) {
+ if (to.length() > 0) {
+ to.append(File.pathSeparator);
+ }
+ to.append(what);
+ }
+
+ private void addUserDefined(final String userList, final Map<String, URI> urisMap) {
+ if (userList != null) {
+ for (final String file : userList.split(OPT_VALUE_SEPARATOR)) {
+ if (!Strings.isEmpty(file)) {
+ final Path p = new Path(file);
+ urisMap.put(p.getName(), p.toUri());
+ }
+ }
+ }
+ }
+
+ /*
+ * Get properties that needs to be passed to Spark as Spark configuration from actionConf.
+ */
+ @VisibleForTesting
+ void appendOoziePropertiesToSparkConf(final List<String> sparkArgs) {
+ for (final Map.Entry<String, String> oozieConfig : actionConf
+ .getValByRegex("^oozie\\.(?!launcher|spark).+").entrySet()) {
+ sparkArgs.add(CONF_OPTION);
+ sparkArgs.add(String.format("spark.%s=%s", oozieConfig.getKey(), oozieConfig.getValue()));
+ }
+ }
+
+ /**
+ * Sets spark.yarn.jars for Spark 2.X. Sets spark.yarn.jar for Spark 1.X.
+ *
+ * @param sparkArgs
+ * @param sparkYarnJar
+ * @param sparkVersion
+ */
+ private void setSparkYarnJarsConf(final List<String> sparkArgs, final String sparkYarnJar, final String sparkVersion) {
+ if (SPARK_VERSION_1.matcher(sparkVersion).find()) {
+ // In Spark 1.X.X, set spark.yarn.jar to avoid
+ // multiple distribution
+ sparkArgs.add(CONF_OPTION);
+ sparkArgs.add(SPARK_YARN_JAR + OPT_SEPARATOR + sparkYarnJar);
+ } else {
+ // In Spark 2.X.X, set spark.yarn.jars
+ sparkArgs.add(CONF_OPTION);
+ sparkArgs.add(SPARK_YARN_JARS + OPT_SEPARATOR + sparkYarnJar);
+ }
+ }
+}
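A minimal driver sketch (not part of this commit) for the new SparkArgsExtractor, mirroring the unit tests added below; it assumes compilation into the same org.apache.oozie.action.hadoop package and uses a hypothetical main class and jar path. With a local master the extractor only assembles spark-submit style arguments and does not touch the distributed cache.

    // Hypothetical same-package driver; mirrors TestSparkArgsExtractor below.
    package org.apache.oozie.action.hadoop;

    import java.util.List;

    import org.apache.hadoop.conf.Configuration;

    public class SparkArgsExtractorSketch {
        public static void main(String[] args) throws Exception {
            final Configuration actionConf = new Configuration();
            actionConf.set(SparkActionExecutor.SPARK_MASTER, "local[*]");
            actionConf.set(SparkActionExecutor.SPARK_MODE, "client");
            actionConf.set(SparkActionExecutor.SPARK_JOB_NAME, "Sketch");
            actionConf.set(SparkActionExecutor.SPARK_CLASS, "org.example.Main");
            actionConf.set(SparkActionExecutor.SPARK_OPTS,
                    "--driver-memory 1g --conf spark.ui.enabled=false");
            actionConf.set(SparkActionExecutor.SPARK_JAR, "/lib/app.jar");

            final SparkArgsExtractor extractor = new SparkArgsExtractor(actionConf);
            final List<String> sparkArgs = extractor.extract(new String[]{"arg0"});

            // Prints the spark-submit style argument list, e.g.
            // [--master, local[*], --deploy-mode, client, --name, Sketch, ...]
            System.out.println(sparkArgs);
            System.out.println("pySpark job? " + extractor.isPySpark());
        }
    }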
http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
index 68f7a60..674839a 100644
--- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
@@ -24,20 +24,13 @@ import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.jar.JarFile;
-import java.util.jar.Manifest;
import java.util.regex.Pattern;
import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -47,243 +40,56 @@ import org.apache.spark.deploy.SparkSubmit;
import com.google.common.annotations.VisibleForTesting;
public class SparkMain extends LauncherMain {
- private static final String MASTER_OPTION = "--master";
- private static final String MODE_OPTION = "--deploy-mode";
- private static final String JOB_NAME_OPTION = "--name";
- private static final String CLASS_NAME_OPTION = "--class";
- private static final String VERBOSE_OPTION = "--verbose";
- private static final String DRIVER_CLASSPATH_OPTION = "--driver-class-path";
- private static final String EXECUTOR_CLASSPATH = "spark.executor.extraClassPath=";
- private static final String DRIVER_CLASSPATH = "spark.driver.extraClassPath=";
- private static final String EXECUTOR_EXTRA_JAVA_OPTIONS = "spark.executor.extraJavaOptions=";
- private static final String DRIVER_EXTRA_JAVA_OPTIONS = "spark.driver.extraJavaOptions=";
- private static final String LOG4J_CONFIGURATION_JAVA_OPTION = "-Dlog4j.configuration=";
- private static final String HIVE_SECURITY_TOKEN = "spark.yarn.security.tokens.hive.enabled";
- private static final String HBASE_SECURITY_TOKEN = "spark.yarn.security.tokens.hbase.enabled";
- private static final String CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR = "oozie.action.spark.setup.hadoop.conf.dir";
- private static final String PWD = "$PWD" + File.separator + "*";
- private static final Pattern[] PYSPARK_DEP_FILE_PATTERN = { Pattern.compile("py4\\S*src.zip"),
- Pattern.compile("pyspark.zip") };
- private static final Pattern SPARK_DEFAULTS_FILE_PATTERN = Pattern.compile("spark-defaults.conf");
- private static final String SPARK_LOG4J_PROPS = "spark-log4j.properties";
+
@VisibleForTesting
static final Pattern[] SPARK_JOB_IDS_PATTERNS = {
Pattern.compile("Submitted application (application[0-9_]*)") };
- public static final Pattern SPARK_ASSEMBLY_JAR_PATTERN = Pattern
+ @VisibleForTesting
+ static final Pattern SPARK_ASSEMBLY_JAR_PATTERN = Pattern
.compile("^spark-assembly((?:(-|_|(\\d+\\.))\\d+(?:\\.\\d+)*))*\\.jar$");
- public static final Pattern SPARK_YARN_JAR_PATTERN = Pattern
+ @VisibleForTesting
+ static final Pattern SPARK_YARN_JAR_PATTERN = Pattern
.compile("^spark-yarn((?:(-|_|(\\d+\\.))\\d+(?:\\.\\d+)*))*\\.jar$");
- private static final Pattern SPARK_VERSION_1 = Pattern.compile("^1.*");
- private static final String SPARK_YARN_JAR = "spark.yarn.jar";
- private static final String SPARK_YARN_JARS = "spark.yarn.jars";
- public static final String HIVE_SITE_CONF = "hive-site.xml";
- public static final String FILES_OPTION = "--files";
- public static final String ARCHIVES_OPTION = "--archives";
-
- public static void main(String[] args) throws Exception {
+ static final String HIVE_SITE_CONF = "hive-site.xml";
+ static final String SPARK_LOG4J_PROPS = "spark-log4j.properties";
+
+ private static final String CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR = "oozie.action.spark.setup.hadoop.conf.dir";
+ private static final Pattern[] PYSPARK_DEP_FILE_PATTERN = { Pattern.compile("py4\\S*src.zip"),
+ Pattern.compile("pyspark.zip") };
+
+ public static void main(final String[] args) throws Exception {
run(SparkMain.class, args);
}
@Override
- protected void run(String[] args) throws Exception {
- boolean isPyspark = false;
- Configuration actionConf = loadActionConf();
+ protected void run(final String[] args) throws Exception {
+ final Configuration actionConf = loadActionConf();
prepareHadoopConfig(actionConf);
setYarnTag(actionConf);
LauncherMain.killChildYarnJobs(actionConf);
- String logFile = setUpSparkLog4J(actionConf);
+ final String logFile = setUpSparkLog4J(actionConf);
setHiveSite(actionConf);
- List<String> sparkArgs = new ArrayList<String>();
-
- sparkArgs.add(MASTER_OPTION);
- String master = actionConf.get(SparkActionExecutor.SPARK_MASTER);
- sparkArgs.add(master);
-
- // In local mode, everything runs here in the Launcher Job.
- // In yarn-client mode, the driver runs here in the Launcher Job and the
- // executor in Yarn.
- // In yarn-cluster mode, the driver and executor run in Yarn.
- String sparkDeployMode = actionConf.get(SparkActionExecutor.SPARK_MODE);
- if (sparkDeployMode != null) {
- sparkArgs.add(MODE_OPTION);
- sparkArgs.add(sparkDeployMode);
- }
- boolean yarnClusterMode = master.equals("yarn-cluster")
- || (master.equals("yarn") && sparkDeployMode != null && sparkDeployMode.equals("cluster"));
- boolean yarnClientMode = master.equals("yarn-client")
- || (master.equals("yarn") && sparkDeployMode != null && sparkDeployMode.equals("client"));
-
- sparkArgs.add(JOB_NAME_OPTION);
- sparkArgs.add(actionConf.get(SparkActionExecutor.SPARK_JOB_NAME));
-
- String className = actionConf.get(SparkActionExecutor.SPARK_CLASS);
- if (className != null) {
- sparkArgs.add(CLASS_NAME_OPTION);
- sparkArgs.add(className);
- }
-
- appendOoziePropertiesToSparkConf(sparkArgs, actionConf);
-
- String jarPath = actionConf.get(SparkActionExecutor.SPARK_JAR);
- if(jarPath!=null && jarPath.endsWith(".py")){
- isPyspark = true;
- }
- boolean addedHiveSecurityToken = false;
- boolean addedHBaseSecurityToken = false;
- boolean addedLog4jDriverSettings = false;
- boolean addedLog4jExecutorSettings = false;
- StringBuilder driverClassPath = new StringBuilder();
- StringBuilder executorClassPath = new StringBuilder();
- String userFiles = null, userArchives = null;
- String sparkOpts = actionConf.get(SparkActionExecutor.SPARK_OPTS);
- if (StringUtils.isNotEmpty(sparkOpts)) {
- List<String> sparkOptions = splitSparkOpts(sparkOpts);
- for (int i = 0; i < sparkOptions.size(); i++) {
- String opt = sparkOptions.get(i);
- boolean addToSparkArgs = true;
- if (yarnClusterMode || yarnClientMode) {
- if (opt.startsWith(EXECUTOR_CLASSPATH)) {
- appendWithPathSeparator(opt.substring(EXECUTOR_CLASSPATH.length()), executorClassPath);
- addToSparkArgs = false;
- }
- if (opt.startsWith(DRIVER_CLASSPATH)) {
- appendWithPathSeparator(opt.substring(DRIVER_CLASSPATH.length()), driverClassPath);
- addToSparkArgs = false;
- }
- if (opt.equals(DRIVER_CLASSPATH_OPTION)) {
- // we need the next element after this option
- appendWithPathSeparator(sparkOptions.get(i + 1), driverClassPath);
- // increase i to skip the next element.
- i++;
- addToSparkArgs = false;
- }
- }
- if (opt.startsWith(HIVE_SECURITY_TOKEN)) {
- addedHiveSecurityToken = true;
- }
- if (opt.startsWith(HBASE_SECURITY_TOKEN)) {
- addedHBaseSecurityToken = true;
- }
- if (opt.startsWith(EXECUTOR_EXTRA_JAVA_OPTIONS) || opt.startsWith(DRIVER_EXTRA_JAVA_OPTIONS)) {
- if(!opt.contains(LOG4J_CONFIGURATION_JAVA_OPTION)) {
- opt += " " + LOG4J_CONFIGURATION_JAVA_OPTION + SPARK_LOG4J_PROPS;
- }else{
- System.out.println("Warning: Spark Log4J settings are overwritten." +
- " Child job IDs may not be available");
- }
- if(opt.startsWith(EXECUTOR_EXTRA_JAVA_OPTIONS)) {
- addedLog4jExecutorSettings = true;
- }else{
- addedLog4jDriverSettings = true;
- }
- }
- if(opt.startsWith(FILES_OPTION)) {
- userFiles = sparkOptions.get(i + 1);
- i++;
- addToSparkArgs = false;
- }
- if(opt.startsWith(ARCHIVES_OPTION)) {
- userArchives = sparkOptions.get(i + 1);
- i++;
- addToSparkArgs = false;
- }
- if(addToSparkArgs) {
- sparkArgs.add(opt);
- }
- }
- }
-
- if ((yarnClusterMode || yarnClientMode)) {
- // Include the current working directory (of executor container)
- // in executor classpath, because it will contain localized
- // files
- appendWithPathSeparator(PWD, executorClassPath);
- appendWithPathSeparator(PWD, driverClassPath);
-
- sparkArgs.add("--conf");
- sparkArgs.add(EXECUTOR_CLASSPATH + executorClassPath.toString());
-
- sparkArgs.add("--conf");
- sparkArgs.add(DRIVER_CLASSPATH + driverClassPath.toString());
- }
-
- if (actionConf.get(MAPREDUCE_JOB_TAGS) != null) {
- sparkArgs.add("--conf");
- sparkArgs.add("spark.yarn.tags=" + actionConf.get(MAPREDUCE_JOB_TAGS));
- }
- if (!addedHiveSecurityToken) {
- sparkArgs.add("--conf");
- sparkArgs.add(HIVE_SECURITY_TOKEN + "=false");
- }
- if (!addedHBaseSecurityToken) {
- sparkArgs.add("--conf");
- sparkArgs.add(HBASE_SECURITY_TOKEN + "=false");
- }
- if(!addedLog4jExecutorSettings) {
- sparkArgs.add("--conf");
- sparkArgs.add(EXECUTOR_EXTRA_JAVA_OPTIONS + LOG4J_CONFIGURATION_JAVA_OPTION + SPARK_LOG4J_PROPS);
- }
- if(!addedLog4jDriverSettings) {
- sparkArgs.add("--conf");
- sparkArgs.add(DRIVER_EXTRA_JAVA_OPTIONS + LOG4J_CONFIGURATION_JAVA_OPTION + SPARK_LOG4J_PROPS);
- }
- File defaultConfFile = getMatchingFile(SPARK_DEFAULTS_FILE_PATTERN);
- if (defaultConfFile != null) {
- sparkArgs.add("--properties-file");
- sparkArgs.add(SPARK_DEFAULTS_FILE_PATTERN.toString());
- }
-
- if ((yarnClusterMode || yarnClientMode)) {
- Map<String, URI> fixedFileUrisMap = fixFsDefaultUrisAndFilterDuplicates(DistributedCache.getCacheFiles(actionConf));
- fixedFileUrisMap.put(SPARK_LOG4J_PROPS, new Path(SPARK_LOG4J_PROPS).toUri());
- fixedFileUrisMap.put(HIVE_SITE_CONF, new Path(HIVE_SITE_CONF).toUri());
- addUserDefined(userFiles, fixedFileUrisMap);
- Collection<URI> fixedFileUris = fixedFileUrisMap.values();
- JarFilter jarfilter = new JarFilter(fixedFileUris, jarPath);
- jarfilter.filter();
- jarPath = jarfilter.getApplicationJar();
-
- String cachedFiles = StringUtils.join(fixedFileUris, ",");
- if (cachedFiles != null && !cachedFiles.isEmpty()) {
- sparkArgs.add("--files");
- sparkArgs.add(cachedFiles);
- }
- Map<String, URI> fixedArchiveUrisMap = fixFsDefaultUrisAndFilterDuplicates(DistributedCache.
- getCacheArchives(actionConf));
- addUserDefined(userArchives, fixedArchiveUrisMap);
- String cachedArchives = StringUtils.join(fixedArchiveUrisMap.values(), ",");
- if (cachedArchives != null && !cachedArchives.isEmpty()) {
- sparkArgs.add("--archives");
- sparkArgs.add(cachedArchives);
- }
- setSparkYarnJarsConf(sparkArgs, jarfilter.getSparkYarnJar(), jarfilter.getSparkVersion());
- }
+ final SparkArgsExtractor sparkArgsExtractor = new SparkArgsExtractor(actionConf);
+ final List<String> sparkArgs = sparkArgsExtractor.extract(args);
- if (!sparkArgs.contains(VERBOSE_OPTION)) {
- sparkArgs.add(VERBOSE_OPTION);
- }
-
- sparkArgs.add(jarPath);
- for (String arg : args) {
- sparkArgs.add(arg);
- }
- if (isPyspark){
+ if (sparkArgsExtractor.isPySpark()){
createPySparkLibFolder();
}
-
System.out.println("Spark Action Main class : " + SparkSubmit.class.getName());
System.out.println();
System.out.println("Oozie Spark action configuration");
System.out.println("=================================================================");
System.out.println();
- PasswordMasker passwordMasker = new PasswordMasker();
- for (String arg : sparkArgs) {
+
+ final PasswordMasker passwordMasker = new PasswordMasker();
+ for (final String arg : sparkArgs) {
System.out.println(" " + passwordMasker.maskPasswordsIfNecessary(arg));
}
System.out.println();
+
try {
runSpark(sparkArgs.toArray(new String[sparkArgs.size()]));
}
@@ -293,21 +99,12 @@ public class SparkMain extends LauncherMain {
}
}
- private void addUserDefined(String userList, Map<String, URI> urisMap) {
- if(userList != null) {
- for (String file : userList.split(",")) {
- Path p = new Path(file);
- urisMap.put(p.getName(), p.toUri());
- }
- }
- }
-
- private void prepareHadoopConfig(Configuration actionConf) throws IOException {
+ private void prepareHadoopConfig(final Configuration actionConf) throws IOException {
// Copying oozie.action.conf.xml into hadoop configuration *-site files.
if (actionConf.getBoolean(CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR, false)) {
- String actionXml = System.getProperty("oozie.action.conf.xml");
+ final String actionXml = System.getProperty("oozie.action.conf.xml");
if (actionXml != null) {
- File currentDir = new File(actionXml).getParentFile();
+ final File currentDir = new File(actionXml).getParentFile();
writeHadoopConfig(actionXml, currentDir);
}
}
@@ -321,15 +118,15 @@ public class SparkMain extends LauncherMain {
* @throws IOException if there is an error during file copy
*/
private void createPySparkLibFolder() throws OozieActionConfiguratorException, IOException {
- File pythonLibDir = new File("python/lib");
+ final File pythonLibDir = new File("python/lib");
if(!pythonLibDir.exists()){
pythonLibDir.mkdirs();
System.out.println("PySpark lib folder " + pythonLibDir.getAbsolutePath() + " folder created.");
}
- for(Pattern fileNamePattern : PYSPARK_DEP_FILE_PATTERN) {
- File file = getMatchingPyFile(fileNamePattern);
- File destination = new File(pythonLibDir, file.getName());
+ for(final Pattern fileNamePattern : PYSPARK_DEP_FILE_PATTERN) {
+ final File file = getMatchingPyFile(fileNamePattern);
+ final File destination = new File(pythonLibDir, file.getName());
FileUtils.copyFile(file, destination);
System.out.println("Copied " + file + " to " + destination.getAbsolutePath());
}
@@ -342,8 +139,8 @@ public class SparkMain extends LauncherMain {
* @return the file if there is one
* @throws OozieActionConfiguratorException if there is are no files matching the pattern
*/
- private File getMatchingPyFile(Pattern fileNamePattern) throws OozieActionConfiguratorException {
- File f = getMatchingFile(fileNamePattern);
+ private File getMatchingPyFile(final Pattern fileNamePattern) throws OozieActionConfiguratorException {
+ final File f = getMatchingFile(fileNamePattern);
if (f != null) {
return f;
}
@@ -359,22 +156,24 @@ public class SparkMain extends LauncherMain {
* @param fileNamePattern the pattern to look for
* @return the file if there is one else it returns null
*/
- private static File getMatchingFile(Pattern fileNamePattern) throws OozieActionConfiguratorException {
- File localDir = new File(".");
- String[] files = localDir.list();
-
- if (files != null) {
- for(String fileName : files){
- if(fileNamePattern.matcher(fileName).find()){
- return new File(fileName);
- }
+ static File getMatchingFile(final Pattern fileNamePattern) {
+ final File localDir = new File(".");
+
+ final String[] localFileNames = localDir.list();
+ if (localFileNames == null) {
+ return null;
+ }
+
+ for (final String fileName : localFileNames){
+ if (fileNamePattern.matcher(fileName).find()){
+ return new File(fileName);
}
}
return null;
}
- private void runSpark(String[] args) throws Exception {
+ private void runSpark(final String[] args) throws Exception {
System.out.println("=================================================================");
System.out.println();
System.out.println(">>> Invoking Spark class now >>>");
@@ -383,49 +182,16 @@ public class SparkMain extends LauncherMain {
SparkSubmit.main(args);
}
- /**
- * Converts the options to be Spark-compatible.
- * <ul>
- * <li>Parameters are separated by whitespace and can be groupped using double quotes</li>
- * <li>Quotes should be removed</li>
- * <li>Adjacent whitespace separators are treated as one</li>
- * </ul>
- * @param sparkOpts the options for Spark
- * @return the options parsed into a list
- */
- static List<String> splitSparkOpts(String sparkOpts){
- List<String> result = new ArrayList<String>();
- StringBuilder currentWord = new StringBuilder();
- boolean insideQuote = false;
- for (int i = 0; i < sparkOpts.length(); i++) {
- char c = sparkOpts.charAt(i);
- if (c == '"') {
- insideQuote = !insideQuote;
- } else if (Character.isWhitespace(c) && !insideQuote) {
- if (currentWord.length() > 0) {
- result.add(currentWord.toString());
- currentWord.setLength(0);
- }
- } else {
- currentWord.append(c);
- }
- }
- if(currentWord.length()>0) {
- result.add(currentWord.toString());
- }
- return result;
- }
-
- private String setUpSparkLog4J(Configuration distcpConf) throws IOException {
+ private String setUpSparkLog4J(final Configuration actionConf) throws IOException {
// Logfile to capture job IDs
- String hadoopJobId = System.getProperty("oozie.launcher.job.id");
+ final String hadoopJobId = System.getProperty("oozie.launcher.job.id");
if (hadoopJobId == null) {
throw new RuntimeException("Launcher Hadoop Job ID system,property not set");
}
- String logFile = new File("spark-oozie-" + hadoopJobId + ".log").getAbsolutePath();
+ final String logFile = new File("spark-oozie-" + hadoopJobId + ".log").getAbsolutePath();
- String logLevel = distcpConf.get("oozie.spark.log.level", "INFO");
- String rootLogLevel = distcpConf.get("oozie.action." + LauncherMapper.ROOT_LOGGER_LEVEL, "INFO");
+ final String logLevel = actionConf.get("oozie.spark.log.level", "INFO");
+ final String rootLogLevel = actionConf.get("oozie.action." + LauncherMapper.ROOT_LOGGER_LEVEL, "INFO");
log4jProperties.setProperty("log4j.rootLogger", rootLogLevel + ", A");
log4jProperties.setProperty("log4j.logger.org.apache.spark", logLevel + ", A, jobid");
@@ -441,7 +207,7 @@ public class SparkMain extends LauncherMain {
log4jProperties.setProperty("log4j.logger.org.apache.hadoop.mapreduce.Job", "INFO, jobid");
log4jProperties.setProperty("log4j.logger.org.apache.hadoop.yarn.client.api.impl.YarnClientImpl", "INFO, jobid");
- String localProps = new File(SPARK_LOG4J_PROPS).getAbsolutePath();
+ final String localProps = new File(SPARK_LOG4J_PROPS).getAbsolutePath();
try (OutputStream os1 = new FileOutputStream(localProps)) {
log4jProperties.store(os1, "");
}
@@ -458,100 +224,27 @@ public class SparkMain extends LauncherMain {
* @throws IOException
* @throws URISyntaxException
*/
- static Map<String, URI> fixFsDefaultUrisAndFilterDuplicates(URI[] files) throws IOException, URISyntaxException {
- Map<String, URI> map= new HashMap<>();
+ static Map<String, URI> fixFsDefaultUrisAndFilterDuplicates(final URI[] files) throws IOException, URISyntaxException {
+ final Map<String, URI> map= new LinkedHashMap<>();
if (files == null) {
return map;
}
- FileSystem fs = FileSystem.get(new Configuration(true));
+ final FileSystem fs = FileSystem.get(new Configuration(true));
for (int i = 0; i < files.length; i++) {
- URI fileUri = files[i];
- Path p = new Path(fileUri);
- map.put(p.getName(), getFixedUri(fs, fileUri));
+ final URI fileUri = files[i];
+ final Path p = new Path(fileUri);
+ map.put(p.getName(), HadoopUriFinder.getFixedUri(fs, fileUri));
}
return map;
}
/**
- * Sets spark.yarn.jars for Spark 2.X. Sets spark.yarn.jar for Spark 1.X.
- *
- * @param sparkArgs
- * @param sparkYarnJar
- * @param sparkVersion
- */
- private void setSparkYarnJarsConf(List<String> sparkArgs, String sparkYarnJar, String sparkVersion) {
- if (SPARK_VERSION_1.matcher(sparkVersion).find()) {
- // In Spark 1.X.X, set spark.yarn.jar to avoid
- // multiple distribution
- sparkArgs.add("--conf");
- sparkArgs.add(SPARK_YARN_JAR + "=" + sparkYarnJar);
- }
- else {
- // In Spark 2.X.X, set spark.yarn.jars
- sparkArgs.add("--conf");
- sparkArgs.add(SPARK_YARN_JARS + "=" + sparkYarnJar);
- }
- }
-
- private static String getJarVersion(File jarFile) throws IOException {
- @SuppressWarnings("resource")
- Manifest manifest = new JarFile(jarFile).getManifest();
- return manifest.getMainAttributes().getValue("Specification-Version");
- }
-
- /*
- * Get properties that needs to be passed to Spark as Spark configuration from actionConf.
- */
- @VisibleForTesting
- protected void appendOoziePropertiesToSparkConf(List<String> sparkArgs, Configuration actionConf) {
- for (Map.Entry<String, String> oozieConfig : actionConf
- .getValByRegex("^oozie\\.(?!launcher|spark).+").entrySet()) {
- sparkArgs.add("--conf");
- sparkArgs.add(String.format("spark.%s=%s", oozieConfig.getKey(), oozieConfig.getValue()));
- }
- }
-
- private void appendWithPathSeparator(String what, StringBuilder to){
- if(to.length() > 0){
- to.append(File.pathSeparator);
- }
- to.append(what);
- }
-
- private static URI getFixedUri(URI fileUri) throws URISyntaxException, IOException {
- FileSystem fs = FileSystem.get(new Configuration(true));
- return getFixedUri(fs, fileUri);
- }
-
- /**
- * Spark compares URIs based on scheme, host and port. Here we convert URIs
- * into the default format so that Spark won't think those belong to
- * different file system. This will avoid an extra copy of files which
- * already exists on same hdfs.
- *
- * @param fs
- * @param fileUri
- * @return fixed uri
- * @throws URISyntaxException
- */
- private static URI getFixedUri(FileSystem fs, URI fileUri) throws URISyntaxException {
- if (fs.getUri().getScheme().equals(fileUri.getScheme())
- && (fs.getUri().getHost().equals(fileUri.getHost()) || fileUri.getHost() == null)
- && (fs.getUri().getPort() == -1 || fileUri.getPort() == -1
- || fs.getUri().getPort() == fileUri.getPort())) {
- return new URI(fs.getUri().getScheme(), fileUri.getUserInfo(), fs.getUri().getHost(), fs.getUri().getPort(),
- fileUri.getPath(), fileUri.getQuery(), fileUri.getFragment());
- }
- return fileUri;
- }
-
- /**
* Sets up hive-site.xml
*
* @param hiveConf
* @throws IOException
*/
- private void setHiveSite(Configuration hiveConf) throws IOException {
+ private void setHiveSite(final Configuration hiveConf) throws IOException {
// See https://issues.apache.org/jira/browse/HIVE-1411
hiveConf.set("datanucleus.plugin.pluginRegistryBundleCheck", "LOG");
@@ -579,99 +272,4 @@ public class SparkMain extends LauncherMain {
// to null.
HiveConf.setHiveSiteLocation(HiveConf.class.getClassLoader().getResource("hive-site.xml"));
}
-
- /**
- * This class is used for filtering out unwanted jars.
- */
- static class JarFilter {
- private String sparkVersion = "1.X.X";
- private String sparkYarnJar;
- private String applicationJar;
- private Collection<URI> listUris = null;
-
- /**
- * @param listUris List of URIs to be filtered
- * @param jarPath Application jar
- * @throws IOException
- * @throws URISyntaxException
- */
- public JarFilter(Collection<URI> listUris, String jarPath) throws URISyntaxException, IOException {
- this.listUris = listUris;
- applicationJar = jarPath;
- Path p = new Path(jarPath);
- if (p.isAbsolute()) {
- applicationJar = getFixedUri(p.toUri()).toString();
- }
- }
-
- /**
- * Filters out the Spark yarn jar and application jar. Also records
- * spark yarn jar's version.
- *
- * @throws OozieActionConfiguratorException
- */
- public void filter() throws OozieActionConfiguratorException {
- Iterator<URI> iterator = listUris.iterator();
- File matchedFile = null;
- Path applJarPath = new Path(applicationJar);
- while (iterator.hasNext()) {
- URI uri = iterator.next();
- Path p = new Path(uri);
- if (SPARK_YARN_JAR_PATTERN.matcher(p.getName()).find()) {
- matchedFile = getMatchingFile(SPARK_YARN_JAR_PATTERN);
- }
- else if (SPARK_ASSEMBLY_JAR_PATTERN.matcher(p.getName()).find()) {
- matchedFile = getMatchingFile(SPARK_ASSEMBLY_JAR_PATTERN);
- }
- if (matchedFile != null) {
- sparkYarnJar = uri.toString();
- try {
- sparkVersion = getJarVersion(matchedFile);
- System.out.println("Spark Version " + sparkVersion);
- }
- catch (IOException io) {
- System.out.println(
- "Unable to open " + matchedFile.getPath() + ". Default Spark Version " + sparkVersion);
- }
- iterator.remove();
- matchedFile = null;
- }
- // Here we skip the application jar, because
- // (if uris are same,) it will get distributed multiple times
- // - one time with --files and another time as application jar.
- if (isApplicationJar(p.getName(), uri, applJarPath)) {
- String fragment = uri.getFragment();
- applicationJar = fragment != null && fragment.length() > 0 ? fragment : uri.toString();
- iterator.remove();
- }
- }
- }
-
- /**
- * Checks if a file is application jar
- *
- * @param fileName fileName name of the file
- * @param fileUri fileUri URI of the file
- * @param applJarPath Path of application jar
- * @return true if fileName or fileUri is the application jar
- */
- private boolean isApplicationJar(String fileName, URI fileUri, Path applJarPath) {
- return (fileName.equals(applicationJar) || fileUri.toString().equals(applicationJar)
- || applJarPath.getName().equals(fileName)
- || applicationJar.equals(fileUri.getFragment()));
- }
-
- public String getApplicationJar() {
- return applicationJar;
- }
-
- public String getSparkYarnJar() {
- return sparkYarnJar;
- }
-
- public String getSparkVersion() {
- return sparkVersion;
- }
-
- }
-}
+}
\ No newline at end of file
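A small same-package sketch (not part of this commit) of the effect of switching fixFsDefaultUrisAndFilterDuplicates() from HashMap to LinkedHashMap: cache URIs are de-duplicated by file name while their original order is preserved. The URIs are hypothetical; against a plain local default file system they pass through getFixedUri() unchanged.

    // Hypothetical same-package sketch of the de-duplication behaviour.
    package org.apache.oozie.action.hadoop;

    import java.net.URI;
    import java.util.Map;

    public class FixUrisSketch {
        public static void main(String[] args) throws Exception {
            final URI[] cacheFiles = {
                    new URI("hdfs://nn:8020/libs/first.jar"),
                    new URI("hdfs://nn:8020/libs/second.jar"),
                    new URI("hdfs://nn:8020/other/path/first.jar") // duplicate file name
            };
            final Map<String, URI> deduplicated = SparkMain.fixFsDefaultUrisAndFilterDuplicates(cacheFiles);
            // Keys are file names; iteration order follows first insertion.
            System.out.println(deduplicated.keySet()); // [first.jar, second.jar]
        }
    }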
http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkOptionsSplitter.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkOptionsSplitter.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkOptionsSplitter.java
new file mode 100644
index 0000000..30def6f
--- /dev/null
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkOptionsSplitter.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import java.util.ArrayList;
+import java.util.List;
+
+class SparkOptionsSplitter {
+
+ /**
+ * Converts the options to be Spark-compatible.
+ * <ul>
+ * <li>Parameters are separated by whitespace and can be grouped using double quotes</li>
+ * <li>Quotes should be removed</li>
+ * <li>Adjacent whitespace separators are treated as one</li>
+ * </ul>
+ *
+ * @param sparkOpts the options for Spark
+ * @return the options parsed into a list
+ */
+ static List<String> splitSparkOpts(final String sparkOpts) {
+ final List<String> result = new ArrayList<String>();
+ final StringBuilder currentWord = new StringBuilder();
+
+ boolean insideQuote = false;
+ for (int i = 0; i < sparkOpts.length(); i++) {
+ final char c = sparkOpts.charAt(i);
+ if (c == '"') {
+ insideQuote = !insideQuote;
+ }
+ else if (Character.isWhitespace(c) && !insideQuote) {
+ if (currentWord.length() > 0) {
+ result.add(currentWord.toString());
+ currentWord.setLength(0);
+ }
+ }
+ else {
+ currentWord.append(c);
+ }
+ }
+
+ if (currentWord.length() > 0) {
+ result.add(currentWord.toString());
+ }
+
+ return result;
+ }
+}
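A quick demonstration (not part of this commit) of the whitespace and double-quote handling in splitSparkOpts(), assuming the class below sits in the same org.apache.oozie.action.hadoop package, since the method is package-private.

    // Hypothetical same-package demo of the option splitting rules.
    package org.apache.oozie.action.hadoop;

    public class SplitSparkOptsSketch {
        public static void main(String[] args) {
            final String sparkOpts = "--driver-memory 1g "
                    + "--conf spark.executor.extraJavaOptions=\"-XX:+UseG1GC -Dkey=some value\"";
            // Whitespace splits tokens, double quotes group them, and the quotes
            // themselves are dropped:
            // [--driver-memory, 1g, --conf,
            //  spark.executor.extraJavaOptions=-XX:+UseG1GC -Dkey=some value]
            System.out.println(SparkOptionsSplitter.splitSparkOpts(sparkOpts));
        }
    }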
http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestJarFilter.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestJarFilter.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestJarFilter.java
index 2d4c83c..ff1b3ce 100644
--- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestJarFilter.java
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestJarFilter.java
@@ -31,7 +31,6 @@ import java.util.jar.Attributes;
import java.util.jar.JarOutputStream;
import java.util.jar.Manifest;
-import org.apache.oozie.action.hadoop.SparkMain.JarFilter;
import org.junit.Test;
public class TestJarFilter {
http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java
new file mode 100644
index 0000000..7db26a6
--- /dev/null
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import com.google.common.collect.Lists;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestSparkArgsExtractor {
+
+ @Test
+ public void testAppendOoziePropertiesToSparkConf() throws Exception {
+ final List<String> sparkArgs = new ArrayList<>();
+ final Configuration actionConf = new Configuration();
+ actionConf.set("foo", "foo-not-to-include");
+ actionConf.set("oozie.launcher", "launcher-not-to-include");
+ actionConf.set("oozie.spark", "spark-not-to-include");
+ actionConf.set("oozie.bar", "bar");
+
+ new SparkArgsExtractor(actionConf).appendOoziePropertiesToSparkConf(sparkArgs);
+
+ assertEquals(Lists.newArrayList("--conf", "spark.oozie.bar=bar"), sparkArgs);
+ }
+
+ @Test
+ public void testLocalClientArgsParsing() throws Exception {
+ final Configuration actionConf = new Configuration();
+ actionConf.set(SparkActionExecutor.SPARK_MASTER, "local[*]");
+ actionConf.set(SparkActionExecutor.SPARK_MODE, "client");
+ actionConf.set(SparkActionExecutor.SPARK_CLASS, "org.apache.oozie.example.SparkFileCopy");
+ actionConf.set(SparkActionExecutor.SPARK_JOB_NAME, "Spark Copy File");
+ actionConf.set(SparkActionExecutor.SPARK_OPTS, "--driver-memory 1042M " +
+ "--conf spark.executor.extraClassPath=aaa " +
+ "--conf user.property.after.spark.executor.extraClassPath=bbb " +
+ "--conf spark.driver.extraClassPath=ccc " +
+ "--conf user.property.after.spark.driver.extraClassPath=ddd " +
+ "--conf spark.executor.extraJavaOptions=\"-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp\"");
+ actionConf.set(SparkActionExecutor.SPARK_JAR, "/lib/test.jar");
+
+ final String[] mainArgs = {"arg0", "arg1"};
+ final List<String> sparkArgs = new SparkArgsExtractor(actionConf).extract(mainArgs);
+
+ assertEquals("Spark args mismatch",
+ Lists.newArrayList("--master", "local[*]",
+ "--deploy-mode", "client",
+ "--name", "Spark Copy File",
+ "--class", "org.apache.oozie.example.SparkFileCopy",
+ "--driver-memory", "1042M",
+ "--conf", "spark.executor.extraClassPath=aaa",
+ "--conf", "user.property.after.spark.executor.extraClassPath=bbb",
+ "--conf", "spark.driver.extraClassPath=ccc",
+ "--conf", "user.property.after.spark.driver.extraClassPath=ddd",
+ "--conf", "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError " +
+ "-XX:HeapDumpPath=/tmp -Dlog4j.configuration=spark-log4j.properties",
+ "--conf", "spark.yarn.security.tokens.hive.enabled=false",
+ "--conf", "spark.yarn.security.tokens.hbase.enabled=false",
+ "--conf", "spark.driver.extraJavaOptions=-Dlog4j.configuration=spark-log4j.properties",
+ "--verbose",
+ "/lib/test.jar",
+ "arg0",
+ "arg1"),
+ sparkArgs);
+ }
+
+ @Test
+ public void testYarnClientExecutorAndDriverExtraClasspathsArgsParsing() throws Exception {
+ final Configuration actionConf = new Configuration();
+ actionConf.set(SparkActionExecutor.SPARK_MASTER, "yarn");
+ actionConf.set(SparkActionExecutor.SPARK_MODE, "client");
+ actionConf.set(SparkActionExecutor.SPARK_CLASS, "org.apache.oozie.example.SparkFileCopy");
+ actionConf.set(SparkActionExecutor.SPARK_JOB_NAME, "Spark Copy File");
+ actionConf.set(SparkActionExecutor.SPARK_OPTS, "--driver-memory 1042M " +
+ "--conf spark.executor.extraClassPath=aaa " +
+ "--conf user.property.after.spark.executor.extraClassPath=bbb " +
+ "--conf spark.driver.extraClassPath=ccc " +
+ "--conf user.property.after.spark.driver.extraClassPath=ddd " +
+ "--conf spark.executor.extraJavaOptions=\"-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/tmp\"");
+ actionConf.set(SparkActionExecutor.SPARK_JAR, "/lib/test.jar");
+
+ final String[] mainArgs = {"arg0", "arg1"};
+ final List<String> sparkArgs = new SparkArgsExtractor(actionConf).extract(mainArgs);
+
+ assertEquals("Spark args mismatch",
+ Lists.newArrayList("--master", "yarn",
+ "--deploy-mode", "client",
+ "--name", "Spark Copy File",
+ "--class", "org.apache.oozie.example.SparkFileCopy",
+ "--driver-memory", "1042M",
+ "--conf", "user.property.after.spark.executor.extraClassPath=bbb",
+ "--conf", "user.property.after.spark.driver.extraClassPath=ddd",
+ "--conf", "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError " +
+ "-XX:HeapDumpPath=/tmp -Dlog4j.configuration=spark-log4j.properties",
+ "--conf", "spark.executor.extraClassPath=aaa:$PWD/*",
+ "--conf", "spark.driver.extraClassPath=ccc:$PWD/*",
+ "--conf", "spark.yarn.security.tokens.hive.enabled=false",
+ "--conf", "spark.yarn.security.tokens.hbase.enabled=false",
+ "--conf", "spark.driver.extraJavaOptions=-Dlog4j.configuration=spark-log4j.properties",
+ "--files", "spark-log4j.properties,hive-site.xml",
+ "--conf", "spark.yarn.jar=null",
+ "--verbose",
+ "/lib/test.jar",
+ "arg0",
+ "arg1"),
+ sparkArgs);
+ }
+
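+ // Both the "--files aaa" and "--files=ccc" spellings are expected to be recognized and merged,
+ // together with the default spark-log4j.properties and hive-site.xml, into a single --files list;
+ // --archives entries are collected the same way.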
+ @Test
+ public void testYarnClientFilesAndArchivesArgsParsing() throws Exception {
+ final Configuration actionConf = new Configuration();
+ actionConf.set(SparkActionExecutor.SPARK_MASTER, "yarn");
+ actionConf.set(SparkActionExecutor.SPARK_MODE, "client");
+ actionConf.set(SparkActionExecutor.SPARK_CLASS, "org.apache.oozie.example.SparkFileCopy");
+ actionConf.set(SparkActionExecutor.SPARK_JOB_NAME, "Spark Copy File");
+ actionConf.set(SparkActionExecutor.SPARK_OPTS, "--files aaa " +
+ "--archives bbb " +
+ "--files=ccc " +
+ "--archives=ddd");
+ actionConf.set(SparkActionExecutor.SPARK_JAR, "/lib/test.jar");
+
+ final String[] mainArgs = {"arg0", "arg1"};
+ final List<String> sparkArgs = new SparkArgsExtractor(actionConf).extract(mainArgs);
+
+ assertEquals("Spark args mismatch",
+ Lists.newArrayList("--master", "yarn",
+ "--deploy-mode", "client",
+ "--name", "Spark Copy File",
+ "--class", "org.apache.oozie.example.SparkFileCopy",
+ "--conf", "spark.executor.extraClassPath=$PWD/*",
+ "--conf", "spark.driver.extraClassPath=$PWD/*",
+ "--conf", "spark.yarn.security.tokens.hive.enabled=false",
+ "--conf", "spark.yarn.security.tokens.hbase.enabled=false",
+ "--conf", "spark.executor.extraJavaOptions=-Dlog4j.configuration=spark-log4j.properties",
+ "--conf", "spark.driver.extraJavaOptions=-Dlog4j.configuration=spark-log4j.properties",
+ "--files", "spark-log4j.properties,hive-site.xml,aaa,ccc",
+ "--archives", "bbb,ddd",
+ "--conf", "spark.yarn.jar=null",
+ "--verbose",
+ "/lib/test.jar",
+ "arg0",
+ "arg1"),
+ sparkArgs);
+ }
+
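+ // A bare --driver-class-path option is expected to be rewritten into a spark.driver.extraClassPath
+ // --conf entry and merged with $PWD/*.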
+ @Test
+ public void testDriverClassPathArgsParsing() throws Exception {
+ final Configuration actionConf = new Configuration();
+ actionConf.set(SparkActionExecutor.SPARK_MASTER, "yarn");
+ actionConf.set(SparkActionExecutor.SPARK_MODE, "client");
+ actionConf.set(SparkActionExecutor.SPARK_CLASS, "org.apache.oozie.example.SparkFileCopy");
+ actionConf.set(SparkActionExecutor.SPARK_JOB_NAME, "Spark Copy File");
+ actionConf.set(SparkActionExecutor.SPARK_OPTS, "--driver-class-path aaa");
+ actionConf.set(SparkActionExecutor.SPARK_JAR, "/lib/test.jar");
+
+ final String[] mainArgs = {"arg0", "arg1"};
+ final List<String> sparkArgs = new SparkArgsExtractor(actionConf).extract(mainArgs);
+
+ assertEquals("Spark args mismatch",
+ Lists.newArrayList("--master", "yarn",
+ "--deploy-mode", "client",
+ "--name", "Spark Copy File",
+ "--class", "org.apache.oozie.example.SparkFileCopy",
+ "--conf", "spark.executor.extraClassPath=$PWD/*",
+ "--conf", "spark.driver.extraClassPath=aaa:$PWD/*",
+ "--conf", "spark.yarn.security.tokens.hive.enabled=false",
+ "--conf", "spark.yarn.security.tokens.hbase.enabled=false",
+ "--conf", "spark.executor.extraJavaOptions=-Dlog4j.configuration=spark-log4j.properties",
+ "--conf", "spark.driver.extraJavaOptions=-Dlog4j.configuration=spark-log4j.properties",
+ "--files", "spark-log4j.properties,hive-site.xml",
+ "--conf", "spark.yarn.jar=null",
+ "--verbose",
+ "/lib/test.jar",
+ "arg0",
+ "arg1"),
+ sparkArgs);
+ }
+}
\ No newline at end of file
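The testAppendOoziePropertiesToSparkConf case above pins down the forwarding rule for Oozie
action-configuration entries: properties prefixed with "oozie." are propagated to Spark as
"--conf spark.<name>=<value>" pairs, while the "oozie.launcher" and "oozie.spark" namespaces
(and anything without the prefix) are skipped. The following is only a minimal sketch of that
filtering for readers of this patch; the class and method names are hypothetical and it is not
the SparkArgsExtractor implementation itself.

    // Illustrative only: mirrors the behavior asserted in testAppendOoziePropertiesToSparkConf.
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;

    final class OoziePropertyForwardingSketch {
        static void appendOozieProperties(final Configuration actionConf, final List<String> sparkArgs) {
            for (final Map.Entry<String, String> entry : actionConf) {
                final String name = entry.getKey();
                // "oozie.bar" becomes "--conf spark.oozie.bar=bar"; launcher and spark namespaces are dropped
                if (name.startsWith("oozie.")
                        && !name.startsWith("oozie.launcher")
                        && !name.startsWith("oozie.spark")) {
                    sparkArgs.add("--conf");
                    sparkArgs.add("spark." + name + "=" + entry.getValue());
                }
            }
        }
    }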
http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java
index 5f6a0cd..b9f37c8 100644
--- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java
@@ -127,19 +127,6 @@ public class TestSparkMain extends MainTestCase {
}
}
- public void testAppendOoziePropertiesToSparkConf() throws Exception {
- SparkMain instance = SparkMain.class.newInstance();
- List<String> sparkArgs = new ArrayList<String>();
- Configuration actionConf = new Configuration();
- actionConf.set("foo", "foo-not-to-include");
- actionConf.set("oozie.launcher", "launcher-not-to-include");
- actionConf.set("oozie.spark", "spark-not-to-include");
- actionConf.set("oozie.bar", "bar");
-
- instance.appendOoziePropertiesToSparkConf(sparkArgs, actionConf);
- assertEquals(Lists.newArrayList("--conf", "spark.oozie.bar=bar"), sparkArgs);
- }
-
public void testJobIDPattern() {
List<String> lines = new ArrayList<String>();
lines.add("Submitted application application_001");
http://git-wip-us.apache.org/repos/asf/oozie/blob/f45b1879/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkOptionsSplitter.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkOptionsSplitter.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkOptionsSplitter.java
index 31f53ac..02786a2 100644
--- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkOptionsSplitter.java
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkOptionsSplitter.java
@@ -34,13 +34,13 @@ public class TestSparkOptionsSplitter {
@Parameterized.Parameters
public static List<Object[]> params() {
return Arrays.asList(new Object[][]{
- {"--option1 value1", Arrays.asList(new String[]{"--option1", "value1"})},
- {"--option1 value1", Arrays.asList(new String[]{"--option1", "value1"})},
- {" --option1 value1 ", Arrays.asList(new String[]{"--option1", "value1"})},
- {"--conf special=value1", Arrays.asList(new String[]{"--conf", "special=value1"})},
- {"--conf special=\"value1\"", Arrays.asList(new String[]{"--conf", "special=value1"})},
- {"--conf special=\"value1 value2\"", Arrays.asList(new String[]{"--conf", "special=value1 value2"})},
- {" --conf special=\"value1 value2\" ", Arrays.asList(new String[]{"--conf", "special=value1 value2"})},
+ {"--option1 value1", Arrays.asList("--option1", "value1")},
+ {"--option1 value1", Arrays.asList("--option1", "value1")},
+ {" --option1 value1 ", Arrays.asList("--option1", "value1")},
+ {"--conf special=value1", Arrays.asList("--conf", "special=value1")},
+ {"--conf special=\"value1\"", Arrays.asList("--conf", "special=value1")},
+ {"--conf special=\"value1 value2\"", Arrays.asList("--conf", "special=value1 value2")},
+ {" --conf special=\"value1 value2\" ", Arrays.asList("--conf", "special=value1 value2")},
});
}
@@ -55,6 +55,6 @@ public class TestSparkOptionsSplitter {
@Test
public void test() {
- assertThat("Error for input >>" + input + "<<", SparkMain.splitSparkOpts(input), is(output));
+ assertThat("Error for input >>" + input + "<<", SparkOptionsSplitter.splitSparkOpts(input), is(output));
}
}
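The parameterized cases above fix the contract of SparkOptionsSplitter.splitSparkOpts:
options are split on whitespace, leading and trailing blanks are ignored, and a double-quoted
value keeps its inner spaces while the quotes themselves are stripped. A self-contained sketch
that satisfies exactly these cases is shown below; it assumes quotes never nest and is an
illustration of the expected behavior, not the shared-library implementation.

    // Illustrative splitter matching the test expectations above; not the actual SparkOptionsSplitter.
    import java.util.ArrayList;
    import java.util.List;

    final class SparkOptsSplitSketch {
        static List<String> split(final String sparkOpts) {
            final List<String> result = new ArrayList<>();
            final StringBuilder current = new StringBuilder();
            boolean insideQuotes = false;
            for (final char c : sparkOpts.toCharArray()) {
                if (c == '"') {
                    insideQuotes = !insideQuotes;          // drop the quote, toggle quoted state
                }
                else if (Character.isWhitespace(c) && !insideQuotes) {
                    if (current.length() > 0) {            // token boundary outside quotes
                        result.add(current.toString());
                        current.setLength(0);
                    }
                }
                else {
                    current.append(c);
                }
            }
            if (current.length() > 0) {
                result.add(current.toString());
            }
            return result;
        }
    }

For example, split("--conf special=\"value1 value2\"") yields ["--conf", "special=value1 value2"],
matching the last two rows of the parameter table.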