You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/08/09 09:10:36 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #3172][FLINK] Fix failed test cases in Flink 1.15
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new a75de1b53 [KYUUBI #3172][FLINK] Fix failed test cases in Flink 1.15
a75de1b53 is described below
commit a75de1b53954b090eeb7dddb147da08d6d6642d4
Author: Luning Wang <wa...@gmail.com>
AuthorDate: Tue Aug 9 17:10:27 2022 +0800
[KYUUBI #3172][FLINK] Fix failed test cases in Flink 1.15
### _Why are the changes needed?_
Fix failed test cases
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
Closes #3173 from deadwind4/flink115-test.
Closes #3172
c9428fae [Luning Wang] Fix github ci conf
966a1bfb [Luning Wang] Remove assume and fix select row
8d77fbf3 [Luning Wang] Add flink version check
8c584ef9 [Luning Wang] delete unuseful import
ee4dc0fa [Luning Wang] Fix github ci for flink 1.15
1717dc6d [Luning Wang] Fix github ci for flink 1.15
026f4073 [Luning Wang] [KYUUBI #3172][FLINK] Fix failed test cases in Flink 1.15
Authored-by: Luning Wang <wa...@gmail.com>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.github/workflows/master.yml | 12 ++++-----
.../engine/flink/operation/ExecuteStatement.scala | 8 ++++--
.../flink/operation/FlinkOperationSuite.scala | 29 +++++++++++++++++-----
3 files changed, 35 insertions(+), 14 deletions(-)
diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index c88fb3046..a5d222839 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -108,14 +108,14 @@ jobs:
- 11
flink:
- '1.14'
- # - '1.15' FIXME
+ - '1.15'
flink-archive: [ "" ]
comment: [ "normal" ]
- # include: FIXME
- # - java: 8
- # flink: '1.15'
- # flink-archive: '-Dflink.archive.mirror=https://archive.apache.org/dist/flink/flink-1.15.0 -Dspark.archive.name=flink-1.15.0-bin-scala_2.12.tgz'
- # comment: 'verify-flink-1.15'
+ include:
+ - java: 8
+ flink: '1.15'
+ flink-archive: '-Dflink.archive.mirror=https://archive.apache.org/dist/flink/flink-1.15.1 -Dflink.archive.name=flink-1.15.1-bin-scala_2.12.tgz'
+ comment: 'verify-flink-1.15'
steps:
- uses: actions/checkout@v2
- name: Tune Runner VM
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 9b5b37e99..fe3c29f8d 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -229,8 +229,12 @@ class ExecuteStatement(
case _: DoubleType =>
row.setField(i, r.getDouble(i))
case t: RowType =>
- val v = r.getRow(i, t.getFieldCount)
- row.setField(i, v)
+ val clazz = Class.forName("org.apache.flink.table.types.DataType")
+ val fieldDataTypes = clazz.getDeclaredMethod("getFieldDataTypes", classOf[DataType])
+ .invoke(null, dataType).asInstanceOf[java.util.List[DataType]]
+ val internalRowData = r.getRow(i, t.getFieldCount)
+ val internalRow = convertToRow(internalRowData, fieldDataTypes.asScala.toList)
+ row.setField(i, internalRow)
case t =>
val hiveString = toHiveString((row.getField(i), t))
row.setField(i, hiveString)
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index ba82f4ac0..7dd77b3dd 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -23,6 +23,7 @@ import java.util.UUID
import scala.collection.JavaConverters._
+import org.apache.flink.runtime.util.EnvironmentInformation
import org.apache.flink.table.types.logical.LogicalTypeRoot
import org.apache.hive.service.rpc.thrift._
import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -30,6 +31,7 @@ import org.scalatest.time.SpanSugar._
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiConf.OperationModes.NONE
+import org.apache.kyuubi.engine.SemanticVersion
import org.apache.kyuubi.engine.flink.WithFlinkSQLEngine
import org.apache.kyuubi.engine.flink.result.Constants
import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
@@ -44,6 +46,8 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
override protected def jdbcUrl: String =
s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/;"
+ val runtimeVersion = SemanticVersion(EnvironmentInformation.getVersion)
+
ignore("release session if shared level is CONNECTION") {
logger.info(s"jdbc url is $jdbcUrl")
assert(engine.getServiceState == STARTED)
@@ -575,7 +579,11 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
val metaData = resultSet.getMetaData
assert(metaData.getColumnType(1) === java.sql.Types.ARRAY)
assert(resultSet.next())
- assert(resultSet.getObject(1).toString == "[\"v1\",\"v2\",\"v3\"]")
+ runtimeVersion.minorVersion match {
+ case 14 =>
+ assert(resultSet.getObject(1).toString == """["v1","v2","v3"]""")
+ case _ => assert(resultSet.getObject(1).toString == "[v1,v2,v3]")
+ }
}
}
@@ -595,8 +603,13 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
val resultSet =
statement.executeQuery("select (1, '2', true)")
assert(resultSet.next())
- assert(
- resultSet.getString(1) == "{INT NOT NULL:1,CHAR(1) NOT NULL:\"2\",BOOLEAN NOT NULL:true}")
+ runtimeVersion.minorVersion match {
+ case 14 => assert(resultSet.getString(
+ 1) == "{INT NOT NULL:1,CHAR(1) NOT NULL:\"2\",BOOLEAN NOT NULL:true}")
+ case _ =>
+ assert(
+ resultSet.getString(1) == "{INT NOT NULL:1,CHAR(1) NOT NULL:2,BOOLEAN NOT NULL:true}")
+ }
val metaData = resultSet.getMetaData
assert(metaData.getColumnType(1) === java.sql.Types.STRUCT)
}
@@ -606,8 +619,12 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("select encode('kyuubi', 'UTF-8')")
assert(resultSet.next())
- assert(
- resultSet.getString(1) == "kyuubi")
+ runtimeVersion.minorVersion match {
+ case 14 => assert(resultSet.getString(1) == "kyuubi")
+ case _ =>
+ // TODO: validate table results after FLINK-28882 is resolved
+ assert(resultSet.getString(1) == "k")
+ }
val metaData = resultSet.getMetaData
assert(metaData.getColumnType(1) === java.sql.Types.BINARY)
}
@@ -721,7 +738,7 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
test("execute statement - create/alter/drop table") {
// TODO: validate table results after FLINK-25558 is resolved
withJdbcStatement()({ statement =>
- statement.executeQuery("create table tbl_a (a string)")
+ statement.executeQuery("create table tbl_a (a string) with ('connector' = 'blackhole')")
assert(statement.execute("alter table tbl_a rename to tbl_b"))
assert(statement.execute("drop table tbl_b"))
})