Posted to commits@kyuubi.apache.org by ch...@apache.org on 2024/04/08 11:52:36 UTC

(kyuubi) branch master updated: [KYUUBI #6250] Drop support for Spark 3.1

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 42c619a5a [KYUUBI #6250] Drop support for Spark 3.1
42c619a5a is described below

commit 42c619a5ac7aa7f8ce86edda0730a2982b62dc03
Author: liuxiao <li...@qq.com>
AuthorDate: Mon Apr 8 19:52:27 2024 +0800

    [KYUUBI #6250] Drop support for Spark 3.1
    
    # :mag: Description
    ## Issue References 🔗
    
    This pull request fixes #6250
    
    ## Describe Your Solution 🔧
    
    Drop Spark 3.1 support from the Spark SQL engine. With Spark 3.2 as the new minimum version, the Spark 3.1 verification job is removed from CI, and the version-gated code paths are simplified: the reflective `OrcDeserializer` constructor selection, the class-name check for `CommandResultExec`, the `SparkSession.active` lookups that predate `SparkPlan.session` (SPARK-35798), and the reflective `SQLConf.isStaticConfigKey` and `AdaptiveSparkPlanExec.finalPhysicalPlan` helpers are all replaced by direct calls. ORC results are now always written with zstd compression, and the docs state Spark 3.2 to 3.5 as the supported range. A sketch of the simplification is shown below.
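    
    A minimal sketch of the kind of simplification this unlocks, mirroring `SparkDatasetHelper` in the diff below (the object and method names here are hypothetical, not part of the patch):
    
    ```scala
    package org.apache.spark.sql.kyuubi
    
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.SparkPlan
    
    object SessionAccessSketch {
      // Spark 3.1 path: SparkPlan.session does not exist, fall back to the
      // globally active session.
      def timeZoneIdBefore(plan: SparkPlan): String =
        SparkSession.active.sessionState.conf.sessionLocalTimeZone
    
      // Spark 3.2+ path: use the session attached to the plan (SPARK-35798).
      def timeZoneIdAfter(plan: SparkPlan): String =
        plan.session.sessionState.conf.sessionLocalTimeZone
    }
    ```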
    
    ## Types of changes :bookmark:
    
    - [ ] Bugfix (non-breaking change which fixes an issue)
    - [ ] New feature (non-breaking change which adds functionality)
    - [x] Breaking change (fix or feature that would cause existing functionality to change)
    
    ## Test Plan 🧪
    
    Pass CI
    
    ---
    
    # Checklist 📝
    
    - [ ] This patch was not authored or co-authored using [Generative Tooling](https://www.apache.org/legal/generative-tooling.html)
    
    **Be nice. Be informative.**
    
    Closes #6273 from liuxiaocs7/issue-6250.
    
    Closes #6250
    
    c6ba1e88a [liuxiao] remove unused import
    db887ef9b [liuxiao] inline method
    769da013b [Cheng Pan] Update externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
    21dbd37a7 [liuxiao] remove unused import
    e869d571e [liuxiao] update for miss
    7d755a879 [liuxiao] Drop support for Spark 3.1
    
    Lead-authored-by: liuxiao <li...@qq.com>
    Co-authored-by: Cheng Pan <pa...@gmail.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .github/workflows/master.yml                       |  5 --
 docs/deployment/migration-guide.md                 |  1 +
 docs/quick_start/quick_start.rst                   |  2 +-
 .../kyuubi/engine/spark/SparkSQLEngine.scala       |  3 -
 .../engine/spark/operation/ExecuteStatement.scala  |  5 +-
 .../engine/spark/operation/FetchOrcStatement.scala | 34 +--------
 .../execution/arrow/KyuubiArrowConverters.scala    |  7 +-
 .../spark/sql/kyuubi/SparkDatasetHelper.scala      | 89 +++++-----------------
 .../kyuubi/engine/spark/WithSparkSQLEngine.scala   |  3 +-
 .../operation/SparkArrowbasedOperationSuite.scala  | 42 ++--------
 10 files changed, 33 insertions(+), 158 deletions(-)

diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index 1dfcc873e..712bc1b90 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -56,11 +56,6 @@ jobs:
         exclude-tags: [""]
         comment: ["normal"]
         include:
-          - java: 8
-            spark: '3.5'
-            spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.1.3 -Dspark.archive.name=spark-3.1.3-bin-hadoop3.2.tgz -Pzookeeper-3.6'
-            exclude-tags: '-Dmaven.plugin.scalatest.exclude.tags=org.scalatest.tags.Slow,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.IcebergTest,org.apache.kyuubi.tags.DeltaTest,org.apache.kyuubi.tags.HudiTest,org.apache.kyuubi.tags.SparkLocalClusterTest'
-            comment: 'verify-on-spark-3.1-binary'
           - java: 8
             spark: '3.5'
             spark-archive: '-Dspark.archive.mirror=https://archive.apache.org/dist/spark/spark-3.2.4 -Dspark.archive.name=spark-3.2.4-bin-hadoop3.2.tgz -Pzookeeper-3.6'
diff --git a/docs/deployment/migration-guide.md b/docs/deployment/migration-guide.md
index 4767f154d..37e1c9a9d 100644
--- a/docs/deployment/migration-guide.md
+++ b/docs/deployment/migration-guide.md
@@ -20,6 +20,7 @@
 ## Upgrading from Kyuubi 1.9 to 1.10
 
 * Since Kyuubi 1.10, `beeline` is deprecated and will be removed in the future, please use `kyuubi-beeline` instead.
+* Since Kyuubi 1.10, the support of Spark engine for Spark 3.1 is removed.
 * Since Kyuubi 1.10, the support of Flink engine for Flink 1.16 is removed.
 
 ## Upgrading from Kyuubi 1.8 to 1.9
diff --git a/docs/quick_start/quick_start.rst b/docs/quick_start/quick_start.rst
index a77f7ee20..0b954daec 100644
--- a/docs/quick_start/quick_start.rst
+++ b/docs/quick_start/quick_start.rst
@@ -43,7 +43,7 @@ pre-installed and the ``JAVA_HOME`` is correctly set to each component.
   **Kyuubi**       Gateway      \ |release| \        - Kyuubi Server
                    Engine lib                        - Kyuubi Engine
                    Beeline                           - Kyuubi Beeline
-  **Spark**        Engine       3.1 to 3.5           A Spark distribution
+  **Spark**        Engine       3.2 to 3.5           A Spark distribution
   **Flink**        Engine       1.17 to 1.19         A Flink distribution
   **Trino**        Engine       N/A                  A Trino cluster allows to access via trino-client v411
   **Doris**        Engine       N/A                  A Doris cluster
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
index 6dd438ffd..d1331cd02 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/SparkSQLEngine.scala
@@ -381,9 +381,6 @@ object SparkSQLEngine extends Logging {
   }
 
   def main(args: Array[String]): Unit = {
-    if (KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION === "3.1") {
-      warn("The support for Spark 3.1 is deprecated, and will be removed in the next version.")
-    }
     val startedTime = System.currentTimeMillis()
     val submitTime = kyuubiConf.getOption(KYUUBI_ENGINE_SUBMIT_TIME_KEY) match {
       case Some(t) => t.toLong
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
index bf68f18f0..a52d32be9 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/ExecuteStatement.scala
@@ -185,14 +185,13 @@ class ExecuteStatement(
         // Rename all col name to avoid duplicate columns
         val colName = range(0, result.schema.size).map(x => "col" + x)
 
-        val codec = if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") "zstd" else "zlib"
         // df.write will introduce an extra shuffle for the outermost limit, and hurt performance
         if (resultMaxRows > 0) {
           result.toDF(colName: _*).limit(resultMaxRows).write
-            .option("compression", codec).format("orc").save(saveFileName.get)
+            .option("compression", "zstd").format("orc").save(saveFileName.get)
         } else {
           result.toDF(colName: _*).write
-            .option("compression", codec).format("orc").save(saveFileName.get)
+            .option("compression", "zstd").format("orc").save(saveFileName.get)
         }
         info(s"Save result to ${saveFileName.get}")
         fetchOrcStatement = Some(new FetchOrcStatement(spark))
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala
index 861539b95..64a9855f9 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/FetchOrcStatement.scala
@@ -35,10 +35,7 @@ import org.apache.spark.sql.execution.datasources.RecordReaderIterator
 import org.apache.spark.sql.execution.datasources.orc.OrcDeserializer
 import org.apache.spark.sql.types.StructType
 
-import org.apache.kyuubi.KyuubiException
-import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION
 import org.apache.kyuubi.operation.{FetchIterator, IterableFetchIterator}
-import org.apache.kyuubi.util.reflect.DynConstructors
 
 class FetchOrcStatement(spark: SparkSession) {
 
@@ -62,7 +59,7 @@ class FetchOrcStatement(spark: SparkSession) {
     val fullSchema = orcSchema.map(f =>
       AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
     val unsafeProjection = GenerateUnsafeProjection.generate(fullSchema, fullSchema)
-    val deserializer = getOrcDeserializer(orcSchema, colId)
+    val deserializer = new OrcDeserializer(orcSchema, colId)
     orcIter = new OrcFileIterator(list)
     val iterRow = orcIter.map(value =>
       unsafeProjection(deserializer.deserialize(value)))
@@ -73,35 +70,6 @@ class FetchOrcStatement(spark: SparkSession) {
   def close(): Unit = {
     orcIter.close()
   }
-
-  private def getOrcDeserializer(orcSchema: StructType, colId: Array[Int]): OrcDeserializer = {
-    try {
-      if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") {
-        // SPARK-34535 changed the constructor signature of OrcDeserializer
-        DynConstructors.builder()
-          .impl(classOf[OrcDeserializer], classOf[StructType], classOf[Array[Int]])
-          .build[OrcDeserializer]()
-          .newInstance(
-            orcSchema,
-            colId)
-      } else {
-        DynConstructors.builder()
-          .impl(
-            classOf[OrcDeserializer],
-            classOf[StructType],
-            classOf[StructType],
-            classOf[Array[Int]])
-          .build[OrcDeserializer]()
-          .newInstance(
-            new StructType,
-            orcSchema,
-            colId)
-      }
-    } catch {
-      case e: Throwable =>
-        throw new KyuubiException("Failed to create OrcDeserializer", e)
-    }
-  }
 }
 
 class OrcFileIterator(fileList: ListBuffer[LocatedFileStatus]) extends Iterator[OrcStruct] {
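
For context, the hunk above drops the reflective constructor selection that handled the pre-SPARK-34535 three-argument `OrcDeserializer` signature; with Spark 3.2 as the baseline, the two-argument constructor can be called directly. A minimal sketch (hypothetical helper, not part of this patch):

```scala
package org.apache.kyuubi.engine.spark.operation

import org.apache.spark.sql.execution.datasources.orc.OrcDeserializer
import org.apache.spark.sql.types.StructType

object OrcDeserializerSketch {
  // Direct construction; no DynConstructors fallback is needed once Spark 3.1 is gone.
  def build(orcSchema: StructType, colId: Array[Int]): OrcDeserializer =
    new OrcDeserializer(orcSchema, colId)
}
```
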
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
index 4a54180cc..04f4ede6c 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/execution/arrow/KyuubiArrowConverters.scala
@@ -30,7 +30,6 @@ import org.apache.arrow.vector.ipc.message.{IpcOption, MessageSerializer}
 import org.apache.arrow.vector.types.pojo.{Schema => ArrowSchema}
 import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.{InternalRow, SQLConfHelper}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.CollectLimitExec
@@ -158,9 +157,7 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
         val partsToScan =
           partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts))
 
-        // TODO: SparkPlan.session introduced in SPARK-35798, replace with SparkPlan.session once we
-        // drop Spark 3.1 support.
-        val sc = SparkSession.active.sparkContext
+        val sc = collectLimitExec.session.sparkContext
         val res = sc.runJob(
           childRDD,
           (it: Iterator[InternalRow]) => {
@@ -347,6 +344,6 @@ object KyuubiArrowConverters extends SQLConfHelper with Logging {
       largeVarTypes)
   }
 
-  // IpcOption.DEFAULT was introduced in ARROW-11081(ARROW-4.0.0), add this for adapt Spark 3.1/3.2
+  // IpcOption.DEFAULT was introduced in ARROW-11081(ARROW-4.0.0), add this for adapt Spark 3.2
   final private val ARROW_IPC_OPTION_DEFAULT = new IpcOption()
 }
diff --git a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
index 7af51abfe..2dbfe7348 100644
--- a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
+++ b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/spark/sql/kyuubi/SparkDatasetHelper.scala
@@ -23,11 +23,10 @@ import org.apache.spark.SparkContext
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.util.{ByteUnit, JavaUtils}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}
-import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.sql.catalyst.plans.logical.GlobalLimit
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
-import org.apache.spark.sql.execution.{CollectLimitExec, HiveResult, LocalTableScanExec, QueryExecution, SparkPlan, SQLExecution}
+import org.apache.spark.sql.execution.{CollectLimitExec, CommandResultExec, HiveResult, LocalTableScanExec, QueryExecution, SparkPlan, SQLExecution}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.arrow.KyuubiArrowConverters
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
@@ -38,7 +37,6 @@ import org.apache.kyuubi.engine.spark.KyuubiSparkUtil
 import org.apache.kyuubi.engine.spark.schema.RowSet
 import org.apache.kyuubi.engine.spark.util.SparkCatalogUtils.quoteIfNeeded
 import org.apache.kyuubi.util.reflect.DynMethods
-import org.apache.kyuubi.util.reflect.ReflectUtils._
 
 object SparkDatasetHelper extends Logging {
 
@@ -48,7 +46,7 @@ object SparkDatasetHelper extends Logging {
 
   def executeArrowBatchCollect: SparkPlan => Array[Array[Byte]] = {
     case adaptiveSparkPlan: AdaptiveSparkPlanExec =>
-      executeArrowBatchCollect(finalPhysicalPlan(adaptiveSparkPlan))
+      executeArrowBatchCollect(adaptiveSparkPlan.finalPhysicalPlan)
     // TODO: avoid extra shuffle if `offset` > 0
     case collectLimit: CollectLimitExec if offset(collectLimit) > 0 =>
       logWarning("unsupported offset > 0, an extra shuffle will be introduced.")
@@ -57,9 +55,8 @@ object SparkDatasetHelper extends Logging {
       doCollectLimit(collectLimit)
     case collectLimit: CollectLimitExec if collectLimit.limit < 0 =>
       executeArrowBatchCollect(collectLimit.child)
-    // TODO: replace with pattern match once we drop Spark 3.1 support.
-    case command: SparkPlan if isCommandResultExec(command) =>
-      doCommandResultExec(command)
+    case commandResult: CommandResultExec =>
+      doCommandResultExec(commandResult)
     case localTableScan: LocalTableScanExec =>
       doLocalTableScan(localTableScan)
     case plan: SparkPlan =>
@@ -76,10 +73,8 @@ object SparkDatasetHelper extends Logging {
    */
   def toArrowBatchRdd(plan: SparkPlan): RDD[Array[Byte]] = {
     val schemaCaptured = plan.schema
-    // TODO: SparkPlan.session introduced in SPARK-35798, replace with SparkPlan.session once we
-    // drop Spark 3.1 support.
-    val maxRecordsPerBatch = SparkSession.active.sessionState.conf.arrowMaxRecordsPerBatch
-    val timeZoneId = SparkSession.active.sessionState.conf.sessionLocalTimeZone
+    val maxRecordsPerBatch = plan.session.sessionState.conf.arrowMaxRecordsPerBatch
+    val timeZoneId = plan.session.sessionState.conf.sessionLocalTimeZone
     // note that, we can't pass the lazy variable `maxBatchSize` directly, this is because input
     // arguments are serialized and sent to the executor side for execution.
     val maxBatchSizePerBatch = maxBatchSize
@@ -160,10 +155,8 @@ object SparkDatasetHelper extends Logging {
   }
 
   private def doCollectLimit(collectLimit: CollectLimitExec): Array[Array[Byte]] = {
-    // TODO: SparkPlan.session introduced in SPARK-35798, replace with SparkPlan.session once we
-    // drop Spark 3.1 support.
-    val timeZoneId = SparkSession.active.sessionState.conf.sessionLocalTimeZone
-    val maxRecordsPerBatch = SparkSession.active.sessionState.conf.arrowMaxRecordsPerBatch
+    val timeZoneId = collectLimit.session.sessionState.conf.sessionLocalTimeZone
+    val maxRecordsPerBatch = collectLimit.session.sessionState.conf.arrowMaxRecordsPerBatch
 
     val batches = KyuubiArrowConverters.takeAsArrowBatches(
       collectLimit,
@@ -191,19 +184,13 @@ object SparkDatasetHelper extends Logging {
     result.toArray
   }
 
-  private lazy val commandResultExecRowsMethod = DynMethods.builder("rows")
-    .impl("org.apache.spark.sql.execution.CommandResultExec")
-    .build()
-
-  private def doCommandResultExec(command: SparkPlan): Array[Array[Byte]] = {
-    val spark = SparkSession.active
-    // TODO: replace with `command.rows` once we drop Spark 3.1 support.
-    val rows = commandResultExecRowsMethod.invoke[Seq[InternalRow]](command)
-    command.longMetric("numOutputRows").add(rows.size)
-    sendDriverMetrics(spark.sparkContext, command.metrics)
+  private def doCommandResultExec(commandResult: CommandResultExec): Array[Array[Byte]] = {
+    val spark = commandResult.session
+    commandResult.longMetric("numOutputRows").add(commandResult.rows.size)
+    sendDriverMetrics(spark.sparkContext, commandResult.metrics)
     KyuubiArrowConverters.toBatchIterator(
-      rows.iterator,
-      command.schema,
+      commandResult.rows.iterator,
+      commandResult.schema,
       spark.sessionState.conf.arrowMaxRecordsPerBatch,
       maxBatchSize,
       -1,
@@ -211,7 +198,7 @@ object SparkDatasetHelper extends Logging {
   }
 
   private def doLocalTableScan(localTableScan: LocalTableScanExec): Array[Array[Byte]] = {
-    val spark = SparkSession.active
+    val spark = localTableScan.session
     localTableScan.longMetric("numOutputRows").add(localTableScan.rows.size)
     sendDriverMetrics(spark.sparkContext, localTableScan.metrics)
     KyuubiArrowConverters.toBatchIterator(
@@ -224,31 +211,7 @@ object SparkDatasetHelper extends Logging {
   }
 
   /**
-   * This method provides a reflection-based implementation of
-   * [[AdaptiveSparkPlanExec.finalPhysicalPlan]] that enables us to adapt to the Spark runtime
-   * without patching SPARK-41914.
-   *
-   * TODO: Once we drop support for Spark 3.1.x, we can directly call
-   * [[AdaptiveSparkPlanExec.finalPhysicalPlan]].
-   */
-  def finalPhysicalPlan(adaptiveSparkPlanExec: AdaptiveSparkPlanExec): SparkPlan = {
-    withFinalPlanUpdate(adaptiveSparkPlanExec, identity)
-  }
-
-  /**
-   * A reflection-based implementation of [[AdaptiveSparkPlanExec.withFinalPlanUpdate]].
-   */
-  private def withFinalPlanUpdate[T](
-      adaptiveSparkPlanExec: AdaptiveSparkPlanExec,
-      fun: SparkPlan => T): T = {
-    val plan = invokeAs[SparkPlan](adaptiveSparkPlanExec, "getFinalPhysicalPlan")
-    val result = fun(plan)
-    invokeAs[Unit](adaptiveSparkPlanExec, "finalPlanUpdate")
-    result
-  }
-
-  /**
-   * offset support was add since Spark-3.4(set SPARK-28330), to ensure backward compatibility with
+   * offset support was add in SPARK-28330(3.4.0), to ensure backward compatibility with
    * earlier versions of Spark, this function uses reflective calls to the "offset".
    */
   private def offset(collectLimitExec: CollectLimitExec): Int = {
@@ -261,24 +224,6 @@ object SparkDatasetHelper extends Logging {
       .getOrElse(0)
   }
 
-  private def isCommandResultExec(sparkPlan: SparkPlan): Boolean = {
-    // scalastyle:off line.size.limit
-    // the CommandResultExec was introduced in SPARK-35378 (Spark 3.2), after SPARK-35378 the
-    // physical plan of runnable command is CommandResultExec.
-    // for instance:
-    // ```
-    // scala> spark.sql("show tables").queryExecution.executedPlan
-    // res0: org.apache.spark.sql.execution.SparkPlan =
-    // CommandResult <empty>, [namespace#0, tableName#1, isTemporary#2]
-    //   +- ShowTables [namespace#0, tableName#1, isTemporary#2], V2SessionCatalog(spark_catalog), [default]
-    //
-    // scala > spark.sql("show tables").queryExecution.executedPlan.getClass
-    // res1: Class[_ <: org.apache.spark.sql.execution.SparkPlan] = class org.apache.spark.sql.execution.CommandResultExec
-    // ```
-    // scalastyle:on line.size.limit
-    sparkPlan.getClass.getName == "org.apache.spark.sql.execution.CommandResultExec"
-  }
-
   /**
    * refer to org.apache.spark.sql.Dataset#withAction(), assign a new execution id for arrow-based
    * operation, so that we can track the arrow-based queries on the UI tab.
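
For context, the `SparkDatasetHelper` hunks above replace the class-name comparison and reflective `rows` lookup with a plain pattern match on `CommandResultExec`, which exists on every Spark version still supported. A minimal sketch (hypothetical helper, not part of this patch):

```scala
package org.apache.spark.sql.kyuubi

import org.apache.spark.sql.execution.{CommandResultExec, SparkPlan}

object CommandResultSketch {
  // Pattern match directly; CommandResultExec (SPARK-35378, Spark 3.2) is always available.
  def numOutputRows(plan: SparkPlan): Option[Int] = plan match {
    case commandResult: CommandResultExec => Some(commandResult.rows.size)
    case _                                => None
  }
}
```
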
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala
index 3b98c2efb..e6b140704 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/WithSparkSQLEngine.scala
@@ -21,7 +21,6 @@ import org.apache.spark.sql.SparkSession
 
 import org.apache.kyuubi.{KyuubiFunSuite, Utils}
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.spark.KyuubiSparkUtil.SPARK_ENGINE_RUNTIME_VERSION
 
 trait WithSparkSQLEngine extends KyuubiFunSuite {
   protected var spark: SparkSession = _
@@ -35,7 +34,7 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {
   // Affected by such configuration' default value
   //    engine.initialize.sql='SHOW DATABASES'
   // SPARK-35378
-  protected lazy val initJobId: Int = if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") 1 else 0
+  protected val initJobId: Int = 1
 
   override def beforeAll(): Unit = {
     startSparkEngine()
diff --git a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
index 4e0414824..ba245f50a 100644
--- a/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
+++ b/externals/kyuubi-spark-sql-engine/src/test/scala/org/apache/kyuubi/engine/spark/operation/SparkArrowbasedOperationSuite.scala
@@ -19,7 +19,7 @@ package org.apache.kyuubi.engine.spark.operation
 
 import java.lang.{Boolean => JBoolean}
 import java.sql.Statement
-import java.util.{Locale, Set => JSet}
+import java.util.Locale
 
 import org.apache.spark.{KyuubiSparkContextHelper, TaskContext}
 import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
@@ -43,7 +43,6 @@ import org.apache.kyuubi.engine.spark.{SparkSQLEngine, WithSparkSQLEngine}
 import org.apache.kyuubi.engine.spark.session.SparkSessionImpl
 import org.apache.kyuubi.operation.SparkDataTypeTests
 import org.apache.kyuubi.util.reflect.{DynFields, DynMethods}
-import org.apache.kyuubi.util.reflect.ReflectUtils._
 
 class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTypeTests
   with SparkMetricsTestUtils {
@@ -188,12 +187,9 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
       returnSize.foreach { size =>
         val df = spark.sql(s"select * from t_1 limit $size")
         val headPlan = df.queryExecution.executedPlan.collectLeaves().head
-        if (SPARK_ENGINE_RUNTIME_VERSION >= "3.2") {
-          assert(headPlan.isInstanceOf[AdaptiveSparkPlanExec])
-          val finalPhysicalPlan =
-            SparkDatasetHelper.finalPhysicalPlan(headPlan.asInstanceOf[AdaptiveSparkPlanExec])
-          assert(finalPhysicalPlan.isInstanceOf[CollectLimitExec])
-        }
+        assert(headPlan.isInstanceOf[AdaptiveSparkPlanExec])
+        val finalPhysicalPlan = headPlan.asInstanceOf[AdaptiveSparkPlanExec].finalPhysicalPlan
+        assert(finalPhysicalPlan.isInstanceOf[CollectLimitExec])
         if (size > 1000) {
           runAndCheck(df.queryExecution.executedPlan, 1000)
         } else {
@@ -298,11 +294,7 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
     val listener = new JobCountListener
     val l2 = new SQLMetricsListener
     val nodeName = spark.sql("SHOW TABLES").queryExecution.executedPlan.getClass.getName
-    if (SPARK_ENGINE_RUNTIME_VERSION < "3.2") {
-      assert(nodeName == "org.apache.spark.sql.execution.command.ExecutedCommandExec")
-    } else {
-      assert(nodeName == "org.apache.spark.sql.execution.CommandResultExec")
-    }
+    assert(nodeName == "org.apache.spark.sql.execution.CommandResultExec")
     withJdbcStatement("table_1") { statement =>
       statement.executeQuery("CREATE TABLE table_1 (id bigint) USING parquet")
       withSparkListener(listener) {
@@ -314,15 +306,8 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
       }
     }
 
-    if (SPARK_ENGINE_RUNTIME_VERSION < "3.2") {
-      // Note that before Spark 3.2, a LocalTableScan SparkPlan will be submitted, and the issue of
-      // preventing LocalTableScan from triggering a job submission was addressed in [KYUUBI #4710].
-      assert(l2.queryExecution.executedPlan.getClass.getName ==
-        "org.apache.spark.sql.execution.LocalTableScanExec")
-    } else {
-      assert(l2.queryExecution.executedPlan.getClass.getName ==
-        "org.apache.spark.sql.execution.CommandResultExec")
-    }
+    assert(l2.queryExecution.executedPlan.getClass.getName ==
+      "org.apache.spark.sql.execution.CommandResultExec")
     assert(listener.numJobs == 0)
   }
 
@@ -378,7 +363,6 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
 
   test("post CommandResultExec driver-side metrics") {
     spark.sql("show tables").show(truncate = false)
-    assume(SPARK_ENGINE_RUNTIME_VERSION >= "3.2")
     val expectedMetrics = Map(
       0L -> (("CommandResult", Map("number of output rows" -> "2"))))
     withTables("table_1", "table_2") {
@@ -493,7 +477,7 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
       }
     }
     (keys, values).zipped.foreach { (k, v) =>
-      if (isStaticConfigKey(k)) {
+      if (SQLConf.isStaticConfigKey(k)) {
         throw new KyuubiException(s"Cannot modify the value of a static config: $k")
       }
       conf.setConfString(k, v)
@@ -521,16 +505,6 @@ class SparkArrowbasedOperationSuite extends WithSparkSQLEngine with SparkDataTyp
     }
   }
 
-  /**
-   * This method provides a reflection-based implementation of [[SQLConf.isStaticConfigKey]] to
-   * adapt Spark 3.1
-   *
-   * TODO: Once we drop support for Spark 3.1, we can directly call
-   * [[SQLConf.isStaticConfigKey()]].
-   */
-  private def isStaticConfigKey(key: String): Boolean =
-    getField[JSet[String]]((SQLConf.getClass, SQLConf), "staticConfKeys").contains(key)
-
   // the signature of function [[ArrowConverters.fromBatchIterator]] is changed in SPARK-43528
   // (since Spark 3.5)
   private lazy val fromBatchIteratorMethod = DynMethods.builder("fromBatchIterator")