You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@oozie.apache.org by rk...@apache.org on 2017/03/03 00:58:19 UTC
oozie git commit: OOZIE-2802 Spark action failure on Spark 2.1.0 due to duplicate sharelibs (gezapeti via rkanter)
Repository: oozie
Updated Branches:
refs/heads/master 64a58212f -> e8bd9fc92
OOZIE-2802 Spark action failure on Spark 2.1.0 due to duplicate sharelibs (gezapeti via rkanter)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/e8bd9fc9
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/e8bd9fc9
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/e8bd9fc9
Branch: refs/heads/master
Commit: e8bd9fc9271d07aaa074df3fd45e3b8d51974ce1
Parents: 64a5821
Author: Robert Kanter <rk...@apache.org>
Authored: Thu Mar 2 16:57:50 2017 -0800
Committer: Robert Kanter <rk...@apache.org>
Committed: Thu Mar 2 16:57:50 2017 -0800
----------------------------------------------------------------------
release-log.txt | 1 +
.../apache/oozie/action/hadoop/SparkMain.java | 61 ++++++++++----
.../TestDuplicateFilteringInSparkMain.java | 87 ++++++++++++++++++++
3 files changed, 133 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/oozie/blob/e8bd9fc9/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index fdf6f2b..6937e24 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 4.4.0 release (trunk - unreleased)
+OOZIE-2802 Spark action failure on Spark 2.1.0 due to duplicate sharelibs (gezapeti via rkanter)
OOZIE-2803 Mask passwords when printing out configs/args in MapReduceMain and SparkMain (pbacsko via rkanter)
OOZIE-2799 Setting log location for spark sql on hive (satishsaley)
OOZIE-2792 Hive2 action is not parsing Spark application ID from log file properly when Hive is on Spark (zhengxb2005 via rkanter)
http://git-wip-us.apache.org/repos/asf/oozie/blob/e8bd9fc9/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
index 88ac64e..c24d95c 100644
--- a/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
+++ b/sharelib/spark/src/main/java/org/apache/oozie/action/hadoop/SparkMain.java
@@ -27,8 +27,9 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -79,6 +80,8 @@ public class SparkMain extends LauncherMain {
private static final String SPARK_YARN_JAR = "spark.yarn.jar";
private static final String SPARK_YARN_JARS = "spark.yarn.jars";
public static final String HIVE_SITE_CONF = "hive-site.xml";
+ public static final String FILES_OPTION = "--files";
+ public static final String ARCHIVES_OPTION = "--archives";
public static void main(String[] args) throws Exception {
run(SparkMain.class, args);
@@ -135,6 +138,7 @@ public class SparkMain extends LauncherMain {
boolean addedLog4jExecutorSettings = false;
StringBuilder driverClassPath = new StringBuilder();
StringBuilder executorClassPath = new StringBuilder();
+ String userFiles = null, userArchives = null;
String sparkOpts = actionConf.get(SparkActionExecutor.SPARK_OPTS);
if (StringUtils.isNotEmpty(sparkOpts)) {
List<String> sparkOptions = splitSparkOpts(sparkOpts);
@@ -177,6 +181,16 @@ public class SparkMain extends LauncherMain {
addedLog4jDriverSettings = true;
}
}
+ if(opt.startsWith(FILES_OPTION)) {
+ userFiles = sparkOptions.get(i + 1);
+ i++;
+ addToSparkArgs = false;
+ }
+ if(opt.startsWith(ARCHIVES_OPTION)) {
+ userArchives = sparkOptions.get(i + 1);
+ i++;
+ addToSparkArgs = false;
+ }
if(addToSparkArgs) {
sparkArgs.add(opt);
}
@@ -225,19 +239,24 @@ public class SparkMain extends LauncherMain {
}
if ((yarnClusterMode || yarnClientMode)) {
- LinkedList<URI> fixedUris = fixFsDefaultUris(DistributedCache.getCacheFiles(actionConf));
- JarFilter jarfilter = new JarFilter(fixedUris, jarPath);
+ Map<String, URI> fixedFileUrisMap = fixFsDefaultUrisAndFilterDuplicates(DistributedCache.getCacheFiles(actionConf));
+ fixedFileUrisMap.put(SPARK_LOG4J_PROPS, new Path(SPARK_LOG4J_PROPS).toUri());
+ fixedFileUrisMap.put(HIVE_SITE_CONF, new Path(HIVE_SITE_CONF).toUri());
+ addUserDefined(userFiles, fixedFileUrisMap);
+ Collection<URI> fixedFileUris = fixedFileUrisMap.values();
+ JarFilter jarfilter = new JarFilter(fixedFileUris, jarPath);
jarfilter.filter();
jarPath = jarfilter.getApplicationJar();
- fixedUris.add(new Path(SPARK_LOG4J_PROPS).toUri());
- fixedUris.add(new Path(HIVE_SITE_CONF).toUri());
- String cachedFiles = StringUtils.join(fixedUris, ",");
+
+ String cachedFiles = StringUtils.join(fixedFileUris, ",");
if (cachedFiles != null && !cachedFiles.isEmpty()) {
sparkArgs.add("--files");
sparkArgs.add(cachedFiles);
}
- fixedUris = fixFsDefaultUris(DistributedCache.getCacheArchives(actionConf));
- String cachedArchives = StringUtils.join(fixedUris, ",");
+ Map<String, URI> fixedArchiveUrisMap = fixFsDefaultUrisAndFilterDuplicates(DistributedCache.
+ getCacheArchives(actionConf));
+ addUserDefined(userArchives, fixedArchiveUrisMap);
+ String cachedArchives = StringUtils.join(fixedArchiveUrisMap.values(), ",");
if (cachedArchives != null && !cachedArchives.isEmpty()) {
sparkArgs.add("--archives");
sparkArgs.add(cachedArchives);
@@ -277,6 +296,15 @@ public class SparkMain extends LauncherMain {
}
}
+ private void addUserDefined(String userList, Map<String, URI> urisMap) {
+ if(userList != null) {
+ for (String file : userList.split(",")) {
+ Path p = new Path(file);
+ urisMap.put(p.getName(), p.toUri());
+ }
+ }
+ }
+
private void prepareHadoopConfig(Configuration actionConf) throws IOException {
// Copying oozie.action.conf.xml into hadoop configuration *-site files.
if (actionConf.getBoolean(CONF_OOZIE_SPARK_SETUP_HADOOP_CONF_DIR, false)) {
@@ -433,23 +461,24 @@ public class SparkMain extends LauncherMain {
/**
* Convert URIs into the default format which Spark expects
- *
+ * Also filters out duplicate entries
* @param files
* @return
* @throws IOException
* @throws URISyntaxException
*/
- private LinkedList<URI> fixFsDefaultUris(URI[] files) throws IOException, URISyntaxException {
+ static Map<String, URI> fixFsDefaultUrisAndFilterDuplicates(URI[] files) throws IOException, URISyntaxException {
+ Map<String, URI> map= new HashMap<>();
if (files == null) {
- return null;
+ return map;
}
- LinkedList<URI> listUris = new LinkedList<URI>();
FileSystem fs = FileSystem.get(new Configuration(true));
for (int i = 0; i < files.length; i++) {
URI fileUri = files[i];
- listUris.add(getFixedUri(fs, fileUri));
+ Path p = new Path(fileUri);
+ map.put(p.getName(), getFixedUri(fs, fileUri));
}
- return listUris;
+ return map;
}
/**
@@ -573,7 +602,7 @@ public class SparkMain extends LauncherMain {
private String sparkVersion = "1.X.X";
private String sparkYarnJar;
private String applicationJar;
- private LinkedList<URI> listUris = null;
+ private Collection<URI> listUris = null;
/**
* @param listUris List of URIs to be filtered
@@ -581,7 +610,7 @@ public class SparkMain extends LauncherMain {
* @throws IOException
* @throws URISyntaxException
*/
- public JarFilter(LinkedList<URI> listUris, String jarPath) throws URISyntaxException, IOException {
+ public JarFilter(Collection<URI> listUris, String jarPath) throws URISyntaxException, IOException {
this.listUris = listUris;
applicationJar = jarPath;
Path p = new Path(jarPath);
http://git-wip-us.apache.org/repos/asf/oozie/blob/e8bd9fc9/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestDuplicateFilteringInSparkMain.java
----------------------------------------------------------------------
diff --git a/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestDuplicateFilteringInSparkMain.java b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestDuplicateFilteringInSparkMain.java
new file mode 100644
index 0000000..9a231b1
--- /dev/null
+++ b/sharelib/spark/src/test/java/org/apache/oozie/action/hadoop/TestDuplicateFilteringInSparkMain.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.oozie.action.hadoop;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.junit.Assert.*;
+import static org.hamcrest.CoreMatchers.*;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+@RunWith(Parameterized.class)
+public class TestDuplicateFilteringInSparkMain {
+
+ private static final String PREFIX = "hdfs://namenode.address:8020/folder/";
+
+ static List<URI> getURIs(String... uriStrings) throws URISyntaxException {
+ URI[] uris = new URI[uriStrings.length];
+ for (int i = 0; i != uriStrings.length; ++i) {
+ uris[i] = new URI(PREFIX + uriStrings[i]);
+ }
+ return Arrays.asList(uris);
+ }
+
+ static Object[] testCase(List<URI> inputs, List<URI> expectedOutputs) {
+ return new Object[] {inputs, expectedOutputs};
+ }
+
+ @Parameterized.Parameters
+ public static List<Object[]> params() throws Exception {
+ return Arrays.asList(
+ testCase(getURIs("file.io"),
+ getURIs("file.io")),
+
+ testCase(getURIs("file.io", "file.io", "file.io"),
+ getURIs("file.io")),
+
+ testCase(getURIs("file.io", "file3.io", "file.io"),
+ getURIs("file.io", "file3.io")),
+
+ testCase(getURIs("file.io", "file3.io", "file2.io"),
+ getURIs("file.io", "file2.io", "file3.io"))
+ );
+ }
+
+ private List<URI> input;
+
+ private List<URI> expectedOutput;
+
+ public TestDuplicateFilteringInSparkMain(List<URI> input, List<URI> result) {
+ this.input = input;
+ this.expectedOutput = result;
+ }
+
+ @Test
+ public void test() throws Exception{
+ Map<String, URI> uriMap = SparkMain.fixFsDefaultUrisAndFilterDuplicates(input.toArray(new URI[input.size()]));
+ assertThat("Duplicate filtering failed for >>" + input + "<<", uriMap.size(), is(expectedOutput.size()));
+ List<URI> outputList = Arrays.asList(uriMap.values().toArray(new URI[0]));
+ Collections.sort(outputList);
+ assertThat("Files are different in result ", outputList, is(expectedOutput));
+ }
+
+}