Posted to commits@kyuubi.apache.org by fe...@apache.org on 2023/05/15 02:42:02 UTC
[kyuubi] branch branch-1.7 updated: [KYUUBI #4838] Fix spark operation exception leak in `withLocalProperties` method
This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.7 by this push:
new a01986dca [KYUUBI #4838] Fix spark operation exception leak in `withLocalProperties` method
a01986dca is described below
commit a01986dcaec3e617d2d8832e184157ee59d972ce
Author: fwang12 <fw...@ebay.com>
AuthorDate: Mon May 15 10:41:19 2023 +0800
[KYUUBI #4838] Fix spark operation exception leak in `withLocalProperties` method
### _Why are the changes needed?_
For the `ExecutePython` operation:
https://github.com/apache/kyuubi/blob/474f0972a443c62c3d77a9977d61b59a18a0f0a6/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala#L139-L141
If an exception is thrown inside the `withLocalProperties` method (for example, because the Python worker has died), the operation will be stuck in the PENDING state forever.
<img width="1314" alt="image" src="https://github.com/apache/kyuubi/assets/6757692/2929f0e4-dc64-4aa4-8d3e-e9d858f8e683">
We need to catch exceptions thrown in the `withLocalProperties` method.
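For illustration, here is a minimal, self-contained sketch of the failure mode and the fix. It is not the actual Kyuubi code: `withLocalProperties`, `setState`, and the state values are simplified stand-ins for the real operation API, and the wrapper is assumed to fail before running its body (the "dead Python worker" case).

```scala
// Minimal sketch (assumed names, not Kyuubi classes) of why the wrapper must
// sit inside the try block rather than around it.
object ExceptionLeakSketch {
  sealed trait State
  case object Pending extends State
  case object Running extends State
  case object Finished extends State
  case object Error extends State

  var state: State = Pending
  def setState(s: State): Unit = { state = s; println(s"state -> $s") }

  // Stand-in for withLocalProperties: suppose it fails before running the body
  // (e.g. the Python worker has already died).
  def withLocalProperties[T](body: => T): T =
    throw new RuntimeException("python worker is dead")

  // Before the fix: the wrapper is outside the try, so its exception escapes
  // the error handling and the operation never leaves Pending.
  def executeBefore(): Unit = withLocalProperties {
    try {
      setState(Running)
      setState(Finished)
    } catch { case _: Throwable => setState(Error) }
  }

  // After the fix: the wrapper is inside the try, so the same failure is
  // caught and the operation transitions to Error instead of leaking.
  def executeAfter(): Unit =
    try {
      withLocalProperties {
        setState(Running)
        setState(Finished)
      }
    } catch { case _: Throwable => setState(Error) }

  def main(args: Array[String]): Unit = {
    try executeBefore() catch { case _: Throwable => () }
    println(s"before fix: operation left in $state") // Pending
    state = Pending
    executeAfter()
    println(s"after fix: operation ends in $state") // Error
  }
}
```

In the real patch, the same restructuring is applied to `ExecutePython`, `ExecuteScala`, `ExecuteStatement`, `PlanOnlyStatement`, and `SparkOperation.getNextRowSet`, as shown in the diff below.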
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [x] Add screenshots for manual tests if appropriate
After this PR:
Screenshot (operation transitions to the error state): https://github.com/apache/kyuubi/assets/6757692/e17aadfe-81a3-4ec7-a595-26eb978dd2b0
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4838 from turboFei/exception_leak.
Closes #4838
5544691ef [fwang12] fix operation exception leak
Authored-by: fwang12 <fw...@ebay.com>
Signed-off-by: fwang12 <fw...@ebay.com>
(cherry picked from commit e8db3da440a9fc183177b0f17fea66bbfa962613)
Signed-off-by: fwang12 <fw...@ebay.com>
---
.../engine/spark/operation/ExecutePython.scala | 35 +++++-----
.../engine/spark/operation/ExecuteScala.scala | 81 +++++++++++-----------
.../engine/spark/operation/ExecuteStatement.scala | 21 +++---
.../engine/spark/operation/PlanOnlyStatement.scala | 29 ++++----
.../engine/spark/operation/SparkOperation.scala | 15 ++--
5 files changed, 93 insertions(+), 88 deletions(-)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
index 17cc967e6..0d01348a7 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
@@ -77,30 +77,31 @@ class ExecutePython(
OperationLog.removeCurrentOperationLog()
}
- private def executePython(): Unit = withLocalProperties {
+ private def executePython(): Unit =
try {
- setState(OperationState.RUNNING)
- info(diagnostics)
- addOperationListener()
- val response = worker.runCode(statement)
- val status = response.map(_.content.status).getOrElse("UNKNOWN_STATUS")
- if (PythonResponse.OK_STATUS.equalsIgnoreCase(status)) {
- val output = response.map(_.content.getOutput()).getOrElse("")
- val ename = response.map(_.content.getEname()).getOrElse("")
- val evalue = response.map(_.content.getEvalue()).getOrElse("")
- val traceback = response.map(_.content.getTraceback()).getOrElse(Seq.empty)
- iter =
- new ArrayFetchIterator[Row](Array(Row(output, status, ename, evalue, traceback)))
- setState(OperationState.FINISHED)
- } else {
- throw KyuubiSQLException(s"Interpret error:\n$statement\n $response")
+ withLocalProperties {
+ setState(OperationState.RUNNING)
+ info(diagnostics)
+ addOperationListener()
+ val response = worker.runCode(statement)
+ val status = response.map(_.content.status).getOrElse("UNKNOWN_STATUS")
+ if (PythonResponse.OK_STATUS.equalsIgnoreCase(status)) {
+ val output = response.map(_.content.getOutput()).getOrElse("")
+ val ename = response.map(_.content.getEname()).getOrElse("")
+ val evalue = response.map(_.content.getEvalue()).getOrElse("")
+ val traceback = response.map(_.content.getTraceback()).getOrElse(Seq.empty)
+ iter =
+ new ArrayFetchIterator[Row](Array(Row(output, status, ename, evalue, traceback)))
+ setState(OperationState.FINISHED)
+ } else {
+ throw KyuubiSQLException(s"Interpret error:\n$statement\n $response")
+ }
}
} catch {
onError(cancel = true)
} finally {
shutdownTimeoutMonitor()
}
- }
override protected def runInternal(): Unit = {
addTimeoutMonitor(queryTimeout)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
index ff686cca0..24e17d281 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
@@ -76,59 +76,60 @@ class ExecuteScala(
OperationLog.removeCurrentOperationLog()
}
- private def executeScala(): Unit = withLocalProperties {
+ private def executeScala(): Unit =
try {
- setState(OperationState.RUNNING)
- info(diagnostics)
- Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader)
- addOperationListener()
- val legacyOutput = repl.getOutput
- if (legacyOutput.nonEmpty) {
- warn(s"Clearing legacy output from last interpreting:\n $legacyOutput")
- }
- val replUrls = repl.classLoader.getParent.asInstanceOf[URLClassLoader].getURLs
- spark.sharedState.jarClassLoader.getURLs.filterNot(replUrls.contains).foreach { jar =>
- try {
- if ("file".equals(jar.toURI.getScheme)) {
- repl.addUrlsToClassPath(jar)
- } else {
- spark.sparkContext.addFile(jar.toString)
- val localJarFile = new File(SparkFiles.get(new Path(jar.toURI.getPath).getName))
- val localJarUrl = localJarFile.toURI.toURL
- if (!replUrls.contains(localJarUrl)) {
- repl.addUrlsToClassPath(localJarUrl)
+ withLocalProperties {
+ setState(OperationState.RUNNING)
+ info(diagnostics)
+ Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader)
+ addOperationListener()
+ val legacyOutput = repl.getOutput
+ if (legacyOutput.nonEmpty) {
+ warn(s"Clearing legacy output from last interpreting:\n $legacyOutput")
+ }
+ val replUrls = repl.classLoader.getParent.asInstanceOf[URLClassLoader].getURLs
+ spark.sharedState.jarClassLoader.getURLs.filterNot(replUrls.contains).foreach { jar =>
+ try {
+ if ("file".equals(jar.toURI.getScheme)) {
+ repl.addUrlsToClassPath(jar)
+ } else {
+ spark.sparkContext.addFile(jar.toString)
+ val localJarFile = new File(SparkFiles.get(new Path(jar.toURI.getPath).getName))
+ val localJarUrl = localJarFile.toURI.toURL
+ if (!replUrls.contains(localJarUrl)) {
+ repl.addUrlsToClassPath(localJarUrl)
+ }
}
+ } catch {
+ case e: Throwable => error(s"Error adding $jar to repl class path", e)
}
- } catch {
- case e: Throwable => error(s"Error adding $jar to repl class path", e)
}
- }
- repl.interpretWithRedirectOutError(statement) match {
- case Success =>
- iter = {
- result = repl.getResult(statementId)
- if (result != null) {
- new ArrayFetchIterator[Row](result.collect())
- } else {
- val output = repl.getOutput
- debug("scala repl output:\n" + output)
- new ArrayFetchIterator[Row](Array(Row(output)))
+ repl.interpretWithRedirectOutError(statement) match {
+ case Success =>
+ iter = {
+ result = repl.getResult(statementId)
+ if (result != null) {
+ new ArrayFetchIterator[Row](result.collect())
+ } else {
+ val output = repl.getOutput
+ debug("scala repl output:\n" + output)
+ new ArrayFetchIterator[Row](Array(Row(output)))
+ }
}
- }
- case Error =>
- throw KyuubiSQLException(s"Interpret error:\n$statement\n ${repl.getOutput}")
- case Incomplete =>
- throw KyuubiSQLException(s"Incomplete code:\n$statement")
+ case Error =>
+ throw KyuubiSQLException(s"Interpret error:\n$statement\n ${repl.getOutput}")
+ case Incomplete =>
+ throw KyuubiSQLException(s"Incomplete code:\n$statement")
+ }
+ setState(OperationState.FINISHED)
}
- setState(OperationState.FINISHED)
} catch {
onError(cancel = true)
} finally {
repl.clearResult(statementId)
shutdownTimeoutMonitor()
}
- }
override protected def runInternal(): Unit = {
addTimeoutMonitor(queryTimeout)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 015c4ba4a..8732fafea 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -74,22 +74,23 @@ class ExecuteStatement(
resultDF.take(maxRows)
}
- protected def executeStatement(): Unit = withLocalProperties {
+ protected def executeStatement(): Unit =
try {
- setState(OperationState.RUNNING)
- info(diagnostics)
- Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader)
- addOperationListener()
- result = spark.sql(statement)
- iter = collectAsIterator(result)
- setCompiledStateIfNeeded()
- setState(OperationState.FINISHED)
+ withLocalProperties {
+ setState(OperationState.RUNNING)
+ info(diagnostics)
+ Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader)
+ addOperationListener()
+ result = spark.sql(statement)
+ iter = collectAsIterator(result)
+ setCompiledStateIfNeeded()
+ setState(OperationState.FINISHED)
+ }
} catch {
onError(cancel = true)
} finally {
shutdownTimeoutMonitor()
}
- }
override protected def runInternal(): Unit = {
addTimeoutMonitor(queryTimeout)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
index b7e5451ec..c5a7679d6 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
@@ -65,28 +65,29 @@ class PlanOnlyStatement(
super.beforeRun()
}
- override protected def runInternal(): Unit = withLocalProperties {
+ override protected def runInternal(): Unit =
try {
- SQLConf.withExistingConf(spark.sessionState.conf) {
- val parsed = spark.sessionState.sqlParser.parsePlan(statement)
+ withLocalProperties {
+ SQLConf.withExistingConf(spark.sessionState.conf) {
+ val parsed = spark.sessionState.sqlParser.parsePlan(statement)
- parsed match {
- case cmd if planExcludes.contains(cmd.getClass.getSimpleName) =>
- result = spark.sql(statement)
- iter = new ArrayFetchIterator(result.collect())
+ parsed match {
+ case cmd if planExcludes.contains(cmd.getClass.getSimpleName) =>
+ result = spark.sql(statement)
+ iter = new ArrayFetchIterator(result.collect())
- case plan => style match {
- case PlainStyle => explainWithPlainStyle(plan)
- case JsonStyle => explainWithJsonStyle(plan)
- case UnknownStyle => unknownStyleError(style)
- case other => throw notSupportedStyleError(other, "Spark SQL")
- }
+ case plan => style match {
+ case PlainStyle => explainWithPlainStyle(plan)
+ case JsonStyle => explainWithJsonStyle(plan)
+ case UnknownStyle => unknownStyleError(style)
+ case other => throw notSupportedStyleError(other, "Spark SQL")
+ }
+ }
}
}
} catch {
onError()
}
- }
private def explainWithPlainStyle(plan: LogicalPlan): Unit = {
mode match {
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index 6b2a5d9eb..320e67635 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -233,10 +233,10 @@ abstract class SparkOperation(session: Session)
resp
}
- override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet =
- withLocalProperties {
- var resultRowSet: TRowSet = null
- try {
+ override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
+ var resultRowSet: TRowSet = null
+ try {
+ withLocalProperties {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
@@ -261,10 +261,11 @@ abstract class SparkOperation(session: Session)
getProtocolVersion)
}
resultRowSet.setStartRowOffset(iter.getPosition)
- } catch onError(cancel = true)
+ }
+ } catch onError(cancel = true)
- resultRowSet
- }
+ resultRowSet
+ }
override def shouldRunAsync: Boolean = false