You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by rk...@apache.org on 2015/09/01 01:33:51 UTC
oozie git commit: OOZIE-2277 Honor oozie.action.sharelib.for.spark in
Spark jobs (rkanter)
Repository: oozie
Updated Branches:
refs/heads/master 6b61d878e -> 5a598039a
OOZIE-2277 Honor oozie.action.sharelib.for.spark in Spark jobs (rkanter)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/5a598039
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/5a598039
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/5a598039
Branch: refs/heads/master
Commit: 5a598039a8aeabcf46d0f1d13542c94ff9b8475f
Parents: 6b61d87
Author: Robert Kanter <rk...@cloudera.com>
Authored: Mon Aug 31 16:32:47 2015 -0700
Committer: Robert Kanter <rk...@cloudera.com>
Committed: Mon Aug 31 16:32:47 2015 -0700
----------------------------------------------------------------------
.../service/SparkConfigurationService.java | 13 +-
core/src/main/resources/oozie-default.xml | 10 ++
.../service/TestSparkConfigurationService.java | 12 ++
.../site/twiki/DG_SparkActionExtension.twiki | 17 +--
release-log.txt | 1 +
sharelib/spark/pom.xml | 142 +++++++++++++++++--
.../SparkMain.java | 132 ++++++++++++++++-
.../action/hadoop/TestSparkActionExecutor.java | 18 ++-
8 files changed, 310 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/5a598039/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java b/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java
index 1b7cf4a..b29ab8d 100644
--- a/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java
+++ b/core/src/main/java/org/apache/oozie/service/SparkConfigurationService.java
@@ -36,10 +36,14 @@ public class SparkConfigurationService implements Service {
private static XLog LOG = XLog.getLog(SparkConfigurationService.class);
- public static final String SPARK_CONFIGURATION = "oozie.service.SparkConfigurationService.spark.configurations";
+ public static final String CONF_PREFIX = Service.CONF_PREFIX + "SparkConfigurationService.";
+ public static final String SPARK_CONFIGURATIONS = CONF_PREFIX + "spark.configurations";
+ public static final String SPARK_CONFIGURATIONS_IGNORE_SPARK_YARN_JAR
+ = CONF_PREFIX + "spark.configurations.ignore.spark.yarn.jar";
private Map<String, Map<String, String>> sparkConfigs;
private static final String SPARK_CONFIG_FILE = "spark-defaults.conf";
+ private static final String SPARK_YARN_JAR_PROP = "spark.yarn.jar";
@Override
public void init(Services services) throws ServiceException {
@@ -59,8 +63,9 @@ public class SparkConfigurationService implements Service {
private void loadSparkConfigs() throws ServiceException {
sparkConfigs = new HashMap<String, Map<String, String>>();
File configDir = new File(ConfigurationService.getConfigurationDirectory());
- String[] confDefs = ConfigurationService.getStrings(SPARK_CONFIGURATION);
+ String[] confDefs = ConfigurationService.getStrings(SPARK_CONFIGURATIONS);
if (confDefs != null) {
+ boolean ignoreSparkYarnJar = ConfigurationService.getBoolean(SPARK_CONFIGURATIONS_IGNORE_SPARK_YARN_JAR);
for (String confDef : confDefs) {
if (confDef.trim().length() > 0) {
String[] parts = confDef.split("=");
@@ -80,6 +85,10 @@ public class SparkConfigurationService implements Service {
fr = new FileReader(file);
props.load(fr);
fr.close();
+ if (ignoreSparkYarnJar) {
+ // Ignore spark.yarn.jar because it may interfere with the Spark Sharelib jars
+ props.remove(SPARK_YARN_JAR_PROP);
+ }
sparkConfigs.put(hostPort, propsToMap(props));
LOG.info("Loaded Spark Configuration: {0}={1}", hostPort, file.getAbsolutePath());
} catch (IOException ioe) {
http://git-wip-us.apache.org/repos/asf/oozie/blob/5a598039/core/src/main/resources/oozie-default.xml
----------------------------------------------------------------------
diff --git a/core/src/main/resources/oozie-default.xml b/core/src/main/resources/oozie-default.xml
index fa7754a..8a0bc3b 100644
--- a/core/src/main/resources/oozie-default.xml
+++ b/core/src/main/resources/oozie-default.xml
@@ -2567,6 +2567,16 @@
</property>
<property>
+ <name>oozie.service.SparkConfigurationService.spark.configurations.ignore.spark.yarn.jar</name>
+ <value>true</value>
+ <description>
+ If true, Oozie will ignore the "spark.yarn.jar" property from any Spark configurations specified in
+ oozie.service.SparkConfigurationService.spark.configurations. If false, Oozie will not ignore it. It is recommended
+ to leave this as true because "spark.yarn.jar" can interfere with the jars in the Spark sharelib.
+ </description>
+ </property>
+
+ <property>
<name>oozie.email.attachment.enabled</name>
<value>true</value>
<description>
http://git-wip-us.apache.org/repos/asf/oozie/blob/5a598039/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java b/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java
index b2c499d..9d82fdc 100644
--- a/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestSparkConfigurationService.java
@@ -68,6 +68,7 @@ public class TestSparkConfigurationService extends XTestCase {
Properties sparkConf1Props = new Properties();
sparkConf1Props.setProperty("a", "A");
sparkConf1Props.setProperty("b", "B");
+ sparkConf1Props.setProperty("spark.yarn.jar", "foo"); // should be ignored by default
FileOutputStream fos = null;
try {
fos = new FileOutputStream(sparkConf1);
@@ -109,6 +110,17 @@ public class TestSparkConfigurationService extends XTestCase {
assertEquals(2, sparkConfigs.size());
assertEquals("Y", sparkConfigs.get("y"));
assertEquals("Z", sparkConfigs.get("z"));
+ scs.destroy();
+ // Setting this to false should make it not ignore spark.yarn.jar
+ ConfigurationService.setBoolean("oozie.service.SparkConfigurationService.spark.configurations.ignore.spark.yarn.jar",
+ false);
+ scs.init(Services.get());
+ sparkConfigs = scs.getSparkConfig("rm1");
+ assertEquals(3, sparkConfigs.size());
+ assertEquals("A", sparkConfigs.get("a"));
+ assertEquals("B", sparkConfigs.get("b"));
+ assertEquals("foo", sparkConfigs.get("spark.yarn.jar"));
+ ConfigurationService.setBoolean("oozie.service.SparkConfigurationService.spark.configurations.ignore.spark.yarn.jar", true);
scs.destroy();
ConfigurationService.set("oozie.service.SparkConfigurationService.spark.configurations",
http://git-wip-us.apache.org/repos/asf/oozie/blob/5a598039/docs/src/site/twiki/DG_SparkActionExtension.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_SparkActionExtension.twiki b/docs/src/site/twiki/DG_SparkActionExtension.twiki
index 32ebe12..ee11229 100644
--- a/docs/src/site/twiki/DG_SparkActionExtension.twiki
+++ b/docs/src/site/twiki/DG_SparkActionExtension.twiki
@@ -82,7 +82,14 @@ properties that are passed to the Spark job.
The =master= element indicates the url of the Spark Master. Ex: spark://host:port, mesos://host:port, yarn-cluster, yarn-master,
or local.
-The =mode= element if present indicates the mode of spark, where to run spark driver program. Ex: client,cluster.
+The =mode= element if present indicates the mode of spark, where to run spark driver program. Ex: client, cluster. This is typically
+not required because you can specify it as part of =master= (e.g. master=yarn, mode=client is equivalent to master=yarn-client).
+A local =master= always runs in client mode.
+
+Depending on the =master= (and =mode=) entered, the Spark job will run differently as follows:
+ * local mode: everything runs here in the Launcher Job.
+ * yarn-client mode: the driver runs here in the Launcher Job and the executor in Yarn.
+ * yarn-cluster mode: the driver and executor run in Yarn.
The =name= element indicates the name of the spark application.
@@ -142,14 +149,8 @@ to navigate to the Oozie Launcher map-reduce job task logs via the Hadoop job-tr
---+++ Spark on YARN
-To make the Spark action run on YARN, you need to follow these steps:
-
-1. Make the spark-assembly jar available to your Spark action
-
-2. Specify "yarn-client" or "yarn-cluster" in the =master= element
-
To ensure that your Spark job shows up in the Spark History Server, make sure to specify these three Spark configuration properties
-either in =spark-opts= with =--conf= or from =oozie.service.SparkConfigurationService.spark.configurations=
+either in =spark-opts= with =--conf= or from =oozie.service.SparkConfigurationService.spark.configurations= in oozie-site.xml.
1. spark.yarn.historyServer.address=http://SPH-HOST:18088
http://git-wip-us.apache.org/repos/asf/oozie/blob/5a598039/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index ce62dc0..a7c180b 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.3.0 release (trunk - unreleased)
+OOZIE-2277 Honor oozie.action.sharelib.for.spark in Spark jobs (rkanter)
OOZIE-2322 Oozie Web UI doesn't work with Kerberos in Internet Explorer 10 or 11 and curl (rkanter)
OOZIE-2343 Shell Action should take Oozie Action config and setup HADOOP_CONF_DIR (rkanter)
OOZIE-2245 Service to periodically check database schema (rkanter)
http://git-wip-us.apache.org/repos/asf/oozie/blob/5a598039/sharelib/spark/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/spark/pom.xml b/sharelib/spark/pom.xml
index 6f7e74a..0878ae6 100644
--- a/sharelib/spark/pom.xml
+++ b/sharelib/spark/pom.xml
@@ -68,41 +68,76 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-graphx_2.10</artifactId>
+ <version>${spark.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-hive_2.10</artifactId>
+ <version>${spark.version}</version>
+ <scope>compile</scope>
+ <exclusions>
<exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-web-proxy</artifactId>
</exclusion>
<exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>jul-to-slf4j</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId>
</exclusion>
<exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>jcl-over-slf4j</artifactId>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-resourcemanager</artifactId>
</exclusion>
</exclusions>
</dependency>
-
<dependency>
- <groupId>org.apache.oozie</groupId>
- <artifactId>oozie-core</artifactId>
- <classifier>tests</classifier>
- <scope>test</scope>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-mllib_2.10</artifactId>
+ <version>${spark.version}</version>
+ <scope>compile</scope>
</dependency>
-
<dependency>
<groupId>org.apache.spark</groupId>
- <artifactId>spark-graphx_2.10</artifactId>
+ <artifactId>spark-repl_2.10</artifactId>
+ <version>${spark.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-sql_2.10</artifactId>
<version>${spark.version}</version>
<scope>compile</scope>
</dependency>
-
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>${spark.version}</version>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-flume_2.10</artifactId>
+ <version>${spark.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-kafka_2.10</artifactId>
+ <version>${spark.version}</version>
+ <scope>compile</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-bagel_2.10</artifactId>
+ <version>${spark.version}</version>
+ <scope>compile</scope>
+ </dependency>
<dependency>
<groupId>org.apache.oozie</groupId>
@@ -117,8 +152,16 @@
</dependency>
<dependency>
+ <groupId>org.apache.oozie</groupId>
+ <artifactId>oozie-core</artifactId>
+ <classifier>tests</classifier>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-minicluster</artifactId>
+ <scope>test</scope>
</dependency>
<dependency>
@@ -205,5 +248,76 @@
</plugins>
</build>
+ <profiles>
+ <profile>
+ <id>hadoop-2</id>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-yarn_2.10</artifactId>
+ <version>${spark.version}</version>
+ <scope>compile</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-client</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-yarn-server-web-proxy</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-app</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-auth</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
</project>
http://git-wip-us.apache.org/repos/asf/oozie/blob/5a598039/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java b/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java
index b18a0b9..5624951 100644
--- a/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java
+++ b/sharelib/spark/src/main/java/org.apache.oozie.action.hadoop/SparkMain.java
@@ -22,7 +22,9 @@ import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.deploy.SparkSubmit;
+import java.io.File;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
public class SparkMain extends LauncherMain {
@@ -31,8 +33,14 @@ public class SparkMain extends LauncherMain {
private static final String JOB_NAME_OPTION = "--name";
private static final String CLASS_NAME_OPTION = "--class";
private static final String VERBOSE_OPTION = "--verbose";
- private static final String DELIM = " ";
+ private static final String EXECUTOR_CLASSPATH = "spark.executor.extraClassPath=";
+ private static final String DRIVER_CLASSPATH = "spark.driver.extraClassPath=";
+ private static final String DIST_FILES = "spark.yarn.dist.files=";
+ private static final String JARS_OPTION = "--jars";
+ private static final String DELIM = "\\s+";
+ private String sparkJars = null;
+ private String sparkClasspath = null;
public static void main(String[] args) throws Exception {
run(SparkMain.class, args);
@@ -47,13 +55,18 @@ public class SparkMain extends LauncherMain {
List<String> sparkArgs = new ArrayList<String>();
sparkArgs.add(MASTER_OPTION);
- sparkArgs.add(actionConf.get(SparkActionExecutor.SPARK_MASTER));
+ String master = actionConf.get(SparkActionExecutor.SPARK_MASTER);
+ sparkArgs.add(master);
String sparkDeployMode = actionConf.get(SparkActionExecutor.SPARK_MODE);
if (sparkDeployMode != null) {
sparkArgs.add(MODE_OPTION);
sparkArgs.add(sparkDeployMode);
}
+ boolean yarnClusterMode = master.equals("yarn-cluster")
+ || (master.equals("yarn") && sparkDeployMode != null && sparkDeployMode.equals("cluster"));
+ boolean yarnClientMode = master.equals("yarn-client")
+ || (master.equals("yarn") && sparkDeployMode != null && sparkDeployMode.equals("client"));
sparkArgs.add(JOB_NAME_OPTION);
sparkArgs.add(actionConf.get(SparkActionExecutor.SPARK_JOB_NAME));
@@ -64,19 +77,87 @@ public class SparkMain extends LauncherMain {
sparkArgs.add(className);
}
+ String jarPath = actionConf.get(SparkActionExecutor.SPARK_JAR);
+
+ // In local mode, everything runs here in the Launcher Job.
+ // In yarn-client mode, the driver runs here in the Launcher Job and the executor in Yarn.
+ // In yarn-cluster mode, the driver and executor run in Yarn.
+ // Due to this, configuring Spark's classpath is not straightforward (see below)
+
+ // Parse the spark opts. We need to make sure to pass the necessary jars (Hadoop, Spark, user) to Spark. The way we do
+ // this depends on what mode is being used:
+ // local/yarn-cluster/yarn-client: Passed as comma-separated list via --jars argument
+ // yarn-cluster/yarn-client: Passed as ':'-separated list via spark.executor.extraClassPath and spark.driver.extraClassPath
+ // yarn-client: Passed as comma-separated list via spark.yarn.dist.files
+ //
+ // --jars will cause the jars to be uploaded to HDFS and localized. To prevent the Sharelib and user jars from being
+ // unnecessarily reuploaded to HDFS, we use the HDFS paths for these. The hadoop jars are needed as well, but we'll have
+ // to use local paths for these because they're not in the Sharelib.
+ //
+ // spark.executor.extraClassPath and spark.driver.extraClassPath are blindly used as classpaths, so we need to put only
+ // localized jars. --jars will cause the jars to be localized to the working directory, so we can simply specify the jar
+ // names for the classpaths, as they'll be found in the working directory.
+ //
+ // This part looks more complicated than it is because we need to append the jars if the user already set things for
+ // these options
+ determineSparkJarsAndClasspath(actionConf, jarPath);
+ boolean addedExecutorClasspath = false;
+ boolean addedDriverClasspath = false;
+ boolean addedDistFiles = false;
+ boolean addedJars = false;
String sparkOpts = actionConf.get(SparkActionExecutor.SPARK_OPTS);
if (StringUtils.isNotEmpty(sparkOpts)) {
String[] sparkOptions = sparkOpts.split(DELIM);
- for (String opt : sparkOptions) {
+ for (int i = 0; i < sparkOptions.length; i++) {
+ String opt = sparkOptions[i];
+ if (sparkJars != null) {
+ if (opt.equals(JARS_OPTION)) {
+ sparkArgs.add(opt);
+ i++;
+ opt = sparkOptions[i];
+ opt = opt + "," + sparkJars;
+ addedJars = true;
+ } else if (yarnClientMode && opt.startsWith(DIST_FILES)) {
+ opt = opt + "," + sparkJars;
+ addedDistFiles = true;
+ }
+ }
+ if ((yarnClusterMode || yarnClientMode) && sparkClasspath != null) {
+ if (opt.startsWith(EXECUTOR_CLASSPATH)) {
+ opt = opt + File.pathSeparator + sparkClasspath;
+ addedExecutorClasspath = true;
+ }
+ if (opt.startsWith(DRIVER_CLASSPATH)) {
+ opt = opt + File.pathSeparator + sparkClasspath;
+ addedDriverClasspath = true;
+ }
+ }
sparkArgs.add(opt);
}
}
+ if (!addedJars && sparkJars != null) {
+ sparkArgs.add("--jars");
+ sparkArgs.add(sparkJars);
+ }
+ if ((yarnClusterMode || yarnClientMode) && sparkClasspath != null) {
+ if (!addedExecutorClasspath) {
+ sparkArgs.add("--conf");
+ sparkArgs.add(EXECUTOR_CLASSPATH + sparkClasspath);
+ }
+ if (!addedDriverClasspath) {
+ sparkArgs.add("--conf");
+ sparkArgs.add(DRIVER_CLASSPATH + sparkClasspath);
+ }
+ }
+ if (yarnClientMode && !addedDistFiles && sparkJars != null) {
+ sparkArgs.add("--conf");
+ sparkArgs.add(DIST_FILES + sparkJars);
+ }
if (!sparkArgs.contains(VERBOSE_OPTION)) {
sparkArgs.add(VERBOSE_OPTION);
}
- String jarPath = actionConf.get(SparkActionExecutor.SPARK_JAR);
sparkArgs.add(jarPath);
for (String arg : args) {
@@ -103,4 +184,47 @@ public class SparkMain extends LauncherMain {
System.out.flush();
SparkSubmit.main(args);
}
+
+ private void determineSparkJarsAndClasspath(Configuration actionConf, String jarPath) {
+ // Fills in the sparkJars (comma-separated) and sparkClasspath (File.pathSeparator-separated) fields.
+ // distCache: the Sharelib and user jars, read from "mapreduce.job.classpath.files" (the Distributed Cache).
+ // classpath: the entries of java.class.path, which include the localized copies of the distCache jars.
+ // sparkJars: the full distCache paths plus the classpath jars whose names are not already in distCache.
+ // sparkClasspath: file names only (no directories). The Spark job jar (jarPath) and job.jar are left out.
+ String[] distCache = new String[]{};
+ String dCache = actionConf.get("mapreduce.job.classpath.files");
+ if (dCache != null) {
+ distCache = dCache.split(",");
+ }
+ String[] classpath = System.getProperty("java.class.path").split(File.pathSeparator);
+ StringBuilder cp = new StringBuilder();
+ StringBuilder jars = new StringBuilder();
+ HashSet<String> distCacheJars = new HashSet<String>(distCache.length); // jar file names seen in distCache
+ for (String path : distCache) {
+ // Skip the job jar because it's already included elsewhere and Spark doesn't like duplicating it here
+ if (!path.equals(jarPath)) {
+ String name = path.substring(path.lastIndexOf("/") + 1); // file name: everything after the last '/'
+ distCacheJars.add(name);
+ cp.append(name).append(File.pathSeparator);
+ jars.append(path).append(",");
+ }
+ }
+ for (String path : classpath) {
+ if (!path.startsWith("job.jar") && path.endsWith(".jar")) { // only jar entries, and nothing under job.jar
+ String name = path.substring(path.lastIndexOf("/") + 1);
+ if (!distCacheJars.contains(name)) { // a jar with this name was already added from distCache
+ jars.append(path).append(",");
+ }
+ cp.append(name).append(File.pathSeparator); // appended even when the name came from distCache, so names can repeat
+ }
+ }
+ if (cp.length() > 0) { // when nothing was collected, sparkClasspath keeps its previous value
+ cp.setLength(cp.length() - 1); // drop the trailing separator
+ sparkClasspath = cp.toString();
+ }
+ if (jars.length() > 0) { // when nothing was collected, sparkJars keeps its previous value
+ jars.setLength(jars.length() - 1); // drop the trailing comma
+ sparkJars = jars.toString();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/5a598039/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
index f271abc..dcd2360 100644
--- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkActionExecutor.java
@@ -67,8 +67,11 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase {
}
public void testSetupMethods() throws Exception {
- _testSetupMethods("local[*]", new HashMap<String, String>());
- _testSetupMethods("yarn", new HashMap<String, String>());
+ _testSetupMethods("local[*]", new HashMap<String, String>(), "client");
+ _testSetupMethods("yarn", new HashMap<String, String>(), "cluster");
+ _testSetupMethods("yarn", new HashMap<String, String>(), "client");
+ _testSetupMethods("yarn-cluster", new HashMap<String, String>(), null);
+ _testSetupMethods("yarn-client", new HashMap<String, String>(), null);
}
public void testSetupMethodsWithSparkConfiguration() throws Exception {
@@ -91,15 +94,16 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase {
getJobTrackerUri() + "=" + sparkConfDir.getAbsolutePath());
scs.init(Services.get());
- _testSetupMethods("local[*]", new HashMap<String, String>());
+ _testSetupMethods("local[*]", new HashMap<String, String>(), "client");
Map<String, String> extraSparkOpts = new HashMap<String, String>(2);
extraSparkOpts.put("a", "A");
extraSparkOpts.put("b", "B");
- _testSetupMethods("yarn", extraSparkOpts);
+ _testSetupMethods("yarn-cluster", extraSparkOpts, null);
+ _testSetupMethods("yarn-client", extraSparkOpts, null);
}
@SuppressWarnings("unchecked")
- private void _testSetupMethods(String master, Map<String, String> extraSparkOpts) throws Exception {
+ private void _testSetupMethods(String master, Map<String, String> extraSparkOpts, String mode) throws Exception {
SparkActionExecutor ae = new SparkActionExecutor();
assertEquals(Arrays.asList(SparkMain.class), ae.getLauncherClasses());
@@ -107,7 +111,7 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase {
"<job-tracker>" + getJobTrackerUri() + "</job-tracker>" +
"<name-node>" + getNameNodeUri() + "</name-node>" +
"<master>" + master + "</master>" +
- "<mode>client</mode>" +
+ (mode != null ? "<mode>" + mode + "</mode>" : "") +
"<name>Some Name</name>" +
"<class>org.apache.oozie.foo</class>" +
"<jar>" + getNameNodeUri() + "/foo.jar</jar>" +
@@ -126,7 +130,7 @@ public class TestSparkActionExecutor extends ActionExecutorTestCase {
Configuration conf = ae.createBaseHadoopConf(context, actionXml);
ae.setupActionConf(conf, context, actionXml, getFsTestCaseDir());
assertEquals(master, conf.get("oozie.spark.master"));
- assertEquals("client", conf.get("oozie.spark.mode"));
+ assertEquals(mode, conf.get("oozie.spark.mode"));
assertEquals("Some Name", conf.get("oozie.spark.name"));
assertEquals("org.apache.oozie.foo", conf.get("oozie.spark.class"));
assertEquals(getNameNodeUri() + "/foo.jar", conf.get("oozie.spark.jar"));