You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/09/28 06:13:43 UTC

[incubator-kyuubi] branch branch-1.6 updated: [KYUUBI #3519] Flink SQL Engine - GetColumns Operation

This is an automated email from the ASF dual-hosted git repository.

chengpan pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git


The following commit(s) were added to refs/heads/branch-1.6 by this push:
     new b87cf97c1 [KYUUBI #3519] Flink SQL Engine - GetColumns Operation
b87cf97c1 is described below

commit b87cf97c1ad6c4d0e8196b89cb78ff4f1de02861
Author: sychen <sy...@ctrip.com>
AuthorDate: Wed Sep 28 14:13:19 2022 +0800

    [KYUUBI #3519] Flink SQL Engine - GetColumns Operation
    
    ### _Why are the changes needed?_
    #1322
    #2129
    
    ### _How was this patch tested?_
    - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
    
    - [x] Add screenshots for manual tests if appropriate
    
    - [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request
    
    Closes #3519 from cxzl25/flink_get_columns.
    
    Closes #3519
    
    ab81776f [sychen] add column size
    efdf1b90 [sychen] indent
    630e907a [sychen] refactor
    48a79d5b [sychen] add ut
    69763bd3 [sychen] GetColumns
    8e5e6c51 [sychen] GetColumns
    
    Authored-by: sychen <sy...@ctrip.com>
    Signed-off-by: Cheng Pan <ch...@apache.org>
    (cherry picked from commit 8419b7bafd0e39115f9b11b29e5b6f3c7f89451d)
    Signed-off-by: Cheng Pan <ch...@apache.org>
---
 .../flink/operation/FlinkSQLOperationManager.scala |  11 +-
 .../kyuubi/engine/flink/operation/GetColumns.scala | 217 +++++++++++++++++++++
 .../kyuubi/engine/flink/operation/GetTables.scala  |  27 +--
 .../kyuubi/engine/flink/schema/SchemaHelper.scala  |  63 ++++++
 .../flink/operation/FlinkOperationSuite.scala      |  98 +++++++++-
 5 files changed, 389 insertions(+), 27 deletions(-)

diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
index 92f84b692..774bfb8be 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
@@ -146,9 +146,14 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
       schemaName: String,
       tableName: String,
       columnName: String): Operation = {
-    throw new UnsupportedOperationException(
-      "Unsupported Operation type GetColumns. You can execute " +
-        "DESCRIBE statement instead to get column infos.")
+    val op = new GetColumns(
+      session = session,
+      catalogNameOrEmpty = catalogName,
+      schemaNamePattern = schemaName,
+      tableNamePattern = tableName,
+      columnNamePattern = columnName)
+
+    addOperation(op)
   }
 
   override def newGetFunctionsOperation(
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetColumns.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetColumns.scala
new file mode 100644
index 000000000..63f604976
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetColumns.scala
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.table.api.{DataTypes, ResultKind}
+import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl
+import org.apache.flink.table.catalog.Column
+import org.apache.flink.table.types.logical._
+import org.apache.flink.types.Row
+
+import org.apache.kyuubi.engine.flink.result.ResultSet
+import org.apache.kyuubi.engine.flink.schema.SchemaHelper
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
+import org.apache.kyuubi.session.Session
+
+class GetColumns(
+    session: Session,
+    catalogNameOrEmpty: String,
+    schemaNamePattern: String,
+    tableNamePattern: String,
+    columnNamePattern: String)
+  extends FlinkOperation(session) {
+
+  override protected def runInternal(): Unit = {
+    try {
+      val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+      val resolver = tableEnv match {
+        case impl: StreamTableEnvironmentImpl =>
+          impl.getCatalogManager.getSchemaResolver
+        case _ =>
+          throw new UnsupportedOperationException(
+            "Unsupported Operation type GetColumns. You can execute " +
+              "DESCRIBE statement instead to get column infos.")
+      }
+
+      val catalogName =
+        if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
+        else catalogNameOrEmpty
+
+      val schemaNameRegex = toJavaRegex(schemaNamePattern)
+      val tableNameRegex = toJavaRegex(tableNamePattern)
+      val columnNameRegex = toJavaRegex(columnNamePattern).r
+
+      val columns = tableEnv.getCatalog(catalogName).asScala.toArray.flatMap { flinkCatalog =>
+        SchemaHelper.getSchemasWithPattern(flinkCatalog, schemaNameRegex)
+          .flatMap { schemaName =>
+            SchemaHelper.getFlinkTablesWithPattern(
+              flinkCatalog,
+              catalogName,
+              schemaName,
+              tableNameRegex)
+              .filter { _._2.isDefined }
+              .flatMap { case (tableName, flinkTable) =>
+                val resolvedSchema = flinkTable.get.getUnresolvedSchema.resolve(resolver)
+                resolvedSchema.getColumns.asScala.toArray.zipWithIndex
+                  .filter { case (column, _) =>
+                    columnNameRegex.pattern.matcher(column.getName).matches()
+                  }
+                  .map { case (column, pos) =>
+                    toColumnResult(catalogName, schemaName, tableName, column, pos)
+                  }
+              }
+          }
+      }
+
+      resultSet = ResultSet.builder.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+        .columns(
+          Column.physical(TABLE_CAT, DataTypes.STRING),
+          Column.physical(TABLE_SCHEM, DataTypes.STRING),
+          Column.physical(TABLE_NAME, DataTypes.STRING),
+          Column.physical(COLUMN_NAME, DataTypes.STRING),
+          Column.physical(DATA_TYPE, DataTypes.INT),
+          Column.physical(TYPE_NAME, DataTypes.STRING),
+          Column.physical(COLUMN_SIZE, DataTypes.INT),
+          Column.physical(BUFFER_LENGTH, DataTypes.TINYINT),
+          Column.physical(DECIMAL_DIGITS, DataTypes.INT),
+          Column.physical(NUM_PREC_RADIX, DataTypes.INT),
+          Column.physical(NULLABLE, DataTypes.INT),
+          Column.physical(REMARKS, DataTypes.STRING),
+          Column.physical(COLUMN_DEF, DataTypes.STRING),
+          Column.physical(SQL_DATA_TYPE, DataTypes.INT),
+          Column.physical(SQL_DATETIME_SUB, DataTypes.INT),
+          Column.physical(CHAR_OCTET_LENGTH, DataTypes.INT),
+          Column.physical(ORDINAL_POSITION, DataTypes.INT),
+          Column.physical(IS_NULLABLE, DataTypes.STRING),
+          Column.physical(SCOPE_CATALOG, DataTypes.STRING),
+          Column.physical(SCOPE_SCHEMA, DataTypes.STRING),
+          Column.physical(SCOPE_TABLE, DataTypes.STRING),
+          Column.physical(SOURCE_DATA_TYPE, DataTypes.SMALLINT),
+          Column.physical(IS_AUTO_INCREMENT, DataTypes.STRING))
+        .data(columns)
+        .build
+    } catch onError()
+  }
+
+  private def toColumnResult(
+      catalogName: String,
+      schemaName: String,
+      tableName: String,
+      column: Column,
+      pos: Int): Row = {
+    val logicalType = column.getDataType.getLogicalType
+    // format: off
+    Row.of(
+      catalogName,                                                      // TABLE_CAT
+      schemaName,                                                       // TABLE_SCHEM
+      tableName,                                                        // TABLE_NAME
+      column.getName,                                                   // COLUMN_NAME
+      Integer.valueOf(toJavaSQLType(logicalType)),                      // DATA_TYPE
+      logicalType.toString.replace(" NOT NULL", ""),                    // TYPE_NAME
+      getColumnSize(logicalType),                                       // COLUMN_SIZE
+      null,                                                             // BUFFER_LENGTH
+      getDecimalDigits(logicalType),                                    // DECIMAL_DIGITS
+      getNumPrecRadix(logicalType),                                     // NUM_PREC_RADIX
+      Integer.valueOf(if (logicalType.isNullable) 1 else 0),            // NULLABLE
+      column.getComment.orElse(null),                                   // REMARKS
+      null,                                                             // COLUMN_DEF
+      null,                                                             // SQL_DATA_TYPE
+      null,                                                             // SQL_DATETIME_SUB
+      null,                                                             // CHAR_OCTET_LENGTH
+      Integer.valueOf(pos),                                             // ORDINAL_POSITION
+      if (logicalType.isNullable) "YES" else "NO",                      // IS_NULLABLE
+      null,                                                             // SCOPE_CATALOG
+      null,                                                             // SCOPE_SCHEMA
+      null,                                                             // SCOPE_TABLE
+      null,                                                             // SOURCE_DATA_TYPE
+      "NO"                                                              // IS_AUTO_INCREMENT
+    )
+    // format: on
+  }
+
+  private def toJavaSQLType(flinkType: LogicalType): Int = flinkType.getClass match {
+    case c: Class[_] if c == classOf[NullType] => java.sql.Types.NULL
+    case c: Class[_] if c == classOf[BooleanType] => java.sql.Types.BOOLEAN
+    case c: Class[_] if c == classOf[TinyIntType] => java.sql.Types.TINYINT
+    case c: Class[_] if c == classOf[SmallIntType] => java.sql.Types.SMALLINT
+    case c: Class[_] if c == classOf[IntType] => java.sql.Types.INTEGER
+    case c: Class[_] if c == classOf[BigIntType] => java.sql.Types.BIGINT
+    case c: Class[_] if c == classOf[FloatType] => java.sql.Types.FLOAT
+    case c: Class[_] if c == classOf[DoubleType] => java.sql.Types.DOUBLE
+    case c: Class[_] if c == classOf[CharType] => java.sql.Types.CHAR
+    case c: Class[_] if c == classOf[VarCharType] => java.sql.Types.VARCHAR
+    case c: Class[_] if c == classOf[DecimalType] => java.sql.Types.DECIMAL
+    case c: Class[_] if c == classOf[DateType] => java.sql.Types.DATE
+    case c: Class[_] if c == classOf[TimestampType] => java.sql.Types.TIMESTAMP // no time zone
+    case c: Class[_] if c == classOf[DayTimeIntervalType] => java.sql.Types.OTHER
+    case c: Class[_] if c == classOf[YearMonthIntervalType] => java.sql.Types.OTHER
+    case c: Class[_] if c == classOf[ZonedTimestampType] => java.sql.Types.TIMESTAMP_WITH_TIMEZONE
+    case c: Class[_] if c == classOf[TimeType] => java.sql.Types.TIME
+    case c: Class[_] if c == classOf[BinaryType] => java.sql.Types.BINARY
+    case c: Class[_] if c == classOf[VarBinaryType] => java.sql.Types.BINARY
+    case c: Class[_] if c == classOf[ArrayType] => java.sql.Types.ARRAY
+    case c: Class[_] if c == classOf[MapType] => java.sql.Types.JAVA_OBJECT
+    case c: Class[_] if c == classOf[MultisetType] => java.sql.Types.JAVA_OBJECT
+    case c: Class[_] if c == classOf[StructuredType] => java.sql.Types.STRUCT
+    case c: Class[_] if c == classOf[DistinctType] => java.sql.Types.OTHER
+    case c: Class[_] if c == classOf[RawType[_]] => java.sql.Types.OTHER
+    case c: Class[_] if c == classOf[RowType] => java.sql.Types.STRUCT
+    case c: Class[_] if c == classOf[SymbolType[_]] => java.sql.Types.OTHER
+    case _ => java.sql.Types.OTHER // any logical type without a dedicated JDBC code above
+  }
+
+  // COLUMN_SIZE per JDBC: for DECIMAL this is the precision; the scale goes in DECIMAL_DIGITS.
+  private def getColumnSize(flinkType: LogicalType): Integer = flinkType.getClass match {
+    case c: Class[_] if c == classOf[TinyIntType] => 3
+    case c: Class[_] if c == classOf[SmallIntType] => 5
+    case c: Class[_] if c == classOf[IntType] || c == classOf[DateType] => 10
+    case c: Class[_] if c == classOf[BigIntType] => 19
+    case c: Class[_] if c == classOf[FloatType] => 7
+    case c: Class[_] if c == classOf[DoubleType] => 15
+    case c: Class[_] if c == classOf[DecimalType] =>
+      flinkType.asInstanceOf[DecimalType].getPrecision
+    case c: Class[_] if c == classOf[VarCharType] || c == classOf[BinaryType] => Integer.MAX_VALUE
+    case c: Class[_] if c == classOf[TimestampType] => 29
+    case _ => null
+  }
+
+  private def getDecimalDigits(flinkType: LogicalType): Integer = flinkType.getClass match {
+    case c: Class[_] if c == classOf[BooleanType] => 0
+    case c: Class[_] if c == classOf[TinyIntType] => 0
+    case c: Class[_] if c == classOf[SmallIntType] => 0
+    case c: Class[_] if c == classOf[IntType] => 0
+    case c: Class[_] if c == classOf[BigIntType] => 0
+    case c: Class[_] if c == classOf[FloatType] => 7
+    case c: Class[_] if c == classOf[DoubleType] => 15
+    case c: Class[_] if c == classOf[DecimalType] => flinkType.asInstanceOf[DecimalType].getScale
+    case c: Class[_] if c == classOf[TimestampType] => 9
+    case _ => null
+  }
+
+  private def getNumPrecRadix(flinkType: LogicalType): Integer = flinkType.getClass match {
+    case c: Class[_]
+        if c == classOf[TinyIntType] || c == classOf[SmallIntType]
+          || c == classOf[IntType] || c == classOf[BigIntType]
+          || c == classOf[FloatType] || c == classOf[DoubleType]
+          || c == classOf[DecimalType] => 10
+    case _ => null
+  }
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala
index e0e362fd7..e48853d8f 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala
@@ -17,15 +17,13 @@
 
 package org.apache.kyuubi.engine.flink.operation
 
-import scala.collection.JavaConverters._
-import scala.util.{Failure, Success, Try}
-
 import org.apache.commons.lang3.StringUtils
 import org.apache.flink.table.api.{DataTypes, ResultKind}
-import org.apache.flink.table.catalog.{Column, ObjectIdentifier}
+import org.apache.flink.table.catalog.Column
 import org.apache.flink.types.Row
 
 import org.apache.kyuubi.engine.flink.result.ResultSet
+import org.apache.kyuubi.engine.flink.schema.SchemaHelper
 import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
 import org.apache.kyuubi.session.Session
 
@@ -45,22 +43,17 @@ class GetTables(
         if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
         else catalogNameOrEmpty
 
-      val schemaNameRegex = toJavaRegex(schemaNamePattern).r
-      val tableNameRegex = toJavaRegex(tableNamePattern).r
+      val schemaNameRegex = toJavaRegex(schemaNamePattern)
+      val tableNameRegex = toJavaRegex(tableNamePattern)
 
       val tables = tableEnv.getCatalog(catalogName).asScala.toArray.flatMap { flinkCatalog =>
-        flinkCatalog.listDatabases().asScala
-          .filter { schemaName => schemaNameRegex.pattern.matcher(schemaName).matches() }
+        SchemaHelper.getSchemasWithPattern(flinkCatalog, schemaNameRegex)
           .flatMap { schemaName =>
-            flinkCatalog.listTables(schemaName).asScala
-              .filter { tableName => tableNameRegex.pattern.matcher(tableName).matches() }
-              .map { tableName =>
-                val objPath = ObjectIdentifier.of(catalogName, schemaName, tableName).toObjectPath
-                Try(flinkCatalog.getTable(objPath)) match {
-                  case Success(flinkTable) => (tableName, Some(flinkTable))
-                  case Failure(_) => (tableName, None)
-                }
-              }
+            SchemaHelper.getFlinkTablesWithPattern(
+              flinkCatalog,
+              catalogName,
+              schemaName,
+              tableNameRegex)
               .filter {
                 case (_, None) => false
                 case (_, Some(flinkTable)) => tableTypes.contains(flinkTable.getTableKind.name)
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/SchemaHelper.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/SchemaHelper.scala
new file mode 100644
index 000000000..062011462
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/SchemaHelper.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.schema
+
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success, Try}
+
+import org.apache.flink.table.catalog.{Catalog, CatalogBaseTable, ObjectIdentifier}
+
+object SchemaHelper {
+
+  def getSchemasWithPattern(flinkCatalog: Catalog, schemaNamePattern: String): Array[String] = {
+    val p = schemaNamePattern.r.pattern
+    flinkCatalog.listDatabases().asScala.toArray
+      .filter { schemaName => p.matcher(schemaName).matches() }
+  }
+
+  def getTablesWithPattern(
+      flinkCatalog: Catalog,
+      schemaName: String,
+      tableNamePattern: String): Array[String] = {
+    val p = tableNamePattern.r.pattern
+    flinkCatalog.listTables(schemaName).asScala.toArray
+      .filter { tableName => p.matcher(tableName).matches() }
+  }
+
+  def getFlinkTablesWithPattern(
+      flinkCatalog: Catalog,
+      catalogName: String,
+      schemaName: String,
+      tableNamePattern: String): Array[(String, Option[CatalogBaseTable])] = {
+    getTablesWithPattern(flinkCatalog, schemaName, tableNamePattern).map { tableName =>
+      getFlinkTable(flinkCatalog, catalogName, schemaName, tableName)
+    }
+  }
+
+  def getFlinkTable(
+      flinkCatalog: Catalog,
+      catalogName: String,
+      schemaName: String,
+      tableName: String): (String, Option[CatalogBaseTable]) = {
+    val objPath = ObjectIdentifier.of(catalogName, schemaName, tableName).toObjectPath
+    Try(flinkCatalog.getTable(objPath)) match {
+      case Success(flinkTable) => (tableName, Some(flinkTable))
+      case Failure(_) => (tableName, None)
+    }
+  }
+}
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 9a1b65f33..3e6ffc6ab 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -70,13 +70,97 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
   }
 
   test("get columns") {
-    withJdbcStatement() { statement =>
-      val exceptionMsg = intercept[Exception](statement.getConnection.getMetaData.getColumns(
-        null,
-        null,
-        null,
-        null)).getMessage
-      assert(exceptionMsg.contains(s"Unsupported Operation type GetColumns"))
+    val tableName = "flink_get_col_operation"
+
+    withJdbcStatement(tableName) { statement =>
+      statement.execute(
+        s"""
+           | create table $tableName (
+           |  c0 boolean,
+           |  c1 tinyint,
+           |  c2 smallint,
+           |  c3 integer,
+           |  c4 bigint,
+           |  c5 float,
+           |  c6 double,
+           |  c7 decimal(38,20),
+           |  c8 decimal(10,2),
+           |  c9 string,
+           |  c10 array<bigint>,
+           |  c11 array<string>,
+           |  c12 map<smallint, tinyint>,
+           |  c13 date,
+           |  c14 timestamp,
+           |  c15 binary
+           | )
+           | with (
+           |   'connector' = 'filesystem'
+           | )
+    """.stripMargin)
+
+      val metaData = statement.getConnection.getMetaData
+
+      Seq("%", null, ".*", "c.*") foreach { columnPattern =>
+        val rowSet = metaData.getColumns("", "", tableName, columnPattern)
+
+        import java.sql.Types._
+        val expectedJavaTypes = Seq(
+          BOOLEAN,
+          TINYINT,
+          SMALLINT,
+          INTEGER,
+          BIGINT,
+          FLOAT,
+          DOUBLE,
+          DECIMAL,
+          DECIMAL,
+          VARCHAR,
+          ARRAY,
+          ARRAY,
+          JAVA_OBJECT,
+          DATE,
+          TIMESTAMP,
+          BINARY)
+
+        val expectedSqlType = Seq(
+          "BOOLEAN",
+          "TINYINT",
+          "SMALLINT",
+          "INT",
+          "BIGINT",
+          "FLOAT",
+          "DOUBLE",
+          "DECIMAL(38, 20)",
+          "DECIMAL(10, 2)",
+          "STRING",
+          "ARRAY<BIGINT>",
+          "ARRAY<STRING>",
+          "MAP<SMALLINT, TINYINT>",
+          "DATE",
+          "TIMESTAMP(6)",
+          "BINARY(1)")
+
+        var pos = 0
+
+        while (rowSet.next()) {
+          assert(rowSet.getString(TABLE_CAT) === "default_catalog")
+          assert(rowSet.getString(TABLE_SCHEM) === "default_database")
+          assert(rowSet.getString(TABLE_NAME) === tableName)
+          assert(rowSet.getString(COLUMN_NAME) === s"c$pos")
+          assert(rowSet.getInt(DATA_TYPE) === expectedJavaTypes(pos))
+          assert(rowSet.getString(TYPE_NAME) === expectedSqlType(pos))
+          assert(rowSet.getInt(BUFFER_LENGTH) === 0)
+          assert(rowSet.getInt(NULLABLE) === 1)
+          assert(rowSet.getInt(ORDINAL_POSITION) === pos)
+          assert(rowSet.getString(IS_NULLABLE) === "YES")
+          assert(rowSet.getString(IS_AUTO_INCREMENT) === "NO")
+          pos += 1
+        }
+
+        assert(pos === expectedSqlType.length, "all columns should have been verified")
+      }
+      val rowSet = metaData.getColumns(null, "*", "not_exist", "not_exist")
+      assert(!rowSet.next())
     }
   }