Posted to commits@kyuubi.apache.org by fe...@apache.org on 2023/05/15 02:42:02 UTC

[kyuubi] branch branch-1.7 updated: [KYUUBI #4838] Fix spark operation exception leak in `withLocalProperties` method

This is an automated email from the ASF dual-hosted git repository.

feiwang pushed a commit to branch branch-1.7
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/branch-1.7 by this push:
     new a01986dca [KYUUBI #4838] Fix spark operation exception leak in `withLocalProperties` method
a01986dca is described below

commit a01986dcaec3e617d2d8832e184157ee59d972ce
Author: fwang12 <fw...@ebay.com>
AuthorDate: Mon May 15 10:41:19 2023 +0800

    [KYUUBI #4838] Fix spark operation exception leak in `withLocalProperties` method
    
    ### _Why are the changes needed?_
    
    For the `ExecutePython` operation:
    https://github.com/apache/kyuubi/blob/474f0972a443c62c3d77a9977d61b59a18a0f0a6/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala#L139-L141
    
    If an exception is thrown in the `withLocalProperties` method (for example, when the Python worker has died), the operation will be stuck in the pending state forever.
    <img width="1314" alt="image" src="https://github.com/apache/kyuubi/assets/6757692/2929f0e4-dc64-4aa4-8d3e-e9d858f8e683">
    
    We need to catch exceptions thrown in the `withLocalProperties` method.
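    
    Below is a minimal, self-contained sketch of the pattern this commit applies. The names (`ExceptionLeakSketch`, `Pending`, `withLocalProperties`, etc.) are illustrative stand-ins, not the actual Kyuubi API; the point is only that when `withLocalProperties` wraps the `try`, a failure inside it bypasses the handler that moves the operation to a terminal state:
    
    ```scala
    object ExceptionLeakSketch {
      sealed trait State
      case object Pending extends State
      case object Running extends State
      case object Finished extends State
      case object Error extends State
    
      var state: State = Pending
    
      // Stand-in for withLocalProperties: sets Spark local properties
      // around the body, and may itself throw before the body runs.
      def withLocalProperties[T](body: => T): T = body
    
      // Before: withLocalProperties wraps the try, so an exception it
      // throws escapes the method and state never leaves Pending.
      def executeBefore(run: => Unit): Unit = withLocalProperties {
        try {
          state = Running
          run
          state = Finished
        } catch { case _: Throwable => state = Error }
      }
    
      // After: withLocalProperties sits inside the try, so any failure,
      // including one from withLocalProperties itself, reaches the
      // handler and the operation ends in a terminal state.
      def executeAfter(run: => Unit): Unit =
        try {
          withLocalProperties {
            state = Running
            run
            state = Finished
          }
        } catch { case _: Throwable => state = Error }
    }
    ```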
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [x] Add screenshots for manual tests if appropriate
    
    After this PR:
    <img width="1479" alt="image" src="https://github.com/apache/kyuubi/assets/6757692/e17aadfe-81a3-4ec7-a595-26eb978dd2b0">
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4838 from turboFei/exception_leak.
    
    Closes #4838
    
    5544691ef [fwang12] fix operation exception leak
    
    Authored-by: fwang12 <fw...@ebay.com>
    Signed-off-by: fwang12 <fw...@ebay.com>
    (cherry picked from commit e8db3da440a9fc183177b0f17fea66bbfa962613)
    Signed-off-by: fwang12 <fw...@ebay.com>
---
 .../engine/spark/operation/ExecutePython.scala     | 35 +++++-----
 .../engine/spark/operation/ExecuteScala.scala      | 81 +++++++++++-----------
 .../engine/spark/operation/ExecuteStatement.scala  | 21 +++---
 .../engine/spark/operation/PlanOnlyStatement.scala | 29 ++++----
 .../engine/spark/operation/SparkOperation.scala    | 15 ++--
 5 files changed, 93 insertions(+), 88 deletions(-)
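
One detail in the diffs below that can look like a syntax error: the catch clauses take onError(cancel = true) directly, with no case keyword. In Scala, the handler after catch may be any expression of type PartialFunction[Throwable, Unit], and Kyuubi's onError returns such a handler. A small illustration of the idiom, where onError is a simplified stand-in rather than Kyuubi's actual implementation:

object CatchHandlerIdiom {
  // Simplified stand-in for SparkOperation.onError: builds a reusable
  // exception handler as a PartialFunction, so call sites can write
  // `try { ... } catch onError(cancel = true)`.
  def onError(cancel: Boolean = false): PartialFunction[Throwable, Unit] = {
    case e: Throwable =>
      if (cancel) println("cancelling the running job")
      println(s"operation failed: ${e.getMessage}")
  }

  def main(args: Array[String]): Unit = {
    try {
      throw new RuntimeException("python worker died")
    } catch onError(cancel = true)
  }
}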

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
index 17cc967e6..0d01348a7 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
@@ -77,30 +77,31 @@ class ExecutePython(
     OperationLog.removeCurrentOperationLog()
   }
 
-  private def executePython(): Unit = withLocalProperties {
+  private def executePython(): Unit =
     try {
-      setState(OperationState.RUNNING)
-      info(diagnostics)
-      addOperationListener()
-      val response = worker.runCode(statement)
-      val status = response.map(_.content.status).getOrElse("UNKNOWN_STATUS")
-      if (PythonResponse.OK_STATUS.equalsIgnoreCase(status)) {
-        val output = response.map(_.content.getOutput()).getOrElse("")
-        val ename = response.map(_.content.getEname()).getOrElse("")
-        val evalue = response.map(_.content.getEvalue()).getOrElse("")
-        val traceback = response.map(_.content.getTraceback()).getOrElse(Seq.empty)
-        iter =
-          new ArrayFetchIterator[Row](Array(Row(output, status, ename, evalue, traceback)))
-        setState(OperationState.FINISHED)
-      } else {
-        throw KyuubiSQLException(s"Interpret error:\n$statement\n $response")
+      withLocalProperties {
+        setState(OperationState.RUNNING)
+        info(diagnostics)
+        addOperationListener()
+        val response = worker.runCode(statement)
+        val status = response.map(_.content.status).getOrElse("UNKNOWN_STATUS")
+        if (PythonResponse.OK_STATUS.equalsIgnoreCase(status)) {
+          val output = response.map(_.content.getOutput()).getOrElse("")
+          val ename = response.map(_.content.getEname()).getOrElse("")
+          val evalue = response.map(_.content.getEvalue()).getOrElse("")
+          val traceback = response.map(_.content.getTraceback()).getOrElse(Seq.empty)
+          iter =
+            new ArrayFetchIterator[Row](Array(Row(output, status, ename, evalue, traceback)))
+          setState(OperationState.FINISHED)
+        } else {
+          throw KyuubiSQLException(s"Interpret error:\n$statement\n $response")
+        }
       }
     } catch {
       onError(cancel = true)
     } finally {
       shutdownTimeoutMonitor()
     }
-  }
 
   override protected def runInternal(): Unit = {
     addTimeoutMonitor(queryTimeout)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
index ff686cca0..24e17d281 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
@@ -76,59 +76,60 @@ class ExecuteScala(
     OperationLog.removeCurrentOperationLog()
   }
 
-  private def executeScala(): Unit = withLocalProperties {
+  private def executeScala(): Unit =
     try {
-      setState(OperationState.RUNNING)
-      info(diagnostics)
-      Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader)
-      addOperationListener()
-      val legacyOutput = repl.getOutput
-      if (legacyOutput.nonEmpty) {
-        warn(s"Clearing legacy output from last interpreting:\n $legacyOutput")
-      }
-      val replUrls = repl.classLoader.getParent.asInstanceOf[URLClassLoader].getURLs
-      spark.sharedState.jarClassLoader.getURLs.filterNot(replUrls.contains).foreach { jar =>
-        try {
-          if ("file".equals(jar.toURI.getScheme)) {
-            repl.addUrlsToClassPath(jar)
-          } else {
-            spark.sparkContext.addFile(jar.toString)
-            val localJarFile = new File(SparkFiles.get(new Path(jar.toURI.getPath).getName))
-            val localJarUrl = localJarFile.toURI.toURL
-            if (!replUrls.contains(localJarUrl)) {
-              repl.addUrlsToClassPath(localJarUrl)
+      withLocalProperties {
+        setState(OperationState.RUNNING)
+        info(diagnostics)
+        Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader)
+        addOperationListener()
+        val legacyOutput = repl.getOutput
+        if (legacyOutput.nonEmpty) {
+          warn(s"Clearing legacy output from last interpreting:\n $legacyOutput")
+        }
+        val replUrls = repl.classLoader.getParent.asInstanceOf[URLClassLoader].getURLs
+        spark.sharedState.jarClassLoader.getURLs.filterNot(replUrls.contains).foreach { jar =>
+          try {
+            if ("file".equals(jar.toURI.getScheme)) {
+              repl.addUrlsToClassPath(jar)
+            } else {
+              spark.sparkContext.addFile(jar.toString)
+              val localJarFile = new File(SparkFiles.get(new Path(jar.toURI.getPath).getName))
+              val localJarUrl = localJarFile.toURI.toURL
+              if (!replUrls.contains(localJarUrl)) {
+                repl.addUrlsToClassPath(localJarUrl)
+              }
             }
+          } catch {
+            case e: Throwable => error(s"Error adding $jar to repl class path", e)
           }
-        } catch {
-          case e: Throwable => error(s"Error adding $jar to repl class path", e)
         }
-      }
 
-      repl.interpretWithRedirectOutError(statement) match {
-        case Success =>
-          iter = {
-            result = repl.getResult(statementId)
-            if (result != null) {
-              new ArrayFetchIterator[Row](result.collect())
-            } else {
-              val output = repl.getOutput
-              debug("scala repl output:\n" + output)
-              new ArrayFetchIterator[Row](Array(Row(output)))
+        repl.interpretWithRedirectOutError(statement) match {
+          case Success =>
+            iter = {
+              result = repl.getResult(statementId)
+              if (result != null) {
+                new ArrayFetchIterator[Row](result.collect())
+              } else {
+                val output = repl.getOutput
+                debug("scala repl output:\n" + output)
+                new ArrayFetchIterator[Row](Array(Row(output)))
+              }
             }
-          }
-        case Error =>
-          throw KyuubiSQLException(s"Interpret error:\n$statement\n ${repl.getOutput}")
-        case Incomplete =>
-          throw KyuubiSQLException(s"Incomplete code:\n$statement")
+          case Error =>
+            throw KyuubiSQLException(s"Interpret error:\n$statement\n ${repl.getOutput}")
+          case Incomplete =>
+            throw KyuubiSQLException(s"Incomplete code:\n$statement")
+        }
+        setState(OperationState.FINISHED)
       }
-      setState(OperationState.FINISHED)
     } catch {
       onError(cancel = true)
     } finally {
       repl.clearResult(statementId)
       shutdownTimeoutMonitor()
     }
-  }
 
   override protected def runInternal(): Unit = {
     addTimeoutMonitor(queryTimeout)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index 015c4ba4a..8732fafea 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -74,22 +74,23 @@ class ExecuteStatement(
     resultDF.take(maxRows)
   }
 
-  protected def executeStatement(): Unit = withLocalProperties {
+  protected def executeStatement(): Unit =
     try {
-      setState(OperationState.RUNNING)
-      info(diagnostics)
-      Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader)
-      addOperationListener()
-      result = spark.sql(statement)
-      iter = collectAsIterator(result)
-      setCompiledStateIfNeeded()
-      setState(OperationState.FINISHED)
+      withLocalProperties {
+        setState(OperationState.RUNNING)
+        info(diagnostics)
+        Thread.currentThread().setContextClassLoader(spark.sharedState.jarClassLoader)
+        addOperationListener()
+        result = spark.sql(statement)
+        iter = collectAsIterator(result)
+        setCompiledStateIfNeeded()
+        setState(OperationState.FINISHED)
+      }
     } catch {
       onError(cancel = true)
     } finally {
       shutdownTimeoutMonitor()
     }
-  }
 
   override protected def runInternal(): Unit = {
     addTimeoutMonitor(queryTimeout)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
index b7e5451ec..c5a7679d6 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
@@ -65,28 +65,29 @@ class PlanOnlyStatement(
     super.beforeRun()
   }
 
-  override protected def runInternal(): Unit = withLocalProperties {
+  override protected def runInternal(): Unit =
     try {
-      SQLConf.withExistingConf(spark.sessionState.conf) {
-        val parsed = spark.sessionState.sqlParser.parsePlan(statement)
+      withLocalProperties {
+        SQLConf.withExistingConf(spark.sessionState.conf) {
+          val parsed = spark.sessionState.sqlParser.parsePlan(statement)
 
-        parsed match {
-          case cmd if planExcludes.contains(cmd.getClass.getSimpleName) =>
-            result = spark.sql(statement)
-            iter = new ArrayFetchIterator(result.collect())
+          parsed match {
+            case cmd if planExcludes.contains(cmd.getClass.getSimpleName) =>
+              result = spark.sql(statement)
+              iter = new ArrayFetchIterator(result.collect())
 
-          case plan => style match {
-              case PlainStyle => explainWithPlainStyle(plan)
-              case JsonStyle => explainWithJsonStyle(plan)
-              case UnknownStyle => unknownStyleError(style)
-              case other => throw notSupportedStyleError(other, "Spark SQL")
-            }
+            case plan => style match {
+                case PlainStyle => explainWithPlainStyle(plan)
+                case JsonStyle => explainWithJsonStyle(plan)
+                case UnknownStyle => unknownStyleError(style)
+                case other => throw notSupportedStyleError(other, "Spark SQL")
+              }
+          }
         }
       }
     } catch {
       onError()
     }
-  }
 
   private def explainWithPlainStyle(plan: LogicalPlan): Unit = {
     mode match {
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
index 6b2a5d9eb..320e67635 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkOperation.scala
@@ -233,10 +233,10 @@ abstract class SparkOperation(session: Session)
     resp
   }
 
-  override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet =
-    withLocalProperties {
-      var resultRowSet: TRowSet = null
-      try {
+  override def getNextRowSet(order: FetchOrientation, rowSetSize: Int): TRowSet = {
+    var resultRowSet: TRowSet = null
+    try {
+      withLocalProperties {
         validateDefaultFetchOrientation(order)
         assertState(OperationState.FINISHED)
         setHasResultSet(true)
@@ -261,10 +261,11 @@ abstract class SparkOperation(session: Session)
               getProtocolVersion)
           }
         resultRowSet.setStartRowOffset(iter.getPosition)
-      } catch onError(cancel = true)
+      }
+    } catch onError(cancel = true)
 
-      resultRowSet
-    }
+    resultRowSet
+  }
 
   override def shouldRunAsync: Boolean = false