You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by le...@apache.org on 2022/06/27 09:25:09 UTC

[flink] branch release-1.15 updated: [FLINK-27794][connectors/jdbc] Fix the wrong primary key for MysqlCatalog when there are tables with same name in different databases

This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 1d0dac0e007 [FLINK-27794][connectors/jdbc] Fix the wrong primary key for MysqlCatalog when there are tables with same name in different databases
1d0dac0e007 is described below

commit 1d0dac0e00739a4cf176cba6dc091662e48170c3
Author: dusukang <68...@users.noreply.github.com>
AuthorDate: Mon Jun 27 17:25:02 2022 +0800

    [FLINK-27794][connectors/jdbc] Fix the wrong primary key for MysqlCatalog when there are tables with same name in different databases
    
    This closes #19741.
---
 .../jdbc/catalog/AbstractJdbcCatalog.java          | 21 ++++++--
 .../flink/connector/jdbc/catalog/MySqlCatalog.java |  2 +-
 .../connector/jdbc/catalog/MySqlCatalogITCase.java | 49 ++++++++++++++++--
 .../jdbc/catalog/MySqlCatalogTestBase.java         | 58 +++++++++++++++-------
 .../mysql-scripts/catalog-init-for-test.sql        | 32 ++++++++++--
 5 files changed, 129 insertions(+), 33 deletions(-)

diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
index a2e46ff0573..82410850420 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/AbstractJdbcCatalog.java
@@ -144,12 +144,17 @@ public abstract class AbstractJdbcCatalog extends AbstractCatalog {
     // ------ retrieve PK constraint ------
 
     protected Optional<UniqueConstraint> getPrimaryKey(
-            DatabaseMetaData metaData, String schema, String table) throws SQLException {
+            DatabaseMetaData metaData, String database, String schema, String table)
+            throws SQLException {
 
         // According to the Javadoc of java.sql.DatabaseMetaData#getPrimaryKeys,
         // the returned primary key columns are ordered by COLUMN_NAME, not by KEY_SEQ.
         // We need to sort them based on the KEY_SEQ value.
-        ResultSet rs = metaData.getPrimaryKeys(null, schema, table);
+        // In the currently supported database dialects (MySQL and Postgres), the term
+        // "database" is equivalent to the JDBC term "catalog". We therefore pass the
+        // database name as the catalog parameter so that primary keys are retrieved by
+        // the full table identifier.
+        ResultSet rs = metaData.getPrimaryKeys(database, schema, table);
 
         Map<Integer, String> keySeqColumnName = new HashMap<>();
         String pkName = null;
@@ -157,6 +162,9 @@ public abstract class AbstractJdbcCatalog extends AbstractCatalog {
             String columnName = rs.getString("COLUMN_NAME");
             pkName = rs.getString("PK_NAME"); // all the PK_NAME should be the same
             int keySeq = rs.getInt("KEY_SEQ");
+            Preconditions.checkState(
+                    !keySeqColumnName.containsKey(keySeq - 1),
+                    "The field(s) of primary key must be from the same table.");
             keySeqColumnName.put(keySeq - 1, columnName); // KEY_SEQ is 1-based index
         }
         List<String> pkFields =
@@ -228,12 +236,17 @@ public abstract class AbstractJdbcCatalog extends AbstractCatalog {
             throw new TableNotExistException(getName(), tablePath);
         }
 
-        String dbUrl = baseUrl + tablePath.getDatabaseName();
+        String databaseName = tablePath.getDatabaseName();
+        String dbUrl = baseUrl + databaseName;
 
         try (Connection conn = DriverManager.getConnection(dbUrl, username, pwd)) {
             DatabaseMetaData metaData = conn.getMetaData();
             Optional<UniqueConstraint> primaryKey =
-                    getPrimaryKey(metaData, getSchemaName(tablePath), getTableName(tablePath));
+                    getPrimaryKey(
+                            metaData,
+                            databaseName,
+                            getSchemaName(tablePath),
+                            getTableName(tablePath));
 
             PreparedStatement ps =
                     conn.prepareStatement(
diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalog.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalog.java
index 6e21f818860..de3dfe200b4 100644
--- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalog.java
+++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalog.java
@@ -152,7 +152,7 @@ public class MySqlCatalog extends AbstractJdbcCatalog {
 
     @Override
     protected String getSchemaName(ObjectPath tablePath) {
-        return null;
+        return tablePath.getDatabaseName();
     }
 
     @Override
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java
index 0e2c48656e3..7f56525ab45 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogITCase.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.connector.jdbc.catalog;
 
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.ObjectPath;
@@ -32,6 +34,8 @@ import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.math.BigDecimal;
 import java.sql.Date;
@@ -49,6 +53,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 /** E2E test for {@link MySqlCatalog}. */
+@RunWith(Parameterized.class)
 public class MySqlCatalogITCase extends MySqlCatalogTestBase {
 
     private static final List<Row> ALL_TYPES_ROWS =
@@ -75,7 +80,6 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
                             1L,
                             -1,
                             1L,
-                            "{\"k1\": \"v1\"}",
                             null,
                             "col_longtext",
                             null,
@@ -123,7 +127,6 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
                             1L,
                             -1,
                             1L,
-                            "{\"k1\": \"v1\"}",
                             null,
                             "col_longtext",
                             null,
@@ -150,6 +153,7 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
                             Timestamp.valueOf("2021-08-04 01:53:19.098").toLocalDateTime(),
                             null));
 
+    private final MySqlCatalog catalog;
     private TableEnvironment tEnv;
 
     @Before
@@ -162,6 +166,15 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
         tEnv.useCatalog(TEST_CATALOG_NAME);
     }
 
+    public MySqlCatalogITCase(String version) {
+        catalog = CATALOGS.get(version);
+    }
+
+    @Parameterized.Parameters(name = "version = {0}")
+    public static String[] params() {
+        return DOCKER_IMAGE_NAMES.toArray(new String[0]);
+    }
+
     // ------ databases ------
 
     @Test
@@ -179,7 +192,7 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
     @Test
     public void testListDatabases() {
         List<String> actual = catalog.listDatabases();
-        assertEquals(Collections.singletonList(TEST_DB), actual);
+        assertEquals(Arrays.asList(TEST_DB, TEST_DB2), actual);
     }
 
     @Test
@@ -198,7 +211,8 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
                 Arrays.asList(
                         TEST_TABLE_ALL_TYPES,
                         TEST_SINK_TABLE_ALL_TYPES,
-                        TEST_TABLE_SINK_FROM_GROUPED_BY),
+                        TEST_TABLE_SINK_FROM_GROUPED_BY,
+                        TEST_TABLE_PK),
                 actual);
     }
 
@@ -251,6 +265,33 @@ public class MySqlCatalogITCase extends MySqlCatalogTestBase {
         assertEquals(TABLE_SCHEMA, table.getUnresolvedSchema());
     }
 
+    @Test
+    public void testGetTablePrimaryKey() throws TableNotExistException {
+        // test the PK of test.t_pk
+        Schema tableSchemaTestPK1 =
+                Schema.newBuilder()
+                        .column("uid", DataTypes.BIGINT().notNull())
+                        .column("col_bigint", DataTypes.BIGINT())
+                        .primaryKeyNamed("PRIMARY", Collections.singletonList("uid"))
+                        .build();
+        CatalogBaseTable tablePK1 = catalog.getTable(new ObjectPath(TEST_DB, TEST_TABLE_PK));
+        assertEquals(
+                tableSchemaTestPK1.getPrimaryKey().get(),
+                tablePK1.getUnresolvedSchema().getPrimaryKey().get());
+
+        // test the PK of test2.t_pk
+        Schema tableSchemaTestPK2 =
+                Schema.newBuilder()
+                        .column("pid", DataTypes.INT().notNull())
+                        .column("col_varchar", DataTypes.VARCHAR(255))
+                        .primaryKeyNamed("PRIMARY", Collections.singletonList("pid"))
+                        .build();
+        CatalogBaseTable tablePK2 = catalog.getTable(new ObjectPath(TEST_DB2, TEST_TABLE_PK));
+        assertEquals(
+                tableSchemaTestPK2.getPrimaryKey().get(),
+                tablePK2.getUnresolvedSchema().getPrimaryKey().get());
+    }
+
     // ------ test select query. ------
 
     @Test
diff --git a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java
index d2e01c6bc39..c06d0d6c266 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java
+++ b/flink-connectors/flink-connector-jdbc/src/test/java/org/apache/flink/connector/jdbc/catalog/MySqlCatalogTestBase.java
@@ -23,15 +23,18 @@ import org.apache.flink.table.api.Schema;
 
 import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
 
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.ClassRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.MySQLContainer;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.utility.DockerImageName;
 
+import java.sql.SQLException;
+import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /** Test base for {@link MySqlCatalog}. */
@@ -39,14 +42,17 @@ public class MySqlCatalogTestBase {
 
     public static final Logger LOG = LoggerFactory.getLogger(MySqlCatalogTestBase.class);
 
-    protected static final DockerImageName MYSQL_57_IMAGE = DockerImageName.parse("mysql:5.7.34");
+    protected static final List<String> DOCKER_IMAGE_NAMES =
+            Arrays.asList("mysql:5.6.51", "mysql:5.7.34", "mysql:8.0.16");
     protected static final String TEST_CATALOG_NAME = "mysql_catalog";
     protected static final String TEST_USERNAME = "mysql";
     protected static final String TEST_PWD = "mysql";
     protected static final String TEST_DB = "test";
+    protected static final String TEST_DB2 = "test2";
     protected static final String TEST_TABLE_ALL_TYPES = "t_all_types";
     protected static final String TEST_SINK_TABLE_ALL_TYPES = "t_all_types_sink";
     protected static final String TEST_TABLE_SINK_FROM_GROUPED_BY = "t_grouped_by_sink";
+    protected static final String TEST_TABLE_PK = "t_pk";
     protected static final String MYSQL_INIT_SCRIPT = "mysql-scripts/catalog-init-for-test.sql";
     protected static final Map<String, String> DEFAULT_CONTAINER_ENV_MAP =
             new HashMap<String, String>() {
@@ -77,7 +83,6 @@ public class MySqlCatalogTestBase {
                     .column("col_int_unsigned", DataTypes.BIGINT())
                     .column("col_integer", DataTypes.INT())
                     .column("col_integer_unsigned", DataTypes.BIGINT())
-                    .column("col_json", DataTypes.STRING())
                     .column("col_longblob", DataTypes.BYTES())
                     .column("col_longtext", DataTypes.STRING())
                     .column("col_mediumblob", DataTypes.BYTES())
@@ -106,23 +111,38 @@ public class MySqlCatalogTestBase {
                     .primaryKeyNamed("PRIMARY", Lists.newArrayList("pid"))
                     .build();
 
-    @ClassRule
-    public static final MySQLContainer<?> MYSQL_CONTAINER =
-            new MySQLContainer<>(MYSQL_57_IMAGE)
-                    .withUsername("root")
-                    .withPassword("")
-                    .withEnv(DEFAULT_CONTAINER_ENV_MAP)
-                    .withInitScript(MYSQL_INIT_SCRIPT)
-                    .withLogConsumer(new Slf4jLogConsumer(LOG));
-
-    protected static MySqlCatalog catalog;
+    public static final Map<String, MySQLContainer<?>> MYSQL_CONTAINERS = new HashMap<>();
+    public static final Map<String, MySqlCatalog> CATALOGS = new HashMap<>();
 
     @BeforeClass
-    public static void beforeAll() {
-        String baseUrl =
-                MYSQL_CONTAINER
-                        .getJdbcUrl()
-                        .substring(0, MYSQL_CONTAINER.getJdbcUrl().lastIndexOf("/"));
-        catalog = new MySqlCatalog(TEST_CATALOG_NAME, TEST_DB, TEST_USERNAME, TEST_PWD, baseUrl);
+    public static void beforeAll() throws SQLException {
+        for (String dockerImageName : DOCKER_IMAGE_NAMES) {
+            MySQLContainer<?> container =
+                    new MySQLContainer<>(DockerImageName.parse(dockerImageName))
+                            .withUsername("root")
+                            .withPassword("")
+                            .withEnv(DEFAULT_CONTAINER_ENV_MAP)
+                            .withInitScript(MYSQL_INIT_SCRIPT)
+                            .withLogConsumer(new Slf4jLogConsumer(LOG));
+            container.start();
+            MYSQL_CONTAINERS.put(dockerImageName, container);
+            CATALOGS.put(
+                    dockerImageName,
+                    new MySqlCatalog(
+                            TEST_CATALOG_NAME,
+                            TEST_DB,
+                            TEST_USERNAME,
+                            TEST_PWD,
+                            container
+                                    .getJdbcUrl()
+                                    .substring(0, container.getJdbcUrl().lastIndexOf("/"))));
+        }
+    }
+
+    @AfterClass
+    public static void cleanup() {
+        for (MySQLContainer<?> container : MYSQL_CONTAINERS.values()) {
+            container.stop();
+        }
     }
 }
diff --git a/flink-connectors/flink-connector-jdbc/src/test/resources/mysql-scripts/catalog-init-for-test.sql b/flink-connectors/flink-connector-jdbc/src/test/resources/mysql-scripts/catalog-init-for-test.sql
index a57dc9f4f3a..98f44b438dd 100644
--- a/flink-connectors/flink-connector-jdbc/src/test/resources/mysql-scripts/catalog-init-for-test.sql
+++ b/flink-connectors/flink-connector-jdbc/src/test/resources/mysql-scripts/catalog-init-for-test.sql
@@ -17,8 +17,8 @@
  */
 
 /**
-* The test for mysql 5.7.X & 8.0.X versions.
-* The init script contains some types that are incompatible with 5.6.X or lower versions.
+* This init script is used by the tests for MySQL 5.6.X, 5.7.X and 8.0.X.
+* It contains some types that are incompatible with versions lower than 5.6.X.
 */
 
 -- Creates test user info and grants privileges.
@@ -59,7 +59,6 @@ CREATE TABLE `t_all_types` (
   `col_int_unsigned` int(10) unsigned DEFAULT NULL,
   `col_integer` int(11) DEFAULT NULL,
   `col_integer_unsigned` int(10) unsigned DEFAULT NULL,
-  `col_json` json DEFAULT NULL,
   `col_longblob` longblob,
   `col_longtext` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_bin,
   `col_mediumblob` mediumblob,
@@ -91,8 +90,8 @@ CREATE TABLE `t_all_types` (
 -- ----------------------------
 -- Records of t_all_types
 -- ----------------------------
-INSERT INTO `t_all_types` VALUES (1, -1, 1, null, b'1', null, 'hello', '2021-08-04', '2021-08-04 01:54:16', -1, 1, -1, 1, 'enum2', -9.1, 9.1, -1, 1, -1, 1, '{\"k1\": \"v1\"}', null, 'col_longtext', null, -1, 1, 'col_mediumtext', -99, 99, -1, 1, 'set_ele1', -1, 1, 'col_text', '10:32:34', '2021-08-04 01:54:16', 'col_tinytext', -1, 1, null, 'col_varchar', '2021-08-04 01:54:16.463', '09:33:43.000', '2021-08-04 01:54:16.463', null);
-INSERT INTO `t_all_types` VALUES (2, -1, 1, null, b'1', null, 'hello', '2021-08-04', '2021-08-04 01:53:19', -1, 1, -1, 1, 'enum2', -9.1, 9.1, -1, 1, -1, 1, '{\"k1\": \"v1\"}', null, 'col_longtext', null, -1, 1, 'col_mediumtext', -99, 99, -1, 1, 'set_ele1,set_ele12', -1, 1, 'col_text', '10:32:34', '2021-08-04 01:53:19', 'col_tinytext', -1, 1, null, 'col_varchar', '2021-08-04 01:53:19.098', '09:33:43.000', '2021-08-04 01:53:19.098', null);
+INSERT INTO `t_all_types` VALUES (1, -1, 1, null, b'1', null, 'hello', '2021-08-04', '2021-08-04 01:54:16', -1, 1, -1, 1, 'enum2', -9.1, 9.1, -1, 1, -1, 1, null, 'col_longtext', null, -1, 1, 'col_mediumtext', -99, 99, -1, 1, 'set_ele1', -1, 1, 'col_text', '10:32:34', '2021-08-04 01:54:16', 'col_tinytext', -1, 1, null, 'col_varchar', '2021-08-04 01:54:16.463', '09:33:43.000', '2021-08-04 01:54:16.463', null);
+INSERT INTO `t_all_types` VALUES (2, -1, 1, null, b'1', null, 'hello', '2021-08-04', '2021-08-04 01:53:19', -1, 1, -1, 1, 'enum2', -9.1, 9.1, -1, 1, -1, 1, null, 'col_longtext', null, -1, 1, 'col_mediumtext', -99, 99, -1, 1, 'set_ele1,set_ele12', -1, 1, 'col_text', '10:32:34', '2021-08-04 01:53:19', 'col_tinytext', -1, 1, null, 'col_varchar', '2021-08-04 01:53:19.098', '09:33:43.000', '2021-08-04 01:53:19.098', null);
 
 -- Create test table t_all_types_sink.
 DROP TABLE IF EXISTS `t_all_types_sink`;
@@ -105,3 +104,26 @@ CREATE TABLE `t_grouped_by_sink` (
   `col_bigint` bigint(20) DEFAULT NULL,
   PRIMARY KEY (`pid`) USING BTREE
 ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+-- Create test table t_pk.
+DROP TABLE IF EXISTS `t_pk`;
+CREATE TABLE `t_pk` (
+  `uid` bigint(20) NOT NULL AUTO_INCREMENT,
+  `col_bigint` bigint(20) DEFAULT NULL,
+  PRIMARY KEY (`uid`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;
+
+-- Create the `test2` database.
+DROP DATABASE IF EXISTS `test2`;
+CREATE DATABASE `test2` CHARSET=utf8;
+
+-- Uses `test2` database.
+use `test2`;
+
+-- Create test table t_pk.
+DROP TABLE IF EXISTS `t_pk`;
+CREATE TABLE `t_pk` (
+  `pid` int(11) NOT NULL AUTO_INCREMENT,
+  `col_varchar` varchar(255) DEFAULT NULL,
+  PRIMARY KEY (`pid`)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8;