You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/10 03:38:13 UTC

[GitHub] [flink] fsk119 commented on a diff in pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

fsk119 commented on code in PR #19741:
URL: https://github.com/apache/flink/pull/19741#discussion_r894124861


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java:
##########
@@ -144,12 +144,13 @@ public String getBaseUrl() {
     // ------ retrieve PK constraint ------
 
     protected Optional<UniqueConstraint> getPrimaryKey(
-            DatabaseMetaData metaData, String schema, String table) throws SQLException {
+            DatabaseMetaData metaData, String catalog, String schema, String table)
+            throws SQLException {
 
         // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
         // the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
         // We need to sort them based on the KEY_SEQ value.
-        ResultSet rs = metaData.getPrimaryKeys(null, schema, table);
+        ResultSet rs = metaData.getPrimaryKeys(catalog, schema, table);

Review Comment:
   Could you add checks in Line 161? 
   ```
   Preconditions.checkState(!keySeqColumnName.containsKey(keySeq - 1), "The PK constrants should be unique.");
   
   ```
   



##########
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java:
##########
@@ -251,6 +267,21 @@ public void testGetTable() throws TableNotExistException {
         assertEquals(TABLE_SCHEMA, table.getUnresolvedSchema());
     }
 
+    @Test
+    public void testGetTablePrimaryKey() throws TableNotExistException {
+        Schema tableSchemaUser =
+                Schema.newBuilder()
+                        .column("uid", DataTypes.BIGINT().notNull())
+                        .column("uname", DataTypes.VARCHAR(36))
+                        .column("others", DataTypes.VARCHAR(128))
+                        .primaryKeyNamed("PRIMARY", Lists.newArrayList("uid"))

Review Comment:
   Use Collections.singletonList here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org