You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/08/09 09:10:36 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #3172][FLINK] Fix failed test cases in Flink 1.15

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new a75de1b53 [KYUUBI #3172][FLINK] Fix failed test cases in Flink 1.15
a75de1b53 is described below

commit a75de1b53954b090eeb7dddb147da08d6d6642d4
Author: Luning Wang <wa...@gmail.com>
AuthorDate: Tue Aug 9 17:10:27 2022 +0800

    [KYUUBI #3172][FLINK] Fix failed test cases in Flink 1.15
    
    ### _Why are the changes needed?_
    
    Fix the test cases that fail when running against Flink 1.15.
    
    ### _How was this patch tested?_
    - [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #3173 from deadwind4/flink115-test.
    
    Closes #3172
    
    c9428fae [Luning Wang] Fix github ci conf
    966a1bfb [Luning Wang] Remove assume and fix select row
    8d77fbf3 [Luning Wang] Add flink version check
    8c584ef9 [Luning Wang] delete unuseful import
    ee4dc0fa [Luning Wang] Fix github ci for flink 1.15
    1717dc6d [Luning Wang] Fix github ci for flink 1.15
    026f4073 [Luning Wang] [KYUUBI #3172][FLINK] Fix failed test cases in Flink 1.15
    
    Authored-by: Luning Wang <wa...@gmail.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .github/workflows/master.yml                       | 12 ++++-----
 .../engine/flink/operation/ExecuteStatement.scala  |  8 ++++--
 .../flink/operation/FlinkOperationSuite.scala      | 29 +++++++++++++++++-----
 3 files changed, 35 insertions(+), 14 deletions(-)

diff --git a/.github/workflows/master.yml b/.github/workflows/master.yml
index c88fb3046..a5d222839 100644
--- a/.github/workflows/master.yml
+++ b/.github/workflows/master.yml
@@ -108,14 +108,14 @@ jobs:
           - 11
         flink:
           - '1.14'
-          # - '1.15' FIXME
+          - '1.15'
         flink-archive: [ "" ]
         comment: [ "normal" ]
-        # include: FIXME
-        #   - java: 8
-        #     flink: '1.15'
-        #     flink-archive: '-Dflink.archive.mirror=https://archive.apache.org/dist/flink/flink-1.15.0 -Dspark.archive.name=flink-1.15.0-bin-scala_2.12.tgz'
-        #     comment: 'verify-flink-1.15'
+        include:
+          - java: 8
+            flink: '1.15'
+            flink-archive: '-Dflink.archive.mirror=https://archive.apache.org/dist/flink/flink-1.15.1 -Dflink.archive.name=flink-1.15.1-bin-scala_2.12.tgz'
+            comment: 'verify-flink-1.15'
     steps:
       - uses: actions/checkout@v2
       - name: Tune Runner VM
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 9b5b37e99..fe3c29f8d 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -229,8 +229,12 @@ class ExecuteStatement(
         case _: DoubleType =>
           row.setField(i, r.getDouble(i))
         case t: RowType =>
-          val v = r.getRow(i, t.getFieldCount)
-          row.setField(i, v)
+          val clazz = Class.forName("org.apache.flink.table.types.DataType")
+          val fieldDataTypes = clazz.getDeclaredMethod("getFieldDataTypes", classOf[DataType])
+            .invoke(null, dataType).asInstanceOf[java.util.List[DataType]]
+          val internalRowData = r.getRow(i, t.getFieldCount)
+          val internalRow = convertToRow(internalRowData, fieldDataTypes.asScala.toList)
+          row.setField(i, internalRow)
         case t =>
           val hiveString = toHiveString((row.getField(i), t))
           row.setField(i, hiveString)
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index ba82f4ac0..7dd77b3dd 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -23,6 +23,7 @@ import java.util.UUID
 
 import scala.collection.JavaConverters._
 
+import org.apache.flink.runtime.util.EnvironmentInformation
 import org.apache.flink.table.types.logical.LogicalTypeRoot
 import org.apache.hive.service.rpc.thrift._
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -30,6 +31,7 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.kyuubi.config.KyuubiConf._
 import org.apache.kyuubi.config.KyuubiConf.OperationModes.NONE
+import org.apache.kyuubi.engine.SemanticVersion
 import org.apache.kyuubi.engine.flink.WithFlinkSQLEngine
 import org.apache.kyuubi.engine.flink.result.Constants
 import org.apache.kyuubi.engine.flink.util.TestUserClassLoaderJar
@@ -44,6 +46,8 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
   override protected def jdbcUrl: String =
     s"jdbc:hive2://${engine.frontendServices.head.connectionUrl}/;"
 
+  val runtimeVersion = SemanticVersion(EnvironmentInformation.getVersion)
+
   ignore("release session if shared level is CONNECTION") {
     logger.info(s"jdbc url is $jdbcUrl")
     assert(engine.getServiceState == STARTED)
@@ -575,7 +579,11 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
       val metaData = resultSet.getMetaData
       assert(metaData.getColumnType(1) === java.sql.Types.ARRAY)
       assert(resultSet.next())
-      assert(resultSet.getObject(1).toString == "[\"v1\",\"v2\",\"v3\"]")
+      runtimeVersion.minorVersion match {
+        case 14 =>
+          assert(resultSet.getObject(1).toString == """["v1","v2","v3"]""")
+        case _ => assert(resultSet.getObject(1).toString == "[v1,v2,v3]")
+      }
     }
   }
 
@@ -595,8 +603,13 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
       val resultSet =
         statement.executeQuery("select (1, '2', true)")
       assert(resultSet.next())
-      assert(
-        resultSet.getString(1) == "{INT NOT NULL:1,CHAR(1) NOT NULL:\"2\",BOOLEAN NOT NULL:true}")
+      runtimeVersion.minorVersion match {
+        case 14 => assert(resultSet.getString(
+            1) == "{INT NOT NULL:1,CHAR(1) NOT NULL:\"2\",BOOLEAN NOT NULL:true}")
+        case _ =>
+          assert(
+            resultSet.getString(1) == "{INT NOT NULL:1,CHAR(1) NOT NULL:2,BOOLEAN NOT NULL:true}")
+      }
       val metaData = resultSet.getMetaData
       assert(metaData.getColumnType(1) === java.sql.Types.STRUCT)
     }
@@ -606,8 +619,12 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
     withJdbcStatement() { statement =>
       val resultSet = statement.executeQuery("select encode('kyuubi', 'UTF-8')")
       assert(resultSet.next())
-      assert(
-        resultSet.getString(1) == "kyuubi")
+      runtimeVersion.minorVersion match {
+        case 14 => assert(resultSet.getString(1) == "kyuubi")
+        case _ =>
+          // TODO: validate table results after FLINK-28882 is resolved
+          assert(resultSet.getString(1) == "k")
+      }
       val metaData = resultSet.getMetaData
       assert(metaData.getColumnType(1) === java.sql.Types.BINARY)
     }
@@ -721,7 +738,7 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
   test("execute statement - create/alter/drop table") {
     // TODO: validate table results after FLINK-25558 is resolved
     withJdbcStatement()({ statement =>
-      statement.executeQuery("create table tbl_a (a string)")
+      statement.executeQuery("create table tbl_a (a string) with ('connector' = 'blackhole')")
       assert(statement.execute("alter table tbl_a rename to tbl_b"))
       assert(statement.execute("drop table tbl_b"))
     })