Posted to commits@oozie.apache.org by ro...@apache.org on 2016/06/10 17:13:37 UTC
oozie git commit: OOZIE-2547 Add mapreduce.job.cache.files to spark action (satishsaley via rohini)
Repository: oozie
Updated Branches:
refs/heads/master eee0a4ee4 -> c22364554
OOZIE-2547 Add mapreduce.job.cache.files to spark action (satishsaley via rohini)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/c2236455
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/c2236455
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/c2236455
Branch: refs/heads/master
Commit: c22364554ca51a422319776b89a4c9b727714499
Parents: eee0a4e
Author: Rohini Palaniswamy <ro...@apache.org>
Authored: Fri Jun 10 10:13:25 2016 -0700
Committer: Rohini Palaniswamy <ro...@apache.org>
Committed: Fri Jun 10 10:13:25 2016 -0700
----------------------------------------------------------------------
.../site/twiki/DG_SparkActionExtension.twiki | 60 +++++-
release-log.txt | 1 +
.../apache/oozie/action/hadoop/SparkMain.java | 189 ++++++++-----------
3 files changed, 142 insertions(+), 108 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/c2236455/docs/src/site/twiki/DG_SparkActionExtension.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_SparkActionExtension.twiki b/docs/src/site/twiki/DG_SparkActionExtension.twiki
index d7d75a1..74875bb 100644
--- a/docs/src/site/twiki/DG_SparkActionExtension.twiki
+++ b/docs/src/site/twiki/DG_SparkActionExtension.twiki
@@ -153,7 +153,7 @@ to navigate to the Oozie Launcher map-reduce job task logs via the Hadoop job-tr
To ensure that your Spark job shows up in the Spark History Server, specify these three Spark configuration properties
either in =spark-opts= with =--conf= or via =oozie.service.SparkConfigurationService.spark.configurations= in oozie-site.xml.
-1. spark.yarn.historyServer.address=http://SPH-HOST:18088
+1. spark.yarn.historyServer.address=SPH-HOST:18088
2. spark.eventLog.dir=hdfs://NN:8020/user/spark/applicationHistory
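For example (the host names below are illustrative, not defaults), the first two properties can be passed inside =spark-opts= as:
<verbatim>
--conf spark.yarn.historyServer.address=shs.example.com:18088
--conf spark.eventLog.dir=hdfs://nn.example.com:8020/user/spark/applicationHistory
</verbatim>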
@@ -261,6 +261,64 @@ it's localized to the working directory with just its name.
</xs:schema>
</verbatim>
+---++++ Spark Action Schema Version 0.2
+<verbatim>
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+ xmlns:spark="uri:oozie:spark-action:0.2" elementFormDefault="qualified"
+ targetNamespace="uri:oozie:spark-action:0.2">
+
+ <xs:element name="spark" type="spark:ACTION"/>
+
+ <xs:complexType name="ACTION">
+ <xs:sequence>
+ <xs:element name="job-tracker" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="name-node" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="prepare" type="spark:PREPARE" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="configuration" type="spark:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="master" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="mode" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="name" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="class" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="jar" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="spark-opts" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="arg" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="file" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="archive" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="CONFIGURATION">
+ <xs:sequence>
+ <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
+ <xs:complexType>
+ <xs:sequence>
+ <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
+ <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/>
+ <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
+ </xs:sequence>
+ </xs:complexType>
+ </xs:element>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="PREPARE">
+ <xs:sequence>
+ <xs:element name="delete" type="spark:DELETE" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="mkdir" type="spark:MKDIR" minOccurs="0" maxOccurs="unbounded"/>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="DELETE">
+ <xs:attribute name="path" type="xs:string" use="required"/>
+ </xs:complexType>
+
+ <xs:complexType name="MKDIR">
+ <xs:attribute name="path" type="xs:string" use="required"/>
+ </xs:complexType>
+
+</xs:schema>
+</verbatim>
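A workflow action written against this 0.2 schema can be checked with the standard JAXP validation API. The sketch below is not part of this commit, and the file names are hypothetical; it simply shows one way to validate an action document against the XSD:
<verbatim>
import java.io.File;
import javax.xml.XMLConstants;
import javax.xml.transform.stream.StreamSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import javax.xml.validation.Validator;

// Validates a <spark> action document against the spark-action 0.2 XSD.
public class SparkSchemaCheck {
    public static void main(String[] args) throws Exception {
        SchemaFactory factory = SchemaFactory.newInstance(XMLConstants.W3C_XML_SCHEMA_NS_URI);
        Schema schema = factory.newSchema(new File("spark-action-0.2.xsd")); // local copy of the schema above
        Validator validator = schema.newValidator();
        validator.validate(new StreamSource(new File("spark-action.xml"))); // hypothetical action snippet
        System.out.println("spark-action.xml conforms to uri:oozie:spark-action:0.2");
    }
}
</verbatim>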
[[index][::Go back to Oozie Documentation Index::]]
</noautolink>
http://git-wip-us.apache.org/repos/asf/oozie/blob/c2236455/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 4a556e6..88f66e9 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.3.0 release (trunk - unreleased)
+OOZIE-2547 Add mapreduce.job.cache.files to spark action (satishsaley via rohini)
OOZIE-2550 Flaky tests in TestZKUUIDService.java (pbacsko via rkanter)
OOZIE-2445 Doc for - Specifying coordinator input datasets in more logical ways (puru)
OOZIE-2541 Possible resource leak in Hive2Credentials (pbacsko via rkanter)
http://git-wip-us.apache.org/repos/asf/oozie/blob/c2236455/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
index 13c1075..3acaef9 100644
--- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
@@ -21,6 +21,8 @@ package org.apache.oozie.action.hadoop;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.PropertyConfigurator;
import org.apache.spark.deploy.SparkSubmit;
@@ -28,9 +30,10 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
@@ -43,18 +46,12 @@ public class SparkMain extends LauncherMain {
private static final String VERBOSE_OPTION = "--verbose";
private static final String EXECUTOR_CLASSPATH = "spark.executor.extraClassPath=";
private static final String DRIVER_CLASSPATH = "spark.driver.extraClassPath=";
- private static final String DIST_FILES = "spark.yarn.dist.files=";
- private static final String JARS_OPTION = "--jars";
private static final String HIVE_SECURITY_TOKEN = "spark.yarn.security.tokens.hive.enabled";
private static final String HBASE_SECURITY_TOKEN = "spark.yarn.security.tokens.hbase.enabled";
-
+ private static final String PWD = "$PWD" + File.separator + "*";
private static final Pattern[] PYSPARK_DEP_FILE_PATTERN = { Pattern.compile("py4\\S*src.zip"),
Pattern.compile("pyspark.zip") };
private static final Pattern SPARK_DEFAULTS_FILE_PATTERN = Pattern.compile("spark-defaults.conf");
-
- private String sparkJars = null;
- private String sparkClasspath = null;
-
private static final String SPARK_LOG4J_PROPS = "spark-log4j.properties";
private static final Pattern[] SPARK_JOB_IDS_PATTERNS = {
Pattern.compile("Submitted application (application[0-9_]*)") };
@@ -66,6 +63,7 @@ public class SparkMain extends LauncherMain {
protected void run(String[] args) throws Exception {
boolean isPyspark = false;
Configuration actionConf = loadActionConf();
+
setYarnTag(actionConf);
LauncherMainHadoopUtils.killChildYarnJobs(actionConf);
String logFile = setUpSparkLog4J(actionConf);
@@ -75,6 +73,10 @@ public class SparkMain extends LauncherMain {
String master = actionConf.get(SparkActionExecutor.SPARK_MASTER);
sparkArgs.add(master);
+ // In local mode, everything runs here in the Launcher Job.
+ // In yarn-client mode, the driver runs here in the Launcher Job and the
+ // executor in Yarn.
+ // In yarn-cluster mode, the driver and executor run in Yarn.
String sparkDeployMode = actionConf.get(SparkActionExecutor.SPARK_MODE);
if (sparkDeployMode != null) {
sparkArgs.add(MODE_OPTION);
@@ -98,33 +100,8 @@ public class SparkMain extends LauncherMain {
if(jarPath!=null && jarPath.endsWith(".py")){
isPyspark = true;
}
-
- // In local mode, everything runs here in the Launcher Job.
- // In yarn-client mode, the driver runs here in the Launcher Job and the executor in Yarn.
- // In yarn-cluster mode, the driver and executor run in Yarn.
- // Due to this, configuring Spark's classpath is not straightforward (see below)
-
- // Parse the spark opts. We need to make sure to pass the necessary jars (Hadoop, Spark, user) to Spark. The way we do
- // this depends on what mode is being used:
- // local/yarn-cluster/yarn-client: Passed as comma-separated list via --jars argument
- // yarn-cluster/yarn-client: Passed as ':'-separated list via spark.executor.extraClassPath and spark.driver.extraClassPath
- // yarn-client: Passed as comma-separated list via spark.yarn.dist.files
- //
- // --jars will cause the jars to be uploaded to HDFS and localized. To prevent the Sharelib and user jars from being
- // unnecessarily reuploaded to HDFS, we use the HDFS paths for these. The hadoop jars are needed as well, but we'll have
- // to use local paths for these because they're not in the Sharelib.
- //
- // spark.executor.extraClassPath and spark.driver.extraClassPath are blindly used as classpaths, so we need to put only
- // localized jars. --jars will cause the jars to be localized to the working directory, so we can simply specify the jar
- // names for the classpaths, as they'll be found in the working directory.
- //
- // This part looks more complicated than it is because we need to append the jars if the user already set things for
- // these options
- determineSparkJarsAndClasspath(actionConf, jarPath);
boolean addedExecutorClasspath = false;
boolean addedDriverClasspath = false;
- boolean addedDistFiles = false;
- boolean addedJars = false;
boolean addedHiveSecurityToken = false;
boolean addedHBaseSecurityToken = false;
String sparkOpts = actionConf.get(SparkActionExecutor.SPARK_OPTS);
@@ -132,29 +109,19 @@ public class SparkMain extends LauncherMain {
List<String> sparkOptions = splitSparkOpts(sparkOpts);
for (int i = 0; i < sparkOptions.size(); i++) {
String opt = sparkOptions.get(i);
- if (sparkJars != null) {
- if (opt.equals(JARS_OPTION)) {
- sparkArgs.add(opt);
- i++;
- if(i < sparkOptions.size()) {
- opt = sparkOptions.get(i);
- opt = opt + "," + sparkJars;
- addedJars = true;
- } else {
- throw new OozieActionConfiguratorException(JARS_OPTION + " missing a parameter.");
- }
- } else if (yarnClientMode && opt.startsWith(DIST_FILES)) {
- opt = opt + "," + sparkJars;
- addedDistFiles = true;
- }
- }
- if ((yarnClusterMode || yarnClientMode) && sparkClasspath != null) {
+ if (yarnClusterMode || yarnClientMode) {
if (opt.startsWith(EXECUTOR_CLASSPATH)) {
- opt = opt + File.pathSeparator + sparkClasspath;
+ // Include the current working directory (of executor
+ // container) in executor classpath, because it will contain
+ // localized files
+ opt = opt + File.pathSeparator + PWD;
addedExecutorClasspath = true;
}
if (opt.startsWith(DRIVER_CLASSPATH)) {
- opt = opt + File.pathSeparator + sparkClasspath;
+ // Include the current working directory (of driver
+ // container) in the driver classpath, because it will contain
+ // localized files
+ opt = opt + File.pathSeparator + PWD;
addedDriverClasspath = true;
}
}
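
The effect of the new PWD constant is easiest to see in isolation. The following standalone sketch (not Oozie code; the jar path is made up) mirrors how an existing extraClassPath option is extended with the container's working directory so that localized jars are found at runtime:

import java.io.File;

public class ClasspathOptSketch {
    private static final String EXECUTOR_CLASSPATH = "spark.executor.extraClassPath=";
    private static final String PWD = "$PWD" + File.separator + "*";

    // Appends the container's working directory to an option that already
    // sets the executor classpath; other options pass through unchanged.
    static String appendPwd(String opt) {
        if (opt.startsWith(EXECUTOR_CLASSPATH)) {
            return opt + File.pathSeparator + PWD;
        }
        return opt;
    }

    public static void main(String[] args) {
        // On Linux prints: spark.executor.extraClassPath=/opt/libs/app.jar:$PWD/*
        System.out.println(appendPwd("spark.executor.extraClassPath=/opt/libs/app.jar"));
    }
}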
@@ -167,25 +134,23 @@ public class SparkMain extends LauncherMain {
sparkArgs.add(opt);
}
}
- if (!addedJars && sparkJars != null) {
- sparkArgs.add("--jars");
- sparkArgs.add(sparkJars);
- }
- if ((yarnClusterMode || yarnClientMode) && sparkClasspath != null) {
+
+ if (yarnClusterMode || yarnClientMode) {
if (!addedExecutorClasspath) {
+ // Include the current working directory (of executor container)
+ // in executor classpath, because it will contain localized
+ // files
sparkArgs.add("--conf");
- sparkArgs.add(EXECUTOR_CLASSPATH + sparkClasspath);
+ sparkArgs.add(EXECUTOR_CLASSPATH + PWD);
}
if (!addedDriverClasspath) {
+ // Include the current working directory (of driver container)
+ // in the driver classpath, because it will contain localized
+ // files
sparkArgs.add("--conf");
- sparkArgs.add(DRIVER_CLASSPATH + sparkClasspath);
+ sparkArgs.add(DRIVER_CLASSPATH + PWD);
}
}
- if (yarnClientMode && !addedDistFiles && sparkJars != null) {
- sparkArgs.add("--conf");
- sparkArgs.add(DIST_FILES + sparkJars);
- }
-
sparkArgs.add("--conf");
sparkArgs.add("spark.executor.extraJavaOptions=-Dlog4j.configuration=" + SPARK_LOG4J_PROPS);
@@ -205,6 +170,20 @@ public class SparkMain extends LauncherMain {
sparkArgs.add("--properties-file");
sparkArgs.add(SPARK_DEFAULTS_FILE_PATTERN.toString());
}
+
+ if (yarnClusterMode || yarnClientMode) {
+ String cachedFiles = fixFsDefaultUris(DistributedCache.getCacheFiles(actionConf), jarPath);
+ if (cachedFiles != null && !cachedFiles.isEmpty()) {
+ sparkArgs.add("--files");
+ sparkArgs.add(cachedFiles);
+ }
+ String cachedArchives = fixFsDefaultUris(DistributedCache.getCacheArchives(actionConf), jarPath);
+ if (cachedArchives != null && !cachedArchives.isEmpty()) {
+ sparkArgs.add("--archives");
+ sparkArgs.add(cachedArchives);
+ }
+ }
+
if (!sparkArgs.contains(VERBOSE_OPTION)) {
sparkArgs.add(VERBOSE_OPTION);
}
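
For context, the shape of the arguments this new block produces can be sketched standalone (the HDFS paths are hypothetical); the null/empty guards mirror the ones above:

import java.util.ArrayList;
import java.util.List;

public class CacheArgsSketch {
    public static void main(String[] args) {
        List<String> sparkArgs = new ArrayList<String>();
        // Stand-ins for the comma-joined values returned by fixFsDefaultUris().
        String cachedFiles = "hdfs://nn:8020/share/lib/spark/a.jar,hdfs://nn:8020/share/lib/spark/b.jar";
        String cachedArchives = "hdfs://nn:8020/share/lib/spark/py4j.zip";
        if (cachedFiles != null && !cachedFiles.isEmpty()) {
            sparkArgs.add("--files");
            sparkArgs.add(cachedFiles);
        }
        if (cachedArchives != null && !cachedArchives.isEmpty()) {
            sparkArgs.add("--archives");
            sparkArgs.add(cachedArchives);
        }
        // Prints: [--files, hdfs://nn:8020/share/lib/spark/a.jar,hdfs://nn:8020/share/lib/spark/b.jar,
        //          --archives, hdfs://nn:8020/share/lib/spark/py4j.zip]
        System.out.println(sparkArgs);
    }
}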
@@ -217,6 +196,7 @@ public class SparkMain extends LauncherMain {
createPySparkLibFolder();
}
+
System.out.println("Spark Action Main class : " + SparkSubmit.class.getName());
System.out.println();
System.out.println("Oozie Spark action configuration");
@@ -299,49 +279,6 @@ public class SparkMain extends LauncherMain {
SparkSubmit.main(args);
}
- private void determineSparkJarsAndClasspath(Configuration actionConf, String jarPath) {
- // distCache gets all of the Sharelib and user jars (from the Distributed Cache)
- // classpath gets all of the jars from the classpath, which includes the localized jars from the distCache
- // sparkJars becomes all of the full paths to the Sharelib and user jars from HDFS and the deduped local paths
- // sparkClasspath becomes all of the jar names in sparkJars (without paths)
- // We also remove the Spark job jar and job.jar
- String[] distCache = new String[]{};
- String dCache = actionConf.get("mapreduce.job.classpath.files");
- if (dCache != null) {
- distCache = dCache.split(",");
- }
- String[] classpath = System.getProperty("java.class.path").split(File.pathSeparator);
- StringBuilder cp = new StringBuilder();
- StringBuilder jars = new StringBuilder();
- HashSet<String> distCacheJars = new HashSet<String>(distCache.length);
- for (String path : distCache) {
- // Skip the job jar because it's already included elsewhere and Spark doesn't like duplicating it here
- if (!path.equals(jarPath)) {
- String name = path.substring(path.lastIndexOf("/") + 1);
- distCacheJars.add(name);
- cp.append(name).append(File.pathSeparator);
- jars.append(path).append(",");
- }
- }
- for (String path : classpath) {
- if (!path.startsWith("job.jar") && path.endsWith(".jar")) {
- String name = path.substring(path.lastIndexOf("/") + 1);
- if (!distCacheJars.contains(name)) {
- jars.append(path).append(",");
- }
- cp.append(name).append(File.pathSeparator);
- }
- }
- if (cp.length() > 0) {
- cp.setLength(cp.length() - 1);
- sparkClasspath = cp.toString();
- }
- if (jars.length() > 0) {
- jars.setLength(jars.length() - 1);
- sparkJars = jars.toString();
- }
- }
-
/**
* Converts the options to be Spark-compatible.
* <ul>
@@ -419,4 +356,42 @@ public class SparkMain extends LauncherMain {
PropertyConfigurator.configure(SPARK_LOG4J_PROPS);
return logFile;
}
+
+ /**
+ * Converts the given URIs into the default filesystem's format, which is
+ * what Spark expects when deciding whether two paths live on the same
+ * file system.
+ *
+ * @param files URIs of the cached files or archives to normalize
+ * @param jarPath path of the application jar, skipped so that it is not
+ * distributed twice
+ * @return a comma-separated list of the normalized URIs, or null if files is null
+ * @throws IOException if the default FileSystem cannot be obtained
+ * @throws URISyntaxException if a rewritten URI is malformed
+ */
+ private String fixFsDefaultUris(URI[] files, String jarPath) throws IOException, URISyntaxException {
+ if (files == null) {
+ return null;
+ }
+ ArrayList<URI> listUris = new ArrayList<URI>();
+ FileSystem fs = FileSystem.get(new Configuration(true));
+ for (int i = 0; i < files.length; i++) {
+ URI fileUri = files[i];
+ // Spark compares URIs based on scheme, host and port.
+ // Here we convert URIs into the default format so that Spark
+ // does not think they belong to a different file system.
+ // This avoids an extra copy of files that already exist on the
+ // same HDFS.
+ if (!fileUri.toString().equals(jarPath) && fs.getUri().getScheme().equals(fileUri.getScheme())
+ && (fs.getUri().getHost().equals(fileUri.getHost()) || fileUri.getHost() == null)
+ && (fs.getUri().getPort() == -1 || fileUri.getPort() == -1
+ || fs.getUri().getPort() == fileUri.getPort())) {
+ URI uri = new URI(fs.getUri().getScheme(), fileUri.getUserInfo(), fs.getUri().getHost(),
+ fs.getUri().getPort(), fileUri.getPath(), fileUri.getQuery(), fileUri.getFragment());
+ // Skip the application jar: if the URIs match, it would otherwise
+ // be distributed twice, once with --files and again as the
+ // application jar.
+ if (!uri.toString().equals(jarPath)) {
+ listUris.add(uri);
+ }
+ }
+ }
+ return StringUtils.join(listUris, ",");
+ }
}
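
To make the normalization concrete: a host-less HDFS URI such as hdfs:///path is rewritten onto the default file system's scheme, host and port, so Spark no longer treats it as a foreign file system. The standalone sketch below (host and paths are hypothetical) applies the same rewrite that fixFsDefaultUris performs, using java.net.URI directly instead of FileSystem:

import java.net.URI;
import java.net.URISyntaxException;

public class UriNormalizeSketch {
    public static void main(String[] args) throws URISyntaxException {
        URI defaultFs = new URI("hdfs://namenode.example.com:8020"); // stands in for fs.getUri()
        URI cacheFile = new URI("hdfs:///user/oozie/share/lib/spark/spark-core.jar"); // no host or port
        // Rebuild the URI with the default file system's scheme/host/port,
        // keeping the original path, query and fragment.
        URI fixed = new URI(defaultFs.getScheme(), cacheFile.getUserInfo(), defaultFs.getHost(),
                defaultFs.getPort(), cacheFile.getPath(), cacheFile.getQuery(), cacheFile.getFragment());
        // Prints: hdfs://namenode.example.com:8020/user/oozie/share/lib/spark/spark-core.jar
        System.out.println(fixed);
    }
}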