You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@oozie.apache.org by pb...@apache.org on 2017/03/22 11:23:33 UTC

[28/50] [abbrv] oozie git commit: OOZIE-2787 Oozie distributes application jar twice making the spark job fail (satishsaley)

OOZIE-2787 Oozie distributes application jar twice making the spark job fail (satishsaley)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/d249fbf4
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/d249fbf4
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/d249fbf4

Branch: refs/heads/oya
Commit: d249fbf43cd75b97f594330884189e8d8df55ddd
Parents: c9881fc
Author: Satish Subhashrao Saley <sa...@yahoo-inc.com>
Authored: Mon Feb 6 09:40:44 2017 -0800
Committer: Satish Subhashrao Saley <sa...@yahoo-inc.com>
Committed: Mon Feb 6 09:40:44 2017 -0800

----------------------------------------------------------------------
 pom.xml                                         |   2 +-
 release-log.txt                                 |   1 +
 .../apache/oozie/action/hadoop/SparkMain.java   | 184 ++++++++++++-------
 3 files changed, 122 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/d249fbf4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 15f4195..c00e3b8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1999,7 +1999,7 @@
                 <activeByDefault>false</activeByDefault>
             </activation>
             <properties>
-                <spark.version>2.0.0</spark.version>
+                <spark.version>2.1.0</spark.version>
                 <spark.streaming.kafka.version>1.6.2</spark.streaming.kafka.version>
                 <spark.bagel.version>1.6.2</spark.bagel.version>
             </properties>

http://git-wip-us.apache.org/repos/asf/oozie/blob/d249fbf4/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 309de5d..16beee2 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.4.0 release (trunk - unreleased)
 
+OOZIE-2787 Oozie distributes application jar twice making the spark job fail (satishsaley)
 OOZIE-2789 Maven complains about checkstyle error during build (xzheng via abhishekbafna)
 OOZIE-2777 Config-default.xml longer than 64k results in java.io.UTFDataFormatException (gezapeti via harsh)
 OOZIE-2782 Input logic wait documentation is confusing (puru)

http://git-wip-us.apache.org/repos/asf/oozie/blob/d249fbf4/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
index 0da74d4..8f2f438 100644
--- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
@@ -72,8 +72,6 @@ public class SparkMain extends LauncherMain {
     private static final Pattern SPARK_VERSION_1 = Pattern.compile("^1.*");
     private static final String SPARK_YARN_JAR = "spark.yarn.jar";
     private static final String SPARK_YARN_JARS = "spark.yarn.jars";
-    private String sparkYarnJar = null;
-    private String sparkVersion = "1.X.X";
     public static void main(String[] args) throws Exception {
         run(SparkMain.class, args);
     }
@@ -216,19 +214,22 @@ public class SparkMain extends LauncherMain {
         }
 
         if ((yarnClusterMode || yarnClientMode)) {
-            LinkedList<URI> fixedUris = fixFsDefaultUris(DistributedCache.getCacheFiles(actionConf), jarPath);
-            String cachedFiles = filterSparkYarnJar(fixedUris);
+            LinkedList<URI> fixedUris = fixFsDefaultUris(DistributedCache.getCacheFiles(actionConf));
+            JarFilter jarfilter = new JarFilter(fixedUris, jarPath);
+            jarfilter.filter();
+            jarPath = jarfilter.getApplicationJar();
+            String cachedFiles = StringUtils.join(fixedUris, ",");
             if (cachedFiles != null && !cachedFiles.isEmpty()) {
                 sparkArgs.add("--files");
                 sparkArgs.add(cachedFiles);
             }
-            fixedUris = fixFsDefaultUris(DistributedCache.getCacheArchives(actionConf), jarPath);
+            fixedUris = fixFsDefaultUris(DistributedCache.getCacheArchives(actionConf));
             String cachedArchives = StringUtils.join(fixedUris, ",");
             if (cachedArchives != null && !cachedArchives.isEmpty()) {
                 sparkArgs.add("--archives");
                 sparkArgs.add(cachedArchives);
             }
-            setSparkYarnJarsConf(sparkArgs);
+            setSparkYarnJarsConf(sparkArgs, jarfilter.getSparkYarnJar(), jarfilter.getSparkVersion());
         }
 
         if (!sparkArgs.contains(VERBOSE_OPTION)) {
@@ -319,7 +320,7 @@ public class SparkMain extends LauncherMain {
      * @param fileNamePattern the pattern to look for
      * @return the file if there is one else it returns null
      */
-    private File getMatchingFile(Pattern fileNamePattern) throws OozieActionConfiguratorException {
+    private static File getMatchingFile(Pattern fileNamePattern) throws OozieActionConfiguratorException {
         File localDir = new File(".");
         for(String fileName : localDir.list()){
             if(fileNamePattern.matcher(fileName).find()){
@@ -424,7 +425,7 @@ public class SparkMain extends LauncherMain {
      * @throws IOException
      * @throws URISyntaxException
      */
-    private LinkedList<URI> fixFsDefaultUris(URI[] files, String jarPath) throws IOException, URISyntaxException {
+    private LinkedList<URI> fixFsDefaultUris(URI[] files) throws IOException, URISyntaxException {
         if (files == null) {
             return null;
         }
@@ -432,70 +433,19 @@ public class SparkMain extends LauncherMain {
         FileSystem fs = FileSystem.get(new Configuration(true));
         for (int i = 0; i < files.length; i++) {
             URI fileUri = files[i];
-            // Spark compares URIs based on scheme, host and port.
-            // Here we convert URIs into the default format so that Spark
-            // won't think those belong to different file system.
-            // This will avoid an extra copy of files which already exists on
-            // same hdfs.
-            if (!fileUri.toString().equals(jarPath) && fs.getUri().getScheme().equals(fileUri.getScheme())
-                    && (fs.getUri().getHost().equals(fileUri.getHost()) || fileUri.getHost() == null)
-                    && (fs.getUri().getPort() == -1 || fileUri.getPort() == -1
-                            || fs.getUri().getPort() == fileUri.getPort())) {
-                URI uri = new URI(fs.getUri().getScheme(), fileUri.getUserInfo(), fs.getUri().getHost(),
-                        fs.getUri().getPort(), fileUri.getPath(), fileUri.getQuery(), fileUri.getFragment());
-                // Here we skip the application jar, because
-                // (if uris are same,) it will get distributed multiple times
-                // - one time with --files and another time as application jar.
-                if (!uri.toString().equals(jarPath)) {
-                    listUris.add(uri);
-                }
-            }
+            listUris.add(getFixedUri(fs, fileUri));
         }
         return listUris;
     }
 
     /**
-     * Filters out the Spark yarn jar and records its version
-     *
-     * @param listUris string containing uris separated by comma
-     * @return
-     * @throws OozieActionConfiguratorException
-     */
-    private String filterSparkYarnJar(LinkedList<URI> listUris) throws OozieActionConfiguratorException {
-        Iterator<URI> iterator = listUris.iterator();
-        File matchedFile = null;
-        while (iterator.hasNext()) {
-            URI uri = iterator.next();
-            Path p = new Path(uri);
-            if (SPARK_YARN_JAR_PATTERN.matcher(p.getName()).find()) {
-                matchedFile = getMatchingFile(SPARK_YARN_JAR_PATTERN);
-            }
-            else if (SPARK_ASSEMBLY_JAR_PATTERN.matcher(p.getName()).find()) {
-                matchedFile = getMatchingFile(SPARK_ASSEMBLY_JAR_PATTERN);
-            }
-            if (matchedFile != null) {
-                sparkYarnJar = uri.toString();
-                try {
-                    sparkVersion = getJarVersion(matchedFile);
-                    System.out.println("Spark Version " + sparkVersion);
-                }
-                catch (IOException io) {
-                    System.out.println(
-                            "Unable to open " + matchedFile.getPath() + ". Default Spark Version " + sparkVersion);
-                }
-                iterator.remove();
-                break;
-            }
-        }
-        return StringUtils.join(listUris, ",");
-    }
-
-    /**
      * Sets spark.yarn.jars for Spark 2.X. Sets spark.yarn.jar for Spark 1.X.
      *
      * @param sparkArgs
+     * @param sparkYarnJar
+     * @param sparkVersion
      */
-    private void setSparkYarnJarsConf(List<String> sparkArgs) {
+    private void setSparkYarnJarsConf(List<String> sparkArgs, String sparkYarnJar, String sparkVersion) {
         if (SPARK_VERSION_1.matcher(sparkVersion).find()) {
             // In Spark 1.X.X, set spark.yarn.jar to avoid
             // multiple distribution
@@ -509,7 +459,7 @@ public class SparkMain extends LauncherMain {
         }
     }
 
-    private String getJarVersion(File jarFile) throws IOException {
+    private static String getJarVersion(File jarFile) throws IOException {
         @SuppressWarnings("resource")
         Manifest manifest = new JarFile(jarFile).getManifest();
         return manifest.getMainAttributes().getValue("Specification-Version");
@@ -521,4 +471,110 @@ public class SparkMain extends LauncherMain {
         }
         to.append(what);
     }
+
+    private static URI getFixedUri(URI fileUri) throws URISyntaxException, IOException {
+        FileSystem fs = FileSystem.get(new Configuration(true));
+        return getFixedUri(fs, fileUri);
+    }
+
+    /**
+     * Spark compares URIs based on scheme, host and port. Here we convert URIs
+     * into the default format so that Spark won't think those belong to
+     * different file system. This will avoid an extra copy of files which
+     * already exists on same hdfs.
+     *
+     * @param fs
+     * @param fileUri
+     * @return fixed uri
+     * @throws URISyntaxException
+     */
+    private static URI getFixedUri(FileSystem fs, URI fileUri) throws URISyntaxException {
+        if (fs.getUri().getScheme().equals(fileUri.getScheme())
+                && (fs.getUri().getHost().equals(fileUri.getHost()) || fileUri.getHost() == null)
+                && (fs.getUri().getPort() == -1 || fileUri.getPort() == -1
+                        || fs.getUri().getPort() == fileUri.getPort())) {
+            return new URI(fs.getUri().getScheme(), fileUri.getUserInfo(), fs.getUri().getHost(), fs.getUri().getPort(),
+                    fileUri.getPath(), fileUri.getQuery(), fileUri.getFragment());
+        }
+        return fileUri;
+    }
+
+    /**
+     * This class is used for filtering out unwanted jars.
+     */
+    private static class JarFilter {
+        private String sparkVersion = "1.X.X";
+        private String sparkYarnJar;
+        private String applicationJar;
+        private LinkedList<URI> listUris = null;
+
+        /**
+         * @param listUris List of URIs to be filtered
+         * @param jarPath Application jar
+         * @throws IOException
+         * @throws URISyntaxException
+         */
+        public JarFilter(LinkedList<URI> listUris, String jarPath) throws URISyntaxException, IOException {
+            this.listUris = listUris;
+            applicationJar = jarPath;
+            Path p = new Path(jarPath);
+            if (p.isAbsolute()) {
+                applicationJar = getFixedUri(p.toUri()).toString();
+            }
+        }
+
+        /**
+         * Filters out the Spark yarn jar and application jar. Also records
+         * spark yarn jar's version.
+         *
+         * @throws OozieActionConfiguratorException
+         */
+        private void filter() throws OozieActionConfiguratorException {
+            Iterator<URI> iterator = listUris.iterator();
+            File matchedFile = null;
+            while (iterator.hasNext()) {
+                URI uri = iterator.next();
+                Path p = new Path(uri);
+                if (SPARK_YARN_JAR_PATTERN.matcher(p.getName()).find()) {
+                    matchedFile = getMatchingFile(SPARK_YARN_JAR_PATTERN);
+                }
+                else if (SPARK_ASSEMBLY_JAR_PATTERN.matcher(p.getName()).find()) {
+                    matchedFile = getMatchingFile(SPARK_ASSEMBLY_JAR_PATTERN);
+                }
+                if (matchedFile != null) {
+                    sparkYarnJar = uri.toString();
+                    try {
+                        sparkVersion = getJarVersion(matchedFile);
+                        System.out.println("Spark Version " + sparkVersion);
+                    }
+                    catch (IOException io) {
+                        System.out.println(
+                                "Unable to open " + matchedFile.getPath() + ". Default Spark Version " + sparkVersion);
+                    }
+                    iterator.remove();
+                    matchedFile = null;
+                }
+                // Here we skip the application jar, because
+                // (if uris are same,) it will get distributed multiple times
+                // - one time with --files and another time as application jar.
+                if (p.getName().equals(applicationJar) || uri.toString().equals(applicationJar)) {
+                    applicationJar = uri.toString();
+                    iterator.remove();
+                }
+            }
+        }
+
+        public String getApplicationJar() {
+            return applicationJar;
+        }
+
+        public String getSparkYarnJar() {
+            return sparkYarnJar;
+        }
+
+        public String getSparkVersion() {
+            return sparkVersion;
+        }
+
+    }
 }