You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/07/25 04:04:07 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #3106] Correct `RelMetadataProvider` used in flink-sql-engine
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 0b3f6b739 [KYUUBI #3106] Correct `RelMetadataProvider` used in flink-sql-engine
0b3f6b739 is described below
commit 0b3f6b739d3cd253d97a85e142d1d817ff1f6d8b
Author: liaoyt <li...@gmail.com>
AuthorDate: Mon Jul 25 12:03:58 2022 +0800
[KYUUBI #3106] Correct `RelMetadataProvider` used in flink-sql-engine
### _Why are the changes needed?_
Currently flink-sql-engine can not run sql statement like followed, which is a normal case in flink sql.
```sql
select count(*) from tbl_src;
```
In Flink the default RelMetadataProvider is `org.apache.flink.table.planner.plan.metadata.FlinkDefaultRelMetadataProvider`, and in Kyuubi we use `org.apache.calcite.rel.metadata.DefaultRelMetadataProvider`.
We should change the RelMetadataProvider to the flink one to support all flink sql syntax.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
Closes #3106 from liaoyt/master.
Closes #3106
029cda5b [liaoyt] fix checkstyle
Authored-by: liaoyt <li...@gmail.com>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../kyuubi/engine/flink/operation/ExecuteStatement.scala | 5 +++--
.../kyuubi/engine/flink/operation/FlinkOperationSuite.scala | 10 ++++++++++
2 files changed, 13 insertions(+), 2 deletions(-)
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
index 10e2a271b..d1a8d2c7a 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/ExecuteStatement.scala
@@ -24,13 +24,14 @@ import java.util.concurrent.RejectedExecutionException
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
-import org.apache.calcite.rel.metadata.{DefaultRelMetadataProvider, JaninoRelMetadataProvider, RelMetadataQueryBase}
+import org.apache.calcite.rel.metadata.{JaninoRelMetadataProvider, RelMetadataQueryBase}
import org.apache.flink.table.api.ResultKind
import org.apache.flink.table.client.gateway.TypedResult
import org.apache.flink.table.data.{GenericArrayData, GenericMapData, RowData}
import org.apache.flink.table.data.binary.{BinaryArrayData, BinaryMapData}
import org.apache.flink.table.operations.{Operation, QueryOperation}
import org.apache.flink.table.operations.command._
+import org.apache.flink.table.planner.plan.metadata.FlinkDefaultRelMetadataProvider
import org.apache.flink.table.types.DataType
import org.apache.flink.table.types.logical._
import org.apache.flink.types.Row
@@ -99,7 +100,7 @@ class ExecuteStatement(
// set the thread variable THREAD_PROVIDERS
RelMetadataQueryBase.THREAD_PROVIDERS.set(
- JaninoRelMetadataProvider.of(DefaultRelMetadataProvider.INSTANCE))
+ JaninoRelMetadataProvider.of(FlinkDefaultRelMetadataProvider.INSTANCE))
val operation = executor.parseStatement(sessionId, statement)
operation match {
case queryOperation: QueryOperation => runQueryOperation(queryOperation)
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 4c716621e..ba82f4ac0 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -622,6 +622,16 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
})
}
+ test("execute statement - select count") {
+ withJdbcStatement() { statement =>
+ statement.execute(
+ "create table tbl_src (a int) with ('connector' = 'datagen', 'number-of-rows' = '100')")
+ val resultSet = statement.executeQuery(s"select count(a) from tbl_src")
+ assert(resultSet.next())
+ assert(resultSet.getInt(1) === 100)
+ }
+ }
+
test("execute statement - show functions") {
withJdbcStatement() { statement =>
val resultSet = statement.executeQuery("show functions")