You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by fe...@apache.org on 2022/09/06 10:50:18 UTC
[incubator-kyuubi] branch branch-1.6 updated: [KYUUBI #3064] Fix scala NPE issue when adding non-local jar URI to class loader
This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/branch-1.6 by this push:
new fd16dd740 [KYUUBI #3064] Fix scala NPE issue when adding non-local jar URI to class loader
fd16dd740 is described below
commit fd16dd7408cf468aad42cbe23cab50f25751b676
Author: Fei Wang <fw...@ebay.com>
AuthorDate: Tue Sep 6 17:29:52 2022 +0800
[KYUUBI #3064] Fix scala NPE issue when adding non-local jar URI to class loader
### _Why are the changes needed?_
Close #3064
Refer to the comments by cxzl25:
Spark calls
`URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory(hadoopConf))` so that class path entries can point to files on HDFS.
However, the Scala compiler only supports adding `file`-scheme URLs to its class path; any URL with another scheme (e.g. `hdfs://`) causes a NullPointerException:
```java
Error: Error operating ExecuteScala: java.lang.NullPointerException
at scala.tools.nsc.classpath.FileUtils$AbstractFileOps$.isJarOrZip$extension(FileUtils.scala:32)
at scala.tools.nsc.classpath.ClassPathFactory$.newClassPath(ClassPathFactory.scala:90)
at scala.tools.nsc.Global.$anonfun$extendCompilerClassPath$1(Global.scala:832)
```
`scala.tools.nsc.Global#extendCompilerClassPath` converts each added URL with:
```scala
AbstractFile.getURL(u)
```
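and `scala.reflect.io.AbstractFile#getURL` returns `null` for any protocol other than `file`. That `null` is later dereferenced in `FileUtils.isJarOrZip`, producing the NPE above: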
```scala
def getURL(url: URL): AbstractFile =
if (url.getProtocol == "file") {
val f = new java.io.File(url.toURI)
if (f.isDirectory) getDirectory(f)
else getFile(f)
} else null
```
spark-shell supports HDFS jars passed via `--jars`: spark-submit first downloads the remote jar to the local file system and then passes it to spark-shell through the `spark.repl.local.jars` configuration.
In this PR, I likewise localize each remote jar URL first (via `SparkContext.addFile`) and then add the local copy to the REPL class path.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #3292 from turboFei/scala_npe.
Closes #3064
095c40d8 [Fei Wang] filter
37637f76 [Fei Wang] add non-exist
ec87ea85 [Fei Wang] add ut
5a823d3b [Fei Wang] save
Authored-by: Fei Wang <fw...@ebay.com>
Signed-off-by: Fei Wang <fw...@ebay.com>
---
.../engine/spark/operation/ExecuteScala.scala | 24 ++++++++++-
.../apache/kyuubi/operation/SparkQueryTests.scala | 2 +-
.../org/apache/kyuubi/WithSecuredDFSService.scala | 1 +
.../org/apache/kyuubi/WithSimpleDFSService.scala | 1 +
.../operation/KyuubiOperationPerUserSuite.scala | 47 +++++++++++++++++++++-
.../org/apache/kyuubi/server/MiniDFSService.scala | 25 +++++++++++-
6 files changed, 93 insertions(+), 7 deletions(-)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
index a67f7295e..0ef31a897 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
@@ -17,8 +17,13 @@
package org.apache.kyuubi.engine.spark.operation
+import java.io.File
+
+import scala.reflect.internal.util.ScalaClassLoader.URLClassLoader
import scala.tools.nsc.interpreter.Results.{Error, Incomplete, Success}
+import org.apache.hadoop.fs.Path
+import org.apache.spark.SparkFiles
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType
@@ -64,8 +69,23 @@ class ExecuteScala(
if (legacyOutput.nonEmpty) {
warn(s"Clearing legacy output from last interpreting:\n $legacyOutput")
}
- val jars = spark.sharedState.jarClassLoader.getURLs
- repl.addUrlsToClassPath(jars: _*)
+ val replUrls = repl.classLoader.getParent.asInstanceOf[URLClassLoader].getURLs
+ spark.sharedState.jarClassLoader.getURLs.filterNot(replUrls.contains).foreach { jar =>
+ try {
+ if ("file".equals(jar.toURI.getScheme)) {
+ repl.addUrlsToClassPath(jar)
+ } else {
+ spark.sparkContext.addFile(jar.toString)
+ val localJarFile = new File(SparkFiles.get(new Path(jar.toURI.getPath).getName))
+ val localJarUrl = localJarFile.toURI.toURL
+ if (!replUrls.contains(localJarUrl)) {
+ repl.addUrlsToClassPath(localJarUrl)
+ }
+ }
+ } catch {
+ case e: Throwable => error(s"Error adding $jar to repl class path", e)
+ }
+ }
repl.interpretWithRedirectOutError(statement) match {
case Success =>
diff --git a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala
index 8b8d26854..5b878cd47 100644
--- a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala
+++ b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkQueryTests.scala
@@ -33,7 +33,7 @@ trait SparkQueryTests extends HiveJDBCTestHelper {
protected lazy val SPARK_ENGINE_MAJOR_MINOR_VERSION: (Int, Int) = sparkEngineMajorMinorVersion
- protected lazy val httpMode = false;
+ protected lazy val httpMode = false
test("execute statement - select null") {
withJdbcStatement() { statement =>
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSecuredDFSService.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSecuredDFSService.scala
index cfea0d2bb..43bea7c6a 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSecuredDFSService.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSecuredDFSService.scala
@@ -60,4 +60,5 @@ trait WithSecuredDFSService extends KerberizedTestHelper {
}
def getHadoopConf: Configuration = miniDFSService.getHadoopConf
+ def getHadoopConfDir: String = miniDFSService.getHadoopConfDir
}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSimpleDFSService.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSimpleDFSService.scala
index e7d0fbfe1..0e29f4411 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSimpleDFSService.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/WithSimpleDFSService.scala
@@ -41,6 +41,7 @@ trait WithSimpleDFSService extends KyuubiFunSuite {
}
def getHadoopConf: Configuration = miniDFSService.getHadoopConf
+ def getHadoopConfDir: String = miniDFSService.getHadoopConfDir
def getDefaultFS: String = miniDFSService.getHadoopConf.get("fs.defaultFS")
def getDFSPort: Int = miniDFSService.getDFSPort
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
index 313335005..c2f1be1d4 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
@@ -17,14 +17,20 @@
package org.apache.kyuubi.operation
+import java.util.UUID
+
+import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hive.service.rpc.thrift.{TExecuteStatementReq, TStatusCode}
import org.scalatest.time.SpanSugar._
-import org.apache.kyuubi.{Utils, WithKyuubiServer}
+import org.apache.kyuubi.{Utils, WithKyuubiServer, WithSimpleDFSService}
import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf.KYUUBI_ENGINE_ENV_PREFIX
+import org.apache.kyuubi.jdbc.hive.KyuubiStatement
import org.apache.kyuubi.session.{KyuubiSessionImpl, KyuubiSessionManager, SessionHandle}
-class KyuubiOperationPerUserSuite extends WithKyuubiServer with SparkQueryTests {
+class KyuubiOperationPerUserSuite
+ extends WithKyuubiServer with SparkQueryTests with WithSimpleDFSService {
override protected def jdbcUrl: String = getJdbcUrl
@@ -32,6 +38,11 @@ class KyuubiOperationPerUserSuite extends WithKyuubiServer with SparkQueryTests
KyuubiConf().set(KyuubiConf.ENGINE_SHARE_LEVEL, "user")
}
+ override def beforeAll(): Unit = {
+ super.beforeAll()
+ conf.set(s"$KYUUBI_ENGINE_ENV_PREFIX.HADOOP_CONF_DIR", getHadoopConfDir)
+ }
+
test("kyuubi defined function - system_user/session_user") {
withJdbcStatement() { statement =>
val rs = statement.executeQuery("SELECT system_user(), session_user()")
@@ -200,4 +211,36 @@ class KyuubiOperationPerUserSuite extends WithKyuubiServer with SparkQueryTests
}
}
}
+
+ test("scala NPE issue with hdfs jar") {
+ val jarDir = Utils.createTempDir().toFile
+ val udfCode =
+ """
+ |package test.utils
+ |
+ |object Math {
+ |def add(x: Int, y: Int): Int = x + y
+ |}
+ |
+ |""".stripMargin
+ val jarFile = UserJarTestUtils.createJarFile(
+ udfCode,
+ "test",
+ s"test-function-${UUID.randomUUID}.jar",
+ jarDir.toString)
+ val hadoopConf = getHadoopConf
+ val dfs = FileSystem.get(hadoopConf)
+ val dfsJarDir = dfs.makeQualified(new Path(s"jars-${UUID.randomUUID()}"))
+ val localFs = FileSystem.getLocal(hadoopConf)
+ val localPath = new Path(jarFile.getAbsolutePath)
+ val dfsJarPath = new Path(dfsJarDir, "test-function.jar")
+ FileUtil.copy(localFs, localPath, dfs, dfsJarPath, false, false, hadoopConf)
+ withJdbcStatement() { statement =>
+ val kyuubiStatement = statement.asInstanceOf[KyuubiStatement]
+ statement.executeQuery(s"add jar $dfsJarPath")
+ val rs = kyuubiStatement.executeScala("println(test.utils.Math.add(1,2))")
+ rs.next()
+ assert(rs.getString(1) === "3")
+ }
+ }
}
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniDFSService.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniDFSService.scala
index d7f01e728..caacbb6bf 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniDFSService.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/MiniDFSService.scala
@@ -17,19 +17,24 @@
package org.apache.kyuubi.server
+import java.io.{File, FileWriter}
+import java.net.InetAddress
+
+import scala.collection.JavaConverters._
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hdfs.MiniDFSCluster
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys
import org.apache.hadoop.security.UserGroupInformation
-import org.apache.kyuubi.Logging
+import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.service.AbstractService
class MiniDFSService(name: String, hdfsConf: Configuration)
extends AbstractService(name)
with Logging {
-
+ private val hadoopConfDir: File = Utils.createTempDir().toFile
private var hdfsCluster: MiniDFSCluster = _
def this(hdfsConf: Configuration = new Configuration()) =
@@ -55,6 +60,7 @@ class MiniDFSService(name: String, hdfsConf: Configuration)
s"NameNode address in configuration is " +
s"${hdfsConf.get(HdfsClientConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY)}")
super.start()
+ saveHadoopConf()
}
override def stop(): Unit = {
@@ -62,6 +68,21 @@ class MiniDFSService(name: String, hdfsConf: Configuration)
super.stop()
}
+ private def saveHadoopConf(): Unit = {
+ val configToWrite = new Configuration(false)
+ val hostName = InetAddress.getLocalHost.getHostName
+ hdfsConf.iterator().asScala.foreach { kv =>
+ val key = kv.getKey
+ val value = kv.getValue.replaceAll(hostName, "localhost")
+ configToWrite.set(key, value)
+ getConf.set(key, value)
+ }
+ val writer = new FileWriter(new File(hadoopConfDir, "hdfs-site.xml"))
+ configToWrite.writeXml(writer)
+ writer.close()
+ }
+
def getHadoopConf: Configuration = hdfsConf
def getDFSPort: Int = hdfsCluster.getNameNodePort
+ def getHadoopConfDir: String = hadoopConfDir.getAbsolutePath
}