You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by cs...@apache.org on 2022/09/27 07:29:32 UTC
[incubator-kyuubi] branch branch-1.6 updated: [KYUUBI #3560] Flink SQL engine supports run DDL across versions
This is an automated email from the ASF dual-hosted git repository.
csy pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/branch-1.6 by this push:
new 34ef8805d [KYUUBI #3560] Flink SQL engine supports run DDL across versions
34ef8805d is described below
commit 34ef8805d87ddade3e47f39e14af84e1eb8f0407
Author: df_liu <df...@trip.com>
AuthorDate: Tue Sep 27 15:28:37 2022 +0800
[KYUUBI #3560] Flink SQL engine supports run DDL across versions
### _Why are the changes needed?_
Followup #3230
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run tests](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #3560 from df-Liu/flink_ddl.
Closes #3560
0dbdfb3f [df_liu] flink ddl
Authored-by: df_liu <df...@trip.com>
Signed-off-by: Shaoyun Chen <cs...@apache.org>
(cherry picked from commit d06c656cc648bbcf96d96baaa6a2faac0d7844be)
Signed-off-by: Shaoyun Chen <cs...@apache.org>
---
.../apache/kyuubi/engine/flink/operation/ExecuteStatement.scala | 9 +++++++--
.../apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala | 8 ++++++++
2 files changed, 15 insertions(+), 2 deletions(-)
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 374f62f18..182e3e3ef 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import org.apache.flink.api.common.JobID
-import org.apache.flink.table.api.ResultKind
+import org.apache.flink.table.api.{ResultKind, TableResult}
import org.apache.flink.table.client.gateway.TypedResult
import org.apache.flink.table.data.{GenericArrayData, GenericMapData, RowData}
import org.apache.flink.table.data.binary.{BinaryArrayData, BinaryMapData}
@@ -154,7 +154,12 @@ class ExecuteStatement(
}
private def runOperation(operation: Operation): Unit = {
- val result = executor.executeOperation(sessionId, operation)
+ // FLINK-24461 changed the return type of executeOperation from TableResult to
+ // TableResultInternal, so it is looked up and invoked reflectively here
+ val executeOperation = DynMethods.builder("executeOperation")
+ .impl(executor.getClass, classOf[String], classOf[Operation])
+ .build(executor)
+ val result = executeOperation.invoke[TableResult](sessionId, operation)
jobId = result.getJobClient.asScala.map(_.getJobID)
result.await()
resultSet = ResultSet.fromTableResult(result)
diff --git a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
index b33669702..d4902d461 100644
--- a/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
+++ b/integration-tests/kyuubi-flink-it/src/test/scala/org/apache/kyuubi/it/flink/operation/FlinkOperationSuite.scala
@@ -48,6 +48,14 @@ class FlinkOperationSuite extends WithKyuubiServerAndFlinkMiniCluster
}
}
+ test("execute statement - create/alter/drop table") {
+ withJdbcStatement()({ statement =>
+ statement.executeQuery("create table tbl_a (a string) with ('connector' = 'blackhole')")
+ assert(statement.execute("alter table tbl_a rename to tbl_b"))
+ assert(statement.execute("drop table tbl_b"))
+ })
+ }
+
test("execute statement - select column name with dots") {
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("select 'tmp.hello'")