You are viewing a plain-text version of this content; the link to the canonical version is not preserved in this copy.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2019/03/06 02:40:38 UTC
[zeppelin] branch branch-0.8 updated: ZEPPELIN-3986. Cannot access any JAR in yarn cluster mode
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.8 by this push:
new 9f552e4 ZEPPELIN-3986. Cannot access any JAR in yarn cluster mode
9f552e4 is described below
commit 9f552e434b8f0da812369c4f449015008d53e648
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Tue Mar 5 17:41:11 2019 +0800
ZEPPELIN-3986. Cannot access any JAR in yarn cluster mode
---
.../org/apache/zeppelin/spark/SparkVersion.java | 3 +-
.../zeppelin/spark/NewSparkInterpreterTest.java | 26 ---------------
.../zeppelin/spark/BaseSparkScalaInterpreter.scala | 38 ++++++++++++++--------
.../interpreter/integration/DummyClass.java | 21 ++++++++++++
.../zeppelin/interpreter/SparkIntegrationTest.java | 29 ++++++++++++-----
5 files changed, 68 insertions(+), 49 deletions(-)
diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
index 947a36f..f28ddf5 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
@@ -64,7 +64,8 @@ public class SparkVersion {
this.minorVersion = Integer.parseInt(versions[1]);
this.patchVersion = Integer.parseInt(versions[2]);
// version is always 5 digits. (e.g. 2.0.0 -> 20000, 1.6.2 -> 10602)
- version = Integer.parseInt(String.format("%d%02d%02d", majorVersion, minorVersion, patchVersion));
+ version = Integer.parseInt(String.format("%d%02d%02d", majorVersion, minorVersion,
+ patchVersion));
} catch (Exception e) {
logger.error("Can not recognize Spark version " + versionString +
". Assume it's a future release", e);
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
index a3e4d22..a0d1d67 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
@@ -357,32 +357,6 @@ public class NewSparkInterpreterTest {
interpretThread.join();
}
- @Test
- public void testDependencies() throws IOException, InterpreterException {
- Properties properties = new Properties();
- properties.setProperty("spark.master", "local");
- properties.setProperty("spark.app.name", "test");
- properties.setProperty("zeppelin.spark.maxResult", "100");
- properties.setProperty("zeppelin.spark.useNew", "true");
-
- // download spark-avro jar
- URL website = new URL("http://repo1.maven.org/maven2/com/databricks/spark-avro_2.11/3.2.0/spark-avro_2.11-3.2.0.jar");
- ReadableByteChannel rbc = Channels.newChannel(website.openStream());
- File avroJarFile = new File("spark-avro_2.11-3.2.0.jar");
- FileOutputStream fos = new FileOutputStream(avroJarFile);
- fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
-
- properties.setProperty("spark.jars", avroJarFile.getAbsolutePath());
-
- interpreter = new SparkInterpreter(properties);
- assertTrue(interpreter.getDelegation() instanceof NewSparkInterpreter);
- interpreter.setInterpreterGroup(mock(InterpreterGroup.class));
- interpreter.open();
-
- InterpreterResult result = interpreter.interpret("import com.databricks.spark.avro._", getInterpreterContext());
- assertEquals(InterpreterResult.Code.SUCCESS, result.code());
- }
-
//TODO(zjffdu) This unit test will fail due to classpath issue, should enable it after the classpath issue is fixed.
@Ignore
public void testDepInterpreter() throws InterpreterException {
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
index 7fcf6e8..c8cb862 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -19,6 +19,8 @@ package org.apache.zeppelin.spark
import java.io.File
+import java.net.URLClassLoader
+import java.nio.file.Paths
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.sql.SQLContext
@@ -35,6 +37,7 @@ import scala.util.control.NonFatal
/**
* Base class for different scala versions of SparkInterpreter. It should be
* binary compatible between multiple scala versions.
+ *
* @param conf
* @param depFiles
*/
@@ -86,6 +89,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
def interpret(code: String, context: InterpreterContext): InterpreterResult = {
val originalOut = System.out
+
def _interpret(code: String): scala.tools.nsc.interpreter.Results.Result = {
Console.withOut(interpreterOutput) {
System.setOut(Console.out)
@@ -236,7 +240,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
builder.getClass.getMethod("config", classOf[SparkConf]).invoke(builder, conf)
if (conf.get("spark.sql.catalogImplementation", "in-memory").toLowerCase == "hive"
- || conf.get("spark.useHiveContext", "false").toLowerCase == "true") {
+ || conf.get("spark.useHiveContext", "false").toLowerCase == "true") {
val hiveSiteExisted: Boolean =
Thread.currentThread().getContextClassLoader.getResource("hive-site.xml") != null
val hiveClassesPresent =
@@ -370,20 +374,26 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
}
protected def getUserJars(): Seq[String] = {
- val sparkJars = conf.getOption("spark.jars").map(_.split(","))
- .map(_.filter(_.nonEmpty)).toSeq.flatten
- val depJars = depFiles.asScala.filter(_.endsWith(".jar"))
- // add zeppelin spark interpreter jar
- val zeppelinInterpreterJarURL = getClass.getProtectionDomain.getCodeSource.getLocation
- // zeppelinInterpreterJarURL might be a folder when under unit testing
- val result = if (new File(zeppelinInterpreterJarURL.getFile).isDirectory) {
- sparkJars ++ depJars
- } else {
- sparkJars ++ depJars ++ Seq(zeppelinInterpreterJarURL.getFile)
+ var classLoader = Thread.currentThread().getContextClassLoader
+ var extraJars = Seq.empty[String]
+ while (classLoader != null) {
+ if (classLoader.getClass.getCanonicalName ==
+ "org.apache.spark.util.MutableURLClassLoader") {
+ extraJars = classLoader.asInstanceOf[URLClassLoader].getURLs()
+ // Check if the file exists.
+ .filter { u => u.getProtocol == "file" && new File(u.getPath).isFile }
+ // Some bad spark packages depend on the wrong version of scala-reflect. Blacklist it.
+ .filterNot {
+ u => Paths.get(u.toURI).getFileName.toString.contains("org.scala-lang_scala-reflect")
+ }
+ .map(url => url.toString).toSeq
+ classLoader = null
+ } else {
+ classLoader = classLoader.getParent
+ }
}
- conf.set("spark.jars", result.mkString(","))
- LOGGER.debug("User jar for spark repl: " + conf.get("spark.jars"))
- result
+ LOGGER.debug("User jar for spark repl: " + extraJars.mkString(","))
+ extraJars
}
protected def getUserFiles(): Seq[String] = {
diff --git a/zeppelin-interpreter-integration/src/main/java/org/apache/zeppelin/interpreter/integration/DummyClass.java b/zeppelin-interpreter-integration/src/main/java/org/apache/zeppelin/interpreter/integration/DummyClass.java
new file mode 100644
index 0000000..1df4618
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/main/java/org/apache/zeppelin/interpreter/integration/DummyClass.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.integration;
+
+public class DummyClass {
+}
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkIntegrationTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkIntegrationTest.java
index 397bb70..6782b47 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkIntegrationTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkIntegrationTest.java
@@ -5,18 +5,19 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.maven.model.Model;
+import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
+import org.codehaus.plexus.util.xml.pull.XmlPullParserException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.io.FileReader;
import java.io.IOException;
-import java.util.Arrays;
import java.util.EnumSet;
-import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -59,7 +60,14 @@ public abstract class SparkIntegrationTest {
}
}
- private void testInterpreterBasics() throws IOException, InterpreterException {
+ private void testInterpreterBasics() throws IOException, InterpreterException, XmlPullParserException {
+ // add jars & packages for testing
+ InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
+ sparkInterpreterSetting.setProperty("spark.jars.packages", "com.maxmind.geoip2:geoip2:2.5.0");
+ MavenXpp3Reader reader = new MavenXpp3Reader();
+ Model model = reader.read(new FileReader("pom.xml"));
+ sparkInterpreterSetting.setProperty("spark.jars", new File("target/zeppelin-zengine-" + model.getVersion() + ".jar").getAbsolutePath());
+
// test SparkInterpreter
interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getInterpreterSettingIds());
Interpreter sparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.spark");
@@ -73,6 +81,11 @@ public abstract class SparkIntegrationTest {
assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
assertTrue(interpreterResult.msg.get(0).getData().contains("45"));
+ // test jars & packages can be loaded correctly
+ interpreterResult = sparkInterpreter.interpret("import org.apache.zeppelin.interpreter.install.InstallInterpreter\n" +
+ "import com.maxmind.geoip2._", context);
+ assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
+
// test PySparkInterpreter
Interpreter pySparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.pyspark");
interpreterResult = pySparkInterpreter.interpret("sqlContext.createDataFrame([(1,'a'),(2,'b')], ['id','name']).registerTempTable('test')", context);
@@ -103,7 +116,7 @@ public abstract class SparkIntegrationTest {
}
@Test
- public void testLocalMode() throws IOException, YarnException, InterpreterException, InterruptedException {
+ public void testLocalMode() throws IOException, YarnException, InterpreterException, InterruptedException, XmlPullParserException {
InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
sparkInterpreterSetting.setProperty("master", "local[*]");
sparkInterpreterSetting.setProperty("SPARK_HOME", sparkHome);
@@ -123,7 +136,7 @@ public abstract class SparkIntegrationTest {
}
@Test
- public void testYarnClientMode() throws IOException, YarnException, InterruptedException, InterpreterException {
+ public void testYarnClientMode() throws IOException, YarnException, InterruptedException, InterpreterException, XmlPullParserException {
InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
sparkInterpreterSetting.setProperty("master", "yarn-client");
sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
@@ -145,7 +158,7 @@ public abstract class SparkIntegrationTest {
}
@Test
- public void testYarnClusterMode() throws IOException, YarnException, InterruptedException, InterpreterException {
+ public void testYarnClusterMode() throws IOException, YarnException, InterruptedException, InterpreterException, XmlPullParserException {
InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
sparkInterpreterSetting.setProperty("master", "yarn-cluster");
sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());