Posted to issues@flink.apache.org by "Leonard Xu (Jira)" <ji...@apache.org> on 2022/06/24 12:05:00 UTC
[jira] [Commented] (FLINK-27794) The primary key obtained from MySQL is incorrect by using MysqlCatalog
[ https://issues.apache.org/jira/browse/FLINK-27794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17558456#comment-17558456 ]
Leonard Xu commented on FLINK-27794:
------------------------------------
Fixed in:
master(1.16) d0cee16e6db5f6279697924a2a18c685e576955b
release-1.15: TODO
> The primary key obtained from MySQL is incorrect by using MysqlCatalog
> ----------------------------------------------------------------------
>
> Key: FLINK-27794
> URL: https://issues.apache.org/jira/browse/FLINK-27794
> Project: Flink
> Issue Type: Bug
> Components: Connectors / JDBC
> Affects Versions: 1.15.0
> Reporter: dusukang
> Assignee: dusukang
> Priority: Major
> Labels: pull-request-available
> Attachments: 167908239-c6f3f0ad-af06-436f-87e2-85c60428b400.png
>
>
> I want to use MySqlCatalog to get the primary key of the database table `user`. The table is created with the following statement:
> {code:java}
> CREATE TABLE flinksql_test.`user` (
>   `uid` bigint(20) NOT NULL,
>   `uname` varchar(36) DEFAULT NULL,
>   `others` varchar(128) DEFAULT NULL,
>   PRIMARY KEY (`uid`)
> ) ENGINE=InnoDB DEFAULT CHARSET=utf8; {code}
>
> This is my test code:
> {code:java}
> import org.apache.flink.connector.jdbc.catalog.MySqlCatalog;
> import org.apache.flink.table.api.Schema;
> import org.apache.flink.table.catalog.CatalogBaseTable;
> import org.apache.flink.table.catalog.ObjectPath;
> import org.apache.flink.table.catalog.exceptions.TableNotExistException;
>
> import java.util.Optional;
>
> public class Demo02 {
>     public static void main(String[] args) throws TableNotExistException {
>         MySqlCatalog mySqlCatalog = new MySqlCatalog("mysql-catalog",
>                 "flinksql_test",
>                 "root",
>                 "123456789",
>                 String.format("jdbc:mysql://127.0.0.1:3306"));
>         CatalogBaseTable table = mySqlCatalog.getTable(new ObjectPath("flinksql_test", "user"));
>         Optional<Schema.UnresolvedPrimaryKey> primaryKey = table
>                 .getUnresolvedSchema()
>                 .getPrimaryKey();
>         System.out.println(primaryKey);
>     }
> } {code}
>
> The primary key returned is (Host, User), but the actual primary key of the table in the database is (uid).
> !167908239-c6f3f0ad-af06-436f-87e2-85c60428b400.png!
>
> While debugging, I saw that the catalog and schema arguments passed in are both null, so the SQL statement assembled to fetch the primary key omits the " TABLE_SCHEMA LIKE ? AND" condition.
> !https://user-images.githubusercontent.com/68139929/167910188-6df6f3ec-cb33-49cc-91d5-61dcb1167c98.png!
> !https://user-images.githubusercontent.com/68139929/167908960-cf873c66-a227-41fa-99ea-5cff8a181f29.png!
> !https://user-images.githubusercontent.com/68139929/167909024-22b15192-0755-416e-8421-dab0fdfc0d15.png!
> I later found that MySQL's built-in `mysql` system database also contains a `user` table, with the following structure:
>
> {code:java}
> CREATE TABLE mysql.`user` (
> `Host` char(60) COLLATE utf8_bin NOT NULL DEFAULT '',
> `User` char(32) COLLATE utf8_bin NOT NULL DEFAULT '',
> `Select_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Insert_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Update_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Delete_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Create_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Drop_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Reload_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Shutdown_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Process_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `File_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Grant_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `References_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Index_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Alter_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Show_db_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Super_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Create_tmp_table_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Lock_tables_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Execute_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Repl_slave_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Repl_client_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Create_view_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Show_view_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Create_routine_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Alter_routine_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Create_user_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Event_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Trigger_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Create_tablespace_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `ssl_type` enum('','ANY','X509','SPECIFIED') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT '',
> `ssl_cipher` blob NOT NULL,
> `x509_issuer` blob NOT NULL,
> `x509_subject` blob NOT NULL,
> `max_questions` int(11) unsigned NOT NULL DEFAULT '0',
> `max_updates` int(11) unsigned NOT NULL DEFAULT '0',
> `max_connections` int(11) unsigned NOT NULL DEFAULT '0',
> `max_user_connections` int(11) unsigned NOT NULL DEFAULT '0',
> `plugin` char(64) COLLATE utf8_bin NOT NULL DEFAULT 'caching_sha2_password',
> `authentication_string` text COLLATE utf8_bin,
> `password_expired` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `password_last_changed` timestamp NULL DEFAULT NULL,
> `password_lifetime` smallint(5) unsigned DEFAULT NULL,
> `account_locked` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Create_role_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Drop_role_priv` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL DEFAULT 'N',
> `Password_reuse_history` smallint(5) unsigned DEFAULT NULL,
> `Password_reuse_time` smallint(5) unsigned DEFAULT NULL,
> `Password_require_current` enum('N','Y') CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
> `User_attributes` json DEFAULT NULL,
> PRIMARY KEY (`Host`,`User`)
> ) /*!50100 TABLESPACE `mysql` */ ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin STATS_PERSISTENT=0 COMMENT='Users and global privileges'; {code}
>
> I think this will happen whenever multiple tables share the same table name. Could we pass the table schema when fetching the primary key?
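
The name collision described above can be illustrated with a minimal, self-contained sketch. It does not use Flink or a live MySQL server: the class `PrimaryKeyLookupSketch`, the `KeyColumn` type, and the in-memory rows are all hypothetical stand-ins for the metadata a database would report, and the sketch only models the effect of filtering (or not filtering) by database name. It does not model how the real catalog merges the returned rows. In real JDBC code the corresponding call is `DatabaseMetaData.getPrimaryKeys(catalog, schema, table)`; which of `catalog` or `schema` carries the MySQL database name depends on driver configuration, so treat that detail as an assumption to check.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PrimaryKeyLookupSketch {

    /** One primary-key column as a metadata query might report it (hypothetical shape). */
    static final class KeyColumn {
        final String database;
        final String table;
        final String column;

        KeyColumn(String database, String table, String column) {
            this.database = database;
            this.table = table;
            this.column = column;
        }
    }

    /** The two `user` tables from this report, reduced to their primary-key columns. */
    static List<KeyColumn> sampleMetadata() {
        return Arrays.asList(
                new KeyColumn("mysql", "user", "Host"),
                new KeyColumn("mysql", "user", "User"),
                new KeyColumn("flinksql_test", "user", "uid"));
    }

    /**
     * Collects primary-key columns for a table. A null database means
     * "do not filter by database", which mirrors a lookup that omits
     * the TABLE_SCHEMA condition.
     */
    static List<String> primaryKeyColumns(List<KeyColumn> rows, String database, String table) {
        List<String> columns = new ArrayList<>();
        for (KeyColumn row : rows) {
            boolean databaseMatches = database == null || database.equals(row.database);
            if (databaseMatches && table.equals(row.table)) {
                columns.add(row.column);
            }
        }
        return columns;
    }

    public static void main(String[] args) {
        List<KeyColumn> rows = sampleMetadata();
        // Without a database filter, columns from mysql.user leak into the result.
        System.out.println(primaryKeyColumns(rows, null, "user"));
        // With the database name supplied, only flinksql_test.user is considered.
        System.out.println(primaryKeyColumns(rows, "flinksql_test", "user"));
    }
}
```

In this sketch the unfiltered lookup returns key columns from both `user` tables, while the lookup that passes `flinksql_test` returns only `uid`, which is the behavior the proposal asks for.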
--
This message was sent by Atlassian Jira
(v8.20.7#820007)