You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/09/17 10:42:48 UTC
[incubator-kyuubi] branch master updated: [KYUUBI #3498] Fix GetTables operation in Flink
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new dce40c370 [KYUUBI #3498] Fix GetTables operation in Flink
dce40c370 is described below
commit dce40c37041253a426277dd54bb60713a2dfc24d
Author: sychen <sy...@ctrip.com>
AuthorDate: Sat Sep 17 18:42:38 2022 +0800
[KYUUBI #3498] Fix GetTables operation in Flink
### _Why are the changes needed?_
#1646
In Flink, the result schema returned by the `GetTables` operation is incorrect, causing some database management tools to be unable to see the tables.
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [x] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #3498 from cxzl25/flink_get_tables.
Closes #3498
6e51487f [sychen] compile
cba06130 [sychen] toArray
37bd6aa2 [sychen] style
93fb80ce [sychen] GetTables
Authored-by: sychen <sy...@ctrip.com>
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../kyuubi/engine/flink/result/Constants.java | 2 -
.../flink/operation/FlinkSQLOperationManager.scala | 6 +-
.../kyuubi/engine/flink/operation/GetTables.scala | 77 +++++++++++++++-------
.../flink/operation/FlinkOperationSuite.scala | 26 ++++++--
4 files changed, 76 insertions(+), 35 deletions(-)
diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/Constants.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/Constants.java
index 889bcf95f..b683eb76a 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/Constants.java
+++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/Constants.java
@@ -21,8 +21,6 @@ package org.apache.kyuubi.engine.flink.result;
/** Constant column names. */
public class Constants {
- public static final String SHOW_TABLES_RESULT = "tables";
-
public static final String TABLE_TYPE = "TABLE";
public static final String VIEW_TYPE = "VIEW";
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
index a42075abc..a342dfd29 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
@@ -128,9 +128,9 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
val op = new GetTables(
session = session,
- catalog = catalogName,
- schema = schemaName,
- tableName = tableName,
+ catalogNameOrEmpty = catalogName,
+ schemaNamePattern = schemaName,
+ tableNamePattern = tableName,
tableTypes = tTypes)
addOperation(op)
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala
index 0e9affd2a..e0e362fd7 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala
@@ -21,16 +21,19 @@ import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
import org.apache.commons.lang3.StringUtils
-import org.apache.flink.table.catalog.ObjectIdentifier
+import org.apache.flink.table.api.{DataTypes, ResultKind}
+import org.apache.flink.table.catalog.{Column, ObjectIdentifier}
+import org.apache.flink.types.Row
-import org.apache.kyuubi.engine.flink.result.{Constants, ResultSetUtil}
+import org.apache.kyuubi.engine.flink.result.ResultSet
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
class GetTables(
session: Session,
- catalog: String,
- schema: String,
- tableName: String,
+ catalogNameOrEmpty: String,
+ schemaNamePattern: String,
+ tableNamePattern: String,
tableTypes: Set[String])
extends FlinkOperation(session) {
@@ -38,33 +41,59 @@ class GetTables(
try {
val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
- val catalogName = if (StringUtils.isEmpty(catalog)) tableEnv.getCurrentCatalog else catalog
+ val catalogName =
+ if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
+ else catalogNameOrEmpty
- val schemaPattern = toJavaRegex(schema).r
- val tableNamePattern = toJavaRegex(tableName).r
+ val schemaNameRegex = toJavaRegex(schemaNamePattern).r
+ val tableNameRegex = toJavaRegex(tableNamePattern).r
- val tables = tableEnv.getCatalog(catalogName).asScala.toSeq.flatMap { flinkCatalog =>
+ val tables = tableEnv.getCatalog(catalogName).asScala.toArray.flatMap { flinkCatalog =>
flinkCatalog.listDatabases().asScala
- .filter { _schema => schemaPattern.pattern.matcher(_schema).matches() }
- .flatMap { _schema =>
- flinkCatalog.listTables(_schema).asScala
- .filter { _table => tableNamePattern.pattern.matcher(_table).matches() }
- .filter { _table =>
- // skip check type of every table if request all types
- if (Set(Constants.TABLE_TYPE, Constants.VIEW_TYPE) subsetOf tableTypes) {
- true
- } else {
- val objPath = ObjectIdentifier.of(catalogName, _schema, _table).toObjectPath
- Try(flinkCatalog.getTable(objPath)) match {
- case Success(table) => tableTypes.contains(table.getTableKind.name)
- case Failure(_) => false
- }
+ .filter { schemaName => schemaNameRegex.pattern.matcher(schemaName).matches() }
+ .flatMap { schemaName =>
+ flinkCatalog.listTables(schemaName).asScala
+ .filter { tableName => tableNameRegex.pattern.matcher(tableName).matches() }
+ .map { tableName =>
+ val objPath = ObjectIdentifier.of(catalogName, schemaName, tableName).toObjectPath
+ Try(flinkCatalog.getTable(objPath)) match {
+ case Success(flinkTable) => (tableName, Some(flinkTable))
+ case Failure(_) => (tableName, None)
}
}
+ .filter {
+ case (_, None) => false
+ case (_, Some(flinkTable)) => tableTypes.contains(flinkTable.getTableKind.name)
+ }.map { case (tableName, flinkTable) =>
+ Row.of(
+ catalogName,
+ schemaName,
+ tableName,
+ flinkTable.map(_.getTableKind.name).getOrElse(""),
+ flinkTable.map(_.getComment).getOrElse(""),
+ null,
+ null,
+ null,
+ null,
+ null)
+ }
}
}
- resultSet = ResultSetUtil.stringListToResultSet(tables.toList, Constants.SHOW_TABLES_RESULT)
+ resultSet = ResultSet.builder.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+ .columns(
+ Column.physical(TABLE_CAT, DataTypes.STRING),
+ Column.physical(TABLE_SCHEM, DataTypes.STRING),
+ Column.physical(TABLE_NAME, DataTypes.STRING),
+ Column.physical(TABLE_TYPE, DataTypes.STRING),
+ Column.physical(REMARKS, DataTypes.STRING),
+ Column.physical("TYPE_CAT", DataTypes.STRING),
+ Column.physical("TYPE_SCHEM", DataTypes.STRING),
+ Column.physical("TYPE_NAME", DataTypes.STRING),
+ Column.physical("SELF_REFERENCING_COL_NAME", DataTypes.STRING),
+ Column.physical("REF_GENERATION", DataTypes.STRING))
+ .data(tables)
+ .build
} catch onError()
}
}
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 2c2485b94..6cafe1ae4 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -389,7 +389,9 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
| id int,
| name string,
| price double
- | ) with (
+ | )
+ | comment 'table_comment'
+ | with (
| 'connector' = 'filesystem'
| )
""".stripMargin)
@@ -403,25 +405,37 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
val metaData = statement.getConnection.getMetaData
val rs1 = metaData.getTables(null, null, null, null)
assert(rs1.next())
- assert(rs1.getString(1) == table)
+ assert(rs1.getString(1) == "default_catalog")
+ assert(rs1.getString(2) == "default_database")
+ assert(rs1.getString(3) == table)
+ assert(rs1.getString(4) == "TABLE")
+ assert(rs1.getString(5) == "table_comment")
assert(rs1.next())
- assert(rs1.getString(1) == table_view)
+ assert(rs1.getString(1) == "default_catalog")
+ assert(rs1.getString(2) == "default_database")
+ assert(rs1.getString(3) == table_view)
+ assert(rs1.getString(4) == "VIEW")
+ assert(rs1.getString(5) == "")
// get table , table name like table%
val rs2 = metaData.getTables(null, null, "table%", Array("TABLE"))
assert(rs2.next())
- assert(rs2.getString(1) == table)
+ assert(rs2.getString(1) == "default_catalog")
+ assert(rs2.getString(2) == "default_database")
+ assert(rs2.getString(3) == table)
assert(!rs2.next())
// get view , view name like *
val rs3 = metaData.getTables(null, "default_database", "*", Array("VIEW"))
assert(rs3.next())
- assert(rs3.getString(1) == table_view)
+ assert(rs3.getString(1) == "default_catalog")
+ assert(rs3.getString(2) == "default_database")
+ assert(rs3.getString(3) == table_view)
// get view , view name like *, schema pattern like default_%
val rs4 = metaData.getTables(null, "default_%", "*", Array("VIEW"))
assert(rs4.next())
- assert(rs4.getString(1) == table_view)
+ assert(rs4.getString(3) == table_view)
// get view , view name like *, schema pattern like no_exists_%
val rs5 = metaData.getTables(null, "no_exists_%", "*", Array("VIEW"))