You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2023/05/03 13:51:32 UTC
[kyuubi] branch master updated: [KYUUBI #4495] Support Flink job management statements
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new dcb444e2d [KYUUBI #4495] Support Flink job management statements
dcb444e2d is described below
commit dcb444e2d8e44b30eb6cad74ae8b75b2e54b52da
Author: Paul Lin <pa...@gmail.com>
AuthorDate: Wed May 3 21:51:21 2023 +0800
[KYUUBI #4495] Support Flink job management statements
### _Why are the changes needed?_
Support Flink job management statements.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly, including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.readthedocs.io/en/master/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #4774 from link3280/KYUUBI-4495.
Closes #4495
a4aaebcbb [Paul Lin] [KYUUBI #4495] Adjust the order of tests
225a6cdbd [Paul Lin] [KYUUBI #4495] Increase the number of taskmanagers in the mini cluster
67935ac24 [Paul Lin] [KYUUBI #4495] Wait jobs to get ready for show job statements
9c4ce1d6e [Paul Lin] [KYUUBI #4495] Fix show jobs assertion error
ab3113cab [Paul Lin] [KYUUBI #4495] Support Flink job management statements
Authored-by: Paul Lin <pa...@gmail.com>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../engine/flink/WithFlinkSQLEngineLocal.scala | 1 +
.../engine/flink/WithFlinkTestResources.scala | 5 +-
.../flink/operation/FlinkOperationSuite.scala | 70 +++++++++++++++++++++-
3 files changed, 73 insertions(+), 3 deletions(-)
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
index 0001f31ae..92c1bcd83 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkSQLEngineLocal.scala
@@ -184,6 +184,7 @@ trait WithFlinkSQLEngineLocal extends KyuubiFunSuite with WithFlinkTestResources
val cfg = new MiniClusterConfiguration.Builder()
.setConfiguration(flinkConfig)
.setNumSlotsPerTaskManager(1)
+ .setNumTaskManagers(2)
.build
miniCluster = new MiniCluster(cfg)
miniCluster.start()
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkTestResources.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkTestResources.scala
index 3ea02774e..3b1d65cb2 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkTestResources.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/WithFlinkTestResources.scala
@@ -41,6 +41,9 @@ trait WithFlinkTestResources {
GENERATED_UDF_CLASS,
GENERATED_UDF_CODE)
+ protected val savepointDir: File = Utils.createTempDir("savepoints").toFile
+
protected val testExtraConf: Map[String, String] = Map(
- "flink.pipeline.name" -> "test-job")
+ "flink.pipeline.name" -> "test-job",
+ "flink.state.savepoints.dir" -> savepointDir.toURI.toString)
}
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 00e26c528..39d17aa7b 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -17,6 +17,7 @@
package org.apache.kyuubi.engine.flink.operation
+import java.nio.file.Paths
import java.sql.DatabaseMetaData
import java.util.UUID
@@ -33,6 +34,7 @@ import org.apache.kyuubi.engine.flink.{FlinkEngineUtils, WithFlinkTestResources}
import org.apache.kyuubi.engine.flink.result.Constants
import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
import org.apache.kyuubi.jdbc.hive.KyuubiStatement
+import org.apache.kyuubi.jdbc.hive.common.TimestampTZ
import org.apache.kyuubi.operation.HiveJDBCTestHelper
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
@@ -632,6 +634,62 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
}
}
+ test("execute statement - show/stop jobs") {
+ if (FlinkEngineUtils.isFlinkVersionAtLeast("1.17")) {
+ withSessionConf()(Map(ENGINE_FLINK_MAX_ROWS.key -> "10"))(Map.empty) {
+ withMultipleConnectionJdbcStatement()({ statement =>
+ statement.executeQuery(
+ "create table tbl_a (a int) with (" +
+ "'connector' = 'datagen', " +
+ "'rows-per-second'='10')")
+ statement.executeQuery("create table tbl_b (a int) with ('connector' = 'blackhole')")
+ val insertResult1 = statement.executeQuery("insert into tbl_b select * from tbl_a")
+ assert(insertResult1.next())
+ val jobId1 = insertResult1.getString(1)
+
+ Thread.sleep(5000)
+
+ val showResult = statement.executeQuery("show jobs")
+ val metadata = showResult.getMetaData
+ assert(metadata.getColumnName(1) === "job id")
+ assert(metadata.getColumnType(1) === java.sql.Types.VARCHAR)
+ assert(metadata.getColumnName(2) === "job name")
+ assert(metadata.getColumnType(2) === java.sql.Types.VARCHAR)
+ assert(metadata.getColumnName(3) === "status")
+ assert(metadata.getColumnType(3) === java.sql.Types.VARCHAR)
+ assert(metadata.getColumnName(4) === "start time")
+ assert(metadata.getColumnType(4) === java.sql.Types.OTHER)
+
+ var isFound = false
+ while (showResult.next()) {
+ if (showResult.getString(1) === jobId1) {
+ isFound = true
+ assert(showResult.getString(2) === "test-job")
+ assert(showResult.getString(3) === "RUNNING")
+ assert(showResult.getObject(4).isInstanceOf[TimestampTZ])
+ }
+ }
+ assert(isFound)
+
+ val stopResult1 = statement.executeQuery(s"stop job '$jobId1'")
+ assert(stopResult1.next())
+ assert(stopResult1.getString(1) === "OK")
+
+ val selectResult = statement.executeQuery("select * from tbl_a")
+ val jobId2 = statement.asInstanceOf[KyuubiStatement].getQueryId
+ assert(jobId2 !== null)
+ while (!selectResult.next()) {
+ Thread.sleep(1000L)
+ }
+ val stopResult2 = statement.executeQuery(s"stop job '$jobId2' with savepoint")
+ assert(stopResult2.getMetaData.getColumnName(1).equals("savepoint path"))
+ assert(stopResult2.next())
+ assert(Paths.get(stopResult2.getString(1)).getFileName.toString.startsWith("savepoint-"))
+ })
+ }
+ }
+ }
+
test("execute statement - select column name with dots") {
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("select 'tmp.hello'")
@@ -994,7 +1052,14 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
assert(metadata.getColumnName(1) === "job id")
assert(metadata.getColumnType(1) === java.sql.Types.VARCHAR)
assert(resultSet.next())
- assert(resultSet.getString(1).length == 32)
+ val jobId = resultSet.getString(1)
+ assert(jobId.length == 32)
+
+ if (FlinkEngineUtils.isFlinkVersionAtLeast("1.17")) {
+ val stopResult = statement.executeQuery(s"stop job '$jobId'")
+ assert(stopResult.next())
+ assert(stopResult.getString(1) === "OK")
+ }
})
}
@@ -1072,7 +1137,8 @@ abstract class FlinkOperationSuite extends HiveJDBCTestHelper with WithFlinkTest
test("ensure result max rows") {
withSessionConf()(Map(ENGINE_FLINK_MAX_ROWS.key -> "200"))(Map.empty) {
withJdbcStatement() { statement =>
- statement.execute("create table tbl_src (a bigint) with ('connector' = 'datagen')")
+ statement.execute("create table tbl_src (a bigint) with (" +
+ "'connector' = 'datagen', 'number-of-rows' = '1000')")
val resultSet = statement.executeQuery(s"select a from tbl_src")
var rows = 0
while (resultSet.next()) {