Posted to commits@zeppelin.apache.org by zj...@apache.org on 2019/03/05 09:30:58 UTC

[zeppelin] branch master updated: [ZEPPELIN-3986]. Cannot access any JAR in yarn cluster mode

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 78c49e9  [ZEPPELIN-3986]. Cannot access any JAR in yarn cluster mode
78c49e9 is described below

commit 78c49e923e6c6ac228d259b20d67217db1b4315d
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Fri Feb 15 21:00:25 2019 +0800

    [ZEPPELIN-3986]. Cannot access any JAR in yarn cluster mode
    
    ### What is this PR for?
    
    User-specified jars are missing in yarn-cluster mode because we didn't detect the user jars correctly. This PR fixes the jar-detection logic in `BaseSparkScalaInterpreter` (a sketch of the new approach follows).
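
    For context, here is a minimal sketch of the new approach, adapted from the `getUserJars()` change in this commit. The helper name `detectUserJars` is hypothetical; it assumes the code runs on a Spark driver where Spark's `MutableURLClassLoader` sits on the context-classloader chain, and it omits the scala-reflect blacklist the real code applies:

    ```scala
    import java.io.File
    import java.net.URLClassLoader

    object JarDetectionSketch {
      // Walk the classloader chain up to Spark's MutableURLClassLoader and
      // collect the file-based jar URLs it holds; empty if it is not found.
      def detectUserJars(): Seq[String] = {
        var cl: ClassLoader = Thread.currentThread().getContextClassLoader
        while (cl != null) {
          if (cl.getClass.getCanonicalName == "org.apache.spark.util.MutableURLClassLoader") {
            return cl.asInstanceOf[URLClassLoader].getURLs
              // Keep only jars that actually exist on the local filesystem.
              .filter(u => u.getProtocol == "file" && new File(u.getPath).isFile)
              .map(_.toString)
              .toSeq
          }
          cl = cl.getParent
        }
        Seq.empty
      }
    }
    ```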
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://jira.apache.org/jira/browse/ZEPPELIN-3986
    
    ### How should this be tested?
    * A system integration test is added to `SparkIntegrationTest`; it covers both the `spark.jars` and `spark.jars.packages` cases (see the sketch below).
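
    A condensed version of the test setup (the full version is in `SparkIntegrationTest` in the diff below); `sparkInterpreterSetting` is the setting obtained from Zeppelin's `InterpreterSettingManager`, and the jar path is a placeholder, since the test derives the real one from the module's pom.xml version at runtime:

    ```scala
    // The geoip2 coordinate matches the test; the jar path is a placeholder.
    sparkInterpreterSetting.setProperty("spark.jars.packages", "com.maxmind.geoip2:geoip2:2.5.0")
    sparkInterpreterSetting.setProperty("spark.jars",
      new java.io.File("target/zeppelin-interpreter-integration.jar").getAbsolutePath)
    ```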
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need to be updated? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3308 from zjffdu/ZEPPELIN-3986 and squashes the following commits:
    
    49a59b2a1 [Jeff Zhang] [ZEPPELIN-3986]. Cannot access any JAR in yarn cluster mode
---
 .../zeppelin/spark/NewSparkInterpreterTest.java    | 28 ----------------
 .../zeppelin/spark/BaseSparkScalaInterpreter.scala | 38 ++++++++++++++--------
 zeppelin-interpreter-integration/pom.xml           | 12 +++++++
 .../interpreter/integration/DummyClass.java        | 21 ++++++++++++
 .../zeppelin/integration/SparkIntegrationTest.java | 25 +++++++++++---
 5 files changed, 78 insertions(+), 46 deletions(-)

diff --git a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
index 54835ae..7c7dad9 100644
--- a/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
+++ b/spark/interpreter/src/test/java/org/apache/zeppelin/spark/NewSparkInterpreterTest.java
@@ -370,34 +370,6 @@ public class NewSparkInterpreterTest {
     interpretThread.join();
   }
 
-  @Test
-  public void testDependencies() throws IOException, InterpreterException {
-    Properties properties = new Properties();
-    properties.setProperty("spark.master", "local");
-    properties.setProperty("spark.app.name", "test");
-    properties.setProperty("zeppelin.spark.maxResult", "100");
-    properties.setProperty("zeppelin.spark.useNew", "true");
-    // disable color output for easy testing
-    properties.setProperty("zeppelin.spark.scala.color", "false");
-
-    // download spark-avro jar
-    URL website = new URL("http://repo1.maven.org/maven2/com/databricks/spark-avro_2.11/3.2.0/spark-avro_2.11-3.2.0.jar");
-    ReadableByteChannel rbc = Channels.newChannel(website.openStream());
-    File avroJarFile = new File("spark-avro_2.11-3.2.0.jar");
-    FileOutputStream fos = new FileOutputStream(avroJarFile);
-    fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE);
-
-    properties.setProperty("spark.jars", avroJarFile.getAbsolutePath());
-
-    interpreter = new SparkInterpreter(properties);
-    assertTrue(interpreter.getDelegation() instanceof NewSparkInterpreter);
-    interpreter.setInterpreterGroup(mock(InterpreterGroup.class));
-    interpreter.open();
-
-    InterpreterResult result = interpreter.interpret("import com.databricks.spark.avro._", getInterpreterContext());
-    assertEquals(InterpreterResult.Code.SUCCESS, result.code());
-  }
-
   //TODO(zjffdu) This unit test will fail due to classpath issue, should enable it after the classpath issue is fixed.
   @Ignore
   public void testDepInterpreter() throws InterpreterException {
diff --git a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
index 183dee6..cd241a8 100644
--- a/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
+++ b/spark/spark-scala-parent/src/main/scala/org/apache/zeppelin/spark/BaseSparkScalaInterpreter.scala
@@ -19,6 +19,8 @@ package org.apache.zeppelin.spark
 
 
 import java.io.File
+import java.net.URLClassLoader
+import java.nio.file.Paths
 import java.util.concurrent.atomic.AtomicInteger
 
 import org.apache.spark.sql.SQLContext
@@ -35,6 +37,7 @@ import scala.util.control.NonFatal
 /**
   * Base class for different scala versions of SparkInterpreter. It should be
   * binary compatible between multiple scala versions.
+  *
   * @param conf
   * @param depFiles
   */
@@ -86,6 +89,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
   def interpret(code: String, context: InterpreterContext): InterpreterResult = {
 
     val originalOut = System.out
+
     def _interpret(code: String): scala.tools.nsc.interpreter.Results.Result = {
       Console.withOut(interpreterOutput) {
         System.setOut(Console.out)
@@ -236,7 +240,7 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
     builder.getClass.getMethod("config", classOf[SparkConf]).invoke(builder, conf)
 
     if (conf.get("spark.sql.catalogImplementation", "in-memory").toLowerCase == "hive"
-        || conf.get("spark.useHiveContext", "false").toLowerCase == "true") {
+      || conf.get("spark.useHiveContext", "false").toLowerCase == "true") {
       val hiveSiteExisted: Boolean =
         Thread.currentThread().getContextClassLoader.getResource("hive-site.xml") != null
       val hiveClassesPresent =
@@ -370,20 +374,26 @@ abstract class BaseSparkScalaInterpreter(val conf: SparkConf,
   }
 
   protected def getUserJars(): Seq[String] = {
-    val sparkJars = conf.getOption("spark.jars").map(_.split(","))
-      .map(_.filter(_.nonEmpty)).toSeq.flatten
-    val depJars = depFiles.asScala.filter(_.endsWith(".jar"))
-    // add zeppelin spark interpreter jar
-    val zeppelinInterpreterJarURL = getClass.getProtectionDomain.getCodeSource.getLocation
-    // zeppelinInterpreterJarURL might be a folder when under unit testing
-    val result = if (new File(zeppelinInterpreterJarURL.getFile).isDirectory) {
-      sparkJars ++ depJars
-    } else {
-      sparkJars ++ depJars ++ Seq(zeppelinInterpreterJarURL.getFile)
+    var classLoader = Thread.currentThread().getContextClassLoader
+    var extraJars = Seq.empty[String]
+    while (classLoader != null) {
+      if (classLoader.getClass.getCanonicalName ==
+        "org.apache.spark.util.MutableURLClassLoader") {
+        extraJars = classLoader.asInstanceOf[URLClassLoader].getURLs()
+          // Check if the file exists.
+          .filter { u => u.getProtocol == "file" && new File(u.getPath).isFile }
+          // Some bad spark packages depend on the wrong version of scala-reflect. Blacklist it.
+          .filterNot {
+            u => Paths.get(u.toURI).getFileName.toString.contains("org.scala-lang_scala-reflect")
+          }
+          .map(url => url.toString).toSeq
+        classLoader = null
+      } else {
+        classLoader = classLoader.getParent
+      }
     }
-    conf.set("spark.jars", result.mkString(","))
-    LOGGER.debug("User jar for spark repl: " + conf.get("spark.jars"))
-    result
+    LOGGER.debug("User jar for spark repl: " + extraJars.mkString(","))
+    extraJars
   }
 
   protected def getUserFiles(): Seq[String] = {
diff --git a/zeppelin-interpreter-integration/pom.xml b/zeppelin-interpreter-integration/pom.xml
index 09c9710..6140925 100644
--- a/zeppelin-interpreter-integration/pom.xml
+++ b/zeppelin-interpreter-integration/pom.xml
@@ -95,6 +95,18 @@
       </exclusions>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.maven</groupId>
+      <artifactId>maven-model</artifactId>
+      <version>3.0.3</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.codehaus.plexus</groupId>
+          <artifactId>plexus-utils</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
     <!--test libraries-->
     <dependency>
       <groupId>junit</groupId>
diff --git a/zeppelin-interpreter-integration/src/main/java/org/apache/zeppelin/interpreter/integration/DummyClass.java b/zeppelin-interpreter-integration/src/main/java/org/apache/zeppelin/interpreter/integration/DummyClass.java
new file mode 100644
index 0000000..1df4618
--- /dev/null
+++ b/zeppelin-interpreter-integration/src/main/java/org/apache/zeppelin/interpreter/integration/DummyClass.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zeppelin.interpreter.integration;
+
+public class DummyClass {
+}
diff --git a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
index 03a482d..94a6a40 100644
--- a/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
+++ b/zeppelin-interpreter-integration/src/test/java/org/apache/zeppelin/integration/SparkIntegrationTest.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.maven.model.Model;
+import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
 import org.apache.zeppelin.interpreter.Interpreter;
 import org.apache.zeppelin.interpreter.InterpreterContext;
 import org.apache.zeppelin.interpreter.InterpreterException;
@@ -29,12 +31,15 @@ import org.apache.zeppelin.interpreter.InterpreterFactory;
 import org.apache.zeppelin.interpreter.InterpreterResult;
 import org.apache.zeppelin.interpreter.InterpreterSetting;
 import org.apache.zeppelin.interpreter.InterpreterSettingManager;
+import org.codehaus.plexus.util.xml.pull.XmlPullParserException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.io.FileReader;
 import java.io.IOException;
 import java.util.EnumSet;
 
@@ -80,7 +85,14 @@ public abstract class SparkIntegrationTest {
     }
   }
 
-  private void testInterpreterBasics() throws IOException, InterpreterException {
+  private void testInterpreterBasics() throws IOException, InterpreterException, XmlPullParserException {
+    // add jars & packages for testing
+    InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
+    sparkInterpreterSetting.setProperty("spark.jars.packages", "com.maxmind.geoip2:geoip2:2.5.0");
+    MavenXpp3Reader reader = new MavenXpp3Reader();
+    Model model = reader.read(new FileReader("pom.xml"));
+    sparkInterpreterSetting.setProperty("spark.jars", new File("target/zeppelin-interpreter-integration-" + model.getVersion() + ".jar").getAbsolutePath());
+
     // test SparkInterpreter
     Interpreter sparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.spark", "test");
 
@@ -93,6 +105,11 @@ public abstract class SparkIntegrationTest {
     assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
     assertTrue(interpreterResult.message().get(0).getData().contains("45"));
 
+    // test jars & packages can be loaded correctly
+    interpreterResult = sparkInterpreter.interpret("import org.apache.zeppelin.interpreter.integration.DummyClass\n" +
+            "import com.maxmind.geoip2._", context);
+    assertEquals(InterpreterResult.Code.SUCCESS, interpreterResult.code());
+
     // test PySparkInterpreter
     Interpreter pySparkInterpreter = interpreterFactory.getInterpreter("user1", "note1", "spark.pyspark", "test");
     interpreterResult = pySparkInterpreter.interpret("sqlContext.createDataFrame([(1,'a'),(2,'b')], ['id','name']).registerTempTable('test')", context);
@@ -123,7 +140,7 @@ public abstract class SparkIntegrationTest {
   }
 
   @Test
-  public void testLocalMode() throws IOException, YarnException, InterpreterException {
+  public void testLocalMode() throws IOException, YarnException, InterpreterException, XmlPullParserException {
     InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
     sparkInterpreterSetting.setProperty("master", "local[*]");
     sparkInterpreterSetting.setProperty("SPARK_HOME", sparkHome);
@@ -143,7 +160,7 @@ public abstract class SparkIntegrationTest {
   }
 
   @Test
-  public void testYarnClientMode() throws IOException, YarnException, InterruptedException, InterpreterException {
+  public void testYarnClientMode() throws IOException, YarnException, InterruptedException, InterpreterException, XmlPullParserException {
     InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
     sparkInterpreterSetting.setProperty("master", "yarn-client");
     sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());
@@ -166,7 +183,7 @@ public abstract class SparkIntegrationTest {
   }
 
   @Test
-  public void testYarnClusterMode() throws IOException, YarnException, InterruptedException, InterpreterException {
+  public void testYarnClusterMode() throws IOException, YarnException, InterruptedException, InterpreterException, XmlPullParserException {
     InterpreterSetting sparkInterpreterSetting = interpreterSettingManager.getInterpreterSettingByName("spark");
     sparkInterpreterSetting.setProperty("master", "yarn-cluster");
     sparkInterpreterSetting.setProperty("HADOOP_CONF_DIR", hadoopCluster.getConfigPath());