Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/04/17 12:11:33 UTC
[kyuubi] branch master updated: [KYUUBI #4367] Support Flink 1.17
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 553b2aafe [KYUUBI #4367] Support Flink 1.17
553b2aafe is described below
commit 553b2aafe7a29546ee410f44640d8e92cdf58951
Author: Paul Lin <pa...@gmail.com>
AuthorDate: Mon Apr 17 20:11:22 2023 +0800
[KYUUBI #4367] Support Flink 1.17
### _Why are the changes needed?_
Support Flink 1.17 and the Flink SQL Gateway.
1. Drop support for Flink 1.15
2. Migrate from the SQL client API to the Flink SQL Gateway API (see the sketch below)
3. Support Flink 1.17
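At its core, the migration swaps the SQL client's parse-and-dispatch flow for the SQL Gateway's executor API. A minimal sketch of the new call shape, assuming a Flink 1.16/1.17 classpath (`runStatement` is a hypothetical helper; the gateway calls are the ones this diff uses):

```scala
import java.util.UUID

import org.apache.flink.table.gateway.api.operation.OperationHandle
import org.apache.flink.table.gateway.service.operation.OperationExecutor
import org.apache.flink.table.gateway.service.result.ResultFetcher

// Before (Flink <= 1.15 SQL client): parse locally, then dispatch on the
// Operation subtype:
//   val operation = executor.parseStatement(sessionId, statement)
// After (SQL Gateway): submit the statement and read rows (and the job ID)
// back from a ResultFetcher.
def runStatement(executor: OperationExecutor, statement: String): ResultFetcher =
  executor.executeStatement(new OperationHandle(UUID.randomUUID()), statement)
```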
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4368 from link3280/flink-1.17.
Closes #4367
8eb4da6c0 [Paul Lin] [KYUUBI #4367] Fix test failure
81a10f6be [Paul Lin] [KYUUBI #4367] Fix test failure
23d87ba1d [Paul Lin] [KYUUBI #4367] Rename delegation package to shim
5c9d0aa84 [Paul Lin] [KYUUBI #4367] Improve code style
56567fcd7 [Paul Lin] [KYUUBI #4367] Improve java.long.Long usage
417d37b27 [Paul Lin] Update externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
08f89991a [Paul Lin] [KYUUBI #4367] Fix ambiguous reference
ed950600c [Paul Lin] [KYUUBI #4367] Fix spotless
7b28eaf11 [Paul Lin] [KYUUBI #4367] Improve code style for iterations
c2a23d571 [Paul Lin] [KYUUBI #4367] Improve code style for error messages and iterations
7e36e70c7 [Paul Lin] [KYUUBI #4367] Improve code style for java.lang.Boolean
4ef8c5209 [Paul Lin] [KYUUBI #4367] Improve code style for java.util.*
8530aec2a [Paul Lin] [KYUUBI #4367] Remove unnecessary classes
1c41e4400 [Paul Lin] [KYUUBI #4367] Remove unnecessary variables
33eeb37ee [Paul Lin] [KYUUBI #4367] Remove unnecessary reflection code
e1e5cd2cf [Paul Lin] [KYUUBI #4367] Fix IncompatibleClassChangeError
3520a5153 [Paul Lin] [KYUUBI #4367] Fix IncompatibleClassChangeError
42cce7a54 [Paul Lin] [KYUUBI #4367] Replace vanilla reflection with kyuubi reflection tools
20e9913e3 [Paul Lin] [KYUUBI #4367] Fix FlinkProcessBuilder test error
a02e01adf [Paul Lin] [KYUUBI #4367] Improve code style
20e1a559e [Paul Lin] [KYUUBI #4367] Use kyuubi reflection tools
9b2072e45 [Paul Lin] [KYUUBI #4367] Improve flink version match
7ce1e9a12 [Paul Lin] [KYUUBI #4367] Fix local engine tagged as YARN app
fd0c88d15 [Paul Lin] Revert "[KYUUBI #4367] Filter out non kyuubi prefixed conf in flink login engine"
f71c6014e [Paul Lin] [KYUUBI #4367] Fix local engine tagged as YARN app
b7d46f57d [Paul Lin] [KYUUBI #4367] Filter out non kyuubi prefixed conf in flink login engine
47beb1a78 [Paul Lin] [KYUUBI #4367] Refactor Flink engine tests
7e1a198ca [Paul Lin] [KYUUBI #4367] Fix flink sql gateway jar not included in local mode
e851d9732 [Paul Lin] [KYUUBI #4367] Disable query id test for flink 1.16
7291e27fa [Paul Lin] [KYUUBI #4367] Remove profile for flink-1.15
54cfe3bbc [Paul Lin] [KYUUBI #4367] Fix udf not found in local flink engine tests
1a7833bf2 [Paul Lin] [KYUUBI #4367] Fix test failure in PlanOnlyStatementSuit
700ee04db [Paul Lin] [KYUUBI #4367] Fix FLINK_CONF_DIR not set in ut
b685ff139 [Paul Lin] [KYUUBI #4367] Improve code style
29728c042 [Paul Lin] [KYUUBI #4367] Fix Flink conf dir not found
799c93876 [Paul Lin] [KYUUBI #4367] Fix NoSuchFieldException
614ecc335 [Paul Lin] [KYUUBI #4367] Fix reflection failures
6a08d0bbe [Paul Lin] [KYUUBI #4367] Fix NPE in dependencies
d289495c0 [Paul Lin] [KYUUBI #4367] Flink FlinkSQLEngine capabilities with Flink 1.16
ef6f4d4ff [Paul Lin] [KYUUBI #4367] Remove support for Flink 1.15
e18b3c2ed [Paul Lin] [KYUUBI #4367] Fix Flink SessionManager compatibility issue
49e0a94be [Paul Lin] feat: Support Flink 1.17
Authored-by: Paul Lin <pa...@gmail.com>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.github/workflows/master.yml | 8 +-
externals/kyuubi-flink-sql-engine/pom.xml | 6 +
.../kyuubi/engine/flink/FlinkEngineUtils.scala | 151 ++++++++++-----
.../engine/flink/FlinkSQLBackendService.scala | 2 +-
.../kyuubi/engine/flink/FlinkSQLEngine.scala | 55 +-----
.../engine/flink/operation/ExecuteStatement.scala | 203 +--------------------
.../engine/flink/operation/FlinkOperation.scala | 13 +-
.../flink/operation/FlinkSQLOperationManager.scala | 14 +-
.../engine/flink/operation/GetCatalogs.scala | 6 +-
.../kyuubi/engine/flink/operation/GetColumns.scala | 11 +-
.../engine/flink/operation/GetCurrentCatalog.scala | 3 +-
.../flink/operation/GetCurrentDatabase.scala | 3 +-
.../engine/flink/operation/GetFunctions.scala | 18 +-
.../engine/flink/operation/GetPrimaryKeys.scala | 19 +-
.../kyuubi/engine/flink/operation/GetSchemas.scala | 11 +-
.../kyuubi/engine/flink/operation/GetTables.scala | 6 +-
.../engine/flink/operation/OperationUtils.scala | 172 -----------------
.../engine/flink/operation/PlanOnlyStatement.scala | 28 +--
.../engine/flink/operation/SetCurrentCatalog.scala | 4 +-
.../flink/operation/SetCurrentDatabase.scala | 4 +-
.../kyuubi/engine/flink/result/ResultSet.scala | 13 +-
.../kyuubi/engine/flink/result/ResultSetUtil.scala | 71 +++++++
.../flink/session/FlinkSQLSessionManager.scala | 33 +++-
.../engine/flink/session/FlinkSessionImpl.scala | 21 ++-
.../kyuubi/engine/flink/shim/FlinkResultSet.scala | 78 ++++++++
.../engine/flink/shim/FlinkSessionManager.scala | 130 +++++++++++++
.../engine/flink/WithDiscoveryFlinkSQLEngine.scala | 26 +--
.../engine/flink/WithFlinkSQLEngineLocal.scala | 190 ++++++++++++++++---
.../engine/flink/WithFlinkSQLEngineOnYarn.scala | 2 +-
.../engine/flink/WithFlinkTestResources.scala | 7 +-
.../flink/operation/FlinkOperationLocalSuite.scala | 30 ++-
.../operation/FlinkOperationOnYarnSuite.scala | 25 ++-
.../flink/operation/FlinkOperationSuite.scala | 61 +++++--
.../flink/operation/PlanOnlyOperationSuite.scala | 22 ++-
.../kyuubi/engine/KyuubiApplicationManager.scala | 2 +-
.../kyuubi/engine/flink/FlinkProcessBuilder.scala | 21 ++-
.../engine/flink/FlinkProcessBuilderSuite.scala | 5 +-
pom.xml | 16 +-
38 files changed, 844 insertions(+), 646 deletions(-)
diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index ece87e265..04ecb1a60 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -157,15 +157,15 @@ jobs:
- 8
- 11
flink:
- - '1.15'
- '1.16'
+ - '1.17'
flink-archive: [ "" ]
comment: [ "normal" ]
include:
- java: 8
- flink: '1.16'
- flink-archive: '-Dflink.archive.mirror=https://archive.apache.org/dist/flink/flink-1.15.4 -Dflink.archive.name=flink-1.15.4-bin-scala_2.12.tgz'
- comment: 'verify-on-flink-1.15-binary'
+ flink: '1.17'
+ flink-archive: '-Dflink.archive.mirror=https://archive.apache.org/dist/flink/flink-1.16.1 -Dflink.archive.name=flink-1.16.1-bin-scala_2.12.tgz'
+ comment: 'verify-on-flink-1.16-binary'
steps:
- uses: actions/checkout@v3
- name: Tune Runner VM
diff --git a/externals/kyuubi-flink-sql-engine/pom.xml b/externals/kyuubi-flink-sql-engine/pom.xml
index 0e499f978..c73310a64 100644
--- a/externals/kyuubi-flink-sql-engine/pom.xml
+++ b/externals/kyuubi-flink-sql-engine/pom.xml
@@ -75,6 +75,12 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-sql-gateway</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
index 69fc8c695..f9289ea81 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkEngineUtils.scala
@@ -18,29 +18,37 @@
package org.apache.kyuubi.engine.flink
import java.io.File
+import java.lang.{Boolean => JBoolean}
import java.net.URL
+import java.util.{ArrayList => JArrayList, Collections => JCollections, List => JList}
import scala.collection.JavaConverters._
+import scala.collection.convert.ImplicitConversions._
-import org.apache.commons.cli.{CommandLine, DefaultParser, Option, Options, ParseException}
+import org.apache.commons.cli.{CommandLine, DefaultParser, Options}
+import org.apache.flink.api.common.JobID
+import org.apache.flink.client.cli.{CustomCommandLine, DefaultCLI, GenericCLI}
+import org.apache.flink.configuration.Configuration
import org.apache.flink.core.fs.Path
import org.apache.flink.runtime.util.EnvironmentInformation
import org.apache.flink.table.client.SqlClientException
-import org.apache.flink.table.client.cli.CliOptions
+import org.apache.flink.table.client.cli.CliOptionsParser
import org.apache.flink.table.client.cli.CliOptionsParser._
-import org.apache.flink.table.client.gateway.context.SessionContext
-import org.apache.flink.table.client.gateway.local.LocalExecutor
+import org.apache.flink.table.gateway.service.context.{DefaultContext, SessionContext}
+import org.apache.flink.table.gateway.service.result.ResultFetcher
+import org.apache.flink.table.gateway.service.session.Session
+import org.apache.flink.util.JarUtils
-import org.apache.kyuubi.Logging
+import org.apache.kyuubi.{KyuubiException, Logging}
import org.apache.kyuubi.engine.SemanticVersion
+import org.apache.kyuubi.reflection.{DynConstructors, DynFields, DynMethods}
object FlinkEngineUtils extends Logging {
- val MODE_EMBEDDED = "embedded"
- val EMBEDDED_MODE_CLIENT_OPTIONS: Options = getEmbeddedModeClientOptions(new Options);
+ val EMBEDDED_MODE_CLIENT_OPTIONS: Options = getEmbeddedModeClientOptions(new Options)
val SUPPORTED_FLINK_VERSIONS: Array[SemanticVersion] =
- Array("1.15", "1.16").map(SemanticVersion.apply)
+ Array("1.16", "1.17").map(SemanticVersion.apply)
def checkFlinkVersion(): Unit = {
val flinkVersion = EnvironmentInformation.getVersion
@@ -62,47 +70,106 @@ object FlinkEngineUtils extends Logging {
def isFlinkVersionEqualTo(targetVersionString: String): Boolean =
SemanticVersion(EnvironmentInformation.getVersion).isVersionEqualTo(targetVersionString)
- def parseCliOptions(args: Array[String]): CliOptions = {
- val (mode, modeArgs) =
- if (args.isEmpty || args(0).startsWith("-")) (MODE_EMBEDDED, args)
- else (args(0), args.drop(1))
- val options = parseEmbeddedModeClient(modeArgs)
- if (mode == MODE_EMBEDDED) {
- if (options.isPrintHelp) {
- printHelpEmbeddedModeClient()
+ /**
+ * Copied and modified from [[org.apache.flink.table.client.cli.CliOptionsParser]]
+ * to avoid loading flink-python classes, which we don't support yet.
+ */
+ private def discoverDependencies(
+ jars: JList[URL],
+ libraries: JList[URL]): JList[URL] = {
+ val dependencies: JList[URL] = new JArrayList[URL]
+ try { // find jar files
+ for (url <- jars) {
+ JarUtils.checkJarFile(url)
+ dependencies.add(url)
}
- options
+ // find jar files in library directories
+ libraries.foreach { libUrl =>
+ val dir: File = new File(libUrl.toURI)
+ if (!dir.isDirectory) throw new SqlClientException(s"Directory expected: $dir")
+ if (!dir.canRead) throw new SqlClientException(s"Directory cannot be read: $dir")
+ val files: Array[File] = dir.listFiles
+ if (files == null) throw new SqlClientException(s"Directory cannot be read: $dir")
+ files.filter { f => f.isFile && f.getAbsolutePath.toLowerCase.endsWith(".jar") }
+ .foreach { f =>
+ val url: URL = f.toURI.toURL
+ JarUtils.checkJarFile(url)
+ dependencies.add(url)
+ }
+ }
+ } catch {
+ case e: Exception =>
+ throw new SqlClientException("Could not load all required JAR files.", e)
+ }
+ dependencies
+ }
+
+ def getDefaultContext(
+ args: Array[String],
+ flinkConf: Configuration,
+ flinkConfDir: String): DefaultContext = {
+ val parser = new DefaultParser
+ val line = parser.parse(EMBEDDED_MODE_CLIENT_OPTIONS, args, true)
+ val jars: JList[URL] = Option(checkUrls(line, CliOptionsParser.OPTION_JAR))
+ .getOrElse(JCollections.emptyList())
+ val libDirs: JList[URL] = Option(checkUrls(line, CliOptionsParser.OPTION_LIBRARY))
+ .getOrElse(JCollections.emptyList())
+ val dependencies: JList[URL] = discoverDependencies(jars, libDirs)
+ if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
+ val commandLines: JList[CustomCommandLine] =
+ Seq(new GenericCLI(flinkConf, flinkConfDir), new DefaultCLI).asJava
+ DynConstructors.builder()
+ .impl(
+ classOf[DefaultContext],
+ classOf[Configuration],
+ classOf[JList[CustomCommandLine]])
+ .build()
+ .newInstance(flinkConf, commandLines)
+ .asInstanceOf[DefaultContext]
+ } else if (FlinkEngineUtils.isFlinkVersionEqualTo("1.17")) {
+ DynMethods.builder("load")
+ .impl(
+ classOf[DefaultContext],
+ classOf[Configuration],
+ classOf[JList[URL]],
+ classOf[Boolean],
+ classOf[Boolean])
+ .buildStatic()
+ .invoke[DefaultContext](
+ flinkConf,
+ dependencies,
+ new JBoolean(true),
+ new JBoolean(false))
} else {
- throw new SqlClientException("Other mode is not supported yet.")
+ throw new KyuubiException(
+ s"Flink version ${EnvironmentInformation.getVersion} are not supported currently.")
}
}
- def getSessionContext(localExecutor: LocalExecutor, sessionId: String): SessionContext = {
- val method = classOf[LocalExecutor].getDeclaredMethod("getSessionContext", classOf[String])
- method.setAccessible(true)
- method.invoke(localExecutor, sessionId).asInstanceOf[SessionContext]
+ def getSessionContext(session: Session): SessionContext = {
+ DynFields.builder()
+ .hiddenImpl(classOf[Session], "sessionContext")
+ .build()
+ .get(session)
+ .asInstanceOf[SessionContext]
}
- def parseEmbeddedModeClient(args: Array[String]): CliOptions =
+ def getResultJobId(resultFetch: ResultFetcher): Option[JobID] = {
+ if (FlinkEngineUtils.isFlinkVersionAtMost("1.16")) {
+ return None
+ }
try {
- val parser = new DefaultParser
- val line = parser.parse(EMBEDDED_MODE_CLIENT_OPTIONS, args, true)
- val jarUrls = checkUrls(line, OPTION_JAR)
- val libraryUrls = checkUrls(line, OPTION_LIBRARY)
- new CliOptions(
- line.hasOption(OPTION_HELP.getOpt),
- checkSessionId(line),
- checkUrl(line, OPTION_INIT_FILE),
- checkUrl(line, OPTION_FILE),
- if (jarUrls != null && jarUrls.nonEmpty) jarUrls.asJava else null,
- if (libraryUrls != null && libraryUrls.nonEmpty) libraryUrls.asJava else null,
- line.getOptionValue(OPTION_UPDATE.getOpt),
- line.getOptionValue(OPTION_HISTORY.getOpt),
- null)
+ Option(DynFields.builder()
+ .hiddenImpl(classOf[ResultFetcher], "jobID")
+ .build()
+ .get(resultFetch)
+ .asInstanceOf[JobID])
} catch {
- case e: ParseException =>
- throw new SqlClientException(e.getMessage)
+ case _: NullPointerException => None
+ case e: Throwable =>
+ throw new IllegalStateException("Unexpected error occurred while fetching query ID", e)
}
+ }
def checkSessionId(line: CommandLine): String = {
val sessionId = line.getOptionValue(OPTION_SESSION.getOpt)
@@ -111,13 +178,13 @@ object FlinkEngineUtils extends Logging {
} else sessionId
}
- def checkUrl(line: CommandLine, option: Option): URL = {
- val urls: List[URL] = checkUrls(line, option)
+ def checkUrl(line: CommandLine, option: org.apache.commons.cli.Option): URL = {
+ val urls: JList[URL] = checkUrls(line, option)
if (urls != null && urls.nonEmpty) urls.head
else null
}
- def checkUrls(line: CommandLine, option: Option): List[URL] = {
+ def checkUrls(line: CommandLine, option: org.apache.commons.cli.Option): JList[URL] = {
if (line.hasOption(option.getOpt)) {
line.getOptionValues(option.getOpt).distinct.map((url: String) => {
checkFilePath(url)
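The version gate above is the load-bearing piece of this file: DefaultContext is built reflectively because its entry point differs between Flink 1.16 (a constructor) and 1.17 (a static load method). A condensed sketch of the 1.17 branch, with signatures copied from the hunk above and error handling elided:

```scala
import java.lang.{Boolean => JBoolean}
import java.net.URL
import java.util.{List => JList}

import org.apache.flink.configuration.Configuration
import org.apache.flink.table.gateway.service.context.DefaultContext
import org.apache.kyuubi.reflection.DynMethods

// Resolve DefaultContext.load(Configuration, List[URL], boolean, boolean) at
// runtime, so one engine binary can serve both supported Flink versions.
def loadDefaultContext117(flinkConf: Configuration, deps: JList[URL]): DefaultContext =
  DynMethods.builder("load")
    .impl(classOf[DefaultContext], classOf[Configuration], classOf[JList[URL]],
      classOf[Boolean], classOf[Boolean])
    .buildStatic()
    .invoke[DefaultContext](flinkConf, deps, new JBoolean(true), new JBoolean(false))
```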
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLBackendService.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLBackendService.scala
index d049e3c80..9802f1955 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLBackendService.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLBackendService.scala
@@ -17,7 +17,7 @@
package org.apache.kyuubi.engine.flink
-import org.apache.flink.table.client.gateway.context.DefaultContext
+import org.apache.flink.table.gateway.service.context.DefaultContext
import org.apache.kyuubi.engine.flink.session.FlinkSQLSessionManager
import org.apache.kyuubi.service.AbstractBackendService
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
index 42061a369..48a354b0f 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/FlinkSQLEngine.scala
@@ -18,22 +18,17 @@
package org.apache.kyuubi.engine.flink
import java.io.File
-import java.net.URL
import java.nio.file.Paths
import java.time.Instant
import java.util.concurrent.CountDownLatch
import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
-import org.apache.flink.client.cli.{DefaultCLI, GenericCLI}
import org.apache.flink.configuration.{Configuration, DeploymentOptions, GlobalConfiguration}
import org.apache.flink.table.api.TableEnvironment
-import org.apache.flink.table.client.SqlClientException
-import org.apache.flink.table.client.gateway.context.DefaultContext
-import org.apache.flink.util.JarUtils
+import org.apache.flink.table.gateway.service.context.DefaultContext
-import org.apache.kyuubi.{KyuubiSQLException, Logging, Utils}
+import org.apache.kyuubi.{Logging, Utils}
import org.apache.kyuubi.Utils.{addShutdownHook, currentUser, FLINK_ENGINE_SHUTDOWN_PRIORITY}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.engine.flink.FlinkSQLEngine.{countDownLatch, currentEngine}
@@ -118,18 +113,9 @@ object FlinkSQLEngine extends Logging {
debug(s"Skip generating app name for execution target $other")
}
- val cliOptions = FlinkEngineUtils.parseCliOptions(args)
- val jars = if (cliOptions.getJars != null) cliOptions.getJars.asScala else List.empty
- val libDirs =
- if (cliOptions.getLibraryDirs != null) cliOptions.getLibraryDirs.asScala else List.empty
- val dependencies = discoverDependencies(jars, libDirs)
- val engineContext = new DefaultContext(
- dependencies.asJava,
- flinkConf,
- Seq(new GenericCLI(flinkConf, flinkConfDir), new DefaultCLI).asJava)
-
kyuubiConf.setIfMissing(KyuubiConf.FRONTEND_THRIFT_BINARY_BIND_PORT, 0)
+ val engineContext = FlinkEngineUtils.getDefaultContext(args, flinkConf, flinkConfDir)
startEngine(engineContext)
info("Flink engine started")
@@ -146,7 +132,7 @@ object FlinkSQLEngine extends Logging {
engine.stop()
}
case t: Throwable =>
- error("Create FlinkSQL Engine Failed", t)
+ error("Failed to create FlinkSQL Engine", t)
}
}
@@ -167,37 +153,4 @@ object FlinkSQLEngine extends Logging {
res.await()
info("Initial Flink SQL finished.")
}
-
- private def discoverDependencies(
- jars: Seq[URL],
- libraries: Seq[URL]): List[URL] = {
- try {
- var dependencies: ListBuffer[URL] = ListBuffer()
- // find jar files
- jars.foreach { url =>
- JarUtils.checkJarFile(url)
- dependencies = dependencies += url
- }
- // find jar files in library directories
- libraries.foreach { libUrl =>
- val dir: File = new File(libUrl.toURI)
- if (!dir.isDirectory) throw new SqlClientException("Directory expected: " + dir)
- else if (!dir.canRead) throw new SqlClientException("Directory cannot be read: " + dir)
- val files: Array[File] = dir.listFiles
- if (files == null) throw new SqlClientException("Directory cannot be read: " + dir)
- files.foreach { f =>
- // only consider jars
- if (f.isFile && f.getAbsolutePath.toLowerCase.endsWith(".jar")) {
- val url: URL = f.toURI.toURL
- JarUtils.checkJarFile(url)
- dependencies = dependencies += url
- }
- }
- }
- dependencies.toList
- } catch {
- case e: Exception =>
- throw KyuubiSQLException(s"Could not load all required JAR files.", e)
- }
- }
}
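The net effect on the engine bootstrap: dependency discovery and CLI parsing move out of FlinkSQLEngine, leaving main with a single helper call (sketch; `args`, `flinkConf`, and `flinkConfDir` are in scope as in FlinkSQLEngine.main):

```scala
// Context construction is now version-aware and fully delegated to
// FlinkEngineUtils (see the reflection-based getDefaultContext above).
val engineContext = FlinkEngineUtils.getDefaultContext(args, flinkConf, flinkConfDir)
startEngine(engineContext)
```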
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 10ad5bf6d..4042756b6 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -17,32 +17,15 @@
package org.apache.kyuubi.engine.flink.operation
-import java.time.{LocalDate, LocalTime}
-import java.util
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
import org.apache.flink.api.common.JobID
-import org.apache.flink.table.api.ResultKind
-import org.apache.flink.table.client.gateway.TypedResult
-import org.apache.flink.table.data.{GenericArrayData, GenericMapData, RowData}
-import org.apache.flink.table.data.binary.{BinaryArrayData, BinaryMapData}
-import org.apache.flink.table.operations.{ModifyOperation, Operation, QueryOperation}
-import org.apache.flink.table.operations.command._
-import org.apache.flink.table.types.DataType
-import org.apache.flink.table.types.logical._
-import org.apache.flink.types.Row
+import org.apache.flink.table.gateway.api.operation.OperationHandle
import org.apache.kyuubi.Logging
-import org.apache.kyuubi.engine.flink.FlinkEngineUtils._
-import org.apache.kyuubi.engine.flink.result.ResultSet
-import org.apache.kyuubi.engine.flink.schema.RowSet.toHiveString
+import org.apache.kyuubi.engine.flink.FlinkEngineUtils
+import org.apache.kyuubi.engine.flink.result.ResultSetUtil
import org.apache.kyuubi.operation.OperationState
import org.apache.kyuubi.operation.log.OperationLog
-import org.apache.kyuubi.reflection.DynMethods
import org.apache.kyuubi.session.Session
-import org.apache.kyuubi.util.RowSetUtils
class ExecuteStatement(
session: Session,
@@ -77,22 +60,11 @@ class ExecuteStatement(
private def executeStatement(): Unit = {
try {
setState(OperationState.RUNNING)
- val operation = executor.parseStatement(sessionId, statement)
- operation match {
- case queryOperation: QueryOperation => runQueryOperation(queryOperation)
- case modifyOperation: ModifyOperation => runModifyOperation(modifyOperation)
- case setOperation: SetOperation =>
- resultSet = OperationUtils.runSetOperation(setOperation, executor, sessionId)
- case resetOperation: ResetOperation =>
- resultSet = OperationUtils.runResetOperation(resetOperation, executor, sessionId)
- case addJarOperation: AddJarOperation if isFlinkVersionAtMost("1.15") =>
- resultSet = OperationUtils.runAddJarOperation(addJarOperation, executor, sessionId)
- case removeJarOperation: RemoveJarOperation =>
- resultSet = OperationUtils.runRemoveJarOperation(removeJarOperation, executor, sessionId)
- case showJarsOperation: ShowJarsOperation if isFlinkVersionAtMost("1.15") =>
- resultSet = OperationUtils.runShowJarOperation(showJarsOperation, executor, sessionId)
- case operation: Operation => runOperation(operation)
- }
+ val resultFetcher = executor.executeStatement(
+ new OperationHandle(getHandle.identifier),
+ statement)
+ jobId = FlinkEngineUtils.getResultJobId(resultFetcher)
+ resultSet = ResultSetUtil.fromResultFetcher(resultFetcher, resultMaxRows)
setState(OperationState.FINISHED)
} catch {
onError(cancel = true)
@@ -100,163 +72,4 @@ class ExecuteStatement(
shutdownTimeoutMonitor()
}
}
-
- private def runQueryOperation(operation: QueryOperation): Unit = {
- var resultId: String = null
- try {
- val resultDescriptor = executor.executeQuery(sessionId, operation)
- val dataTypes = resultDescriptor.getResultSchema.getColumnDataTypes.asScala.toList
-
- resultId = resultDescriptor.getResultId
-
- val rows = new ArrayBuffer[Row]()
- var loop = true
-
- while (loop) {
- Thread.sleep(50) // slow the processing down
-
- val pageSize = Math.min(500, resultMaxRows)
- val result = executor.snapshotResult(sessionId, resultId, pageSize)
- result.getType match {
- case TypedResult.ResultType.PAYLOAD =>
- (1 to result.getPayload).foreach { page =>
- if (rows.size < resultMaxRows) {
- val result = executor.retrieveResultPage(resultId, page)
- rows ++= result.asScala.map(r => convertToRow(r, dataTypes))
- } else {
- loop = false
- }
- }
- case TypedResult.ResultType.EOS => loop = false
- case TypedResult.ResultType.EMPTY =>
- }
- }
-
- resultSet = ResultSet.builder
- .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
- .columns(resultDescriptor.getResultSchema.getColumns)
- .data(rows.slice(0, resultMaxRows).toArray[Row])
- .build
- } finally {
- if (resultId != null) {
- cleanupQueryResult(resultId)
- }
- }
- }
-
- private def runModifyOperation(operation: ModifyOperation): Unit = {
- val result = executor.executeOperation(sessionId, operation)
- jobId = result.getJobClient.asScala.map(_.getJobID)
- resultSet = ResultSet.fromJobId(jobId.orNull)
- }
-
- private def runOperation(operation: Operation): Unit = {
- val result = executor.executeOperation(sessionId, operation)
- jobId = result.getJobClient.asScala.map(_.getJobID)
- // after FLINK-24461, TableResult#await() would block insert statements
- // until the job finishes, instead of returning row affected immediately
- resultSet = ResultSet.fromTableResult(result)
- }
-
- private def cleanupQueryResult(resultId: String): Unit = {
- try {
- executor.cancelQuery(sessionId, resultId)
- } catch {
- case t: Throwable =>
- warn(s"Failed to clean result set $resultId in session $sessionId", t)
- }
- }
-
- private[this] def convertToRow(r: RowData, dataTypes: List[DataType]): Row = {
- val row = Row.withPositions(r.getRowKind, r.getArity)
- for (i <- 0 until r.getArity) {
- val dataType = dataTypes(i)
- dataType.getLogicalType match {
- case arrayType: ArrayType =>
- val arrayData = r.getArray(i)
- if (arrayData == null) {
- row.setField(i, null)
- }
- arrayData match {
- case d: GenericArrayData =>
- row.setField(i, d.toObjectArray)
- case d: BinaryArrayData =>
- row.setField(i, d.toObjectArray(arrayType.getElementType))
- case _ =>
- }
- case _: BinaryType | _: VarBinaryType =>
- row.setField(i, r.getBinary(i))
- case _: BigIntType =>
- row.setField(i, r.getLong(i))
- case _: BooleanType =>
- row.setField(i, r.getBoolean(i))
- case _: VarCharType | _: CharType =>
- row.setField(i, r.getString(i))
- case t: DecimalType =>
- row.setField(i, r.getDecimal(i, t.getPrecision, t.getScale).toBigDecimal)
- case _: DateType =>
- val date = RowSetUtils.formatLocalDate(LocalDate.ofEpochDay(r.getInt(i)))
- row.setField(i, date)
- case _: TimeType =>
- val time = RowSetUtils.formatLocalTime(LocalTime.ofNanoOfDay(r.getLong(i) * 1000 * 1000))
- row.setField(i, time)
- case t: TimestampType =>
- val ts = RowSetUtils
- .formatLocalDateTime(r.getTimestamp(i, t.getPrecision)
- .toLocalDateTime)
- row.setField(i, ts)
- case _: TinyIntType =>
- row.setField(i, r.getByte(i))
- case _: SmallIntType =>
- row.setField(i, r.getShort(i))
- case _: IntType =>
- row.setField(i, r.getInt(i))
- case _: FloatType =>
- row.setField(i, r.getFloat(i))
- case mapType: MapType =>
- val mapData = r.getMap(i)
- if (mapData != null && mapData.size > 0) {
- val keyType = mapType.getKeyType
- val valueType = mapType.getValueType
- mapData match {
- case d: BinaryMapData =>
- val kvArray = toArray(keyType, valueType, d)
- val map: util.Map[Any, Any] = new util.HashMap[Any, Any]
- for (i <- kvArray._1.indices) {
- val value: Any = kvArray._2(i)
- map.put(kvArray._1(i), value)
- }
- row.setField(i, map)
- case d: GenericMapData => // TODO
- }
- } else {
- row.setField(i, null)
- }
- case _: DoubleType =>
- row.setField(i, r.getDouble(i))
- case t: RowType =>
- val fieldDataTypes = DynMethods.builder("getFieldDataTypes")
- .impl(classOf[DataType], classOf[DataType])
- .buildStatic
- .invoke[util.List[DataType]](dataType)
- .asScala.toList
- val internalRowData = r.getRow(i, t.getFieldCount)
- val internalRow = convertToRow(internalRowData, fieldDataTypes)
- row.setField(i, internalRow)
- case t =>
- val hiveString = toHiveString((row.getField(i), t))
- row.setField(i, hiveString)
- }
- }
- row
- }
-
- private[this] def toArray(
- keyType: LogicalType,
- valueType: LogicalType,
- arrayData: BinaryMapData): (Array[_], Array[_]) = {
-
- arrayData.keyArray().toObjectArray(keyType) -> arrayData.valueArray().toObjectArray(valueType)
- }
-
}
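Note that the roughly 160 deleted lines of hand-rolled RowData-to-Row conversion are not reimplemented elsewhere: the new ResultSetUtil.fromResultFetcher (later in this diff) leans on Flink's own converters. A sketch of that approach under the assumption that the row's DataType is at hand (`toExternalRow` is a hypothetical helper; real code may also need to open the converter with the session classloader first):

```scala
import org.apache.flink.table.data.RowData
import org.apache.flink.table.data.conversion.DataStructureConverters
import org.apache.flink.table.types.DataType
import org.apache.flink.types.Row

// Let Flink convert internal RowData to an external Row, replacing the
// per-LogicalType pattern match deleted above.
def toExternalRow(rowType: DataType, rowData: RowData): Row = {
  val converter = DataStructureConverters.getConverter(rowType)
  converter.toExternal(rowData).asInstanceOf[Row]
}
```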
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
index 2859d659e..d734cea05 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperation.scala
@@ -21,8 +21,9 @@ import java.io.IOException
import scala.collection.JavaConverters.collectionAsScalaIterableConverter
-import org.apache.flink.table.client.gateway.Executor
-import org.apache.flink.table.client.gateway.context.SessionContext
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.table.gateway.service.context.SessionContext
+import org.apache.flink.table.gateway.service.operation.OperationExecutor
import org.apache.hive.service.rpc.thrift.{TGetResultSetMetadataResp, TRowSet, TTableSchema}
import org.apache.kyuubi.{KyuubiSQLException, Utils}
@@ -36,12 +37,16 @@ import org.apache.kyuubi.session.Session
abstract class FlinkOperation(session: Session) extends AbstractOperation(session) {
+ protected val flinkSession: org.apache.flink.table.gateway.service.session.Session =
+ session.asInstanceOf[FlinkSessionImpl].fSession
+
+ protected val executor: OperationExecutor = flinkSession.createExecutor(
+ Configuration.fromMap(flinkSession.getSessionConfig))
+
protected val sessionContext: SessionContext = {
session.asInstanceOf[FlinkSessionImpl].sessionContext
}
- protected val executor: Executor = session.asInstanceOf[FlinkSessionImpl].executor
-
protected val sessionId: String = session.handle.identifier.toString
protected var resultSet: ResultSet = _
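Operations no longer share a session-wide SQL client Executor; each one derives a fresh OperationExecutor from the gateway session, exactly as the hunk above does (sketch; `flinkSession` is the org.apache.flink.table.gateway.service.session.Session held by FlinkSessionImpl):

```scala
import org.apache.flink.configuration.Configuration

// One executor per operation, configured from the session's current config map.
val executor = flinkSession.createExecutor(
  Configuration.fromMap(flinkSession.getSessionConfig))
```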
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
index d7b5e297d..712c13596 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
import org.apache.kyuubi.KyuubiSQLException
import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.engine.flink.FlinkEngineUtils
import org.apache.kyuubi.engine.flink.result.Constants
import org.apache.kyuubi.engine.flink.session.FlinkSessionImpl
import org.apache.kyuubi.operation.{NoneMode, Operation, OperationManager, PlanOnlyMode}
@@ -44,7 +45,8 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
runAsync: Boolean,
queryTimeout: Long): Operation = {
val flinkSession = session.asInstanceOf[FlinkSessionImpl]
- if (flinkSession.sessionContext.getConfigMap.getOrDefault(
+ val sessionConfig = flinkSession.fSession.getSessionConfig
+ if (sessionConfig.getOrDefault(
ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key,
operationConvertCatalogDatabaseDefault.toString).toBoolean) {
val catalogDatabaseOperation = processCatalogDatabase(session, statement, confOverlay)
@@ -53,11 +55,13 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
}
}
- val mode = PlanOnlyMode.fromString(flinkSession.sessionContext.getConfigMap.getOrDefault(
- OPERATION_PLAN_ONLY_MODE.key,
- operationModeDefault))
+ val mode = PlanOnlyMode.fromString(
+ sessionConfig.getOrDefault(
+ OPERATION_PLAN_ONLY_MODE.key,
+ operationModeDefault))
- flinkSession.sessionContext.set(OPERATION_PLAN_ONLY_MODE.key, mode.name)
+ val sessionContext = FlinkEngineUtils.getSessionContext(flinkSession.fSession)
+ sessionContext.set(OPERATION_PLAN_ONLY_MODE.key, mode.name)
val resultMaxRows =
flinkSession.normalizedConf.getOrElse(
ENGINE_FLINK_MAX_ROWS.key,
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCatalogs.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCatalogs.scala
index 11dd760e4..245371681 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCatalogs.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCatalogs.scala
@@ -17,6 +17,8 @@
package org.apache.kyuubi.engine.flink.operation
+import scala.collection.convert.ImplicitConversions._
+
import org.apache.kyuubi.engine.flink.result.ResultSetUtil
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant.TABLE_CAT
import org.apache.kyuubi.session.Session
@@ -25,8 +27,8 @@ class GetCatalogs(session: Session) extends FlinkOperation(session) {
override protected def runInternal(): Unit = {
try {
- val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
- val catalogs = tableEnv.listCatalogs.toList
+ val catalogManager = sessionContext.getSessionState.catalogManager
+ val catalogs = catalogManager.listCatalogs.toList
resultSet = ResultSetUtil.stringListToResultSet(catalogs, TABLE_CAT)
} catch onError()
}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetColumns.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetColumns.scala
index 6ce2a6ac7..b1a7c0c3e 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetColumns.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetColumns.scala
@@ -21,7 +21,7 @@ import scala.collection.JavaConverters._
import org.apache.commons.lang3.StringUtils
import org.apache.flink.table.api.{DataTypes, ResultKind}
-import org.apache.flink.table.catalog.Column
+import org.apache.flink.table.catalog.{Column, ObjectIdentifier}
import org.apache.flink.table.types.logical._
import org.apache.flink.types.Row
@@ -40,17 +40,17 @@ class GetColumns(
override protected def runInternal(): Unit = {
try {
- val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
val catalogName =
- if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
+ if (StringUtils.isEmpty(catalogNameOrEmpty)) executor.getCurrentCatalog
else catalogNameOrEmpty
val schemaNameRegex = toJavaRegex(schemaNamePattern)
val tableNameRegex = toJavaRegex(tableNamePattern)
val columnNameRegex = toJavaRegex(columnNamePattern).r
- val columns = tableEnv.getCatalog(catalogName).asScala.toArray.flatMap { flinkCatalog =>
+ val catalogManager = sessionContext.getSessionState.catalogManager
+ val columns = catalogManager.getCatalog(catalogName).asScala.toArray.flatMap { flinkCatalog =>
SchemaHelper.getSchemasWithPattern(flinkCatalog, schemaNameRegex)
.flatMap { schemaName =>
SchemaHelper.getFlinkTablesWithPattern(
@@ -60,7 +60,8 @@ class GetColumns(
tableNameRegex)
.filter { _._2.isDefined }
.flatMap { case (tableName, _) =>
- val flinkTable = tableEnv.from(s"`$catalogName`.`$schemaName`.`$tableName`")
+ val flinkTable = catalogManager.getTable(
+ ObjectIdentifier.of(catalogName, schemaName, tableName)).get()
val resolvedSchema = flinkTable.getResolvedSchema
resolvedSchema.getColumns.asScala.toArray.zipWithIndex
.filter { case (column, _) =>
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCurrentCatalog.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCurrentCatalog.scala
index 3e42e9aa6..5f82de4a6 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCurrentCatalog.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCurrentCatalog.scala
@@ -31,8 +31,7 @@ class GetCurrentCatalog(session: Session) extends FlinkOperation(session) {
override protected def runInternal(): Unit = {
try {
- val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
- val catalog = tableEnv.getCurrentCatalog
+ val catalog = executor.getCurrentCatalog
resultSet = ResultSetUtil.stringListToResultSet(List(catalog), TABLE_CAT)
} catch onError()
}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCurrentDatabase.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCurrentDatabase.scala
index 014ca2ea3..107609c06 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCurrentDatabase.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetCurrentDatabase.scala
@@ -31,8 +31,7 @@ class GetCurrentDatabase(session: Session) extends FlinkOperation(session) {
override protected def runInternal(): Unit = {
try {
- val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
- val database = tableEnv.getCurrentDatabase
+ val database = sessionContext.getSessionState.catalogManager.getCurrentDatabase
resultSet = ResultSetUtil.stringListToResultSet(List(database), TABLE_SCHEM)
} catch onError()
}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetFunctions.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetFunctions.scala
index ab870ab79..85f34a29a 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetFunctions.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetFunctions.scala
@@ -20,9 +20,10 @@ package org.apache.kyuubi.engine.flink.operation
import java.sql.DatabaseMetaData
import scala.collection.JavaConverters._
+import scala.collection.convert.ImplicitConversions._
import org.apache.commons.lang3.StringUtils
-import org.apache.flink.table.api.{DataTypes, ResultKind, TableEnvironment}
+import org.apache.flink.table.api.{DataTypes, ResultKind}
import org.apache.flink.table.catalog.Column
import org.apache.flink.types.Row
@@ -42,17 +43,20 @@ class GetFunctions(
try {
val schemaPattern = toJavaRegex(schemaName)
val functionPattern = toJavaRegex(functionName)
- val tableEnv: TableEnvironment = sessionContext.getExecutionContext.getTableEnvironment
+ val functionCatalog = sessionContext.getSessionState.functionCatalog
+ val catalogManager = sessionContext.getSessionState.catalogManager
+
val systemFunctions = filterPattern(
- tableEnv.listFunctions().diff(tableEnv.listUserDefinedFunctions()),
+ functionCatalog.getFunctions
+ .diff(functionCatalog.getUserDefinedFunctions),
functionPattern)
.map { f =>
Row.of(null, null, f, null, Integer.valueOf(DatabaseMetaData.functionResultUnknown), null)
- }
- val catalogFunctions = tableEnv.listCatalogs()
+ }.toArray
+ val catalogFunctions = catalogManager.listCatalogs()
.filter { c => StringUtils.isEmpty(catalogName) || c == catalogName }
.flatMap { c =>
- val catalog = tableEnv.getCatalog(c).get()
+ val catalog = catalogManager.getCatalog(c).get()
filterPattern(catalog.listDatabases().asScala, schemaPattern)
.flatMap { d =>
filterPattern(catalog.listFunctions(d).asScala, functionPattern)
@@ -66,7 +70,7 @@ class GetFunctions(
null)
}
}
- }
+ }.toArray
resultSet = ResultSet.builder.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(
Column.physical(FUNCTION_CAT, DataTypes.STRING()),
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetPrimaryKeys.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetPrimaryKeys.scala
index b534feb1f..5b9060cf1 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetPrimaryKeys.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetPrimaryKeys.scala
@@ -21,8 +21,9 @@ import scala.collection.JavaConverters._
import org.apache.commons.lang3.StringUtils
import org.apache.flink.table.api.{DataTypes, ResultKind}
-import org.apache.flink.table.catalog.Column
+import org.apache.flink.table.catalog.{Column, ObjectIdentifier}
import org.apache.flink.types.Row
+import org.apache.flink.util.FlinkException
import org.apache.kyuubi.engine.flink.result.ResultSet
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
@@ -37,22 +38,25 @@ class GetPrimaryKeys(
override protected def runInternal(): Unit = {
try {
- val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+ val catalogManager = sessionContext.getSessionState.catalogManager
val catalogName =
- if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
+ if (StringUtils.isEmpty(catalogNameOrEmpty)) catalogManager.getCurrentCatalog
else catalogNameOrEmpty
val schemaName =
if (StringUtils.isEmpty(schemaNameOrEmpty)) {
- if (catalogName != tableEnv.getCurrentCatalog) {
- tableEnv.getCatalog(catalogName).get().getDefaultDatabase
+ if (catalogName != executor.getCurrentCatalog) {
+ catalogManager.getCatalog(catalogName).get().getDefaultDatabase
} else {
- tableEnv.getCurrentDatabase
+ catalogManager.getCurrentDatabase
}
} else schemaNameOrEmpty
- val flinkTable = tableEnv.from(s"`$catalogName`.`$schemaName`.`$tableName`")
+ val flinkTable = catalogManager
+ .getTable(ObjectIdentifier.of(catalogName, schemaName, tableName))
+ .orElseThrow(() =>
+ new FlinkException(s"Table `$catalogName`.`$schemaName`.`$tableName`` not found."))
val resolvedSchema = flinkTable.getResolvedSchema
val primaryKeySchema = resolvedSchema.getPrimaryKey
@@ -102,5 +106,4 @@ class GetPrimaryKeys(
)
// format: on
}
-
}
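The metadata operations (GetCatalogs, GetColumns, GetPrimaryKeys, GetSchemas, GetTables) all follow the same pattern: replace TableEnvironment lookups with CatalogManager lookups keyed by ObjectIdentifier. The recurring shape, as a sketch (`catalogManager`, `catalogName`, `schemaName`, and `tableName` as in GetPrimaryKeys above):

```scala
import org.apache.flink.table.catalog.ObjectIdentifier
import org.apache.flink.util.FlinkException

// Resolve the table through the CatalogManager instead of
// tableEnv.from("`cat`.`db`.`tbl`"); getTable returns a java.util.Optional.
val flinkTable = catalogManager
  .getTable(ObjectIdentifier.of(catalogName, schemaName, tableName))
  .orElseThrow(() => new FlinkException(s"Table $tableName not found."))
```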
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetSchemas.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetSchemas.scala
index 6715b2320..f56ddd8b1 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetSchemas.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetSchemas.scala
@@ -18,9 +18,10 @@
package org.apache.kyuubi.engine.flink.operation
import scala.collection.JavaConverters._
+import scala.collection.convert.ImplicitConversions._
import org.apache.commons.lang3.StringUtils
-import org.apache.flink.table.api.{DataTypes, ResultKind, TableEnvironment}
+import org.apache.flink.table.api.{DataTypes, ResultKind}
import org.apache.flink.table.catalog.Column
import org.apache.flink.types.Row
@@ -35,14 +36,14 @@ class GetSchemas(session: Session, catalogName: String, schema: String)
override protected def runInternal(): Unit = {
try {
val schemaPattern = toJavaRegex(schema)
- val tableEnv: TableEnvironment = sessionContext.getExecutionContext.getTableEnvironment
- val schemas = tableEnv.listCatalogs()
+ val catalogManager = sessionContext.getSessionState.catalogManager
+ val schemas = catalogManager.listCatalogs()
.filter { c => StringUtils.isEmpty(catalogName) || c == catalogName }
.flatMap { c =>
- val catalog = tableEnv.getCatalog(c).get()
+ val catalog = catalogManager.getCatalog(c).get()
filterPattern(catalog.listDatabases().asScala, schemaPattern)
.map { d => Row.of(d, c) }
- }
+ }.toArray
resultSet = ResultSet.builder.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
.columns(
Column.physical(TABLE_SCHEM, DataTypes.STRING()),
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala
index a4e55715a..325a50167 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala
@@ -37,16 +37,16 @@ class GetTables(
override protected def runInternal(): Unit = {
try {
- val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+ val catalogManager = sessionContext.getSessionState.catalogManager
val catalogName =
- if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
+ if (StringUtils.isEmpty(catalogNameOrEmpty)) catalogManager.getCurrentCatalog
else catalogNameOrEmpty
val schemaNameRegex = toJavaRegex(schemaNamePattern)
val tableNameRegex = toJavaRegex(tableNamePattern)
- val tables = tableEnv.getCatalog(catalogName).asScala.toArray.flatMap { flinkCatalog =>
+ val tables = catalogManager.getCatalog(catalogName).asScala.toArray.flatMap { flinkCatalog =>
SchemaHelper.getSchemasWithPattern(flinkCatalog, schemaNameRegex)
.flatMap { schemaName =>
SchemaHelper.getFlinkTablesWithPattern(
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/OperationUtils.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/OperationUtils.scala
deleted file mode 100644
index 7d624948c..000000000
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/OperationUtils.scala
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kyuubi.engine.flink.operation
-
-import java.util
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.flink.table.api.{DataTypes, ResultKind}
-import org.apache.flink.table.catalog.Column
-import org.apache.flink.table.client.gateway.Executor
-import org.apache.flink.table.operations.command._
-import org.apache.flink.types.Row
-
-import org.apache.kyuubi.engine.flink.result.{ResultSet, ResultSetUtil}
-import org.apache.kyuubi.engine.flink.result.ResultSetUtil.successResultSet
-import org.apache.kyuubi.reflection.DynMethods
-
-object OperationUtils {
-
- /**
- * Runs a SetOperation with executor. Returns when SetOperation is executed successfully.
- *
- * @param setOperation Set operation.
- * @param executor A gateway for communicating with Flink and other external systems.
- * @param sessionId Id of the session.
- * @return A ResultSet of SetOperation execution.
- */
- def runSetOperation(
- setOperation: SetOperation,
- executor: Executor,
- sessionId: String): ResultSet = {
- if (setOperation.getKey.isPresent) {
- val key: String = setOperation.getKey.get.trim
-
- if (setOperation.getValue.isPresent) {
- val newValue: String = setOperation.getValue.get.trim
- executor.setSessionProperty(sessionId, key, newValue)
- }
-
- val value = executor.getSessionConfigMap(sessionId).getOrDefault(key, "")
- ResultSet.builder
- .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
- .columns(
- Column.physical("key", DataTypes.STRING()),
- Column.physical("value", DataTypes.STRING()))
- .data(Array(Row.of(key, value)))
- .build
- } else {
- // show all properties if set without key
- val properties: util.Map[String, String] = executor.getSessionConfigMap(sessionId)
-
- val entries = ArrayBuffer.empty[Row]
- properties.forEach((key, value) => entries.append(Row.of(key, value)))
-
- if (entries.nonEmpty) {
- val prettyEntries = entries.sortBy(_.getField(0).asInstanceOf[String])
- ResultSet.builder
- .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
- .columns(
- Column.physical("key", DataTypes.STRING()),
- Column.physical("value", DataTypes.STRING()))
- .data(prettyEntries.toArray)
- .build
- } else {
- ResultSet.builder
- .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
- .columns(
- Column.physical("key", DataTypes.STRING()),
- Column.physical("value", DataTypes.STRING()))
- .data(Array[Row]())
- .build
- }
- }
- }
-
- /**
- * Runs a ResetOperation with executor. Returns when ResetOperation is executed successfully.
- *
- * @param resetOperation Reset operation.
- * @param executor A gateway for communicating with Flink and other external systems.
- * @param sessionId Id of the session.
- * @return A ResultSet of ResetOperation execution.
- */
- def runResetOperation(
- resetOperation: ResetOperation,
- executor: Executor,
- sessionId: String): ResultSet = {
- if (resetOperation.getKey.isPresent) {
- // reset the given property
- executor.resetSessionProperty(sessionId, resetOperation.getKey.get())
- } else {
- // reset all properties
- executor.resetSessionProperties(sessionId)
- }
- successResultSet
- }
-
- /**
- * Runs a AddJarOperation with the executor. Currently only jars on local filesystem
- * are supported.
- *
- * @param addJarOperation Add-jar operation.
- * @param executor A gateway for communicating with Flink and other external systems.
- * @param sessionId Id of the session.
- * @return A ResultSet of AddJarOperation execution.
- */
- def runAddJarOperation(
- addJarOperation: AddJarOperation,
- executor: Executor,
- sessionId: String): ResultSet = {
- // Removed by FLINK-27790
- val addJar = DynMethods.builder("addJar")
- .impl(executor.getClass, classOf[String], classOf[String])
- .build(executor)
- addJar.invoke[Void](sessionId, addJarOperation.getPath)
- successResultSet
- }
-
- /**
- * Runs a RemoveJarOperation with the executor. Only jars added by AddJarOperation could
- * be removed.
- *
- * @param removeJarOperation Remove-jar operation.
- * @param executor A gateway for communicating with Flink and other external systems.
- * @param sessionId Id of the session.
- * @return A ResultSet of RemoveJarOperation execution.
- */
- def runRemoveJarOperation(
- removeJarOperation: RemoveJarOperation,
- executor: Executor,
- sessionId: String): ResultSet = {
- executor.removeJar(sessionId, removeJarOperation.getPath)
- successResultSet
- }
-
- /**
- * Runs a ShowJarsOperation with the executor. Returns the jars of the current session.
- *
- * @param showJarsOperation Show-jar operation.
- * @param executor A gateway for communicating with Flink and other external systems.
- * @param sessionId Id of the session.
- * @return A ResultSet of ShowJarsOperation execution.
- */
- def runShowJarOperation(
- showJarsOperation: ShowJarsOperation,
- executor: Executor,
- sessionId: String): ResultSet = {
- // Removed by FLINK-27790
- val listJars = DynMethods.builder("listJars")
- .impl(executor.getClass, classOf[String])
- .build(executor)
- val jars = listJars.invoke[util.List[String]](sessionId)
- ResultSetUtil.stringListToResultSet(jars.asScala.toList, "jar")
- }
-}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala
index afe04a307..4f5d8218f 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyStatement.scala
@@ -17,10 +17,11 @@
package org.apache.kyuubi.engine.flink.operation
+import com.google.common.base.Preconditions
import org.apache.flink.table.api.TableEnvironment
+import org.apache.flink.table.gateway.api.operation.OperationHandle
import org.apache.flink.table.operations.command._
-import org.apache.kyuubi.engine.flink.FlinkEngineUtils.isFlinkVersionAtMost
import org.apache.kyuubi.engine.flink.result.ResultSetUtil
import org.apache.kyuubi.operation.{ExecutionMode, ParseMode, PhysicalMode, PlanOnlyMode, UnknownMode}
import org.apache.kyuubi.operation.PlanOnlyMode.{notSupportedModeError, unknownModeError}
@@ -46,18 +47,19 @@ class PlanOnlyStatement(
override protected def runInternal(): Unit = {
try {
- val operation = executor.parseStatement(sessionId, statement)
+ val operations = executor.getTableEnvironment.getParser.parse(statement)
+ Preconditions.checkArgument(
+ operations.size() == 1,
+ "Plan-only mode supports single statement only",
+ null)
+ val operation = operations.get(0)
operation match {
- case setOperation: SetOperation =>
- resultSet = OperationUtils.runSetOperation(setOperation, executor, sessionId)
- case resetOperation: ResetOperation =>
- resultSet = OperationUtils.runResetOperation(resetOperation, executor, sessionId)
- case addJarOperation: AddJarOperation if isFlinkVersionAtMost("1.15") =>
- resultSet = OperationUtils.runAddJarOperation(addJarOperation, executor, sessionId)
- case removeJarOperation: RemoveJarOperation =>
- resultSet = OperationUtils.runRemoveJarOperation(removeJarOperation, executor, sessionId)
- case showJarsOperation: ShowJarsOperation if isFlinkVersionAtMost("1.15") =>
- resultSet = OperationUtils.runShowJarOperation(showJarsOperation, executor, sessionId)
+ case _: SetOperation | _: ResetOperation | _: AddJarOperation | _: RemoveJarOperation |
+ _: ShowJarsOperation =>
+ val resultFetcher = executor.executeStatement(
+ new OperationHandle(getHandle.identifier),
+ statement)
+ resultSet = ResultSetUtil.fromResultFetcher(resultFetcher)
case _ => explainOperation(statement)
}
} catch {
@@ -66,7 +68,7 @@ class PlanOnlyStatement(
}
private def explainOperation(statement: String): Unit = {
- val tableEnv: TableEnvironment = sessionContext.getExecutionContext.getTableEnvironment
+ val tableEnv: TableEnvironment = executor.getTableEnvironment
val explainPlans =
tableEnv.explainSql(statement).split(s"$lineSeparator$lineSeparator")
val operationPlan = mode match {
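Plan-only mode keeps its local parsing, but now obtains the parser through the executor's TableEnvironment and accepts exactly one statement at a time (sketch; `executor` and `statement` as in PlanOnlyStatement above):

```scala
// Parse locally to decide between gateway pass-through (SET/RESET/jar ops)
// and a local EXPLAIN of the statement.
val operations = executor.getTableEnvironment.getParser.parse(statement)
require(operations.size() == 1, "Plan-only mode supports single statement only")
val operation = operations.get(0)
```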
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/SetCurrentCatalog.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/SetCurrentCatalog.scala
index 60214b2cd..f279ccda6 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/SetCurrentCatalog.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/SetCurrentCatalog.scala
@@ -30,8 +30,8 @@ class SetCurrentCatalog(session: Session, catalog: String)
override protected def runInternal(): Unit = {
try {
- val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
- tableEnv.useCatalog(catalog)
+ val catalogManager = sessionContext.getSessionState.catalogManager
+ catalogManager.setCurrentCatalog(catalog)
setHasResultSet(false)
} catch onError()
}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/SetCurrentDatabase.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/SetCurrentDatabase.scala
index 7610ab2f1..70535e834 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/SetCurrentDatabase.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/SetCurrentDatabase.scala
@@ -30,8 +30,8 @@ class SetCurrentDatabase(session: Session, database: String)
override protected def runInternal(): Unit = {
try {
- val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
- tableEnv.useDatabase(database)
+ val catalogManager = sessionContext.getSessionState.catalogManager
+ catalogManager.setCurrentDatabase(database)
setHasResultSet(false)
} catch onError()
}
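
Both operations above now go through the gateway session state rather than a TableEnvironment. A sketch of the shared pattern, assuming a CatalogManager obtained as in the diff:

    import org.apache.flink.table.catalog.CatalogManager

    // Switch the session's current namespace; CatalogManager validates that the
    // target catalog/database exists and throws otherwise.
    def useNamespace(catalogManager: CatalogManager, catalog: String, database: String): Unit = {
      catalogManager.setCurrentCatalog(catalog)
      catalogManager.setCurrentDatabase(database)
    }
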
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
index 09c401988..b90be09ff 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSet.scala
@@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
import com.google.common.collect.Iterators
import org.apache.flink.api.common.JobID
-import org.apache.flink.table.api.{DataTypes, ResultKind, TableResult}
+import org.apache.flink.table.api.{DataTypes, ResultKind}
import org.apache.flink.table.catalog.Column
import org.apache.flink.types.Row
@@ -58,17 +58,6 @@ case class ResultSet(
*/
object ResultSet {
- def fromTableResult(tableResult: TableResult): ResultSet = {
- val schema = tableResult.getResolvedSchema
- // collect all rows from table result as list
- // this is ok as TableResult contains limited rows
- val rows = tableResult.collect.asScala.toArray
- builder.resultKind(tableResult.getResultKind)
- .columns(schema.getColumns)
- .data(rows)
- .build
- }
-
def fromJobId(jobID: JobID): ResultSet = {
val data: Array[Row] = if (jobID != null) {
Array(Row.of(jobID.toString))
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
index ded271cf1..d6bc2bada 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/result/ResultSetUtil.scala
@@ -17,14 +17,25 @@
package org.apache.kyuubi.engine.flink.result;
+import scala.collection.convert.ImplicitConversions._
+import scala.collection.mutable.ListBuffer
+
import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.api.ResultKind
import org.apache.flink.table.catalog.Column
+import org.apache.flink.table.data.RowData
+import org.apache.flink.table.data.conversion.DataStructureConverters
+import org.apache.flink.table.gateway.service.result.ResultFetcher
+import org.apache.flink.table.types.DataType
import org.apache.flink.types.Row
+import org.apache.kyuubi.engine.flink.shim.FlinkResultSet
+
/** Utility object for building ResultSet. */
object ResultSetUtil {
+ private val FETCH_ROWS_PER_SECOND = 1000
+
/**
* Build a ResultSet with a column name and a list of String values.
*
@@ -54,4 +65,64 @@ object ResultSetUtil {
.columns(Column.physical("result", DataTypes.STRING))
.data(Array[Row](Row.of("OK")))
.build
+
+ def fromResultFetcher(resultFetcher: ResultFetcher, maxRows: Int): ResultSet = {
+ val schema = resultFetcher.getResultSchema
+ val resultRowData = ListBuffer.newBuilder[RowData]
+ var fetched: FlinkResultSet = null
+ var token: Long = 0
+ var rowNum: Int = 0
+ do {
+ fetched = new FlinkResultSet(resultFetcher.fetchResults(token, FETCH_ROWS_PER_SECOND))
+ val data = fetched.getData
+ val slice = data.slice(0, maxRows - rowNum)
+ resultRowData ++= slice
+ rowNum += slice.size
+ token = fetched.getNextToken
+ try Thread.sleep(1000L)
+ catch {
+ // the interrupt is deliberately swallowed so the fetch loop can finish
+ case _: InterruptedException =>
+ }
+ } while (
+ fetched.getNextToken != null &&
+ rowNum < maxRows &&
+ fetched.getResultType != org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS
+ )
+ val dataTypes = resultFetcher.getResultSchema.getColumnDataTypes
+ ResultSet.builder
+ .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+ .columns(schema.getColumns)
+ .data(resultRowData.result().map(rd => convertToRow(rd, dataTypes.toList)).toArray)
+ .build
+ }
+
+ def fromResultFetcher(resultFetcher: ResultFetcher): ResultSet = {
+ val schema = resultFetcher.getResultSchema
+ val resultRowData = ListBuffer.newBuilder[RowData]
+ var fetched: FlinkResultSet = null
+ var token: Long = 0
+ do {
+ fetched = new FlinkResultSet(resultFetcher.fetchResults(token, FETCH_ROWS_PER_SECOND))
+ resultRowData ++= fetched.getData
+ token = fetched.getNextToken
+ try Thread.sleep(1000L)
+ catch {
+ case _: InterruptedException =>
+ }
+ } while (
+ fetched.getNextToken != null &&
+ fetched.getResultType != org.apache.flink.table.gateway.api.results.ResultSet.ResultType.EOS
+ )
+ val dataTypes = resultFetcher.getResultSchema.getColumnDataTypes
+ ResultSet.builder
+ .resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+ .columns(schema.getColumns)
+ .data(resultRowData.result().map(rd => convertToRow(rd, dataTypes.toList)).toArray)
+ .build
+ }
+
+ private[this] def convertToRow(r: RowData, dataTypes: List[DataType]): Row = {
+ val converter = DataStructureConverters.getConverter(DataTypes.ROW(dataTypes: _*))
+ converter.toExternal(r).asInstanceOf[Row]
+ }
}
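
The two fromResultFetcher overloads above share the same token-based paging protocol: fetchResults(token, n) returns a slice of rows plus the next token, and a null token or an EOS result type ends the stream. A sketch of that loop folded into one helper (illustrative only, not the committed code; maxRows = Int.MaxValue reproduces the unbounded overload):

    import scala.collection.JavaConverters._
    import scala.collection.mutable.ListBuffer

    import org.apache.flink.table.data.RowData
    import org.apache.flink.table.gateway.api.results.ResultSet.ResultType
    import org.apache.flink.table.gateway.service.result.ResultFetcher

    import org.apache.kyuubi.engine.flink.shim.FlinkResultSet

    def drainRows(fetcher: ResultFetcher, maxRows: Int): List[RowData] = {
      val buffer = ListBuffer[RowData]()
      var token: java.lang.Long = java.lang.Long.valueOf(0L)
      var resultType: ResultType = ResultType.NOT_READY
      while (token != null && resultType != ResultType.EOS && buffer.size < maxRows) {
        val fetched = new FlinkResultSet(fetcher.fetchResults(token, 1000))
        buffer ++= fetched.getData.asScala.take(maxRows - buffer.size)
        token = fetched.getNextToken
        resultType = fetched.getResultType
        Thread.sleep(1000L) // throttle fetches, as above
      }
      buffer.toList
    }
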
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
index 07971e39f..71caaa67a 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSQLSessionManager.scala
@@ -17,12 +17,17 @@
package org.apache.kyuubi.engine.flink.session
-import org.apache.flink.table.client.gateway.context.DefaultContext
-import org.apache.flink.table.client.gateway.local.LocalExecutor
+import scala.collection.JavaConverters._
+
+import org.apache.flink.table.gateway.api.session.SessionEnvironment
+import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion
+import org.apache.flink.table.gateway.service.context.DefaultContext
import org.apache.hive.service.rpc.thrift.TProtocolVersion
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_SESSION_HANDLE_KEY
import org.apache.kyuubi.engine.flink.operation.FlinkSQLOperationManager
+import org.apache.kyuubi.engine.flink.shim.FlinkSessionManager
import org.apache.kyuubi.session.{Session, SessionHandle, SessionManager}
class FlinkSQLSessionManager(engineContext: DefaultContext)
@@ -31,11 +36,11 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
override protected def isServer: Boolean = false
val operationManager = new FlinkSQLOperationManager()
- val executor = new LocalExecutor(engineContext)
+ val sessionManager = new FlinkSessionManager(engineContext)
override def start(): Unit = {
super.start()
- executor.start()
+ sessionManager.start()
}
override protected def createSession(
@@ -46,19 +51,33 @@ class FlinkSQLSessionManager(engineContext: DefaultContext)
conf: Map[String, String]): Session = {
conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).flatMap(
getSessionOption).getOrElse {
- new FlinkSessionImpl(
+ val flinkInternalSession = sessionManager.openSession(
+ SessionEnvironment.newBuilder
+ .setSessionEndpointVersion(SqlGatewayRestAPIVersion.V1)
+ .addSessionConfig(mapAsJavaMap(conf))
+ .build)
+ val sessionConfig = flinkInternalSession.getSessionConfig
+ sessionConfig.putAll(conf.asJava)
+ val session = new FlinkSessionImpl(
protocol,
user,
password,
ipAddress,
conf,
this,
- executor)
+ flinkInternalSession)
+ session
}
}
override def closeSession(sessionHandle: SessionHandle): Unit = {
super.closeSession(sessionHandle)
- executor.closeSession(sessionHandle.toString)
+ sessionManager.closeSession(
+ new org.apache.flink.table.gateway.api.session.SessionHandle(sessionHandle.identifier))
+ }
+
+ override def stop(): Unit = synchronized {
+ sessionManager.stop()
+ super.stop()
}
}
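
Kyuubi and Flink gateway session handles both wrap a UUID, so the conversion in closeSession above amounts to re-wrapping the identifier; a sketch:

    import org.apache.flink.table.gateway.api.session.{SessionHandle => FlinkSessionHandle}

    import org.apache.kyuubi.session.{SessionHandle => KyuubiSessionHandle}

    // Map a Kyuubi handle to its Flink gateway counterpart via the shared UUID.
    def toFlinkHandle(handle: KyuubiSessionHandle): FlinkSessionHandle =
      new FlinkSessionHandle(handle.identifier)
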
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
index a4b6a8a90..09f5ac943 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/session/FlinkSessionImpl.scala
@@ -19,10 +19,12 @@ package org.apache.kyuubi.engine.flink.session
import scala.util.control.NonFatal
+import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.util.EnvironmentInformation
import org.apache.flink.table.client.gateway.SqlExecutionException
-import org.apache.flink.table.client.gateway.context.SessionContext
-import org.apache.flink.table.client.gateway.local.LocalExecutor
+import org.apache.flink.table.gateway.api.operation.OperationHandle
+import org.apache.flink.table.gateway.service.context.SessionContext
+import org.apache.flink.table.gateway.service.session.{Session => FSession}
import org.apache.hive.service.rpc.thrift.{TGetInfoType, TGetInfoValue, TProtocolVersion}
import org.apache.kyuubi.KyuubiSQLException
@@ -37,14 +39,15 @@ class FlinkSessionImpl(
ipAddress: String,
conf: Map[String, String],
sessionManager: SessionManager,
- val executor: LocalExecutor)
+ val fSession: FSession)
extends AbstractSession(protocol, user, password, ipAddress, conf, sessionManager) {
override val handle: SessionHandle =
- conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID).getOrElse(SessionHandle())
+ conf.get(KYUUBI_SESSION_HANDLE_KEY).map(SessionHandle.fromUUID)
+ .getOrElse(SessionHandle.fromUUID(fSession.getSessionHandle.getIdentifier.toString))
lazy val sessionContext: SessionContext = {
- FlinkEngineUtils.getSessionContext(executor, handle.identifier.toString)
+ FlinkEngineUtils.getSessionContext(fSession)
}
private def setModifiableConfig(key: String, value: String): Unit = {
@@ -56,16 +59,15 @@ class FlinkSessionImpl(
}
override def open(): Unit = {
- executor.openSession(handle.identifier.toString)
+ val executor = fSession.createExecutor(Configuration.fromMap(fSession.getSessionConfig))
val (useCatalogAndDatabaseConf, otherConf) = normalizedConf.partition { case (k, _) =>
Array("use:catalog", "use:database").contains(k)
}
useCatalogAndDatabaseConf.get("use:catalog").foreach { catalog =>
- val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
try {
- tableEnv.useCatalog(catalog)
+ executor.executeStatement(OperationHandle.create, s"USE CATALOG $catalog")
} catch {
case NonFatal(e) =>
throw e
@@ -73,9 +75,8 @@ class FlinkSessionImpl(
}
useCatalogAndDatabaseConf.get("use:database").foreach { database =>
- val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
try {
- tableEnv.useDatabase(database)
+ executor.executeStatement(OperationHandle.create, s"USE $database")
} catch {
case NonFatal(e) =>
if (database != "default") {
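
open() now applies the initial namespace through gateway statements instead of a TableEnvironment. A sketch of the pattern, assuming OperationExecutor is the type returned by fSession.createExecutor in the diff:

    import org.apache.flink.table.gateway.api.operation.OperationHandle
    import org.apache.flink.table.gateway.service.operation.OperationExecutor

    // Apply the client-requested catalog/database via plain SQL statements.
    def applyInitialNamespace(
        executor: OperationExecutor,
        catalog: Option[String],
        database: Option[String]): Unit = {
      catalog.foreach(c => executor.executeStatement(OperationHandle.create, s"USE CATALOG $c"))
      database.foreach(d => executor.executeStatement(OperationHandle.create, s"USE $d"))
    }
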
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/shim/FlinkResultSet.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/shim/FlinkResultSet.scala
new file mode 100644
index 000000000..e3dd0f081
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/shim/FlinkResultSet.scala
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.shim
+
+import java.lang.{Long => JLong}
+import java.util
+
+import org.apache.flink.table.data.RowData
+import org.apache.flink.table.gateway.api.results.ResultSet.ResultType
+
+import org.apache.kyuubi.engine.flink.FlinkEngineUtils
+import org.apache.kyuubi.reflection.DynMethods
+
+class FlinkResultSet(resultSet: AnyRef) {
+
+ def getData: util.List[RowData] = {
+ if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
+ DynMethods.builder("getData")
+ .impl("org.apache.flink.table.gateway.api.results.ResultSet")
+ .build()
+ .invoke(resultSet)
+ .asInstanceOf[util.List[RowData]]
+ } else {
+ DynMethods.builder("getData")
+ .impl("org.apache.flink.table.gateway.api.results.ResultSetImpl")
+ .build()
+ .invoke(resultSet)
+ .asInstanceOf[util.List[RowData]]
+ }
+ }
+
+ def getNextToken: JLong = {
+ if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
+ DynMethods.builder("getNextToken")
+ .impl("org.apache.flink.table.gateway.api.results.ResultSet")
+ .build()
+ .invoke(resultSet)
+ .asInstanceOf[JLong]
+ } else {
+ DynMethods.builder("getNextToken")
+ .impl("org.apache.flink.table.gateway.api.results.ResultSetImpl")
+ .build()
+ .invoke(resultSet)
+ .asInstanceOf[JLong]
+ }
+ }
+
+ def getResultType: ResultType = {
+ if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
+ DynMethods.builder("getResultType")
+ .impl("org.apache.flink.table.gateway.api.results.ResultSet")
+ .build()
+ .invoke(resultSet)
+ .asInstanceOf[ResultType]
+ } else {
+ DynMethods.builder("getResultType")
+ .impl("org.apache.flink.table.gateway.api.results.ResultSetImpl")
+ .build()
+ .invoke(resultSet)
+ .asInstanceOf[ResultType]
+ }
+ }
+}
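
The three accessors above differ only in the reflective target class, which could be computed once. A possible consolidation, meant to live inside FlinkResultSet and reusing its imports (a sketch, not the committed code):

    // Resolve the version-dependent class once; route all lookups through it.
    private val resultSetClassName: String =
      if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
        "org.apache.flink.table.gateway.api.results.ResultSet"
      } else {
        "org.apache.flink.table.gateway.api.results.ResultSetImpl"
      }

    private def invokeNoArg[T](methodName: String): T =
      DynMethods.builder(methodName)
        .impl(resultSetClassName)
        .build()
        .invoke(resultSet)
        .asInstanceOf[T]
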
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/shim/FlinkSessionManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/shim/FlinkSessionManager.scala
new file mode 100644
index 000000000..e34819dd6
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/shim/FlinkSessionManager.scala
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.shim
+
+import org.apache.flink.table.gateway.api.session.{SessionEnvironment, SessionHandle}
+import org.apache.flink.table.gateway.service.context.DefaultContext
+import org.apache.flink.table.gateway.service.session.Session
+
+import org.apache.kyuubi.engine.flink.FlinkEngineUtils
+import org.apache.kyuubi.reflection.{DynConstructors, DynMethods}
+
+class FlinkSessionManager(engineContext: DefaultContext) {
+
+ val sessionManager: AnyRef = {
+ if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
+ DynConstructors.builder().impl(
+ "org.apache.flink.table.gateway.service.session.SessionManager",
+ classOf[DefaultContext])
+ .build()
+ .newInstance(engineContext)
+ } else {
+ DynConstructors.builder().impl(
+ "org.apache.flink.table.gateway.service.session.SessionManagerImpl",
+ classOf[DefaultContext])
+ .build()
+ .newInstance(engineContext)
+ }
+ }
+
+ def start(): Unit = {
+ if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
+ DynMethods.builder("start")
+ .impl("org.apache.flink.table.gateway.service.session.SessionManager")
+ .build()
+ .invoke(sessionManager)
+ } else {
+ DynMethods.builder("start")
+ .impl("org.apache.flink.table.gateway.service.session.SessionManagerImpl")
+ .build()
+ .invoke(sessionManager)
+ }
+ }
+
+ def stop(): Unit = {
+ if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
+ DynMethods.builder("stop")
+ .impl("org.apache.flink.table.gateway.service.session.SessionManager")
+ .build()
+ .invoke(sessionManager)
+ } else {
+ DynMethods.builder("stop")
+ .impl("org.apache.flink.table.gateway.service.session.SessionManagerImpl")
+ .build()
+ .invoke(sessionManager)
+ }
+ }
+
+ def getSession(sessionHandle: SessionHandle): Session = {
+ if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
+ DynMethods.builder("getSession")
+ .impl(
+ "org.apache.flink.table.gateway.service.session.SessionManager",
+ classOf[SessionHandle])
+ .build()
+ .invoke(sessionManager, sessionHandle)
+ .asInstanceOf[Session]
+ } else {
+ DynMethods.builder("getSession")
+ .impl(
+ "org.apache.flink.table.gateway.service.session.SessionManagerImpl",
+ classOf[SessionHandle])
+ .build()
+ .invoke(sessionManager, sessionHandle)
+ .asInstanceOf[Session]
+ }
+ }
+
+ def openSession(environment: SessionEnvironment): Session = {
+ if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
+ DynMethods.builder("openSession")
+ .impl(
+ "org.apache.flink.table.gateway.service.session.SessionManager",
+ classOf[SessionEnvironment])
+ .build()
+ .invoke(sessionManager, environment)
+ .asInstanceOf[Session]
+ } else {
+ DynMethods.builder("openSession")
+ .impl(
+ "org.apache.flink.table.gateway.service.session.SessionManagerImpl",
+ classOf[SessionEnvironment])
+ .build()
+ .invoke(sessionManager, environment)
+ .asInstanceOf[Session]
+ }
+ }
+
+ def closeSession(sessionHandle: SessionHandle): Unit = {
+ if (FlinkEngineUtils.isFlinkVersionEqualTo("1.16")) {
+ DynMethods.builder("closeSession")
+ .impl(
+ "org.apache.flink.table.gateway.service.session.SessionManager",
+ classOf[SessionHandle])
+ .build()
+ .invoke(sessionManager, sessionHandle)
+ } else {
+ DynMethods.builder("closeSession")
+ .impl(
+ "org.apache.flink.table.gateway.service.session.SessionManagerImpl",
+ classOf[SessionHandle])
+ .build()
+ .invoke(sessionManager, sessionHandle)
+ }
+ }
+}
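
A sketch of the shim's lifecycle as wired up in FlinkSQLSessionManager (illustrative):

    import org.apache.flink.table.gateway.api.session.SessionEnvironment
    import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion
    import org.apache.flink.table.gateway.service.context.DefaultContext

    def demo(engineContext: DefaultContext): Unit = {
      val manager = new FlinkSessionManager(engineContext)
      manager.start()
      val session = manager.openSession(
        SessionEnvironment.newBuilder
          .setSessionEndpointVersion(SqlGatewayRestAPIVersion.V1)
          .build)
      manager.closeSession(session.getSessionHandle)
      manager.stop()
    }
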
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithDiscoveryFlinkSQLEngine.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithDiscoveryFlinkSQLEngine.scala
index aebcce6c5..c352429ea 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithDiscoveryFlinkSQLEngine.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithDiscoveryFlinkSQLEngine.scala
@@ -17,30 +17,14 @@
package org.apache.kyuubi.engine.flink
-import java.util.UUID
-
-import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SHARE_LEVEL, ENGINE_TYPE}
-import org.apache.kyuubi.engine.ShareLevel
-import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, HA_NAMESPACE}
+import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.ha.client.{DiscoveryClient, DiscoveryClientProvider}
-trait WithDiscoveryFlinkSQLEngine extends WithFlinkSQLEngineOnYarn {
-
- override protected def engineRefId: String = UUID.randomUUID().toString
-
- def namespace: String = "/kyuubi/flink-yarn-application-test"
+trait WithDiscoveryFlinkSQLEngine {
- def shareLevel: String = ShareLevel.USER.toString
+ protected def namespace: String
- def engineType: String = "flink"
-
- override def withKyuubiConf: Map[String, String] = {
- Map(
- HA_NAMESPACE.key -> namespace,
- HA_ENGINE_REF_ID.key -> engineRefId,
- ENGINE_TYPE.key -> "FLINK_SQL",
- ENGINE_SHARE_LEVEL.key -> shareLevel)
- }
+ protected def conf: KyuubiConf
def withDiscoveryClient(f: DiscoveryClient => Unit): Unit = {
DiscoveryClientProvider.withDiscoveryClient(conf)(f)
@@ -49,7 +33,7 @@ trait WithDiscoveryFlinkSQLEngine extends WithFlinkSQLEngineOnYarn {
def getFlinkEngineServiceUrl: String = {
var hostPort: Option[(String, Int)] = None
var retries = 0
- while (hostPort.isEmpty && retries < 5) {
+ while (hostPort.isEmpty && retries < 10) {
withDiscoveryClient(client => hostPort = client.getServerHost(namespace))
retries += 1
Thread.sleep(1000L)
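
The discovery lookup above polls with a fixed retry budget; the same pattern as a generic helper (a sketch only):

    // Poll `probe` until it yields a value or the retry budget is exhausted.
    def pollUntilDefined[T](maxRetries: Int, intervalMs: Long)(probe: => Option[T]): Option[T] = {
      var result: Option[T] = None
      var retries = 0
      while (result.isEmpty && retries < maxRetries) {
        result = probe
        retries += 1
        if (result.isEmpty) Thread.sleep(intervalMs)
      }
      result
    }

getFlinkEngineServiceUrl above is this shape with a budget of 10 retries at a one-second interval.
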
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
index c8435f9c5..0001f31ae 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
@@ -17,58 +17,167 @@
package org.apache.kyuubi.engine.flink
+import java.io.{File, FilenameFilter}
+import java.lang.ProcessBuilder.Redirect
+import java.net.URI
+import java.nio.file.{Files, Paths}
+
import scala.collection.JavaConverters._
+import scala.collection.mutable.ArrayBuffer
-import org.apache.flink.client.cli.{CustomCommandLine, DefaultCLI}
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration}
-import org.apache.flink.table.client.gateway.context.DefaultContext
-import org.apache.kyuubi.KyuubiFunSuite
+import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiException, KyuubiFunSuite, SCALA_COMPILE_VERSION, Utils}
import org.apache.kyuubi.config.KyuubiConf
+import org.apache.kyuubi.config.KyuubiConf._
+import org.apache.kyuubi.ha.HighAvailabilityConf.HA_ADDRESSES
+import org.apache.kyuubi.zookeeper.EmbeddedZookeeper
+import org.apache.kyuubi.zookeeper.ZookeeperConf.{ZK_CLIENT_PORT, ZK_CLIENT_PORT_ADDRESS}
trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with WithFlinkTestResources {
protected val flinkConfig = new Configuration()
+
protected var miniCluster: MiniCluster = _
- protected var engine: FlinkSQLEngine = _
- // conf will be loaded until start flink engine
+
+ protected var engineProcess: Process = _
+
+ private var zkServer: EmbeddedZookeeper = _
+
+ protected val conf: KyuubiConf = FlinkSQLEngine.kyuubiConf
+
+ protected def engineRefId: String
+
def withKyuubiConf: Map[String, String]
- protected val kyuubiConf: KyuubiConf = FlinkSQLEngine.kyuubiConf
protected var connectionUrl: String = _
override def beforeAll(): Unit = {
+ withKyuubiConf.foreach { case (k, v) =>
+ if (k.startsWith("flink.")) {
+ flinkConfig.setString(k.stripPrefix("flink."), v)
+ }
+ }
+ withKyuubiConf.foreach { case (k, v) =>
+ System.setProperty(k, v)
+ conf.set(k, v)
+ }
+
+ zkServer = new EmbeddedZookeeper()
+ conf.set(ZK_CLIENT_PORT, 0).set(ZK_CLIENT_PORT_ADDRESS, "localhost")
+ zkServer.initialize(conf)
+ zkServer.start()
+ conf.set(HA_ADDRESSES, zkServer.getConnectString)
+
+ val envs = scala.collection.mutable.Map[String, String]()
+ val kyuubiExternals = Utils.getCodeSourceLocation(getClass)
+ .split("externals").head
+ val flinkHome = {
+ val candidates = Paths.get(kyuubiExternals, "externals", "kyuubi-download", "target")
+ .toFile.listFiles(f => f.getName.contains("flink"))
+ if (candidates == null) None else candidates.map(_.toPath).headOption
+ }
+ if (flinkHome.isDefined) {
+ envs("FLINK_HOME") = flinkHome.get.toString
+ envs("FLINK_CONF_DIR") = Paths.get(flinkHome.get.toString, "conf").toString
+ }
+ envs("JAVA_HOME") = System.getProperty("java.home")
+ envs("JAVA_EXEC") = Paths.get(envs("JAVA_HOME"), "bin", "java").toString
+
startMiniCluster()
- startFlinkEngine()
+ startFlinkEngine(envs.toMap)
super.beforeAll()
}
override def afterAll(): Unit = {
super.afterAll()
- stopFlinkEngine()
- miniCluster.close()
+ if (engineProcess != null) {
+ engineProcess.destroy()
+ engineProcess = null
+ }
+ if (miniCluster != null) {
+ miniCluster.close()
+ miniCluster = null
+ }
+ if (zkServer != null) {
+ zkServer.stop()
+ zkServer = null
+ }
}
- def startFlinkEngine(): Unit = {
- withKyuubiConf.foreach { case (k, v) =>
- System.setProperty(k, v)
- kyuubiConf.set(k, v)
+ def startFlinkEngine(envs: Map[String, String]): Unit = {
+ val flinkHome = envs("FLINK_HOME")
+ val processBuilder: ProcessBuilder = new ProcessBuilder
+ processBuilder.environment().putAll(envs.asJava)
+
+ conf.set(ENGINE_FLINK_EXTRA_CLASSPATH, udfJar.getAbsolutePath)
+ val command = new ArrayBuffer[String]()
+
+ command += envs("JAVA_EXEC")
+
+ val memory = conf.get(ENGINE_FLINK_MEMORY)
+ command += s"-Xmx$memory"
+ val javaOptions = conf.get(ENGINE_FLINK_JAVA_OPTIONS)
+ if (javaOptions.isDefined) {
+ command += javaOptions.get
}
- val engineContext = new DefaultContext(
- List(udfJar.toURI.toURL).asJava,
- flinkConfig,
- List[CustomCommandLine](new DefaultCLI).asJava)
- FlinkSQLEngine.startEngine(engineContext)
- engine = FlinkSQLEngine.currentEngine.get
- connectionUrl = engine.frontendServices.head.connectionUrl
- }
- def stopFlinkEngine(): Unit = {
- if (engine != null) {
- engine.stop()
- engine = null
+ command += "-cp"
+ val classpathEntries = new java.util.LinkedHashSet[String]
+ // flink engine runtime jar
+ mainResource(envs).foreach(classpathEntries.add)
+ // flink sql jars
+ Paths.get(flinkHome)
+ .resolve("opt")
+ .toFile
+ .listFiles(new FilenameFilter {
+ override def accept(dir: File, name: String): Boolean = {
+ name.toLowerCase.startsWith("flink-sql-client") ||
+ name.toLowerCase.startsWith("flink-sql-gateway")
+ }
+ }).foreach(jar => classpathEntries.add(jar.getAbsolutePath))
+
+ // jars from flink lib
+ classpathEntries.add(s"$flinkHome${File.separator}lib${File.separator}*")
+
+ // classpath contains flink configurations, defaulting to $FLINK_HOME/conf
+ classpathEntries.add(envs.getOrElse("FLINK_CONF_DIR", ""))
+ // classpath contains hadoop configurations
+ val cp = System.getProperty("java.class.path")
+ // exclude kyuubi flink engine jar that has SPI for EmbeddedExecutorFactory
+ // which can't be initialized on the client side
+ val hadoopJars = cp.split(File.pathSeparator).filter(s => !s.contains("flink"))
+ hadoopJars.foreach(classpathEntries.add)
+ val extraCp = conf.get(ENGINE_FLINK_EXTRA_CLASSPATH)
+ extraCp.foreach(classpathEntries.add)
+ if (hadoopJars.isEmpty && extraCp.isEmpty) {
+ mainResource(envs).foreach { path =>
+ val devHadoopJars = Paths.get(path).getParent
+ .resolve(s"scala-$SCALA_COMPILE_VERSION")
+ .resolve("jars")
+ if (!Files.exists(devHadoopJars)) {
+ throw new KyuubiException(s"The path $devHadoopJars does not exist. " +
+ s"Please set FLINK_HADOOP_CLASSPATH or ${ENGINE_FLINK_EXTRA_CLASSPATH.key}" +
+ " to configure the location of the Hadoop client jars.")
+ }
+ classpathEntries.add(s"$devHadoopJars${File.separator}*")
+ }
+ }
+ command += classpathEntries.asScala.mkString(File.pathSeparator)
+ command += "org.apache.kyuubi.engine.flink.FlinkSQLEngine"
+
+ conf.getAll.foreach { case (k, v) =>
+ command += "--conf"
+ command += s"$k=$v"
}
+
+ processBuilder.command(command.toList.asJava)
+ processBuilder.redirectOutput(Redirect.INHERIT)
+ processBuilder.redirectError(Redirect.INHERIT)
+
+ info(s"staring flink local engine...")
+ engineProcess = processBuilder.start()
}
private def startMiniCluster(): Unit = {
@@ -84,4 +193,35 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with WithFlinkTestResources
protected def getJdbcUrl: String = s"jdbc:hive2://$connectionUrl/;"
+ def mainResource(env: Map[String, String]): Option[String] = {
+ val module = "kyuubi-flink-sql-engine"
+ val shortName = "flink"
+ // 1. prefer the main resource jar from user-specified config
+ val jarName = s"${module}_$SCALA_COMPILE_VERSION-$KYUUBI_VERSION.jar"
+ conf.getOption(s"kyuubi.session.engine.$shortName.main.resource").filter {
+ userSpecified =>
+ // skip the existence check for non-local files.
+ val uri = new URI(userSpecified)
+ val schema = if (uri.getScheme != null) uri.getScheme else "file"
+ schema match {
+ case "file" => Files.exists(Paths.get(userSpecified))
+ case _ => true
+ }
+ }.orElse {
+ // 2. get the main resource jar from system build default
+ env.get(KYUUBI_HOME).toSeq
+ .flatMap { p =>
+ Seq(
+ Paths.get(p, "externals", "engines", shortName, jarName),
+ Paths.get(p, "externals", module, "target", jarName))
+ }
+ .find(Files.exists(_)).map(_.toAbsolutePath.toFile.getCanonicalPath)
+ }.orElse {
+ // 3. get the main resource from dev environment
+ val cwd = Utils.getCodeSourceLocation(getClass).split("externals")
+ assert(cwd.length > 1)
+ Option(Paths.get(cwd.head, "externals", module, "target", jarName))
+ .map(_.toAbsolutePath.toFile.getCanonicalPath)
+ }
+ }
}
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala
index 3847087b3..553574e65 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineOnYarn.scala
@@ -49,7 +49,7 @@ trait WithFlinkSQLEngineOnYarn extends KyuubiFunSuite with WithFlinkTestResource
private var zkServer: EmbeddedZookeeper = _
- def withKyuubiConf: Map[String, String]
+ def withKyuubiConf: Map[String, String] = testExtraConf
private val yarnConf: YarnConfiguration = {
val yarnConfig = new YarnConfiguration()
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkTestResources.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkTestResources.scala
index 6a85654f0..3ea02774e 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkTestResources.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkTestResources.scala
@@ -17,6 +17,8 @@
package org.apache.kyuubi.engine.flink
+import java.io.File
+
import org.apache.kyuubi.Utils
import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
@@ -33,9 +35,12 @@ trait WithFlinkTestResources {
}
"""
- protected val udfJar = TestUserClassLoaderJar.createJarFile(
+ protected val udfJar: File = TestUserClassLoaderJar.createJarFile(
Utils.createTempDir("test-jar").toFile,
"test-classloader-udf.jar",
GENERATED_UDF_CLASS,
GENERATED_UDF_CODE)
+
+ protected val testExtraConf: Map[String, String] = Map(
+ "flink.pipeline.name" -> "test-job")
}
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala
index e4e6a5c67..0f4b38d36 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationLocalSuite.scala
@@ -17,17 +17,35 @@
package org.apache.kyuubi.engine.flink.operation
+import java.util.UUID
+
import org.apache.kyuubi.config.KyuubiConf._
-import org.apache.kyuubi.engine.flink.WithFlinkSQLEngineLocal
+import org.apache.kyuubi.engine.ShareLevel
+import org.apache.kyuubi.engine.flink.{WithDiscoveryFlinkSQLEngine, WithFlinkSQLEngineLocal}
+import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, HA_NAMESPACE}
import org.apache.kyuubi.operation.NoneMode
class FlinkOperationLocalSuite extends FlinkOperationSuite
- with WithFlinkSQLEngineLocal {
+ with WithDiscoveryFlinkSQLEngine with WithFlinkSQLEngineLocal {
+
+ protected def jdbcUrl: String = getFlinkEngineServiceUrl
+
+ override def withKyuubiConf: Map[String, String] = {
+ Map(
+ "flink.execution.target" -> "remote",
+ HA_NAMESPACE.key -> namespace,
+ HA_ENGINE_REF_ID.key -> engineRefId,
+ ENGINE_TYPE.key -> "FLINK_SQL",
+ ENGINE_SHARE_LEVEL.key -> shareLevel,
+ OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name) ++ testExtraConf
+ }
+
+ override protected def engineRefId: String = UUID.randomUUID().toString
+
+ def namespace: String = "/kyuubi/flink-local-engine-test"
- override def withKyuubiConf: Map[String, String] =
- Map(OPERATION_PLAN_ONLY_MODE.key -> NoneMode.name)
+ def shareLevel: String = ShareLevel.USER.toString
- override protected def jdbcUrl: String =
- s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/;"
+ def engineType: String = "flink"
}
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala
index b43e83db6..931d500f7 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationOnYarnSuite.scala
@@ -17,10 +17,31 @@
package org.apache.kyuubi.engine.flink.operation
-import org.apache.kyuubi.engine.flink.WithDiscoveryFlinkSQLEngine
+import java.util.UUID
+
+import org.apache.kyuubi.config.KyuubiConf.{ENGINE_SHARE_LEVEL, ENGINE_TYPE}
+import org.apache.kyuubi.engine.ShareLevel
+import org.apache.kyuubi.engine.flink.{WithDiscoveryFlinkSQLEngine, WithFlinkSQLEngineOnYarn}
+import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, HA_NAMESPACE}
class FlinkOperationOnYarnSuite extends FlinkOperationSuite
- with WithDiscoveryFlinkSQLEngine {
+ with WithDiscoveryFlinkSQLEngine with WithFlinkSQLEngineOnYarn {
protected def jdbcUrl: String = getFlinkEngineServiceUrl
+
+ override def withKyuubiConf: Map[String, String] = {
+ Map(
+ HA_NAMESPACE.key -> namespace,
+ HA_ENGINE_REF_ID.key -> engineRefId,
+ ENGINE_TYPE.key -> "FLINK_SQL",
+ ENGINE_SHARE_LEVEL.key -> shareLevel) ++ testExtraConf
+ }
+
+ override protected def engineRefId: String = UUID.randomUUID().toString
+
+ def namespace: String = "/kyuubi/flink-yarn-application-test"
+
+ def shareLevel: String = ShareLevel.USER.toString
+
+ def engineType: String = "flink"
}
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 77ce3b3ee..908e407a9 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -23,12 +23,13 @@ import java.util.UUID
import scala.collection.JavaConverters._
import org.apache.flink.api.common.JobID
+import org.apache.flink.configuration.PipelineOptions
import org.apache.flink.table.types.logical.LogicalTypeRoot
import org.apache.hive.service.rpc.thrift._
import org.apache.kyuubi.Utils
import org.apache.kyuubi.config.KyuubiConf._
-import org.apache.kyuubi.engine.flink.WithFlinkTestResources
+import org.apache.kyuubi.engine.flink.{FlinkEngineUtils, WithFlinkTestResources}
import org.apache.kyuubi.engine.flink.result.Constants
import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
import org.apache.kyuubi.jdbc.hive.KyuubiStatement
@@ -758,7 +759,7 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
val metaData = resultSet.getMetaData
assert(metaData.getColumnType(1) === java.sql.Types.ARRAY)
assert(resultSet.next())
- val expected = "[v1,v2,v3]"
+ val expected = "[\"v1\",\"v2\",\"v3\"]"
assert(resultSet.getObject(1).toString == expected)
}
}
@@ -778,7 +779,7 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("select (1, '2', true)")
assert(resultSet.next())
- val expected = """{INT NOT NULL:1,CHAR(1) NOT NULL:2,BOOLEAN NOT NULL:true}"""
+ val expected = """{INT NOT NULL:1,CHAR(1) NOT NULL:"2",BOOLEAN NOT NULL:true}"""
assert(resultSet.getString(1) == expected)
val metaData = resultSet.getMetaData
assert(metaData.getColumnType(1) === java.sql.Types.STRUCT)
@@ -955,7 +956,7 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
statement.executeQuery("create table tbl_a (a int) with ('connector' = 'blackhole')")
val resultSet = statement.executeQuery("insert into tbl_a select 1")
val metadata = resultSet.getMetaData
- assert(metadata.getColumnName(1) === "result")
+ assert(metadata.getColumnName(1) === "job id")
assert(metadata.getColumnType(1) === java.sql.Types.VARCHAR)
assert(resultSet.next())
assert(resultSet.getString(1).length == 32)
@@ -973,7 +974,7 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
statement.executeQuery("create table tbl_b (a int) with ('connector' = 'blackhole')")
val resultSet = statement.executeQuery("insert into tbl_b select * from tbl_a")
val metadata = resultSet.getMetaData
- assert(metadata.getColumnName(1) === "result")
+ assert(metadata.getColumnName(1) === "job id")
assert(metadata.getColumnType(1) === java.sql.Types.VARCHAR)
assert(resultSet.next())
assert(resultSet.getString(1).length == 32)
@@ -984,11 +985,9 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
withMultipleConnectionJdbcStatement() { statement =>
val resultSet = statement.executeQuery("set table.dynamic-table-options.enabled = true")
val metadata = resultSet.getMetaData
- assert(metadata.getColumnName(1) == "key")
- assert(metadata.getColumnName(2) == "value")
+ assert(metadata.getColumnName(1) == "result")
assert(resultSet.next())
- assert(resultSet.getString(1) == "table.dynamic-table-options.enabled")
- assert(resultSet.getString(2) == "true")
+ assert(resultSet.getString(1) == "OK")
}
}
@@ -1003,16 +1002,17 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
}
test("execute statement - reset property") {
+ val originalName = "test-job" // defined in WithFlinkTestResources
withMultipleConnectionJdbcStatement() { statement =>
- statement.executeQuery("set pipeline.jars = my.jar")
- statement.executeQuery("reset pipeline.jars")
+ statement.executeQuery(s"set ${PipelineOptions.NAME.key()} = wrong-name")
+ statement.executeQuery(s"reset ${PipelineOptions.NAME.key()}")
val resultSet = statement.executeQuery("set")
// Flink currently does not support SET with a key but no value,
// so read all rows to find the desired one
var success = false
while (resultSet.next()) {
- if (resultSet.getString(1) == "pipeline.jars" &&
- !resultSet.getString(2).contains("my.jar")) {
+ if (resultSet.getString(1) == PipelineOptions.NAME.key() &&
+ resultSet.getString(2).equals(originalName)) {
success = true
}
}
@@ -1066,7 +1066,31 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
}
}
- test("execute statement - add/remove/show jar") {
+ test("execute statement - add/show jar") {
+ val jarName = s"newly-added-${UUID.randomUUID()}.jar"
+ val newJar = TestUserClassLoaderJar.createJarFile(
+ Utils.createTempDir("add-jar-test").toFile,
+ jarName,
+ GENERATED_UDF_CLASS,
+ GENERATED_UDF_CODE).toPath
+
+ withMultipleConnectionJdbcStatement()({ statement =>
+ statement.execute(s"add jar '$newJar'")
+
+ val showJarsResultAdded = statement.executeQuery("show jars")
+ var exists = false
+ while (showJarsResultAdded.next()) {
+ if (showJarsResultAdded.getString(1).contains(jarName)) {
+ exists = true
+ }
+ }
+ assert(exists)
+ })
+ }
+
+ // ignored because Flink gateway doesn't support remove-jar statements
+ // see org.apache.flink.table.gateway.service.operation.OperationExecutor#callRemoveJar(..)
+ ignore("execute statement - remove jar") {
val jarName = s"newly-added-${UUID.randomUUID()}.jar"
val newJar = TestUserClassLoaderJar.createJarFile(
Utils.createTempDir("add-jar-test").toFile,
@@ -1136,9 +1160,12 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
assert(stmt.asInstanceOf[KyuubiStatement].getQueryId === null)
stmt.executeQuery("insert into tbl_a values (1)")
val queryId = stmt.asInstanceOf[KyuubiStatement].getQueryId
- assert(queryId !== null)
- // parse the string to check if it's valid Flink job id
- assert(JobID.fromHexString(queryId) !== null)
+ // Flink 1.16 doesn't support query id via ResultFetcher
+ if (FlinkEngineUtils.isFlinkVersionAtLeast("1.17")) {
+ assert(queryId !== null)
+ // parse the string to check if it's valid Flink job id
+ assert(JobID.fromHexString(queryId) !== null)
+ }
}
}
}
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyOperationSuite.scala
index 1657f21f6..17c49464f 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/PlanOnlyOperationSuite.scala
@@ -18,21 +18,33 @@
package org.apache.kyuubi.engine.flink.operation
import java.sql.Statement
+import java.util.UUID
import org.apache.kyuubi.config.KyuubiConf
-import org.apache.kyuubi.engine.flink.WithFlinkSQLEngineLocal
+import org.apache.kyuubi.engine.flink.{WithDiscoveryFlinkSQLEngine, WithFlinkSQLEngineLocal}
+import org.apache.kyuubi.ha.HighAvailabilityConf.{HA_ENGINE_REF_ID, HA_NAMESPACE}
import org.apache.kyuubi.operation.{AnalyzeMode, ExecutionMode, HiveJDBCTestHelper, ParseMode, PhysicalMode}
-class PlanOnlyOperationSuite extends WithFlinkSQLEngineLocal with HiveJDBCTestHelper {
+class PlanOnlyOperationSuite extends WithFlinkSQLEngineLocal
+ with HiveJDBCTestHelper with WithDiscoveryFlinkSQLEngine {
+
+ override protected def engineRefId: String = UUID.randomUUID().toString
+
+ override protected def namespace: String = "/kyuubi/flink-plan-only-test"
+
+ def engineType: String = "flink"
override def withKyuubiConf: Map[String, String] =
Map(
+ "flink.execution.target" -> "remote",
+ HA_NAMESPACE.key -> namespace,
+ HA_ENGINE_REF_ID.key -> engineRefId,
+ KyuubiConf.ENGINE_TYPE.key -> "FLINK_SQL",
KyuubiConf.ENGINE_SHARE_LEVEL.key -> "user",
KyuubiConf.OPERATION_PLAN_ONLY_MODE.key -> ParseMode.name,
- KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN.key -> "plan-only")
+ KyuubiConf.ENGINE_SHARE_LEVEL_SUBDOMAIN.key -> "plan-only") ++ testExtraConf
- override protected def jdbcUrl: String =
- s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/;"
+ override protected def jdbcUrl: String = getFlinkEngineServiceUrl
test("Plan only operation with system defaults") {
withJdbcStatement() { statement =>
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
index 02aed2866..8c92b77ef 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/KyuubiApplicationManager.scala
@@ -176,7 +176,7 @@ object KyuubiApplicationManager {
// if the master is not identified ahead, add all tags
setupSparkYarnTag(applicationTag, conf)
setupSparkK8sTag(applicationTag, conf)
- case ("FLINK", _) =>
+ case ("FLINK", Some("YARN")) =>
// running flink on other platforms is not yet supported
setupFlinkYarnTag(applicationTag, conf)
// other engine types run locally for now
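
The fix above narrows YARN tagging to Flink engines that actually target YARN. The dispatch condition as a standalone predicate (a hypothetical helper for illustration):

    // Only a FLINK engine bound for YARN should receive the YARN application tag.
    def shouldTagYarn(engineType: String, clusterManager: Option[String]): Boolean =
      (engineType, clusterManager) match {
        case ("FLINK", Some("YARN")) => true
        case _ => false
      }
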
diff --git a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
index 8642d87d7..d8d46e427 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilder.scala
@@ -54,6 +54,9 @@ class FlinkProcessBuilder(
Paths.get(flinkHome, "bin", FLINK_EXEC_FILE).toFile.getCanonicalPath
}
+ // flink.execution.target is currently required in Kyuubi conf
+ val executionTarget: Option[String] = conf.getOption("flink.execution.target")
+
override protected def module: String = "kyuubi-flink-sql-engine"
override protected def mainClass: String = "org.apache.kyuubi.engine.flink.FlinkSQLEngine"
@@ -63,13 +66,17 @@ class FlinkProcessBuilder(
"FLINK_CONF_DIR",
s"$flinkHome${File.separator}conf"))
- override def clusterManager(): Option[String] = Some("yarn")
+ override def clusterManager(): Option[String] = {
+ executionTarget match {
+ case Some("yarn-application") => Some("yarn")
+ case _ => None
+ }
+ }
override protected val commands: Array[String] = {
KyuubiApplicationManager.tagApplication(engineRefId, shortName, clusterManager(), conf)
// flink.execution.target is currently required in Kyuubi conf
- val executionTarget = conf.getOption("flink.execution.target")
executionTarget match {
case Some("yarn-application") =>
val buffer = new ArrayBuffer[String]()
@@ -133,16 +140,16 @@ class FlinkProcessBuilder(
val classpathEntries = new java.util.LinkedHashSet[String]
// flink engine runtime jar
mainResource.foreach(classpathEntries.add)
- // flink sql client jar
- val flinkSqlClientPath = Paths.get(flinkHome)
+ // flink sql jars
+ Paths.get(flinkHome)
.resolve("opt")
.toFile
.listFiles(new FilenameFilter {
override def accept(dir: File, name: String): Boolean = {
- name.toLowerCase.startsWith("flink-sql-client")
+ name.toLowerCase.startsWith("flink-sql-client") ||
+ name.toLowerCase.startsWith("flink-sql-gateway")
}
- }).head.getAbsolutePath
- classpathEntries.add(flinkSqlClientPath)
+ }).sorted.foreach(jar => classpathEntries.add(jar.getAbsolutePath))
// jars from flink lib
classpathEntries.add(s"$flinkHome${File.separator}lib${File.separator}*")
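
Since the engine builds against Scala 2.12+, java.io.FilenameFilter is SAM-convertible, so the anonymous class above could also be written as a lambda (a behavior-preserving sketch):

    import java.io.File
    import java.nio.file.Paths

    // Assumes $FLINK_HOME/opt exists (listFiles returns null otherwise).
    def flinkSqlJars(flinkHome: String): Array[File] =
      Paths.get(flinkHome).resolve("opt").toFile
        .listFiles((_: File, name: String) =>
          name.toLowerCase.startsWith("flink-sql-client") ||
            name.toLowerCase.startsWith("flink-sql-gateway"))
        .sorted
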
diff --git a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
index 53450b589..45272618d 100644
--- a/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
+++ b/kyuubi-server/src/test/scala/org/apache/kyuubi/engine/flink/FlinkProcessBuilderSuite.scala
@@ -63,7 +63,7 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
private def envWithAllHadoop: ListMap[String, String] = envWithoutHadoopCLASSPATH +
(FLINK_HADOOP_CLASSPATH_KEY -> s"${File.separator}hadoop")
private def confStr: String = {
- sessionModeConf.clone.set("yarn.tags", "KYUUBI").getAll
+ sessionModeConf.clone.getAll
.map { case (k, v) => s"\\\\\\n\\t--conf $k=$v" }
.mkString(" ")
}
@@ -106,6 +106,7 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
val flinkHome = builder.flinkHome
classpathEntries.add(s"$flinkHome$flinkSqlClientJarPathSuffixRegex")
+ classpathEntries.add(s"$flinkHome$flinkSqlGatewayJarPathSuffixRegex")
classpathEntries.add(s"$flinkHome$flinkLibPathSuffixRegex")
classpathEntries.add(s"$flinkHome$flinkConfPathSuffix")
val envMap = builder.env
@@ -123,6 +124,8 @@ class FlinkProcessBuilderSuite extends KyuubiFunSuite {
private val javaPath = s"${envDefault("JAVA_HOME")}${File.separator}bin${File.separator}java"
private val flinkSqlClientJarPathSuffixRegex = s"${File.separator}opt${File.separator}" +
s"flink-sql-client-.*.jar"
+ private val flinkSqlGatewayJarPathSuffixRegex = s"${File.separator}opt${File.separator}" +
+ s"flink-sql-gateway-.*.jar"
private val flinkLibPathSuffixRegex = s"${File.separator}lib${File.separator}\\*"
private val flinkConfPathSuffix = s"${File.separator}conf"
private val mainClassStr = "org.apache.kyuubi.engine.flink.FlinkSQLEngine"
diff --git a/pom.xml b/pom.xml
index 520b181d5..1feae0322 100644
--- a/pom.xml
+++ b/pom.xml
@@ -135,7 +135,7 @@
<failsafe.verion>2.4.4</failsafe.verion>
<fb303.version>0.9.3</fb303.version>
<flexmark.version>0.62.2</flexmark.version>
- <flink.version>1.16.1</flink.version>
+ <flink.version>1.17.0</flink.version>
<flink.archive.name>flink-${flink.version}-bin-scala_${scala.binary.version}.tgz</flink.archive.name>
<flink.archive.mirror>${apache.archive.dist}/flink/flink-${flink.version}</flink.archive.mirror>
<flink.archive.download.skip>false</flink.archive.download.skip>
@@ -1624,6 +1624,12 @@
<version>${flink.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-sql-gateway</artifactId>
+ <version>${flink.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils</artifactId>
@@ -2390,16 +2396,16 @@
</profile>
<profile>
- <id>flink-1.15</id>
+ <id>flink-1.16</id>
<properties>
- <flink.version>1.15.4</flink.version>
+ <flink.version>1.16.1</flink.version>
</properties>
</profile>
<profile>
- <id>flink-1.16</id>
+ <id>flink-1.17</id>
<properties>
- <flink.version>1.16.1</flink.version>
+ <flink.version>1.17.0</flink.version>
</properties>
</profile>
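
With the profile shuffle above, the default build now targets Flink 1.17.0; passing -Pflink-1.16 to Maven pins flink.version back to 1.16.1 for cross-version verification.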