You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ro...@apache.org on 2016/09/30 17:38:01 UTC

oozie git commit: OOZIE-2606 Set spark.yarn.jars to fix Spark 2.0 with Oozie (satishsaley via rohini) (cherry picked from commit a4dbeda9550e4bec5ef5adf0f86f4ea7be6cd155)

Repository: oozie
Updated Branches:
  refs/heads/branch-4.3 c24c5e5df -> 4b6607c90


OOZIE-2606 Set spark.yarn.jars to fix Spark 2.0 with Oozie (satishsaley via rohini)
(cherry picked from commit a4dbeda9550e4bec5ef5adf0f86f4ea7be6cd155)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/4b6607c9
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/4b6607c9
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/4b6607c9

Branch: refs/heads/branch-4.3
Commit: 4b6607c90fd2b9777f95ddc89ea43b0f6e58f111
Parents: c24c5e5
Author: Rohini Palaniswamy <ro...@apache.org>
Authored: Fri Sep 30 10:37:35 2016 -0700
Committer: Rohini Palaniswamy <ro...@yahoo-inc.com>
Committed: Fri Sep 30 10:37:59 2016 -0700

----------------------------------------------------------------------
 pom.xml                                         |  24 +++++
 release-log.txt                                 |   1 +
 sharelib/spark/pom.xml                          |   4 +-
 .../apache/oozie/action/hadoop/SparkMain.java   | 105 ++++++++++++++++---
 .../oozie/action/hadoop/TestSparkMain.java      |  42 ++++++--
 5 files changed, 155 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/4b6607c9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 08b70b6..1cbc3e0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -95,6 +95,8 @@
          <pig.classifier></pig.classifier>
          <sqoop.version>1.4.3</sqoop.version>
          <spark.version>1.6.1</spark.version>
+         <spark.streaming.kafka.version>1.6.1</spark.streaming.kafka.version>
+         <spark.bagel.version>1.6.1</spark.bagel.version>
          <spark.guava.version>14.0.1</spark.guava.version>
          <spark.scala.binary.version>2.10</spark.scala.binary.version>
          <sqoop.classifier>hadoop100</sqoop.classifier>
@@ -1895,5 +1897,27 @@
                 <module>login</module>
             </modules>
         </profile>
+        <profile>
+            <id>spark-1</id>
+            <activation>
+                <activeByDefault>true</activeByDefault>
+            </activation>
+            <properties>
+                <spark.version>1.6.1</spark.version>
+                <spark.streaming.kafka.version>1.6.1</spark.streaming.kafka.version>
+                <spark.bagel.version>1.6.1</spark.bagel.version>
+            </properties>
+        </profile>
+        <profile>
+            <id>spark-2</id>
+            <activation>
+                <activeByDefault>false</activeByDefault>
+            </activation>
+            <properties>
+                <spark.version>2.0.0</spark.version>
+                <spark.streaming.kafka.version>1.6.2</spark.streaming.kafka.version>
+                <spark.bagel.version>1.6.2</spark.bagel.version>
+            </properties>
+        </profile>
     </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/oozie/blob/4b6607c9/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 34eb1ef..da1a0b3 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 4.3.0 release
 
+OOZIE-2606 Set spark.yarn.jars to fix Spark 2.0 with Oozie (satishsaley via rohini)
 OOZIE-2673 Include XSD for shell-action:0.3 in documentation (abhishekbafna via rkanter)
 OOZIE-2194 oozie job -kill doesn't work with spark action (abhishekbafna via rohini)
 OOZIE-2501 ZK reentrant lock doesn't work for few cases (puru)

http://git-wip-us.apache.org/repos/asf/oozie/blob/4b6607c9/sharelib/spark/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/spark/pom.xml b/sharelib/spark/pom.xml
index bfce949..d828113 100644
--- a/sharelib/spark/pom.xml
+++ b/sharelib/spark/pom.xml
@@ -202,13 +202,13 @@
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-streaming-kafka_${spark.scala.binary.version}</artifactId>
-            <version>${spark.version}</version>
+            <version>${spark.streaming.kafka.version}</version>
             <scope>compile</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-bagel_${spark.scala.binary.version}</artifactId>
-            <version>${spark.version}</version>
+            <version>${spark.bagel.version}</version>
             <scope>compile</scope>
         </dependency>
 

http://git-wip-us.apache.org/repos/asf/oozie/blob/4b6607c9/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
index 407ba4b..539fb5c 100644
--- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
@@ -18,14 +18,6 @@
 
 package org.apache.oozie.action.hadoop;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.PropertyConfigurator;
-import org.apache.spark.deploy.SparkSubmit;
-
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
@@ -34,10 +26,23 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
 import java.util.regex.Pattern;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.spark.deploy.SparkSubmit;
+
 public class SparkMain extends LauncherMain {
     private static final String MASTER_OPTION = "--master";
     private static final String MODE_OPTION = "--deploy-mode";
@@ -56,6 +61,15 @@ public class SparkMain extends LauncherMain {
     private static final String SPARK_LOG4J_PROPS = "spark-log4j.properties";
     private static final Pattern[] SPARK_JOB_IDS_PATTERNS = {
             Pattern.compile("Submitted application (application[0-9_]*)") };
+    public static final Pattern SPARK_ASSEMBLY_JAR_PATTERN = Pattern
+            .compile("^spark-assembly((?:(-|_|(\\d+\\.))\\d+(?:\\.\\d+)*))*\\.jar$");
+    public static final Pattern SPARK_YARN_JAR_PATTERN = Pattern
+            .compile("^spark-yarn((?:(-|_|(\\d+\\.))\\d+(?:\\.\\d+)*))*\\.jar$");
+    private static final Pattern SPARK_VERSION_1 = Pattern.compile("^1.*");
+    private static final String SPARK_YARN_JAR = "spark.yarn.jar";
+    private static final String SPARK_YARN_JARS = "spark.yarn.jars";
+    private String sparkYarnJar = null;
+    private String sparkVersion = "1.X.X";
     public static void main(String[] args) throws Exception {
         run(SparkMain.class, args);
     }
@@ -179,16 +193,19 @@ public class SparkMain extends LauncherMain {
         }
 
         if ((yarnClusterMode || yarnClientMode)) {
-            String cachedFiles = fixFsDefaultUris(DistributedCache.getCacheFiles(actionConf), jarPath);
+            LinkedList<URI> fixedUris = fixFsDefaultUris(DistributedCache.getCacheFiles(actionConf), jarPath);
+            String cachedFiles = filterSparkYarnJar(fixedUris);
             if (cachedFiles != null && !cachedFiles.isEmpty()) {
                 sparkArgs.add("--files");
                 sparkArgs.add(cachedFiles);
             }
-            String cachedArchives = fixFsDefaultUris(DistributedCache.getCacheArchives(actionConf), jarPath);
+            fixedUris = fixFsDefaultUris(DistributedCache.getCacheArchives(actionConf), jarPath);
+            String cachedArchives = StringUtils.join(fixedUris, ",");
             if (cachedArchives != null && !cachedArchives.isEmpty()) {
                 sparkArgs.add("--archives");
                 sparkArgs.add(cachedArchives);
             }
+            setSparkYarnJarsConf(sparkArgs);
         }
 
         if (!sparkArgs.contains(VERBOSE_OPTION)) {
@@ -384,11 +401,11 @@ public class SparkMain extends LauncherMain {
      * @throws IOException
      * @throws URISyntaxException
      */
-    private String fixFsDefaultUris(URI[] files, String jarPath) throws IOException, URISyntaxException {
+    private LinkedList<URI> fixFsDefaultUris(URI[] files, String jarPath) throws IOException, URISyntaxException {
         if (files == null) {
             return null;
         }
-        ArrayList<URI> listUris = new ArrayList<URI>();
+        LinkedList<URI> listUris = new LinkedList<URI>();
         FileSystem fs = FileSystem.get(new Configuration(true));
         for (int i = 0; i < files.length; i++) {
             URI fileUri = files[i];
@@ -411,6 +428,68 @@ public class SparkMain extends LauncherMain {
                 }
             }
         }
+        return listUris;
+    }
+
+    /**
+     * Filters out the Spark yarn jar and records its version
+     *
+     * @param listUris list of URIs; if a Spark yarn/assembly jar is found it is removed from this list
+     * @return comma-separated string of the remaining URIs
+     * @throws OozieActionConfiguratorException
+     */
+    private String filterSparkYarnJar(LinkedList<URI> listUris) throws OozieActionConfiguratorException {
+        Iterator<URI> iterator = listUris.iterator();
+        File matchedFile = null;
+        while (iterator.hasNext()) {
+            URI uri = iterator.next();
+            Path p = new Path(uri);
+            if (SPARK_YARN_JAR_PATTERN.matcher(p.getName()).find()) {
+                matchedFile = getMatchingFile(SPARK_YARN_JAR_PATTERN);
+            }
+            else if (SPARK_ASSEMBLY_JAR_PATTERN.matcher(p.getName()).find()) {
+                matchedFile = getMatchingFile(SPARK_ASSEMBLY_JAR_PATTERN);
+            }
+            if (matchedFile != null) {
+                sparkYarnJar = uri.toString();
+                try {
+                    sparkVersion = getJarVersion(matchedFile);
+                    System.out.println("Spark Version " + sparkVersion);
+                }
+                catch (IOException io) {
+                    System.out.println(
+                            "Unable to open " + matchedFile.getPath() + ". Default Spark Version " + sparkVersion);
+                }
+                iterator.remove();
+                break;
+            }
+        }
         return StringUtils.join(listUris, ",");
     }
-}
+
+    /**
+     * Sets spark.yarn.jars for Spark 2.X. Sets spark.yarn.jar for Spark 1.X.
+     *
+     * @param sparkArgs the Spark submit argument list to which the --conf setting is appended
+     */
+    private void setSparkYarnJarsConf(List<String> sparkArgs) {
+        if (SPARK_VERSION_1.matcher(sparkVersion).find()) {
+            // In Spark 1.X.X, set spark.yarn.jar to avoid
+            // multiple distribution
+            sparkArgs.add("--conf");
+            sparkArgs.add(SPARK_YARN_JAR + "=" + sparkYarnJar);
+                }
+        else {
+            // In Spark 2.X.X, set spark.yarn.jars
+            sparkArgs.add("--conf");
+            sparkArgs.add(SPARK_YARN_JARS + "=" + sparkYarnJar);
+        }
+    }
+
+    private String getJarVersion(File jarFile) throws IOException {
+        @SuppressWarnings("resource")
+        Manifest manifest = new JarFile(jarFile).getManifest();
+        return manifest.getMainAttributes().getValue("Specification-Version");
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/oozie/blob/4b6607c9/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java
index 5ef4649..f044048 100644
--- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java
@@ -18,11 +18,6 @@
 
 package org.apache.oozie.action.hadoop;
 
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.oozie.util.IOUtils;
-import org.apache.oozie.util.XConfiguration;
-
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
@@ -30,6 +25,13 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
+import java.util.ArrayList;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.XConfiguration;
 
 public class TestSparkMain extends MainTestCase {
 
@@ -90,4 +92,32 @@ public class TestSparkMain extends MainTestCase {
         assertTrue(getFileSystem().exists(new Path(getFsTestCaseDir() + "/" + OUTPUT)));
         return null;
     }
-}
+
+    public void testPatterns() {
+        patternHelper("spark-yarn", SparkMain.SPARK_YARN_JAR_PATTERN);
+        patternHelper("spark-assembly", SparkMain.SPARK_ASSEMBLY_JAR_PATTERN);
+    }
+
+    private void patternHelper(String jarName, Pattern pattern) {
+        ArrayList<String> jarList = new ArrayList<String>();
+        jarList.add(jarName + "-1.2.jar");
+        jarList.add(jarName + "-1.2.4.jar");
+        jarList.add(jarName + "1.2.4.jar");
+        jarList.add(jarName + "-1.2.4_1.2.3.4.jar");
+        jarList.add(jarName + ".jar");
+
+        // all should pass
+        for (String s : jarList) {
+            assertTrue(pattern.matcher(s).find());
+        }
+
+        jarList.clear();
+        jarList.add(jarName + "-1.2.3-sources.jar");
+        jarList.add(jarName + "-sources-1.2.3.jar");
+        jarList.add(jarName + "-sources.jar");
+        // all should not pass
+        for (String s : jarList) {
+            assertFalse(pattern.matcher(s).find());
+        }
+    }
+}
\ No newline at end of file