You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by cs...@apache.org on 2022/09/27 07:29:32 UTC

[incubator-kyuubi] branch branch-1.6 updated: [KYUUBI #3560] Flink SQL engine supports run DDL across versions

This is an automated email from the ASF dual-hosted git repository.

csy pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/branch-1.6 by this push:
     new 34ef8805d [KYUUBI #3560] Flink SQL engine supports run DDL across versions
34ef8805d is described below

commit 34ef8805d87ddade3e47f39e14af84e1eb8f0407
Author: df_liu <df...@trip.com>
AuthorDate: Tue Sep 27 15:28:37 2022 +0800

    [KYUUBI #3560] Flink SQL engine supports run DDL across versions
    
    ### _Why are the changes needed?_
    
    Followup #3230
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #3560 from df-Liu/flink_ddl.
    
    Closes #3560
    
    0dbdfb3f [df_liu] flink ddl
    
    Authored-by: df_liu <df...@trip.com>
    Signed-off-by: Shaoyun Chen <cs...@apache.org>
    (cherry picked from commit d06c656cc648bbcf96d96baaa6a2faac0d7844be)
    Signed-off-by: Shaoyun Chen <cs...@apache.org>
---
 .../apache/kyuubi/engine/flink/operation/ExecuteStatement.scala  | 9 +++++++--
 .../apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala   | 8 ++++++++
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 374f62f18..182e3e3ef 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.flink.api.common.JobID
-import org.apache.flink.table.api.ResultKind
+import org.apache.flink.table.api.{ResultKind, TableResult}
 import org.apache.flink.table.client.gateway.TypedResult
 import org.apache.flink.table.data.{GenericArrayData, GenericMapData, RowData}
 import org.apache.flink.table.data.binary.{BinaryArrayData, BinaryMapData}
@@ -154,7 +154,12 @@ class ExecuteStatement(
   }
 
   private def runOperation(operation: Operation): Unit = {
-    val result = executor.executeOperation(sessionId, operation)
+    // FLINK-24461 executeOperation method changes the return type
+    // from TableResult to TableResultInternal
+    val executeOperation = DynMethods.builder("executeOperation")
+      .impl(executor.getClass, classOf[String], classOf[Operation])
+      .build(executor)
+    val result = executeOperation.invoke[TableResult](sessionId, operation)
     jobId = result.getJobClient.asScala.map(_.getJobID)
     result.await()
     resultSet = ResultSet.fromTableResult(result)
diff --git a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
index b33669702..d4902d461 100644
--- a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
+++ b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
@@ -48,6 +48,14 @@ class FlinkOperationSuite extends WithKyuubiServerAndFlinkMiniCluster
     }
   }
 
+  test("execute statement - create/alter/drop table") {
+    withJdbcStatement()({ statement =>
+      statement.executeQuery("create table tbl_a (a string) with ('connector' = 'blackhole')")
+      assert(statement.execute("alter table tbl_a rename to tbl_b"))
+      assert(statement.execute("drop table tbl_b"))
+    })
+  }
+
   test("execute statement - select column name with dots") {
     withJdbcStatement() { statement =>
       val resultSet = statement.executeQuery("select 'tmp.hello'")