You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Leonard Xu (Jira)" <ji...@apache.org> on 2022/06/27 09:28:00 UTC

[jira] [Resolved] (FLINK-27794) The primary key obtained from MySQL is incorrect by using MysqlCatalog

     [ https://issues.apache.org/jira/browse/FLINK-27794?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Leonard Xu resolved FLINK-27794.
--------------------------------
    Resolution: Fixed

> The primary key obtained from MySQL is incorrect by using MysqlCatalog
> ----------------------------------------------------------------------
>
>                 Key: FLINK-27794
>                 URL: https://issues.apache.org/jira/browse/FLINK-27794
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / JDBC
>    Affects Versions: 1.15.0
>            Reporter: dusukang
>            Assignee: dusukang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.15.2
>
>         Attachments: 167908239-c6f3f0ad-af06-436f-87e2-85c60428b400.png
>
>
> I want to use MysqlCatalog to get the primary key of the database table `user`. The database table creation statement is as follows
> {code:java}
> CREATE TABLE flinksql_test.`user` (
>   `uid` bigint(20) NOT NULL,
>   `uname` varchar(36) DEFAULT NULL,
>   `others` varchar(128) DEFAULT NULL,
>   PRIMARY KEY (`uid`)
> ) ENGINE=InnoDB DEFAULT CHARSET=utf8; {code}
>  
> This is my test code:
> {code:java}
> import org.apache.flink.connector.jdbc.catalog.MySqlCatalog;
> import org.apache.flink.table.api.Schema;
> import org.apache.flink.table.catalog.CatalogBaseTable;
> import org.apache.flink.table.catalog.ObjectPath;
> import org.apache.flink.table.catalog.exceptions.TableNotExistException;import java.util.Optional;public class Demo02 {
>     public static void main(String[] args) throws TableNotExistException {
>         MySqlCatalog mySqlCatalog = new MySqlCatalog("mysql-catalog",
>                 "flinksql_test",
>                 "root",
>                 "123456789",
>                 String.format("jdbc:mysql://127.0.0.1:3306"));
>         CatalogBaseTable table = mySqlCatalog.getTable(new ObjectPath("flinksql_test", "user"));
>         Optional<Schema.UnresolvedPrimaryKey> primaryKey = table
>                 .getUnresolvedSchema()
>                 .getPrimaryKey();
>         System.out.println(primaryKey);
>     }
> } {code}
>  
> The primary key returned is (Host, User), but the actual primary key of the table in the database is (uid).
> !167908239-c6f3f0ad-af06-436f-87e2-85c60428b400.png!
>  
> After debugging, I see that the catalog and schema values passed in are both null, so the SQL statement that is assembled to fetch the primary key does not include the " TABLE_SCHEMA LIKE ? AND" condition.
> !https://user-images.githubusercontent.com/68139929/167910188-6df6f3ec-cb33-49cc-91d5-61dcb1167c98.png!
> !https://user-images.githubusercontent.com/68139929/167908960-cf873c66-a227-41fa-99ea-5cff8a181f29.png!
> !https://user-images.githubusercontent.com/68139929/167909024-22b15192-0755-416e-8421-dab0fdfc0d15.png!
> Later, I found that MySQL's built-in `mysql` database also contains a `user` table, with the following structure:
>  
> {code:java}
> CREATE TABLE mysql.`user` (
>   `Host` char(60) COLLATE utf8_bin NOT NULL DEFAULT '',
>   `User` char(32) COLLATE utf8_bin NOT NULL DEFAULT '',
>   `Select_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Insert_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Update_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Delete_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Create_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Drop_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Reload_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Shutdown_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Process_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `File_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Grant_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `References_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Index_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Alter_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Show_db_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Super_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Create_tmp_table_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Lock_tables_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Execute_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Repl_slave_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Repl_client_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Create_view_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Show_view_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Create_routine_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Alter_routine_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Create_user_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Event_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Trigger_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Create_tablespace_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `ssl_type` enum('','ANY','X509','SPECIFIED') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
>   `ssl_cipher` blob NOT NULL,
>   `x509_issuer` blob NOT NULL,
>   `x509_subject` blob NOT NULL,
>   `max_questions` int(11) unsigned NOT NULL DEFAULT '0',
>   `max_updates` int(11) unsigned NOT NULL DEFAULT '0',
>   `max_connections` int(11) unsigned NOT NULL DEFAULT '0',
>   `max_user_connections` int(11) unsigned NOT NULL DEFAULT '0',
>   `plugin` char(64) COLLATE utf8_bin NOT NULL DEFAULT 'caching_sha2_password',
>   `authentication_string` text COLLATE utf8_bin,
>   `password_expired` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `password_last_changed` timestamp NULL DEFAULT NULL,
>   `password_lifetime` smallint(5) unsigned DEFAULT NULL,
>   `account_locked` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Create_role_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Drop_role_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
>   `Password_reuse_history` smallint(5) unsigned DEFAULT NULL,
>   `Password_reuse_time` smallint(5) unsigned DEFAULT NULL,
>   `Password_require_current` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
>   `User_attributes` json DEFAULT NULL,
>   PRIMARY KEY (`Host`,`User`)
> ) /*!50100 TABLESPACE `mysql` */ ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin STATS_PERSISTENT=0 COMMENT='Users and global privileges'; {code}
>  
> I think this will happen whenever multiple tables share the same table name. Could we pass the table schema when fetching the primary key?



--
This message was sent by Atlassian Jira
(v8.20.7#820007)