You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/07/25 04:04:07 UTC

[incubator-kyuubi] branch master updated: [KYUUBI #3106] Correct `RelMetadataProvider` used in flink-sql-engine

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b3f6b739 [KYUUBI #3106] Correct `RelMetadataProvider` used in flink-sql-engine
0b3f6b739 is described below

commit 0b3f6b739d3cd253d97a85e142d1d817ff1f6d8b
Author: liaoyt <li...@gmail.com>
AuthorDate: Mon Jul 25 12:03:58 2022 +0800

    [KYUUBI #3106] Correct `RelMetadataProvider` used in flink-sql-engine
    
    ### _Why are the changes needed?_
    
    Currently flink-sql-engine cannot run SQL statements like the following, which is a normal case in Flink SQL.
    ```sql
    select count(*) from tbl_src;
    ```
    In Flink the default RelMetadataProvider is `org.apache.flink.table.planner.plan.metadata.FlinkDefaultRelMetadataProvider`, and in Kyuubi we use `org.apache.calcite.rel.metadata.DefaultRelMetadataProvider`.
    We should change the RelMetadataProvider to the flink one to support all flink sql syntax.
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [ ] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
    
    Closes #3106 from liaoyt/master.
    
    Closes #3106
    
    029cda5b [liaoyt] fix checkstyle
    
    Authored-by: liaoyt <li...@gmail.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../kyuubi/engine/flink/operation/ExecuteStatement.scala       |  5 +++--
 .../kyuubi/engine/flink/operation/FlinkOperationSuite.scala    | 10 ++++++++++
 2 files changed, 13 insertions(+), 2 deletions(-)

diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 10e2a271b..d1a8d2c7a 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -24,13 +24,14 @@ import java.util.concurrent.RejectedExecutionException
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.calcite.rel.metadata.{DefaultRelMetadataProvider, JaninoRelMetadataProvider, RelMetadataQueryBase}
+import org.apache.calcite.rel.metadata.{JaninoRelMetadataProvider, RelMetadataQueryBase}
 import org.apache.flink.table.api.ResultKind
 import org.apache.flink.table.client.gateway.TypedResult
 import org.apache.flink.table.data.{GenericArrayData, GenericMapData, RowData}
 import org.apache.flink.table.data.binary.{BinaryArrayData, BinaryMapData}
 import org.apache.flink.table.operations.{Operation, QueryOperation}
 import org.apache.flink.table.operations.command._
+import org.apache.flink.table.planner.plan.metadata.FlinkDefaultRelMetadataProvider
 import org.apache.flink.table.types.DataType
 import org.apache.flink.table.types.logical._
 import org.apache.flink.types.Row
@@ -99,7 +100,7 @@ class ExecuteStatement(
 
       // set the thread variable THREAD_PROVIDERS
       RelMetadataQueryBase.THREAD_PROVIDERS.set(
-        JaninoRelMetadataProvider.of(DefaultRelMetadataProvider.INSTANCE))
+        JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE))
       val operation = executor.parseStatement(sessionId, statement)
       operation match {
         case queryOperation: QueryOperation => runQueryOperation(queryOperation)
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 4c716621e..ba82f4ac0 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -622,6 +622,16 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
     })
   }
 
+  test("execute statement - select count") {
+    withJdbcStatement() { statement =>
+      statement.execute(
+        "create table tbl_src (a int) with ('connector' = 'datagen', 'number-of-rows' = '100')")
+      val resultSet = statement.executeQuery(s"select count(a) from tbl_src")
+      assert(resultSet.next())
+      assert(resultSet.getInt(1) === 100)
+    }
+  }
+
   test("execute statement - show functions") {
     withJdbcStatement() { statement =>
       val resultSet = statement.executeQuery("show functions")