You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/09/17 10:43:02 UTC

[incubator-kyuubi] branch branch-1.6 updated: [KYUUBI #3498] Fix GetTables operation in Flink

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/branch-1.6 by this push:
     new d2f76f7ec [KYUUBI #3498] Fix GetTables operation in Flink
d2f76f7ec is described below

commit d2f76f7ecd1ad0570295b7f38105abfc94fa2d01
Author: sychen <sy...@ctrip.com>
AuthorDate: Sat Sep 17 18:42:38 2022 +0800

    [KYUUBI #3498] Fix GetTables operation in Flink
    
    ### _Why are the changes needed?_
    #1646
    
    In Flink, the schema returned by the `GetTables` operation is incorrect, causing some database management tools to be unable to list tables.
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [x] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #3498 from cxzl25/flink_get_tables.
    
    Closes #3498
    
    6e51487f [sychen] compile
    cba06130 [sychen] toArray
    37bd6aa2 [sychen] style
    93fb80ce [sychen] GetTables
    
    Authored-by: sychen <sy...@ctrip.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
    (cherry picked from commit dce40c37041253a426277dd54bb60713a2dfc24d)
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../kyuubi/engine/flink/result/Constants.java      |  2 -
 .../flink/operation/FlinkSQLOperationManager.scala |  6 +-
 .../kyuubi/engine/flink/operation/GetTables.scala  | 77 +++++++++++++++-------
 .../flink/operation/FlinkOperationSuite.scala      | 26 ++++++--
 4 files changed, 76 insertions(+), 35 deletions(-)

diff --git a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/Constants.java b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/Constants.java
index 889bcf95f..b683eb76a 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/Constants.java
+++ b/externals/kyuubi-flink-sql-engine/src/main/java/org/apache/kyuubi/engine/flink/result/Constants.java
@@ -21,8 +21,6 @@ package org.apache.kyuubi.engine.flink.result;
 /** Constant column names. */
 public class Constants {
 
-  public static final String SHOW_TABLES_RESULT = "tables";
-
   public static final String TABLE_TYPE = "TABLE";
   public static final String VIEW_TYPE = "VIEW";
 
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
index 20ad4b434..48a23b202 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
@@ -127,9 +127,9 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
 
     val op = new GetTables(
       session = session,
-      catalog = catalogName,
-      schema = schemaName,
-      tableName = tableName,
+      catalogNameOrEmpty = catalogName,
+      schemaNamePattern = schemaName,
+      tableNamePattern = tableName,
       tableTypes = tTypes)
 
     addOperation(op)
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala
index 0e9affd2a..e0e362fd7 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala
@@ -21,16 +21,19 @@ import scala.collection.JavaConverters._
 import scala.util.{Failure, Success, Try}
 
 import org.apache.commons.lang3.StringUtils
-import org.apache.flink.table.catalog.ObjectIdentifier
+import org.apache.flink.table.api.{DataTypes, ResultKind}
+import org.apache.flink.table.catalog.{Column, ObjectIdentifier}
+import org.apache.flink.types.Row
 
-import org.apache.kyuubi.engine.flink.result.{Constants, ResultSetUtil}
+import org.apache.kyuubi.engine.flink.result.ResultSet
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
 import org.apache.kyuubi.session.Session
 
 class GetTables(
     session: Session,
-    catalog: String,
-    schema: String,
-    tableName: String,
+    catalogNameOrEmpty: String,
+    schemaNamePattern: String,
+    tableNamePattern: String,
     tableTypes: Set[String])
   extends FlinkOperation(session) {
 
@@ -38,33 +41,59 @@ class GetTables(
     try {
       val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
 
-      val catalogName = if (StringUtils.isEmpty(catalog)) tableEnv.getCurrentCatalog else catalog
+      val catalogName =
+        if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
+        else catalogNameOrEmpty
 
-      val schemaPattern = toJavaRegex(schema).r
-      val tableNamePattern = toJavaRegex(tableName).r
+      val schemaNameRegex = toJavaRegex(schemaNamePattern).r
+      val tableNameRegex = toJavaRegex(tableNamePattern).r
 
-      val tables = tableEnv.getCatalog(catalogName).asScala.toSeq.flatMap { flinkCatalog =>
+      val tables = tableEnv.getCatalog(catalogName).asScala.toArray.flatMap { flinkCatalog =>
         flinkCatalog.listDatabases().asScala
-          .filter { _schema => schemaPattern.pattern.matcher(_schema).matches() }
-          .flatMap { _schema =>
-            flinkCatalog.listTables(_schema).asScala
-              .filter { _table => tableNamePattern.pattern.matcher(_table).matches() }
-              .filter { _table =>
-                // skip check type of every table if request all types
-                if (Set(Constants.TABLE_TYPE, Constants.VIEW_TYPE) subsetOf tableTypes) {
-                  true
-                } else {
-                  val objPath = ObjectIdentifier.of(catalogName, _schema, _table).toObjectPath
-                  Try(flinkCatalog.getTable(objPath)) match {
-                    case Success(table) => tableTypes.contains(table.getTableKind.name)
-                    case Failure(_) => false
-                  }
+          .filter { schemaName => schemaNameRegex.pattern.matcher(schemaName).matches() }
+          .flatMap { schemaName =>
+            flinkCatalog.listTables(schemaName).asScala
+              .filter { tableName => tableNameRegex.pattern.matcher(tableName).matches() }
+              .map { tableName =>
+                val objPath = ObjectIdentifier.of(catalogName, schemaName, tableName).toObjectPath
+                Try(flinkCatalog.getTable(objPath)) match {
+                  case Success(flinkTable) => (tableName, Some(flinkTable))
+                  case Failure(_) => (tableName, None)
                 }
               }
+              .filter {
+                case (_, None) => false
+                case (_, Some(flinkTable)) => tableTypes.contains(flinkTable.getTableKind.name)
+              }.map { case (tableName, flinkTable) =>
+                Row.of(
+                  catalogName,
+                  schemaName,
+                  tableName,
+                  flinkTable.map(_.getTableKind.name).getOrElse(""),
+                  flinkTable.map(_.getComment).getOrElse(""),
+                  null,
+                  null,
+                  null,
+                  null,
+                  null)
+              }
           }
       }
 
-      resultSet = ResultSetUtil.stringListToResultSet(tables.toList, Constants.SHOW_TABLES_RESULT)
+      resultSet = ResultSet.builder.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+        .columns(
+          Column.physical(TABLE_CAT, DataTypes.STRING),
+          Column.physical(TABLE_SCHEM, DataTypes.STRING),
+          Column.physical(TABLE_NAME, DataTypes.STRING),
+          Column.physical(TABLE_TYPE, DataTypes.STRING),
+          Column.physical(REMARKS, DataTypes.STRING),
+          Column.physical("TYPE_CAT", DataTypes.STRING),
+          Column.physical("TYPE_SCHEM", DataTypes.STRING),
+          Column.physical("TYPE_NAME", DataTypes.STRING),
+          Column.physical("SELF_REFERENCING_COL_NAME", DataTypes.STRING),
+          Column.physical("REF_GENERATION", DataTypes.STRING))
+        .data(tables)
+        .build
     } catch onError()
   }
 }
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index c402d5cd5..8f29f7474 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -390,7 +390,9 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
            |  id int,
            |  name string,
            |  price double
-           | ) with (
+           | )
+           | comment 'table_comment'
+           | with (
            |   'connector' = 'filesystem'
            | )
        """.stripMargin)
@@ -404,25 +406,37 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
       val metaData = statement.getConnection.getMetaData
       val rs1 = metaData.getTables(null, null, null, null)
       assert(rs1.next())
-      assert(rs1.getString(1) == table)
+      assert(rs1.getString(1) == "default_catalog")
+      assert(rs1.getString(2) == "default_database")
+      assert(rs1.getString(3) == table)
+      assert(rs1.getString(4) == "TABLE")
+      assert(rs1.getString(5) == "table_comment")
       assert(rs1.next())
-      assert(rs1.getString(1) == table_view)
+      assert(rs1.getString(1) == "default_catalog")
+      assert(rs1.getString(2) == "default_database")
+      assert(rs1.getString(3) == table_view)
+      assert(rs1.getString(4) == "VIEW")
+      assert(rs1.getString(5) == "")
 
       // get table , table name like table%
       val rs2 = metaData.getTables(null, null, "table%", Array("TABLE"))
       assert(rs2.next())
-      assert(rs2.getString(1) == table)
+      assert(rs2.getString(1) == "default_catalog")
+      assert(rs2.getString(2) == "default_database")
+      assert(rs2.getString(3) == table)
       assert(!rs2.next())
 
       // get view , view name like *
       val rs3 = metaData.getTables(null, "default_database", "*", Array("VIEW"))
       assert(rs3.next())
-      assert(rs3.getString(1) == table_view)
+      assert(rs3.getString(1) == "default_catalog")
+      assert(rs3.getString(2) == "default_database")
+      assert(rs3.getString(3) == table_view)
 
       // get view , view name like *, schema pattern like default_%
       val rs4 = metaData.getTables(null, "default_%", "*", Array("VIEW"))
       assert(rs4.next())
-      assert(rs4.getString(1) == table_view)
+      assert(rs4.getString(3) == table_view)
 
       // get view , view name like *, schema pattern like no_exists_%
       val rs5 = metaData.getTables(null, "no_exists_%", "*", Array("VIEW"))