Posted to commits@kyuubi.apache.org by ch...@apache.org on 2022/09/28 06:13:43 UTC
[incubator-kyuubi] branch branch-1.6 updated: [KYUUBI #3519] Flink SQL Engine - GetColumns Operation
This is an automated email from the ASF dual-hosted git repository.
chengpan pushed a commit to branch branch-1.6
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/branch-1.6 by this push:
new b87cf97c1 [KYUUBI #3519] Flink SQL Engine - GetColumns Operation
b87cf97c1 is described below
commit b87cf97c1ad6c4d0e8196b89cb78ff4f1de02861
Author: sychen <sy...@ctrip.com>
AuthorDate: Wed Sep 28 14:13:19 2022 +0800
[KYUUBI #3519] Flink SQL Engine - GetColumns Operation
### _Why are the changes needed?_
#1322
#2129
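In short, this change lets the Flink SQL engine answer JDBC `DatabaseMetaData.getColumns` calls instead of throwing `UnsupportedOperationException`. A minimal sketch of what a client can do after this patch (the connection URL and table name are illustrative, not part of this change):
```scala
import java.sql.DriverManager

// Illustrative endpoint; substitute your Kyuubi server's host and port.
val conn = DriverManager.getConnection("jdbc:hive2://localhost:10009/default")
try {
  // One row is returned per column matching the catalog/schema/table/column
  // search patterns; previously the Flink engine rejected this call.
  val columns = conn.getMetaData.getColumns(null, null, "my_table", "%")
  while (columns.next()) {
    println(s"${columns.getString("COLUMN_NAME")}: ${columns.getString("TYPE_NAME")}")
  }
} finally {
  conn.close()
}
```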
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible
- [x] Add screenshots for manual tests if appropriate
- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request
Closes #3519 from cxzl25/flink_get_columns.
Closes #3519
ab81776f [sychen] add column size
efdf1b90 [sychen] indent
630e907a [sychen] refactor
48a79d5b [sychen] add ut
69763bd3 [sychen] GetColumns
8e5e6c51 [sychen] GetColumns
Authored-by: sychen <sy...@ctrip.com>
Signed-off-by: Cheng Pan <ch...@apache.org>
(cherry picked from commit 8419b7bafd0e39115f9b11b29e5b6f3c7f89451d)
Signed-off-by: Cheng Pan <ch...@apache.org>
---
.../flink/operation/FlinkSQLOperationManager.scala | 11 +-
.../kyuubi/engine/flink/operation/GetColumns.scala | 217 +++++++++++++++++++++
.../kyuubi/engine/flink/operation/GetTables.scala | 27 +--
.../kyuubi/engine/flink/schema/SchemaHelper.scala | 63 ++++++
.../flink/operation/FlinkOperationSuite.scala | 98 +++++++++-
5 files changed, 389 insertions(+), 27 deletions(-)
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
index 92f84b692..774bfb8be 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
@@ -146,9 +146,14 @@ class FlinkSQLOperationManager extends OperationManager("FlinkSQLOperationManage
schemaName: String,
tableName: String,
columnName: String): Operation = {
- throw new UnsupportedOperationException(
- "Unsupported Operation type GetColumns. You can execute " +
- "DESCRIBE statement instead to get column infos.")
+ val op = new GetColumns(
+ session = session,
+ catalogNameOrEmpty = catalogName,
+ schemaNamePattern = schemaName,
+ tableNamePattern = tableName,
+ columnNamePattern = columnName)
+
+ addOperation(op)
}
override def newGetFunctionsOperation(
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetColumns.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetColumns.scala
new file mode 100644
index 000000000..63f604976
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetColumns.scala
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.operation
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.flink.table.api.{DataTypes, ResultKind}
+import org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl
+import org.apache.flink.table.catalog.Column
+import org.apache.flink.table.types.logical._
+import org.apache.flink.types.Row
+
+import org.apache.kyuubi.engine.flink.result.ResultSet
+import org.apache.kyuubi.engine.flink.schema.SchemaHelper
+import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
+import org.apache.kyuubi.session.Session
+
+class GetColumns(
+ session: Session,
+ catalogNameOrEmpty: String,
+ schemaNamePattern: String,
+ tableNamePattern: String,
+ columnNamePattern: String)
+ extends FlinkOperation(session) {
+
+ override protected def runInternal(): Unit = {
+ try {
+ val tableEnv = sessionContext.getExecutionContext.getTableEnvironment
+ val resolver = tableEnv match {
+ case impl: StreamTableEnvironmentImpl =>
+ impl.getCatalogManager.getSchemaResolver
+ case _ =>
+ throw new UnsupportedOperationException(
+ "Unsupported Operation type GetColumns. You can execute " +
+ "DESCRIBE statement instead to get column infos.")
+ }
+
+ val catalogName =
+ if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
+ else catalogNameOrEmpty
+
+ val schemaNameRegex = toJavaRegex(schemaNamePattern)
+ val tableNameRegex = toJavaRegex(tableNamePattern)
+ val columnNameRegex = toJavaRegex(columnNamePattern).r
+
+ val columns = tableEnv.getCatalog(catalogName).asScala.toArray.flatMap { flinkCatalog =>
+ SchemaHelper.getSchemasWithPattern(flinkCatalog, schemaNameRegex)
+ .flatMap { schemaName =>
+ SchemaHelper.getFlinkTablesWithPattern(
+ flinkCatalog,
+ catalogName,
+ schemaName,
+ tableNameRegex)
+ .filter { _._2.isDefined }
+ .flatMap { case (tableName, flinkTable) =>
+ val resolvedSchema = flinkTable.get.getUnresolvedSchema.resolve(resolver)
+ resolvedSchema.getColumns.asScala.toArray.zipWithIndex
+ .filter { case (column, _) =>
+ columnNameRegex.pattern.matcher(column.getName).matches()
+ }
+ .map { case (column, pos) =>
+ toColumnResult(catalogName, schemaName, tableName, column, pos)
+ }
+ }
+ }
+ }
+
+ resultSet = ResultSet.builder.resultKind(ResultKind.SUCCESS_WITH_CONTENT)
+ .columns(
+ Column.physical(TABLE_CAT, DataTypes.STRING),
+ Column.physical(TABLE_SCHEM, DataTypes.STRING),
+ Column.physical(TABLE_NAME, DataTypes.STRING),
+ Column.physical(COLUMN_NAME, DataTypes.STRING),
+ Column.physical(DATA_TYPE, DataTypes.INT),
+ Column.physical(TYPE_NAME, DataTypes.STRING),
+ Column.physical(COLUMN_SIZE, DataTypes.INT),
+ Column.physical(BUFFER_LENGTH, DataTypes.TINYINT),
+ Column.physical(DECIMAL_DIGITS, DataTypes.INT),
+ Column.physical(NUM_PREC_RADIX, DataTypes.INT),
+ Column.physical(NULLABLE, DataTypes.INT),
+ Column.physical(REMARKS, DataTypes.STRING),
+ Column.physical(COLUMN_DEF, DataTypes.STRING),
+ Column.physical(SQL_DATA_TYPE, DataTypes.INT),
+ Column.physical(SQL_DATETIME_SUB, DataTypes.INT),
+ Column.physical(CHAR_OCTET_LENGTH, DataTypes.INT),
+ Column.physical(ORDINAL_POSITION, DataTypes.INT),
+ Column.physical(IS_NULLABLE, DataTypes.STRING),
+ Column.physical(SCOPE_CATALOG, DataTypes.STRING),
+ Column.physical(SCOPE_SCHEMA, DataTypes.STRING),
+ Column.physical(SCOPE_TABLE, DataTypes.STRING),
+ Column.physical(SOURCE_DATA_TYPE, DataTypes.SMALLINT),
+ Column.physical(IS_AUTO_INCREMENT, DataTypes.STRING))
+ .data(columns)
+ .build
+ } catch onError()
+ }
+
+ private def toColumnResult(
+ catalogName: String,
+ schemaName: String,
+ tableName: String,
+ column: Column,
+ pos: Int): Row = {
+ val logicalType = column.getDataType.getLogicalType
+ // format: off
+ Row.of(
+ catalogName, // TABLE_CAT
+ schemaName, // TABLE_SCHEM
+ tableName, // TABLE_NAME
+ column.getName, // COLUMN_NAME
+ Integer.valueOf(toJavaSQLType(logicalType)), // DATA_TYPE
+ logicalType.toString.replace(" NOT NULL", ""), // TYPE_NAME
+ getColumnSize(logicalType), // COLUMN_SIZE
+ null, // BUFFER_LENGTH
+ getDecimalDigits(logicalType), // DECIMAL_DIGITS
+ getNumPrecRadix(logicalType), // NUM_PREC_RADIX
+ Integer.valueOf(if (logicalType.isNullable) 1 else 0), // NULLABLE
+ column.getComment.orElse(null), // REMARKS
+ null, // COLUMN_DEF
+ null, // SQL_DATA_TYPE
+ null, // SQL_DATETIME_SUB
+ null, // CHAR_OCTET_LENGTH
+ Integer.valueOf(pos), // ORDINAL_POSITION
+ if (logicalType.isNullable) "YES" else "NO", // IS_NULLABLE
+ null, // SCOPE_CATALOG
+ null, // SCOPE_SCHEMA
+ null, // SCOPE_TABLE
+ null, // SOURCE_DATA_TYPE
+ "NO" // IS_AUTO_INCREMENT
+ )
+ // format: on
+ }
+
+ private def toJavaSQLType(flinkType: LogicalType): Int = flinkType match {
+ case _: NullType => java.sql.Types.NULL
+ case _: BooleanType => java.sql.Types.BOOLEAN
+ case _: TinyIntType => java.sql.Types.TINYINT
+ case _: SmallIntType => java.sql.Types.SMALLINT
+ case _: IntType => java.sql.Types.INTEGER
+ case _: BigIntType => java.sql.Types.BIGINT
+ case _: FloatType => java.sql.Types.FLOAT
+ case _: DoubleType => java.sql.Types.DOUBLE
+ case _: CharType => java.sql.Types.CHAR
+ case _: VarCharType => java.sql.Types.VARCHAR
+ case _: DecimalType => java.sql.Types.DECIMAL
+ case _: DateType => java.sql.Types.DATE
+ case _: TimestampType => java.sql.Types.TIMESTAMP
+ case _: DayTimeIntervalType => java.sql.Types.OTHER
+ case _: YearMonthIntervalType => java.sql.Types.OTHER
+ case _: ZonedTimestampType => java.sql.Types.TIMESTAMP
+ case _: TimeType => java.sql.Types.TIME
+ case _: BinaryType => java.sql.Types.BINARY
+ case _: VarBinaryType => java.sql.Types.BINARY
+ case _: ArrayType => java.sql.Types.ARRAY
+ case _: MapType => java.sql.Types.JAVA_OBJECT
+ case _: MultisetType => java.sql.Types.JAVA_OBJECT
+ case _: StructuredType => java.sql.Types.STRUCT
+ case _: DistinctType => java.sql.Types.OTHER
+ case _: RawType[_] => java.sql.Types.OTHER
+ case _: RowType => java.sql.Types.STRUCT
+ case _: SymbolType[_] => java.sql.Types.OTHER
+ case _ => java.sql.Types.OTHER
+ }
+
+ private def getColumnSize(flinkType: LogicalType): Integer = flinkType match {
+ case _: TinyIntType => 3
+ case _: SmallIntType => 5
+ case _: IntType => 10
+ case _: DateType => 10
+ case _: BigIntType => 19
+ case _: FloatType => 7
+ case _: DoubleType => 15
+ case dt: DecimalType => dt.getPrecision // COLUMN_SIZE is the precision, not the scale
+ case _: VarCharType => Integer.MAX_VALUE
+ case _: BinaryType => Integer.MAX_VALUE
+ case _: TimestampType => 29
+ case _ => null
+ }
+
+ private def getDecimalDigits(flinkType: LogicalType): Integer = flinkType match {
+ case _: BooleanType => 0
+ case _: TinyIntType => 0
+ case _: SmallIntType => 0
+ case _: IntType => 0
+ case _: BigIntType => 0
+ case _: FloatType => 7
+ case _: DoubleType => 15
+ case dt: DecimalType => dt.getScale
+ case _: TimestampType => 9
+ case _ => null
+ }
+
+ private def getNumPrecRadix(flinkType: LogicalType): Integer = flinkType match {
+ // all Flink numeric types carry base-10 precision
+ case _: TinyIntType | _: SmallIntType
+ | _: IntType | _: BigIntType
+ | _: FloatType | _: DoubleType
+ | _: DecimalType => 10
+ case _ => null
+ }
+}
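Note on the pattern handling above: JDBC metadata calls use SQL LIKE-style search patterns (`%` for any substring, `_` for a single character), and `toJavaRegex` (inherited from the operation base class, not shown in this diff) converts them to Java regexes before matching. A rough sketch of that conversion, assuming the conventional wildcard translation rather than Kyuubi's exact implementation:
```scala
// Hypothetical stand-in for the inherited toJavaRegex helper; a null or
// empty JDBC pattern conventionally matches everything.
def toJavaRegexSketch(pattern: String): String =
  if (pattern == null || pattern.isEmpty) ".*"
  else pattern.replace("%", ".*").replace("_", ".")
```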
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala
index e0e362fd7..e48853d8f 100644
--- a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/GetTables.scala
@@ -17,15 +17,13 @@
package org.apache.kyuubi.engine.flink.operation
-import scala.collection.JavaConverters._
-import scala.util.{Failure, Success, Try}
-
import org.apache.commons.lang3.StringUtils
import org.apache.flink.table.api.{DataTypes, ResultKind}
-import org.apache.flink.table.catalog.{Column, ObjectIdentifier}
+import org.apache.flink.table.catalog.Column
import org.apache.flink.types.Row
import org.apache.kyuubi.engine.flink.result.ResultSet
+import org.apache.kyuubi.engine.flink.schema.SchemaHelper
import org.apache.kyuubi.operation.meta.ResultSetSchemaConstant._
import org.apache.kyuubi.session.Session
@@ -45,22 +43,17 @@ class GetTables(
if (StringUtils.isEmpty(catalogNameOrEmpty)) tableEnv.getCurrentCatalog
else catalogNameOrEmpty
- val schemaNameRegex = toJavaRegex(schemaNamePattern).r
- val tableNameRegex = toJavaRegex(tableNamePattern).r
+ val schemaNameRegex = toJavaRegex(schemaNamePattern)
+ val tableNameRegex = toJavaRegex(tableNamePattern)
val tables = tableEnv.getCatalog(catalogName).asScala.toArray.flatMap { flinkCatalog =>
- flinkCatalog.listDatabases().asScala
- .filter { schemaName => schemaNameRegex.pattern.matcher(schemaName).matches() }
+ SchemaHelper.getSchemasWithPattern(flinkCatalog, schemaNameRegex)
.flatMap { schemaName =>
- flinkCatalog.listTables(schemaName).asScala
- .filter { tableName => tableNameRegex.pattern.matcher(tableName).matches() }
- .map { tableName =>
- val objPath = ObjectIdentifier.of(catalogName, schemaName, tableName).toObjectPath
- Try(flinkCatalog.getTable(objPath)) match {
- case Success(flinkTable) => (tableName, Some(flinkTable))
- case Failure(_) => (tableName, None)
- }
- }
+ SchemaHelper.getFlinkTablesWithPattern(
+ flinkCatalog,
+ catalogName,
+ schemaName,
+ tableNameRegex)
.filter {
case (_, None) => false
case (_, Some(flinkTable)) => tableTypes.contains(flinkTable.getTableKind.name)
diff --git a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/SchemaHelper.scala b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/SchemaHelper.scala
new file mode 100644
index 000000000..062011462
--- /dev/null
+++ b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/schema/SchemaHelper.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.flink.schema
+
+import scala.collection.JavaConverters._
+import scala.util.{Failure, Success, Try}
+
+import org.apache.flink.table.catalog.{Catalog, CatalogBaseTable, ObjectIdentifier}
+
+object SchemaHelper {
+
+ def getSchemasWithPattern(flinkCatalog: Catalog, schemaNamePattern: String): Array[String] = {
+ val p = schemaNamePattern.r.pattern
+ flinkCatalog.listDatabases().asScala.toArray
+ .filter { schemaName => p.matcher(schemaName).matches() }
+ }
+
+ def getTablesWithPattern(
+ flinkCatalog: Catalog,
+ schemaName: String,
+ tableNamePattern: String): Array[String] = {
+ val p = tableNamePattern.r.pattern
+ flinkCatalog.listTables(schemaName).asScala.toArray
+ .filter { tableName => p.matcher(tableName).matches() }
+ }
+
+ def getFlinkTablesWithPattern(
+ flinkCatalog: Catalog,
+ catalogName: String,
+ schemaName: String,
+ tableNamePattern: String): Array[(String, Option[CatalogBaseTable])] = {
+ getTablesWithPattern(flinkCatalog, schemaName, tableNamePattern).map { tableName =>
+ getFlinkTable(flinkCatalog, catalogName, schemaName, tableName)
+ }
+ }
+
+ def getFlinkTable(
+ flinkCatalog: Catalog,
+ catalogName: String,
+ schemaName: String,
+ tableName: String): (String, Option[CatalogBaseTable]) = {
+ val objPath = ObjectIdentifier.of(catalogName, schemaName, tableName).toObjectPath
+ Try(flinkCatalog.getTable(objPath)) match {
+ case Success(flinkTable) => (tableName, Some(flinkTable))
+ case Failure(_) => (tableName, None)
+ }
+ }
+}
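For orientation, the new helper composes as follows; a hypothetical usage sketch assuming a `tableEnv: TableEnvironment` is in scope (the schema pattern and output format are illustrative):
```scala
// List tables under schemas whose names start with "db", keeping only
// tables that could actually be loaded from the catalog.
val catalogName = tableEnv.getCurrentCatalog
val catalogOpt = tableEnv.getCatalog(catalogName) // java.util.Optional[Catalog]
if (catalogOpt.isPresent) {
  val catalog = catalogOpt.get
  SchemaHelper.getSchemasWithPattern(catalog, "db.*").foreach { schema =>
    SchemaHelper.getFlinkTablesWithPattern(catalog, catalogName, schema, ".*")
      .collect { case (name, Some(table)) => s"$schema.$name (${table.getTableKind})" }
      .foreach(println)
  }
}
```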
diff --git a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
index 9a1b65f33..3e6ffc6ab 100644
--- a/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
+++ b/externals/kyuubi-flink-sql-engine/src/test/scala/org/apache/kyuubi/engine/flink/operation/FlinkOperationSuite.scala
@@ -70,13 +70,97 @@ class FlinkOperationSuite extends WithFlinkSQLEngine with HiveJDBCTestHelper {
}
test("get columns") {
- withJdbcStatement() { statement =>
- val exceptionMsg = intercept[Exception](statement.getConnection.getMetaData.getColumns(
- null,
- null,
- null,
- null)).getMessage
- assert(exceptionMsg.contains(s"Unsupported Operation type GetColumns"))
+ val tableName = "flink_get_col_operation"
+
+ withJdbcStatement(tableName) { statement =>
+ statement.execute(
+ s"""
+ | create table $tableName (
+ | c0 boolean,
+ | c1 tinyint,
+ | c2 smallint,
+ | c3 integer,
+ | c4 bigint,
+ | c5 float,
+ | c6 double,
+ | c7 decimal(38,20),
+ | c8 decimal(10,2),
+ | c9 string,
+ | c10 array<bigint>,
+ | c11 array<string>,
+ | c12 map<smallint, tinyint>,
+ | c13 date,
+ | c14 timestamp,
+ | c15 binary
+ | )
+ | with (
+ | 'connector' = 'filesystem'
+ | )
+ """.stripMargin)
+
+ val metaData = statement.getConnection.getMetaData
+
+ Seq("%", null, ".*", "c.*") foreach { columnPattern =>
+ val rowSet = metaData.getColumns("", "", tableName, columnPattern)
+
+ import java.sql.Types._
+ val expectedJavaTypes = Seq(
+ BOOLEAN,
+ TINYINT,
+ SMALLINT,
+ INTEGER,
+ BIGINT,
+ FLOAT,
+ DOUBLE,
+ DECIMAL,
+ DECIMAL,
+ VARCHAR,
+ ARRAY,
+ ARRAY,
+ JAVA_OBJECT,
+ DATE,
+ TIMESTAMP,
+ BINARY)
+
+ val expectedSqlType = Seq(
+ "BOOLEAN",
+ "TINYINT",
+ "SMALLINT",
+ "INT",
+ "BIGINT",
+ "FLOAT",
+ "DOUBLE",
+ "DECIMAL(38, 20)",
+ "DECIMAL(10, 2)",
+ "STRING",
+ "ARRAY<BIGINT>",
+ "ARRAY<STRING>",
+ "MAP<SMALLINT, TINYINT>",
+ "DATE",
+ "TIMESTAMP(6)",
+ "BINARY(1)")
+
+ var pos = 0
+
+ while (rowSet.next()) {
+ assert(rowSet.getString(TABLE_CAT) === "default_catalog")
+ assert(rowSet.getString(TABLE_SCHEM) === "default_database")
+ assert(rowSet.getString(TABLE_NAME) === tableName)
+ assert(rowSet.getString(COLUMN_NAME) === s"c$pos")
+ assert(rowSet.getInt(DATA_TYPE) === expectedJavaTypes(pos))
+ assert(rowSet.getString(TYPE_NAME) === expectedSqlType(pos))
+ assert(rowSet.getInt(BUFFER_LENGTH) === 0)
+ assert(rowSet.getInt(NULLABLE) === 1)
+ assert(rowSet.getInt(ORDINAL_POSITION) === pos)
+ assert(rowSet.getString(IS_NULLABLE) === "YES")
+ assert(rowSet.getString(IS_AUTO_INCREMENT) === "NO")
+ pos += 1
+ }
+
+ assert(pos === expectedSqlType.length, "all columns should have been verified")
+ }
+ val rowSet = metaData.getColumns(null, "*", "not_exist", "not_exist")
+ assert(!rowSet.next())
}
}