You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/01/07 13:11:27 UTC

[GitHub] [flink] twalthr commented on a change in pull request #16962: [FLINK-15352][connector-jdbc] Develop MySQLCatalog to connect Flink with MySQL tables and ecosystem.

twalthr commented on a change in pull request #16962:
URL: https://github.com/apache/flink/pull/16962#discussion_r780240029



##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
##########
@@ -188,8 +205,72 @@ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean igno
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+
+        Preconditions.checkState(
+                org.apache.commons.lang3.StringUtils.isNotBlank(databaseName),

Review comment:
       use Flink's `StringUtils`

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
##########
@@ -387,4 +468,57 @@ public void alterPartitionColumnStatistics(
             throws PartitionNotExistException, CatalogException {
         throw new UnsupportedOperationException();
     }
+
+    protected List<String> extractColumnValuesBySQL(
+            String connUrl,
+            String sql,
+            int columnIndex,
+            Predicate<String> filterFunc,
+            Object... params) {
+
+        List<String> columnValues = Lists.newArrayList();
+
+        try (Connection conn = DriverManager.getConnection(connUrl, username, pwd);
+                PreparedStatement ps = conn.prepareStatement(sql)) {
+            if (Objects.nonNull(params) && params.length > 0) {
+                for (int i = 0; i < params.length; i++) {
+                    ps.setObject(i + 1, params[i]);
+                }
+            }
+            ResultSet rs = ps.executeQuery();
+            while (rs.next()) {
+                String columnValue = rs.getString(columnIndex);
+                if (Objects.isNull(filterFunc) || filterFunc.test(columnValue)) {
+                    columnValues.add(columnValue);
+                }
+            }
+            return columnValues;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format(
+                            "The following SQL query could not be executed (%s): %s", connUrl, sql),
+                    e);
+        }
+    }
+
+    protected DataType fromJDBCType(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
+            throws SQLException {
+
+        throw new NotImplementedException("This method has no available default implements.");

Review comment:
       Declare the method `abstract`? In any case, don't use `NotImplementedException`, because it comes from a third-party library. We can simply use Java's `UnsupportedOperationException`.

##########
File path: flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalogITCase.java
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.CollectionUtil;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** E2E test for {@link MySQLCatalog}. */
+public class MySQLCatalogITCase extends MySQLCatalogTestBase {
+
+    private static final List<Row> ALL_TYPES_ROWS =
+            Lists.newArrayList(
+                    Row.ofKind(
+                            RowKind.INSERT,
+                            1L,
+                            -1L,
+                            new BigDecimal(1),
+                            null,
+                            true,
+                            null,
+                            "hello",
+                            Date.valueOf("2021-08-04").toLocalDate(),
+                            Timestamp.valueOf("2021-08-04 01:54:16").toLocalDateTime(),
+                            new BigDecimal(-1),
+                            new BigDecimal(1),
+                            -1.0d,
+                            1.0d,
+                            "enum2",
+                            -9.1f,
+                            9.1f,
+                            -1,
+                            1L,
+                            -1,
+                            1L,
+                            "{\"k1\": \"v1\"}",
+                            null,
+                            "col_longtext",
+                            null,
+                            -1,
+                            1,
+                            "col_mediumtext",
+                            new BigDecimal(-99),
+                            new BigDecimal(99),
+                            -1.0d,
+                            1.0d,
+                            "set_ele1",
+                            Short.parseShort("-1"),
+                            1,
+                            "col_text",
+                            Time.valueOf("10:32:34").toLocalTime(),
+                            Timestamp.valueOf("2021-08-04 01:54:16").toLocalDateTime(),
+                            "col_tinytext",
+                            Byte.parseByte("-1"),
+                            Byte.parseByte("1"),
+                            null,
+                            "col_varchar",
+                            Timestamp.valueOf("2021-08-04 01:54:16.463").toLocalDateTime(),
+                            Time.valueOf("09:33:43").toLocalTime(),
+                            Timestamp.valueOf("2021-08-04 01:54:16.463").toLocalDateTime(),
+                            null),
+                    Row.ofKind(
+                            RowKind.INSERT,
+                            2L,
+                            -1L,
+                            new BigDecimal(1),
+                            null,
+                            true,
+                            null,
+                            "hello",
+                            Date.valueOf("2021-08-04").toLocalDate(),
+                            Timestamp.valueOf("2021-08-04 01:53:19").toLocalDateTime(),
+                            new BigDecimal(-1),
+                            new BigDecimal(1),
+                            -1.0d,
+                            1.0d,
+                            "enum2",
+                            -9.1f,
+                            9.1f,
+                            -1,
+                            1L,
+                            -1,
+                            1L,
+                            "{\"k1\": \"v1\"}",
+                            null,
+                            "col_longtext",
+                            null,
+                            -1,
+                            1,
+                            "col_mediumtext",
+                            new BigDecimal(-99),
+                            new BigDecimal(99),
+                            -1.0d,
+                            1.0d,
+                            "set_ele1,set_ele12",
+                            Short.parseShort("-1"),
+                            1,
+                            "col_text",
+                            Time.valueOf("10:32:34").toLocalTime(),
+                            Timestamp.valueOf("2021-08-04 01:53:19").toLocalDateTime(),
+                            "col_tinytext",
+                            Byte.parseByte("-1"),
+                            Byte.parseByte("1"),
+                            null,
+                            "col_varchar",
+                            Timestamp.valueOf("2021-08-04 01:53:19.098").toLocalDateTime(),

Review comment:
       Use `LocalDateTime` directly?

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
##########
@@ -188,8 +205,72 @@ public void alterDatabase(String name, CatalogDatabase newDatabase, boolean igno
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+
+        Preconditions.checkState(
+                org.apache.commons.lang3.StringUtils.isNotBlank(databaseName),
+                "Database name must not be blank.");
+        if (listDatabases().contains(databaseName)) {
+            return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+        } else {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+    }
+
     // ------ tables and views ------
 
+    @Override
+    public CatalogBaseTable getTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+
+        String dbUrl = baseUrl + tablePath.getDatabaseName();
+
+        try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {

Review comment:
       Is the connection pooled somewhere? I'm just wondering whether we need to set up a new connection for every table lookup. This could be quite expensive. Actually, the `open()` method was meant for creating connections once, when the catalog is opened.

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalog.java
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectTypeMapper;
+import org.apache.flink.connector.jdbc.dialect.mysql.MySQLTypeMapper;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/** Catalog for MySQL. */
+@Internal
+public class MySQLCatalog extends AbstractJdbcCatalog {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MySQLCatalog.class);
+
+    private final JdbcDialectTypeMapper dialectTypeMapper;
+
+    private static final Set<String> builtinDatabases =
+            new HashSet<String>() {
+                {
+                    add("information_schema");
+                    add("mysql");
+                    add("performance_schema");
+                    add("sys");
+                }
+            };
+
+    public MySQLCatalog(
+            String catalogName,
+            String defaultDatabase,
+            String username,
+            String pwd,
+            String baseUrl) {
+
+        super(catalogName, defaultDatabase, username, pwd, baseUrl);
+
+        String driverVersion =
+                Preconditions.checkNotNull(getDriverVersion(), "Driver version must not be null.");
+        String databaseVersion =
+                Preconditions.checkNotNull(
+                        getDatabaseVersion(), "Database version must not be null.");
+        LOG.info("Driver version: {}, database version: {}", driverVersion, databaseVersion);
+        this.dialectTypeMapper = new MySQLTypeMapper(databaseVersion, driverVersion);
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+

Review comment:
       nit: remove empty first line here and in other methods.

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/mysql/MySQLTypeMapper.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.mysql;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectTypeMapper;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+/** MySQLTypeMapper util class. */
+@Internal
+public class MySQLTypeMapper implements JdbcDialectTypeMapper {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MySQLTypeMapper.class);
+
+    // ============================data types=====================
+
+    private static final String MYSQL_UNKNOWN = "UNKNOWN";
+    private static final String MYSQL_BIT = "BIT";
+
+    // -------------------------number----------------------------
+    private static final String MYSQL_TINYINT = "TINYINT";
+    private static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    private static final String MYSQL_SMALLINT = "SMALLINT";
+    private static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    private static final String MYSQL_MEDIUMINT = "MEDIUMINT";
+    private static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+    private static final String MYSQL_INT = "INT";
+    private static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
+    private static final String MYSQL_INTEGER = "INTEGER";
+    private static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+    private static final String MYSQL_BIGINT = "BIGINT";
+    private static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    private static final String MYSQL_DECIMAL = "DECIMAL";
+    private static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    private static final String MYSQL_FLOAT = "FLOAT";
+    private static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    private static final String MYSQL_DOUBLE = "DOUBLE";
+    private static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+    // -------------------------string----------------------------
+    private static final String MYSQL_CHAR = "CHAR";
+    private static final String MYSQL_VARCHAR = "VARCHAR";
+    private static final String MYSQL_TINYTEXT = "TINYTEXT";
+    private static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
+    private static final String MYSQL_TEXT = "TEXT";
+    private static final String MYSQL_LONGTEXT = "LONGTEXT";
+    private static final String MYSQL_JSON = "JSON";
+
+    // ------------------------------time-------------------------
+    private static final String MYSQL_DATE = "DATE";
+    private static final String MYSQL_DATETIME = "DATETIME";
+    private static final String MYSQL_TIME = "TIME";
+    private static final String MYSQL_TIMESTAMP = "TIMESTAMP";
+    private static final String MYSQL_YEAR = "YEAR";
+
+    // ------------------------------blob-------------------------
+    private static final String MYSQL_TINYBLOB = "TINYBLOB";
+    private static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
+    private static final String MYSQL_BLOB = "BLOB";
+    private static final String MYSQL_LONGBLOB = "LONGBLOB";
+    private static final String MYSQL_BINARY = "BINARY";
+    private static final String MYSQL_VARBINARY = "VARBINARY";
+    private static final String MYSQL_GEOMETRY = "GEOMETRY";
+
+    private static final int RAW_TIME_LENGTH = 10;
+    private static final int RAW_TIMESTAMP_LENGTH = 19;
+
+    private final String databaseVersion;
+    private final String driverVersion;
+
+    public MySQLTypeMapper(String databaseVersion, String driverVersion) {
+        this.databaseVersion = databaseVersion;
+        this.driverVersion = driverVersion;
+    }
+
+    @Override
+    public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
+            throws SQLException {
+        String mysqlType = metadata.getColumnTypeName(colIndex).toUpperCase();
+        String columnName = metadata.getColumnName(colIndex);
+        int precision = metadata.getPrecision(colIndex);
+        int scale = metadata.getScale(colIndex);
+
+        switch (mysqlType) {
+            case MYSQL_BIT:
+                return DataTypes.BOOLEAN();
+            case MYSQL_TINYBLOB:
+            case MYSQL_MEDIUMBLOB:
+            case MYSQL_BLOB:
+            case MYSQL_LONGBLOB:
+            case MYSQL_VARBINARY:
+            case MYSQL_BINARY:
+                return DataTypes.BYTES();
+            case MYSQL_TINYINT:
+                return DataTypes.TINYINT();
+            case MYSQL_TINYINT_UNSIGNED:
+            case MYSQL_SMALLINT:
+                return DataTypes.SMALLINT();
+            case MYSQL_SMALLINT_UNSIGNED:
+            case MYSQL_MEDIUMINT:
+            case MYSQL_MEDIUMINT_UNSIGNED:
+            case MYSQL_INT:
+            case MYSQL_INTEGER:
+                return DataTypes.INT();
+            case MYSQL_INT_UNSIGNED:
+            case MYSQL_INTEGER_UNSIGNED:
+            case MYSQL_BIGINT:
+                return DataTypes.BIGINT();
+            case MYSQL_BIGINT_UNSIGNED:
+                return DataTypes.DECIMAL(20, 0);
+            case MYSQL_DECIMAL:
+                return DataTypes.DECIMAL(precision, scale);
+            case MYSQL_DECIMAL_UNSIGNED:
+                checkMaxPrecision(tablePath, columnName, precision);
+                return DataTypes.DECIMAL(precision + 1, scale);
+            case MYSQL_FLOAT:
+                return DataTypes.FLOAT();
+            case MYSQL_FLOAT_UNSIGNED:
+                LOG.warn("{} will probably cause value overflow.", MYSQL_FLOAT_UNSIGNED);
+                return DataTypes.FLOAT();
+            case MYSQL_DOUBLE:
+                return DataTypes.DOUBLE();
+            case MYSQL_DOUBLE_UNSIGNED:
+                LOG.warn("{} will probably cause value overflow.", MYSQL_DOUBLE_UNSIGNED);
+                return DataTypes.DOUBLE();
+            case MYSQL_CHAR:

Review comment:
       Flink supports CHAR and VARCHAR. We should map these types in a more fine-grained way (preserving the declared length); otherwise we get exceptions in the sink when we try to store, for example, a `VARCHAR(MAX)` into a `VARCHAR(200)`.

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/mysql/MySQLTypeMapper.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.mysql;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectTypeMapper;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+/** MySQLTypeMapper util class. */
+@Internal
+public class MySQLTypeMapper implements JdbcDialectTypeMapper {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MySQLTypeMapper.class);
+
+    // ============================data types=====================
+
+    private static final String MYSQL_UNKNOWN = "UNKNOWN";
+    private static final String MYSQL_BIT = "BIT";
+
+    // -------------------------number----------------------------
+    private static final String MYSQL_TINYINT = "TINYINT";
+    private static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    private static final String MYSQL_SMALLINT = "SMALLINT";
+    private static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    private static final String MYSQL_MEDIUMINT = "MEDIUMINT";
+    private static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+    private static final String MYSQL_INT = "INT";
+    private static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
+    private static final String MYSQL_INTEGER = "INTEGER";
+    private static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+    private static final String MYSQL_BIGINT = "BIGINT";
+    private static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    private static final String MYSQL_DECIMAL = "DECIMAL";
+    private static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    private static final String MYSQL_FLOAT = "FLOAT";
+    private static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    private static final String MYSQL_DOUBLE = "DOUBLE";
+    private static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+    // -------------------------string----------------------------
+    private static final String MYSQL_CHAR = "CHAR";
+    private static final String MYSQL_VARCHAR = "VARCHAR";
+    private static final String MYSQL_TINYTEXT = "TINYTEXT";
+    private static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
+    private static final String MYSQL_TEXT = "TEXT";
+    private static final String MYSQL_LONGTEXT = "LONGTEXT";
+    private static final String MYSQL_JSON = "JSON";
+
+    // ------------------------------time-------------------------
+    private static final String MYSQL_DATE = "DATE";
+    private static final String MYSQL_DATETIME = "DATETIME";
+    private static final String MYSQL_TIME = "TIME";
+    private static final String MYSQL_TIMESTAMP = "TIMESTAMP";
+    private static final String MYSQL_YEAR = "YEAR";
+
+    // ------------------------------blob-------------------------
+    private static final String MYSQL_TINYBLOB = "TINYBLOB";
+    private static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
+    private static final String MYSQL_BLOB = "BLOB";
+    private static final String MYSQL_LONGBLOB = "LONGBLOB";
+    private static final String MYSQL_BINARY = "BINARY";
+    private static final String MYSQL_VARBINARY = "VARBINARY";
+    private static final String MYSQL_GEOMETRY = "GEOMETRY";
+
+    private static final int RAW_TIME_LENGTH = 10;
+    private static final int RAW_TIMESTAMP_LENGTH = 19;
+
+    private final String databaseVersion;
+    private final String driverVersion;
+
+    public MySQLTypeMapper(String databaseVersion, String driverVersion) {
+        this.databaseVersion = databaseVersion;
+        this.driverVersion = driverVersion;
+    }
+
+    @Override
+    public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
+            throws SQLException {
+        String mysqlType = metadata.getColumnTypeName(colIndex).toUpperCase();
+        String columnName = metadata.getColumnName(colIndex);
+        int precision = metadata.getPrecision(colIndex);
+        int scale = metadata.getScale(colIndex);
+
+        switch (mysqlType) {
+            case MYSQL_BIT:
+                return DataTypes.BOOLEAN();
+            case MYSQL_TINYBLOB:
+            case MYSQL_MEDIUMBLOB:
+            case MYSQL_BLOB:
+            case MYSQL_LONGBLOB:
+            case MYSQL_VARBINARY:
+            case MYSQL_BINARY:
+                return DataTypes.BYTES();
+            case MYSQL_TINYINT:
+                return DataTypes.TINYINT();
+            case MYSQL_TINYINT_UNSIGNED:
+            case MYSQL_SMALLINT:
+                return DataTypes.SMALLINT();
+            case MYSQL_SMALLINT_UNSIGNED:
+            case MYSQL_MEDIUMINT:
+            case MYSQL_MEDIUMINT_UNSIGNED:
+            case MYSQL_INT:
+            case MYSQL_INTEGER:
+                return DataTypes.INT();
+            case MYSQL_INT_UNSIGNED:
+            case MYSQL_INTEGER_UNSIGNED:
+            case MYSQL_BIGINT:
+                return DataTypes.BIGINT();
+            case MYSQL_BIGINT_UNSIGNED:
+                return DataTypes.DECIMAL(20, 0);
+            case MYSQL_DECIMAL:
+                return DataTypes.DECIMAL(precision, scale);
+            case MYSQL_DECIMAL_UNSIGNED:
+                checkMaxPrecision(tablePath, columnName, precision);
+                return DataTypes.DECIMAL(precision + 1, scale);
+            case MYSQL_FLOAT:
+                return DataTypes.FLOAT();
+            case MYSQL_FLOAT_UNSIGNED:
+                LOG.warn("{} will probably cause value overflow.", MYSQL_FLOAT_UNSIGNED);
+                return DataTypes.FLOAT();
+            case MYSQL_DOUBLE:
+                return DataTypes.DOUBLE();
+            case MYSQL_DOUBLE_UNSIGNED:
+                LOG.warn("{} will probably cause value overflow.", MYSQL_DOUBLE_UNSIGNED);
+                return DataTypes.DOUBLE();
+            case MYSQL_CHAR:
+            case MYSQL_VARCHAR:
+            case MYSQL_TINYTEXT:
+            case MYSQL_MEDIUMTEXT:
+            case MYSQL_TEXT:
+            case MYSQL_JSON:
+                return DataTypes.STRING();
+            case MYSQL_LONGTEXT:
+                LOG.warn(
+                        "Type '{}' has a maximum precision of 536870911 in MySQL. "
+                                + "Due to limitations in the Flink type system, "
+                                + "the precision will be set to 2147483647.",
+                        MYSQL_LONGTEXT);
+                return DataTypes.STRING();
+            case MYSQL_DATE:
+                return DataTypes.DATE();
+            case MYSQL_TIME:
+                return isExplicitPrecision(precision, RAW_TIME_LENGTH)
+                        ? DataTypes.TIME(precision - RAW_TIME_LENGTH - 1)
+                        : DataTypes.TIME(0);
+            case MYSQL_DATETIME:
+            case MYSQL_TIMESTAMP:
+                boolean explicitPrecision = isExplicitPrecision(precision, RAW_TIMESTAMP_LENGTH);
+                if (explicitPrecision) {
+                    int p = precision - RAW_TIMESTAMP_LENGTH - 1;
+                    if (p <= 6 && p >= 0) {
+                        return DataTypes.TIMESTAMP(p);
+                    }
+                    return p > 6 ? DataTypes.TIMESTAMP(6) : DataTypes.TIMESTAMP(0);
+                }
+                return DataTypes.TIMESTAMP(0);
+
+            case MYSQL_YEAR:
+            case MYSQL_GEOMETRY:
+            case MYSQL_UNKNOWN:

Review comment:
       Please verify this list again; e.g., the `ENUM` and `SET` types are missing here.

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/mysql/MySQLTypeMapper.java
##########
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.dialect.mysql;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.jdbc.dialect.JdbcDialectTypeMapper;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+
+/** MySQLTypeMapper util class. */
+@Internal
+public class MySQLTypeMapper implements JdbcDialectTypeMapper {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MySQLTypeMapper.class);
+
+    // ============================data types=====================
+
+    private static final String MYSQL_UNKNOWN = "UNKNOWN";
+    private static final String MYSQL_BIT = "BIT";
+
+    // -------------------------number----------------------------
+    private static final String MYSQL_TINYINT = "TINYINT";
+    private static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    private static final String MYSQL_SMALLINT = "SMALLINT";
+    private static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    private static final String MYSQL_MEDIUMINT = "MEDIUMINT";
+    private static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+    private static final String MYSQL_INT = "INT";
+    private static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
+    private static final String MYSQL_INTEGER = "INTEGER";
+    private static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+    private static final String MYSQL_BIGINT = "BIGINT";
+    private static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    private static final String MYSQL_DECIMAL = "DECIMAL";
+    private static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    private static final String MYSQL_FLOAT = "FLOAT";
+    private static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    private static final String MYSQL_DOUBLE = "DOUBLE";
+    private static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+    // -------------------------string----------------------------
+    private static final String MYSQL_CHAR = "CHAR";
+    private static final String MYSQL_VARCHAR = "VARCHAR";
+    private static final String MYSQL_TINYTEXT = "TINYTEXT";
+    private static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
+    private static final String MYSQL_TEXT = "TEXT";
+    private static final String MYSQL_LONGTEXT = "LONGTEXT";
+    private static final String MYSQL_JSON = "JSON";
+
+    // ------------------------------time-------------------------
+    private static final String MYSQL_DATE = "DATE";
+    private static final String MYSQL_DATETIME = "DATETIME";
+    private static final String MYSQL_TIME = "TIME";
+    private static final String MYSQL_TIMESTAMP = "TIMESTAMP";
+    private static final String MYSQL_YEAR = "YEAR";
+
+    // ------------------------------blob-------------------------
+    private static final String MYSQL_TINYBLOB = "TINYBLOB";
+    private static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
+    private static final String MYSQL_BLOB = "BLOB";
+    private static final String MYSQL_LONGBLOB = "LONGBLOB";
+    private static final String MYSQL_BINARY = "BINARY";
+    private static final String MYSQL_VARBINARY = "VARBINARY";
+    private static final String MYSQL_GEOMETRY = "GEOMETRY";
+
+    private static final int RAW_TIME_LENGTH = 10;
+    private static final int RAW_TIMESTAMP_LENGTH = 19;
+
+    private final String databaseVersion;
+    private final String driverVersion;
+
+    public MySQLTypeMapper(String databaseVersion, String driverVersion) {
+        this.databaseVersion = databaseVersion;
+        this.driverVersion = driverVersion;
+    }
+
+    @Override
+    public DataType mapping(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
+            throws SQLException {
+        String mysqlType = metadata.getColumnTypeName(colIndex).toUpperCase();
+        String columnName = metadata.getColumnName(colIndex);
+        int precision = metadata.getPrecision(colIndex);
+        int scale = metadata.getScale(colIndex);
+
+        switch (mysqlType) {
+            case MYSQL_BIT:
+                return DataTypes.BOOLEAN();
+            case MYSQL_TINYBLOB:
+            case MYSQL_MEDIUMBLOB:
+            case MYSQL_BLOB:
+            case MYSQL_LONGBLOB:
+            case MYSQL_VARBINARY:

Review comment:
       Flink supports both `BINARY` and `VARBINARY`; can we make this type mapping more fine-grained?

##########
File path: flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalogITCase.java
##########
@@ -0,0 +1,337 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.CollectionUtil;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** E2E test for {@link MySQLCatalog}. */
+public class MySQLCatalogITCase extends MySQLCatalogTestBase {
+
+    private static final List<Row> ALL_TYPES_ROWS =
+            Lists.newArrayList(
+                    Row.ofKind(
+                            RowKind.INSERT,
+                            1L,
+                            -1L,
+                            new BigDecimal(1),
+                            null,
+                            true,
+                            null,
+                            "hello",
+                            Date.valueOf("2021-08-04").toLocalDate(),
+                            Timestamp.valueOf("2021-08-04 01:54:16").toLocalDateTime(),
+                            new BigDecimal(-1),
+                            new BigDecimal(1),
+                            -1.0d,
+                            1.0d,
+                            "enum2",
+                            -9.1f,
+                            9.1f,
+                            -1,
+                            1L,
+                            -1,
+                            1L,
+                            "{\"k1\": \"v1\"}",
+                            null,
+                            "col_longtext",
+                            null,
+                            -1,
+                            1,
+                            "col_mediumtext",
+                            new BigDecimal(-99),
+                            new BigDecimal(99),
+                            -1.0d,
+                            1.0d,
+                            "set_ele1",
+                            Short.parseShort("-1"),
+                            1,
+                            "col_text",
+                            Time.valueOf("10:32:34").toLocalTime(),
+                            Timestamp.valueOf("2021-08-04 01:54:16").toLocalDateTime(),
+                            "col_tinytext",
+                            Byte.parseByte("-1"),
+                            Byte.parseByte("1"),
+                            null,
+                            "col_varchar",
+                            Timestamp.valueOf("2021-08-04 01:54:16.463").toLocalDateTime(),
+                            Time.valueOf("09:33:43").toLocalTime(),
+                            Timestamp.valueOf("2021-08-04 01:54:16.463").toLocalDateTime(),
+                            null),
+                    Row.ofKind(
+                            RowKind.INSERT,
+                            2L,
+                            -1L,
+                            new BigDecimal(1),
+                            null,
+                            true,
+                            null,
+                            "hello",
+                            Date.valueOf("2021-08-04").toLocalDate(),
+                            Timestamp.valueOf("2021-08-04 01:53:19").toLocalDateTime(),
+                            new BigDecimal(-1),
+                            new BigDecimal(1),
+                            -1.0d,
+                            1.0d,
+                            "enum2",
+                            -9.1f,
+                            9.1f,
+                            -1,
+                            1L,
+                            -1,
+                            1L,
+                            "{\"k1\": \"v1\"}",
+                            null,
+                            "col_longtext",
+                            null,
+                            -1,
+                            1,
+                            "col_mediumtext",
+                            new BigDecimal(-99),
+                            new BigDecimal(99),
+                            -1.0d,
+                            1.0d,
+                            "set_ele1,set_ele12",
+                            Short.parseShort("-1"),
+                            1,
+                            "col_text",
+                            Time.valueOf("10:32:34").toLocalTime(),
+                            Timestamp.valueOf("2021-08-04 01:53:19").toLocalDateTime(),
+                            "col_tinytext",
+                            Byte.parseByte("-1"),
+                            Byte.parseByte("1"),
+                            null,
+                            "col_varchar",
+                            Timestamp.valueOf("2021-08-04 01:53:19.098").toLocalDateTime(),
+                            Time.valueOf("09:33:43").toLocalTime(),
+                            Timestamp.valueOf("2021-08-04 01:53:19.098").toLocalDateTime(),
+                            null));
+
+    private TableEnvironment tEnv;
+
+    @Before
+    public void setup() {
+        this.tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+        tEnv.getConfig()
+                .getConfiguration()
+                .setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
+
+        // Use mysql catalog.
+        tEnv.registerCatalog(TEST_CATALOG_NAME, catalog);
+        tEnv.useCatalog(TEST_CATALOG_NAME);
+    }
+
+    // ------ databases ------
+
+    @Test
+    public void testGetDb_DatabaseNotExistException() throws Exception {
+        String databaseNotExist = "nonexistent";
+        exception.expect(DatabaseNotExistException.class);
+        exception.expectMessage(

Review comment:
       Use our new AssertJ infrastructure. Check out `org.apache.flink.table.factories.FactoryUtilTest#testCatalogFactoryHelper` for an example of testing exceptions.

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalogUtils.java
##########
@@ -47,6 +48,8 @@ public static AbstractJdbcCatalog createCatalog(
 
         if (dialect instanceof PostgresDialect) {
             return new PostgresCatalog(catalogName, defaultDatabase, username, pwd, baseUrl);
+        } else if (dialect instanceof MySQLDialect) {

Review comment:
       nit: in Flink we usually write acronyms in camel case: `MySQLDialect` -> `MySqlDialect`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org