You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by ro...@apache.org on 2016/09/30 17:38:01 UTC
oozie git commit: OOZIE-2606 Set spark.yarn.jars to fix Spark 2.0
with Oozie (satishsaley via rohini) (cherry picked from commit
a4dbeda9550e4bec5ef5adf0f86f4ea7be6cd155)
Repository: oozie
Updated Branches:
refs/heads/branch-4.3 c24c5e5df -> 4b6607c90
OOZIE-2606 Set spark.yarn.jars to fix Spark 2.0 with Oozie (satishsaley via rohini)
(cherry picked from commit a4dbeda9550e4bec5ef5adf0f86f4ea7be6cd155)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/4b6607c9
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/4b6607c9
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/4b6607c9
Branch: refs/heads/branch-4.3
Commit: 4b6607c90fd2b9777f95ddc89ea43b0f6e58f111
Parents: c24c5e5
Author: Rohini Palaniswamy <ro...@apache.org>
Authored: Fri Sep 30 10:37:35 2016 -0700
Committer: Rohini Palaniswamy <ro...@yahoo-inc.com>
Committed: Fri Sep 30 10:37:59 2016 -0700
----------------------------------------------------------------------
pom.xml | 24 +++++
release-log.txt | 1 +
sharelib/spark/pom.xml | 4 +-
.../apache/oozie/action/hadoop/SparkMain.java | 105 ++++++++++++++++---
.../oozie/action/hadoop/TestSparkMain.java | 42 ++++++--
5 files changed, 155 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/4b6607c9/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 08b70b6..1cbc3e0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -95,6 +95,8 @@
<pig.classifier></pig.classifier>
<sqoop.version>1.4.3</sqoop.version>
<spark.version>1.6.1</spark.version>
+ <spark.streaming.kafka.version>1.6.1</spark.streaming.kafka.version>
+ <spark.bagel.version>1.6.1</spark.bagel.version>
<spark.guava.version>14.0.1</spark.guava.version>
<spark.scala.binary.version>2.10</spark.scala.binary.version>
<sqoop.classifier>hadoop100</sqoop.classifier>
@@ -1895,5 +1897,27 @@
<module>login</module>
</modules>
</profile>
+ <profile>
+ <id>spark-1</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <properties>
+ <spark.version>1.6.1</spark.version>
+ <spark.streaming.kafka.version>1.6.1</spark.streaming.kafka.version>
+ <spark.bagel.version>1.6.1</spark.bagel.version>
+ </properties>
+ </profile>
+ <profile>
+ <id>spark-2</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <properties>
+ <spark.version>2.0.0</spark.version>
+ <spark.streaming.kafka.version>1.6.2</spark.streaming.kafka.version>
+ <spark.bagel.version>1.6.2</spark.bagel.version>
+ </properties>
+ </profile>
</profiles>
</project>
http://git-wip-us.apache.org/repos/asf/oozie/blob/4b6607c9/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 34eb1ef..da1a0b3 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.3.0 release
+OOZIE-2606 Set spark.yarn.jars to fix Spark 2.0 with Oozie (satishsaley via rohini)
OOZIE-2673 Include XSD for shell-action:0.3 in documentation (abhishekbafna via rkanter)
OOZIE-2194 oozie job -kill doesn't work with spark action (abhishekbafna via rohini)
OOZIE-2501 ZK reentrant lock doesn't work for few cases (puru)
http://git-wip-us.apache.org/repos/asf/oozie/blob/4b6607c9/sharelib/spark/pom.xml
----------------------------------------------------------------------
diff --git a/sharelib/spark/pom.xml b/sharelib/spark/pom.xml
index bfce949..d828113 100644
--- a/sharelib/spark/pom.xml
+++ b/sharelib/spark/pom.xml
@@ -202,13 +202,13 @@
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_${spark.scala.binary.version}</artifactId>
- <version>${spark.version}</version>
+ <version>${spark.streaming.kafka.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-bagel_${spark.scala.binary.version}</artifactId>
- <version>${spark.version}</version>
+ <version>${spark.bagel.version}</version>
<scope>compile</scope>
</dependency>
http://git-wip-us.apache.org/repos/asf/oozie/blob/4b6607c9/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
index 407ba4b..539fb5c 100644
--- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
@@ -18,14 +18,6 @@
package org.apache.oozie.action.hadoop;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.log4j.PropertyConfigurator;
-import org.apache.spark.deploy.SparkSubmit;
-
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -34,10 +26,23 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
+import java.util.jar.JarFile;
+import java.util.jar.Manifest;
import java.util.regex.Pattern;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.PropertyConfigurator;
+import org.apache.spark.deploy.SparkSubmit;
+
public class SparkMain extends LauncherMain {
private static final String MASTER_OPTION = "--master";
private static final String MODE_OPTION = "--deploy-mode";
@@ -56,6 +61,15 @@ public class SparkMain extends LauncherMain {
private static final String SPARK_LOG4J_PROPS = "spark-log4j.properties";
private static final Pattern[] SPARK_JOB_IDS_PATTERNS = {
Pattern.compile("Submitted application (application[0-9_]*)") };
+ public static final Pattern SPARK_ASSEMBLY_JAR_PATTERN = Pattern
+ .compile("^spark-assembly((?:(-|_|(\\d+\\.))\\d+(?:\\.\\d+)*))*\\.jar$");
+ public static final Pattern SPARK_YARN_JAR_PATTERN = Pattern
+ .compile("^spark-yarn((?:(-|_|(\\d+\\.))\\d+(?:\\.\\d+)*))*\\.jar$");
+ private static final Pattern SPARK_VERSION_1 = Pattern.compile("^1.*");
+ private static final String SPARK_YARN_JAR = "spark.yarn.jar";
+ private static final String SPARK_YARN_JARS = "spark.yarn.jars";
+ private String sparkYarnJar = null;
+ private String sparkVersion = "1.X.X";
public static void main(String[] args) throws Exception {
run(SparkMain.class, args);
}
@@ -179,16 +193,19 @@ public class SparkMain extends LauncherMain {
}
if ((yarnClusterMode || yarnClientMode)) {
- String cachedFiles = fixFsDefaultUris(DistributedCache.getCacheFiles(actionConf), jarPath);
+ LinkedList<URI> fixedUris = fixFsDefaultUris(DistributedCache.getCacheFiles(actionConf), jarPath);
+ String cachedFiles = filterSparkYarnJar(fixedUris);
if (cachedFiles != null && !cachedFiles.isEmpty()) {
sparkArgs.add("--files");
sparkArgs.add(cachedFiles);
}
- String cachedArchives = fixFsDefaultUris(DistributedCache.getCacheArchives(actionConf), jarPath);
+ fixedUris = fixFsDefaultUris(DistributedCache.getCacheArchives(actionConf), jarPath);
+ String cachedArchives = StringUtils.join(fixedUris, ",");
if (cachedArchives != null && !cachedArchives.isEmpty()) {
sparkArgs.add("--archives");
sparkArgs.add(cachedArchives);
}
+ setSparkYarnJarsConf(sparkArgs);
}
if (!sparkArgs.contains(VERBOSE_OPTION)) {
@@ -384,11 +401,11 @@ public class SparkMain extends LauncherMain {
* @throws IOException
* @throws URISyntaxException
*/
- private String fixFsDefaultUris(URI[] files, String jarPath) throws IOException, URISyntaxException {
+ private LinkedList<URI> fixFsDefaultUris(URI[] files, String jarPath) throws IOException, URISyntaxException {
if (files == null) {
return null;
}
- ArrayList<URI> listUris = new ArrayList<URI>();
+ LinkedList<URI> listUris = new LinkedList<URI>();
FileSystem fs = FileSystem.get(new Configuration(true));
for (int i = 0; i < files.length; i++) {
URI fileUri = files[i];
@@ -411,6 +428,68 @@ public class SparkMain extends LauncherMain {
}
}
}
+ return listUris;
+ }
+
+ /**
+ * Filters out the Spark yarn jar and records its version
+ *
+ * @param listUris list of cache-file URIs; the matched Spark yarn/assembly jar is removed from this list
+ * @return comma-separated string of the remaining URIs
+ * @throws OozieActionConfiguratorException
+ */
+ private String filterSparkYarnJar(LinkedList<URI> listUris) throws OozieActionConfiguratorException {
+ Iterator<URI> iterator = listUris.iterator();
+ File matchedFile = null;
+ while (iterator.hasNext()) {
+ URI uri = iterator.next();
+ Path p = new Path(uri);
+ if (SPARK_YARN_JAR_PATTERN.matcher(p.getName()).find()) {
+ matchedFile = getMatchingFile(SPARK_YARN_JAR_PATTERN);
+ }
+ else if (SPARK_ASSEMBLY_JAR_PATTERN.matcher(p.getName()).find()) {
+ matchedFile = getMatchingFile(SPARK_ASSEMBLY_JAR_PATTERN);
+ }
+ if (matchedFile != null) {
+ sparkYarnJar = uri.toString();
+ try {
+ sparkVersion = getJarVersion(matchedFile);
+ System.out.println("Spark Version " + sparkVersion);
+ }
+ catch (IOException io) {
+ System.out.println(
+ "Unable to open " + matchedFile.getPath() + ". Default Spark Version " + sparkVersion);
+ }
+ iterator.remove();
+ break;
+ }
+ }
return StringUtils.join(listUris, ",");
}
-}
+
+ /**
+ * Sets spark.yarn.jars for Spark 2.X. Sets spark.yarn.jar for Spark 1.X.
+ *
+ * @param sparkArgs spark-submit argument list to which the --conf option is appended
+ */
+ private void setSparkYarnJarsConf(List<String> sparkArgs) {
+ if (SPARK_VERSION_1.matcher(sparkVersion).find()) {
+ // In Spark 1.X.X, set spark.yarn.jar to avoid
+ // multiple distribution
+ sparkArgs.add("--conf");
+ sparkArgs.add(SPARK_YARN_JAR + "=" + sparkYarnJar);
+ }
+ else {
+ // In Spark 2.X.X, set spark.yarn.jars
+ sparkArgs.add("--conf");
+ sparkArgs.add(SPARK_YARN_JARS + "=" + sparkYarnJar);
+ }
+ }
+
+ private String getJarVersion(File jarFile) throws IOException {
+ @SuppressWarnings("resource")
+ Manifest manifest = new JarFile(jarFile).getManifest();
+ return manifest.getMainAttributes().getValue("Specification-Version");
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/oozie/blob/4b6607c9/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java
index 5ef4649..f044048 100644
--- a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestSparkMain.java
@@ -18,11 +18,6 @@
package org.apache.oozie.action.hadoop;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.oozie.util.IOUtils;
-import org.apache.oozie.util.XConfiguration;
-
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
@@ -30,6 +25,13 @@ import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
+import java.util.ArrayList;
+import java.util.regex.Pattern;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.oozie.util.IOUtils;
+import org.apache.oozie.util.XConfiguration;
public class TestSparkMain extends MainTestCase {
@@ -90,4 +92,32 @@ public class TestSparkMain extends MainTestCase {
assertTrue(getFileSystem().exists(new Path(getFsTestCaseDir() + "/" + OUTPUT)));
return null;
}
-}
+
+ public void testPatterns() {
+ patternHelper("spark-yarn", SparkMain.SPARK_YARN_JAR_PATTERN);
+ patternHelper("spark-assembly", SparkMain.SPARK_ASSEMBLY_JAR_PATTERN);
+ }
+
+ private void patternHelper(String jarName, Pattern pattern) {
+ ArrayList<String> jarList = new ArrayList<String>();
+ jarList.add(jarName + "-1.2.jar");
+ jarList.add(jarName + "-1.2.4.jar");
+ jarList.add(jarName + "1.2.4.jar");
+ jarList.add(jarName + "-1.2.4_1.2.3.4.jar");
+ jarList.add(jarName + ".jar");
+
+ // all should pass
+ for (String s : jarList) {
+ assertTrue(pattern.matcher(s).find());
+ }
+
+ jarList.clear();
+ jarList.add(jarName + "-1.2.3-sources.jar");
+ jarList.add(jarName + "-sources-1.2.3.jar");
+ jarList.add(jarName + "-sources.jar");
+ // all should not pass
+ for (String s : jarList) {
+ assertFalse(pattern.matcher(s).find());
+ }
+ }
+}
\ No newline at end of file