Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/09/06 10:50:18 UTC

[incubator-kyuubi] branch branch-1.6 updated: [KYUUBI #3064] Fix scala NPE issue when adding non-local jar URI to class loader

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/branch-1.6 by this push:
     new fd16dd740 [KYUUBI #3064] Fix scala NPE issue when adding non-local jar URI to class loader
fd16dd740 is described below

commit fd16dd7408cf468aad42cbe23cab50f25751b676
Author: Fei Wang <fw...@ebay.com>
AuthorDate: Tue Sep 6 17:29:52 2022 +0800

    [KYUUBI #3064] Fix scala NPE issue when adding non-local jar URI to class loader
    
    ### _Why are the changes needed?_
    
    Close #3064
    
    Refer to the comments by cxzl25.
    
    Spark calls `URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory(hadoopConf))` to support class path entries served from HDFS.
    But because the Scala compiler only supports adding file-scheme URLs to the class path, a non-file-scheme URL causes an NPE:
    
    ```java
    Error: Error operating ExecuteScala: java.lang.NullPointerException
            at scala.tools.nsc.classpath.FileUtils$AbstractFileOps$.isJarOrZip$extension(FileUtils.scala:32)
            at scala.tools.nsc.classpath.ClassPathFactory$.newClassPath(ClassPathFactory.scala:90)
            at scala.tools.nsc.Global.$anonfun$extendCompilerClassPath$1(Global.scala:832)
    ```
    
    scala.tools.nsc.Global#extendCompilerClassPath
    ```scala
    AbstractFile.getURL(u)
    ```
    
    scala.reflect.io.AbstractFile#getURL
    ```scala
      def getURL(url: URL): AbstractFile =
        if (url.getProtocol == "file") {
          val f = new java.io.File(url.toURI)
          if (f.isDirectory) getDirectory(f)
          else getFile(f)
        } else null
    ```
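
    For illustration, a minimal sketch of how a non-file URL reaches the NPE, assuming hadoop-common and scala-reflect are on the class path (the hdfs:// jar location is a hypothetical example):

    ```scala
    import java.net.URL

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FsUrlStreamHandlerFactory

    import scala.reflect.io.AbstractFile

    // Register Hadoop's handler so hdfs:// URLs can be constructed at all,
    // just as Spark does on driver startup.
    URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory(new Configuration()))

    val hdfsUrl = new URL("hdfs://namenode:8020/jars/udf.jar") // hypothetical jar location
    assert(AbstractFile.getURL(hdfsUrl) == null) // non-"file" protocol hits the `else null` branch above
    // ClassPathFactory.newClassPath then calls isJarOrZip on that null -> NullPointerException
    ```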
    
    spark-shell supports `--jars` with HDFS jars. In that case, spark-submit downloads the remote jars to the local filesystem and passes them to spark-shell through the `spark.repl.local.jars` configuration.
    
    In this PR, I localize the remote jar URL first and then add it to the repl class path, as sketched below.
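
    A minimal sketch of the localization step, assuming a plain `SparkContext` (the actual change below works on the engine's repl and `jarClassLoader`; `localize` is a hypothetical helper for illustration):

    ```scala
    import java.io.File
    import java.net.URL

    import org.apache.hadoop.fs.Path
    import org.apache.spark.{SparkContext, SparkFiles}

    // Return a file:// URL for the jar, downloading it first if it is remote.
    def localize(sc: SparkContext, jar: URL): URL =
      if (jar.toURI.getScheme == "file") {
        jar // already local; the Scala compiler can use it as-is
      } else {
        sc.addFile(jar.toString) // fetches the jar into the driver's SparkFiles root
        val name = new Path(jar.toURI.getPath).getName
        new File(SparkFiles.get(name)).toURI.toURL // now a local file:// URL
      }
    ```
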
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #3292 from turboFei/scala_npe.
    
    Closes #3064
    
    095c40d8 [Fei Wang] filter
    37637f76 [Fei Wang] add non-exist
    ec87ea85 [Fei Wang] add ut
    5a823d3b [Fei Wang] save
    
    Authored-by: Fei Wang <fw...@ebay.com>
    Signed-off-by: Fei Wang <fw...@ebay.com>
---
 .../engine/spark/operation/ExecuteScala.scala      | 24 ++++++++++-
 .../apache/kyuubi/operation/SparkQueryTests.scala  |  2 +-
 .../org/apache/kyuubi/WithSecuredDFSService.scala  |  1 +
 .../org/apache/kyuubi/WithSimpleDFSService.scala   |  1 +
 .../operation/KyuubiOperationPerUserSuite.scala    | 47 +++++++++++++++++++++-
 .../org/apache/kyuubi/server/MiniDFSService.scala  | 25 +++++++++++-
 6 files changed, 93 insertions(+), 7 deletions(-)

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
index a67f7295e..0ef31a897 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
@@ -17,8 +17,13 @@
 
 package org.apache.kyuubi.engine.spark.operation
 
+import java.io.File
+
+import scala.reflect.internal.util.ScalaClassLoader.URLClassLoader
 import scala.tools.nsc.interpreter.Results.{Error, Incomplete, Success}
 
+import org.apache.hadoop.fs.Path
+import org.apache.spark.SparkFiles
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.StructType
 
@@ -64,8 +69,23 @@ class ExecuteScala(
       if (legacyOutput.nonEmpty) {
         warn(s"Clearing legacy output from last interpreting:\n $legacyOutput")
       }
-      val jars = spark.sharedState.jarClassLoader.getURLs
-      repl.addUrlsToClassPath(jars: _*)
+      val replUrls = repl.classLoader.getParent.asInstanceOf[URLClassLoader].getURLs
+      spark.sharedState.jarClassLoader.getURLs.filterNot(replUrls.contains).foreach { jar =>
+        try {
+          if ("file".equals(jar.toURI.getScheme)) {
+            repl.addUrlsToClassPath(jar)
+          } else {
+            spark.sparkContext.addFile(jar.toString)
+            val localJarFile = new File(SparkFiles.get(new Path(jar.toURI.getPath).getName))
+            val localJarUrl = localJarFile.toURI.toURL
+            if (!replUrls.contains(localJarUrl)) {
+              repl.addUrlsToClassPath(localJarUrl)
+            }
+          }
+        } catch {
+          case e: Throwable => error(s"Error adding $jar to repl class path", e)
+        }
+      }
 
       repl.interpretWithRedirectOutError(statement) match {
         case Success =>
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala
index 8b8d26854..5b878cd47 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala
@@ -33,7 +33,7 @@ trait SparkQueryTests extends HiveJDBCTestHelper {
 
   protected lazy val SPARK_ENGINE_MAJOR_MINOR_VERSION: (Int, Int) = sparkEngineMajorMinorVersion
 
-  protected lazy val httpMode = false;
+  protected lazy val httpMode = false
 
   test("execute statement - select null") {
     withJdbcStatement() { statement =>
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSecuredDFSService.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSecuredDFSService.scala
index cfea0d2bb..43bea7c6a 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSecuredDFSService.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSecuredDFSService.scala
@@ -60,4 +60,5 @@ trait WithSecuredDFSService extends KerberizedTestHelper {
   }
 
   def getHadoopConf: Configuration = miniDFSService.getHadoopConf
+  def getHadoopConfDir: String = miniDFSService.getHadoopConfDir
 }
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSimpleDFSService.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSimpleDFSService.scala
index e7d0fbfe1..0e29f4411 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSimpleDFSService.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSimpleDFSService.scala
@@ -41,6 +41,7 @@ trait WithSimpleDFSService extends KyuubiFunSuite {
   }
 
   def getHadoopConf: Configuration = miniDFSService.getHadoopConf
+  def getHadoopConfDir: String = miniDFSService.getHadoopConfDir
 
   def getDefaultFS: String = miniDFSService.getHadoopConf.get("fs.defaultFS")
   def getDFSPort: Int = miniDFSService.getDFSPort
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
index 313335005..c2f1be1d4 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
@@ -17,14 +17,20 @@
 
 package org.apache.kyuubi.operation
 
+import java.util.UUID
+
+import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
 import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TStatusCode}
 import org.scalatest.time.SpanSugar._
 
-import org.apache.kyuubi.{Utils, WithKyuubiServer}
+import org.apache.kyuubi.{Utils, WithKyuubiServer, WithSimpleDFSService}
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.KYUUBI_ENGINE_ENV_PREFIX
+import org.apache.kyuubi.jdbc.hive.KyuubiStatement
 import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager, SessionHandle}
 
-class KyuubiOperationPerUserSuite extends WithKyuubiServer with SparkQueryTests {
+class KyuubiOperationPerUserSuite
+  extends WithKyuubiServer with SparkQueryTests with WithSimpleDFSService {
 
   override protected def jdbcUrl: String = getJdbcUrl
 
@@ -32,6 +38,11 @@ class KyuubiOperationPerUserSuite extends WithKyuubiServer with SparkQueryTests
     KyuubiConf().set(KyuubiConf.ENGINE_SHARE_LEVEL, "user")
   }
 
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.HADOOP_CONF_DIR", getHadoopConfDir)
+  }
+
   test("kyuubi defined function - system_user/session_user") {
     withJdbcStatement() { statement =>
       val rs = statement.executeQuery("SELECT system_user(), session_user()")
@@ -200,4 +211,36 @@ class KyuubiOperationPerUserSuite extends WithKyuubiServer with SparkQueryTests
       }
     }
   }
+
+  test("scala NPE issue with hdfs jar") {
+    val jarDir = Utils.createTempDir().toFile
+    val udfCode =
+      """
+        |package test.utils
+        |
+        |object Math {
+        |def add(x: Int, y: Int): Int = x + y
+        |}
+        |
+        |""".stripMargin
+    val jarFile = UserJarTestUtils.createJarFile(
+      udfCode,
+      "test",
+      s"test-function-${UUID.randomUUID}.jar",
+      jarDir.toString)
+    val hadoopConf = getHadoopConf
+    val dfs = FileSystem.get(hadoopConf)
+    val dfsJarDir = dfs.makeQualified(new Path(s"jars-${UUID.randomUUID()}"))
+    val localFs = FileSystem.getLocal(hadoopConf)
+    val localPath = new Path(jarFile.getAbsolutePath)
+    val dfsJarPath = new Path(dfsJarDir, "test-function.jar")
+    FileUtil.copy(localFs, localPath, dfs, dfsJarPath, false, false, hadoopConf)
+    withJdbcStatement() { statement =>
+      val kyuubiStatement = statement.asInstanceOf[KyuubiStatement]
+      statement.executeQuery(s"add jar $dfsJarPath")
+      val rs = kyuubiStatement.executeScala("println(test.utils.Math.add(1,2))")
+      rs.next()
+      assert(rs.getString(1) === "3")
+    }
+  }
 }
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniDFSService.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniDFSService.scala
index d7f01e728..caacbb6bf 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniDFSService.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniDFSService.scala
@@ -17,19 +17,24 @@
 
 package org.apache.kyuubi.server
 
+import java.io.{File, FileWriter}
+import java.net.InetAddress
+
+import scala.collection.JavaConverters._
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hdfs.MiniDFSCluster
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys
 import org.apache.hadoop.security.UserGroupInformation
 
-import org.apache.kyuubi.Logging
+import org.apache.kyuubi.{Logging, Utils}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.service.AbstractService
 
 class MiniDFSService(name: String, hdfsConf: Configuration)
   extends AbstractService(name)
   with Logging {
-
+  private val hadoopConfDir: File = Utils.createTempDir().toFile
   private var hdfsCluster: MiniDFSCluster = _
 
   def this(hdfsConf: Configuration = new Configuration()) =
@@ -55,6 +60,7 @@ class MiniDFSService(name: String, hdfsConf: Configuration)
       s"NameNode address in configuration is " +
         s"${hdfsConf.get(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY)}")
     super.start()
+    saveHadoopConf()
   }
 
   override def stop(): Unit = {
@@ -62,6 +68,21 @@ class MiniDFSService(name: String, hdfsConf: Configuration)
     super.stop()
   }
 
+  private def saveHadoopConf(): Unit = {
+    val configToWrite = new Configuration(false)
+    val hostName = InetAddress.getLocalHost.getHostName
+    hdfsConf.iterator().asScala.foreach { kv =>
+      val key = kv.getKey
+      val value = kv.getValue.replaceAll(hostName, "localhost")
+      configToWrite.set(key, value)
+      getConf.set(key, value)
+    }
+    val writer = new FileWriter(new File(hadoopConfDir, "hdfs-site.xml"))
+    configToWrite.writeXml(writer)
+    writer.close()
+  }
+
   def getHadoopConf: Configuration = hdfsConf
   def getDFSPort: Int = hdfsCluster.getNameNodePort
+  def getHadoopConfDir: String = hadoopConfDir.getAbsolutePath
 }