You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/05/16 12:22:37 UTC
[kyuubi] branch master updated: [KYUUBI #4415][FOLLOWUP] Align the operation handle in server/engine for ExecuteScala, ExecutePython and PlanOnlyStatement
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 514a6a961 [KYUUBI #4415][FOLLOWUP] Align the operation handle in server/engine for ExecuteScala, ExecutePython and PlanOnlyStatement
514a6a961 is described below
commit 514a6a961ade407c96059e1eec8eeb19253a3d5c
Author: fwang12 <fw...@ebay.com>
AuthorDate: Tue May 16 20:22:26 2023 +0800
[KYUUBI #4415][FOLLOWUP] Align the operation handle in server/engine for ExecuteScala, ExecutePython and PlanOnlyStatement
### _Why are the changes needed?_
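As the title says: align the operation handle between the server and the engine for ExecuteScala, ExecutePython and PlanOnlyStatement.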
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4832 from turboFei/scala_python_handle.
Closes #4415
a5a44dfa0 [fwang12] ut
eaf7f004f [fwang12] ut
c8d7a5c82 [fwang12] save
Authored-by: fwang12 <fw...@ebay.com>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../engine/spark/operation/ExecutePython.scala | 5 +++--
.../engine/spark/operation/ExecuteScala.scala | 5 +++--
.../engine/spark/operation/PlanOnlyStatement.scala | 5 +++--
.../spark/operation/SparkSQLOperationManager.scala | 6 ++---
.../operation/KyuubiOperationPerUserSuite.scala | 26 +++++++++++++++++-----
5 files changed, 33 insertions(+), 14 deletions(-)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
index 0d01348a7..badd83530 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecutePython.scala
@@ -40,7 +40,7 @@ import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SPARK_PYTHON_ENV_ARCHIVE, ENGINE_SPARK_PYTHON_ENV_ARCHIVE_EXEC_PATH, ENGINE_SPARK_PYTHON_HOME_ARCHIVE}
import org.apache.kyuubi.config.KyuubiReservedKeys.{KYUUBI_SESSION_USER_KEY, KYUUBI_STATEMENT_ID_KEY}
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
-import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationState}
+import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationHandle, OperationState}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
@@ -49,7 +49,8 @@ class ExecutePython(
override val statement: String,
override val shouldRunAsync: Boolean,
queryTimeout: Long,
- worker: SessionPythonWorker) extends SparkOperation(session) {
+ worker: SessionPythonWorker,
+ override protected val handle: OperationHandle) extends SparkOperation(session) {
private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
override def getOperationLog: Option[OperationLog] = Option(operationLog)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
index 24e17d281..691c4fb32 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteScala.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.engine.spark.KyuubiSparkUtil._
import org.apache.kyuubi.engine.spark.repl.KyuubiSparkILoop
-import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationState}
+import org.apache.kyuubi.operation.{ArrayFetchIterator, OperationHandle, OperationState}
import org.apache.kyuubi.operation.log.OperationLog
import org.apache.kyuubi.session.Session
@@ -51,7 +51,8 @@ class ExecuteScala(
repl: KyuubiSparkILoop,
override val statement: String,
override val shouldRunAsync: Boolean,
- queryTimeout: Long)
+ queryTimeout: Long,
+ override protected val handle: OperationHandle)
extends SparkOperation(session) {
private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
index c5a7679d6..726b7b0c2 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/PlanOnlyStatement.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf.{OPERATION_PLAN_ONLY_EXCLUDES, OPERATION_PLAN_ONLY_OUT_STYLE}
-import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator, ExecutionMode, IterableFetchIterator, JsonStyle, OptimizeMode, OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle, PlanOnlyMode, PlanOnlyStyle, UnknownMode, UnknownStyle}
+import org.apache.kyuubi.operation.{AnalyzeMode, ArrayFetchIterator, ExecutionMode, IterableFetchIterator, JsonStyle, OperationHandle, OptimizeMode, OptimizeWithStatsMode, ParseMode, PhysicalMode, PlainStyle, PlanOnlyMode, PlanOnlyStyle, UnknownMode, UnknownStyle}
import org.apache.kyuubi.operation.PlanOnlyMode.{notSupportedModeError, unknownModeError}
import org.apache.kyuubi.operation.PlanOnlyStyle.{notSupportedStyleError, unknownStyleError}
import org.apache.kyuubi.operation.log.OperationLog
@@ -36,7 +36,8 @@ import org.apache.kyuubi.session.Session
class PlanOnlyStatement(
session: Session,
override val statement: String,
- mode: PlanOnlyMode)
+ mode: PlanOnlyMode,
+ override protected val handle: OperationHandle)
extends SparkOperation(session) {
private val operationLog: OperationLog = OperationLog.createOperationLog(session, getHandle)
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
index 8fd58b338..cb444aa77 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
@@ -106,18 +106,18 @@ class SparkSQLOperationManager private (name: String) extends OperationManager(n
opHandle)
}
case mode =>
- new PlanOnlyStatement(session, statement, mode)
+ new PlanOnlyStatement(session, statement, mode, opHandle)
}
case OperationLanguages.SCALA =>
val repl = sessionToRepl.getOrElseUpdate(session.handle, KyuubiSparkILoop(spark))
- new ExecuteScala(session, repl, statement, runAsync, queryTimeout)
+ new ExecuteScala(session, repl, statement, runAsync, queryTimeout, opHandle)
case OperationLanguages.PYTHON =>
try {
ExecutePython.init()
val worker = sessionToPythonProcess.getOrElseUpdate(
session.handle,
ExecutePython.createSessionPythonWorker(spark, session))
- new ExecutePython(session, statement, runAsync, queryTimeout, worker)
+ new ExecutePython(session, statement, runAsync, queryTimeout, worker, opHandle)
} catch {
case e: Throwable =>
spark.conf.set(OPERATION_LANGUAGE.key, OperationLanguages.SQL.toString)
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
index f86d75247..2ae2340b2 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiOperationPerUserSuite.scala
@@ -330,11 +330,27 @@ class KyuubiOperationPerUserSuite
eventually(timeout(10.seconds)) {
assert(session.handle === SessionHandle.apply(session.client.remoteSessionHandle))
}
- val opHandle = session.executeStatement("SELECT engine_id()", Map.empty, true, 0L)
- eventually(timeout(10.seconds)) {
- val operation = session.sessionManager.operationManager.getOperation(
- opHandle).asInstanceOf[KyuubiOperation]
- assert(opHandle == OperationHandle.apply(operation.remoteOpHandle()))
+
+ def checkOpHandleAlign(statement: String, confOverlay: Map[String, String]): Unit = {
+ val opHandle = session.executeStatement(statement, confOverlay, true, 0L)
+ eventually(timeout(10.seconds)) {
+ val operation = session.sessionManager.operationManager.getOperation(
+ opHandle).asInstanceOf[KyuubiOperation]
+ assert(opHandle == OperationHandle.apply(operation.remoteOpHandle()))
+ }
+ }
+
+ val statement = "SELECT engine_id()"
+
+ val confOverlay = Map(KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> "PARSE")
+ checkOpHandleAlign(statement, confOverlay)
+
+ Map(
+ statement -> "SQL",
+ s"""spark.sql("$statement")""" -> "SCALA",
+ s"spark.sql('$statement')" -> "PYTHON").foreach { case (statement, lang) =>
+ val confOverlay = Map(KyuubiConf.OPERATION_LANGUAGE.key -> lang)
+ checkOpHandleAlign(statement, confOverlay)
}
}
}