You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/09/26 04:24:03 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #3547] Fix Flink statements results validation
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 5f436661e [KYUUBI #3547] Fix Flink statements results validation
5f436661e is described below
commit 5f436661e04e00fbe5fa9a26056b98c3ca98d644
Author: Paul Lin <pa...@gmail.com>
AuthorDate: Mon Sep 26 12:23:54 2022 +0800
[KYUUBI #3547] Fix Flink statements results validation
### _Why are the changes needed?_
Statement results were not available with Flink 1.14 because of a bug (FLINK-25558). Now that we have upgraded to Flink 1.15, we can validate the result sets.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #3548 from link3280/KYUUBI-3547.
Closes #3547
b0796a23 [Paul Lin] [KYUUBI #3547] Fix broken test for Flink 1.14
bbb3077e [Paul Lin] [KYUUBI #3547] Fix Flink statements results validation
Authored-by: Paul Lin <pa...@gmail.com>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../flink/operation/FlinkOperationSuite.scala | 76 +++++++++++++++++-----
1 file changed, 60 insertions(+), 16 deletions(-)
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 6cafe1ae4..06edbe459 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -701,11 +701,20 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
}
}
- test("execute statement - create/alter/drop catalog") {
- // TODO: validate table results after FLINK-25558 is resolved
+ test("execute statement - create/drop catalog") {
withJdbcStatement()({ statement =>
- statement.executeQuery("create catalog cat_a with ('type'='generic_in_memory')")
- assert(statement.execute("drop catalog cat_a"))
+ val createResult = {
+ statement.executeQuery("create catalog cat_a with ('type'='generic_in_memory')")
+ }
+ if (isFlinkVersionAtLeast("1.15")) {
+ assert(createResult.next())
+ assert(createResult.getString(1) === "OK")
+ }
+ val dropResult = statement.executeQuery("drop catalog cat_a")
+ if (isFlinkVersionAtLeast("1.15")) {
+ assert(dropResult.next())
+ assert(dropResult.getString(1) === "OK")
+ }
})
}
@@ -726,11 +735,22 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
}
test("execute statement - create/alter/drop database") {
- // TODO: validate table results after FLINK-25558 is resolved
withJdbcStatement()({ statement =>
- statement.executeQuery("create database db_a")
- assert(statement.execute("alter database db_a set ('k1' = 'v1')"))
- assert(statement.execute("drop database db_a"))
+ val createResult = statement.executeQuery("create database db_a")
+ if (isFlinkVersionAtLeast("1.15")) {
+ assert(createResult.next())
+ assert(createResult.getString(1) === "OK")
+ }
+ val alterResult = statement.executeQuery("alter database db_a set ('k1' = 'v1')")
+ if (isFlinkVersionAtLeast("1.15")) {
+ assert(alterResult.next())
+ assert(alterResult.getString(1) === "OK")
+ }
+ val dropResult = statement.executeQuery("drop database db_a")
+ if (isFlinkVersionAtLeast("1.15")) {
+ assert(dropResult.next())
+ assert(dropResult.getString(1) === "OK")
+ }
})
}
@@ -751,20 +771,44 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
}
test("execute statement - create/alter/drop table") {
- // TODO: validate table results after FLINK-25558 is resolved
withJdbcStatement()({ statement =>
- statement.executeQuery("create table tbl_a (a string) with ('connector' = 'blackhole')")
- assert(statement.execute("alter table tbl_a rename to tbl_b"))
- assert(statement.execute("drop table tbl_b"))
+ val createResult = {
+ statement.executeQuery("create table tbl_a (a string) with ('connector' = 'blackhole')")
+ }
+ if (isFlinkVersionAtLeast("1.15")) {
+ assert(createResult.next())
+ assert(createResult.getString(1) === "OK")
+ }
+ val alterResult = statement.executeQuery("alter table tbl_a rename to tbl_b")
+ if (isFlinkVersionAtLeast("1.15")) {
+ assert(alterResult.next())
+ assert(alterResult.getString(1) === "OK")
+ }
+ val dropResult = statement.executeQuery("drop table tbl_b")
+ if (isFlinkVersionAtLeast("1.15")) {
+ assert(dropResult.next())
+ assert(dropResult.getString(1) === "OK")
+ }
})
}
test("execute statement - create/alter/drop view") {
- // TODO: validate table results after FLINK-25558 is resolved
withMultipleConnectionJdbcStatement()({ statement =>
- statement.executeQuery("create view view_a as select 1")
- assert(statement.execute("alter view view_a rename to view_b"))
- assert(statement.execute("drop view view_b"))
+ val createResult = statement.executeQuery("create view view_a as select 1")
+ if (isFlinkVersionAtLeast("1.15")) {
+ assert(createResult.next())
+ assert(createResult.getString(1) === "OK")
+ }
+ val alterResult = statement.executeQuery("alter view view_a rename to view_b")
+ if (isFlinkVersionAtLeast("1.15")) {
+ assert(alterResult.next())
+ assert(alterResult.getString(1) === "OK")
+ }
+ val dropResult = statement.executeQuery("drop view view_b")
+ if (isFlinkVersionAtLeast("1.15")) {
+ assert(dropResult.next())
+ assert(dropResult.getString(1) === "OK")
+ }
})
}