You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2019/03/05 09:30:58 UTC
[zeppelin] branch master updated: [ZEPPELIN-3986]. Cannot access
any JAR in yarn cluster mode
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new 78c49e9 [ZEPPELIN-3986]. Cannot access any JAR in yarn cluster mode
78c49e9 is described below
commit 78c49e923e6c6ac228d259b20d67217db1b4315d
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Fri Feb 15 21:00:25 2019 +0800
[ZEPPELIN-3986]. Cannot access any JAR in yarn cluster mode
### What is this PR for?
User-specified jars are missing in yarn-cluster mode because we didn't detect the user jars correctly. This PR fixes the jar detection logic in `BaseSparkScalaInterpreter`.
### What type of PR is it?
[Bug Fix]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://jira.apache.org/jira/browse/ZEPPELIN-3986
### How should this be tested?
* A system integration test is added to SparkIntegrationTest; it covers both the `spark.jars` and `spark.jars.packages` cases.
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3308 from zjffdu/ZEPPELIN-3986 and squashes the following commits:
49a59b2a1 [Jeff Zhang] [ZEPPELIN-3986]. Cannot access any JAR in yarn cluster mode
---
.../zeppelin/spark/NewSparkInterpreterTest.java | 28 ----------------
.../zeppelin/spark/BaseSparkScalaInterpreter.scala | 38 ++++++++++++++--------
zeppelin-interpreter-integration/pom.xml | 12 +++++++
.../interpreter/integration/DummyClass.java | 21 ++++++++++++
.../zeppelin/integration/SparkIntegrationTest.java | 25 +++++++++++---
5 files changed, 78 insertions(+), 46 deletions(-)
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
index 54835ae..7c7dad9 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
@@ -370,34 +370,6 @@ public class NewSparkInterpreterTest {
interpretThread.join();
}
- @Test
- public void testDependencies() throws IOException, InterpreterException {
- Properties properties = new Properties();
- properties.setProperty("spark.master", "local");
- properties.setProperty("spark.app.name", "test");
- properties.setProperty("zeppelin.spark.maxResult", "100");
- properties.setProperty("zeppelin.spark.useNew", "true");
- // disable color output for easy testing
- properties.setProperty("zeppelin.spark.scala.color", "false");
-
- // download spark-avro jar
- URL website = new URL("http://repo1.maven.org/maven2/com/databricks/spark-avro_2.11/3.2.0/spark-avro_2.11-3.2.0.jar");
- ReadableByteChannel rbc = Channels.newChannel(website.openStream());
- File avroJarFile = new File("spark-avro_2.11-3.2.0.jar");
- FileOutputStream fos = new FileOutputStream(avroJarFile);
- fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
-
- properties.setProperty("spark.jars", avroJarFile.getAbsolutePath());
-
- interpreter = new SparkInterpreter(properties);
- assertTrue(interpreter.getDelegation() instanceof NewSparkInterpreter);
- interpreter.setInterpreterGroup(mock(InterpreterGroup.class));
- interpreter.open();
-
- InterpreterResult result = interpreter.interpret("import com.databricks.spark.avro._", getInterpreterContext());
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- }
-
//TODO(zjffdu) This unit test will fail due to classpath issue, should enable it after the classpath issue is fixed.
@Ignore
public void testDepInterpreter() throws InterpreterException {
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
index 183dee6..cd241a8 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -19,6 +19,8 @@ package org.apache.zeppelin.spark
import java.io.File
+import java.net.URLClassLoader
+import java.nio.file.Paths
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.sql.SQLContext
@@ -35,6 +37,7 @@ import scala.util.control.NonFatal
/**
* Base class for different scala versions of SparkInterpreter. It should be
* binary compatible between multiple scala versions.
+ *
* @param conf
* @param depFiles
*/
@@ -86,6 +89,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
def interpret(code: String, context: InterpreterContext): InterpreterResult = {
val originalOut = System.out
+
def _interpret(code: String): scala.tools.nsc.interpreter.Results.Result = {
Console.withOut(interpreterOutput) {
System.setOut(Console.out)
@@ -236,7 +240,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
builder.getClass.getMethod("config", classOf[SparkConf]).invoke(builder, conf)
if (conf.get("spark.sql.catalogImplementation", "in-memory").toLowerCase == "hive"
- || conf.get("spark.useHiveContext", "false").toLowerCase == "true") {
+ || conf.get("spark.useHiveContext", "false").toLowerCase == "true") {
val hiveSiteExisted: Boolean =
Thread.currentThread().getContextClassLoader.getResource("hive-site.xml") != null
val hiveClassesPresent =
@@ -370,20 +374,26 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
}
protected def getUserJars(): Seq[String] = {
- val sparkJars = conf.getOption("spark.jars").map(_.split(","))
- .map(_.filter(_.nonEmpty)).toSeq.flatten
- val depJars = depFiles.asScala.filter(_.endsWith(".jar"))
- // add zeppelin spark interpreter jar
- val zeppelinInterpreterJarURL = getClass.getProtectionDomain.getCodeSource.getLocation
- // zeppelinInterpreterJarURL might be a folder when under unit testing
- val result = if (new File(zeppelinInterpreterJarURL.getFile).isDirectory) {
- sparkJars ++ depJars
- } else {
- sparkJars ++ depJars ++ Seq(zeppelinInterpreterJarURL.getFile)
+ var classLoader = Thread.currentThread().getContextClassLoader
+ var extraJars = Seq.empty[String]
+ while (classLoader != null) {
+ if (classLoader.getClass.getCanonicalName ==
+ "org.apache.spark.util.MutableURLClassLoader") {
+ extraJars = classLoader.asInstanceOf[URLClassLoader].getURLs()
+ // Check if the file exists.
+ .filter { u => u.getProtocol == "file" && new File(u.getPath).isFile }
+ // Some bad spark packages depend on the wrong version of scala-reflect. Blacklist it.
+ .filterNot {
+ u => Paths.get(u.toURI).getFileName.toString.contains("org.scala-lang_scala-reflect")
+ }
+ .map(url => url.toString).toSeq
+ classLoader = null
+ } else {
+ classLoader = classLoader.getParent
+ }
}
- conf.set("spark.jars", result.mkString(","))
- LOGGER.debug("User jar for spark repl: " + conf.get("spark.jars"))
- result
+ LOGGER.debug("User jar for spark repl: " + extraJars.mkString(","))
+ extraJars
}
protected def getUserFiles(): Seq[String] = {
diff --git a/zeppelin-interpreter-integration/pom.xml b/zeppelin-interpreter-integration/pom.xml
index 09c9710..6140925 100644
--- a/zeppelin-interpreter-integration/pom.xml
+++ b/zeppelin-interpreter-integration/pom.xml
@@ -95,6 +95,18 @@
</exclusions>
</dependency>
+ <dependency>
+ <groupId>org.apache.maven</groupId>
+ <artifactId>maven-model</artifactId>
+ <version>3.0.3</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.codehaus.plexus</groupId>
+ <artifactId>plexus-utils</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
<!--test libraries-->
<dependency>
<groupId>junit</groupId>
diff --git a/zeppelin-interpreter-integration/src/main/java/org/apache/zeppelin/interpreter/integration/DummyClass.java b/zeppelin-interpreter-integration/src/main/java/org/apache/zeppelin/interpreter/integration/DummyClass.java
new file mode 100644
index 0000000..1df4618
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/main/java/org/apache/zeppelin/interpreter/integration/DummyClass.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.integration;
+
+public class DummyClass {
+}
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
index 03a482d..94a6a40 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.maven.model.Model;
+import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
import org.apache.zeppelin.interpreter.Interpreter;
import org.apache.zeppelin.interpreter.InterpreterContext;
import org.apache.zeppelin.interpreter.InterpreterException;
@@ -29,12 +31,15 @@ import org.apache.zeppelin.interpreter.InterpreterFactory;
import org.apache.zeppelin.interpreter.InterpreterResult;
import org.apache.zeppelin.interpreter.InterpreterSetting;
import org.apache.zeppelin.interpreter.InterpreterSettingManager;
+import org.codehaus.plexus.util.xml.pull.XmlPullParserException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.FileReader;
import java.io.IOException;
import java.util.EnumSet;
@@ -80,7 +85,14 @@ public abstract class SparkIntegrationTest {
}
}
- private void testInterpreterBasics() throws IOException, InterpreterException {
+ private void testInterpreterBasics() throws IOException, InterpreterException, XmlPullParserException {
+ // add jars & packages for testing
+ InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
+ sparkInterpreterSetting.setProperty("spark.jars.packages", "com.maxmind.geoip2:geoip2:2.5.0");
+ MavenXpp3Reader reader = new MavenXpp3Reader();
+ Model model = reader.read(new FileReader("pom.xml"));
+ sparkInterpreterSetting.setProperty("spark.jars", new File("target/zeppelin-interpreter-integration-" + model.getVersion() + ".jar").getAbsolutePath());
+
// test SparkInterpreter
Interpreter sparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.spark", "test");
@@ -93,6 +105,11 @@ public abstract class SparkIntegrationTest {
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
assertTrue(interpreterResult.message().get(0).getData().contains("45"));
+ // test jars & packages can be loaded correctly
+ interpreterResult = sparkInterpreter.interpret("import org.apache.zeppelin.interpreter.integration.DummyClass\n" +
+ "import com.maxmind.geoip2._", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
+
// test PySparkInterpreter
Interpreter pySparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.pyspark", "test");
interpreterResult = pySparkInterpreter.interpret("sqlContext.createDataFrame([(1,'a'),(2,'b')], ['id','name']).registerTempTable('test')", context);
@@ -123,7 +140,7 @@ public abstract class SparkIntegrationTest {
}
@Test
- public void testLocalMode() throws IOException, YarnException, InterpreterException {
+ public void testLocalMode() throws IOException, YarnException, InterpreterException, XmlPullParserException {
InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
sparkInterpreterSetting.setProperty("master", "local[*]");
sparkInterpreterSetting.setProperty("SPARK_HOME", sparkHome);
@@ -143,7 +160,7 @@ public abstract class SparkIntegrationTest {
}
@Test
- public void testYarnClientMode() throws IOException, YarnException, InterruptedException, InterpreterException {
+ public void testYarnClientMode() throws IOException, YarnException, InterruptedException, InterpreterException, XmlPullParserException {
InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
sparkInterpreterSetting.setProperty("master", "yarn-client");
sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
@@ -166,7 +183,7 @@ public abstract class SparkIntegrationTest {
}
@Test
- public void testYarnClusterMode() throws IOException, YarnException, InterruptedException, InterpreterException {
+ public void testYarnClusterMode() throws IOException, YarnException, InterruptedException, InterpreterException, XmlPullParserException {
InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
sparkInterpreterSetting.setProperty("master", "yarn-cluster");
sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());