You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2019/03/06 02:40:38 UTC

[zeppelin] branch branch-0.8 updated: ZEPPELIN-3986. Cannot access any JAR in yarn cluster mode

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch branch-0.8
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/branch-0.8 by this push:
     new 9f552e4  ZEPPELIN-3986. Cannot access any JAR in yarn cluster mode
9f552e4 is described below

commit 9f552e434b8f0da812369c4f449015008d53e648
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Tue Mar 5 17:41:11 2019 +0800

    ZEPPELIN-3986. Cannot access any JAR in yarn cluster mode
---
 .../org/apache/zeppelin/spark/SparkVersion.java    |  3 +-
 .../zeppelin/spark/NewSparkInterpreterTest.java    | 26 ---------------
 .../zeppelin/spark/BaseSparkScalaInterpreter.scala | 38 ++++++++++++++--------
 .../interpreter/integration/DummyClass.java        | 21 ++++++++++++
 .../zeppelin/interpreter/SparkIntegrationTest.java | 29 ++++++++++++-----
 5 files changed, 68 insertions(+), 49 deletions(-)

diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
index 947a36f..f28ddf5 100644
--- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
+++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/SparkVersion.java
@@ -64,7 +64,8 @@ public class SparkVersion {
       this.minorVersion = Integer.parseInt(versions[1]);
       this.patchVersion = Integer.parseInt(versions[2]);
       // version is always 5 digits. (e.g. 2.0.0 -> 20000, 1.6.2 -> 10602)
-      version = Integer.parseInt(String.format("%d%02d%02d", majorVersion, minorVersion, patchVersion));
+      version = Integer.parseInt(String.format("%d%02d%02d", majorVersion, minorVersion,
+              patchVersion));
     } catch (Exception e) {
       logger.error("Can not recognize Spark version " + versionString +
           ". Assume it's a future release", e);
diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
index a3e4d22..a0d1d67 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
@@ -357,32 +357,6 @@ public class NewSparkInterpreterTest {
     interpretThread.join();
   }
 
-  @Test
-  public void testDependencies() throws IOException, InterpreterException {
-    Properties properties = new Properties();
-    properties.setProperty("spark.master", "local");
-    properties.setProperty("spark.app.name", "test");
-    properties.setProperty("zeppelin.spark.maxResult", "100");
-    properties.setProperty("zeppelin.spark.useNew", "true");
-
-    // download spark-avro jar
-    URL website = new URL("http://repo1.maven.org/maven2/com/databricks/spark-avro_2.11/3.2.0/spark-avro_2.11-3.2.0.jar");
-    ReadableByteChannel rbc = Channels.newChannel(website.openStream());
-    File avroJarFile = new File("spark-avro_2.11-3.2.0.jar");
-    FileOutputStream fos = new FileOutputStream(avroJarFile);
-    fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
-
-    properties.setProperty("spark.jars", avroJarFile.getAbsolutePath());
-
-    interpreter = new SparkInterpreter(properties);
-    assertTrue(interpreter.getDelegation() instanceof NewSparkInterpreter);
-    interpreter.setInterpreterGroup(mock(InterpreterGroup.class));
-    interpreter.open();
-
-    InterpreterResult result = interpreter.interpret("import com.databricks.spark.avro._", getInterpreterContext());
-    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-  }
-
   //TODO(zjffdu) This unit test will fail due to classpath issue, should enable it after the classpath issue is fixed.
   @Ignore
   public void testDepInterpreter() throws InterpreterException {
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
index 7fcf6e8..c8cb862 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -19,6 +19,8 @@ package org.apache.zeppelin.spark
 
 
 import java.io.File
+import java.net.URLClassLoader
+import java.nio.file.Paths
 import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.spark.sql.SQLContext
@@ -35,6 +37,7 @@ import scala.util.control.NonFatal
 /**
   * Base class for different scala versions of SparkInterpreter. It should be
   * binary compatible between multiple scala versions.
+  *
   * @param conf
   * @param depFiles
   */
@@ -86,6 +89,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
   def interpret(code: String, context: InterpreterContext): InterpreterResult = {
 
     val originalOut = System.out
+
     def _interpret(code: String): scala.tools.nsc.interpreter.Results.Result = {
       Console.withOut(interpreterOutput) {
         System.setOut(Console.out)
@@ -236,7 +240,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
     builder.getClass.getMethod("config", classOf[SparkConf]).invoke(builder, conf)
 
     if (conf.get("spark.sql.catalogImplementation", "in-memory").toLowerCase == "hive"
-        || conf.get("spark.useHiveContext", "false").toLowerCase == "true") {
+      || conf.get("spark.useHiveContext", "false").toLowerCase == "true") {
       val hiveSiteExisted: Boolean =
         Thread.currentThread().getContextClassLoader.getResource("hive-site.xml") != null
       val hiveClassesPresent =
@@ -370,20 +374,26 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
   }
 
   protected def getUserJars(): Seq[String] = {
-    val sparkJars = conf.getOption("spark.jars").map(_.split(","))
-      .map(_.filter(_.nonEmpty)).toSeq.flatten
-    val depJars = depFiles.asScala.filter(_.endsWith(".jar"))
-    // add zeppelin spark interpreter jar
-    val zeppelinInterpreterJarURL = getClass.getProtectionDomain.getCodeSource.getLocation
-    // zeppelinInterpreterJarURL might be a folder when under unit testing
-    val result = if (new File(zeppelinInterpreterJarURL.getFile).isDirectory) {
-      sparkJars ++ depJars
-    } else {
-      sparkJars ++ depJars ++ Seq(zeppelinInterpreterJarURL.getFile)
+    var classLoader = Thread.currentThread().getContextClassLoader
+    var extraJars = Seq.empty[String]
+    while (classLoader != null) {
+      if (classLoader.getClass.getCanonicalName ==
+        "org.apache.spark.util.MutableURLClassLoader") {
+        extraJars = classLoader.asInstanceOf[URLClassLoader].getURLs()
+          // Check if the file exists.
+          .filter { u => u.getProtocol == "file" && new File(u.getPath).isFile }
+          // Some bad spark packages depend on the wrong version of scala-reflect. Blacklist it.
+          .filterNot {
+            u => Paths.get(u.toURI).getFileName.toString.contains("org.scala-lang_scala-reflect")
+          }
+          .map(url => url.toString).toSeq
+        classLoader = null
+      } else {
+        classLoader = classLoader.getParent
+      }
     }
-    conf.set("spark.jars", result.mkString(","))
-    LOGGER.debug("User jar for spark repl: " + conf.get("spark.jars"))
-    result
+    LOGGER.debug("User jar for spark repl: " + extraJars.mkString(","))
+    extraJars
   }
 
   protected def getUserFiles(): Seq[String] = {
diff --git a/zeppelin-interpreter-integration/src/main/java/org/apache/zeppelin/interpreter/integration/DummyClass.java b/zeppelin-interpreter-integration/src/main/java/org/apache/zeppelin/interpreter/integration/DummyClass.java
new file mode 100644
index 0000000..1df4618
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/main/java/org/apache/zeppelin/interpreter/integration/DummyClass.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.integration;
+
+public class DummyClass {
+}
diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkIntegrationTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkIntegrationTest.java
index 397bb70..6782b47 100644
--- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkIntegrationTest.java
+++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/SparkIntegrationTest.java
@@ -5,18 +5,19 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.maven.model.Model;
+import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
+import org.codehaus.plexus.util.xml.pull.XmlPullParserException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.FileReader;
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.EnumSet;
-import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -59,7 +60,14 @@ public abstract class SparkIntegrationTest {
     }
   }
 
-  private void testInterpreterBasics() throws IOException, InterpreterException {
+  private void testInterpreterBasics() throws IOException, InterpreterException, XmlPullParserException {
+    // add jars & packages for testing
+    InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
+    sparkInterpreterSetting.setProperty("spark.jars.packages", "com.maxmind.geoip2:geoip2:2.5.0");
+    MavenXpp3Reader reader = new MavenXpp3Reader();
+    Model model = reader.read(new FileReader("pom.xml"));
+    sparkInterpreterSetting.setProperty("spark.jars", new File("target/zeppelin-zengine-" + model.getVersion() + ".jar").getAbsolutePath());
+
     // test SparkInterpreter
     interpreterSettingManager.setInterpreterBinding("user1", "note1", interpreterSettingManager.getInterpreterSettingIds());
     Interpreter sparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.spark");
@@ -73,6 +81,11 @@ public abstract class SparkIntegrationTest {
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code);
     assertTrue(interpreterResult.msg.get(0).getData().contains("45"));
 
+    // test jars & packages can be loaded correctly
+    interpreterResult = sparkInterpreter.interpret("import org.apache.zeppelin.interpreter.install.InstallInterpreter\n" +
+            "import com.maxmind.geoip2._", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
+
     // test PySparkInterpreter
     Interpreter pySparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.pyspark");
     interpreterResult = pySparkInterpreter.interpret("sqlContext.createDataFrame([(1,'a'),(2,'b')], ['id','name']).registerTempTable('test')", context);
@@ -103,7 +116,7 @@ public abstract class SparkIntegrationTest {
   }
 
   @Test
-  public void testLocalMode() throws IOException, YarnException, InterpreterException, InterruptedException {
+  public void testLocalMode() throws IOException, YarnException, InterpreterException, InterruptedException, XmlPullParserException {
     InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
     sparkInterpreterSetting.setProperty("master", "local[*]");
     sparkInterpreterSetting.setProperty("SPARK_HOME", sparkHome);
@@ -123,7 +136,7 @@ public abstract class SparkIntegrationTest {
   }
 
   @Test
-  public void testYarnClientMode() throws IOException, YarnException, InterruptedException, InterpreterException {
+  public void testYarnClientMode() throws IOException, YarnException, InterruptedException, InterpreterException, XmlPullParserException {
     InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
     sparkInterpreterSetting.setProperty("master", "yarn-client");
     sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
@@ -145,7 +158,7 @@ public abstract class SparkIntegrationTest {
   }
 
   @Test
-  public void testYarnClusterMode() throws IOException, YarnException, InterruptedException, InterpreterException {
+  public void testYarnClusterMode() throws IOException, YarnException, InterruptedException, InterpreterException, XmlPullParserException {
     InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
     sparkInterpreterSetting.setProperty("master", "yarn-cluster");
     sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());