Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/04/17 12:11:33 UTC

[kyuubi] branch master updated: [KYUUBI #4367] Support Flink 1.17

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 553b2aafe [KYUUBI #4367] Support Flink 1.17
553b2aafe is described below

commit 553b2aafe7a29546ee410f44640d8e92cdf58951
Author: Paul Lin <pa...@gmail.com>
AuthorDate: Mon Apr 17 20:11:22 2023 +0800

    [KYUUBI #4367] Support Flink 1.17
    
    ### _Why are the changes needed?_
    
    Support Flink 1.17 and Flink SQL gateway.
    
    1. Drop Flink 1.15
    2. Migrate API to Flink SQL Gateway
    3. Support Flink 1.17
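    
    The version gate behind items 1 and 3 can be sketched roughly as follows. This is only an illustration: `MajorMinor` and `VersionGate` are hypothetical stand-ins, not Kyuubi's actual `SemanticVersion` API, and the supported set simply mirrors the `SUPPORTED_FLINK_VERSIONS` value changed in this patch.
    
    ```scala
    // Hypothetical stand-in for Kyuubi's SemanticVersion; names are illustrative only.
    final case class MajorMinor(major: Int, minor: Int)
    
    object MajorMinor {
      // Parse the leading "major.minor" of a version string such as "1.17.0".
      def parse(version: String): MajorMinor = {
        val parts = version.split("\\.")
        require(parts.length >= 2, s"Unexpected version string: $version")
        MajorMinor(parts(0).toInt, parts(1).toInt)
      }
    }
    
    object VersionGate {
      // After this patch the engine accepts Flink 1.16 and 1.17; 1.15 is dropped.
      val supported: Set[MajorMinor] = Set("1.16", "1.17").map(MajorMinor.parse)
    
      // True when the runtime Flink version falls in the supported major.minor set.
      def isSupported(version: String): Boolean =
        supported.contains(MajorMinor.parse(version))
    }
    ```
    
    With this sketch, `VersionGate.isSupported("1.17.0")` would pass the check while `VersionGate.isSupported("1.15.4")` would not; the sketch assumes purely numeric major and minor components.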
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #4368 from link3280/flink-1.17.
    
    Closes #4367
    
    8eb4da6c0 [Paul Lin] [KYUUBI #4367] Fix test failure
    81a10f6be [Paul Lin] [KYUUBI #4367] Fix test failure
    23d87ba1d [Paul Lin] [KYUUBI #4367] Rename delegation package to shim
    5c9d0aa84 [Paul Lin] [KYUUBI #4367] Improve code style
    56567fcd7 [Paul Lin] [KYUUBI #4367] Improve java.lang.Long usage
    417d37b27 [Paul Lin] Update externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
    08f89991a [Paul Lin] [KYUUBI #4367] Fix ambiguous reference
    ed950600c [Paul Lin] [KYUUBI #4367] Fix spotless
    7b28eaf11 [Paul Lin] [KYUUBI #4367] Improve code style for iterations
    c2a23d571 [Paul Lin] [KYUUBI #4367] Improve code style for error messages and iterations
    7e36e70c7 [Paul Lin] [KYUUBI #4367] Improve code style for java.lang.Boolean
    4ef8c5209 [Paul Lin] [KYUUBI #4367] Improve code style for java.util.*
    8530aec2a [Paul Lin] [KYUUBI #4367] Remove unnecessary classes
    1c41e4400 [Paul Lin] [KYUUBI #4367] Remove unnecessary variables
    33eeb37ee [Paul Lin] [KYUUBI #4367] Remove unnecessary reflection code
    e1e5cd2cf [Paul Lin] [KYUUBI #4367] Fix IncompatibleClassChangeError
    3520a5153 [Paul Lin] [KYUUBI #4367] Fix IncompatibleClassChangeError
    42cce7a54 [Paul Lin] [KYUUBI #4367] Replace vanilla reflection with kyuubi reflection tools
    20e9913e3 [Paul Lin] [KYUUBI #4367] Fix FlinkProcessBuilder test error
    a02e01adf [Paul Lin] [KYUUBI #4367] Improve code style
    20e1a559e [Paul Lin] [KYUUBI #4367] Use kyuubi reflection tools
    9b2072e45 [Paul Lin] [KYUUBI #4367] Improve flink version match
    7ce1e9a12 [Paul Lin] [KYUUBI #4367] Fix local engine tagged as YARN app
    fd0c88d15 [Paul Lin] Revert "[KYUUBI #4367] Filter out non kyuubi prefixed conf in flink login engine"
    f71c6014e [Paul Lin] [KYUUBI #4367] Fix local engine tagged as YARN app
    b7d46f57d [Paul Lin] [KYUUBI #4367] Filter out non kyuubi prefixed conf in flink login engine
    47beb1a78 [Paul Lin] [KYUUBI #4367] Refactor Flink engine tests
    7e1a198ca [Paul Lin] [KYUUBI #4367] Fix flink sql gateway jar not included in local mode
    e851d9732 [Paul Lin] [KYUUBI #4367] Disable query id test for flink 1.16
    7291e27fa [Paul Lin] [KYUUBI #4367] Remove profile for flink-1.15
    54cfe3bbc [Paul Lin] [KYUUBI #4367] Fix udf not found in local flink engine tests
    1a7833bf2 [Paul Lin] [KYUUBI #4367] Fix test failure in PlanOnlyStatementSuit
    700ee04db [Paul Lin] [KYUUBI #4367] Fix FLINK_CONF_DIR not set in ut
    b685ff139 [Paul Lin] [KYUUBI #4367] Improve code style
    29728c042 [Paul Lin] [KYUUBI #4367] Fix Flink conf dir not found
    799c93876 [Paul Lin] [KYUUBI #4367] Fix NoSuchFieldException
    614ecc335 [Paul Lin] [KYUUBI #4367] Fix reflection failures
    6a08d0bbe [Paul Lin] [KYUUBI #4367] Fix NPE in dependencies
    d289495c0 [Paul Lin] [KYUUBI #4367] Flink FlinkSQLEngine capabilities with Flink 1.16
    ef6f4d4ff [Paul Lin] [KYUUBI #4367] Remove support for Flink 1.15
    e18b3c2ed [Paul Lin] [KYUUBI #4367] Fix Flink SessionManager compatibility issue
    49e0a94be [Paul Lin] feat: Support Flink 1.17
    
    Authored-by: Paul Lin <pa...@gmail.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .github/workflows/master.yml                       |   8 +-
 externals/kyuubi-flink-sql-engine/pom.xml          |   6 +
 .../kyuubi/engine/flink/FlinkEngineUtils.scala     | 151 ++++++++++-----
 .../engine/flink/FlinkSQLBackendService.scala      |   2 +-
 .../kyuubi/engine/flink/FlinkSQLEngine.scala       |  55 +-----
 .../engine/flink/operation/ExecuteStatement.scala  | 203 +--------------------
 .../engine/flink/operation/FlinkOperation.scala    |  13 +-
 .../flink/operation/FlinkSQLOperationManager.scala |  14 +-
 .../engine/flink/operation/GetCatalogs.scala       |   6 +-
 .../kyuubi/engine/flink/operation/GetColumns.scala |  11 +-
 .../engine/flink/operation/GetCurrentCatalog.scala |   3 +-
 .../flink/operation/GetCurrentDatabase.scala       |   3 +-
 .../engine/flink/operation/GetFunctions.scala      |  18 +-
 .../engine/flink/operation/GetPrimaryKeys.scala    |  19 +-
 .../kyuubi/engine/flink/operation/GetSchemas.scala |  11 +-
 .../kyuubi/engine/flink/operation/GetTables.scala  |   6 +-
 .../engine/flink/operation/OperationUtils.scala    | 172 -----------------
 .../engine/flink/operation/PlanOnlyStatement.scala |  28 +--
 .../engine/flink/operation/SetCurrentCatalog.scala |   4 +-
 .../flink/operation/SetCurrentDatabase.scala       |   4 +-
 .../kyuubi/engine/flink/result/ResultSet.scala     |  13 +-
 .../kyuubi/engine/flink/result/ResultSetUtil.scala |  71 +++++++
 .../flink/session/FlinkSQLSessionManager.scala     |  33 +++-
 .../engine/flink/session/FlinkSessionImpl.scala    |  21 ++-
 .../kyuubi/engine/flink/shim/FlinkResultSet.scala  |  78 ++++++++
 .../engine/flink/shim/FlinkSessionManager.scala    | 130 +++++++++++++
 .../engine/flink/WithDiscoveryFlinkSQLEngine.scala |  26 +--
 .../engine/flink/WithFlinkSQLEngineLocal.scala     | 190 ++++++++++++++++---
 .../engine/flink/WithFlinkSQLEngineOnYarn.scala    |   2 +-
 .../engine/flink/WithFlinkTestResources.scala      |   7 +-
 .../flink/operation/FlinkOperationLocalSuite.scala |  30 ++-
 .../operation/FlinkOperationOnYarnSuite.scala      |  25 ++-
 .../flink/operation/FlinkOperationSuite.scala      |  61 +++++--
 .../flink/operation/PlanOnlyOperationSuite.scala   |  22 ++-
 .../kyuubi/engine/KyuubiApplicationManager.scala   |   2 +-
 .../kyuubi/engine/flink/FlinkProcessBuilder.scala  |  21 ++-
 .../engine/flink/FlinkProcessBuilderSuite.scala    |   5 +-
 pom.xml                                            |  16 +-
 38 files changed, 844 insertions(+), 646 deletions(-)

diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index ece87e265..04ecb1a60 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -157,15 +157,15 @@ jobs:
           - 8
           - 11
         flink:
-          - '1.15'
           - '1.16'
+          - '1.17'
         flink-archive: [ "" ]
         comment: [ "normal" ]
         include:
           - java: 8
-            flink: '1.16'
-            flink-archive: '-Dflink.archive.mirror=https://archive.apache.org/dist/flink/flink-1.15.4 -Dflink.archive.name=flink-1.15.4-bin-scala_2.12.tgz'
-            comment: 'verify-on-flink-1.15-binary'
+            flink: '1.17'
+            flink-archive: '-Dflink.archive.mirror=https://archive.apache.org/dist/flink/flink-1.16.1 -Dflink.archive.name=flink-1.16.1-bin-scala_2.12.tgz'
+            comment: 'verify-on-flink-1.16-binary'
     steps:
       - uses: actions/checkout@v3
       - name: Tune Runner VM
diff --git a/externals/kyuubi-flink-sql-engine/pom.xml b/externals/kyuubi-flink-sql-engine/pom.xml
index 0e499f978..c73310a64 100644
--- a/externals/kyuubi-flink-sql-engine/pom.xml
+++ b/externals/kyuubi-flink-sql-engine/pom.xml
@@ -75,6 +75,12 @@
             <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-sql-gateway</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-table-common</artifactId>
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
index 69fc8c695..f9289ea81 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
@@ -18,29 +18,37 @@
 package org.apache.kyuubi.engine.flink
 
 import java.io.File
+import java.lang.{Boolean => JBoolean}
 import java.net.URL
+import java.util.{ArrayList => JArrayList, Collections => JCollections, List => JList}
 
 import scala.collection.JavaConverters._
+import scala.collection.convert.ImplicitConversions._
 
-import org.apache.commons.cli.{CommandLine, DefaultParser, Option, Options, ParseException}
+import org.apache.commons.cli.{CommandLine, DefaultParser, Options}
+import org.apache.flink.api.common.JobID
+import org.apache.flink.client.cli.{CustomCommandLine, DefaultCLI, GenericCLI}
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.core.fs.Path
 import org.apache.flink.runtime.util.EnvironmentInformation
 import org.apache.flink.table.client.SqlClientException
-import org.apache.flink.table.client.cli.CliOptions
+import org.apache.flink.table.client.cli.CliOptionsParser
 import org.apache.flink.table.client.cli.CliOptionsParser._
-import org.apache.flink.table.client.gateway.context.SessionContext
-import org.apache.flink.table.client.gateway.local.LocalExecutor
+import org.apache.flink.table.gateway.service.context.{DefaultContext, SessionContext}
+import org.apache.flink.table.gateway.service.result.ResultFetcher
+import org.apache.flink.table.gateway.service.session.Session
+import org.apache.flink.util.JarUtils
 
-import org.apache.kyuubi.Logging
+import org.apache.kyuubi.{KyuubiException, Logging}
 import org.apache.kyuubi.engine.SemanticVersion
+import org.apache.kyuubi.reflection.{DynConstructors, DynFields, DynMethods}
 
 object FlinkEngineUtils extends Logging {
 
-  val MODE_EMBEDDED = "embedded"
-  val EMBEDDED_MODE_CLIENT_OPTIONS: Options = getEmbeddedModeClientOptions(new Options);
+  val EMBEDDED_MODE_CLIENT_OPTIONS: Options = getEmbeddedModeClientOptions(new Options)
 
   val SUPPORTED_FLINK_VERSIONS: Array[SemanticVersion] =
-    Array("1.15", "1.16").map(SemanticVersion.apply)
+    Array("1.16", "1.17").map(SemanticVersion.apply)
 
   def checkFlinkVersion(): Unit = {
     val flinkVersion = EnvironmentInformation.getVersion
@@ -62,47 +70,106 @@ object FlinkEngineUtils extends Logging {
   def isFlinkVersionEqualTo(targetVersionString: String): Boolean =
     SemanticVersion(EnvironmentInformation.getVersion).isVersionEqualTo(targetVersionString)
 
-  def parseCliOptions(args: Array[String]): CliOptions = {
-    val (mode, modeArgs) =
-      if (args.isEmpty || args(0).startsWith("-")) (MODE_EMBEDDED, args)
-      else (args(0), args.drop(1))
-    val options = parseEmbeddedModeClient(modeArgs)
-    if (mode == MODE_EMBEDDED) {
-      if (options.isPrintHelp) {
-        printHelpEmbeddedModeClient()
+  /**
+   * Copied and modified from [[org.apache.flink.table.client.cli.CliOptionsParser]]
+   * to avoid loading flink-python classes which we don't support yet.
+   */
+  private def discoverDependencies(
+      jars: JList[URL],
+      libraries: JList[URL]): JList[URL] = {
+    val dependencies: JList[URL] = new JArrayList[URL]
+    try { // find jar files
+      for (url <- jars) {
+        JarUtils.checkJarFile(url)
+        dependencies.add(url)
       }
-      options
+      // find jar files in library directories
+      libraries.foreach { libUrl =>
+        val dir: File = new File(libUrl.toURI)
+        if (!dir.isDirectory) throw new SqlClientException(s"Directory expected: $dir")
+        if (!dir.canRead) throw new SqlClientException(s"Directory cannot be read: $dir")
+        val files: Array[File] = dir.listFiles
+        if (files == null) throw new SqlClientException(s"Directory cannot be read: $dir")
+        files.filter { f => f.isFile && f.getAbsolutePath.toLowerCase.endsWith(".jar") }
+          .foreach { f =>
+            val url: URL = f.toURI.toURL
+            JarUtils.checkJarFile(url)
+            dependencies.add(url)
+          }
+      }
+    } catch {
+      case e: Exception =>
+        throw new SqlClientException("Could not load all required JAR files.", e)
+    }
+    dependencies
+  }
+
+  def getDefaultContext(
+      args: Array[String],
+      flinkConf: Configuration,
+      flinkConfDir: String): DefaultContext = {
+    val parser = new DefaultParser
+    val line = parser.parse(EMBEDDED_MODE_CLIENT_OPTIONS, args, true)
+    val jars: JList[URL] = Option(checkUrls(line, CliOptionsParser.OPTION_JAR))
+      .getOrElse(JCollections.emptyList())
+    val libDirs: JList[URL] = Option(checkUrls(line, CliOptionsParser.OPTION_LIBRARY))
+      .getOrElse(JCollections.emptyList())
+    val dependencies: JList[URL] = discoverDependencies(jars, libDirs)
+    if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
+      val commandLines: JList[CustomCommandLine] =
+        Seq(new GenericCLI(flinkConf, flinkConfDir), new DefaultCLI).asJava
+      DynConstructors.builder()
+        .impl(
+          classOf[DefaultContext],
+          classOf[Configuration],
+          classOf[JList[CustomCommandLine]])
+        .build()
+        .newInstance(flinkConf, commandLines)
+        .asInstanceOf[DefaultContext]
+    } else if (FlinkEngineUtils.isFlinkVersionEqualTo("1.17")) {
+      DynMethods.builder("load")
+        .impl(
+          classOf[DefaultContext],
+          classOf[Configuration],
+          classOf[JList[URL]],
+          classOf[Boolean],
+          classOf[Boolean])
+        .buildStatic()
+        .invoke[DefaultContext](
+          flinkConf,
+          dependencies,
+          new JBoolean(true),
+          new JBoolean(false))
     } else {
-      throw new SqlClientException("Other mode is not supported yet.")
+      throw new KyuubiException(
+        s"Flink version ${EnvironmentInformation.getVersion} is not supported currently.")
     }
   }
 
-  def getSessionContext(localExecutor: LocalExecutor, sessionId: String): SessionContext = {
-    val method = classOf[LocalExecutor].getDeclaredMethod("getSessionContext", classOf[String])
-    method.setAccessible(true)
-    method.invoke(localExecutor, sessionId).asInstanceOf[SessionContext]
+  def getSessionContext(session: Session): SessionContext = {
+    DynFields.builder()
+      .hiddenImpl(classOf[Session], "sessionContext")
+      .build()
+      .get(session)
+      .asInstanceOf[SessionContext]
   }
 
-  def parseEmbeddedModeClient(args: Array[String]): CliOptions =
+  def getResultJobId(resultFetch: ResultFetcher): Option[JobID] = {
+    if (FlinkEngineUtils.isFlinkVersionAtMost("1.16")) {
+      return None
+    }
     try {
-      val parser = new DefaultParser
-      val line = parser.parse(EMBEDDED_MODE_CLIENT_OPTIONS, args, true)
-      val jarUrls = checkUrls(line, OPTION_JAR)
-      val libraryUrls = checkUrls(line, OPTION_LIBRARY)
-      new CliOptions(
-        line.hasOption(OPTION_HELP.getOpt),
-        checkSessionId(line),
-        checkUrl(line, OPTION_INIT_FILE),
-        checkUrl(line, OPTION_FILE),
-        if (jarUrls != null && jarUrls.nonEmpty) jarUrls.asJava else null,
-        if (libraryUrls != null && libraryUrls.nonEmpty) libraryUrls.asJava else null,
-        line.getOptionValue(OPTION_UPDATE.getOpt),
-        line.getOptionValue(OPTION_HISTORY.getOpt),
-        null)
+      Option(DynFields.builder()
+        .hiddenImpl(classOf[ResultFetcher], "jobID")
+        .build()
+        .get(resultFetch)
+        .asInstanceOf[JobID])
     } catch {
-      case e: ParseException =>
-        throw new SqlClientException(e.getMessage)
+      case _: NullPointerException => None
+      case e: Throwable =>
+        throw new IllegalStateException("Unexpected error occurred while fetching query ID", e)
     }
+  }
 
   def checkSessionId(line: CommandLine): String = {
     val sessionId = line.getOptionValue(OPTION_SESSION.getOpt)
@@ -111,13 +178,13 @@ object FlinkEngineUtils extends Logging {
     } else sessionId
   }
 
-  def checkUrl(line: CommandLine, option: Option): URL = {
-    val urls: List[URL] = checkUrls(line, option)
+  def checkUrl(line: CommandLine, option: org.apache.commons.cli.Option): URL = {
+    val urls: JList[URL] = checkUrls(line, option)
     if (urls != null && urls.nonEmpty) urls.head
     else null
   }
 
-  def checkUrls(line: CommandLine, option: Option): List[URL] = {
+  def checkUrls(line: CommandLine, option: org.apache.commons.cli.Option): JList[URL] = {
     if (line.hasOption(option.getOpt)) {
       line.getOptionValues(option.getOpt).distinct.map((url: String) => {
         checkFilePath(url)
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLBackendService.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLBackendService.scala
index d049e3c80..9802f1955 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLBackendService.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLBackendService.scala
@@ -17,7 +17,7 @@
 
 package org.apache.kyuubi.engine.flink
 
-import org.apache.flink.table.client.gateway.context.DefaultContext
+import org.apache.flink.table.gateway.service.context.DefaultContext
 
 import org.apache.kyuubi.engine.flink.session.FlinkSQLSessionManager
 import org.apache.kyuubi.service.AbstractBackendService
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
index 42061a369..48a354b0f 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
@@ -18,22 +18,17 @@
 package org.apache.kyuubi.engine.flink
 
 import java.io.File
-import java.net.URL
 import java.nio.file.Paths
 import java.time.Instant
 import java.util.concurrent.CountDownLatch
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
 
-import org.apache.flink.client.cli.{DefaultCLI, GenericCLI}
 import org.apache.flink.configuration.{Configuration, DeploymentOptions, GlobalConfiguration}
 import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.client.SqlClientException
-import org.apache.flink.table.client.gateway.context.DefaultContext
-import org.apache.flink.util.JarUtils
+import org.apache.flink.table.gateway.service.context.DefaultContext
 
-import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils}
+import org.apache.kyuubi.{Logging, Utils}
 import org.apache.kyuubi.Utils.{addShutdownHook, currentUser, FLINK_ENGINE_SHUTDOWN_PRIORITY}
 import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.engine.flink.FlinkSQLEngine.{countDownLatch, currentEngine}
@@ -118,18 +113,9 @@ object FlinkSQLEngine extends Logging {
           debug(s"Skip generating app name for execution target $other")
       }
 
-      val cliOptions = FlinkEngineUtils.parseCliOptions(args)
-      val jars = if (cliOptions.getJars != null) cliOptions.getJars.asScala else List.empty
-      val libDirs =
-        if (cliOptions.getLibraryDirs != null) cliOptions.getLibraryDirs.asScala else List.empty
-      val dependencies = discoverDependencies(jars, libDirs)
-      val engineContext = new DefaultContext(
-        dependencies.asJava,
-        flinkConf,
-        Seq(new GenericCLI(flinkConf, flinkConfDir), new DefaultCLI).asJava)
-
       kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
 
+      val engineContext = FlinkEngineUtils.getDefaultContext(args, flinkConf, flinkConfDir)
       startEngine(engineContext)
       info("Flink engine started")
 
@@ -146,7 +132,7 @@ object FlinkSQLEngine extends Logging {
           engine.stop()
         }
       case t: Throwable =>
-        error("Create FlinkSQL Engine Failed", t)
+        error("Failed to create FlinkSQL Engine", t)
     }
   }
 
@@ -167,37 +153,4 @@ object FlinkSQLEngine extends Logging {
     res.await()
     info("Initial Flink SQL finished.")
   }
-
-  private def discoverDependencies(
-      jars: Seq[URL],
-      libraries: Seq[URL]): List[URL] = {
-    try {
-      var dependencies: ListBuffer[URL] = ListBuffer()
-      // find jar files
-      jars.foreach { url =>
-        JarUtils.checkJarFile(url)
-        dependencies = dependencies += url
-      }
-      // find jar files in library directories
-      libraries.foreach { libUrl =>
-        val dir: File = new File(libUrl.toURI)
-        if (!dir.isDirectory) throw new SqlClientException("Directory expected: " + dir)
-        else if (!dir.canRead) throw new SqlClientException("Directory cannot be read: " + dir)
-        val files: Array[File] = dir.listFiles
-        if (files == null) throw new SqlClientException("Directory cannot be read: " + dir)
-        files.foreach { f =>
-          // only consider jars
-          if (f.isFile && f.getAbsolutePath.toLowerCase.endsWith(".jar")) {
-            val url: URL = f.toURI.toURL
-            JarUtils.checkJarFile(url)
-            dependencies = dependencies += url
-          }
-        }
-      }
-      dependencies.toList
-    } catch {
-      case e: Exception =>
-        throw KyuubiSQLException(s"Could not load all required JAR files.", e)
-    }
-  }
 }
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 10ad5bf6d..4042756b6 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -17,32 +17,15 @@
 
 package org.apache.kyuubi.engine.flink.operation
 
-import java.time.{LocalDate, LocalTime}
-import java.util
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.flink.api.common.JobID
-import org.apache.flink.table.api.ResultKind
-import org.apache.flink.table.client.gateway.TypedResult
-import org.apache.flink.table.data.{GenericArrayData, GenericMapData, RowData}
-import org.apache.flink.table.data.binary.{BinaryArrayData, BinaryMapData}
-import org.apache.flink.table.operations.{ModifyOperation, Operation, QueryOperation}
-import org.apache.flink.table.operations.command._
-import org.apache.flink.table.types.DataType
-import org.apache.flink.table.types.logical._
-import org.apache.flink.types.Row
+import org.apache.flink.table.gateway.api.operation.OperationHandle
 
 import org.apache.kyuubi.Logging
-import org.apache.kyuubi.engine.flink.FlinkEngineUtils._
-import org.apache.kyuubi.engine.flink.result.ResultSet
-import org.apache.kyuubi.engine.flink.schema.RowSet.toHiveString
+import org.apache.kyuubi.engine.flink.FlinkEngineUtils
+import org.apache.kyuubi.engine.flink.result.ResultSetUtil
 import org.apache.kyuubi.operation.OperationState
 import org.apache.kyuubi.operation.log.OperationLog
-import org.apache.kyuubi.reflection.DynMethods
 import org.apache.kyuubi.session.Session
-import org.apache.kyuubi.util.RowSetUtils
 
 class ExecuteStatement(
     session: Session,
@@ -77,22 +60,11 @@ class ExecuteStatement(
   private def executeStatement(): Unit = {
     try {
       setState(OperationState.RUNNING)
-      val operation = executor.parseStatement(sessionId, statement)
-      operation match {
-        case queryOperation: QueryOperation => runQueryOperation(queryOperation)
-        case modifyOperation: ModifyOperation => runModifyOperation(modifyOperation)
-        case setOperation: SetOperation =>
-          resultSet = OperationUtils.runSetOperation(setOperation, executor, sessionId)
-        case resetOperation: ResetOperation =>
-          resultSet = OperationUtils.runResetOperation(resetOperation, executor, sessionId)
-        case addJarOperation: AddJarOperation if isFlinkVersionAtMost("1.15") =>
-          resultSet = OperationUtils.runAddJarOperation(addJarOperation, executor, sessionId)
-        case removeJarOperation: RemoveJarOperation =>
-          resultSet = OperationUtils.runRemoveJarOperation(removeJarOperation, executor, sessionId)
-        case showJarsOperation: ShowJarsOperation if isFlinkVersionAtMost("1.15") =>
-          resultSet = OperationUtils.runShowJarOperation(showJarsOperation, executor, sessionId)
-        case operation: Operation => runOperation(operation)
-      }
+      val resultFetcher = executor.executeStatement(
+        new OperationHandle(getHandle.identifier),
+        statement)
+      jobId = FlinkEngineUtils.getResultJobId(resultFetcher)
+      resultSet = ResultSetUtil.fromResultFetcher(resultFetcher, resultMaxRows)
       setState(OperationState.FINISHED)
     } catch {
       onError(cancel = true)
@@ -100,163 +72,4 @@ class ExecuteStatement(
       shutdownTimeoutMonitor()
     }
   }
-
-  private def runQueryOperation(operation: QueryOperation): Unit = {
-    var resultId: String = null
-    try {
-      val resultDescriptor = executor.executeQuery(sessionId, operation)
-      val dataTypes = resultDescriptor.getResultSchema.getColumnDataTypes.asScala.toList
-
-      resultId = resultDescriptor.getResultId
-
-      val rows = new ArrayBuffer[Row]()
-      var loop = true
-
-      while (loop) {
-        Thread.sleep(50) // slow the processing down
-
-        val pageSize = Math.min(500, resultMaxRows)
-        val result = executor.snapshotResult(sessionId, resultId, pageSize)
-        result.getType match {
-          case TypedResult.ResultType.PAYLOAD =>
-            (1 to result.getPayload).foreach { page =>
-              if (rows.size < resultMaxRows) {
-                val result = executor.retrieveResultPage(resultId, page)
-                rows ++= result.asScala.map(r => convertToRow(r, dataTypes))
-              } else {
-                loop = false
-              }
-            }
-          case TypedResult.ResultType.EOS => loop = false
-          case TypedResult.ResultType.EMPTY =>
-        }
-      }
-
-      resultSet = ResultSet.builder
-        .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
-        .columns(resultDescriptor.getResultSchema.getColumns)
-        .data(rows.slice(0, resultMaxRows).toArray[Row])
-        .build
-    } finally {
-      if (resultId != null) {
-        cleanupQueryResult(resultId)
-      }
-    }
-  }
-
-  private def runModifyOperation(operation: ModifyOperation): Unit = {
-    val result = executor.executeOperation(sessionId, operation)
-    jobId = result.getJobClient.asScala.map(_.getJobID)
-    resultSet = ResultSet.fromJobId(jobId.orNull)
-  }
-
-  private def runOperation(operation: Operation): Unit = {
-    val result = executor.executeOperation(sessionId, operation)
-    jobId = result.getJobClient.asScala.map(_.getJobID)
-    // after FLINK-24461, TableResult#await() would block insert statements
-    // until the job finishes, instead of returning row affected immediately
-    resultSet = ResultSet.fromTableResult(result)
-  }
-
-  private def cleanupQueryResult(resultId: String): Unit = {
-    try {
-      executor.cancelQuery(sessionId, resultId)
-    } catch {
-      case t: Throwable =>
-        warn(s"Failed to clean result set $resultId in session $sessionId", t)
-    }
-  }
-
-  private[this] def convertToRow(r: RowData, dataTypes: List[DataType]): Row = {
-    val row = Row.withPositions(r.getRowKind, r.getArity)
-    for (i <- 0 until r.getArity) {
-      val dataType = dataTypes(i)
-      dataType.getLogicalType match {
-        case arrayType: ArrayType =>
-          val arrayData = r.getArray(i)
-          if (arrayData == null) {
-            row.setField(i, null)
-          }
-          arrayData match {
-            case d: GenericArrayData =>
-              row.setField(i, d.toObjectArray)
-            case d: BinaryArrayData =>
-              row.setField(i, d.toObjectArray(arrayType.getElementType))
-            case _ =>
-          }
-        case _: BinaryType | _: VarBinaryType =>
-          row.setField(i, r.getBinary(i))
-        case _: BigIntType =>
-          row.setField(i, r.getLong(i))
-        case _: BooleanType =>
-          row.setField(i, r.getBoolean(i))
-        case _: VarCharType | _: CharType =>
-          row.setField(i, r.getString(i))
-        case t: DecimalType =>
-          row.setField(i, r.getDecimal(i, t.getPrecision, t.getScale).toBigDecimal)
-        case _: DateType =>
-          val date = RowSetUtils.formatLocalDate(LocalDate.ofEpochDay(r.getInt(i)))
-          row.setField(i, date)
-        case _: TimeType =>
-          val time = RowSetUtils.formatLocalTime(LocalTime.ofNanoOfDay(r.getLong(i) * 1000 * 1000))
-          row.setField(i, time)
-        case t: TimestampType =>
-          val ts = RowSetUtils
-            .formatLocalDateTime(r.getTimestamp(i, t.getPrecision)
-              .toLocalDateTime)
-          row.setField(i, ts)
-        case _: TinyIntType =>
-          row.setField(i, r.getByte(i))
-        case _: SmallIntType =>
-          row.setField(i, r.getShort(i))
-        case _: IntType =>
-          row.setField(i, r.getInt(i))
-        case _: FloatType =>
-          row.setField(i, r.getFloat(i))
-        case mapType: MapType =>
-          val mapData = r.getMap(i)
-          if (mapData != null && mapData.size > 0) {
-            val keyType = mapType.getKeyType
-            val valueType = mapType.getValueType
-            mapData match {
-              case d: BinaryMapData =>
-                val kvArray = toArray(keyType, valueType, d)
-                val map: util.Map[Any, Any] = new util.HashMap[Any, Any]
-                for (i <- kvArray._1.indices) {
-                  val value: Any = kvArray._2(i)
-                  map.put(kvArray._1(i), value)
-                }
-                row.setField(i, map)
-              case d: GenericMapData => // TODO
-            }
-          } else {
-            row.setField(i, null)
-          }
-        case _: DoubleType =>
-          row.setField(i, r.getDouble(i))
-        case t: RowType =>
-          val fieldDataTypes = DynMethods.builder("getFieldDataTypes")
-            .impl(classOf[DataType], classOf[DataType])
-            .buildStatic
-            .invoke[util.List[DataType]](dataType)
-            .asScala.toList
-          val internalRowData = r.getRow(i, t.getFieldCount)
-          val internalRow = convertToRow(internalRowData, fieldDataTypes)
-          row.setField(i, internalRow)
-        case t =>
-          val hiveString = toHiveString((row.getField(i), t))
-          row.setField(i, hiveString)
-      }
-    }
-    row
-  }
-
-  private[this] def toArray(
-      keyType: LogicalType,
-      valueType: LogicalType,
-      arrayData: BinaryMapData): (Array[_], Array[_]) = {
-
-    arrayData.keyArray().toObjectArray(keyType) -> arrayData.valueArray().toObjectArray(valueType)
-  }
-
 }
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
index 2859d659e..d734cea05 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
@@ -21,8 +21,9 @@ import java.io.IOException
 
 import scala.collection.JavaConverters.collectionAsScalaIterableConverter
 
-import org.apache.flink.table.client.gateway.Executor
-import org.apache.flink.table.client.gateway.context.SessionContext
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.gateway.service.context.SessionContext
+import org.apache.flink.table.gateway.service.operation.OperationExecutor
 import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet, TTableSchema}
 
 import org.apache.kyuubi.{KyuubiSQLException, Utils}
@@ -36,12 +37,16 @@ import org.apache.kyuubi.session.Session
 
 abstract class FlinkOperation(session: Session) extends AbstractOperation(session) {
 
+  protected val flinkSession: org.apache.flink.table.gateway.service.session.Session =
+    session.asInstanceOf[FlinkSessionImpl].fSession
+
+  protected val executor: OperationExecutor = flinkSession.createExecutor(
+    Configuration.fromMap(flinkSession.getSessionConfig))
+
   protected val sessionContext: SessionContext = {
     session.asInstanceOf[FlinkSessionImpl].sessionContext
   }
 
-  protected val executor: Executor = session.asInstanceOf[FlinkSessionImpl].executor
-
   protected val sessionId: String = session.handle.identifier.toString
 
   protected var resultSet: ResultSet = _
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
index d7b5e297d..712c13596 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.kyuubi.KyuubiSQLException
 import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.engine.flink.FlinkEngineUtils
 import org.apache.kyuubi.engine.flink.result.Constants
 import org.apache.kyuubi.engine.flink.session.FlinkSessionImpl
 import org.apache.kyuubi.operation.{NoneMode, Operation, OperationManager, PlanOnlyMode}
@@ -44,7 +45,8 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
       runAsync: Boolean,
       queryTimeout: Long): Operation = {
     val flinkSession = session.asInstanceOf[FlinkSessionImpl]
-    if (flinkSession.sessionContext.getConfigMap.getOrDefault(
+    val sessionConfig = flinkSession.fSession.getSessionConfig
+    if (sessionConfig.getOrDefault(
         ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key,
         operationConvertCatalogDatabaseDefault.toString).toBoolean) {
       val catalogDatabaseOperation = processCatalogDatabase(session, statement, confOverlay)
@@ -53,11 +55,13 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
       }
     }
 
-    val mode = PlanOnlyMode.fromString(flinkSession.sessionContext.getConfigMap.getOrDefault(
-      OPERATION_PLAN_ONLY_MODE.key,
-      operationModeDefault))
+    val mode = PlanOnlyMode.fromString(
+      sessionConfig.getOrDefault(
+        OPERATION_PLAN_ONLY_MODE.key,
+        operationModeDefault))
 
-    flinkSession.sessionContext.set(OPERATION_PLAN_ONLY_MODE.key, mode.name)
+    val sessionContext = FlinkEngineUtils.getSessionContext(flinkSession.fSession)
+    sessionContext.set(OPERATION_PLAN_ONLY_MODE.key, mode.name)
     val resultMaxRows =
       flinkSession.normalizedConf.getOrElse(
         ENGINE_FLINK_MAX_ROWS.key,
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCatalogs.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCatalogs.scala
index 11dd760e4..245371681 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCatalogs.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCatalogs.scala
@@ -17,6 +17,8 @@
 
 package org.apache.kyuubi.engine.flink.operation
 
+import scala.collection.convert.ImplicitConversions._
+
 import org.apache.kyuubi.engine.flink.result.ResultSetUtil
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
 import org.apache.kyuubi.session.Session
@@ -25,8 +27,8 @@ class GetCatalogs(session: Session) extends FlinkOperation(session) {
 
   override protected def runInternal(): Unit = {
     try {
-      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
-      val catalogs = tableEnv.listCatalogs.toList
+      val catalogManager = sessionContext.getSessionState.catalogManager
+      val catalogs = catalogManager.listCatalogs.toList
       resultSet = ResultSetUtil.stringListToResultSet(catalogs, TABLE_CAT)
     } catch onError()
   }
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetColumns.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetColumns.scala
index 6ce2a6ac7..b1a7c0c3e 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetColumns.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetColumns.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.flink.table.api.{DataTypes, ResultKind}
-import org.apache.flink.table.catalog.Column
+import org.apache.flink.table.catalog.{Column, ObjectIdentifier}
 import org.apache.flink.table.types.logical._
 import org.apache.flink.types.Row
 
@@ -40,17 +40,17 @@ class GetColumns(
 
   override protected def runInternal(): Unit = {
     try {
-      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
 
       val catalogName =
-        if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
+        if (StringUtils.isEmpty(catalogNameOrEmpty)) executor.getCurrentCatalog
         else catalogNameOrEmpty
 
       val schemaNameRegex = toJavaRegex(schemaNamePattern)
       val tableNameRegex = toJavaRegex(tableNamePattern)
       val columnNameRegex = toJavaRegex(columnNamePattern).r
 
-      val columns = tableEnv.getCatalog(catalogName).asScala.toArray.flatMap { flinkCatalog =>
+      val catalogManager = sessionContext.getSessionState.catalogManager
+      val columns = catalogManager.getCatalog(catalogName).asScala.toArray.flatMap { flinkCatalog =>
         SchemaHelper.getSchemasWithPattern(flinkCatalog, schemaNameRegex)
           .flatMap { schemaName =>
             SchemaHelper.getFlinkTablesWithPattern(
@@ -60,7 +60,8 @@ class GetColumns(
               tableNameRegex)
               .filter { _._2.isDefined }
               .flatMap { case (tableName, _) =>
-                val flinkTable = tableEnv.from(s"`$catalogName`.`$schemaName`.`$tableName`")
+                val flinkTable = catalogManager.getTable(
+                  ObjectIdentifier.of(catalogName, schemaName, tableName)).get()
                 val resolvedSchema = flinkTable.getResolvedSchema
                 resolvedSchema.getColumns.asScala.toArray.zipWithIndex
                   .filter { case (column, _) =>
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCurrentCatalog.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCurrentCatalog.scala
index 3e42e9aa6..5f82de4a6 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCurrentCatalog.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCurrentCatalog.scala
@@ -31,8 +31,7 @@ class GetCurrentCatalog(session: Session) extends FlinkOperation(session) {
 
   override protected def runInternal(): Unit = {
     try {
-      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
-      val catalog = tableEnv.getCurrentCatalog
+      val catalog = executor.getCurrentCatalog
       resultSet = ResultSetUtil.stringListToResultSet(List(catalog), TABLE_CAT)
     } catch onError()
   }
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCurrentDatabase.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCurrentDatabase.scala
index 014ca2ea3..107609c06 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCurrentDatabase.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCurrentDatabase.scala
@@ -31,8 +31,7 @@ class GetCurrentDatabase(session: Session) extends FlinkOperation(session) {
 
   override protected def runInternal(): Unit = {
     try {
-      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
-      val database = tableEnv.getCurrentDatabase
+      val database = sessionContext.getSessionState.catalogManager.getCurrentDatabase
       resultSet = ResultSetUtil.stringListToResultSet(List(database), TABLE_SCHEM)
     } catch onError()
   }
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetFunctions.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetFunctions.scala
index ab870ab79..85f34a29a 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetFunctions.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetFunctions.scala
@@ -20,9 +20,10 @@ package org.apache.kyuubi.engine.flink.operation
 import java.sql.DatabaseMetaData
 
 import scala.collection.JavaConverters._
+import scala.collection.convert.ImplicitConversions._
 
 import org.apache.commons.lang3.StringUtils
-import org.apache.flink.table.api.{DataTypes, ResultKind, TableEnvironment}
+import org.apache.flink.table.api.{DataTypes, ResultKind}
 import org.apache.flink.table.catalog.Column
 import org.apache.flink.types.Row
 
@@ -42,17 +43,20 @@ class GetFunctions(
     try {
       val schemaPattern = toJavaRegex(schemaName)
       val functionPattern = toJavaRegex(functionName)
-      val tableEnv: TableEnvironment = sessionContext.getExecutionContext.getTableEnvironment
+      val functionCatalog = sessionContext.getSessionState.functionCatalog
+      val catalogManager = sessionContext.getSessionState.catalogManager
+
       val systemFunctions = filterPattern(
-        tableEnv.listFunctions().diff(tableEnv.listUserDefinedFunctions()),
+        functionCatalog.getFunctions
+          .diff(functionCatalog.getUserDefinedFunctions),
         functionPattern)
         .map { f =>
           Row.of(null, null, f, null, Integer.valueOf(DatabaseMetaData.functionResultUnknown), null)
-        }
-      val catalogFunctions = tableEnv.listCatalogs()
+        }.toArray
+      val catalogFunctions = catalogManager.listCatalogs()
         .filter { c => StringUtils.isEmpty(catalogName) || c == catalogName }
         .flatMap { c =>
-          val catalog = tableEnv.getCatalog(c).get()
+          val catalog = catalogManager.getCatalog(c).get()
           filterPattern(catalog.listDatabases().asScala, schemaPattern)
             .flatMap { d =>
               filterPattern(catalog.listFunctions(d).asScala, functionPattern)
@@ -66,7 +70,7 @@ class GetFunctions(
                     null)
                 }
             }
-        }
+        }.toArray
       resultSet = ResultSet.builder.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
         .columns(
           Column.physical(FUNCTION_CAT, DataTypes.STRING()),
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetPrimaryKeys.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetPrimaryKeys.scala
index b534feb1f..5b9060cf1 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetPrimaryKeys.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetPrimaryKeys.scala
@@ -21,8 +21,9 @@ import scala.collection.JavaConverters._
 
 import org.apache.commons.lang3.StringUtils
 import org.apache.flink.table.api.{DataTypes, ResultKind}
-import org.apache.flink.table.catalog.Column
+import org.apache.flink.table.catalog.{Column, ObjectIdentifier}
 import org.apache.flink.types.Row
+import org.apache.flink.util.FlinkException
 
 import org.apache.kyuubi.engine.flink.result.ResultSet
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
@@ -37,22 +38,25 @@ class GetPrimaryKeys(
 
   override protected def runInternal(): Unit = {
     try {
-      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+      val catalogManager = sessionContext.getSessionState.catalogManager
 
       val catalogName =
-        if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
+        if (StringUtils.isEmpty(catalogNameOrEmpty)) catalogManager.getCurrentCatalog
         else catalogNameOrEmpty
 
       val schemaName =
         if (StringUtils.isEmpty(schemaNameOrEmpty)) {
-          if (catalogName != tableEnv.getCurrentCatalog) {
-            tableEnv.getCatalog(catalogName).get().getDefaultDatabase
+          if (catalogName != executor.getCurrentCatalog) {
+            catalogManager.getCatalog(catalogName).get().getDefaultDatabase
           } else {
-            tableEnv.getCurrentDatabase
+            catalogManager.getCurrentDatabase
           }
         } else schemaNameOrEmpty
 
-      val flinkTable = tableEnv.from(s"`$catalogName`.`$schemaName`.`$tableName`")
+      val flinkTable = catalogManager
+        .getTable(ObjectIdentifier.of(catalogName, schemaName, tableName))
+        .orElseThrow(() =>
+          new FlinkException(s"Table `$catalogName`.`$schemaName`.`$tableName` not found."))
 
       val resolvedSchema = flinkTable.getResolvedSchema
       val primaryKeySchema = resolvedSchema.getPrimaryKey
@@ -102,5 +106,4 @@ class GetPrimaryKeys(
     )
     // format: on
   }
-
 }
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetSchemas.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetSchemas.scala
index 6715b2320..f56ddd8b1 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetSchemas.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetSchemas.scala
@@ -18,9 +18,10 @@
 package org.apache.kyuubi.engine.flink.operation
 
 import scala.collection.JavaConverters._
+import scala.collection.convert.ImplicitConversions._
 
 import org.apache.commons.lang3.StringUtils
-import org.apache.flink.table.api.{DataTypes, ResultKind, TableEnvironment}
+import org.apache.flink.table.api.{DataTypes, ResultKind}
 import org.apache.flink.table.catalog.Column
 import org.apache.flink.types.Row
 
@@ -35,14 +36,14 @@ class GetSchemas(session: Session, catalogName: String, schema: String)
   override protected def runInternal(): Unit = {
     try {
       val schemaPattern = toJavaRegex(schema)
-      val tableEnv: TableEnvironment = sessionContext.getExecutionContext.getTableEnvironment
-      val schemas = tableEnv.listCatalogs()
+      val catalogManager = sessionContext.getSessionState.catalogManager
+      val schemas = catalogManager.listCatalogs()
         .filter { c => StringUtils.isEmpty(catalogName) || c == catalogName }
         .flatMap { c =>
-          val catalog = tableEnv.getCatalog(c).get()
+          val catalog = catalogManager.getCatalog(c).get()
           filterPattern(catalog.listDatabases().asScala, schemaPattern)
             .map { d => Row.of(d, c) }
-        }
+        }.toArray
       resultSet = ResultSet.builder.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
         .columns(
           Column.physical(TABLE_SCHEM, DataTypes.STRING()),
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala
index a4e55715a..325a50167 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala
@@ -37,16 +37,16 @@ class GetTables(
 
   override protected def runInternal(): Unit = {
     try {
-      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+      val catalogManager = sessionContext.getSessionState.catalogManager
 
       val catalogName =
-        if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
+        if (StringUtils.isEmpty(catalogNameOrEmpty)) catalogManager.getCurrentCatalog
         else catalogNameOrEmpty
 
       val schemaNameRegex = toJavaRegex(schemaNamePattern)
       val tableNameRegex = toJavaRegex(tableNamePattern)
 
-      val tables = tableEnv.getCatalog(catalogName).asScala.toArray.flatMap { flinkCatalog =>
+      val tables = catalogManager.getCatalog(catalogName).asScala.toArray.flatMap { flinkCatalog =>
         SchemaHelper.getSchemasWithPattern(flinkCatalog, schemaNameRegex)
           .flatMap { schemaName =>
             SchemaHelper.getFlinkTablesWithPattern(
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/OperationUtils.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/OperationUtils.scala
deleted file mode 100644
index 7d624948c..000000000
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/OperationUtils.scala
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.engine.flink.operation
-
-import java.util
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.flink.table.api.{DataTypes, ResultKind}
-import org.apache.flink.table.catalog.Column
-import org.apache.flink.table.client.gateway.Executor
-import org.apache.flink.table.operations.command._
-import org.apache.flink.types.Row
-
-import org.apache.kyuubi.engine.flink.result.{ResultSet, ResultSetUtil}
-import org.apache.kyuubi.engine.flink.result.ResultSetUtil.successResultSet
-import org.apache.kyuubi.reflection.DynMethods
-
-object OperationUtils {
-
-  /**
-   * Runs a SetOperation with executor. Returns when SetOperation is executed successfully.
-   *
-   * @param setOperation Set operation.
-   * @param executor A gateway for communicating with Flink and other external systems.
-   * @param sessionId Id of the session.
-   * @return A ResultSet of SetOperation execution.
-   */
-  def runSetOperation(
-      setOperation: SetOperation,
-      executor: Executor,
-      sessionId: String): ResultSet = {
-    if (setOperation.getKey.isPresent) {
-      val key: String = setOperation.getKey.get.trim
-
-      if (setOperation.getValue.isPresent) {
-        val newValue: String = setOperation.getValue.get.trim
-        executor.setSessionProperty(sessionId, key, newValue)
-      }
-
-      val value = executor.getSessionConfigMap(sessionId).getOrDefault(key, "")
-      ResultSet.builder
-        .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
-        .columns(
-          Column.physical("key", DataTypes.STRING()),
-          Column.physical("value", DataTypes.STRING()))
-        .data(Array(Row.of(key, value)))
-        .build
-    } else {
-      // show all properties if set without key
-      val properties: util.Map[String, String] = executor.getSessionConfigMap(sessionId)
-
-      val entries = ArrayBuffer.empty[Row]
-      properties.forEach((key, value) => entries.append(Row.of(key, value)))
-
-      if (entries.nonEmpty) {
-        val prettyEntries = entries.sortBy(_.getField(0).asInstanceOf[String])
-        ResultSet.builder
-          .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
-          .columns(
-            Column.physical("key", DataTypes.STRING()),
-            Column.physical("value", DataTypes.STRING()))
-          .data(prettyEntries.toArray)
-          .build
-      } else {
-        ResultSet.builder
-          .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
-          .columns(
-            Column.physical("key", DataTypes.STRING()),
-            Column.physical("value", DataTypes.STRING()))
-          .data(Array[Row]())
-          .build
-      }
-    }
-  }
-
-  /**
-   * Runs a ResetOperation with executor. Returns when ResetOperation is executed successfully.
-   *
-   * @param resetOperation Reset operation.
-   * @param executor A gateway for communicating with Flink and other external systems.
-   * @param sessionId Id of the session.
-   * @return A ResultSet of ResetOperation execution.
-   */
-  def runResetOperation(
-      resetOperation: ResetOperation,
-      executor: Executor,
-      sessionId: String): ResultSet = {
-    if (resetOperation.getKey.isPresent) {
-      // reset the given property
-      executor.resetSessionProperty(sessionId, resetOperation.getKey.get())
-    } else {
-      // reset all properties
-      executor.resetSessionProperties(sessionId)
-    }
-    successResultSet
-  }
-
-  /**
-   * Runs a AddJarOperation with the executor. Currently only jars on local filesystem
-   * are supported.
-   *
-   * @param addJarOperation Add-jar operation.
-   * @param executor A gateway for communicating with Flink and other external systems.
-   * @param sessionId Id of the session.
-   * @return A ResultSet of AddJarOperation execution.
-   */
-  def runAddJarOperation(
-      addJarOperation: AddJarOperation,
-      executor: Executor,
-      sessionId: String): ResultSet = {
-    // Removed by FLINK-27790
-    val addJar = DynMethods.builder("addJar")
-      .impl(executor.getClass, classOf[String], classOf[String])
-      .build(executor)
-    addJar.invoke[Void](sessionId, addJarOperation.getPath)
-    successResultSet
-  }
-
-  /**
-   * Runs a RemoveJarOperation with the executor. Only jars added by AddJarOperation could
-   * be removed.
-   *
-   * @param removeJarOperation Remove-jar operation.
-   * @param executor A gateway for communicating with Flink and other external systems.
-   * @param sessionId Id of the session.
-   * @return A ResultSet of RemoveJarOperation execution.
-   */
-  def runRemoveJarOperation(
-      removeJarOperation: RemoveJarOperation,
-      executor: Executor,
-      sessionId: String): ResultSet = {
-    executor.removeJar(sessionId, removeJarOperation.getPath)
-    successResultSet
-  }
-
-  /**
-   * Runs a ShowJarsOperation with the executor. Returns the jars of the current session.
-   *
-   * @param showJarsOperation Show-jar operation.
-   * @param executor A gateway for communicating with Flink and other external systems.
-   * @param sessionId Id of the session.
-   * @return A ResultSet of ShowJarsOperation execution.
-   */
-  def runShowJarOperation(
-      showJarsOperation: ShowJarsOperation,
-      executor: Executor,
-      sessionId: String): ResultSet = {
-    // Removed by FLINK-27790
-    val listJars = DynMethods.builder("listJars")
-      .impl(executor.getClass, classOf[String])
-      .build(executor)
-    val jars = listJars.invoke[util.List[String]](sessionId)
-    ResultSetUtil.stringListToResultSet(jars.asScala.toList, "jar")
-  }
-}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala
index afe04a307..4f5d8218f 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala
@@ -17,10 +17,11 @@
 
 package org.apache.kyuubi.engine.flink.operation
 
+import com.google.common.base.Preconditions
 import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.gateway.api.operation.OperationHandle
 import org.apache.flink.table.operations.command._
 
-import org.apache.kyuubi.engine.flink.FlinkEngineUtils.isFlinkVersionAtMost
 import org.apache.kyuubi.engine.flink.result.ResultSetUtil
 import org.apache.kyuubi.operation.{ExecutionMode, ParseMode, PhysicalMode, PlanOnlyMode, UnknownMode}
 import org.apache.kyuubi.operation.PlanOnlyMode.{notSupportedModeError, unknownModeError}
@@ -46,18 +47,19 @@ class PlanOnlyStatement(
 
   override protected def runInternal(): Unit = {
     try {
-      val operation = executor.parseStatement(sessionId, statement)
+      val operations = executor.getTableEnvironment.getParser.parse(statement)
+      Preconditions.checkArgument(
+        operations.size() == 1,
+        "Plan-only mode supports single statement only",
+        null)
+      val operation = operations.get(0)
       operation match {
-        case setOperation: SetOperation =>
-          resultSet = OperationUtils.runSetOperation(setOperation, executor, sessionId)
-        case resetOperation: ResetOperation =>
-          resultSet = OperationUtils.runResetOperation(resetOperation, executor, sessionId)
-        case addJarOperation: AddJarOperation if isFlinkVersionAtMost("1.15") =>
-          resultSet = OperationUtils.runAddJarOperation(addJarOperation, executor, sessionId)
-        case removeJarOperation: RemoveJarOperation =>
-          resultSet = OperationUtils.runRemoveJarOperation(removeJarOperation, executor, sessionId)
-        case showJarsOperation: ShowJarsOperation if isFlinkVersionAtMost("1.15") =>
-          resultSet = OperationUtils.runShowJarOperation(showJarsOperation, executor, sessionId)
+        case _: SetOperation | _: ResetOperation | _: AddJarOperation | _: RemoveJarOperation |
+            _: ShowJarsOperation =>
+          val resultFetcher = executor.executeStatement(
+            new OperationHandle(getHandle.identifier),
+            statement)
+          resultSet = ResultSetUtil.fromResultFetcher(resultFetcher)
         case _ => explainOperation(statement)
       }
     } catch {
@@ -66,7 +68,7 @@ class PlanOnlyStatement(
   }
 
   private def explainOperation(statement: String): Unit = {
-    val tableEnv: TableEnvironment = sessionContext.getExecutionContext.getTableEnvironment
+    val tableEnv: TableEnvironment = executor.getTableEnvironment
     val explainPlans =
       tableEnv.explainSql(statement).split(s"$lineSeparator$lineSeparator")
     val operationPlan = mode match {
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/SetCurrentCatalog.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/SetCurrentCatalog.scala
index 60214b2cd..f279ccda6 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/SetCurrentCatalog.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/SetCurrentCatalog.scala
@@ -30,8 +30,8 @@ class SetCurrentCatalog(session: Session, catalog: String)
 
   override protected def runInternal(): Unit = {
     try {
-      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
-      tableEnv.useCatalog(catalog)
+      val catalogManager = sessionContext.getSessionState.catalogManager
+      catalogManager.setCurrentCatalog(catalog)
       setHasResultSet(false)
     } catch onError()
   }
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/SetCurrentDatabase.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/SetCurrentDatabase.scala
index 7610ab2f1..70535e834 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/SetCurrentDatabase.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/SetCurrentDatabase.scala
@@ -30,8 +30,8 @@ class SetCurrentDatabase(session: Session, database: String)
 
   override protected def runInternal(): Unit = {
     try {
-      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
-      tableEnv.useDatabase(database)
+      val catalogManager = sessionContext.getSessionState.catalogManager
+      catalogManager.setCurrentDatabase(database)
       setHasResultSet(false)
     } catch onError()
   }
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
index 09c401988..b90be09ff 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
 
 import com.google.common.collect.Iterators
 import org.apache.flink.api.common.JobID
-import org.apache.flink.table.api.{DataTypes, ResultKind, TableResult}
+import org.apache.flink.table.api.{DataTypes, ResultKind}
 import org.apache.flink.table.catalog.Column
 import org.apache.flink.types.Row
 
@@ -58,17 +58,6 @@ case class ResultSet(
  */
 object ResultSet {
 
-  def fromTableResult(tableResult: TableResult): ResultSet = {
-    val schema = tableResult.getResolvedSchema
-    // collect all rows from table result as list
-    // this is ok as TableResult contains limited rows
-    val rows = tableResult.collect.asScala.toArray
-    builder.resultKind(tableResult.getResultKind)
-      .columns(schema.getColumns)
-      .data(rows)
-      .build
-  }
-
   def fromJobId(jobID: JobID): ResultSet = {
     val data: Array[Row] = if (jobID != null) {
       Array(Row.of(jobID.toString))
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
index ded271cf1..d6bc2bada 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
@@ -17,14 +17,25 @@
 
 package org.apache.kyuubi.engine.flink.result;
 
+import scala.collection.convert.ImplicitConversions._
+import scala.collection.mutable.ListBuffer
+
 import org.apache.flink.table.api.DataTypes
 import org.apache.flink.table.api.ResultKind
 import org.apache.flink.table.catalog.Column
+import org.apache.flink.table.data.RowData
+import org.apache.flink.table.data.conversion.DataStructureConverters
+import org.apache.flink.table.gateway.service.result.ResultFetcher
+import org.apache.flink.table.types.DataType
 import org.apache.flink.types.Row
 
+import org.apache.kyuubi.engine.flink.shim.FlinkResultSet
+
 /** Utility object for building ResultSet. */
 object ResultSetUtil {
 
+  private val FETCH_ROWS_PER_SECOND = 1000
+
   /**
    * Build a ResultSet with a column name and a list of String values.
    *
@@ -54,4 +65,64 @@ object ResultSetUtil {
       .columns(Column.physical("result", DataTypes.STRING))
       .data(Array[Row](Row.of("OK")))
       .build
+
+  def fromResultFetcher(resultFetcher: ResultFetcher, maxRows: Int): ResultSet = {
+    val schema = resultFetcher.getResultSchema
+    val resultRowData = ListBuffer.newBuilder[RowData]
+    var fetched: FlinkResultSet = null
+    var token: Long = 0
+    var rowNum: Int = 0
+    do {
+      fetched = new FlinkResultSet(resultFetcher.fetchResults(token, FETCH_ROWS_PER_SECOND))
+      val data = fetched.getData
+      val slice = data.slice(0, maxRows - rowNum)
+      resultRowData ++= slice
+      rowNum += slice.size
+      token = fetched.getNextToken
+      try Thread.sleep(1000L)
+      catch {
+        case _: InterruptedException => // ignored; the loop condition is re-checked below
+      }
+    } while (
+      fetched.getNextToken != null &&
+        rowNum < maxRows &&
+        fetched.getResultType != org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS
+    )
+    val dataTypes = resultFetcher.getResultSchema.getColumnDataTypes
+    ResultSet.builder
+      .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+      .columns(schema.getColumns)
+      .data(resultRowData.result().map(rd => convertToRow(rd, dataTypes.toList)).toArray)
+      .build
+  }
+
+  def fromResultFetcher(resultFetcher: ResultFetcher): ResultSet = {
+    val schema = resultFetcher.getResultSchema
+    val resultRowData = ListBuffer.newBuilder[RowData]
+    var fetched: FlinkResultSet = null
+    var token: Long = 0
+    do {
+      fetched = new FlinkResultSet(resultFetcher.fetchResults(token, FETCH_ROWS_PER_SECOND))
+      resultRowData ++= fetched.getData
+      token = fetched.getNextToken
+      try Thread.sleep(1000L)
+      catch {
+        case _: InterruptedException =>
+      }
+    } while (
+      fetched.getNextToken != null &&
+        fetched.getResultType != org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS
+    )
+    val dataTypes = resultFetcher.getResultSchema.getColumnDataTypes
+    ResultSet.builder
+      .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+      .columns(schema.getColumns)
+      .data(resultRowData.result().map(rd => convertToRow(rd, dataTypes.toList)).toArray)
+      .build
+  }
+
+  private[this] def convertToRow(r: RowData, dataTypes: List[DataType]): Row = {
+    val converter = DataStructureConverters.getConverter(DataTypes.ROW(dataTypes: _*))
+    converter.toExternal(r).asInstanceOf[Row]
+  }
 }
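The two `fromResultFetcher` overloads above follow a token-based paging pattern: fetch a page, append its rows, move to the next token, and stop at end of stream or at the row cap. A minimal sketch of that pattern is given below. `Page`, `PagedFetchSketch` and `fetchAll` are hypothetical stand-ins, not Flink or Kyuubi classes. Unlike the patch, the sketch does not sleep between fetches and does not fetch at all when `maxRows` is zero.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for one fetched page; not a Flink class.
// nextToken is None once the source is exhausted.
final case class Page(rows: Seq[String], nextToken: Option[Long])

object PagedFetchSketch {

  /** Pulls pages until the source is exhausted or maxRows rows are collected. */
  def fetchAll(fetch: Long => Page, maxRows: Int): List[String] = {
    val collected = ListBuffer.empty[String]
    var token: Option[Long] = Some(0L)
    while (token.isDefined && collected.size < maxRows) {
      val page = fetch(token.get)
      // Keep only as many rows as still fit under the cap.
      collected ++= page.rows.take(maxRows - collected.size)
      token = page.nextToken
    }
    collected.toList
  }

  def main(args: Array[String]): Unit = {
    // A fake source with three pages of two rows each.
    val pages = Vector(Seq("a", "b"), Seq("c", "d"), Seq("e", "f"))
    val fetch: Long => Page = t =>
      Page(pages(t.toInt), if (t.toInt + 1 < pages.size) Some(t + 1) else None)
    println(fetchAll(fetch, 5))
    println(fetchAll(fetch, 100))
  }
}
```

Modelling the next token as an `Option` keeps the end-of-stream check explicit and avoids unboxing a null `java.lang.Long`.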
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
index 07971e39f..71caaa67a 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
@@ -17,12 +17,17 @@
 
 package org.apache.kyuubi.engine.flink.session
 
-import org.apache.flink.table.client.gateway.context.DefaultContext
-import org.apache.flink.table.client.gateway.local.LocalExecutor
+import scala.collection.JavaConverters._
+import scala.collection.JavaConverters.mapAsJavaMap
+
+import org.apache.flink.table.gateway.api.session.SessionEnvironment
+import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion
+import org.apache.flink.table.gateway.service.context.DefaultContext
 import org.apache.hive.service.rpc.thrift.TProtocolVersion
 
 import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
 import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager
+import org.apache.kyuubi.engine.flink.shim.FlinkSessionManager
 import org.apache.kyuubi.session.{Session, SessionHandle, SessionManager}
 
 class FlinkSQLSessionManager(engineContext: DefaultContext)
@@ -31,11 +36,11 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
   override protected def isServer: Boolean = false
 
   val operationManager = new FlinkSQLOperationManager()
-  val executor = new LocalExecutor(engineContext)
+  val sessionManager = new FlinkSessionManager(engineContext)
 
   override def start(): Unit = {
     super.start()
-    executor.start()
+    sessionManager.start()
   }
 
   override protected def createSession(
@@ -46,19 +51,33 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
       conf: Map[String, String]): Session = {
     conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap(
       getSessionOption).getOrElse {
-      new FlinkSessionImpl(
+      val flinkInternalSession = sessionManager.openSession(
+        SessionEnvironment.newBuilder
+          .setSessionEndpointVersion(SqlGatewayRestAPIVersion.V1)
+          .addSessionConfig(mapAsJavaMap(conf))
+          .build)
+      val sessionConfig = flinkInternalSession.getSessionConfig
+      sessionConfig.putAll(conf.asJava)
+      val session = new FlinkSessionImpl(
         protocol,
         user,
         password,
         ipAddress,
         conf,
         this,
-        executor)
+        flinkInternalSession)
+      session
     }
   }
 
   override def closeSession(sessionHandle: SessionHandle): Unit = {
     super.closeSession(sessionHandle)
-    executor.closeSession(sessionHandle.toString)
+    sessionManager.closeSession(
+      new org.apache.flink.table.gateway.api.session.SessionHandle(sessionHandle.identifier))
+  }
+
+  override def stop(): Unit = synchronized {
+    sessionManager.stop()
+    super.stop()
   }
 }
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
index a4b6a8a90..09f5ac943 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
@@ -19,10 +19,12 @@ package org.apache.kyuubi.engine.flink.session
 
 import scala.util.control.NonFatal
 
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.runtime.util.EnvironmentInformation
 import org.apache.flink.table.client.gateway.SqlExecutionException
-import org.apache.flink.table.client.gateway.context.SessionContext
-import org.apache.flink.table.client.gateway.local.LocalExecutor
+import org.apache.flink.table.gateway.api.operation.OperationHandle
+import org.apache.flink.table.gateway.service.context.SessionContext
+import org.apache.flink.table.gateway.service.session.{Session => FSession}
 import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion}
 
 import org.apache.kyuubi.KyuubiSQLException
@@ -37,14 +39,15 @@ class FlinkSessionImpl(
     ipAddress: String,
     conf: Map[String, String],
     sessionManager: SessionManager,
-    val executor: LocalExecutor)
+    val fSession: FSession)
   extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
 
   override val handle: SessionHandle =
-    conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle())
+    conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID)
+      .getOrElse(SessionHandle.fromUUID(fSession.getSessionHandle.getIdentifier.toString))
 
   lazy val sessionContext: SessionContext = {
-    FlinkEngineUtils.getSessionContext(executor, handle.identifier.toString)
+    FlinkEngineUtils.getSessionContext(fSession)
   }
 
   private def setModifiableConfig(key: String, value: String): Unit = {
@@ -56,16 +59,15 @@ class FlinkSessionImpl(
   }
 
   override def open(): Unit = {
-    executor.openSession(handle.identifier.toString)
+    val executor = fSession.createExecutor(Configuration.fromMap(fSession.getSessionConfig))
 
     val (useCatalogAndDatabaseConf, otherConf) = normalizedConf.partition { case (k, _) =>
       Array("use:catalog", "use:database").contains(k)
     }
 
     useCatalogAndDatabaseConf.get("use:catalog").foreach { catalog =>
-      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
       try {
-        tableEnv.useCatalog(catalog)
+        executor.executeStatement(OperationHandle.create, s"USE CATALOG $catalog")
       } catch {
         case NonFatal(e) =>
           throw e
@@ -73,9 +75,8 @@ class FlinkSessionImpl(
     }
 
     useCatalogAndDatabaseConf.get("use:database").foreach { database =>
-      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
       try {
-        tableEnv.useDatabase(database)
+        executor.executeStatement(OperationHandle.create, s"USE $database")
       } catch {
         case NonFatal(e) =>
           if (database != "default") {
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/shim/FlinkResultSet.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/shim/FlinkResultSet.scala
new file mode 100644
index 000000000..e3dd0f081
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/shim/FlinkResultSet.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.shim
+
+import java.lang.{Long => JLong}
+import java.util
+
+import org.apache.flink.table.data.RowData
+import org.apache.flink.table.gateway.api.results.ResultSet.ResultType
+
+import org.apache.kyuubi.engine.flink.FlinkEngineUtils
+import org.apache.kyuubi.reflection.DynMethods
+
+class FlinkResultSet(resultSet: AnyRef) {
+
+  def getData: util.List[RowData] = {
+    if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
+      DynMethods.builder("getData")
+        .impl("org.apache.flink.table.gateway.api.results.ResultSet")
+        .build()
+        .invoke(resultSet)
+        .asInstanceOf[util.List[RowData]]
+    } else {
+      DynMethods.builder("getData")
+        .impl("org.apache.flink.table.gateway.api.results.ResultSetImpl")
+        .build()
+        .invoke(resultSet)
+        .asInstanceOf[util.List[RowData]]
+    }
+  }
+
+  def getNextToken: JLong = {
+    if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
+      DynMethods.builder("getNextToken")
+        .impl("org.apache.flink.table.gateway.api.results.ResultSet")
+        .build()
+        .invoke(resultSet)
+        .asInstanceOf[JLong]
+    } else {
+      DynMethods.builder("getNextToken")
+        .impl("org.apache.flink.table.gateway.api.results.ResultSetImpl")
+        .build()
+        .invoke(resultSet)
+        .asInstanceOf[JLong]
+    }
+  }
+
+  def getResultType: ResultType = {
+    if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
+      DynMethods.builder("getResultType")
+        .impl("org.apache.flink.table.gateway.api.results.ResultSet")
+        .build()
+        .invoke(resultSet)
+        .asInstanceOf[ResultType]
+    } else {
+      DynMethods.builder("getResultType")
+        .impl("org.apache.flink.table.gateway.api.results.ResultSetImpl")
+        .build()
+        .invoke(resultSet)
+        .asInstanceOf[ResultType]
+    }
+  }
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/shim/FlinkSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/shim/FlinkSessionManager.scala
new file mode 100644
index 000000000..e34819dd6
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/shim/FlinkSessionManager.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.shim
+
+import org.apache.flink.table.gateway.api.session.{SessionEnvironment, SessionHandle}
+import org.apache.flink.table.gateway.service.context.DefaultContext
+import org.apache.flink.table.gateway.service.session.Session
+
+import org.apache.kyuubi.engine.flink.FlinkEngineUtils
+import org.apache.kyuubi.reflection.{DynConstructors, DynMethods}
+
+class FlinkSessionManager(engineContext: DefaultContext) {
+
+  val sessionManager: AnyRef = {
+    if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
+      DynConstructors.builder().impl(
+        "org.apache.flink.table.gateway.service.session.SessionManager",
+        classOf[DefaultContext])
+        .build()
+        .newInstance(engineContext)
+    } else {
+      DynConstructors.builder().impl(
+        "org.apache.flink.table.gateway.service.session.SessionManagerImpl",
+        classOf[DefaultContext])
+        .build()
+        .newInstance(engineContext)
+    }
+  }
+
+  def start(): Unit = {
+    if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
+      DynMethods.builder("start")
+        .impl("org.apache.flink.table.gateway.service.session.SessionManager")
+        .build()
+        .invoke(sessionManager)
+    } else {
+      DynMethods.builder("start")
+        .impl("org.apache.flink.table.gateway.service.session.SessionManagerImpl")
+        .build()
+        .invoke(sessionManager)
+    }
+  }
+
+  def stop(): Unit = {
+    if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
+      DynMethods.builder("stop")
+        .impl("org.apache.flink.table.gateway.service.session.SessionManager")
+        .build()
+        .invoke(sessionManager)
+    } else {
+      DynMethods.builder("stop")
+        .impl("org.apache.flink.table.gateway.service.session.SessionManagerImpl")
+        .build()
+        .invoke(sessionManager)
+    }
+  }
+
+  def getSession(sessionHandle: SessionHandle): Session = {
+    if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
+      DynMethods.builder("getSession")
+        .impl(
+          "org.apache.flink.table.gateway.service.session.SessionManager",
+          classOf[SessionHandle])
+        .build()
+        .invoke(sessionManager, sessionHandle)
+        .asInstanceOf[Session]
+    } else {
+      DynMethods.builder("getSession")
+        .impl(
+          "org.apache.flink.table.gateway.service.session.SessionManagerImpl",
+          classOf[SessionHandle])
+        .build()
+        .invoke(sessionManager, sessionHandle)
+        .asInstanceOf[Session]
+    }
+  }
+
+  def openSession(environment: SessionEnvironment): Session = {
+    if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
+      DynMethods.builder("openSession")
+        .impl(
+          "org.apache.flink.table.gateway.service.session.SessionManager",
+          classOf[SessionEnvironment])
+        .build()
+        .invoke(sessionManager, environment)
+        .asInstanceOf[Session]
+    } else {
+      DynMethods.builder("openSession")
+        .impl(
+          "org.apache.flink.table.gateway.service.session.SessionManagerImpl",
+          classOf[SessionEnvironment])
+        .build()
+        .invoke(sessionManager, environment)
+        .asInstanceOf[Session]
+    }
+  }
+
+  def closeSession(sessionHandle: SessionHandle): Unit = {
+    if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
+      DynMethods.builder("closeSession")
+        .impl(
+          "org.apache.flink.table.gateway.service.session.SessionManager",
+          classOf[SessionHandle])
+        .build()
+        .invoke(sessionManager, sessionHandle)
+    } else {
+      DynMethods.builder("closeSession")
+        .impl(
+          "org.apache.flink.table.gateway.service.session.SessionManagerImpl",
+          classOf[SessionHandle])
+        .build()
+        .invoke(sessionManager, sessionHandle)
+    }
+  }
+}
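The shim classes above pick an implementation class name by Flink version and call it reflectively through Kyuubi's `DynMethods` and `DynConstructors`. The sketch below illustrates the general idea with plain Java reflection only. It is not the patch's approach: it probes for the first loadable class name instead of checking a version, and `ReflectiveShimSketch`, `firstLoadable`, `invokeNoArg` and the class name `com.example.MissingImpl` are made up for illustration.

```scala
import scala.util.{Success, Try}

object ReflectiveShimSketch {

  /** Returns the first class in `candidates` that can be loaded, if any. */
  def firstLoadable(candidates: Seq[String]): Option[Class[_]] =
    candidates.iterator
      .map(name => Try[Class[_]](Class.forName(name)))
      .collectFirst { case Success(cls) => cls }

  /** Invokes a public no-argument method by name on `target`. */
  def invokeNoArg(target: AnyRef, method: String): AnyRef =
    target.getClass.getMethod(method).invoke(target)

  def main(args: Array[String]): Unit = {
    // "com.example.MissingImpl" is a made-up name that should not resolve,
    // so the lookup falls through to java.util.ArrayList.
    val cls = firstLoadable(Seq("com.example.MissingImpl", "java.util.ArrayList")).get
    val instance = cls.getDeclaredConstructor().newInstance().asInstanceOf[AnyRef]
    println(cls.getName)
    println(invokeNoArg(instance, "size"))
  }
}
```

Resolving the class once and reusing it would also avoid repeating the version branch in every method, at the cost of hiding which Flink version is in play.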
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithDiscoveryFlinkSQLEngine.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithDiscoveryFlinkSQLEngine.scala
index aebcce6c5..c352429ea 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithDiscoveryFlinkSQLEngine.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithDiscoveryFlinkSQLEngine.scala
@@ -17,30 +17,14 @@
 
 package org.apache.kyuubi.engine.flink
 
-import java.util.UUID
-
-import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SHARE_LEVEL, ENGINE_TYPE}
-import org.apache.kyuubi.engine.ShareLevel
-import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, HA_NAMESPACE}
+import org.apache.kyuubi.config.KyuubiConf
 import org.apache.kyuubi.ha.client.{DiscoveryClient, DiscoveryClientProvider}
 
-trait WithDiscoveryFlinkSQLEngine extends WithFlinkSQLEngineOnYarn {
-
-  override protected def engineRefId: String = UUID.randomUUID().toString
-
-  def namespace: String = "/kyuubi/flink-yarn-application-test"
+trait WithDiscoveryFlinkSQLEngine {
 
-  def shareLevel: String = ShareLevel.USER.toString
+  protected def namespace: String
 
-  def engineType: String = "flink"
-
-  override def withKyuubiConf: Map[String, String] = {
-    Map(
-      HA_NAMESPACE.key -> namespace,
-      HA_ENGINE_REF_ID.key -> engineRefId,
-      ENGINE_TYPE.key -> "FLINK_SQL",
-      ENGINE_SHARE_LEVEL.key -> shareLevel)
-  }
+  protected def conf: KyuubiConf
 
   def withDiscoveryClient(f: DiscoveryClient => Unit): Unit = {
     DiscoveryClientProvider.withDiscoveryClient(conf)(f)
@@ -49,7 +33,7 @@ trait WithDiscoveryFlinkSQLEngine extends WithFlinkSQLEngineOnYarn {
   def getFlinkEngineServiceUrl: String = {
     var hostPort: Option[(String, Int)] = None
     var retries = 0
-    while (hostPort.isEmpty && retries < 5) {
+    while (hostPort.isEmpty && retries < 10) {
       withDiscoveryClient(client => hostPort = client.getServerHost(namespace))
       retries += 1
       Thread.sleep(1000L)
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
index c8435f9c5..0001f31ae 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
@@ -17,58 +17,167 @@
 
 package org.apache.kyuubi.engine.flink
 
+import java.io.{File, FilenameFilter}
+import java.lang.ProcessBuilder.Redirect
+import java.net.URI
+import java.nio.file.{Files, Paths}
+
 import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
 
-import org.apache.flink.client.cli.{CustomCommandLine, DefaultCLI}
 import org.apache.flink.configuration.{Configuration, RestOptions}
 import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration}
-import org.apache.flink.table.client.gateway.context.DefaultContext
 
-import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiException, KyuubiFunSuite, SCALA_COMPILE_VERSION, Utils}
 import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ADDRESSES
+import org.apache.kyuubi.zookeeper.EmbeddedZookeeper
+import org.apache.kyuubi.zookeeper.ZookeeperConf.{ZK_CLIENT_PORT, ZK_CLIENT_PORT_ADDRESS}
 
 trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with WithFlinkTestResources {
 
   protected val flinkConfig = new Configuration()
+
   protected var miniCluster: MiniCluster = _
-  protected var engine: FlinkSQLEngine = _
-  // conf will be loaded until start flink engine
+
+  protected var engineProcess: Process = _
+
+  private var zkServer: EmbeddedZookeeper = _
+
+  protected val conf: KyuubiConf = FlinkSQLEngine.kyuubiConf
+
+  protected def engineRefId: String
+
   def withKyuubiConf: Map[String, String]
-  protected val kyuubiConf: KyuubiConf = FlinkSQLEngine.kyuubiConf
 
   protected var connectionUrl: String = _
 
   override def beforeAll(): Unit = {
+    withKyuubiConf.foreach { case (k, v) =>
+      if (k.startsWith("flink.")) {
+        flinkConfig.setString(k.stripPrefix("flink."), v)
+      }
+    }
+    withKyuubiConf.foreach { case (k, v) =>
+      System.setProperty(k, v)
+      conf.set(k, v)
+    }
+
+    zkServer = new EmbeddedZookeeper()
+    conf.set(ZK_CLIENT_PORT, 0).set(ZK_CLIENT_PORT_ADDRESS, "localhost")
+    zkServer.initialize(conf)
+    zkServer.start()
+    conf.set(HA_ADDRESSES, zkServer.getConnectString)
+
+    val envs = scala.collection.mutable.Map[String, String]()
+    val kyuubiExternals = Utils.getCodeSourceLocation(getClass)
+      .split("externals").head
+    val flinkHome = {
+      val candidates = Paths.get(kyuubiExternals, "externals", "kyuubi-download", "target")
+        .toFile.listFiles(f => f.getName.contains("flink"))
+      if (candidates == null) None else candidates.map(_.toPath).headOption
+    }
+    if (flinkHome.isDefined) {
+      envs("FLINK_HOME") = flinkHome.get.toString
+      envs("FLINK_CONF_DIR") = Paths.get(flinkHome.get.toString, "conf").toString
+    }
+    envs("JAVA_HOME") = System.getProperty("java.home")
+    envs("JAVA_EXEC") = Paths.get(envs("JAVA_HOME"), "bin", "java").toString
+
     startMiniCluster()
-    startFlinkEngine()
+    startFlinkEngine(envs.toMap)
     super.beforeAll()
   }
 
   override def afterAll(): Unit = {
     super.afterAll()
-    stopFlinkEngine()
-    miniCluster.close()
+    if (engineProcess != null) {
+      engineProcess.destroy()
+      engineProcess = null
+    }
+    if (miniCluster != null) {
+      miniCluster.close()
+      miniCluster = null
+    }
+    if (zkServer != null) {
+      zkServer.stop()
+      zkServer = null
+    }
   }
 
-  def startFlinkEngine(): Unit = {
-    withKyuubiConf.foreach { case (k, v) =>
-      System.setProperty(k, v)
-      kyuubiConf.set(k, v)
+  def startFlinkEngine(envs: Map[String, String]): Unit = {
+    val flinkHome = envs("FLINK_HOME")
+    val processBuilder: ProcessBuilder = new ProcessBuilder
+    processBuilder.environment().putAll(envs.asJava)
+
+    conf.set(ENGINE_FLINK_EXTRA_CLASSPATH, udfJar.getAbsolutePath)
+    val command = new ArrayBuffer[String]()
+
+    command += envs("JAVA_EXEC")
+
+    val memory = conf.get(ENGINE_FLINK_MEMORY)
+    command += s"-Xmx$memory"
+    val javaOptions = conf.get(ENGINE_FLINK_JAVA_OPTIONS)
+    if (javaOptions.isDefined) {
+      command += javaOptions.get
     }
-    val engineContext = new DefaultContext(
-      List(udfJar.toURI.toURL).asJava,
-      flinkConfig,
-      List[CustomCommandLine](new DefaultCLI).asJava)
-    FlinkSQLEngine.startEngine(engineContext)
-    engine = FlinkSQLEngine.currentEngine.get
-    connectionUrl = engine.frontendServices.head.connectionUrl
-  }
 
-  def stopFlinkEngine(): Unit = {
-    if (engine != null) {
-      engine.stop()
-      engine = null
+    command += "-cp"
+    val classpathEntries = new java.util.LinkedHashSet[String]
+    // flink engine runtime jar
+    mainResource(envs).foreach(classpathEntries.add)
+    // flink sql jars
+    Paths.get(flinkHome)
+      .resolve("opt")
+      .toFile
+      .listFiles(new FilenameFilter {
+        override def accept(dir: File, name: String): Boolean = {
+          name.toLowerCase.startsWith("flink-sql-client") ||
+          name.toLowerCase.startsWith("flink-sql-gateway")
+        }
+      }).foreach(jar => classpathEntries.add(jar.getAbsolutePath))
+
+    // jars from flink lib
+    classpathEntries.add(s"$flinkHome${File.separator}lib${File.separator}*")
+
+    // classpath contains flink configurations, default to flink.home/conf
+    classpathEntries.add(envs.getOrElse("FLINK_CONF_DIR", ""))
+    // classpath contains hadoop configurations
+    val cp = System.getProperty("java.class.path")
+    // exclude kyuubi flink engine jar that has SPI for EmbeddedExecutorFactory
+    // which can't be initialized on the client side
+    val hadoopJars = cp.split(File.pathSeparator).filter(s => !s.contains("flink"))
+    hadoopJars.foreach(classpathEntries.add)
+    val extraCp = conf.get(ENGINE_FLINK_EXTRA_CLASSPATH)
+    extraCp.foreach(classpathEntries.add)
+    if (hadoopJars.isEmpty && extraCp.isEmpty) {
+      mainResource(envs).foreach { path =>
+        val devHadoopJars = Paths.get(path).getParent
+          .resolve(s"scala-$SCALA_COMPILE_VERSION")
+          .resolve("jars")
+        if (!Files.exists(devHadoopJars)) {
+          throw new KyuubiException(s"The path $devHadoopJars does not exist. " +
+            s"Please set FLINK_HADOOP_CLASSPATH or ${ENGINE_FLINK_EXTRA_CLASSPATH.key}" +
+            s" for configuring location of hadoop client jars, etc.")
+        }
+        classpathEntries.add(s"$devHadoopJars${File.separator}*")
+      }
+    }
+    command += classpathEntries.asScala.mkString(File.pathSeparator)
+    command += "org.apache.kyuubi.engine.flink.FlinkSQLEngine"
+
+    conf.getAll.foreach { case (k, v) =>
+      command += "--conf"
+      command += s"$k=$v"
     }
+
+    processBuilder.command(command.toList.asJava)
+    processBuilder.redirectOutput(Redirect.INHERIT)
+    processBuilder.redirectError(Redirect.INHERIT)
+
+    info("starting flink local engine...")
+    engineProcess = processBuilder.start()
   }
 
   private def startMiniCluster(): Unit = {
@@ -84,4 +193,35 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with WithFlinkTestResources
 
   protected def getJdbcUrl: String = s"jdbc:hive2://$connectionUrl/;"
 
+  def mainResource(env: Map[String, String]): Option[String] = {
+    val module = "kyuubi-flink-sql-engine"
+    val shortName = "flink"
+    // 1. get the main resource jar for user specified config first
+    val jarName = s"${module}_$SCALA_COMPILE_VERSION-$KYUUBI_VERSION.jar"
+    conf.getOption(s"kyuubi.session.engine.$shortName.main.resource").filter {
+      userSpecified =>
+        // skip the existence check if it is not a local file.
+        val uri = new URI(userSpecified)
+        val scheme = if (uri.getScheme != null) uri.getScheme else "file"
+        scheme match {
+          case "file" => Files.exists(Paths.get(userSpecified))
+          case _ => true
+        }
+    }.orElse {
+      // 2. get the main resource jar from system build default
+      env.get(KYUUBI_HOME).toSeq
+        .flatMap { p =>
+          Seq(
+            Paths.get(p, "externals", "engines", shortName, jarName),
+            Paths.get(p, "externals", module, "target", jarName))
+        }
+        .find(Files.exists(_)).map(_.toAbsolutePath.toFile.getCanonicalPath)
+    }.orElse {
+      // 3. get the main resource from dev environment
+      val cwd = Utils.getCodeSourceLocation(getClass).split("externals")
+      assert(cwd.length > 1)
+      Option(Paths.get(cwd.head, "externals", module, "target", jarName))
+        .map(_.toAbsolutePath.toFile.getCanonicalPath)
+    }
+  }
 }
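`startFlinkEngine` above assembles an ordered, de-duplicated classpath and then a `java -cp ... MainClass --conf k=v` command line for the engine process. A minimal sketch of those two steps is given below. `ClasspathSketch`, `buildClasspath` and `buildCommand` are hypothetical names, not Kyuubi APIs. As an assumption that differs from the patch, blank entries are dropped and the `--conf` pairs are sorted by key so the output is deterministic.

```scala
import java.io.File

import scala.collection.JavaConverters._

object ClasspathSketch {

  /** Joins entries in first-seen order, dropping blanks and duplicates. */
  def buildClasspath(entries: Seq[String]): String = {
    val ordered = new java.util.LinkedHashSet[String]
    entries.filter(_.nonEmpty).foreach(e => ordered.add(e))
    // File.pathSeparator is ":" on Unix-like systems and ";" on Windows.
    ordered.asScala.mkString(File.pathSeparator)
  }

  /** Assembles the argument list for launching `mainClass` in a child JVM. */
  def buildCommand(
      javaExec: String,
      classpath: String,
      mainClass: String,
      conf: Map[String, String]): Seq[String] = {
    val confArgs = conf.toSeq.sortBy(_._1).flatMap { case (k, v) => Seq("--conf", s"$k=$v") }
    Seq(javaExec, "-cp", classpath, mainClass) ++ confArgs
  }

  def main(args: Array[String]): Unit = {
    val cp = buildClasspath(Seq("a.jar", "", "b.jar", "a.jar"))
    println(buildCommand("java", cp, "example.Main", Map("k" -> "v")).mkString(" "))
  }
}
```

The resulting sequence can be handed to `java.lang.ProcessBuilder` after converting it to a Java list, as the patch does with `command.toList.asJava`.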
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala
index 3847087b3..553574e65 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala
@@ -49,7 +49,7 @@ trait WithFlinkSQLEngineOnYarn extends KyuubiFunSuite with WithFlinkTestResource
 
   private var zkServer: EmbeddedZookeeper = _
 
-  def withKyuubiConf: Map[String, String]
+  def withKyuubiConf: Map[String, String] = testExtraConf
 
   private val yarnConf: YarnConfiguration = {
     val yarnConfig = new YarnConfiguration()
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkTestResources.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkTestResources.scala
index 6a85654f0..3ea02774e 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkTestResources.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkTestResources.scala
@@ -17,6 +17,8 @@
 
 package org.apache.kyuubi.engine.flink
 
+import java.io.File
+
 import org.apache.kyuubi.Utils
 import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
 
@@ -33,9 +35,12 @@ trait WithFlinkTestResources {
       }
      """
 
-  protected val udfJar = TestUserClassLoaderJar.createJarFile(
+  protected val udfJar: File = TestUserClassLoaderJar.createJarFile(
     Utils.createTempDir("test-jar").toFile,
     "test-classloader-udf.jar",
     GENERATED_UDF_CLASS,
     GENERATED_UDF_CODE)
+
+  protected val testExtraConf: Map[String, String] = Map(
+    "flink.pipeline.name" -> "test-job")
 }
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala
index e4e6a5c67..0f4b38d36 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala
@@ -17,17 +17,35 @@
 
 package org.apache.kyuubi.engine.flink.operation
 
+import java.util.UUID
+
 import org.apache.kyuubi.config.KyuubiConf._
-import org.apache.kyuubi.engine.flink.WithFlinkSQLEngineLocal
+import org.apache.kyuubi.engine.ShareLevel
+import org.apache.kyuubi.engine.flink.{WithDiscoveryFlinkSQLEngine, WithFlinkSQLEngineLocal}
+import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, HA_NAMESPACE}
 import org.apache.kyuubi.operation.NoneMode
 
 class FlinkOperationLocalSuite extends FlinkOperationSuite
-  with WithFlinkSQLEngineLocal {
+  with WithDiscoveryFlinkSQLEngine with WithFlinkSQLEngineLocal {
+
+  protected def jdbcUrl: String = getFlinkEngineServiceUrl
+
+  override def withKyuubiConf: Map[String, String] = {
+    Map(
+      "flink.execution.target" -> "remote",
+      HA_NAMESPACE.key -> namespace,
+      HA_ENGINE_REF_ID.key -> engineRefId,
+      ENGINE_TYPE.key -> "FLINK_SQL",
+      ENGINE_SHARE_LEVEL.key -> shareLevel,
+      OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name) ++ testExtraConf
+  }
+
+  override protected def engineRefId: String = UUID.randomUUID().toString
+
+  def namespace: String = "/kyuubi/flink-local-engine-test"
 
-  override def withKyuubiConf: Map[String, String] =
-    Map(OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name)
+  def shareLevel: String = ShareLevel.USER.toString
 
-  override protected def jdbcUrl: String =
-    s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/;"
+  def engineType: String = "flink"
 
 }
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala
index b43e83db6..931d500f7 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala
@@ -17,10 +17,31 @@
 
 package org.apache.kyuubi.engine.flink.operation
 
-import org.apache.kyuubi.engine.flink.WithDiscoveryFlinkSQLEngine
+import java.util.UUID
+
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SHARE_LEVEL, ENGINE_TYPE}
+import org.apache.kyuubi.engine.ShareLevel
+import org.apache.kyuubi.engine.flink.{WithDiscoveryFlinkSQLEngine, WithFlinkSQLEngineOnYarn}
+import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, HA_NAMESPACE}
 
 class FlinkOperationOnYarnSuite extends FlinkOperationSuite
-  with WithDiscoveryFlinkSQLEngine {
+  with WithDiscoveryFlinkSQLEngine with WithFlinkSQLEngineOnYarn {
 
   protected def jdbcUrl: String = getFlinkEngineServiceUrl
+
+  override def withKyuubiConf: Map[String, String] = {
+    Map(
+      HA_NAMESPACE.key -> namespace,
+      HA_ENGINE_REF_ID.key -> engineRefId,
+      ENGINE_TYPE.key -> "FLINK_SQL",
+      ENGINE_SHARE_LEVEL.key -> shareLevel) ++ testExtraConf
+  }
+
+  override protected def engineRefId: String = UUID.randomUUID().toString
+
+  def namespace: String = "/kyuubi/flink-yarn-application-test"
+
+  def shareLevel: String = ShareLevel.USER.toString
+
+  def engineType: String = "flink"
 }
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 77ce3b3ee..908e407a9 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -23,12 +23,13 @@ import java.util.UUID
 import scala.collection.JavaConverters._
 
 import org.apache.flink.api.common.JobID
+import org.apache.flink.configuration.PipelineOptions
 import org.apache.flink.table.types.logical.LogicalTypeRoot
 import org.apache.hive.service.rpc.thrift._
 
 import org.apache.kyuubi.Utils
 import org.apache.kyuubi.config.KyuubiConf._
-import org.apache.kyuubi.engine.flink.WithFlinkTestResources
+import org.apache.kyuubi.engine.flink.{FlinkEngineUtils, WithFlinkTestResources}
 import org.apache.kyuubi.engine.flink.result.Constants
 import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
 import org.apache.kyuubi.jdbc.hive.KyuubiStatement
@@ -758,7 +759,7 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
       val metaData = resultSet.getMetaData
       assert(metaData.getColumnType(1) === java.sql.Types.ARRAY)
       assert(resultSet.next())
-      val expected = "[v1,v2,v3]"
+      val expected = "[\"v1\",\"v2\",\"v3\"]"
       assert(resultSet.getObject(1).toString == expected)
     }
   }
@@ -778,7 +779,7 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
     withJdbcStatement() { statement =>
       val resultSet = statement.executeQuery("select (1, '2', true)")
       assert(resultSet.next())
-      val expected = """{INT NOT NULL:1,CHAR(1) NOT NULL:2,BOOLEAN NOT NULL:true}"""
+      val expected = """{INT NOT NULL:1,CHAR(1) NOT NULL:"2",BOOLEAN NOT NULL:true}"""
       assert(resultSet.getString(1) == expected)
       val metaData = resultSet.getMetaData
       assert(metaData.getColumnType(1) === java.sql.Types.STRUCT)
@@ -955,7 +956,7 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
       statement.executeQuery("create table tbl_a (a int) with ('connector' = 'blackhole')")
       val resultSet = statement.executeQuery("insert into tbl_a select 1")
       val metadata = resultSet.getMetaData
-      assert(metadata.getColumnName(1) === "result")
+      assert(metadata.getColumnName(1) === "job id")
       assert(metadata.getColumnType(1) === java.sql.Types.VARCHAR)
       assert(resultSet.next())
       assert(resultSet.getString(1).length == 32)
@@ -973,7 +974,7 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
       statement.executeQuery("create table tbl_b (a int) with ('connector' = 'blackhole')")
       val resultSet = statement.executeQuery("insert into tbl_b select * from tbl_a")
       val metadata = resultSet.getMetaData
-      assert(metadata.getColumnName(1) === "result")
+      assert(metadata.getColumnName(1) === "job id")
       assert(metadata.getColumnType(1) === java.sql.Types.VARCHAR)
       assert(resultSet.next())
       assert(resultSet.getString(1).length == 32)
@@ -984,11 +985,9 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
     withMultipleConnectionJdbcStatement() { statement =>
       val resultSet = statement.executeQuery("set table.dynamic-table-options.enabled = true")
       val metadata = resultSet.getMetaData
-      assert(metadata.getColumnName(1) == "key")
-      assert(metadata.getColumnName(2) == "value")
+      assert(metadata.getColumnName(1) == "result")
       assert(resultSet.next())
-      assert(resultSet.getString(1) == "table.dynamic-table-options.enabled")
-      assert(resultSet.getString(2) == "true")
+      assert(resultSet.getString(1) == "OK")
     }
   }
 
@@ -1003,16 +1002,17 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
   }
 
   test("execute statement - reset property") {
+    val originalName = "test-job" // defined in WithFlinkTestResources
     withMultipleConnectionJdbcStatement() { statement =>
-      statement.executeQuery("set pipeline.jars = my.jar")
-      statement.executeQuery("reset pipeline.jars")
+      statement.executeQuery(s"set ${PipelineOptions.NAME.key()} = wrong-name")
+      statement.executeQuery(s"reset ${PipelineOptions.NAME.key()}")
       val resultSet = statement.executeQuery("set")
       // Flink does not support set key without value currently,
       // thus read all rows to find the desired one
       var success = false
       while (resultSet.next()) {
-        if (resultSet.getString(1) == "pipeline.jars" &&
-          !resultSet.getString(2).contains("my.jar")) {
+        if (resultSet.getString(1) == PipelineOptions.NAME.key() &&
+          resultSet.getString(2).equals(originalName)) {
           success = true
         }
       }
@@ -1066,7 +1066,31 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
     }
   }
 
-  test("execute statement - add/remove/show jar") {
+  test("execute statement - add/show jar") {
+    val jarName = s"newly-added-${UUID.randomUUID()}.jar"
+    val newJar = TestUserClassLoaderJar.createJarFile(
+      Utils.createTempDir("add-jar-test").toFile,
+      jarName,
+      GENERATED_UDF_CLASS,
+      GENERATED_UDF_CODE).toPath
+
+    withMultipleConnectionJdbcStatement()({ statement =>
+      statement.execute(s"add jar '$newJar'")
+
+      val showJarsResultAdded = statement.executeQuery("show jars")
+      var exists = false
+      while (showJarsResultAdded.next()) {
+        if (showJarsResultAdded.getString(1).contains(jarName)) {
+          exists = true
+        }
+      }
+      assert(exists)
+    })
+  }
+
+  // ignored because Flink gateway doesn't support remove-jar statements
+  // see org.apache.flink.table.gateway.service.operation.OperationExecutor#callRemoveJar(..)
+  ignore("execute statement - remove jar") {
     val jarName = s"newly-added-${UUID.randomUUID()}.jar"
     val newJar = TestUserClassLoaderJar.createJarFile(
       Utils.createTempDir("add-jar-test").toFile,
@@ -1136,9 +1160,12 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
       assert(stmt.asInstanceOf[KyuubiStatement].getQueryId === null)
       stmt.executeQuery("insert into tbl_a values (1)")
       val queryId = stmt.asInstanceOf[KyuubiStatement].getQueryId
-      assert(queryId !== null)
-      // parse the string to check if it's valid Flink job id
-      assert(JobID.fromHexString(queryId) !== null)
+      // Flink 1.16 doesn't support query id via ResultFetcher
+      if (FlinkEngineUtils.isFlinkVersionAtLeast("1.17")) {
+        assert(queryId !== null)
+        // parse the string to check if it's valid Flink job id
+        assert(JobID.fromHexString(queryId) !== null)
+      }
     }
   }
 }
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyOperationSuite.scala
index 1657f21f6..17c49464f 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyOperationSuite.scala
@@ -18,21 +18,33 @@
 package org.apache.kyuubi.engine.flink.operation
 
 import java.sql.Statement
+import java.util.UUID
 
 import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.flink.WithFlinkSQLEngineLocal
+import org.apache.kyuubi.engine.flink.{WithDiscoveryFlinkSQLEngine, WithFlinkSQLEngineLocal}
+import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, HA_NAMESPACE}
 import org.apache.kyuubi.operation.{AnalyzeMode, ExecutionMode, HiveJDBCTestHelper, ParseMode, PhysicalMode}
 
-class PlanOnlyOperationSuite extends WithFlinkSQLEngineLocal with HiveJDBCTestHelper {
+class PlanOnlyOperationSuite extends WithFlinkSQLEngineLocal
+  with HiveJDBCTestHelper with WithDiscoveryFlinkSQLEngine {
+
+  override protected def engineRefId: String = UUID.randomUUID().toString
+
+  override protected def namespace: String = "/kyuubi/flink-plan-only-test"
+
+  def engineType: String = "flink"
 
   override def withKyuubiConf: Map[String, String] =
     Map(
+      "flink.execution.target" -> "remote",
+      HA_NAMESPACE.key -> namespace,
+      HA_ENGINE_REF_ID.key -> engineRefId,
+      KyuubiConf.ENGINE_TYPE.key -> "FLINK_SQL",
       KyuubiConf.ENGINE_SHARE_LEVEL.key -> "user",
       KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> ParseMode.name,
-      KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN.key -> "plan-only")
+      KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN.key -> "plan-only") ++ testExtraConf
 
-  override protected def jdbcUrl: String =
-    s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/;"
+  override protected def jdbcUrl: String = getFlinkEngineServiceUrl
 
   test("Plan only operation with system defaults") {
     withJdbcStatement() { statement =>
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
index 02aed2866..8c92b77ef 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -176,7 +176,7 @@ object KyuubiApplicationManager {
         // if the master is not identified ahead, add all tags
         setupSparkYarnTag(applicationTag, conf)
         setupSparkK8sTag(applicationTag, conf)
-      case ("FLINK", _) =>
+      case ("FLINK", Some("YARN")) =>
         // running flink on other platforms is not yet supported
         setupFlinkYarnTag(applicationTag, conf)
       // other engine types are running locally yet
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
index 8642d87d7..d8d46e427 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
@@ -54,6 +54,9 @@ class FlinkProcessBuilder(
     Paths.get(flinkHome, "bin", FLINK_EXEC_FILE).toFile.getCanonicalPath
   }
 
+  // flink.execution.target is currently required in Kyuubi conf
+  val executionTarget: Option[String] = conf.getOption("flink.execution.target")
+
   override protected def module: String = "kyuubi-flink-sql-engine"
 
   override protected def mainClass: String = "org.apache.kyuubi.engine.flink.FlinkSQLEngine"
@@ -63,13 +66,17 @@ class FlinkProcessBuilder(
       "FLINK_CONF_DIR",
       s"$flinkHome${File.separator}conf"))
 
-  override def clusterManager(): Option[String] = Some("yarn")
+  override def clusterManager(): Option[String] = {
+    executionTarget match {
+      case Some("yarn-application") => Some("yarn")
+      case _ => None
+    }
+  }
 
   override protected val commands: Array[String] = {
     KyuubiApplicationManager.tagApplication(engineRefId, shortName, clusterManager(), conf)
 
     // flink.execution.target are required in Kyuubi conf currently
-    val executionTarget = conf.getOption("flink.execution.target")
     executionTarget match {
       case Some("yarn-application") =>
         val buffer = new ArrayBuffer[String]()
@@ -133,16 +140,16 @@ class FlinkProcessBuilder(
         val classpathEntries = new java.util.LinkedHashSet[String]
         // flink engine runtime jar
         mainResource.foreach(classpathEntries.add)
-        // flink sql client jar
-        val flinkSqlClientPath = Paths.get(flinkHome)
+        // flink sql jars
+        Paths.get(flinkHome)
           .resolve("opt")
           .toFile
           .listFiles(new FilenameFilter {
             override def accept(dir: File, name: String): Boolean = {
-              name.toLowerCase.startsWith("flink-sql-client")
+              name.toLowerCase.startsWith("flink-sql-client") ||
+              name.toLowerCase.startsWith("flink-sql-gateway")
             }
-          }).head.getAbsolutePath
-        classpathEntries.add(flinkSqlClientPath)
+          }).sorted.foreach(jar => classpathEntries.add(jar.getAbsolutePath))
 
         // jars from flink lib
         classpathEntries.add(s"$flinkHome${File.separator}lib${File.separator}*")
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
index 53450b589..45272618d 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
@@ -63,7 +63,7 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
   private def envWithAllHadoop: ListMap[String, String] = envWithoutHadoopCLASSPATH +
     (FLINK_HADOOP_CLASSPATH_KEY -> s"${File.separator}hadoop")
   private def confStr: String = {
-    sessionModeConf.clone.set("yarn.tags", "KYUUBI").getAll
+    sessionModeConf.clone.getAll
       .map { case (k, v) => s"\\\\\\n\\t--conf $k=$v" }
       .mkString(" ")
   }
@@ -106,6 +106,7 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
 
     val flinkHome = builder.flinkHome
     classpathEntries.add(s"$flinkHome$flinkSqlClientJarPathSuffixRegex")
+    classpathEntries.add(s"$flinkHome$flinkSqlGatewayJarPathSuffixRegex")
     classpathEntries.add(s"$flinkHome$flinkLibPathSuffixRegex")
     classpathEntries.add(s"$flinkHome$flinkConfPathSuffix")
     val envMap = builder.env
@@ -123,6 +124,8 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
   private val javaPath = s"${envDefault("JAVA_HOME")}${File.separator}bin${File.separator}java"
   private val flinkSqlClientJarPathSuffixRegex = s"${File.separator}opt${File.separator}" +
     s"flink-sql-client-.*.jar"
+  private val flinkSqlGatewayJarPathSuffixRegex = s"${File.separator}opt${File.separator}" +
+    s"flink-sql-gateway-.*.jar"
   private val flinkLibPathSuffixRegex = s"${File.separator}lib${File.separator}\\*"
   private val flinkConfPathSuffix = s"${File.separator}conf"
   private val mainClassStr = "org.apache.kyuubi.engine.flink.FlinkSQLEngine"
diff --git a/pom.xml b/pom.xml
index 520b181d5..1feae0322 100644
--- a/pom.xml
+++ b/pom.xml
@@ -135,7 +135,7 @@
         <failsafe.verion>2.4.4</failsafe.verion>
         <fb303.version>0.9.3</fb303.version>
         <flexmark.version>0.62.2</flexmark.version>
-        <flink.version>1.16.1</flink.version>
+        <flink.version>1.17.0</flink.version>
         <flink.archive.name>flink-${flink.version}-bin-scala_${scala.binary.version}.tgz</flink.archive.name>
         <flink.archive.mirror>${apache.archive.dist}/flink/flink-${flink.version}</flink.archive.mirror>
         <flink.archive.download.skip>false</flink.archive.download.skip>
@@ -1624,6 +1624,12 @@
                 <version>${flink.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>org.apache.flink</groupId>
+                <artifactId>flink-sql-gateway</artifactId>
+                <version>${flink.version}</version>
+            </dependency>
+
             <dependency>
                 <groupId>org.apache.flink</groupId>
                 <artifactId>flink-test-utils</artifactId>
@@ -2390,16 +2396,16 @@
         </profile>
 
         <profile>
-            <id>flink-1.15</id>
+            <id>flink-1.16</id>
             <properties>
-                <flink.version>1.15.4</flink.version>
+                <flink.version>1.16.1</flink.version>
             </properties>
         </profile>
 
         <profile>
-            <id>flink-1.16</id>
+            <id>flink-1.17</id>
             <properties>
-                <flink.version>1.16.1</flink.version>
+                <flink.version>1.17.0</flink.version>
             </properties>
         </profile>