You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ro...@apache.org on 2016/06/10 17:13:37 UTC

oozie git commit: OOZIE-2547 Add mapreduce.job.cache.files to spark action (satishsaley via rohini)

Repository: oozie
Updated Branches:
  refs/heads/master eee0a4ee4 -> c22364554


OOZIE-2547 Add mapreduce.job.cache.files to spark action (satishsaley via rohini)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/c2236455
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/c2236455
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/c2236455

Branch: refs/heads/master
Commit: c22364554ca51a422319776b89a4c9b727714499
Parents: eee0a4e
Author: Rohini Palaniswamy <ro...@apache.org>
Authored: Fri Jun 10 10:13:25 2016 -0700
Committer: Rohini Palaniswamy <ro...@apache.org>
Committed: Fri Jun 10 10:13:25 2016 -0700

----------------------------------------------------------------------
 .../site/twiki/DG_SparkActionExtension.twiki    |  60 +++++-
 release-log.txt                                 |   1 +
 .../apache/oozie/action/hadoop/SparkMain.java   | 189 ++++++++-----------
 3 files changed, 142 insertions(+), 108 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/c2236455/docs/src/site/twiki/DG_SparkActionExtension.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/DG_SparkActionExtension.twiki b/docs/src/site/twiki/DG_SparkActionExtension.twiki
index d7d75a1..74875bb 100644
--- a/docs/src/site/twiki/DG_SparkActionExtension.twiki
+++ b/docs/src/site/twiki/DG_SparkActionExtension.twiki
@@ -153,7 +153,7 @@ to navigate to the Oozie Launcher map-reduce job task logs via the Hadoop job-tr
 To ensure that your Spark job shows up in the Spark History Server, make sure to specify these three Spark configuration properties
 either in =spark-opts= with =--conf= or from =oozie.service.SparkConfigurationService.spark.configurations= in oozie-site.xml.
 
-1. spark.yarn.historyServer.address=http://SPH-HOST:18088
+1. spark.yarn.historyServer.address=SPH-HOST:18088
 
 2. spark.eventLog.dir=hdfs://NN:8020/user/spark/applicationHistory
 
@@ -261,6 +261,64 @@ it's localized to the working directory with just its name.
 </xs:schema>
 </verbatim>
 
+---++++ Spark Action Schema Version 0.2
+<verbatim>
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema"
+           xmlns:spark="uri:oozie:spark-action:0.2" elementFormDefault="qualified"
+           targetNamespace="uri:oozie:spark-action:0.2">
+
+    <xs:element name="spark" type="spark:ACTION"/>
+
+    <xs:complexType name="ACTION">
+        <xs:sequence>
+            <xs:element name="job-tracker" type="xs:string" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="name-node" type="xs:string" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="prepare" type="spark:PREPARE" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+            <xs:element name="configuration" type="spark:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="master" type="xs:string" minOccurs="1" maxOccurs="1"/>
+            <xs:element name="mode" type="xs:string" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="name" type="xs:string" minOccurs="1" maxOccurs="1"/>
+            <xs:element name="class" type="xs:string" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="jar" type="xs:string" minOccurs="1" maxOccurs="1"/>
+            <xs:element name="spark-opts" type="xs:string" minOccurs="0" maxOccurs="1"/>
+            <xs:element name="arg" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+            <xs:element name="file" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+            <xs:element name="archive" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+        </xs:sequence>
+    </xs:complexType>
+
+    <xs:complexType name="CONFIGURATION">
+        <xs:sequence>
+            <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
+                <xs:complexType>
+                    <xs:sequence>
+                        <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
+                        <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/>
+                        <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
+                    </xs:sequence>
+                </xs:complexType>
+            </xs:element>
+        </xs:sequence>
+    </xs:complexType>
+
+    <xs:complexType name="PREPARE">
+        <xs:sequence>
+            <xs:element name="delete" type="spark:DELETE" minOccurs="0" maxOccurs="unbounded"/>
+            <xs:element name="mkdir" type="spark:MKDIR" minOccurs="0" maxOccurs="unbounded"/>
+        </xs:sequence>
+    </xs:complexType>
+
+    <xs:complexType name="DELETE">
+        <xs:attribute name="path" type="xs:string" use="required"/>
+    </xs:complexType>
+
+    <xs:complexType name="MKDIR">
+        <xs:attribute name="path" type="xs:string" use="required"/>
+    </xs:complexType>
+
+</xs:schema>
+</verbatim>
 [[index][::Go back to Oozie Documentation Index::]]
 
 </noautolink>

http://git-wip-us.apache.org/repos/asf/oozie/blob/c2236455/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 4a556e6..88f66e9 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release (trunk - unreleased)
 
+OOZIE-2547 Add mapreduce.job.cache.files to spark action (satishsaley via rohini)
 OOZIE-2550 Flaky tests in TestZKUUIDService.java (pbacsko via rkanter)
 OOZIE-2445 Doc for - Specifying coordinator input datasets in more logical ways (puru)
 OOZIE-2541 Possible resource leak in Hive2Credentials (pbacsko via rkanter)

http://git-wip-us.apache.org/repos/asf/oozie/blob/c2236455/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
index 13c1075..3acaef9 100644
--- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
@@ -21,6 +21,8 @@ package org.apache.oozie.action.hadoop;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.spark.deploy.SparkSubmit;
 
@@ -28,9 +30,10 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
 import java.util.regex.Pattern;
@@ -43,18 +46,12 @@ public class SparkMain extends LauncherMain {
     private static final String VERBOSE_OPTION = "--verbose";
     private static final String EXECUTOR_CLASSPATH = "spark.executor.extraClassPath=";
     private static final String DRIVER_CLASSPATH = "spark.driver.extraClassPath=";
-    private static final String DIST_FILES = "spark.yarn.dist.files=";
-    private static final String JARS_OPTION = "--jars";
     private static final String HIVE_SECURITY_TOKEN = "spark.yarn.security.tokens.hive.enabled";
     private static final String HBASE_SECURITY_TOKEN = "spark.yarn.security.tokens.hbase.enabled";
-
+    private static final String PWD = "$PWD" + File.separator + "*";
     private static final Pattern[] PYSPARK_DEP_FILE_PATTERN = { Pattern.compile("py4\\S*src.zip"),
             Pattern.compile("pyspark.zip") };
     private static final Pattern SPARK_DEFAULTS_FILE_PATTERN = Pattern.compile("spark-defaults.conf");
-
-    private String sparkJars = null;
-    private String sparkClasspath = null;
-
     private static final String SPARK_LOG4J_PROPS = "spark-log4j.properties";
     private static final Pattern[] SPARK_JOB_IDS_PATTERNS = {
             Pattern.compile("Submitted application (application[0-9_]*)") };
@@ -66,6 +63,7 @@ public class SparkMain extends LauncherMain {
     protected void run(String[] args) throws Exception {
         boolean isPyspark = false;
         Configuration actionConf = loadActionConf();
+
         setYarnTag(actionConf);
         LauncherMainHadoopUtils.killChildYarnJobs(actionConf);
         String logFile = setUpSparkLog4J(actionConf);
@@ -75,6 +73,10 @@ public class SparkMain extends LauncherMain {
         String master = actionConf.get(SparkActionExecutor.SPARK_MASTER);
         sparkArgs.add(master);
 
+        // In local mode, everything runs here in the Launcher Job.
+        // In yarn-client mode, the driver runs here in the Launcher Job and the
+        // executor in Yarn.
+        // In yarn-cluster mode, the driver and executor run in Yarn.
         String sparkDeployMode = actionConf.get(SparkActionExecutor.SPARK_MODE);
         if (sparkDeployMode != null) {
             sparkArgs.add(MODE_OPTION);
@@ -98,33 +100,8 @@ public class SparkMain extends LauncherMain {
         if(jarPath!=null && jarPath.endsWith(".py")){
             isPyspark = true;
         }
-
-        // In local mode, everything runs here in the Launcher Job.
-        // In yarn-client mode, the driver runs here in the Launcher Job and the executor in Yarn.
-        // In yarn-cluster mode, the driver and executor run in Yarn.
-        // Due to this, configuring Spark's classpath is not straightforward (see below)
-
-        // Parse the spark opts.  We need to make sure to pass the necessary jars (Hadoop, Spark, user) to Spark.  The way we do
-        // this depends on what mode is being used:
-        // local/yarn-cluster/yarn-client: Passed as comma-separated list via --jars argument
-        // yarn-cluster/yarn-client: Passed as ':'-separated list via spark.executor.extraClassPath and spark.driver.extraClassPath
-        // yarn-client: Passed as comma-separted list via spark.yarn.dist.files
-        //
-        // --jars will cause the jars to be uploaded to HDFS and localized.  To prevent the Sharelib and user jars from being
-        // unnecessarily reuploaded to HDFS, we use the HDFS paths for these.  The hadoop jars are needed as well, but we'll have
-        // to use local paths for these because they're not in the Sharelib.
-        //
-        // spark.executor.extraClassPath and spark.driver.extraClassPath are blindly used as classpaths, so we need to put only
-        // localized jars.  --jars will cause the jars to be localized to the working directory, so we can simply specify the jar
-        // names for the classpaths, as they'll be found in the working directory.
-        //
-        // This part looks more complicated than it is because we need to append the jars if the user already set things for
-        // these options
-        determineSparkJarsAndClasspath(actionConf, jarPath);
         boolean addedExecutorClasspath = false;
         boolean addedDriverClasspath = false;
-        boolean addedDistFiles = false;
-        boolean addedJars = false;
         boolean addedHiveSecurityToken = false;
         boolean addedHBaseSecurityToken = false;
         String sparkOpts = actionConf.get(SparkActionExecutor.SPARK_OPTS);
@@ -132,29 +109,19 @@ public class SparkMain extends LauncherMain {
             List<String> sparkOptions = splitSparkOpts(sparkOpts);
             for (int i = 0; i < sparkOptions.size(); i++) {
                 String opt = sparkOptions.get(i);
-                if (sparkJars != null) {
-                    if (opt.equals(JARS_OPTION)) {
-                        sparkArgs.add(opt);
-                        i++;
-                        if(i < sparkOptions.size()) {
-                            opt = sparkOptions.get(i);
-                            opt = opt + "," + sparkJars;
-                            addedJars = true;
-                        } else {
-                            throw new OozieActionConfiguratorException(JARS_OPTION + " missing a parameter.");
-                        }
-                    } else if (yarnClientMode && opt.startsWith(DIST_FILES)) {
-                        opt = opt + "," + sparkJars;
-                        addedDistFiles = true;
-                    }
-                }
-                if ((yarnClusterMode || yarnClientMode) && sparkClasspath != null) {
+                if (yarnClusterMode || yarnClientMode) {
                     if (opt.startsWith(EXECUTOR_CLASSPATH)) {
-                        opt = opt + File.pathSeparator + sparkClasspath;
+                        // Include the current working directory (of executor
+                        // container) in executor classpath, because it will contain
+                        // localized files
+                        opt = opt + File.pathSeparator + PWD;
                         addedExecutorClasspath = true;
                     }
                     if (opt.startsWith(DRIVER_CLASSPATH)) {
-                        opt = opt + File.pathSeparator + sparkClasspath;
+                        // Include the current working directory (of driver
+                        // container) in driver classpath, because it will contain
+                        // localized files
+                        opt = opt + File.pathSeparator + PWD;
                         addedDriverClasspath = true;
                     }
                 }
@@ -167,25 +134,23 @@ public class SparkMain extends LauncherMain {
                 sparkArgs.add(opt);
             }
         }
-        if (!addedJars && sparkJars != null) {
-            sparkArgs.add("--jars");
-            sparkArgs.add(sparkJars);
-        }
-        if ((yarnClusterMode || yarnClientMode) && sparkClasspath != null) {
+
+        if ((yarnClusterMode || yarnClientMode)) {
             if (!addedExecutorClasspath) {
+                // Include the current working directory (of executor container)
+                // in executor classpath, because it will contain localized
+                // files
                 sparkArgs.add("--conf");
-                sparkArgs.add(EXECUTOR_CLASSPATH + sparkClasspath);
+                sparkArgs.add(EXECUTOR_CLASSPATH + PWD);
             }
             if (!addedDriverClasspath) {
+                // Include the current working directory (of driver container)
+                // in driver classpath, because it will contain localized
+                // files
                 sparkArgs.add("--conf");
-                sparkArgs.add(DRIVER_CLASSPATH + sparkClasspath);
+                sparkArgs.add(DRIVER_CLASSPATH + PWD);
             }
         }
-        if (yarnClientMode && !addedDistFiles && sparkJars != null) {
-            sparkArgs.add("--conf");
-            sparkArgs.add(DIST_FILES + sparkJars);
-        }
-
         sparkArgs.add("--conf");
         sparkArgs.add("spark.executor.extraJavaOptions=-Dlog4j.configuration=" + SPARK_LOG4J_PROPS);
 
@@ -205,6 +170,20 @@ public class SparkMain extends LauncherMain {
             sparkArgs.add("--properties-file");
             sparkArgs.add(SPARK_DEFAULTS_FILE_PATTERN.toString());
         }
+
+        if ((yarnClusterMode || yarnClientMode)) {
+            String cachedFiles = fixFsDefaultUris(DistributedCache.getCacheFiles(actionConf), jarPath);
+            if (cachedFiles != null && !cachedFiles.isEmpty()) {
+                sparkArgs.add("--files");
+                sparkArgs.add(cachedFiles);
+            }
+            String cachedArchives = fixFsDefaultUris(DistributedCache.getCacheArchives(actionConf), jarPath);
+            if (cachedArchives != null && !cachedArchives.isEmpty()) {
+                sparkArgs.add("--archives");
+                sparkArgs.add(cachedArchives);
+            }
+        }
+
         if (!sparkArgs.contains(VERBOSE_OPTION)) {
             sparkArgs.add(VERBOSE_OPTION);
         }
@@ -217,6 +196,7 @@ public class SparkMain extends LauncherMain {
             createPySparkLibFolder();
         }
 
+
         System.out.println("Spark Action Main class        : " + SparkSubmit.class.getName());
         System.out.println();
         System.out.println("Oozie Spark action configuration");
@@ -299,49 +279,6 @@ public class SparkMain extends LauncherMain {
         SparkSubmit.main(args);
     }
 
-    private void determineSparkJarsAndClasspath(Configuration actionConf, String jarPath) {
-        // distCache gets all of the Sharelib and user jars (from the Distributed Cache)
-        // classpath gets all of the jars from the classpath, which includes the localized jars from the distCache
-        // sparkJars becomes all of the full paths to the Sharelib and user jars from HDFS and the deduped local paths
-        // sparkClasspath becomes all of the jar names in sparkJars (without paths)
-        // We also remove the Spark job jar and job.jar
-        String[] distCache = new String[]{};
-        String dCache = actionConf.get("mapreduce.job.classpath.files");
-        if (dCache != null) {
-            distCache = dCache.split(",");
-        }
-        String[] classpath = System.getProperty("java.class.path").split(File.pathSeparator);
-        StringBuilder cp = new StringBuilder();
-        StringBuilder jars = new StringBuilder();
-        HashSet<String> distCacheJars = new HashSet<String>(distCache.length);
-        for (String path : distCache) {
-            // Skip the job jar because it's already included elsewhere and Spark doesn't like duplicating it here
-            if (!path.equals(jarPath)) {
-                String name = path.substring(path.lastIndexOf("/") + 1);
-                distCacheJars.add(name);
-                cp.append(name).append(File.pathSeparator);
-                jars.append(path).append(",");
-            }
-        }
-        for (String path : classpath) {
-            if (!path.startsWith("job.jar") && path.endsWith(".jar")) {
-                String name = path.substring(path.lastIndexOf("/") + 1);
-                if (!distCacheJars.contains(name)) {
-                    jars.append(path).append(",");
-                }
-                cp.append(name).append(File.pathSeparator);
-            }
-        }
-        if (cp.length() > 0) {
-            cp.setLength(cp.length() - 1);
-            sparkClasspath = cp.toString();
-        }
-        if (jars.length() > 0) {
-            jars.setLength(jars.length() - 1);
-            sparkJars = jars.toString();
-        }
-    }
-
     /**
      * Converts the options to be Spark-compatible.
      * <ul>
@@ -419,4 +356,42 @@ public class SparkMain extends LauncherMain {
         PropertyConfigurator.configure(SPARK_LOG4J_PROPS);
         return logFile;
     }
+
+    /**
+     * Rewrite URIs that match the configured file system so they carry its scheme, host and port.
+     * @param files URIs to convert, may be null; entries that do not match the configured file system are left out
+     * @param jarPath application jar path; a URI whose string form equals it is left out
+     * @return the converted URIs joined with ",", or null when files is null
+     * @throws IOException declared by FileSystem.get
+     * @throws URISyntaxException declared by the URI constructor used to build each converted URI
+     */
+    private String fixFsDefaultUris(URI[] files, String jarPath) throws IOException, URISyntaxException {
+        if (files == null) {
+            return null;
+        }
+        ArrayList<URI> listUris = new ArrayList<URI>();
+        FileSystem fs = FileSystem.get(new Configuration(true));
+        for (int i = 0; i < files.length; i++) {
+            URI fileUri = files[i];
+            // Spark compares URIs based on scheme, host and port, so URIs are
+            // converted to the default format; otherwise Spark would treat them as
+            // a different file system and make an extra copy of files already there.
+            // NOTE(review): a URI that fails this check is never added to the result,
+            // and a file system URI with no host would throw on getHost().equals - confirm.
+            if (!fileUri.toString().equals(jarPath) && fs.getUri().getScheme().equals(fileUri.getScheme())
+                    && (fs.getUri().getHost().equals(fileUri.getHost()) || fileUri.getHost() == null)
+                    && (fs.getUri().getPort() == -1 || fileUri.getPort() == -1
+                            || fs.getUri().getPort() == fileUri.getPort())) {
+                URI uri = new URI(fs.getUri().getScheme(), fileUri.getUserInfo(), fs.getUri().getHost(),
+                        fs.getUri().getPort(), fileUri.getPath(), fileUri.getQuery(), fileUri.getFragment());
+                // Skip the application jar: if the URIs are the same it would be
+                // distributed more than once - one time with --files and another
+                // time as the application jar.
+                if (!uri.toString().equals(jarPath)) {
+                    listUris.add(uri);
+                }
+            }
+        }
+        return StringUtils.join(listUris, ",");
+    }
 }