You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/09/26 04:27:57 UTC

[incubator-kyuubi] branch branch-1.6 updated: [KYUUBI #3547] Fix Flink statements results validation

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/branch-1.6 by this push:
     new c065b88a9 [KYUUBI #3547] Fix Flink statements results validation
c065b88a9 is described below

commit c065b88a9792326804ad45dbf8b97ac79b975c9c
Author: Paul Lin <pa...@gmail.com>
AuthorDate: Mon Sep 26 12:23:54 2022 +0800

    [KYUUBI #3547] Fix Flink statements results validation
    
    ### _Why are the changes needed?_
    
    The statement results were not available with Flink 1.14 because of a bug, but as we upgraded to Flink 1.15, now we could validate the result sets.
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #3548 from link3280/KYUUBI-3547.
    
    Closes #3547
    
    b0796a23 [Paul Lin] [KYUUBI #3547] Fix broken test for Flink 1.14
    bbb3077e [Paul Lin] [KYUUBI #3547] Fix Flink statements results validation
    
    Authored-by: Paul Lin <pa...@gmail.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
    (cherry picked from commit 5f436661e04e00fbe5fa9a26056b98c3ca98d644)
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../flink/operation/FlinkOperationSuite.scala      | 76 +++++++++++++++++-----
 1 file changed, 60 insertions(+), 16 deletions(-)

diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 8f29f7474..75b22c93b 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -702,11 +702,20 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
     }
   }
 
-  test("execute statement - create/alter/drop catalog") {
-    // TODO: validate table results after FLINK-25558 is resolved
+  test("execute statement - create/drop catalog") {
     withJdbcStatement()({ statement =>
-      statement.executeQuery("create catalog cat_a with ('type'='generic_in_memory')")
-      assert(statement.execute("drop catalog cat_a"))
+      val createResult = {
+        statement.executeQuery("create catalog cat_a with ('type'='generic_in_memory')")
+      }
+      if (isFlinkVersionAtLeast("1.15")) {
+        assert(createResult.next())
+        assert(createResult.getString(1) === "OK")
+      }
+      val dropResult = statement.executeQuery("drop catalog cat_a")
+      if (isFlinkVersionAtLeast("1.15")) {
+        assert(dropResult.next())
+        assert(dropResult.getString(1) === "OK")
+      }
     })
   }
 
@@ -727,11 +736,22 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
   }
 
   test("execute statement - create/alter/drop database") {
-    // TODO: validate table results after FLINK-25558 is resolved
     withJdbcStatement()({ statement =>
-      statement.executeQuery("create database db_a")
-      assert(statement.execute("alter database db_a set ('k1' = 'v1')"))
-      assert(statement.execute("drop database db_a"))
+      val createResult = statement.executeQuery("create database db_a")
+      if (isFlinkVersionAtLeast("1.15")) {
+        assert(createResult.next())
+        assert(createResult.getString(1) === "OK")
+      }
+      val alterResult = statement.executeQuery("alter database db_a set ('k1' = 'v1')")
+      if (isFlinkVersionAtLeast("1.15")) {
+        assert(alterResult.next())
+        assert(alterResult.getString(1) === "OK")
+      }
+      val dropResult = statement.executeQuery("drop database db_a")
+      if (isFlinkVersionAtLeast("1.15")) {
+        assert(dropResult.next())
+        assert(dropResult.getString(1) === "OK")
+      }
     })
   }
 
@@ -752,20 +772,44 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
   }
 
   test("execute statement - create/alter/drop table") {
-    // TODO: validate table results after FLINK-25558 is resolved
     withJdbcStatement()({ statement =>
-      statement.executeQuery("create table tbl_a (a string) with ('connector' = 'blackhole')")
-      assert(statement.execute("alter table tbl_a rename to tbl_b"))
-      assert(statement.execute("drop table tbl_b"))
+      val createResult = {
+        statement.executeQuery("create table tbl_a (a string) with ('connector' = 'blackhole')")
+      }
+      if (isFlinkVersionAtLeast("1.15")) {
+        assert(createResult.next())
+        assert(createResult.getString(1) === "OK")
+      }
+      val alterResult = statement.executeQuery("alter table tbl_a rename to tbl_b")
+      if (isFlinkVersionAtLeast("1.15")) {
+        assert(alterResult.next())
+        assert(alterResult.getString(1) === "OK")
+      }
+      val dropResult = statement.executeQuery("drop table tbl_b")
+      if (isFlinkVersionAtLeast("1.15")) {
+        assert(dropResult.next())
+        assert(dropResult.getString(1) === "OK")
+      }
     })
   }
 
   test("execute statement - create/alter/drop view") {
-    // TODO: validate table results after FLINK-25558 is resolved
     withMultipleConnectionJdbcStatement()({ statement =>
-      statement.executeQuery("create view view_a as select 1")
-      assert(statement.execute("alter view view_a rename to view_b"))
-      assert(statement.execute("drop view view_b"))
+      val createResult = statement.executeQuery("create view view_a as select 1")
+      if (isFlinkVersionAtLeast("1.15")) {
+        assert(createResult.next())
+        assert(createResult.getString(1) === "OK")
+      }
+      val alterResult = statement.executeQuery("alter view view_a rename to view_b")
+      if (isFlinkVersionAtLeast("1.15")) {
+        assert(alterResult.next())
+        assert(alterResult.getString(1) === "OK")
+      }
+      val dropResult = statement.executeQuery("drop view view_b")
+      if (isFlinkVersionAtLeast("1.15")) {
+        assert(dropResult.next())
+        assert(dropResult.getString(1) === "OK")
+      }
     })
   }