You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@oozie.apache.org by pb...@apache.org on 2017/03/22 11:23:33 UTC
[28/50] [abbrv] oozie git commit: OOZIE-2787 Oozie distributes
application jar twice making the spark job fail (satishsaley)
OOZIE-2787 Oozie distributes application jar twice making the spark job fail (satishsaley)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/d249fbf4
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/d249fbf4
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/d249fbf4
Branch: refs/heads/oya
Commit: d249fbf43cd75b97f594330884189e8d8df55ddd
Parents: c9881fc
Author: Satish Subhashrao Saley <sa...@yahoo-inc.com>
Authored: Mon Feb 6 09:40:44 2017 -0800
Committer: Satish Subhashrao Saley <sa...@yahoo-inc.com>
Committed: Mon Feb 6 09:40:44 2017 -0800
----------------------------------------------------------------------
pom.xml | 2 +-
release-log.txt | 1 +
.../apache/oozie/action/hadoop/SparkMain.java | 184 ++++++++++++-------
3 files changed, 122 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/d249fbf4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 15f4195..c00e3b8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1999,7 +1999,7 @@
<activeByDefault>false</activeByDefault>
</activation>
<properties>
- <spark.version>2.0.0</spark.version>
+ <spark.version>2.1.0</spark.version>
<spark.streaming.kafka.version>1.6.2</spark.streaming.kafka.version>
<spark.bagel.version>1.6.2</spark.bagel.version>
</properties>
http://git-wip-us.apache.org/repos/asf/oozie/blob/d249fbf4/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 309de5d..16beee2 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.4.0 release (trunk - unreleased)
+OOZIE-2787 Oozie distributes application jar twice making the spark job fail (satishsaley)
OOZIE-2789 Maven complains about checkstyle error during build (xzheng via abhishekbafna)
OOZIE-2777 Config-default.xml longer than 64k results in java.io.UTFDataFormatException (gezapeti via harsh)
OOZIE-2782 Input logic wait documentation is confusing (puru)
http://git-wip-us.apache.org/repos/asf/oozie/blob/d249fbf4/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
index 0da74d4..8f2f438 100644
--- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
@@ -72,8 +72,6 @@ public class SparkMain extends LauncherMain {
private static final Pattern SPARK_VERSION_1 = Pattern.compile("^1.*");
private static final String SPARK_YARN_JAR = "spark.yarn.jar";
private static final String SPARK_YARN_JARS = "spark.yarn.jars";
- private String sparkYarnJar = null;
- private String sparkVersion = "1.X.X";
public static void main(String[] args) throws Exception {
run(SparkMain.class, args);
}
@@ -216,19 +214,22 @@ public class SparkMain extends LauncherMain {
}
if ((yarnClusterMode || yarnClientMode)) {
- LinkedList<URI> fixedUris = fixFsDefaultUris(DistributedCache.getCacheFiles(actionConf), jarPath);
- String cachedFiles = filterSparkYarnJar(fixedUris);
+ LinkedList<URI> fixedUris = fixFsDefaultUris(DistributedCache.getCacheFiles(actionConf));
+ JarFilter jarfilter = new JarFilter(fixedUris, jarPath);
+ jarfilter.filter();
+ jarPath = jarfilter.getApplicationJar();
+ String cachedFiles = StringUtils.join(fixedUris, ",");
if (cachedFiles != null && !cachedFiles.isEmpty()) {
sparkArgs.add("--files");
sparkArgs.add(cachedFiles);
}
- fixedUris = fixFsDefaultUris(DistributedCache.getCacheArchives(actionConf), jarPath);
+ fixedUris = fixFsDefaultUris(DistributedCache.getCacheArchives(actionConf));
String cachedArchives = StringUtils.join(fixedUris, ",");
if (cachedArchives != null && !cachedArchives.isEmpty()) {
sparkArgs.add("--archives");
sparkArgs.add(cachedArchives);
}
- setSparkYarnJarsConf(sparkArgs);
+ setSparkYarnJarsConf(sparkArgs, jarfilter.getSparkYarnJar(), jarfilter.getSparkVersion());
}
if (!sparkArgs.contains(VERBOSE_OPTION)) {
@@ -319,7 +320,7 @@ public class SparkMain extends LauncherMain {
* @param fileNamePattern the pattern to look for
* @return the file if there is one else it returns null
*/
- private File getMatchingFile(Pattern fileNamePattern) throws OozieActionConfiguratorException {
+ private static File getMatchingFile(Pattern fileNamePattern) throws OozieActionConfiguratorException {
File localDir = new File(".");
for(String fileName : localDir.list()){
if(fileNamePattern.matcher(fileName).find()){
@@ -424,7 +425,7 @@ public class SparkMain extends LauncherMain {
* @throws IOException
* @throws URISyntaxException
*/
- private LinkedList<URI> fixFsDefaultUris(URI[] files, String jarPath) throws IOException, URISyntaxException {
+ private LinkedList<URI> fixFsDefaultUris(URI[] files) throws IOException, URISyntaxException {
if (files == null) {
return null;
}
@@ -432,70 +433,19 @@ public class SparkMain extends LauncherMain {
FileSystem fs = FileSystem.get(new Configuration(true));
for (int i = 0; i < files.length; i++) {
URI fileUri = files[i];
- // Spark compares URIs based on scheme, host and port.
- // Here we convert URIs into the default format so that Spark
- // won't think those belong to different file system.
- // This will avoid an extra copy of files which already exists on
- // same hdfs.
- if (!fileUri.toString().equals(jarPath) && fs.getUri().getScheme().equals(fileUri.getScheme())
- && (fs.getUri().getHost().equals(fileUri.getHost()) || fileUri.getHost() == null)
- && (fs.getUri().getPort() == -1 || fileUri.getPort() == -1
- || fs.getUri().getPort() == fileUri.getPort())) {
- URI uri = new URI(fs.getUri().getScheme(), fileUri.getUserInfo(), fs.getUri().getHost(),
- fs.getUri().getPort(), fileUri.getPath(), fileUri.getQuery(), fileUri.getFragment());
- // Here we skip the application jar, because
- // (if uris are same,) it will get distributed multiple times
- // - one time with --files and another time as application jar.
- if (!uri.toString().equals(jarPath)) {
- listUris.add(uri);
- }
- }
+ listUris.add(getFixedUri(fs, fileUri));
}
return listUris;
}
/**
- * Filters out the Spark yarn jar and records its version
- *
- * @param listUris string containing uris separated by comma
- * @return
- * @throws OozieActionConfiguratorException
- */
- private String filterSparkYarnJar(LinkedList<URI> listUris) throws OozieActionConfiguratorException {
- Iterator<URI> iterator = listUris.iterator();
- File matchedFile = null;
- while (iterator.hasNext()) {
- URI uri = iterator.next();
- Path p = new Path(uri);
- if (SPARK_YARN_JAR_PATTERN.matcher(p.getName()).find()) {
- matchedFile = getMatchingFile(SPARK_YARN_JAR_PATTERN);
- }
- else if (SPARK_ASSEMBLY_JAR_PATTERN.matcher(p.getName()).find()) {
- matchedFile = getMatchingFile(SPARK_ASSEMBLY_JAR_PATTERN);
- }
- if (matchedFile != null) {
- sparkYarnJar = uri.toString();
- try {
- sparkVersion = getJarVersion(matchedFile);
- System.out.println("Spark Version " + sparkVersion);
- }
- catch (IOException io) {
- System.out.println(
- "Unable to open " + matchedFile.getPath() + ". Default Spark Version " + sparkVersion);
- }
- iterator.remove();
- break;
- }
- }
- return StringUtils.join(listUris, ",");
- }
-
- /**
* Sets spark.yarn.jars for Spark 2.X. Sets spark.yarn.jar for Spark 1.X.
*
* @param sparkArgs
+ * @param sparkYarnJar
+ * @param sparkVersion
*/
- private void setSparkYarnJarsConf(List<String> sparkArgs) {
+ private void setSparkYarnJarsConf(List<String> sparkArgs, String sparkYarnJar, String sparkVersion) {
if (SPARK_VERSION_1.matcher(sparkVersion).find()) {
// In Spark 1.X.X, set spark.yarn.jar to avoid
// multiple distribution
@@ -509,7 +459,7 @@ public class SparkMain extends LauncherMain {
}
}
- private String getJarVersion(File jarFile) throws IOException {
+ private static String getJarVersion(File jarFile) throws IOException {
@SuppressWarnings("resource")
Manifest manifest = new JarFile(jarFile).getManifest();
return manifest.getMainAttributes().getValue("Specification-Version");
@@ -521,4 +471,110 @@ public class SparkMain extends LauncherMain {
}
to.append(what);
}
+
+ private static URI getFixedUri(URI fileUri) throws URISyntaxException, IOException {
+ FileSystem fs = FileSystem.get(new Configuration(true));
+ return getFixedUri(fs, fileUri);
+ }
+
+ /**
+ * Spark compares URIs based on scheme, host and port. Here we convert URIs
+ * into the default format so that Spark won't think those belong to
+ * different file system. This will avoid an extra copy of files which
+ * already exists on same hdfs.
+ *
+ * @param fs
+ * @param fileUri
+ * @return fixed uri
+ * @throws URISyntaxException
+ */
+ private static URI getFixedUri(FileSystem fs, URI fileUri) throws URISyntaxException {
+ if (fs.getUri().getScheme().equals(fileUri.getScheme())
+ && (fs.getUri().getHost().equals(fileUri.getHost()) || fileUri.getHost() == null)
+ && (fs.getUri().getPort() == -1 || fileUri.getPort() == -1
+ || fs.getUri().getPort() == fileUri.getPort())) {
+ return new URI(fs.getUri().getScheme(), fileUri.getUserInfo(), fs.getUri().getHost(), fs.getUri().getPort(),
+ fileUri.getPath(), fileUri.getQuery(), fileUri.getFragment());
+ }
+ return fileUri;
+ }
+
+ /**
+ * This class is used for filtering out unwanted jars.
+ */
+ private static class JarFilter {
+ private String sparkVersion = "1.X.X";
+ private String sparkYarnJar;
+ private String applicationJar;
+ private LinkedList<URI> listUris = null;
+
+ /**
+ * @param listUris List of URIs to be filtered
+ * @param jarPath Application jar
+ * @throws IOException
+ * @throws URISyntaxException
+ */
+ public JarFilter(LinkedList<URI> listUris, String jarPath) throws URISyntaxException, IOException {
+ this.listUris = listUris;
+ applicationJar = jarPath;
+ Path p = new Path(jarPath);
+ if (p.isAbsolute()) {
+ applicationJar = getFixedUri(p.toUri()).toString();
+ }
+ }
+
+ /**
+ * Filters out the Spark yarn jar and application jar. Also records
+ * spark yarn jar's version.
+ *
+ * @throws OozieActionConfiguratorException
+ */
+ private void filter() throws OozieActionConfiguratorException {
+ Iterator<URI> iterator = listUris.iterator();
+ File matchedFile = null;
+ while (iterator.hasNext()) {
+ URI uri = iterator.next();
+ Path p = new Path(uri);
+ if (SPARK_YARN_JAR_PATTERN.matcher(p.getName()).find()) {
+ matchedFile = getMatchingFile(SPARK_YARN_JAR_PATTERN);
+ }
+ else if (SPARK_ASSEMBLY_JAR_PATTERN.matcher(p.getName()).find()) {
+ matchedFile = getMatchingFile(SPARK_ASSEMBLY_JAR_PATTERN);
+ }
+ if (matchedFile != null) {
+ sparkYarnJar = uri.toString();
+ try {
+ sparkVersion = getJarVersion(matchedFile);
+ System.out.println("Spark Version " + sparkVersion);
+ }
+ catch (IOException io) {
+ System.out.println(
+ "Unable to open " + matchedFile.getPath() + ". Default Spark Version " + sparkVersion);
+ }
+ iterator.remove();
+ matchedFile = null;
+ }
+ // Here we skip the application jar, because
+ // (if uris are same,) it will get distributed multiple times
+ // - one time with --files and another time as application jar.
+ if (p.getName().equals(applicationJar) || uri.toString().equals(applicationJar)) {
+ applicationJar = uri.toString();
+ iterator.remove();
+ }
+ }
+ }
+
+ public String getApplicationJar() {
+ return applicationJar;
+ }
+
+ public String getSparkYarnJar() {
+ return sparkYarnJar;
+ }
+
+ public String getSparkVersion() {
+ return sparkVersion;
+ }
+
+ }
}