You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by an...@apache.org on 2018/05/16 14:21:42 UTC
oozie git commit: OOZIE-3228 [Spark action] Can't load properties
from spark-defaults.conf (andras.piros)
Repository: oozie
Updated Branches:
refs/heads/master 438ba6df7 -> 5b21530aa
OOZIE-3228 [Spark action] Can't load properties from spark-defaults.conf (andras.piros)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/5b21530a
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/5b21530a
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/5b21530a
Branch: refs/heads/master
Commit: 5b21530aaee04f7f3af8e43af7f9ccd01935f589
Parents: 438ba6d
Author: Andras Piros <an...@cloudera.com>
Authored: Wed May 16 16:19:36 2018 +0200
Committer: Andras Piros <an...@cloudera.com>
Committed: Wed May 16 16:19:36 2018 +0200
----------------------------------------------------------------------
.../site/twiki/DG_SparkActionExtension.twiki | 17 ++++--
release-log.txt | 1 +
.../oozie/action/hadoop/SparkArgsExtractor.java | 58 +++++++++++++++++++-
.../action/hadoop/TestSparkArgsExtractor.java | 53 +++++++++++++++++-
4 files changed, 121 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/5b21530a/docs/src/site/twiki/DG_SparkActionExtension.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_SparkActionExtension.twiki b/docs/src/site/twiki/DG_SparkActionExtension.twiki
index 66076ff..ce80e45 100644
--- a/docs/src/site/twiki/DG_SparkActionExtension.twiki
+++ b/docs/src/site/twiki/DG_SparkActionExtension.twiki
@@ -109,10 +109,10 @@ Some examples of the =spark-opts= element:
* '--conf key=value --verbose --properties-file user.properties'
There are several ways to define properties that will be passed to Spark. They are processed in the following order:
- * propagated from =oozie.service.SparkConfigurationService.spark.configurations=
- * read from a localized =spark-defaults.conf= file
- * read from a file defined in =spark-opts= via the =--properties-file=
- * properties defined in =spark-opts= element
+ * propagated from =oozie.service.SparkConfigurationService.spark.configurations=
+ * read from a localized =spark-defaults.conf= file
+ * read from a file defined in =spark-opts= via the =--properties-file=
+ * properties defined in =spark-opts= element
(Properties defined later in this list take precedence over those defined earlier.)
The server propagated properties, the =spark-defaults.conf= and the user-defined properties file are merged together into a
@@ -120,6 +120,15 @@ single properties file as Spark handles only one file in its =--properties-file=
The =arg= element if present, contains arguments that can be passed to spark application.
+In case some property values are present both in =spark-defaults.conf= and as property key/value pairs generated by Oozie, the user
+configured values from =spark-defaults.conf= are prepended to the ones generated by Oozie, as part of the Spark arguments list.
+
+The following properties are prepended to Spark arguments:
+ * =spark.executor.extraClassPath=
+ * =spark.driver.extraClassPath=
+ * =spark.executor.extraJavaOptions=
+ * =spark.driver.extraJavaOptions=
+
All the above elements can be parameterized (templatized) using EL
expressions.
http://git-wip-us.apache.org/repos/asf/oozie/blob/5b21530a/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 267af2a..a89b714 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 5.1.0 release (trunk - unreleased)
+OOZIE-3228 [Spark action] Can't load properties from spark-defaults.conf (andras.piros)
OOZIE-3250 Reduce heap waste by reducing duplicate byte[] count (andras.piros)
OOZIE-3240 Flaky test TestJMSAccessorService#testConnectionRetry (pbacsko via gezapeti)
OOZIE-3246 Flaky test TestJMSJobEventListener#testConnectionDrop (pbacsko via gezapeti)
http://git-wip-us.apache.org/repos/asf/oozie/blob/5b21530a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
index 5af39cd..28d9c5c 100644
--- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkArgsExtractor.java
@@ -42,6 +42,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.ListIterator;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
@@ -76,6 +77,8 @@ class SparkArgsExtractor {
private static final String SPARK_YARN_JARS = "spark.yarn.jars";
private static final String OPT_SEPARATOR = "=";
private static final String OPT_VALUE_SEPARATOR = ",";
+ private static final String SPARK_OPT_SEPARATOR = ":";
+ private static final String JAVA_OPT_SEPARATOR = " ";
private static final String CONF_OPTION = "--conf";
private static final String MASTER_OPTION_YARN_CLUSTER = "yarn-cluster";
private static final String MASTER_OPTION_YARN_CLIENT = "yarn-client";
@@ -84,7 +87,7 @@ class SparkArgsExtractor {
private static final String DEPLOY_MODE_CLIENT = "client";
private static final String SPARK_YARN_TAGS = "spark.yarn.tags";
private static final String OPT_PROPERTIES_FILE = "--properties-file";
- public static final String SPARK_DEFAULTS_GENERATED_PROPERTIES = "spark-defaults-oozie-generated.properties";
+ static final String SPARK_DEFAULTS_GENERATED_PROPERTIES = "spark-defaults-oozie-generated.properties";
private boolean pySpark = false;
private final Configuration actionConf;
@@ -351,6 +354,8 @@ class SparkArgsExtractor {
if (persisted) {
sparkArgs.add(OPT_PROPERTIES_FILE);
sparkArgs.add(SPARK_DEFAULTS_GENERATED_PROPERTIES);
+
+ checkPropertiesAndPrependArgs(properties, sparkArgs);
}
}
@@ -371,6 +376,57 @@ class SparkArgsExtractor {
return false;
}
+ /**
+ * In case some property values are present both in {@code spark-defaults.conf} and as property key/value pairs generated by
+ * Oozie, prepend the user configured values from {@code spark-defaults.conf} to the ones generated by Oozie, as part of the
+ * Spark arguments list. Users don't want to lose the configured values, and we don't want to lose the generated ones, either.
+ * <p>
+ * The following properties are prepended to Spark arguments:
+ * <ul>
+ * <li>{@code spark.executor.extraClassPath}</li>
+ * <li>{@code spark.driver.extraClassPath}</li>
+ * <li>{@code spark.executor.extraJavaOptions}</li>
+ * <li>{@code spark.driver.extraJavaOptions}</li>
+ * </ul>
+ * @param source {@link Properties} defined in {@code spark-defaults.conf} by the user
+ * @param target Spark options
+ */
+ private void checkPropertiesAndPrependArgs(final Properties source, final List<String> target) {
+ checkPropertiesAndPrependArg(EXECUTOR_CLASSPATH, SPARK_OPT_SEPARATOR, source, target);
+ checkPropertiesAndPrependArg(DRIVER_CLASSPATH, SPARK_OPT_SEPARATOR, source, target);
+ checkPropertiesAndPrependArg(EXECUTOR_EXTRA_JAVA_OPTIONS, JAVA_OPT_SEPARATOR, source, target);
+ checkPropertiesAndPrependArg(DRIVER_EXTRA_JAVA_OPTIONS, JAVA_OPT_SEPARATOR, source, target);
+ }
+
+ /**
+ * Prepend one user-defined property value from {@code spark-defaults.conf} to the Oozie-generated value, and store to
+ * Spark options.
+ * @param key key of the user defined property key/value pair
+ * @param separator user defined and generated values must be separated, depending on the context
+ * @param source {@link Properties} defined in {@code spark-defaults.conf} by the user
+ * @param target Spark options
+ */
+ private void checkPropertiesAndPrependArg(final String key,
+ final String separator,
+ final Properties source,
+ final List<String> target) {
+ final String propertiesKey = key.replace(OPT_SEPARATOR, "");
+ if (source.containsKey(propertiesKey)) {
+ final ListIterator<String> targetIterator = target.listIterator();
+ while (targetIterator.hasNext()) {
+ final String arg = targetIterator.next();
+ if (arg.startsWith(key)) {
+ final String valueToPrepend = source.getProperty(propertiesKey);
+ final String oldValue = arg.substring(arg.indexOf(key) + key.length());
+ String newValue = valueToPrepend + separator + oldValue;
+ System.out.println(String.format("Spark argument to replace: [%s=%s]", propertiesKey, oldValue));
+ targetIterator.set(key + newValue);
+ System.out.println(String.format("Spark argument replaced with: [%s=%s]", propertiesKey, newValue));
+ }
+ }
+ }
+ }
+
private void loadUserDefinedPropertiesFile(final String userDefinedPropertiesFile, final Properties properties) {
if (userDefinedPropertiesFile != null) {
System.out.println(String.format("Reading Spark config from %s %s...", OPT_PROPERTIES_FILE, userDefinedPropertiesFile));
http://git-wip-us.apache.org/repos/asf/oozie/blob/5b21530a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java
index 6a9baa5..d75e727 100644
--- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkArgsExtractor.java
@@ -22,12 +22,12 @@ import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.junit.After;
import org.junit.Test;
-import scala.util.PropertiesTrait;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
@@ -37,6 +37,8 @@ import static org.junit.Assert.assertEquals;
public class TestSparkArgsExtractor {
+ private static final String SPARK_DEFAULTS_PROPERTIES = "spark-defaults.conf";
+
@Test
public void testAppendOoziePropertiesToSparkConf() throws Exception {
final List<String> sparkArgs = new ArrayList<>();
@@ -307,6 +309,46 @@ public class TestSparkArgsExtractor {
sparkArgs);
}
+ @Test
+ public void testPropertiesArePrependedToSparkArgs() throws IOException, OozieActionConfiguratorException, URISyntaxException {
+ final Configuration actionConf = new Configuration();
+ actionConf.set(SparkActionExecutor.SPARK_MASTER, "yarn");
+ actionConf.set(SparkActionExecutor.SPARK_MODE, "client");
+ actionConf.set(SparkActionExecutor.SPARK_CLASS, "org.apache.oozie.example.SparkFileCopy");
+ actionConf.set(SparkActionExecutor.SPARK_JOB_NAME, "Spark Copy File");
+ actionConf.set(SparkActionExecutor.SPARK_JAR, "/lib/test.jar");
+
+ createTemporaryFileWithContent(SPARK_DEFAULTS_PROPERTIES, "spark.executor.extraClassPath=/etc/hbase/conf:/etc/hive/conf\n" +
+ "spark.driver.extraClassPath=/etc/hbase/conf:/etc/hive/conf\n" +
+ "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGC -XX:+UnlockExperimentalVMOptions\n" +
+ "spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGC -XX:+UnlockExperimentalVMOptions");
+
+ final List<String> sparkArgs = new SparkArgsExtractor(actionConf).extract(new String[0]);
+
+ assertEquals("Spark args mismatch",
+ Lists.newArrayList("--master", "yarn",
+ "--deploy-mode", "client",
+ "--name", "Spark Copy File",
+ "--class", "org.apache.oozie.example.SparkFileCopy",
+ "--conf", "spark.executor.extraClassPath=/etc/hbase/conf:/etc/hive/conf:$PWD/*",
+ "--conf", "spark.driver.extraClassPath=/etc/hbase/conf:/etc/hive/conf:$PWD/*",
+ "--conf", "spark.yarn.security.tokens.hadoopfs.enabled=false",
+ "--conf", "spark.yarn.security.tokens.hive.enabled=false",
+ "--conf", "spark.yarn.security.tokens.hbase.enabled=false",
+ "--conf", "spark.yarn.security.credentials.hadoopfs.enabled=false",
+ "--conf", "spark.yarn.security.credentials.hive.enabled=false",
+ "--conf", "spark.yarn.security.credentials.hbase.enabled=false",
+ "--conf", "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGC -XX:+UnlockExperimentalVMOptions " +
+ "-Dlog4j.configuration=spark-log4j.properties",
+ "--conf", "spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:+PrintGC -XX:+UnlockExperimentalVMOptions " +
+ "-Dlog4j.configuration=spark-log4j.properties",
+ "--properties-file", "spark-defaults-oozie-generated.properties",
+ "--files", "spark-log4j.properties,hive-site.xml",
+ "--conf", "spark.yarn.jar=null",
+ "--verbose", "/lib/test.jar"),
+ sparkArgs);
+ }
+
private Properties readMergedProperties() throws IOException {
final File file = new File(SPARK_DEFAULTS_GENERATED_PROPERTIES);
file.deleteOnExit();
@@ -327,8 +369,13 @@ public class TestSparkArgsExtractor {
@After
public void cleanUp() throws Exception {
- File f = new File("spark-defaults.conf");
- if(f.exists()) {
+ checkAndDeleteFile(SPARK_DEFAULTS_GENERATED_PROPERTIES);
+ checkAndDeleteFile(SPARK_DEFAULTS_PROPERTIES);
+ }
+
+ private void checkAndDeleteFile(final String filename) {
+ final File f = new File(filename);
+ if (f.exists()) {
f.delete();
}
}