Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/05/16 12:22:37 UTC

[kyuubi] branch master updated: [KYUUBI #4415][FOLLOWUP] Align the operation handle in server/engine for ExecuteScala, ExecutePython and PlanOnlyStatement

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 514a6a961 [KYUUBI #4415][FOLLOWUP] Align the operation handle in server/engine for ExecuteScala, ExecutePython and PlanOnlyStatement
514a6a961 is described below

commit 514a6a961ade407c96059e1eec8eeb19253a3d5c
Author: fwang12 <fw...@ebay.com>
AuthorDate: Tue May 16 20:22:26 2023 +0800

    [KYUUBI #4415][FOLLOWUP] Align the operation handle in server/engine for ExecuteScala, ExecutePython and PlanOnlyStatement
    
    ### _Why are the changes needed?_
    
    Follow-up of #4415: pass the server-assigned operation handle to the engine-side ExecuteScala, ExecutePython and PlanOnlyStatement operations, so the operation handle is aligned between the Kyuubi server and the Spark engine.
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4832 from turboFei/scala_python_handle.
    
    Closes #4415
    
    a5a44dfa0 [fwang12] ut
    eaf7f004f [fwang12] ut
    c8d7a5c82 [fwang12] save
    
    Authored-by: fwang12 <fw...@ebay.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../engine/spark/operation/ExecutePython.scala     |  5 +++--
 .../engine/spark/operation/ExecuteScala.scala      |  5 +++--
 .../engine/spark/operation/PlanOnlyStatement.scala |  5 +++--
 .../spark/operation/SparkSQLOperationManager.scala |  6 ++---
 .../operation/KyuubiOperationPerUserSuite.scala    | 26 +++++++++++++++++-----
 5 files changed, 33 insertions(+), 14 deletions(-)
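
The pattern is the same in all three touched operations: each constructor now accepts the server-assigned OperationHandle and overrides the protected `handle` member of the operation base class, instead of the engine generating a fresh handle of its own. Below is a minimal, self-contained sketch of that pattern; the names (OperationHandle, AbstractOperation, ExecuteSomething) are simplified stand-ins, not Kyuubi's actual API.

    import java.util.UUID

    // Sketch only: simplified stand-ins to illustrate how a server-assigned
    // handle is threaded into an engine-side operation.
    final case class OperationHandle(identifier: UUID)

    abstract class AbstractOperation {
      // By default an operation mints its own handle; subclasses may override
      // it with a handle that was already created elsewhere (e.g. on the server).
      protected val handle: OperationHandle = OperationHandle(UUID.randomUUID())
      def getHandle: OperationHandle = handle
    }

    // Engine-side operation that reuses the pre-assigned handle, mirroring the
    // `override protected val handle` constructor parameter added in this commit.
    class ExecuteSomething(
        val statement: String,
        override protected val handle: OperationHandle) extends AbstractOperation

    object HandleAlignmentSketch extends App {
      val serverHandle = OperationHandle(UUID.randomUUID())
      val op = new ExecuteSomething("SELECT engine_id()", serverHandle)
      // The handle seen through the engine operation is exactly the server's one.
      assert(op.getHandle == serverHandle)
      println(s"aligned: ${op.getHandle == serverHandle}")
    }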

diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
index 0d01348a7..badd83530 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
@@ -40,7 +40,7 @@ import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils}
 import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_PYTHON_ENV_ARCHIVE, ENGINE_SPARK_PYTHON_ENV_ARCHIVE_EXEC_PATH, ENGINE_SPARK_PYTHON_HOME_ARCHIVE}
 import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_USER_KEY, KYUUBI_STATEMENT_ID_KEY}
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
-import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationState}
+import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationHandle, OperationState}
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
 
@@ -49,7 +49,8 @@ class ExecutePython(
     override val statement: String,
     override val shouldRunAsync: Boolean,
     queryTimeout: Long,
-    worker: SessionPythonWorker) extends SparkOperation(session) {
+    worker: SessionPythonWorker,
+    override protected val handle: OperationHandle) extends SparkOperation(session) {
 
   private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
   override def getOperationLog: Option[OperationLog] = Option(operationLog)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
index 24e17d281..691c4fb32 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.types.StructType
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
 import org.apache.kyuubi.engine.spark.repl.KyuubiSparkILoop
-import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationState}
+import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationHandle, OperationState}
 import org.apache.kyuubi.operation.log.OperationLog
 import org.apache.kyuubi.session.Session
 
@@ -51,7 +51,8 @@ class ExecuteScala(
     repl: KyuubiSparkILoop,
     override val statement: String,
     override val shouldRunAsync: Boolean,
-    queryTimeout: Long)
+    queryTimeout: Long,
+    override protected val handle: OperationHandle)
   extends SparkOperation(session) {
 
   private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
index c5a7679d6..726b7b0c2 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.types.StructType
 
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.config.KyuubiConf.{OPERATION_PLAN_ONLY_EXCLUDES, OPERATION_PLAN_ONLY_OUT_STYLE}
-import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator, ExecutionMode, IterableFetchIterator, JsonStyle, OptimizeMode, OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle, PlanOnlyMode, PlanOnlyStyle, UnknownMode, UnknownStyle}
+import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator, ExecutionMode, IterableFetchIterator, JsonStyle, OperationHandle, OptimizeMode, OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle, PlanOnlyMode, PlanOnlyStyle, UnknownMode, UnknownStyle}
 import org.apache.kyuubi.operation.PlanOnlyMode.{notSupportedModeError, unknownModeError}
 import org.apache.kyuubi.operation.PlanOnlyStyle.{notSupportedStyleError, unknownStyleError}
 import org.apache.kyuubi.operation.log.OperationLog
@@ -36,7 +36,8 @@ import org.apache.kyuubi.session.Session
 class PlanOnlyStatement(
     session: Session,
     override val statement: String,
-    mode: PlanOnlyMode)
+    mode: PlanOnlyMode,
+    override protected val handle: OperationHandle)
   extends SparkOperation(session) {
 
   private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
index 8fd58b338..cb444aa77 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
@@ -106,18 +106,18 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
                     opHandle)
               }
             case mode =>
-              new PlanOnlyStatement(session, statement, mode)
+              new PlanOnlyStatement(session, statement, mode, opHandle)
           }
         case OperationLanguages.SCALA =>
           val repl = sessionToRepl.getOrElseUpdate(session.handle, KyuubiSparkILoop(spark))
-          new ExecuteScala(session, repl, statement, runAsync, queryTimeout)
+          new ExecuteScala(session, repl, statement, runAsync, queryTimeout, opHandle)
         case OperationLanguages.PYTHON =>
           try {
             ExecutePython.init()
             val worker = sessionToPythonProcess.getOrElseUpdate(
               session.handle,
               ExecutePython.createSessionPythonWorker(spark, session))
-            new ExecutePython(session, statement, runAsync, queryTimeout, worker)
+            new ExecutePython(session, statement, runAsync, queryTimeout, worker, opHandle)
           } catch {
             case e: Throwable =>
               spark.conf.set(OPERATION_LANGUAGE.key, OperationLanguages.SQL.toString)
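
The SparkSQLOperationManager hunk above is pure threading: the opHandle the server already created is forwarded to whichever operation the language branch constructs. A small self-contained sketch of that dispatch (hypothetical names, not the real SparkSQLOperationManager API):

    import java.util.UUID

    // Sketch only: a simplified dispatcher that forwards the server-created
    // handle to every operation it builds, instead of each branch minting one.
    object OperationDispatchSketch {
      final case class OperationHandle(identifier: UUID)

      sealed trait Op { def handle: OperationHandle }
      final case class ScalaOp(statement: String, handle: OperationHandle) extends Op
      final case class PythonOp(statement: String, handle: OperationHandle) extends Op
      final case class PlanOnlyOp(statement: String, handle: OperationHandle) extends Op

      def newOperation(language: String, statement: String, opHandle: OperationHandle): Op =
        language match {
          case "SCALA"  => ScalaOp(statement, opHandle)    // mirrors ExecuteScala(..., opHandle)
          case "PYTHON" => PythonOp(statement, opHandle)   // mirrors ExecutePython(..., opHandle)
          case _        => PlanOnlyOp(statement, opHandle) // mirrors PlanOnlyStatement(..., opHandle)
        }

      def main(args: Array[String]): Unit = {
        val serverHandle = OperationHandle(UUID.randomUUID())
        val op = newOperation("SCALA", """spark.sql("SELECT engine_id()")""", serverHandle)
        assert(op.handle == serverHandle) // same handle on both sides
      }
    }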
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
index f86d75247..2ae2340b2 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
@@ -330,11 +330,27 @@ class KyuubiOperationPerUserSuite
         eventually(timeout(10.seconds)) {
           assert(session.handle === SessionHandle.apply(session.client.remoteSessionHandle))
         }
-        val opHandle = session.executeStatement("SELECT engine_id()", Map.empty, true, 0L)
-        eventually(timeout(10.seconds)) {
-          val operation = session.sessionManager.operationManager.getOperation(
-            opHandle).asInstanceOf[KyuubiOperation]
-          assert(opHandle == OperationHandle.apply(operation.remoteOpHandle()))
+
+        def checkOpHandleAlign(statement: String, confOverlay: Map[String, String]): Unit = {
+          val opHandle = session.executeStatement(statement, confOverlay, true, 0L)
+          eventually(timeout(10.seconds)) {
+            val operation = session.sessionManager.operationManager.getOperation(
+              opHandle).asInstanceOf[KyuubiOperation]
+            assert(opHandle == OperationHandle.apply(operation.remoteOpHandle()))
+          }
+        }
+
+        val statement = "SELECT engine_id()"
+
+        val confOverlay = Map(KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> "PARSE")
+        checkOpHandleAlign(statement, confOverlay)
+
+        Map(
+          statement -> "SQL",
+          s"""spark.sql("$statement")""" -> "SCALA",
+          s"spark.sql('$statement')" -> "PYTHON").foreach { case (statement, lang) =>
+          val confOverlay = Map(KyuubiConf.OPERATION_LANGUAGE.key -> lang)
+          checkOpHandleAlign(statement, confOverlay)
         }
       }
     }