You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/10/27 11:40:21 UTC

[GitHub] [flink] Airblader commented on a change in pull request #16962: [FLINK-15352][connector-jdbc] Develop MySQLCatalog to connect Flink with MySQL tables and ecosystem.

Airblader commented on a change in pull request #16962:
URL: https://github.com/apache/flink/pull/16962#discussion_r730700156



##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalog.java
##########
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME;
+import static org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.IDENTIFIER;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+
+/** Catalog for MySQL. */
+@Internal
+public class MySQLCatalog extends AbstractJdbcCatalog {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MySQLCatalog.class);
+
+    private final String databaseVersion;
+    private final String driverVersion;
+
+    // ============================data types=====================
+
+    public static final String MYSQL_UNKNOWN = "UNKNOWN";
+    public static final String MYSQL_BIT = "BIT";
+
+    // -------------------------number----------------------------
+    public static final String MYSQL_TINYINT = "TINYINT";
+    public static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    public static final String MYSQL_SMALLINT = "SMALLINT";
+    public static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    public static final String MYSQL_MEDIUMINT = "MEDIUMINT";
+    public static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+    public static final String MYSQL_INT = "INT";
+    public static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
+    public static final String MYSQL_INTEGER = "INTEGER";
+    public static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+    public static final String MYSQL_BIGINT = "BIGINT";
+    public static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    public static final String MYSQL_DECIMAL = "DECIMAL";
+    public static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    public static final String MYSQL_FLOAT = "FLOAT";
+    public static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    public static final String MYSQL_DOUBLE = "DOUBLE";
+    public static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+    // -------------------------string----------------------------
+    public static final String MYSQL_CHAR = "CHAR";
+    public static final String MYSQL_VARCHAR = "VARCHAR";
+    public static final String MYSQL_TINYTEXT = "TINYTEXT";
+    public static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
+    public static final String MYSQL_TEXT = "TEXT";
+    public static final String MYSQL_LONGTEXT = "LONGTEXT";
+    public static final String MYSQL_JSON = "JSON";
+
+    // ------------------------------time-------------------------
+    public static final String MYSQL_DATE = "DATE";
+    public static final String MYSQL_DATETIME = "DATETIME";
+    public static final String MYSQL_TIME = "TIME";
+    public static final String MYSQL_TIMESTAMP = "TIMESTAMP";
+    public static final String MYSQL_YEAR = "YEAR";
+
+    // ------------------------------blob-------------------------
+    public static final String MYSQL_TINYBLOB = "TINYBLOB";
+    public static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
+    public static final String MYSQL_BLOB = "BLOB";
+    public static final String MYSQL_LONGBLOB = "LONGBLOB";
+    public static final String MYSQL_BINARY = "BINARY";
+    public static final String MYSQL_VARBINARY = "VARBINARY";
+    public static final String MYSQL_GEOMETRY = "GEOMETRY";
+
+    // column class names
+    public static final String COLUMN_CLASS_BOOLEAN = "java.lang.Boolean";
+    public static final String COLUMN_CLASS_INTEGER = "java.lang.Integer";
+    public static final String COLUMN_CLASS_BIG_INTEGER = "java.math.BigInteger";
+    public static final String COLUMN_CLASS_LONG = "java.lang.Long";
+    public static final String COLUMN_CLASS_FLOAT = "java.lang.Float";
+    public static final String COLUMN_CLASS_DOUBLE = "java.lang.Double";
+    public static final String COLUMN_CLASS_BIG_DECIMAL = "java.math.BigDecimal";
+    public static final String COLUMN_CLASS_BYTE_ARRAY = "[B";
+    public static final String COLUMN_CLASS_STRING = "java.lang.String";
+    public static final String COLUMN_CLASS_DATE = "java.sql.Date";
+    public static final String COLUMN_CLASS_TIME = "java.sql.Time";
+    public static final String COLUMN_CLASS_TIMESTAMP = "java.sql.Timestamp";
+
+    public static final int RAW_TIME_LENGTH = 10;
+    public static final int RAW_TIMESTAMP_LENGTH = 19;
+
+    private static final Set<String> builtinDatabases =
+            new HashSet<String>() {
+                {
+                    add("information_schema");
+                    add("mysql");
+                    add("performance_schema");
+                    add("sys");
+                }
+            };
+
+    public MySQLCatalog(
+            String catalogName,
+            String defaultDatabase,
+            String username,
+            String pwd,
+            String baseUrl) {
+        super(catalogName, defaultDatabase, username, pwd, baseUrl);
+        this.driverVersion =
+                Preconditions.checkNotNull(getDriverVersion(), "driver version must not be null.");
+        this.databaseVersion =
+                Preconditions.checkNotNull(
+                        getDatabaseVersion(), "database version must not be null.");
+        LOG.info("Driver version: {}, database version: {}", driverVersion, databaseVersion);
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        String sql = "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;";
+        return extractColumnValuesBySQL(
+                defaultUrl,
+                sql,
+                1,
+                (FilterFunction<String>) dbName -> !builtinDatabases.contains(dbName));
+    }
+
+    @Override
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                StringUtils.isNotBlank(databaseName), "Database name must not be blank.");
+        if (listDatabases().contains(databaseName)) {
+            return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+        } else {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                StringUtils.isNotBlank(databaseName), "Database name must not be blank.");
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+        String connUrl = baseUrl + databaseName;
+        return extractColumnValuesBySQL(
+                connUrl,
+                "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?",
+                1,
+                null,
+                databaseName);
+    }
+
+    // ------ retrieve PK constraint ------
+
+    private Optional<UniqueConstraint> getPrimaryKey(
+            DatabaseMetaData metaData, String schema, ObjectPath table) throws SQLException {
+
+        // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
+        // the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
+        // We need to sort them based on the KEY_SEQ value.
+        ResultSet rs =
+                metaData.getPrimaryKeys(table.getDatabaseName(), schema, table.getObjectName());
+
+        Map<Integer, String> keySeqColumnName = new HashMap<>();
+        String pkName = null;
+        while (rs.next()) {
+            String columnName = rs.getString("COLUMN_NAME");
+            pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the same
+            int keySeq = rs.getInt("KEY_SEQ");
+            keySeqColumnName.put(keySeq - 1, columnName); // KEY_SEQ is 1-based index
+        }
+        List<String> pkFields =
+                Arrays.asList(new String[keySeqColumnName.size()]); // initialize size
+        keySeqColumnName.forEach(pkFields::set);
+        if (!pkFields.isEmpty()) {
+            // PK_NAME maybe null according to the javadoc, generate an unique name in that case
+            pkName = pkName == null ? "pk_" + String.join("_", pkFields) : pkName;
+            return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields));
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public CatalogBaseTable getTable(ObjectPath tablePath)

Review comment:
       This method is largely a duplicate of the implementation for Postgres. We should refactor this so they can reuse shared code.

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalog.java
##########
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME;
+import static org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.IDENTIFIER;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+
+/** Catalog for MySQL. */
+@Internal
+public class MySQLCatalog extends AbstractJdbcCatalog {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MySQLCatalog.class);
+
+    private final String databaseVersion;
+    private final String driverVersion;
+
+    // ============================data types=====================
+
+    public static final String MYSQL_UNKNOWN = "UNKNOWN";
+    public static final String MYSQL_BIT = "BIT";
+
+    // -------------------------number----------------------------
+    public static final String MYSQL_TINYINT = "TINYINT";
+    public static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    public static final String MYSQL_SMALLINT = "SMALLINT";
+    public static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    public static final String MYSQL_MEDIUMINT = "MEDIUMINT";
+    public static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+    public static final String MYSQL_INT = "INT";
+    public static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
+    public static final String MYSQL_INTEGER = "INTEGER";
+    public static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+    public static final String MYSQL_BIGINT = "BIGINT";
+    public static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    public static final String MYSQL_DECIMAL = "DECIMAL";
+    public static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    public static final String MYSQL_FLOAT = "FLOAT";
+    public static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    public static final String MYSQL_DOUBLE = "DOUBLE";
+    public static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+    // -------------------------string----------------------------
+    public static final String MYSQL_CHAR = "CHAR";
+    public static final String MYSQL_VARCHAR = "VARCHAR";
+    public static final String MYSQL_TINYTEXT = "TINYTEXT";
+    public static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
+    public static final String MYSQL_TEXT = "TEXT";
+    public static final String MYSQL_LONGTEXT = "LONGTEXT";
+    public static final String MYSQL_JSON = "JSON";
+
+    // ------------------------------time-------------------------
+    public static final String MYSQL_DATE = "DATE";
+    public static final String MYSQL_DATETIME = "DATETIME";
+    public static final String MYSQL_TIME = "TIME";
+    public static final String MYSQL_TIMESTAMP = "TIMESTAMP";
+    public static final String MYSQL_YEAR = "YEAR";
+
+    // ------------------------------blob-------------------------
+    public static final String MYSQL_TINYBLOB = "TINYBLOB";
+    public static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
+    public static final String MYSQL_BLOB = "BLOB";
+    public static final String MYSQL_LONGBLOB = "LONGBLOB";
+    public static final String MYSQL_BINARY = "BINARY";
+    public static final String MYSQL_VARBINARY = "VARBINARY";
+    public static final String MYSQL_GEOMETRY = "GEOMETRY";
+
+    // column class names
+    public static final String COLUMN_CLASS_BOOLEAN = "java.lang.Boolean";
+    public static final String COLUMN_CLASS_INTEGER = "java.lang.Integer";
+    public static final String COLUMN_CLASS_BIG_INTEGER = "java.math.BigInteger";
+    public static final String COLUMN_CLASS_LONG = "java.lang.Long";
+    public static final String COLUMN_CLASS_FLOAT = "java.lang.Float";
+    public static final String COLUMN_CLASS_DOUBLE = "java.lang.Double";
+    public static final String COLUMN_CLASS_BIG_DECIMAL = "java.math.BigDecimal";
+    public static final String COLUMN_CLASS_BYTE_ARRAY = "[B";
+    public static final String COLUMN_CLASS_STRING = "java.lang.String";
+    public static final String COLUMN_CLASS_DATE = "java.sql.Date";
+    public static final String COLUMN_CLASS_TIME = "java.sql.Time";
+    public static final String COLUMN_CLASS_TIMESTAMP = "java.sql.Timestamp";
+
+    public static final int RAW_TIME_LENGTH = 10;
+    public static final int RAW_TIMESTAMP_LENGTH = 19;
+
+    private static final Set<String> builtinDatabases =
+            new HashSet<String>() {
+                {
+                    add("information_schema");
+                    add("mysql");
+                    add("performance_schema");
+                    add("sys");
+                }
+            };
+
+    public MySQLCatalog(
+            String catalogName,
+            String defaultDatabase,
+            String username,
+            String pwd,
+            String baseUrl) {
+        super(catalogName, defaultDatabase, username, pwd, baseUrl);
+        this.driverVersion =
+                Preconditions.checkNotNull(getDriverVersion(), "driver version must not be null.");
+        this.databaseVersion =
+                Preconditions.checkNotNull(
+                        getDatabaseVersion(), "database version must not be null.");
+        LOG.info("Driver version: {}, database version: {}", driverVersion, databaseVersion);
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        String sql = "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;";
+        return extractColumnValuesBySQL(
+                defaultUrl,
+                sql,
+                1,
+                (FilterFunction<String>) dbName -> !builtinDatabases.contains(dbName));
+    }
+
+    @Override
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                StringUtils.isNotBlank(databaseName), "Database name must not be blank.");
+        if (listDatabases().contains(databaseName)) {
+            return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+        } else {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                StringUtils.isNotBlank(databaseName), "Database name must not be blank.");
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+        String connUrl = baseUrl + databaseName;
+        return extractColumnValuesBySQL(
+                connUrl,
+                "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?",
+                1,
+                null,
+                databaseName);
+    }
+
+    // ------ retrieve PK constraint ------
+
+    private Optional<UniqueConstraint> getPrimaryKey(
+            DatabaseMetaData metaData, String schema, ObjectPath table) throws SQLException {
+
+        // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
+        // the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
+        // We need to sort them based on the KEY_SEQ value.
+        ResultSet rs =
+                metaData.getPrimaryKeys(table.getDatabaseName(), schema, table.getObjectName());
+
+        Map<Integer, String> keySeqColumnName = new HashMap<>();
+        String pkName = null;
+        while (rs.next()) {
+            String columnName = rs.getString("COLUMN_NAME");
+            pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the same
+            int keySeq = rs.getInt("KEY_SEQ");
+            keySeqColumnName.put(keySeq - 1, columnName); // KEY_SEQ is 1-based index
+        }
+        List<String> pkFields =
+                Arrays.asList(new String[keySeqColumnName.size()]); // initialize size
+        keySeqColumnName.forEach(pkFields::set);
+        if (!pkFields.isEmpty()) {
+            // PK_NAME maybe null according to the javadoc, generate an unique name in that case
+            pkName = pkName == null ? "pk_" + String.join("_", pkFields) : pkName;
+            return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields));
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public CatalogBaseTable getTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                baseUrl + tablePath.getDatabaseName(), username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement("SELECT * FROM " + tablePath.getObjectName())) {
+
+            DatabaseMetaData metaData = conn.getMetaData();
+            Optional<UniqueConstraint> primaryKey = getPrimaryKey(metaData, null, tablePath);
+            ResultSetMetaData resultSetMetaData = ps.getMetaData();
+
+            String[] columnNames = new String[resultSetMetaData.getColumnCount()];
+            DataType[] types = new DataType[resultSetMetaData.getColumnCount()];
+            for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+                columnNames[i - 1] = resultSetMetaData.getColumnName(i);
+                types[i - 1] = fromJDBCType(tablePath, resultSetMetaData, i);
+                if (resultSetMetaData.isNullable(i) == ResultSetMetaData.columnNoNulls) {
+                    types[i - 1] = types[i - 1].notNull();
+                }
+            }
+            Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columnNames, types);
+            primaryKey.ifPresent(
+                    pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns()));
+            Schema tableSchema = schemaBuilder.build();
+            Map<String, String> props = new HashMap<>();
+            props.put(CONNECTOR.key(), IDENTIFIER);
+            props.put(URL.key(), baseUrl + tablePath.getDatabaseName());
+            props.put(TABLE_NAME.key(), tablePath.getObjectName());
+            props.put(USERNAME.key(), username);
+            props.put(PASSWORD.key(), pwd);
+            return CatalogTable.of(tableSchema, null, Lists.newArrayList(), props);
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed getting table %s", tablePath.getFullName()), e);
+        }
+    }
+
+    @Override
+    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+        return !extractColumnValuesBySQL(
+                        baseUrl,
+                        "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA=? and TABLE_NAME=?",
+                        1,
+                        null,
+                        tablePath.getDatabaseName(),
+                        tablePath.getObjectName())
+                .isEmpty();
+    }
+
+    private List<String> extractColumnValuesBySQL(
+            String connUrl,
+            String sql,
+            int columnIndex,
+            FilterFunction<String> filterFunc,
+            Object... params) {
+        List<String> columnValues = Lists.newArrayList();
+        try (Connection conn = DriverManager.getConnection(connUrl, username, pwd);
+                PreparedStatement ps = conn.prepareStatement(sql)) {
+            if (Objects.nonNull(params) && params.length > 0) {
+                for (int i = 0; i < params.length; i++) {
+                    ps.setObject(i + 1, params[i]);
+                }
+            }
+            ResultSet rs = ps.executeQuery();
+            while (rs.next()) {
+                String columnValue = rs.getString(columnIndex);
+                if (Objects.isNull(filterFunc) || filterFunc.filter(columnValue)) {
+                    columnValues.add(columnValue);
+                }
+            }
+            return columnValues;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed in processing query sql %s, connUrl %s", sql, connUrl),
+                    e);

Review comment:
       ```suggestion
               throw new CatalogException(String.format("The following SQL query could not be executed (%s):%n%n", connUrl, sql), e);
   ```

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalog.java
##########
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME;
+import static org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.IDENTIFIER;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+
+/** Catalog for MySQL. */
+@Internal
+public class MySQLCatalog extends AbstractJdbcCatalog {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MySQLCatalog.class);
+
+    private final String databaseVersion;
+    private final String driverVersion;
+
+    // ============================data types=====================
+
+    public static final String MYSQL_UNKNOWN = "UNKNOWN";
+    public static final String MYSQL_BIT = "BIT";
+
+    // -------------------------number----------------------------
+    public static final String MYSQL_TINYINT = "TINYINT";
+    public static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    public static final String MYSQL_SMALLINT = "SMALLINT";
+    public static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    public static final String MYSQL_MEDIUMINT = "MEDIUMINT";
+    public static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+    public static final String MYSQL_INT = "INT";
+    public static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
+    public static final String MYSQL_INTEGER = "INTEGER";
+    public static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+    public static final String MYSQL_BIGINT = "BIGINT";
+    public static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    public static final String MYSQL_DECIMAL = "DECIMAL";
+    public static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    public static final String MYSQL_FLOAT = "FLOAT";
+    public static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    public static final String MYSQL_DOUBLE = "DOUBLE";
+    public static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+    // -------------------------string----------------------------
+    public static final String MYSQL_CHAR = "CHAR";
+    public static final String MYSQL_VARCHAR = "VARCHAR";
+    public static final String MYSQL_TINYTEXT = "TINYTEXT";
+    public static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
+    public static final String MYSQL_TEXT = "TEXT";
+    public static final String MYSQL_LONGTEXT = "LONGTEXT";
+    public static final String MYSQL_JSON = "JSON";
+
+    // ------------------------------time-------------------------
+    public static final String MYSQL_DATE = "DATE";
+    public static final String MYSQL_DATETIME = "DATETIME";
+    public static final String MYSQL_TIME = "TIME";
+    public static final String MYSQL_TIMESTAMP = "TIMESTAMP";
+    public static final String MYSQL_YEAR = "YEAR";
+
+    // ------------------------------blob-------------------------
+    public static final String MYSQL_TINYBLOB = "TINYBLOB";
+    public static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
+    public static final String MYSQL_BLOB = "BLOB";
+    public static final String MYSQL_LONGBLOB = "LONGBLOB";
+    public static final String MYSQL_BINARY = "BINARY";
+    public static final String MYSQL_VARBINARY = "VARBINARY";
+    public static final String MYSQL_GEOMETRY = "GEOMETRY";
+
+    // column class names
+    public static final String COLUMN_CLASS_BOOLEAN = "java.lang.Boolean";
+    public static final String COLUMN_CLASS_INTEGER = "java.lang.Integer";
+    public static final String COLUMN_CLASS_BIG_INTEGER = "java.math.BigInteger";
+    public static final String COLUMN_CLASS_LONG = "java.lang.Long";
+    public static final String COLUMN_CLASS_FLOAT = "java.lang.Float";
+    public static final String COLUMN_CLASS_DOUBLE = "java.lang.Double";
+    public static final String COLUMN_CLASS_BIG_DECIMAL = "java.math.BigDecimal";
+    public static final String COLUMN_CLASS_BYTE_ARRAY = "[B";
+    public static final String COLUMN_CLASS_STRING = "java.lang.String";
+    public static final String COLUMN_CLASS_DATE = "java.sql.Date";
+    public static final String COLUMN_CLASS_TIME = "java.sql.Time";
+    public static final String COLUMN_CLASS_TIMESTAMP = "java.sql.Timestamp";
+
+    public static final int RAW_TIME_LENGTH = 10;
+    public static final int RAW_TIMESTAMP_LENGTH = 19;
+
+    private static final Set<String> builtinDatabases =
+            new HashSet<String>() {
+                {
+                    add("information_schema");
+                    add("mysql");
+                    add("performance_schema");
+                    add("sys");
+                }
+            };
+
+    public MySQLCatalog(
+            String catalogName,
+            String defaultDatabase,
+            String username,
+            String pwd,
+            String baseUrl) {
+        super(catalogName, defaultDatabase, username, pwd, baseUrl);
+        this.driverVersion =
+                Preconditions.checkNotNull(getDriverVersion(), "driver version must not be null.");
+        this.databaseVersion =
+                Preconditions.checkNotNull(
+                        getDatabaseVersion(), "database version must not be null.");
+        LOG.info("Driver version: {}, database version: {}", driverVersion, databaseVersion);
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        String sql = "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;";
+        return extractColumnValuesBySQL(
+                defaultUrl,
+                sql,
+                1,
+                (FilterFunction<String>) dbName -> !builtinDatabases.contains(dbName));
+    }
+
+    @Override
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                StringUtils.isNotBlank(databaseName), "Database name must not be blank.");
+        if (listDatabases().contains(databaseName)) {
+            return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+        } else {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                StringUtils.isNotBlank(databaseName), "Database name must not be blank.");
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+        String connUrl = baseUrl + databaseName;
+        return extractColumnValuesBySQL(
+                connUrl,
+                "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?",
+                1,
+                null,
+                databaseName);
+    }
+
+    // ------ retrieve PK constraint ------
+
+    private Optional<UniqueConstraint> getPrimaryKey(
+            DatabaseMetaData metaData, String schema, ObjectPath table) throws SQLException {
+
+        // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
+        // the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
+        // We need to sort them based on the KEY_SEQ value.
+        ResultSet rs =
+                metaData.getPrimaryKeys(table.getDatabaseName(), schema, table.getObjectName());
+
+        Map<Integer, String> keySeqColumnName = new HashMap<>();
+        String pkName = null;
+        while (rs.next()) {
+            String columnName = rs.getString("COLUMN_NAME");
+            pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the same
+            int keySeq = rs.getInt("KEY_SEQ");
+            keySeqColumnName.put(keySeq - 1, columnName); // KEY_SEQ is 1-based index
+        }
+        List<String> pkFields =
+                Arrays.asList(new String[keySeqColumnName.size()]); // initialize size
+        keySeqColumnName.forEach(pkFields::set);
+        if (!pkFields.isEmpty()) {
+            // PK_NAME maybe null according to the javadoc, generate an unique name in that case
+            pkName = pkName == null ? "pk_" + String.join("_", pkFields) : pkName;
+            return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields));
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public CatalogBaseTable getTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                baseUrl + tablePath.getDatabaseName(), username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement("SELECT * FROM " + tablePath.getObjectName())) {
+
+            DatabaseMetaData metaData = conn.getMetaData();
+            Optional<UniqueConstraint> primaryKey = getPrimaryKey(metaData, null, tablePath);
+            ResultSetMetaData resultSetMetaData = ps.getMetaData();
+
+            String[] columnNames = new String[resultSetMetaData.getColumnCount()];
+            DataType[] types = new DataType[resultSetMetaData.getColumnCount()];
+            for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+                columnNames[i - 1] = resultSetMetaData.getColumnName(i);
+                types[i - 1] = fromJDBCType(tablePath, resultSetMetaData, i);
+                if (resultSetMetaData.isNullable(i) == ResultSetMetaData.columnNoNulls) {
+                    types[i - 1] = types[i - 1].notNull();
+                }
+            }
+            Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columnNames, types);
+            primaryKey.ifPresent(
+                    pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns()));
+            Schema tableSchema = schemaBuilder.build();
+            Map<String, String> props = new HashMap<>();
+            props.put(CONNECTOR.key(), IDENTIFIER);
+            props.put(URL.key(), baseUrl + tablePath.getDatabaseName());
+            props.put(TABLE_NAME.key(), tablePath.getObjectName());
+            props.put(USERNAME.key(), username);
+            props.put(PASSWORD.key(), pwd);
+            return CatalogTable.of(tableSchema, null, Lists.newArrayList(), props);
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed getting table %s", tablePath.getFullName()), e);
+        }
+    }
+
+    @Override
+    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+        return !extractColumnValuesBySQL(
+                        baseUrl,
+                        "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA=? and TABLE_NAME=?",
+                        1,
+                        null,
+                        tablePath.getDatabaseName(),
+                        tablePath.getObjectName())
+                .isEmpty();
+    }
+
+    private List<String> extractColumnValuesBySQL(
+            String connUrl,
+            String sql,
+            int columnIndex,
+            FilterFunction<String> filterFunc,
+            Object... params) {
+        List<String> columnValues = Lists.newArrayList();
+        try (Connection conn = DriverManager.getConnection(connUrl, username, pwd);
+                PreparedStatement ps = conn.prepareStatement(sql)) {
+            if (Objects.nonNull(params) && params.length > 0) {
+                for (int i = 0; i < params.length; i++) {
+                    ps.setObject(i + 1, params[i]);
+                }
+            }
+            ResultSet rs = ps.executeQuery();
+            while (rs.next()) {
+                String columnValue = rs.getString(columnIndex);
+                if (Objects.isNull(filterFunc) || filterFunc.filter(columnValue)) {
+                    columnValues.add(columnValue);
+                }
+            }
+            return columnValues;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed in processing query sql %s, connUrl %s", sql, connUrl),
+                    e);
+        }
+    }
+
+    private String getDatabaseVersion() {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+            return conn.getMetaData().getDatabaseProductVersion();
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed in getting MySQL version by %s.", defaultUrl), e);
+        }
+    }
+
+    private String getDriverVersion() {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+            String driverVersion = conn.getMetaData().getDriverVersion();
+            Pattern regexp = Pattern.compile("\\d+?\\.\\d+?\\.\\d+");
+            Matcher matcher = regexp.matcher(driverVersion);
+            return matcher.find() ? matcher.group(0) : null;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed in getting mysql driver version by %s.", defaultUrl), e);
+        }
+    }
+
+    /** Converts MySQL type to Flink {@link DataType}. */
+    private DataType fromJDBCType(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
+            throws SQLException {
+        String mysqlType = metadata.getColumnTypeName(colIndex).toUpperCase();
+        String columnName = metadata.getColumnName(colIndex);
+        int precision = metadata.getPrecision(colIndex);
+        int scale = metadata.getScale(colIndex);
+
+        switch (mysqlType) {
+            case MYSQL_BIT:
+                return DataTypes.BOOLEAN();
+            case MYSQL_TINYBLOB:
+            case MYSQL_MEDIUMBLOB:
+            case MYSQL_BLOB:
+            case MYSQL_LONGBLOB:
+            case MYSQL_VARBINARY:
+            case MYSQL_BINARY:
+                return DataTypes.BYTES();
+            case MYSQL_TINYINT:
+                return DataTypes.TINYINT();
+            case MYSQL_TINYINT_UNSIGNED:
+            case MYSQL_SMALLINT:
+                return DataTypes.SMALLINT();
+            case MYSQL_SMALLINT_UNSIGNED:
+            case MYSQL_MEDIUMINT:
+            case MYSQL_MEDIUMINT_UNSIGNED:
+            case MYSQL_INT:
+            case MYSQL_INTEGER:
+                return DataTypes.INT();
+            case MYSQL_INT_UNSIGNED:
+            case MYSQL_INTEGER_UNSIGNED:
+            case MYSQL_BIGINT:
+                return DataTypes.BIGINT();
+            case MYSQL_BIGINT_UNSIGNED:
+                return DataTypes.DECIMAL(20, 0);
+            case MYSQL_DECIMAL:
+                return DataTypes.DECIMAL(precision, scale);
+            case MYSQL_DECIMAL_UNSIGNED:
+                checkMaxPrecision(tablePath, columnName, precision);
+                return DataTypes.DECIMAL(precision + 1, scale);
+            case MYSQL_FLOAT:
+                return DataTypes.FLOAT();
+            case MYSQL_FLOAT_UNSIGNED:
+                LOG.warn("{} will probably cause value overflow.", MYSQL_FLOAT_UNSIGNED);

Review comment:
       Should we really support types if we cannot map them properly? If we have a shortcoming in the type system, we should be collecting that to improve it.

##########
File path: docs/content/docs/connectors/table/jdbc.md
##########
@@ -458,6 +463,38 @@ SELECT * FROM mydb.`custom_schema.test_table2`;
 SELECT * FROM `custom_schema.test_table2`;
 ```
 
+### JDBC Catalog for MySQL
+
+#### MySQL Metaspace Mapping
+
+The databases in a MySQL instance are at the same mapping level as the databases under the catalog registered with `MySQLCatalog`. A MySQL instance can have multiple databases, each database can have multiple tables.
+In Flink, when querying tables registered by MySQL catalog, users can use either `database.table_name` or just `table_name`. The default value is the default database specified when `MySQLCatalog` was created.
+
+Therefore, the metaspace mapping between Flink Catalog and MySQLCatalog is as following:
+
+| Flink Catalog Metaspace Structure    |   MySQL Metaspace Structure         |
+| :------------------------------------| :-----------------------------------|
+| catalog name (defined in Flink only) | N/A                                 |
+| database name                        | database name                       |
+| table name                           | table_name                          |
+
+The full path of MySQL table in Flink should be ``"`<catalog>`.`<db>`.`<table>`"``.
+
+Here are some examples to access MySQL tables:
+
+```sql
+-- scan table 'test_table', the default database is 'mydb'.
+SELECT * FROM mysql_catalog.mydb.test_table;
+SELECT * FROM mydb.test_table;
+SELECT * FROM test_table;
+
+-- scan table 'test_table' with the given database.
+SELECT * FROM mysql_catalog.given_database.test_table2;
+SELECT * FROM given_database.test_table2;
+```
+
+<span class="label label-danger">Attention</span> At present, when reading data from a table, the `YEAR` type is read as the `DATE` type by default, but writing data in `YEAR` type is not supported by `MySQLCatalog` yet. The same restrictions apply to the reading and writing of the `GEOMETRY` type. The `GEOMETRY` type fields will be read as the `BYTES` type by default in reading data from a table, but the writing of the `GEOMETRY` data type is not supported by `MySQLCatalog` when writing data into a table.

Review comment:
       Why can't we support writing these types?

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalog.java
##########
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME;
+import static org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.IDENTIFIER;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+
+/** Catalog for MySQL. */
+@Internal
+public class MySQLCatalog extends AbstractJdbcCatalog {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MySQLCatalog.class);
+
+    private final String databaseVersion;
+    private final String driverVersion;
+
+    // ============================data types=====================
+
+    public static final String MYSQL_UNKNOWN = "UNKNOWN";
+    public static final String MYSQL_BIT = "BIT";
+
+    // -------------------------number----------------------------
+    public static final String MYSQL_TINYINT = "TINYINT";
+    public static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    public static final String MYSQL_SMALLINT = "SMALLINT";
+    public static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    public static final String MYSQL_MEDIUMINT = "MEDIUMINT";
+    public static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+    public static final String MYSQL_INT = "INT";
+    public static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
+    public static final String MYSQL_INTEGER = "INTEGER";
+    public static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+    public static final String MYSQL_BIGINT = "BIGINT";
+    public static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    public static final String MYSQL_DECIMAL = "DECIMAL";
+    public static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    public static final String MYSQL_FLOAT = "FLOAT";
+    public static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    public static final String MYSQL_DOUBLE = "DOUBLE";
+    public static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+    // -------------------------string----------------------------
+    public static final String MYSQL_CHAR = "CHAR";
+    public static final String MYSQL_VARCHAR = "VARCHAR";
+    public static final String MYSQL_TINYTEXT = "TINYTEXT";
+    public static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
+    public static final String MYSQL_TEXT = "TEXT";
+    public static final String MYSQL_LONGTEXT = "LONGTEXT";
+    public static final String MYSQL_JSON = "JSON";
+
+    // ------------------------------time-------------------------
+    public static final String MYSQL_DATE = "DATE";
+    public static final String MYSQL_DATETIME = "DATETIME";
+    public static final String MYSQL_TIME = "TIME";
+    public static final String MYSQL_TIMESTAMP = "TIMESTAMP";
+    public static final String MYSQL_YEAR = "YEAR";
+
+    // ------------------------------blob-------------------------
+    public static final String MYSQL_TINYBLOB = "TINYBLOB";
+    public static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
+    public static final String MYSQL_BLOB = "BLOB";
+    public static final String MYSQL_LONGBLOB = "LONGBLOB";
+    public static final String MYSQL_BINARY = "BINARY";
+    public static final String MYSQL_VARBINARY = "VARBINARY";
+    public static final String MYSQL_GEOMETRY = "GEOMETRY";
+
+    // column class names
+    public static final String COLUMN_CLASS_BOOLEAN = "java.lang.Boolean";
+    public static final String COLUMN_CLASS_INTEGER = "java.lang.Integer";
+    public static final String COLUMN_CLASS_BIG_INTEGER = "java.math.BigInteger";
+    public static final String COLUMN_CLASS_LONG = "java.lang.Long";
+    public static final String COLUMN_CLASS_FLOAT = "java.lang.Float";
+    public static final String COLUMN_CLASS_DOUBLE = "java.lang.Double";
+    public static final String COLUMN_CLASS_BIG_DECIMAL = "java.math.BigDecimal";
+    public static final String COLUMN_CLASS_BYTE_ARRAY = "[B";
+    public static final String COLUMN_CLASS_STRING = "java.lang.String";
+    public static final String COLUMN_CLASS_DATE = "java.sql.Date";
+    public static final String COLUMN_CLASS_TIME = "java.sql.Time";
+    public static final String COLUMN_CLASS_TIMESTAMP = "java.sql.Timestamp";
+
+    public static final int RAW_TIME_LENGTH = 10;
+    public static final int RAW_TIMESTAMP_LENGTH = 19;
+
+    private static final Set<String> builtinDatabases =
+            new HashSet<String>() {
+                {
+                    add("information_schema");
+                    add("mysql");
+                    add("performance_schema");
+                    add("sys");
+                }
+            };
+
+    public MySQLCatalog(
+            String catalogName,
+            String defaultDatabase,
+            String username,
+            String pwd,
+            String baseUrl) {
+        super(catalogName, defaultDatabase, username, pwd, baseUrl);
+        this.driverVersion =
+                Preconditions.checkNotNull(getDriverVersion(), "driver version must not be null.");
+        this.databaseVersion =
+                Preconditions.checkNotNull(
+                        getDatabaseVersion(), "database version must not be null.");
+        LOG.info("Driver version: {}, database version: {}", driverVersion, databaseVersion);
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        String sql = "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;";
+        return extractColumnValuesBySQL(
+                defaultUrl,
+                sql,
+                1,
+                (FilterFunction<String>) dbName -> !builtinDatabases.contains(dbName));
+    }
+
+    @Override
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                StringUtils.isNotBlank(databaseName), "Database name must not be blank.");
+        if (listDatabases().contains(databaseName)) {
+            return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+        } else {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                StringUtils.isNotBlank(databaseName), "Database name must not be blank.");
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+        String connUrl = baseUrl + databaseName;
+        return extractColumnValuesBySQL(
+                connUrl,
+                "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?",
+                1,
+                null,
+                databaseName);
+    }
+
+    // ------ retrieve PK constraint ------
+
+    private Optional<UniqueConstraint> getPrimaryKey(
+            DatabaseMetaData metaData, String schema, ObjectPath table) throws SQLException {
+
+        // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
+        // the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
+        // We need to sort them based on the KEY_SEQ value.
+        ResultSet rs =
+                metaData.getPrimaryKeys(table.getDatabaseName(), schema, table.getObjectName());
+
+        Map<Integer, String> keySeqColumnName = new HashMap<>();
+        String pkName = null;
+        while (rs.next()) {
+            String columnName = rs.getString("COLUMN_NAME");
+            pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the same
+            int keySeq = rs.getInt("KEY_SEQ");
+            keySeqColumnName.put(keySeq - 1, columnName); // KEY_SEQ is 1-based index
+        }
+        List<String> pkFields =
+                Arrays.asList(new String[keySeqColumnName.size()]); // initialize size
+        keySeqColumnName.forEach(pkFields::set);
+        if (!pkFields.isEmpty()) {
+            // PK_NAME maybe null according to the javadoc, generate an unique name in that case
+            pkName = pkName == null ? "pk_" + String.join("_", pkFields) : pkName;
+            return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields));
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public CatalogBaseTable getTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                baseUrl + tablePath.getDatabaseName(), username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement("SELECT * FROM " + tablePath.getObjectName())) {
+
+            DatabaseMetaData metaData = conn.getMetaData();
+            Optional<UniqueConstraint> primaryKey = getPrimaryKey(metaData, null, tablePath);
+            ResultSetMetaData resultSetMetaData = ps.getMetaData();
+
+            String[] columnNames = new String[resultSetMetaData.getColumnCount()];
+            DataType[] types = new DataType[resultSetMetaData.getColumnCount()];
+            for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+                columnNames[i - 1] = resultSetMetaData.getColumnName(i);
+                types[i - 1] = fromJDBCType(tablePath, resultSetMetaData, i);
+                if (resultSetMetaData.isNullable(i) == ResultSetMetaData.columnNoNulls) {
+                    types[i - 1] = types[i - 1].notNull();
+                }
+            }
+            Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columnNames, types);
+            primaryKey.ifPresent(
+                    pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns()));
+            Schema tableSchema = schemaBuilder.build();
+            Map<String, String> props = new HashMap<>();
+            props.put(CONNECTOR.key(), IDENTIFIER);
+            props.put(URL.key(), baseUrl + tablePath.getDatabaseName());
+            props.put(TABLE_NAME.key(), tablePath.getObjectName());
+            props.put(USERNAME.key(), username);
+            props.put(PASSWORD.key(), pwd);
+            return CatalogTable.of(tableSchema, null, Lists.newArrayList(), props);
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed getting table %s", tablePath.getFullName()), e);
+        }
+    }
+
+    @Override
+    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+        return !extractColumnValuesBySQL(
+                        baseUrl,
+                        "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA=? and TABLE_NAME=?",
+                        1,
+                        null,
+                        tablePath.getDatabaseName(),
+                        tablePath.getObjectName())
+                .isEmpty();
+    }
+
+    private List<String> extractColumnValuesBySQL(
+            String connUrl,
+            String sql,
+            int columnIndex,
+            FilterFunction<String> filterFunc,
+            Object... params) {
+        List<String> columnValues = Lists.newArrayList();
+        try (Connection conn = DriverManager.getConnection(connUrl, username, pwd);
+                PreparedStatement ps = conn.prepareStatement(sql)) {
+            if (Objects.nonNull(params) && params.length > 0) {
+                for (int i = 0; i < params.length; i++) {
+                    ps.setObject(i + 1, params[i]);
+                }
+            }
+            ResultSet rs = ps.executeQuery();
+            while (rs.next()) {
+                String columnValue = rs.getString(columnIndex);
+                if (Objects.isNull(filterFunc) || filterFunc.filter(columnValue)) {
+                    columnValues.add(columnValue);
+                }
+            }
+            return columnValues;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed in processing query sql %s, connUrl %s", sql, connUrl),
+                    e);
+        }
+    }
+
+    private String getDatabaseVersion() {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+            return conn.getMetaData().getDatabaseProductVersion();
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed in getting MySQL version by %s.", defaultUrl), e);
+        }
+    }
+
+    private String getDriverVersion() {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+            String driverVersion = conn.getMetaData().getDriverVersion();
+            Pattern regexp = Pattern.compile("\\d+?\\.\\d+?\\.\\d+");
+            Matcher matcher = regexp.matcher(driverVersion);
+            return matcher.find() ? matcher.group(0) : null;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed in getting mysql driver version by %s.", defaultUrl), e);
+        }
+    }
+
+    /** Converts MySQL type to Flink {@link DataType}. */
+    private DataType fromJDBCType(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
+            throws SQLException {
+        String mysqlType = metadata.getColumnTypeName(colIndex).toUpperCase();
+        String columnName = metadata.getColumnName(colIndex);
+        int precision = metadata.getPrecision(colIndex);
+        int scale = metadata.getScale(colIndex);
+
+        switch (mysqlType) {
+            case MYSQL_BIT:
+                return DataTypes.BOOLEAN();
+            case MYSQL_TINYBLOB:
+            case MYSQL_MEDIUMBLOB:
+            case MYSQL_BLOB:
+            case MYSQL_LONGBLOB:
+            case MYSQL_VARBINARY:
+            case MYSQL_BINARY:
+                return DataTypes.BYTES();
+            case MYSQL_TINYINT:
+                return DataTypes.TINYINT();
+            case MYSQL_TINYINT_UNSIGNED:
+            case MYSQL_SMALLINT:
+                return DataTypes.SMALLINT();
+            case MYSQL_SMALLINT_UNSIGNED:
+            case MYSQL_MEDIUMINT:
+            case MYSQL_MEDIUMINT_UNSIGNED:
+            case MYSQL_INT:
+            case MYSQL_INTEGER:
+                return DataTypes.INT();
+            case MYSQL_INT_UNSIGNED:
+            case MYSQL_INTEGER_UNSIGNED:
+            case MYSQL_BIGINT:
+                return DataTypes.BIGINT();
+            case MYSQL_BIGINT_UNSIGNED:
+                return DataTypes.DECIMAL(20, 0);
+            case MYSQL_DECIMAL:
+                return DataTypes.DECIMAL(precision, scale);
+            case MYSQL_DECIMAL_UNSIGNED:
+                checkMaxPrecision(tablePath, columnName, precision);
+                return DataTypes.DECIMAL(precision + 1, scale);
+            case MYSQL_FLOAT:
+                return DataTypes.FLOAT();
+            case MYSQL_FLOAT_UNSIGNED:
+                LOG.warn("{} will probably cause value overflow.", MYSQL_FLOAT_UNSIGNED);
+                return DataTypes.FLOAT();
+            case MYSQL_DOUBLE:
+                return DataTypes.DOUBLE();
+            case MYSQL_DOUBLE_UNSIGNED:
+                LOG.warn("{} will probably cause value overflow.", MYSQL_DOUBLE_UNSIGNED);
+                return DataTypes.DOUBLE();
+            case MYSQL_CHAR:
+            case MYSQL_VARCHAR:
+            case MYSQL_TINYTEXT:
+            case MYSQL_MEDIUMTEXT:
+            case MYSQL_TEXT:
+            case MYSQL_JSON:
+                return DataTypes.STRING();
+            case MYSQL_LONGTEXT:
+                LOG.warn(
+                        "The max precision of type '{}' in mysql is 536870911, and the max "
+                                + "precision here has to be set as 2147483647 due to the "
+                                + "limitation of the flink sql types system.",
+                        MYSQL_LONGTEXT);

Review comment:
       ```suggestion
                   LOG.warn("Type '{}' has a maximum precision of 536870911 in MySQL. Due to limitations in the Flink type system, the precision will be set to 2147483647.", MYSQL_LONGTEXT);
   ```

##########
File path: flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalogITCase.java
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.CollectionUtil;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.text.ParseException;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
+import static org.junit.Assert.assertEquals;
+
+/** E2E test for {@link MySQLCatalog}. */
+public class MySQLCatalogITCase extends MySQLCatalogTestBase {
+
+    private static final List<Row> ALL_TYPES_ROWS =
+            Lists.newArrayList(
+                    Row.ofKind(
+                            RowKind.INSERT,
+                            1L,
+                            -1L,
+                            new BigDecimal(1),
+                            null,
+                            true,
+                            null,
+                            "hello",
+                            Date.valueOf("2021-08-04").toLocalDate(),
+                            Timestamp.valueOf("2021-08-04 01:54:16").toLocalDateTime(),
+                            new BigDecimal(-1),
+                            new BigDecimal(1),
+                            -1.0d,
+                            1.0d,
+                            "enum2",
+                            -9.1f,
+                            9.1f,
+                            null,
+                            null,
+                            -1,
+                            1L,
+                            -1,
+                            1L,
+                            "{\"k1\": \"v1\"}",
+                            null,
+                            null,
+                            "col_longtext",
+                            null,
+                            -1,
+                            1,
+                            "col_mediumtext",
+                            null,
+                            null,
+                            null,
+                            new BigDecimal(-99),
+                            new BigDecimal(99),
+                            null,
+                            null,
+                            -1.0d,
+                            1.0d,
+                            "set_ele1",
+                            Short.parseShort("-1"),
+                            1,
+                            "col_text",
+                            Time.valueOf("10:32:34").toLocalTime(),
+                            Timestamp.valueOf("2021-08-04 01:54:16").toLocalDateTime(),
+                            "col_tinytext",
+                            Byte.parseByte("-1"),
+                            Byte.parseByte("1"),
+                            null,
+                            "col_varchar",
+                            Date.valueOf("1999-01-01").toLocalDate(),
+                            Timestamp.valueOf("2021-08-04 01:54:16.463").toLocalDateTime(),
+                            Time.valueOf("09:33:43").toLocalTime(),
+                            Timestamp.valueOf("2021-08-04 01:54:16.463").toLocalDateTime(),
+                            null),
+                    Row.ofKind(
+                            RowKind.INSERT,
+                            2L,
+                            -1L,
+                            new BigDecimal(1),
+                            null,
+                            true,
+                            null,
+                            "hello",
+                            Date.valueOf("2021-08-04").toLocalDate(),
+                            Timestamp.valueOf("2021-08-04 01:53:19").toLocalDateTime(),
+                            new BigDecimal(-1),
+                            new BigDecimal(1),
+                            -1.0d,
+                            1.0d,
+                            "enum2",
+                            -9.1f,
+                            9.1f,
+                            null,
+                            null,
+                            -1,
+                            1L,
+                            -1,
+                            1L,
+                            "{\"k1\": \"v1\"}",
+                            null,
+                            null,
+                            "col_longtext",
+                            null,
+                            -1,
+                            1,
+                            "col_mediumtext",
+                            null,
+                            null,
+                            null,
+                            new BigDecimal(-99),
+                            new BigDecimal(99),
+                            null,
+                            null,
+                            -1.0d,
+                            1.0d,
+                            "set_ele1,set_ele12",
+                            Short.parseShort("-1"),
+                            1,
+                            "col_text",
+                            Time.valueOf("10:32:34").toLocalTime(),
+                            Timestamp.valueOf("2021-08-04 01:53:19").toLocalDateTime(),
+                            "col_tinytext",
+                            Byte.parseByte("-1"),
+                            Byte.parseByte("1"),
+                            null,
+                            "col_varchar",
+                            Date.valueOf("1999-01-01").toLocalDate(),
+                            Timestamp.valueOf("2021-08-04 01:53:19.098").toLocalDateTime(),
+                            Time.valueOf("09:33:43").toLocalTime(),
+                            Timestamp.valueOf("2021-08-04 01:53:19.098").toLocalDateTime(),
+                            null));
+
+    private TableEnvironment tEnv;
+
+    @Before
+    public void setup() {
+        this.tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+        tEnv.getConfig()
+                .getConfiguration()
+                .setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
+
+        // Use mysql catalog.
+        tEnv.registerCatalog(TEST_CATALOG_NAME, catalog);
+        tEnv.useCatalog(TEST_CATALOG_NAME);
+    }
+
+    @Test
+    public void testSelectField() {
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(String.format("select pid from %s", TEST_TABLE_ALL_TYPES))
+                                .execute()
+                                .collect());
+        assertEquals(
+                Lists.newArrayList(Row.ofKind(RowKind.INSERT, 1L), Row.ofKind(RowKind.INSERT, 2L)),
+                results);
+    }
+
+    @Test
+    public void testWithoutCatalogDB() {
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(String.format("select * from %s", TEST_TABLE_ALL_TYPES))
+                                .execute()
+                                .collect());
+
+        assertEquals(ALL_TYPES_ROWS, results);
+    }
+
+    @Test
+    public void testWithoutCatalog() {
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(
+                                        String.format(
+                                                "select * from `%s`.`%s`",
+                                                TEST_DB, TEST_TABLE_ALL_TYPES))
+                                .execute()
+                                .collect());
+        assertEquals(ALL_TYPES_ROWS, results);
+    }
+
+    @Test
+    public void testFullPath() throws ParseException {

Review comment:
       What's throwing a `ParseException` here?

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalog.java
##########
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME;
+import static org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.IDENTIFIER;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+
+/** Catalog for MySQL. */
+@Internal
+public class MySQLCatalog extends AbstractJdbcCatalog {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MySQLCatalog.class);
+
+    private final String databaseVersion;
+    private final String driverVersion;
+
+    // ============================data types=====================
+
+    public static final String MYSQL_UNKNOWN = "UNKNOWN";

Review comment:
       None of the constants need to be public, do they? I think we should move these out of here, but I wrote a comment for that on `fromJDBCType` later on.

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalog.java
##########
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME;
+import static org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.IDENTIFIER;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+
+/** Catalog for MySQL. */
+@Internal
+public class MySQLCatalog extends AbstractJdbcCatalog {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MySQLCatalog.class);
+
+    private final String databaseVersion;
+    private final String driverVersion;
+
+    // ============================data types=====================
+
+    public static final String MYSQL_UNKNOWN = "UNKNOWN";
+    public static final String MYSQL_BIT = "BIT";
+
+    // -------------------------number----------------------------
+    public static final String MYSQL_TINYINT = "TINYINT";
+    public static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    public static final String MYSQL_SMALLINT = "SMALLINT";
+    public static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    public static final String MYSQL_MEDIUMINT = "MEDIUMINT";
+    public static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+    public static final String MYSQL_INT = "INT";
+    public static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
+    public static final String MYSQL_INTEGER = "INTEGER";
+    public static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+    public static final String MYSQL_BIGINT = "BIGINT";
+    public static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    public static final String MYSQL_DECIMAL = "DECIMAL";
+    public static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    public static final String MYSQL_FLOAT = "FLOAT";
+    public static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    public static final String MYSQL_DOUBLE = "DOUBLE";
+    public static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+    // -------------------------string----------------------------
+    public static final String MYSQL_CHAR = "CHAR";
+    public static final String MYSQL_VARCHAR = "VARCHAR";
+    public static final String MYSQL_TINYTEXT = "TINYTEXT";
+    public static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
+    public static final String MYSQL_TEXT = "TEXT";
+    public static final String MYSQL_LONGTEXT = "LONGTEXT";
+    public static final String MYSQL_JSON = "JSON";
+
+    // ------------------------------time-------------------------
+    public static final String MYSQL_DATE = "DATE";
+    public static final String MYSQL_DATETIME = "DATETIME";
+    public static final String MYSQL_TIME = "TIME";
+    public static final String MYSQL_TIMESTAMP = "TIMESTAMP";
+    public static final String MYSQL_YEAR = "YEAR";
+
+    // ------------------------------blob-------------------------
+    public static final String MYSQL_TINYBLOB = "TINYBLOB";
+    public static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
+    public static final String MYSQL_BLOB = "BLOB";
+    public static final String MYSQL_LONGBLOB = "LONGBLOB";
+    public static final String MYSQL_BINARY = "BINARY";
+    public static final String MYSQL_VARBINARY = "VARBINARY";
+    public static final String MYSQL_GEOMETRY = "GEOMETRY";
+
+    // column class names
+    public static final String COLUMN_CLASS_BOOLEAN = "java.lang.Boolean";
+    public static final String COLUMN_CLASS_INTEGER = "java.lang.Integer";
+    public static final String COLUMN_CLASS_BIG_INTEGER = "java.math.BigInteger";
+    public static final String COLUMN_CLASS_LONG = "java.lang.Long";
+    public static final String COLUMN_CLASS_FLOAT = "java.lang.Float";
+    public static final String COLUMN_CLASS_DOUBLE = "java.lang.Double";
+    public static final String COLUMN_CLASS_BIG_DECIMAL = "java.math.BigDecimal";
+    public static final String COLUMN_CLASS_BYTE_ARRAY = "[B";
+    public static final String COLUMN_CLASS_STRING = "java.lang.String";
+    public static final String COLUMN_CLASS_DATE = "java.sql.Date";
+    public static final String COLUMN_CLASS_TIME = "java.sql.Time";
+    public static final String COLUMN_CLASS_TIMESTAMP = "java.sql.Timestamp";
+
+    public static final int RAW_TIME_LENGTH = 10;
+    public static final int RAW_TIMESTAMP_LENGTH = 19;
+
+    private static final Set<String> builtinDatabases =
+            new HashSet<String>() {
+                {
+                    add("information_schema");
+                    add("mysql");
+                    add("performance_schema");
+                    add("sys");
+                }
+            };
+
+    public MySQLCatalog(
+            String catalogName,
+            String defaultDatabase,
+            String username,
+            String pwd,
+            String baseUrl) {
+        super(catalogName, defaultDatabase, username, pwd, baseUrl);
+        this.driverVersion =
+                Preconditions.checkNotNull(getDriverVersion(), "driver version must not be null.");
+        this.databaseVersion =
+                Preconditions.checkNotNull(
+                        getDatabaseVersion(), "database version must not be null.");
+        LOG.info("Driver version: {}, database version: {}", driverVersion, databaseVersion);
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        String sql = "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;";
+        return extractColumnValuesBySQL(
+                defaultUrl,
+                sql,
+                1,
+                (FilterFunction<String>) dbName -> !builtinDatabases.contains(dbName));
+    }
+
+    @Override
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                StringUtils.isNotBlank(databaseName), "Database name must not be blank.");
+        if (listDatabases().contains(databaseName)) {
+            return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+        } else {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                StringUtils.isNotBlank(databaseName), "Database name must not be blank.");
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+        String connUrl = baseUrl + databaseName;
+        return extractColumnValuesBySQL(
+                connUrl,
+                "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?",
+                1,
+                null,
+                databaseName);
+    }
+
+    // ------ retrieve PK constraint ------
+
+    private Optional<UniqueConstraint> getPrimaryKey(
+            DatabaseMetaData metaData, String schema, ObjectPath table) throws SQLException {
+
+        // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
+        // the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
+        // We need to sort them based on the KEY_SEQ value.
+        ResultSet rs =
+                metaData.getPrimaryKeys(table.getDatabaseName(), schema, table.getObjectName());
+
+        Map<Integer, String> keySeqColumnName = new HashMap<>();
+        String pkName = null;
+        while (rs.next()) {
+            String columnName = rs.getString("COLUMN_NAME");
+            pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the same
+            int keySeq = rs.getInt("KEY_SEQ");
+            keySeqColumnName.put(keySeq - 1, columnName); // KEY_SEQ is 1-based index
+        }
+        List<String> pkFields =
+                Arrays.asList(new String[keySeqColumnName.size()]); // initialize size
+        keySeqColumnName.forEach(pkFields::set);
+        if (!pkFields.isEmpty()) {
+            // PK_NAME maybe null according to the javadoc, generate an unique name in that case
+            pkName = pkName == null ? "pk_" + String.join("_", pkFields) : pkName;
+            return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields));
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public CatalogBaseTable getTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                baseUrl + tablePath.getDatabaseName(), username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement("SELECT * FROM " + tablePath.getObjectName())) {
+
+            DatabaseMetaData metaData = conn.getMetaData();
+            Optional<UniqueConstraint> primaryKey = getPrimaryKey(metaData, null, tablePath);
+            ResultSetMetaData resultSetMetaData = ps.getMetaData();
+
+            String[] columnNames = new String[resultSetMetaData.getColumnCount()];
+            DataType[] types = new DataType[resultSetMetaData.getColumnCount()];
+            for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+                columnNames[i - 1] = resultSetMetaData.getColumnName(i);
+                types[i - 1] = fromJDBCType(tablePath, resultSetMetaData, i);
+                if (resultSetMetaData.isNullable(i) == ResultSetMetaData.columnNoNulls) {
+                    types[i - 1] = types[i - 1].notNull();
+                }
+            }
+            Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columnNames, types);
+            primaryKey.ifPresent(
+                    pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns()));
+            Schema tableSchema = schemaBuilder.build();
+            Map<String, String> props = new HashMap<>();
+            props.put(CONNECTOR.key(), IDENTIFIER);
+            props.put(URL.key(), baseUrl + tablePath.getDatabaseName());
+            props.put(TABLE_NAME.key(), tablePath.getObjectName());
+            props.put(USERNAME.key(), username);
+            props.put(PASSWORD.key(), pwd);
+            return CatalogTable.of(tableSchema, null, Lists.newArrayList(), props);
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed getting table %s", tablePath.getFullName()), e);
+        }
+    }
+
+    @Override
+    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+        return !extractColumnValuesBySQL(
+                        baseUrl,
+                        "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA=? and TABLE_NAME=?",
+                        1,
+                        null,
+                        tablePath.getDatabaseName(),
+                        tablePath.getObjectName())
+                .isEmpty();
+    }
+
+    private List<String> extractColumnValuesBySQL(
+            String connUrl,
+            String sql,
+            int columnIndex,
+            FilterFunction<String> filterFunc,
+            Object... params) {
+        List<String> columnValues = Lists.newArrayList();
+        try (Connection conn = DriverManager.getConnection(connUrl, username, pwd);
+                PreparedStatement ps = conn.prepareStatement(sql)) {
+            if (Objects.nonNull(params) && params.length > 0) {
+                for (int i = 0; i < params.length; i++) {
+                    ps.setObject(i + 1, params[i]);
+                }
+            }
+            ResultSet rs = ps.executeQuery();
+            while (rs.next()) {
+                String columnValue = rs.getString(columnIndex);
+                if (Objects.isNull(filterFunc) || filterFunc.filter(columnValue)) {
+                    columnValues.add(columnValue);
+                }
+            }
+            return columnValues;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed in processing query sql %s, connUrl %s", sql, connUrl),
+                    e);
+        }
+    }
+
+    private String getDatabaseVersion() {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+            return conn.getMetaData().getDatabaseProductVersion();
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed in getting MySQL version by %s.", defaultUrl), e);
+        }
+    }
+
+    private String getDriverVersion() {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+            String driverVersion = conn.getMetaData().getDriverVersion();
+            Pattern regexp = Pattern.compile("\\d+?\\.\\d+?\\.\\d+");
+            Matcher matcher = regexp.matcher(driverVersion);
+            return matcher.find() ? matcher.group(0) : null;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed in getting mysql driver version by %s.", defaultUrl), e);
+        }
+    }
+
+    /** Converts MySQL type to Flink {@link DataType}. */
+    private DataType fromJDBCType(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)

Review comment:
       This entire lengthy logic along with all of the constants should be refactored into separate classes. It might even be worth introducing an internal interface here since the conversion will have to be done for every implementation.

##########
File path: flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalogTest.java
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/** Test for {@link MySQLCatalog}. */
+public class MySQLCatalogTest extends MySQLCatalogTestBase {

Review comment:
       Isn't this also an IT Case since it derives from the base class which launches a container?

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/mysql/MySQLDialect.java
##########
@@ -31,7 +31,7 @@
 import java.util.stream.Collectors;
 
 /** JDBC dialect for MySQL. */
-class MySQLDialect extends AbstractDialect {
+public class MySQLDialect extends AbstractDialect {

Review comment:
       Please add `@Internal` here (and to the other dialect implementations while we're at it).

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalog.java
##########
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME;
+import static org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.IDENTIFIER;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+
+/** Catalog for MySQL. */
+@Internal
+public class MySQLCatalog extends AbstractJdbcCatalog {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MySQLCatalog.class);
+
+    private final String databaseVersion;
+    private final String driverVersion;
+
+    // ============================data types=====================
+
+    public static final String MYSQL_UNKNOWN = "UNKNOWN";
+    public static final String MYSQL_BIT = "BIT";
+
+    // -------------------------number----------------------------
+    public static final String MYSQL_TINYINT = "TINYINT";
+    public static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    public static final String MYSQL_SMALLINT = "SMALLINT";
+    public static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    public static final String MYSQL_MEDIUMINT = "MEDIUMINT";
+    public static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+    public static final String MYSQL_INT = "INT";
+    public static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
+    public static final String MYSQL_INTEGER = "INTEGER";
+    public static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+    public static final String MYSQL_BIGINT = "BIGINT";
+    public static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    public static final String MYSQL_DECIMAL = "DECIMAL";
+    public static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    public static final String MYSQL_FLOAT = "FLOAT";
+    public static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    public static final String MYSQL_DOUBLE = "DOUBLE";
+    public static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+    // -------------------------string----------------------------
+    public static final String MYSQL_CHAR = "CHAR";
+    public static final String MYSQL_VARCHAR = "VARCHAR";
+    public static final String MYSQL_TINYTEXT = "TINYTEXT";
+    public static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
+    public static final String MYSQL_TEXT = "TEXT";
+    public static final String MYSQL_LONGTEXT = "LONGTEXT";
+    public static final String MYSQL_JSON = "JSON";
+
+    // ------------------------------time-------------------------
+    public static final String MYSQL_DATE = "DATE";
+    public static final String MYSQL_DATETIME = "DATETIME";
+    public static final String MYSQL_TIME = "TIME";
+    public static final String MYSQL_TIMESTAMP = "TIMESTAMP";
+    public static final String MYSQL_YEAR = "YEAR";
+
+    // ------------------------------blob-------------------------
+    public static final String MYSQL_TINYBLOB = "TINYBLOB";
+    public static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
+    public static final String MYSQL_BLOB = "BLOB";
+    public static final String MYSQL_LONGBLOB = "LONGBLOB";
+    public static final String MYSQL_BINARY = "BINARY";
+    public static final String MYSQL_VARBINARY = "VARBINARY";
+    public static final String MYSQL_GEOMETRY = "GEOMETRY";
+
+    // column class names
+    public static final String COLUMN_CLASS_BOOLEAN = "java.lang.Boolean";
+    public static final String COLUMN_CLASS_INTEGER = "java.lang.Integer";
+    public static final String COLUMN_CLASS_BIG_INTEGER = "java.math.BigInteger";
+    public static final String COLUMN_CLASS_LONG = "java.lang.Long";
+    public static final String COLUMN_CLASS_FLOAT = "java.lang.Float";
+    public static final String COLUMN_CLASS_DOUBLE = "java.lang.Double";
+    public static final String COLUMN_CLASS_BIG_DECIMAL = "java.math.BigDecimal";
+    public static final String COLUMN_CLASS_BYTE_ARRAY = "[B";
+    public static final String COLUMN_CLASS_STRING = "java.lang.String";
+    public static final String COLUMN_CLASS_DATE = "java.sql.Date";
+    public static final String COLUMN_CLASS_TIME = "java.sql.Time";
+    public static final String COLUMN_CLASS_TIMESTAMP = "java.sql.Timestamp";
+
+    public static final int RAW_TIME_LENGTH = 10;
+    public static final int RAW_TIMESTAMP_LENGTH = 19;
+
+    private static final Set<String> builtinDatabases =
+            new HashSet<String>() {
+                {
+                    add("information_schema");
+                    add("mysql");
+                    add("performance_schema");
+                    add("sys");
+                }
+            };
+
+    public MySQLCatalog(
+            String catalogName,
+            String defaultDatabase,
+            String username,
+            String pwd,
+            String baseUrl) {
+        super(catalogName, defaultDatabase, username, pwd, baseUrl);
+        this.driverVersion =
+                Preconditions.checkNotNull(getDriverVersion(), "driver version must not be null.");
+        this.databaseVersion =
+                Preconditions.checkNotNull(
+                        getDatabaseVersion(), "database version must not be null.");
+        LOG.info("Driver version: {}, database version: {}", driverVersion, databaseVersion);
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        String sql = "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;";
+        return extractColumnValuesBySQL(
+                defaultUrl,
+                sql,
+                1,
+                (FilterFunction<String>) dbName -> !builtinDatabases.contains(dbName));
+    }
+
+    @Override
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                StringUtils.isNotBlank(databaseName), "Database name must not be blank.");
+        if (listDatabases().contains(databaseName)) {
+            return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+        } else {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                StringUtils.isNotBlank(databaseName), "Database name must not be blank.");
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+        String connUrl = baseUrl + databaseName;
+        return extractColumnValuesBySQL(
+                connUrl,
+                "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?",
+                1,
+                null,
+                databaseName);
+    }
+
+    // ------ retrieve PK constraint ------
+
+    private Optional<UniqueConstraint> getPrimaryKey(
+            DatabaseMetaData metaData, String schema, ObjectPath table) throws SQLException {
+
+        // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
+        // the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
+        // We need to sort them based on the KEY_SEQ value.
+        ResultSet rs =
+                metaData.getPrimaryKeys(table.getDatabaseName(), schema, table.getObjectName());
+
+        Map<Integer, String> keySeqColumnName = new HashMap<>();
+        String pkName = null;
+        while (rs.next()) {
+            String columnName = rs.getString("COLUMN_NAME");
+            pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the same
+            int keySeq = rs.getInt("KEY_SEQ");
+            keySeqColumnName.put(keySeq - 1, columnName); // KEY_SEQ is 1-based index
+        }
+        List<String> pkFields =
+                Arrays.asList(new String[keySeqColumnName.size()]); // initialize size
+        keySeqColumnName.forEach(pkFields::set);
+        if (!pkFields.isEmpty()) {
+            // PK_NAME maybe null according to the javadoc, generate an unique name in that case
+            pkName = pkName == null ? "pk_" + String.join("_", pkFields) : pkName;
+            return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields));
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public CatalogBaseTable getTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                baseUrl + tablePath.getDatabaseName(), username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement("SELECT * FROM " + tablePath.getObjectName())) {
+
+            DatabaseMetaData metaData = conn.getMetaData();
+            Optional<UniqueConstraint> primaryKey = getPrimaryKey(metaData, null, tablePath);
+            ResultSetMetaData resultSetMetaData = ps.getMetaData();
+
+            String[] columnNames = new String[resultSetMetaData.getColumnCount()];
+            DataType[] types = new DataType[resultSetMetaData.getColumnCount()];
+            for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+                columnNames[i - 1] = resultSetMetaData.getColumnName(i);
+                types[i - 1] = fromJDBCType(tablePath, resultSetMetaData, i);
+                if (resultSetMetaData.isNullable(i) == ResultSetMetaData.columnNoNulls) {
+                    types[i - 1] = types[i - 1].notNull();
+                }
+            }
+            Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columnNames, types);
+            primaryKey.ifPresent(
+                    pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns()));
+            Schema tableSchema = schemaBuilder.build();
+            Map<String, String> props = new HashMap<>();
+            props.put(CONNECTOR.key(), IDENTIFIER);
+            props.put(URL.key(), baseUrl + tablePath.getDatabaseName());
+            props.put(TABLE_NAME.key(), tablePath.getObjectName());
+            props.put(USERNAME.key(), username);
+            props.put(PASSWORD.key(), pwd);
+            return CatalogTable.of(tableSchema, null, Lists.newArrayList(), props);
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed getting table %s", tablePath.getFullName()), e);
+        }
+    }
+
+    @Override
+    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+        return !extractColumnValuesBySQL(
+                        baseUrl,
+                        "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA=? and TABLE_NAME=?",
+                        1,
+                        null,
+                        tablePath.getDatabaseName(),
+                        tablePath.getObjectName())
+                .isEmpty();
+    }
+
+    private List<String> extractColumnValuesBySQL(
+            String connUrl,
+            String sql,
+            int columnIndex,
+            FilterFunction<String> filterFunc,
+            Object... params) {
+        List<String> columnValues = Lists.newArrayList();
+        try (Connection conn = DriverManager.getConnection(connUrl, username, pwd);
+                PreparedStatement ps = conn.prepareStatement(sql)) {
+            if (Objects.nonNull(params) && params.length > 0) {
+                for (int i = 0; i < params.length; i++) {
+                    ps.setObject(i + 1, params[i]);
+                }
+            }
+            ResultSet rs = ps.executeQuery();
+            while (rs.next()) {
+                String columnValue = rs.getString(columnIndex);
+                if (Objects.isNull(filterFunc) || filterFunc.filter(columnValue)) {
+                    columnValues.add(columnValue);
+                }
+            }
+            return columnValues;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed in processing query sql %s, connUrl %s", sql, connUrl),
+                    e);
+        }
+    }
+
+    private String getDatabaseVersion() {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+            return conn.getMetaData().getDatabaseProductVersion();
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed in getting MySQL version by %s.", defaultUrl), e);
+        }
+    }
+
+    private String getDriverVersion() {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+            String driverVersion = conn.getMetaData().getDriverVersion();
+            Pattern regexp = Pattern.compile("\\d+?\\.\\d+?\\.\\d+");
+            Matcher matcher = regexp.matcher(driverVersion);
+            return matcher.find() ? matcher.group(0) : null;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed in getting mysql driver version by %s.", defaultUrl), e);
+        }
+    }
+
+    /** Converts MySQL type to Flink {@link DataType}. */
+    private DataType fromJDBCType(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
+            throws SQLException {
+        String mysqlType = metadata.getColumnTypeName(colIndex).toUpperCase();
+        String columnName = metadata.getColumnName(colIndex);
+        int precision = metadata.getPrecision(colIndex);
+        int scale = metadata.getScale(colIndex);
+
+        switch (mysqlType) {
+            case MYSQL_BIT:
+                return DataTypes.BOOLEAN();
+            case MYSQL_TINYBLOB:
+            case MYSQL_MEDIUMBLOB:
+            case MYSQL_BLOB:
+            case MYSQL_LONGBLOB:
+            case MYSQL_VARBINARY:
+            case MYSQL_BINARY:
+                return DataTypes.BYTES();
+            case MYSQL_TINYINT:
+                return DataTypes.TINYINT();
+            case MYSQL_TINYINT_UNSIGNED:
+            case MYSQL_SMALLINT:
+                return DataTypes.SMALLINT();
+            case MYSQL_SMALLINT_UNSIGNED:
+            case MYSQL_MEDIUMINT:
+            case MYSQL_MEDIUMINT_UNSIGNED:
+            case MYSQL_INT:
+            case MYSQL_INTEGER:
+                return DataTypes.INT();
+            case MYSQL_INT_UNSIGNED:
+            case MYSQL_INTEGER_UNSIGNED:
+            case MYSQL_BIGINT:
+                return DataTypes.BIGINT();
+            case MYSQL_BIGINT_UNSIGNED:
+                return DataTypes.DECIMAL(20, 0);
+            case MYSQL_DECIMAL:
+                return DataTypes.DECIMAL(precision, scale);
+            case MYSQL_DECIMAL_UNSIGNED:
+                checkMaxPrecision(tablePath, columnName, precision);
+                return DataTypes.DECIMAL(precision + 1, scale);
+            case MYSQL_FLOAT:
+                return DataTypes.FLOAT();
+            case MYSQL_FLOAT_UNSIGNED:
+                LOG.warn("{} will probably cause value overflow.", MYSQL_FLOAT_UNSIGNED);
+                return DataTypes.FLOAT();
+            case MYSQL_DOUBLE:
+                return DataTypes.DOUBLE();
+            case MYSQL_DOUBLE_UNSIGNED:
+                LOG.warn("{} will probably cause value overflow.", MYSQL_DOUBLE_UNSIGNED);
+                return DataTypes.DOUBLE();
+            case MYSQL_CHAR:
+            case MYSQL_VARCHAR:
+            case MYSQL_TINYTEXT:
+            case MYSQL_MEDIUMTEXT:
+            case MYSQL_TEXT:
+            case MYSQL_JSON:
+                return DataTypes.STRING();
+            case MYSQL_LONGTEXT:
+                LOG.warn(
+                        "The max precision of type '{}' in mysql is 536870911, and the max "
+                                + "precision here has to be set as 2147483647 due to the "
+                                + "limitation of the flink sql types system.",
+                        MYSQL_LONGTEXT);
+                return DataTypes.STRING();
+            case MYSQL_YEAR:
+                LOG.warn(
+                        "The type {} in mysql catalog is supported in read-mode, "
+                                + "but not in write-mode.",
+                        MYSQL_YEAR);

Review comment:
       Logging this every time seems overkill to me. It is documented, I think we should remove this log here. Same for GEOMETRY below.

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalog.java
##########
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME;
+import static org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.IDENTIFIER;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+
+/** Catalog for MySQL. */
+@Internal
+public class MySQLCatalog extends AbstractJdbcCatalog {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MySQLCatalog.class);
+
+    private final String databaseVersion;
+    private final String driverVersion;
+
+    // ============================data types=====================
+
+    public static final String MYSQL_UNKNOWN = "UNKNOWN";
+    public static final String MYSQL_BIT = "BIT";
+
+    // -------------------------number----------------------------
+    public static final String MYSQL_TINYINT = "TINYINT";
+    public static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    public static final String MYSQL_SMALLINT = "SMALLINT";
+    public static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    public static final String MYSQL_MEDIUMINT = "MEDIUMINT";
+    public static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+    public static final String MYSQL_INT = "INT";
+    public static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
+    public static final String MYSQL_INTEGER = "INTEGER";
+    public static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+    public static final String MYSQL_BIGINT = "BIGINT";
+    public static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    public static final String MYSQL_DECIMAL = "DECIMAL";
+    public static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    public static final String MYSQL_FLOAT = "FLOAT";
+    public static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    public static final String MYSQL_DOUBLE = "DOUBLE";
+    public static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+    // -------------------------string----------------------------
+    public static final String MYSQL_CHAR = "CHAR";
+    public static final String MYSQL_VARCHAR = "VARCHAR";
+    public static final String MYSQL_TINYTEXT = "TINYTEXT";
+    public static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
+    public static final String MYSQL_TEXT = "TEXT";
+    public static final String MYSQL_LONGTEXT = "LONGTEXT";
+    public static final String MYSQL_JSON = "JSON";
+
+    // ------------------------------time-------------------------
+    public static final String MYSQL_DATE = "DATE";
+    public static final String MYSQL_DATETIME = "DATETIME";
+    public static final String MYSQL_TIME = "TIME";
+    public static final String MYSQL_TIMESTAMP = "TIMESTAMP";
+    public static final String MYSQL_YEAR = "YEAR";
+
+    // ------------------------------blob-------------------------
+    public static final String MYSQL_TINYBLOB = "TINYBLOB";
+    public static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
+    public static final String MYSQL_BLOB = "BLOB";
+    public static final String MYSQL_LONGBLOB = "LONGBLOB";
+    public static final String MYSQL_BINARY = "BINARY";
+    public static final String MYSQL_VARBINARY = "VARBINARY";
+    public static final String MYSQL_GEOMETRY = "GEOMETRY";
+
+    // column class names
+    public static final String COLUMN_CLASS_BOOLEAN = "java.lang.Boolean";
+    public static final String COLUMN_CLASS_INTEGER = "java.lang.Integer";
+    public static final String COLUMN_CLASS_BIG_INTEGER = "java.math.BigInteger";
+    public static final String COLUMN_CLASS_LONG = "java.lang.Long";
+    public static final String COLUMN_CLASS_FLOAT = "java.lang.Float";
+    public static final String COLUMN_CLASS_DOUBLE = "java.lang.Double";
+    public static final String COLUMN_CLASS_BIG_DECIMAL = "java.math.BigDecimal";
+    public static final String COLUMN_CLASS_BYTE_ARRAY = "[B";
+    public static final String COLUMN_CLASS_STRING = "java.lang.String";
+    public static final String COLUMN_CLASS_DATE = "java.sql.Date";
+    public static final String COLUMN_CLASS_TIME = "java.sql.Time";
+    public static final String COLUMN_CLASS_TIMESTAMP = "java.sql.Timestamp";
+
+    public static final int RAW_TIME_LENGTH = 10;
+    public static final int RAW_TIMESTAMP_LENGTH = 19;
+
+    private static final Set<String> builtinDatabases =
+            new HashSet<String>() {
+                {
+                    add("information_schema");
+                    add("mysql");
+                    add("performance_schema");
+                    add("sys");
+                }
+            };
+
+    public MySQLCatalog(
+            String catalogName,
+            String defaultDatabase,
+            String username,
+            String pwd,
+            String baseUrl) {
+        super(catalogName, defaultDatabase, username, pwd, baseUrl);
+        this.driverVersion =
+                Preconditions.checkNotNull(getDriverVersion(), "driver version must not be null.");
+        this.databaseVersion =
+                Preconditions.checkNotNull(
+                        getDatabaseVersion(), "database version must not be null.");
+        LOG.info("Driver version: {}, database version: {}", driverVersion, databaseVersion);
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        String sql = "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;";
+        return extractColumnValuesBySQL(
+                defaultUrl,
+                sql,
+                1,
+                (FilterFunction<String>) dbName -> !builtinDatabases.contains(dbName));
+    }
+
+    @Override
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                StringUtils.isNotBlank(databaseName), "Database name must not be blank.");
+        if (listDatabases().contains(databaseName)) {
+            return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+        } else {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                StringUtils.isNotBlank(databaseName), "Database name must not be blank.");
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+        String connUrl = baseUrl + databaseName;
+        return extractColumnValuesBySQL(
+                connUrl,
+                "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?",
+                1,
+                null,
+                databaseName);
+    }
+
+    // ------ retrieve PK constraint ------
+
+    private Optional<UniqueConstraint> getPrimaryKey(
+            DatabaseMetaData metaData, String schema, ObjectPath table) throws SQLException {
+
+        // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
+        // the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
+        // We need to sort them based on the KEY_SEQ value.
+        ResultSet rs =
+                metaData.getPrimaryKeys(table.getDatabaseName(), schema, table.getObjectName());
+
+        Map<Integer, String> keySeqColumnName = new HashMap<>();
+        String pkName = null;
+        while (rs.next()) {
+            String columnName = rs.getString("COLUMN_NAME");
+            pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the same
+            int keySeq = rs.getInt("KEY_SEQ");
+            keySeqColumnName.put(keySeq - 1, columnName); // KEY_SEQ is 1-based index
+        }
+        List<String> pkFields =
+                Arrays.asList(new String[keySeqColumnName.size()]); // initialize size
+        keySeqColumnName.forEach(pkFields::set);
+        if (!pkFields.isEmpty()) {
+            // PK_NAME maybe null according to the javadoc, generate an unique name in that case
+            pkName = pkName == null ? "pk_" + String.join("_", pkFields) : pkName;
+            return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields));
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public CatalogBaseTable getTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                baseUrl + tablePath.getDatabaseName(), username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement("SELECT * FROM " + tablePath.getObjectName())) {
+
+            DatabaseMetaData metaData = conn.getMetaData();
+            Optional<UniqueConstraint> primaryKey = getPrimaryKey(metaData, null, tablePath);
+            ResultSetMetaData resultSetMetaData = ps.getMetaData();
+
+            String[] columnNames = new String[resultSetMetaData.getColumnCount()];
+            DataType[] types = new DataType[resultSetMetaData.getColumnCount()];
+            for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+                columnNames[i - 1] = resultSetMetaData.getColumnName(i);
+                types[i - 1] = fromJDBCType(tablePath, resultSetMetaData, i);
+                if (resultSetMetaData.isNullable(i) == ResultSetMetaData.columnNoNulls) {
+                    types[i - 1] = types[i - 1].notNull();
+                }
+            }
+            Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columnNames, types);
+            primaryKey.ifPresent(
+                    pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns()));
+            Schema tableSchema = schemaBuilder.build();
+            Map<String, String> props = new HashMap<>();
+            props.put(CONNECTOR.key(), IDENTIFIER);
+            props.put(URL.key(), baseUrl + tablePath.getDatabaseName());
+            props.put(TABLE_NAME.key(), tablePath.getObjectName());
+            props.put(USERNAME.key(), username);
+            props.put(PASSWORD.key(), pwd);
+            return CatalogTable.of(tableSchema, null, Lists.newArrayList(), props);
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed getting table %s", tablePath.getFullName()), e);
+        }
+    }
+
+    @Override
+    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+        return !extractColumnValuesBySQL(
+                        baseUrl,
+                        "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA=? and TABLE_NAME=?",
+                        1,
+                        null,
+                        tablePath.getDatabaseName(),
+                        tablePath.getObjectName())
+                .isEmpty();
+    }
+
+    private List<String> extractColumnValuesBySQL(
+            String connUrl,
+            String sql,
+            int columnIndex,
+            FilterFunction<String> filterFunc,
+            Object... params) {
+        List<String> columnValues = Lists.newArrayList();
+        try (Connection conn = DriverManager.getConnection(connUrl, username, pwd);
+                PreparedStatement ps = conn.prepareStatement(sql)) {
+            if (Objects.nonNull(params) && params.length > 0) {
+                for (int i = 0; i < params.length; i++) {
+                    ps.setObject(i + 1, params[i]);
+                }
+            }
+            ResultSet rs = ps.executeQuery();
+            while (rs.next()) {
+                String columnValue = rs.getString(columnIndex);
+                if (Objects.isNull(filterFunc) || filterFunc.filter(columnValue)) {
+                    columnValues.add(columnValue);
+                }
+            }
+            return columnValues;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed in processing query sql %s, connUrl %s", sql, connUrl),
+                    e);
+        }
+    }
+
+    private String getDatabaseVersion() {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+            return conn.getMetaData().getDatabaseProductVersion();
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed in getting MySQL version by %s.", defaultUrl), e);
+        }
+    }
+
+    private String getDriverVersion() {
+        try (Connection conn = DriverManager.getConnection(defaultUrl, username, pwd)) {
+            String driverVersion = conn.getMetaData().getDriverVersion();
+            Pattern regexp = Pattern.compile("\\d+?\\.\\d+?\\.\\d+");
+            Matcher matcher = regexp.matcher(driverVersion);
+            return matcher.find() ? matcher.group(0) : null;
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed in getting mysql driver version by %s.", defaultUrl), e);
+        }
+    }
+
+    /** Converts MySQL type to Flink {@link DataType}. */
+    private DataType fromJDBCType(ObjectPath tablePath, ResultSetMetaData metadata, int colIndex)
+            throws SQLException {
+        String mysqlType = metadata.getColumnTypeName(colIndex).toUpperCase();
+        String columnName = metadata.getColumnName(colIndex);
+        int precision = metadata.getPrecision(colIndex);
+        int scale = metadata.getScale(colIndex);
+
+        switch (mysqlType) {
+            case MYSQL_BIT:
+                return DataTypes.BOOLEAN();
+            case MYSQL_TINYBLOB:
+            case MYSQL_MEDIUMBLOB:
+            case MYSQL_BLOB:
+            case MYSQL_LONGBLOB:
+            case MYSQL_VARBINARY:
+            case MYSQL_BINARY:
+                return DataTypes.BYTES();
+            case MYSQL_TINYINT:
+                return DataTypes.TINYINT();
+            case MYSQL_TINYINT_UNSIGNED:
+            case MYSQL_SMALLINT:
+                return DataTypes.SMALLINT();
+            case MYSQL_SMALLINT_UNSIGNED:
+            case MYSQL_MEDIUMINT:
+            case MYSQL_MEDIUMINT_UNSIGNED:
+            case MYSQL_INT:
+            case MYSQL_INTEGER:
+                return DataTypes.INT();
+            case MYSQL_INT_UNSIGNED:
+            case MYSQL_INTEGER_UNSIGNED:
+            case MYSQL_BIGINT:
+                return DataTypes.BIGINT();
+            case MYSQL_BIGINT_UNSIGNED:
+                return DataTypes.DECIMAL(20, 0);
+            case MYSQL_DECIMAL:
+                return DataTypes.DECIMAL(precision, scale);
+            case MYSQL_DECIMAL_UNSIGNED:
+                checkMaxPrecision(tablePath, columnName, precision);
+                return DataTypes.DECIMAL(precision + 1, scale);
+            case MYSQL_FLOAT:
+                return DataTypes.FLOAT();
+            case MYSQL_FLOAT_UNSIGNED:
+                LOG.warn("{} will probably cause value overflow.", MYSQL_FLOAT_UNSIGNED);
+                return DataTypes.FLOAT();
+            case MYSQL_DOUBLE:
+                return DataTypes.DOUBLE();
+            case MYSQL_DOUBLE_UNSIGNED:
+                LOG.warn("{} will probably cause value overflow.", MYSQL_DOUBLE_UNSIGNED);
+                return DataTypes.DOUBLE();
+            case MYSQL_CHAR:
+            case MYSQL_VARCHAR:
+            case MYSQL_TINYTEXT:
+            case MYSQL_MEDIUMTEXT:
+            case MYSQL_TEXT:
+            case MYSQL_JSON:
+                return DataTypes.STRING();
+            case MYSQL_LONGTEXT:
+                LOG.warn(
+                        "The max precision of type '{}' in mysql is 536870911, and the max "
+                                + "precision here has to be set as 2147483647 due to the "
+                                + "limitation of the flink sql types system.",
+                        MYSQL_LONGTEXT);
+                return DataTypes.STRING();
+            case MYSQL_YEAR:
+                LOG.warn(
+                        "The type {} in mysql catalog is supported in read-mode, "
+                                + "but not in write-mode.",
+                        MYSQL_YEAR);
+                return DataTypes.DATE();
+            case MYSQL_DATE:
+                return DataTypes.DATE();
+            case MYSQL_TIME:
+                return isExplicitPrecision(precision, RAW_TIME_LENGTH)
+                        ? DataTypes.TIME(precision - RAW_TIME_LENGTH - 1)
+                        : DataTypes.TIME(0);
+            case MYSQL_DATETIME:
+            case MYSQL_TIMESTAMP:
+                return isExplicitPrecision(precision, RAW_TIMESTAMP_LENGTH)
+                        ? DataTypes.TIMESTAMP(precision - RAW_TIMESTAMP_LENGTH - 1)
+                        : DataTypes.TIMESTAMP(0);
+            case MYSQL_GEOMETRY:
+                LOG.warn(
+                        "{} type in mysql catalog is supported in read-mode by the form of bytes,"
+                                + " but not in write-mode.",
+                        MYSQL_GEOMETRY);
+                return DataTypes.BYTES();
+            case MYSQL_UNKNOWN:
+                return fromJDBCClassType(tablePath, metadata, colIndex);
+            default:
+                throw new UnsupportedOperationException(
+                        String.format(
+                                "Doesn't support mysql type '%s' in mysql version %s, driver version %s yet.",

Review comment:
       Please make sure to spell MySQL correctly, here and everywhere else (will not comment on every occurrence). MySQL is a registered name, and is spelled MySQL, not mysql.

##########
File path: flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalogITCase.java
##########
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.CollectionUtil;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.sql.Date;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.text.ParseException;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+
+import static org.apache.flink.table.api.config.ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM;
+import static org.junit.Assert.assertEquals;
+
+/** E2E test for {@link MySQLCatalog}. */
+public class MySQLCatalogITCase extends MySQLCatalogTestBase {
+
+    private static final List<Row> ALL_TYPES_ROWS =
+            Lists.newArrayList(
+                    Row.ofKind(
+                            RowKind.INSERT,
+                            1L,
+                            -1L,
+                            new BigDecimal(1),
+                            null,
+                            true,
+                            null,
+                            "hello",
+                            Date.valueOf("2021-08-04").toLocalDate(),
+                            Timestamp.valueOf("2021-08-04 01:54:16").toLocalDateTime(),
+                            new BigDecimal(-1),
+                            new BigDecimal(1),
+                            -1.0d,
+                            1.0d,
+                            "enum2",
+                            -9.1f,
+                            9.1f,
+                            null,
+                            null,
+                            -1,
+                            1L,
+                            -1,
+                            1L,
+                            "{\"k1\": \"v1\"}",
+                            null,
+                            null,
+                            "col_longtext",
+                            null,
+                            -1,
+                            1,
+                            "col_mediumtext",
+                            null,
+                            null,
+                            null,
+                            new BigDecimal(-99),
+                            new BigDecimal(99),
+                            null,
+                            null,
+                            -1.0d,
+                            1.0d,
+                            "set_ele1",
+                            Short.parseShort("-1"),
+                            1,
+                            "col_text",
+                            Time.valueOf("10:32:34").toLocalTime(),
+                            Timestamp.valueOf("2021-08-04 01:54:16").toLocalDateTime(),
+                            "col_tinytext",
+                            Byte.parseByte("-1"),
+                            Byte.parseByte("1"),
+                            null,
+                            "col_varchar",
+                            Date.valueOf("1999-01-01").toLocalDate(),
+                            Timestamp.valueOf("2021-08-04 01:54:16.463").toLocalDateTime(),
+                            Time.valueOf("09:33:43").toLocalTime(),
+                            Timestamp.valueOf("2021-08-04 01:54:16.463").toLocalDateTime(),
+                            null),
+                    Row.ofKind(
+                            RowKind.INSERT,
+                            2L,
+                            -1L,
+                            new BigDecimal(1),
+                            null,
+                            true,
+                            null,
+                            "hello",
+                            Date.valueOf("2021-08-04").toLocalDate(),
+                            Timestamp.valueOf("2021-08-04 01:53:19").toLocalDateTime(),
+                            new BigDecimal(-1),
+                            new BigDecimal(1),
+                            -1.0d,
+                            1.0d,
+                            "enum2",
+                            -9.1f,
+                            9.1f,
+                            null,
+                            null,
+                            -1,
+                            1L,
+                            -1,
+                            1L,
+                            "{\"k1\": \"v1\"}",
+                            null,
+                            null,
+                            "col_longtext",
+                            null,
+                            -1,
+                            1,
+                            "col_mediumtext",
+                            null,
+                            null,
+                            null,
+                            new BigDecimal(-99),
+                            new BigDecimal(99),
+                            null,
+                            null,
+                            -1.0d,
+                            1.0d,
+                            "set_ele1,set_ele12",
+                            Short.parseShort("-1"),
+                            1,
+                            "col_text",
+                            Time.valueOf("10:32:34").toLocalTime(),
+                            Timestamp.valueOf("2021-08-04 01:53:19").toLocalDateTime(),
+                            "col_tinytext",
+                            Byte.parseByte("-1"),
+                            Byte.parseByte("1"),
+                            null,
+                            "col_varchar",
+                            Date.valueOf("1999-01-01").toLocalDate(),
+                            Timestamp.valueOf("2021-08-04 01:53:19.098").toLocalDateTime(),
+                            Time.valueOf("09:33:43").toLocalTime(),
+                            Timestamp.valueOf("2021-08-04 01:53:19.098").toLocalDateTime(),
+                            null));
+
+    private TableEnvironment tEnv;
+
+    @Before
+    public void setup() {
+        this.tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
+        tEnv.getConfig()
+                .getConfiguration()
+                .setInteger(TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM.key(), 1);
+
+        // Use mysql catalog.
+        tEnv.registerCatalog(TEST_CATALOG_NAME, catalog);
+        tEnv.useCatalog(TEST_CATALOG_NAME);
+    }
+
+    @Test
+    public void testSelectField() {
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(String.format("select pid from %s", TEST_TABLE_ALL_TYPES))
+                                .execute()
+                                .collect());
+        assertEquals(
+                Lists.newArrayList(Row.ofKind(RowKind.INSERT, 1L), Row.ofKind(RowKind.INSERT, 2L)),
+                results);
+    }
+
+    @Test
+    public void testWithoutCatalogDB() {
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(String.format("select * from %s", TEST_TABLE_ALL_TYPES))
+                                .execute()
+                                .collect());
+
+        assertEquals(ALL_TYPES_ROWS, results);
+    }
+
+    @Test
+    public void testWithoutCatalog() {
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(
+                                        String.format(
+                                                "select * from `%s`.`%s`",
+                                                TEST_DB, TEST_TABLE_ALL_TYPES))
+                                .execute()
+                                .collect());
+        assertEquals(ALL_TYPES_ROWS, results);
+    }
+
+    @Test
+    public void testFullPath() throws ParseException {
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(
+                                        String.format(
+                                                "select * from %s.%s.`%s`",
+                                                TEST_CATALOG_NAME,
+                                                catalog.getDefaultDatabase(),
+                                                TEST_TABLE_ALL_TYPES))
+                                .execute()
+                                .collect());
+        assertEquals(ALL_TYPES_ROWS, results);
+    }
+
+    @Test
+    public void testSelectToInsertWithoutYearType() throws Exception {
+
+        String sql =
+                String.format(
+                        "insert into `%s` select * from `%s`",
+                        TEST_SINK_TABLE_ALL_TYPES_WITHOUT_YEAR_TYPE, TEST_TABLE_ALL_TYPES);
+        tEnv.executeSql(sql).await();
+
+        List<Row> results =
+                CollectionUtil.iteratorToList(
+                        tEnv.sqlQuery(
+                                        String.format(
+                                                "select * from %s",
+                                                TEST_SINK_TABLE_ALL_TYPES_WITHOUT_YEAR_TYPE))
+                                .execute()
+                                .collect());
+        assertEquals(ALL_TYPES_ROWS, results);
+    }
+
+    @Test
+    public void testSelectToInsertWithYearType() throws Exception {
+        exception.expect(ExecutionException.class);
+        exception.expectMessage("TableException: Failed to wait job finish");

Review comment:
       I'm not sure it's worth testing that this doesn't work, but if we do, we should assert on something more meaningful than such a generic error message. An IT case is pretty heavy for this, though.

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalog.java
##########
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME;
+import static org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.IDENTIFIER;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+
+/** Catalog for MySQL. */
+@Internal
+public class MySQLCatalog extends AbstractJdbcCatalog {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MySQLCatalog.class);
+
+    private final String databaseVersion;
+    private final String driverVersion;
+
+    // ============================data types=====================
+
+    public static final String MYSQL_UNKNOWN = "UNKNOWN";
+    public static final String MYSQL_BIT = "BIT";
+
+    // -------------------------number----------------------------
+    public static final String MYSQL_TINYINT = "TINYINT";
+    public static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    public static final String MYSQL_SMALLINT = "SMALLINT";
+    public static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    public static final String MYSQL_MEDIUMINT = "MEDIUMINT";
+    public static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+    public static final String MYSQL_INT = "INT";
+    public static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
+    public static final String MYSQL_INTEGER = "INTEGER";
+    public static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+    public static final String MYSQL_BIGINT = "BIGINT";
+    public static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    public static final String MYSQL_DECIMAL = "DECIMAL";
+    public static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    public static final String MYSQL_FLOAT = "FLOAT";
+    public static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    public static final String MYSQL_DOUBLE = "DOUBLE";
+    public static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+    // -------------------------string----------------------------
+    public static final String MYSQL_CHAR = "CHAR";
+    public static final String MYSQL_VARCHAR = "VARCHAR";
+    public static final String MYSQL_TINYTEXT = "TINYTEXT";
+    public static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
+    public static final String MYSQL_TEXT = "TEXT";
+    public static final String MYSQL_LONGTEXT = "LONGTEXT";
+    public static final String MYSQL_JSON = "JSON";
+
+    // ------------------------------time-------------------------
+    public static final String MYSQL_DATE = "DATE";
+    public static final String MYSQL_DATETIME = "DATETIME";
+    public static final String MYSQL_TIME = "TIME";
+    public static final String MYSQL_TIMESTAMP = "TIMESTAMP";
+    public static final String MYSQL_YEAR = "YEAR";
+
+    // ------------------------------blob-------------------------
+    public static final String MYSQL_TINYBLOB = "TINYBLOB";
+    public static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
+    public static final String MYSQL_BLOB = "BLOB";
+    public static final String MYSQL_LONGBLOB = "LONGBLOB";
+    public static final String MYSQL_BINARY = "BINARY";
+    public static final String MYSQL_VARBINARY = "VARBINARY";
+    public static final String MYSQL_GEOMETRY = "GEOMETRY";
+
+    // column class names
+    public static final String COLUMN_CLASS_BOOLEAN = "java.lang.Boolean";
+    public static final String COLUMN_CLASS_INTEGER = "java.lang.Integer";
+    public static final String COLUMN_CLASS_BIG_INTEGER = "java.math.BigInteger";
+    public static final String COLUMN_CLASS_LONG = "java.lang.Long";
+    public static final String COLUMN_CLASS_FLOAT = "java.lang.Float";
+    public static final String COLUMN_CLASS_DOUBLE = "java.lang.Double";
+    public static final String COLUMN_CLASS_BIG_DECIMAL = "java.math.BigDecimal";
+    public static final String COLUMN_CLASS_BYTE_ARRAY = "[B";
+    public static final String COLUMN_CLASS_STRING = "java.lang.String";
+    public static final String COLUMN_CLASS_DATE = "java.sql.Date";
+    public static final String COLUMN_CLASS_TIME = "java.sql.Time";
+    public static final String COLUMN_CLASS_TIMESTAMP = "java.sql.Timestamp";
+
+    public static final int RAW_TIME_LENGTH = 10;
+    public static final int RAW_TIMESTAMP_LENGTH = 19;
+
+    private static final Set<String> builtinDatabases =
+            new HashSet<String>() {
+                {
+                    add("information_schema");
+                    add("mysql");
+                    add("performance_schema");
+                    add("sys");
+                }
+            };
+
+    public MySQLCatalog(
+            String catalogName,
+            String defaultDatabase,
+            String username,
+            String pwd,
+            String baseUrl) {
+        super(catalogName, defaultDatabase, username, pwd, baseUrl);
+        this.driverVersion =
+                Preconditions.checkNotNull(getDriverVersion(), "driver version must not be null.");
+        this.databaseVersion =
+                Preconditions.checkNotNull(
+                        getDatabaseVersion(), "database version must not be null.");
+        LOG.info("Driver version: {}, database version: {}", driverVersion, databaseVersion);
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        String sql = "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;";
+        return extractColumnValuesBySQL(
+                defaultUrl,
+                sql,
+                1,
+                (FilterFunction<String>) dbName -> !builtinDatabases.contains(dbName));
+    }
+
+    @Override
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                StringUtils.isNotBlank(databaseName), "Database name must not be blank.");
+        if (listDatabases().contains(databaseName)) {
+            return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+        } else {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                StringUtils.isNotBlank(databaseName), "Database name must not be blank.");
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+        String connUrl = baseUrl + databaseName;
+        return extractColumnValuesBySQL(
+                connUrl,
+                "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?",
+                1,
+                null,
+                databaseName);
+    }
+
+    // ------ retrieve PK constraint ------
+
+    private Optional<UniqueConstraint> getPrimaryKey(
+            DatabaseMetaData metaData, String schema, ObjectPath table) throws SQLException {
+
+        // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
+        // the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
+        // We need to sort them based on the KEY_SEQ value.
+        ResultSet rs =
+                metaData.getPrimaryKeys(table.getDatabaseName(), schema, table.getObjectName());
+
+        Map<Integer, String> keySeqColumnName = new HashMap<>();
+        String pkName = null;
+        while (rs.next()) {
+            String columnName = rs.getString("COLUMN_NAME");
+            pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the same
+            int keySeq = rs.getInt("KEY_SEQ");
+            keySeqColumnName.put(keySeq - 1, columnName); // KEY_SEQ is 1-based index
+        }
+        List<String> pkFields =
+                Arrays.asList(new String[keySeqColumnName.size()]); // initialize size
+        keySeqColumnName.forEach(pkFields::set);
+        if (!pkFields.isEmpty()) {
+            // PK_NAME maybe null according to the javadoc, generate an unique name in that case
+            pkName = pkName == null ? "pk_" + String.join("_", pkFields) : pkName;
+            return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields));
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public CatalogBaseTable getTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                baseUrl + tablePath.getDatabaseName(), username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement("SELECT * FROM " + tablePath.getObjectName())) {
+
+            DatabaseMetaData metaData = conn.getMetaData();
+            Optional<UniqueConstraint> primaryKey = getPrimaryKey(metaData, null, tablePath);
+            ResultSetMetaData resultSetMetaData = ps.getMetaData();
+
+            String[] columnNames = new String[resultSetMetaData.getColumnCount()];
+            DataType[] types = new DataType[resultSetMetaData.getColumnCount()];
+            for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+                columnNames[i - 1] = resultSetMetaData.getColumnName(i);
+                types[i - 1] = fromJDBCType(tablePath, resultSetMetaData, i);
+                if (resultSetMetaData.isNullable(i) == ResultSetMetaData.columnNoNulls) {
+                    types[i - 1] = types[i - 1].notNull();
+                }
+            }
+            Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columnNames, types);
+            primaryKey.ifPresent(
+                    pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns()));
+            Schema tableSchema = schemaBuilder.build();
+            Map<String, String> props = new HashMap<>();
+            props.put(CONNECTOR.key(), IDENTIFIER);
+            props.put(URL.key(), baseUrl + tablePath.getDatabaseName());
+            props.put(TABLE_NAME.key(), tablePath.getObjectName());
+            props.put(USERNAME.key(), username);
+            props.put(PASSWORD.key(), pwd);
+            return CatalogTable.of(tableSchema, null, Lists.newArrayList(), props);
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed getting table %s", tablePath.getFullName()), e);
+        }
+    }
+
+    @Override
+    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+        return !extractColumnValuesBySQL(
+                        baseUrl,
+                        "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA=? and TABLE_NAME=?",
+                        1,
+                        null,
+                        tablePath.getDatabaseName(),
+                        tablePath.getObjectName())
+                .isEmpty();
+    }
+
+    private List<String> extractColumnValuesBySQL(
+            String connUrl,
+            String sql,
+            int columnIndex,
+            FilterFunction<String> filterFunc,

Review comment:
       Using the Flink API `FilterFunction` here is completely unnecessary. We can just use `Predicate<String>`.

##########
File path: docs/content/docs/connectors/table/jdbc.md
##########
@@ -308,47 +308,50 @@ As there is no standard syntax for upsert, the following table describes the dat
     </tbody>
 </table>
 
-### Postgres Database as a Catalog
+JDBC Catalog
+------------
 
 The `JdbcCatalog` enables users to connect Flink to relational databases over JDBC protocol.
 
-Currently, `PostgresCatalog` is the only implementation of JDBC Catalog at the moment, `PostgresCatalog` only supports limited `Catalog` methods include:
+Currently, there are two JDBC catalog implementations, `PostgresCatalog` and `MySQLCatalog`. They support the following catalog methods. Other methods are currently not supported.

Review comment:
       Since the classes `PostgresCatalog` and `MySQLCatalog` are internal classes, I think we should refrain from referencing their class names here and just talk about "Postgres catalog" and "MySQL catalog" as names (meaning also without the backticks). This also applies to a few other places in the docs where we refer to them.

##########
File path: flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalog.java
##########
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogDatabase;
+import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.flink.table.catalog.exceptions.CatalogException;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.DecimalType;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.compress.utils.Lists;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.PASSWORD;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.TABLE_NAME;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.URL;
+import static org.apache.flink.connector.jdbc.table.JdbcConnectorOptions.USERNAME;
+import static org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory.IDENTIFIER;
+import static org.apache.flink.table.factories.FactoryUtil.CONNECTOR;
+
+/** Catalog for MySQL. */
+@Internal
+public class MySQLCatalog extends AbstractJdbcCatalog {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MySQLCatalog.class);
+
+    private final String databaseVersion;
+    private final String driverVersion;
+
+    // ============================data types=====================
+
+    public static final String MYSQL_UNKNOWN = "UNKNOWN";
+    public static final String MYSQL_BIT = "BIT";
+
+    // -------------------------number----------------------------
+    public static final String MYSQL_TINYINT = "TINYINT";
+    public static final String MYSQL_TINYINT_UNSIGNED = "TINYINT UNSIGNED";
+    public static final String MYSQL_SMALLINT = "SMALLINT";
+    public static final String MYSQL_SMALLINT_UNSIGNED = "SMALLINT UNSIGNED";
+    public static final String MYSQL_MEDIUMINT = "MEDIUMINT";
+    public static final String MYSQL_MEDIUMINT_UNSIGNED = "MEDIUMINT UNSIGNED";
+    public static final String MYSQL_INT = "INT";
+    public static final String MYSQL_INT_UNSIGNED = "INT UNSIGNED";
+    public static final String MYSQL_INTEGER = "INTEGER";
+    public static final String MYSQL_INTEGER_UNSIGNED = "INTEGER UNSIGNED";
+    public static final String MYSQL_BIGINT = "BIGINT";
+    public static final String MYSQL_BIGINT_UNSIGNED = "BIGINT UNSIGNED";
+    public static final String MYSQL_DECIMAL = "DECIMAL";
+    public static final String MYSQL_DECIMAL_UNSIGNED = "DECIMAL UNSIGNED";
+    public static final String MYSQL_FLOAT = "FLOAT";
+    public static final String MYSQL_FLOAT_UNSIGNED = "FLOAT UNSIGNED";
+    public static final String MYSQL_DOUBLE = "DOUBLE";
+    public static final String MYSQL_DOUBLE_UNSIGNED = "DOUBLE UNSIGNED";
+
+    // -------------------------string----------------------------
+    public static final String MYSQL_CHAR = "CHAR";
+    public static final String MYSQL_VARCHAR = "VARCHAR";
+    public static final String MYSQL_TINYTEXT = "TINYTEXT";
+    public static final String MYSQL_MEDIUMTEXT = "MEDIUMTEXT";
+    public static final String MYSQL_TEXT = "TEXT";
+    public static final String MYSQL_LONGTEXT = "LONGTEXT";
+    public static final String MYSQL_JSON = "JSON";
+
+    // ------------------------------time-------------------------
+    public static final String MYSQL_DATE = "DATE";
+    public static final String MYSQL_DATETIME = "DATETIME";
+    public static final String MYSQL_TIME = "TIME";
+    public static final String MYSQL_TIMESTAMP = "TIMESTAMP";
+    public static final String MYSQL_YEAR = "YEAR";
+
+    // ------------------------------blob-------------------------
+    public static final String MYSQL_TINYBLOB = "TINYBLOB";
+    public static final String MYSQL_MEDIUMBLOB = "MEDIUMBLOB";
+    public static final String MYSQL_BLOB = "BLOB";
+    public static final String MYSQL_LONGBLOB = "LONGBLOB";
+    public static final String MYSQL_BINARY = "BINARY";
+    public static final String MYSQL_VARBINARY = "VARBINARY";
+    public static final String MYSQL_GEOMETRY = "GEOMETRY";
+
+    // column class names
+    public static final String COLUMN_CLASS_BOOLEAN = "java.lang.Boolean";
+    public static final String COLUMN_CLASS_INTEGER = "java.lang.Integer";
+    public static final String COLUMN_CLASS_BIG_INTEGER = "java.math.BigInteger";
+    public static final String COLUMN_CLASS_LONG = "java.lang.Long";
+    public static final String COLUMN_CLASS_FLOAT = "java.lang.Float";
+    public static final String COLUMN_CLASS_DOUBLE = "java.lang.Double";
+    public static final String COLUMN_CLASS_BIG_DECIMAL = "java.math.BigDecimal";
+    public static final String COLUMN_CLASS_BYTE_ARRAY = "[B";
+    public static final String COLUMN_CLASS_STRING = "java.lang.String";
+    public static final String COLUMN_CLASS_DATE = "java.sql.Date";
+    public static final String COLUMN_CLASS_TIME = "java.sql.Time";
+    public static final String COLUMN_CLASS_TIMESTAMP = "java.sql.Timestamp";
+
+    public static final int RAW_TIME_LENGTH = 10;
+    public static final int RAW_TIMESTAMP_LENGTH = 19;
+
+    private static final Set<String> builtinDatabases =
+            new HashSet<String>() {
+                {
+                    add("information_schema");
+                    add("mysql");
+                    add("performance_schema");
+                    add("sys");
+                }
+            };
+
+    public MySQLCatalog(
+            String catalogName,
+            String defaultDatabase,
+            String username,
+            String pwd,
+            String baseUrl) {
+        super(catalogName, defaultDatabase, username, pwd, baseUrl);
+        this.driverVersion =
+                Preconditions.checkNotNull(getDriverVersion(), "driver version must not be null.");
+        this.databaseVersion =
+                Preconditions.checkNotNull(
+                        getDatabaseVersion(), "database version must not be null.");
+        LOG.info("Driver version: {}, database version: {}", driverVersion, databaseVersion);
+    }
+
+    @Override
+    public List<String> listDatabases() throws CatalogException {
+        String sql = "SELECT `SCHEMA_NAME` FROM `INFORMATION_SCHEMA`.`SCHEMATA`;";
+        return extractColumnValuesBySQL(
+                defaultUrl,
+                sql,
+                1,
+                (FilterFunction<String>) dbName -> !builtinDatabases.contains(dbName));
+    }
+
+    @Override
+    public CatalogDatabase getDatabase(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                StringUtils.isNotBlank(databaseName), "Database name must not be blank.");
+        if (listDatabases().contains(databaseName)) {
+            return new CatalogDatabaseImpl(Collections.emptyMap(), null);
+        } else {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+    }
+
+    @Override
+    public List<String> listTables(String databaseName)
+            throws DatabaseNotExistException, CatalogException {
+        Preconditions.checkState(
+                StringUtils.isNotBlank(databaseName), "Database name must not be blank.");
+        if (!databaseExists(databaseName)) {
+            throw new DatabaseNotExistException(getName(), databaseName);
+        }
+        String connUrl = baseUrl + databaseName;
+        return extractColumnValuesBySQL(
+                connUrl,
+                "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA = ?",
+                1,
+                null,
+                databaseName);
+    }
+
+    // ------ retrieve PK constraint ------
+
+    private Optional<UniqueConstraint> getPrimaryKey(
+            DatabaseMetaData metaData, String schema, ObjectPath table) throws SQLException {
+
+        // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
+        // the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
+        // We need to sort them based on the KEY_SEQ value.
+        ResultSet rs =
+                metaData.getPrimaryKeys(table.getDatabaseName(), schema, table.getObjectName());
+
+        Map<Integer, String> keySeqColumnName = new HashMap<>();
+        String pkName = null;
+        while (rs.next()) {
+            String columnName = rs.getString("COLUMN_NAME");
+            pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the same
+            int keySeq = rs.getInt("KEY_SEQ");
+            keySeqColumnName.put(keySeq - 1, columnName); // KEY_SEQ is 1-based index
+        }
+        List<String> pkFields =
+                Arrays.asList(new String[keySeqColumnName.size()]); // initialize size
+        keySeqColumnName.forEach(pkFields::set);
+        if (!pkFields.isEmpty()) {
+            // PK_NAME maybe null according to the javadoc, generate an unique name in that case
+            pkName = pkName == null ? "pk_" + String.join("_", pkFields) : pkName;
+            return Optional.of(UniqueConstraint.primaryKey(pkName, pkFields));
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public CatalogBaseTable getTable(ObjectPath tablePath)
+            throws TableNotExistException, CatalogException {
+        if (!tableExists(tablePath)) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                baseUrl + tablePath.getDatabaseName(), username, pwd);
+                PreparedStatement ps =
+                        conn.prepareStatement("SELECT * FROM " + tablePath.getObjectName())) {
+
+            DatabaseMetaData metaData = conn.getMetaData();
+            Optional<UniqueConstraint> primaryKey = getPrimaryKey(metaData, null, tablePath);
+            ResultSetMetaData resultSetMetaData = ps.getMetaData();
+
+            String[] columnNames = new String[resultSetMetaData.getColumnCount()];
+            DataType[] types = new DataType[resultSetMetaData.getColumnCount()];
+            for (int i = 1; i <= resultSetMetaData.getColumnCount(); i++) {
+                columnNames[i - 1] = resultSetMetaData.getColumnName(i);
+                types[i - 1] = fromJDBCType(tablePath, resultSetMetaData, i);
+                if (resultSetMetaData.isNullable(i) == ResultSetMetaData.columnNoNulls) {
+                    types[i - 1] = types[i - 1].notNull();
+                }
+            }
+            Schema.Builder schemaBuilder = Schema.newBuilder().fromFields(columnNames, types);
+            primaryKey.ifPresent(
+                    pk -> schemaBuilder.primaryKeyNamed(pk.getName(), pk.getColumns()));
+            Schema tableSchema = schemaBuilder.build();
+            Map<String, String> props = new HashMap<>();
+            props.put(CONNECTOR.key(), IDENTIFIER);
+            props.put(URL.key(), baseUrl + tablePath.getDatabaseName());
+            props.put(TABLE_NAME.key(), tablePath.getObjectName());
+            props.put(USERNAME.key(), username);
+            props.put(PASSWORD.key(), pwd);
+            return CatalogTable.of(tableSchema, null, Lists.newArrayList(), props);
+        } catch (Exception e) {
+            throw new CatalogException(
+                    String.format("Failed getting table %s", tablePath.getFullName()), e);
+        }
+    }
+
+    @Override
+    public boolean tableExists(ObjectPath tablePath) throws CatalogException {
+        return !extractColumnValuesBySQL(
+                        baseUrl,
+                        "SELECT TABLE_NAME FROM information_schema.`TABLES` WHERE TABLE_SCHEMA=? and TABLE_NAME=?",
+                        1,
+                        null,
+                        tablePath.getDatabaseName(),
+                        tablePath.getObjectName())
+                .isEmpty();
+    }
+
+    private List<String> extractColumnValuesBySQL(

Review comment:
       Would it make sense to move such a method as a protected method into `AbstractJDBCCatalog`?

##########
File path: flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySQLCatalogTestBase.java
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.jdbc.catalog;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.MySQLContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Test base for {@link MySQLCatalog}. */
+public class MySQLCatalogTestBase {
+
+    public static final Logger LOG = LoggerFactory.getLogger(MySQLCatalogTestBase.class);
+
+    protected static final DockerImageName MYSQL_57_IMAGE = DockerImageName.parse("mysql:5.7.34");
+    protected static final String TEST_CATALOG_NAME = "mysql_catalog";
+    protected static final String TEST_USERNAME = "mysql";
+    protected static final String TEST_PWD = "mysql";
+    protected static final String TEST_DB = "test";
+    protected static final String TEST_TABLE_ALL_TYPES = "t_all_types";
+    protected static final String TEST_SINK_TABLE_ALL_TYPES_WITHOUT_YEAR_TYPE =
+            "t_all_types_sink_without_year_type";
+    protected static final String TEST_SINK_TABLE_ALL_TYPES_WITH_YEAR_TYPE =
+            "t_all_types_sink_with_year_type";
+    protected static final String TEST_TABLE_SINK_FROM_GROUPED_BY = "t_grouped_by_sink";
+    protected static final String MYSQL_INIT_SCRIPT = "mysql-scripts/catalog-init-for-test.sql";
+    protected static final Map<String, String> DEFAULT_CONTAINER_ENV_MAP =
+            new HashMap<String, String>() {
+                {
+                    put("MYSQL_ROOT_HOST", "%");
+                }
+            };
+
+    protected static final Schema TABLE_SCHEMA =
+            Schema.newBuilder()
+                    .column("pid", DataTypes.BIGINT().notNull())
+                    .column("col_bigint", DataTypes.BIGINT())
+                    .column("col_bigint_unsigned", DataTypes.DECIMAL(20, 0))
+                    .column("col_binary", DataTypes.BYTES())
+                    .column("col_bit", DataTypes.BOOLEAN())
+                    .column("col_blob", DataTypes.BYTES())
+                    .column("col_char", DataTypes.STRING())
+                    .column("col_date", DataTypes.DATE())
+                    .column("col_datetime", DataTypes.TIMESTAMP(0))
+                    .column("col_decimal", DataTypes.DECIMAL(10, 0))
+                    .column("col_decimal_unsigned", DataTypes.DECIMAL(10, 0))
+                    .column("col_double", DataTypes.DOUBLE())
+                    .column("col_double_unsigned", DataTypes.DOUBLE())
+                    .column("col_enum", DataTypes.STRING())
+                    .column("col_float", DataTypes.FLOAT())
+                    .column("col_float_unsigned", DataTypes.FLOAT())
+                    .column("col_geometry", DataTypes.BYTES())
+                    .column("col_geometrycollection", DataTypes.BYTES())
+                    .column("col_int", DataTypes.INT())
+                    .column("col_int_unsigned", DataTypes.BIGINT())
+                    .column("col_integer", DataTypes.INT())
+                    .column("col_integer_unsigned", DataTypes.BIGINT())
+                    .column("col_json", DataTypes.STRING())
+                    .column("col_linestring", DataTypes.BYTES())
+                    .column("col_longblob", DataTypes.BYTES())
+                    .column("col_longtext", DataTypes.STRING())
+                    .column("col_mediumblob", DataTypes.BYTES())
+                    .column("col_mediumint", DataTypes.INT())
+                    .column("col_mediumint_unsigned", DataTypes.INT())
+                    .column("col_mediumtext", DataTypes.STRING())
+                    .column("col_multilinestring", DataTypes.BYTES())
+                    .column("col_multipoint", DataTypes.BYTES())
+                    .column("col_multipolygon", DataTypes.BYTES())
+                    .column("col_numeric", DataTypes.DECIMAL(10, 0))
+                    .column("col_numeric_unsigned", DataTypes.DECIMAL(10, 0))
+                    .column("col_polygon", DataTypes.BYTES())
+                    .column("col_point", DataTypes.BYTES())
+                    .column("col_real", DataTypes.DOUBLE())
+                    .column("col_real_unsigned", DataTypes.DOUBLE())
+                    .column("col_set", DataTypes.STRING())
+                    .column("col_smallint", DataTypes.SMALLINT())
+                    .column("col_smallint_unsigned", DataTypes.INT())
+                    .column("col_text", DataTypes.STRING())
+                    .column("col_time", DataTypes.TIME(0))
+                    .column("col_timestamp", DataTypes.TIMESTAMP(0))
+                    .column("col_tinytext", DataTypes.STRING())
+                    .column("col_tinyint", DataTypes.TINYINT())
+                    .column("col_tinyint_unsinged", DataTypes.TINYINT())
+                    .column("col_tinyblob", DataTypes.BYTES())
+                    .column("col_varchar", DataTypes.STRING())
+                    .column("col_year", DataTypes.DATE())
+                    .column("col_datetime_p3", DataTypes.TIMESTAMP(3).notNull())
+                    .column("col_time_p3", DataTypes.TIME(3))
+                    .column("col_timestamp_p3", DataTypes.TIMESTAMP(3))
+                    .column("col_varbinary", DataTypes.BYTES())
+                    .primaryKeyNamed("PRIMARY", Lists.newArrayList("pid"))
+                    .build();
+
+    public static final MySQLContainer<?> MYSQL_CONTAINER =
+            new MySQLContainer<>(MYSQL_57_IMAGE)
+                    .withUsername("root")
+                    .withPassword("")
+                    .withEnv(DEFAULT_CONTAINER_ENV_MAP)
+                    .withInitScript(MYSQL_INIT_SCRIPT)
+                    .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+    protected static String baseUrl;
+
+    @Rule public ExpectedException exception = ExpectedException.none();
+    protected static MySQLCatalog catalog;
+
+    @BeforeClass
+    public static void launchContainer() {
+        MYSQL_CONTAINER.start();
+        // Constructs the baseUrl.

Review comment:
       nit: this comment isn't useful.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org