Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/08 02:31:12 UTC

[GitHub] [flink] dusukang opened a new pull request, #19741: [FLINK-27794][connector/jdbc] Fix the way of getting primary key in MysqlCatalog

dusukang opened a new pull request, #19741:
URL: https://github.com/apache/flink/pull/19741

   ## What is the purpose of the change
   Fix MysqlCatalog returning the wrong primary key when tables with the same name exist in different databases.
   
   ## Brief change log
   Modify the parameters of the method that retrieves the MySQL primary key so that the database is passed to `DatabaseMetaData#getPrimaryKeys`.
   
   ## Verifying this change
   This change is a code cleanup without any test coverage.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1153336369

   @flinkbot run azure




[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc] Fix the wrong primary key for MysqlCatalog when there are tables with same name in different databases

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1166291016

   @flinkbot run azure




[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connector/jdbc] The primary key obtained from MySQL is incorrect by using MysqlCatalog

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1143211688

   Thanks @fsk119 for the test cases and @RocMarshal for the reply. I tested MySQL 5.6.51, 5.7.35, 8.0.16, and 8.0.26 locally, and the fix works correctly in all of them. Here are my test cases:
   
   ```sql
   CREATE TABLE flinksql_test01.order_info (
     `order_id` bigint(20) NOT NULL,
     `order_name` varchar(128) DEFAULT NULL,
     `pro_id` bigint(20) DEFAULT NULL,
     `pro_name` varchar(128) DEFAULT NULL,
     PRIMARY KEY (`order_id`)
   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
   
   CREATE TABLE flinksql_test02.order_info (
     `order_id` bigint(20) DEFAULT NULL,
     `order_name` varchar(128) DEFAULT NULL,
     `pro_id` bigint(20) NOT NULL,
     `pro_name` varchar(128) DEFAULT NULL,
     PRIMARY KEY (`pro_id`)
   ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
   ```
   
   ```java
   import org.apache.flink.connector.jdbc.catalog.MySqlCatalog;
   import org.apache.flink.table.catalog.CatalogBaseTable;
   import org.apache.flink.table.catalog.ObjectPath;
   import org.apache.flink.table.catalog.exceptions.TableNotExistException;
   
   public class MySqlCatalogTest {
       public static void main(String[] args) throws TableNotExistException {
           MySqlCatalog mySqlCatalog =
                   new MySqlCatalog(
                           "mysql-catalog",
                           "flinksql_test01",
                           "root",
                           "123456",
                           "jdbc:mysql://127.0.0.1:3307");
           CatalogBaseTable table =
                   mySqlCatalog.getTable(new ObjectPath("flinksql_test01", "order_info"));
           System.out.println(table.getUnresolvedSchema());
       }
   }
   ```
   @RocMarshal could you have a look? 
   




[GitHub] [flink] fsk119 commented on pull request #19741: [FLINK-27794][connector/jdbc] The primary key obtained from MySQL is incorrect by using MysqlCatalog

Posted by GitBox <gi...@apache.org>.
fsk119 commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1148291175

   Please rebase and force-push your code. The community discourages using git merge.




[GitHub] [flink] fsk119 commented on pull request #19741: [FLINK-27794][connector/jdbc] The primary key obtained from MySQL is incorrect by using MysqlCatalog

Posted by GitBox <gi...@apache.org>.
fsk119 commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1148289530

   Yes. Just modify as you wish.




[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1160435276

   @leonardBang yeah. I will do it soon.




[GitHub] [flink] leonardBang merged pull request #19741: [FLINK-27794][connectors/jdbc] Fix the wrong primary key for MysqlCatalog when there are tables with same name in different databases

Posted by GitBox <gi...@apache.org>.
leonardBang merged PR #19741:
URL: https://github.com/apache/flink/pull/19741




[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1157818778

   @flinkbot run azure




[GitHub] [flink] flinkbot commented on pull request #19741: The table primary key obtained by MysqlCatalog is incorrect

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1128408013

   ## CI report:
   
   * 3a5ea7083ab8c3097cc3258e8524f857e7afd426 UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1151851087

   @fsk119 Hi. It has passed the CI test. Could you take a look?




[GitHub] [flink] fsk119 commented on a diff in pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
fsk119 commented on code in PR #19741:
URL: https://github.com/apache/flink/pull/19741#discussion_r894124861


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java:
##########
@@ -144,12 +144,13 @@ public String getBaseUrl() {
     // ------ retrieve PK constraint ------
 
     protected Optional<UniqueConstraint> getPrimaryKey(
-            DatabaseMetaData metaData, String schema, String table) throws SQLException {
+            DatabaseMetaData metaData, String catalog, String schema, String table)
+            throws SQLException {
 
         // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
         // the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
         // We need to sort them based on the KEY_SEQ value.
-        ResultSet rs = metaData.getPrimaryKeys(null, schema, table);
+        ResultSet rs = metaData.getPrimaryKeys(catalog, schema, table);

Review Comment:
   Could you add a check at line 161?
   ```java
   Preconditions.checkState(!keySeqColumnName.containsKey(keySeq - 1), "The PK constraints should be unique.");
   ```
   



##########
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java:
##########
@@ -251,6 +267,21 @@ public void testGetTable() throws TableNotExistException {
         assertEquals(TABLE_SCHEMA, table.getUnresolvedSchema());
     }
 
+    @Test
+    public void testGetTablePrimaryKey() throws TableNotExistException {
+        Schema tableSchemaUser =
+                Schema.newBuilder()
+                        .column("uid", DataTypes.BIGINT().notNull())
+                        .column("uname", DataTypes.VARCHAR(36))
+                        .column("others", DataTypes.VARCHAR(128))
+                        .primaryKeyNamed("PRIMARY", Lists.newArrayList("uid"))

Review Comment:
   Use Collections.singletonList here.





[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc] Fix the wrong primary key for MysqlCatalog when there are tables with same name in different databases

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1166785797

   @flinkbot run azure




[GitHub] [flink] leonardBang commented on pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
leonardBang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1160417548

   > Shouldn't we first get this fixed in `master` before backporting this to `release-1.15`? @leonardBang?
   
   Yes, fixing it in master first and then backporting it to the released branches is the right workflow.
   I will do a final review of this PR. @dusukang, please also open a PR for master.
   




[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connector/jdbc] The primary key obtained from MySQL is incorrect by using MysqlCatalog

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1148466110

   Thanks. I'll try it.




[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc] Fix the wrong primary key for MysqlCatalog when there are tables with same name in different databases

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1166944578

   @leonardBang Hi, it has passed the CI tests. Could you have a look?




[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1150832255

   @flinkbot run azure




[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1157248524

   @flinkbot run azure




[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1157335524

   @flinkbot run azure




[GitHub] [flink] leonardBang commented on a diff in pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
leonardBang commented on code in PR #19741:
URL: https://github.com/apache/flink/pull/19741#discussion_r894249878


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java:
##########
@@ -144,19 +144,23 @@ public String getBaseUrl() {
     // ------ retrieve PK constraint ------
 
     protected Optional<UniqueConstraint> getPrimaryKey(
-            DatabaseMetaData metaData, String schema, String table) throws SQLException {
+            DatabaseMetaData metaData, String catalog, String schema, String table)

Review Comment:
   The meaning of `catalog` in a database (MySQL) is different from the meaning of `catalog` in Flink SQL, so we'd better make this clearer; we can also look at how other popular projects handle it. For example, we can use `database` as the parameter name and add proper Javadoc, because `database` == `catalog` in PostgreSQL and MySQL 8.0.x.
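A minimal sketch of the renaming suggested above follows. It assumes the method keeps reordering the primary key columns by KEY_SEQ, and it forwards `database` as the JDBC `catalog` argument of `DatabaseMetaData#getPrimaryKeys`. The class `PrimaryKeyLookup`, the helpers `stubResultSet`/`stubMetaData`, and the plain `List<String>` return type are hypothetical simplifications. They are not the code merged in this PR, which returns a Flink `UniqueConstraint`. `java.lang.reflect.Proxy` stubs stand in for a live MySQL connection.

```java
import java.lang.reflect.Proxy;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PrimaryKeyLookup {

    /**
     * Returns the primary key columns of {@code table}, ordered by KEY_SEQ.
     *
     * @param metaData metadata of an open JDBC connection
     * @param database the database that owns the table. It is passed as the JDBC {@code catalog}
     *     argument of {@link DatabaseMetaData#getPrimaryKeys}, which is a different concept from a
     *     Flink catalog. Passing {@code null} would not narrow the search to one database.
     * @param schema the JDBC schema, or {@code null} if the driver does not use schemas
     * @param table the table name
     */
    static List<String> getPrimaryKey(
            DatabaseMetaData metaData, String database, String schema, String table)
            throws SQLException {
        // getPrimaryKeys returns rows ordered by COLUMN_NAME, so re-order them by KEY_SEQ.
        Map<Integer, String> columnsBySeq = new TreeMap<>();
        try (ResultSet rs = metaData.getPrimaryKeys(database, schema, table)) {
            while (rs.next()) {
                columnsBySeq.put(rs.getInt("KEY_SEQ"), rs.getString("COLUMN_NAME"));
            }
        }
        return new ArrayList<>(columnsBySeq.values());
    }

    /** Hypothetical stub ResultSet over {KEY_SEQ, COLUMN_NAME} rows; supports only the calls above. */
    static ResultSet stubResultSet(List<Object[]> rows) {
        int[] cursor = {-1};
        return (ResultSet) Proxy.newProxyInstance(
                PrimaryKeyLookup.class.getClassLoader(),
                new Class<?>[] {ResultSet.class},
                (proxy, method, args) -> {
                    switch (method.getName()) {
                        case "next":
                            return ++cursor[0] < rows.size();
                        case "getInt":
                            return rows.get(cursor[0])[0];
                        case "getString":
                            return rows.get(cursor[0])[1];
                        case "close":
                            return null;
                        default:
                            throw new UnsupportedOperationException(method.getName());
                    }
                });
    }

    /** Hypothetical stub metadata that records the catalog argument it receives. */
    static DatabaseMetaData stubMetaData(List<String> seenCatalogs, List<Object[]> rows) {
        return (DatabaseMetaData) Proxy.newProxyInstance(
                PrimaryKeyLookup.class.getClassLoader(),
                new Class<?>[] {DatabaseMetaData.class},
                (proxy, method, args) -> {
                    if (!method.getName().equals("getPrimaryKeys")) {
                        throw new UnsupportedOperationException(method.getName());
                    }
                    seenCatalogs.add((String) args[0]);
                    return stubResultSet(rows);
                });
    }

    public static void main(String[] args) throws SQLException {
        List<String> seen = new ArrayList<>();
        // Rows arrive ordered by COLUMN_NAME ("a_id" < "b_id"), but KEY_SEQ puts b_id first.
        List<Object[]> rows = Arrays.asList(new Object[] {2, "a_id"}, new Object[] {1, "b_id"});
        List<String> pk =
                getPrimaryKey(stubMetaData(seen, rows), "flinksql_test01", null, "order_info");
        System.out.println(seen + " " + pk); // prints "[flinksql_test01] [b_id, a_id]"
    }
}
```

Naming the parameter `database` while still passing it in the JDBC `catalog` position keeps the Flink-facing vocabulary unambiguous. The Javadoc records the mapping to the driver's terminology.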





[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1151905642

   Thanks for the comment. I also think this verification is a good improvement. I'll change it soon.




[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1158428833

   @leonardBang Hi. Thanks for your comments; I have updated the PR. I found that the database name can be obtained via `tablePath.getDatabaseName()`. Please take a look when you have time.




[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc] Fix the wrong primary key for MysqlCatalog when there are tables with same name in different databases

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1165521349

   @flinkbot run azure




[GitHub] [flink] RocMarshal commented on pull request #19741: [FLINK-27794][connector/jdbc] The primary key obtained from MySQL is incorrect by using MysqlCatalog

Posted by GitBox <gi...@apache.org>.
RocMarshal commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1142100626

   Thanks @dusukang for driving this and @fsk119 for the ping.
   +1 from me to fix it.
   However, I'm not sure whether the current fix is compatible with different MySQL versions.
   Would you mind providing some test cases for different MySQL versions to verify the fix?




[GitHub] [flink] MartijnVisser commented on pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
MartijnVisser commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1160360827

   Shouldn't we first get this fixed in `master` before backporting this to `release-1.15`? @leonardBang?




[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connector/jdbc] The primary key obtained from MySQL is incorrect by using MysqlCatalog

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1141125489

   > I can't reproduce the problem described in the issue. Could you also tell us which MySQL version and MySQL driver version you use?
   
   Thank you very much for your reply. My MySQL version is 8.0.16. MySQL has a built-in table `mysql`.`user`; when I create my own table named `user` and get its primary key, the returned primary key is the one of `mysql`.`user`. This happens whenever tables with the same name exist in different databases: when we use the catalog to get the primary key of a table, the result may come from another table with the same name. Can you help me verify it?
   
   This is my case:
   [FLINK-27794](https://issues.apache.org/jira/browse/FLINK-27794)
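To illustrate the mechanism described above: the Javadoc of `java.sql.DatabaseMetaData#getPrimaryKeys` states that a `null` catalog "means that the catalog name should not be used to narrow the search". The sketch below simulates that rule over an in-memory list instead of a real MySQL server. The class `NullCatalogDemo`, the `PkRow` type, and the metadata rows are hypothetical and only show why passing `null` (the code before this PR) can mix the keys of same-named tables, while passing the database does not.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class NullCatalogDemo {

    /** One row of simulated primary key metadata (hypothetical). */
    static final class PkRow {
        final String database;
        final String table;
        final String column;

        PkRow(String database, String table, String column) {
            this.database = database;
            this.table = table;
            this.column = column;
        }
    }

    /**
     * Mimics the JDBC rule for the catalog argument: null does not narrow the search,
     * while a non-null value must match the row's database exactly.
     */
    static List<String> primaryKeyColumns(List<PkRow> metadata, String catalog, String table) {
        List<String> columns = new ArrayList<>();
        for (PkRow row : metadata) {
            boolean catalogMatches = catalog == null || catalog.equals(row.database);
            if (catalogMatches && row.table.equals(table)) {
                columns.add(row.database + "." + row.column);
            }
        }
        return columns;
    }

    public static void main(String[] args) {
        // Hypothetical metadata: two databases each contain a table named "user".
        List<PkRow> metadata = Arrays.asList(
                new PkRow("mysql", "user", "Host"),
                new PkRow("mysql", "user", "User"),
                new PkRow("flinksql_test01", "user", "id"));

        // null catalog: keys of both "user" tables are mixed together.
        // prints "[mysql.Host, mysql.User, flinksql_test01.id]"
        System.out.println(primaryKeyColumns(metadata, null, "user"));
        // database passed as catalog: only the intended table's key is returned.
        // prints "[flinksql_test01.id]"
        System.out.println(primaryKeyColumns(metadata, "flinksql_test01", "user"));
    }
}
```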
   
   
   




[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc] Fix the wrong primary key for MysqlCatalog when there are tables with same name in different databases

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1166224215

   @flinkbot run azure




[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc] Fix the wrong primary key for MysqlCatalog when there are tables with same name in different databases

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1166769356

   > The failing test is related to [FLINK-28226](https://issues.apache.org/jira/browse/FLINK-28226). Let's rebase after the FLINK-28226 fix has been backported to release-1.15; please wait a bit, @dusukang. @HuangXingBo is working on the fix.

   Thanks.




[GitHub] [flink] leonardBang commented on pull request #19741: [FLINK-27794][connectors/jdbc] Fix the wrong primary key for MysqlCatalog when there are tables with same name in different databases

Posted by GitBox <gi...@apache.org>.
leonardBang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1166764398

   The failing test is related to https://issues.apache.org/jira/browse/FLINK-28226. Let's rebase after the FLINK-28226 fix has been backported to release-1.15; please wait a bit, @dusukang. @HuangXingBo is working on the fix.




[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1151049030

   @flinkbot run azure




[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1155031623

   @leonardBang Hi. I have updated the PR. After adding the 5.6.x test, it failed, and I found that MySQL 5.6.x has no JSON type. I made a small change to `catalog-init-for-test.sql`; could you review it when you have time?
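MySQL introduced the JSON data type in 5.7.8, so a shared init script that creates JSON columns cannot run on a 5.6.x image. One alternative to editing the script is to gate JSON columns per image. The sketch below assumes images are tagged `mysql:MAJOR.MINOR.PATCH`, as in the test base's image list. The class `JsonTypeSupport` and the helper `supportsJsonType` are hypothetical and are not what this PR did; the PR changed `catalog-init-for-test.sql` instead.

```java
import java.util.Arrays;
import java.util.List;

public class JsonTypeSupport {

    /** Returns true if the MySQL version in the image tag supports JSON columns (5.7.8+). */
    static boolean supportsJsonType(String dockerImage) {
        // Assumes an image name of the form "mysql:MAJOR.MINOR.PATCH".
        String version = dockerImage.substring(dockerImage.indexOf(':') + 1);
        String[] parts = version.split("\\.");
        int major = Integer.parseInt(parts[0]);
        int minor = Integer.parseInt(parts[1]);
        int patch = Integer.parseInt(parts[2]);
        if (major != 5) {
            return major > 5;
        }
        if (minor != 7) {
            return minor > 7;
        }
        return patch >= 8;
    }

    public static void main(String[] args) {
        List<String> images = Arrays.asList("mysql:5.6.51", "mysql:5.7.34", "mysql:8.0.16");
        for (String image : images) {
            System.out.println(image + " supports JSON columns: " + supportsJsonType(image));
        }
    }
}
```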




[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1151963744

   @flinkbot run azure




[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1151985970

   @flinkbot run azure




[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connector/jdbc] Fix the way of getting primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1150673986

   Yeah. I'll check it.




[GitHub] [flink] leonardBang commented on a diff in pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
leonardBang commented on code in PR #19741:
URL: https://github.com/apache/flink/pull/19741#discussion_r894249878


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java:
##########
@@ -144,19 +144,23 @@ public String getBaseUrl() {
     // ------ retrieve PK constraint ------
 
     protected Optional<UniqueConstraint> getPrimaryKey(
-            DatabaseMetaData metaData, String schema, String table) throws SQLException {
+            DatabaseMetaData metaData, String catalog, String schema, String table)

Review Comment:
   The meaning of `catalog` in a database is different from the meaning of `catalog` in Flink SQL, so we'd better make this clearer; we can also look at how other popular projects handle it.



##########
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java:
##########
@@ -23,30 +23,34 @@
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.ClassRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.MySQLContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.utility.DockerImageName;
 
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /** Test base for {@link MySqlCatalog}. */
 public class MySqlCatalogTestBase {
 
     public static final Logger LOG = LoggerFactory.getLogger(MySqlCatalogTestBase.class);
 
-    protected static final DockerImageName MYSQL_57_IMAGE = DockerImageName.parse("mysql:5.7.34");
+    protected static final List<String> DOCKER_IMAGE_NAMES =
+            Arrays.asList("mysql:5.7.34", "mysql:8.0.16");

Review Comment:
   Could you please also add a test for a 5.6.x version?



##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java:
##########
@@ -144,19 +144,23 @@ public String getBaseUrl() {
     // ------ retrieve PK constraint ------
 
     protected Optional<UniqueConstraint> getPrimaryKey(
-            DatabaseMetaData metaData, String schema, String table) throws SQLException {
+            DatabaseMetaData metaData, String catalog, String schema, String table)
+            throws SQLException {
 
         // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
         // the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
         // We need to sort them based on the KEY_SEQ value.
-        ResultSet rs = metaData.getPrimaryKeys(null, schema, table);
+        ResultSet rs = metaData.getPrimaryKeys(catalog, schema, table);
 
         Map<Integer, String> keySeqColumnName = new HashMap<>();
         String pkName = null;
         while (rs.next()) {
             String columnName = rs.getString("COLUMN_NAME");
             pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the same
             int keySeq = rs.getInt("KEY_SEQ");
+            Preconditions.checkState(
+                    !keySeqColumnName.containsKey(keySeq - 1),
+                    "The PK constraint should be unique.");

Review Comment:
   Could we improve the exception message? As written, it reads as if we had encountered duplicate records with the same primary key.





[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1152151064

   Thanks for the comments, @leonardBang. I agree and will change it soon. But what if other databases need all of `catalog`, `database`, and `table` to identify a unique table?




[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1152272036

   @leonardBang Hi. I have updated the PR according to your suggestion. Could you take a look?




[GitHub] [flink] leonardBang commented on a diff in pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
leonardBang commented on code in PR #19741:
URL: https://github.com/apache/flink/pull/19741#discussion_r897818461


##########
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java:
##########
@@ -251,6 +267,21 @@ public void testGetTable() throws TableNotExistException {
         assertEquals(TABLE_SCHEMA, table.getUnresolvedSchema());
     }
 
+    @Test
+    public void testGetTablePrimaryKey() throws TableNotExistException {
+        Schema tableSchemaUser =

Review Comment:
   IIUC, to validate that your PR works, we need another table named 'user' in a different database, with a different PK ('another_pk').



##########
flink-connectors/flink-connector-jdbc/src/test/resources/mysql-scripts/catalog-init-for-test.sql:
##########
@@ -59,7 +58,7 @@ CREATE TABLE `t_all_types` (
   `col_int_unsigned` int(10) unsigned DEFAULT NULL,
   `col_integer` int(11) DEFAULT NULL,
   `col_integer_unsigned` int(10) unsigned DEFAULT NULL,
-  `col_json` json DEFAULT NULL,
+  `col_json` longtext DEFAULT NULL,

Review Comment:
   The column name `col_json` does not match the type `longtext`. We can skip the JSON type test by removing this field. Although this slightly reduces test coverage, I think it's okay, as we don't cover all types (e.g. the GEO type) either.



##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java:
##########
@@ -144,19 +144,25 @@ public String getBaseUrl() {
     // ------ retrieve PK constraint ------
 
     protected Optional<UniqueConstraint> getPrimaryKey(
-            DatabaseMetaData metaData, String schema, String table) throws SQLException {
+            DatabaseMetaData metaData, String database, String schema, String table)
+            throws SQLException {
 
         // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
         // the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
         // We need to sort them based on the KEY_SEQ value.
-        ResultSet rs = metaData.getPrimaryKeys(null, schema, table);
+        // In the currently supported databases, database is equivalent to catalog.
+        // We need to pass the name of the database to represent catalog.
+        ResultSet rs = metaData.getPrimaryKeys(database, schema, table);
 
         Map<Integer, String> keySeqColumnName = new HashMap<>();
         String pkName = null;
         while (rs.next()) {
             String columnName = rs.getString("COLUMN_NAME");
             pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the same
             int keySeq = rs.getInt("KEY_SEQ");
+            Preconditions.checkState(
+                    !keySeqColumnName.containsKey(keySeq - 1),
+                    "The PK must be from the same table.");

Review Comment:
   The field(s) of primary key must be from the same table.



##########
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java:
##########
@@ -106,23 +116,51 @@ public class MySqlCatalogTestBase {
                     .primaryKeyNamed("PRIMARY", Lists.newArrayList("pid"))
                     .build();
 
-    @ClassRule
-    public static final MySQLContainer<?> MYSQL_CONTAINER =
-            new MySQLContainer<>(MYSQL_57_IMAGE)
-                    .withUsername("root")
-                    .withPassword("")
-                    .withEnv(DEFAULT_CONTAINER_ENV_MAP)
-                    .withInitScript(MYSQL_INIT_SCRIPT)
-                    .withLogConsumer(new Slf4jLogConsumer(LOG));
+    public static final Map<String, MySQLContainer<?>> MYSQL_CONTAINERS = new HashMap<>();
 
-    protected static MySqlCatalog catalog;
+    public static final Map<String, MySqlCatalog> CATALOGS = new HashMap<>();
 
     @BeforeClass
-    public static void beforeAll() {
-        String baseUrl =
-                MYSQL_CONTAINER
-                        .getJdbcUrl()
-                        .substring(0, MYSQL_CONTAINER.getJdbcUrl().lastIndexOf("/"));
-        catalog = new MySqlCatalog(TEST_CATALOG_NAME, TEST_DB, TEST_USERNAME, TEST_PWD, baseUrl);
+    public static void beforeAll() throws SQLException {
+        for (String dockerImageName : DOCKER_IMAGE_NAMES) {
+            MySQLContainer<?> container =
+                    new MySQLContainer<>(DockerImageName.parse(dockerImageName))
+                            .withUsername("root")
+                            .withPassword("")
+                            .withEnv(DEFAULT_CONTAINER_ENV_MAP)
+                            .withInitScript(MYSQL_INIT_SCRIPT)
+                            .withLogConsumer(new Slf4jLogConsumer(LOG));
+            container.start();
+            MYSQL_CONTAINERS.put(dockerImageName, container);
+
+            String baseUrl =
+                    container.getJdbcUrl().substring(0, container.getJdbcUrl().lastIndexOf("/"));
+
+            CATALOGS.put(
+                    dockerImageName,
+                    new MySqlCatalog(TEST_CATALOG_NAME, TEST_DB, TEST_USERNAME, TEST_PWD, baseUrl));
+
+            try (Connection conn = DriverManager.getConnection(baseUrl, TEST_USERNAME, TEST_PWD);
+                    Statement stat = conn.createStatement()) {
+                if (!dockerImageName.equals("mysql:5.6.51")) {
+                    // The tables ddl according to 5.6.x,
+                    // we need to modify the type of the field col_json to json in 5.7.x and 8.0.x.

Review Comment:
   Let's remove the JSON type coverage for simplicity.



##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java:
##########
@@ -144,19 +144,25 @@ public String getBaseUrl() {
     // ------ retrieve PK constraint ------
 
     protected Optional<UniqueConstraint> getPrimaryKey(
-            DatabaseMetaData metaData, String schema, String table) throws SQLException {
+            DatabaseMetaData metaData, String database, String schema, String table)
+            throws SQLException {
 
         // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
         // the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
         // We need to sort them based on the KEY_SEQ value.
-        ResultSet rs = metaData.getPrimaryKeys(null, schema, table);
+        // In the currently supported databases, database is equivalent to catalog.

Review Comment:
    // In the currently supported database dialects MySQL and Postgres, the database term is equivalent to the catalog term. We need to pass the database name as the catalog parameter to retrieve primary keys by the full table identifier.



##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/PostgresCatalog.java:
##########
@@ -169,6 +169,11 @@ protected String getSchemaName(ObjectPath tablePath) {
         return PostgresTablePath.fromFlinkTableName(tablePath.getObjectName()).getPgSchemaName();
     }
 
+    @Override
+    protected String getDatabaseName(ObjectPath tablePath) {
+        return PostgresTablePath.fromFlinkTableName(tablePath.getObjectName()).getPgSchemaName();

Review Comment:
   Please add a test for this. As I recall, Flink represents a Postgres table as 'catalog.db.\`schema.table\`', so the databaseName is different from the schemaName.
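To make the reviewer's point concrete: in a path such as mypg.postgres.`public.user`, the second component is the Postgres database, while the schema only appears inside the backquoted third component. The following is a minimal, hypothetical splitter (not Flink's SQL parser; it ignores escaped backquotes and other quoting rules) showing that the two names come from different parts of the path. Deriving `getDatabaseName` from the schema would therefore return `public` instead of `postgres`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: splits catalog.db.`schema.table` into its path components. */
final class FlinkPathSplitter {

    /** Splits on '.' characters outside backquotes; the backquotes themselves are dropped. */
    static List<String> split(String identifier) {
        List<String> parts = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean inQuotes = false;
        for (char c : identifier.toCharArray()) {
            if (c == '`') {
                inQuotes = !inQuotes; // toggle quoting; escaped backquotes are not supported
            } else if (c == '.' && !inQuotes) {
                parts.add(current.toString()); // a dot outside quotes ends a component
                current.setLength(0);
            } else {
                current.append(c); // dots inside quotes stay part of the component
            }
        }
        parts.add(current.toString());
        return parts;
    }
}
```

For mypg.postgres.`public.user` the sketch returns [mypg, postgres, public.user], so a test that uses a database name different from the schema name would catch the mix-up.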





[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1161572540

   I opened a PR for master: [https://github.com/apache/flink/pull/20025](https://github.com/apache/flink/pull/20025)




[GitHub] [flink] fsk119 commented on pull request #19741: [FLINK-27794][connector/jdbc] The primary key obtained from MySQL is incorrect by using MysqlCatalog

Posted by GitBox <gi...@apache.org>.
fsk119 commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1147441150

   I pushed my thoughts to https://github.com/fsk119/flink/pull/new/mysql-catalog
   
   Could you modify the PR as I did and add more tests?




[GitHub] [flink] fsk119 commented on pull request #19741: [FLINK-27794][connector/jdbc] The primary key obtained from MySQL is incorrect by using MysqlCatalog

Posted by GitBox <gi...@apache.org>.
fsk119 commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1141602620

   Hi. Thanks for your update. I reproduced the problem in my local environment using MySQL 8.0.16. Let me summarize it here: in MySQL 5.7, JDBC uses the schema as the synonym of the database, but in MySQL 8.0 it uses the catalog as the synonym of the database[1]. In MySQL 8.0, the driver uses the following SQL to fetch PKs:
   
   ```
   SELECT TABLE_SCHEMA AS TABLE_CAT, NULL AS TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, SEQ_IN_INDEX AS KEY_SEQ, 'PRIMARY' AS PK_NAME FROM INFORMATION_SCHEMA.STATISTICS WHERE TABLE_NAME = 'user' AND INDEX_NAME='PRIMARY' ORDER BY TABLE_SCHEMA, TABLE_NAME, INDEX_NAME, SEQ_IN_INDEX
   ```
   
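   Notice that `TABLE_SCHEM` is always `NULL` and that the `WHERE` clause does not filter on the database (`TABLE_SCHEMA`), because the catalog argument passed to `getPrimaryKeys` is `null`.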
   
   So I think the fix is heading in the right direction. WDYT, @RocMarshal?
   
   [1] https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-connp-props-connection.html#cj-conn-prop_databaseTerm
   




[GitHub] [flink] fsk119 commented on pull request #19741: [FLINK-27794][connector/jdbc] The primary key obtained from MySQL is incorrect by using MysqlCatalog

Posted by GitBox <gi...@apache.org>.
fsk119 commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1144402454

   Hi all. After digging in, I found that the SQL
   
   ```
   SELECT TABLE_SCHEMA AS TABLE_CAT, NULL AS TABLE_SCHEM, TABLE_NAME, COLUMN_NAME, SEQ_IN_INDEX AS KEY_SEQ, 'PRIMARY' AS PK_NAME FROM INFORMATION_SCHEMA.STATISTICS
   WHERE TABLE_NAME = 't_grouped_by_sink' AND INDEX_NAME='PRIMARY' ORDER BY TABLE_SCHEMA, TABLE_NAME, INDEX_NAME, SEQ_IN_INDEX;
   ```
   filters only by table name, so it returns the PK columns of every table named `t_grouped_by_sink`, whichever database it is in. But the original logic that converts the ResultSet into a PK
   
   ```java
   while (rs.next()) {
       String columnName = rs.getString("COLUMN_NAME");
       pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the same
       int keySeq = rs.getInt("KEY_SEQ");
       System.out.println(columnName); // print every column name
       keySeqColumnName.put(keySeq - 1, columnName); // KEY_SEQ is 1-based index
   }
   ```
   
   may overwrite PK columns here. Notice that `keySeqColumnName` is a Map keyed by `KEY_SEQ - 1`: if two tables have the same name and both have PKs, only the PK columns of the last table returned are kept in `keySeqColumnName`.
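The overwrite can be reproduced without a database. Below is a minimal sketch that feeds the same map logic with hypothetical rows for two tables named `t_grouped_by_sink`, one in `db1` with PK `id` and one in `db2` with PK `pid`. `PkRow` is an illustrative stand-in for a row of `DatabaseMetaData#getPrimaryKeys`, not a real JDBC type.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for one row returned by DatabaseMetaData#getPrimaryKeys. */
final class PkRow {
    final String tableCat;   // TABLE_CAT: the database name under MySQL 8.0
    final String columnName; // COLUMN_NAME
    final int keySeq;        // KEY_SEQ, 1-based

    PkRow(String tableCat, String columnName, int keySeq) {
        this.tableCat = tableCat;
        this.columnName = columnName;
        this.keySeq = keySeq;
    }
}

final class PkOverwriteDemo {

    /** Same idea as the original loop: key by KEY_SEQ - 1 and ignore TABLE_CAT. */
    static Map<Integer, String> collect(List<PkRow> rows) {
        Map<Integer, String> keySeqColumnName = new HashMap<>();
        for (PkRow row : rows) {
            // A later row with the same KEY_SEQ silently replaces the earlier one.
            keySeqColumnName.put(row.keySeq - 1, row.columnName);
        }
        return keySeqColumnName;
    }

    /** Rows as an unfiltered query could return them for two same-named tables. */
    static List<PkRow> sameNameTablesInTwoDatabases() {
        return Arrays.asList(new PkRow("db1", "id", 1), new PkRow("db2", "pid", 1));
    }
}
```

Here `collect(sameNameTablesInTwoDatabases())` returns a map with the single entry 0 -> pid: the PK column of `db1.t_grouped_by_sink` is silently dropped.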
   




[GitHub] [flink] fsk119 commented on pull request #19741: [FLINK-27794][connector/jdbc] The primary key obtained from MySQL is incorrect by using MysqlCatalog

Posted by GitBox <gi...@apache.org>.
fsk119 commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1140838911

   I can't reproduce the problem described in the issue. Could you tell us which MySQL version and which MySQL driver version you use?




[GitHub] [flink] leonardBang commented on pull request #19741: [FLINK-27794][connectors/jdbc] Fix the wrong primary key for MysqlCatalog when there are tables with same name in different databases

Posted by GitBox <gi...@apache.org>.
leonardBang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1165496312

   Thanks @dusukang for the update, LGTM. But could you use `git rebase release-1.15` instead of `git merge release-1.15`? Rebasing avoids these conflicts.




[GitHub] [flink] leonardBang commented on pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
leonardBang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1152206662

   > Thanks @leonardBang for the comments. I agree, I'll change it soon. But If there are other databases you need both `catalog`,`database`,`table` to define a unique table?
   
   Currently, Flink's Catalog uses a three-level table path design, so we need to adapt other databases' table paths to this style. Otherwise we would need a new catalog API design, and in that case a FLIP is required.




[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1151985487

   Thanks for your reply. 




[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1154782595

   @flinkbot run azure




[GitHub] [flink] fsk119 commented on a diff in pull request #19741: [FLINK-27794][connector/jdbc] The primary key obtained from MySQL is incorrect by using MysqlCatalog

Posted by GitBox <gi...@apache.org>.
fsk119 commented on code in PR #19741:
URL: https://github.com/apache/flink/pull/19741#discussion_r884449039


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java:
##########
@@ -149,7 +149,7 @@ protected Optional<UniqueConstraint> getPrimaryKey(
         // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
         // the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
         // We need to sort them based on the KEY_SEQ value.
-        ResultSet rs = metaData.getPrimaryKeys(null, schema, table);
+        ResultSet rs = metaData.getPrimaryKeys(schema, schema, table);

Review Comment:
   In the SQL world, catalog and schema are different things, and we cannot mix them up. You can refer to the doc for the difference[1]. In MySQL, they have the same meaning[2], but in PostgreSQL, a catalog contains many schemas[3].
   
   Therefore, I think you should handle the catalog the same way as the schema: let the concrete catalog implementation determine the catalog of the given table.
   
   
   
   [1] https://stackoverflow.com/questions/7022755/whats-the-difference-between-a-catalog-and-a-schema-in-a-relational-database 
   [2] https://dev.mysql.com/doc/connector-odbc/en/connector-odbc-usagenotes-functionality-catalog-schema.html
   [3] https://stackoverflow.com/questions/48312675/how-can-i-show-the-catalogs-and-schemas-in-postgresql
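One way to read this suggestion, sketched under assumptions: the abstract catalog asks each dialect for all three `getPrimaryKeys` arguments, the same way it already asks for `getSchemaName(ObjectPath)`. `TablePath`, `JdbcTableId` and the `*DialectCatalog` classes are hypothetical stand-ins, not Flink classes. The MySQL mapping assumes the driver treats a database as a JDBC catalog (the Connector/J 8.0 behaviour described earlier in the thread), and the Postgres mapping assumes an object name of the form `schema.table` with `public` as the default schema.

```java
/** Hypothetical stand-in for Flink's ObjectPath: a database name plus an object name. */
final class TablePath {
    final String databaseName;
    final String objectName;

    TablePath(String databaseName, String objectName) {
        this.databaseName = databaseName;
        this.objectName = objectName;
    }
}

/** The (catalog, schema, table) arguments for DatabaseMetaData#getPrimaryKeys. */
final class JdbcTableId {
    final String catalog;
    final String schema;
    final String table;

    JdbcTableId(String catalog, String schema, String table) {
        this.catalog = catalog;
        this.schema = schema;
        this.table = table;
    }
}

/** Each dialect maps a Flink path onto JDBC catalog/schema terms. */
abstract class DialectCatalog {
    protected abstract String getCatalogName(TablePath path);

    protected abstract String getSchemaName(TablePath path);

    protected abstract String getTableName(TablePath path);

    final JdbcTableId toJdbcTableId(TablePath path) {
        return new JdbcTableId(getCatalogName(path), getSchemaName(path), getTableName(path));
    }
}

/** MySQL: the database is the JDBC catalog and there is no separate schema level. */
final class MySqlDialectCatalog extends DialectCatalog {
    @Override
    protected String getCatalogName(TablePath path) {
        return path.databaseName;
    }

    @Override
    protected String getSchemaName(TablePath path) {
        return null; // null schema: the schema is not used to narrow the search
    }

    @Override
    protected String getTableName(TablePath path) {
        return path.objectName;
    }
}

/** Postgres: the database is the catalog and the object name is "schema.table". */
final class PostgresDialectCatalog extends DialectCatalog {
    @Override
    protected String getCatalogName(TablePath path) {
        return path.databaseName;
    }

    @Override
    protected String getSchemaName(TablePath path) {
        int dot = path.objectName.indexOf('.');
        // Assumption: a bare table name belongs to the "public" schema.
        return dot < 0 ? "public" : path.objectName.substring(0, dot);
    }

    @Override
    protected String getTableName(TablePath path) {
        int dot = path.objectName.indexOf('.');
        return dot < 0 ? path.objectName : path.objectName.substring(dot + 1);
    }
}
```

For a MySQL path (db1, user) this yields (db1, null, user); for a Postgres path (postgres, public.user) it yields (postgres, public, user). Keeping the mapping in the dialect subclasses leaves the PK lookup itself free of dialect-specific branches.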



##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalog.java:
##########
@@ -152,7 +152,7 @@ protected String getTableName(ObjectPath tablePath) {
 
     @Override
     protected String getSchemaName(ObjectPath tablePath) {
-        return null;
+        return tablePath.getDatabaseName();

Review Comment:
   Please add a comment explaining that in MySQL the schema is equivalent to the database[1].
   
   [1] https://dev.mysql.com/doc/connector-odbc/en/connector-odbc-usagenotes-functionality-catalog-schema.html





[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connector/jdbc] The primary key obtained from MySQL is incorrect by using MysqlCatalog

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1144456068

   Thanks for your update. I also mentioned this in the previous issue.
   I have debugged this case. It does not happen with mysql-connector-java 5.x, where `com.mysql.jdbc.DatabaseMetaData` gets the PK with SQL such as:
   ```
   show keys from ${table_name} from ${database_name};
   ```
   But it can happen with mysql-connector-java 8.x. There, `com.mysql.cj.jdbc.DatabaseMetaDataUsingInfoSchema` substitutes the current database for the catalog only when the following condition holds. In our case it does not hold, so the catalog stays null, and the driver then gets the PK with SQL like the one @fsk119 provided:
   ```
           if (catalog == null) {
               if (this.conn.getNullCatalogMeansCurrent()) {
                   catalog = this.database;
               }
           }
   ```
   
   So I think we can add some filtering to distinguish tables that have the same name.
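The driver condition quoted above can be modelled in a few lines. This is a simplified sketch, not Connector/J code: `effectiveCatalog` is a hypothetical helper mirroring the quoted `if` block, where `nullCatalogMeansCurrent` corresponds to the Connector/J connection property of the same name and `currentDatabase` to the database in the JDBC URL. `whereClause` is likewise illustrative only; it shows why a `null` catalog leaves the `INFORMATION_SCHEMA` query without a database filter.

```java
/** Simplified, hypothetical model of how a null catalog argument is resolved. */
final class CatalogResolution {

    /** Mirrors the quoted driver logic: use the current database only if allowed. */
    static String effectiveCatalog(
            String catalog, boolean nullCatalogMeansCurrent, String currentDatabase) {
        if (catalog == null && nullCatalogMeansCurrent) {
            return currentDatabase;
        }
        return catalog; // may stay null: the lookup is then not restricted to one database
    }

    /**
     * Illustrative WHERE clause for INFORMATION_SCHEMA.STATISTICS (not the driver's exact SQL).
     * String concatenation is for illustration only; real code must not build SQL this way.
     */
    static String whereClause(String catalog, String table) {
        StringBuilder sql = new StringBuilder("WHERE TABLE_NAME = '").append(table).append("'");
        if (catalog != null) {
            // TABLE_SCHEMA holds the database name in MySQL's INFORMATION_SCHEMA.
            sql.append(" AND TABLE_SCHEMA = '").append(catalog).append("'");
        }
        return sql.append(" AND INDEX_NAME = 'PRIMARY'").toString();
    }
}
```

With `nullCatalogMeansCurrent` disabled and no catalog passed, `effectiveCatalog(null, false, "db1")` stays `null`, and the resulting clause matches every table named `user` in every database.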




[GitHub] [flink] fsk119 commented on pull request #19741: [FLINK-27794][connector/jdbc] The primary key obtained from MySQL is incorrect by using MysqlCatalog

Posted by GitBox <gi...@apache.org>.
fsk119 commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1143083598

   Hi all. I pushed a reproduction of the problem to my repo[1]. @dusukang, I think you can add a similar test.
   
   [1] https://github.com/apache/flink/compare/master...fsk119:mysql-catalog-fix?expand=1




[GitHub] [flink] fsk119 commented on a diff in pull request #19741: [FLINK-27794][connector/jdbc] The primary key obtained from MySQL is incorrect by using MysqlCatalog

Posted by GitBox <gi...@apache.org>.
fsk119 commented on code in PR #19741:
URL: https://github.com/apache/flink/pull/19741#discussion_r890139979


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/JdbcCatalog.java:
##########
@@ -42,15 +42,16 @@ public class JdbcCatalog extends AbstractJdbcCatalog {
 
     public JdbcCatalog(
             String catalogName,
+            String dbCatalog,

Review Comment:
   We should infer the catalog from the input table path.



##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java:
##########
@@ -516,4 +519,8 @@ protected String getSchemaName(ObjectPath tablePath) {
     protected String getSchemaTableName(ObjectPath tablePath) {
         throw new UnsupportedOperationException();
     }
+
+    protected String getDbCatalog() {

Review Comment:
   It should take the table path as input, like `getSchemaName(ObjectPath tablePath)`.





[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connector/jdbc] The primary key obtained from MySQL is incorrect by using MysqlCatalog

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1148256940

   Thank you for the suggestion, @fsk119. I have modified the PR accordingly and added a test. To reproduce the case, I want to create tables with the same name in two databases. Should I modify `catalog-init-for-test.sql` for that?




[GitHub] [flink] dusukang closed pull request #19741: [FLINK-27794][connector/jdbc] The primary key obtained from MySQL is incorrect by using MysqlCatalog

Posted by GitBox <gi...@apache.org>.
dusukang closed pull request #19741: [FLINK-27794][connector/jdbc] The primary key obtained from MySQL is incorrect by using MysqlCatalog
URL: https://github.com/apache/flink/pull/19741




[GitHub] [flink] fsk119 commented on pull request #19741: [FLINK-27794][connector/jdbc] Fix the way of getting primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
fsk119 commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1150634252

   @dusukang Hi. The CI test fails. Could you check whether your modification breaks the test?




[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1152847189

   @flinkbot run azure




[GitHub] [flink] leonardBang commented on a diff in pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
leonardBang commented on code in PR #19741:
URL: https://github.com/apache/flink/pull/19741#discussion_r894249878


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java:
##########
@@ -144,19 +144,23 @@ public String getBaseUrl() {
     // ------ retrieve PK constraint ------
 
     protected Optional<UniqueConstraint> getPrimaryKey(
-            DatabaseMetaData metaData, String schema, String table) throws SQLException {
+            DatabaseMetaData metaData, String catalog, String schema, String table)

Review Comment:
   The meaning of `catalog` in the database (MySQL) differs from the meaning of `catalog` in Flink SQL, so we'd better make this clearer. We can refer to other popular projects as well.





[GitHub] [flink] fsk119 commented on a diff in pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
fsk119 commented on code in PR #19741:
URL: https://github.com/apache/flink/pull/19741#discussion_r894124861


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java:
##########
@@ -144,12 +144,13 @@ public String getBaseUrl() {
     // ------ retrieve PK constraint ------
 
     protected Optional<UniqueConstraint> getPrimaryKey(
-            DatabaseMetaData metaData, String schema, String table) throws SQLException {
+            DatabaseMetaData metaData, String catalog, String schema, String table)
+            throws SQLException {
 
         // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
         // the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
         // We need to sort them based on the KEY_SEQ value.
-        ResultSet rs = metaData.getPrimaryKeys(null, schema, table);
+        ResultSet rs = metaData.getPrimaryKeys(catalog, schema, table);

Review Comment:
   Could you add a check at line 161?
   ```
   Preconditions.checkState(!keySeqColumnName.containsKey(keySeq - 1), "The PK constraint should be unique.");
   
   ```
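Putting the pieces together, a minimal sketch of a PK lookup with this check might look as follows. It assumes the database name is passed as the JDBC `catalog` argument and reads only the standard `getPrimaryKeys` columns `COLUMN_NAME` and `KEY_SEQ`. `PrimaryKeyLookup` is a hypothetical name, a plain `IllegalStateException` stands in for Flink's `Preconditions.checkState`, and the message uses the wording proposed in review. A `TreeMap` keeps the columns ordered by `KEY_SEQ`, since the Javadoc only guarantees ordering by `COLUMN_NAME`.

```java
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class PrimaryKeyLookup {

    /** Returns the PK column names ordered by KEY_SEQ, or an empty list if there is no PK. */
    static List<String> primaryKeyColumns(DatabaseMetaData metaData, String database, String table)
            throws SQLException {
        Map<Integer, String> keySeqColumnName = new TreeMap<>();
        // Pass the database as the JDBC catalog so same-named tables elsewhere are excluded.
        try (ResultSet rs = metaData.getPrimaryKeys(database, null, table)) {
            while (rs.next()) {
                String columnName = rs.getString("COLUMN_NAME");
                int keySeq = rs.getInt("KEY_SEQ"); // 1-based position within the PK
                if (keySeqColumnName.containsKey(keySeq - 1)) {
                    // A repeated KEY_SEQ means rows from more than one table were returned.
                    throw new IllegalStateException(
                            "The field(s) of primary key must be from the same table.");
                }
                keySeqColumnName.put(keySeq - 1, columnName);
            }
        }
        // TreeMap iterates in ascending key order, i.e. by KEY_SEQ.
        return new ArrayList<>(keySeqColumnName.values());
    }
}
```

Failing fast on a repeated `KEY_SEQ` turns the silent overwrite into an explicit error if a driver ever ignores the catalog argument.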
   





[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1152233281

   > > Thanks @leonardBang for the comments. I agree, and I'll change it soon. But what if other databases need all of `catalog`, `database`, and `table` to identify a unique table?
   > 
   > Currently, Flink Catalog uses a three-level table path design, so we need to adapt other databases' table paths to this style; otherwise we would need a new catalog API design, and in that case a FLIP is required.
   
   Yeah. Thanks for your reply. 
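Adapting a database to Flink's three-level path (`catalog.database.table`) means deciding how each Flink path maps onto the `(catalog, schema, table)` arguments of `DatabaseMetaData#getPrimaryKeys`. The sketch below is a hypothetical helper, not the connector's code: for MySQL it assumes the Flink database is the JDBC catalog and there is no schema level; for a database that does have schemas it assumes the extra level is folded into the Flink table name as `schema.table`, with the database name still passed as the catalog. The class `JdbcTablePath`, its methods, and the default schema `public` are illustrative assumptions.

```java
/**
 * Maps a Flink table path onto the (catalog, schema, table) arguments of
 * DatabaseMetaData#getPrimaryKeys. Hypothetical helper for illustration only.
 */
public class JdbcTablePath {

    /** Hypothetical holder for the three JDBC metadata arguments. */
    static final class MetadataArgs {
        final String catalog;
        final String schema;
        final String table;

        MetadataArgs(String catalog, String schema, String table) {
            this.catalog = catalog;
            this.schema = schema;
            this.table = table;
        }

        @Override
        public String toString() {
            return "catalog=" + catalog + ", schema=" + schema + ", table=" + table;
        }
    }

    /** MySQL-style (assumed): Flink database = MySQL database = JDBC catalog; no schema. */
    static MetadataArgs forMySql(String flinkDatabase, String flinkTable) {
        return new MetadataArgs(flinkDatabase, null, flinkTable);
    }

    /**
     * Schema-style (assumed): the extra level is folded into the Flink table name
     * as "schema.table", falling back to a default schema when there is no dot.
     */
    static MetadataArgs forSchemaDatabase(String flinkDatabase, String flinkTable, String defaultSchema) {
        int dot = flinkTable.indexOf('.');
        if (dot < 0) {
            return new MetadataArgs(flinkDatabase, defaultSchema, flinkTable);
        }
        return new MetadataArgs(
                flinkDatabase, flinkTable.substring(0, dot), flinkTable.substring(dot + 1));
    }

    public static void main(String[] args) {
        System.out.println(forMySql("db2", "users"));
        // prints catalog=db2, schema=null, table=users
        System.out.println(forSchemaDatabase("shop", "sales.orders", "public"));
        // prints catalog=shop, schema=sales, table=orders
    }
}
```

Keeping every database inside Flink's existing three levels like this avoids a new catalog API, which is the trade-off discussed above.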



[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc] Fix the wrong primary key for MysqlCatalog when there are tables with same name in different databases

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1166767237

   @leonardBang the CI build failed on `e2e_n_ci`, which does not seem to be related to the connectors module. Please correct me if I am wrong. Is this caused by the Python 3 package version?
   ```
   ERROR: Complete output from command python setup.py egg_info:
   Jun 25 15:22:37     ERROR: Traceback (most recent call last):
   Jun 25 15:22:37       File "<string>", line 1, in <module>
   Jun 25 15:22:37       File "/tmp/pip-install-fixi7jcz/grpcio/setup.py", line 263, in <module>
   Jun 25 15:22:37         if check_linker_need_libatomic():
   Jun 25 15:22:37       File "/tmp/pip-install-fixi7jcz/grpcio/setup.py", line 213, in check_linker_need_libatomic
   Jun 25 15:22:37         stderr=PIPE)
   Jun 25 15:22:37       File "/tmp/.conda/lib/python3.7/subprocess.py", line 775, in __init__
   Jun 25 15:22:37         restore_signals, start_new_session)
   Jun 25 15:22:37       File "/tmp/.conda/lib/python3.7/subprocess.py", line 1522, in _execute_child
   Jun 25 15:22:37         raise child_exception_type(errno_num, err_msg, err_filename)
   Jun 25 15:22:37     FileNotFoundError: [Errno 2] No such file or directory: 'c++': 'c++'
   Jun 25 15:22:37     ----------------------------------------
   Jun 25 15:22:37 ERROR: Command "python setup.py egg_info" failed with error code 1 in /tmp/pip-install-fixi7jcz/grpcio/
   The command '/bin/sh -c /tmp/.conda/bin/python -m pip install apache-flink-1.15.dev0.tar.gz' returned a non-zero code: 1
   ```
   Sorry to bother you, @MartijnVisser, but could you help me check whether the upgrade in [https://github.com/apache/flink/pull/20044](https://github.com/apache/flink/pull/20044) is applicable to release-1.15?



[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc] Fix the wrong primary key for MysqlCatalog when there are tables with same name in different databases

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1165512120

   > Thanks @dusukang for the update, LGTM. But could you use `git rebase release-1.15` instead of the `git merge release-1.15` command? Rebasing avoids these conflicts.
   
   Yes. I haven't used rebase before, so I may be using it incorrectly; as a result, I have had to merge before rebasing every time.



[GitHub] [flink] dusukang commented on pull request #19741: [FLINK-27794][connectors/jdbc] Fix the wrong primary key for MysqlCatalog when there are tables with same name in different databases

Posted by GitBox <gi...@apache.org>.
dusukang commented on PR #19741:
URL: https://github.com/apache/flink/pull/19741#issuecomment-1165528513

   @flinkbot run azure



[GitHub] [flink] dusukang closed pull request #19741: [FLINK-27794][connectors/jdbc] Fix the wrong primary key for MysqlCatalog when there are tables with same name in different databases

Posted by GitBox <gi...@apache.org>.
dusukang closed pull request #19741: [FLINK-27794][connectors/jdbc] Fix the wrong primary key for MysqlCatalog when there are tables with same name in different databases
URL: https://github.com/apache/flink/pull/19741

