You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/06/10 08:02:41 UTC

[GitHub] [flink] leonardBang commented on a diff in pull request #19741: [FLINK-27794][connectors/jdbc]Fix the bug of wrong primary key in MysqlCatalog

leonardBang commented on code in PR #19741:
URL: https://github.com/apache/flink/pull/19741#discussion_r894249878


##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java:
##########
@@ -144,19 +144,23 @@ public String getBaseUrl() {
     // ------ retrieve PK constraint ------
 
     protected Optional<UniqueConstraint> getPrimaryKey(
-            DatabaseMetaData metaData, String schema, String table) throws SQLException {
+            DatabaseMetaData metaData, String catalog, String schema, String table)

Review Comment:
   The `catalog` meaning in Database is different with `catalog` meaning in Flink SQL, we'd better make it more clear, we can refer other popular projects as well.



##########
flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java:
##########
@@ -23,30 +23,34 @@
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.ClassRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.MySQLContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.utility.DockerImageName;
 
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /** Test base for {@link MySqlCatalog}. */
 public class MySqlCatalogTestBase {
 
     public static final Logger LOG = LoggerFactory.getLogger(MySqlCatalogTestBase.class);
 
-    protected static final DockerImageName MYSQL_57_IMAGE = DockerImageName.parse("mysql:5.7.34");
+    protected static final List<String> DOCKER_IMAGE_NAMES =
+            Arrays.asList("mysql:5.7.34", "mysql:8.0.16");

Review Comment:
   Please also add a test for 5.6.x version ?



##########
flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java:
##########
@@ -144,19 +144,23 @@ public String getBaseUrl() {
     // ------ retrieve PK constraint ------
 
     protected Optional<UniqueConstraint> getPrimaryKey(
-            DatabaseMetaData metaData, String schema, String table) throws SQLException {
+            DatabaseMetaData metaData, String catalog, String schema, String table)
+            throws SQLException {
 
         // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
         // the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
         // We need to sort them based on the KEY_SEQ value.
-        ResultSet rs = metaData.getPrimaryKeys(null, schema, table);
+        ResultSet rs = metaData.getPrimaryKeys(catalog, schema, table);
 
         Map<Integer, String> keySeqColumnName = new HashMap<>();
         String pkName = null;
         while (rs.next()) {
             String columnName = rs.getString("COLUMN_NAME");
             pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the same
             int keySeq = rs.getInt("KEY_SEQ");
+            Preconditions.checkState(
+                    !keySeqColumnName.containsKey(keySeq - 1),
+                    "The PK constraint should be unique.");

Review Comment:
   Could we improve the exception description? It looks like we meet duplicated records with same primary key.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org